带有Apache气流 + Minio的Twitter数据管道(S3兼容对象存储)
您阅读的越多,您会知道的东西越多。您学到的越多,您会去的地方越多。
dr。 Seuss
动机
在作为数据工程师的旅途中,我偶然发现了许多工具。
引起我注意的一个是Minio,Minio是一个兼容的多云对象存储。
要了解有关它的更多信息,我构建了一条数据管道,该数据管道使用Apache Airflow使用Twitter API拉动Elon Musk推文,并将结果存储在存储在MinIO中的CSV中(AWS S3的OSS替代品)对象存储桶。
。。然后,我们将使用Docker-Compose轻松部署我们的代码。
表中的内容
- 什么是Apache气流?
- 什么是Minio?
- 代码
- get_twitter_data()
- dump_data_to_bucket()
- dag(直接无环图)
- docker-compose&.env文件
什么是Apache气流
气流是社区创建的平台,以编程作者,计划和监视工作流程。
Apache气流是用Python编写的OpenSource Workflow编排。它使用dag(直接无环图)表示工作流程。它是高度可定制/灵活的,并且拥有一个非常活跃的社区。
您可以阅读更多here。
什么是米尼奥
Minio提供高性能,S3兼容对象存储。
Minio是一种OpenSource多云对象存储,并且与AWS S3完全兼容。使用Minio,您可以托管自己的本地或云对象存储。
您可以阅读更多here。
代码
可以访问完整的代码。
源代码:
https://github.com/mikekenneth/airflow_minio_twitter_data_pipeline
get_twitter_data()
以下是Python任务,将Elon的推文从Twitter API提取到Python列表:
import os
import json
import requests
from airflow.decorators import dag, task
@task
def get_twitter_data():
TWITTER_BEARER_TOKEN = os.getenv("TWITTER_BEARER_TOKEN")
# Get tweets using Twitter API v2 & Bearer Token
BASE_URL = "https://api.twitter.com/2/tweets/search/recent"
USERNAME = "elonmusk"
FIELDS = {"created_at", "lang", "attachments", "public_metrics", "text", "author_id"}
url = f"{BASE_URL}?query=from:{USERNAME}&tweet.fields={','.join(FIELDS)}&expansions=author_id&max_results=50"
response = requests.get(url=url, headers={"Authorization": f"Bearer {TWITTER_BEARER_TOKEN}"})
response = json.loads(response.content)
data = response["data"]
includes = response["includes"]
# Refine tweets data
tweet_list = []
for tweet in data:
refined_tweet = {
"tweet_id": tweet["id"],
"username": includes["users"][0]["username"], # Get username from the included data
"user_id": tweet["author_id"],
"text": tweet["text"],
"like_count": tweet["public_metrics"]["like_count"],
"retweet_count": tweet["public_metrics"]["retweet_count"],
"created_at": tweet["created_at"],
}
tweet_list.append(refined_tweet)
return tweet_list
dump_data_to_bucket()
以下是Python任务,将Tweet列表转换为Pandas dataFrame,然后将其转储为CSV文件中的Minio对象存储:
import os
from airflow.decorators import dag, task
@task
def dump_data_to_bucket(tweet_list: list):
import pandas as pd
from minio import Minio
from io import BytesIO
MINIO_BUCKET_NAME = os.getenv("MINIO_BUCKET_NAME")
MINIO_ROOT_USER = os.getenv("MINIO_ROOT_USER")
MINIO_ROOT_PASSWORD = os.getenv("MINIO_ROOT_PASSWORD")
df = pd.DataFrame(tweet_list)
csv = df.to_csv(index=False).encode("utf-8")
client = Minio("minio:9000", access_key=MINIO_ROOT_USER, secret_key=MINIO_ROOT_PASSWORD, secure=False)
# Make MINIO_BUCKET_NAME if not exist.
found = client.bucket_exists(MINIO_BUCKET_NAME)
if not found:
client.make_bucket(MINIO_BUCKET_NAME)
else:
print(f"Bucket '{MINIO_BUCKET_NAME}' already exists!")
# Put csv data in the bucket
client.put_object(
"airflow-bucket", "twitter_elon_musk.csv", data=BytesIO(csv), length=len(csv), content_type="application/csv"
)
DAG(直接无环图)
以下是允许指定任务之间的依赖项的DAG本身:
from datetime import datetime
from airflow.decorators import dag, task
@dag(
schedule="0 */2 * * *",
start_date=datetime(2022, 12, 26),
catchup=False,
tags=["twitter", "etl"],
)
def twitter_etl():
dump_data_to_bucket(get_twitter_data())
twitter_etl()
docker-compose&.env文件
以下是we need to create容纳运行我们管道所需的环境变量的.env
文件:
您可以阅读this学习我们的生成
TWITTER_BEARER_TOKEN
。
# Twitter (Must not be empty)
TWITTER_BEARER_TOKEN=""
# Meta-Database
POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflow
POSTGRES_DB=airflow
# Airflow Core
AIRFLOW__CORE__FERNET_KEY=''
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=True
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW_UID=50000
AIRFLOW_GID=0
# Backend DB
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS=False
# Airflow Init
_AIRFLOW_DB_UPGRADE=True
_AIRFLOW_WWW_USER_CREATE=True
_AIRFLOW_WWW_USER_USERNAME=airflow
_AIRFLOW_WWW_USER_PASSWORD=airflow
_PIP_ADDITIONAL_REQUIREMENTS= "minio pandas requests"
# Minio
MINIO_ROOT_USER=minio_user
MINIO_ROOT_PASSWORD=minio_password123
MINIO_BUCKET_NAME='airflow-bucket'
及以下是docker-compose.yaml
文件,它允许我们的管道旋转所需的基础架构:
version: '3.4'
x-common:
&common
image: apache/airflow:2.5.0
user: "${AIRFLOW_UID}:0"
env_file:
- .env
volumes:
- ./app/dags:/opt/airflow/dags
- ./app/logs:/opt/airflow/logs
x-depends-on:
&depends-on
depends_on:
postgres:
condition: service_healthy
airflow-init:
condition: service_completed_successfully
services:
minio:
image: minio/minio:latest
ports:
- '9000:9000'
- '9090:9090'
volumes:
- './minio_data:/data'
env_file:
- .env
command: server --console-address ":9090" /data
postgres:
image: postgres:13
container_name: postgres
ports:
- "5433:5432"
healthcheck:
test: [ "CMD", "pg_isready", "-U", "airflow" ]
interval: 5s
retries: 5
env_file:
- .env
scheduler:
<<: *common
<<: *depends-on
container_name: airflow-scheduler
command: scheduler
restart: on-failure
ports:
- "8793:8793"
webserver:
<<: *common
<<: *depends-on
container_name: airflow-webserver
restart: always
command: webserver
ports:
- "8080:8080"
healthcheck:
test:
[
"CMD",
"curl",
"--fail",
"http://localhost:8080/health"
]
interval: 30s
timeout: 30s
retries: 5
airflow-init:
<<: *common
container_name: airflow-init
entrypoint: /bin/bash
command:
- -c
- |
mkdir -p /sources/logs /sources/dags
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags}
exec /entrypoint airflow version
当我们访问Apache-Airflow Web UI时,我们可以看到DAG,并且可以直接运行以查看结果。
dag(apael-airfow we Weste):
在我们的存储桶中生成的推文文件(Minio Console):
这是一个包裹。我希望这对您有帮助。
关于我
我是一名数据工程师,拥有3年以上的经验,并且是软件工程师(5年以上)。我喜欢学习和教学(主要是学习ð)。
您可以通过email,Twitter和LinkedIn与我联系。
使用bloggu.io发布的文章。免费尝试。