在上一篇文章中,我们创建了一个非常简单的单个任务气流DAG来运行Sprott Scraper。但是,就像我们的功能性刮刀一样,DAG是一个技巧小马,不容易用于其他东西。为此,我们需要像在抽象类方法中使用提取对象,转换对象和加载对象那样将其分解。这次,每个对象成为一个任务,该任务由操作员在气流中执行。首先,让我们看一下如何从Python类中创建自定义操作员。
如果您在我制作功能性Sprott Scraper DAG后注意到了代码块,您会看到我用pythonoperator明确地将其替换为@task装饰器。您可以想象您可以将所有任务写入Python函数,但是如果您想像我们在Python Pipeline中所做的那样创建可重复使用的类,该怎么办?您可以非常轻松地创建自己的气流操作员!
让我们从几周前开始的班级刮擦开始。我将仅在2个斑点中修改它并创建一个自定义操作员。
# fund_scraper_operator.py
import requests
from bs4 import BeautifulSoup
import json
# To turn into an operator, you just need to inherit the BaseOperator
from airflow.models.baseoperator import BaseOperator
class FundScraperOperator(BaseOperator):
def __init__(self, url):
self.url = url
def web_call(self): # Extract
r = requests.get(self.url)
if r.status_code == 200:
soup = BeautifulSoup(r.content, "html.parser")
return soup
else:
return r.status_code
def get_fund_values(self, soup, index, class_name, replace_list): # Transform
fund_values = soup.find_all('div', class_=class_name)
value = fund_values[index].contents
for x in replace_list:
value = value.replace(x, '')
return str(value[0]).strip()
def write_json(self, data, filename='data.json'): # Load
with open(filename, 'w') as f:
json.dump(data, f, indent=4)
# You will override "execute" in the BaseOperator with this.
def execute(self):
soup = self.web_call()
data = {}
data['shareprice'] = self.get_fund_values(
soup, 4, 'fundHeader_value', ['$US', ','])
data['u3o8_stock'] = self.get_fund_values(
soup, 6, 'fundHeader_value', ['$US', ','])
self.write_json(data)
就是这样!您会注意到原始脚本中唯一的更改是从气流导入BaseOperator
类并继承它。然后,我们通过将其添加到所采用的参数来“扩展”它(它将从subepererator获取所有DAG/任务上下文,因此我们不能使用superipererator受保护的参数),并使用我们自己的execute
方法覆盖execute
方法。制作操作员非常容易。
在您的气流设置中,在dags
目录旁边的Airflow Home
中,将此自定义运算符放入名为plugins
的目录中。就像dags
一样,plugins
将在您的pythonpath中,因此您可以将其导入这样的DAG:
from bs4 import BeautifulSoup
import pendulum
from datetime import timedelta
from airflow import DAG
from airflow.decorators import task
from fund_scraper_operator import FundScraperOperator
# Default args used when create a new dag
args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'schedule_interval': '@daily',
}
with DAG(
dag_id='Functional_Sprott_Scraper',
schedule_interval='5 20 * * 1-6',
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
default_args=args,
render_template_as_native_obj=True,
tags=['price', 'scraper']
) as dag:
scrape_task = FundScraperOperator(
url='https://sprott.com/investment-strategies/physical-commodity-funds/uranium/')
气流社区非常充满活力,因此您会发现运营商可以与大量服务进行互动。想写信给S3吗?社区支持的钩子和操作员为此提供了支持。您要查询PSQL数据库吗?也有已编写的钩子/操作员。这就是您不想像本文中那样创建整体操作员的原因之一。在我们将任务分开/抽象的过程中,例如与抽象课程进行的任务之前,重要的是要谈论气流挂钩。
一如既往,代码在github here中升起。