用于火花结构化流的新的Kedro数据集
#python #流媒体 #spark #kedro

本文指导数据实践者如何设置Kedro项目使用新的SparkStreaming KEDRO数据集(示例用例),并深入研究某些设计注意事项。它是针对熟悉Kedro的数据从业者的,因此我们不会涵盖项目的基础知识,但是您可以在Kedro documentation中熟悉它们。

什么是Kedro?

Kedro是一种开源Python工具箱,将软件工程原理应用于数据科学代码。它使团队更容易将软件工程原则应用于数据科学法规,这减少了重写数据科学实验所花费的时间,因此它们适合生产。

Kedro出生于Quantumblack,以解决数据科学项目定期面临的挑战,并通过标准化的团队工作流程促进团队合作。它现在由LF AI & Data Foundation作为一个孵化项目主持。

什么是Kedro数据集?

Kedro datasets是用于阅读和加载数据的抽象,旨在将这些操作与您的业务逻辑解除。这些数据集管理了来自各种来源的阅读和编写数据,同时还确保了一致性,跟踪和版本化。它们允许用户保持专注于核心数据处理,将数据I/O任务留给Kedro。

什么是火花结构化流?

Spark Structured Streaming建立在Spark SQL发动机上。您可以按照静态数据表达批处理计算的方式表达流计算,并且Spark SQL引擎将逐步和连续运行并更新最终结果,因为流数据继续到达。

整合Kedro和Spark结构化流媒体

kedro在您自己的工作流程中很容易扩展,本文说明了添加新功能的方法之一。为了使Kedro能够与Spark结构化流媒体合作,QuantumBlack Labs中的一个团队开发了新的Spark Streaming Dataset,因为现有的Kedro Spark数据集与Spark Streaming用例不兼容。为了确保无缝流的流媒体,新数据集具有检查点位置规范,以避免在流式用例中进行数据重复,并且在_save方法末尾使用.start()来启动流。

设置一个将Kedro与Spark结构化流的项目

该项目使用Kedro数据集构建一个结构化数据管道,该数据管线可以实时读取和写入Spark结构化流和过程数据流的数据流。您需要在Kedro项目中添加两个单独的钩子,以使其能够充当流式应用程序。

集成涉及以下步骤:

  1. 创建一个Kedro项目。
  2. 注册必要的pyspark和与流相关的钩子。
  3. catalog.yml文件中配置自定义数据集,以定义流源和接收器。
  4. 使用Kedro的新dataset for Spark Structured Streaming存储在火花流过程中生成的中间数据框。

创建一个Kedro项目

确保您安装了大于0.18.9版本的Kedro版本,而kedro-datasets大于1.4.0。

pip install kedro~=0.18.0 kedro-datasets~=1.4.0

使用Kedro pyspark启动器创建一个新的Kedro项目:

kedro new --starter=pyspark

注册必要的pyspark和与流有关的钩子

要使用多个流节点,需要两个钩子。首先是集成Pyspark:有关详细信息,请参见Build a Kedro pipeline with PySpark。除非发生例外,否则您还需要一个挂钩来运行流媒体查询而无需终止。

将以下代码添加到src/$your_kedro_project_name/hooks.py

from pyspark import SparkConf
from pyspark.sql import SparkSession

from kedro.framework.hooks import hook_impl


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a SparkSession using the config
        defined in project's conf folder.
        """

        # Load the spark configuration in spark.yaml using the config loader
        parameters = context.config_loader.get("spark*", "spark*/**")
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the spark session
        spark_session_conf = (
            SparkSession.builder.appName(context._package_name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )

        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")


class SparkStreamsHook:
    @hook_impl
    def after_pipeline_run(self) -> None:
        """Starts a spark streaming await session
        once the pipeline reaches the last node.
        """

        spark = SparkSession.builder.getOrCreate()
        spark.streams.awaitAnyTermination()

src/$your_kedro_project_name/settings.py中注册钩子:

"""Project settings. There is no need to edit this file unless you want to change values
from the Kedro defaults. For further information, including these default values, see
https://kedro.readthedocs.io/en/stable/kedro_project_setup/settings.html."""

from .hooks import SparkHooks, SparkStreamsHook

HOOKS = (SparkHooks(), SparkStreamsHook())

# Instantiated project hooks.
# from streaming.hooks import ProjectHooks
# HOOKS = (ProjectHooks(),)

# Installed plugins for which to disable hook auto-registration.
# DISABLE_HOOKS_FOR_PLUGINS = ("kedro-viz",)

