Pulsar客户端
x/pulsarClient
组件:v0.33.0+ Pulsar生产者组件。用于将消息发布到Pulsar指定主题。
# 配置
该组件支持通过server
字段复用共享的Pulsar连接客户端,避免重复创建连接。详见组件连接复用。
字段 | 类型 | 必填 | 说明 | 默认值 |
---|---|---|---|---|
server | string | 是 | Pulsar服务器地址,格式为pulsar://host:port | pulsar://localhost:6650 |
topic | string | 是 | 发布主题,支持使用组件配置变量进行动态配置 | /device/msg |
key | string | 否 | 消息键模板,支持使用组件配置变量进行动态配置,用于消息路由和分区 | 无 |
headers | map[string]string | 否 | 自定义消息属性,支持key和value都使用组件配置变量进行动态配置 | 无 |
authToken | string | 否 | Pulsar JWT鉴权令牌 | 无 |
certFile | string | 否 | TLS证书文件路径 | 无 |
certKeyFile | string | 否 | TLS私钥文件路径 | 无 |
# 工作原理
- 组件初始化时会根据配置连接到Pulsar集群
- 创建生产者实例用于发送消息
- 接收到消息后,将消息内容发布到指定的topic
- 支持设置消息键和自定义headers属性,key和value都支持变量替换
- 使用模板引擎处理topic、key和headers中的变量替换
- 发布成功后通过Success链路由,失败则通过Failure链路由
# Relation Type
- Success: 以下情况消息发送到
Success
链路:- 消息成功发布到Pulsar集群
- 收到Pulsar服务器确认
- Failure: 以下情况消息发送到
Failure
链路:- 连接Pulsar集群失败
- 创建生产者失败
- 发布消息失败
- 配置参数错误
- TLS证书加载失败
- 模板解析失败
- topic为空
# 执行结果
组件执行完成后:
- msg.data保持不变
- metadata保持不变
- msgType保持不变
# 配置示例
# 基础配置
{
"id": "s5",
"type": "x/pulsarClient",
"name": "发布到Pulsar",
"debugMode": true,
"configuration": {
"server": "pulsar://localhost:6650",
"topic": "persistent://public/default/device-msg"
}
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 高级配置
{
"id": "s6",
"type": "x/pulsarClient",
"name": "发布到Pulsar带属性",
"debugMode": true,
"configuration": {
"server": "pulsar://localhost:6650",
"topic": "persistent://public/default/device-${deviceType}",
"key": "${deviceId}",
"headers": {
"source": "${source}",
"timestamp": "${ts}",
"deviceType": "${deviceType}"
},
"authToken": "your-jwt-token"
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 应用示例
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/22, 15:00:05