Detect the connection state of your IoT devices with MQTT

This article shows how to determine the connection state of your IoT devices when MQTT is the underlying messaging protocol. Specifically, we use features of version 3.1 of the protocol to write less code, detect state changes faster, and consume fewer resources than a typical heartbeat protocol. Our solution relies on an entity we call the state server, which is essentially an MQTT client that stores the states of the devices.

Device states

The connection states in our use cases are represented by a finite state machine with more than the two obvious states, connected and disconnected. In our view it is useful to detect whether a device has disconnected gracefully (e.g. following a reboot ordered from the backend) or ungracefully (e.g. due to a runtime error that crashed our application). We want to distinguish between these two types of disconnection because in the latter case we would probably want to be notified so that we can investigate the cause of the crash.

Strictly speaking, the connected and disconnected states express connectivity in the context of our IoT application. In the disconnected state we do not care to distinguish between the scenarios where the device is or is not connected to the Internet. When a device is connected, we assume our application is running smoothly and is reachable via our messaging protocol. When a device is disconnected, on the other hand, we consider it dead: it is unreachable and thus cannot do any useful work for our application.

In addition to the aforementioned states we include the state unknown, which expresses that our state server has no information about the status of a device. Devices can be in this state, for example, when the MQTT broker is detected to be down or when our state server has just booted up. They may still be running smoothly; their status is simply not (yet) known.

Summing up, a device state could have one of the following values:

  • connected: the device is connected
  • disconnected: the device has not been connected since the state server booted up
  • gracefully disconnected: the device was connected earlier but not anymore due to a graceful disconnection
  • ungracefully disconnected: the device was connected earlier but not anymore due to an ungraceful disconnection
  • unknown: the device status is (yet) unknown
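The five states above can be written down as a small enumeration. The following is a minimal sketch, not part of the sample code further down: the names `DeviceState` and `state_from_payload` are illustrative assumptions, while the payload strings are the ones the device publishes on its state topic in the sample code.

```python
from enum import Enum


class DeviceState(Enum):
    CONNECTED = 'connected'
    DISCONNECTED = 'disconnected'
    GRACEFULLY_DISCONNECTED = 'gracefully disconnected'
    UNGRACEFULLY_DISCONNECTED = 'ungracefully disconnected'
    UNKNOWN = 'unknown'


# Payloads published on the state topic, mapped to the state they announce.
_PAYLOAD_TO_STATE = {
    'connected': DeviceState.CONNECTED,
    'graceful_disconnection': DeviceState.GRACEFULLY_DISCONNECTED,
    'ungraceful_disconnection': DeviceState.UNGRACEFULLY_DISCONNECTED,
}


def state_from_payload(payload):
    """Translate a raw MQTT payload (bytes or str) into a DeviceState."""
    if isinstance(payload, bytes):
        payload = payload.decode('utf-8')
    # Anything we do not recognise leaves the device in the unknown state.
    return _PAYLOAD_TO_STATE.get(payload, DeviceState.UNKNOWN)


print(state_from_payload(b'connected'))
print(state_from_payload('ungraceful_disconnection'))
```

Note that disconnected and unknown never arrive as payloads; in this design they are set by the state server itself.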

MQTT v3.1 features

In this section we shall describe the features of MQTT v3.1 our state server employs.

Qualities of service for message delivery

MQTT delivers each message according to one of three Quality of Service (QoS) levels:

  • QoS level 0 – at most once delivery: The message is delivered at most once, or it is not delivered at all. The message might be lost if the client is disconnected, or if the broker fails. Its delivery across the network is not acknowledged.
  • QoS level 1 – at least once delivery: The message is always delivered at least once. If the sender does not receive an acknowledgment, the message is sent again until an acknowledgment is received. As a result, the receiver may receive and process the same message multiple times.
  • QoS level 2 – exactly once delivery: The message is always delivered exactly once. This level ensures that duplicate messages are not delivered to the receiving application. This is the highest level of delivery, for use when duplicate messages are not acceptable. There is a slight increase in network traffic, but it is usually acceptable because of the importance of the message content.
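The difference between at least once and exactly once delivery can be illustrated with a toy simulation. This is only a sketch under simplifying assumptions: no broker or network is involved, the function names are made up for illustration, and the real QoS 2 handshake is reduced to de-duplication by message id.

```python
def send_at_least_once(payload, ack_lost):
    """Simulate a QoS 1 sender.

    ack_lost holds one boolean per transmission attempt: True means the
    acknowledgement for that attempt was lost, so the sender retransmits.
    Returns every copy the receiver saw.
    """
    delivered = []
    for lost in ack_lost:
        delivered.append(payload)   # the receiver gets this copy
        if not lost:
            break                   # acknowledgement arrived, stop resending
    return delivered


def deduplicate(copies_with_ids):
    """Keep only the first copy per message id (the effect QoS 2 achieves)."""
    seen = set()
    unique = []
    for message_id, payload in copies_with_ids:
        if message_id not in seen:
            seen.add(message_id)
            unique.append(payload)
    return unique


# Two acknowledgements are lost, so the receiver sees three copies.
copies = send_at_least_once('connected', ack_lost=[True, True, False])
print(len(copies))
# All copies carry the same message id, so only one reaches the application.
print(deduplicate([(1, copy) for copy in copies]))
```

Since a device state such as connected is idempotent, duplicates would be harmless here; the sample code in this article uses QoS 2 throughout nonetheless.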

Last Will and Testament (LWT)

An LWT message is a message that the broker publishes on behalf of a client when its connection with that client is torn down unexpectedly, i.e. without the client having sent a clean disconnect. Each client sets its own LWT message, and the topic on which it will be published, when it establishes its connection with the broker.

A real-world analogy is that of an actual last will and testament. Every person can draw up a legal document, called a testament, which specifies what actions should be taken after that person has passed away. The wishes are legally binding and are carried out by an executor on the person's behalf. Similarly, in the MQTT context a client can formulate a testament in which it defines what message needs to be sent on its behalf, and on which topic, when the broker recognizes that the client went offline. Naturally, that message is sent by the broker, since the client is offline and cannot publish anymore.
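The will mechanism can be sketched with a toy in-memory broker. This is not how a real broker is implemented; the class `ToyBroker` and its method names are assumptions made for illustration, and it models only the registration, discarding and publication of wills.

```python
class ToyBroker:
    """In-memory stand-in for a broker; it only models will handling."""

    def __init__(self):
        self.wills = {}        # client_id -> (topic, payload)
        self.published = []    # (topic, payload) in publication order

    def connect(self, client_id, will_topic, will_payload):
        # The will is registered once, when the connection is established.
        self.wills[client_id] = (will_topic, will_payload)

    def disconnect(self, client_id):
        # Clean disconnect: the will is discarded and never published.
        self.wills.pop(client_id, None)

    def connection_lost(self, client_id):
        # Unexpected loss: the broker publishes the will for the client.
        will = self.wills.pop(client_id, None)
        if will is not None:
            self.published.append(will)


broker = ToyBroker()
broker.connect('device-0', 'states/0', 'ungraceful_disconnection')
broker.connect('device-1', 'states/1', 'ungraceful_disconnection')
broker.disconnect('device-0')        # graceful: nothing is published
broker.connection_lost('device-1')   # crash: the will goes out
print(broker.published)
```

Only the crashed client's will is published, which is what lets the state server tell an ungraceful disconnection apart from a graceful one.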

How does the broker recognize that a client went offline? MQTT runs over TCP/IP and is therefore connection-oriented, but a dropped TCP connection can take a long time to be noticed. For this reason MQTT has a feature called the Keep Alive timer, which defines the maximum time interval between messages sent by a client. It enables the broker to detect that the network connection with a client has dropped without having to wait for the long TCP/IP timeout. The client is responsible for sending at least one message within each Keep Alive period. If there is no data-related message during that period, the client must send a ping request (PINGREQ) to the broker, which the broker answers with a ping response (PINGRESP). If the broker does not receive any message from the client within one and a half times the Keep Alive period, it tears down the connection and assumes the client has gone offline.
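The broker-side check described above can be sketched as follows. The class `KeepAliveMonitor` and its methods are hypothetical names, and timestamps are passed in explicitly (in seconds) to keep the example deterministic; a real broker tracks this per connection internally.

```python
class KeepAliveMonitor:
    """Tracks when each client was last heard from."""

    def __init__(self, keepalive, grace_factor=1.5):
        # A client is considered gone after 1.5 times the Keep Alive period.
        self.timeout = keepalive * grace_factor
        self.last_seen = {}

    def message_received(self, client_id, now):
        # Any message counts, including a ping request.
        self.last_seen[client_id] = now

    def expired_clients(self, now):
        # Clients whose silence exceeds the timeout.
        return sorted(client_id
                      for client_id, seen in self.last_seen.items()
                      if now - seen > self.timeout)


monitor = KeepAliveMonitor(keepalive=60)
monitor.message_received('device-0', now=0)
monitor.message_received('device-1', now=50)
# At t=100 device-0 has been silent for 100 s (> 90 s), device-1 for 50 s.
print(monitor.expired_clients(now=100))
```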

The default value for the Keep Alive period depends on the client library and is often in the region of 30 to 60 seconds. If the value is to be reconfigured, one needs to take into account the trade-off between a small and a large interval. A small interval means that the broker will detect and announce much sooner that a client has disconnected: if the period is x seconds, our application will learn that a device has disconnected within at most 1.5x seconds. At the same time, a small interval may consume more bandwidth and more resources on the broker, as it needs to respond to pings more frequently. A large Keep Alive period, on the other hand, significantly delays the detection of disconnections. In practice, brokers employ additional techniques which detect dropped connections much faster. In our experiments, even though we set the Keep Alive timer to 15 seconds, our VerneMQ broker instance could detect half-open connections almost instantaneously.
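To put numbers on this trade-off, the sketch below computes the worst-case detection delay and an approximate ping rate for a few Keep Alive values. The function name is illustrative, and the ping rate assumes an otherwise idle client that sends exactly one ping per Keep Alive period, which is a simplification.

```python
def keepalive_tradeoff(keepalive):
    """Return (worst-case detection delay in s, idle pings per hour)."""
    worst_case_detection = 1.5 * keepalive
    idle_pings_per_hour = 3600 / keepalive
    return worst_case_detection, idle_pings_per_hour


for keepalive in (15, 60, 300):
    delay, pings = keepalive_tradeoff(keepalive)
    print('keepalive {:>3} s: detected within {:.1f} s, '
          'about {:.0f} idle pings per hour'.format(keepalive, delay, pings))
```

Going from 60 to 15 seconds cuts the worst-case delay from 90 to 22.5 seconds at the cost of four times as many pings per idle device.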

Detecting connection state of devices

In this section we present the code showing the sequence of actions each entity of our application needs to follow, from the moment it boots up until it shuts down, in order to monitor the connection states successfully. We have three types of entities:

  • Device (MQTT client) – many
  • State server (MQTT client) – one
  • Messaging server (MQTT broker) – one

Below we show the sample code for the first two entities:

Device

import logging
import paho.mqtt.client as mqtt
from threading import Lock
from time import sleep

logging.basicConfig(level=logging.DEBUG,
                    format='%(name)s: %(message)s',)
logger = logging.getLogger('DeviceStateUpdater')
logger.setLevel(logging.INFO)

class DeviceStateUpdater(mqtt.Client):

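    def __init__(self, settings, device_id, keepalive=60):
        logger.info('initializing...')
        # Initialise the base client first: mqtt.Client.__init__ sets its own
        # private attributes (e.g. _keepalive) and may otherwise overwrite
        # the values assigned below.
        # This code uses the paho-mqtt 1.x callback signatures; newer releases
        # introduce a callback API version, so check the docs of your version.
        super(DeviceStateUpdater, self).__init__()

        self._settings = settings
        self._keepalive = keepalive

        self._state_topic = 'states/{}'.format(device_id)
        self._ping_topic = 'pings/{}'.format(device_id)

        self._connected = False
        # lock guarding _connected, which is also written by the network thread
        self._lock = Lock()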

    def connect(self):
        logger.info('connecting...')
        self.username_pw_set(self._settings['user'], self._settings['pass'])
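        # register the last will: the broker publishes it on our behalf if the
        # connection is lost without a clean disconnect
        self.will_set(self._state_topic, 'ungraceful_disconnection', qos=2)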

        super(DeviceStateUpdater, self).connect(self._settings['host'],
                                                self._settings['port'],
                                                keepalive=self._keepalive)
        self.loop_start()

        while True:

            with self._lock:
                if self._connected:
                    logger.info('connected')
                    break

            sleep(1)

    def disconnect(self):
        logger.info('disconnecting...')
        with self._lock:
            # if already disconnected, don't do anything
            if not self._connected:
                return

        # inform the state server that the device will disconnect
        logger.info('publishing graceful_disconnection')
        self.publish(self._state_topic, 'graceful_disconnection', qos=2)
        # give the network loop time to complete the QoS 2 handshake for the
        # above message before the connection is closed
        sleep(3)
        super(DeviceStateUpdater, self).disconnect()
        # stop the background thread started by loop_start() in connect()
        self.loop_stop()

    # BELOW WE OVERRIDE CALLBACK FUNCTIONS

    def on_connect(self, client, userdata, flags, rc):
        # successful connection
        if rc == 0:
            logger.info('successful connection')

            # inform the state server that the device is connected
            self.publish(self._state_topic, 'connected', qos=2)
            # subscribe to the ping topic so when the server pings the device can respond with a pong
            self.subscribe(self._ping_topic, qos=2)

            with self._lock:
                self._connected = True

    def on_disconnect(self, client, userdata, rc):
        logger.info('on_disconnect')
        with self._lock:
            self._connected = False

    def on_message(self, client, userdata, msg):
        # when message is received from the ping topic respond with pong ('connected' state)
        if msg.topic == self._ping_topic:
            logger.info('received ping. responding with state')
            self.publish(self._state_topic, 'connected', qos=2)

if __name__ == '__main__':

    settings = {
        'host': 'localhost',
        'port': 1883,
        'user': '',
        'pass': '',
    }

    device_state_updater = DeviceStateUpdater(settings, device_id=0)
    device_state_updater.connect()

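    try:
        while True:

            # ... replace sleeping below with doing some useful work ...
            logger.info('sleeping for 1 sec')
            sleep(1)
    except KeyboardInterrupt:
        # Ctrl+C: announce a graceful disconnection before exiting
        device_state_updater.disconnect()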

State server

import logging
import paho.mqtt.client as mqtt
from threading import Lock
from time import sleep

logging.basicConfig(level=logging.DEBUG,
                    format='%(name)s: %(message)s',)
logger = logging.getLogger('StateServer')
logger.setLevel(logging.INFO)

class StateServer(mqtt.Client):

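    def __init__(self, settings, device_ids, keepalive=60):
        logger.info('initializing...')
        # Initialise the base client first: mqtt.Client.__init__ sets its own
        # private attributes (e.g. _keepalive) and may otherwise overwrite
        # the values assigned below.
        super(StateServer, self).__init__()

        # ... set all device states to 'unknown' ...

        self._settings = settings
        self._device_ids = device_ids
        self._keepalive = keepalive

        self._connected = False
        # lock guarding _connected, which is also written by the network thread
        self._lock = Lock()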

    def connect(self):
        logger.info('connecting...')
        self.username_pw_set(self._settings['user'], self._settings['pass'])

        super(StateServer, self).connect(self._settings['host'],
                                         self._settings['port'],
                                         keepalive=self._keepalive)
        self.loop_start()

        while True:

            with self._lock:
                if self._connected:
                    break

            sleep(1)

    def disconnect(self):
        logger.info('disconnecting...')
        with self._lock:
            # if already disconnected, don't do anything
            if not self._connected:
                return

        super(StateServer, self).disconnect()

    # BELOW WE OVERRIDE CALLBACK FUNCTIONS

    def on_connect(self, client, userdata, flags, rc):
        # successful connection
        if rc == 0:
            logger.info('successful connection')

            # ... set all device states to 'disconnected' ...

            # subscribe to all state channels
            self.subscribe('states/#', qos=2)

            # ping all devices to see if they are connected
            for device_id in self._device_ids:
                logger.info('pinging device: {}'.format(device_id))
                self.publish('pings/{d_id}'.format(d_id=device_id), '', qos=2)

            with self._lock:
                self._connected = True

    def on_disconnect(self, client, userdata, rc):
        logger.info('disconnected')

        # ... the broker is unreachable: set all device states to 'unknown' ...

        with self._lock:
            self._connected = False

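    def on_message(self, client, userdata, msg):
        # topics look like 'states/<device_id>'; split on the first '/' only,
        # so that deeper topics matched by 'states/#' do not raise an error
        topic_prefix, _, device_id = msg.topic.partition('/')

        if topic_prefix == 'states':
            # payloads arrive as bytes; decode them before logging or storing
            state = msg.payload.decode('utf-8')

            logger.info('received new state: `{}` for device with id: `{}`'.format(state, device_id))

            # ... set new state for device with above device_id ...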

if __name__ == '__main__':

    settings = {
        'host': 'localhost',
        'port': 1883,
        'user': '',
        'pass': '',
    }

    state_server = StateServer(settings, device_ids=[0])
    state_server.connect()

    while True:

        # ... replace sleeping below with doing some useful work ...
        logger.info('sleeping for 1 sec')
        sleep(1)

Now, let’s examine what happens in the following scenarios:

Broker is up and running – State server is up and running – Device just boots up

When the device boots up it tries to connect to the broker. As soon as the connection is established, the on_connect callback is executed and the new state (connected) is published; the state server receives and stores it.

Broker is up and running – State server just boots up – Device is up and running

When the state server boots up it initially sets all device states to unknown and tries to connect to the broker. As soon as the connection is established, the on_connect callback is executed. In the callback, the server sets all device states to disconnected, subscribes to the state topics, and pings the devices. All connected devices respond, and the server sets their state to connected.
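The state bookkeeping that the sample state server leaves as placeholder comments could look roughly like this. It is a sketch under assumptions: the class `DeviceStateStore` and its method names are made up for illustration, states are kept as plain strings in a dictionary, and no MQTT client is involved.

```python
class DeviceStateStore:
    """Holds the last known state per device id (ids are kept as strings)."""

    def __init__(self, device_ids):
        # The state server has just booted: nothing is known yet.
        self.states = {str(device_id): 'unknown' for device_id in device_ids}

    def on_broker_connected(self):
        # Assume every device is disconnected until it answers the ping.
        for device_id in self.states:
            self.states[device_id] = 'disconnected'

    def on_broker_lost(self):
        # Without the broker we cannot tell anything about the devices.
        for device_id in self.states:
            self.states[device_id] = 'unknown'

    def on_state_message(self, topic, payload):
        prefix, _, device_id = topic.partition('/')
        if prefix != 'states' or device_id not in self.states:
            return  # not a state update for a device we track
        if isinstance(payload, bytes):
            payload = payload.decode('utf-8')
        self.states[device_id] = payload


store = DeviceStateStore([0, 1])
print(store.states)                            # both devices unknown
store.on_broker_connected()
store.on_state_message('states/0', b'connected')
print(store.states)                            # device 0 answered the ping
```

In the sample state server, these methods would be called from `__init__`, `on_connect`, `on_disconnect` and `on_message` respectively.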

Broker goes down and later up again – State server is up and running – Device is up and running

When the broker goes down, the devices and the state server keep trying to reconnect to it. As soon as the broker is up again and connections are established, the two entities try to communicate. The device informs the server of its connected state. At the same time the server pings the devices so that they send their state. If one of the two entities establishes its connection with the broker before the other, the slower one will miss the state update or the ping. Nevertheless, the slower entity will advertise its state or send its ping once it connects, so the state server ends up with a consistent view.

Summing up

In this article we covered how to monitor the connection states of IoT devices which use MQTT as their messaging protocol. We presented the different device states our application is interested in tracking. After that, we listed the MQTT version 3.1 features we employ in our solution. Lastly, we showed the code which implements our state-monitoring solution, which essentially takes advantage of the heartbeat mechanism already integrated in MQTT/TCP. Our solution allowed us to write less code, detect state changes faster, and consume fewer resources than a typical heartbeat protocol.



References

https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/com.ibm.mq.dev.doc/q029090_.htm
https://stackoverflow.com/questions/17270863/mqtt-what-is-the-purpose-or-usage-of-last-will-testament
http://blog.stephencleary.com/2009/05/detection-of-half-open-dropped.html
