MQTT Endpoint
Mqtt Endpoint is used to create and start MQTT receiving service, it can subscribe to different topic data, and then route it to different rule chains for processing.
# Type
endpoint/mqtt
# Startup configuration
This component allows the reuse of shared connection clients through the server field. See Component Connection Reuse for reference.
| Field | Type | Required | Description | Default |
|---|---|---|---|---|
| server | string | Yes | mqtt broker address | - |
| username | string | No | username | 0 |
| Password | string | No | password | - |
| qOS | int | No | QOS | 0 |
| cleanSession | bool | No | CleanSession | false |
| clientID | string | No | client ID | default random number |
| cAFile | string | No | CA file path | - |
| certFile | string | No | Cert file path | - |
| certKeyFile | string | No | CertKey file path | - |
# Response
Before exchange.Out.SetBody response, you need to specify the responseTopic parameter through exchange.Out.Headers() or exchange.Out.Msg.Metadata, and the component will respond to the specified topic data:
exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "device.msg.response")
// or
exchange.Out.Headers().Add("responseTopic", "device.msg.response")
exchange.Out.SetBody([]byte("ok"))
2
3
4
Response parameter configuration:
| Field | Type | Required | Description | Default |
|---|---|---|---|---|
| responseTopic | string | Yes | Response topic | - |
| responseQos | int | No | Response QOS | 0 |
# Examples
The following are example codes using endpoint:
- RestEndpoint (opens new window)
- WebsocketEndpoint (opens new window)
- MqttEndpoint (opens new window)
- ScheduleEndpoint (opens new window)
- NetEndpoint (opens new window)
- KafkaEndpoint (opens new window) (Extended component library)
# Note: Topic Overlap and Duplicate Messages
In some MQTT brokers (e.g., EMQX), if a client subscribes to overlapping topic patterns, the same message may be delivered once for each subscription, resulting in duplicates. Typical examples:
- /sys/msg/a/+
- /sys/msg/+/+
These two subscriptions have overlapping match ranges. When a message topic falls into the intersection, the same message will be delivered twice (or more, depending on the number of overlapping subscriptions). When routing to rule chains via Mqtt Endpoint, plan for deduplication or avoid overlapping subscriptions.
Recommendation: Subscribe only a widest topic, then route and dispatch via nodes; for example, dispatch by msgType (msgType=topic) or by metadata.topic. Use JS scripts for complex dispatching if needed.
- Dispatch Option A: distribute using msgTypeSwitch component with msgType matching
{
"ruleChain": {
"id": "tgimYPt5L06J",
"name": "test",
"root": true,
"debugMode": true,
"additionalInfo": {
"description": "",
"noDefaultInput": false,
"layoutX": "306",
"layoutY": "285"
},
"configuration": {}
},
"metadata": {
"endpoints": [
{
"id": "node_1",
"type": "endpoint/mqtt",
"name": "MQTT",
"configuration": {
"clientID": "test5888",
"maxReconnectInterval": 0,
"password": "nc_admin",
"qOS": 0,
"server": "192.168.62.20:1883",
"username": "nc_admin"
},
"debugMode": false,
"additionalInfo": {
"layoutX": 351,
"layoutY": 132
},
"routers": [
{
"id": "m62quFtZlMMZ",
"params": [],
"from": {
"path": "/sys/msg/+/+",
"configuration": null,
"processors": []
},
"to": {
"path": "tgimYPt5L06J:node_3",
"configuration": null,
"wait": false,
"processors": []
}
}
]
}
],
"nodes": [
{
"id": "node_6",
"type": "log",
"name": "Log",
"configuration": {
"jsScript": "return 'Incoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);"
},
"debugMode": false,
"additionalInfo": {
"layoutX": 1048,
"layoutY": 248
}
},
{
"id": "node_5",
"type": "log",
"name": "Log",
"configuration": {
"jsScript": "return 'Incoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);"
},
"debugMode": false,
"additionalInfo": {
"layoutX": 1040,
"layoutY": -29
}
},
{
"type": "msgTypeSwitch",
"debugMode": false,
"id": "node_3",
"name": "Message routing",
"additionalInfo": {
"layoutX": 748,
"layoutY": 126
}
}
],
"connections": [
{
"fromId": "node_3",
"toId": "node_5",
"type": "/sys/msg/device01/data"
},
{
"fromId": "node_3",
"toId": "node_6",
"type": "/sys/msg/device02/data"
}
]
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
