RuleGo RuleGo
🏠Home
  • Quick Start
  • Rule Chain
  • Standard Components
  • Extension Components
  • Custom Components
  • Visualization
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • Trigger
  • Advanced Topics
  • Performance
  • Standard Components
  • Extension Components
  • Custom Components
  • Components Marketplace
  • Overview
  • Quick Start
  • Routing
  • DSL
  • API
  • Options
  • Components
🔥Editor (opens new window)
  • RuleGo Editor (opens new window)
  • RuleGo Server (opens new window)
  • StreamSQL
  • Github (opens new window)
  • Gitee (opens new window)
  • Changelog (opens new window)
  • English
  • 简体中文
🏠Home
  • Quick Start
  • Rule Chain
  • Standard Components
  • Extension Components
  • Custom Components
  • Visualization
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • Trigger
  • Advanced Topics
  • Performance
  • Standard Components
  • Extension Components
  • Custom Components
  • Components Marketplace
  • Overview
  • Quick Start
  • Routing
  • DSL
  • API
  • Options
  • Components
🔥Editor (opens new window)
  • RuleGo Editor (opens new window)
  • RuleGo Server (opens new window)
  • StreamSQL
  • Github (opens new window)
  • Gitee (opens new window)
  • Changelog (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • Quick Start

  • Rule Chain

  • Standard Components

  • Extension Components

  • Custom Components

  • Components marketplace

  • Visualization

  • AOP

  • Trigger

  • Advanced Topic

  • RuleGo-Server

  • FAQ

  • Endpoint Module

  • Support

  • StreamSQL

    • Overview
    • Quick Start
      • Environment Requirements
      • Installation
        • 1. Create New Project
        • 2. Add Dependencies
        • 3. Verify Installation
      • Core Concepts Overview
      • First StreamSQL Program
        • 1. Basic Example - Simple Data Filtering
        • 2. Aggregation Analysis Example - Calculate Average Temperature
      • Advanced Examples
        • 3. Sliding Window Analysis
        • 4. Nested Field Access Example
        • 5. Custom Function Example
        • 6. Performance Mode Example
      • Performance Tips
      • Common Questions
        • Q: No output results for data?
        • Q: When do window functions trigger?
    • Core Concepts
    • SQL Reference
    • API Reference
    • RuleGo Integration
    • functions

    • case-studies

目录

Quick Start

# Quick Start

This guide will take you through StreamSQL's basic features in 5 minutes, from installation to running your first stream processing program.

# Environment Requirements

  • Go 1.18 or higher

  • Basic Go language development experience

  • Understanding of basic SQL syntax (optional, but helpful for understanding)

# Installation

# 1. Create New Project

mkdir my-streamsql-app
cd my-streamsql-app
go mod init my-streamsql-app
1
2
3

# 2. Add Dependencies

go get github.com/rulego/streamsql
1

# 3. Verify Installation

Create a simple test file to verify installation:

package main

import (
    "fmt"
    "github.com/rulego/streamsql"
)

func main() {
    ssql := streamsql.New()
    fmt.Println("StreamSQL installed successfully!")
    ssql.Stop()
}
1
2
3
4
5
6
7
8
9
10
11
12

# Core Concepts Overview

Before starting to write code, understand a few core concepts:

  • Stream: Continuous data sequence, similar to a table in a database

  • Window: Mechanism to divide unbounded streams into bounded datasets

  • Aggregation: Statistical calculations on data within windows

  • Sink: Callback function to process query results

# First StreamSQL Program

# 1. Basic Example - Simple Data Filtering

This example shows how to filter real-time data streams:

Create main.go file:

package main

import (
    "fmt"
    "time"
    "github.com/rulego/streamsql"
)

func main() {
    // 1. Create StreamSQL instance
    ssql := streamsql.New()
    defer ssql.Stop()
    
    // 2. Define SQL query - filter data with temperature greater than 25 degrees
    sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 25"
    
    // 3. Execute SQL query
    err := ssql.Execute(sql)
    if err != nil {
        panic(err)
    }
    
    // 4. Add result processing function
    ssql.AddSink(func(results []map[string]interface{}) {
        fmt.Printf("High temperature alert: %v\n", results)
    })
    
    // 5. Send test data
    testData := []map[string]interface{}{
        {"deviceId": "sensor001", "temperature": 23.5}, // Won't trigger alert
        {"deviceId": "sensor002", "temperature": 28.3}, // Will trigger alert
        {"deviceId": "sensor003", "temperature": 31.2}, // Will trigger alert
    }
    
    for _, data := range testData {
        ssql.Emit(data)
        time.Sleep(100 * time.Millisecond)
    }
    
    // Wait for processing to complete
    time.Sleep(1 * time.Second)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

Run the program:

go run main.go
1

Expected output:

High temperature alert: [map[deviceId:sensor002 temperature:28.3]]
High temperature alert: [map[deviceId:sensor003 temperature:31.2]]
1
2

Code Analysis:

  1. streamsql.New() - Create StreamSQL instance
  2. Execute(sql) - Parse and execute SQL query
  3. AddSink() - Register result processing function
  4. AddData() - Add data to the stream
  5. WHERE temperature > 25 - Filter condition, only process data with temperature greater than 25 degrees

Important Note: For aggregate queries (using window functions), you need to wait for the window time to arrive or manually call ssql.Stream().Window.Trigger() to trigger window calculations.

# 2. Aggregation Analysis Example - Calculate Average Temperature

package main

import (
    "fmt"
    "math/rand"
    "time"
    "github.com/rulego/streamsql"
)

func main() {
    ssql := streamsql.New()
    defer ssql.Stop()
    
    // Calculate average temperature for each device every 5 seconds
    sql := `SELECT deviceId, 
                   AVG(temperature) as avg_temp,
                   COUNT(*) as sample_count,
                   window_start() as window_start,
                   window_end() as window_end
            FROM stream 
            GROUP BY deviceId, TumblingWindow('5s')`
    
    err := ssql.Execute(sql)
    if err != nil {
        panic(err)
    }
    
    // Process aggregation results
    ssql.AddSink(func(results []map[string]interface{}) {
        fmt.Printf("Aggregation results: %v\n", results)
    })
    
    // Simulate sensor data stream
    devices := []string{"sensor001", "sensor002", "sensor003"}
    for i := 0; i < 8; i++ {
        for _, device := range devices {
            data := map[string]interface{}{
                "deviceId":    device,
                "temperature": 20.0 + rand.Float64()*15, // Random temperature 20-35 degrees
                "timestamp":   time.Now(),
            }
            ssql.Emit(data)
        }
        time.Sleep(300 * time.Millisecond)
    }
    
    // Wait for window to trigger
    time.Sleep(5 * time.Second)
    time.Sleep(500 * time.Millisecond)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

# Advanced Examples

# 3. Sliding Window Analysis

package main

import (
    "fmt"
    "math/rand"
    "time"
    "github.com/rulego/streamsql"
)

func main() {
    ssql := streamsql.New()
    defer ssql.Stop()
    
    // 30-second sliding window, sliding every 10 seconds
    sql := `SELECT deviceId,
                   AVG(temperature) as avg_temp,
                   MAX(temperature) as max_temp,
                   MIN(temperature) as min_temp
            FROM stream 
            WHERE temperature > 0
            GROUP BY deviceId, SlidingWindow('30s', '10s')`
    
    err := ssql.Execute(sql)
    if err != nil {
        panic(err)
    }
    
    ssql.AddSink(func(results []map[string]interface{}) {
        fmt.Printf("Sliding window analysis: %v\n", results)
    })
    
    // Continuously send data
    for i := 0; i < 10; i++ {
        data := map[string]interface{}{
            "deviceId":    "sensor001",
            "temperature": 20.0 + rand.Float64()*10,
            "timestamp":   time.Now(),
        }
        ssql.Emit(data)
        time.Sleep(800 * time.Millisecond)
    }
    
    time.Sleep(1 * time.Second)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

# 4. Nested Field Access Example

package main

import (
    "fmt"
    "time"
    "github.com/rulego/streamsql"
)

func main() {
    ssql := streamsql.New()
    defer ssql.Stop()
    
    // SQL query to access nested fields
    sql := `SELECT device.info.name as device_name,
                   device.location.building as building,
                   sensor.temperature as temp,
                   UPPER(device.info.type) as device_type
            FROM stream 
            WHERE sensor.temperature > 25 AND device.info.status = 'active'`
    
    err := ssql.Execute(sql)
    if err != nil {
        panic(err)
    }
    
    ssql.AddSink(func(results []map[string]interface{}) {
        fmt.Printf("Nested field results: %v\n", results)
    })
    
    // Send nested structure data
    complexData := map[string]interface{}{
        "device": map[string]interface{}{
            "info": map[string]interface{}{
                "name":   "Temperature Sensor 001",
                "type":   "temperature",
                "status": "active",
            },
            "location": map[string]interface{}{
                "building": "Building A",
                "floor":    "3F",
            },
        },
        "sensor": map[string]interface{}{
            "temperature": 28.5,
            "humidity":    65.0,
        },
    }
    
    ssql.Emit(complexData)
    time.Sleep(500 * time.Millisecond)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

# 5. Custom Function Example

StreamSQL supports registering and using custom functions:

package main

import (
    "fmt"
    "math"
    "time"
    "github.com/rulego/streamsql"
    "github.com/rulego/streamsql/functions"
    "github.com/rulego/streamsql/utils/cast"
)

func main() {
    // Register custom functions
    registerCustomFunctions()
    
    ssql := streamsql.New()
    defer ssql.Stop()
    
    // SQL query using custom functions
    sql := `SELECT 
                device,
                square(value) as squared_value,
                f_to_c(temperature) as celsius,
                circle_area(radius) as area
            FROM stream
            WHERE value > 0`
    
    err := ssql.Execute(sql)
    if err != nil {
        panic(err)
    }
    
    ssql.AddSink(func(results []map[string]interface{}) {
        fmt.Printf("Custom function results: %v\n", results)
    })
    
    // Add test data
    testData := []map[string]interface{}{
        {
            "device":      "sensor1",
            "value":       5.0,
            "temperature": 68.0, // Fahrenheit
            "radius":      3.0,
        },
        {
            "device":      "sensor2",
            "value":       10.0,
            "temperature": 86.0, // Fahrenheit
            "radius":      2.5,
        },
    }
    
    for _, data := range testData {
        ssql.Emit(data)
        time.Sleep(200 * time.Millisecond)
    }
    
    time.Sleep(500 * time.Millisecond)
}

// Register custom functions
func registerCustomFunctions() {
    // Math function: square
    functions.RegisterCustomFunction(
        "square",
        functions.TypeMath,
        "Math Function",
        "Calculate square",
        1, 1,
        func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
            val := cast.ToFloat64(args[0])
            return val * val, nil
        },
    )
    
    // Fahrenheit to Celsius conversion function
    functions.RegisterCustomFunction(
        "f_to_c",
        functions.TypeConversion,
        "Temperature Conversion",
        "Fahrenheit to Celsius",
        1, 1,
        func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
            fahrenheit := cast.ToFloat64(args[0])
            celsius := (fahrenheit - 32) * 5 / 9
            return celsius, nil
        },
    )
    
    // Circle area calculation function
    functions.RegisterCustomFunction(
        "circle_area",
        functions.TypeMath,
        "Geometric Calculation",
        "Calculate circle area",
        1, 1,
        func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
            radius := cast.ToFloat64(args[0])
            if radius < 0 {
                return nil, fmt.Errorf("radius must be positive")
            }
            area := math.Pi * radius * radius
            return area, nil
        },
    )
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106

# 6. Performance Mode Example

StreamSQL provides multiple performance modes to adapt to different scenarios:

package main

import (
    "fmt"
    "time"
    "github.com/rulego/streamsql"
)

func main() {
    // High performance mode - suitable for high throughput scenarios
    ssqlHighPerf := streamsql.New(streamsql.WithHighPerformance())
    defer ssqlHighPerf.Stop()
    
    // Low latency mode - suitable for real-time response scenarios
    ssqlLowLatency := streamsql.New(streamsql.WithLowLatency())
    defer ssqlLowLatency.Stop()
    
    // Zero data loss mode - suitable for critical data scenarios
    ssqlZeroLoss := streamsql.New(streamsql.WithZeroDataLoss())
    defer ssqlZeroLoss.Stop()
    
    sql := "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5s')"
    
    // Execute same SQL for each instance
    ssqlHighPerf.Execute(sql)
    ssqlLowLatency.Execute(sql)
    ssqlZeroLoss.Execute(sql)
    
    fmt.Println("Different performance modes started")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# Performance Tips

  • Choose appropriate window size: Too small windows increase computational overhead, too large windows increase memory usage

  • Use filtering conditions reasonably: Filter data early in WHERE clauses to improve performance

  • Avoid complex nested queries: StreamSQL is optimized for simple and efficient queries

  • Monitor memory usage: Monitor memory usage in high-frequency data scenarios

# Common Questions

# Q: No output results for data?

A: Check the following:

  1. Ensure AddSink() is called to add result processing function
  2. If using window functions, ensure window is triggered (time elapsed or manually triggered)
  3. Check if WHERE conditions filter out all data

# Q: When do window functions trigger?

A:

  • Tumbling window: Automatically triggers when window end time is reached

  • Sliding window: Tr

Edit this page on GitHub (opens new window)
Last Updated: 2025/08/05, 02:24:31
Overview
Core Concepts

← Overview Core Concepts→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式