RuleGo RuleGo
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • Endpoint概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件

    • Rest Endpoint
    • Websocket Endpoint
    • MQTT Endpoint
    • Schedule Endpoint
    • Net Endpoint
    • Kafka Endpoint
    • Nats Endpoint
    • Redis Sub Endpoint
    • Redis Steam Endpoint
    • Rabbitmq Endpoint
    • MYSQL CDC Endpoint
    • OPC_UA Endpoint
    • GRPC Stream Endpoint
    • Beanstalkd Endpoint
    • Wukongim Endpoint
    • 扩展Endpoint
    • NSQ Endpoint
    • Pulsar Endpoint
      • Type
      • 启动配置
      • 路由配置
      • 响应
      • 消息格式
      • 工作原理
      • 高级特性
        • 消息属性传递
        • 响应消息属性
      • 配置示例
      • 示例
目录

Pulsar Endpoint

Pulsar Endpoint 用来创建和启动Pulsar订阅服务,它可以订阅不同主题数据,然后路由到不同规则链进行处理。

提示

该组件是扩展组件,需要引入额外的扩展库:rulego-components (opens new window)

# Type

endpoint/pulsar

# 启动配置

字段 类型 是否必填 说明 默认值
server string 是 Pulsar服务器地址,格式为pulsar://host:port pulsar://localhost:6650
subscriptionName string 否 默认订阅名称,如果AddRouter时未指定则使用此值 default
subscriptionType string 否 订阅类型,支持:Exclusive、Shared、Failover、KeyShared(大小写不敏感) Shared
authToken string 否 Pulsar JWT鉴权令牌 -
certFile string 否 TLS证书文件路径 -
certKeyFile string 否 TLS私钥文件路径 -

# 路由配置

添加路由时,可以通过参数指定subscription(订阅名称):

// 添加路由,使用默认subscription "default"
routerId, err := pulsarEndpoint.AddRouter(router)

// 添加路由,指定subscription为 "my-subscription"
routerId, err := pulsarEndpoint.AddRouter(router, "my-subscription")
1
2
3
4
5

# 响应

exchange.Out.SetBody响应之前,需要通过exchange.Out.Headers()或者exchange.Out.Msg.Metadata指定responseTopic参数,组件就会往指定的主题发送数据:

exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "persistent://public/default/device-response")
// or
exchange.Out.Headers().Add("responseTopic", "persistent://public/default/device-response")

exchange.Out.SetBody([]byte("ok"))
1
2
3
4
5

响应参数配置:

字段 类型 是否必填 说明 默认值
responseTopic string 是 响应主题 -

# 消息格式

接收到的Pulsar消息会被转换为RuleMsg,包含以下元数据:

  • topic: 消息主题
  • messageId: Pulsar消息ID
  • publishTime: 消息发布时间(RFC3339格式)
  • eventTime: 消息事件时间(RFC3339格式)
  • key: 消息键(如果有)
  • 消息的自定义属性也会添加到元数据中

# 工作原理

  1. 组件启动时根据配置连接到Pulsar集群
  2. 为每个路由创建对应的消费者,使用Shared订阅类型
  3. 接收到消息后,转换为RuleMsg并路由到对应的规则链处理
  4. 支持通过响应主题发送处理结果,响应消息会包含原始消息的属性
  5. 自动确认消息处理完成
  6. 支持优雅停机,确保消息处理完成后再关闭连接

# 高级特性

# 消息属性传递

组件会自动将Pulsar消息的所有自定义属性添加到RuleMsg的元数据中,方便在规则链中使用。

# 响应消息属性

发送响应消息时,会将响应头中的所有字段作为消息属性发送:

// 设置响应消息属性
exchange.Out.Headers().Set("correlationId", "12345")
exchange.Out.Headers().Set("messageType", "response")

// 设置响应主题和消息体
exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "response-topic")
exchange.Out.SetBody([]byte("response data"))
1
2
3
4
5
6
7

# 配置示例

{
  "server": "pulsar://localhost:6650",
  "subscriptionName": "my-subscription",
  "subscriptionType": "shared",
  "authToken": "your-jwt-token",
  "certFile": "/path/to/cert.pem",
  "certKeyFile": "/path/to/key.pem"
}
1
2
3
4
5
6
7
8

# 示例

以下是使用endpoint的示例代码:

  • RestEndpoint (opens new window)
  • WebsocketEndpoint (opens new window)
  • MqttEndpoint (opens new window)
  • ScheduleEndpoint (opens new window)
  • NetEndpoint (opens new window)
  • PulsarEndpoint (opens new window) (扩展组件库)
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/22, 15:00:05
NSQ Endpoint

← NSQ Endpoint

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式