Kafka Mapper

Kafka Mapper

The following application was created by a project with the purpose to connect MQTT parts with the Kafka Broker.

The goal was to implement an application that is capable of transforming incoming MQTT topics to a format that is compliant with Kafka, as the original data was streamed with MQTT, whereas the subscriber was only able to receive data sent in Kafka JSON format.

Basic information about the application and key features

The MQTT-to-Kafka mapper is based on a spring boot java microservice architecture. Therefore, one of the key features of the application is the low consumption of resources, as the framework allows for the efficient distribution and usage of resources in general.

Furthermore the architecture includes multiple packages, such as the Java MQTT Paho client, Apache Kafka client, the JSON libraries JSON-Simple and the Apache Maven package for Build, Deployment, and Testing.

Another key feature provided by this application is the mapping by objects. This feature allows the application to transmit the topic from its very origin (MQTT) up to its destination (Kafka).

The next feature is the automatic detection of optional values inside the input stream. This particularity allows the application to determine these values and redirects it to the output stream. It is also possible to set a default flag for the output elements in the config file.

It is also possible to pass MQTT Topics to the same Kafka Topics.

Lastly, the application also supports simultaneous, the conversion of multiple and single MQTT Topics to Kafka Topics at once.

Architecture and functional principles of the MQTT-to-Kafka mapper

Preconditons

Before starting the application, you must ensure that both the MQTT server and the corresponding Kafka server are running.

The current version does not have an interface that performs checks such as the above-mentioned prerequisites, as in a general operation both servers must be active.

In case these preconditions are not met, the application will throw multiple errors and will eventually crash.

Functional principles

At first, a JSON string is being sent via MQTT (HTTP is possible as well). This string is being directed into the MQTT-to-Kafka mapper, where the string is then being transformed into a Kafka-compliant format, which is predefined.

The application then executes the mapping by objects. For this process, a configuration file had been created which defines all rules necessary for the execution of this mapping.  As soon as the mapping is done, the string is transmitted to the Kafka broker and can then be used for further business processes.

Version Informations

The application is currently stable and the actual status is stable (Version 1.0.1).

Version Status Change Description
0.9.1Beta
1.0.1Stable

Installation

Configuration File

The following table explains you, the individual functions of the configuration file.

