Analyze Azure IoTHub Data in Grafana using InfluxDB

When sending data to Azure IoTHub, it is possible to subscribe to the received data using the IoTHub service endpoints. Microsoft offers SDKs for various programming languages, which make it easy to subscribe to and process IoTHub Device-to-Cloud messages.

In this post the Azure IoT SDK is used to process incoming data, while InfluxDB and Grafana provide a cost-efficient way of storing and analyzing the IoT data from IoTHub.

The following topics will be covered:

  • setting up a server with InfluxDB (as a time series database) and Grafana for visualization

  • setting up an event processor that reads all incoming messages from IoTHub and writes them to InfluxDB. The event processor transforms any incoming JSON payload into a generic structure in InfluxDB, so the same event processor can be used for any incoming message, as long as it is JSON formatted

  • configuring the visualization in Grafana

The following steps should be done beforehand:

  • create an IoTHub in your Azure subscription; the free tier is sufficient for this example

  • have a basic understanding of Node.js and npm

Setup InfluxDB and Grafana

The following description is written for Ubuntu but should be applicable to other Linux distributions, too.

First, make sure that all ports that should not be exposed to the internet (such as the InfluxDB port, 8086) are firewalled. Adapt the rules as needed if you run other services on your server.

# Allow the grafana dashboard and ssh
ufw allow 3000
ufw allow ssh

# Enable the firewall
ufw enable

Next, install the needed packages for Grafana like so:

# 1. Download and install Grafana. Get the current version here: 
# https://grafana.com/grafana/download, http://docs.grafana.org/installation/debian/
wget https://dl.grafana.com/oss/release/grafana_5.4.2_amd64.deb

sudo apt-get install -y adduser libfontconfig
sudo dpkg -i grafana_5.4.2_amd64.deb
sudo service grafana-server start

Grafana should now be available on port 3000; you can log in with the default credentials admin/admin.

Next, install InfluxDB.

# 2. Download and Install InfluxDB. Use a repo for that to keep up to date
curl -sL https://repos.influxdata.com/influxdb.key | sudo apt-key add -
source /etc/lsb-release
echo "deb https://repos.influxdata.com/${DISTRIB_ID,,} ${DISTRIB_CODENAME} stable" | sudo tee /etc/apt/sources.list.d/influxdb.list

sudo apt-get update && sudo apt-get install influxdb

# List the current config 
influxd config

# Start services
sudo systemctl enable influxdb
sudo systemctl start influxdb

Edit the configuration file as needed. I would recommend creating an admin user and enabling authentication; I will not go into full detail here, but a rough sketch follows. If you just want to test things out and all ports except SSH and 3000 are blocked by ufw, you should be fine even without a password for InfluxDB.
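
A minimal sketch for InfluxDB 1.x, assuming you pick your own password and adjust paths to your installation:

# In the influx CLI: create an admin user (the password is just a placeholder)
influx
CREATE USER admin WITH PASSWORD 'ChangeMe123' WITH ALL PRIVILEGES
exit

# Enable authentication in /etc/influxdb/influxdb.conf ([http] section):
#   auth-enabled = true
sudo systemctl restart influxdb

# From now on pass the credentials, e.g. to the CLI:
influx -username admin -password 'ChangeMe123'

If you enable authentication, remember to also supply the credentials wherever InfluxDB is accessed later on, i.e. in the Node.js writer and in the Grafana datasource.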

Now create an initial database.

# Start the influx CLI and create the database
influx
CREATE DATABASE iot01
SHOW DATABASES
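
Optionally, add a retention policy so that old IoT data is dropped automatically. A sketch that keeps 30 days of data (the policy name is arbitrary):

# Still inside the influx CLI
CREATE RETENTION POLICY "thirty_days" ON "iot01" DURATION 30d REPLICATION 1 DEFAULT
SHOW RETENTION POLICIES ON "iot01"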

Setup an event processor

The event processor is written in JavaScript (Node.js). To get started, run the following commands:

# 1. Create a new folder
mkdir eventProcessor
cd eventProcessor

# 2. Init the package.json
npm init

# 3. Install dependencies
npm install influx @azure/event-hubs azure-iot-device --save

First, let's look at how data can be written to InfluxDB by creating a small module for it.

In the following snippet a new schema with one measurement called generic_messages is created. It contains two fields: jsonvalue for strings and jsonasnumber for numbers. This distinction is needed because Grafana can only draw charts for numbers and cannot transform strings on the fly. So whenever a number is contained in the JSON payload, the event processor has to parse it and store it as a number in InfluxDB; only then can Grafana display it in a graph.

Additionally the schema contains two tags. Tags are indexed in InfluxDB and in this case describe which JSON key is stored and from which device the message came. Finally, the writeToInfluxDB function takes the needed parameters (the key/value pair from the JSON and the deviceId), tries to parse the value as a number and sends the point to InfluxDB. Make sure to change the host to match your database.

influxwriter.js

const Influx = require('influx');

//So this is some generic influxDB schema for IoT Data.
const influx = new Influx.InfluxDB({
    host: 'localhost',
    database: 'iot01',
    schema: [
        {
            measurement: 'generic_messages',
            fields: {
                jsonvalue: Influx.FieldType.STRING,
                jsonasnumber: Influx.FieldType.FLOAT
            },
            tags: [
                'jsontag',
                'deviceId'
            ]
        }
    ]
});

/**
 * Writes a generic json key/value pair to InfluxDB... 
 * @param {string} key key of the json pair
 * @param {string} value  value of the json pair
 * @param {string} deviceId iothub deviceId
 */
let writeToInfluxDB = function (key, value, deviceId) {
    // parseFloat does not throw on non-numeric input, it returns NaN,
    // so check the result explicitly instead of relying on try/catch.
    let parsedNumber = parseFloat(value);
    let point = {
        measurement: 'generic_messages',
        fields: { jsonvalue: String(value) },
        tags: { jsontag: key, deviceId: deviceId }
    };
    if (!Number.isNaN(parsedNumber)) {
        point.fields.jsonasnumber = parsedNumber;
    }
    influx.writePoints([point]).catch((err) => {
        console.error('Error writing to InfluxDB: ' + err.message);
    });
}

module.exports = { writeToInfluxDB: writeToInfluxDB}
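
To sanity-check the writer in isolation (assuming InfluxDB is reachable and the iot01 database exists), you can call it from a small test script; the file name and the key, value and deviceId below are just made-up test data:

// testwriter.js (hypothetical helper, not needed for the event processor itself)
let influxwriter = require('./influxwriter');

// Should end up as one point in generic_messages with jsontag=temperature and deviceId=testdevice01
influxwriter.writeToInfluxDB('temperature', '23.4', 'testdevice01');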

Wiring it together, the main file index.js uses the @azure/event-hubs package to receive all incoming data from IoTHub. This is largely taken from the quick start tutorials available online; I only separated the message callbacks into a separate module.

index.js

var { EventHubClient, EventPosition } = require('@azure/event-hubs');
let iothubreader = require('./iothubreader');

// Connection string of the IoTHub 'service' shared access policy
var connectionString = '<yourServiceConnectionString>';

let ehClient;

EventHubClient.createFromIotHubConnectionString(connectionString).then(function (client) {
    console.log("Successfully created the EventHub Client from iothub connection string.");
    ehClient = client;
    return ehClient.getPartitionIds();
  }).then(function (ids) {
    console.log("The partition ids are: ", ids);
    return ids.map(function (id) {
      // Only receive messages that are enqueued from now on
      return ehClient.receive(id, iothubreader.processMessage, iothubreader.printError, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
    });
  }).catch(iothubreader.printError);

The separate module takes the messages received in index.js (the JSON body is already parsed at that point) and iterates over every key in the JSON object. For each key/value pair it calls the writeToInfluxDB function, which writes the data into InfluxDB.

iothubreader.js

let influxwriter = require('./influxwriter');

let printError = function (err) {
    console.log(err.message);
};
let processMessage = function (message) {
    let body = message.body;
    // Custom application properties are available here but not used further
    let additionalProperties = message.applicationProperties;
    // The deviceId is added by IoTHub as a system property (annotation)
    let deviceId = message.annotations["iothub-connection-device-id"];

    Object.keys(body).forEach((key) => {
        influxwriter.writeToInfluxDB(key, body[key], deviceId);
    })
};

module.exports = { printError: printError, processMessage: processMessage }
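
For reference, a message as it arrives in processMessage looks roughly like this (simplified; the payload matches what the device simulator below sends, the concrete values are examples):

// Rough shape of an incoming Device-to-Cloud message (simplified)
let exampleMessage = {
  body: { temperature: 22.1, humidity: 63.4 },                  // parsed JSON payload
  applicationProperties: { temperatureAlert: 'false' },         // custom properties set by the device
  annotations: { 'iothub-connection-device-id': 'device01' }    // system properties added by IoTHub
};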

Run the processor using this command:

node index.js
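
If the connection string is valid, the output of the two log statements should look roughly like this (the partition ids depend on your IoTHub; two partitions are typical for a free-tier hub):

Successfully created the EventHub Client from iothub connection string.
The partition ids are:  [ '0', '1' ]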

Setup a Device Simulator

To test whether data is actually written to InfluxDB, create the following device simulator file.

deviceSimulator.js

var Mqtt = require('azure-iot-device-mqtt').Mqtt;
var DeviceClient = require('azure-iot-device').Client
var Message = require('azure-iot-device').Message;
var connectionString = '<deviceConnectionString>';
var client = DeviceClient.fromConnectionString(connectionString, Mqtt);

// Create a message and send it to the IoT hub every second
setInterval(function(){
  // Simulate telemetry.
  var temperature = 20 + (Math.random() * 15);
  var message = new Message(JSON.stringify({
    temperature: temperature,
    humidity: 60 + (Math.random() * 20)
  }));

  // Add a custom application property to the message.
  // An IoT hub can filter on these properties without access to the message body.
  message.properties.add('temperatureAlert', (temperature > 30) ? 'true' : 'false');

  console.log('Sending message: ' + message.getData());

  // Send the message.
  client.sendEvent(message, function (err) {
    if (err) {
      console.error('send error: ' + err.toString());
    } else {
      console.log('message sent');
    }
  });
}, 1000);

Install the MQTT transport dependency and execute it like so:

npm install azure-iot-device-mqtt --save
node deviceSimulator.js

As soon as it is sending, verify that messages are arriving in InfluxDB:

root@grafana:~# influx
Connected to http://localhost:8086 version 1.7.4
InfluxDB shell version: 1.7.4
Enter an InfluxQL query
> use iot01
Using database iot01
> SELECT * FROM generic_messages
name: generic_messages
time                deviceId jsonasnumber       jsontag     jsonvalue
----                -------- ------------       -------     ---------
1550778617361208495 device01 20.885118266462264 temperature 20.885118266462264
1550778617362267330 device01 68.9148289461136   humidity    68.9148289461136
1550778617787434192 device01 31.184911719983816 temperature 31.184911719983816
1550778617788229636 device01 62.67180622678427  humidity    62.67180622678427
1550778618820907849 device01 31.098909969814596 temperature 31.098909969814596
1550778618821550928 device01 66.9168538078389   humidity    66.9168538078389

Configure Visualization in Grafana

Now that everything is set up and data is coming in, connect Grafana to InfluxDB by adding a new InfluxDB datasource (Configuration > Data Sources > Add data source). If InfluxDB runs on the same server as Grafana, set the URL to http://localhost:8086 and the database to iot01.

Finally, create a dashboard, add a graph panel and build your queries against the generic_messages measurement. This is the part where you can get creative ;)
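
As a starting point, a panel that charts the temperature per device could use a raw InfluxQL query roughly like the following; $timeFilter and $__interval are macros filled in by Grafana, and the measurement and tag names match the schema defined earlier:

SELECT mean("jsonasnumber")
FROM "generic_messages"
WHERE "jsontag" = 'temperature' AND $timeFilter
GROUP BY time($__interval), "deviceId" fill(null)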