Pulsar Endpoint
Pulsar Endpoint creates and starts Pulsar subscription services. It can subscribe to data from different topics and route it to different rule chains for processing.
TIP
This component is an extension component that requires importing an additional extension library: rulego-components
# Type
endpoint/pulsar
# Startup Configuration
Field | Type | Required | Description | Default Value |
---|---|---|---|---|
server | string | Yes | Pulsar server address, format: pulsar://host:port | pulsar://localhost:6650 |
subscriptionName | string | No | Default subscription name, used if not specified when adding router | default |
subscriptionType | string | No | Subscription type, supports: Exclusive, Shared, Failover, KeyShared (case insensitive) | Shared |
authToken | string | No | Pulsar JWT authentication token | - |
certFile | string | No | TLS certificate file path | - |
certKeyFile | string | No | TLS private key file path | - |
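As an illustration of how the fields above fit together, the following minimal sketch decodes a JSON configuration, fills in the documented defaults, and normalizes `subscriptionType` case-insensitively. The `Config` struct and the `ParseConfig` function are hypothetical names introduced for this example; they are not the component's actual types.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Config mirrors the startup fields in the table above.
// Hypothetical illustration, not the component's real configuration type.
type Config struct {
	Server           string `json:"server"`
	SubscriptionName string `json:"subscriptionName"`
	SubscriptionType string `json:"subscriptionType"`
	AuthToken        string `json:"authToken"`
	CertFile         string `json:"certFile"`
	CertKeyFile      string `json:"certKeyFile"`
}

// canonicalTypes maps lower-cased names to the spellings listed in the table.
var canonicalTypes = map[string]string{
	"exclusive": "Exclusive",
	"shared":    "Shared",
	"failover":  "Failover",
	"keyshared": "KeyShared",
}

// ParseConfig decodes the JSON, applies the documented defaults to empty
// fields, and validates the subscription type without regard to case.
func ParseConfig(data []byte) (Config, error) {
	var cfg Config
	if err := json.Unmarshal(data, &cfg); err != nil {
		return Config{}, fmt.Errorf("decode config: %w", err)
	}
	if cfg.Server == "" {
		cfg.Server = "pulsar://localhost:6650"
	}
	if cfg.SubscriptionName == "" {
		cfg.SubscriptionName = "default"
	}
	if cfg.SubscriptionType == "" {
		cfg.SubscriptionType = "Shared"
	}
	canonical, ok := canonicalTypes[strings.ToLower(cfg.SubscriptionType)]
	if !ok {
		return Config{}, fmt.Errorf("unsupported subscriptionType %q", cfg.SubscriptionType)
	}
	cfg.SubscriptionType = canonical
	return cfg, nil
}

func main() {
	cfg, err := ParseConfig([]byte(`{"server":"pulsar://localhost:6650","subscriptionType":"keyshared"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.SubscriptionName, cfg.SubscriptionType) // prints: default KeyShared
}
```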
# Router Configuration
When adding a route, you can pass the subscription name as an optional parameter:
// Add route using default subscription "default"
routerId, err := pulsarEndpoint.AddRouter(router)
// Add route specifying subscription as "my-subscription"
routerId, err := pulsarEndpoint.AddRouter(router, "my-subscription")
# Response
Before responding with exchange.Out.SetBody, you need to specify the responseTopic parameter through exchange.Out.Headers() or exchange.Out.Msg.Metadata, and the component will send the data to the specified topic:
exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "persistent://public/default/device-response")
// or
exchange.Out.Headers().Add("responseTopic", "persistent://public/default/device-response")
exchange.Out.SetBody([]byte("ok"))
Response parameter configuration:
Field | Type | Required | Description | Default Value |
---|---|---|---|---|
responseTopic | string | Yes | Response topic | - |
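The lookup described above can be sketched as follows. This is only an illustration: it models the response headers with the standard library's `textproto.MIMEHeader` and the metadata with a plain map, and it checks the headers first. Both the stand-in types and the precedence order are assumptions, and `resolveResponseTopic` is a hypothetical helper, not part of the component.

```go
package main

import (
	"errors"
	"fmt"
	"net/textproto"
)

const responseTopicKey = "responseTopic"

var errNoResponseTopic = errors.New("responseTopic is not set")

// resolveResponseTopic looks for the response topic in the headers first and
// then in the metadata (assumed order). It returns an error when neither
// source provides one, since the parameter is required.
func resolveResponseTopic(headers textproto.MIMEHeader, metadata map[string]string) (string, error) {
	// MIMEHeader canonicalizes keys on Add/Set/Get, so read through Get
	// rather than indexing the map directly.
	if topic := headers.Get(responseTopicKey); topic != "" {
		return topic, nil
	}
	if topic := metadata[responseTopicKey]; topic != "" {
		return topic, nil
	}
	return "", errNoResponseTopic
}

func main() {
	headers := textproto.MIMEHeader{}
	headers.Add(responseTopicKey, "persistent://public/default/device-response")

	topic, err := resolveResponseTopic(headers, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(topic)
}
```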
# Message Format
Received Pulsar messages are converted to RuleMsg, containing the following metadata:
- topic: Message topic
- messageId: Pulsar message ID
- publishTime: Message publish time (RFC3339 format)
- eventTime: Message event time (RFC3339 format)
- key: Message key (if any)
- Custom properties of the message are also added to metadata
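To make the metadata layout concrete, here is a minimal sketch that flattens a message into the keys listed above. The `pulsarMessage` struct is a hypothetical stand-in for a received message, and `buildMetadata` is an illustrative helper. The text does not say what happens when a custom property shares a name with a built-in key; this sketch assumes the built-in keys win.

```go
package main

import (
	"fmt"
	"time"
)

// pulsarMessage is a hypothetical stand-in for a received Pulsar message.
type pulsarMessage struct {
	Topic       string
	MessageID   string
	PublishTime time.Time
	EventTime   time.Time
	Key         string
	Properties  map[string]string
}

// buildMetadata copies custom properties first and then writes the built-in
// keys, so a built-in key overrides a custom property with the same name
// (an assumption made for this sketch).
func buildMetadata(m pulsarMessage) map[string]string {
	md := make(map[string]string, len(m.Properties)+5)
	for k, v := range m.Properties {
		md[k] = v
	}
	md["topic"] = m.Topic
	md["messageId"] = m.MessageID
	md["publishTime"] = m.PublishTime.Format(time.RFC3339)
	md["eventTime"] = m.EventTime.Format(time.RFC3339)
	if m.Key != "" { // key is only present when the message has one
		md["key"] = m.Key
	}
	return md
}

func main() {
	md := buildMetadata(pulsarMessage{
		Topic:       "persistent://public/default/device-data",
		MessageID:   "1:2:3",
		PublishTime: time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC),
		EventTime:   time.Date(2024, 1, 2, 3, 4, 0, 0, time.UTC),
		Properties:  map[string]string{"deviceId": "sensor-1"},
	})
	fmt.Println(md["publishTime"], md["deviceId"]) // prints: 2024-01-02T03:04:05Z sensor-1
}
```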
# Working Principle
- Component starts by connecting to Pulsar cluster based on configuration
- Creates a corresponding consumer for each route using the configured subscription type (Shared by default)
- Upon receiving messages, converts to RuleMsg and routes to corresponding rule chains for processing
- Supports sending processing results through response topics, response messages include original message properties
- Automatically acknowledges message processing completion
- Supports graceful shutdown, ensuring message processing completion before closing connections
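The process-then-acknowledge loop and the graceful shutdown described above can be sketched with plain channels. This is a simplified model, not the component's implementation: the `message` type, `startConsumer`, and the `handle` and `ack` callbacks are hypothetical, and a Go channel stands in for the Pulsar consumer.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for a converted RuleMsg.
type message struct {
	ID   string
	Body string
}

// startConsumer processes messages from in until the channel is closed.
// Each message is acknowledged only after handle returns. The returned
// wait function blocks until the worker has drained all delivered messages.
func startConsumer(in <-chan message, handle func(message), ack func(id string)) (wait func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for m := range in { // loop ends once the input is closed and drained
			handle(m)
			ack(m.ID)
		}
	}()
	return wg.Wait
}

func main() {
	in := make(chan message, 3)
	var acked []string

	wait := startConsumer(in,
		func(m message) { /* route to the rule chain here */ },
		func(id string) { acked = append(acked, id) },
	)

	in <- message{ID: "m1", Body: "a"}
	in <- message{ID: "m2", Body: "b"}
	in <- message{ID: "m3", Body: "c"}

	close(in) // graceful shutdown: stop delivering new messages
	wait()    // wait for delivered messages to finish processing

	fmt.Println(acked) // prints: [m1 m2 m3]
}
```

Closing the input before waiting means nothing is dropped mid-flight: every message already delivered is handled and acknowledged before the connection would be closed.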
# Advanced Features
# Message Property Propagation
The component automatically adds all custom properties of Pulsar messages to RuleMsg metadata for use in rule chains.
# Response Message Properties
When sending response messages, all fields in response headers are sent as message properties:
// Set response message properties
exchange.Out.Headers().Set("correlationId", "12345")
exchange.Out.Headers().Set("messageType", "response")
// Set response topic and message body
exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "response-topic")
exchange.Out.SetBody([]byte("response data"))
# Configuration Example
{
  "server": "pulsar://localhost:6650",
  "subscriptionName": "my-subscription",
  "subscriptionType": "shared",
  "authToken": "your-jwt-token",
  "certFile": "/path/to/cert.pem",
  "certKeyFile": "/path/to/key.pem"
}
# Examples
The following example code uses endpoints: