RuleGo RuleGo
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • 概述
  • 快速开始
  • 核心概念
  • SQL参考
  • API参考
  • RuleGo集成
    • 组件概述
      • streamTransform 流转换器
      • streamAggregator 流聚合器
    • 完整示例:MQTT + StreamSQL + MQTT
      • 系统架构
      • 1. 规则链DSL配置
      • 2. Go应用程序代码(非必须,使用rulego-server可以0代码部署规则链)
      • 3. 依赖管理
      • 4. 运行示例
      • 方式一:使用 RuleGo-Server(推荐)
      • 方式二:使用 RuleGo-Server 可视化设计
      • 方式三:编程方式(自定义开发)
    • 数据流说明
      • 输入数据格式
      • 聚合结果格式
    • 核心特性
      • 1. 实时流式聚合
      • 2. 数据验证与过滤
      • 3. 智能告警
      • 4. 双路输出
      • 5. 错误处理
    • 扩展配置
      • 修改聚合窗口
      • 添加更多聚合指标
      • 滑动窗口聚合
  • 函数

  • 案例集锦

目录

RuleGo集成

# RuleGo集成

StreamSQL 提供了与 RuleGo 规则引擎的深度集成,通过两个专用组件实现流式数据处理能力:

  • streamTransform - 流转换器组件
  • streamAggregator - 流聚合器组件

# 组件概述

# streamTransform 流转换器

节点类型: x/streamTransform

流转换器用于处理非聚合的SQL查询,支持数据过滤、字段选择、计算和转换等操作。适用于实时数据清洗、格式转换和简单计算场景。

主要特性:

  • 支持标准SQL SELECT语句(非聚合)
  • 实时数据转换和计算
  • 字段过滤和重命名
  • 条件过滤和数据验证
  • 单条和批量数据处理

# streamAggregator 流聚合器

节点类型: x/streamAggregator

流聚合器用于处理聚合SQL查询,支持窗口聚合、分组聚合等复杂的流式计算。适用于实时边缘计算、统计分析、监控报警和数据汇总场景。

主要特性:

  • 支持聚合函数(COUNT、SUM、AVG、MAX、MIN等)
  • 滚动窗口、滑动窗口、计数窗口和会话窗口等
  • 分组聚合和多维度统计
  • 实时计算和结果输出
  • 灵活的窗口配置

# 完整示例:MQTT + StreamSQL + MQTT

以下是一个完整的IoT数据处理示例,展示如何使用MQTT输入端接收传感器数据,通过StreamSQL进行流式聚合处理,然后将结果通过MQTT发布出去。

# 系统架构

MQTT输入端 → 数据验证 → StreamSQL聚合器 → MQTT输出端
    ↓              ↓              ↓              ↓
接收传感器数据  →  数据清洗  →  实时聚合计算  →  发布聚合结果
1
2
3

# 1. 规则链DSL配置

创建 iot_stream_processing_chain.json 文件:

