RuleGo RuleGo
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • Endpoint概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件

    • Rest Endpoint
    • Websocket Endpoint
    • MQTT Endpoint
    • Schedule Endpoint
    • Net Endpoint
    • Kafka Endpoint
    • Nats Endpoint
      • Type
      • 启动配置
      • 工作原理
        • 订阅模式
        • 消息处理
      • 响应
      • 示例
        • 基本配置示例
        • 代码示例
        • 更多示例
    • Redis Sub Endpoint
    • Redis Steam Endpoint
    • Rabbitmq Endpoint
    • MYSQL CDC Endpoint
    • OPC_UA Endpoint
    • GRPC Stream Endpoint
    • Beanstalkd Endpoint
    • Wukongim Endpoint
    • 扩展Endpoint
    • NSQ Endpoint
    • Pulsar Endpoint
目录

Nats Endpoint

Nats Endpoint v0.21.0+ 用来创建和启动Nats订阅服务,它可以订阅不同主题数据,然后路由到不同规则链进行处理。

提示

该组件是扩展组件,需要引入额外的扩展库:rulego-components (opens new window)

# Type

endpoint/nats

# 启动配置

该组件允许通过server字段复用共享的连接客户端。参考组件连接复用 。

字段 类型 是否必填 说明 默认值
server string 是 NATS服务器地址,格式为host:port 无
username string 否 NATS服务器认证用户名 无
password string 否 NATS服务器认证密码 无
groupId string 否 消费者组ID,用于负载均衡。设置后使用队列订阅模式,为空时使用普通订阅模式 无

# 工作原理

# 订阅模式

普通订阅模式(groupId为空):

  • 使用Subscribe方法订阅主题
  • 每个订阅者都会收到发布到该主题的所有消息
  • 适用于广播场景

队列订阅模式(设置groupId):

  • 使用QueueSubscribe方法订阅主题
  • 多个具有相同groupId的订阅者组成一个消费者组
  • 消息在组内以负载均衡方式分发,每条消息只被组内一个消费者处理
  • 适用于负载均衡和高可用场景

# 消息处理

  • 接收到NATS消息后,组件会将消息转换为RuleGo消息格式
  • 根据路由配置将消息发送到对应的规则链进行处理
  • 支持通过responseTopic进行消息响应

# 响应

exchange.Out.SetBody响应之前,需要通过exchange.Out.Headers()或者exchange.Out.Msg.Metadata指定responseTopic参数,组件就会往指定的主题发送数据:

exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "device.msg.response")
// or
exchange.Out.Headers().Add("responseTopic", "device.msg.response")

exchange.Out.SetBody([]byte("ok"))
1
2
3
4
5

响应参数配置:

字段 类型 是否必填 说明 默认值
responseTopic string 是 主题 -

# 示例

# 基本配置示例

普通订阅模式:

{
  "server": "nats://127.0.0.1:4222",
  "username": "user",
  "password": "password"
}
1
2
3
4
5

队列订阅模式(负载均衡):

{
  "server": "nats://127.0.0.1:4222",
  "username": "user",
  "password": "password",
  "groupId": "device-processors"
}
1
2
3
4
5
6

# 代码示例

// 创建NATS endpoint
config := nats.Config{
    Server:   "nats://127.0.0.1:4222",
    Username: "user",
    Password: "password",
    GroupId:  "device-group", // 设置消费者组ID
}

ep := &nats.Nats{}
ep.Config = config

// 添加路由
ep.AddRouter(rulego.NewRouter().From("device.msg.request").To("chain1"))

// 启动endpoint
ep.Start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 更多示例

以下是使用endpoint的示例代码:

  • NatsEndpoint (opens new window) (扩展组件库)
  • RestEndpoint (opens new window)
  • WebsocketEndpoint (opens new window)
  • MqttEndpoint (opens new window)
  • ScheduleEndpoint (opens new window)
  • NetEndpoint (opens new window)
  • KafkaEndpoint (opens new window) (扩展组件库)
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/09/03, 10:09:04
Kafka Endpoint
Redis Sub Endpoint

← Kafka Endpoint Redis Sub Endpoint→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式