This repository has MQTT source and sink plugins for SensorBee.
Add gopkg.in/sensorbee/mqtt.v1/plugin to build.yaml and build a sensorbee
command with build_sensorbee.
The MQTT source subscribes to a topic on an MQTT broker. To create a source,
use the CREATE SOURCE statement with the type mqtt:
> CREATE SOURCE mqtt_src TYPE mqtt WITH topic = "some/topic";
The source generates tuples having two fields as shown below:
{
"topic": "topic/of/the/message",
"payload": <blob>
}
The topic field holds the topic of the received message. Its value varies when
the source subscribes to a topic with a wildcard. The payload field holds the
payload of the message as a blob. To manipulate the payload in BQL, it must be
parsed first. For example, when the payload contains a JSON object, a stream
like the following is required:
> CREATE STREAM mqtt_parsed AS
    SELECT RSTREAM topic, decode_json(payload) AS data FROM mqtt_src [RANGE 1 TUPLES];
When the topic isn't necessary, the result of decode_json can be extracted at the top level:
> CREATE STREAM mqtt_parsed AS
    SELECT RSTREAM decode_json(payload) AS * FROM mqtt_src [RANGE 1 TUPLES];
The MQTT sink publishes messages to an MQTT broker. To create a sink, use the
CREATE SINK statement with the type mqtt:
> CREATE SINK mqtt_sink TYPE mqtt;
To publish a message from the sink, insert a tuple having topic and payload
fields into it. The message is then published to the given topic with the
data in the payload field.
The payload field can be one of four types: string, blob, array, or
map. When it's a string or a blob, its value is directly sent to the
broker.
> CREATE STREAM processed AS
    SELECT RSTREAM "foo/bar" AS topic, encode_json(data) AS payload
    FROM some_stream [RANGE 1 TUPLES];
> INSERT INTO mqtt_sink FROM processed;
The processed stream above encodes the data field, which is assumed to hold
a map, so the resulting payload field is a string containing a JSON
object. The content is then sent directly to the broker with the "foo/bar"
topic.
When the payload is an array or a map, it'll be encoded into JSON and sent
to the broker. Therefore, the example above is actually equivalent to the one
below:
> CREATE STREAM processed AS
    SELECT RSTREAM "foo/bar" AS topic, data AS payload
    FROM some_stream [RANGE 1 TUPLES];
> INSERT INTO mqtt_sink FROM processed;
A tuple inserted into the sink can have a qos field. Its value must be the
integer 0, 1, or 2. These numbers correspond to at most once, at least once,
and exactly once delivery, respectively.
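As a minimal sketch of how the qos field might be supplied, the stream below attaches a constant qos of 1 to every tuple before it is inserted into the sink. The stream name processed_qos is illustrative only, and some_stream and mqtt_sink are assumed to exist as in the earlier examples.

```
> CREATE STREAM processed_qos AS
    SELECT RSTREAM "foo/bar" AS topic, data AS payload, 1 AS qos
    FROM some_stream [RANGE 1 TUPLES];
> INSERT INTO mqtt_sink FROM processed_qos;
```

With this, each message would be published with at least once delivery.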
The names of the topic, payload, and qos fields can be changed by setting the
topic_field, payload_field, and qos_field parameters described later.
The MQTT source has one required parameter, topic, and the following optional
parameters:
broker, user, password, reconnect_min_time, reconnect_max_time
topic specifies a topic to which the source subscribes. It can contain
wildcards. topic is a required parameter.
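For illustration, a source subscribing with a wildcard might look like the sketch below. The topic sensors/+/temperature and the names mqtt_wild_src and room1_temperature are hypothetical. In MQTT, + is the single-level wildcard and # is the multi-level one. Because the topic field of each tuple carries the concrete topic of the received message, a downstream stream can filter on it.

```
> CREATE SOURCE mqtt_wild_src TYPE mqtt WITH topic = "sensors/+/temperature";
> CREATE STREAM room1_temperature AS
    SELECT RSTREAM topic, payload FROM mqtt_wild_src [RANGE 1 TUPLES]
    WHERE topic = "sensors/room1/temperature";
```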
broker is the address of the MQTT broker to which the source connects.
The address should be in "scheme://host:port" format. The default value is
tcp://127.0.0.1:1883.
The old "host:port" format is still supported in the latest version, but
support for it will be dropped in the next major version.
user is the user name used to connect to the broker. The default value is
an empty string.
password is the password of the user specified by the user parameter.
The default value is an empty string.
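A source that connects to a remote broker with credentials could be declared roughly as follows. The host broker.example.com, the user alice, the password, and the source name mqtt_auth_src are all placeholders chosen for this sketch, not values defined by the plugin.

```
> CREATE SOURCE mqtt_auth_src TYPE mqtt WITH
    topic = "some/topic",
    broker = "tcp://broker.example.com:1883",
    user = "alice",
    password = "secret";
```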
reconnect_min_time is the minimum time to wait before reconnecting to the
broker. The value can be specified in seconds as an integer or a float. It can
also be a string in Go duration format, like "1s". The default value is 1
second.
Examples:
reconnect_min_time = 5
reconnect_min_time = 1.5
reconnect_min_time = "3s"
reconnect_max_time is the maximum time to wait before reconnecting to the
broker. The value can be specified in seconds as an integer or a float. It can
also be a string in Go duration format, like "1s". The default value is 30
seconds.
Examples:
reconnect_max_time = 40
reconnect_max_time = 60.5
reconnect_max_time = "1m"
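Both reconnect parameters can be given in the same statement. The sketch below uses a hypothetical source name, mqtt_retry_src, and mixes the two accepted notations: a Go duration string for the minimum and an integer number of seconds for the maximum.

```
> CREATE SOURCE mqtt_retry_src TYPE mqtt WITH
    topic = "some/topic",
    reconnect_min_time = "2s",
    reconnect_max_time = 60;
```

Under these settings, the wait before a reconnection attempt would be at least 2 seconds and at most 60 seconds.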
The MQTT sink has the following optional parameters:
broker, user, password, topic_field, payload_field, qos_field, default_topic,
default_qos
broker is the address of the MQTT broker to which the sink publishes messages.
The address should be in "scheme://host:port" format. The default value is
tcp://127.0.0.1:1883.
The old "host:port" format is still supported in the latest version, but
support for it will be dropped in the next major version.
user is the user name used to connect to the broker. The default value is
an empty string.
password is the password of the user specified by the user parameter.
The default value is an empty string.
topic_field is the name of the field containing a topic as a string. For
example, when topic_field = "my_topic", a tuple inserted into the sink should
look like the following:
{
"my_topic": "some/topic",
"payload": ... payload data ...
}
The default value is topic.
payload_field is the name of the field containing a payload. For example,
when payload_field = "my_payload", a tuple inserted into the sink should look
like the following:
{
"topic": "some/topic",
"my_payload": ... payload data ...
}
Note that both topic_field and payload_field can be specified at once.
qos_field is the name of the field containing a qos as an integer. For
example, when qos_field = "my_qos", a tuple inserted into the sink should
look like the following:
{
"my_qos": 2,
"topic": "some/topic",
"payload": ... payload data ...
}
The default value is qos.
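Putting the three field-name parameters together, a sink and a stream feeding it might be set up as in the sketch below. The names mqtt_custom_sink and renamed are hypothetical, and some_stream is assumed to provide a data field as in the earlier examples.

```
> CREATE SINK mqtt_custom_sink TYPE mqtt WITH
    topic_field = "my_topic",
    payload_field = "my_payload",
    qos_field = "my_qos";
> CREATE STREAM renamed AS
    SELECT RSTREAM "some/topic" AS my_topic, data AS my_payload, 2 AS my_qos
    FROM some_stream [RANGE 1 TUPLES];
> INSERT INTO mqtt_custom_sink FROM renamed;
```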
default_topic is used when a tuple doesn't have a topic field. Its value
must not be empty. When this parameter is not specified, a tuple missing the
topic field will result in an error and will not be published.
default_qos is used when a tuple doesn't have a qos field. Its value must be
0 (at most once), 1 (at least once), or 2 (exactly once). The default value is 0.
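When every message should go to the same topic, the defaults allow tuples to carry only a payload. The following is a sketch under that assumption. The names mqtt_default_sink and payload_only are illustrative, and some_stream is assumed to provide a data field.

```
> CREATE SINK mqtt_default_sink TYPE mqtt WITH
    default_topic = "foo/bar",
    default_qos = 1;
> CREATE STREAM payload_only AS
    SELECT RSTREAM data AS payload FROM some_stream [RANGE 1 TUPLES];
> INSERT INTO mqtt_default_sink FROM payload_only;
```

Here the tuples have neither a topic nor a qos field, so the sink would fall back to "foo/bar" and QoS 1 for each message.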