使用DAG处理雪花的ZIP文件
#python #googlecloud #gcp #snowflake

在本文中,我们将使用GCP Composer中的气流加载到雪花上时处理如何处理ZIP文件。它是典型的数据管道,但是如果您是初学者或从不完全处理ETL,仍然很难处理。

在进一步前进之前,我们假设您启用了以下服务:

  • Google云平台帐户
  • 雪花帐户
  • 与雪花基本集成设置的GCP作曲家环境
  • 代码编辑
  • python已安装

案例研究
假设您正在为一个著名的组织工作,该组织最近将其数据平台从Bigquery转移到了雪花。现在,您的所有组织的数据都包含在雪花中,所有BI/DATAOPS都完全在雪花中发生。

一个良好的早晨,您被分配了一项任务来构建数据管道来进行归因分析。所有归因数据均以组织合作伙伴的zip文件的形式删除在GCS存储桶中。我知道您在想什么,您只需使用文件格式创建“ copy in in in in of to in compression = zip'的语句。但这不是真的,您不能直接以文件格式使用“ zip”。另外,您不能使用“放气”类型。

该怎么办?
您可以利用GCP的功能来协调和自动化数据加载。但是首先,您必须确保在雪花中创建了一个集成对象,向GCP存储桶。之后,您创建一个外部阶段,以定位GCS存储桶中的路径以进行直接加载。

一旦完成了上述事情,您现在可以为GCP Composer实施DAG脚本。 GCP Composer是一种托管的Apache气流服务,可以在Kubernetes顶部快速部署气流。

救援的气流
Apache气流是一项优雅的任务调度服务,允许以有效而有效的方式处理数据操作。它提供了一个直观的Web UI,允许用户管理任务工作流。您还可以创建并行工作流,而无需创建自己的自定义应用程序处理多处理,线程,并发。

在介绍中足够,让我们开始编码。

首先,您必须打开代码编辑器并创建一个Python文件。将其命名为DAG_Sflk_loader.py

在上述步骤之后,导入所有必要的软件包。

from datetime import datetime,timedelta,date
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
import pandas as pd
from google.cloud import storage
import zipfile
import io

要声明一个dag脚本,您必须从这样的气流包装中使用dag对象:

default_args = {
    'owner': 'ORGANIZATION',
    'start_date': datetime(2023, 2, 19),
    'email': ['username@email.com'],
    'email_on_failure': True,
    'email_on_retry': False
}

dag = DAG('SFLK_ZIP_LOAD', description = 'This DAG is loads ZIP files to snowflake', max_active_runs=1, catchup=False,default_args=default_args)

在上面的代码段中,首先,我们定义要传递给dag对象的参数,例如“所有者”,“ start_date”,“ emair”,“ email_on_failure'等,我们在此之后,我们创建了一个使用DAG的数据管道的上下文管理器上的上下文管理器对象。

好吧,现在是时候开始定义Python和Snowflake的自定义任务了。为此,我们使用操作员。操作员是单独的任务单元,类似于Snowflake SQL,Python,Bash Command,Gsutil等任何类型的任务单元等。对于我们的讨论,我们只会使用Python和Snowflake操作员。

我们将以以下方式分发我们的数据管道:

TRUNCATE_TABLE_TASK --> UNZIP_FILES_IN_GCS_TASK --> LOAD_FILES_TO_TABLE_TASK

截断任务
在将数据加载到雪花之前,我们将首先截断表。这是一个增量负载,因此我们不会直接使用TRUNCATE TABLE <TABLE_NAME>。如果存在,我们只会删除CURRENT_DATE的数据。

TRUNC_QUERY = '''DELETE FROM <DATABASE_NAME.SCHEMA_NAME.TABLE_NAME> WHERE <DATE_FIELD> = CURRENT_DATE'''

trunc_task = SnowflakeOperator(
               task_id='TRUNCATE_TASK',
               sql=[TRUNC_QUERY],
               snowflake_conn_id='<connection_id>',
               database='<DATABASE_NAME>',
               schema='<SCHEMA_NAME>',
               warehouse = '<DATAWAREHOUSE_NAME>',
               role = '<ROLE_NAME>',
               dag=dag) 

解压缩任务

对于GCS存储桶中的解压缩文件,我们将使用三个库

  • zipfile
  • <这里iDe
  • Google-Cloud-Storage

在这里,我们将此任务定义为Python可召唤。 Python操作员使用此可可。

def unzip_file_in_gcs(**context):
    #Define GCS Client parameters
    bucket_name = '<BUCKET_NAME>'
    file_name = '<FILE_NAME>.zip'

    # Connect to the GCS bucket
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)

    # Download the zip file to memory
    zip_file_content = blob.download_as_string()

    # Unzip the file
    zip_file = zipfile.ZipFile(io.BytesIO(zip_file_content))
    zip_file.extractall(path='/home/airflow/gcs/data/temp/')

    # Upload each file in the zip to the GCS bucket
    with open('/home/airflow/gcs/data/temp/<FILE_NAME>.csv', 'rb') as f:
        file_content = f.read()
        new_blob = bucket.blob('<FILE_NAME>.csv')
        new_blob.upload_from_string(file_content)

unzip_task = PythonOperator(
        task_id="UNZIP_TASK",
        python_callable=unzip_file_in_gcs,
        provide_context=True,
        dag=dag
    )

加载任务
一旦完成拉链,现在您可以使用COPY INTO <TABLE_NAME>语句将数据加载到雪花表中。

这是任务定义:

LOAD_QUERY = '''COPY INTO <DATABASE_NAME>.<SCHEMA_NAME>.<TABLE_NAME> FROM @<DATABASE_NAME>.<SCHEMA_NAME>.<STAGE_NAME>/<FILE_NAME>.csv
file_format = (format_name = <DATABASE_NAME>.<SCHEMA_NAME>.FF_CSV)'''

load_task = SnowflakeOperator(
        task_id='LOAD_TASK',
        sql=[LOAD_QUERY],
        snowflake_conn_id='<connection_id>',
        database='<DATABASE_NAME>',
        schema='<SCHEMA_NAME>',
        warehouse = '<DATAWAREHOUSE_NAME>',
        role = '<ROLE_NAME>',
        dag=dag) 

提供任务流
最后,您必须将所有任务放在一个衬里中。最后使用此代码段:

with dag:
  trunc_task >> unzip_task >> load_task

上传并运行DAG脚本
现在,将您刚刚创建的DAG脚本上传到附加到Composer Environment Bucket的GCS存储桶中。 Apache AirFlow WebUI将在几分钟后自动反映新的DAG,并将开始运行。

结论
在本文中,我们学会了如何使用Apache气流加载在GCS存储桶中存储的ZIP文件。我们还经历了GCS Composer中DAG的创建和部署。对于将来的讨论,我们将探索Apache气流中的其他集成方法。

直到那时,再见!!