NSQ Endpoint
NSQ Endpoint 用来创建和启动NSQ订阅服务,它可以订阅不同主题数据,然后路由到不同规则链进行处理。
提示
该组件是扩展组件,需要引入额外的扩展库:rulego-components (opens new window)
# Type
endpoint/nsq
# 启动配置
字段 | 类型 | 是否必填 | 说明 | 默认值 |
---|---|---|---|---|
server | string | 是 | NSQ服务器地址,支持多种格式: 1. 单个nsqd: "127.0.0.1:4150" 2. 多个nsqd: "127.0.0.1:4150,127.0.0.1:4151" 3. lookupd地址: "http://127.0.0.1:4161,http://127.0.0.1:4162" | 127.0.0.1:4150 |
channel | string | 否 | 默认频道名称,如果AddRouter时未指定则使用此值 | default |
authToken | string | 否 | NSQ鉴权令牌 | - |
certFile | string | 否 | TLS证书文件路径 | - |
certKeyFile | string | 否 | TLS私钥文件路径 | - |
# 配置示例
{
"server": "127.0.0.1:4150",
"channel": "my-channel",
"authToken": "your-auth-token",
"certFile": "/path/to/cert.pem",
"certKeyFile": "/path/to/key.pem"
}
1
2
3
4
5
6
7
2
3
4
5
6
7
# 路由配置
添加路由时,可以通过参数指定channel(消费者组)。Channel的优先级为:AddRouter参数 > 全局配置 > 默认值("default"):
// 添加路由,使用全局配置中的channel,如果未配置则使用 "default"
routerId, err := nsqEndpoint.AddRouter(router)
// 添加路由,指定channel为 "my-channel",优先级高于全局配置
routerId, err := nsqEndpoint.AddRouter(router, "my-channel")
1
2
3
4
5
2
3
4
5
# 响应
exchange.Out.SetBody
响应之前,需要通过exchange.Out.Headers()
或者exchange.Out.Msg.Metadata
指定responseTopic
参数,组件就会往指定的主题发送数据:
exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "device.msg.response")
// or
exchange.Out.Headers().Add("responseTopic", "device.msg.response")
exchange.Out.SetBody([]byte("ok"))
1
2
3
4
5
2
3
4
5
响应参数配置:
字段 | 类型 | 是否必填 | 说明 | 默认值 |
---|---|---|---|---|
responseTopic | string | 是 | 响应主题 | - |
# 消息格式
接收到的NSQ消息会被转换为RuleMsg,包含以下元数据:
messageId
: NSQ消息IDattempts
: 消息重试次数timestamp
: 消息时间戳
# 工作原理
- 组件启动时根据配置连接到NSQ服务器
- 支持连接到单个或多个nsqd实例,也支持通过lookupd发现nsqd
- 为每个路由创建对应的消费者,支持多个路由共享同一个消费者(相同topic+channel)
- 接收到消息后,转换为RuleMsg并路由到对应的规则链处理
- 支持通过响应主题发送处理结果
- 自动确认消息处理完成
# 示例
以下是使用endpoint的示例代码:
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/22, 15:00:05