Pulsar Endpoint
Pulsar Endpoint 用来创建和启动Pulsar订阅服务,它可以订阅不同主题数据,然后路由到不同规则链进行处理。
提示
该组件是扩展组件,需要引入额外的扩展库:rulego-components (opens new window)
# Type
endpoint/pulsar
# 启动配置
字段 | 类型 | 是否必填 | 说明 | 默认值 |
---|---|---|---|---|
server | string | 是 | Pulsar服务器地址,格式为pulsar://host:port | pulsar://localhost:6650 |
subscriptionName | string | 否 | 默认订阅名称,如果AddRouter时未指定则使用此值 | default |
subscriptionType | string | 否 | 订阅类型,支持:Exclusive、Shared、Failover、KeyShared(大小写不敏感) | Shared |
authToken | string | 否 | Pulsar JWT鉴权令牌 | - |
certFile | string | 否 | TLS证书文件路径 | - |
certKeyFile | string | 否 | TLS私钥文件路径 | - |
# 路由配置
添加路由时,可以通过参数指定subscription(订阅名称):
// 添加路由,使用默认subscription "default"
routerId, err := pulsarEndpoint.AddRouter(router)
// 添加路由,指定subscription为 "my-subscription"
routerId, err := pulsarEndpoint.AddRouter(router, "my-subscription")
1
2
3
4
5
2
3
4
5
# 响应
exchange.Out.SetBody
响应之前,需要通过exchange.Out.Headers()
或者exchange.Out.Msg.Metadata
指定responseTopic
参数,组件就会往指定的主题发送数据:
exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "persistent://public/default/device-response")
// or
exchange.Out.Headers().Add("responseTopic", "persistent://public/default/device-response")
exchange.Out.SetBody([]byte("ok"))
1
2
3
4
5
2
3
4
5
响应参数配置:
字段 | 类型 | 是否必填 | 说明 | 默认值 |
---|---|---|---|---|
responseTopic | string | 是 | 响应主题 | - |
# 消息格式
接收到的Pulsar消息会被转换为RuleMsg,包含以下元数据:
topic
: 消息主题messageId
: Pulsar消息IDpublishTime
: 消息发布时间(RFC3339格式)eventTime
: 消息事件时间(RFC3339格式)key
: 消息键(如果有)- 消息的自定义属性也会添加到元数据中
# 工作原理
- 组件启动时根据配置连接到Pulsar集群
- 为每个路由创建对应的消费者,使用Shared订阅类型
- 接收到消息后,转换为RuleMsg并路由到对应的规则链处理
- 支持通过响应主题发送处理结果,响应消息会包含原始消息的属性
- 自动确认消息处理完成
- 支持优雅停机,确保消息处理完成后再关闭连接
# 高级特性
# 消息属性传递
组件会自动将Pulsar消息的所有自定义属性添加到RuleMsg的元数据中,方便在规则链中使用。
# 响应消息属性
发送响应消息时,会将响应头中的所有字段作为消息属性发送:
// 设置响应消息属性
exchange.Out.Headers().Set("correlationId", "12345")
exchange.Out.Headers().Set("messageType", "response")
// 设置响应主题和消息体
exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "response-topic")
exchange.Out.SetBody([]byte("response data"))
1
2
3
4
5
6
7
2
3
4
5
6
7
# 配置示例
{
"server": "pulsar://localhost:6650",
"subscriptionName": "my-subscription",
"subscriptionType": "shared",
"authToken": "your-jwt-token",
"certFile": "/path/to/cert.pem",
"certKeyFile": "/path/to/key.pem"
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 示例
以下是使用endpoint的示例代码:
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/22, 15:00:05