# Multi-Stream Data Merging Case Study

# Business Scenario

In enterprise applications, data often comes from multiple independent sources, such as user behavior data from mobile apps, transaction data from e-commerce platforms, and inventory data from ERP systems. These streams need to be merged in real time to provide comprehensive business insights.

# Typical Scenarios

  • E-commerce Platform: Merge user browsing data, order data, and inventory data
  • Financial Services: Merge transaction data, user data, and risk control data
  • Smart City: Merge traffic data, weather data, and population flow data
  • Industrial Internet: Merge production data, quality data, and equipment status data

# Data Model

# Input Data Format

User Behavior Stream:

{
  "user_id": "user_001",
  "event_type": "page_view",
  "page_url": "/product/123",
  "timestamp": "2024-01-15T10:30:00Z"
}

Order Stream:

{
  "order_id": "order_123",
  "user_id": "user_001",
  "product_id": "prod_456",
  "amount": 99.99,
  "status": "paid",
  "timestamp": "2024-01-15T10:30:05Z"
}

Inventory Stream:

{
  "product_id": "prod_456",
  "stock_quantity": 100,
  "warehouse_id": "wh_001",
  "timestamp": "2024-01-15T10:30:10Z"
}

# Expected Output Format

{
  "user_id": "user_001",
  "order_id": "order_123",
  "product_id": "prod_456",
  "amount": 99.99,
  "page_view_count": 3,
  "stock_quantity": 100,
  "merge_timestamp": "2024-01-15T10:30:15Z"
}

# Solutions

# Solution 1: Stream JOIN-Based Data Merging

Solution Description: Use StreamSQL's JOIN operation to merge data from multiple streams on key fields (such as user_id or product_id). INNER JOIN, LEFT JOIN, and other join types are supported.

Applicable Scenarios:

  • The source streams share clear key relationships
  • Data volume is moderate and real-time requirements are high
  • Business logic is relatively simple, mainly key-based association

Data Input:

// User Stream
[
  {"user_id": "user_001", "username": "Alice", "timestamp": "2024-01-15T10:30:00Z"},
  {"user_id": "user_002", "username": "Bob", "timestamp": "2024-01-15T10:30:01Z"}
]

// Order Stream
[
  {"order_id": "order_123", "user_id": "user_001", "amount": 99.99, "timestamp": "2024-01-15T10:30:05Z"},
  {"order_id": "order_124", "user_id": "user_001", "amount": 199.99, "timestamp": "2024-01-15T10:30:10Z"}
]

Expected Output:

{
  "user_id": "user_001",
  "username": "Alice",
  "order_id": "order_123",
  "amount": 99.99,
  "merge_timestamp": "2024-01-15T10:30:05Z"
}
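Conceptually, the stream JOIN in this solution behaves like a hash join keyed on user_id. The sketch below (plain Python, purely illustrative; it is not StreamSQL's API) applies INNER JOIN semantics to the sample input above and reproduces the expected output:

```python
def inner_join(left, right, key):
    """Hash-join two batches of records on `key` (INNER JOIN semantics)."""
    index = {}
    for row in left:
        index.setdefault(row[key], []).append(row)
    merged = []
    for row in right:
        for match in index.get(row[key], []):
            out = {**match, **row}          # right-side fields win on conflict
            out["merge_timestamp"] = row["timestamp"]
            merged.append(out)
    return merged

users = [
    {"user_id": "user_001", "username": "Alice", "timestamp": "2024-01-15T10:30:00Z"},
    {"user_id": "user_002", "username": "Bob", "timestamp": "2024-01-15T10:30:01Z"},
]
orders = [
    {"order_id": "order_123", "user_id": "user_001", "amount": 99.99, "timestamp": "2024-01-15T10:30:05Z"},
    {"order_id": "order_124", "user_id": "user_001", "amount": 199.99, "timestamp": "2024-01-15T10:30:10Z"},
]
result = inner_join(users, orders, "user_id")  # two merged records for user_001
```

A LEFT JOIN variant would additionally emit unmatched left-side rows (here, user_002) with null order fields.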

# Solution 2: Message Queue-Based Data Merging

Solution Description: Use a message queue (such as Kafka) as an intermediate layer to buffer and coordinate data from multiple streams, then merge it through window operations and state management.

Applicable Scenarios:

  • Data volume is large and real-time requirements are moderate
  • Arrival times differ significantly across streams
  • Out-of-order data must be handled
  • Business logic is complex and requires rich state management

Data Input:

// Stream 1: User Behavior
{
  "user_id": "user_001",
  "behavior": "login",
  "timestamp": "2024-01-15T10:30:00Z"
}

// Stream 2: Order Information
{
  "order_id": "order_123",
  "user_id": "user_001",
  "amount": 99.99,
  "timestamp": "2024-01-15T10:30:05Z"
}

// Stream 3: Product Information
{
  "product_id": "prod_456",
  "price": 99.99,
  "category": "electronics",
  "timestamp": "2024-01-15T10:30:10Z"
}

Expected Output:

{
  "user_id": "user_001",
  "latest_behavior": "login",
  "orders": [
    {"order_id": "order_123", "amount": 99.99}
  ],
  "products": [
    {"product_id": "prod_456", "price": 99.99, "category": "electronics"}
  ],
  "merge_timestamp": "2024-01-15T10:30:15Z"
}
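A queue-driven merge keeps per-key state between events. The sketch below (plain Python, not StreamSQL's API; only the behavior and order streams are shown) folds each arriving event into the user's state and emits the current merged view:

```python
from collections import defaultdict

# Per-key state, as a queue-fed merge job would keep it between events.
state = defaultdict(lambda: {"latest_behavior": None, "orders": []})

def on_event(stream, event):
    """Fold one event into the user's state and return the merged view."""
    s = state[event["user_id"]]
    if stream == "behavior":
        s["latest_behavior"] = event["behavior"]
    elif stream == "order":
        s["orders"].append({"order_id": event["order_id"], "amount": event["amount"]})
    return {"user_id": event["user_id"], **s}

on_event("behavior", {"user_id": "user_001", "behavior": "login",
                      "timestamp": "2024-01-15T10:30:00Z"})
merged = on_event("order", {"user_id": "user_001", "order_id": "order_123",
                            "amount": 99.99, "timestamp": "2024-01-15T10:30:05Z"})
```

In a real pipeline the state would live in the configured state backend, and the merged view would be emitted when a window closes rather than on every event.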

# Advanced Features

# 1. Watermark-Based Event Time Processing

Watermarks handle out-of-order data, ensuring that event-time merges remain correct.

Configuration:

watermark:
  strategy: bounded-out-of-orderness
  max-out-of-orderness: 30s
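Under this strategy, the watermark trails the maximum event time seen so far by max-out-of-orderness; events older than the watermark are treated as late. A minimal sketch in Python (illustrative, mirroring the 30s setting above):

```python
from datetime import datetime, timedelta

MAX_OUT_OF_ORDERNESS = timedelta(seconds=30)  # mirrors max-out-of-orderness: 30s

class WatermarkTracker:
    """Bounded-out-of-orderness watermark: lags the max seen event time."""

    def __init__(self):
        self.max_event_time = datetime.min

    def advance(self, event_time):
        # Watermark = max event time seen so far minus the allowed lateness.
        self.max_event_time = max(self.max_event_time, event_time)
        return self.max_event_time - MAX_OUT_OF_ORDERNESS

    def is_late(self, event_time):
        # An event older than the current watermark is considered late.
        return event_time < self.max_event_time - MAX_OUT_OF_ORDERNESS

tracker = WatermarkTracker()
watermark = tracker.advance(datetime(2024, 1, 15, 10, 30, 0))
# watermark is now 10:29:30; anything older than that is late
```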

# 2. State Backend Configuration

Optimize state storage and recovery mechanisms to improve system reliability.

Configuration:

state-backend:
  type: rocksdb
  checkpoint-interval: 60s
  min-pause-between-checkpoints: 30s

# 3. Parallelism Tuning

Improve processing performance by adjusting parallelism.

Configuration:

parallelism:
  source: 4
  sink: 2
  operator: 8

# Performance Optimization

# 1. Join Strategy Optimization

Broadcast Join:

  • Suitable for small table joins
  • Reduce network transmission
  • Improve join performance

Shuffle Join:

  • Suitable for large table joins
  • Ensure data distribution balance
  • Avoid data skew
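The broadcast idea can be sketched in a few lines of Python (conceptual only, not the engine's implementation): the small dimension table is replicated to each worker as a local hash index, so stream records join without a network shuffle:

```python
def broadcast_join(stream_partition, small_table, key):
    """Join one stream partition against a broadcast copy of a small table."""
    # The small table is replicated to every worker as a hash index,
    # so each record joins locally; no redistribution of the large stream.
    index = {row[key]: row for row in small_table}
    joined = []
    for record in stream_partition:
        dim = index.get(record[key])
        if dim is not None:
            joined.append({**dim, **record})
    return joined

products = [{"product_id": "prod_456", "category": "electronics"}]
orders = [{"order_id": "order_123", "product_id": "prod_456", "amount": 99.99}]
result = broadcast_join(orders, products, "product_id")
```

A shuffle join, by contrast, repartitions both sides by the join key so that matching records land on the same worker, which scales to two large inputs at the cost of network transfer.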

# 2. State Management Optimization

State TTL Configuration:

SET 'state.ttl' = '24h';

State Cleanup Strategy:

  • Regular cleanup of expired state
  • Use incremental cleanup
  • Avoid full state scans
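The TTL and incremental-cleanup ideas can be sketched as follows (illustrative Python; a production state backend such as RocksDB manages this internally):

```python
class TTLState:
    """Keyed state with a time-to-live and incremental expiry."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.store = {}  # key -> (value, write_time)

    def put(self, key, value, now):
        self.store[key] = (value, now)

    def get(self, key, now):
        entry = self.store.get(key)
        if entry is None or now - entry[1] > self.ttl:
            self.store.pop(key, None)  # lazily drop expired state on read
            return None
        return entry[0]

    def cleanup(self, now, max_keys=100):
        # Incremental cleanup: examine at most max_keys entries per call,
        # avoiding a full state scan on every invocation.
        expired = [k for k, (_, ts) in list(self.store.items())[:max_keys]
                   if now - ts > self.ttl]
        for k in expired:
            del self.store[k]
        return len(expired)

state = TTLState(ttl_seconds=24 * 3600)  # matches SET 'state.ttl' = '24h'
```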

# 3. Memory Optimization

Memory Configuration:

memory:
  network-buffer: 32mb
  taskmanager.memory.process.size: 2gb
  taskmanager.memory.managed.fraction: 0.4

# Best Practices

# 1. Key Selection

  • Choose high-cardinality fields as join keys
  • Avoid using timestamp fields as join keys
  • Ensure key uniqueness

# 2. Window Design

  • Reasonably set window size based on business requirements
  • Consider data arrival delay
  • Balance real-time and accuracy

# 3. Error Handling

  • Handle null values and data format errors
  • Set appropriate retry mechanisms
  • Record and monitor error data

# Running Results

# Solution 1 Output Example

{
  "merged_data": {
    "user_id": "user_001",
    "username": "Alice",
    "orders": [
      {"order_id": "order_123", "amount": 99.99},
      {"order_id": "order_124", "amount": 199.99}
    ],
    "total_amount": 299.98
  },
  "merge_timestamp": "2024-01-15T10:30:15Z"
}

# Solution 2 Output Example

{
  "user_activity_summary": {
    "user_id": "user_001",
    "login_count": 3,
    "order_count": 2,
    "total_spent": 299.98,
    "avg_order_amount": 149.99,
    "last_activity": "2024-01-15T10:30:10Z"
  }
}

# Summary

Multi-stream data merging is one of the core functions of stream processing. StreamSQL provides flexible and powerful data merging capabilities:

  1. Stream JOIN: Suitable for scenarios with clear association relationships and moderate data volume
  2. Message Queue: Suitable for large data volume and complex business scenarios
  3. State Management: Ensure data consistency and reliability
  4. Performance Optimization: Improve system throughput through reasonable configuration

When choosing the appropriate solution, consider:

  • Data volume and real-time requirements
  • Business logic complexity
  • System resource constraints
  • Data quality and consistency requirements

Through reasonable design and optimization, efficient and stable multi-stream data merging can be achieved, providing strong support for real-time business decisions.

Last Updated: 2025/08/05, 02:24:31

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License
