MQTT protocol在物联网应用中非常受欢迎。这是连接不同数据源的简单方法
使用您的应用程序使用发布/订阅模型。有时您可能需要将MQTT数据的历史记录到
将其用于模型培训,诊断或指标。如果您的数据源提供了可以
的不同数据格式
不被解释为浮标的时间序列,还需要还原。
让我们制作一个简单的mqtt应用程序以查看其工作原理。
先决条件
对于此用法示例,我们有以下要求:
- Linux AMD64
- Docker和Docker撰写
- nodejs> = 16
如果您是Ubuntu用户,请使用此命令安装依赖项:
$ sudo apt-get update
$ sudo apt-get install docker-compose nodejs
与Docker合作运行MQTT经纪人和还原商店
运行经纪人和存储的最简单方法是使用Docker组成。因此,我们应该创建一个docker-compose.yml
在示例的文件夹中使用服务:
version: "3"
services:
reduct-storage:
image: reductstore/reductstore:latest
volumes:
- ./data:/data
ports:
- "8383:8383"
mqtt-broker:
image: eclipse-mosquitto:1.6
ports:
- "1883:1883"
然后运行配置:
docker-compose up
Docker撰写了下载图像并运行了容器。请注意我们为MQTT出版了1883年港口
协议和8383,用于ReductStore HTTP API。
写Nodejs脚本
现在,我们准备好用代码弄脏。让我们初始化NPM软件包和
安装MQTT Client和
JavaScript Client SDK.
$ npm init
$ npm install --save reduct-js async-mqtt
当我们安装了所有依赖项时,我们可以编写脚本:
const MQTT = require('async-mqtt');
const {Client} = require('reduct-js');
MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
await mqttClient.subscribe('mqtt_data');
const reductClient = new Client('http://localhost:8383');
const bucket = await reductClient.getOrCreateBucket('mqtt');
mqttClient.on('message', async (topic, msg) => {
const record = await bucket.beginWrite(topic);
await record.write(msg)
console.log('Received message "%s" from topic "%s" is written', msg,
topic);
});
}).catch(error => console.error(error));
让我们详细查看代码。首先,我们必须连接到MQTT经纪人并订阅主题。主题名称
只是随机字符串,生产者应该知道的。在我们的情况下,是mqtt_data
:
MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
await mqttClient.subscribe('mqtt_data');
// rest of code
}).catch(error => console.error(error));
如果MQTT连接成功,我们可以开始处理还原商店。要开始在那里编写数据,我们需要一个
桶。我们创建一个名称为mqtt
或获得现有的存储桶:
const reductClient = new Client('http://localhost:8383');
const bucket = await reductClient.getOrCreateBucket('mqtt');
最后一步是将收到的消息写入存储。我们必须使用回调
对于event message
,捕获它。然后,我们将消息写入输入mqtt_data
:
mqttClient.on('message', async (topic, msg) => {
const record = await bucket.beginWrite(topic);
await record.write(msg)
console.log('Received message "%s" from topic "%s" was written', data,
topic);
});
当我们调用bucket.beginWrite
时,如果不存在,我们会在存储桶中创建一个条目。然后我们将数据写入
使用当前时间戳进入。现在,我们的MQTT数据在存储中很安全且声音,我们可以使用
访问它们
同一SDK。
将数据发布到MQTT主题
启动脚本时,它无能为力,因为MQTT没有数据。您必须将某些内容发布到主题
mqtt_data
。我更喜欢使用mosquitto_pub。对于Ubuntu用户,这是一个
mosquitto-clients
软件包的一部分:
$ sudo apt-get install mosquitto-clients
$ mosuitto_pub -t mqtt_data -m "Hello, world!"
从还原商店获取数据
现在,您知道如何从MQTT获取数据并将其写入还原商店,但是我们需要一些Nodejs脚本来读取数据
从存储中:
const {Client} = require('reduct-js');
const client = new Client('http://localhost:8383');
client.getBucket('mqtt').then(async (bucket) => {
const record = await bucket.beginRead('mqtt_data');
console.log('Last record: %s', await record.readAsString());
// Get data for lash hour
const stopTime = BigInt(Date.now() * 1000);
const startTime = stopTime - 3_600_000_000n;
for await (const record of bucket.query('mqtt_data', startTime, stopTime)) {
const data = await record.read();
console.log('Found record "%s" with timestamp "%d"', data, record.time);
}
}).catch(error => console.error(error));
阅读条目中的最新记录非常简单:
const record = await bucket.beginRead('mqtt_data');
const data = await record.readAsString();
要获取TimeInterval的数据,我们可以使用query
方法。它返回异步迭代器,因此我们可以使用for await
循环遍历记录:
const stopTime = BigInt(Date.now() * 1000);
const startTime = stopTime - 3_600_000_000n;
for await (const record of bucket.query('mqtt_data', startTime, stopTime)) {
const data = await record.read();
console.log('Found record "%s" with timestamp "%d"', data, record.time);
}
请注意,存储使用的时间戳具有微秒的精度,因此我们不能使用Date
类和number
类型。
为什么我们使用BigInt
。
结论
您可以看到,MQTT协议和还原商店非常简单的技术,可以很容易地在
中使用
nodejs。
您可以找到示例here的源代码。如果您有任何疑问或运行的问题。随意
制作an issue。
我希望本教程有所帮助。谢谢!