eResult team data engineering

Kubernetes MQTT broker for Omniaplace big data pipeline

Maurizio Vivarelli
9 min readJun 8, 2020

Omniaplace is eResult integrated data management platform. It offer a wide range of application function and is the heart of company solution offering.

In this article I will describe a fully functional module implementing MQTT broker service on Kubernetes as part of Ominaplace big data pipeline capabilities.

A basic knowledge of Kubernetes infrastructure and Helm package manager is required.

As a little introduction, MQTT or Message Queue Telemetry Transport is a lightweight, publish-subscribe network protocol that transports messages between devices.

Tipical utilization is to connect IOT devices with centralized big data pipelines, for example a system in witch an ECG wearable device send data via MQTT for realtime health analysis.

This module is developed over an on premise 3 node kubernetes cluster where each node is built over ubuntu 18.04 and kubernetes version1.18.3.

While in Kubernetes three type of scaling exists, this paper is focused on horizontal scaling, the one in which the Horizontal Pod Autoscaler (a built-in Kubernetes feature) adjust the number of replicas of an application, based on some kind of metrica (we will discuss this in detail later).

Said that in the following you will find a deeper description of theese steps:

  • activation of custom metrics infrastructure (Prometheus + Grafana);
  • installation of MQTT broker software (VerneMQ);
  • make VerneMQ installation scale bidirectional;
  • check discovery of VerneMQ metrics in Prometheus;
  • finding the most suitable matric for the purpose of this module;
  • create a stress test script;
  • create a Grafana dashboard and verify the metric;
  • integrate the metric into the kubernetes API (Prometheus adapter);
  • create kubernetes autoscaler and test.

Activation of custom metrics infrastructure (Prometheus + Grafana)

To autoscale an app, the Horizontal Pod Autoscaler executes an eternal control loop:

And at each loop it need to query some system for the current metric value.

This is where a new Kubernetes component, The metrics registry come into play. The purpose of the metrics registry is to provide a standard interface for clients to query metrics from. The interface of the metrics registry consists of three separate APIs:

  • The Resource Metrics API
  • The Custom Metrics API
  • The External Metrics API

So if you want your application to scale based on custom metric you need to provide a backend for the Custom Metrics API, backend that is composed of a Metric collector and a Metric API server.

In this paragraph I will discuss activation of a popular choice of Metric collector, Prometheus.

I activated this by applying Helm package stable/prometheus-operator, this chart includes multiple components, mainly Prometheus itself, Grafana (a tool to create dashboard on metrics) and built-in service monitors to scrape metrics from internal kubernetes components.

This is the command:

helm install my-mon stable/prometheus-operator -f values.yaml

Where inside values.yaml I customized:

serviceMonitorSelectorNilUsesHelmValues: false

This is necessar to discover metrics on component other than the standard Kubernetes infrastructure modules. At this point you should be able to connect to Prometheus web interface:

and Grafana web interface:

Installation of MQTT broker software (VerneMQ)

Besides the various implementations of MQTT broker, I choosed VerneMQ:

I installed this through the official vernemq/vernemq helm chart. For doing this I first cloned the chart locally:

helm pull vernemq/vernemq --untar true

And after that I applied the following customization to values.yaml:

serviceMonitor:
create: true

this settings create a serviceMonitor to allow prometheus to scrape the VerneMQ custom metrics interface, and:

additionalEnv:
- name: DOCKER_VERNEMQ_ALLOW_ANONYMOUS
value: "on"
- name: DOCKER_VERNEMQ_ACCEPT_EULA
value: "yes"

That are basic parameters to make VerneMQ works in a test environment. After that I issued the following:

helm install myVerneMQ . --set replicaCount=2 -f values.yaml

At this time you should be able to connect, below you can find a sample Python code to check that:

import paho.mqtt.client as paho
import time
broker="localhost"
port=1883
client1 = paho.Client("control1")client1.connect(broker,port)

The output in Jupyter notebook should look like this:

Make VerneMQ installation scale bidirectional

Before we can go ahead we need to understand that an MQTT broker is a stateful application and so VerneMQ cluster is a stateful cluster.

This means that clustered VerneMQ nodes will share information about connected clients, sessions and also meta-information about the cluster itself.

For instance, if you stop a cluster node, the VerneMQ cluster will not just forget about it. It will know that there’s a node missing and it will keep looking for it.

Indeed the remaining nodes do not known if there’s a netsplit situation, the node is stopped or is definitely lost.

This doesn’t mean that a VerneMQ cluster cannot dynamically grow and shrink. But it means you have to tell the cluster what you intend to do, by using join and leave shell commands.

The cluster implemented in the chart handle correctly the up scaling, to make it handle correctly also the down scaling it’s necessar to edit the statefulset.yaml file, adding the lines below:

lifecycle:
preStop:
exec:
command: ["/bin/bash","-c","vmq-admin cluster leave node=VerneMQ@$HOSTNAME.myvernemq-headless.default.svc.cluster.local -k -i 1 -t 30"]

Check discovery of VerneMQ metrics in Prometheus

At this point, if all went well, thank’s to the auto scrape mechanism, you should be able to see VerneMQ metrics in Prometheus.

Let’s open the Prometheus web interface and look for mqtt metrics:

Finding the most suitable matric for the purpose of this prototype

Analyzing the various metrics offered I found the one below:

