# Single Stream Data Merging Case Study

## Business Scenario

In IoT deployments, readings from multiple sensors are often interleaved in a single data stream. Each sensor reports at its own frequency, so the data for one device arrives fragmented across separate messages. Readings from related sensors need to be merged into one record before they can be analyzed.

### Typical Scenarios

  • Smart Factory: Temperature, humidity, and pressure sensor data from the same device are reported separately
  • Smart City: Air quality, noise, and traffic flow data from the same monitoring point are reported separately
  • Smart Home: Temperature, humidity, lighting, and occupancy data from the same room are reported separately

## Data Model

### Input Data Format

{
  "device_id": "sensor_001",
  "sensor_type": "temperature",
  "value": 25.6,
  "timestamp": "2024-01-15T10:30:00Z",
  "location": "workshop_A"
}

{
  "device_id": "sensor_001", 
  "sensor_type": "humidity",
  "value": 65.2,
  "timestamp": "2024-01-15T10:30:05Z",
  "location": "workshop_A"
}

### Expected Output Format

{
  "device_id": "sensor_001",
  "temperature": 25.6,
  "humidity": 65.2,
  "location": "workshop_A",
  "window_start": "2024-01-15T10:30:00Z",
  "data_count": 2
}

## Solutions

### Solution 1: Time Window-Based Data Merging

Solution Description: Use a tumbling window to merge readings from different sensors on the same device that fall within the same time window. Conditional aggregation (CASE WHEN inside an aggregate function) routes each sensor type's value into its own output field.

Applicable Scenarios:

  • Sensors report at a relatively stable frequency
  • A delay of up to one window length is acceptable
  • Data is processed in batches

Data Input:

[
  {"device_id": "sensor_001", "sensor_type": "temperature", "value": 25.6, "location": "workshop_A", "timestamp": "2024-01-15T10:30:00Z"},
  {"device_id": "sensor_001", "sensor_type": "humidity", "value": 65.2, "location": "workshop_A", "timestamp": "2024-01-15T10:30:05Z"},
  {"device_id": "sensor_001", "sensor_type": "pressure", "value": 1013.2, "location": "workshop_A", "timestamp": "2024-01-15T10:30:08Z"}
]

Expected Output:

{
  "device_id": "sensor_001",
  "location": "workshop_A",
  "temperature": 25.6,
  "humidity": 65.2,
  "pressure": 1013.2,
  "data_count": 3,
  "window_start": "2024-01-15T10:30:00Z"
}
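The windowing and conditional aggregation described for Solution 1 can be sketched in plain Python. This is an illustration of the technique only, not StreamSQL's implementation or API. The 10-second window size, the use of MAX as the aggregate, and the names `window_start` and `merge_by_window` are assumptions made for this example.

```python
from datetime import datetime, timezone

WINDOW_SECONDS = 10  # assumed window size; the case study does not fix one
TS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def window_start(ts: str, size: int = WINDOW_SECONDS) -> str:
    """Align a UTC timestamp to the start of its tumbling window."""
    epoch = int(datetime.strptime(ts, TS_FORMAT).replace(tzinfo=timezone.utc).timestamp())
    start = epoch - epoch % size
    return datetime.fromtimestamp(start, tz=timezone.utc).strftime(TS_FORMAT)


def merge_by_window(events, size: int = WINDOW_SECONDS):
    """Pivot sensor readings into one row per (device, window)."""
    rows = {}
    for e in events:
        key = (e["device_id"], window_start(e["timestamp"], size))
        row = rows.setdefault(key, {
            "device_id": e["device_id"],
            "location": e.get("location"),
            "data_count": 0,
            "window_start": key[1],
        })
        # Plays the role of MAX(CASE WHEN sensor_type = 'x' THEN value END) AS x
        prev = row.get(e["sensor_type"])
        row[e["sensor_type"]] = e["value"] if prev is None else max(prev, e["value"])
        row["data_count"] += 1
    return list(rows.values())
```

Calling `merge_by_window` on the three sample readings above yields a single row for `sensor_001`, because all three timestamps fall into the window that starts at 10:30:00.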

### Solution 2: Latest Value Merging Using Custom Functions

Solution Description: Use custom functions to store the latest value reported by each sensor, so that data is merged in real time. Each new reading immediately updates the corresponding sensor value and triggers output of the complete device status.

Applicable Scenarios:

  • Real-time response is required
  • Sensors report at irregular intervals
  • Data completeness is a high priority

Data Input:

[
  {"device_id": "sensor_001", "sensor_type": "temperature", "value": 25.6, "timestamp": "2024-01-15T10:30:00Z"},
  {"device_id": "sensor_001", "sensor_type": "humidity", "value": 65.2, "timestamp": "2024-01-15T10:30:05Z"}
]

Expected Output:

{
  "device_id": "sensor_001",
  "temperature": 25.6,
  "humidity": 65.2,
  "last_update": "2024-01-15T10:30:05Z",
  "complete_data": false
}
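The latest-value approach of Solution 2 amounts to keeping a small state table per device. The sketch below shows that idea in plain Python rather than as a StreamSQL custom function. The class name `LatestValueMerger` and the assumed full sensor set (temperature, humidity, pressure) are hypothetical, and the code assumes readings arrive in timestamp order.

```python
EXPECTED_SENSORS = ("temperature", "humidity", "pressure")  # assumed full set


class LatestValueMerger:
    """Keeps the most recent value per sensor type for each device."""

    def __init__(self, expected=EXPECTED_SENSORS):
        self.expected = set(expected)
        self.state = {}  # device_id -> {sensor_type: latest value}

    def update(self, event):
        """Apply one reading and return the device's merged snapshot."""
        values = self.state.setdefault(event["device_id"], {})
        values[event["sensor_type"]] = event["value"]
        snapshot = {"device_id": event["device_id"], **values}
        snapshot["last_update"] = event["timestamp"]
        # Complete only when every expected sensor has reported at least once
        snapshot["complete_data"] = self.expected.issubset(values)
        return snapshot
```

With the two sample readings above, the second call to `update` returns both values with `complete_data` set to false, since no pressure reading has arrived yet.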

### Solution 3: Event-Driven Data Merging

Solution Description: Trigger the merging logic when a reading from a specific sensor arrives. The window function LAG compares the current value with the previous one, which makes the processing event-based.

Applicable Scenarios:

  • A specific sensor acts as the trigger
  • Trends in the data need to be detected
  • Specific events require a response

Data Input:

[
  {"device_id": "sensor_main", "sensor_type": "temperature", "value": 25.0, "location": "main_hall", "timestamp": "2024-01-15T10:30:00Z"},
  {"device_id": "sensor_main", "sensor_type": "temperature", "value": 26.0, "location": "main_hall", "timestamp": "2024-01-15T10:30:05Z"}
]

Expected Output:

{
  "device_id": "sensor_main",
  "location": "main_hall",
  "temperature": 26.0,
  "prev_temperature": 25.0,
  "timestamp": "2024-01-15T10:30:05Z"
}
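The LAG-style comparison in Solution 3 can be emulated by remembering the previous trigger value for each device. The following plain-Python sketch is not StreamSQL code. The function name `lag_merge` is hypothetical, and skipping the first reading of a device (which has no previous value) is an assumption chosen to match the single-row expected output above.

```python
def lag_merge(events, trigger_type="temperature"):
    """Emit one row per trigger reading that has a predecessor on the same device."""
    previous = {}  # device_id -> last seen trigger value
    rows = []
    for e in events:
        if e["sensor_type"] != trigger_type:
            continue  # only the trigger sensor drives output
        dev = e["device_id"]
        if dev in previous:
            rows.append({
                "device_id": dev,
                "location": e.get("location"),
                trigger_type: e["value"],
                f"prev_{trigger_type}": previous[dev],
                "timestamp": e["timestamp"],
            })
        previous[dev] = e["value"]  # becomes the LAG value for the next reading
    return rows
```

A caller could compare `temperature` with `prev_temperature` in each row to detect a rising or falling trend.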

## Running Results

### Solution 1 Output Example

{
  "data_count": 3,
  "device_id": "sensor_001",
  "humidity": 65.2,
  "location": "workshop_A",
  "pressure": 1025.3,
  "temperature": 25.6,
  "window_start": "2024-01-15T10:30:00Z"
}

### Solution 2 Output Example

{
  "device_id": "device_A",
  "merged_data": {
    "humidity": 55,
    "last_update": "2024-01-15T10:30:15Z",
    "pressure": 1015,
    "temperature": 24
  }
}

## Performance Optimization Suggestions

### 1. Window Size Tuning

  • High-frequency data: Use smaller windows (for example, 5 seconds)
  • Low-frequency data: Use larger windows (for example, 30 seconds)

### 2. Memory Management

  • Clean up expired data regularly
  • Set a reasonable data retention period
  • Use an LRU cache eviction strategy
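The three memory-management points above can be combined in one small structure. This is a minimal sketch, assuming per-device state is kept in process memory. The class name `DeviceStateCache` and the default limits (10,000 devices, 300-second retention) are illustrative assumptions, not values from StreamSQL.

```python
from collections import OrderedDict


class DeviceStateCache:
    """LRU cache of per-device state with a retention time on each entry."""

    def __init__(self, max_devices=10_000, ttl_seconds=300.0):
        self.max_devices = max_devices
        self.ttl = ttl_seconds
        self._items = OrderedDict()  # device_id -> (last_written, state)

    def put(self, device_id, state, now):
        self._items[device_id] = (now, state)
        self._items.move_to_end(device_id)      # mark as most recently used
        while len(self._items) > self.max_devices:
            self._items.popitem(last=False)     # evict least recently used

    def get(self, device_id, now):
        entry = self._items.get(device_id)
        if entry is None or now - entry[0] > self.ttl:
            self._items.pop(device_id, None)    # drop an expired entry lazily
            return None
        self._items.move_to_end(device_id)
        return entry[1]

    def purge_expired(self, now):
        """Remove every entry older than the retention time; return the count."""
        stale = [k for k, (seen, _) in self._items.items() if now - seen > self.ttl]
        for k in stale:
            del self._items[k]
        return len(stale)
```

Passing `now` explicitly keeps the sketch deterministic; a real deployment would more likely read a monotonic clock and call `purge_expired` from a periodic task.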

### 3. Concurrency Optimization

  • Use read-write locks to improve concurrent performance
  • Shard the state storage to reduce lock contention
  • Use asynchronous processing to improve throughput
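Sharded storage, as suggested above, can be sketched as several independent maps, each guarded by its own lock. The Python standard library has no read-write lock, so this example uses a plain `threading.Lock` per shard. The class name `ShardedStore` and the shard count of 16 are assumptions for illustration.

```python
import threading
import zlib


class ShardedStore:
    """Per-device sensor state split across independently locked shards."""

    def __init__(self, shards=16):
        self._locks = [threading.Lock() for _ in range(shards)]
        self._maps = [{} for _ in range(shards)]

    def _index(self, device_id):
        # crc32 is stable across processes, unlike the built-in hash() for str
        return zlib.crc32(device_id.encode("utf-8")) % len(self._maps)

    def update(self, device_id, sensor_type, value):
        i = self._index(device_id)
        with self._locks[i]:  # only writers to the same shard contend
            self._maps[i].setdefault(device_id, {})[sensor_type] = value

    def snapshot(self, device_id):
        i = self._index(device_id)
        with self._locks[i]:
            return dict(self._maps[i].get(device_id, {}))  # copy for the caller
```

Updates to devices in different shards never wait on each other, so contention falls roughly in proportion to the number of shards when device IDs hash evenly.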

## Extended Applications

### 1. Data Quality Check

Check each device's data completeness within the time window and identify which sensor types are missing.
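A completeness check of this kind can run on each merged row. The sketch below assumes the merged-row shape from the Solution 1 output and a required sensor set of temperature, humidity, and pressure; the function name `missing_sensors` is hypothetical.

```python
def missing_sensors(merged_row, required=("temperature", "humidity", "pressure")):
    """Return the sorted list of required sensor types absent from a merged row."""
    return sorted(s for s in required if merged_row.get(s) is None)
```

A row that contains only temperature and humidity would produce `["pressure"]`, which could be logged or forwarded as a data quality alert.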

### 2. Anomaly Detection

Use statistical analysis to assess the stability of each device's data and identify abnormal fluctuations.
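One simple statistical test for abnormal fluctuations is a z-score against recent history. The text does not name a specific method, so the choice of a z-score, the threshold of 3 standard deviations, and the function name `is_anomalous` are all assumptions for this sketch.

```python
from statistics import mean, pstdev


def is_anomalous(history, value, threshold=3.0):
    """Flag a value more than `threshold` standard deviations from the history mean."""
    if len(history) < 2:
        return False              # not enough data to estimate spread
    sigma = pstdev(history)
    if sigma == 0:
        return value != history[0]  # any change from a constant series stands out
    return abs(value - mean(history)) / sigma > threshold
```

The `history` list would typically hold the last few merged values of one sensor on one device, for example the temperatures from recent windows.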

## Summary

Single stream data merging is a common requirement in stream processing. StreamSQL supports several approaches:

  1. Time Window Merging: Suitable when data arrives at a relatively stable frequency
  2. Custom Function Merging: Suitable when real-time response is required
  3. Event-Driven Merging: Suitable when one sensor acts as the trigger

Factors to weigh when choosing an approach:

  • Data frequency and latency requirements
  • Memory usage limits
  • Business logic complexity
  • Concurrency performance requirements

With an approach matched to these factors and tuned as described above, data merging can be both efficient and stable.

Last Updated: 2025/08/05, 02:24:31

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License
