# Core Concepts

Understanding StreamSQL's core concepts is key to using it efficiently. This chapter will detail important concepts such as stream processing, windows, and aggregation.

# Stream Processing Basics

# What is a Data Stream

A data stream (Stream) is a series of continuously generated data records with the following characteristics:

  • Unbounded: Data is generated continuously, with no defined end
  • Sequential: Data is produced in time order, though it may arrive out of order
  • Real-time: Data must be processed promptly; you cannot wait for the full dataset
  • Immutable: Historical data cannot be modified
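
For illustration, a stream can be modelled in Go as records (`map[string]interface{}`, as in the examples later on this page) arriving continuously on a channel. This is a conceptual sketch with made-up field names, not StreamSQL API usage:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A stream delivers records one by one, indefinitely.
	// Field names (deviceId, temperature, ts) are illustrative only.
	records := make(chan map[string]interface{})

	go func() {
		for i := 0; ; i++ {
			records <- map[string]interface{}{
				"deviceId":    "sensor001",
				"temperature": 20.0 + float64(i%10),
				"ts":          time.Now().UnixMilli(),
			}
			time.Sleep(100 * time.Millisecond)
		}
	}()

	// A stream consumer must process records as they arrive;
	// it cannot wait for the stream to "finish".
	for rec := range records {
		fmt.Println(rec["deviceId"], rec["temperature"])
	}
}
```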

# Stream Processing vs Batch Processing

| Feature | Stream Processing | Batch Processing |
| --- | --- | --- |
| Data Boundary | Unbounded | Bounded |
| Processing Latency | Milliseconds | Minutes to hours |
| Data Completeness | Approximate results | Exact results |
| Resource Usage | Continuous occupation | Periodic occupation |
| Application Scenarios | Real-time monitoring, alerts | Reports, analysis |

# Stream Processing Model

StreamSQL adopts a micro-batch processing model: incoming records are grouped into windows and aggregated results are emitted when each window triggers.
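
The sketch below illustrates that idea in plain Go: values are accumulated for the current window and one aggregate is emitted when the window closes, instead of reprocessing the whole history for every record. It is a conceptual illustration, not StreamSQL's actual implementation:

```go
package main

import (
	"fmt"
	"time"
)

// Conceptual micro-batch loop: buffer values for the current window,
// emit one aggregated result when the window closes, then start over.
func main() {
	values := make(chan float64)
	go func() { // simulated source
		for {
			values <- 23.5
			time.Sleep(200 * time.Millisecond)
		}
	}()

	window := time.NewTicker(1 * time.Second) // 1-second tumbling window
	var sum float64
	var count int

	for {
		select {
		case v := <-values:
			sum += v // accumulate inside the current window
			count++
		case <-window.C:
			if count > 0 {
				fmt.Printf("window result: avg=%.2f over %d records\n", sum/float64(count), count)
			}
			sum, count = 0, 0 // reset state for the next window
		}
	}
}
```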

# Time Semantics

# Time Types

StreamSQL supports two time concepts that determine how windows are partitioned and triggered:

# 1. Event Time

Definition: The time when the data was actually generated, usually in a field such as order_time, event_time, or timestamp.

Features:

  • Partition windows based on the timestamp field value
  • Late-arriving data is counted into the correct window
  • Uses a Watermark mechanism to handle out-of-order and late data
  • Accurate results, but may be delayed (waits for late data)

Use cases:

  • Precise time-series analysis
  • Out-of-order or delayed data
  • Historical replay and analysis

Configuration: Use WITH (TIMESTAMP='field_name') to specify the event-time field

Example (orders):

-- Event-time window: partitioned by order_time
SELECT COUNT(*) AS order_count
FROM stream
GROUP BY TumblingWindow('5m')
WITH (TIMESTAMP='order_time')

How it works:

  • On arrival, the system extracts order_time
  • Assign the record to the window defined by order_time (e.g., 10:00–10:05)
  • Use Watermark to decide when a window can trigger
  • Even if data arrives at 11:00, if order_time=10:03 it is counted in [10:00–10:05)
-- Full example: event time with time unit
SELECT deviceId, AVG(temperature)
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
WITH (TIMESTAMP='event_time', TIMEUNIT='ms')

# 2. Processing Time

Definition: The time when StreamSQL receives the record.

Features:

  • Partition windows by arrival time (time.Now())
  • Ignores timestamp fields inside the record
  • Uses system clock (Timer) to trigger windows
  • Low latency, but may be less accurate

Use cases:

  • Real-time monitoring and alerts
  • Low-latency requirements
  • Data arrives in order with controlled delays

Configuration: Omit WITH (TIMESTAMP=...) to use processing time

Example (orders):

-- Processing-time window: partitioned by system time
SELECT COUNT(*) AS order_count
FROM stream
GROUP BY TumblingWindow('5m')

How it works:

  • Use current time on arrival
  • Assign by arrival time (e.g., 11:00–11:05)
  • Trigger via system timer
  • If data arrives between 11:00–11:05, even with order_time=10:03, it is counted in [11:00–11:05)
-- Default uses processing time
SELECT deviceId, AVG(temperature)
FROM stream
GROUP BY deviceId, TumblingWindow('1m')

# Event Time vs Processing Time

| Feature | Event Time | Processing Time |
| --- | --- | --- |
| Time source | Timestamp field in the data | Current system time |
| Window partitioning | Based on the event timestamp | Based on arrival time |
| Late-data handling | Supported (Watermark) | Not supported |
| Out-of-order handling | Supported (Watermark) | Not supported |
| Result accuracy | Accurate | May be inaccurate |
| Processing latency | Higher | Lower |
| Configuration | WITH (TIMESTAMP='field') | Default (omit WITH) |
| Scenarios | Precise analysis, historical replay | Real-time monitoring, low latency |
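
The window-assignment difference can be reproduced with plain Go time arithmetic. In this illustrative sketch, the same record (order_time 10:03, arriving at 11:00:30) lands in different 5-minute windows depending on which timestamp is used:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// The record's own timestamp (event time) vs. when it reaches the system (processing time).
	orderTime := time.Date(2024, 1, 1, 10, 3, 0, 0, time.UTC)     // order_time = 10:03
	arrivalTime := time.Date(2024, 1, 1, 11, 0, 30, 0, time.UTC)  // arrives at 11:00:30

	const windowSize = 5 * time.Minute

	// Event-time semantics: the window is derived from order_time -> [10:00, 10:05)
	eventWindowStart := orderTime.Truncate(windowSize)
	// Processing-time semantics: the window is derived from arrival time -> [11:00, 11:05)
	procWindowStart := arrivalTime.Truncate(windowSize)

	fmt.Println("event-time window:     ", eventWindowStart.Format("15:04"), "-", eventWindowStart.Add(windowSize).Format("15:04"))
	fmt.Println("processing-time window:", procWindowStart.Format("15:04"), "-", procWindowStart.Add(windowSize).Format("15:04"))
}
```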

# Time Unit Configuration

When the event-time field is an integer (Unix timestamp), specify the unit:

WITH (TIMEUNIT='ns')  -- nanoseconds
WITH (TIMEUNIT='ms')  -- milliseconds (default)
WITH (TIMEUNIT='ss')  -- seconds
WITH (TIMEUNIT='mi')  -- minutes
WITH (TIMEUNIT='hh')  -- hours
WITH (TIMEUNIT='dd')  -- days

Notes:

  • No TIMEUNIT needed for time.Time fields
  • Integer timestamps (int64) must specify TIMEUNIT
  • Processing-time windows never need a time unit
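
For integer timestamps, TIMEUNIT tells StreamSQL how to interpret the number. The effect corresponds to choosing the matching conversion in Go (illustrative sketch):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// The same instant expressed as an integer in different units.
	ms := int64(1700000000000) // TIMEUNIT='ms' — milliseconds since the Unix epoch
	ss := int64(1700000000)    // TIMEUNIT='ss' — seconds since the Unix epoch

	fmt.Println(time.UnixMilli(ms).UTC()) // 2023-11-14 22:13:20 +0000 UTC
	fmt.Println(time.Unix(ss, 0).UTC())   // the same instant
}
```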

# Watermark and Late Data

Event-time windows use Watermarks to handle out-of-order and late data.

Watermark:

  • Meaning: data with timestamps earlier than the watermark is assumed to have already arrived
  • Formula: Watermark = max(event_time) - MaxOutOfOrderness
  • Trigger: window fires when watermark >= window_end
  • MaxOutOfOrderness: allowed out-of-order duration

Example:

  • MaxOutOfOrderness = 5s, max(event_time) = 10:10:00 → watermark = 10:09:55
  • The window [10:00, 10:05) can trigger, because the watermark has passed its end time
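
The same example in code — a minimal sketch of the watermark formula and trigger check using plain Go values (not StreamSQL's internal types):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	maxOutOfOrderness := 5 * time.Second
	maxEventTime := time.Date(2024, 1, 1, 10, 10, 0, 0, time.UTC) // max(event_time) = 10:10:00

	// Watermark = max(event_time) - MaxOutOfOrderness
	watermark := maxEventTime.Add(-maxOutOfOrderness) // 10:09:55

	// The window [10:00, 10:05) fires once watermark >= window_end.
	windowEnd := time.Date(2024, 1, 1, 10, 5, 0, 0, time.UTC)
	fmt.Println("watermark:", watermark.Format("15:04:05"))
	fmt.Println("window [10:00,10:05) can fire:", !watermark.Before(windowEnd))
}
```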

Late data handling:

  1. MaxOutOfOrderness (before trigger)
  • Config: WITH (MAXOUTOFORDERNESS='5s')
  • Effect: delays watermark and window trigger to tolerate out-of-order data
  2. AllowedLateness (after trigger)
  • Config: WITH (ALLOWEDLATENESS='2s')
  • Effect: keeps window open after trigger and updates results with late data
SELECT COUNT(*) AS order_count
FROM stream
GROUP BY TumblingWindow('5m')
WITH (
  TIMESTAMP='order_time',
  MAXOUTOFORDERNESS='5s',
  ALLOWEDLATENESS='2s'
)
  3. IdleTimeout (idle source)
  • Config: WITH (IDLETIMEOUT='5s')
  • Effect: when the source is idle, the watermark is advanced based on processing time so that windows can still close
SELECT COUNT(*) AS order_count
FROM stream
GROUP BY TumblingWindow('5m')
WITH (
  TIMESTAMP='order_time',
  MAXOUTOFORDERNESS='5s',
  ALLOWEDLATENESS='2s',
  IDLETIMEOUT='5s'
)

Workflow:

  • Update watermark on data arrival (consider MaxOutOfOrderness)
  • Trigger when watermark >= window_end
  • Keep open until watermark >= window_end + AllowedLateness
  • Late arrivals within this period update the already-emitted results
  • After AllowedLateness, the window closes and late data is ignored
  • If the source is idle beyond IdleTimeout, watermark = currentProcessingTime - MaxOutOfOrderness, ensuring windows eventually close
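
Taken together, these settings reduce to a few time comparisons per window. The following condensed sketch illustrates the decisions described above; it is not the engine's actual code, and the idle-source branch only mimics the documented IdleTimeout behaviour:

```go
package main

import (
	"fmt"
	"time"
)

// windowState answers the two questions the workflow above describes:
// may the window fire, and is it still allowed to accept late updates?
func windowState(watermark, windowEnd time.Time, allowedLateness time.Duration) (fire, acceptLate bool) {
	fire = !watermark.Before(windowEnd)                           // watermark >= window_end
	acceptLate = watermark.Before(windowEnd.Add(allowedLateness)) // watermark < window_end + AllowedLateness
	return
}

func main() {
	windowEnd := time.Date(2024, 1, 1, 10, 5, 0, 0, time.UTC)
	allowedLateness := 2 * time.Second
	maxOutOfOrderness := 5 * time.Second

	// Normal case: the watermark is driven by the newest event time seen so far.
	watermark := time.Date(2024, 1, 1, 10, 5, 1, 0, time.UTC)
	fire, late := windowState(watermark, windowEnd, allowedLateness)
	fmt.Println("fire:", fire, "still accepting late data:", late)

	// Idle source: advance the watermark from processing time instead,
	// so the window can still close eventually (IdleTimeout behaviour).
	idleWatermark := time.Now().Add(-maxOutOfOrderness)
	fire, late = windowState(idleWatermark, windowEnd, allowedLateness)
	fmt.Println("after idle advance — fire:", fire, "still accepting late data:", late)
}
```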

# Window Concepts

Windows are core concepts in stream processing, used to divide unbounded streams into bounded datasets for aggregation operations.

# Window Types

# 1. Tumbling Window

Fixed-size, non-overlapping time windows:

-- Calculate average every 5 minutes
SELECT AVG(temperature) 
FROM stream 
GROUP BY TumblingWindow('5m')

Features:

  • Each record belongs to exactly one window
  • No overlap between windows
  • Suitable for periodic statistics

# 2. Sliding Window

Fixed-size, overlapping time windows:

-- 5-minute window, sliding every 2 minutes
SELECT AVG(temperature) 
FROM stream 
GROUP BY SlidingWindow('5m', '2m')

Features:

  • Each record may belong to multiple windows
  • Provides smoother analysis results
  • Relatively higher computational overhead
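
Because sliding windows overlap, a single record can contribute to several results. The sketch below lists, for illustration, the SlidingWindow('5m', '2m') windows that contain a given timestamp, assuming windows are aligned to multiples of the slide:

```go
package main

import (
	"fmt"
	"time"
)

// windowsContaining lists the start times of all sliding windows of the given
// size and slide that include ts (windows assumed aligned to the slide).
func windowsContaining(ts time.Time, size, slide time.Duration) []time.Time {
	var starts []time.Time
	// The earliest start that can still contain ts is the first multiple
	// of the slide strictly greater than ts-size.
	first := ts.Add(-size).Add(slide).Truncate(slide)
	for start := first; !start.After(ts); start = start.Add(slide) {
		if ts.Before(start.Add(size)) { // start <= ts < start+size
			starts = append(starts, start)
		}
	}
	return starts
}

func main() {
	ts := time.Date(2024, 1, 1, 10, 3, 0, 0, time.UTC)
	for _, start := range windowsContaining(ts, 5*time.Minute, 2*time.Minute) {
		fmt.Printf("[%s, %s)\n", start.Format("15:04"), start.Add(5*time.Minute).Format("15:04"))
	}
}
```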

# 3. Counting Window

Window based on data count:

-- Calculate average every 100 data points
SELECT AVG(temperature) 
FROM stream 
GROUP BY CountingWindow(100)

Features:

  • Based on record count rather than time
  • Fixed window size
  • Suitable for scenarios with a steady data rate

# 4. Session Window

Dynamic window based on data activity:

-- Close the session after 5 minutes of inactivity
SELECT user_id, COUNT(*) 
FROM stream 
GROUP BY user_id, SessionWindow('5m')

Features:

  • Window size varies dynamically
  • Session boundaries are determined by the gap between consecutive records
  • Suitable for user behavior analysis
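
Session boundaries follow the data: a session stays open while the gap between consecutive records is below the timeout and closes when the gap exceeds it. A minimal illustrative sketch of that rule:

```go
package main

import (
	"fmt"
	"time"
)

// sessions groups event timestamps into sessions separated by gaps >= timeout.
func sessions(times []time.Time, timeout time.Duration) [][]time.Time {
	var out [][]time.Time
	var current []time.Time
	for i, t := range times {
		if i > 0 && t.Sub(times[i-1]) >= timeout {
			out = append(out, current) // gap too large: close the current session
			current = nil
		}
		current = append(current, t)
	}
	if len(current) > 0 {
		out = append(out, current)
	}
	return out
}

func main() {
	base := time.Date(2024, 1, 1, 10, 0, 0, 0, time.UTC)
	events := []time.Time{
		base, base.Add(1 * time.Minute), base.Add(3 * time.Minute), // one session
		base.Add(10 * time.Minute), base.Add(11 * time.Minute), // gap > 5m: new session
	}
	for i, s := range sessions(events, 5*time.Minute) {
		fmt.Printf("session %d: %d events\n", i+1, len(s))
	}
}
```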

# Window Lifecycle

A window is created when its first record arrives, accumulates data while it is open, triggers when its end is reached (for event time, when the watermark passes the window end), may still be updated by late data during AllowedLateness, and is then closed and its state released.

# Aggregation Operations

# Aggregation Function Categories

# 1. Statistical Aggregation

SELECT deviceId,
       COUNT(*) as data_count,           -- Count
       SUM(temperature) as total_temp,   -- Sum
       AVG(temperature) as avg_temp,     -- Average
       MIN(temperature) as min_temp,     -- Minimum
       MAX(temperature) as max_temp      -- Maximum
FROM stream 
GROUP BY deviceId, TumblingWindow('1m')

# 2. Advanced Statistics

SELECT deviceId,
       STDDEV(temperature) as std_temp,      -- Standard deviation
       MEDIAN(temperature) as med_temp,      -- Median
       PERCENTILE(temperature, 0.95) as p95 -- 95th percentile
FROM stream 
GROUP BY deviceId, TumblingWindow('1m')

# 3. Collection Aggregation

SELECT deviceId,
       COLLECT(temperature) as temp_list,     -- Collect to array
       LAST_VALUE(temperature) as last_temp   -- Last value
FROM stream 
GROUP BY deviceId, TumblingWindow('1m')

# Aggregation State Management

StreamSQL automatically manages aggregation state for each group and window; you do not need to store or clear intermediate results yourself.
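
Conceptually, each group-and-window pair keeps a small piece of state that is updated as records arrive and read out when the window fires; for AVG, for example, a running sum and count are enough. The sketch below illustrates this idea and is not StreamSQL's internal representation:

```go
package main

import "fmt"

// avgState is the kind of incremental state an AVG aggregation needs:
// it absorbs one value at a time and produces the result on demand.
type avgState struct {
	sum   float64
	count int
}

func (s *avgState) Add(v float64)   { s.sum += v; s.count++ }
func (s *avgState) Result() float64 { return s.sum / float64(s.count) }

func main() {
	// One state instance per (group key, window), e.g. per deviceId per minute.
	states := map[string]*avgState{}

	add := func(deviceID string, temperature float64) {
		if states[deviceID] == nil {
			states[deviceID] = &avgState{}
		}
		states[deviceID].Add(temperature)
	}

	add("sensor001", 23.5)
	add("sensor001", 24.1)
	add("sensor002", 30.0)

	// When the window fires, results are emitted and the state is released.
	for id, s := range states {
		fmt.Printf("%s avg=%.2f\n", id, s.Result())
	}
}
```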

# Expression System

# Arithmetic Expressions

SELECT deviceId,
       temperature * 1.8 + 32 as fahrenheit,  -- Temperature conversion
       (humidity + moisture) / 2 as avg_wet   -- Average of humidity and moisture
FROM stream

# Logical Expressions

SELECT deviceId,
       temperature > 30 AND humidity > 80 as alert_condition
FROM stream
WHERE temperature IS NOT NULL

# String Expressions

SELECT deviceId,
       CONCAT(deviceId, '_', status) as device_status,
       UPPER(location) as location_upper
FROM stream

# Conditional Expressions

SELECT deviceId,
       CASE 
           WHEN temperature > 35 THEN 'High'
           WHEN temperature > 25 THEN 'Normal'
           ELSE 'Low'
       END as temp_level
FROM stream

# Data Types

# Basic Types

| Type | Description | Example |
| --- | --- | --- |
| Numeric | Integer, float | 25, 3.14, -10 |
| String | Text data | "sensor001", 'active' |
| Boolean | Logical values | true, false |
| Time | Timestamp | time.Now() |

# Composite Types

// Support nested structures
data := map[string]interface{}{
    "deviceId": "sensor001",
    "location": map[string]interface{}{
        "building": "A",
        "floor": 3,
    },
    "readings": []float64{23.5, 24.1, 25.2},
}

# Type Conversion

StreamSQL provides automatic type conversion:

-- Automatic string to number conversion
SELECT deviceId, temperature + '5' as adjusted_temp
FROM stream

-- Explicit conversion
SELECT deviceId, CAST(temperature AS STRING) as temp_str
FROM stream

# Execution Model

# Data Flow

At a high level, records flow from the input stream through filtering (WHERE), window partitioning (GROUP BY), and aggregation, and the results are emitted to the output.

# Processing Stages

  1. Parsing Phase: the SQL statement is parsed into an abstract syntax tree
  2. Planning Phase: an execution plan and configuration are generated
  3. Execution Phase: the stream-processing pipeline is created
  4. Running Phase: data streams are processed continuously

# Resource Management

// Proper resource management
ssql := streamsql.New()
defer ssql.Stop()  // Ensure resource release

// Error handling
err := ssql.Execute(sql)
if err != nil {
    log.Printf("Execution failed: %v", err)
    return
}

# Performance Considerations

# Memory Usage

  • Window Size: Larger windows occupy more memory
  • Aggregation State: Complex aggregations require more state storage
  • Data Types: Avoid unnecessarily large objects

# Computational Complexity

Relative processing cost, from higher to lower:

  • Sliding window > tumbling window > no window
  • Complex expressions > simple expressions
  • Multiple GROUP BY keys > a single GROUP BY key