介绍
此博客文章说明了如何构建车辆命令日志存储,以跟踪命令请求和通过应用程序客户发送给车辆的响应。为此,我们考虑从使用MQTT V5发布的MQTT消息中提取MQTT 5元数据信息,并通过AWS IoT IoT Core Core Rules Engine Strure规则丰富有效载荷。处理后的数据存储在Amazon DynamoDB中。
目标是在云侧使用尽可能小的定制代码,并依靠AWS云侧的本机集成,例如带有Dynamo DB Action的IoT规则。
有关此解决方案的现场演示,您可以观看下面在IoT Builders YouTube channel上链接的视频:
用MQTT 5 Metadata
丰富有效载荷
MQTT 5请求/响应模式
作为此博客文章的一部分,我们探索了MQTT 5:Request/Response pattern
的功能。
请求/响应消息模式是一种以异步方式跟踪对客户端请求的响应的方法。这是MQTTV5中实现的一种机制,允许发布者指定主题以根据特定请求发送响应。当订户收到请求时,它还收到主题以发送响应。此模式支持一个相关数据字段,该字段允许跟踪数据包,例如请求或设备标识参数。
让我们看一个示例:
我们想从客户端应用程序通过MQTT发送命令。流量如下:
-
车辆订阅请求主题,客户端应用程序订阅其确定的响应主题,可以取决于应用程序实例ID。
-
应用客户端在请求主题上发布请求。在我们的情况下,有效载荷是一个简单的文本
DOOR_LOCK
,指示命令锁定车门。遵循MQTT 5模式,除了有效载荷外,我们还发送元数据,例如Response Topic
,Content Type
和Correlation Data
:- a
request id
, - a
-
user id
。
此数据有助于将MQTT请求及其响应相关联。
- a
-
当设备接收MQTT消息时,它执行命令并在指定的响应主题上发布响应。发布后,它重新介绍了相关数据,但也附加了响应特定信息,例如:
- a
response timestamp
, -
command
正在回应, 图表上有红色。
- a
-
应用程序实例接收有关其先前订阅的响应主题的此信息。
先决条件和方法
先决条件
要在本地重新创建此演示,需要已经进行:
- 一个具有创建物联网资源许可的AWS帐户。
- 两个为应用程序和汽车模拟器创建的AWS IoT事物,并带有本地存储的证书和密钥,可以连接到AWS IoT Core。
- 一个物联网策略,允许在配置主题上进行连接,订阅和数据发布。
- 需要事先创建一个Amazon DynamoDB表,其中一个唯一的标识符作为主键。
本节查看要执行的步骤:
步骤1:设置MQTTJS客户端
首先,我们为汽车和应用程序客户端构建了两个客户端模拟器。我们使用MQTTJS for Javascript及其MQTT V5实施支持。下面的步骤1将描述MQTT客户端实现的详细信息。
运行应用程序模拟器后,它会连接,订阅响应主题并保持连接。每10秒,应用程序模拟器发布DOOR_LOCK
文本消息命令。运行汽车模拟器后,它连接并保持连接,然后从应用程序客户端接收命令,使用JavaScript计时器模拟其执行,并将DOOR_LOCK_SUCCESS
响应发送回。
MQTT请求/响应模式实现在下面的代码段中举例说明:
汽车模拟器
连接选项和连接创建:
//create options for MQTT v5 client
const options = {
clientId: CLIENT_ID,
host: ENDPOINT,
port: PORT,
protocol: 'mqtts',
protocolVersion: 5,
cert: fs.readFileSync(CERT_FILE),
key: fs.readFileSync(KEY_FILE),
reconnectPeriod: 0,
enableTrace: false
}
//connect
const client = mqtt.connect(options);
MQTT事件处理程序实现:
client.on('connect', (packet) => {
console.log('connected');
console.log('subscribing to', SUB_TOPIC);
client.subscribe(SUB_TOPIC);
});
//handle Messages
client.on('message', (topic, message, properties) => {
console.log('Received Message',message.toString());
console.log('Received Message Properties', properties );
if(message && message.toString() === 'DOOR_LOCK' ) {
console.log('Executing', JSON.stringify(message));
setTimeout(() => {
const response = 'DOOR_LOCK_SUCCESS';
const responseTopic = properties.properties.responseTopic;
console.log("ResponseTopic: ", responseTopic);
console.log('Publishing Response: ', response," to Topic: ", responseTopic.toString());
const cDataString = properties.properties.correlationData.toString();
console.log('Correlation Data String: ', cDataString)
let correlationDataJSON = JSON.parse(cDataString);
correlationDataJSON.resp_ts = new Date().toISOString();
correlationDataJSON.cmd = message.toString();
client.publish(responseTopic.toString(), response, {
qos: 1,
properties: {
contentType: 'text/plain',
correlationData: JSON.stringify(correlationDataJSON)
}
}, (error, packet) => {
//ERROR Handlers here.
});
}, 5000);
}
});
应用客户端模拟器
连接设置与上述相同。
MQTT事件处理程序实现:
//handle the message
client.on('message', (topic, message, properties) => {
//Only log for now.
console.log('Received message: ', message.toString());
console.log('Received message properties: ', properties );
});
以测试目的发布命令:
//publish
setInterval(() => {
const requestId = v4();
// const message = JSON.stringify({ping: 'pong'});
console.log('Publishing message ');
client.publish(PUB_TOPIC, 'DOOR_LOCK', {
qos: 1, properties: {
responseTopic: SUB_TOPIC,
contentType: 'text/plain',
correlationData: JSON.stringify({
requestId: requestId,
userId: userId,
req_ts: new Date().toISOString()
})
}
}
, (error, packet) => {
console.log(JSON.stringify(packet))
})
}, 10000);
步骤2:AWS IOT规则Amazon DynamoDB Action
AWS IoT规则可用于使用MQTT 5响应主题信息以及相关数据和内容类型来丰富消息有效负载。我们对命令请求和响应都感兴趣,每个响应都发布给了不同的MQTT主题。因此,我们创建的物联网规则必须从两个主题中选择数据,如下图所示。
由于我们正在构建车辆命令日志存储,在这种情况下,此类有效载荷是使用Content-type text/plain
发送的,因此我们仅在规则SQL语句中对此类消息进行过滤。
由于所需的行为是将命令消息和元数据存储在发电机DB表中,因此该规则将准备一个新的JSON对象要存储。每个键将在Amazon Dynamo DB中存储为单独的列。创建了一个新的唯一消息标识符(msgId
)是表的主要键。为了提取MQTT 5元数据,规则SQL使用koude14 SQL函数。编码/解码功能用于操纵来自base64
编码字符串的数据。
要指定的规则操作是Amazon DynamoDB,指向先前创建的表。
IoT规则SQL如下所示:
SELECT
{"msgId": newuuid(),
"name": decode(encode(*, 'base64'), 'base64'),
"requestId": decode(get_mqtt_property('correlation_data'), 'base64').requestId, "req_ts": decode(get_mqtt_property('correlation_data'), 'base64').req_ts,
"cmd": decode(get_mqtt_property('correlation_data'), 'base64').cmd,
"resp_ts": decode(get_mqtt_property('correlation_data'), 'base64').resp_ts,
"userId": decode(get_mqtt_property('correlation_data'), 'base64').userId, "responseTopic": get_mqtt_property('response_topic') }
FROM 'cmd/#'
where get_mqtt_property('content_type') = 'text/plain'
运行模拟几分钟后,您应该看到带有数据条目的亚马逊发电机DB表,如下图所示:
结论
本博客文章显示了如何使用MQTT 5 IOT规则SQL函数从消息中提取MQTT 5元数据,并使用它们来丰富设备有效负载。设计目标之一是通过本机云集成实现丰富和存储,而无定制消息处理代码。
演示的车辆命令日志店用例就是一个例子,可以探索可能的艺术。有关MQTT 5支持的AWS IoT核心规则引擎SQL功能的更多信息,请查看documentation。
如果您对此解决方案的现场演示有详细的演示感兴趣,则可以观看YouTube video。要通知更多物联网内容,您可以订阅IoT Builders YouTube channel。