# RuleGo Integration

This chapter describes how to integrate StreamSQL with the RuleGo rule engine to combine stream processing with rule orchestration.

# Integration Overview

StreamSQL integrates with RuleGo through two core components:

  • streamTransform: Stream transformation component, used for data filtering, transformation, and enrichment
  • streamAggregator: Stream aggregation component, used for real-time data aggregation and analysis

# Component Configuration

# streamTransform Component

# Component Description

streamTransform is a stream transformation component in RuleGo, based on StreamSQL, used for real-time data filtering, transformation, and enrichment.

# Component Configuration

{
  "id": "streamTransform",
  "type": "streamTransform",
  "configuration": {
    "sql": "SELECT deviceId, temperature, humidity FROM stream WHERE temperature > 25",
    "timeout": "30s",
    "logLevel": "INFO",
    "performanceConfig": "high"
  }
}

# Configuration Parameters

| Parameter | Type | Required | Description | Default Value |
|---|---|---|---|---|
| sql | string | Yes | StreamSQL query statement | - |
| timeout | string | No | Query timeout duration | "30s" |
| logLevel | string | No | Log level (DEBUG, INFO, WARN, ERROR) | "INFO" |
| performanceConfig | string | No | Performance configuration (high, low, zeroLoss) | "high" |
| customConfig | object | No | Custom configuration object | null |

# Usage Example

{
  "ruleChain": {
    "id": "iot_data_filter",
    "name": "IoT Data Filter",
    "root": true,
    "nodes": [
      {
        "id": "mqttIn",
        "type": "mqtt",
        "configuration": {
          "topic": "sensor/data",
          "qos": 1
        }
      },
      {
        "id": "temperatureFilter",
        "type": "streamTransform",
        "configuration": {
          "sql": "SELECT * FROM stream WHERE temperature > 25"
        }
      },
      {
        "id": "mqttOut",
        "type": "mqtt",
        "configuration": {
          "topic": "alerts/high_temperature",
          "qos": 1
        }
      }
    ],
    "connections": [
      {
        "fromId": "mqttIn",
        "toId": "temperatureFilter",
        "type": "Success"
      },
      {
        "fromId": "temperatureFilter",
        "toId": "mqttOut",
        "type": "Success"
      }
    ]
  }
}
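
To make the effect of the `temperatureFilter` node concrete, the following sketch applies the same predicate (`temperature > 25`) to JSON payloads in plain Go. It does not use StreamSQL; `passesFilter` is a hypothetical helper, and the handling of messages without a numeric `temperature` field is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// passesFilter reports whether a JSON message would satisfy
// "WHERE temperature > 25". Messages without a numeric temperature
// field are rejected here; the real component may treat them differently.
func passesFilter(payload []byte) bool {
	var fields map[string]interface{}
	if err := json.Unmarshal(payload, &fields); err != nil {
		return false
	}
	temp, ok := fields["temperature"].(float64)
	return ok && temp > 25
}

func main() {
	messages := []string{
		`{"deviceId": "device-001", "temperature": 31.5, "humidity": 40}`,
		`{"deviceId": "device-002", "temperature": 22.0, "humidity": 55}`,
		`{"deviceId": "device-003", "humidity": 60}`,
	}
	for _, m := range messages {
		if passesFilter([]byte(m)) {
			fmt.Println("forward to alerts/high_temperature:", m)
		}
	}
}
```

Only the first message would reach the `mqttOut` node in this sketch; the other two are dropped by the filter.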

# streamAggregator Component

# Component Description

streamAggregator is a stream aggregation component in RuleGo, based on StreamSQL, used for real-time data aggregation and analysis.

# Component Configuration

{
  "id": "streamAggregator",
  "type": "streamAggregator",
  "configuration": {
    "sql": "SELECT deviceId, AVG(temperature) as avgTemp, COUNT(*) as count FROM stream GROUP BY deviceId, TumblingWindow('5m')",
    "timeout": "5m",
    "logLevel": "INFO",
    "performanceConfig": "high"
  }
}

# Configuration Parameters

| Parameter | Type | Required | Description | Default Value |
|---|---|---|---|---|
| sql | string | Yes | StreamSQL aggregation query statement | - |
| timeout | string | No | Query timeout duration | "5m" |
| logLevel | string | No | Log level (DEBUG, INFO, WARN, ERROR) | "INFO" |
| performanceConfig | string | No | Performance configuration (high, low, zeroLoss) | "high" |
| customConfig | object | No | Custom configuration object | null |

# Usage Example

{
  "ruleChain": {
    "id": "iot_aggregation",
    "name": "IoT Data Aggregation",
    "root": true,
    "nodes": [
      {
        "id": "mqttIn",
        "type": "mqtt",
        "configuration": {
          "topic": "sensor/data",
          "qos": 1
        }
      },
      {
        "id": "temperatureAggregator",
        "type": "streamAggregator",
        "configuration": {
          "sql": "SELECT deviceId, AVG(temperature) as avgTemp, MAX(temperature) as maxTemp, COUNT(*) as count FROM stream GROUP BY deviceId, TumblingWindow('5m')"
        }
      },
      {
        "id": "mqttOut",
        "type": "mqtt",
        "configuration": {
          "topic": "aggregates/temperature",
          "qos": 1
        }
      }
    ],
    "connections": [
      {
        "fromId": "mqttIn",
        "toId": "temperatureAggregator",
        "type": "Success"
      },
      {
        "fromId": "temperatureAggregator",
        "toId": "mqttOut",
        "type": "Success"
      }
    ]
  }
}

# Complete IoT Data Processing Example

This example demonstrates a complete IoT data processing solution, including MQTT input, StreamSQL aggregation, and MQTT output.

# Project Structure

iot-streamsql-example/
├── main.go           # Main application
├── rule_chain.json   # Rule chain configuration
├── go.mod           # Go module file
└── README.md        # Project documentation

# 1. Rule Chain Configuration (rule_chain.json)

{
  "ruleChain": {
    "id": "iot_data_processor",
    "name": "IoT Data Processor",
    "root": true,
    "nodes": [
      {
        "id": "mqttIn",
        "type": "mqtt",
        "configuration": {
          "topic": "sensor/data/+",
          "qos": 1,
          "broker": "tcp://localhost:1883"
        }
      },
      {
        "id": "dataEnricher",
        "type": "streamTransform",
        "configuration": {
          "sql": "SELECT *, CASE WHEN temperature > 30 THEN 'high' WHEN temperature > 20 THEN 'normal' ELSE 'low' END as tempLevel FROM stream",
          "timeout": "30s"
        }
      },
      {
        "id": "realtimeAggregator",
        "type": "streamAggregator",
        "configuration": {
          "sql": "SELECT deviceId, AVG(temperature) as avgTemp, MAX(temperature) as maxTemp, MIN(temperature) as minTemp, COUNT(*) as count, ANY_VALUE(tempLevel) as tempLevel FROM stream GROUP BY deviceId, TumblingWindow('1m')",
          "timeout": "1m"
        }
      },
      {
        "id": "alertGenerator",
        "type": "streamTransform",
        "configuration": {
          "sql": "SELECT deviceId, avgTemp, maxTemp, count, CASE WHEN maxTemp > 35 THEN 'critical' WHEN maxTemp > 30 THEN 'warning' ELSE 'normal' END as alertLevel FROM stream WHERE maxTemp > 30"
        }
      },
      {
        "id": "mqttOut",
        "type": "mqtt",
        "configuration": {
          "topic": "aggregates/device_data",
          "qos": 1,
          "broker": "tcp://localhost:1883"
        }
      },
      {
        "id": "alertOut",
        "type": "mqtt",
        "configuration": {
          "topic": "alerts/temperature",
          "qos": 2,
          "broker": "tcp://localhost:1883"
        }
      }
    ],
    "connections": [
      {
        "fromId": "mqttIn",
        "toId": "dataEnricher",
        "type": "Success"
      },
      {
        "fromId": "dataEnricher",
        "toId": "realtimeAggregator",
        "type": "Success"
      },
      {
        "fromId": "realtimeAggregator",
        "toId": "mqttOut",
        "type": "Success"
      },
      {
        "fromId": "realtimeAggregator",
        "toId": "alertGenerator",
        "type": "Success"
      },
      {
        "fromId": "alertGenerator",
        "toId": "alertOut",
        "type": "Success"
      }
    ]
  }
}
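
With six nodes and five connections, a mistyped node id is easy to miss. The sketch below is a standalone check, not a RuleGo feature: it parses a file with the structure shown above and lists connections that reference undeclared nodes. The `chainFile` type and `danglingConnections` function are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// chainFile models only the fields used by this check.
type chainFile struct {
	RuleChain struct {
		Nodes []struct {
			ID string `json:"id"`
		} `json:"nodes"`
		Connections []struct {
			FromID string `json:"fromId"`
			ToID   string `json:"toId"`
		} `json:"connections"`
	} `json:"ruleChain"`
}

// danglingConnections returns a description of every connection whose
// fromId or toId does not match a declared node id.
func danglingConnections(raw []byte) ([]string, error) {
	var cf chainFile
	if err := json.Unmarshal(raw, &cf); err != nil {
		return nil, err
	}
	known := make(map[string]bool)
	for _, n := range cf.RuleChain.Nodes {
		known[n.ID] = true
	}
	var problems []string
	for _, c := range cf.RuleChain.Connections {
		if !known[c.FromID] {
			problems = append(problems, "unknown fromId: "+c.FromID)
		}
		if !known[c.ToID] {
			problems = append(problems, "unknown toId: "+c.ToID)
		}
	}
	return problems, nil
}

func main() {
	raw := []byte(`{"ruleChain": {
		"nodes": [{"id": "mqttIn"}, {"id": "dataEnricher"}],
		"connections": [
			{"fromId": "mqttIn", "toId": "dataEnricher"},
			{"fromId": "dataEnricher", "toId": "realtimeAggregator"}
		]}}`)
	problems, err := danglingConnections(raw)
	if err != nil {
		fmt.Println("invalid JSON:", err)
		return
	}
	fmt.Println(problems)
}
```

In the full rule chain above, `realtimeAggregator` fans out to both `mqttOut` and `alertGenerator`, so every aggregate is published and also evaluated for alerts.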

# 2. Go Application Code (main.go)

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

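    "github.com/rulego/rulego"
    // Side-effect import for the StreamSQL components. The parent streamsql
    // package is not referenced in this file, so it is not imported directly.
    _ "github.com/rulego/streamsql/rulego"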
)

// IoTData represents IoT device data structure
type IoTData struct {
    DeviceID    string    `json:"deviceId"`
    Temperature float64   `json:"temperature"`
    Humidity    float64   `json:"humidity"`
    Timestamp   time.Time `json:"timestamp"`
}

func main() {
    // 1. Initialize RuleGo
    config := rulego.NewConfig()
    
    // 2. Load rule chain configuration
    ruleChainFile := "rule_chain.json"
    ruleChain, err := loadRuleChain(ruleChainFile)
    if err != nil {
        log.Fatalf("Failed to load rule chain: %v", err)
    }
    
    // 3. Initialize rule engine
    engine, err := rulego.New(ruleChain, config)
    if err != nil {
        log.Fatalf("Failed to initialize rule engine: %v", err)
    }
    
    // 4. Start rule engine
    if err := engine.Start(); err != nil {
        log.Fatalf("Failed to start rule engine: %v", err)
    }
    
    fmt.Println("IoT data processing system started successfully!")
    fmt.Println("- Listening on topic: sensor/data/+")
    fmt.Println("- Aggregating every 1 minute")
    fmt.Println("- Publishing results to: aggregates/device_data")
    fmt.Println("- Publishing alerts to: alerts/temperature")
    
    // 5. Simulate sending test data
    go simulateData(engine)
    
    // 6. Wait for shutdown signal
    waitForShutdown()
    
    // 7. Stop rule engine
    engine.Stop()
    fmt.Println("System stopped")
}

// loadRuleChain loads rule chain configuration
func loadRuleChain(filename string) (*rulego.RuleChain, error) {
    data, err := os.ReadFile(filename)
    if err != nil {
        return nil, err
    }
    
    var ruleChain rulego.RuleChain
    err = json.Unmarshal(data, &ruleChain)
    return &ruleChain, err
}

// simulateData simulates sending IoT device data
func simulateData(engine *rulego.RuleGo) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    
    devices := []string{"device-001", "device-002", "device-003"}
    
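    // The ticker delivers one tick per interval; ranging over its channel
    // replaces the single-case select.
    for range ticker.C {
        // Unix()%30 and Unix()%40 cycle deterministically, so the values are
        // not random, and all devices share the same values within a tick.
        now := time.Now()
        for _, deviceID := range devices {
            data := IoTData{
                DeviceID:    deviceID,
                Temperature: 20.0 + float64(now.Unix()%30), // 20-49°C
                Humidity:    30.0 + float64(now.Unix()%40), // 30-69%
                Timestamp:   now,
            }

            jsonData, err := json.Marshal(data)
            if err != nil {
                log.Printf("Failed to encode data for %s: %v", deviceID, err)
                continue
            }

            // Send data to rule engine
            msg := rulego.NewMsg("", "JSON", jsonData)
            engine.OnMsg(msg)

            fmt.Printf("Sent data: %s\n", string(jsonData))
        }
    }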
}

// waitForShutdown waits for shutdown signal
func waitForShutdown() {
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    
    <-sigChan
    fmt.Println("\nReceived shutdown signal, stopping system...")
}

# 3. go.mod File

module iot-streamsql-example

go 1.21

require (
    github.com/rulego/rulego v0.21.0
    github.com/rulego/streamsql v0.15.0
)

// Indirect dependencies are added automatically by `go mod tidy`.

# 4. Project Documentation (README.md)

# IoT StreamSQL Example

This project demonstrates how to use StreamSQL with RuleGo to build a complete IoT data processing system.

## Features

- Real-time IoT device data processing
- Temperature anomaly detection
- Data aggregation by 1-minute windows
- MQTT-based message communication
- Configurable rule chains

## Quick Start

### 1. Install Dependencies

```bash
go mod tidy
```

# 2. Start MQTT Broker

You can use Mosquitto or other MQTT brokers:

# Start Mosquitto
mosquitto -p 1883

# 3. Run Application

go run main.go

# 4. Subscribe to Results

# Subscribe to aggregation results
mosquitto_sub -t "aggregates/device_data" -v

# Subscribe to alerts
mosquitto_sub -t "alerts/temperature" -v

# Data Flow

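  1. Input: IoT devices publish data to sensor/data/+
  2. Enrichment: Tag each reading with a temperature level (high above 30, normal above 20, otherwise low)
  3. Aggregation: Calculate the average, maximum, and minimum temperature per device over 1-minute tumbling windows
  4. Output: Publish aggregated results to aggregates/device_data
  5. Alerts: Publish an alert to alerts/temperature when a window's maximum temperature exceeds 30 (warning) or 35 (critical)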
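
The enrichment and alert steps correspond to the two CASE expressions in rule_chain.json. The sketch below restates them as plain Go functions to show where the thresholds fall; `tempLevel` and `alertLevel` are hypothetical names used only for this illustration.

```go
package main

import "fmt"

// tempLevel mirrors the CASE expression of the dataEnricher node.
func tempLevel(temperature float64) string {
	switch {
	case temperature > 30:
		return "high"
	case temperature > 20:
		return "normal"
	default:
		return "low"
	}
}

// alertLevel mirrors the alertGenerator node. Windows whose maximum is 30 or
// below are removed by the WHERE clause, so ok is false for them.
func alertLevel(maxTemp float64) (level string, ok bool) {
	switch {
	case maxTemp > 35:
		return "critical", true
	case maxTemp > 30:
		return "warning", true
	default:
		return "", false
	}
}

func main() {
	for _, t := range []float64{18, 25, 32, 36} {
		level, alert := alertLevel(t)
		fmt.Printf("temperature=%v tempLevel=%s alert=%v alertLevel=%q\n",
			t, tempLevel(t), alert, level)
	}
}
```

Because the alert query filters on `maxTemp > 30`, the `ELSE 'normal'` branch of its CASE expression never produces a row; only warning and critical alerts are published.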

# Configuration

Modify rule_chain.json to adjust:

  • MQTT broker configuration
  • StreamSQL query statements
  • Aggregation time windows
  • Alert thresholds

# Extension

You can extend this example by:

  • Adding more sensor types
  • Implementing more complex aggregation rules
  • Adding data persistence
  • Integrating with other systems

## Data Flow Description

The complete data processing flow is as follows:

1. **Data Input**: IoT devices publish sensor data to the `sensor/data/+` topic
2. **Data Enrichment**: The `dataEnricher` node uses StreamSQL to add temperature level tags
3. **Real-time Aggregation**: The `realtimeAggregator` node performs aggregation calculations every 1 minute
4. **Result Output**: Aggregated results are published to the `aggregates/device_data` topic
5. **Alert Generation**: The `alertGenerator` node generates alerts based on temperature thresholds
6. **Alert Output**: Alert messages are published to the `alerts/temperature` topic

## Core Features

### 1. Real-time Processing
- Supports millisecond-level data processing
- Automatic window calculation and triggering
- Configurable processing latency

### 2. Flexible Configuration
- Rule chain configuration via JSON
- Dynamic SQL query modification
- Configurable performance parameters

### 3. High Reliability
- Automatic failover and recovery
- Data persistence and replay
- Monitoring and alerting

### 4. Easy Extension
- Custom function support
- Plugin-based architecture
- Integration with other systems

## Extension Configuration

### Custom Performance Configuration

```json
{
  "id": "streamAggregator",
  "type": "streamAggregator",
  "configuration": {
    "sql": "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5m')",
    "customConfig": {
      "bufferConfig": {
        "dataChannelSize": 5000,
        "resultChannelSize": 2000
      },
      "overflowConfig": {
        "strategy": "persist",
        "blockTimeout": "10s"
      },
      "workerConfig": {
        "sinkPoolSize": 100,
        "sinkWorkerCount": 20
      }
    }
  }
}
```

# Advanced Features

# 1. Custom Functions

You can register custom functions in the Go application:

// Register custom function
functions.RegisterCustomFunction(
    "calculate_heat_index",
    functions.TypeMath,
    "Heat Index Calculation",
    "Calculate heat index based on temperature and humidity",
    2, 2,
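    func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
        // Reject arguments that cannot be converted instead of computing with zero values
        temp, err := functions.ConvertToFloat64(args[0])
        if err != nil {
            return nil, err
        }
        humidity, err := functions.ConvertToFloat64(args[1])
        if err != nil {
            return nil, err
        }
        
        // Calculate heat index (Rothfusz regression).
        // The coefficients expect temp in °F and humidity in percent (0-100);
        // convert Celsius readings first: °F = °C*9/5 + 32.
        heatIndex := -42.379 + 2.04901523*temp + 10.14333127*humidity -
            0.22475541*temp*humidity - 0.00683783*temp*temp -
            0.05481717*humidity*humidity + 0.00122874*temp*temp*humidity +
            0.00085282*temp*humidity*humidity - 0.00000199*temp*temp*humidity*humidity
        
        return heatIndex, nil
    },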
)
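
The coefficients in the function above are those of the Rothfusz heat index regression, which is defined for temperatures in degrees Fahrenheit, whereas the sample data elsewhere in this chapter is described in degrees Celsius. The standalone sketch below, which does not depend on StreamSQL, shows the conversion; `heatIndexF` and `heatIndexC` are hypothetical helper names.

```go
package main

import "fmt"

// heatIndexF applies the Rothfusz regression. It expects the temperature in
// degrees Fahrenheit and the relative humidity in percent.
func heatIndexF(tempF, humidity float64) float64 {
	return -42.379 + 2.04901523*tempF + 10.14333127*humidity -
		0.22475541*tempF*humidity - 0.00683783*tempF*tempF -
		0.05481717*humidity*humidity + 0.00122874*tempF*tempF*humidity +
		0.00085282*tempF*humidity*humidity - 0.00000199*tempF*tempF*humidity*humidity
}

// heatIndexC converts a Celsius reading to Fahrenheit, applies the
// regression, and converts the result back to Celsius.
func heatIndexC(tempC, humidity float64) float64 {
	tempF := tempC*9/5 + 32
	return (heatIndexF(tempF, humidity) - 32) * 5 / 9
}

func main() {
	fmt.Printf("heat index at 90°F, 70%% RH: %.1f°F\n", heatIndexF(90, 70))
	fmt.Printf("heat index at 32°C, 70%% RH: %.1f°C\n", heatIndexC(32, 70))
}
```

The regression is intended for hot, humid conditions (roughly 80°F and above); for cooler readings its output is not meaningful, so a custom function may want to return the input temperature unchanged in that range.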

# 2. Error Handling

Configure error handling strategies:

{
  "id": "streamAggregator",
  "type": "streamAggregator",
  "configuration": {
    "sql": "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5m')",
    "errorHandling": {
      "strategy": "retry",
      "maxRetries": 3,
      "retryInterval": "1s",
      "deadLetterTopic": "errors/stream_processing"
    }
  }
}

# 3. Monitoring and Metrics

Enable detailed monitoring:

{
  "id": "streamAggregator",
  "type": "streamAggregator",
  "configuration": {
    "sql": "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5m')",
    "monitoring": {
      "enableMetrics": true,
      "metricsInterval": "30s",
      "metricsTopic": "metrics/stream_processing"
    }
  }
}
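
A metrics payload published at a fixed interval can be as simple as a set of counters serialized to JSON. The sketch below shows one way to keep such counters safely across goroutines. The `metrics` type and the counter names are hypothetical, and the actual fields emitted by the component are not documented in this chapter.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync/atomic"
)

// metrics holds two counters that can be updated from several goroutines.
type metrics struct {
	processed atomic.Int64
	failed    atomic.Int64
}

// snapshot renders the current counter values as a JSON payload, which a
// publisher could send to a metrics topic at a fixed interval.
func (m *metrics) snapshot() ([]byte, error) {
	return json.Marshal(map[string]int64{
		"processed": m.processed.Load(),
		"failed":    m.failed.Load(),
	})
}

func main() {
	var m metrics
	m.processed.Add(3)
	m.failed.Add(1)
	payload, err := m.snapshot()
	if err != nil {
		fmt.Println("snapshot failed:", err)
		return
	}
	fmt.Println(string(payload))
}
```

Calling `snapshot` from a `time.Ticker` loop with a 30-second period would match the `metricsInterval` shown above.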

# 📚 Related Documentation

  • API Reference - View complete API documentation
  • SQL Reference - View SQL syntax reference
  • Custom Functions - Learn how to develop custom functions
  • Performance Optimization - Learn about performance optimization techniques
Last Updated: 2025/08/05, 02:24:31

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License
