RuleGo RuleGo
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • 快速入门

  • 规则链

  • 标准组件

  • 扩展组件

    • 扩展组件概述
    • 过滤器

    • 转换器

    • 外部的

    • AI

    • CI

    • IoT

    • 流式计算

      • 流式计算
      • 流聚合器
        • 输入数据支持
          • 单条数据输入
          • 数组数据输入
        • 配置
        • SQL语法支持
        • 关系类型
        • 执行结果
          • Success链输出
          • window_event链输出
          • Failure链输出
        • 配置示例
          • 基础分组聚合
          • 滑动窗口聚合
          • 多字段聚合
        • 应用示例
          • 示例1:设备状态监控
          • 示例2:高温报警系统
          • 示例3:数组数据批量聚合
        • 注意事项
          • 窗口事件回调示例
      • 流转换器
  • 自定义组件

  • 组件市场

  • 可视化

  • AOP

  • 触发器

  • 高级主题

  • RuleGo-Server

  • 问题

目录

流聚合器

# streamAggregator

节点类型: x/streamAggregator

说明: 流聚合器节点,用于处理聚合SQL查询,如窗口聚合、分组聚合等。该组件基于 StreamSQL 引擎,支持滚动窗口、滑动窗口等多种窗口类型的聚合计算。支持单条数据和数组数据输入。

# 输入数据支持

该节点支持两种输入数据格式:

# 单条数据输入

直接处理单个JSON对象:

{"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2}
1

# 数组数据输入

自动处理JSON数组,将数组中的每个元素逐条添加到聚合流中:

[
  {"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2},
  {"deviceId": "sensor002", "temperature": 28.3, "humidity": 55.8},
  {"deviceId": "sensor003", "temperature": 22.1, "humidity": 65.4}
]
1
2
3
4
5

数组处理说明

  • 数组中的每个元素都会被逐条添加到聚合流中参与聚合计算
  • 原始数组消息会通过Success链继续传递,保持数据流的连续性
  • 聚合结果仍然通过window_event链传递

# 配置

字段 类型 说明 默认值
sql string 聚合SQL查询语句,必须包含聚合函数(如COUNT、SUM、AVG、MAX、MIN)或窗口函数 无

# SQL语法支持

详细语法参考

完整的 SQL 语法说明请参考:StreamSQL SQL语法参考

# 关系类型

  • Success: 原始消息成功处理后,通过此关系链传递原始消息
  • window_event: 聚合结果通过此关系链传递,消息体为聚合计算的结果,结果格式是一个多列数组
  • Failure: 处理失败时,通过此关系链传递错误信息

# 执行结果

# Success链输出

原始消息不变,继续传递给下一个节点。

# window_event链输出

聚合结果作为新消息传递,消息格式:

[
  {
    "field1": "value1",
    "field2": "value2",
    "count": 10,
    "avg_temperature": 25.5
  }
]
1
2
3
4
5
6
7
8

# Failure链输出

错误信息,包含具体的错误描述。

# 配置示例

# 基础分组聚合

{
  "id": "s1",
  "type": "x/streamAggregator",
  "name": "设备温度聚合",
  "configuration": {
    "sql": "SELECT deviceId, AVG(temperature) as avg_temp, MAX(temperature) as max_temp, COUNT(*) as count FROM stream GROUP BY deviceId, TumblingWindow('2s')"
  }
}
1
2
3
4
5
6
7
8

# 滑动窗口聚合

{
  "id": "s2",
  "type": "x/streamAggregator",
  "name": "滑动窗口分析",
  "configuration": {
    "sql": "SELECT AVG(temperature) as avg_temp, COUNT(*) as count FROM stream GROUP BY SlidingWindow('10s', '2s')"
  }
}
1
2
3
4
5
6
7
8

# 多字段聚合

{
  "id": "s3",
  "type": "x/streamAggregator",
  "name": "多维度聚合",
  "configuration": {
    "sql": "SELECT deviceType, location, AVG(temperature) as avg_temp, MIN(humidity) as min_humidity, MAX(pressure) as max_pressure FROM stream GROUP BY deviceType, location, TumblingWindow('5m')"
  }
}
1
2
3
4
5
6
7
8

# 应用示例

# 示例1:设备状态监控

场景: 监控IoT设备的温度数据,每2秒计算一次各设备的平均温度和最大温度。

规则链配置:

{
  "ruleChain": {
    "id": "device_monitoring",
    "name": "设备监控规则链",
    "root": true
  },
  "metadata": {
    "nodes": [
      {
        "id": "s1",
        "type": "x/streamAggregator",
        "name": "温度聚合",
        "configuration": {
          "sql": "SELECT deviceId, AVG(temperature) as avg_temp, MAX(temperature) as max_temp, COUNT(*) as count FROM stream GROUP BY deviceId, TumblingWindow('2s')"
        }
      },
      {
        "id": "s2",
        "type": "jsTransform",
        "name": "结果处理",
        "configuration": {
          "jsScript": "msg.timestamp = new Date().toISOString(); return {'msg': msg, 'metadata': metadata, 'msgType': msgType};"
        }
      },
      {
        "id": "s3",
        "type": "log",
        "name": "聚合结果日志",
        "configuration": {
          "jsScript": "return 'Aggregation Result: ' + JSON.stringify(msg);"
        }
      },
      {
        "id": "s4",
        "type": "log",
        "name": "原始数据日志",
        "configuration": {
          "jsScript": "return 'Original Data: ' + JSON.stringify(msg);"
        }
      }
    ],
    "connections": [
      {
        "fromId": "s1",
        "toId": "s2",
        "type": "window_event"
      },
      {
        "fromId": "s1",
        "toId": "s4",
        "type": "Success"
      },
      {
        "fromId": "s2",
        "toId": "s3",
        "type": "Success"
      }
    ]
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

输入数据:

{"deviceId": "device001", "temperature": 25.5, "timestamp": "2023-09-13T10:00:00Z"}
{"deviceId": "device001", "temperature": 26.2, "timestamp": "2023-09-13T10:00:01Z"}
{"deviceId": "device002", "temperature": 24.8, "timestamp": "2023-09-13T10:00:01Z"}
1
2
3

聚合结果输出:

{
  "deviceId": "device001",
  "avg_temp": 25.85,
  "max_temp": 26.2,
  "count": 2
}
1
2
3
4
5
6

# 示例2:高温报警系统

场景: 使用滑动窗口监控温度变化,当3秒内平均温度超过30度时触发报警。

规则链配置:

{
  "ruleChain": {
    "id": "temperature_alarm",
    "name": "高温报警规则链",
    "root": true
  },
  "metadata": {
    "nodes": [
      {
        "id": "s1",
        "type": "x/streamAggregator",
        "name": "温度滑动窗口",
        "configuration": {
          "sql": "SELECT AVG(temperature) as avg_temp, MAX(temperature) as max_temp, COUNT(*) as count FROM stream GROUP BY SlidingWindow('3s', '1s')"
        }
      },
      {
        "id": "s2",
        "type": "jsFilter",
        "name": "高温过滤",
        "configuration": {
          "jsScript": "return msg.avg_temp > 30;"
        }
      },
      {
        "id": "s3",
        "type": "jsTransform",
        "name": "报警信息",
        "configuration": {
          "jsScript": "msg.alert = 'High temperature detected!'; msg.level = 'WARNING'; return {'msg': msg, 'metadata': metadata, 'msgType': 'ALARM'};"
        }
      },
      {
        "id": "s4",
        "type": "log",
        "name": "报警日志",
        "configuration": {
          "jsScript": "return 'ALARM: ' + JSON.stringify(msg);"
        }
      }
    ],
    "connections": [
      {
        "fromId": "s1",
        "toId": "s2",
        "type": "window_event"
      },
      {
        "fromId": "s2",
        "toId": "s3",
        "type": "True"
      },
      {
        "fromId": "s3",
        "toId": "s4",
        "type": "Success"
      }
    ]
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

# 示例3:数组数据批量聚合

场景: 处理包含多个设备数据的数组消息,进行批量聚合分析。

输入数据:

[
  {"deviceId": "sensor001", "temperature": 25.5, "location": "room1"},
  {"deviceId": "sensor002", "temperature": 28.3, "location": "room1"},
  {"deviceId": "sensor003", "temperature": 22.1, "location": "room2"},
  {"deviceId": "sensor004", "temperature": 30.8, "location": "room2"}
]
1
2
3
4
5
6

规则链配置:

{
  "ruleChain": {
    "id": "batch_aggregation",
    "name": "批量数据聚合",
    "root": true
  },
  "metadata": {
    "nodes": [
      {
        "id": "s1",
        "type": "x/streamAggregator",
        "name": "按位置聚合",
        "configuration": {
          "sql": "SELECT location, AVG(temperature) as avg_temp, MAX(temperature) as max_temp, COUNT(*) as device_count FROM stream GROUP BY location, TumblingWindow('5s')"
        }
      },
      {
        "id": "s2",
        "type": "log",
        "name": "聚合结果",
        "configuration": {
          "jsScript": "return 'Location Aggregation: ' + JSON.stringify(msg);"
        }
      },
      {
        "id": "s3",
        "type": "log",
        "name": "原始数组",
        "configuration": {
          "jsScript": "return 'Original Array: ' + JSON.stringify(msg);"
        }
      }
    ],
    "connections": [
      {
        "fromId": "s1",
        "toId": "s2",
        "type": "window_event"
      },
      {
        "fromId": "s1",
        "toId": "s3",
        "type": "Success"
      }
    ]
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

聚合结果输出:

[
  {"location": "room1", "avg_temp": 26.9, "max_temp": 28.3, "device_count": 2},
  {"location": "room2", "avg_temp": 26.45, "max_temp": 30.8, "device_count": 2}
]
1
2
3
4

# 注意事项

  1. SQL语法限制: 只支持聚合查询,不支持非聚合的SELECT语句
  2. 窗口类型: 必须在GROUP BY子句中指定窗口函数
  3. 性能考虑: 窗口大小和滑动间隔会影响内存使用和计算性能
  4. 数据类型: 确保聚合字段的数据类型支持相应的聚合函数
  5. 数组处理: 数组中的每个元素都会逐条添加到聚合流中,原始数组消息通过Success链传递
  6. 窗口事件回调: 窗口事件触发的结束回调需要通过 Config.OnEnd 设置,而不是通过 OnMsg 注册的 OnEnd 回调。这是因为窗口事件是由聚合器内部触发的,不会经过常规的消息处理流程

# 窗口事件回调示例

正确的方式 - 使用 Config.OnEnd:

// 设置全局聚合结果处理器
config.OnEnd = func(ctx types.RuleContext, msg types.RuleMsg, err error, relationType string) {
    if err == nil && msg.Type == WindowEventMsgType {
        // 处理窗口聚合结果
        var result map[string]interface{}
        if jsonErr := json.Unmarshal([]byte(msg.Data.String()), &result); jsonErr == nil {
            // 处理聚合结果
            fmt.Printf("聚合结果: %+v\n", result)
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11

错误的方式 - 使用 OnMsg 的 OnEnd:

// 这种方式无法捕获窗口事件
ruleEngine.OnMsg(msg, types.WithOnEnd(func(ctx types.RuleContext, msg types.RuleMsg, err error, relationType string) {
    // 窗口事件不会触发这个回调
}))
1
2
3
4
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/27, 15:17:27
流式计算
流转换器

← 流式计算 流转换器→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式