创建数据管道作为Apache气流中的DAG(第1部分)
#python #airflow #dataengineering #dags

DAG是一种特殊的图形。

那么,图到底是什么?

图表用于表达或说明对象之间的关系。用更多的技术术语,图表用于描述节点之间的任何一组节点和边缘(关系)。下图是图的简单说明。

graph

边缘没有方向的图,即边缘连接两个节点,而无需指定哪个是源,哪个是目的地,被称为“无向图”。上图是无向图的示例。这些图可用于表示对称关系,例如与双向友谊的社交网络中发现的关系。

a 有向图是与无向图的完全相反。有向图是其边缘具有定义方向的图形,因此边缘与目标节点相连。

a directed graph

当图中的一条路径开始并在相同的节点上结束并且至少具有一个边缘时,该路径被称为 cycle

如果它具有至少一个周期,则将图称为“循环” 。另一方面,“ acyclic ”图是没有周期的图。

cycle in a graph

现在,我们了解了什么是图形,有向图和无向图之间的差异以及环状图和无环图之间的差异,让我们定义一个dag是什么。

什么是DAG?

DAG是一种没有周期的有向图。结果,通过查看边缘的方向从一个节点到DAG中的另一个节点,从一个循环或周期都没有任何循环或周期。

用于描述工作之间的工作流或依赖关系,DAG由于其无环性而特别有用。 DAG中的每个节点代表要完成的任务,边缘显示了这些任务如何相互关联。您可能会弄清楚需要完成哪些订单来满足所有依赖项,从而遵循DAG的有向边缘。

例如,想象一下您要从源中提取数据,转换数据并将数据加载到数据库中。我们可以为此工作流程绘制DAG插图,如下所示:

a DAG

从上面的图中,我们看到了代表一系列任务序列的三个不同节点。第一个任务是数据提取。提取完成后,将启动下一个任务,从而改变数据。转换完成后,最后一个任务是将数据加载到数据库中。

让我们考虑另一个示例:

假设您想从各种服务器接收几个日志文件,解析日志文件,然后如果没有问题,则将解析的数据发送到可视化服务器,如果有的话,请发送电子邮件给DevOps团队。将其转换为DAG,我们有:

a DAG

上面显示的DAG类似于一棵树。

根据谚语;

所有树是dags,但并非所有的小吃都是树。

树上的一个节点只能有一个父母,但是“任务4”节点有几个父母,因此该dag不能归类为树。

Apache气流中的DAG

DAG用于表示Apache气流中的工作流或管道。

您的数据管道的各个任务分别表示为DAG中的节点,并且两个任务之间的任何依赖性表示为DAG中的有向边缘。换句话说,边缘指定了应执行两个作业的顺序。因此,在气流中使用了DAG来指定应运行的任务以及以什么顺序运行。

在Apache气流中,使用Python脚本表示DAG的结构。结果,任务及其依赖项被描述为代码。此外,DAG的脚本包含计划指令作为代码。

在DAG中执行的每个任务也用Python编写,并由操作员实施。

什么是气流DAG操作员

操作员是工作流程的基础。操作员是作为工作流程的一部分执行的单个任务。他们能够执行广泛的操作,例如运行Python功能,SQL查询或Shell命令。

在Apache气流中,DAG(定向无环图)操作员是工作流程的构建块。操作员是作为工作流程的一部分执行的单个任务,他们可以执行各种各样的操作,例如运行Python函数,执行SQL查询或运行Shell命令。

可以立即利用许多气流中的许多预制操作员,包括以下内容:

  1. bashoperator:执行bash命令或脚本。

  2. pythonoperator:执行python函数。

  3. sqloperator:执行SQL查询。

这些只是气流提供的几个操作员中的几个。为了开展特定的活动或与外部系统的接口,用户还可以设计其自定义操作员。

一天作为代码

要在本节中进行编码,请确保您在Python环境中安装了气流Python库。

pip install apache-airflow

python脚本中DAG定义的典型逻辑块如下:

  1. 库导入

    导入所需的Python库。

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.utils.dates import days_ago
    from datetime import timedelta
    
  2. d参数

    接下来,此块指定DAG对象的默认参数

    args = {
        'owner': 'John Doe',
        'start_date': days_ago(1),
        'email': ['johndoe@random.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
  3. 爸爸定义

    要实例化DAG,您需要DAG,example_dag,默认参数,描述和计划间隔的名称。

    
    dag = DAG(
        'example_dag',
        default_args=args,
        description='John Doe\'s DAG',
        schedule_interval=timedelta(days=1),
    )
    
  4. 任务定义

    在这里,我们使用BashOperator创建单个任务定义。这些任务定义是DAG的节点。

    task_1 =  BashOperator(
        task_id='task_1',
        bash_command='echo "Task 1"',
        dag=dag
    )
    
    task_2 =  BashOperator(
        task_id='task_2',
        bash_command='echo "Task 2"',
        dag=dag
    )
    
    task_3 =  BashOperator(
        task_id='task_3',
        bash_command='echo "Task 3"',
        dag=dag
    )
    
  5. 任务管道

    任务管道指定任务之间的依赖项。

    task_1.set_downstream(task_2)
    task_2.set_downstream(task_3)
    

    在这里,task_2取决于task_1,而task_3取决于task_2

您可以使用Apache AirFlow Scheduler在一系列工人上分发工作流程。它遵循您在DAG中列出的任务和依赖项。根据您在每个DAG中将代码设置为代码的“开始日期”,一旦启动了AirFlow Scheduler实例,您的DAG将开始执行。然后,调度程序以您指定的时间间隔启动每个下一个DAG运行。

这是完整的代码:

# import libraries
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

# Specify default arguments
args = {
    'owner': 'John Doe',
    'start_date': days_ago(1),
    'email': ['johndoe@random.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Instantiate the DAG object
dag = DAG(
    'example_dag',
    default_args=args,
    description='John Doe\'s DAG',
    schedule_interval=timedelta(days=1),
)

# define the tasks
task_1 =  BashOperator(
    task_id='task_1',
    bash_command='echo "Task 1"',
    dag=dag
)

task_2 =  BashOperator(
    task_id='task_2',
    bash_command='echo "Task 2"',
    dag=dag
)

task_3 =  BashOperator(
    task_id='task_3',
    bash_command='echo "Task 3"',
    dag=dag
)

# Specify the pipeline dependencies
task_1.set_downstream(task_2)
task_2.set_downstream(task_3)

将数据管道表示为代码的事实是Apache Airffore将数据管道作为DAG建模的方法的主要好处之一。当工作流定义为代码时,它们变得更多:

  1. 可维护:通过阅读代码,开发人员可以直接遵循已指示的内容。

  2. 版本:代码修订可以通过版本控制系统(例如。

  3. )轻松跟踪
  4. 协作:开发人员团队可能会在整个工作流程中很容易合作于代码的创建和维护。

  5. 可测试:可以在任何更改后运行单元测试,以确保代码继续按预期运行。