使用新的InfluxDB 3.0 Python CLI和客户库库
Jay Clifford
社区客户库与InfluxDB 3.0一起回来。如果您想要每个客户端库的概述,那么我强烈建议您查看Anais’s blog on their status。
在这个由两部分组成的博客系列中,我们深入研究了新的Python Client Library和CLI。最后,您应该对当前功能,内部内容的工作方式以及我对两个项目的未来想法有很好的了解。我希望它是为您提供了为您的未来做出贡献并有发言权的机会。
在这篇文章(第1部分)中,我们将主要关注客户库,因为它是Python CLI的基础。
Python客户库
所以,让我们从Python客户端库开始。范围很简单:构建一个可以写入并查询InfluxDB 3.0的库。由于写入端点没有更改IninfluxDB 3.0,因此我们可以从V2库中提出许多功能,例如批处理写入,数据解析,点对象等等。但是,在查询方面,我们不得不完全对其进行重制。我们想专注于Arrow Flight 的功能,并为基于SQL和InfuxQL的查询提供支持。 PyArrow还为Pandas和Polars等图书馆提供了更好的生态系统支持,但稍后我将有更多的支持。
让我们一起构建一个简单的python应用程序,编写和查询infuxdb 3.0。
安装
要安装客户端库(我建议首先制作Python虚拟环境):
$ python3 -m venv ./.venv \
$ source .venv/bin/activate \
$ pip install –upgrade pip
$ pip install influxdb3-python
这组命令创建我们的虚拟Python环境,激活它,更新我们的Python软件包安装程序,最后安装新的客户端库。
创建客户
在本节中,我们导入新安装的库并建立客户端。我还讨论了一些配置参数及其背后的推理。
让我们创建一个带有以下代码的main.py文件:
from influxdb_client_3 import InfluxDBClient3, Point
import pandas as pd
import numpy as np
import datetime
client = InfluxDBClient3( token="",
host="eu-central-1-1.aws.cloud2.influxdata.com",
org="6a841c0c08328fb1",
database="pokemon-codex")
此示例显示了客户端的最小配置。像以前的客户端一样,它需要以下参数:
令牌 | 这为客户提供了从infuxdb cloud Serverless或Dedicated读写的身份验证。注意:如果您希望使用这两个功能,则需要具有读写身份验证的令牌。 |
主机 | infuxdb主机 - 这只能是没有协议的域(https://) |
org | Cloudless仍然需要用户组织ID将数据写入3.0。专用的用户只能使用任意字符串。 |
数据库 | 您想查询和写入的数据库。 |
我建议以每个数据库创建客户端,尽管如果您只想创建一个客户端,则可以更新_database
实例变量。
接下来,让我们看一下客户端的高级参数:
Flight_client_options | 这提供了对飞行查询协议的参数的访问。您可以找到配置选项here。 Example. |
write_client_options | 这提供了对V2写入客户端使用的参数的访问,您可以找到here。 Example. |
**夸尔格斯 | 最后,这提供了对V2客户端使用的参数的访问,您可以找到here。
Example.(GZIP压缩) |
让我们通过讨论写功能来继续我们的原始示例。
写数据
现在,我们建立了客户,在本节中,我们查看您可以用来将数据写入InfuxDB 3.0的不同方法。大多数人会熟悉他们的摄入方法与v2相同的摄入方法。
让我们从基本的点构建开始:
# Continued from the Client's example \
\
now = datetime.datetime.now(datetime.timezone.utc)
data = Point("caught").tag("trainer", "ash").tag("id", "0006").tag("num", "1")\
.field("caught", "charizard")\
.field("level", 10).field("attack", 30)\
.field("defense", 40).field("hp", 200)\
.field("speed", 10)\
.field("type1", "fire").field("type2", "flying")\
.time(now)
try:
client.write(data)
except Exception as e:
print(f"Error writing point: {e}")
在此示例中,您可以看到我们使用点类的实例构建了我们的行协议,然后将其转换为行协议:\
Point,trainer=ash,id=0006,num=1 caught="charizard",level=10i,attack=30i,defense=40i,hp=200i,speed=10i,type1="fire",type2="flying" <timestamp>
您还可以将其格式化为一个点:\
data = []
# Adding first point
data.append(
Point("caught")
.tag("trainer", "ash")
.tag("id", "0006")
.tag("num", "1")
.field("caught", "charizard")
.field("level", 10)
.field("attack", 30)
.field("defense", 40)
.field("hp", 200)
.field("speed", 10)
.field("type1", "fire")
.field("type2", "flying")
.time(now)
)
# Adding second point
data.append(
Point("caught")
.tag("trainer", "ash")
.tag("id", "0007")
.tag("num", "2")
.field("caught", "bulbasaur")
.field("level", 12)
.field("attack", 31)
.field("defense", 31)
.field("hp", 190)
.field("speed", 11)
.field("type1", "grass")
.field("type2", "poison")
.time(now)
)
您也可以通过字典编码和结构化数据方法编写。我最喜欢的摄入方法之一是通过熊猫数据框架。
让我们看一个利用此方法的示例:
# Convert the list of dictionaries to a DataFrame
caught_pokemon_df = pd.DataFrame(data).set_index('timestamp')
# Print the DataFrame
print(caught_pokemon_df)
try:
client.write(caught_pokemon_df, data_frame_measurement_name='caught',
data_frame_tag_columns=['trainer', 'id', 'num'])
except Exception as e:
print(f"Error writing point: {e}")
此示例为本届会议创建了我们抓住的Pokemon的熊猫数据框架。我们将数据框的索引设置为捕获口袋妖怪的时间戳,然后提供数据框以及以下写入参数到write()函数:
data_frame_measurement_name | 您希望将Pandas DataFrame写入的测量名称。 |
data_frame_tag_columns | 包含您希望制作标签的列名称的字符串列表。 |
data_frame_timestamp_column | 如果未将索引设置为时间戳,请使用此参数设置时间戳列。 |
确保查看完整的示例here。您还可以找到一个批处理示例here。
从文件中写数据
上一个客户端库的一个备受要求的功能是上传和解析不同文件数据格式的更多方法。利用Pyarrow的实用程序,我们现在可以支持以下格式上传文件:
CSV | Example here. |
JSON | Example here. |
羽毛 | Example here. |
ORC | Example here. |
镶木 | Example here. |
查询数据
现在,我们将一些数据写入InfluxDB 3.0中,让我们谈谈如何查询它。 3.0提供了完全支持的Apache Arrow Flight端点,该端点允许用户使用SQL或InfluxQl查询。
让我们首先看一下SQL和InfluxQl;
的基本时间序列查询
from influxdb_client_3 import InfluxDBClient3
import pandas as pd
client = InfluxDBClient3(
token="",
host="eu-central-1-1.aws.cloud2.influxdata.com",
org="6a841c0c08328fb1",
database="pokemon-codex")
sql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time >= now() - interval '1 hour' LIMIT 5'''
table = client.query(query=sql, language='sql', mode='all')
print(table)
influxql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time > now() - 1h LIMIT 5'''
table = client.query(query=influxql, language='influxql', mode='pandas')
print(table)
您在此示例中可以看到,我们使用相同的客户端与InfluxQL和SQL查询。让我们快速查看查询参数,以查看它们如何塑造我们返回的结果。
查询 | 此参数当前接受您的SQL或InfluxQl查询的字符串字符串。我们希望尽快将准备好的陈述添加。 |
语言 | 此参数接受字符串字面的字符串或infuxql |
模式 | 当前有5种返回模式:
|
未来的希望
罗马没有一天建造,并且有很多生活质量的改进和新功能需要添加。这是一张概述几张的表:\
功能 | 状态 |
合并V2客户端的写入API以删除外部库依赖关系。 | 正在进行 |
准备查询的语句 | 做 |
and table for InfluxdB | 做 |
改善双极支撑 | 做 |
整合三角洲共享 | 做 |
自己试试吧
我们为Python的InfuxDB 3.0建立了我希望成为一个伟大的社区驱动客户库的基础。我的呼吁是,如果您还没有这样做,请尝试图书馆并通过其步调。我们可能不知道的边缘案例太多,我们不会在没有社区帮助的情况下找到那些。我热切地等待问题和功能请求。