揭开Apache箭头的神秘面纱
#python #datascience #arrow #dataframes

这篇文章是我最初发表的in the Orchest blog的改编。

序言:pandas很棒,但还不够

pandas是由韦斯·麦金尼(Wes McKinney)于2008年创建的,作为他的数据分析任务的“ Skunkworks项目”,从那时起,它已成长为one of the key enablers of the Python growth in the Data Science industry。其功能强大的CSV阅读功能,类似SQL的聚合和分组功能,其丰富的时间序列处理方法以及与Jupyter的集成使Pandas成为任何数据科学家工具的必不可少的工具。


In [1]: import pandas as pd  

In [2]: df = pd.read_csv("...")  

In [3]: df.head()

Growth of pandas in Stack Overflow compared to other Python-related tags

与其他与Python相关的标签(source)相比,堆栈溢出中熊猫的生长

但是,正如其自己的创建者所描述的那样,Pandas有一些无法轻易修改的设计缺陷,并且存在一些缺点,这些缺点将其适用性限制在中小型数据集中。 This blog post描述了其中一些局限性,可以总结如下:

(1)许多熊猫的操作不利用多个核心或查询计划

pandas并未考虑到大数据集并使用急切的评估模型,因此,复杂的链式操作创建了许多中间对象,在某些情况下可能很大。另一方面,即使最近有一些努力来利用pandas,the results are somewhat heterogeneous的多核心,并且在许多情况下,熊猫一定会遇到Python的全球口译员锁,这会强制执行只有一个线程可以在给定时间访问CPU。<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< /p>

(2)糟糕的内存管理

有几种处理丢失数据的方法,and each one has its tradeoffs:Pandas使用Sentinel值的选择可以减少所需的内存量,但同时又引入了现在正在解决的数据类型的小小的不一致之处,并且使其变得更加困难,并且使其变得更加困难向CPU应用矢量化数学操作。另一方面,缺乏对内存映射的支持(以下更多内容)以及处理字符串和类别的方式也降低了某些操作的效率。

由于这些限制,几个库试图将熊猫扩展到更大的工作量或创建更快,更有效的替代方案。

揭开Apache箭头的神秘面纱

Apache Arrow(简称箭头)是一个开源项目,将自己定义为“独立于语言的柱状记忆格式”(稍后再详细介绍)。它是Apache软件基金会的一部分,因此受几个利益相关者的社区的约束。它具有implementations in several languages(C ++,还有Rust,Julia,Go,甚至JavaScript),以及用于Python,Râ和其他包含C ++实施的其他绑定。

它的创作者之一是韦斯·麦金尼本人,所以Python is one of the main targets of Arrow

也就不足为奇了。

但是...箭头到底是什么?

围绕箭头周围有widespread confusion,它与诸如Parquet之类的东西进行了比较。该主题值得一提。

箭头定义了两个二进制表示:箭头IPC流式格式箭头IPCâ文件(或随机访问) - 格式。对前者进行了优化,以处理任意长度的批次(因此“流”),而后者则需要固定数量的批次,而后者又支持寻求操作(因此“随机访问”)。

鉴于这些有点令人困惑的名称,必须坚持哪个箭头不是:

  • 箭头是不是文件格式。当我们谈论文件格式时,我们通常会考虑存储在磁盘上的东西,而箭头就是“运行时内存中表示”。箭头文件格式和箭头流格式都没有定义编码规则以保存磁盘上的数据。相反,将它们视为您在代码中使用的数据结构
  • 箭头是不是用于长期存储的。相反,它是用于临时瞬态,内存存储的。

您可能会在想:``如果箭头不是文件格式而设计为内存表示形式,那么 一个人如何序列化或将某些箭头数据存储在磁盘上?为此,有两个主要选择:

  • Apache Parquet(简称镶木材),如今,它是将柱状数据存储在磁盘上的行业标准。它以高效率压缩数据,并提供快速的读写速度。作为written in the Arrow documentation,“箭头是使用镶木quet文件读取或写入数据的理想的内存传输层”。
  • Feather File Format(简称羽毛),它编码箭头IPC文件格式(有限,与无尽的流不同)。像镶木quet一样,羽毛文件也被压缩并针对柱状数据进行了优化。