# Class that manages storing KedroSession data.
# from kedro.framework.session.shelvestore import ShelveStore
# SESSION_STORE_CLASS = ShelveStore
# Keyword arguments to pass to the `SESSION_STORE_CLASS` constructor.
# SESSION_STORE_ARGS = {
#     "path": "./sessions"
# }

# Class that manages Kedro's library components.
# from kedro.framework.context import KedroContext
# CONTEXT_CLASS = KedroContext

# Directory that holds configuration.
# CONF_SOURCE = "conf"

# Class that manages how configuration is loaded.
# CONFIG_LOADER_CLASS = ConfigLoader
# Keyword arguments to pass to the `CONFIG_LOADER_CLASS` constructor.
# CONFIG_LOADER_ARGS = {
#       "config_patterns": {
#           "spark" : ["spark*/"],
#           "parameters": ["parameters*", "parameters*/**", "**/parameters*"],
#       }
# }

# Class that manages the Data Catalog.
# from kedro.io import DataCatalog
# DATA_CATALOG_CLASS = DataCatalog

�如何设置您的Kedro项目以读取流源的数据

设置项目后,您可以使用新的Kedro Spark streaming dataset。您需要在conf/base/catalog.yml中配置数据目录,如下所示,要从流json文件中读取:

raw_json:
  type: spark.SparkStreamingDataSet
  filepath: data/01_raw/stream/inventory/
  file_format: json

可以通过load_args键配置其他选项。

int.new_inventory:
   type: spark.SparkStreamingDataSet
   filepath: data/02_intermediate/inventory/
   file_format: csv
   load_args:
     header: True

�如何设置您的Kedro项目以将数据写入流式水槽

所有其他参数都可以在save_args键下保存:

processed.sensor:
   type: spark.SparkStreamingDataSet
   file_format: csv
   filepath: data/03_primary/processed_sensor/
   save_args:
     output_mode: append
     checkpoint: data/04_checkpoint/processed_sensor
     header: True

请注意,当您使用Kafka格式时,应将各自的软件包添加到spark.yml configuration中:

spark.jars.packages: org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1 

设计注意事项

管道设计

为了从Spark的内部查询优化中受益,我们建议将任何临时数据集存储为存储器数据集。

所有流都同时启动,因此任何对写入文件接收器的节点依赖的节点(即该节点的输入是另一个节点的输出),在第一个运行中都会失败。这是因为文件接收器中没有文件启动时处理的文件。

我们建议您要么将中间数据集保存在内存中,要么将处理分解为两个管道,然后从触发第一个管道来构建一些初始历史记录。

功能创建

请注意,窗口操作仅允许在时间列上窗口。

必须为连接定义水印。仅允许某些类型的连接,这些连接取决于文件类型(流式,流静态),这有时使多个表的连接有些复杂。有关加入类型和水印的更多信息或建议,请查看PySpark documentation或在Kedro Slack workspace上伸出援手。

登录

启动时,Kedro管道将下载Spark Kafka所需的罐子。第一次运行后,它不会再次下载文件,而只需从存储之前下载的文件中检索它。

Spark logging

对于每个节点,将显示以下日志:加载数据,运行节点,保存数据,从y任务中完成x。

完成的日志并不意味着该节点中的流处理已停止。这意味着已经创建了火花计划,如果将输出数据集保存到水槽中,则该流已经启动。

Spark logging

一旦Kedro贯穿了所有节点并创建了完整的Spark执行计划,您将看到INFO Pipeline execution completed successfully

这并不意味着随着后运行钩使Spark Session保持活力,流处理已经停止。随着新数据的到来,即使完成“管道执行”日志,也将显示新的火花日志。

Spark logging

如果输入数据中存在错误,则火花错误日志将通过,Kedro将关闭SparkContext和其中的所有流。

Spark logging

总而言之

在本文中,我们解释了如何通过构建新数据集来创建流式管道来利用扩展Kedro的方法之一。我们使用Kedro Kude5starter创建了一个新的Kedro项目,并说明了如何使用钩子,将其添加到Kedro项目中,以使其能够充当流媒体应用程序。然后,数据集很容易通过Kedro数据目录进行配置,从而可以使用新数据集,定义流源和接收器。

目前存在一些限制,因为它尚未准备好与服务经纪人一起使用,例如kafka,需要一个附加的罐装包。

如果您想了解更多有关扩展Kedro的方法,请查看advanced Kedro documentation的更多信息,以获取有关Kedro插件,数据集和挂钩的更多信息。

贡献者

这篇文章是由Tingting WanTom KurianHaris Michailidis创建的,它们都是QuantumBlack, AI by McKinsey的伦敦办公室的数据工程师。