我是我们在工作中集成团队中的解决方案架构师和数据工程师,我拥有API产品之一,A Data Lake GraphQl API 。在本文中,我想与您分享我从数据方面开始如何构建它的过程。
本文假设您熟悉诸如Data Lake Storage,Data Factory和Databricks之类的Azure服务,并且对GraphQl,Python/Pyspark和SQL具有一些一般知识。
。问题
数据湖旨在存储通常用于数据分析和报告的大量数据,但是我们工作中的一个用例是将数据湖变成 graphql api 的源。有点像A 反向ETL ,但应用程序(内部和SaaS)都会致电我们的API以获取所需的数据。
为什么?好吧,这是一个包含我们用户所需的所有内容的中央数据源,并且数据非常相关,非常适合GraphQl。
。但是,有一些问题,主要与数据:
- 我们无法控制数据湖,并且由于另一个群体拥有该数据。因此,我们采取他们可以给予的东西。
- 数据在湖中存储为镶木文件。我们需要将其移动到SQL数据库中,以便能够使用ORM并利用这种关系优势。
- 在我们的数据湖中,没有执行主要钥匙唯一性,这意味着有时会摄入数据而无需重复数据删除。
- 不能使用字段来表示数据是新的或更新的,例如 Updation_at 字段,因此没有简单的方法来获取Delta。
- 我们的数据湖使用Star模式模型,一些事实表可以得到真的很大(〜7,000万行),因此每次从头开始摄入将近8个小时。
设计
我们的API高级体系结构看起来像这样:
在本系列的第1部分中,我将首先讨论前三个阶段并解决数据工程方面。
让我们浏览上面的每个问题,看看我们能做什么。
- 数据质量控制 - 我们可以创建自己的管道,以更多地控制我们从湖中获得的数据。
- parquet to SQL Transformation - 我们可以在Databricks中做到这一点。
- 重复数据删除 - 我们也可以在Databricks中执行此操作。
- 更改数据捕获 - 由于缺乏从我们的来源的任何版本化,更新时间戳或日志,我需要实现自己的CDC系统。对我们有用的是创建所有列的哈希,并将其存储为单独的列。基本上是校验和列。如果单列的值更改,则哈希也会更改。
- 大数据卷 - 我需要创建一个Delta Load Pipeline,该管道自上次加载以来只能检测和提取新的和更新的数据,以最大程度地减少写入我们数据库的数据量。该管道应包括一个识别数据变化(插入,更新和删除)的机制,并在我们的SQL数据库中反映这一点。
让上述管道仅分为三个具体步骤,这是测试和开发几个月的结果:
- 卫生
- deduplication
- Delta Loading
卫生
我们需要对源表进行消毒和修改,以确保数据质量在将其摄入SQL数据库之前。这包括过滤记录,填充无效值并删除数据质量问题。
以下是我们的数据串笔记本中的代码段(在Apache Spark上运行):
源表是从ADLS2读取的,并作为数据框架返回。
# Read the source table
dfSource = (spark.read
.format('parquet')
.option("mode", "FAILFAST")
.load(adls2sourcepath))
数据框取决于它是什么表。
# do source table modifications
if pTableName == 'fact_table_1':
dfSource = dfSource.filter(f"fiscal_year_num >= '{datetime.datetime.now().year - 1}'") # filter records to current year-1
dfSource = dfSource.fillna(value='',subset=["tdcval_code"]) #tdcval_code is part of the primary key and cannot be null
elif pTableName == 'fact_table_2':
dfSource = dfSource.filter("contract_id is not null") # remove empty Contract id's that were data issues
dfSource = dfSource.fillna(value=0,subset=["allocation_id"]) #allocation_id is part of the primary key and cannot be null
elif pTableName == 'fact_table_3':
dfSource = dfSource.filter(f"doc_post_date >= '{datetime.datetime.now().year - 1}'") # filter records to current year-1
elif pTableName == 'fact_table_4':
dfSource = dfSource.filter(f"partition_key > '201712'") # filter for data quality issues. If you remove that filter data before that time is not of good quality.
重复数据删除
要重复说明,我们需要知道主要密钥。为了动态执行此操作,我在SQL Server中创建了一个函数,该函数输出表定义。
这将返回每个列的名称,类型,长度,精度,比例,如果是无效的或主键。
CREATE FUNCTION [config].[func_get_table_definition]
(
@SchemaName [NVARCHAR](128),@TableName [NVARCHAR](225)
)
RETURNS TABLE
AS
RETURN
(
SELECT
c.name 'column_name',
t.Name 'data_type',
c.max_length/2 'length',
c.precision ,
c.scale ,
c.is_nullable,
ISNULL(i.is_primary_key, 0) 'is_pk'
FROM
sys.columns c
INNER JOIN
sys.types t ON c.user_type_id = t.user_type_id
-- OUTER APPLY selects a single row that matches each row from the left table.
OUTER APPLY
(
SELECT TOP 1 *
FROM sys.index_columns M
WHERE M.object_id = c.object_id AND M.column_id = c.column_id
) m
LEFT OUTER JOIN
sys.indexes i ON m.object_id = i.object_id AND m.index_id = i.index_id
WHERE
c.object_id = OBJECT_ID(@SchemaName + '.' + @TableName)
)
workflow_status_dim 表的示例输出,我将在其余示例中使用它:
column_name | data_type | 长度 | 精度 | 比例 | is_nullable | is_pk |
---|---|---|---|---|---|---|
workflow_status | nvarchar | 10 | 0 | 0 | false | true |
workflow_status_description | nvarchar | 35 | 0 | 0 | true | false |
要在Databricks中运行该函数,我们只是通过SELECT
语句像其他任何表一样读取它:
# Read target table definition
dfTargetDef = spark.read \
.format("jdbc") \
.option("url", jdbcUrl) \
.option("query", "select * from config.func_get_table_definition('{0}', '{1}')".format(pSchemaName, pTableName)) \
.option("user", dbUser) \
.option("password", dbPass).load()
一旦我们拥有主要密钥,我们就可以使用Pyspark的dropDuplicates
方法重复说明:
# dedup source data according to primary key columns
pkColumns = []
pkColumnsDf = dfTargetDef.filter(dfTargetDef.is_pk == 'true')
for row in pkColumnsDf.collect():
pkColumns.append(f"{row.column_name}")
if('char' in row.data_type): dfSource = dfSource.withColumn(row.column_name, trim(col(row.column_name))) # trim string primary columns, remove leading and trailing spaces. Spaces cause issues in merging/primary keys
display(dfSource.groupBy(pkColumns).agg(count(
'*').alias("count_duplicates")).filter(
col('count_duplicates') >= 2)) # display the duplicate records
dfSourceCount = dfSource.count()
dfSource = dfSource.dropDuplicates(pkColumns)
dfSourceDedupCount = dfSource.count()
dupCount = dfSourceCount - dfSourceDedupCount
print(dupCount)
# Create a temp view of the source table
dfSource.createOrReplaceTempView("SourceTable")
除了重复数据删除外,我们还做了一个次要但重要的卫生步骤,它正在修剪char
型主键列。钥匙中的流氓空间在合并数据时可能会引起问题(不幸的是,这是基于经验)。
增量加载
生成校验和
增量加载过程的第一步是在源中创建校验和列。
下面的代码动态生成一个SQL语句,该语句在Databricks HMS(Hive Metastore)中创建 delta table 名为{pTableName}_src
。这包含源表列加上其他列,例如:
-
check_sum ,其中包含使用pyspark的内置
HASH
函数创建的所有列的哈希 - 日期列(修改和创建的日期)
- 标志列(is_modified and is_deleted)能够过滤出更改
# Create source table with check_sum hash
spark.sql("REFRESH TABLE SourceTable") # Invalidates the cached entries
spark.sql(f"DROP TABLE IF EXISTS {pTableName}_src")
entries = [f"CREATE TABLE {pTableName}_src AS SELECT"]
for row in dfTargetDef.collect():
entries.append(f"SourceTable.{row.column_name},")
entries.append("HASH (")
for idx,row in dfTargetDef.toPandas().iterrows():
entries.append(f"SourceTable.{row.column_name}{'' if idx == len(dfTargetDef.toPandas().index)-1 else ','}")
entries.append(") as check_sum,")
entries.append("current_timestamp() as created_date,\ncurrent_timestamp() as modified_date,\nfalse as is_modified,\nfalse as is_deleted")
entries.append("FROM SourceTable;")
sql_comm = "\n".join(str(x) for x in entries)
spark.sql(sql_comm)
这将生成以下类似的SQL语句:
CREATE TABLE workflow_status_dim_src AS SELECT
SourceTable.workflow_status,
SourceTable.workflow_status_description,
HASH (
SourceTable.workflow_status,
SourceTable.workflow_status_description
) as check_sum,
current_timestamp() as created_date,
current_timestamp() as modified_date,
false as is_modified,
false as is_deleted
FROM SourceTable;
合并数据
我们首先在SQL数据库中读取目标表(我们的工作表位于api
架构中)。我们将其存储在databricks中。
dfTarget = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", jdbcUrl) \
.option("dbtable", "api." + pTableName) \
.option("user", dbUser) \
.option("password", dbPass).load()
dfTarget.createOrReplaceTempView("TargetTable")
# Create the delta/managed tables. This initially loads the rows from the source and target tables.
spark.sql(f"CREATE TABLE IF NOT EXISTS {pTableName}_dlt USING delta AS SELECT * FROM TargetTable;")
接下来是将源与目标表合并。以下代码动态生成了MERGE
语句将{pTableName}_src
合并到{pTableName}_dlt
中。
# Merge source into target table
entries = [f"MERGE INTO {pTableName}_dlt tgt USING {pTableName}_src src ON ("]
# Get primary keys
df_pk = dfTargetDef.filter(dfTargetDef.is_pk == 'true').toPandas()
for idx,row in df_pk.iterrows():
entries.append(f"src.{row.column_name} = tgt.{row.column_name}{'' if idx == len(df_pk.index)-1 else ' AND'}")
entries.append(') WHEN MATCHED AND tgt.check_sum <> src.check_sum THEN UPDATE SET')
# https://www.geeksforgeeks.org/how-to-iterate-over-rows-and-columns-in-pyspark-dataframe/
for row in dfTargetDef.collect():
entries.append(f"tgt.{row.column_name} = src.{row.column_name},")
entries.append('tgt.check_sum = src.check_sum,\ntgt.modified_date = src.modified_date,\ntgt.is_modified = true,\ntgt.is_deleted = false')
entries.append('WHEN NOT MATCHED BY TARGET THEN INSERT (')
for row in dfTargetDef.collect():
entries.append(f"{row.column_name},")
entries.append('check_sum,\ncreated_date,\nmodified_date,\nis_modified,\nis_deleted')
entries.append(') VALUES (')
for row in dfTargetDef.collect():
entries.append(f"src.{row.column_name},")
entries.append('src.check_sum,\nsrc.created_date,\nsrc.modified_date,\ntrue,\nfalse')
entries.append(') WHEN NOT MATCHED BY SOURCE THEN UPDATE SET tgt.is_deleted = true;')
sql_comm = "\n".join(str(x) for x in entries)
spark.sql(sql_comm)
这将生成以下类似的SQL语句:
MERGE INTO workflow_status_dim_dlt tgt USING workflow_status_dim_src src ON (
src.workflow_status = tgt.workflow_status
) WHEN MATCHED AND tgt.check_sum <> src.check_sum THEN UPDATE SET
tgt.workflow_status = COALESCE(src.workflow_status, tgt.workflow_status),
tgt.workflow_status_description = COALESCE(src.workflow_status_description, tgt.workflow_status_description),
tgt.check_sum = src.check_sum,
tgt.modified_date = src.modified_date,
tgt.is_modified = true,
tgt.is_deleted = false
WHEN NOT MATCHED BY TARGET THEN INSERT (
workflow_status,
workflow_status_description,
check_sum,
created_date,
modified_date,
is_modified,
is_deleted
) VALUES (
src.workflow_status,
src.workflow_status_description,
src.check_sum,
src.created_date,
src.modified_date,
true,
false
) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET tgt.is_deleted = true;
- 如果主要键匹配和校验和不匹配,则意味着记录已更新。将
is_modified
设置为true。 - 当没有主键匹配时,这意味着它是一个新记录。插入并将
is_modified
设置为true。 - 当源中不存在主键时,它是删除的记录。将
is_deleted
设置为true。
写更改
一旦合并了三角洲表,我们将在另一个架构中将更改(具有true is_modified
或is_deleted
)编写为SQL数据库(我们使用delta
)。
我们将Microsoft的custom Spark connector用于SQL Server来利用BULK INSERT
功能并使我们的写入更快。
dfWrite = spark.sql(f"SELECT * FROM {pTableName}_dlt where is_modified=True OR is_deleted=True;")
dfWrite.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", jdbcUrl) \
.option("dbtable", "delta." + pTableName) \
.option("reliabilityLevel", "BEST_EFFORT") \
.option("batchsize", 100000) \
.option("tableLock", "true") \
.option("user", dbUser) \
.option("password", dbPass) \
.option("schemaCheckEnabled", "false") \
.save()
由于我们将更改写入另一个表,因此我们必须将其合并到api
架构中的主表中。这次,MERGE
语句在SQL数据库中的存储过程中运行,而不是在Databricks中。
由于列不时在我们的数据湖桌上更改,并且在湖中有多个表,因此我必须动态存储过程。为此,我通过databricks创建了该过程。
i直接攻入数据库连接对象,以便能够在SQL数据库中创建一个存储过程。
# Create stored procedure code to merge delta table to api table in database
driver_manager = spark._sc._gateway.jvm.java.sql.DriverManager
con = driver_manager.getConnection(jdbcUrl, dbUser, dbPass)
exec_statement = con.prepareCall(f"IF (OBJECT_ID('config.delta_load_{pTableName}', 'P') IS NOT NULL) DROP PROCEDURE [config].[delta_load_{pTableName}];")
exec_statement.execute()
entries = [f"CREATE PROCEDURE [config].[delta_load_{pTableName}] AS BEGIN"]
entries.append(f"MERGE INTO [api].[{pTableName}] AS tgt USING [delta].[{pTableName}] src ON (") # https://stackoverflow.com/questions/10724348/merge-violation-of-primary-key-constraint
# Get primary keys
df_pk = dfTargetDef.filter(dfTargetDef.is_pk == 'true').toPandas()
for idx,row in df_pk.iterrows():
entries.append(f"src.{row.column_name} = tgt.{row.column_name}{'' if idx == len(df_pk.index)-1 else ' AND'}")
entries.append(') WHEN MATCHED AND src.is_modified = 1 THEN UPDATE SET')
for row in dfTargetDef.collect():
entries.append(f"tgt.{row.column_name} = src.{row.column_name},")
entries.append('tgt.check_sum = src.check_sum,\ntgt.modified_date = src.modified_date')
entries.append('WHEN MATCHED AND src.is_deleted = 1 THEN DELETE WHEN NOT MATCHED THEN INSERT (')
for row in dfTargetDef.collect():
entries.append(f"{row.column_name},")
entries.append('check_sum,\ncreated_date,\nmodified_date')
entries.append(') VALUES (')
for row in dfTargetDef.collect():
entries.append(f"src.{row.column_name},")
entries.append('src.check_sum,\nsrc.created_date,\nsrc.modified_date')
entries.append(f'); DROP TABLE [delta].{pTableName}; END')
sql_proc = "\n".join(str(x) for x in entries)
# create stored procedure in database
exec_statement = con.prepareCall(sql_proc)
exec_statement.execute()
# Close connections
exec_statement.close()
con.close()
这将生成以下类似的SQL语句:
CREATE PROCEDURE [config].[delta_load_workflow_status_dim] AS BEGIN
MERGE INTO [api].[workflow_status_dim] tgt USING [delta].[workflow_status_dim] src ON (
src.workflow_status = tgt.workflow_status
) WHEN MATCHED AND src.is_modified = 1 THEN UPDATE SET
tgt.workflow_status = COALESCE(src.workflow_status, tgt.workflow_status),
tgt.workflow_status_description = COALESCE(src.workflow_status_description, tgt.workflow_status_description),
tgt.check_sum = src.check_sum,
tgt.modified_date = src.modified_date
WHEN MATCHED AND src.is_deleted = 1 THEN DELETE WHEN NOT MATCHED THEN INSERT (
workflow_status,
workflow_status_description,
check_sum,
created_date,
modified_date
) VALUES (
src.workflow_status,
src.workflow_status_description,
src.check_sum,
src.created_date,
src.modified_date
); DROP TABLE [delta].workflow_status_dim; END
- 如果主要键匹配和
is_modified
是正确的,则意味着记录已更新。 - 如果主要键匹配和
is_deleted
是正确的,则意味着该记录已被删除。 - 当没有主键匹配时,这意味着它是一个新的记录。
- 合并完成后,我们将桌子放在
delta
架构中。
编排
最后但并非最不重要的一点是整理整个过程。为此,我们使用数据工厂。
上述流是我们在数据湖中的每个表中使用的。
- 记录开始日期
- 运行Databricks笔记本以将更改加载到SQL DB
- 如果有更改,请运行存储过程以合并SQL DB中的更改
- 如果步骤2或3失败,请使用
RESTORE TABLE
在数据链球上回滚表,以确保其状态与SQL DB中的表相同。 - 最后,运行一个存储过程,以保存元数据以进行调试和分析目的(错误,添加/删除/更改的行数等)。我们将其保存在SQL DB的另一个表中。
结果
通过我们的三角洲负载实施,我们看到了 70%的降低在数据负载时间与从湖上从湖上加载的每张桌子从湖上加载到我们的SQL数据库(这是我们的SQL数据库(这是我们的非常非常第一次迭代)。
之前,我们的数据管道每天需要7-10个小时才能完成,但是现在只需1-2个小时。这在很大程度上取决于您的计算能力,但是在我们的情况下( standard_ds3_v2 2-8 Worker databricks群集和 gp_s_gen5_12 sql数据库)假设数据集中有10%或更少:
- 数万排中的一个表只需一分钟即可加载新数据。
- 一百万行的桌子大约需要10分钟或更短的时间。
- 一张约8000万行的桌子大约需要一个小时。
让我知道您是否有下面的评论,问题或建议,或者在hello@kenzojrc.com或我的社交中与我联系。如果这篇文章以任何方式帮助您,也很高兴听到它!
请继续关注下一个。