RuleGo RuleGo
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • 概述
  • 快速开始
  • 核心概念
  • SQL参考
  • API参考
  • RuleGo集成
  • 函数

  • 案例集锦

    • 案例集锦概述
    • 单流数据合并
      • 业务场景
        • 典型场景
      • 数据模型
        • 输入数据格式
        • 期望输出格式
      • 解决方案
        • 方案一:基于时间窗口的数据合并
        • 方案二:使用自定义函数实现最新值合并
        • 方案三:基于事件驱动的数据合并
        • 方案二输出示例
      • 性能优化建议
        • 1. 窗口大小调优
        • 2. 内存管理
        • 3. 并发优化
      • 扩展应用
        • 1. 数据质量检查
        • 2. 异常检测
      • 总结
    • 多流数据合并
    • 变更数据捕获案例
    • 实时数据分析案例
    • 业务场景应用案例
目录

单流数据合并

# 单流数据合并案例

# 业务场景

在物联网场景中,多个传感器的数据往往混杂在一个数据流中,每个传感器的采集频率不同,数据较为碎片化。我们需要将相关传感器的数据合并,以便后续分析。

# 典型场景

  • 智能工厂:同一设备的温度、湿度、压力传感器数据分别上报
  • 智慧城市:同一监测点的空气质量、噪音、交通流量数据
  • 智能家居:同一房间的温湿度、光照、人体感应数据

# 数据模型

# 输入数据格式

{
  "device_id": "sensor_001",
  "sensor_type": "temperature",
  "value": 25.6,
  "timestamp": "2024-01-15T10:30:00Z",
  "location": "workshop_A"
}

{
  "device_id": "sensor_001", 
  "sensor_type": "humidity",
  "value": 65.2,
  "timestamp": "2024-01-15T10:30:05Z",
  "location": "workshop_A"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 期望输出格式

{
  "device_id": "sensor_001",
  "temperature": 25.6,
  "humidity": 65.2,
  "location": "workshop_A",
  "window_start": "2024-01-15T10:30:00Z",
  "data_count": 2
}
1
2
3
4
5
6
7
8

# 解决方案

# 方案一:基于时间窗口的数据合并

方案说明: 使用滚动窗口将同一设备在时间窗口内的不同传感器数据合并。通过条件聚合函数(CASE WHEN)将不同传感器类型的数据分别聚合到对应字段。

适用场景:

  • 传感器数据上报频率相对稳定
  • 可以容忍一定的数据延迟(窗口大小)
  • 需要批量处理数据

数据输入:

[
  {"device_id": "sensor_001", "sensor_type": "temperature", "value": 25.6, "location": "workshop_A", "timestamp": "2024-01-15T10:30:00Z"},
  {"device_id": "sensor_001", "sensor_type": "humidity", "value": 65.2, "location": "workshop_A", "timestamp": "2024-01-15T10:30:05Z"},
  {"device_id": "sensor_001", "sensor_type": "pressure", "value": 1013.2, "location": "workshop_A", "timestamp": "2024-01-15T10:30:08Z"}
]
1
2
3
4
5

期望输出:

{
  "device_id": "sensor_001",
  "location": "workshop_A",
  "temperature": 25.6,
  "humidity": 65.2,
  "pressure": 1013.2,
  "data_count": 3,
  "window_start": "2024-01-15T10:30:00Z"
}
1
2
3
4
5
6
7
8
9

# 方案二:使用自定义函数实现最新值合并

方案说明: 通过自定义函数保存每个传感器的最新值,实现实时数据合并。当有新数据到达时,立即更新对应传感器的值并输出完整的设备状态。

适用场景:

  • 需要实时响应
  • 传感器数据上报频率不规律
  • 对数据完整性要求较高

数据输入:

[
  {"device_id": "sensor_001", "sensor_type": "temperature", "value": 25.6, "timestamp": "2024-01-15T10:30:00Z"},
  {"device_id": "sensor_001", "sensor_type": "humidity", "value": 65.2, "timestamp": "2024-01-15T10:30:05Z"}
]
1
2
3
4

期望输出:

{
  "device_id": "sensor_001",
  "temperature": 25.6,
  "humidity": 65.2,
  "last_update": "2024-01-15T10:30:05Z",
  "complete_data": false
}
1
2
3
4
5
6
7

# 方案三:基于事件驱动的数据合并

方案说明: 当收到特定传感器数据时,触发数据合并逻辑。使用窗口函数LAG来比较当前值与前一个值,实现基于事件的数据处理。

适用场景:

  • 以某个主要传感器为触发条件
  • 需要检测数据变化趋势
  • 对特定事件进行响应

数据输入:

[
  {"device_id": "sensor_main", "sensor_type": "temperature", "value": 25.0, "location": "main_hall", "timestamp": "2024-01-15T10:30:00Z"},
  {"device_id": "sensor_main", "sensor_type": "temperature", "value": 26.0, "location": "main_hall", "timestamp": "2024-01-15T10:30:05Z"}
]
1
2
3
4

期望输出:

{
  "device_id": "sensor_main",
  "location": "main_hall",
  "temperature": 26.0,
  "prev_temperature": 25.0,
  "timestamp": "2024-01-15T10:30:05Z"
}
1
2
3
4
5
6
7

## 运行结果

### 方案一输出示例
```json
{
  "data_count": 3,
  "device_id": "sensor_001",
  "humidity": 65.2,
  "location": "workshop_A",
  "pressure": 1025.3,
  "temperature": 25.6,
  "window_start": "2024-01-15T10:30:00Z"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 方案二输出示例

{
  "device_id": "device_A",
  "merged_data": {
    "humidity": 55,
    "last_update": "2024-01-15T10:30:15Z",
    "pressure": 1015,
    "temperature": 24
  }
}
1
2
3
4
5
6
7
8
9

# 性能优化建议

# 1. 窗口大小调优

  • 高频数据:使用较小窗口(5秒)
  • 低频数据:使用较大窗口(30秒)

# 2. 内存管理

  • 定期清理过期数据
  • 设置合理的数据保留时间
  • 使用LRU缓存策略

# 3. 并发优化

  • 使用读写锁提高并发性能
  • 分片存储减少锁竞争
  • 异步处理提高吞吐量

# 扩展应用

# 1. 数据质量检查

检查每个设备在时间窗口内的数据完整性,识别缺失的传感器类型。

# 2. 异常检测

通过统计分析检测设备数据的稳定性,识别异常波动。

# 总结

单流数据合并是流处理中的常见需求,StreamSQL提供了多种解决方案:

  1. 时间窗口合并:适用于数据频率相对稳定的场景
  2. 自定义函数合并:适用于需要实时响应的场景
  3. 事件驱动合并:适用于以某个传感器为主导的场景

选择合适的方案需要考虑:

  • 数据频率和时延要求
  • 内存使用限制
  • 业务逻辑复杂度
  • 系统并发性能要求

通过合理的设计和优化,可以实现高效、稳定的数据合并处理。

在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/27, 12:06:54
案例集锦概述
多流数据合并

← 案例集锦概述 多流数据合并→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式