RuleGo RuleGo
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • 快速入门

  • 规则链

  • 标准组件

  • 扩展组件

    • 扩展组件概述
    • 过滤器

    • 转换器

    • 外部的

      • redis客户端
      • kafka客户端
        • 配置
          • SASL认证配置
          • TLS配置
        • 工作原理
        • Relation Type
        • 执行结果
        • 配置示例
          • 带SASL认证的配置示例
          • 带TLS的配置示例
        • 应用示例
      • nats客户端
      • rabbitmq客户端
      • opengemini写客户端
      • opengemini读客户端
      • MongoDB户端
      • redis发布
      • gRPC客户端
      • OpenTelemetry
      • BeanstalkdWorker
      • BeanstalkdTube
      • WukongimSender
      • NSQ客户端
      • Pulsar客户端
    • AI

    • CI

    • IoT

    • 流式计算

  • 自定义组件

  • 组件市场

  • 可视化

  • AOP

  • 触发器

  • 高级主题

  • RuleGo-Server

  • 问题

目录

kafka客户端

x/kafkaProducer组件:v0.23.0+ Kafka生产者组件。用于将消息发布到Kafka指定主题。

# 配置

该组件支持通过server字段复用共享的Kafka连接客户端,避免重复创建连接。详见组件连接复用。

字段 类型 必填 说明 默认值
server string 是 Kafka服务器地址列表,多个地址用逗号分隔 "127.0.0.1:9092"
topic string 是 发布主题,支持使用组件配置变量进行动态配置 无
key string 否 消息分区键,支持使用组件配置变量进行动态配置。用于控制消息分区分配 无
partition int32 否 指定分区编号。如果设置,消息将直接发送到指定分区 0
sasl object 否 SASL认证配置 见下表
tls object 否 TLS配置 见下表

# SASL认证配置

字段 类型 必填 说明 默认值
enable bool 否 是否启用SASL认证 false
mechanism string 否 认证机制,支持 PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 "PLAIN"
username string 否 用户名 无
password string 否 密码 无

# TLS配置

字段 类型 必填 说明 默认值
enable bool 否 是否启用TLS false
insecureSkipVerify bool 否 是否跳过证书验证 false

# 工作原理

  1. 组件初始化时会根据配置连接到Kafka集群
  2. 接收到消息后,将消息内容发布到指定的topic
  3. 发布成功后通过Success链路由,失败则通过Failure链路由
  4. 组件会自动管理连接的生命周期,包括重连等

# Relation Type

  • Success: 以下情况消息发送到Success链路:
    • 消息成功发布到Kafka集群
    • 收到Kafka服务器确认
  • Failure: 以下情况消息发送到Failure链路:
    • 连接Kafka集群失败
    • 发布消息超时
    • 发布消息失败
    • 配置参数错误

# 执行结果

组件执行完成后会更新消息的元数据信息:

  • msg.data保持不变
  • metadata会添加以下字段:
    • partition: 消息实际写入的分区编号
    • offset: 消息在分区中的偏移量
  • msgType保持不变

# 配置示例

{
  "id": "s5",
  "type": "x/kafkaProducer",
  "name": "发布到kafka",
  "debugMode": true,
  "configuration": {
    "topic": "device.msg.request",
    "server": "localhost:9092"
  }
}
1
2
3
4
5
6
7
8
9
10

# 带SASL认证的配置示例

{
  "id": "s6",
  "type": "x/kafkaProducer",
  "name": "发布到kafka(SASL)",
  "debugMode": true,
  "configuration": {
    "topic": "device.msg.request",
    "server": "localhost:9092",
    "sasl": {
      "enable": true,
      "mechanism": "PLAIN",
      "username": "your_username",
      "password": "your_password"
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 带TLS的配置示例

{
  "id": "s7",
  "type": "x/kafkaProducer",
  "name": "发布到kafka(TLS)",
  "debugMode": true,
  "configuration": {
    "topic": "device.msg.request",
    "server": "localhost:9093",
    "tls": {
      "enable": true,
      "insecureSkipVerify": false
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 应用示例

应用示例参考:kafka_producer_test (opens new window)

在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/08/14, 08:19:07
redis客户端
nats客户端

← redis客户端 nats客户端→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式