什么是apache气流?
数据管道是一系列数据处理任务,必须在源系统和目标系统之间运行才能自动化数据移动和转换。
Apache气流是用于数据管道生成的面向批处理的工具。它用于编程创建,安排和监视通常称为工作流编排的数据管道。气流是一个开源平台,用于管理与数据管道中数据处理相关的各种任务。
Apache气流如何工作?
气流中的数据管道使用Python编程语言中的直接无环图(DAG)编写。通过将数据管道绘制为图形,气流明确定义了任务之间的依赖性。在DAG中,任务显示为节点,而任务之间的依赖性则使用不同任务节点之间的直接边缘说明。
边缘的方向描绘了依赖关系的方向,边缘从一个任务指向另一个任务。指示在继续进行下一个任务之前必须完成哪个任务。
使用Python代码在Apache气流中定义了DAG。 Python文件描述了相关的DAG结构。结果,每个DAG文件通常描述给定DAG的各种任务以及各种任务的依赖项。 Apache气流解析它们以创建DAG结构。此外,DAGS气流文件还包括指示气流何时以及如何执行文件的其他元数据。
用Python代码定义气流DAG的好处是,该程序方法在构建管道时为用户提供了很大的灵活性。例如,用户可以使用Python代码根据某些条件生成动态管道。适应性允许进行出色的工作流定制,从而使用户可以根据其特定要求量身定制气流。
在Apache气流中调度和执行数据管道
将数据管道的结构定义为DAGS之后,Apache AirFlow允许用户为每个DAG指定一个计划间隔。时间表决定气流何时运行管道。结果,用户可以指示气流每周,一天或小时运行。另外,您可以定义更复杂的时间表间隔以交付所需的工作流量输出。
要更好地了解气流如何运行DAG,我们必须首先检查开发和运行DAG的整体过程。
Apache气流的组件
-
气流调度程序负责解析DAG,检查其时间表,监视其间隔以及计划DAG任务,以便如果通过时间表通过,则为气流工人进行处理。
-
气流工人负责捡起和执行任务。
-
气流网络服务器用于可视化由解析的DAG运行的管道。 Web服务器还用作主要气流UI(用户界面),允许用户跟踪其DAG和结果的进度。
气流操作员是气流DAG的基础。它们包含如何处理管道中数据的逻辑。 DAG任务是通过实例化操作员来定义的。
在气流中,有许多不同类型的操作员。一些操作员,例如Python函数,运行用户提供的常规代码,而另一些操作员则执行非常具体的操作,例如数据传输从一个系统到另一系统。
一些最常用的气流操作员如下:
- Pythonoperator:此类运行Python函数。
- Bashoperator:此类运行一个bash脚本。
- pythonvirtualenvoperatodecorator:在新的python虚拟环境中执行python可可。 Virtualenv包需要安装在运行气流的环境中
XCOMS (“交叉通信”的简短)是一种允许任务相互通信的机制,因为通常是隔离的,并且可能在不同的机器上运行。
XCOM由密钥(基本上是其名称)以及其启动的任务ID和DAG ID标识。它们可以具有任何(可序列化的)值,但仅用于少量数据;请勿使用它们来传递大量值,例如DataFrames。
任务实例上的XCOM PUSH和XCOM拉动方法用于显式“推”和“拉” XCOMS降落到存储。如果DO XCOM推动参数设置为true,那么许多运营商将其结果自动添加到称为返回值的XCOM密钥中。
"""Example DAG demonstrating the usage of XComs."""
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
dag = DAG(
'example_xcom',
schedule_interval="@once",
start_date=days_ago(2),
default_args={'owner': 'airflow'},
tags=['example'],
)
value_1 = [1, 2, 3]
value_2 = {'a': 'b'}
def push(**kwargs):
"""Pushes an XCom without a specific target"""
kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)
def push_by_returning(**kwargs):
"""Pushes an XCom without a specific target, just by returning it"""
return value_2
def puller(**kwargs):
"""Pull all previously pushed XComs and check if the pushed values match the pulled values."""
ti = kwargs['ti']
# get value_1
pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
if pulled_value_1 != value_1:
raise ValueError(f'The two values differ {pulled_value_1} and {value_1}')
# get value_2
pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
if pulled_value_2 != value_2:
raise ValueError(f'The two values differ {pulled_value_2} and {value_2}')
使用Tweepy
您的第一个气流DAG
使用气流和曲折显示每20分钟的趋势主题标签。首先,创建一个名为Tweepy_airFlow的新文件夹。
运行此命令:curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.3/docker-compose.yaml
下载Docker组成的文件。该文件包含多个服务定义,包括气流安排机,气流 - 网络服务器,气流工人,气流 - 内部,Postgres和redis
将数据卷添加到Docker-Compose文件
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
# volume to store data
- ./data:/opt/airflow/data
初始化气流并运行DAG
确保Docker运行,然后运行docker-compose up airflow-init
。 DAG包含/放入DAGS文件夹中。初始化气流开始服务 - > RUN docker-compose up
使用docker ps
确认服务正在运行,然后导航到Localhost:8080
tweepy_dag.py(trending_haashtags day)
包含可以连接到Twitter的功能的DAG。DAG使用PythonVirtualvenvOperator
来创建和激活使用斜纹模块的VENV。所有操作都包裹在一个可呼叫功能中以避免XCOM问题。
DAG的整个代码:
#importing libraries
from datetime import timedelta
import airflow
import os
#from app import tweepy
from airflow import DAG
from airflow.operators.python import PythonOperator # for executing python functions
from airflow.operators.python import PythonVirtualenvOperator # for working with venvs airflow
# function to get twitter API, get trending topics for Nairobi and log them
def get_api():
# importing a package in the function ensures that it is accessible when the venv is created
import tweepy
# api credentials - input yours
consumer_key = " "
consumer_secret = " "
access_token = " "
access_token_secret = " "
# authentication of consumer key and secret
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
# authentication of access token and secret
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth, wait_on_rate_limit=True)
print("successfuly activated virtual env and connected to twitter API")
# coordinates for Nairobi city
lat= 1.2921
long= 36.8219
# methods to get trends- tweepy==4.6.0
closest_loc = api.closest_trends(lat, long)
trends = api.get_place_trends(closest_loc[0]["woeid"])
trends_ = trends[0]["trends"]
hashtags = [trend["name"] for trend in trends_ if "#" in trend["name"]]
# print hashtags
for elem in hashtags:
print(elem)
def message():
print("task complete")
default_args = {
'owner':'airflow',
'depends_on_past' : False,
'start_date': airflow.utils.dates.days_ago(7),
}
trending_hashtags = DAG(
'trending_hashtags', #name of the dag
default_args = default_args,
schedule_interval = timedelta(minutes=20),
catchup = False
)
get_api_2 = PythonVirtualenvOperator(
task_id='connecting_to_twitter_api',
python_callable = get_api,
requirements = ["tweepy"],
system_site_packages=False,
dag = trending_hashtags,
)
message_out = PythonOperator(
task_id = 'task_complete_message',
python_callable = message,
dag = trending_hashtags,
)
get_api_2 >> message_out
气流Web UI
使用Web UI访问趋势主题标签DAG解开并触发DAG。使用任何视图选项来监视DAG的进度。检查日志是否输出和可能错误。
气流在软件公司,金融机构,游戏开发人员等中变得越来越流行。无论您是否是技术专业人员,诸如DagFactory之类的图书馆都可以为整个企业的专业人员提供气流,而不仅仅是数据工程师。气流DAG由通过操作员执行各种操作的任务组成,利用Python,Bash,HTTP和数据库功能。
您可以使用此创新工具将数据管道策略提升到一个新的水平,并在最复杂的环境中进行协调。