单流数据合并
# 单流数据合并案例
# 业务场景
在物联网场景中,多个传感器的数据往往混杂在一个数据流中,每个传感器的采集频率不同,数据较为碎片化。我们需要将相关传感器的数据合并,以便后续分析。
# 典型场景
- 智能工厂:同一设备的温度、湿度、压力传感器数据分别上报
- 智慧城市:同一监测点的空气质量、噪音、交通流量数据
- 智能家居:同一房间的温湿度、光照、人体感应数据
# 数据模型
# 输入数据格式
{
"device_id": "sensor_001",
"sensor_type": "temperature",
"value": 25.6,
"timestamp": "2024-01-15T10:30:00Z",
"location": "workshop_A"
}
{
"device_id": "sensor_001",
"sensor_type": "humidity",
"value": 65.2,
"timestamp": "2024-01-15T10:30:05Z",
"location": "workshop_A"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 期望输出格式
{
"device_id": "sensor_001",
"temperature": 25.6,
"humidity": 65.2,
"location": "workshop_A",
"window_start": "2024-01-15T10:30:00Z",
"data_count": 2
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 解决方案
# 方案一:基于时间窗口的数据合并
方案说明: 使用滚动窗口将同一设备在时间窗口内的不同传感器数据合并。通过条件聚合函数(CASE WHEN)将不同传感器类型的数据分别聚合到对应字段。
适用场景:
- 传感器数据上报频率相对稳定
- 可以容忍一定的数据延迟(窗口大小)
- 需要批量处理数据
数据输入:
[
{"device_id": "sensor_001", "sensor_type": "temperature", "value": 25.6, "location": "workshop_A", "timestamp": "2024-01-15T10:30:00Z"},
{"device_id": "sensor_001", "sensor_type": "humidity", "value": 65.2, "location": "workshop_A", "timestamp": "2024-01-15T10:30:05Z"},
{"device_id": "sensor_001", "sensor_type": "pressure", "value": 1013.2, "location": "workshop_A", "timestamp": "2024-01-15T10:30:08Z"}
]
1
2
3
4
5
2
3
4
5
期望输出:
{
"device_id": "sensor_001",
"location": "workshop_A",
"temperature": 25.6,
"humidity": 65.2,
"pressure": 1013.2,
"data_count": 3,
"window_start": "2024-01-15T10:30:00Z"
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 方案二:使用自定义函数实现最新值合并
方案说明: 通过自定义函数保存每个传感器的最新值,实现实时数据合并。当有新数据到达时,立即更新对应传感器的值并输出完整的设备状态。
适用场景:
- 需要实时响应
- 传感器数据上报频率不规律
- 对数据完整性要求较高
数据输入:
[
{"device_id": "sensor_001", "sensor_type": "temperature", "value": 25.6, "timestamp": "2024-01-15T10:30:00Z"},
{"device_id": "sensor_001", "sensor_type": "humidity", "value": 65.2, "timestamp": "2024-01-15T10:30:05Z"}
]
1
2
3
4
2
3
4
期望输出:
{
"device_id": "sensor_001",
"temperature": 25.6,
"humidity": 65.2,
"last_update": "2024-01-15T10:30:05Z",
"complete_data": false
}
1
2
3
4
5
6
7
2
3
4
5
6
7
# 方案三:基于事件驱动的数据合并
方案说明: 当收到特定传感器数据时,触发数据合并逻辑。使用窗口函数LAG来比较当前值与前一个值,实现基于事件的数据处理。
适用场景:
- 以某个主要传感器为触发条件
- 需要检测数据变化趋势
- 对特定事件进行响应
数据输入:
[
{"device_id": "sensor_main", "sensor_type": "temperature", "value": 25.0, "location": "main_hall", "timestamp": "2024-01-15T10:30:00Z"},
{"device_id": "sensor_main", "sensor_type": "temperature", "value": 26.0, "location": "main_hall", "timestamp": "2024-01-15T10:30:05Z"}
]
1
2
3
4
2
3
4
期望输出:
{
"device_id": "sensor_main",
"location": "main_hall",
"temperature": 26.0,
"prev_temperature": 25.0,
"timestamp": "2024-01-15T10:30:05Z"
}
1
2
3
4
5
6
7
2
3
4
5
6
7
## 运行结果
### 方案一输出示例
```json
{
"data_count": 3,
"device_id": "sensor_001",
"humidity": 65.2,
"location": "workshop_A",
"pressure": 1025.3,
"temperature": 25.6,
"window_start": "2024-01-15T10:30:00Z"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
# 方案二输出示例
{
"device_id": "device_A",
"merged_data": {
"humidity": 55,
"last_update": "2024-01-15T10:30:15Z",
"pressure": 1015,
"temperature": 24
}
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 性能优化建议
# 1. 窗口大小调优
- 高频数据:使用较小窗口(5秒)
- 低频数据:使用较大窗口(30秒)
# 2. 内存管理
- 定期清理过期数据
- 设置合理的数据保留时间
- 使用LRU缓存策略
# 3. 并发优化
- 使用读写锁提高并发性能
- 分片存储减少锁竞争
- 异步处理提高吞吐量
# 扩展应用
# 1. 数据质量检查
检查每个设备在时间窗口内的数据完整性,识别缺失的传感器类型。
# 2. 异常检测
通过统计分析检测设备数据的稳定性,识别异常波动。
# 总结
单流数据合并是流处理中的常见需求,StreamSQL提供了多种解决方案:
- 时间窗口合并:适用于数据频率相对稳定的场景
- 自定义函数合并:适用于需要实时响应的场景
- 事件驱动合并:适用于以某个传感器为主导的场景
选择合适的方案需要考虑:
- 数据频率和时延要求
- 内存使用限制
- 业务逻辑复杂度
- 系统并发性能要求
通过合理的设计和优化,可以实现高效、稳定的数据合并处理。
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/27, 12:06:54