Kafka Endpoint
Kafka Endpoint 用来创建和启动Kafka订阅服务,它可以订阅不同主题数据,然后路由到不同规则链进行处理。
提示
该组件是扩展组件,需要引入额外的扩展库:rulego-components (opens new window)
# Type
endpoint/kafka
# 启动配置
字段 | 类型 | 是否必填 | 说明 | 默认值 |
---|---|---|---|---|
server | string | 是 | kafka服务器地址列表,多个地址用逗号分隔 | "127.0.0.1:9092" |
groupId v0.23.0+ | string | 否 | 消费者组ID | "rulego" |
sasl | object | 否 | SASL认证配置 | 见下表 |
tls | object | 否 | TLS配置 | 见下表 |
# SASL认证配置
字段 | 类型 | 是否必填 | 说明 | 默认值 |
---|---|---|---|---|
enable | bool | 否 | 是否启用SASL认证 | false |
mechanism | string | 否 | 认证机制,支持 PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 | "PLAIN" |
username | string | 否 | 用户名 | 无 |
password | string | 否 | 密码 | 无 |
# TLS配置
字段 | 类型 | 是否必填 | 说明 | 默认值 |
---|---|---|---|---|
enable | bool | 否 | 是否启用TLS | false |
insecureSkipVerify | bool | 否 | 是否跳过证书验证 | false |
# 响应
exchange.Out.SetBody
响应之前,需要通过exchange.Out.Headers()
或者exchange.Out.Msg.Metadata
指定responseTopic
参数,组件就会往指定的主题发送数据:
exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "device.msg.response")
// or
exchange.Out.Headers().Add("responseTopic", "device.msg.response")
exchange.Out.SetBody([]byte("ok"))
1
2
3
4
5
2
3
4
5
响应参数配置:
字段 | 类型 | 是否必填 | 说明 | 默认值 |
---|---|---|---|---|
responseTopic | string | 是 | 主题 | - |
partition | int | 否 | 分区 | 0 |
key | string | 否 | 分区Key | - |
# 示例
以下是使用endpoint的示例代码:
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/08/14, 08:19:07