RuleGo RuleGo
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • 快速入门

  • 规则链

  • 标准组件

  • 扩展组件

    • 扩展组件概述
    • 过滤器

    • 转换器

    • 外部的

      • redis客户端
      • kafka客户端
      • nats客户端
      • rabbitmq客户端
      • opengemini写客户端
      • opengemini读客户端
      • MongoDB户端
      • redis发布
      • gRPC客户端
      • OpenTelemetry
      • BeanstalkdWorker
      • BeanstalkdTube
      • WukongimSender
      • NSQ客户端
      • Pulsar客户端
        • 配置
        • 工作原理
        • Relation Type
        • 执行结果
        • 配置示例
          • 基础配置
          • 高级配置
        • 应用示例
    • AI

    • CI

    • IoT

    • 流式计算

  • 自定义组件

  • 组件市场

  • 可视化

  • AOP

  • 触发器

  • 高级主题

  • RuleGo-Server

  • 问题

目录

Pulsar客户端

x/pulsarClient组件:v0.33.0+ Pulsar生产者组件。用于将消息发布到Pulsar指定主题。

# 配置

该组件支持通过server字段复用共享的Pulsar连接客户端,避免重复创建连接。详见组件连接复用。

字段 类型 必填 说明 默认值
server string 是 Pulsar服务器地址,格式为pulsar://host:port pulsar://localhost:6650
topic string 是 发布主题,支持使用组件配置变量进行动态配置 /device/msg
key string 否 消息键模板,支持使用组件配置变量进行动态配置,用于消息路由和分区 无
headers map[string]string 否 自定义消息属性,支持key和value都使用组件配置变量进行动态配置 无
authToken string 否 Pulsar JWT鉴权令牌 无
certFile string 否 TLS证书文件路径 无
certKeyFile string 否 TLS私钥文件路径 无

# 工作原理

  1. 组件初始化时会根据配置连接到Pulsar集群
  2. 创建生产者实例用于发送消息
  3. 接收到消息后,将消息内容发布到指定的topic
  4. 支持设置消息键和自定义headers属性,key和value都支持变量替换
  5. 使用模板引擎处理topic、key和headers中的变量替换
  6. 发布成功后通过Success链路由,失败则通过Failure链路由

# Relation Type

  • Success: 以下情况消息发送到Success链路:
    • 消息成功发布到Pulsar集群
    • 收到Pulsar服务器确认
  • Failure: 以下情况消息发送到Failure链路:
    • 连接Pulsar集群失败
    • 创建生产者失败
    • 发布消息失败
    • 配置参数错误
    • TLS证书加载失败
    • 模板解析失败
    • topic为空

# 执行结果

组件执行完成后:

  • msg.data保持不变
  • metadata保持不变
  • msgType保持不变

# 配置示例

# 基础配置

{
  "id": "s5",
  "type": "x/pulsarClient",
  "name": "发布到Pulsar",
  "debugMode": true,
  "configuration": {
    "server": "pulsar://localhost:6650",
    "topic": "persistent://public/default/device-msg"
  }
}
1
2
3
4
5
6
7
8
9
10

# 高级配置

{
  "id": "s6",
  "type": "x/pulsarClient",
  "name": "发布到Pulsar带属性",
  "debugMode": true,
  "configuration": {
    "server": "pulsar://localhost:6650",
    "topic": "persistent://public/default/device-${deviceType}",
    "key": "${deviceId}",
    "headers": {
      "source": "${source}",
      "timestamp": "${ts}",
      "deviceType": "${deviceType}"
    },
    "authToken": "your-jwt-token"
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 应用示例

应用示例参考:pulsar_client_test (opens new window)

在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/22, 15:00:05
NSQ客户端
LLM

← NSQ客户端 LLM→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式