File OptionExample
Meaning
General Config
CONVERTER_MODE0MQTT mode (Application works only with valid MQTT JSON commands)
1HTTP mode [URL: localhost:8080/HTTPToApacheKafkaConverterApi?jsonMessage:{}] (Application works only with valid HTTP JSON commands)
2MQTT and HTTP mode [URL: localhost:8080/HTTPToApacheKafkaConverterApi?jsonMessage:{}] (Application works with valid HTTP JSON and MQTT JSON commands)
CONVERTER_DATETIME_FORMATyyyy-MM-dd HH:mm:ssDefine the incoming DateTime format
CONVERTER_TIME_FORMAT HH:mm:ss
Define the incoming Time format
CONVERTER_DATE_FORMAT yyyy-MM-dd
Define the incoming Time format
MQTT_HTTP_APACHE_KAFKA_STREAM_LOGGER_MODE0No logging (No logging of the input and output stream)
1HTTP/MQTT stream logging
(Only logging of the input stream)
2Kafka stream logging
(Only logging of the output stream)
3HTTP/MQTT and Kafka stream logging (Logging of the input and output stream)
MQTT_HTTP_APACHE_KAFKA_LOGGER_PATHPath/Micro_Services/MqttAndHttpApacheKafkaConverterLog.logDefine the log path and file
MQTT_HTTP_APACHE_KAFKA_TRANSLATOR_MODE0Basic mode (Translation of the input to the output stream) [Example: {“name”:”value”}]
1Advanced mode (Translation and conversion of the input to the output stream) [Example: {“payload”:{“name”:”value”},
“name”:”foobar”,”optional”:
false,”type”:”struct”,”fields”
:[{“field”:”name”,”
optional”:false,”type”:”string”
}]}]
Apache Kafka Config
MQTT_HTTP_APACHE_KAFKA_TYPEstructDefine the Kafka output stream type
MQTT_HTTP_APACHE_KAFKA_OPTIONALfalseDon’t use the optional flag for all elements of the Kafka output stream (Information: By default, the application checks the input stream and determined the optional values. You can set here the flag inside the output stream, to not using these values)
MQTT_HTTP_APACHE_KAFKA_OPTIONALtrueUse the optional flag for all elements of the Kafka output stream (Information: By default the application checks the input stream and determined the optional values. You can set here the flag inside the output stream, to use these values)
MQTT_HTTP_APACHE_KAFKA_NAMEfoobarDefine a name for the output stream
KAFKA_BROKERSPLAINTEXT://
Kafka_Broker_Hostname:Kafka_Broker_Port
Define the Kafka Broker Hostname or IP and the Port (Default: 9092)
KAFKA_MESSAGE_COUNT1Define the Kafka message count
KAFKA_CLIENT_IDMQTTToApacheKafkaStreamClientDefine the Kafka client ID
KAFKA_TOPIC_NAMEMQTT.TopicDefine the Kafka Topic
MQTT.Topic;MQTT.TopicDefine the Kafka Topics separated with ‘;’
KAFKA_TOPIC_AS_MQTT_TOPIC_NAMEtrueEnable forwarding the MQTT Topic to the Kafka Topic
falseDisable forwarding the MQTT Topic to the Kafka Topic
MQTT_TOPIC_NAME_SEPARATOR/Define the MQTT Topic seperator
KAFKA_TOPIC_NAME_SEPARATOR.Define the Kafka Topic separator
MQTT/ HTTP Config
MQTT_BROKER_HOSTtcp://MQTT_Broker_Hostname:MQTT_Broker_PortDefine the MQTT Broker Hostname or IP and the Port (Default: 1883)
MQTT_CLIENT_IDMQTTToApacheKafkaStreamClient Define the MQTT client ID
MQTT_TOPIC_NAMEMQTT/TopicDefine the MQTT Topic
MQTT/Topic;MQTT/Topic Define the MQTT Topics separated with ‘;’
LAST_WILL_TESTAMENT_TOPIC_NAMEMQTT/Topic Define the Last Will and Testament Topic
LAST_WILL_TESTAMENT_QOS2Define the Last Will and Testament QOS
LAST_WILL_TESTAMENT_MESSAGEThe device is currently not available.Define the Last Will and Testament message

Requirements

  1. The application was developed for Java 8 and may also run under higher Java versions.

Establishment

  1. Download the correct file for your Operating System.
  2. Unpack the file and copy the Jar and configuration folder & file to your local Operating System.
  3. Adjust your favorite user and set the correct rights at the Jar file (sudo chown user:user Path/Kafka_Mapper.jar & sudo chmod 755 Path/Kafka_Mapper.jar)
  4. Optional by Linux and MAC OS X Systems: Copy, implement and customize the Service Script.
    1. Download and copy the Script from here.
    2. Place the Script inside your /etc/init.d directory.
    3. Set the correct rights to your Script (sudo chmod 755 /etc/init.d/kafka-mapper).
    4. Set the correct owner to your Script (sudo chown root:root /etc/init.d/kafka-mapper).
    5. Customize the path inside the Script (sudo nano /etc/init.d/kafka-mapper).
    6. Activate the Script and implement it your system start process with (sudo systemctl enable kafka-mapper).
  5. Start the Service Script (sudo service kafka-mapper start) to run the application.
  6. Or start the MQTT_Kafka Jar file (/usr/bin/java -jar Path/Kafka_Mapper.jar &) to run the application.

Download

Donations

If you want you can support my work. The corresponding PayPal link will follow shortly.

License Agreement

You can find the license agreement here.

Support

Please, contact me if you find a bug or you need help!