MQTT Publish
Overview#
The MQTT Publish task is a workflow task component that connects to an MQTT broker (such as ActiveMQ, HiveMQ, or AWS IoT Core) and publishes messages to a topic.
The MQTT Publish task supports both MQTT 3.x and MQTT 5.x protocols. It supports secure connections (SSL/TLS), authentication, failover, and dynamic configuration via workflow input parameters.
Note: For information about the MQTT protocol, refer to this Wikipedia page.
MQTT Publish features#
The MQTT Publish task offers the following features:
- Publishes messages to any MQTT broker that supports the MQTT 3.x (default) or MQTT 5.x protocol.
- Supports SSL/TLS (including AWS IoT Core with in-memory certificates/keys).
- Supports authentication via username/password or client certificates.
- Configurable via workflow input parameters.
- Supports failover: accepts one or more brokers in the _mqtt_target input parameter, if provided.
- Supports message handling (expects _messages as a JSON array).
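The failover feature listed above can be sketched as a _mqtt_target array with two entries. This is only an illustration: both broker URLs, the credentials, and the topic are placeholders, and the assumption that entries are tried in array order is not confirmed by this page.

```json
{
  "_inputParams": {
    "_mqtt_target": [
      {
        "_config": {
          "_type": "password",
          "_url": "ssl://primary-broker.example.com:8883",
          "_username": "<username>",
          "_password": "<password>"
        }
      },
      {
        "_config": {
          "_type": "password",
          "_url": "ssl://backup-broker.example.com:8883",
          "_username": "<username>",
          "_password": "<password>"
        }
      }
    ],
    "_topic": "example/topic",
    "_messages": ["example message"]
  }
}
```

Each entry carries its own _config, so in principle the two brokers could use different credentials or authentication types.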
Typical use cases#
The MQTT Publish task is particularly useful for these tasks:
- Publishing messages to ActiveMQ, HiveMQ, or AWS IoT Core from a workflow.
- Integrating IoT device communication into business workflows.
Input parameters#
| Parameter | Type | Required | Description |
|---|---|---|---|
| _mqtt_target | JSON array | Yes | Array of MQTT connection targets. Each element contains a _config object, which must include a _type field specifying the authentication method: "cert" (certificate-based, requires _cert, _key, _ca, _clientId), "password" (requires _username, _password), or "none" (anonymous). _config also includes _url and the optional fields _mqtt_version (3 or 5; default: 3) and _qos (Quality of Service: 0, 1, or 2; default: 1). |
| _topic | String | Yes | MQTT topic to publish to. |
| _messages | JSON array | Yes | The messages to publish: an array of strings that are published individually. |
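As a rough illustration of the table above, the fragment below sketches one possible _inputParams block that sets every documented _config field for password authentication. The broker URL, credentials, topic, and message text are placeholders. Writing _mqtt_version and _qos as JSON numbers is an assumption, since the table does not state their JSON type.

```json
{
  "_inputParams": {
    "_mqtt_target": [
      {
        "_config": {
          "_type": "password",
          "_url": "ssl://broker.example.com:8883",
          "_username": "<username>",
          "_password": "<password>",
          "_mqtt_version": 5,
          "_qos": 2
        }
      }
    ],
    "_topic": "example/topic",
    "_messages": ["first message", "second message"]
  }
}
```

According to the table, leaving out _mqtt_version and _qos falls back to the defaults (MQTT 3.x and QoS 1).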
Notes on MQTT quality of service#
MQTT quality of service has settings of 0, 1, or 2. The default value is 1.
Note the following settings:
- QoS 0: At most once (fire and forget)
- QoS 1: At least once (guaranteed delivery, but possible duplicates)
- QoS 2: Exactly once (guaranteed delivery, no duplicates)
How the MQTT Publish task works#
The MQTT Publish task operates as follows:
- Connection setup: The MQTT Publish task first reads the broker URL from _config._url in _mqtt_target. It also supports failover and tries multiple URLs if multiple JSON entries are supplied in _mqtt_target.
- Authentication: The task supports username and password authentication or, alternatively, client certificate authentication.
- Publish: The task publishes the provided JSON messages to the topic.
- Result: The task returns a JSON object indicating a status of "published" and, if more than one message was published, a count of the messages published.
Example Workflow definition#
The example Workflow definition below takes data from Haystack and publishes it to an MQTT broker (HiveMQ in this example).
Note that the Haystack task returns a JSON object with a result property, and its evalData value is the data to publish. Subsequent tasks can access it as ${haystack_task.output.result.evalData}.
{
  "_name": "Haystack to MQTT Workflow",
  "_description": "Fetch data from Haystack and publish to MQTT",
  "_namespaces": ["{{nsfilter}}"],
  "_userType": "haystack_mqtt_publish",
  "_taskDefs": [
    {
      "_name": "haystack_task",
      "_type": "HAYSTACK",
      "_sequenceno": 1,
      "_inputParams": {
        "_project": "test_project",
        "_url": "https://xxxxx.invicara.com/api",
        "_uname": "xxxxxxxx",
        "_pwd": "xxxxxxxx",
        "_action": "eval",
        "_to": "json",
        "_cmd": "point and his and hisStatus == \"ok\""
      }
    },
    {
      "_name": "publish_haystack_data",
      "_type": "MQTT_PUBLISH",
      "_sequenceno": 2,
      "_inputParams": {
        "_mqtt_target": [
          {
            "_config": {
              "_url": "ssl://broker.hivemq.com:8883?",
              "_username": "xxxxxxx",
              "_password": "xxxxxxxxxxxxx"
            }
          }
        ],
        "_topic": "haystack/data",
        "_messages": ["${haystack_task.output.result.evalData}", "Hello, Test."]
      }
    }
  ]
}
AWS IoT Core Connection code snippet#
Refer to the sample code snippet below.
{
  "_inputParams": {
    "_mqtt_target": [
      {
        "_config": {
          "_url": "ssl://xxxxxx-ats.iot.us-east-1.amazonaws.com:8883",
          "_clientId": "<client Id set in AWS IoT>",
          "_cert": "<PEM client certificate>",
          "_key": "<PEM private key>",
          "_ca": "<PEM root CA>"
        }
      }
    ],
    "_topic": "test/topic",
    "_messages": ["List of strings"]
  }
}
Notes on general usage#
Note the following:
- The task expects _messages to be a JSON array, not a single string.
- For AWS IoT Core and other brokers that require certificates, all three PEM fields (_cert, _key, _ca) are required.
- For standard MQTT brokers, only _config._url, _topic, and _messages are required.
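A minimal configuration for a standard broker with anonymous access might look like the sketch below. The URL, topic, and message are placeholders. The input parameter table lists _type as a mandatory field of _config, so it is included here as "none"; this page does not state whether it can be dropped for anonymous connections.

```json
{
  "_inputParams": {
    "_mqtt_target": [
      {
        "_config": {
          "_type": "none",
          "_url": "ssl://broker.example.com:8883"
        }
      }
    ],
    "_topic": "example/topic",
    "_messages": ["example message"]
  }
}
```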
Notes on error handling#
Note the following behavior when the MQTT Publish task handles errors:
- Throws an exception if all connection attempts fail.
- Throws an exception if required parameters are missing or invalid.
- Logs all connection attempts and errors for troubleshooting.