Nats Endpoint
Nats Endpoint v0.21.0+ 用来创建和启动Nats订阅服务,它可以订阅不同主题数据,然后路由到不同规则链进行处理。
提示
该组件是扩展组件,需要引入额外的扩展库:rulego-components (opens new window)
# Type
endpoint/nats
# 启动配置
该组件允许通过server
字段复用共享的连接客户端。参考组件连接复用 。
字段 | 类型 | 是否必填 | 说明 | 默认值 |
---|---|---|---|---|
server | string | 是 | NATS服务器地址,格式为host:port | 无 |
username | string | 否 | NATS服务器认证用户名 | 无 |
password | string | 否 | NATS服务器认证密码 | 无 |
groupId | string | 否 | 消费者组ID,用于负载均衡。设置后使用队列订阅模式,为空时使用普通订阅模式 | 无 |
# 工作原理
# 订阅模式
普通订阅模式(groupId为空):
- 使用
Subscribe
方法订阅主题 - 每个订阅者都会收到发布到该主题的所有消息
- 适用于广播场景
队列订阅模式(设置groupId):
- 使用
QueueSubscribe
方法订阅主题 - 多个具有相同groupId的订阅者组成一个消费者组
- 消息在组内以负载均衡方式分发,每条消息只被组内一个消费者处理
- 适用于负载均衡和高可用场景
# 消息处理
- 接收到NATS消息后,组件会将消息转换为RuleGo消息格式
- 根据路由配置将消息发送到对应的规则链进行处理
- 支持通过
responseTopic
进行消息响应
# 响应
exchange.Out.SetBody
响应之前,需要通过exchange.Out.Headers()
或者exchange.Out.Msg.Metadata
指定responseTopic
参数,组件就会往指定的主题发送数据:
exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "device.msg.response")
// or
exchange.Out.Headers().Add("responseTopic", "device.msg.response")
exchange.Out.SetBody([]byte("ok"))
1
2
3
4
5
2
3
4
5
响应参数配置:
字段 | 类型 | 是否必填 | 说明 | 默认值 |
---|---|---|---|---|
responseTopic | string | 是 | 主题 | - |
# 示例
# 基本配置示例
普通订阅模式:
{
"server": "nats://127.0.0.1:4222",
"username": "user",
"password": "password"
}
1
2
3
4
5
2
3
4
5
队列订阅模式(负载均衡):
{
"server": "nats://127.0.0.1:4222",
"username": "user",
"password": "password",
"groupId": "device-processors"
}
1
2
3
4
5
6
2
3
4
5
6
# 代码示例
// 创建NATS endpoint
config := nats.Config{
Server: "nats://127.0.0.1:4222",
Username: "user",
Password: "password",
GroupId: "device-group", // 设置消费者组ID
}
ep := &nats.Nats{}
ep.Config = config
// 添加路由
ep.AddRouter(rulego.NewRouter().From("device.msg.request").To("chain1"))
// 启动endpoint
ep.Start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 更多示例
以下是使用endpoint的示例代码:
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/09/03, 10:09:04