Friday, November 24, 2017

Publishing Sensor Data to MQTT

When publishing sensor data to MQTT, two questions to ask are: how often is the data published, and with what distribution?

Here are two examples:

  • Publish temperature, humidity, and barometric pressure from one sensor every second
  • Publish and verify RFID card swipes that employees use when entering a building

In the first example, we have a good idea of both the frequency of the data and how it is distributed: data is sent once every second, so it is uniformly distributed over time. In essence, it is time series data. In the second example, the frequency of data transmission depends on the time of day. Just before the business day starts, there will be a large number of RFID card swipes as the employees "clock in"; for the rest of the day, the number of RFID events will be far lower.

The answers to those two questions will determine various aspects of the backend IoT architecture: the database that should be used to store and otherwise interact with this sensor data, the "analytics pipeline" (my term, ©2017, Patriot Geek) used to analyze and visualize that data, etc.

For this blog post, we will publish time series data using our old friends, the BME280 and the ESP8266! Please see this earlier blog post for wiring and importing the proper libraries:

http://patriot-geek.blogspot.com/2017/03/nodemcu-and-digital-sensors.html

Temperature, humidity, and barometric pressure will be published once every second to an MQTT broker. Compared to industrial IoT applications, this is not a lot of data! We can thus use a database like MongoDB or even MySQL for storage. The analytics pipeline will be discussed in a later post.
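
To put a rough number on "not a lot of data", here is a back-of-the-envelope sketch in standard C++. It assumes each message carries about 17 bytes of payload (three readings with two decimal places, plus separators) and ignores MQTT and TCP/IP overhead. The function name is made up for illustration.

```cpp
#include <cassert>
#include <cstdint>

// Seconds in one day: 24 h * 60 min * 60 s = 86,400.
constexpr std::uint64_t SECONDS_PER_DAY = 24ULL * 60ULL * 60ULL;

// Hypothetical helper: payload bytes produced per day at a fixed publish rate.
// Protocol overhead (MQTT and TCP/IP headers) is deliberately ignored.
constexpr std::uint64_t payloadBytesPerDay(std::uint64_t messagesPerSecond,
                                           std::uint64_t bytesPerMessage)
{
    return messagesPerSecond * SECONDS_PER_DAY * bytesPerMessage;
}

// One 17-byte message per second is 86,400 messages per day.
static_assert(payloadBytesPerDay(1, 17) == 1468800, "86,400 * 17 bytes");
```

Under those assumptions, one sensor produces roughly 1.5 MB of payload per day, which a general-purpose database can absorb comfortably.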

The next question to ask is: how should that data be formatted? One solution is to use a JSON format:

{
    "temperature": 71.55,
    "humidity": 41.46,
    "barometric-pressure": 29.73
}

An alternative is to use a CSV format like this:

71.55,41.46,29.73

The JSON format contains a human-readable description of those three values, whereas the CSV format contains considerably fewer characters. Since MQTT is all about short messages, we will use the CSV format. This will also serve us well should we want to use LoRa to send the same information. If for any reason we want to reformat the CSV into JSON, we can do so later.
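
The size difference between the two formats can be measured directly. The sketch below is standard C++ rather than Arduino code, and toCsv and toJson are hypothetical helper names. It formats the same three readings both ways, with the JSON written compactly (no whitespace) to give it the best case.

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// Hypothetical helper: format three readings as a CSV line with two decimals.
std::string toCsv(double temperature, double humidity, double pressure)
{
    char buf[64];
    int n = std::snprintf(buf, sizeof buf, "%.2f,%.2f,%.2f",
                          temperature, humidity, pressure);
    // Guard against formatting errors and truncated output.
    assert(n > 0 && n < static_cast<int>(sizeof buf));
    return std::string(buf);
}

// Hypothetical helper: the same readings as compact JSON (no whitespace).
std::string toJson(double temperature, double humidity, double pressure)
{
    char buf[128];
    int n = std::snprintf(buf, sizeof buf,
                          "{\"temperature\":%.2f,\"humidity\":%.2f,"
                          "\"barometric-pressure\":%.2f}",
                          temperature, humidity, pressure);
    // Guard against formatting errors and truncated output.
    assert(n > 0 && n < static_cast<int>(sizeof buf));
    return std::string(buf);
}
```

For the readings 71.55, 41.46, and 29.73, toCsv produces a 17-character string, while toJson produces 66 characters even with all whitespace removed.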

Here is the code that will have our ESP8266 publish this data:

/**
 * BME280-MQTT.js
 *
 * By: Mike Klepper
 * Date: 24 November 2017
 *
 * This program demonstrates how to publish to MQTT
 *
 * See blog post on patriot-geek.blogspot.com
 * for instructions.
 */


#include "ESP8266WiFi.h"
#include "PubSubClient.h"
#include "Wire.h"
#include "Adafruit_Sensor.h"
#include "Adafruit_BME280.h"

WiFiClient wifiClient;
PubSubClient mqttClient(wifiClient);
Adafruit_BME280 bme;

const char* SSID                  = "**********";
const char* PASSWORD              = "**********";

const char* MQTT_BROKER           = "192.168.1.11";
const int   MQTT_PORT             = 1883;
const char* MQTT_TOPIC            = "home/living-room";
const char* MQTT_CLIENT_NAME      = "ESP8266Client";

const int   BME280_DELAY          = 3000;
const int   RECHECK_INTERVAL      = 500;
const int   PUBLISH_INTERVAL      = 1000;

unsigned long lastReconnectAttempt = 0;  // unsigned, to match the return type of millis()
unsigned long lastPublishAttempt   = 0;


void setup() 
{
  Serial.begin(115200);
  delay(10);

  setupWifi();
  setupBME280();
  setupMQTT();
}

void loop() 
{
  unsigned long now = millis();
  
  if(!mqttClient.connected()) 
  {
    if(now - lastReconnectAttempt > RECHECK_INTERVAL) 
    {
      lastReconnectAttempt = now;
      mqttClient.connect(MQTT_CLIENT_NAME);

      if(mqttClient.connected())
      {
        // Resubscribe to any topics, if necessary
        // This is also a good place to publish an error to a separate topic!
      }
    }
  }
  else
  {
    if(now - lastPublishAttempt > PUBLISH_INTERVAL)
    {
      lastPublishAttempt = now;
      readAndPublishData();
      mqttClient.loop();
    }
  }
}


void setupWifi()
{
  Serial.println("");
  Serial.print("Connecting to ");
  Serial.print(SSID);

  WiFi.begin(SSID, PASSWORD);

  while(WiFi.status() != WL_CONNECTED) 
  {
    delay(RECHECK_INTERVAL);
    Serial.print(".");
    yield();
  }

  Serial.println("");
  Serial.println("WiFi connected");
  Serial.print("IP address: ");
  Serial.println(WiFi.localIP());
}

void setupBME280()
{
  Serial.println("");
  Serial.print("Setting up BME280");

  while(!bme.begin())
  {
    delay(RECHECK_INTERVAL);
    Serial.print(".");
    yield();
  }
  
  Serial.println("");
  Serial.println("BME280 found!");
  delay(BME280_DELAY);
}

void setupMQTT()
{
  mqttClient.setServer(MQTT_BROKER, MQTT_PORT);
  
  Serial.println("");
  Serial.print("Connecting to MQTT");
  
  while(!mqttClient.connected())
  {
    Serial.print(".");
    mqttClient.connect(MQTT_CLIENT_NAME);
    delay(RECHECK_INTERVAL);
  }
  
  Serial.println("");
  Serial.println("Connected!");
  Serial.println("");
}

void readAndPublishData()
{
  if(mqttClient.connected())
  {
    float tempC = bme.readTemperature();
    float tempF = 9.0/5.0 * tempC + 32.0;
    float humidity = bme.readHumidity();
    float pressurePascals = bme.readPressure();
    float pressureInchesOfMercury = 0.000295299830714 * pressurePascals;
  
    String msg = String(tempF) + "," + String(humidity) + "," + String(pressureInchesOfMercury);
    char msgAsCharArray[msg.length() + 1];  // +1 leaves room for the terminating '\0'
    msg.toCharArray(msgAsCharArray, msg.length() + 1);  // bufsize counts the '\0' as well
  
    Serial.print(msg);
    mqttClient.publish(MQTT_TOPIC, msgAsCharArray);
    Serial.println(" - sent");
  }
}

Here's a walkthrough of the code, keyed by line number within the listing (the opening /** is line 1). The functions for initializing the wifi and the BME280 are the same as in previous posts; the new stuff is how we publish to MQTT, and how we time those publications.

14 - 18     The various libraries we'll be using
24 - 25     Name and password for wifi network
45 - 47     Connect to wifi, start the BME280, and connect to MQTT broker
120         Initialize the MQTT client, specifying the broker's IP address and port
125         While we're not connected...
128         ...attempt to connect
139         If the MQTT client is connected...
141 - 145   ...read data from the BME280 and convert to imperial units (degrees Fahrenheit and inches of mercury)...
147 - 149   ...format the data as CSV, and prepare it for publication...
152         ...and publish it to the broker!
50 - 77     This loop function is based on the non-blocking reconnect example from the PubSubClient library
52          First, get the number of milliseconds since this sketch started running...
54          If we're not connected to the broker...
56          ...check to see how long it has been since we last tried reconnecting; if it has been more than a specified time...
58          ...reset the timer...
59          ...and try to reconnect
61          If we were successful...
63 - 64     ...perform any housekeeping activities, like reporting an error
68          Otherwise we're connected...
70          ...check to see how long it has been since we last published data; if sufficient time has passed...
72          ...reset the timer...
73          ...publish the data...
74          ...and process any incoming messages.
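
The timing pattern used in loop() can be isolated and tried off-device. The following is a minimal sketch in standard C++, not part of the Arduino program: IntervalTimer is a hypothetical helper, and nowMs stands in for the value that millis() returns. It uses >= so that it fires exactly on the interval, whereas the listing uses >. Storing timestamps as unsigned values keeps the subtraction correct when the millisecond counter wraps back to zero, which happens after about 49.7 days on a 32-bit counter.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper mirroring the timing pattern in loop(): report when at
// least intervalMs milliseconds have passed since the last time it fired.
struct IntervalTimer
{
    std::uint32_t intervalMs;   // how often the action should run
    std::uint32_t lastFiredMs;  // counter value when the action last ran

    // nowMs stands in for the value returned by millis() on the device.
    bool due(std::uint32_t nowMs)
    {
        // Unsigned subtraction stays correct when the counter wraps to zero.
        std::uint32_t elapsed = static_cast<std::uint32_t>(nowMs - lastFiredMs);
        if (elapsed >= intervalMs)
        {
            lastFiredMs = nowMs;  // reset the timer
            return true;
        }
        return false;
    }
};
```

One timer with a 1000 ms interval would gate publishing, and a second with a 500 ms interval would gate reconnect attempts, without ever calling delay() inside loop().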

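To test the application, first open a terminal window on the machine that will act as the broker (its IP address must match MQTT_BROKER in the sketch) and start the MQTT broker with the command: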

mosquitto

Open another terminal window on the same machine, start an MQTT client, and subscribe to the topic home/living-room:

mosquitto_sub -h localhost -t home/living-room

Finally, watch the incoming data!
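
Each line that arrives is a payload such as 71.55,41.46,29.73. A consumer that wants numbers rather than text has to split it again. Here is one possible sketch in standard C++; the Reading struct and parseReading function are hypothetical names, and the field order is assumed to match the order the sketch publishes (temperature, humidity, pressure).

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// Hypothetical container for one message; units follow the sketch
// (degrees Fahrenheit, percent relative humidity, inches of mercury).
struct Reading
{
    float temperatureF;
    float humidityPercent;
    float pressureInHg;
};

// Parse a payload such as "71.55,41.46,29.73".
// Returns false unless exactly three numeric fields are present.
// On failure, out may have been partially overwritten.
bool parseReading(const std::string& payload, Reading& out)
{
    char trailing = '\0';
    // The final %c only matches if something follows the third number,
    // which pushes the count to 4 and marks the payload as malformed.
    int fields = std::sscanf(payload.c_str(), "%f,%f,%f%c",
                             &out.temperatureF, &out.humidityPercent,
                             &out.pressureInHg, &trailing);
    return fields == 3;
}
```

Because the CSV payload carries no field names, the publisher and every subscriber have to agree on the field order out of band; that is the price of the shorter message.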

Some notes about this application:

  1. Data is transmitted in an insecure manner!
  2. A timestamp is not included in the data.

These two issues will be addressed in a later blog post.

7 comments:

  1. what is the name of sensor please

    1. It is a BME280, a combined temp/humidity/air pressure sensor. It is available from SparkFun, Adafruit, etc.
  2. Great blog. Here I found valuable information on MQTT service. Thanks for sharing
  3. Very nice blog. This is exactly what I was trying to. Your code is very clear and it worked well for me. Thanks.