
Saving data in InfluxDB using Apache NiFi


Enterprises produce huge amounts of data from a variety of sources, such as sensors. Sensors measure different physical characteristics of a machine, a zone, etc., e.g., pressure, pH, temperature and so on, and expose their readings as time series data. In this post, we will save that data in InfluxDB, an open-source time series database (TSDB), using an Apache NiFi dataflow.

In order to simulate data provided by sensors, we will use the Apache NiFi Simulator Bundle, a utility for generating random but realistic time series data. The simulator is driven by a configuration file that defines the shape of the data (patterns, noise, cycles, etc.), which it then converts into time series values. The configuration file documentation is located here. We will use the following configuration file to generate random temperature data (temperature.json):

{
  "generators": [
    {
      "name": "monthly-basis",
      "type": "monthly",
      "points": {
        "january": 3.3,
        "february": 3.7,
        "march": 6.8,
        "april": 9.8,
        "may": 13.6,
        "june": 16.2,
        "july": 18.4,
        "august": 18,
        "september": 14.9,
        "october": 11.1,
        "november": 6.8,
        "december": 3.9
      }
    },
    {
      "name": "daily-variation",
      "type": "daily",
      "points": {
        "00:00:00": -3,
        "02:00:00": -3.9,
        "04:00:00": -5,
        "06:00:00": -4.6,
        "08:00:00": -5.7,
        "10:00:00": -2.2,
        "12:00:00": 1,
        "14:00:00": 3,
        "16:00:00": 2.3,
        "18:00:00": 0.9,
        "20:00:00": -2.3,
        "22:00:00": -2.7
      }
    },
    {
      "name": "noise1",
      "type": "arma",
      "origin": "2019-01-01 00:00:00",
      "model": {
        "std": 0.2,
        "c": 0,
        "seed": 1234
      },
      "timestep": 1000
    },
    {
      "name": "noise2",
      "type": "arma",
      "origin": "2019-01-01 00:00:00",
      "model": {
        "std": 0.2,
        "c": 0,
        "seed": 4567
      },
      "timestep": 1000
    },
    {
      "name": "result1",
      "type": "aggregate",
      "aggregator": "sum",
      "generators": [
        "monthly-basis",
        "daily-variation",
        "noise1"
      ]
    },
    {
      "name": "result2",
      "type": "aggregate",
      "aggregator": "sum",
      "generators": [
        "monthly-basis",
        "daily-variation",
        "noise2"
      ]
    }
  ],
  "exported": [
    {
      "name": "temp-zone1",
      "generator": "result1",
      "frequency": 2000
    },
    {
      "name": "temp-zone2",
      "generator": "result2",
      "frequency": 2000
    }
  ],
  "from": "2018-01-01 00:00:00",
  "to": "2019-12-31 23:59:59.999"
}

This file was built by adapting some of the sample configurations downloaded from this page.

Before defining the Apache NiFi dataflow, we will install the Apache NiFi Simulator Bundle following this guideline, which involves running a few Maven commands. We then navigate to the nifi-simulator-bundle-nar/target directory, copy the nifi-simulator-bundle-nar-1.0-SNAPSHOT.nar file to the Apache NiFi /lib folder, and restart NiFi.

We will design a dataflow as illustrated in the diagram below.

In our example, this dataflow will monitor the temperature at two points or zones (temp-zone1 and temp-zone2) of a machine (machine01).

The first processor, GenerateTimeSeriesFlowFile, generates random temperature data driven by the configuration file shown above. The properties of this processor are:

The simulator configuration file can live anywhere, as long as it is reachable by the Apache NiFi server. This processor generates FlowFiles like the following:

temp-zone1,1540550813883,3.852215625676443
temp-zone2,1540550813883,4.1659104145333865

In order to save this data in InfluxDB, we need to convert this format to the InfluxDB line protocol. Basically, a line protocol entry looks like this:

measurement,tag_set field_set timestamp

The dataflow executes this conversion in two steps. The first step is carried out by the Convert to InfluxDB line protocol processor, which builds the field set and timestamp part of the entry, transforming the data into this format:

temp-zone1=3.852215625676443,temp-zone2=4.1659104145333865 1540550813883000000

The timestamp is in nanosecond resolution; since the simulator emits milliseconds, six zeros are appended. The properties of this processor are:

We are using a Groovy script engine, and the script body is:

import java.nio.charset.StandardCharsets

def flowFile = session.get()
if (!flowFile) return

flowFile = session.write(flowFile, { inputStream, outputStream ->
    String result = ''
    String ts = ''
    inputStream.eachLine { line ->
        // Each line looks like: name,timestamp_ms,value
        def a = line.tokenize(',')
        ts = a[1]
        result += a[0] + "=" + a[2] + ","
    }
    // Drop the trailing comma and append the timestamp,
    // padded from milliseconds to nanoseconds
    result = result.substring(0, result.length() - 1) + ' ' + ts + '000000'
    outputStream.write(result.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
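For readers less familiar with Groovy, the same transformation can be sketched as a standalone Python function (an equivalent of the script body above, not a NiFi component):

```python
# Standalone Python equivalent of the Groovy script above: it turns the
# simulator's "name,timestamp_ms,value" records into the field-set and
# timestamp portion of an InfluxDB line-protocol entry.

def to_line_protocol_fields(flowfile_text):
    """Convert simulator CSV lines into 'field_set timestamp_ns'."""
    fields = []
    ts_ms = ""
    for line in flowfile_text.strip().splitlines():
        name, ts_ms, value = line.split(",")
        fields.append(f"{name}={value}")
    # InfluxDB expects nanosecond precision; the simulator emits
    # milliseconds, so six zeros are appended.
    return ",".join(fields) + " " + ts_ms + "000000"

record = ("temp-zone1,1540550813883,3.852215625676443\n"
          "temp-zone2,1540550813883,4.1659104145333865\n")
print(to_line_protocol_fields(record))
# temp-zone1=3.852215625676443,temp-zone2=4.1659104145333865 1540550813883000000
```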

The second step of the conversion is handled by a ReplaceText processor, which prepends the measurement name and tag set part of the line protocol:

The properties of this processor are:

${measurement} and ${machine} are NiFi variables.

Right-clicking on the NiFi canvas brings up this pop-up menu:

Clicking on Variables shows all defined variables:

The result of this processor is:

temperature,machine=machine01 temp-zone1=3.852215625676443,temp-zone2=4.1659104145333865 1540550813883000000

Finally, we will store the data in an InfluxDB database using a PutInfluxDB processor. The properties of this processor are:

The Username and Password properties need to be set only if your InfluxDB database requires authentication. You must also set the connection URL according to where your InfluxDB instance runs.
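Under the hood, PutInfluxDB writes records to InfluxDB's 1.x HTTP /write endpoint. As a rough sketch of the request it issues (the base URL, database name and credentials below are just the example values used in this post, not requirements):

```python
# Sketch of the HTTP write request PutInfluxDB issues against InfluxDB 1.x.
# It POSTs line-protocol text to /write; "db" selects the database, and
# "u"/"p" carry credentials when authentication is enabled.
from urllib.parse import urlencode

def build_write_request(base_url, database, line, username=None, password=None):
    """Return the (url, body) pair for an InfluxDB 1.x line-protocol write."""
    params = {"db": database}
    if username and password:
        params.update({"u": username, "p": password})
    return f"{base_url}/write?{urlencode(params)}", line.encode("utf-8")

url, body = build_write_request(
    "http://localhost:8086", "test1",
    "temperature,machine=machine01 temp-zone1=3.852215625676443,"
    "temp-zone2=4.1659104145333865 1540550813883000000",
)
print(url)
# http://localhost:8086/write?db=test1
```

Posting that body to that URL (for example with an HTTP client of your choice) is exactly what the processor automates for every FlowFile.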

After executing the dataflow, we can query the data in our InfluxDB instance:

Connected to http://localhost:8086 version 1.7.1
InfluxDB shell version: 1.7.1
Enter an InfluxQL query
> auth
username: admin
password:
> use test1
Using database test1
> select machine,"temp-zone1","temp-zone2" from temperature order by time desc limit 1
name: temperature
time                machine   temp-zone1        temp-zone2
----                -------   ----------        ----------
1540550813883000000 machine01 3.852215625676443 4.1659104145333865
>
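The same query can also be issued over InfluxDB's 1.x HTTP /query endpoint instead of the influx shell. A sketch of building such a request (host and database are the values from this post):

```python
# Sketch of querying InfluxDB 1.x over HTTP: a GET to /query with the
# database in "db" and the InfluxQL statement in "q".
from urllib.parse import urlencode

def build_query_url(base_url, database, influxql):
    """Build an InfluxDB 1.x /query URL for a given InfluxQL statement."""
    return f"{base_url}/query?{urlencode({'db': database, 'q': influxql})}"

url = build_query_url(
    "http://localhost:8086", "test1",
    'select machine,"temp-zone1","temp-zone2" from temperature '
    'order by time desc limit 1')
```

Fetching that URL returns the same rows the shell printed, as JSON.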

Conclusion

In this post, we have seen how easy it is to save data in InfluxDB using Apache NiFi with just a few processors. Apache NiFi is a versatile tool that converts and transforms data with little need for programming.

You can try this dataflow out by downloading the template from this gist. You will need to tweak the URL, Username and Password properties of the PutInfluxDB processor, and add the measurement and machine variables.