如何使用node.js保留MQTT数据的历史记录
#node #database #reductstore #iot

MQTT protocol在物联网应用中非常受欢迎。这是连接不同数据源的简单方法
使用您的应用程序使用发布/订阅模型。有时您可能需要将MQTT数据的历史记录到
将其用于模型培训,诊断或指标。如果您的数据源提供了可以
的不同数据格式 不被解释为浮标的时间序列,还需要还原。

让我们制作一个简单的mqtt应用程序以查看其工作原理。

先决条件

对于此用法示例,我们有以下要求:

  • Linux AMD64
  • Docker和Docker撰写
  • nodejs> = 16

如果您是Ubuntu用户,请使用此命令安装依赖项:

$ sudo apt-get update
$ sudo apt-get install docker-compose nodejs

与Docker合作运行MQTT经纪人和还原商店

运行经纪人和存储的最简单方法是使用Docker组成。因此,我们应该创建一个docker-compose.yml
在示例的文件夹中使用服务:

version: "3"
services:
  reduct-storage:
    image: reductstore/reductstore:latest
    volumes:
      - ./data:/data
    ports:
      - "8383:8383"

  mqtt-broker:
    image: eclipse-mosquitto:1.6
    ports:
      - "1883:1883"

然后运行配置:

docker-compose up

Docker撰写了下载图像并运行了容器。请注意我们为MQTT出版了1883年港口
协议和8383,用于ReductStore HTTP API

写Nodejs脚本

现在,我们准备好用代码弄脏。让我们初始化NPM软件包和
安装MQTT Client
JavaScript Client SDK.

$ npm init
$ npm install --save reduct-js async-mqtt 

当我们安装了所有依赖项时,我们可以编写脚本:

const MQTT = require('async-mqtt');
const {Client} = require('reduct-js');

MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
    await mqttClient.subscribe('mqtt_data');

    const reductClient = new Client('http://localhost:8383');
    const bucket = await reductClient.getOrCreateBucket('mqtt');

    mqttClient.on('message', async (topic, msg) => {
        const record = await bucket.beginWrite(topic);
        await record.write(msg)
        console.log('Received message "%s" from topic "%s" is written', msg,
            topic);
    });

}).catch(error => console.error(error));

让我们详细查看代码。首先,我们必须连接到MQTT经纪人并订阅主题。主题名称
只是随机字符串,生产者应该知道的。在我们的情况下,是mqtt_data


MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
    await mqttClient.subscribe('mqtt_data');

    // rest of code
}).catch(error => console.error(error));

如果MQTT连接成功,我们可以开始处理还原商店。要开始在那里编写数据,我们需要一个
桶。我们创建一个名称为mqtt或获得现有的存储桶:

const reductClient = new Client('http://localhost:8383');
const bucket = await reductClient.getOrCreateBucket('mqtt');

最后一步是将收到的消息写入存储。我们必须使用回调
对于event message,捕获它。然后,我们将消息写入输入mqtt_data

mqttClient.on('message', async (topic, msg) => {
    const record = await bucket.beginWrite(topic);
    await record.write(msg)
    console.log('Received message "%s" from topic "%s" was written', data,
        topic);
});

当我们调用bucket.beginWrite时,如果不存在,我们会在存储桶中创建一个条目。然后我们将数据写入
使用当前时间戳进入。现在,我们的MQTT数据在存储中很安全且声音,我们可以使用
访问它们 同一SDK

将数据发布到MQTT主题

启动脚本时,它无能为力,因为MQTT没有数据。您必须将某些内容发布到主题
mqtt_data。我更喜欢使用mosquitto_pub。对于Ubuntu用户,这是一个
mosquitto-clients软件包的一部分:

$ sudo apt-get install mosquitto-clients
$ mosuitto_pub -t mqtt_data -m "Hello, world!"

从还原商店获取数据

现在,您知道如何从MQTT获取数据并将其写入还原商店,但是我们需要一些Nodejs脚本来读取数据
从存储中:

const {Client} = require('reduct-js');

const client = new Client('http://localhost:8383');

client.getBucket('mqtt').then(async (bucket) => {
    const record  = await bucket.beginRead('mqtt_data');
    console.log('Last record: %s', await record.readAsString());

    // Get data for lash hour
    const stopTime = BigInt(Date.now() * 1000);
    const startTime = stopTime - 3_600_000_000n;

    for await (const record of bucket.query('mqtt_data', startTime, stopTime)) {
        const data = await record.read();
        console.log('Found record "%s" with timestamp "%d"', data, record.time);
    }

}).catch(error => console.error(error));

阅读条目中的最新记录非常简单:

const record  = await bucket.beginRead('mqtt_data');
const data = await record.readAsString();

要获取TimeInterval的数据,我们可以使用query方法。它返回异步迭代器,因此我们可以使用for await
循环遍历记录:

const stopTime = BigInt(Date.now() * 1000);
const startTime = stopTime - 3_600_000_000n;

for await (const record of bucket.query('mqtt_data', startTime, stopTime)) {
    const data = await record.read();
    console.log('Found record "%s" with timestamp "%d"', data, record.time);
}

请注意,存储使用的时间戳具有微秒的精度,因此我们不能使用Date类和number类型。
为什么我们使用BigInt

结论

您可以看到,MQTT协议和还原商店非常简单的技术,可以很容易地在
中使用 nodejs。
您可以找到示例here的源代码。如果您有任何疑问或运行的问题。随意
制作an issue

我希望本教程有所帮助。谢谢!

链接