在本文中,我们将使用GCP Composer中的气流加载到雪花上时处理如何处理ZIP文件。它是典型的数据管道,但是如果您是初学者或从不完全处理ETL,仍然很难处理。
在进一步前进之前,我们假设您启用了以下服务:
- Google云平台帐户
- 雪花帐户
- 与雪花基本集成设置的GCP作曲家环境
- 代码编辑
- python已安装
案例研究
假设您正在为一个著名的组织工作,该组织最近将其数据平台从Bigquery转移到了雪花。现在,您的所有组织的数据都包含在雪花中,所有BI/DATAOPS都完全在雪花中发生。
一个良好的早晨,您被分配了一项任务来构建数据管道来进行归因分析。所有归因数据均以组织合作伙伴的zip文件的形式删除在GCS存储桶中。我知道您在想什么,您只需使用文件格式创建“ copy in in in in of to in compression = zip'的语句。但这不是真的,您不能直接以文件格式使用“ zip”。另外,您不能使用“放气”类型。
该怎么办?
您可以利用GCP的功能来协调和自动化数据加载。但是首先,您必须确保在雪花中创建了一个集成对象,向GCP存储桶。之后,您创建一个外部阶段,以定位GCS存储桶中的路径以进行直接加载。
一旦完成了上述事情,您现在可以为GCP Composer实施DAG脚本。 GCP Composer是一种托管的Apache气流服务,可以在Kubernetes顶部快速部署气流。
救援的气流
Apache气流是一项优雅的任务调度服务,允许以有效而有效的方式处理数据操作。它提供了一个直观的Web UI,允许用户管理任务工作流。您还可以创建并行工作流,而无需创建自己的自定义应用程序处理多处理,线程,并发。
在介绍中足够,让我们开始编码。
首先,您必须打开代码编辑器并创建一个Python文件。将其命名为DAG_Sflk_loader.py
。
在上述步骤之后,导入所有必要的软件包。
from datetime import datetime,timedelta,date
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
import pandas as pd
from google.cloud import storage
import zipfile
import io
要声明一个dag脚本,您必须从这样的气流包装中使用dag对象:
default_args = {
'owner': 'ORGANIZATION',
'start_date': datetime(2023, 2, 19),
'email': ['username@email.com'],
'email_on_failure': True,
'email_on_retry': False
}
dag = DAG('SFLK_ZIP_LOAD', description = 'This DAG is loads ZIP files to snowflake', max_active_runs=1, catchup=False,default_args=default_args)
在上面的代码段中,首先,我们定义要传递给dag对象的参数,例如“所有者”,“ start_date”,“ emair”,“ email_on_failure'等,我们在此之后,我们创建了一个使用DAG的数据管道的上下文管理器上的上下文管理器对象。
好吧,现在是时候开始定义Python和Snowflake的自定义任务了。为此,我们使用操作员。操作员是单独的任务单元,类似于Snowflake SQL,Python,Bash Command,Gsutil等任何类型的任务单元等。对于我们的讨论,我们只会使用Python和Snowflake操作员。
我们将以以下方式分发我们的数据管道:
TRUNCATE_TABLE_TASK --> UNZIP_FILES_IN_GCS_TASK --> LOAD_FILES_TO_TABLE_TASK
截断任务
在将数据加载到雪花之前,我们将首先截断表。这是一个增量负载,因此我们不会直接使用TRUNCATE TABLE <TABLE_NAME>
。如果存在,我们只会删除CURRENT_DATE
的数据。
TRUNC_QUERY = '''DELETE FROM <DATABASE_NAME.SCHEMA_NAME.TABLE_NAME> WHERE <DATE_FIELD> = CURRENT_DATE'''
trunc_task = SnowflakeOperator(
task_id='TRUNCATE_TASK',
sql=[TRUNC_QUERY],
snowflake_conn_id='<connection_id>',
database='<DATABASE_NAME>',
schema='<SCHEMA_NAME>',
warehouse = '<DATAWAREHOUSE_NAME>',
role = '<ROLE_NAME>',
dag=dag)
解压缩任务
对于GCS存储桶中的解压缩文件,我们将使用三个库
- zipfile <这里iDe she>
- Google-Cloud-Storage
在这里,我们将此任务定义为Python可召唤。 Python操作员使用此可可。
def unzip_file_in_gcs(**context):
#Define GCS Client parameters
bucket_name = '<BUCKET_NAME>'
file_name = '<FILE_NAME>.zip'
# Connect to the GCS bucket
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
# Download the zip file to memory
zip_file_content = blob.download_as_string()
# Unzip the file
zip_file = zipfile.ZipFile(io.BytesIO(zip_file_content))
zip_file.extractall(path='/home/airflow/gcs/data/temp/')
# Upload each file in the zip to the GCS bucket
with open('/home/airflow/gcs/data/temp/<FILE_NAME>.csv', 'rb') as f:
file_content = f.read()
new_blob = bucket.blob('<FILE_NAME>.csv')
new_blob.upload_from_string(file_content)
unzip_task = PythonOperator(
task_id="UNZIP_TASK",
python_callable=unzip_file_in_gcs,
provide_context=True,
dag=dag
)
加载任务
一旦完成拉链,现在您可以使用COPY INTO <TABLE_NAME>
语句将数据加载到雪花表中。
这是任务定义:
LOAD_QUERY = '''COPY INTO <DATABASE_NAME>.<SCHEMA_NAME>.<TABLE_NAME> FROM @<DATABASE_NAME>.<SCHEMA_NAME>.<STAGE_NAME>/<FILE_NAME>.csv
file_format = (format_name = <DATABASE_NAME>.<SCHEMA_NAME>.FF_CSV)'''
load_task = SnowflakeOperator(
task_id='LOAD_TASK',
sql=[LOAD_QUERY],
snowflake_conn_id='<connection_id>',
database='<DATABASE_NAME>',
schema='<SCHEMA_NAME>',
warehouse = '<DATAWAREHOUSE_NAME>',
role = '<ROLE_NAME>',
dag=dag)
提供任务流
最后,您必须将所有任务放在一个衬里中。最后使用此代码段:
with dag:
trunc_task >> unzip_task >> load_task
上传并运行DAG脚本
现在,将您刚刚创建的DAG脚本上传到附加到Composer Environment Bucket的GCS存储桶中。 Apache AirFlow WebUI将在几分钟后自动反映新的DAG,并将开始运行。
结论
在本文中,我们学会了如何使用Apache气流加载在GCS存储桶中存储的ZIP文件。我们还经历了GCS Composer中DAG的创建和部署。对于将来的讨论,我们将探索Apache气流中的其他集成方法。
直到那时,再见!!