用还原C ++ SDK订阅新记录
#教程 #database #cpp #reductstore

本文提供了ReductStore的简介,并解释了如何使用Reduct C++ SDK订阅数据库中的数据。

先决条件

要订阅新记录,我们应该使用一个连续查询,该查询自V1.4版本以来得到了ReductStore的支持。我们可以使用以下Docker命令运行它:

docker pull reduct/store:latest
docker run -p 8383:8383 reduct/store:latest 

现在,我们需要安装Reduct Client SDK for C++。请参阅these instructions

完整的例子

现在,查看示例的代码。

#include <reduct/client.h>

#include <iostream>
#include <thread>

using reduct::IBucket;
using reduct::IClient;

int main() {
  auto writer = std::thread([]() {
    auto client = IClient::Build("http://127.0.0.1:8383");

    auto [bucket, err] = client->GetOrCreateBucket("bucket");
    if (err) {
      std::cerr << "Error: " << err;
      return;
    }

    for (int i = 0; i < 10; ++i) {
      const IBucket::WriteOptions opts{
          .timestamp = IBucket::Time::clock::now(),
          .labels = {{"good", i % 2 == 0 ? "true" : "false"}},
      };

      const auto msg = "Hey " + std::to_string(i);
      [[maybe_unused]] auto write_err = bucket->Write("entry-1", opts, [msg](auto rec) { rec->WriteAll(msg); });
      std::cout << "Write: " << msg << std::endl;
      std::this_thread::sleep_for(std::chrono::seconds(1));
    }
  });

  // Subscribe to good messages
  int good_count = 0;
  auto client = IClient::Build("http://127.0.0.1:8383");
  auto [bucket, err] = client->GetOrCreateBucket("bucket");
  if (err) {
    std::cerr << "Error: " << err;
    return -1;
  }

  const auto opts = IBucket::QueryOptions{
      .include = {{"good", "true"}},
      .continuous = true,
      .poll_interval = std::chrono::milliseconds{100},
  };

  // Continuously read messages until we get 3 good ones
  auto query_err =
      bucket->Query("entry-1", IBucket::Time::clock::now(), std::nullopt, opts, [&good_count](auto &&record) {
        auto [msg, read_err] = record.ReadAll();
        if (read_err) {
          std::cerr << "Error: " << read_err;
          return false;
        }
        std::cout << "Read: " << msg << std::endl;
        return ++good_count != 3;
      });

  writer.join();

  if (query_err) {
    std::cerr << "Query error:" << query_err;
    return -1;
  }
}

构建使用此cmakelists.txt:

cmake_minimum_required(VERSION 3.18)

project(ReductCppExamples)
set(CMAKE_CXX_STANDARD 20)

find_package(ZLIB)
find_package(OpenSSL)

find_package(ReductCpp 1.4.0)

add_executable(subscription subscription.cc)
target_link_libraries(subscription ${REDUCT_CPP_LIBRARIES} ${ZLIB_LIBRARIES} OpenSSL::SSL OpenSSL::Crypto)

示例代码演示了如何使用C ++还原SDK从存储桶中订阅新记录。该程序将10个记录写入一个存储桶,读取标签“良好”设置为“ true”或“ false”的记录,并不断读取记录,直到它读取了3个记录,此标签将其设置为“ true”。

让我们详细考虑示例。

用标签编写新记录

要与还原商店实例进行通信,请先创建客户端。

auto client = IClient::Build("http://127.0.0.1:8383");

在此示例中,我们使用默认设置在本地运行数据库,但是我们可能需要具有授权的API令牌。

像许多其他Blob储藏一样,还原存储器将数据保存在存储桶中,以进行颗粒状访问控制和配额。对于读/写操作,我们必须获得一个水桶或创建一个:

 auto [bucket, err] = client->GetOrCreateBucket("bucket");
  if (err) {
    std::cerr << "Error: " << err;
    return;
  }

创建存储桶时,您可以选择提供其他设置。一个特别有用的设置是 FIFO配额,当桶大小达到一定限制时,它会自动删除旧数据。此功能对边缘设备特别有益,因为它有助于防止设备耗尽磁盘空间。

存储桶包含条目,您可以将它们理解为主题或文件夹。还原车站没有提供条目树,它们必须具有独特的名称。让我们写一个记录到entry-1

const IBucket::WriteOptions opts{
    .timestamp = IBucket::Time::clock::now(),
    .labels = {{"good", i % 2 == 0 ? "true" : "false"}},
};

const auto msg = "Hey " + std::to_string(i);
auto write_err = bucket->Write("entry-1", opts, [msg](auto rec) { rec->WriteAll(msg); });

还原存储店是时间序列数据库将数据存储为blob。每个斑点都是必须具有时间戳的记录。但是,您可以将其他信息附加到记录中,例如标签,并将其用于注释或数据过滤。在这种情况下,我们将标签goog分配为truefalse

在此示例中,我们仅出于演示目的发送简短的短信。还原店更适合处理较大的数据斑点,例如图像,声音或二进制数据(例如Protobuf消息)。

连续查询

在分开线程中运行编写标签后,我们只能查询好数据并等待第一个3:

const auto opts = IBucket::QueryOptions{
    .include = {{"good", "true"}},
    .continuous = true,
    .poll_interval = std::chrono::milliseconds{100},
};

auto query_err =
    bucket->Query("entry-1", IBucket::Time::clock::now(), std::nullopt, opts, [&good_count](auto &&record) {
      auto [msg, read_err] = record.ReadAll();
      if (read_err) {
        std::cerr << "Error: " << read_err;
        return false;
      }
      std::cout << "Read: " << msg << std::endl;
      return ++good_count != 3;
    });

这是一个简单的示例。我们使用标志continuous表示我们将等待新记录,并每100毫秒进行一次调查。

在还原商店中,查询作为迭代器。存储了多少记录都没关系;我们只要求下一个。当我们使用连续查询时,即使我们没有收到以前的记录,我们也总是在要求新记录。我们使用pool_interval选项来指定我们要求新记录的频率。

怎么有用?

还原存储店是一个专注于边缘计算和AI/ML应用的open source database。它的连续查询可作为发布订阅通信模型,类似于MQTT。您可以将数据库用作消息代理,也可以通过创建订阅新记录并将其写入或仅将其标记为另一个数据库的程序来轻松地将其与您的仓库集成在一起。

您可以在 python或JavaScript应用程序中使用此功能使用PythonJavaScript客户端SDK。

希望您发现此版本有用。如果您有任何疑问或反馈,请不要犹豫在Discord或通过就GitHub进行讨论。

感谢您使用ReductStore