NSQ Endpoint
The NSQ Endpoint creates and starts an NSQ subscription service. It can subscribe to multiple topics and route the messages from each topic to a different rule chain for processing.
TIP
This is an extension component. To use it, import the additional extension library: rulego-components
# Type
endpoint/nsq
# Startup Configuration
Field | Type | Required | Description | Default Value |
---|---|---|---|---|
server | string | Yes | NSQ server address, supports multiple formats: 1. Single nsqd: "127.0.0.1:4150" 2. Multiple nsqd: "127.0.0.1:4150,127.0.0.1:4151" 3. Lookupd address: "http://127.0.0.1:4161,http://127.0.0.1:4162" | 127.0.0.1:4150 |
channel | string | No | Default channel name, used when no channel is specified while adding a router | default |
authToken | string | No | NSQ authentication token | - |
certFile | string | No | TLS certificate file path | - |
certKeyFile | string | No | TLS private key file path | - |
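The three `server` formats in the table can be told apart by splitting on commas and checking for an HTTP scheme. The following is a minimal sketch of that idea, not the component's actual parsing code; `splitServers` is a hypothetical helper, and treating any `http://` or `https://` entry as a lookupd address is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// splitServers is a hypothetical helper (not part of rulego-components).
// It splits a comma-separated server string and reports whether the
// entries look like lookupd HTTP addresses (assumption: scheme prefix).
func splitServers(server string) (addrs []string, lookupd bool) {
	for _, part := range strings.Split(server, ",") {
		addr := strings.TrimSpace(part)
		if addr == "" {
			continue // skip empty entries such as a trailing comma
		}
		if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") {
			lookupd = true
		}
		addrs = append(addrs, addr)
	}
	return addrs, lookupd
}

func main() {
	for _, s := range []string{
		"127.0.0.1:4150",
		"127.0.0.1:4150,127.0.0.1:4151",
		"http://127.0.0.1:4161,http://127.0.0.1:4162",
	} {
		addrs, lookupd := splitServers(s)
		fmt.Println(addrs, lookupd)
	}
}
```

With a split like this, nsqd addresses and lookupd addresses could be handed to different connection paths.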
# Configuration Example
{
"server": "127.0.0.1:4150",
"channel": "my-channel",
"authToken": "your-auth-token",
"certFile": "/path/to/cert.pem",
"certKeyFile": "/path/to/key.pem"
}
# Router Configuration
When adding routes, you can specify the channel (consumer group) through parameters. Channel priority is: AddRouter parameter > Global configuration > Default value ("default"):
// Add route using global configuration channel, or "default" if not configured
routerId, err := nsqEndpoint.AddRouter(router)
// Add route specifying channel as "my-channel", takes priority over global configuration
routerId, err := nsqEndpoint.AddRouter(router, "my-channel")
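The channel priority described above (AddRouter parameter, then global configuration, then "default") can be sketched as a small fallback function. This only illustrates the rule; `resolveChannel` and `defaultChannel` are hypothetical names, and the component's real implementation may differ.

```go
package main

import "fmt"

// defaultChannel is the documented fallback value.
const defaultChannel = "default"

// resolveChannel is a hypothetical helper that applies the documented
// priority: AddRouter parameter > global configuration > "default".
func resolveChannel(globalChannel string, params ...string) string {
	if len(params) > 0 && params[0] != "" {
		return params[0] // channel passed when adding the router
	}
	if globalChannel != "" {
		return globalChannel // channel from the startup configuration
	}
	return defaultChannel
}

func main() {
	fmt.Println(resolveChannel(""))                            // default
	fmt.Println(resolveChannel("global-channel"))              // global-channel
	fmt.Println(resolveChannel("global-channel", "my-channel")) // my-channel
}
```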
# Response
Before calling exchange.Out.SetBody to send a response, specify the responseTopic parameter through exchange.Out.Headers() or exchange.Out.Msg.Metadata. The component then sends the data to the specified topic:
exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "device.msg.response")
// or
exchange.Out.Headers().Add("responseTopic", "device.msg.response")
exchange.Out.SetBody([]byte("ok"))
Response parameter configuration:
Field | Type | Required | Description | Default Value |
---|---|---|---|---|
responseTopic | string | Yes | Response topic | - |
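Because responseTopic can come from either the headers or the metadata, the lookup can be thought of as a two-step fallback. The sketch below models both sources as plain maps; `lookupResponseTopic` is a hypothetical helper, and checking the headers before the metadata is an assumption, since the order is not stated here.

```go
package main

import "fmt"

// responseTopicKey is the documented parameter name.
const responseTopicKey = "responseTopic"

// lookupResponseTopic is a hypothetical helper. It checks the headers
// first and then the metadata (assumed order) and reports whether a
// response topic was found at all.
func lookupResponseTopic(headers, metadata map[string]string) (string, bool) {
	if topic := headers[responseTopicKey]; topic != "" {
		return topic, true
	}
	if topic := metadata[responseTopicKey]; topic != "" {
		return topic, true
	}
	return "", false
}

func main() {
	metadata := map[string]string{responseTopicKey: "device.msg.response"}
	topic, ok := lookupResponseTopic(nil, metadata)
	fmt.Println(topic, ok)
}
```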
# Message Format
Received NSQ messages are converted to RuleMsg, containing the following metadata:
- messageId: NSQ message ID
- attempts: Message retry count
- timestamp: Message timestamp
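As an illustration of how these three metadata keys might be filled, the sketch below converts a message into a string map. The `nsqMessage` struct is a local stand-in that mirrors the `ID`, `Attempts`, and `Timestamp` fields of a go-nsq message; `toMetadata` is hypothetical, and the string encodings (raw ID bytes, decimal numbers) are assumptions rather than the component's documented format.

```go
package main

import (
	"fmt"
	"strconv"
)

// nsqMessage is a local stand-in for the fields of interest on an NSQ
// message (assumption: it mirrors go-nsq's ID, Attempts and Timestamp).
type nsqMessage struct {
	ID        [16]byte
	Attempts  uint16
	Timestamp int64 // nanoseconds since the Unix epoch
	Body      []byte
}

// toMetadata is a hypothetical helper that builds the metadata keys
// listed above; the value formatting is an assumption.
func toMetadata(m nsqMessage) map[string]string {
	return map[string]string{
		"messageId": string(m.ID[:]),
		"attempts":  strconv.FormatUint(uint64(m.Attempts), 10),
		"timestamp": strconv.FormatInt(m.Timestamp, 10),
	}
}

func main() {
	var msg nsqMessage
	copy(msg.ID[:], "0123456789abcdef")
	msg.Attempts = 1
	msg.Timestamp = 1700000000000000000
	fmt.Println(toMetadata(msg))
}
```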
# Working Principle
- On startup, the component connects to the NSQ server based on the configuration
- It can connect to a single nsqd instance or to multiple nsqd instances, and it can also discover nsqd instances through lookupd
- It creates a consumer for each route; routes with the same topic and channel share one consumer
- When a message is received, it is converted to a RuleMsg and routed to the corresponding rule chain for processing
- Processing results can be sent through a response topic
- Message processing completion is acknowledged automatically
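The consumer sharing mentioned in the list above can be pictured as a registry keyed by topic and channel. This is a minimal sketch under that assumption, not the component's code: `consumerRegistry`, `consumerKey`, and `addRoute` are hypothetical names, and a real implementation would hold an actual NSQ consumer per key rather than only router IDs.

```go
package main

import (
	"fmt"
	"sync"
)

// consumerKey identifies one consumer: the same topic and channel
// always map to the same key.
type consumerKey struct {
	topic   string
	channel string
}

// consumerRegistry is a hypothetical structure that tracks which
// routers share each consumer.
type consumerRegistry struct {
	mu     sync.Mutex
	routes map[consumerKey][]string
}

func newConsumerRegistry() *consumerRegistry {
	return &consumerRegistry{routes: make(map[consumerKey][]string)}
}

// addRoute registers a router and reports whether a new consumer would
// have to be created (true) or an existing one is reused (false).
func (r *consumerRegistry) addRoute(topic, channel, routerID string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	key := consumerKey{topic: topic, channel: channel}
	_, exists := r.routes[key]
	r.routes[key] = append(r.routes[key], routerID)
	return !exists
}

func main() {
	reg := newConsumerRegistry()
	fmt.Println(reg.addRoute("device.msg.request", "default", "r1"))    // true
	fmt.Println(reg.addRoute("device.msg.request", "default", "r2"))    // false
	fmt.Println(reg.addRoute("device.msg.request", "my-channel", "r3")) // true
}
```

Keying on both topic and channel matters because, in NSQ, each channel on a topic receives its own copy of every message.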
# Examples
The following examples show how to use the endpoint: