Nats Endpoint
The Nats Endpoint (v0.21.0+) creates and starts a NATS subscription service. It subscribes to data on one or more topics and routes that data to different rule chains for processing.
TIP
This is an extension component and requires an additional extension library: rulego-components
# Type
endpoint/nats
# Startup Configuration
This component can reuse a shared connection client through the server field. See Component Connection Reuse for reference.
Field | Type | Required | Description | Default Value |
---|---|---|---|---|
server | string | Yes | NATS server address in host:port format (the examples below use the nats://host:port URL form) | None |
username | string | No | NATS server authentication username | None |
password | string | No | NATS server authentication password | None |
groupId | string | No | Consumer group ID for load balancing. Uses queue subscription mode when set, normal mode when empty | None |
# How It Works
# Subscription Modes
Normal Subscription Mode (groupId empty):
- Uses the Subscribe method to subscribe to topics
- Each subscriber receives all messages published to the topic
- Suitable for broadcast scenarios
Queue Subscription Mode (groupId set):
- Uses the QueueSubscribe method to subscribe to topics
- Multiple subscribers with the same groupId form a consumer group
- Messages are load-balanced within the group, so each message is processed by only one consumer in the group
- Suitable for load balancing and high availability scenarios
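The difference between the two modes can be illustrated with a small in-memory simulation. This is only a sketch under stated assumptions: the broker type, its methods, and countDeliveries are hypothetical names invented for illustration, not part of RuleGo or the NATS client, and the round-robin choice of group member is a simplification (a real NATS server picks the member itself).

```go
package main

import "fmt"

// handler receives one delivered message payload.
type handler func(msg string)

// broker is a toy in-memory stand-in for a NATS server (illustration only).
type broker struct {
	plain  map[string][]handler            // topic -> normal subscribers
	groups map[string]map[string][]handler // topic -> groupId -> members
	next   map[string]int                  // round-robin cursor per topic and group
}

func newBroker() *broker {
	return &broker{
		plain:  map[string][]handler{},
		groups: map[string]map[string][]handler{},
		next:   map[string]int{},
	}
}

// subscribe registers h on topic. An empty groupID means normal mode;
// a non-empty groupID joins that queue group.
func (b *broker) subscribe(topic, groupID string, h handler) {
	if groupID == "" {
		b.plain[topic] = append(b.plain[topic], h)
		return
	}
	if b.groups[topic] == nil {
		b.groups[topic] = map[string][]handler{}
	}
	b.groups[topic][groupID] = append(b.groups[topic][groupID], h)
}

// publish delivers msg to every normal subscriber and to exactly one
// member of each queue group (chosen round-robin in this sketch).
func (b *broker) publish(topic, msg string) {
	for _, h := range b.plain[topic] {
		h(msg)
	}
	for groupID, members := range b.groups[topic] {
		key := topic + "|" + groupID
		members[b.next[key]%len(members)](msg)
		b.next[key]++
	}
}

// countDeliveries registers two subscribers with the same groupID,
// publishes n messages, and reports how many each subscriber received.
func countDeliveries(groupID string, n int) (first, second int) {
	br := newBroker()
	br.subscribe("device.msg.request", groupID, func(string) { first++ })
	br.subscribe("device.msg.request", groupID, func(string) { second++ })
	for i := 0; i < n; i++ {
		br.publish("device.msg.request", fmt.Sprintf("msg-%d", i))
	}
	return first, second
}

func main() {
	a, b := countDeliveries("", 4)
	fmt.Println("normal mode:", a, b) // prints "normal mode: 4 4"
	a, b = countDeliveries("device-processors", 4)
	fmt.Println("queue mode:", a, b) // prints "queue mode: 2 2"
}
```

With an empty groupId both subscribers see all four messages; with a shared groupId the four messages are split between them, which is the behavior the two lists above describe.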
# Message Processing
- After receiving a NATS message, the component converts it to the RuleGo message format
- Routes the message to the corresponding rule chain based on the routing configuration
- Supports message responses through responseTopic
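The convert-then-route flow above can be sketched roughly as follows. All names here (ruleMsg, routes, toRuleMsg, dispatch) are hypothetical simplifications and not the RuleGo types; storing the source topic under a "topic" metadata key is likewise an assumption of this sketch.

```go
package main

import "fmt"

// ruleMsg is a hypothetical, simplified stand-in for a RuleGo message.
type ruleMsg struct {
	Data     string
	Metadata map[string]string
}

// routes maps a subscribed topic to the ID of the rule chain that handles it.
type routes map[string]string

// toRuleMsg wraps a raw NATS payload. Recording the source topic in the
// metadata is an assumption of this sketch.
func toRuleMsg(topic string, payload []byte) ruleMsg {
	return ruleMsg{
		Data:     string(payload),
		Metadata: map[string]string{"topic": topic},
	}
}

// dispatch converts the payload and looks up the target rule chain.
// ok is false when no route is registered for the topic.
func (r routes) dispatch(topic string, payload []byte) (chainID string, msg ruleMsg, ok bool) {
	chainID, ok = r[topic]
	if !ok {
		return "", ruleMsg{}, false
	}
	return chainID, toRuleMsg(topic, payload), true
}

func main() {
	r := routes{"device.msg.request": "chain1"}
	if chainID, msg, ok := r.dispatch("device.msg.request", []byte(`{"temp":21}`)); ok {
		fmt.Printf("chain=%s data=%s topic=%s\n", chainID, msg.Data, msg.Metadata["topic"])
	}
	if _, _, ok := r.dispatch("unknown.topic", nil); !ok {
		fmt.Println("no route for unknown.topic")
	}
}
```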
# Response
Before responding with exchange.Out.SetBody, you need to specify the responseTopic parameter through exchange.Out.Headers() or exchange.Out.GetMsg().Metadata. The component then publishes the response data to the specified topic:
exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "device.msg.response")
// or
exchange.Out.Headers().Add("responseTopic", "device.msg.response")
exchange.Out.SetBody([]byte("ok"))
Response parameter configuration:
Field | Type | Required | Description | Default Value |
---|---|---|---|---|
responseTopic | string | Yes | Response Topic | - |
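The response rule can be pictured with the sketch below. It is an illustration only: resolveResponseTopic and respond are hypothetical helpers, plain maps stand in for the headers and metadata objects, and both the lookup order (headers first, then metadata) and the choice to skip the response when no topic is set are assumptions rather than documented behavior.

```go
package main

import "fmt"

const keyResponseTopic = "responseTopic"

// resolveResponseTopic returns the topic a response should be published to.
// Checking headers before metadata is an assumption of this sketch.
func resolveResponseTopic(headers, metadata map[string]string) (string, bool) {
	if t := headers[keyResponseTopic]; t != "" {
		return t, true
	}
	if t := metadata[keyResponseTopic]; t != "" {
		return t, true
	}
	return "", false
}

// respond publishes body only when a response topic has been set,
// and reports whether anything was published.
func respond(headers, metadata map[string]string, body []byte, publish func(topic string, data []byte)) bool {
	topic, ok := resolveResponseTopic(headers, metadata)
	if !ok {
		return false
	}
	publish(topic, body)
	return true
}

func main() {
	publish := func(topic string, data []byte) {
		fmt.Printf("publish %q to %s\n", data, topic)
	}
	metadata := map[string]string{keyResponseTopic: "device.msg.response"}
	respond(nil, metadata, []byte("ok"), publish)

	if !respond(nil, nil, []byte("ok"), publish) {
		fmt.Println("no responseTopic set, nothing published")
	}
}
```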
# Example
# Basic Configuration Examples
Normal Subscription Mode:
{
"server": "nats://127.0.0.1:4222",
"username": "user",
"password": "password"
}
Queue Subscription Mode (Load Balancing):
{
"server": "nats://127.0.0.1:4222",
"username": "user",
"password": "password",
"groupId": "device-processors"
}
# Code Example
// Create NATS endpoint
config := nats.Config{
Server: "nats://127.0.0.1:4222",
Username: "user",
Password: "password",
GroupId: "device-group", // Set consumer group ID
}
ep := &nats.Nats{}
ep.Config = config
// Add router
ep.AddRouter(rulego.NewRouter().From("device.msg.request").To("chain1"))
// Start endpoint
ep.Start()
# More Examples
Below is example code that uses the endpoint: