kafkaProducer
x/kafkaProducer
component: publish kafka data. Used to send the current message payload to the specified kafka topic.
# Configuration
This component supports connection reuse through the server
field to share Kafka client connections and avoid duplicate connections. See Component Connection Reuse.
| Field | Type | Required | Description | Default value | |-----------|----------|----------|----------------------------------------------------------------------------------------------|--------------------|| | server | string | Yes | Kafka server address list, multiple addresses separated by commas | "127.0.0.1:9092" | | topic | string | Yes | Publish topic, can using Component Configuration Variables. | - | | key | string | No | Partition key, can using Component Configuration Variables. | - | | partition | int32 | No | Partition number. If set, messages will be sent directly to the specified partition | 0 | | sasl | object | No | SASL authentication configuration | See table below | | tls | object | No | TLS configuration | See table below |
# SASL Authentication Configuration
Field | Type | Required | Description | Default value |
---|---|---|---|---|
enable | bool | No | Whether to enable SASL authentication | false |
mechanism | string | No | Authentication mechanism, supports PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 | "PLAIN" |
username | string | No | Username | - |
password | string | No | Password | - |
# TLS Configuration
Field | Type | Required | Description | Default value |
---|---|---|---|---|
enable | bool | No | Whether to enable TLS | false |
insecureSkipVerify | bool | No | Whether to skip certificate verification | false |
# Working Principle
- The component connects to the Kafka cluster during initialization
- When a message is received, it publishes the message content to the specified topic
- On successful publication, routes through Success chain; on failure, routes through Failure chain
- The component automatically manages connection lifecycle, including reconnection
# Relation Type
- Success: Message sent to
Success
chain in the following cases:- Message successfully published to Kafka cluster
- Received confirmation from Kafka server
- Failure: Message sent to
Failure
chain in the following cases:- Failed to connect to Kafka cluster
- Message publication timeout
- Message publication failed
- Configuration parameter error
# Execution result
After component execution, the message metadata will be updated:
- msg.data remains unchanged
- metadata will add the following fields:
- partition: The partition number where the message was actually written
- offset: The offset of the message in the partition
- msgType remains unchanged
# Configuration example
{
"id": "s5",
"type": "x/kafkaProducer",
"name": "Publish to kafka",
"debugMode": true,
"configuration": {
"topic": "device.msg.request",
"server": "localhost:9092"
}
}
2
3
4
5
6
7
8
9
10
# Configuration example with SASL authentication
{
"id": "s6",
"type": "x/kafkaProducer",
"name": "Publish to kafka (SASL)",
"debugMode": true,
"configuration": {
"topic": "device.msg.request",
"server": "localhost:9092",
"sasl": {
"enable": true,
"mechanism": "PLAIN",
"username": "your_username",
"password": "your_password"
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Configuration example with TLS
{
"id": "s7",
"type": "x/kafkaProducer",
"name": "Publish to kafka (TLS)",
"debugMode": true,
"configuration": {
"topic": "device.msg.request",
"server": "localhost:9093",
"tls": {
"enable": true,
"insecureSkipVerify": false
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# Application example
Application example reference: kafka_producer_test (opens new window)