集成apache spark和QuestDB以进行时间序列分析
#教程 #database #spark #questdb

Spark是用于大规模数据工程的分析引擎。尽管历史悠久,但它仍然在大数据景观中当之无愧。另一方面,QuestDB是一个时间序列数据库,具有很高的数据摄入率。这意味着火花迫切需要数据,其中很多! ... QuestDB拥有它,在天堂做的比赛。

当然,有熊猫用于数据分析!这里的关键是表达式大规模。与熊猫不同,Spark是一个分布式系统,可以很好地扩展。

这是什么意思?

让我们看一下如何在Spark中处理数据。

出于本文的目的,我们只需要知道SPARK作业由多个任务组成,并且每个任务都在单个数据分区上工作。任务是在群集上分布的阶段并行执行的。阶段依赖于先前的阶段;来自不同阶段的任务不能并行运行。

下面的示意图描述了一个示例作业:

Diagram showing data exported to Spark as jobs

在本教程中,我们将从QuestDB表中加载数据到Spark应用程序,并探索Spark的内部工作以完善数据加载。
最后,我们将修改数据并将数据保存回QUESTDB。

加载数据以激发

首先,我们需要从QuestDB加载时间序列数据。我将使用现有的表trades,刚好超过130万行。

它包含比特币交易,跨越了3天:不完全是一个大数据方案,但足以实验。

该表包含以下列:

列名 列类型
symbol 符号
side 符号
price
amount
timestamp 时间戳

桌子按白天分配,timestamp列用作指定的时间戳。

QUESTDB通过Postgres电线协议接受连接,因此我们可以使用JDBC进行集成。您可以从各种语言中进行选择来创建Spark应用程序,在这里我们将选择Python。

创建脚本,sparktest.py

from pyspark.sql import SparkSession

# create Spark session
spark = SparkSession.builder.appName("questdb_test").getOrCreate()

# load 'trades' table into the dataframe
df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:8812/questdb") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "admin").option("password", "quest") \
    .option("dbtable", "trades") \
    .load()

# print the number of rows
print(df.count())

# do some filtering and print the first 3 rows of the data
df = df.filter(df.symbol == 'BTC-USD').filter(df.side == 'buy')
df.show(3, truncate=False)

相信与否,这个小型应用程序已作为火花作业提交时已经从数据库中读取数据。

spark-submit --jars postgresql-42.5.1.jar sparktest.py

作业打印以下行计数:

1322570

这些是过滤表的前3行:

+-------+----+--------+---------+--------------------------+
|symbol |side|price   |amount   |timestamp                 |
+-------+----+--------+---------+--------------------------+
|BTC-USD|buy |23128.72|6.4065E-4|2023-02-01 00:00:00.141334|
|BTC-USD|buy |23128.33|3.7407E-4|2023-02-01 00:00:01.169344|
|BTC-USD|buy |23128.33|8.1796E-4|2023-02-01 00:00:01.184992|
+-------+----+--------+---------+--------------------------+
only showing top 3 rows

尽管sparktest.py自言自语,但仍然值得一提的是,此应用程序对位于postgresql-42.5.1.jar中的JDBC驱动程序有依赖性。没有这种依赖性,它就无法运行;因此,必须将其与应用程序一起提交起来。

用火花优化数据加载

我们将数据加载到Spark中。现在,我们将研究如何完成以及要考虑的一些方面。

在引擎盖下窥视的最简单方法是检查QuestDB的日志,这应该告诉我们Spark如何与数据库相互作用。我们还将利用Spark UI,该Spark UI显示执行的有用见解,包括阶段和任务。

火花连接到QuestDB:Spark是懒惰的

QUESTDB日志显示,Spark连接到数据库3次。为简单起见,我仅在日志中显示相关行:

2023-03-21T21:12:24.031443Z I pg-server connected [ip=127.0.0.1, fd=129]
2023-03-21T21:12:24.060520Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT * FROM trades WHERE 1=0]
2023-03-21T21:12:24.072262Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue]

Spark创建数据框时首先查询数据库,但是事实证明,它对数据本身并不太感兴趣。查询看起来像这样:
SELECT * FROM trades WHERE 1=0

Spark唯一想知道的是表格的模式,以创建一个空数据帧。 Spark懒洋洋地评估表达式,并且只能在每个步骤中需要最低限度。毕竟,这是为了分析大数据,
因此,资源对于Spark来说是非常珍贵的。尤其是内存:默认情况下不会缓存数据。

第二个连接发生在Spark计数数据框的行时。这次也没有查询数据。有趣的是,它不是通过运行SELECT count(*) FROM trades将聚合推向数据库,而是
只是查询每个记录的1SELECT 1 FROM trades
Spark将1添加在一起以获得实际的计数。

2023-03-21T21:12:25.692098Z I pg-server connected [ip=127.0.0.1, fd=129]
2023-03-21T21:12:25.693863Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT 1 FROM trades     ]
2023-03-21T21:12:25.695284Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 [rowCount=487309, partitionNameTxn=2, transientRowCount=377783, partitionIndex=0, partitionCount=3]
2023-03-21T21:12:25.749986Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 [rowCount=457478, partitionNameTxn=1, transientRowCount=377783, partitionIndex=1, partitionCount=3]
2023-03-21T21:12:25.800765Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 [rowCount=377783, partitionNameTxn=0, transientRowCount=377783, partitionIndex=2, partitionCount=3]
2023-03-21T21:12:25.881785Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue]

使用数据本身最终也迫使Spark也可以品尝到桌子的内容。默认情况下将过滤器推到数据库。

2023-03-21T21:12:26.132570Z I pg-server connected [ip=127.0.0.1, fd=28]
2023-03-21T21:12:26.134355Z I i.q.c.p.PGConnectionContext parse [fd=28, q=SELECT "symbol","side","price","amount","timestamp" FROM trades  WHERE ("symbol" IS NOT NULL) AND ("side" IS NOT NULL) AND ("symbol" = 'BTC-USD') AND ("side" = 'buy')   ]
2023-03-21T21:12:26.739124Z I pg-server disconnected [ip=127.0.0.1, fd=28, src=queue]

我们可以看到Spark与数据库的互动相当复杂,并且优化以实现良好的性能而不会浪费资源。 Spark DataFrame是负责优化的关键组件,值得进一步分析。

什么是Spark DataFrame?

名称DataFrame听起来像一个容器可以容纳数据,但是我们之前已经看到这并不是真的。那么什么是Spark DataFrame?

观察Spark SQL的一种方法,有过度简化它的风险,是它是一个查询引擎。 df.filter(predicate)实际上只是说
的另一种方式 WHERE predicate。考虑到这一点,数据帧几乎是一个查询,或者实际上更像是一个查询计划。

大多数数据库都带有功能来显示查询计划,Spark也有!让我们检查一下我们刚刚创建的上述数据框的计划:

df.explain(extended=True)

== Parsed Logical Plan ==
Filter (side#1 = buy)
+- Filter (symbol#0 = BTC-USD)
   +- Relation [symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation(trades) [numPartitions=1]

== Analyzed Logical Plan ==
symbol: string, side: string, price: double, amount: double, timestamp: timestamp
Filter (side#1 = buy)
+- Filter (symbol#0 = BTC-USD)
   +- Relation [symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation(trades) [numPartitions=1]

== Optimized Logical Plan ==
Filter ((isnotnull(symbol#0) AND isnotnull(side#1)) AND ((symbol#0 = BTC-USD) AND (side#1 = buy)))
+- Relation [symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation(trades) [numPartitions=1]

== Physical Plan ==
*(1) Scan JDBCRelation(trades) [numPartitions=1] [symbol#0,side#1,price#2,amount#3,timestamp#4] PushedFilters: [*IsNotNull(symbol), *IsNotNull(side), *EqualTo(symbol,BTC-USD), *EqualTo(side,buy)], ReadSchema: struct<symbol:string,side:string,price:double,amount:double,timestamp:timestamp>

如果数据框知道如何通过记住执行计划来重现数据,则不需要存储实际数据。这正是我们之前看到的。 Spark拼命尝试不加载我们的数据,但这也可能有缺点。

缓存数据

不用缓存数据从根本上减少了Spark的内存足迹,但是这里有一些杂耍。数据不必被缓存,因为上面打印的计划可以一次又一次地执行...

现在想象一个仅仅是一个大小的火花集群如何使我们的孤独QuestDB实例遭受mart难。

在包含许多分区的大量表的情况下,Spark将生成大量的任务,要在集群的不同节点上平行执行。这些任务几乎可以同时查询表,从而在数据库上重负载。因此,如果您在数据库服务器上找到同事烹饪早餐,请考虑强迫Spark缓存一些数据以减少数据库的旅行数量。

这可以通过致电df.cache()来完成。在大型应用程序中,可能需要考虑值得缓存的内容以及如何确保Spark执行者有足够的内存来存储数据。

实际上,您应该考虑在整个应用程序的生活中经常使用经常使用的小型数据集。

让我们通过微小的修改重新运行代码,添加.cache()

from pyspark.sql import SparkSession

# create Spark session
spark = SparkSession.builder.appName("questdb_test").getOrCreate()

# load 'trades' table into the dataframe
df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:8812/questdb") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "admin").option("password", "quest") \
    .option("dbtable", "trades") \
    .load() \
    .cache()

# print the number of rows
print(df.count())

# print the first 3 rows of the data
df.show(3, truncate=False)

这次火花仅击中数据库仅两次。首先,它是用于架构的,第二次获取数据:
SELECT "symbol","side","price","amount","timestamp" FROM trades.

2023-03-21T21:13:04.122390Z I pg-server connected [ip=127.0.0.1, fd=129]
2023-03-21T21:13:04.147353Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT * FROM trades WHERE 1=0]
2023-03-21T21:13:04.159470Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue]
2023-03-21T21:13:05.873960Z I pg-server connected [ip=127.0.0.1, fd=129]
2023-03-21T21:13:05.875951Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT "symbol","side","price","amount","timestamp" FROM trades     ]
2023-03-21T21:13:05.876615Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 [rowCount=487309, partitionNameTxn=2, transientRowCount=377783, partitionIndex=0, partitionCount=3]
2023-03-21T21:13:06.259472Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 [rowCount=457478, partitionNameTxn=1, transientRowCount=377783, partitionIndex=1, partitionCount=3]
2023-03-21T21:13:06.653769Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 [rowCount=377783, partitionNameTxn=0, transientRowCount=377783, partitionIndex=2, partitionCount=3]
2023-03-21T21:13:08.479209Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue]

很明显,即使是一些精心放置的.cache()呼叫也可以改善应用程序的整体性能,有时也可以显着。考虑性能时,我们还应该考虑什么?

之前,我们提到我们的Spark应用程序由任务组成,这些任务正在处理数据的不同分区。因此,分区的数据平均并行性,这会导致更好的性能。

火花数据分区

现在我们转向Spark UI。

它告诉我们,这项工作是在一个任务中完成的:

Spark UI showing one task completed

事实是我们已经怀疑了这一点。执行计划告诉我们(numPartitions=1),我们在QuestDB日志中也没有看到任何并行性。我们可以使用一些其他代码显示有关此分区的更多详细信息:

from pyspark.sql.functions import spark_partition_id, min, max, count

df = df.withColumn("partitionId", spark_partition_id())
df.groupBy(df.partitionId) \
    .agg(min(df.timestamp), max(df.timestamp), count(df.partitionId)) \
    .show(truncate=False)

+-----------+--------------------------+--------------------------+------------------+
|partitionId|min(timestamp)            |max(timestamp)            |count(partitionId)|
+-----------+--------------------------+--------------------------+------------------+
|0          |2023-02-01 00:00:00.078073|2023-02-03 23:59:59.801778|1322570           |
+-----------+--------------------------+--------------------------+------------------+

UI帮助我们确认数据已加载为单个分区。 QuestDB将此数据存储在三个分区中。我们应该尝试解决此问题。

尽管不建议使用,但我们可以尝试使用DataFrame.repartition()。该调用在分区数据时会在集群上重新填充数据,因此应该是我们的最后一个度假胜地。运行df.repartition(3, df.timestamp)后,我们看到了3个分区,但并不完全按照我们的预期方式。分区相互重叠:

+-----------+--------------------------+--------------------------+------------------+
|partitionId|min(timestamp)            |max(timestamp)            |count(partitionId)|
+-----------+--------------------------+--------------------------+------------------+
|0          |2023-02-01 00:00:00.698152|2023-02-03 23:59:59.801778|438550            |
|1          |2023-02-01 00:00:00.078073|2023-02-03 23:59:57.188894|440362            |
|2          |2023-02-01 00:00:00.828943|2023-02-03 23:59:59.309075|443658            |
+-----------+--------------------------+--------------------------+------------------+

似乎DataFrame.repartition()使用哈希在三个分区中分布行。这意味着所有3个任务都需要所有3个QuestDB分区的数据。

让我们尝试一下:df.repartitionByRange(3, "timestamp")

+-----------+--------------------------+--------------------------+------------------+
|partitionId|min(timestamp)            |max(timestamp)            |count(partitionId)|
+-----------+--------------------------+--------------------------+------------------+
|0          |2023-02-01 00:00:00.078073|2023-02-01 21:22:35.656399|429389            |
|1          |2023-02-01 21:22:35.665599|2023-02-02 21:45:02.613339|470372            |
|2          |2023-02-02 21:45:02.641778|2023-02-03 23:59:59.801778|422809            |
+-----------+--------------------------+--------------------------+------------------+

这看起来更好,但仍然不理想。那是因为DaraFrame.repartitionByRange()采样数据集然后估算分区的边界。

我们真正想要的是数据帧分区,以与我们在QuestDB中看到的分区完全匹配。这样,在Spark中并行运行的任务不会在QuestDB中跨越,可能会导致更好的性能。

数据源选项是为了营救!让我们尝试以下内容:

from pyspark.sql import SparkSession
from pyspark.sql.functions import spark_partition_id, min, max, count

# create Spark session
spark = SparkSession.builder.appName("questdb_test").getOrCreate()

# load 'trades' table into the dataframe
df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:8812/questdb") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "admin").option("password", "quest") \
    .option("dbtable", "trades") \
    .option("partitionColumn", "timestamp") \
    .option("numPartitions", "3") \
    .option("lowerBound", "2023-02-01T00:00:00.000000Z") \
    .option("upperBound", "2023-02-04T00:00:00.000000Z") \
    .load()

df = df.withColumn("partitionId", spark_partition_id())
df.groupBy(df.partitionId) \
    .agg(min(df.timestamp), max(df.timestamp), count(df.partitionId)) \
    .show(truncate=False)

+-----------+--------------------------+--------------------------+------------------+
|partitionId|min(timestamp)            |max(timestamp)            |count(partitionId)|
+-----------+--------------------------+--------------------------+------------------+
|0          |2023-02-01 00:00:00.078073|2023-02-01 23:59:59.664083|487309            |
|1          |2023-02-02 00:00:00.188002|2023-02-02 23:59:59.838473|457478            |
|2          |2023-02-03 00:00:00.565319|2023-02-03 23:59:59.801778|377783            |
+-----------+--------------------------+--------------------------+------------------+

在指定partitionColumnnumPartitionslowerBoundupperBound之后,情况要好得多:分区中的行计数符合我们之前在QuestDB日志中看到的内容:rowCount=487309rowCount=457478rowCount=457478rowCount=377783。看起来我们做到了!

2023-03-21T21:13:05.876615Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 [rowCount=487309, partitionNameTxn=2, transientRowCount=377783, partitionIndex=0, partitionCount=3]
2023-03-21T21:13:06.259472Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 [rowCount=457478, partitionNameTxn=1, transientRowCount=377783, partitionIndex=1, partitionCount=3]
2023-03-21T21:13:06.653769Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 [rowCount=377783, partitionNameTxn=0, transientRowCount=377783, partitionIndex=2, partitionCount=3]

我们可以再次检查Spark UI;它还确认该作业是在三个单独的任务中完成的,每个任务都在一个分区上工作。

Spark UI showing three tasks completed

有时知道创建数据框时知道最小和最大时间戳可能很棘手。在最坏的情况下,您可以通过普通连接查询数据库。

我们设法在Spark中复制了我们的QUESTDB分区,但是数据并不总是来自一个表。如果需要的数据是查询的结果怎么办?我们可以加载它吗?是否可以将其划分?

加载数据的选项:SQL查询与表

我们可以在SQL查询的帮助下使用query选项从QuestDB加载数据:

# 1-minute aggregated trade data
df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:8812/questdb") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "admin").option("password", "quest") \
    .option("query", "SELECT symbol, sum(amount) as volume, "
                     "min(price) as minimum, max(price) as maximum, "
                     "round((max(price)+min(price))/2, 2) as mid, "
                     "timestamp as ts "
                     "FROM trades WHERE symbol = 'BTC-USD' "
                     "SAMPLE BY 1m ALIGN to CALENDAR") \
    .load()

根据数据和实际查询的数量,您可能会发现,将聚合推向QUESTDB比在Spark中完成它要快。当数据集确实很大时,Spark肯定具有优势。

现在,让我们尝试使用以前使用选项dbtable使用的选项对此数据框架进行分区。不幸的是,我们将收到一条错误消息:

Options 'query' and 'partitionColumn' can not be specified together.

但是,我们可以通过给查询一个别名名称来欺骗Spark。这意味着我们可以再次使用dbtable选项,这使我们可以指定分区。请参阅下面的示例:

from pyspark.sql import SparkSession
from pyspark.sql.functions import spark_partition_id, min, max, count

# create Spark session
spark = SparkSession.builder.appName("questdb_test").getOrCreate()

# load 1-minute aggregated trade data into the dataframe
df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:8812/questdb") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "admin").option("password", "quest") \
    .option("dbtable", "(SELECT symbol, sum(amount) as volume, "
                     "min(price) as minimum, max(price) as maximum, "
                     "round((max(price)+min(price))/2, 2) as mid, "
                     "timestamp as ts "
                     "FROM trades WHERE symbol = 'BTC-USD' "
                     "SAMPLE BY 1m ALIGN to CALENDAR) AS fake_table") \
    .option("partitionColumn", "ts") \
    .option("numPartitions", "3") \
    .option("lowerBound", "2023-02-01T00:00:00.000000Z") \
    .option("upperBound", "2023-02-04T00:00:00.000000Z") \
    .load()

df = df.withColumn("partitionId", spark_partition_id())
df.groupBy(df.partitionId) \
    .agg(min(df.ts), max(df.ts), count(df.partitionId)) \
    .show(truncate=False)

+-----------+-------------------+-------------------+------------------+
|partitionId|min(ts)            |max(ts)            |count(partitionId)|
+-----------+-------------------+-------------------+------------------+
|0          |2023-02-01 00:00:00|2023-02-01 23:59:00|1440              |
|1          |2023-02-02 00:00:00|2023-02-02 23:59:00|1440              |
|2          |2023-02-03 00:00:00|2023-02-03 23:59:00|1440              |
+-----------+-------------------+-------------------+------------------+

看起来不错。现在看来,我们可以通过将SQL查询传递到数据框架来将任何数据从QuestDB加载到SPARK中。我们真的吗?

我们的trades表仅限于三种数据类型。您可以在QuestDB中找到的所有其他类型呢?

我们期望从数据库查询时,Spark将成功映射doubletimestamp,但是geohash呢?并不是那么明显会发生什么。

一如既往,当不确定时,我们应该测试。

键入映射

我在数据库中有另一个表格,带有不同的模式。该表具有QuestDB中当前可用类型的列。

CREATE TABLE all_types (
  symbol SYMBOL,
  string STRING,
  char CHAR,
  long LONG,
  int INT,
  short SHORT,
  byte BYTE,
  double DOUBLE,
  float FLOAT,
  bool BOOLEAN,
  uuid UUID,
  --long128 LONG128,
  long256 LONG256,
  bin BINARY,
  g5c GEOHASH(5c),
  date DATE,
  timestamp TIMESTAMP
) timestamp (timestamp) PARTITION BY DAY;

INSERT INTO all_types values('sym1', 'str1', 'a', 456, 345, 234, 123, 888.8,
  11.1, true, '9f9b2131-d49f-4d1d-ab81-39815c50d341',
  --to_long128(10, 5),
  rnd_long256(), rnd_bin(10,20,2), rnd_geohash(35),
  to_date('2022-02-01', 'yyyy-MM-dd'),
  to_timestamp('2022-01-15T00:00:03.234', 'yyyy-MM-ddTHH:mm:ss.SSS'));

long128尚未得到QuestDB的完全支持,因此会评论。

让我们尝试加载和打印数据;我们还可以查看数据框的架构:

from pyspark.sql import SparkSession

# create Spark session
spark = SparkSession.builder.appName("questdb_test").getOrCreate()

# create dataframe
df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:8812/questdb") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "admin").option("password", "quest") \
    .option("dbtable", "all_types") \
    .load()

# print the schema
print(df.schema)

# print the content of the dataframe
df.show(truncate=False)

令我惊讶的是,Spark设法创建了数据框并映射了所有类型。

这是架构:

StructType([
  StructField('symbol', StringType(), True),
  StructField('string', StringType(), True),
  StructField('char', StringType(), True),
  StructField('long', LongType(), True),
  StructField('int', IntegerType(), True),
  StructField('short', ShortType(), True),
  StructField('byte', ShortType(), True),
  StructField('double', DoubleType(), True),
  StructField('float', FloatType(), True),
  StructField('bool', BooleanType(), True),
  StructField('uuid', StringType(), True),
#   StructField('long128', StringType(), True),
  StructField('long256', StringType(), True),
  StructField('bin', BinaryType(), True),
  StructField('g5c', StringType(), True),
  StructField('date', TimestampType(), True),
  StructField('timestamp', TimestampType(), True)
])

看起来还不错,但是您可能想知道将long256geohash类型映射到String是否是个好主意。 QuestDB不为这些类型提供算术,因此这没什么大不了的。

GeoHashes基本上是32个基数,以其字符串格式表示并存储。 256位长的值也被视为字符串文字。 long256用于存储加密货币私钥。

现在让我们查看数据:

+------+------+----+----+---+-----+----+------+-----+----+------------------------------------+
|symbol|string|char|long|int|short|byte|double|float|bool|uuid                                |
+------+------+----+----+---+-----+----+------+-----+----+------------------------------------+
|sym1  |str1  |a   |456 |345|234  |123 |888.8 |11.1 |true|9f9b2131-d49f-4d1d-ab81-39815c50d341|
+------+------+----+----+---+-----+----+------+-----+----+------------------------------------+

+------------------------------------------------------------------+----------------------------------------------------+
|long256                                                           |bin                                                 |
+------------------------------------------------------------------+----------------------------------------------------+
|0x8ee3c2a42acced0bb0bdb411c82bb01e7e487689228c189d1e865fa0e025973c|[F2 4D 4B F6 18 C2 9A A7 87 C2 D3 19 4A 2C 4B 92 C4]|
+------------------------------------------------------------------+----------------------------------------------------+

+-----+-------------------+-----------------------+
|g5c  |date               |timestamp              |
+-----+-------------------+-----------------------+
|q661k|2022-02-01 00:00:00|2022-01-15 00:00:03.234|
+-----+-------------------+-----------------------+

它看起来也不错,但是我们可以从日期字段结束时省略00:00:00。我们可以看到它映射到Timestamp而不是Date

我们还可以尝试将数字字段之一映射到Decimal。如果以后我们要进行需要高精度的计算,这将很有用。

我们可以使用customSchema选项自定义列类型。我们修改的代码:

from pyspark.sql import SparkSession

# create Spark session
spark = SparkSession.builder.appName("questdb_test").getOrCreate()

# create dataframe
df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:8812/questdb") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "admin").option("password", "quest") \
    .option("dbtable", "all_types") \
    .option("customSchema", "date DATE, double DECIMAL(38, 10)") \
    .load()

# print the schema
print(df.schema)

# print the content of the dataframe
df.show(truncate=False)

新模式:

StructType([
  StructField('symbol', StringType(), True),
  StructField('string', StringType(), True),
  StructField('char', StringType(), True),
  StructField('long', LongType(), True),
  StructField('int', IntegerType(), True),
  StructField('short', ShortType(), True),
  StructField('byte', ShortType(), True),
  StructField('double', DecimalType(38,10), True),
  StructField('float', FloatType(), True),
  StructField('bool', BooleanType(), True),
  StructField('uuid', StringType(), True),
#   StructField('long128', StringType(), True),
  StructField('long256', StringType(), True),
  StructField('bin', BinaryType(), True),
  StructField('g5c', StringType(), True),
  StructField('date', DateType(), True),
  StructField('timestamp', TimestampType(), True)
])

数据显示为:

+------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+
|symbol|string|char|long|int|short|byte|double        |float|bool|uuid                                |
+------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+
|sym1  |str1  |a   |456 |345|234  |123 |888.8000000000|11.1 |true|9f9b2131-d49f-4d1d-ab81-39815c50d341|
+------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+

+------------------------------------------------------------------+----------------------------------------------------+
|long256                                                           |bin                                                 |
+------------------------------------------------------------------+----------------------------------------------------+
|0x8ee3c2a42acced0bb0bdb411c82bb01e7e487689228c189d1e865fa0e025973c|[F2 4D 4B F6 18 C2 9A A7 87 C2 D3 19 4A 2C 4B 92 C4]|
+------------------------------------------------------------------+----------------------------------------------------+

+-----+----------+-----------------------+
|g5c  |date      |timestamp              |
+-----+----------+-----------------------+
|q661k|2022-02-01|2022-01-15 00:00:03.234|
+-----+----------+-----------------------+

似乎Spark几乎可以处理所有数据库类型。唯一的问题是long128,但是这种类型是当前在QuestDB中进行的工作。完成后,它将映射为String,就像long256一样。

将数据写回数据库

只剩下一件事:将数据写回QuestDB。

在此示例中,首先,我们将加载数据库中的一些数据,并添加两个新功能:

  • 10分钟移动平均线
  • 标准偏差,也可以在最后10分钟的窗口
  • 上计算

然后,我们将尝试将修改后的数据框架保存回QUESTDB作为新表。我们需要照顾某种类型的映射,因为Double列以FLOAT8发送到QuestDB,因此我们最终得到了此代码:

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, stddev, when

# create Spark session
spark = SparkSession.builder.appName("questdb_test").getOrCreate()

# load 1-minute aggregated trade data into the dataframe
df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:8812/questdb") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "admin").option("password", "quest") \
    .option("dbtable", "(SELECT symbol, sum(amount) as volume, "
                       "round((max(price)+min(price))/2, 2) as mid, "
                       "timestamp as ts "
                       "FROM trades WHERE symbol = 'BTC-USD' "
                       "SAMPLE BY 1m ALIGN to CALENDAR) AS fake_table") \
    .option("partitionColumn", "ts") \
    .option("numPartitions", "3") \
    .option("lowerBound", "2023-02-01T00:00:00.000000Z") \
    .option("upperBound", "2023-02-04T00:00:00.000000Z") \
    .load()

# add new features
window_10 = Window.partitionBy(df.symbol).rowsBetween(-10, Window.currentRow)
df = df.withColumn("ma10", avg(df.mid).over(window_10))
df = df.withColumn("std", stddev(df.mid).over(window_10))
df = df.withColumn("std", when(df.std.isNull(), 0.0).otherwise(df.std))

# save the data as 'trades_enriched'
df.write.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:8812/questdb") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "admin").option("password", "quest") \
    .option("dbtable", "trades_enriched") \
    .option("createTableColumnTypes", "volume DOUBLE, mid DOUBLE, ma10 DOUBLE, std DOUBLE") \
    .save()

所有工作都可以,但是我们很快就意识到我们的新桌子trades_enriched没有被分区,也没有指定的时间戳,这不是理想的。显然Spark不知道QuestDB细节。

如果我们预先创建表并将数据保存到其中,它将更好地工作。我们放下桌子并重新创建它;这次,它被分区并具有指定的时间戳:

DROP TABLE trades_enriched;
CREATE TABLE trades_enriched (
  volume DOUBLE,
  mid DOUBLE,
  ts TIMESTAMP,
  ma10 DOUBLE,
  std DOUBLE
) timestamp (ts) PARTITION BY DAY;

表是空的,并且正在等待数据。

我们重新运行代码;所有作品,没有投诉。数据在表格中,已分区。

将数据写入数据库的一个方面是我们是否可以创建重复。如果我尝试在不放置桌子的情况下再次重新运行代码怎么办?这次会让我保存数据吗?
不,我们有一个错误:

pyspark.sql.utils.AnalysisException: Table or view 'trades_enriched' already exists. SaveMode: ErrorIfExists.

错误消息的最后一部分看起来很有趣:SaveMode: ErrorIfExists

什么是SaveMode?事实证明,如果表已经存在,我们可以配置应该发生的事情。我们的选择是:

  • errorifexists:默认行为是在表格的情况下返回错误 已经存在,Spark在这里安全
  • append:数据将附加到已经存在的现有行 表
  • overwrite:表的内容将完全由新的替代 保存的数据
  • ignore:如果表不空,则我们的保存操作将被忽略而没有 任何错误

我们已经看到了errorifexists的行为,appendignore似乎很简单就可以工作了。

但是,overwrite并不简单。在保存新数据之前,必须清除表的内容。 Spark将默认删除并重新创建表格,这意味着丢失分区和指定的时间戳。

通常,我们不希望Spark为我们创建桌子。幸运的是,使用truncate选项,我们可以告诉Spark使用TRUNCATE清除表,而不是删除它:

# save the data as 'trades_enriched', overwrite if already exists
df.write.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:8812/questdb") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "admin").option("password", "quest") \
    .option("dbtable", "trades_enriched") \
    .option("truncate", True) \
    .option("createTableColumnTypes", "volume DOUBLE, mid DOUBLE, ma10 DOUBLE, std DOUBLE") \
    .save(mode="overwrite")

上面的工作原理。

结论

我们的骑行似乎颠簸了,但我们终于拥有一切
在职的。我们的新座右铭应该是“所有内容都有一个配置选项!”。

总结我们发现的内容:

  • 我们可以使用Spark的JDBC数据源与QuestDB集成。
  • 建议使用dbtable选项,即使我们使用SQL查询来加载数据。
  • 始终尝试指定分区选项(partitionColumnnumPartitionslowerBoundupperBound),当加载数据时,理想情况下,分区应与QuestDB分区匹配。
  • 有时在Spark中缓存一些数据以减少数据库的旅行数量是有意义的。
  • 根据任务以及涉及多少数据,将工作推向数据库可能是有益的。使用QuestDB的特定时间序列特定功能,例如SAMPLE BY,而不是试图在Spark中重写。
  • 加载数据时可以通过customSchema选项自定义类型映射。
  • 将数据写入QuestDB时总是指定所需的保存模式。
  • 如果您预先创建表格并且不让Spark创建它,通常会更好地工作,因为这样您可以添加分区和指定的时间戳。
  • 如果选择了overwrite节省模式,则应启用truncate选项,以确保Spark不会删除表;因此分区,指定的时间戳不会丢失。
  • 键入映射可以通过保存数据时通过createTableColumnTypes选项自定义。

我仅提到了使用QUESTDB时最有可能经过调整的配置选项;完整的选项集可以在这里找到:
Spark data source options.

未来会带来什么?

总的来说,一切都起作用,但是拥有一种更加无缝的集成方式真是太好了,可以自动处理分区。将数据保存到QUESTDB时,某些类型的映射也可以使用更好的默认值。
overwrite节省模式可能默认使用TRUNCATE

更多的无缝集成并非不可能实现。如果QuestDB为Spark提供了自己的JDBCDialect实现,则可以处理上述细微差别。
我们可能应该考虑添加此。

最后,我们还没有提及数据局部性。那是因为,当前QuestDB无法作为群集运行。但是,我们正在积极研究解决方案 - 查看The Inner Workings of Distributed Databases
有关更多信息。

时间到来时,我们应确保还考虑数据位置。理想情况下,每个火花节点都可以处理需要从本地(或最接近)QUESTDB实例加载的分区的任务。

但是,这不是我们现在应该关注的事情...现在,只需享受数据误解即可!