kafkaProducer
  x/kafkaProducer component: publish kafka data. Used to send the current message payload to the specified kafka topic.
# Configuration
This component supports connection reuse through the server field to share Kafka client connections and avoid duplicate connections. See Component Connection Reuse.
| Field | Type | Required | Description | Default value | |-----------|----------|----------|----------------------------------------------------------------------------------------------|--------------------|| | server | string | Yes | Kafka server address list, multiple addresses separated by commas | "127.0.0.1:9092" | | topic | string | Yes | Publish topic, can using Component Configuration Variables. | - | | key | string | No | Partition key, can using Component Configuration Variables. | - | | partition | int32 | No | Partition number. If set, messages will be sent directly to the specified partition | 0 | | sasl | object | No | SASL authentication configuration | See table below | | tls | object | No | TLS configuration | See table below |
# SASL Authentication Configuration
| Field | Type | Required | Description | Default value | 
|---|---|---|---|---|
| enable | bool | No | Whether to enable SASL authentication | false | 
| mechanism | string | No | Authentication mechanism, supports PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 | "PLAIN" | 
| username | string | No | Username | - | 
| password | string | No | Password | - | 
# TLS Configuration
| Field | Type | Required | Description | Default value | 
|---|---|---|---|---|
| enable | bool | No | Whether to enable TLS | false | 
| insecureSkipVerify | bool | No | Whether to skip certificate verification | false | 
# Working Principle
- The component connects to the Kafka cluster during initialization
 - When a message is received, it publishes the message content to the specified topic
 - On successful publication, routes through Success chain; on failure, routes through Failure chain
 - The component automatically manages connection lifecycle, including reconnection
 
# Relation Type
- Success: Message sent to 
Successchain in the following cases:- Message successfully published to Kafka cluster
 - Received confirmation from Kafka server
 
 - Failure: Message sent to 
Failurechain in the following cases:- Failed to connect to Kafka cluster
 - Message publication timeout
 - Message publication failed
 - Configuration parameter error
 
 
# Execution result
After component execution, the message metadata will be updated:
- msg.data remains unchanged
 - metadata will add the following fields:
- partition: The partition number where the message was actually written
 - offset: The offset of the message in the partition
 
 - msgType remains unchanged
 
# Configuration example
{
  "id": "s5",
  "type": "x/kafkaProducer",
  "name": "Publish to kafka",
  "debugMode": true,
  "configuration": {
    "topic": "device.msg.request",
    "server": "localhost:9092"
  }
}
 2
3
4
5
6
7
8
9
10
# Configuration example with SASL authentication
{
  "id": "s6",
  "type": "x/kafkaProducer",
  "name": "Publish to kafka (SASL)",
  "debugMode": true,
  "configuration": {
    "topic": "device.msg.request",
    "server": "localhost:9092",
    "sasl": {
      "enable": true,
      "mechanism": "PLAIN",
      "username": "your_username",
      "password": "your_password"
    }
  }
}
 2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Configuration example with TLS
{
  "id": "s7",
  "type": "x/kafkaProducer",
  "name": "Publish to kafka (TLS)",
  "debugMode": true,
  "configuration": {
    "topic": "device.msg.request",
    "server": "localhost:9093",
    "tls": {
      "enable": true,
      "insecureSkipVerify": false
    }
  }
}
 2
3
4
5
6
7
8
9
10
11
12
13
14
# Application example
Application example reference: kafka_producer_test (opens new window)