气流:使用tweepy创建第一个气流DAG
#初学者 #python #airflow #tweepy

什么是apache气流?
数据管道是一系列数据处理任务,必须在源系统和目标系统之间运行才能自动化数据移动和转换。

Apache气流是用于数据管道生成的面向批处理的工具。它用于编程创建,安排和监视通常称为工作流编排的数据管道。气流是一个开源平台,用于管理与数据管道中数据处理相关的各种任务。

Apache气流如何工作?
气流中的数据管道使用Python编程语言中的直接无环图(DAG)编写。通过将数据管道绘制为图形,气流明确定义了任务之间的依赖性。在DAG中,任务显示为节点,而任务之间的依赖性则使用不同任务节点之间的直接边缘说明。

dag illustration

边缘的方向描绘了依赖关系的方向,边缘从一个任务指向另一个任务。指示在继续进行下一个任务之前必须完成哪个任务。
使用Python代码在Apache气流中定义了DAG。 Python文件描述了相关的DAG结构。结果,每个DAG文件通常描述给定DAG的各种任务以及各种任务的依赖项。 Apache气流解析它们以创建DAG结构。此外,DAGS气流文件还包括指示气流何时以及如何执行文件的其他元数据。

用Python代码定义气流DAG的好处是,该程序方法在构建管道时为用户提供了很大的灵活性。例如,用户可以使用Python代码根据某些条件生成动态管道。适应性允许进行出色的工作流定制,从而使用户可以根据其特定要求量身定制气流。

在Apache气流中调度和执行数据管道
将数据管道的结构定义为DAGS之后,Apache AirFlow允许用户为每个DAG指定一个计划间隔。时间表决定气流何时运行管道。结果,用户可以指示气流每周,一天或小时运行。另外,您可以定义更复杂的时间表间隔以交付所需的工作流量输出。

要更好地了解气流如何运行DAG,我们必须首先检查开发和运行DAG的整体过程。

Apache气流的组件

  1. 气流调度程序负责解析DAG,检查其时间表,监视其间隔以及计划DAG任务,以便如果通过时间表通过,则为气流工人进行处理。

  2. 气流工人负责捡起和执行任务。

  3. 气流网络服务器用于可视化由解析的DAG运行的管道。 Web服务器还用作主要气流UI(用户界面),允许用户跟踪其DAG和结果的进度。

气流操作员是气流DAG的基础。它们包含如何处理管道中数据的逻辑。 DAG任务是通过实例化操作员来定义的。
在气流中,有许多不同类型的操作员。一些操作员,例如Python函数,运行用户提供的常规代码,而另一些操作员则执行非常具体的操作,例如数据传输从一个系统到另一系统。

一些最常用的气流操作员如下:

  • Pythonoperator:此类运行Python函数。
  • Bashoperator:此类运行一个bash脚本。
  • pythonvirtualenvoperatodecorator:在新的python虚拟环境中执行python可可。 Virtualenv包需要安装在运行气流的环境中

XCOMS (“交叉通信”的简短)是一种允许任务相互通信的机制,因为通常是隔离的,并且可能在不同的机器上运行。

XCOM由密钥(基本上是其名称)以及其启动的任务ID和DAG ID标识。它们可以具有任何(可序列化的)值,但仅用于少量数据;请勿使用它们来传递大量值,例如DataFrames。
任务实例上的XCOM PUSH和XCOM拉动方法用于显式“推”和“拉” XCOMS降落到存储。如果DO XCOM推动参数设置为true,那么许多运营商将其结果自动添加到称为返回值的XCOM密钥中。

"""Example DAG demonstrating the usage of XComs."""
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    'example_xcom',
    schedule_interval="@once",
    start_date=days_ago(2),
    default_args={'owner': 'airflow'},
    tags=['example'],
)

value_1 = [1, 2, 3]
value_2 = {'a': 'b'}


def push(**kwargs):
    """Pushes an XCom without a specific target"""
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2


def puller(**kwargs):
    """Pull all previously pushed XComs and check if the pushed values match the pulled values."""
    ti = kwargs['ti']

    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    if pulled_value_1 != value_1:
        raise ValueError(f'The two values differ {pulled_value_1} and {value_1}')

    # get value_2
    pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
    if pulled_value_2 != value_2:
        raise ValueError(f'The two values differ {pulled_value_2} and {value_2}')

使用Tweepy
您的第一个气流DAG 使用气流和曲折显示每20分钟的趋势主题标签。首先,创建一个名为Tweepy_airFlow的新文件夹。
运行此命令:curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.3/docker-compose.yaml下载Docker组成的文件。该文件包含多个服务定义,包括气流安排机,气流 - 网络服务器,气流工人,气流 - 内部,Postgres和redis

将数据卷添加到Docker-Compose文件

    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    # volume to store data
    - ./data:/opt/airflow/data

初始化气流并运行DAG
确保Docker运行,然后运行docker-compose up airflow-init。 DAG包含/放入DAGS文件夹中。初始化气流开始服务 - > RUN docker-compose up使用docker ps确认服务正在运行,然后导航到Localhost:8080

tweepy_dag.py(trending_haashtags day)
包含可以连接到Twitter的功能的DAG。DAG使用PythonVirtualvenvOperator来创建和激活使用斜纹模块的VENV。所有操作都包裹在一个可呼叫功能中以避免XCOM问题。

DAG的整个代码:

#importing libraries
from datetime import timedelta
import airflow
import os 

#from app import tweepy
from airflow import DAG
from airflow.operators.python import PythonOperator # for executing python functions
from airflow.operators.python import PythonVirtualenvOperator # for working with venvs airflow

# function to get twitter API, get trending topics for Nairobi and log them
def get_api():
    # importing a package in the function ensures that it is accessible when the venv is created
    import tweepy
    # api credentials - input yours
    consumer_key = " "
    consumer_secret = " "
    access_token = " "
    access_token_secret = " "

    # authentication of consumer key and secret
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    # authentication of access token and secret
    auth.set_access_token(access_token, access_token_secret)
    api = tweepy.API(auth, wait_on_rate_limit=True)
    print("successfuly activated virtual env and connected to twitter API")
    # coordinates for Nairobi city
    lat= 1.2921
    long= 36.8219
    # methods to get trends- tweepy==4.6.0
    closest_loc = api.closest_trends(lat, long)
    trends = api.get_place_trends(closest_loc[0]["woeid"])
    trends_ = trends[0]["trends"]
    hashtags = [trend["name"] for trend in trends_ if "#" in trend["name"]]
    # print hashtags
    for elem in hashtags:
        print(elem) 


def message():
    print("task complete")

default_args = {
 'owner':'airflow',
 'depends_on_past' : False,
 'start_date': airflow.utils.dates.days_ago(7),
}

trending_hashtags = DAG(
    'trending_hashtags', #name of the dag
    default_args = default_args,
    schedule_interval = timedelta(minutes=20),
    catchup = False
)

get_api_2 = PythonVirtualenvOperator(
    task_id='connecting_to_twitter_api',
    python_callable = get_api,
    requirements = ["tweepy"],
    system_site_packages=False,
    dag = trending_hashtags,
)

message_out = PythonOperator(
    task_id = 'task_complete_message',
    python_callable = message,
    dag = trending_hashtags,
)

get_api_2 >> message_out

气流Web UI
使用Web UI访问趋势主题标签DAG解开并触发DAG。使用任何视图选项来监视DAG的进度。检查日志是否输出和可能错误。

graph view of the dag

tree view of the dag

output of the DAG
使用Docker-Compose停止服务

气流在软件公司,金融机构,游戏开发人员等中变得越来越流行。无论您是否是技术专业人员,诸如DagFactory之类的图书馆都可以为整个企业的专业人员提供气流,而不仅仅是数据工程师。气流DAG由通过操作员执行各种操作的任务组成,利用Python,Bash,HTTP和数据库功能。

您可以使用此创新工具将数据管道策略提升到一个新的水平,并在最复杂的环境中进行协调。