kafka客户端
x/kafkaProducer
组件:v0.23.0+ Kafka生产者组件。用于将消息发布到Kafka指定主题。
# 配置
该组件支持通过server
字段复用共享的Kafka连接客户端,避免重复创建连接。详见组件连接复用。
字段 | 类型 | 必填 | 说明 | 默认值 |
---|---|---|---|---|
server | string | 是 | Kafka服务器地址列表,多个地址用逗号分隔 | "127.0.0.1:9092" |
topic | string | 是 | 发布主题,支持使用组件配置变量进行动态配置 | 无 |
key | string | 否 | 消息分区键,支持使用组件配置变量进行动态配置。用于控制消息分区分配 | 无 |
partition | int32 | 否 | 指定分区编号。如果设置,消息将直接发送到指定分区 | 0 |
sasl | object | 否 | SASL认证配置 | 见下表 |
tls | object | 否 | TLS配置 | 见下表 |
# SASL认证配置
字段 | 类型 | 必填 | 说明 | 默认值 |
---|---|---|---|---|
enable | bool | 否 | 是否启用SASL认证 | false |
mechanism | string | 否 | 认证机制,支持 PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 | "PLAIN" |
username | string | 否 | 用户名 | 无 |
password | string | 否 | 密码 | 无 |
# TLS配置
字段 | 类型 | 必填 | 说明 | 默认值 |
---|---|---|---|---|
enable | bool | 否 | 是否启用TLS | false |
insecureSkipVerify | bool | 否 | 是否跳过证书验证 | false |
# 工作原理
- 组件初始化时会根据配置连接到Kafka集群
- 接收到消息后,将消息内容发布到指定的topic
- 发布成功后通过Success链路由,失败则通过Failure链路由
- 组件会自动管理连接的生命周期,包括重连等
# Relation Type
- Success: 以下情况消息发送到
Success
链路:- 消息成功发布到Kafka集群
- 收到Kafka服务器确认
- Failure: 以下情况消息发送到
Failure
链路:- 连接Kafka集群失败
- 发布消息超时
- 发布消息失败
- 配置参数错误
# 执行结果
组件执行完成后会更新消息的元数据信息:
- msg.data保持不变
- metadata会添加以下字段:
- partition: 消息实际写入的分区编号
- offset: 消息在分区中的偏移量
- msgType保持不变
# 配置示例
{
"id": "s5",
"type": "x/kafkaProducer",
"name": "发布到kafka",
"debugMode": true,
"configuration": {
"topic": "device.msg.request",
"server": "localhost:9092"
}
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 带SASL认证的配置示例
{
"id": "s6",
"type": "x/kafkaProducer",
"name": "发布到kafka(SASL)",
"debugMode": true,
"configuration": {
"topic": "device.msg.request",
"server": "localhost:9092",
"sasl": {
"enable": true,
"mechanism": "PLAIN",
"username": "your_username",
"password": "your_password"
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 带TLS的配置示例
{
"id": "s7",
"type": "x/kafkaProducer",
"name": "发布到kafka(TLS)",
"debugMode": true,
"configuration": {
"topic": "device.msg.request",
"server": "localhost:9093",
"tls": {
"enable": true,
"insecureSkipVerify": false
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
# 应用示例
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/08/14, 08:19:07