Py-Arrow的介绍
#教程 #python #apache #timeseries

因此,到目前为止,您可能知道Influxdata一直在忙于建造下一代的InfluxDB存储引擎。如果您更深入地挖掘,您将开始发现一些可能对您陌生的概念:

  • Apache Parquet
  • Apache Arrow
  • 箭头飞行

这些开源项目是构成新存储引擎的一些核心构建块。在大多数情况下,您将不必担心引擎盖下的情况。虽然,如果您像我一样,想要对某些项目有什么更实际的了解,请加入我的发现之旅。

我们要研究的第一个组件是Apache箭头。我的同事查尔斯(Charles)提供了一个很好的高级概述,您可以找到here

简而言之:
箭头管理数组中的数据,可以在表中分组以表示表格数据中的数据列。 Arrow还提供了各种格式的支持,以将这些表格数据输入和退出磁盘和网络。最常用的格式是镶木(您会相当多地接触到这个概念)。

出于性能原因,我们的开发人员使用Rust来编码InfluxDB的新存储引擎。我个人喜欢在Python学习新的编码概念,因此我们将使用Pyarrow客户库。

基础

在Apache Arrow中,您有两个主要数据容器/类:Arrays and Tables。我们将稍后再挖掘这些内容,但是让我们首先编写一个快速的代码段来创建每个代码:

import pyarrow as pa

# Create a array from a list of values
animal = pa.array(["sheep", "cows", "horses", "foxes"], type=pa.string())
count = pa.array([12, 5, 2, 1], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022], type=pa.int16())

# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])
print(table)

因此,在此示例中,您可以看到我们构建了3个值阵列:动物,计数和年。我们可以组合这些阵列以形成表的列。运行此代码的结果看起来像:

animal: string
count: int8
year: int16
----
animal: [["sheep","cows","horses","foxes"]]
count: [[12,5,2,1]]
year: [[2022,2022,2022,2022]]

现在,我们可以使用一张桌子,让我们看看我们能做什么。箭头的第一个主要功能是提供节省和恢复表格数据的设施(最常见的是镶木quet格式,这将在以后的博客中具有很大的特征)。

让我们保存并加载我们新创建的表:

import pyarrow as pa
import pyarrow.parquet as pq

# Create a array from a list of values
animal = pa.array(["sheep", "cows", "horses", "foxes"], type=pa.string())
count = pa.array([12, 5, 2, 1], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022], type=pa.int16())

# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])

# Save the table to a Parquet file
pq.write_table(table, 'example.parquet')

# Load the table from the Parquet file
table2 = pq.read_table('example.parquet')
print(table2)

最后,要完成基础知识,让我们尝试一个计算功能(value_counts)。我们可以将计算功能应用于数组和表,然后使我们能够将转换应用于数据集。我们将在下一部分中更详细地介绍这些内容,但让我们以一个简单的示例开始:

import pyarrow as pa
import pyarrow.compute as pc

# Create a array from a list of values
animal = pa.array(["sheep", "cows", "horses", "foxes", "sheep"], type=pa.string())
count = pa.array([12, 5, 2, 1, 10], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022, 2021], type=pa.int16())

# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])

count_y = pc.value_counts(table['animal'])
print(count_y)

您可以看到,将库pyarrow.com调用为PC,并使用内置计数函数。这使我们能够计算给定数组或表中的值数量。我们选择计算动物的数量,该动物数量产生以下输出:

-- child 0 type: string
  [
    "sheep",
    "cows",
    "horses",
    "foxes"
  ]
-- child 1 type: int64
  [
    2,
    1,
    1,
    1
  ]

一个实用的例子

因此,我决定跳过向您列出所有数据类型和处理器,并以为我会向您展示一个更现实的示例,即将Apache Arrow与InfluxDB的TSM引擎一起使用。 现在,破坏者:这不是您与InfluxDB的新存储引擎互动的方式(查询比这非常光滑)。这纯粹是将大型样品数据集从InfluxDB中拉到Pyarrow中的一种手段,因此我们可以对其进行试验。 InfluxDB的新存储引擎将允许自动导出您的数据作为镶木文件。

所以计划:

  1. 使用InfluxDB Python client library 的常规方法查询infuxdb(使用数据框架方法)。
  2. 使用Apache Arrow的内置PANDAS DataFrame转换方法将我们的数据设置转换为箭头表数据结构。
  3. 然后,我们将使用一个新功能将表作为一系列分区的镶木文件保存到磁盘。
  4. 最后一个脚本将重新加载分区并在我们的箭头表结构上执行一系列基本聚合。

让我们看一下代码:

create_parquet.py

from influxdb_client import InfluxDBClient
from pandas import DataFrame as df
import pyarrow.dataset as ds
import pyarrow as pa

with InfluxDBClient(url="http://localhost:8086", token="edge", org="influxdb", debug=False) as client:
    query_api = client.query_api()
    query = '''
    import "influxdata/influxdb/sample"
    sample.data(set: "usgs")
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> group() 
    '''

df = query_api.query_data_frame(query=query)

table = pa.Table.from_pandas(df)
print(table)
print("Saving to parquet files...")

# Drop result and table columns
table = table.drop(["result", "table"])
print(table)

# partitioning of your data in smaller chunks
ds.write_dataset(table, "usgs", format="parquet",
                 partitioning=ds.partitioning(
                    pa.schema([table.schema.field("_measurement")])
                ))

因此,您将不熟悉的两个功能是;

  • pa.table.pandas(df):这将自动将我们的数据框架转换为箭头表。注意:注意您的数据类型!请咨询官方documentation以获取更多信息。
  • write_dataset(分区= ds.partitioning(â€)):这种修改的方法将我们的表根据我们的测量列中的值将我们的表划分为Parquet文件。这看起来像是目录树。此方法有助于将大型数据集分离为更可管理的资产。

让我们现在查看第二个脚本,该脚本可与我们保存的镶木材料一起使用:

import pyarrow.dataset as ds

# Loading back the partitioned dataset will detect the chunks                
usgs = ds.dataset("usgs", format="parquet", partitioning=["_measurement"])
print(usgs.files)

# Convert to a table
usgs = usgs.to_table()
print(usgs)

# Grouped Aggregation example
aggregation = usgs.group_by("_measurement").aggregate([("rms", "mean"), ("rms", "max"), ("rms", "min") ]).to_pandas()
print(aggregation)

在此脚本中,如果您使用大熊猫或其他查询引擎:group_by和contregate,我们可能会熟悉几个新功能。我们使用这些功能根据测量值对数据点进行分组,并为每个组提供数学汇总(均值,最大,模式)。这将基于聚合生成新的箭头表。然后,我们将表转换回数据框以获得可读性。

结论

我希望该博客能够使您能够开始更深入地研究Apache Arrow,并帮助您了解为什么我们决定投资于Apache Arrow及其儿童产品的未来。我也希望它为您提供基础,以开始探索如何从此框架构建自己的分析应用程序。 InfuxDB的新存储引擎强调了其对更大生态系统的承诺。例如,允许出口镶木quet文件使我们有机会在Rapid Miner和其他分析平台等平台中分析我们的数据。

我为您致电的电话是查看here代码,并发现其他处理器功能Apache Arrow提供。即将出现的许多内容都将围绕Apache Parquet,因此,如果有任何产品/平台使用Parquet,您希望我们可以谈论我们。快来加入我们的Slackforums。分享您的想法 - 我期待在那里见到您!