# Single Stream Data Merging Case Study
## Business Scenario
In IoT scenarios, readings from multiple sensors are often interleaved in a single data stream. Each sensor reports at its own frequency, so the data for any one device arrives in fragments. Readings from related sensors must be merged into one record before they can be analyzed.
### Typical Scenarios
- Smart Factory: Temperature, humidity, and pressure sensor data from the same device are reported separately
- Smart City: Air quality, noise, and traffic flow data from the same monitoring point
- Smart Home: Temperature, humidity, lighting, and occupancy data from the same room
## Data Model
### Input Data Format
```json
{
  "device_id": "sensor_001",
  "sensor_type": "temperature",
  "value": 25.6,
  "timestamp": "2024-01-15T10:30:00Z",
  "location": "workshop_A"
}
{
  "device_id": "sensor_001",
  "sensor_type": "humidity",
  "value": 65.2,
  "timestamp": "2024-01-15T10:30:05Z",
  "location": "workshop_A"
}
```
### Expected Output Format
```json
{
  "device_id": "sensor_001",
  "temperature": 25.6,
  "humidity": 65.2,
  "location": "workshop_A",
  "window_start": "2024-01-15T10:30:00Z",
  "data_count": 2
}
```
## Solutions
### Solution 1: Time Window-Based Data Merging
Solution Description: Use a tumbling window to merge the readings that a device's sensors report within the same time window. Conditional aggregation (aggregate functions applied to CASE WHEN expressions) maps each sensor type's values to its own output field.
Applicable Scenarios:
- Sensor data reporting frequency is relatively stable
- Can tolerate a delay of up to one window length
- Data can be processed in batches, one window at a time
Data Input:
```json
[
  {"device_id": "sensor_001", "sensor_type": "temperature", "value": 25.6, "location": "workshop_A", "timestamp": "2024-01-15T10:30:00Z"},
  {"device_id": "sensor_001", "sensor_type": "humidity", "value": 65.2, "location": "workshop_A", "timestamp": "2024-01-15T10:30:05Z"},
  {"device_id": "sensor_001", "sensor_type": "pressure", "value": 1013.2, "location": "workshop_A", "timestamp": "2024-01-15T10:30:08Z"}
]
```
Expected Output:
```json
{
  "device_id": "sensor_001",
  "location": "workshop_A",
  "temperature": 25.6,
  "humidity": 65.2,
  "pressure": 1013.2,
  "data_count": 3,
  "window_start": "2024-01-15T10:30:00Z"
}
```
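To make the windowing and conditional aggregation concrete, here is a minimal Python sketch of what a tumbling-window merge computes. It is not StreamSQL's API or implementation. Records are grouped by `device_id` and by the start of the window their timestamp falls into, and each `sensor_type` is pivoted into its own column, as `MAX(CASE WHEN sensor_type = 'pressure' THEN value END) AS pressure` would do. The 10-second window, the use of `MAX` as the aggregate, and the names `window_start` and `merge_tumbling` are assumptions for illustration.

```python
from collections import defaultdict
from datetime import datetime, timezone

WINDOW_SECONDS = 10  # assumed window size


def window_start(ts: str, size: int = WINDOW_SECONDS) -> str:
    """Align an ISO-8601 UTC timestamp down to the start of its tumbling window."""
    # Replace "Z" because datetime.fromisoformat() rejects it before Python 3.11.
    epoch = int(datetime.fromisoformat(ts.replace("Z", "+00:00")).timestamp())
    start = epoch - epoch % size
    return datetime.fromtimestamp(start, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def merge_tumbling(records, size: int = WINDOW_SECONDS):
    """Merge readings per (device_id, window) and pivot sensor_type into columns."""
    groups = defaultdict(list)
    for rec in records:
        groups[(rec["device_id"], window_start(rec["timestamp"], size))].append(rec)

    merged = []
    for (device_id, start), rows in sorted(groups.items(), key=lambda kv: kv[0]):
        row = {
            "device_id": device_id,
            "location": rows[0].get("location"),
            "window_start": start,
            "data_count": len(rows),  # COUNT(*)
        }
        # Conditional aggregation: one column per sensor_type, like
        # MAX(CASE WHEN sensor_type = 'pressure' THEN value END) AS pressure
        for rec in rows:
            col = rec["sensor_type"]
            row[col] = max(row.get(col, rec["value"]), rec["value"])
        merged.append(row)
    return merged
```

Applied to the three input records above, `merge_tumbling` returns a single row with the same field values as the expected output. A sensor that does not report within a window simply has no column in that row, which mirrors the NULL a CASE WHEN aggregate would produce.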
### Solution 2: Latest Value Merging Using Custom Functions
Solution Description: Use a custom function to keep the latest value of each sensor for every device, so data is merged in real time. When a new reading arrives, the corresponding sensor value is updated immediately and the complete device status is output.
Applicable Scenarios:
- Requires real-time response
- Sensor data reporting frequency is irregular
- Each output must carry the most recent value of every sensor (high data-completeness requirements)
Data Input:
```json
[
  {"device_id": "sensor_001", "sensor_type": "temperature", "value": 25.6, "timestamp": "2024-01-15T10:30:00Z"},
  {"device_id": "sensor_001", "sensor_type": "humidity", "value": 65.2, "timestamp": "2024-01-15T10:30:05Z"}
]
```
Expected Output:
```json
{
  "device_id": "sensor_001",
  "temperature": 25.6,
  "humidity": 65.2,
  "last_update": "2024-01-15T10:30:05Z",
  "complete_data": false
}
```
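The state that such a custom function maintains can be sketched in Python as a per-device map of latest values. This is an illustration, not StreamSQL's custom-function interface. The class name `LatestValueMerger` and the expected sensor set `{temperature, humidity, pressure}` (used to compute `complete_data`) are assumptions. The sketch also keeps a timestamp per sensor so that a late, older reading does not overwrite a newer one.

```python
EXPECTED_SENSORS = {"temperature", "humidity", "pressure"}  # assumed device profile


class LatestValueMerger:
    """Keep the most recent reading of every sensor_type per device."""

    def __init__(self, expected_sensors=EXPECTED_SENSORS):
        self.expected = set(expected_sensors)
        self.values = {}   # device_id -> {sensor_type: value}
        self.seen_at = {}  # device_id -> {sensor_type: timestamp of stored value}

    def update(self, rec):
        """Apply one reading and return the merged device status immediately."""
        dev, kind, ts = rec["device_id"], rec["sensor_type"], rec["timestamp"]
        values = self.values.setdefault(dev, {})
        seen = self.seen_at.setdefault(dev, {})
        # Same-format UTC ISO-8601 strings sort chronologically, so a late,
        # older reading does not overwrite a newer one.
        if kind not in seen or ts >= seen[kind]:
            values[kind] = rec["value"]
            seen[kind] = ts
        return {
            "device_id": dev,
            **values,
            "last_update": max(seen.values()),
            "complete_data": self.expected.issubset(values),
        }
```

Feeding the two input records above through `update` yields, on the second call, the expected output shown, with `complete_data` still false because no pressure reading has arrived yet.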
### Solution 3: Event-Driven Data Merging
Solution Description: Run the merge logic when data from a specific sensor arrives. The LAG window function compares the current value with the previous one, so processing is driven by events rather than by time windows.
Applicable Scenarios:
- Data from one specific sensor acts as the trigger
- Data change trends must be detected
- The system must respond to specific events
Data Input:
```json
[
  {"device_id": "sensor_main", "sensor_type": "temperature", "value": 25.0, "location": "main_hall", "timestamp": "2024-01-15T10:30:00Z"},
  {"device_id": "sensor_main", "sensor_type": "temperature", "value": 26.0, "location": "main_hall", "timestamp": "2024-01-15T10:30:05Z"}
]
```
Expected Output:
```json
{
  "device_id": "sensor_main",
  "location": "main_hall",
  "temperature": 26.0,
  "prev_temperature": 25.0,
  "timestamp": "2024-01-15T10:30:05Z"
}
```
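The LAG comparison amounts to remembering, per device, the previous value of the trigger sensor. The Python sketch below illustrates that idea; it is not how StreamSQL evaluates LAG. It assumes readings for a device arrive in timestamp order (SQL LAG orders by an explicit column, while this sketch relies on arrival order), and the class name `LagTrigger` is hypothetical. The first reading per device produces no event, which matches the single output row shown above.

```python
class LagTrigger:
    """Per-device LAG(value) over the readings of one trigger sensor."""

    def __init__(self, trigger_sensor="temperature"):
        self.trigger = trigger_sensor
        self.prev = {}  # device_id -> previous value (the LAG state)

    def process(self, rec):
        """Return a merged event, or None if rec does not fire the trigger."""
        if rec["sensor_type"] != self.trigger:
            return None  # non-trigger sensors do not fire the merge
        dev = rec["device_id"]
        prev = self.prev.get(dev)
        self.prev[dev] = rec["value"]
        if prev is None:
            return None  # first reading: LAG would be NULL, nothing to compare
        return {
            "device_id": dev,
            "location": rec.get("location"),
            self.trigger: rec["value"],
            f"prev_{self.trigger}": prev,
            "timestamp": rec["timestamp"],
        }
```

Filtering on the difference between the current and previous values (for example, only emitting when it exceeds a threshold) turns this into a change detector.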
## Running Results
### Solution 1 Output Example
```json
{
  "data_count": 3,
  "device_id": "sensor_001",
  "humidity": 65.2,
  "location": "workshop_A",
  "pressure": 1025.3,
  "temperature": 25.6,
  "window_start": "2024-01-15T10:30:00Z"
}
```
### Solution 2 Output Example
```json
{
  "device_id": "device_A",
  "merged_data": {
    "humidity": 55,
    "last_update": "2024-01-15T10:30:15Z",
    "pressure": 1015,
    "temperature": 24
  }
}
```
## Performance Optimization Suggestions
### 1. Window Size Tuning
- High-frequency data: use smaller windows (for example, 5 seconds)
- Low-frequency data: use larger windows (for example, 30 seconds)
### 2. Memory Management
- Periodically clean up expired device state
- Set an appropriate retention period for stored data
- Use an LRU cache to bound the number of devices held in memory
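The three memory-management points can be combined in one structure: an LRU map whose entries also expire after a time-to-live. The Python sketch below uses `collections.OrderedDict` (`move_to_end` and `popitem(last=False)`) for the LRU order. The class name `DeviceStateCache` and the defaults `max_devices=10_000` and `ttl_seconds=300.0` are assumptions for illustration, not StreamSQL settings. The clock is injectable so the behavior can be tested without sleeping.

```python
import time
from collections import OrderedDict


class DeviceStateCache:
    """LRU map of device_id -> state whose entries also expire after a TTL."""

    def __init__(self, max_devices=10_000, ttl_seconds=300.0, clock=time.monotonic):
        self.max_devices = max_devices
        self.ttl = ttl_seconds
        self.clock = clock              # injectable for testing
        self._data = OrderedDict()      # device_id -> (last_access, state), oldest first

    def put(self, device_id, state):
        self._data[device_id] = (self.clock(), state)
        self._data.move_to_end(device_id)
        while len(self._data) > self.max_devices:
            self._data.popitem(last=False)  # evict the least recently used device

    def get(self, device_id):
        item = self._data.get(device_id)
        if item is None:
            return None
        last_access, state = item
        now = self.clock()
        if now - last_access > self.ttl:
            del self._data[device_id]   # expired: drop lazily on access
            return None
        self._data[device_id] = (now, state)
        self._data.move_to_end(device_id)
        return state

    def purge_expired(self):
        """Periodic cleanup; returns how many expired entries were removed."""
        now = self.clock()
        expired = [k for k, (t, _) in self._data.items() if now - t > self.ttl]
        for k in expired:
            del self._data[k]
        return len(expired)
```

Lazy expiry in `get` keeps reads correct between cleanups, while `purge_expired` (run on a timer) reclaims memory for devices that have gone quiet and are never read again.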
### 3. Concurrency Optimization
- Use read-write locks so that concurrent reads do not block one another
- Shard the state storage to reduce lock contention
- Use asynchronous processing to improve throughput
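Read-write locks and sharding can be sketched together: device state is split across shards chosen by a hash of `device_id`, and each shard has its own read-write lock, so writers to different devices rarely contend. Python's standard library has no read-write lock, so the sketch builds a minimal reader-preferring one on `threading.Condition` (writers can starve under a constant stream of readers). The class names and the shard count of 16 are assumptions. In CPython the GIL limits the parallel speedup; the point here is the locking structure, which carries over to languages with native read-write locks.

```python
import threading
import zlib


class RWLock:
    """Minimal reader-preferring read-write lock (writers may starve under heavy reads)."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0
        self._writer = False

    def acquire_read(self):
        with self._cond:
            while self._writer:
                self._cond.wait()
            self._readers += 1

    def release_read(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()  # wake a waiting writer

    def acquire_write(self):
        with self._cond:
            while self._writer or self._readers:
                self._cond.wait()
            self._writer = True

    def release_write(self):
        with self._cond:
            self._writer = False
            self._cond.notify_all()


class ShardedStateStore:
    """Device state split across shards, each guarded by its own RWLock."""

    def __init__(self, num_shards=16):
        self._shards = [({}, RWLock()) for _ in range(num_shards)]

    def _shard(self, device_id):
        # crc32 gives a stable shard index across processes, unlike hash() on str
        return self._shards[zlib.crc32(device_id.encode()) % len(self._shards)]

    def update(self, device_id, sensor_type, value):
        data, lock = self._shard(device_id)
        lock.acquire_write()
        try:
            data.setdefault(device_id, {})[sensor_type] = value
        finally:
            lock.release_write()

    def get(self, device_id):
        data, lock = self._shard(device_id)
        lock.acquire_read()
        try:
            return dict(data.get(device_id, {}))  # copy so callers never see later writes
        finally:
            lock.release_read()
```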
## Extended Applications
### 1. Data Quality Check
Check each device's data completeness within every time window to identify sensor types that did not report.
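A completeness check reuses the tumbling-window grouping but collects the set of sensor types seen instead of their values. In the Python sketch below, the expected sensor set, the 10-second window, and the function names are assumptions. Because it only sees windows in which a device sent at least one reading, detecting devices that went completely silent would also require a list of known devices.

```python
from collections import defaultdict
from datetime import datetime, timezone

EXPECTED_SENSORS = {"temperature", "humidity", "pressure"}  # assumed device profile


def _window_start(ts, size):
    """Align an ISO-8601 UTC timestamp down to its tumbling-window start."""
    epoch = int(datetime.fromisoformat(ts.replace("Z", "+00:00")).timestamp())
    start = epoch - epoch % size
    return datetime.fromtimestamp(start, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def completeness_report(records, expected=EXPECTED_SENSORS, window_seconds=10):
    """List, per device and window, which expected sensor types never reported."""
    seen = defaultdict(set)
    for rec in records:
        key = (rec["device_id"], _window_start(rec["timestamp"], window_seconds))
        seen[key].add(rec["sensor_type"])
    report = []
    for (device_id, start), types in sorted(seen.items(), key=lambda kv: kv[0]):
        missing = sorted(set(expected) - types)
        report.append({"device_id": device_id, "window_start": start,
                       "missing": missing, "complete": not missing})
    return report
```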
### 2. Anomaly Detection
Use statistical analysis to assess how stable each device's readings are and to identify abnormal fluctuations.
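One common statistical test is a rolling z-score: a reading is flagged when it lies more than a chosen number of standard deviations from the mean of that device's recent history. The Python sketch below uses `statistics.fmean` and `statistics.pstdev`. The z-score method itself, the history length of 20, the threshold of 3.0, and the minimum of 5 samples are assumptions, not something the case study prescribes. Flagged readings are still added to the history, which slowly widens the band after an outlier; excluding them is an equally valid choice.

```python
import statistics
from collections import defaultdict, deque


class ZScoreDetector:
    """Flag readings far from the recent mean of the same device (rolling z-score)."""

    def __init__(self, history=20, threshold=3.0, min_history=5):
        self.threshold = threshold      # how many standard deviations count as abnormal
        self.min_history = min_history  # do not judge until enough samples exist
        self.hist = defaultdict(lambda: deque(maxlen=history))

    def check(self, device_id, value):
        """Return True if value is anomalous, then add it to the device history."""
        recent = list(self.hist[device_id])
        anomalous = False
        if len(recent) >= self.min_history:
            mean = statistics.fmean(recent)
            std = statistics.pstdev(recent)
            # With std == 0, any change from a perfectly constant signal is flagged.
            anomalous = abs(value - mean) > self.threshold * std
        self.hist[device_id].append(value)
        return anomalous
```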
## Summary
Single stream data merging is a common requirement in stream processing. This case study presents three approaches with StreamSQL:
- Time window merging: suits data whose reporting frequency is relatively stable
- Custom function merging: suits scenarios that require real-time response
- Event-driven merging: suits scenarios where one sensor's data drives processing
Consider the following when choosing a solution:
- Data frequency and latency requirements
- Memory limits
- Business logic complexity
- Concurrency and throughput requirements
Choosing the solution that fits these constraints, and applying the optimizations above, keeps data merging both efficient and stable.