Skip to main content
Version: v5.1

MQTT Publish

Overview#

The MQTT Publish task is a workflow task component designed to connect to an MQTT broker (such as ActiveMQ, HiveMQ, or AWS IoT Core), and publish messages to a topic.

The MQTT Publish task supports both MQTT 3.x and MQTT 5.x protocols. It supports secure connections (SSL/TLS), authentication, failover, and dynamic configuration via workflow input parameters.

Note: For information about the MQTT protocol, refer to this Wikipedia page.

MQTT Publish features#

The MQTT Publish task offers the following features:

  • Publish messages to any MQTTS broker supporting the MQTT 3.x (default) and MQTT 5.x protocols.
  • Supports SSL/TLS (including AWS IoT Core with in-memory certificates/keys).
  • Suppots authentication via username/password or client certificates.
  • Configurable via workflow input parameters.
  • Failover accepts one or multiple brokers in _mqtt_target input, parameter if provided.
  • Supports message handling (expects _messages as a JSON array).

Typical use cases#

The MQTT Publish task is particularly useful for these tasks:

  • Publishing messages to ActiveMQ, HiveMQ, or AWS IoT Core from a workflow.
  • Integrating IoT device communication into business workflows.

Input parameters#

ParameterTypeRequiredDescription
_mqtt_targetJSON arrayYesArray of MQTT connection targets. Each element contains a _config object, which must include a _type field specifying the authentication method: "cert" (certificate-based, requires _cert, _key, _ca, _clientId), "password" (requires _username, _password), or "none" (anonymous). _config also includes _url and optional _mqtt_version (3 or 5, default to 3), and _qos (Quality of Service: 0, 1, or 2; default: 1).
_topicStringYesMQTT topic to publish.
_messagesJSON arrayYesThe messages to publish - an array of strings that will be published individually.

Notes on MQTT quality of service#

MQTT quality of service has settings of 0, 1, or 2. The default value is 1.

Note the following settings:

  • QoS 0: At most once (fire and forget)
  • QoS 1: At least once (guaranteed delivery, but possible duplicates)
  • QoS 2: Exactly once (guaranteed delivery, no duplicates)

How the MQTT Publish task works#

The QTTT Publish task operates as follows:

  1. Connection setup: The MQTT Publish task first reads the broker URL from _config._url in _mqtt_target. It also supports failover and will try multiple URLs if there are multiple JSON entries supplied in _mqtt_target.

  2. Authentication: The task supports username and password or alternatively, client certificate authentication.

  3. Publish: The task publishes the provided JSON messages to the topic.

  4. Result: The task returns a JSON object indicating a status of "published" and a count of the number of messages published if more than 1.

Example Workflow definition#

The example Workflow definition below takes data from Haystack and publishes to ActiveMQ.

Note that the result of the Haystack Task is a JSON object with property result, and the evalData is what we want to publish. This can be accessed by subsequent tasks as ${haystack_task.output.result.evalData}.

{  "_name": "Haystack to MQTT Workflow",  "_description": "Fetch data from Haystack and publish to MQTT",  "_namespaces": [    "{{nsfilter}}"  ],  "_userType": "haystack_mqtt_publish",  "_taskDefs": [    {      "_name": "haystack_task",      "_type": "HAYSTACK",      "_sequenceno": 1,      "_inputParams": {        "_project": "test_project",        "_url": "https://xxxxx.invicara.com/api",        "_uname": "xxxxxxxx",        "_pwd": "xxxxxxxx",        "_action": "eval",        "_to": "json",        "_cmd": "point and his and hisStatus == \"ok\""      }    },    {      "_name": "publish_haystack_data",      "_type": "MQTT_PUBLISH",      "_sequenceno": 2,      "_inputParams": {        "_mqtt_target": [{            "_config": {                "_url": "ssl://broker.hivemq.com:8883?",                "_username": "xxxxxxx",                "_password": "xxxxxxxxxxxxx"            }        }],        "_topic": "haystack/data",        "_messages": ["${haystack_task.output.result.evalData}", "Hello, Test."]      }    }  ]}

AWS IoT Core Connection code snippet#

Refer to the sample code snippet below.

{    "_inputParams": {        "_mqtt_target": [{            "_config": {                "_url": "ssl://xxxxxx-ats.iot.us-east-1.amazonaws.com:8883",                "_clientId": "<client Id set in AWS IoT>",                "_cert": "<PEM client certificate>",                "_key": "<PEM private key>",                "_ca": "<PEM root CA>"            }        }],        "_topic": "test/topic",        "_messages": [            "List of strings"        ]    }}

Notes on general usage#

Note the following:

  • The task expects _message to be a JSON object and not a string.
  • For AWS IoT Core and other protocols requiring certificates, all three PEM fields (_cert, _key, _ca) are required.
  • For standard MQTT brokers, only _config._url, _topic, and _message are required.

Notes on error handling#

Note the following behavior when the MQTT Publish task handles errors:

  • Throws an exception if all connection attempts fail.
  • Throws an exception if required parameters are missing or invalid.
  • Logs all connection attempts and errors for troubleshooting.