我如何通过利用Apache Arrow生态系统来降低ETL成本
#python #data #etl #dataengineering

在数据工程领域,Apache Spark Framework是提取和处理数据的最著名和最有力的方法之一。
它是受信任的,一旦您设置了基础架构,也非常容易使用。
可以理解的是,大多数工程师都会为每个任务选择它。
但是,从很多方面来说,这可能是过度的。还有一个非常昂贵的。

我们的数据平台旅程

在我们的堆栈中,我们管理了我们团队构建的不同微服务的几个数据库。
我们还为两个由外部公司构建的平台提供了一些旧数据库。

为了为董事带来洞察力并启用数据驱动的决策,我们需要一种对所有董事进行分析查询的方法。
启用它需要我们运行ETL(提取,转换和加载脚本)才能将数据库从数据库中获取到我们的数据湖中。

在撰写本文时,我是负责我公司的大多数数据集成和提取的专业人员。
我也是我们的数据平台和基础架构的主要建筑师,我将解释它如何发展为我们今天拥有的。

提取工作:ETL的外观

ETL (Extract Transform Load) diagram
ETL(提取转换负载)图

我们的ETL系统的第一次迭代使用AWS胶水作业服务。
由于它是Apache Spark的无服务器产品(顶部有一些自定义),因此很容易开始。

这些提取将从数据库中获取所有数据,并将所有数据保存为Apache Parquet文件中的S3存储桶,将数据库分为文件夹,并将其表分为子文件夹。
我们使用AWS Athena(类似于开源Presto/Trino Project)在所有这些文件上运行SQL查询,就好像它是一个大数据软件。

在堆栈结束时,我们将Metabase作为我们的可视化工具,用于制作美丽的仪表板。
最后两个仍然是我们堆栈的一部分。我们已经测试了不同的项目,例如Dremio和Trino来运行查询,以及Apache Superset viz,但最终坚持了我们的第一个选择。
我最更改的是我们如何运行提取脚本。

第一个更改是将一些ETL迁移到AWS EMR(托管Apache Spark簇)。
这些脚本变得太复杂了,无法在胶水上运行,在EMR上,我们可以按照我们的意愿扩展集群。
运行它们的成本也是迁移的一个很好的理由,因为AWS胶水比其他替代品要贵得多。

第二步是放弃亚马逊管理这些簇的帮助,并在kubernetes上自己做。
我使用Spark spark-on-k8s-operator来运行我们的火花工作。
它使他们更快地(EMR的启动时间很慢),更便宜且更易于管理。
使用该操作员,将作业作为Kubernetes自定义资源提交,并且操作员创建了所有必要的pod来运行脚本。

到目前为止,我们一直在编写Apache Spark脚本,只是更改运行方式和何处。最后一步是不同的。
使用Apache箭头和简单的容器,我使大多数旧提取都过时了。

Apache Spark VS Apache Arrow(不等效)

Apache Spark是为分布式工作制作的。
为此,它通常是在一个或几个驱动程序/主节点的集群中设置的,至少有几个执行器节点。
当您需要使用大型数据集时,这真是太神奇了,并且它们无法适应一台价格合理的机器的记忆。
但是总会有一个缺点。即使是这种情况,分发处理也不总是完美的。
可能有必要在节点之间共享数据,这会导致许多网络流量。
有些操作只需要数据才能存储在内存中。

在另一种情况下,当工作量不大时,分发它不会产生真正的收益。
由于多种类型的开销(如同步和运输),它很可能会伤害它。
您可以在没有群集的情况下运行Apache Spark,但不是为此制作的。
因此,我决定测试一些新项目,这些项目将在不分发的情况下完成我需要的事情。

我用来创建新提取的工具是PolarsConnectorX
Polars是一个数据框库,例如Pandas,但使用Apache Arrow数据模型在Rust中实现。
它是一个柱状数据模型,它是由任何语言实现的。
但这不是令人印象深刻的部分:您可以在完全不同的代码库(例如Rust,Java和Python)之间共享数据,而无需序列化甚至重新分配。
如果您可以在内存中访问相同的空间,则可以与箭头生态系统一起使用。
当我说Spark Shuffles在网络上引起大量流量时,它还需要在接收端发送和进行序列化之前序列化所有这些数据。
这浪费了很多时间和资源。

代码时间

Connectorx已集成到Porars中,如果两者都安装,则可以调用polars.read_sql
我会使用它的dienclty:

import connectorx as cx

arrow_table = cx.read_sql(
    query="select * from table_x",
    conn="postgres://user:pass@host:port/database",
    return_type="arrow2",
    protocol="binary",
)

我的示例使用Postgres,但是ConnectOrx支持其他几个数据库。
此功能的输出是一个apache箭头表。
arrow2表示它使用非正式的生锈实施Arrow2
Rust中还有官方的Apache Arrow实现,Pyarrow使用的C ++实现。

下一个呼叫指示polar读取箭头表存储空间。正如文档所说:

此操作在大多数情况下将是零副本。
可以将不受Polars支持的类型投入到最接近的支持类型。

import polars as pl
df = pl.from_arrow(arrow_table)

将其加载到Porars中后,您可以随意操纵此数据。
可以选择将其变成懒惰的操作,就像Apache Spark一样。
这非常有用,因为它允许Porars在可能的情况下优化查询计划。
这也使流媒体用于大型内存操作。

在此示例中,我将简单地将其写入文件。
这可以完成本地路径,或几乎所有目的地fsspec支持。
例如,一个有效的路径可以是s3://bucket/database/folder/
如果您不指定文件名,它将生成随机的名称。如果要保留单个文件或要替换现有文件,请确保指定文件名。

df.write_parquet("output.snappy.parquet", compression:"snappy")

也可以使用pyarrow做到这一点。
正如我之前说的,Pyarrow使用箭头的C ++实现。
但是数据可以在无需序列化甚至内存复制的情况下无缝流动。

import pyarrow.parquet as pq

pq.write_table(
    df.to_arrow(),
    where="output.snappy.parquet",
    compression="snappy",
)

我在Github中创建了一个示例存储库,将所有这些都放在一起。检查一下!

github.com/auyer/polars-extraction

结论

Apache Spark是建立复杂ETL的既定框架。
但是它后面带有沉重的JVM堆栈。
正如我们在这里讨论的那样,如果您担心成本(应该是),那么对于小数据集来说,这不是一个不错的选择。

Apache Arrow生态系统正在增长。
它还不能替代火花,但是有一天我敢打赌。
但是,在做现在可以做的事情时,它可以更快地完成并消耗较少的机器资源。
几乎每周都将新功能实现为极点,因此很快将成为数据帧的无处不在工具。

connectorx是这一成功的重要作用。它没有使我完全替换我的火花所需的所有功能,因为它不支持所有Postgres类型。
我对少数(例如Enum and Ltree)实施了支持,但其他人仍然缺少诸如字符串数组。它可以从社区那里得到更多的爱。

希望这篇文章值得一读!谢谢!