班级到气流自定义操作员
#编程 #教程 #python #dataengineering

在上一篇文章中,我们创建了一个非常简单的单个任务气流DAG来运行Sprott Scraper。但是,就像我们的功能性刮刀一样,DAG是一个技巧小马,不容易用于其他东西。为此,我们需要像在抽象类方法中使用提取对象,转换对象和加载对象那样将其分解。这次,每个对象成为一个任务,该任务由操作员在气流中执行。首先,让我们看一下如何从Python类中创建自定义操作员。

如果您在我制作功能性Sprott Scraper DAG后注意到了代码块,您会看到我用pythonoperator明确地将其替换为@task装饰器。您可以想象您可以将所有任务写入Python函数,但是如果您想像我们在Python Pipeline中所做的那样创建可重复使用的类,该怎么办?您可以非常轻松地创建自己的气流操作员!

让我们从几周前开始的班级刮擦开始。我将仅在2个斑点中修改它并创建一个自定义操作员。

# fund_scraper_operator.py
import requests
from bs4 import BeautifulSoup
import json
# To turn into an operator, you just need to inherit the BaseOperator
from airflow.models.baseoperator import BaseOperator


class FundScraperOperator(BaseOperator):
    def __init__(self, url):
        self.url = url

    def web_call(self):  # Extract
        r = requests.get(self.url)
        if r.status_code == 200:
            soup = BeautifulSoup(r.content, "html.parser")
            return soup
        else:
            return r.status_code

    def get_fund_values(self, soup, index, class_name, replace_list):  # Transform
        fund_values = soup.find_all('div', class_=class_name)
        value = fund_values[index].contents
        for x in replace_list:
            value = value.replace(x, '')
        return str(value[0]).strip()

    def write_json(self, data, filename='data.json'):  # Load
        with open(filename, 'w') as f:
            json.dump(data, f, indent=4)
    # You will override "execute" in the BaseOperator with this.
    def execute(self):
        soup = self.web_call()
        data = {}
        data['shareprice'] = self.get_fund_values(
            soup, 4, 'fundHeader_value', ['$US', ','])
        data['u3o8_stock'] = self.get_fund_values(
            soup, 6, 'fundHeader_value', ['$US', ','])
        self.write_json(data)

就是这样!您会注意到原始脚本中唯一的更改是从气流导入BaseOperator类并继承它。然后,我们通过将其添加到所采用的参数来“扩展”它(它将从subepererator获取所有DAG/任务上下文,因此我们不能使用superipererator受保护的参数),并使用我们自己的execute方法覆盖execute方法。制作操作员非常容易。

在您的气流设置中,在dags目录旁边的Airflow Home中,将此自定义运算符放入名为plugins的目录中。就像dags一样,plugins将在您的pythonpath中,因此您可以将其导入这样的DAG:

from bs4 import BeautifulSoup
import pendulum
from datetime import timedelta

from airflow import DAG
from airflow.decorators import task
from fund_scraper_operator import FundScraperOperator


# Default args used when create a new dag
args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'schedule_interval': '@daily',
}

with DAG(
    dag_id='Functional_Sprott_Scraper',
    schedule_interval='5 20 * * 1-6',
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    default_args=args,
    render_template_as_native_obj=True,
    tags=['price', 'scraper']
) as dag:

    scrape_task = FundScraperOperator(
                    url='https://sprott.com/investment-strategies/physical-commodity-funds/uranium/')

气流社区非常充满活力,因此您会发现运营商可以与大量服务进行互动。想写信给S3吗?社区支持的钩子和操作员为此提供了支持。您要查询PSQL数据库吗?也有已编写的钩子/操作员。这就是您不想像本文中那样创建整体操作员的原因之一。在我们将任务分开/抽象的过程中,例如与抽象课程进行的任务之前,重要的是要谈论气流挂钩。

一如既往,代码在github here中升起。