Saturday, November 25, 2017

Node-RED: Subscribing to an MQTT Topic

For our next Node-RED flow, we will build an MQTT client that subscribes to BME280 sensor data published by an ESP8266. The hardware and code for publishing that data are described in an earlier post.

The messages published to the MQTT topic home/living-room are CSV strings containing temperature, humidity, and barometric pressure in imperial units (degrees Fahrenheit, percent relative humidity, and inches of mercury). Here's an example of such a message:

71.55,41.46,29.73

Our goal for this post is to build a Node-RED flow that subscribes to that topic, and parses that data into JSON like this:

{
    "temperature": 71.55,
    "humidity": 41.46,
    "barometric-pressure": 29.73
}
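
To make the target transformation concrete, here is a minimal sketch of the same mapping in plain JavaScript, outside of Node-RED. It is only an illustration of the idea, not how Node-RED does it internally; parseCsvLine and COLUMNS are hypothetical names chosen for this example.

```javascript
// Column names are the ones used in this post; the helper name is hypothetical.
const COLUMNS = ["temperature", "humidity", "barometric-pressure"];

// Pair each comma-separated value with the column name at the same position.
function parseCsvLine(line, columns) {
    const values = line.trim().split(",");
    const result = {};
    columns.forEach((name, i) => {
        const raw = values[i];
        if (raw === undefined) return; // fewer values than columns: skip the field
        const num = Number(raw);
        // Keep numeric strings as numbers, anything else as the raw string.
        result[name] = raw.trim() !== "" && !Number.isNaN(num) ? num : raw;
    });
    return result;
}

console.log(JSON.stringify(parseCsvLine("71.55,41.46,29.73", COLUMNS), null, 4));
```

Note that a key such as barometric-pressure contains a hyphen, so it has to be read with bracket notation (obj["barometric-pressure"]) rather than dot notation.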

For this flow, we'll be using the following three types of nodes:

MQTT In: Connects to an MQTT server and subscribes to a specified topic
CSV: Converts CSV data into JSON
Debug: Displays messages in the debug tab

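These nodes will be connected as follows: the MQTT In node feeds both a Debug node and the CSV node, and the CSV node feeds a second Debug node.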

Double-click the MQTT In node and set the following properties:

Server: 192.168.1.11:1883
Topic: home/living-room
QoS: 0

Next, double-click the CSV node and set the following properties:

Columns: temperature,humidity,barometric-pressure
Separator: Comma
Name: Parse CSV into JSON

Finally, double-click each of the Debug nodes and set the following properties:

Output: complete msg object
to: debug tab

The CSV node uses the column names as field names and assigns values to those fields in the order in which they appear in the incoming CSV. The resulting object becomes the message's payload. This is how the messages look in the debug tab:

Pretty-printing the top entry in the debug list shows that the msg object has the following structure:

{
  "topic": "home/living-room",
  "payload":
  {
    "temperature": 73.8,
    "humidity": 37.12,
    "barometric-pressure": 29.6
  },
  "qos": 0,
  "retain": false,
  "_msgid": "1c091ae3.456e45"
}

Great, we've subscribed to an MQTT topic and formatted the incoming CSV data as a JSON object!

Wouldn't it be nice if there were timestamps in these objects? As explained in the post where we built and coded the sensor, our little ESP8266 doesn't have a battery-backed clock, nor is it getting the time from a network time service. Instead, we will add the timestamp to the data once it arrives in our Node-RED flow.

To accomplish this, we will use a Function node, which runs a block of JavaScript. Wire this Function node between the CSV node and its Debug node:

Double-click on the Function node, and set the name to be Add Time Info. In the "Function" textarea, add the following code:

var now = new Date();
msg.payload["timestamp"] = now.getTime();
msg.payload["formatted-time"] = now.toUTCString();
return msg;

All this code does is add two properties to msg.payload. The JavaScript .getTime() method returns the number of milliseconds since midnight UTC on January 1, 1970, and .toUTCString() formats that date into a human-readable form in the GMT timezone.
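
To see what these two calls produce, here is a small sketch that can be run outside of Node-RED. It wraps the same logic in a function that takes the Date as a parameter, so the result is repeatable; addTimeInfo is a hypothetical name for this example, and the fixed timestamp is the one from the sample output shown later in this post.

```javascript
// Same logic as the Function node, but the Date is passed in so the result
// is repeatable. addTimeInfo is a hypothetical name, not a Node-RED API.
function addTimeInfo(msg, now) {
    msg.payload["timestamp"] = now.getTime();          // ms since 1970-01-01 UTC
    msg.payload["formatted-time"] = now.toUTCString(); // human-readable, GMT
    return msg;
}

// A hand-built message shaped like the one the CSV node emits.
const sample = {
    payload: { temperature: 73.76, humidity: 37.35, "barometric-pressure": 29.6 }
};

const stamped = addTimeInfo(sample, new Date(1511588927869));
console.log(stamped.payload["timestamp"]);      // 1511588927869
console.log(stamped.payload["formatted-time"]); // Sat, 25 Nov 2017 05:48:47 GMT
```

Inside the Function node itself, new Date() with no arguments is what we want, since it captures the moment the message arrives.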

Clear the debug tab, deploy the edited flow, and examine the incoming data. Here's an example of what the final result will be, once we pretty-print it:

{
  "topic": "home/living-room",
  "payload": 
  {
    "temperature": 73.76,
    "humidity": 37.35,
    "barometric-pressure": 29.6,
    "timestamp": 1511588927869,
    "formatted-time": "Sat, 25 Nov 2017 05:48:47 GMT"
  },
  "qos": 0,
  "retain": false,
  "_msgid":"3867cbee.a2cfa4"
}

Here's the source code for this last flow:

[
    {
        "id": "ee409dc6.f511b",
        "type": "mqtt in",
        "z": "e260f053.ca405",
        "name": "",
        "topic": "home/living-room",
        "qos": "0",
        "broker": "a481358c.031c18",
        "x": 120,
        "y": 160,
        "wires": [
            [
                "f90ccd18.b0f84",
                "b7c62375.17de5"
            ]
        ]
    },
    {
        "id": "f90ccd18.b0f84",
        "type": "debug",
        "z": "e260f053.ca405",
        "name": "",
        "active": false,
        "console": "false",
        "complete": "true",
        "x": 770,
        "y": 160,
        "wires": []
    },
    {
        "id": "b7c62375.17de5",
        "type": "csv",
        "z": "e260f053.ca405",
        "name": "Parse CSV into JSON",
        "sep": ",",
        "hdrin": "",
        "hdrout": "",
        "multi": "one",
        "ret": "\\n",
        "temp": "temperature,humidity,barometric-pressure",
        "x": 380,
        "y": 220,
        "wires": [
            [
                "5097ac3c.7d4b04"
            ]
        ]
    },
    {
        "id": "adf30d5c.857be",
        "type": "debug",
        "z": "e260f053.ca405",
        "name": "",
        "active": true,
        "console": "false",
        "complete": "true",
        "x": 770,
        "y": 220,
        "wires": []
    },
    {
        "id": "5097ac3c.7d4b04",
        "type": "function",
        "z": "e260f053.ca405",
        "name": "Add Time Info",
        "func": "var now = new Date();\nmsg.payload[\"timestamp\"] = now.getTime();\nmsg.payload[\"formatted-time\"] = now.toUTCString();\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "x": 600,
        "y": 220,
        "wires": [
            [
                "adf30d5c.857be"
            ]
        ]
    },
    {
        "id": "a481358c.031c18",
        "type": "mqtt-broker",
        "z": "",
        "broker": "192.168.1.11",
        "port": "1883",
        "clientid": "",
        "usetls": false,
        "compatmode": true,
        "keepalive": "60",
        "cleansession": true,
        "willTopic": "",
        "willQos": "0",
        "willPayload": "",
        "birthTopic": "",
        "birthQos": "0",
        "birthPayload": ""
    }
]
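
If the exported JSON is hard to read, the wiring can be recovered from the "wires" arrays, which hold the ids of the nodes each output connects to. The following is a rough sketch that prints those connections; it uses a trimmed copy of the flow above (only id, type, name, and wires), and describeWiring is a hypothetical helper, not part of Node-RED.

```javascript
// A trimmed copy of the flow above: only the fields needed to follow the wiring.
const flow = [
    { id: "ee409dc6.f511b", type: "mqtt in", name: "", wires: [["f90ccd18.b0f84", "b7c62375.17de5"]] },
    { id: "f90ccd18.b0f84", type: "debug", name: "", wires: [] },
    { id: "b7c62375.17de5", type: "csv", name: "Parse CSV into JSON", wires: [["5097ac3c.7d4b04"]] },
    { id: "adf30d5c.857be", type: "debug", name: "", wires: [] },
    { id: "5097ac3c.7d4b04", type: "function", name: "Add Time Info", wires: [["adf30d5c.857be"]] },
    { id: "a481358c.031c18", type: "mqtt-broker" }
];

// describeWiring is a hypothetical helper for this post.
function describeWiring(nodes) {
    const byId = new Map(nodes.map((n) => [n.id, n]));
    const label = (n) => n.name || n.type; // fall back to the type for unnamed nodes
    const links = [];
    for (const node of nodes) {
        // Configuration nodes such as mqtt-broker have no "wires" property.
        for (const output of node.wires || []) {
            for (const targetId of output) {
                links.push(`${label(node)} -> ${label(byId.get(targetId))}`);
            }
        }
    }
    return links;
}

describeWiring(flow).forEach((line) => console.log(line));
```

Each line of output is one wire in the editor, so the MQTT In node should show up twice: once for the raw Debug node and once for the CSV node.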

What are we going to do with all this data flowing in? That's the topic of the next post!
