带有Apache气流 + Minio的Twitter数据管道(S3兼容对象存储)
#twitter #python #minio

带有Apache气流 + Minio的Twitter数据管道(S3兼容对象存储)

您阅读的越多,您会知道的东西越多。您学到的越多,您会去的地方越多。
dr。 Seuss

动机

在作为数据工程师的旅途中,我偶然发现了许多工具。
引起我注意的一个是Minio,Minio是一个兼容的多云对象存储。

要了解有关它的更多信息,我构建了一条数据管道,该数据管道使用Apache Airflow使用Twitter API拉动Elon Musk推文,并将结果存储在存储在MinIO中的CSV中(AWS S3的OSS替代品)对象存储桶。

然后,我们将使用Docker-Compose轻松部署我们的代码。

https://raw.githubusercontent.com/mikekenneth/airflow_minio_twitter_data_pipeline/main/docs/architecture.png

表中的内容

  • 什么是Apache气流?
  • 什么是Minio?
  • 代码
    • get_twitter_data()
    • dump_data_to_bucket()
    • dag(直接无环图)
    • docker-compose&.env文件

什么是Apache气流

气流是社区创建的平台,以编程作者,计划和监视工作流程。

Apache气流是用Python编写的OpenSource Workflow编排。它使用dag(直接无环图)表示工作流程。它是高度可定制/灵活的,并且拥有一个非常活跃的社区。

您可以阅读更多here

什么是米尼奥

Minio提供高性能,S3兼容对象存储。

Minio是一种OpenSource多云对象存储,并且与AWS S3完全兼容。使用Minio,您可以托管自己的本地或云对象存储。

您可以阅读更多here

代码

可以访问完整的代码。

源代码:

https://github.com/mikekenneth/airflow_minio_twitter_data_pipeline

get_twitter_data()

以下是Python任务,将Elon的推文从Twitter API提取到Python列表:

import os
import json
import requests
from airflow.decorators import dag, task

@task
def get_twitter_data():
    TWITTER_BEARER_TOKEN = os.getenv("TWITTER_BEARER_TOKEN")

    # Get tweets using Twitter API v2 & Bearer Token
    BASE_URL = "https://api.twitter.com/2/tweets/search/recent"
    USERNAME = "elonmusk"
    FIELDS = {"created_at", "lang", "attachments", "public_metrics", "text", "author_id"}

    url = f"{BASE_URL}?query=from:{USERNAME}&tweet.fields={','.join(FIELDS)}&expansions=author_id&max_results=50"
    response = requests.get(url=url, headers={"Authorization": f"Bearer {TWITTER_BEARER_TOKEN}"})
    response = json.loads(response.content)

    data = response["data"]
    includes = response["includes"]

    # Refine tweets data
    tweet_list = []
    for tweet in data:
        refined_tweet = {
            "tweet_id": tweet["id"],
            "username": includes["users"][0]["username"],  # Get username from the included data
            "user_id": tweet["author_id"],
            "text": tweet["text"],
            "like_count": tweet["public_metrics"]["like_count"],
            "retweet_count": tweet["public_metrics"]["retweet_count"],
            "created_at": tweet["created_at"],
        }
        tweet_list.append(refined_tweet)
    return tweet_list

dump_data_to_bucket()

以下是Python任务,将Tweet列表转换为Pandas dataFrame,然后将其转储为CSV文件中的Minio对象存储:

import os
from airflow.decorators import dag, task

@task
def dump_data_to_bucket(tweet_list: list):
    import pandas as pd
    from minio import Minio
    from io import BytesIO
    MINIO_BUCKET_NAME = os.getenv("MINIO_BUCKET_NAME")
    MINIO_ROOT_USER = os.getenv("MINIO_ROOT_USER")
    MINIO_ROOT_PASSWORD = os.getenv("MINIO_ROOT_PASSWORD")

    df = pd.DataFrame(tweet_list)
    csv = df.to_csv(index=False).encode("utf-8")

    client = Minio("minio:9000", access_key=MINIO_ROOT_USER, secret_key=MINIO_ROOT_PASSWORD, secure=False)

    # Make MINIO_BUCKET_NAME if not exist.
    found = client.bucket_exists(MINIO_BUCKET_NAME)
    if not found:
        client.make_bucket(MINIO_BUCKET_NAME)
    else:
        print(f"Bucket '{MINIO_BUCKET_NAME}' already exists!")

    # Put csv data in the bucket
    client.put_object(
        "airflow-bucket", "twitter_elon_musk.csv", data=BytesIO(csv), length=len(csv), content_type="application/csv"
    )

