# Change Data Capture (CDC) Case Study

## Business Scenario

In modern enterprise applications, database changes must be captured and processed in real time to feed downstream systems such as data warehouses, caches, and audit logs. StreamSQL can be used to implement this kind of change data capture (CDC) processing efficiently.

### Typical Scenarios

  • Data Warehouse Synchronization: Real-time synchronization of database changes to data warehouses
  • Cache Invalidation: Real-time update of cache data based on database changes
  • Audit Log: Record all database change operations for audit purposes
  • Real-time Analytics: Analyze business trends based on database change data
  • Event-Driven Architecture: Trigger downstream business processes based on database changes

## Data Model

### Input Data Format

Database Change Event:

```json
{
  "table_name": "users",
  "operation": "UPDATE",
  "before_data": {
    "id": 1,
    "name": "Alice",
    "email": "alice@old.com",
    "status": "active",
    "updated_at": "2024-01-15T10:30:00Z"
  },
  "after_data": {
    "id": 1,
    "name": "Alice",
    "email": "alice@new.com",
    "status": "inactive",
    "updated_at": "2024-01-15T10:30:05Z"
  },
  "timestamp": "2024-01-15T10:30:05Z"
}
```

### Expected Output Format

Processed Change Event:

```json
{
  "table_name": "users",
  "operation": "UPDATE",
  "primary_key": 1,
  "changed_fields": ["email", "status"],
  "old_values": {
    "email": "alice@old.com",
    "status": "active"
  },
  "new_values": {
    "email": "alice@new.com",
    "status": "inactive"
  },
  "change_timestamp": "2024-01-15T10:30:05Z"
}
```

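Conceptually, this transformation is a field-by-field comparison of `before_data` and `after_data`. The sketch below illustrates the idea in the same style as the queries in the Application Scenarios section (the `cdc_events` stream name and the `->>` accessor are carried over from those examples; assembling the `changed_fields` array would typically require a transform step or a custom function, so the sketch only flags each tracked field):

```sql
-- Flag which tracked fields changed between the before and after images
SELECT
    table_name,
    operation,
    after_data->>'id' as primary_key,
    (before_data->>'email' <> after_data->>'email')   as email_changed,
    (before_data->>'status' <> after_data->>'status') as status_changed,
    before_data->>'email'  as old_email,
    after_data->>'email'   as new_email,
    before_data->>'status' as old_status,
    after_data->>'status'  as new_status,
    timestamp as change_timestamp
FROM cdc_events
WHERE table_name = 'users' AND operation = 'UPDATE';
```
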
## CDC Cases

### 1. User Change Capture

Business Scenario: Monitor user table changes, including registration, information updates, and status changes.

Data Model:

  • Source Table: users (id, username, email, status, created_at, updated_at)
  • Change Type: INSERT, UPDATE, DELETE
  • Key Fields: id (primary key), status

Change Event Example:

```json
{
  "table_name": "users",
  "operation": "INSERT",
  "after_data": {
    "id": 1001,
    "username": "new_user",
    "email": "user@example.com",
    "status": "active",
    "created_at": "2024-01-15T10:30:00Z"
  }
}
```

Processing Logic:

  • Extract user registration events
  • Monitor status changes (active -> inactive)
  • Track email changes
  • Generate user lifecycle events
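
For instance, the status-change monitoring above can be written as a filter over UPDATE events whose status differs between the two images. This is only a sketch in the style of the queries later on this page; exact StreamSQL syntax may differ:

```sql
-- Emit a user lifecycle event whenever status flips, e.g. active -> inactive
SELECT
    after_data->>'id' as user_id,
    before_data->>'status' as old_status,
    after_data->>'status' as new_status,
    timestamp as changed_at
FROM cdc_events
WHERE table_name = 'users'
  AND operation = 'UPDATE'
  AND before_data->>'status' <> after_data->>'status';
```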

### 2. Order Change Capture

Business Scenario: Monitor order table changes, including order creation, status updates, and amount changes.

Data Model:

  • Source Table: orders (id, user_id, amount, status, created_at, updated_at)
  • Change Type: INSERT, UPDATE
  • Key Fields: id, status (pending, paid, shipped, completed, cancelled)

Change Event Example:

```json
{
  "table_name": "orders",
  "operation": "UPDATE",
  "before_data": {
    "id": 2001,
    "status": "pending",
    "amount": 99.99
  },
  "after_data": {
    "id": 2001,
    "status": "paid",
    "amount": 99.99
  }
}
```

Processing Logic:

  • Monitor order status changes
  • Calculate order completion rate
  • Track payment time
  • Identify cancelled orders
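
A hedged sketch of the status tracking and completion-rate calculation above, using the same `cdc_events` conventions as the rest of this page (syntax details may differ in practice):

```sql
-- Capture order status transitions, e.g. pending -> paid
SELECT
    after_data->>'id' as order_id,
    before_data->>'status' as old_status,
    after_data->>'status' as new_status,
    after_data->>'amount' as amount,
    timestamp as changed_at
FROM cdc_events
WHERE table_name = 'orders'
  AND operation = 'UPDATE'
  AND before_data->>'status' <> after_data->>'status';

-- Daily completion rate: completed transitions / all status transitions
SELECT
    DATE(timestamp) as order_date,
    SUM(CASE WHEN after_data->>'status' = 'completed' THEN 1 ELSE 0 END) * 1.0
        / COUNT(*) as completion_rate
FROM cdc_events
WHERE table_name = 'orders' AND operation = 'UPDATE'
GROUP BY DATE(timestamp);
```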

### 3. Product Change Capture

Business Scenario: Monitor product table changes, including price changes, inventory changes, and product information updates.

Data Model:

  • Source Table: products (id, name, price, stock, status, updated_at)
  • Change Type: INSERT, UPDATE
  • Key Fields: id, price, stock, status

Change Event Example:

```json
{
  "table_name": "products",
  "operation": "UPDATE",
  "before_data": {
    "id": 3001,
    "price": 99.99,
    "stock": 100
  },
  "after_data": {
    "id": 3001,
    "price": 89.99,
    "stock": 95
  }
}
```

Processing Logic:

  • Track price change history
  • Monitor inventory changes
  • Generate product update events
  • Calculate price change trends
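
Price-change tracking from the list above might look like the following sketch; the explicit CASTs are an assumption about how numeric JSON fields are accessed and may not be required by the actual engine:

```sql
-- Record price changes with the absolute delta between old and new price
SELECT
    after_data->>'id' as product_id,
    before_data->>'price' as old_price,
    after_data->>'price' as new_price,
    CAST(after_data->>'price' AS DECIMAL)
        - CAST(before_data->>'price' AS DECIMAL) as price_delta,
    timestamp as changed_at
FROM cdc_events
WHERE table_name = 'products'
  AND operation = 'UPDATE'
  AND before_data->>'price' <> after_data->>'price';
```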

### 4. Inventory Change Capture

Business Scenario: Monitor inventory table changes, including inventory increases, decreases, and safety stock alerts.

Data Model:

  • Source Table: inventory (id, product_id, quantity, warehouse_id, updated_at)
  • Change Type: INSERT, UPDATE
  • Key Fields: product_id, quantity, warehouse_id

Change Event Example:

```json
{
  "table_name": "inventory",
  "operation": "UPDATE",
  "before_data": {
    "product_id": 4001,
    "quantity": 100,
    "warehouse_id": "WH001"
  },
  "after_data": {
    "product_id": 4001,
    "quantity": 85,
    "warehouse_id": "WH001"
  }
}
```

Processing Logic:

  • Calculate inventory change volume
  • Monitor safety stock thresholds
  • Generate low stock alerts
  • Track inventory movement trends
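
A sketch of the inventory-delta and low-stock logic above; the threshold of 20 is purely illustrative, and the CASTs are an assumption about numeric field access:

```sql
-- Compute the per-event inventory delta and flag low-stock situations
SELECT
    after_data->>'product_id' as product_id,
    after_data->>'warehouse_id' as warehouse_id,
    CAST(after_data->>'quantity' AS DECIMAL)
        - CAST(before_data->>'quantity' AS DECIMAL) as quantity_delta,
    CASE
        WHEN CAST(after_data->>'quantity' AS DECIMAL) < 20 THEN 'low_stock_alert'  -- hypothetical safety-stock threshold
        ELSE 'ok'
    END as stock_status
FROM cdc_events
WHERE table_name = 'inventory' AND operation = 'UPDATE';
```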

### 5. Permission Change Capture

Business Scenario: Monitor permission table changes, including role changes, permission assignments, and access control updates.

Data Model:

  • Source Table: permissions (id, user_id, role, resource, action, updated_at)
  • Change Type: INSERT, UPDATE, DELETE
  • Key Fields: user_id, role, resource, action

Change Event Example:

```json
{
  "table_name": "permissions",
  "operation": "INSERT",
  "after_data": {
    "id": 5001,
    "user_id": 1001,
    "role": "admin",
    "resource": "dashboard",
    "action": "read"
  }
}
```

Processing Logic:

  • Track permission changes
  • Generate permission audit logs
  • Update user permission cache
  • Trigger permission change notifications
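
The permission audit logging above could be sketched as a flattening query; `COALESCE` is used because DELETE events carry only `before_data` while INSERT events carry only `after_data`:

```sql
-- Flatten permission changes into audit records
SELECT
    COALESCE(after_data->>'user_id', before_data->>'user_id') as user_id,
    COALESCE(after_data->>'role', before_data->>'role') as role,
    COALESCE(after_data->>'resource', before_data->>'resource') as resource,
    COALESCE(after_data->>'action', before_data->>'action') as permission_action,
    operation,
    timestamp as changed_at
FROM cdc_events
WHERE table_name = 'permissions';
```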

## CDC Features

### 1. Real-time Processing

  • Low Latency: Process database changes in milliseconds
  • High Throughput: Support high-concurrency data processing
  • Exactly-once: Ensure data is processed exactly once

### 2. Data Integrity

  • Transaction Support: Ensure data consistency
  • Schema Evolution: Support schema changes
  • Data Validation: Verify data completeness and correctness

### 3. Flexible Processing

  • Conditional Filtering: Filter based on business rules
  • Data Transformation: Transform data formats
  • Event Enrichment: Enrich events with additional information
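
These three capabilities often combine in a single query. The following sketch filters paid orders above a minimum amount, renames fields, and derives an `order_tier` label; the field names, tier labels, and thresholds are illustrative only:

```sql
-- Filter: only paid orders above a minimum amount
-- Transform: rename and convert fields
-- Enrich: derive an order_tier label not present in the source event
SELECT
    after_data->>'id' as order_id,
    CAST(after_data->>'amount' AS DECIMAL) as amount,
    CASE
        WHEN CAST(after_data->>'amount' AS DECIMAL) >= 1000 THEN 'vip'
        ELSE 'standard'
    END as order_tier,
    timestamp as paid_at
FROM cdc_events
WHERE table_name = 'orders'
  AND operation = 'UPDATE'
  AND after_data->>'status' = 'paid'
  AND CAST(after_data->>'amount' AS DECIMAL) >= 100;
```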

### 4. Monitoring and Alerting

  • Processing Lag Monitoring: Monitor processing delays
  • Error Rate Monitoring: Monitor processing error rates
  • Business Metric Monitoring: Monitor business-related metrics

## Data Completeness

### 1. Change Detection

  • Complete Change History: Record all data changes
  • Change Type Identification: Identify INSERT, UPDATE, DELETE operations
  • Field-Level Changes: Track changes at the field level

### 2. Data Consistency

  • Primary Key Consistency: Ensure primary key consistency
  • Foreign Key Relationships: Maintain referential integrity
  • Temporal Consistency: Ensure time-based consistency

### 3. Error Handling

  • Retry Mechanism: Handle temporary failures
  • Dead Letter Queue: Store failed messages
  • Monitoring and Alerting: Monitor processing status

## Application Scenarios

### 1. Data Warehouse Synchronization

```sql
-- Synchronize user changes to data warehouse
INSERT INTO dw_users
SELECT
    after_data->>'id' as user_id,
    after_data->>'username' as username,
    after_data->>'email' as email,
    after_data->>'status' as status,
    operation,
    timestamp
FROM cdc_events
WHERE table_name = 'users' AND operation IN ('INSERT', 'UPDATE');
```

### 2. Cache Invalidation

```sql
-- Invalidate user cache based on user changes
SELECT
    CASE
        WHEN operation = 'UPDATE' THEN 'invalidate_cache'
        WHEN operation = 'DELETE' THEN 'remove_cache'
    END as action,
    -- DELETE events carry only before_data, so fall back to it for the key
    COALESCE(after_data->>'id', before_data->>'id') as user_id
FROM cdc_events
WHERE table_name = 'users' AND operation IN ('UPDATE', 'DELETE');
```

### 3. Audit Log Generation

```sql
-- Generate audit logs for order changes
SELECT
    table_name,
    operation,
    before_data,
    after_data,
    timestamp,
    user_id  -- assumes the CDC event is enriched with the id of the operator who made the change
FROM cdc_events
WHERE table_name = 'orders';
```

### 4. Real-time Analytics

```sql
-- Real-time order analytics
SELECT
    DATE(timestamp) as order_date,
    COUNT(*) as total_orders,
    SUM(CASE WHEN operation = 'INSERT' THEN 1 ELSE 0 END) as new_orders,
    SUM(CASE WHEN operation = 'UPDATE' AND after_data->>'status' = 'cancelled' THEN 1 ELSE 0 END) as cancelled_orders
FROM cdc_events
WHERE table_name = 'orders'
GROUP BY DATE(timestamp);
```

## Performance Optimization

### 1. Batch Processing

  • Batch Size: Process changes in batches to improve throughput
  • Batch Interval: Set appropriate batch processing intervals
  • Memory Management: Optimize memory usage

### 2. Parallel Processing

  • Partitioning: Partition data by table or key
  • Parallel Workers: Use multiple workers for processing
  • Load Balancing: Distribute load evenly

### 3. State Management

  • State Backend: Choose appropriate state backend
  • State Cleanup: Regularly clean up expired state
  • Checkpointing: Enable checkpointing for fault tolerance

## Summary

Change Data Capture (CDC) is a critical component in modern data architectures. StreamSQL provides powerful CDC processing capabilities:

  1. Real-time Processing: Low-latency processing of database changes
  2. Data Integrity: Ensure data consistency and completeness
  3. Flexible Processing: Support various business scenarios
  4. Monitoring and Alerting: Comprehensive monitoring capabilities

Key considerations for CDC implementation:

  • Data Volume: Consider data volume and processing capacity
  • Latency Requirements: Balance real-time requirements and system complexity
  • Data Quality: Ensure data accuracy and completeness
  • System Reliability: Implement proper error handling and recovery mechanisms

With sound design and tuning, StreamSQL can be used to build efficient, reliable CDC systems that support a wide range of real-time data processing requirements.
