# Core Concepts
Understanding StreamSQL's core concepts is key to using it efficiently. This chapter covers the most important ones: stream processing, time semantics, windows, aggregation, expressions, data types, and the execution model.
# Stream Processing Basics
# What is a Data Stream
A data stream is a continuously generated sequence of data records with the following characteristics:
- Unbounded: Data is generated continuously, with no defined end
- Ordered in time: Records are generated in chronological order, although they may arrive out of order (for example, because of network delays)
- Real-time: Records must be processed as they arrive, without waiting for all data to be collected
- Immutable: Records that have already been produced cannot be modified
# Stream Processing vs Batch Processing
| Feature | Stream Processing | Batch Processing |
|---|---|---|
| Data boundary | Unbounded | Bounded |
| Processing latency | Milliseconds | Minutes to hours |
| Data completeness | Approximate results | Exact results |
| Resource usage | Continuous | Periodic |
| Typical use cases | Real-time monitoring, alerting | Reporting, offline analysis |
# Stream Processing Model
StreamSQL adopts a micro-batch processing model.
# Time Semantics
# Time Types
StreamSQL supports two notions of time:
# 1. Event Time
- The time at which the data was actually generated
- Specified with `WITH (TIMESTAMP='field_name')`
- Suitable for scenarios that require precise timing
- Important: once event time is set, windows are computed from the timestamp field in each message, not from the message arrival time
```sql
SELECT deviceId, AVG(temperature)
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
WITH (TIMESTAMP='event_time')
```
# 2. Processing Time
- The time at which data arrives at the processing system
- Used by default (the system's current time)
- Simpler to use, but results can be skewed when data is delayed or arrives out of order
```sql
-- Use processing time (default)
SELECT deviceId, AVG(temperature)
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
```
# Time Formats
The `TIMEUNIT` option supports the following time units:
```sql
-- Time unit configuration (use one of the following)
WITH (TIMEUNIT='ss') -- seconds
WITH (TIMEUNIT='ms') -- milliseconds
WITH (TIMEUNIT='mi') -- minutes
WITH (TIMEUNIT='hh') -- hours
WITH (TIMEUNIT='dd') -- days
```
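For reference, the sketch below maps each `TIMEUNIT` code to a Go `time.Duration` and uses it to interpret a numeric timestamp counted from the Unix epoch. This is one plausible reading of a time unit. Whether StreamSQL applies `TIMEUNIT` exactly this way is an assumption, and `timeUnits` and `toTime` are hypothetical helpers, not part of the StreamSQL API.

```go
package main

import (
	"fmt"
	"time"
)

// timeUnits maps the TIMEUNIT codes listed above to Go durations.
// This table is a hypothetical helper, not part of the StreamSQL API.
var timeUnits = map[string]time.Duration{
	"ms": time.Millisecond,
	"ss": time.Second,
	"mi": time.Minute,
	"hh": time.Hour,
	"dd": 24 * time.Hour,
}

// toTime interprets a raw integer timestamp counted in the given unit
// since the Unix epoch (an assumption made for this sketch).
func toTime(raw int64, unit string) (time.Time, error) {
	d, ok := timeUnits[unit]
	if !ok {
		return time.Time{}, fmt.Errorf("unknown TIMEUNIT %q", unit)
	}
	return time.Unix(0, 0).UTC().Add(time.Duration(raw) * d), nil
}

func main() {
	t, err := toTime(1700000000, "ss")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(t.Format(time.RFC3339))
}
```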
# Window Concepts
Windows are a core concept in stream processing. They divide an unbounded stream into bounded datasets over which aggregations can be computed.
# Window Types
# 1. Tumbling Window
Fixed-size, non-overlapping time windows:
```sql
-- Calculate average every 5 minutes
SELECT AVG(temperature)
FROM stream
GROUP BY TumblingWindow('5m')
```
Features:
- Each record belongs to exactly one window
- Windows do not overlap
- Suitable for periodic statistics
# 2. Sliding Window
Fixed-size, overlapping time windows:
```sql
-- 5-minute window, sliding every 2 minutes
SELECT AVG(temperature)
FROM stream
GROUP BY SlidingWindow('5m', '2m')
```
Features:
- Each record may belong to multiple windows
- Produces smoother results
- Higher computational overhead than tumbling windows
# 3. Counting Window
Window based on data count:
```sql
-- Calculate average every 100 data points
SELECT AVG(temperature)
FROM stream
GROUP BY CountingWindow(100)
```
Features:
- Based on record count rather than time
- Each window holds a fixed number of records
- Suitable for scenarios with a stable data rate
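A counting window can be sketched as a running sum that is emitted and reset every `n` records, in the spirit of `AVG(temperature) ... CountingWindow(100)`. The `countingAvg` name is hypothetical. How StreamSQL handles a trailing partial window is not covered here, so in this sketch it is simply left unemitted.

```go
package main

import "fmt"

// countingAvg closes a window every n values and returns one average per
// full window. In this sketch a trailing partial window stays open and is
// not emitted.
func countingAvg(values []float64, n int) []float64 {
	var out []float64
	sum, count := 0.0, 0
	for _, v := range values {
		sum += v
		count++
		if count == n { // window is full: emit its average and reset
			out = append(out, sum/float64(n))
			sum, count = 0, 0
		}
	}
	return out
}

func main() {
	fmt.Println(countingAvg([]float64{1, 2, 3, 4, 5}, 2)) // [1.5 3.5]
}
```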
# 4. Session Window
Dynamic window based on data activity:
```sql
-- Close a session after 5 minutes of inactivity
SELECT user_id, COUNT(*)
FROM stream
GROUP BY user_id, SessionWindow('5m')
```
Features:
- Window size varies dynamically
- Session boundaries are determined by the gaps between records
- Suitable for user behavior analysis
# Window Lifecycle
# Aggregation Operations
# Aggregation Function Categories
# 1. Statistical Aggregation
```sql
SELECT deviceId,
    COUNT(*) as data_count,          -- Count
    SUM(temperature) as total_temp,  -- Sum
    AVG(temperature) as avg_temp,    -- Average
    MIN(temperature) as min_temp,    -- Minimum
    MAX(temperature) as max_temp     -- Maximum
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
```
# 2. Advanced Statistics
```sql
SELECT deviceId,
    STDDEV(temperature) as std_temp,       -- Standard deviation
    MEDIAN(temperature) as med_temp,       -- Median
    PERCENTILE(temperature, 0.95) as p95   -- 95th percentile
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
```
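The sketch below computes these statistics over one window's values using one common definition of each: population standard deviation (dividing by n), the median as the mean of the two middle values for even counts, and the nearest-rank percentile. StreamSQL's exact definitions (sample vs. population, percentile interpolation) are not specified in this chapter, so treat these as assumptions. All three functions expect non-empty input.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// stddev returns the population standard deviation (divides by n).
func stddev(xs []float64) float64 {
	mean := 0.0
	for _, x := range xs {
		mean += x
	}
	mean /= float64(len(xs))
	ss := 0.0
	for _, x := range xs {
		ss += (x - mean) * (x - mean)
	}
	return math.Sqrt(ss / float64(len(xs)))
}

// median sorts a copy and averages the two middle values for even counts.
func median(xs []float64) float64 {
	s := append([]float64(nil), xs...)
	sort.Float64s(s)
	n := len(s)
	if n%2 == 1 {
		return s[n/2]
	}
	return (s[n/2-1] + s[n/2]) / 2
}

// percentile uses the nearest-rank method: the smallest value such that at
// least a fraction p (0 < p <= 1) of the data is less than or equal to it.
func percentile(xs []float64, p float64) float64 {
	s := append([]float64(nil), xs...)
	sort.Float64s(s)
	rank := int(math.Ceil(p * float64(len(s))))
	if rank < 1 {
		rank = 1
	}
	return s[rank-1]
}

func main() {
	xs := []float64{2, 4, 4, 4, 5, 5, 7, 9}
	fmt.Println(stddev(xs), median(xs), percentile(xs, 0.95)) // 2 4.5 9
}
```

Unlike COUNT or SUM, median and percentile need every value in the window, which is one reason these aggregations use more memory.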
# 3. Collection Aggregation
```sql
SELECT deviceId,
    COLLECT(temperature) as temp_list,     -- Collect into an array
    LAST_VALUE(temperature) as last_temp   -- Last value
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
```
# Aggregation State Management
StreamSQL manages aggregation state automatically, so queries do not need to track intermediate results themselves.
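As a general illustration of what "aggregation state" means, and not StreamSQL's internal data structure, the sketch below keeps a small running state per group. Each record is folded into COUNT, SUM, MIN, and MAX, and AVG is derived from COUNT and SUM. The `aggState` type and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"math"
)

// aggState is the running state for one group in one window. It stores a
// few numbers instead of the raw records.
type aggState struct {
	Count    int
	Sum      float64
	Min, Max float64
}

func newAggState() *aggState {
	return &aggState{Min: math.Inf(1), Max: math.Inf(-1)}
}

// add folds one value into the state in O(1) time.
func (s *aggState) add(v float64) {
	s.Count++
	s.Sum += v
	s.Min = math.Min(s.Min, v)
	s.Max = math.Max(s.Max, v)
}

// avg derives AVG from the stored COUNT and SUM.
func (s *aggState) avg() float64 { return s.Sum / float64(s.Count) }

type reading struct {
	device string
	temp   float64
}

func main() {
	// One state per deviceId; a real engine would also key the state by window
	// and discard it once the window has fired.
	states := map[string]*aggState{}
	for _, r := range []reading{{"s1", 20}, {"s1", 24}, {"s2", 30}} {
		st, ok := states[r.device]
		if !ok {
			st = newAggState()
			states[r.device] = st
		}
		st.add(r.temp)
	}
	for _, id := range []string{"s1", "s2"} {
		s := states[id]
		fmt.Println(id, s.Count, s.avg(), s.Min, s.Max)
	}
}
```

Aggregations such as COLLECT or MEDIAN cannot be reduced to a few numbers like this. Their state grows with the number of records in the window.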
# Expression System
# Arithmetic Expressions
```sql
SELECT deviceId,
    temperature * 1.8 + 32 as fahrenheit,   -- Celsius to Fahrenheit
    (humidity + moisture) / 2 as avg_wet    -- Mean of humidity and moisture
FROM stream
```
# Logical Expressions
```sql
SELECT deviceId,
    temperature > 30 AND humidity > 80 as alert_condition
FROM stream
WHERE temperature IS NOT NULL
```
# String Expressions
```sql
SELECT deviceId,
    CONCAT(deviceId, '_', status) as device_status,
    UPPER(location) as location_upper
FROM stream
```
# Conditional Expressions
```sql
SELECT deviceId,
    CASE
        WHEN temperature > 35 THEN 'High'
        WHEN temperature > 25 THEN 'Normal'
        ELSE 'Low'
    END as temp_level
FROM stream
```
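A `CASE` expression tests its `WHEN` branches top to bottom and returns the first match, as in standard SQL. The Go `switch` below, using a hypothetical `tempLevel` function, mirrors that ordering. A value of exactly 35 is not `> 35`, so it falls through to `'Normal'`. SQL `NULL` handling is not modeled here.

```go
package main

import "fmt"

// tempLevel mirrors the CASE expression above: cases are checked in order
// and the first true condition wins.
func tempLevel(t float64) string {
	switch {
	case t > 35:
		return "High"
	case t > 25:
		return "Normal"
	default:
		return "Low"
	}
}

func main() {
	for _, t := range []float64{36, 35, 20} {
		fmt.Println(t, tempLevel(t)) // 36 High, 35 Normal, 20 Low
	}
}
```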
# Data Types
# Basic Types
| Type | Description | Example |
|---|---|---|
| Numeric | Integer, float | `25`, `3.14`, `-10` |
| String | Text data | `"sensor001"`, `'active'` |
| Boolean | Logical values | `true`, `false` |
| Time | Timestamp | `time.Now()` |
# Composite Types
```go
// Nested structures are supported
data := map[string]interface{}{
	"deviceId": "sensor001",
	"location": map[string]interface{}{
		"building": "A",
		"floor":    3,
	},
	"readings": []float64{23.5, 24.1, 25.2},
}
```
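One way to resolve a nested reference such as `location.floor` against a record like the one above is to walk the maps one key at a time. The dotted-path syntax and the `getPath` helper are assumptions for illustration, not a description of StreamSQL's field-access rules.

```go
package main

import (
	"fmt"
	"strings"
)

// getPath walks nested map[string]interface{} values along a dotted path
// and reports whether every segment was found.
func getPath(data map[string]interface{}, path string) (interface{}, bool) {
	var cur interface{} = data
	for _, key := range strings.Split(path, ".") {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false // an intermediate value is not a map
		}
		if cur, ok = m[key]; !ok {
			return nil, false // key missing at this level
		}
	}
	return cur, true
}

func main() {
	data := map[string]interface{}{
		"deviceId": "sensor001",
		"location": map[string]interface{}{"building": "A", "floor": 3},
	}
	fmt.Println(getPath(data, "location.floor")) // 3 true
}
```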
# Type Conversion
StreamSQL performs automatic type conversion and also supports explicit conversion with `CAST`:

```sql
-- Automatic string-to-number conversion
SELECT deviceId, temperature + '5' as adjusted_temp
FROM stream

-- Explicit conversion
SELECT deviceId, CAST(temperature AS STRING) as temp_str
FROM stream
```
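The sketch below illustrates the idea behind `temperature + '5'`. It coerces each operand to a number before adding, and fails if a string is not numeric. These coercion rules and the `toFloat` and `add` helpers are illustrative assumptions, not StreamSQL's documented behavior.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// toFloat coerces a number or a numeric string to float64. These coercion
// rules are an illustrative assumption, not StreamSQL's documented behavior.
func toFloat(v interface{}) (float64, error) {
	switch x := v.(type) {
	case float64:
		return x, nil
	case int:
		return float64(x), nil
	case string:
		return strconv.ParseFloat(strings.TrimSpace(x), 64)
	default:
		return 0, fmt.Errorf("cannot convert %T to a number", v)
	}
}

// add evaluates a + b after coercing both operands, as in temperature + '5'.
func add(a, b interface{}) (float64, error) {
	x, err := toFloat(a)
	if err != nil {
		return 0, err
	}
	y, err := toFloat(b)
	if err != nil {
		return 0, err
	}
	return x + y, nil
}

func main() {
	sum, err := add(25.0, "5")
	fmt.Println(sum, err) // 30 <nil>
	// A rough analogue of CAST(temperature AS STRING):
	fmt.Println(strconv.FormatFloat(25.5, 'f', -1, 64)) // 25.5
}
```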
# Execution Model
# Data Flow
# Processing Stages
- Parsing phase: The SQL statement is parsed into an abstract syntax tree (AST)
- Planning phase: An execution plan and its configuration are generated
- Execution phase: The stream processing pipeline is created
- Running phase: Data streams are processed continuously
# Resource Management
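```go
package main

import (
	"log"

	"github.com/rulego/streamsql"
)

func main() {
	// Proper resource management: release engine resources on exit
	ssql := streamsql.New()
	defer ssql.Stop()

	sql := "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('1m')"

	// Error handling: Execute reports invalid SQL or setup failures
	if err := ssql.Execute(sql); err != nil {
		log.Printf("Execution failed: %v", err)
		return
	}
}
```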
# Performance Considerations
# Memory Usage
- Window size: Larger windows occupy more memory
- Aggregation state: Complex aggregations require more state storage
- Data types: Avoid unnecessarily large objects in records
# Computational Complexity
Relative cost, from highest to lowest:
- Sliding window > tumbling window > no window
- Complex expressions > simple expressions
- Grouping by multiple fields > grouping by a single field