# RuleGo Integration
This chapter shows how to integrate StreamSQL with the RuleGo rule engine for complex stream processing and rule orchestration.
# Integration Overview
StreamSQL provides seamless integration with RuleGo, offering two core components:
- `streamTransform`: Stream transformation component, used for data filtering, transformation, and enrichment
- `streamAggregator`: Stream aggregation component, used for real-time data aggregation and analysis
# Component Configuration
# streamTransform Component
# Component Description
`streamTransform` is a stream transformation component in RuleGo, based on StreamSQL, used for real-time data filtering, transformation, and enrichment.
# Component Configuration
```json
{
  "id": "streamTransform",
  "type": "streamTransform",
  "configuration": {
    "sql": "SELECT deviceId, temperature, humidity FROM stream WHERE temperature > 25",
    "timeout": "30s",
    "logLevel": "INFO",
    "performanceConfig": "high"
  }
}
```
# Configuration Parameters
| Parameter | Type | Required | Description | Default Value |
|---|---|---|---|---|
| `sql` | string | Yes | StreamSQL query statement | - |
| `timeout` | string | No | Query timeout duration | "30s" |
| `logLevel` | string | No | Log level (DEBUG, INFO, WARN, ERROR) | "INFO" |
| `performanceConfig` | string | No | Performance configuration (high, low, zeroLoss) | "high" |
| `customConfig` | object | No | Custom configuration object | null |
# Usage Example
```json
{
  "ruleChain": {
    "id": "iot_data_filter",
    "name": "IoT Data Filter",
    "root": true,
    "nodes": [
      {
        "id": "mqttIn",
        "type": "mqtt",
        "configuration": {
          "topic": "sensor/data",
          "qos": 1
        }
      },
      {
        "id": "temperatureFilter",
        "type": "streamTransform",
        "configuration": {
          "sql": "SELECT * FROM stream WHERE temperature > 25"
        }
      },
      {
        "id": "mqttOut",
        "type": "mqtt",
        "configuration": {
          "topic": "alerts/high_temperature",
          "qos": 1
        }
      }
    ],
    "connections": [
      {
        "fromId": "mqttIn",
        "toId": "temperatureFilter",
        "type": "Success"
      },
      {
        "fromId": "temperatureFilter",
        "toId": "mqttOut",
        "type": "Success"
      }
    ]
  }
}
```
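To make the effect of the filter concrete, the following plain-Go sketch mimics what `SELECT * FROM stream WHERE temperature > 25` does to each message: readings that fail the predicate are dropped and the rest pass through unchanged. It does not use StreamSQL itself, and `Reading` and `filterHot` are illustrative names only.

```go
package main

import "fmt"

// Reading is a hypothetical sensor payload used only for this illustration.
type Reading struct {
	DeviceID    string
	Temperature float64
}

// filterHot keeps the readings whose temperature is strictly above the
// threshold, which is the row-level behaviour of WHERE temperature > threshold.
func filterHot(in []Reading, threshold float64) []Reading {
	out := make([]Reading, 0, len(in))
	for _, r := range in {
		if r.Temperature > threshold {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	readings := []Reading{
		{"device-001", 24.5},
		{"device-002", 25.0},
		{"device-003", 31.2},
	}
	for _, r := range filterHot(readings, 25) {
		fmt.Printf("%s %.1f\n", r.DeviceID, r.Temperature)
	}
}
```

Note that a reading of exactly 25 is not forwarded, because the comparison is strict.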
# streamAggregator Component
# Component Description
`streamAggregator` is a stream aggregation component in RuleGo, based on StreamSQL, used for real-time data aggregation and analysis.
# Component Configuration
```json
{
  "id": "streamAggregator",
  "type": "streamAggregator",
  "configuration": {
    "sql": "SELECT deviceId, AVG(temperature) as avgTemp, COUNT(*) as count FROM stream GROUP BY deviceId, TumblingWindow('5m')",
    "timeout": "5m",
    "logLevel": "INFO",
    "performanceConfig": "high"
  }
}
```
# Configuration Parameters
| Parameter | Type | Required | Description | Default Value |
|---|---|---|---|---|
| `sql` | string | Yes | StreamSQL aggregation query statement | - |
| `timeout` | string | No | Query timeout duration | "5m" |
| `logLevel` | string | No | Log level (DEBUG, INFO, WARN, ERROR) | "INFO" |
| `performanceConfig` | string | No | Performance configuration (high, low, zeroLoss) | "high" |
| `customConfig` | object | No | Custom configuration object | null |
# Usage Example
```json
{
  "ruleChain": {
    "id": "iot_aggregation",
    "name": "IoT Data Aggregation",
    "root": true,
    "nodes": [
      {
        "id": "mqttIn",
        "type": "mqtt",
        "configuration": {
          "topic": "sensor/data",
          "qos": 1
        }
      },
      {
        "id": "temperatureAggregator",
        "type": "streamAggregator",
        "configuration": {
          "sql": "SELECT deviceId, AVG(temperature) as avgTemp, MAX(temperature) as maxTemp, COUNT(*) as count FROM stream GROUP BY deviceId, TumblingWindow('5m')"
        }
      },
      {
        "id": "mqttOut",
        "type": "mqtt",
        "configuration": {
          "topic": "aggregates/temperature",
          "qos": 1
        }
      }
    ],
    "connections": [
      {
        "fromId": "mqttIn",
        "toId": "temperatureAggregator",
        "type": "Success"
      },
      {
        "fromId": "temperatureAggregator",
        "toId": "mqttOut",
        "type": "Success"
      }
    ]
  }
}
```
# Complete IoT Data Processing Example
This example demonstrates a complete IoT data processing solution, including MQTT input, StreamSQL aggregation, and MQTT output.
# Project Structure
```
iot-streamsql-example/
├── main.go            # Main application
├── rule_chain.json    # Rule chain configuration
├── go.mod             # Go module file
└── README.md          # Project documentation
```
# 1. Rule Chain Configuration (rule_chain.json)
```json
{
  "ruleChain": {
    "id": "iot_data_processor",
    "name": "IoT Data Processor",
    "root": true,
    "nodes": [
      {
        "id": "mqttIn",
        "type": "mqtt",
        "configuration": {
          "topic": "sensor/data/+",
          "qos": 1,
          "broker": "tcp://localhost:1883"
        }
      },
      {
        "id": "dataEnricher",
        "type": "streamTransform",
        "configuration": {
          "sql": "SELECT *, CASE WHEN temperature > 30 THEN 'high' WHEN temperature > 20 THEN 'normal' ELSE 'low' END as tempLevel FROM stream",
          "timeout": "30s"
        }
      },
      {
        "id": "realtimeAggregator",
        "type": "streamAggregator",
        "configuration": {
          "sql": "SELECT deviceId, AVG(temperature) as avgTemp, MAX(temperature) as maxTemp, MIN(temperature) as minTemp, COUNT(*) as count, ANY_VALUE(tempLevel) as tempLevel FROM stream GROUP BY deviceId, TumblingWindow('1m')",
          "timeout": "1m"
        }
      },
      {
        "id": "alertGenerator",
        "type": "streamTransform",
        "configuration": {
          "sql": "SELECT deviceId, avgTemp, maxTemp, count, CASE WHEN maxTemp > 35 THEN 'critical' WHEN maxTemp > 30 THEN 'warning' ELSE 'normal' END as alertLevel FROM stream WHERE maxTemp > 30"
        }
      },
      {
        "id": "mqttOut",
        "type": "mqtt",
        "configuration": {
          "topic": "aggregates/device_data",
          "qos": 1,
          "broker": "tcp://localhost:1883"
        }
      },
      {
        "id": "alertOut",
        "type": "mqtt",
        "configuration": {
          "topic": "alerts/temperature",
          "qos": 2,
          "broker": "tcp://localhost:1883"
        }
      }
    ],
    "connections": [
      {
        "fromId": "mqttIn",
        "toId": "dataEnricher",
        "type": "Success"
      },
      {
        "fromId": "dataEnricher",
        "toId": "realtimeAggregator",
        "type": "Success"
      },
      {
        "fromId": "realtimeAggregator",
        "toId": "mqttOut",
        "type": "Success"
      },
      {
        "fromId": "realtimeAggregator",
        "toId": "alertGenerator",
        "type": "Success"
      },
      {
        "fromId": "alertGenerator",
        "toId": "alertOut",
        "type": "Success"
      }
    ]
  }
}
```
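The two `CASE` expressions in this rule chain are ordinary threshold classifications. The plain-Go sketch below restates them so the boundaries are easy to check. The function names are illustrative and the code does not call StreamSQL.

```go
package main

import "fmt"

// tempLevel mirrors the CASE expression in the dataEnricher node.
func tempLevel(temperature float64) string {
	switch {
	case temperature > 30:
		return "high"
	case temperature > 20:
		return "normal"
	default:
		return "low"
	}
}

// alertLevel mirrors the alertGenerator node. Rows with maxTemp <= 30 are
// removed by the WHERE clause, so ok is false for them and no alert is sent.
func alertLevel(maxTemp float64) (level string, ok bool) {
	switch {
	case maxTemp > 35:
		return "critical", true
	case maxTemp > 30:
		return "warning", true
	default:
		return "", false
	}
}

func main() {
	for _, t := range []float64{18, 25, 32, 36} {
		level, ok := alertLevel(t)
		fmt.Printf("temp=%.0f tempLevel=%s alert=%q sent=%v\n", t, tempLevel(t), level, ok)
	}
}
```

One consequence worth noting: because the `WHERE maxTemp > 30` filter runs on every row, the `ELSE 'normal'` branch of the `alertGenerator` query can never be selected.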
# 2. Go Application Code (main.go)
```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/rulego/rulego"
	"github.com/rulego/rulego/api/types"
	_ "github.com/rulego/streamsql/rulego" // registers the StreamSQL components
)

// IoTData represents the payload sent by an IoT device.
type IoTData struct {
	DeviceID    string    `json:"deviceId"`
	Temperature float64   `json:"temperature"`
	Humidity    float64   `json:"humidity"`
	Timestamp   time.Time `json:"timestamp"`
}

func main() {
	// 1. Initialize the RuleGo configuration
	config := rulego.NewConfig()

	// 2. Load the rule chain definition
	ruleChain, err := os.ReadFile("rule_chain.json")
	if err != nil {
		log.Fatalf("Failed to load rule chain: %v", err)
	}

	// 3. Create the rule engine from the rule chain definition
	engine, err := rulego.New("iot_data_processor", ruleChain, rulego.WithConfig(config))
	if err != nil {
		log.Fatalf("Failed to initialize rule engine: %v", err)
	}

	fmt.Println("IoT data processing system started successfully!")
	fmt.Println("- Listening on topic: sensor/data/+")
	fmt.Println("- Aggregating every 1 minute")
	fmt.Println("- Publishing results to: aggregates/device_data")
	fmt.Println("- Publishing alerts to: alerts/temperature")

	// 4. Simulate sending test data
	go simulateData(func(payload string) {
		msg := types.NewMsg(0, "", types.JSON, types.NewMetadata(), payload)
		engine.OnMsg(msg)
	})

	// 5. Wait for a shutdown signal
	waitForShutdown()

	// 6. Stop the rule engine
	engine.Stop()
	fmt.Println("System stopped")
}

// simulateData generates one reading per device every 5 seconds and passes
// the JSON payload to send.
func simulateData(send func(payload string)) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	devices := []string{"device-001", "device-002", "device-003"}
	for range ticker.C {
		for _, deviceID := range devices {
			data := IoTData{
				DeviceID:    deviceID,
				Temperature: 20 + rand.Float64()*30, // random temperature in [20, 50) °C
				Humidity:    30 + rand.Float64()*40, // random humidity in [30, 70) %
				Timestamp:   time.Now(),
			}
			jsonData, err := json.Marshal(data)
			if err != nil {
				log.Printf("Failed to encode data: %v", err)
				continue
			}
			send(string(jsonData))
			fmt.Printf("Sent data: %s\n", jsonData)
		}
	}
}

// waitForShutdown blocks until SIGINT or SIGTERM is received.
func waitForShutdown() {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan
	fmt.Println("\nReceived shutdown signal, stopping system...")
}
```
# 3. go.mod File
```
module iot-streamsql-example

go 1.21

require (
	github.com/rulego/rulego v0.21.0
	github.com/rulego/streamsql v0.15.0
)

// Other dependencies are resolved automatically by `go mod tidy`.
```
# 4. Project Documentation (README.md)
# IoT StreamSQL Example
This project demonstrates how to use StreamSQL with RuleGo to build a complete IoT data processing system.
## Features
- Real-time IoT device data processing
- Temperature anomaly detection
- Data aggregation by 1-minute windows
- MQTT-based message communication
- Configurable rule chains
## Quick Start
### 1. Install Dependencies
```bash
go mod tidy
```
### 2. Start MQTT Broker
You can use Mosquitto or another MQTT broker:
```bash
# Start Mosquitto
mosquitto -p 1883
```
### 3. Run Application
```bash
go run main.go
```
### 4. Subscribe to Results
```bash
# Subscribe to aggregation results
mosquitto_sub -t "aggregates/device_data" -v
# Subscribe to alerts
mosquitto_sub -t "alerts/temperature" -v
```
## Data Flow
- Input: IoT devices publish data to `sensor/data/+`
- Enrichment: Add temperature level tags
- Aggregation: Calculate average, maximum, and minimum values per device every minute
- Output: Publish results to `aggregates/device_data`
- Alerts: Generate alerts when temperature exceeds thresholds
## Configuration
Modify `rule_chain.json` to adjust:
- MQTT broker configuration
- StreamSQL query statements
- Aggregation time windows
- Alert thresholds
## Extension
You can extend this example by:
- Adding more sensor types
- Implementing more complex aggregation rules
- Adding data persistence
- Integrating with other systems
## Data Flow Description
The complete data processing flow is as follows:
1. **Data Input**: IoT devices publish sensor data to the `sensor/data/+` topic
2. **Data Enrichment**: The `dataEnricher` node uses StreamSQL to add temperature level tags
3. **Real-time Aggregation**: The `realtimeAggregator` node performs aggregation calculations every 1 minute
4. **Result Output**: Aggregated results are published to the `aggregates/device_data` topic
5. **Alert Generation**: The `alertGenerator` node generates alerts based on temperature thresholds
6. **Alert Output**: Alert messages are published to the `alerts/temperature` topic
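The steps above follow directly from the `connections` array of the rule chain, where `realtimeAggregator` fans out to both the result output and the alert branch. As an illustration, the following Go sketch decodes such a list and enumerates every route from the input node. The `connection` type and the `paths` helper are hypothetical and not part of RuleGo, and the traversal assumes the graph has no cycles.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// connection matches one entry of the "connections" array in rule_chain.json.
type connection struct {
	FromID string `json:"fromId"`
	ToID   string `json:"toId"`
	Type   string `json:"type"`
}

// paths returns every route from start to a node without outgoing connections.
// It assumes the connection graph is acyclic.
func paths(conns []connection, start string) [][]string {
	next := make(map[string][]string)
	for _, c := range conns {
		next[c.FromID] = append(next[c.FromID], c.ToID)
	}
	var out [][]string
	var walk func(node string, trail []string)
	walk = func(node string, trail []string) {
		// Copy the trail so sibling branches do not share a backing array.
		trail = append(append([]string{}, trail...), node)
		if len(next[node]) == 0 {
			out = append(out, trail)
			return
		}
		for _, n := range next[node] {
			walk(n, trail)
		}
	}
	walk(start, nil)
	return out
}

func main() {
	raw := `[
	  {"fromId": "mqttIn", "toId": "dataEnricher", "type": "Success"},
	  {"fromId": "dataEnricher", "toId": "realtimeAggregator", "type": "Success"},
	  {"fromId": "realtimeAggregator", "toId": "mqttOut", "type": "Success"},
	  {"fromId": "realtimeAggregator", "toId": "alertGenerator", "type": "Success"},
	  {"fromId": "alertGenerator", "toId": "alertOut", "type": "Success"}
	]`
	var conns []connection
	if err := json.Unmarshal([]byte(raw), &conns); err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, p := range paths(conns, "mqttIn") {
		fmt.Println(p)
	}
}
```

For the example rule chain this yields two routes, one ending at `mqttOut` and one ending at `alertOut`.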
## Core Features
### 1. Real-time Processing
- Supports millisecond-level data processing
- Automatic window calculation and triggering
- Configurable processing latency
### 2. Flexible Configuration
- Rule chain configuration via JSON
- Dynamic SQL query modification
- Configurable performance parameters
### 3. High Reliability
- Automatic failover and recovery
- Data persistence and replay
- Monitoring and alerting
### 4. Easy Extension
- Custom function support
- Plugin-based architecture
- Integration with other systems
## Extension Configuration
### Custom Performance Configuration
```json
{
  "id": "streamAggregator",
  "type": "streamAggregator",
  "configuration": {
    "sql": "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5m')",
    "customConfig": {
      "bufferConfig": {
        "dataChannelSize": 5000,
        "resultChannelSize": 2000
      },
      "overflowConfig": {
        "strategy": "persist",
        "blockTimeout": "10s"
      },
      "workerConfig": {
        "sinkPoolSize": 100,
        "sinkWorkerCount": 20
      }
    }
  }
}
```
# Advanced Features
# 1. Custom Functions
You can register custom functions in the Go application:
```go
// Register a custom function. The coefficients below are those of the
// Rothfusz heat index regression, which expects the temperature in °F
// and the relative humidity in percent.
functions.RegisterCustomFunction(
	"calculate_heat_index",
	functions.TypeMath,
	"Heat Index Calculation",
	"Calculate heat index based on temperature and humidity",
	2, 2,
	func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
		temp, err := functions.ConvertToFloat64(args[0])
		if err != nil {
			return nil, err
		}
		humidity, err := functions.ConvertToFloat64(args[1])
		if err != nil {
			return nil, err
		}
		// Calculate heat index
		heatIndex := -42.379 + 2.04901523*temp + 10.14333127*humidity -
			0.22475541*temp*humidity - 0.00683783*temp*temp -
			0.05481717*humidity*humidity + 0.00122874*temp*temp*humidity +
			0.00085282*temp*humidity*humidity - 0.00000199*temp*temp*humidity*humidity
		return heatIndex, nil
	},
)
```
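The regression in the custom function above uses the coefficients of the Rothfusz heat index equation, which is defined for temperatures in degrees Fahrenheit and relative humidity in percent. Since the sample data in this chapter is in degrees Celsius, a conversion step is needed before the formula gives meaningful values. The standalone sketch below shows one way to do that. The `heatIndexF` and `heatIndexC` names are illustrative and independent of the StreamSQL `functions` package.

```go
package main

import "fmt"

// heatIndexF evaluates the regression for a temperature in °F and a
// relative humidity in percent, returning the heat index in °F.
func heatIndexF(tempF, humidity float64) float64 {
	return -42.379 + 2.04901523*tempF + 10.14333127*humidity -
		0.22475541*tempF*humidity - 0.00683783*tempF*tempF -
		0.05481717*humidity*humidity + 0.00122874*tempF*tempF*humidity +
		0.00085282*tempF*humidity*humidity - 0.00000199*tempF*tempF*humidity*humidity
}

// heatIndexC accepts and returns °C by converting around heatIndexF.
func heatIndexC(tempC, humidity float64) float64 {
	tempF := tempC*9/5 + 32
	return (heatIndexF(tempF, humidity) - 32) * 5 / 9
}

func main() {
	fmt.Printf("%.1f °F\n", heatIndexF(90, 70))
	fmt.Printf("%.1f °C\n", heatIndexC(32, 70))
}
```

The regression is a fit for hot, humid conditions, so results for cool or very dry inputs should be treated with caution.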
# 2. Error Handling
Configure error handling strategies:
```json
{
  "id": "streamAggregator",
  "type": "streamAggregator",
  "configuration": {
    "sql": "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5m')",
    "errorHandling": {
      "strategy": "retry",
      "maxRetries": 3,
      "retryInterval": "1s",
      "deadLetterTopic": "errors/stream_processing"
    }
  }
}
```
# 3. Monitoring and Metrics
Enable detailed monitoring:
```json
{
  "id": "streamAggregator",
  "type": "streamAggregator",
  "configuration": {
    "sql": "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5m')",
    "monitoring": {
      "enableMetrics": true,
      "metricsInterval": "30s",
      "metricsTopic": "metrics/stream_processing"
    }
  }
}
```
# 📚 Related Documentation
- API Reference - View complete API documentation
- SQL Reference - View SQL syntax reference
- Custom Functions - Learn how to develop custom functions
- Performance Optimization - Learn about performance optimization techniques