{
  "ruleChain": {
    "id": "iot_stream_processor",
    "name": "IoT数据流处理链",
    "root": true,
    "debugMode": true,
    "additionalInfo": {
      "description": "MQTT输入 + StreamSQL聚合 + MQTT输出的完整IoT数据处理示例"
    }
  },
  "metadata": {
    "endpoints": [
      {
        "id": "mqtt_sensor_input",
        "type": "endpoint/mqtt",
        "name": "传感器数据MQTT输入端",
        "configuration": {
          "server": "127.0.0.1:1883",
          "username": "",
          "password": "",
          "qos": 1,
          "clientId": "rulego_sensor_subscriber"
        },
        "routers": [
          {
            "id": "sensor_data_router",
            "from": {
              "path": "sensors/+/data",
              "processors": ["setJsonDataType"]
            },
            "to": {
              "path": "iot_stream_processor:data_validator"
            }
          }
        ]
      }
    ],
    "nodes": [
      {
        "id": "data_validator",
        "type": "jsFilter",
        "name": "数据验证器",
        "configuration": {
          "jsScript": "// 验证传感器数据格式\nif (!msg.deviceId || !msg.timestamp || msg.temperature === undefined) {\n  return false;\n}\n// 验证数据范围\nif (msg.temperature < -50 || msg.temperature > 100) {\n  return false;\n}\nif (msg.humidity !== undefined && (msg.humidity < 0 || msg.humidity > 100)) {\n  return false;\n}\nreturn true;"
        },
        "additionalInfo": {
          "layoutX": 200,
          "layoutY": 100
        }
      },
      {
        "id": "stream_aggregator",
        "type": "x/streamAggregator",
        "name": "流式聚合器",
        "configuration": {
          "sql": "SELECT deviceId, AVG(temperature) as avg_temp, MAX(temperature) as max_temp, MIN(temperature) as min_temp, COUNT(*) as data_count, AVG(humidity) as avg_humidity FROM stream WHERE temperature IS NOT NULL GROUP BY deviceId, TumblingWindow('30s')"
        },
        "additionalInfo": {
          "layoutX": 400,
          "layoutY": 100
        }
      },
      {
        "id": "data_enricher",
        "type": "jsTransform",
        "name": "数据丰富器",
        "configuration": {
          "jsScript": "// 丰富原始数据\nvar enrichedData = msg;\nenrichedData.processedAt = new Date().toISOString();\nenrichedData.source = 'sensor_network';\n// 添加设备位置信息(模拟)\nvar deviceLocations = {\n  'sensor_001': { building: 'A', floor: 1, room: '101' },\n  'sensor_002': { building: 'A', floor: 1, room: '102' },\n  'sensor_003': { building: 'B', floor: 2, room: '201' }\n};\nif (deviceLocations[enrichedData.deviceId]) {\n  enrichedData.location = deviceLocations[enrichedData.deviceId];\n}\nreturn { msg: enrichedData, metadata: metadata, msgType: msgType };"
        },
        "additionalInfo": {
          "layoutX": 400,
          "layoutY": 250
        }
      },
      {
        "id": "aggregation_processor",
        "type": "jsTransform",
        "name": "聚合结果处理器",
        "configuration": {
          "jsScript": "// 处理聚合结果\nvar aggregationResult = msg;\naggregationResult.aggregatedAt = new Date().toISOString();\naggregationResult.windowType = 'tumbling_30s';\naggregationResult.alertLevel = 'INFO';\n\n// 温度异常检测\nif (aggregationResult.max_temp > 35) {\n  aggregationResult.alertLevel = 'WARNING';\n  aggregationResult.alert = 'HIGH_TEMPERATURE_DETECTED';\n  aggregationResult.alertMessage = '设备 ' + aggregationResult.deviceId + ' 最高温度达到 ' + aggregationResult.max_temp + '°C';\n} else if (aggregationResult.avg_temp > 30) {\n  aggregationResult.alertLevel = 'INFO';\n  aggregationResult.alert = 'ELEVATED_TEMPERATURE';\n  aggregationResult.alertMessage = '设备 ' + aggregationResult.deviceId + ' 平均温度为 ' + aggregationResult.avg_temp.toFixed(2) + '°C';\n}\n\n// 设置MQTT发布主题\nmetadata['responseTopic'] = 'analytics/aggregated/' + aggregationResult.deviceId;\nmetadata['responseQos'] = '1';\n\nreturn { msg: aggregationResult, metadata: metadata, msgType: 'AGGREGATED_DATA' };"
        },
        "additionalInfo": {
          "layoutX": 600,
          "layoutY": 100
        }
      },
      {
        "id": "mqtt_publisher",
        "type": "mqttClient",
        "name": "MQTT结果发布器",
        "configuration": {
          "server": "127.0.0.1:1883",
          "topic": "${metadata.responseTopic}",
          "qos": 1,
          "clientId": "rulego_analytics_publisher"
        },
        "additionalInfo": {
          "layoutX": 800,
          "layoutY": 100
        }
      },
      {
        "id": "raw_data_publisher",
        "type": "mqttClient",
        "name": "原始数据发布器",
        "configuration": {
          "server": "127.0.0.1:1883",
          "topic": "processed/sensors/${msg.deviceId}",
          "qos": 0,
          "clientId": "rulego_raw_publisher"
        },
        "additionalInfo": {
          "layoutX": 600,
          "layoutY": 250
        }
      },
      {
        "id": "error_handler",
        "type": "log",
        "name": "错误处理器",
        "configuration": {
          "jsScript": "return 'ERROR: ' + JSON.stringify(msg) + ', metadata: ' + JSON.stringify(metadata);"
        },
        "additionalInfo": {
          "layoutX": 200,
          "layoutY": 300
        }
      }
    ],
    "connections": [
      {
        "fromId": "data_validator",
        "toId": "stream_aggregator",
        "type": "Success"
      },
      {
        "fromId": "data_validator",
        "toId": "error_handler",
        "type": "Failure"
      },
      {
        "fromId": "stream_aggregator",
        "toId": "data_enricher",
        "type": "Success"
      },
      {
        "fromId": "stream_aggregator",
        "toId": "aggregation_processor",
        "type": "window_event"
      },
      {
        "fromId": "data_enricher",
        "toId": "raw_data_publisher",
        "type": "Success"
      },
      {
        "fromId": "aggregation_processor",
        "toId": "mqtt_publisher",
        "type": "Success"
      },
      {
        "fromId": "aggregation_processor",
        "toId": "error_handler",
        "type": "Failure"
      }
    ]
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168

# 2. Go应用程序代码(非必须,使用rulego-server可以0代码部署规则链)

创建 main.go 文件:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/rulego/rulego"
	"github.com/rulego/rulego/api/types"
	"github.com/rulego/rulego/api/types/endpoint"
	"github.com/rulego/rulego/utils/mqtt"

	// 导入扩展组件
	_ "github.com/rulego/rulego/components/external"
	_ "github.com/rulego/rulego-components/stats/streamsql"
)

// SensorData 传感器数据结构
type SensorData struct {
	DeviceID    string    `json:"deviceId"`
	Timestamp   time.Time `json:"timestamp"`
	Temperature float64   `json:"temperature"`
	Humidity    *float64  `json:"humidity,omitempty"`
	BatteryLevel *int     `json:"batteryLevel,omitempty"`
}

func main() {
	// 初始化RuleGo配置
	config := rulego.NewConfig()

	// 从DSL文件加载规则链
	ruleEngine, err := rulego.New("iot_stream_processor", loadRuleChainFromFile("iot_stream_processing_chain.json"), rulego.WithConfig(config))
	if err != nil {
		log.Fatal("Failed to create rule engine:", err)
	}

	// 启动MQTT端点
	mqttEndpoint, err := endpoint.Registry.New("endpoint/mqtt", config, types.Configuration{
		"server":   "127.0.0.1:1883",
		"username": "",
		"password": "",
		"qos":      1,
		"clientId": "rulego_sensor_subscriber",
	})
	if err != nil {
		log.Fatal("Failed to create MQTT endpoint:", err)
	}

	// 配置路由:将传感器数据路由到规则链
	router := endpoint.NewRouter().From("sensors/+/data").To("chain:iot_stream_processor").End()
	_, err = mqttEndpoint.AddRouter(router)
	if err != nil {
		log.Fatal("Failed to add router:", err)
	}

	// 启动MQTT端点
	if err := mqttEndpoint.Start(); err != nil {
		log.Fatal("Failed to start MQTT endpoint:", err)
	}

	log.Println("IoT Stream Processing System started successfully!")
	log.Println("Listening for sensor data on topics: sensors/+/data")
	log.Println("Publishing aggregated results to: analytics/aggregated/{deviceId}")
	log.Println("Publishing processed raw data to: processed/sensors/{deviceId}")

	// 启动模拟数据发送器(用于测试)
	go startDataSimulator()

	// 等待中断信号
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan

	log.Println("Shutting down...")

	// 优雅关闭
	if err := mqttEndpoint.Destroy(); err != nil {
		log.Printf("Error during MQTT endpoint shutdown: %v", err)
	}

	log.Println("System shutdown complete")
}

// loadRuleChainFromFile 从文件加载规则链配置
func loadRuleChainFromFile(filename string) []byte {
	data, err := os.ReadFile(filename)
	if err != nil {
		log.Fatal("Failed to read rule chain file:", err)
	}
	return data
}

// startDataSimulator 启动数据模拟器,用于测试
func startDataSimulator() {
	time.Sleep(2 * time.Second) // 等待系统启动完成

	// 创建MQTT客户端用于发送测试数据
	client, err := mqtt.NewClient(context.Background(), mqtt.Config{
		Server:   "127.0.0.1:1883",
		ClientID: "data_simulator",
	})
	if err != nil {
		log.Printf("Failed to create simulator MQTT client: %v", err)
		return
	}

	devices := []string{"sensor_001", "sensor_002", "sensor_003"}
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	log.Println("Data simulator started, sending sensor data every 5 seconds...")

	for {
		select {
		case <-ticker.C:
			// 为每个设备生成并发送数据
			for _, deviceID := range devices {
				data := generateSensorData(deviceID)
				topic := fmt.Sprintf("sensors/%s/data", deviceID)
				
				jsonData, _ := json.Marshal(data)
				err := client.Publish(topic, 1, string(jsonData))
				if err != nil {
					log.Printf("Failed to publish data for %s: %v", deviceID, err)
				} else {
					log.Printf("Published data for %s: temp=%.1f°C", deviceID, data.Temperature)
				}
			}
		}
	}
}

// generateSensorData 生成模拟传感器数据
func generateSensorData(deviceID string) SensorData {
	// 生成随机温度数据(15-40°C)
	temperature := 15 + rand.Float64()*25
	
	// 生成随机湿度数据(30-80%)
	humidity := 30 + rand.Float64()*50
	
	// 生成随机电池电量(20-100%)
	batteryLevel := 20 + rand.Intn(81)

	return SensorData{
		DeviceID:     deviceID,
		Timestamp:    time.Now(),
		Temperature:  temperature,
		Humidity:     &humidity,
		BatteryLevel: &batteryLevel,
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157

# 3. 依赖管理

创建 go.mod 文件:

module iot-stream-processing

go 1.21

require (
	github.com/rulego/rulego v0.32.0
	github.com/rulego/rulego-components v0.7.0
	github.com/rulego/streamsql v0.1.0
)
1
2
3
4
5
6
7
8
9

# 4. 运行示例

# 方式一:使用 RuleGo-Server(推荐)

RuleGo-Server 是一个独立的服务器应用,可以直接加载和运行规则链DSL文件,无需编写任何代码。

  1. 启动MQTT Broker:

    # 使用Docker启动Mosquitto
    docker run -it -p 1883:1883 eclipse-mosquitto
    
    1
    2
  2. 下载并启动 RuleGo-Server:

    # 下载 RuleGo-Server
    https://github.com/rulego/rulego/releases
    
    # 将规则链DSL文件放入 rules 目录
    cp iot_stream_processing_chain.json ./data/workflows/admin/rules/
    
    # 启动服务器
    ./rulego-server
    
    1
    2
    3
    4
    5
    6
    7
    8
  3. 监控输出数据:

    # 监控聚合结果
    mosquitto_sub -h 127.0.0.1 -t "analytics/aggregated/+"
    
    # 监控处理后的原始数据
    mosquitto_sub -h 127.0.0.1 -t "processed/sensors/+"
    
    1
    2
    3
    4
    5

# 方式二:使用 RuleGo-Server 可视化设计

RuleGo-Server 提供的RuleGo-Editor 是一个基于Web的可视化规则链编辑器,支持通过拖拽方式设计规则链。

  1. 启动 RuleGo-Server:

  2. 可视化设计规则链:

    • 打开浏览器访问 http://localhost:3000
    • 从组件库拖拽 streamAggregator 和 streamTransform 组件
    • 配置节点属性和连接关系
    • 点击保存规则链DSL文件,立刻生效

# 方式三:编程方式(自定义开发)

  1. 运行应用程序:
    go mod tidy
    go run main.go
    
    1
    2

# 数据流说明

# 输入数据格式

{
  "deviceId": "sensor_001",
  "timestamp": "2024-01-15T10:30:00Z",
  "temperature": 23.5,
  "humidity": 65.2,
  "batteryLevel": 85
}
1
2
3
4
5
6
7

# 聚合结果格式

{
  "deviceId": "sensor_001",
  "avg_temp": 23.2,
  "max_temp": 25.1,
  "min_temp": 21.8,
  "data_count": 6,
  "avg_humidity": 64.5,
  "aggregatedAt": "2024-01-15T10:30:30Z",
  "windowType": "tumbling_30s",
  "alertLevel": "INFO",
  "alert": "NORMAL_OPERATION"
}
1
2
3
4
5
6
7
8
9
10
11
12

# 核心特性

# 1. 实时流式聚合

  • 时间窗口:30秒滚动窗口聚合
  • 聚合指标:平均值、最大值、最小值、计数
  • 分组聚合:按设备ID分组

# 2. 数据验证与过滤

  • 验证必需字段(deviceId、timestamp、temperature)
  • 数据范围检查(温度:-50°C到100°C,湿度:0%到100%)
  • 异常数据过滤

# 3. 智能告警

  • 高温告警:最高温度超过35°C
  • 温度提醒:平均温度超过30°C
  • 多级告警:INFO、WARNING级别

# 4. 双路输出

  • 聚合数据:发布到 analytics/aggregated/{deviceId}
  • 原始数据:发布到 processed/sensors/{deviceId}

# 5. 错误处理

  • 数据验证失败处理
  • 聚合处理异常处理
  • 详细错误日志记录

# 扩展配置

# 修改聚合窗口

{
  "sql": "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('1m')"
}
1
2
3

# 添加更多聚合指标

{
  "sql": "SELECT deviceId, AVG(temperature) as avg_temp, STDDEV(temperature) as temp_stddev, PERCENTILE(temperature, 95) as temp_p95 FROM stream GROUP BY deviceId, TumblingWindow('30s')"
}
1
2
3

# 滑动窗口聚合

{
  "sql": "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, SlidingWindow('2m', '30s')"
}
1
2
3
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/27, 15:17:27
API参考
聚合函数

← API参考 聚合函数→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式