representing the number of messages published to each Pods.

Create a stress test script

With the objective in mind to create a fully functional prototype is necessar to have a method to put load over the cluster.

And is necessar to dynamically shard this load over all the existing replicas.

To do that I created a Python script based on Paho library:

And I created the script in a way that it create and close a connection before and after each message (not very efficient I understand, but this is required to obtain the sharding of messages).

This script teamed with Kubernetes round robin ClusterIP balancer realized the stress test module I needed.

It’s important to note that this is a prototype, in real world scenario you want a situation in witch some kind of balancer affinity stick each client with one replica. Othewise the cluster handling system gone crazy keeping integrity of queue across the cluster.

The stress test is run inside a Jupyter notebook, this is the connection cell:

import paho.mqtt.client as paho
import time
broker="localhost"
port=1883
def on_publish(client,userdata,mid): #create function for callback
print("Published message to topic","ecg/omniacare/devices")
print("mid=", mid)
pass
client1 = paho.Client("control1") #create client objectclient1.on_publish = on_publish #assign function to callbackff = open("/home/mqtt_ecg.txt","r")
ss = ff.read()

and this is the executor cell:

x = 3000
while True:
x += 1
client1.connect(broker,port) #establish connection
client1.loop_start() #start the loop
ss2 = ss.replace("mario.rossi", "mario.rossi."+str(x).zfill(6))
print("Publishing message to topic","ecg/omniacare/devices")
res = client1.publish("/ecg/omniacare/devices/0X345000DFG",ss2,2,0)
print("Post publishing message to topic","ecg/omniacare/devices",res)
client1.loop_stop(force=True)
client1.disconnect()
time.sleep(5) # wait

It’s important to note the last line where a sleep command give a simple way to scale up and down the load.

Create a Grafana dashboard and verify the metric

This is the dashboard I created to test this prototype:

In the upper right corner there is the total number of messages received by the VerneMQ cluster.

In the upper left there is the number of messages received per minutes per cluster.

The lines below is the number of messages per minutes per replica.

The numbers above are relative to a sleep parameter of 5 seconds, indeed this delay give a peak value of 12 messages per minute, but obviously it’s necessary to consider the time required to send the messages too.

Integrate the metric into the kubernetes API (Prometheus adapter)

As I stated above if you want your application to scale based on custom metric you need to provide a backend for the Custom Metrics API, backend that is composed of a Metric collector and a Metric API server.

In this paragraph I will discuss activation of a popular choice of Metric API server, Prometheus adapter.

After having analyzed prerequisites I installed the stable/prometheus-adapter chart.

To do this I needed to customize the values.yaml with the correct prometheus url:

# Url to access prometheus
prometheus:
url: http://my-mon-prometheus-operator-prometheus.default.svc.cluster.local
port: 9090

and create a custom rule to convert an absolute metric (mqtt_publish_received=total number of messages received per replica) into a rate metric, a metric that return the number of millirequest per second per replica:

rules:
- seriesQuery:
'{namespace!="",__name__="mqtt_publish_received",mqtt_version="4"}'
seriesFilters: []
resources:
template: <<.Resource>>
metricsQuery: sum(irate(<<.Series>>{<<.LabelMatchers>>}[1m])) by (<<.GroupBy>>)

Create kubernetes autoscaler and test

Now we have created all the building blocks needed and the last step and really the easier is the creation of the autoscaler.

This is the yaml:

apiVersion: autoscaling/v2beta1
kind: HorizontalPodAutoscaler
metadata:
name: vernemq-autoscaler
spec:
scaleTargetRef:
# point the HPA at the sample application
# you created above
apiVersion: apps/v1
kind: StatefulSet
name: myvernemq
# autoscale between 1 and 10 replicas
minReplicas: 1
maxReplicas: 10
metrics:
# use a "Pods" metric, which takes the average of the
# given metric across all pods controlled by the autoscaling target
- type: Pods
pods:
# use the metric that you used above: pods/http_requests
metricName: mqtt_publish_received
# target 500 milli-requests per second,
# which is 1 request every two seconds
targetAverageValue: 60m

A finally I have three video to describe the prototype in action.

Let’s start with a description of the test console:

In the upper left corner you can see the status of the VerneMQ cluster, in the istant in which the image was taken the cluster was compose of a single replica.

In the upper right corner there is the stress test running in the Jupyter notebook, important to note there the sleep number.

In the lower left corner you can see the Grafana dashboard where is represented the total rate of messages/minute per cluster and below the same metric but per single replica, single replica gauge that Grafana is able to create and destroy following the autoscaler actions.

Finally in the lower right corner there is the status of the kubernetes autoscaler where it’s important to note the average actual load/target load.

At the beginning the average load is 0:

Sooner after applying a ligth load, with a sleep time of 5 seconds, the average load start increasing:

And after some time the value exceeds the threshold:

Sooner the autoscaler react adjusting the number of replicas and start creating new pods:

As the numer of pods increase the average load decrease and Grafana start detecting new pods too:

Until the system stabilize:

This is the complete video:

In the second video the stress load increase and the number of replicas goes to the maximun allowed for the autoscaler configuration of 10:

And finally in the third video the stress decrease to 7 replicas:

That’s all.

Thank’s for reading.

--

--

Maurizio Vivarelli

Data Engineer, ERP developer. I like build things, sport, space and futurism.