使用AWS IoT核心规则引擎使用MQTT 5元数据丰富有效载荷
#javascript #iot #mqtt5 #awsiot

介绍

此博客文章说明了如何构建车辆命令日志存储,以跟踪命令请求和通过应用程序客户发送给车辆的响应。为此,我们考虑从使用MQTT V5发布的MQTT消息中提取MQTT 5元数据信息,并通过AWS IoT IoT Core Core Rules Engine Strure规则丰富有效载荷。处理后的数据存储在Amazon DynamoDB中。

目标是在云侧使用尽可能小的定制代码,并依靠AWS云侧的本机集成,例如带有Dynamo DB Action的IoT规则。

有关此解决方案的现场演示,您可以观看下面在IoT Builders YouTube channel上链接的视频:

用MQTT 5 Metadata

丰富有效载荷

MQTT 5请求/响应模式

作为此博客文章的一部分,我们探索了MQTT 5:Request/Response pattern的功能。

请求/响应消息模式是一种以异步方式跟踪对客户端请求的响应的方法。这是MQTTV5中实现的一种机制,允许发布者指定主题以根据特定请求发送响应。当订户收到请求时,它还收到主题以发送响应。此模式支持一个相关数据字段,该字段允许跟踪数据包,例如请求或设备标识参数。

让我们看一个示例:

我们想从客户端应用程序通过MQTT发送命令。流量如下:

  1. 车辆订阅请求主题,客户端应用程序订阅其确定的响应主题,可以取决于应用程序实例ID。

  2. 应用客户端在请求主题上发布请求。在我们的情况下,有效载荷是一个简单的文本DOOR_LOCK,指示命令锁定车门。遵循MQTT 5模式,除了有效载荷外,我们还发送元数据,例如Response TopicContent TypeCorrelation Data

    • a request id
    • a
    • user id

    此数据有助于将MQTT请求及其响应相关联。

  3. 当设备接收MQTT消息时,它执行命令并在指定的响应主题上发布响应。发布后,它重新介绍了相关数据,但也附加了响应特定信息,例如:

    • a response timestamp
    • command正在回应, 图表上有红色。
  4. 应用程序实例接收有关其先前订阅的响应主题的此信息。

MQTT 5 Example Flow

先决条件和方法

先决条件

要在本地重新创建此演示,需要已经进行:

  • 一个具有创建物联网资源许可的AWS帐户。
  • 两个为应用程序和汽车模拟器创建的AWS IoT事物,并带有本地存储的证书和密钥,可以连接到AWS IoT Core。
  • 一个物联网策略,允许在配置主题上进行连接,订阅和数据发布。
  • 需要事先创建一个Amazon DynamoDB表,其中一个唯一的标识符作为主键。

本节查看要执行的步骤:

步骤1:设置MQTTJS客户端

首先,我们为汽车和应用程序客户端构建了两个客户端模拟器。我们使用MQTTJS for Javascript及其MQTT V5实施支持。下面的步骤1将描述MQTT客户端实现的详细信息。

运行应用程序模拟器后,它会连接,订阅响应主题并保持连接。每10秒,应用程序模拟器发布DOOR_LOCK文本消息命令。运行汽车模拟器后,它连接并保持连接,然后从应用程序客户端接收命令,使用JavaScript计时器模拟其执行,并将DOOR_LOCK_SUCCESS响应发送回。

MQTT请求/响应模式实现在下面的代码段中举例说明:

汽车模拟器

连接选项和连接创建:

//create options for MQTT v5 client
        const options = {
            clientId: CLIENT_ID,
            host: ENDPOINT,
            port: PORT,
            protocol: 'mqtts',
            protocolVersion: 5,
            cert: fs.readFileSync(CERT_FILE),
            key: fs.readFileSync(KEY_FILE),
            reconnectPeriod: 0,
            enableTrace: false
        }
        //connect
        const client = mqtt.connect(options);

MQTT事件处理程序实现:

client.on('connect', (packet) => {
            console.log('connected');
            console.log('subscribing to', SUB_TOPIC);
            client.subscribe(SUB_TOPIC);
        });
//handle Messages
        client.on('message', (topic, message, properties) => {
            console.log('Received Message',message.toString());
            console.log('Received Message Properties', properties );

            if(message && message.toString() === 'DOOR_LOCK' ) {
                console.log('Executing', JSON.stringify(message));
                setTimeout(() => {
                    const response = 'DOOR_LOCK_SUCCESS';
                    const responseTopic = properties.properties.responseTopic;
                    console.log("ResponseTopic: ", responseTopic);
                    console.log('Publishing Response: ', response," to Topic: ", responseTopic.toString());
                    const cDataString = properties.properties.correlationData.toString();
                    console.log('Correlation Data String: ', cDataString)
                    let correlationDataJSON = JSON.parse(cDataString);
                    correlationDataJSON.resp_ts = new Date().toISOString();
                    correlationDataJSON.cmd = message.toString();

                    client.publish(responseTopic.toString(), response, {
                        qos: 1,
                        properties: {
                            contentType: 'text/plain',
                            correlationData: JSON.stringify(correlationDataJSON)
                        }
                    }, (error, packet) => {
                          //ERROR Handlers here.
                    });
                }, 5000);
            }
        });

应用客户端模拟器

连接设置与上述相同。

MQTT事件处理程序实现:

//handle the message
        client.on('message', (topic, message,  properties) => {
        //Only log for now.
            console.log('Received message: ', message.toString());
            console.log('Received message properties: ', properties );   
        });

以测试目的发布命令:

//publish
        setInterval(() => {
            const requestId = v4();
            // const message = JSON.stringify({ping: 'pong'});
            console.log('Publishing message ');
            client.publish(PUB_TOPIC, 'DOOR_LOCK', {
                    qos: 1, properties: {
                        responseTopic: SUB_TOPIC,
                        contentType: 'text/plain',
                        correlationData: JSON.stringify({
                            requestId: requestId,
                            userId: userId,
                            req_ts: new Date().toISOString()
                        })
                    }
                }
                , (error, packet) => {
                    console.log(JSON.stringify(packet))
                })
        }, 10000);

步骤2:AWS IOT规则Amazon DynamoDB Action

AWS IoT规则可用于使用MQTT 5响应主题信息以及相关数据和内容类型来丰富消息有效负载。我们对命令请求和响应都感兴趣,每个响应都发布给了不同的MQTT主题。因此,我们创建的物联网规则必须从两个主题中选择数据,如下图所示。

Iot Rule for MQTT 5

由于我们正在构建车辆命令日志存储,在这种情况下,此类有效载荷是使用Content-type text/plain发送的,因此我们仅在规则SQL语句中对此类消息进行过滤。

由于所需的行为是将命令消息和元数据存储在发电机DB表中,因此该规则将准备一个新的JSON对象要存储。每个键将在Amazon Dynamo DB中存储为单独的列。创建了一个新的唯一消息标识符(msgId)是表的主要键。为了提取MQTT 5元数据,规则SQL使用koude14 SQL函数。编码/解码功能用于操纵来自base64编码字符串的数据。

要指定的规则操作是Amazon DynamoDB,指向先前创建的表。

IoT规则SQL如下所示:

SELECT 
{"msgId": newuuid(), 
"name": decode(encode(*, 'base64'), 'base64'), 
"requestId": decode(get_mqtt_property('correlation_data'), 'base64').requestId, "req_ts": decode(get_mqtt_property('correlation_data'), 'base64').req_ts, 
"cmd": decode(get_mqtt_property('correlation_data'), 'base64').cmd, 
"resp_ts": decode(get_mqtt_property('correlation_data'), 'base64').resp_ts, 
"userId": decode(get_mqtt_property('correlation_data'), 'base64').userId, "responseTopic": get_mqtt_property('response_topic') } 
FROM 'cmd/#' 
where get_mqtt_property('content_type') = 'text/plain'

运行模拟几分钟后,您应该看到带有数据条目的亚马逊发电机DB表,如下图所示:

DynamoDB Entries

结论

本博客文章显示了如何使用MQTT 5 IOT规则SQL函数从消息中提取MQTT 5元数据,并使用它们来丰富设备有效负载。设计目标之一是通过本机云集成实现丰富和存储,而无定制消息处理代码。

演示的车辆命令日志店用例就是一个例子,可以探索可能的艺术。有关MQTT 5支持的AWS IoT核心规则引擎SQL功能的更多信息,请查看documentation

如果您对此解决方案的现场演示有详细的演示感兴趣,则可以观看YouTube video。要通知更多物联网内容,您可以订阅IoT Builders YouTube channel

作者Biaoqian19