RuleGo RuleGo
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • 快速入门

  • 规则链

  • 标准组件

  • 扩展组件

    • 扩展组件概述
    • 过滤器

    • 转换器

    • 外部的

      • redis客户端
      • kafka客户端
        • 配置
        • 工作原理
        • Relation Type
        • 执行结果
        • 配置示例
        • 应用示例
      • nats客户端
      • rabbitmq客户端
      • opengemini写客户端
      • opengemini读客户端
      • MongoDB户端
      • redis发布
      • gRPC客户端
      • OpenTelemetry
      • BeanstalkdWorker
      • BeanstalkdTube
      • WukongimSender
    • AI

    • CI

    • IoT

  • 自定义组件

  • 组件市场

  • 可视化

  • AOP

  • 触发器

  • 高级主题

  • RuleGo-Server

  • 问题

目录

kafka客户端

x/kafkaProducer组件:v0.23.0+ Kafka生产者组件。用于将消息发布到Kafka指定主题。

# 配置

该组件支持通过server字段复用共享的Kafka连接客户端,避免重复创建连接。详见组件连接复用。

字段 类型 必填 说明 默认值
brokers []string 是 Kafka服务器地址列表 ["localhost:9092"]
topic string 是 发布主题,支持使用组件配置变量进行动态配置 无
key string 否 消息分区键,支持使用组件配置变量进行动态配置。用于控制消息分区分配 无
partition int 否 指定分区编号。如果设置,消息将直接发送到指定分区,key配置将被忽略 -1

# 工作原理

  1. 组件初始化时会根据配置连接到Kafka集群
  2. 接收到消息后,将消息内容发布到指定的topic
  3. 发布成功后通过Success链路由,失败则通过Failure链路由
  4. 组件会自动管理连接的生命周期,包括重连等

# Relation Type

  • Success: 以下情况消息发送到Success链路:
    • 消息成功发布到Kafka集群
    • 收到Kafka服务器确认
  • Failure: 以下情况消息发送到Failure链路:
    • 连接Kafka集群失败
    • 发布消息超时
    • 发布消息失败
    • 配置参数错误

# 执行结果

组件执行完成后会更新消息的元数据信息:

  • msg.data保持不变
  • metadata会添加以下字段:
    • partition: 消息实际写入的分区编号
    • offset: 消息在分区中的偏移量
  • msgType保持不变

# 配置示例

{
  "id": "s5",
  "type": "x/kafkaProducer",
  "name": "发布到kafka",
  "debugMode": true,
  "configuration": {
    "topic": "device.msg.request",
    "brokers": ["localhost:9092"]
  }
}
1
2
3
4
5
6
7
8
9
10

# 应用示例

应用示例参考:kafka_producer_test (opens new window)

在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/04/02, 01:29:50
redis客户端
nats客户端

← redis客户端 nats客户端→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式