Kafka Endpoint
Kafka Endpoint creates and starts a Kafka subscription service. The service subscribes to data from different topics and routes it to different rule chains for processing.
TIP
This is an extension component. To use it, import the additional extension library: rulego-components
# Type
endpoint/kafka
# Startup configuration
Field | Type | Required | Description | Default |
---|---|---|---|---|
server | string | Yes | Kafka server address list, multiple addresses separated by commas | "127.0.0.1:9092" |
groupId (v0.23.0+) | string | No | Consumer Group ID | "rulego" |
sasl | object | No | SASL authentication configuration | See table below |
tls | object | No | TLS configuration | See table below |
# SASL Authentication Configuration
Field | Type | Required | Description | Default |
---|---|---|---|---|
enable | bool | No | Whether to enable SASL authentication | false |
mechanism | string | No | Authentication mechanism, supports PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 | "PLAIN" |
username | string | No | Username | - |
password | string | No | Password | - |
# TLS Configuration
Field | Type | Required | Description | Default |
---|---|---|---|---|
enable | bool | No | Whether to enable TLS | false |
insecureSkipVerify | bool | No | Whether to skip certificate verification | false |
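To make the three configuration tables above concrete, here is a minimal Go sketch that decodes a JSON configuration onto the documented defaults. The types `KafkaConfig`, `SASLConfig`, and `TLSConfig` and the helpers `ParseKafkaConfig` and `Brokers` are hypothetical names chosen for illustration; they are not the types used by rulego-components, and the broker addresses are placeholders.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// SASLConfig mirrors the SASL table (hypothetical type, not the library's).
type SASLConfig struct {
	Enable    bool   `json:"enable"`
	Mechanism string `json:"mechanism"`
	Username  string `json:"username"`
	Password  string `json:"password"`
}

// TLSConfig mirrors the TLS table (hypothetical type).
type TLSConfig struct {
	Enable             bool `json:"enable"`
	InsecureSkipVerify bool `json:"insecureSkipVerify"`
}

// KafkaConfig mirrors the startup configuration table (hypothetical type).
type KafkaConfig struct {
	Server  string     `json:"server"`
	GroupID string     `json:"groupId"`
	SASL    SASLConfig `json:"sasl"`
	TLS     TLSConfig  `json:"tls"`
}

// ParseKafkaConfig decodes raw JSON on top of the documented defaults,
// so any field missing from the input keeps its default value.
func ParseKafkaConfig(raw []byte) (KafkaConfig, error) {
	cfg := KafkaConfig{
		Server:  "127.0.0.1:9092",
		GroupID: "rulego",
		SASL:    SASLConfig{Mechanism: "PLAIN"},
	}
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return KafkaConfig{}, err
	}
	return cfg, nil
}

// Brokers splits the comma-separated server field into individual addresses.
func (c KafkaConfig) Brokers() []string {
	var out []string
	for _, addr := range strings.Split(c.Server, ",") {
		if addr = strings.TrimSpace(addr); addr != "" {
			out = append(out, addr)
		}
	}
	return out
}

func main() {
	// Placeholder addresses and credentials, for illustration only.
	raw := []byte(`{
		"server": "10.0.0.1:9092, 10.0.0.2:9092",
		"sasl": {"enable": true, "mechanism": "SCRAM-SHA-256", "username": "user", "password": "secret"},
		"tls": {"enable": true}
	}`)
	cfg, err := ParseKafkaConfig(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Brokers(), cfg.GroupID, cfg.SASL.Mechanism, cfg.TLS.Enable)
}
```

Because `groupId` is omitted from the input, the sketch keeps the documented default `"rulego"`. Leaving `insecureSkipVerify` at `false` keeps certificate verification on, which is usually the safer choice outside of local testing.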
# Response
Before responding with exchange.Out.SetBody, specify the responseTopic parameter through exchange.Out.Headers() or exchange.Out.GetMsg().Metadata. The component then publishes the response to the specified topic:
exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "device.msg.response")
// or
exchange.Out.Headers().Add("responseTopic", "device.msg.response")
exchange.Out.SetBody([]byte("ok"))
Response parameter configuration:
Field | Type | Required | Description | Default |
---|---|---|---|---|
responseTopic | string | Yes | Topic the response is published to | - |
partition | int | No | Partition | 0 |
key | string | No | Partition Key | - |
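The required and default values in the table above can be illustrated with a short Go sketch. It is not the component's implementation: `ResponseTarget` and `ResolveResponseTarget` are hypothetical names, and the sketch assumes the parameters arrive as string key/value pairs, as they would in headers or metadata.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// ResponseTarget holds the resolved response parameters (hypothetical type).
type ResponseTarget struct {
	Topic     string
	Partition int
	Key       string
}

// ResolveResponseTarget applies the rules from the table:
// responseTopic is required, partition defaults to 0, key is optional.
func ResolveResponseTarget(params map[string]string) (ResponseTarget, error) {
	topic := params["responseTopic"]
	if topic == "" {
		return ResponseTarget{}, errors.New("responseTopic is required")
	}
	target := ResponseTarget{Topic: topic, Key: params["key"]}
	// Values are assumed to be strings, so the partition is parsed here.
	if p, ok := params["partition"]; ok && p != "" {
		n, err := strconv.Atoi(p)
		if err != nil {
			return ResponseTarget{}, fmt.Errorf("invalid partition %q: %w", p, err)
		}
		target.Partition = n
	}
	return target, nil
}

func main() {
	target, err := ResolveResponseTarget(map[string]string{
		"responseTopic": "device.msg.response",
		"key":           "device-1", // placeholder partition key
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", target)
}
```

Validating the parameters in one place makes a missing responseTopic fail early with a clear error instead of surfacing later when the response is sent.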
# Examples
The following are code examples that use this endpoint:
Last Updated: 2025/08/14, 08:19:07