流聚合器
# streamAggregator
节点类型: x/streamAggregator
说明: 流聚合器节点,用于处理聚合SQL查询,如窗口聚合、分组聚合等。该组件基于 StreamSQL 引擎,支持滚动窗口、滑动窗口等多种窗口类型的聚合计算。支持单条数据和数组数据输入。
# 输入数据支持
该节点支持两种输入数据格式:
# 单条数据输入
直接处理单个JSON对象:
{"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2}
1
# 数组数据输入
自动处理JSON数组,将数组中的每个元素逐条添加到聚合流中:
[
{"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2},
{"deviceId": "sensor002", "temperature": 28.3, "humidity": 55.8},
{"deviceId": "sensor003", "temperature": 22.1, "humidity": 65.4}
]
1
2
3
4
5
2
3
4
5
数组处理说明
- 数组中的每个元素都会被逐条添加到聚合流中参与聚合计算
- 原始数组消息会通过Success链继续传递,保持数据流的连续性
- 聚合结果仍然通过window_event链传递
# 配置
字段 | 类型 | 说明 | 默认值 |
---|---|---|---|
sql | string | 聚合SQL查询语句,必须包含聚合函数(如COUNT、SUM、AVG、MAX、MIN)或窗口函数 | 无 |
# SQL语法支持
详细语法参考
完整的 SQL 语法说明请参考:StreamSQL SQL语法参考
# 关系类型
- Success: 原始消息成功处理后,通过此关系链传递原始消息
- window_event: 聚合结果通过此关系链传递,消息体为聚合计算的结果,结果格式是一个多列数组
- Failure: 处理失败时,通过此关系链传递错误信息
# 执行结果
# Success链输出
原始消息不变,继续传递给下一个节点。
# window_event链输出
聚合结果作为新消息传递,消息格式:
[
{
"field1": "value1",
"field2": "value2",
"count": 10,
"avg_temperature": 25.5
}
]
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# Failure链输出
错误信息,包含具体的错误描述。
# 配置示例
# 基础分组聚合
{
"id": "s1",
"type": "x/streamAggregator",
"name": "设备温度聚合",
"configuration": {
"sql": "SELECT deviceId, AVG(temperature) as avg_temp, MAX(temperature) as max_temp, COUNT(*) as count FROM stream GROUP BY deviceId, TumblingWindow('2s')"
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 滑动窗口聚合
{
"id": "s2",
"type": "x/streamAggregator",
"name": "滑动窗口分析",
"configuration": {
"sql": "SELECT AVG(temperature) as avg_temp, COUNT(*) as count FROM stream GROUP BY SlidingWindow('10s', '2s')"
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 多字段聚合
{
"id": "s3",
"type": "x/streamAggregator",
"name": "多维度聚合",
"configuration": {
"sql": "SELECT deviceType, location, AVG(temperature) as avg_temp, MIN(humidity) as min_humidity, MAX(pressure) as max_pressure FROM stream GROUP BY deviceType, location, TumblingWindow('5m')"
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 应用示例
# 示例1:设备状态监控
场景: 监控IoT设备的温度数据,每2秒计算一次各设备的平均温度和最大温度。
规则链配置:
{
"ruleChain": {
"id": "device_monitoring",
"name": "设备监控规则链",
"root": true
},
"metadata": {
"nodes": [
{
"id": "s1",
"type": "x/streamAggregator",
"name": "温度聚合",
"configuration": {
"sql": "SELECT deviceId, AVG(temperature) as avg_temp, MAX(temperature) as max_temp, COUNT(*) as count FROM stream GROUP BY deviceId, TumblingWindow('2s')"
}
},
{
"id": "s2",
"type": "jsTransform",
"name": "结果处理",
"configuration": {
"jsScript": "msg.timestamp = new Date().toISOString(); return {'msg': msg, 'metadata': metadata, 'msgType': msgType};"
}
},
{
"id": "s3",
"type": "log",
"name": "聚合结果日志",
"configuration": {
"jsScript": "return 'Aggregation Result: ' + JSON.stringify(msg);"
}
},
{
"id": "s4",
"type": "log",
"name": "原始数据日志",
"configuration": {
"jsScript": "return 'Original Data: ' + JSON.stringify(msg);"
}
}
],
"connections": [
{
"fromId": "s1",
"toId": "s2",
"type": "window_event"
},
{
"fromId": "s1",
"toId": "s4",
"type": "Success"
},
{
"fromId": "s2",
"toId": "s3",
"type": "Success"
}
]
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
输入数据:
{"deviceId": "device001", "temperature": 25.5, "timestamp": "2023-09-13T10:00:00Z"}
{"deviceId": "device001", "temperature": 26.2, "timestamp": "2023-09-13T10:00:01Z"}
{"deviceId": "device002", "temperature": 24.8, "timestamp": "2023-09-13T10:00:01Z"}
1
2
3
2
3
聚合结果输出:
{
"deviceId": "device001",
"avg_temp": 25.85,
"max_temp": 26.2,
"count": 2
}
1
2
3
4
5
6
2
3
4
5
6
# 示例2:高温报警系统
场景: 使用滑动窗口监控温度变化,当3秒内平均温度超过30度时触发报警。
规则链配置:
{
"ruleChain": {
"id": "temperature_alarm",
"name": "高温报警规则链",
"root": true
},
"metadata": {
"nodes": [
{
"id": "s1",
"type": "x/streamAggregator",
"name": "温度滑动窗口",
"configuration": {
"sql": "SELECT AVG(temperature) as avg_temp, MAX(temperature) as max_temp, COUNT(*) as count FROM stream GROUP BY SlidingWindow('3s', '1s')"
}
},
{
"id": "s2",
"type": "jsFilter",
"name": "高温过滤",
"configuration": {
"jsScript": "return msg.avg_temp > 30;"
}
},
{
"id": "s3",
"type": "jsTransform",
"name": "报警信息",
"configuration": {
"jsScript": "msg.alert = 'High temperature detected!'; msg.level = 'WARNING'; return {'msg': msg, 'metadata': metadata, 'msgType': 'ALARM'};"
}
},
{
"id": "s4",
"type": "log",
"name": "报警日志",
"configuration": {
"jsScript": "return 'ALARM: ' + JSON.stringify(msg);"
}
}
],
"connections": [
{
"fromId": "s1",
"toId": "s2",
"type": "window_event"
},
{
"fromId": "s2",
"toId": "s3",
"type": "True"
},
{
"fromId": "s3",
"toId": "s4",
"type": "Success"
}
]
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# 示例3:数组数据批量聚合
场景: 处理包含多个设备数据的数组消息,进行批量聚合分析。
输入数据:
[
{"deviceId": "sensor001", "temperature": 25.5, "location": "room1"},
{"deviceId": "sensor002", "temperature": 28.3, "location": "room1"},
{"deviceId": "sensor003", "temperature": 22.1, "location": "room2"},
{"deviceId": "sensor004", "temperature": 30.8, "location": "room2"}
]
1
2
3
4
5
6
2
3
4
5
6
规则链配置:
{
"ruleChain": {
"id": "batch_aggregation",
"name": "批量数据聚合",
"root": true
},
"metadata": {
"nodes": [
{
"id": "s1",
"type": "x/streamAggregator",
"name": "按位置聚合",
"configuration": {
"sql": "SELECT location, AVG(temperature) as avg_temp, MAX(temperature) as max_temp, COUNT(*) as device_count FROM stream GROUP BY location, TumblingWindow('5s')"
}
},
{
"id": "s2",
"type": "log",
"name": "聚合结果",
"configuration": {
"jsScript": "return 'Location Aggregation: ' + JSON.stringify(msg);"
}
},
{
"id": "s3",
"type": "log",
"name": "原始数组",
"configuration": {
"jsScript": "return 'Original Array: ' + JSON.stringify(msg);"
}
}
],
"connections": [
{
"fromId": "s1",
"toId": "s2",
"type": "window_event"
},
{
"fromId": "s1",
"toId": "s3",
"type": "Success"
}
]
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
聚合结果输出:
[
{"location": "room1", "avg_temp": 26.9, "max_temp": 28.3, "device_count": 2},
{"location": "room2", "avg_temp": 26.45, "max_temp": 30.8, "device_count": 2}
]
1
2
3
4
2
3
4
# 注意事项
- SQL语法限制: 只支持聚合查询,不支持非聚合的SELECT语句
- 窗口类型: 必须在GROUP BY子句中指定窗口函数
- 性能考虑: 窗口大小和滑动间隔会影响内存使用和计算性能
- 数据类型: 确保聚合字段的数据类型支持相应的聚合函数
- 数组处理: 数组中的每个元素都会逐条添加到聚合流中,原始数组消息通过Success链传递
- 窗口事件回调: 窗口事件触发的结束回调需要通过
Config.OnEnd
设置,而不是通过OnMsg
注册的OnEnd
回调。这是因为窗口事件是由聚合器内部触发的,不会经过常规的消息处理流程
# 窗口事件回调示例
正确的方式 - 使用 Config.OnEnd:
// 设置全局聚合结果处理器
config.OnEnd = func(ctx types.RuleContext, msg types.RuleMsg, err error, relationType string) {
if err == nil && msg.Type == WindowEventMsgType {
// 处理窗口聚合结果
var result map[string]interface{}
if jsonErr := json.Unmarshal([]byte(msg.Data.String()), &result); jsonErr == nil {
// 处理聚合结果
fmt.Printf("聚合结果: %+v\n", result)
}
}
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
错误的方式 - 使用 OnMsg 的 OnEnd:
// 这种方式无法捕获窗口事件
ruleEngine.OnMsg(msg, types.WithOnEnd(func(ctx types.RuleContext, msg types.RuleMsg, err error, relationType string) {
// 窗口事件不会触发这个回调
}))
1
2
3
4
2
3
4
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/27, 15:17:27