RuleGo RuleGo
🏠Home
  • Quick Start
  • Rule Chain
  • Standard Components
  • Extension Components
  • Custom Components
  • Visualization
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • Trigger
  • Advanced Topics
  • Performance
  • Standard Components
  • Extension Components
  • Custom Components
  • Components Marketplace
  • Overview
  • Quick Start
  • Routing
  • DSL
  • API
  • Options
  • Components
🔥Editor (opens new window)
  • RuleGo Editor (opens new window)
  • RuleGo Server (opens new window)
  • StreamSQL
  • Github (opens new window)
  • Gitee (opens new window)
  • Changelog (opens new window)
  • English
  • 简体中文
🏠Home
  • Quick Start
  • Rule Chain
  • Standard Components
  • Extension Components
  • Custom Components
  • Visualization
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • Trigger
  • Advanced Topics
  • Performance
  • Standard Components
  • Extension Components
  • Custom Components
  • Components Marketplace
  • Overview
  • Quick Start
  • Routing
  • DSL
  • API
  • Options
  • Components
🔥Editor (opens new window)
  • RuleGo Editor (opens new window)
  • RuleGo Server (opens new window)
  • StreamSQL
  • Github (opens new window)
  • Gitee (opens new window)
  • Changelog (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • Quick Start

  • Rule Chain

  • Standard Components

  • Extension Components

    • Extension Components Overview
    • filter

    • transform

    • external

      • redisClient
      • kafkaProducer
        • Configuration
          • SASL Authentication Configuration
          • TLS Configuration
        • Working Principle
        • Relation Type
        • Execution result
        • Configuration example
          • Configuration example with SASL authentication
          • Configuration example with TLS
        • Application example
      • natsClient
      • rabbitmqClient
      • opengeminiWrite
      • opengeminiQuery
      • MongoDB Client
      • Redis Publisher
      • grpcClient
      • OpenTelemetry
      • BeanstalkdWorker
      • BeanstalkdTube
      • WukongimSender
      • NSQ Client
      • Pulsar Client
    • ai

    • ci

    • IoT

  • Custom Components

  • Components marketplace

  • Visualization

  • AOP

  • Trigger

  • Advanced Topic

  • RuleGo-Server

  • FAQ

  • Endpoint Module

  • Support

  • StreamSQL

目录

kafkaProducer

x/kafkaProducer component: publish kafka data. Used to send the current message payload to the specified kafka topic.

# Configuration

This component supports connection reuse through the server field to share Kafka client connections and avoid duplicate connections. See Component Connection Reuse.

| Field | Type | Required | Description | Default value | |-----------|----------|----------|----------------------------------------------------------------------------------------------|--------------------|| | server | string | Yes | Kafka server address list, multiple addresses separated by commas | "127.0.0.1:9092" | | topic | string | Yes | Publish topic, can using Component Configuration Variables. | - | | key | string | No | Partition key, can using Component Configuration Variables. | - | | partition | int32 | No | Partition number. If set, messages will be sent directly to the specified partition | 0 | | sasl | object | No | SASL authentication configuration | See table below | | tls | object | No | TLS configuration | See table below |

# SASL Authentication Configuration

Field Type Required Description Default value
enable bool No Whether to enable SASL authentication false
mechanism string No Authentication mechanism, supports PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 "PLAIN"
username string No Username -
password string No Password -

# TLS Configuration

Field Type Required Description Default value
enable bool No Whether to enable TLS false
insecureSkipVerify bool No Whether to skip certificate verification false

# Working Principle

  1. The component connects to the Kafka cluster during initialization
  2. When a message is received, it publishes the message content to the specified topic
  3. On successful publication, routes through Success chain; on failure, routes through Failure chain
  4. The component automatically manages connection lifecycle, including reconnection

# Relation Type

  • Success: Message sent to Success chain in the following cases:
    • Message successfully published to Kafka cluster
    • Received confirmation from Kafka server
  • Failure: Message sent to Failure chain in the following cases:
    • Failed to connect to Kafka cluster
    • Message publication timeout
    • Message publication failed
    • Configuration parameter error

# Execution result

After component execution, the message metadata will be updated:

  • msg.data remains unchanged
  • metadata will add the following fields:
    • partition: The partition number where the message was actually written
    • offset: The offset of the message in the partition
  • msgType remains unchanged

# Configuration example

{
  "id": "s5",
  "type": "x/kafkaProducer",
  "name": "Publish to kafka",
  "debugMode": true,
  "configuration": {
    "topic": "device.msg.request",
    "server": "localhost:9092"
  }
}
1
2
3
4
5
6
7
8
9
10

# Configuration example with SASL authentication

{
  "id": "s6",
  "type": "x/kafkaProducer",
  "name": "Publish to kafka (SASL)",
  "debugMode": true,
  "configuration": {
    "topic": "device.msg.request",
    "server": "localhost:9092",
    "sasl": {
      "enable": true,
      "mechanism": "PLAIN",
      "username": "your_username",
      "password": "your_password"
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# Configuration example with TLS

{
  "id": "s7",
  "type": "x/kafkaProducer",
  "name": "Publish to kafka (TLS)",
  "debugMode": true,
  "configuration": {
    "topic": "device.msg.request",
    "server": "localhost:9093",
    "tls": {
      "enable": true,
      "insecureSkipVerify": false
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# Application example

Application example reference: kafka_producer_test (opens new window)

Edit this page on GitHub (opens new window)
Last Updated: 2025/08/14, 08:19:07
redisClient
natsClient

← redisClient natsClient→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式