使用Sparksnake改善AWS的ETL工作
#python #分析 #spark #etl

您是否曾经考虑过拥有一堆火花功能和代码块,以在您在胶水和EMR等AWS服务中开发Spark应用程序的所有旅程中进行改进?在本文中,我将向您介绍koude0作为功能强大的Python软件包,因为在AWS上进行了Spark应用程序开发的游戏。

Sparksnake背后的想法

要了解将sparksnake栩栩如生的主要原因,让我们首先快速查看胶合板代码,无论在AWS控制台上创建新工作时,都会出现:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

现在让我向您展示不同级别的胶水用户的两个简单观点。

  • 初学者:可以合理地说,上面的块代码不是我们每天在胶水外看到的东西,对吗?因此,对于第一次尝试胶水的人来说,会有有关GlueContext,Job和特殊方法等元素的疑问。

  • 经验丰富的开发人员:即使对于这个组,“胶水设置”也可能是痛苦的(特别是如果您每次开始启动新的工作开发时需要这样做)。 /p>

因此,sparksnake背后的主要思想是在使用AWS服务开发的SPARK应用程序中采取所有共同的步骤,并将其封装在使用单个代码行的用户可以调用的类和方法上。换句话说,上面显示的所有样板代码将在sparksnake中替换为:

# Importing sparksnake's main class
from sparksnake.manager import SparkETLManager

# Initializing a glue job
spark_manager = SparkETLManager(mode="glue")
spark_manager.init_job()

这只是sparksnake中可用的一系列功能之一!它的伟大是能够调用方法和功能在作业中使用Spark共同功能的能力,无论您是在胶水和EMR等AWS服务还是本地运行。

A simple diagram showing how sparksnake package inherits features from AWS services like Glue and EMR to provide users a custom experience

库结构

sparksnake上快速概述之后,重要的是要了解图书馆在引擎盖下的结构。

这次,包装上有两个模块:

  • manager:中央模块,托管具有共同火花功能的SparkETLManager类。它根据用户选择的操作模式继承其他类的功能
  • glue:侧模块,托管GlueJobManager类,具有胶水作业中使用的特殊功能

在常见的用法模式中,用户会导入SparkETLManager类,并根据开发和部署Spark应用程序的位置选择操作模式。此操作模式指导SparkETLManager类从AWS服务等AWS服务中继承功能,以为用户提供自定义体验。

特征

现在,您对Sparksnake的主要概念有了更多的了解,让我们总结一下它的一些功能:

ð昆 ð使用常见的火花操作使用自定义类和方法来改进ETL步骤
•无需过多地思考硬且复杂的服务设置(例如,使用Sparksnake,您可以拥有所有代码的AWS上的胶合作业的所有元素)
ð管理可观察性改进,并在CloudWatch中使用详细的日志消息
ð的异常处理已经嵌入了库方法

快速启动

要开始使用sparksnake,只需使用pip安装它:

pip install sparksnake

现在说,例如,我们正在在AWS上开发新的胶水工作,我们想使用sparksnake使事情变得更容易。为了提供一个有用的示例,说明库的功能强大,请想象我们有一系列数据源需要阅读到工作中。编写多行代码以读取目录的每个数据源会非常痛苦。

使用sparksnake,我们可以使用一行代码从目录中读取多个数据源:

# Generating a dictionary of Spark DataFrames from catalog
dfs_dict = spark_manager.generate_dataframes_dict()

# Indexing to get individual DataFrames
df_orders = dfs_dict["orders"]
df_customers = dfs_dict["customers"]

以及在数据目录上编写有关S3并将其分类的数据呢?不用担心,也可以使用一行代码来完成:

# Writing data on S3 and cataloging on Data Catalog
spark_manager.write_and_catalog_data(
    df=df_orders,
    s3_table_uri="s3://bucket-name/table-name",
    output_database_name="db-name",
    output_table_name="table-name",
    partition_name="partition-name",
    output_data_format="data-format" # e.g. "parquet"
)

再次,这些仅是图书馆上已经可用的一系列功能的两个示例,本文的编写是为所有用户提供了一种不同的学习方式,以学习和提高AWS中Spark应用程序的技能。

了解更多

有一些有关Sparksnake的有用链接和文档。在以下查看: