本文指导数据实践者如何设置Kedro项目使用新的SparkStreaming
KEDRO数据集(示例用例),并深入研究某些设计注意事项。它是针对熟悉Kedro的数据从业者的,因此我们不会涵盖项目的基础知识,但是您可以在Kedro documentation中熟悉它们。
什么是Kedro?
Kedro是一种开源Python工具箱,将软件工程原理应用于数据科学代码。它使团队更容易将软件工程原则应用于数据科学法规,这减少了重写数据科学实验所花费的时间,因此它们适合生产。
Kedro出生于Quantumblack,以解决数据科学项目定期面临的挑战,并通过标准化的团队工作流程促进团队合作。它现在由LF AI & Data Foundation作为一个孵化项目主持。
什么是Kedro数据集?
Kedro datasets是用于阅读和加载数据的抽象,旨在将这些操作与您的业务逻辑解除。这些数据集管理了来自各种来源的阅读和编写数据,同时还确保了一致性,跟踪和版本化。它们允许用户保持专注于核心数据处理,将数据I/O任务留给Kedro。
什么是火花结构化流?
Spark Structured Streaming建立在Spark SQL发动机上。您可以按照静态数据表达批处理计算的方式表达流计算,并且Spark SQL引擎将逐步和连续运行并更新最终结果,因为流数据继续到达。
。整合Kedro和Spark结构化流媒体
kedro在您自己的工作流程中很容易扩展,本文说明了添加新功能的方法之一。为了使Kedro能够与Spark结构化流媒体合作,QuantumBlack Labs中的一个团队开发了新的Spark Streaming Dataset,因为现有的Kedro Spark数据集与Spark Streaming用例不兼容。为了确保无缝流的流媒体,新数据集具有检查点位置规范,以避免在流式用例中进行数据重复,并且在_save
方法末尾使用.start()
来启动流。
设置一个将Kedro与Spark结构化流的项目
该项目使用Kedro数据集构建一个结构化数据管道,该数据管线可以实时读取和写入Spark结构化流和过程数据流的数据流。您需要在Kedro项目中添加两个单独的钩子,以使其能够充当流式应用程序。
集成涉及以下步骤:
- 创建一个Kedro项目。
- 注册必要的pyspark和与流相关的钩子。
- 在
catalog.yml
文件中配置自定义数据集,以定义流源和接收器。 - 使用Kedro的新dataset for Spark Structured Streaming存储在火花流过程中生成的中间数据框。
创建一个Kedro项目
确保您安装了大于0.18.9版本的Kedro版本,而kedro-datasets
大于1.4.0。
pip install kedro~=0.18.0 kedro-datasets~=1.4.0
使用Kedro pyspark
启动器创建一个新的Kedro项目:
kedro new --starter=pyspark
注册必要的pyspark和与流有关的钩子
要使用多个流节点,需要两个钩子。首先是集成Pyspark:有关详细信息,请参见Build a Kedro pipeline with PySpark。除非发生例外,否则您还需要一个挂钩来运行流媒体查询而无需终止。
将以下代码添加到src/$your_kedro_project_name/hooks.py
:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from kedro.framework.hooks import hook_impl
class SparkHooks:
@hook_impl
def after_context_created(self, context) -> None:
"""Initialises a SparkSession using the config
defined in project's conf folder.
"""
# Load the spark configuration in spark.yaml using the config loader
parameters = context.config_loader.get("spark*", "spark*/**")
spark_conf = SparkConf().setAll(parameters.items())
# Initialise the spark session
spark_session_conf = (
SparkSession.builder.appName(context._package_name)
.enableHiveSupport()
.config(conf=spark_conf)
)
_spark_session = spark_session_conf.getOrCreate()
_spark_session.sparkContext.setLogLevel("WARN")
class SparkStreamsHook:
@hook_impl
def after_pipeline_run(self) -> None:
"""Starts a spark streaming await session
once the pipeline reaches the last node.
"""
spark = SparkSession.builder.getOrCreate()
spark.streams.awaitAnyTermination()
在src/$your_kedro_project_name/settings.py
中注册钩子:
"""Project settings. There is no need to edit this file unless you want to change values
from the Kedro defaults. For further information, including these default values, see
https://kedro.readthedocs.io/en/stable/kedro_project_setup/settings.html."""
from .hooks import SparkHooks, SparkStreamsHook
HOOKS = (SparkHooks(), SparkStreamsHook())
# Instantiated project hooks.
# from streaming.hooks import ProjectHooks
# HOOKS = (ProjectHooks(),)
# Installed plugins for which to disable hook auto-registration.
# DISABLE_HOOKS_FOR_PLUGINS = ("kedro-viz",)
# Class that manages storing KedroSession data.
# from kedro.framework.session.shelvestore import ShelveStore
# SESSION_STORE_CLASS = ShelveStore
# Keyword arguments to pass to the `SESSION_STORE_CLASS` constructor.
# SESSION_STORE_ARGS = {
# "path": "./sessions"
# }
# Class that manages Kedro's library components.
# from kedro.framework.context import KedroContext
# CONTEXT_CLASS = KedroContext
# Directory that holds configuration.
# CONF_SOURCE = "conf"
# Class that manages how configuration is loaded.
# CONFIG_LOADER_CLASS = ConfigLoader
# Keyword arguments to pass to the `CONFIG_LOADER_CLASS` constructor.
# CONFIG_LOADER_ARGS = {
# "config_patterns": {
# "spark" : ["spark*/"],
# "parameters": ["parameters*", "parameters*/**", "**/parameters*"],
# }
# }
# Class that manages the Data Catalog.
# from kedro.io import DataCatalog
# DATA_CATALOG_CLASS = DataCatalog
�如何设置您的Kedro项目以读取流源的数据
设置项目后,您可以使用新的Kedro Spark streaming dataset。您需要在conf/base/catalog.yml
中配置数据目录,如下所示,要从流json文件中读取:
raw_json:
type: spark.SparkStreamingDataSet
filepath: data/01_raw/stream/inventory/
file_format: json
可以通过load_args
键配置其他选项。
int.new_inventory:
type: spark.SparkStreamingDataSet
filepath: data/02_intermediate/inventory/
file_format: csv
load_args:
header: True
�如何设置您的Kedro项目以将数据写入流式水槽
所有其他参数都可以在save_args
键下保存:
processed.sensor:
type: spark.SparkStreamingDataSet
file_format: csv
filepath: data/03_primary/processed_sensor/
save_args:
output_mode: append
checkpoint: data/04_checkpoint/processed_sensor
header: True
请注意,当您使用Kafka格式时,应将各自的软件包添加到spark.yml
configuration中:
spark.jars.packages: org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1
设计注意事项
管道设计
为了从Spark的内部查询优化中受益,我们建议将任何临时数据集存储为存储器数据集。
所有流都同时启动,因此任何对写入文件接收器的节点依赖的节点(即该节点的输入是另一个节点的输出),在第一个运行中都会失败。这是因为文件接收器中没有文件启动时处理的文件。
我们建议您要么将中间数据集保存在内存中,要么将处理分解为两个管道,然后从触发第一个管道来构建一些初始历史记录。
功能创建
请注意,窗口操作仅允许在时间列上窗口。
必须为连接定义水印。仅允许某些类型的连接,这些连接取决于文件类型(流式,流静态),这有时使多个表的连接有些复杂。有关加入类型和水印的更多信息或建议,请查看PySpark documentation或在Kedro Slack workspace上伸出援手。
登录
启动时,Kedro管道将下载Spark Kafka所需的罐子。第一次运行后,它不会再次下载文件,而只需从存储之前下载的文件中检索它。
对于每个节点,将显示以下日志:加载数据,运行节点,保存数据,从y任务中完成x。
完成的日志并不意味着该节点中的流处理已停止。这意味着已经创建了火花计划,如果将输出数据集保存到水槽中,则该流已经启动。
一旦Kedro贯穿了所有节点并创建了完整的Spark执行计划,您将看到INFO Pipeline execution completed successfully
。
这并不意味着随着后运行钩使Spark Session保持活力,流处理已经停止。随着新数据的到来,即使完成“管道执行”日志,也将显示新的火花日志。
如果输入数据中存在错误,则火花错误日志将通过,Kedro将关闭SparkContext和其中的所有流。
总而言之
在本文中,我们解释了如何通过构建新数据集来创建流式管道来利用扩展Kedro的方法之一。我们使用Kedro Kude5starter创建了一个新的Kedro项目,并说明了如何使用钩子,将其添加到Kedro项目中,以使其能够充当流媒体应用程序。然后,数据集很容易通过Kedro数据目录进行配置,从而可以使用新数据集,定义流源和接收器。
目前存在一些限制,因为它尚未准备好与服务经纪人一起使用,例如kafka,需要一个附加的罐装包。
如果您想了解更多有关扩展Kedro的方法,请查看advanced Kedro documentation的更多信息,以获取有关Kedro插件,数据集和挂钩的更多信息。
贡献者
这篇文章是由Tingting Wan,Tom Kurian和Haris Michailidis创建的,它们都是QuantumBlack, AI by McKinsey的伦敦办公室的数据工程师。