羽毛格式是在箭头旁边创建的,如今,它提供了体面的压缩(尽管木板文件通常较小),并且读写速度(甚至比Parquet更好)。另一方面,镶木木格式的采用更广泛,并且更可互操作。如果您不确定哪个是最好的,并且不担心尽可能多地挤压速度,则可以安全选择Parquet。

File size of Feather vs other file formats

羽毛的文件大小与其他文件格式(source

Pyarrow的第一步

要安装箭头的python绑定,您可以使用conda/mamba或pip:

$ mamba install "pyarrow=7.0"

让我们与经典的NYC Taxi dataset合作。我们之所以选择2015年,是因为这些文件很大,虽然还太老了(沿途有一些模式更改)。从终端下载相关文件:

$ wget -v "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2015-01.csv" -O "/data/yellow_tripdata_2015-01.csv"

现在,您已经准备好将CSV数据读为箭头:

import pyarrow as pa  
from pyarrow import csv  

nyc = csv.read_csv("/data/yellow_tripdata_2015-01.csv")  
print(len(nyc))

A PyArrow table with its schema

带有模式的Pyarrow表

请注意,数据集包含超过1200万行。让我们检查模式并计算存储此数据需要多少ramâ:

In [5]: nyc.schema  
Out [5]:  
VendorID: int64  
tpep_pickup_datetime: timestamp[s]  
tpep_dropoff_datetime: timestamp[s]  
passenger_count: int64  
trip_distance: double  
pickup_longitude: double  
pickup_latitude: double  
RateCodeID: int64  
store_and_fwd_flag: string  
dropoff_longitude: double  
dropoff_latitude: double  
payment_type: int64  
fare_amount: double  
extra: double  
mta_tax: double  
tip_amount: double  
tolls_amount: double  
improvement_surcharge: double  
total_amount: double  
  
In [6]: print("RSS (RAM): {}MB".format(pa.total_allocated_bytes() >> 20))  
RSS (RAM): 1812MB

整个CSV在磁盘上需要超过2.5Gbâ,但在内存中只有1.8Gbâ。还观察到两列自动被检测为时间戳,这与pandas.read_csv的默认行为背道而驰。

箭头的read_csv函数返回了一个Table对象,该对象包含列的集合。这些列中的每一列都是ChunkedArray,这是箭头中可用的众多数组类型之一:

In [7]: nyc["trip_distance"]  
Out [7]:  
<pyarrow.lib.ChunkedArray object at 0x7f2cec1023b0>  
[  
 [  
   1.59,  
   3.3,  
   1.8,  
   0.5,  
   3,  
   9,  
   2.2,  
   0.8,  
   18.2,  
   0.9,  
   ...

像pandas dataframes一样,按列索引作品。要选择多个列,您可以使用 .select 方法:

In [8]: nyc.select(["trip_distance", "total_amount"])  
Out [8]: pyarrow.Table  
trip_distance: double  
total_amount: double  
----  
trip_distance: [[1.59,3.3,1.8,0.5,3,9,2.2,0.8,18.2,0.9,...  
total_amount: [[17.05,17.8,10.8,4.8,16.3,40.33,15.3,9.96,58.13,9.35,...

和切成特定的行,您可以使用.slice.take方法:

nyc.slice(100, 3).to_pandas()  
nyc.take([100, 101, 102]).to_pandas()

熊猫有一些差异

箭头以一些有趣的方式从熊猫出发,这些方式在首次使用时立即引起了注意。最重要的是,数据是不变的:

nyc["trip_distance"] = 0  # Raises TypeError!

As stated in the documentation,“许多箭头对象是不变的:一旦构造,它们的逻辑属性就无法更改。这使得可以在多线程的场景中使用它们,而无需乏味且容易发生同步”。

>

>

但是,有多种方法可以有效地将行添加到现有表中:如果这两个表的模式相同,则pyarrow.concat_tables将执行零拷贝的串联:

pa.concat_tables(  
    [nyc.slice(1_000, 3), nyc.slice(2_000, 3)]  
).to_pandas()

另一个有趣的区别是如何处理丢失值:pandas现在正在尝试基于掩码的方法,但是在箭头中,它们从一开始就在那里。此外,由于箭头阵列存储了缺少值的数量,因此,基础代码可以跳过一些检查,如果不丢失值:

In [12]: nyc["tip_amount"].null_count  
Out [12]: 0

箭头很棒

箭头最有趣的功能是处理memory-mapped files的能力。这允许箭头读取比可用的RAM大的数据集,而不会产生任何额外的费用。

例如,您可以从上面的部分内存映射相同的CSV文件:

In [13]: mmap = pa.memory_map("/data/yellow_tripdata_2015-01.csv")

并验证分配了0个字节:

In [14]: print("RSS: {} MB".format(pa.total_allocated_bytes() >> 20))  
RSS: 0 MB

可以分批读取此内存映射的文件,因此您无需将文件的所有内容加载到内存中:

from pyarrow.csv import open_csv  

# Create a CSVStreamingReader from the memory-mapped file  
reader = open_csv(mmap)  

# Iterate over all the batches of the file  
reader.read_next_batch().to_pandas()

可能的用例是将巨大的CSV文件通过批量转换为Parquet,如下所示:

import pyarrow.parquet as pq  
  
# "Rewind" the CSV file  
mmap.seek(0)  
reader = open_csv(mmap)  
  
# Open parquet file for writing with same schema as the CSV file  
with pq.ParquetWriter("/data/yellow_tripdata_2015-01.parquet", reader.schema) as writer:  
   while True:  
       try:  
           batch = reader.read_next_batch()  
           writer.write_batch(batch)  
       except StopIteration:  
           break  

# Load data directly from Parquet  
reloaded_nyc = pq.read_table("/data/yellow_tripdata_2015-01.parquet")

实际上,箭头支持从任意文件样对象的读取和编写数据批次,该对象可以是磁盘,插座或内存对象上的文件:

import io  

buf = io.BytesIO()  

# Create new stream wrapping the BytesIO object  
# using the NYC table schema  
with pa.ipc.new_stream(buf, reloaded_nyc.schema) as writer:  
    # Write 5 batches  
   for index, batch in enumerate(reloaded_nyc.to_batches()):  
       writer.write_batch(batch)  
       if index > 5:  
           break  

print(writer.stats)  # WriteStats(num_messages=8, num_record_batches=7, ...  

# "Rewind" the BytesIO object  
buf.seek(0)  

# Open the BytesIO for reading  
with pa.ipc.open_stream(buf) as reader:  
   schema = reader.schema  
   batches = [b for b in reader]  

# Create a PyArrow Table from the batches  
pa.Table.from_batches(batches)

魔术!

您应该使用(PY)箭头吗?

首先,您已经使用PYARROW了:pandas optionally uses PyArrow用于读取CSV和PARQUET文件,以及生态系统利用箭头中的其他数据框架库进行性能。

箭头是其他高级库的构建块,它利用其功能来挤压系统性能。但是,如您所见,Pyarrow本身是相当低的。是的,支持某些操作,例如GroupBys和Contregations,但不存在一些来自Pandas的更高级别的结构。

因此,作为一些一般建议:

  • 如果您要构建效率很重要的高性能应用程序,并且要利用箭头块和流式传输功能。
  • 如果您正在寻找可以轻松迁移到的熊猫的替代速度的替代速度。

在本系列的下一篇文章中,我们将描述其中一些选择。敬请期待!