DAG(直接无环图)

以下是允许指定任务之间的依赖项的DAG本身:

from datetime import datetime
from airflow.decorators import dag, task

@dag(
    schedule="0 */2 * * *",
    start_date=datetime(2022, 12, 26),
    catchup=False,
    tags=["twitter", "etl"],
)
def twitter_etl():
    dump_data_to_bucket(get_twitter_data())

twitter_etl()

docker-compose&.env文件

以下是we need to create容纳运行我们管道所需的环境变量的.env文件:

您可以阅读this学习我们的生成TWITTER_BEARER_TOKEN

# Twitter (Must not be empty)
TWITTER_BEARER_TOKEN=""

# Meta-Database
POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflow
POSTGRES_DB=airflow

# Airflow Core
AIRFLOW__CORE__FERNET_KEY=''
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=True
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW_UID=50000
AIRFLOW_GID=0

# Backend DB
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS=False

# Airflow Init
_AIRFLOW_DB_UPGRADE=True
_AIRFLOW_WWW_USER_CREATE=True
_AIRFLOW_WWW_USER_USERNAME=airflow
_AIRFLOW_WWW_USER_PASSWORD=airflow
_PIP_ADDITIONAL_REQUIREMENTS= "minio pandas requests"

# Minio
MINIO_ROOT_USER=minio_user
MINIO_ROOT_PASSWORD=minio_password123
MINIO_BUCKET_NAME='airflow-bucket'

及以下是docker-compose.yaml文件,它允许我们的管道旋转所需的基础架构:

version: '3.4'

x-common:
  &common
  image: apache/airflow:2.5.0
  user: "${AIRFLOW_UID}:0"
  env_file:
    - .env
  volumes:
    - ./app/dags:/opt/airflow/dags
    - ./app/logs:/opt/airflow/logs

x-depends-on:
  &depends-on
  depends_on:
    postgres:
      condition: service_healthy
    airflow-init:
      condition: service_completed_successfully

services:
  minio:
    image: minio/minio:latest
    ports:
      - '9000:9000'
      - '9090:9090'
    volumes:
      - './minio_data:/data'
    env_file:
      - .env
    command: server --console-address ":9090" /data

  postgres:
    image: postgres:13
    container_name: postgres
    ports:
      - "5433:5432"
    healthcheck:
      test: [ "CMD", "pg_isready", "-U", "airflow" ]
      interval: 5s
      retries: 5
    env_file:
      - .env

  scheduler:
    <<: *common
    <<: *depends-on
    container_name: airflow-scheduler
    command: scheduler
    restart: on-failure
    ports:
      - "8793:8793"

  webserver:
    <<: *common
    <<: *depends-on
    container_name: airflow-webserver
    restart: always
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test:
        [
          "CMD",
          "curl",
          "--fail",
          "http://localhost:8080/health"
        ]
      interval: 30s
      timeout: 30s
      retries: 5

  airflow-init:
    <<: *common
    container_name: airflow-init
    entrypoint: /bin/bash
    command:
      - -c
      - |
        mkdir -p /sources/logs /sources/dags
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags}
        exec /entrypoint airflow version

当我们访问Apache-Airflow Web UI时,我们可以看到DAG,并且可以直接运行以查看结果。

dag(apael-airfow we Weste):

https://raw.githubusercontent.com/mikekenneth/blogpost_resources/main/twitter_pipeline_airflow_ui.png

在我们的存储桶中生成的推文文件(Minio Console):

https://raw.githubusercontent.com/mikekenneth/blogpost_resources/main/twitter_pipeline_minio_ui.png

这是一个包裹。我希望这对您有帮助。

关于我

我是一名数据工程师,拥有3年以上的经验,并且是软件工程师(5年以上)。我喜欢学习和教学(主要是学习ð)。

您可以通过emailTwitterLinkedIn与我联系。

使用bloggu.io发布的文章。免费尝试。