# Change Data Capture (CDC) Case Study
## Business Scenario
In modern enterprise applications, database changes must be captured and processed in real time to feed downstream systems such as data warehouses, cache layers, and audit logs. StreamSQL can be used to build efficient change data capture (CDC) pipelines.
## Typical Scenarios
- Data Warehouse Synchronization: Real-time synchronization of database changes to data warehouses
- Cache Invalidation: Real-time update of cache data based on database changes
- Audit Log: Record all database change operations for audit purposes
- Real-time Analytics: Analyze business trends based on database change data
- Event-Driven Architecture: Trigger downstream business processes based on database changes
## Data Model

### Input Data Format
Database Change Event:
```json
{
  "table_name": "users",
  "operation": "UPDATE",
  "before_data": {
    "id": 1,
    "name": "Alice",
    "email": "alice@old.com",
    "status": "active",
    "updated_at": "2024-01-15T10:30:00Z"
  },
  "after_data": {
    "id": 1,
    "name": "Alice",
    "email": "alice@new.com",
    "status": "inactive",
    "updated_at": "2024-01-15T10:30:05Z"
  },
  "timestamp": "2024-01-15T10:30:05Z"
}
```
### Expected Output Format
Processed Change Event:
```json
{
  "table_name": "users",
  "operation": "UPDATE",
  "primary_key": 1,
  "changed_fields": ["email", "status"],
  "old_values": {
    "email": "alice@old.com",
    "status": "active"
  },
  "new_values": {
    "email": "alice@new.com",
    "status": "inactive"
  },
  "change_timestamp": "2024-01-15T10:30:05Z"
}
```
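The mapping from the raw change event to this processed format can be sketched as follows. This is an illustrative Python simulation of the field-level diff logic, not StreamSQL itself; the function name and the exclusion of bookkeeping columns (`updated_at`, `created_at`) are assumptions for the sketch:

```python
def process_change_event(event):
    """Diff before/after images to produce a field-level change event."""
    before = event.get("before_data") or {}
    after = event.get("after_data") or {}
    # A field counts as changed if its value differs between the two images.
    changed = [k for k in sorted(set(before) | set(after))
               if before.get(k) != after.get(k)]
    # Assumption: bookkeeping columns are excluded from the change list.
    changed = [k for k in changed if k not in ("updated_at", "created_at")]
    return {
        "table_name": event["table_name"],
        "operation": event["operation"],
        "primary_key": (after or before).get("id"),
        "changed_fields": changed,
        "old_values": {k: before.get(k) for k in changed},
        "new_values": {k: after.get(k) for k in changed},
        "change_timestamp": event["timestamp"],
    }
```

Running this on the sample UPDATE event above yields `changed_fields` of `["email", "status"]` with the corresponding old and new values.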
## CDC Cases

### 1. User Change Capture
Business Scenario: Monitor user table changes, including registration, information updates, and status changes.
Data Model:
- Source Table: users (id, username, email, status, created_at, updated_at)
- Change Type: INSERT, UPDATE, DELETE
- Key Fields: id (primary key), status
Change Event Example:
```json
{
  "table_name": "users",
  "operation": "INSERT",
  "after_data": {
    "id": 1001,
    "username": "new_user",
    "email": "user@example.com",
    "status": "active",
    "created_at": "2024-01-15T10:30:00Z"
  }
}
```
Processing Logic:
- Extract user registration events
- Monitor status changes (active -> inactive)
- Track email changes
- Generate user lifecycle events
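The steps above can be sketched as a small classifier over raw user-table events. The event names (`user_registered`, `user_deactivated`, …) are illustrative assumptions, not a fixed StreamSQL vocabulary:

```python
def classify_user_event(event):
    """Map a raw users-table change to a lifecycle event name (illustrative)."""
    op = event["operation"]
    before = event.get("before_data") or {}
    after = event.get("after_data") or {}
    if op == "INSERT":
        return "user_registered"
    if op == "DELETE":
        return "user_deleted"
    # UPDATE: inspect which fields moved.
    if before.get("status") == "active" and after.get("status") == "inactive":
        return "user_deactivated"
    if before.get("email") != after.get("email"):
        return "user_email_changed"
    return "user_updated"
```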
### 2. Order Change Capture
Business Scenario: Monitor order table changes, including order creation, status updates, and amount changes.
Data Model:
- Source Table: orders (id, user_id, amount, status, created_at, updated_at)
- Change Type: INSERT, UPDATE
- Key Fields: id, status (pending, paid, shipped, completed, cancelled)
Change Event Example:
```json
{
  "table_name": "orders",
  "operation": "UPDATE",
  "before_data": {
    "id": 2001,
    "status": "pending",
    "amount": 99.99
  },
  "after_data": {
    "id": 2001,
    "status": "paid",
    "amount": 99.99
  }
}
```
Processing Logic:
- Monitor order status changes
- Calculate order completion rate
- Track payment time
- Identify cancelled orders
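Status-change monitoring can be sketched as a check against an order state machine. The transition table below is an assumed model for illustration, derived from the status values listed in the data model:

```python
# Allowed order status transitions (assumed state machine for illustration).
VALID_TRANSITIONS = {
    "pending": {"paid", "cancelled"},
    "paid": {"shipped", "cancelled"},
    "shipped": {"completed"},
}

def check_transition(event):
    """Return (is_valid, old_status, new_status) for an order UPDATE event."""
    old = (event.get("before_data") or {}).get("status")
    new = (event.get("after_data") or {}).get("status")
    return new in VALID_TRANSITIONS.get(old, set()), old, new
```

Invalid transitions (e.g. `shipped` back to `pending`) can then be routed to an error stream instead of downstream consumers.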
### 3. Product Change Capture
Business Scenario: Monitor product table changes, including price changes, inventory changes, and product information updates.
Data Model:
- Source Table: products (id, name, price, stock, status, updated_at)
- Change Type: INSERT, UPDATE
- Key Fields: id, price, stock, status
Change Event Example:
```json
{
  "table_name": "products",
  "operation": "UPDATE",
  "before_data": {
    "id": 3001,
    "price": 99.99,
    "stock": 100
  },
  "after_data": {
    "id": 3001,
    "price": 89.99,
    "stock": 95
  }
}
```
Processing Logic:
- Track price change history
- Monitor inventory changes
- Generate product update events
- Calculate price change trends
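The price-tracking step reduces to computing the relative change between the two row images. A minimal sketch (function name is an assumption):

```python
def price_change_pct(event):
    """Percentage price change for a products UPDATE event (None if no change)."""
    old = (event.get("before_data") or {}).get("price")
    new = (event.get("after_data") or {}).get("price")
    if not old or new is None or old == new:
        return None
    return round((new - old) / old * 100, 2)
```

For the sample event above (99.99 to 89.99) this reports a 10% price drop.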
### 4. Inventory Change Capture
Business Scenario: Monitor inventory table changes, including inventory increases, decreases, and safety stock alerts.
Data Model:
- Source Table: inventory (id, product_id, quantity, warehouse_id, updated_at)
- Change Type: INSERT, UPDATE
- Key Fields: product_id, quantity, warehouse_id
Change Event Example:
```json
{
  "table_name": "inventory",
  "operation": "UPDATE",
  "before_data": {
    "product_id": 4001,
    "quantity": 100,
    "warehouse_id": "WH001"
  },
  "after_data": {
    "product_id": 4001,
    "quantity": 85,
    "warehouse_id": "WH001"
  }
}
```
Processing Logic:
- Calculate inventory change volume
- Monitor safety stock thresholds
- Generate low stock alerts
- Track inventory movement trends
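The safety-stock check can be sketched as an edge-triggered alert: fire only when the quantity crosses below the threshold, not on every subsequent event. The threshold value here is an assumption for illustration:

```python
SAFETY_STOCK = 90  # assumed per-product threshold for illustration

def low_stock_alert(event, threshold=SAFETY_STOCK):
    """Emit an alert dict when quantity drops below the safety-stock threshold."""
    before = event.get("before_data") or {}
    after = event.get("after_data") or {}
    qty_before = before.get("quantity")
    qty_after = after.get("quantity")
    # Alert only on the downward crossing, not on every event below threshold.
    if qty_after is not None and qty_after < threshold and (
            qty_before is None or qty_before >= threshold):
        return {
            "product_id": after.get("product_id"),
            "warehouse_id": after.get("warehouse_id"),
            "quantity": qty_after,
            "alert": "low_stock",
        }
    return None
```

Edge-triggering avoids flooding downstream alerting with one notification per inventory event while stock stays low.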
### 5. Permission Change Capture
Business Scenario: Monitor permission table changes, including role changes, permission assignments, and access control updates.
Data Model:
- Source Table: permissions (id, user_id, role, resource, action, updated_at)
- Change Type: INSERT, UPDATE, DELETE
- Key Fields: user_id, role, resource, action
Change Event Example:
```json
{
  "table_name": "permissions",
  "operation": "INSERT",
  "after_data": {
    "id": 5001,
    "user_id": 1001,
    "role": "admin",
    "resource": "dashboard",
    "action": "read"
  }
}
```
Processing Logic:
- Track permission changes
- Generate permission audit logs
- Update user permission cache
- Trigger permission change notifications
## CDC Features

### 1. Real-time Processing
- Low Latency: Process database changes in milliseconds
- High Throughput: Support high-concurrency data processing
- Exactly-once: Ensure data is processed exactly once
### 2. Data Integrity
- Transaction Support: Ensure data consistency
- Schema Evolution: Support schema changes
- Data Validation: Verify data completeness and correctness
### 3. Flexible Processing
- Conditional Filtering: Filter based on business rules
- Data Transformation: Transform data formats
- Event Enrichment: Enrich events with additional information
### 4. Monitoring and Alerting
- Processing Lag Monitoring: Monitor processing delays
- Error Rate Monitoring: Monitor processing error rates
- Business Metric Monitoring: Monitor business-related metrics
## Data Completeness

### 1. Change Detection
- Complete Change History: Record all data changes
- Change Type Identification: Identify INSERT, UPDATE, DELETE operations
- Field-Level Changes: Track changes at the field level
### 2. Data Consistency
- Primary Key Consistency: Ensure primary key consistency
- Foreign Key Relationships: Maintain referential integrity
- Temporal Consistency: Ensure time-based consistency
### 3. Error Handling
- Retry Mechanism: Handle temporary failures
- Dead Letter Queue: Store failed messages
- Monitoring and Alerting: Monitor processing status
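The retry-then-dead-letter pattern described above can be sketched as a small wrapper around a per-event handler. This is a simplified in-memory illustration; a real deployment would use the platform's retry and dead-letter-queue facilities:

```python
def process_with_retry(event, handler, max_retries=3, dead_letters=None):
    """Retry a failing handler, then divert the event to a dead-letter list."""
    if dead_letters is None:
        dead_letters = []
    for attempt in range(1, max_retries + 1):
        try:
            return handler(event)
        except Exception as exc:  # in production, catch narrower exception types
            last_error = exc
    # All retries exhausted: park the event for later inspection/replay.
    dead_letters.append({"event": event, "error": str(last_error),
                         "retries": max_retries})
    return None
```

Dead-lettered events keep the error message and retry count so they can be inspected and replayed without blocking the rest of the stream.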
## Application Scenarios

### 1. Data Warehouse Synchronization
```sql
-- Synchronize user changes to data warehouse
INSERT INTO dw_users
SELECT
    after_data->>'id' as user_id,
    after_data->>'username' as username,
    after_data->>'email' as email,
    after_data->>'status' as status,
    operation,
    timestamp
FROM cdc_events
WHERE table_name = 'users' AND operation IN ('INSERT', 'UPDATE');
```
### 2. Cache Invalidation
```sql
-- Invalidate user cache based on user changes.
-- For DELETE events after_data is null, so fall back to before_data for the key.
SELECT
    CASE
        WHEN operation = 'UPDATE' THEN 'invalidate_cache'
        WHEN operation = 'DELETE' THEN 'remove_cache'
    END as action,
    COALESCE(after_data->>'id', before_data->>'id') as user_id
FROM cdc_events
WHERE table_name = 'users' AND operation IN ('UPDATE', 'DELETE');
```
### 3. Audit Log Generation
```sql
-- Generate audit logs for order changes.
-- user_id lives inside the row image, not as a top-level event column.
SELECT
    table_name,
    operation,
    before_data,
    after_data,
    timestamp,
    after_data->>'user_id' as user_id
FROM cdc_events
WHERE table_name = 'orders';
```
### 4. Real-time Analytics
```sql
-- Real-time order analytics
SELECT
    DATE(timestamp) as order_date,
    COUNT(*) as total_orders,
    SUM(CASE WHEN operation = 'INSERT' THEN 1 ELSE 0 END) as new_orders,
    SUM(CASE WHEN operation = 'UPDATE'
              AND after_data->>'status' = 'cancelled' THEN 1 ELSE 0 END) as cancelled_orders
FROM cdc_events
WHERE table_name = 'orders'
GROUP BY DATE(timestamp);
```
## Performance Optimization

### 1. Batch Processing
- Batch Size: Process changes in batches to improve throughput
- Batch Interval: Set appropriate batch processing intervals
- Memory Management: Optimize memory usage
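The size-or-interval flush policy above can be sketched as a small micro-batcher. This is an illustrative single-threaded simulation (class and parameter names are assumptions); stream engines implement the same policy internally:

```python
import time

class MicroBatcher:
    """Accumulate events and flush when batch size or elapsed interval is hit."""

    def __init__(self, flush, max_size=100, max_interval_s=1.0):
        self.flush = flush                  # callback invoked with each full batch
        self.max_size = max_size
        self.max_interval_s = max_interval_s
        self.buffer = []
        self.last_flush = time.monotonic()

    def add(self, event):
        self.buffer.append(event)
        # The interval check runs on add; a real engine would also use a timer
        # so a quiet stream still flushes its tail.
        if (len(self.buffer) >= self.max_size or
                time.monotonic() - self.last_flush >= self.max_interval_s):
            self.flush(self.buffer)
            self.buffer = []
            self.last_flush = time.monotonic()
```

Larger batches raise throughput at the cost of latency; the interval bound caps how stale a buffered change can get.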
### 2. Parallel Processing
- Partitioning: Partition data by table or key
- Parallel Workers: Use multiple workers for processing
- Load Balancing: Distribute load evenly
### 3. State Management
- State Backend: Choose appropriate state backend
- State Cleanup: Regularly clean up expired state
- Checkpointing: Enable checkpointing for fault tolerance
## Summary
Change Data Capture (CDC) is a critical component in modern data architectures. StreamSQL provides powerful CDC processing capabilities:
- Real-time Processing: Low-latency processing of database changes
- Data Integrity: Ensure data consistency and completeness
- Flexible Processing: Support various business scenarios
- Monitoring and Alerting: Comprehensive monitoring capabilities
Key considerations for CDC implementation:
- Data Volume: Consider data volume and processing capacity
- Latency Requirements: Balance real-time requirements and system complexity
- Data Quality: Ensure data accuracy and completeness
- System Reliability: Implement proper error handling and recovery mechanisms
Through reasonable design and optimization, StreamSQL can build efficient and reliable CDC systems to support various real-time data processing requirements.