# Quick Start
This guide walks you through StreamSQL's basic features in about 5 minutes, from installation to running your first stream-processing program.
## Environment Requirements
- Go 1.18 or higher
- Basic experience with Go development
- Familiarity with basic SQL syntax (optional, but helpful)
## Installation
### 1. Create a New Project
mkdir my-streamsql-app
cd my-streamsql-app
go mod init my-streamsql-app
### 2. Add Dependencies
go get github.com/rulego/streamsql
### 3. Verify Installation
Create a simple test file to verify the installation:
package main
import (
"fmt"
"github.com/rulego/streamsql"
)
func main() {
ssql := streamsql.New()
fmt.Println("StreamSQL installed successfully!")
ssql.Stop()
}
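## Core Concepts Overview
Before writing code, it helps to understand a few core concepts:
- Stream: a continuous, unbounded sequence of records. Queries read from it much as they would read from a table in a database (the examples use `FROM stream`).
- Window: a mechanism that divides an unbounded stream into bounded datasets, for example 5-second tumbling windows.
- Aggregation: statistical calculations such as `AVG`, `COUNT`, or `MAX` over the data in each window.
- Sink: a callback function that receives and processes query results.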
## First StreamSQL Program
### 1. Basic Example: Simple Data Filtering
This example shows how to filter a real-time data stream. Create `main.go`:
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
)
func main() {
// 1. Create StreamSQL instance
ssql := streamsql.New()
defer ssql.Stop()
// 2. Define SQL query - filter data with temperature greater than 25 degrees
sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 25"
// 3. Execute SQL query
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
// 4. Add result processing function
ssql.AddSink(func(results []map[string]interface{}) {
fmt.Printf("High temperature alert: %v\n", results)
})
// 5. Send test data
testData := []map[string]interface{}{
{"deviceId": "sensor001", "temperature": 23.5}, // Won't trigger alert
{"deviceId": "sensor002", "temperature": 28.3}, // Will trigger alert
{"deviceId": "sensor003", "temperature": 31.2}, // Will trigger alert
}
for _, data := range testData {
ssql.Emit(data)
time.Sleep(100 * time.Millisecond)
}
// Wait for processing to complete
time.Sleep(1 * time.Second)
}
Run the program:
go run main.go
Expected output:
High temperature alert: [map[deviceId:sensor002 temperature:28.3]]
High temperature alert: [map[deviceId:sensor003 temperature:31.2]]
Code Analysis:
- `streamsql.New()`: creates a StreamSQL instance.
- `Execute(sql)`: parses and executes the SQL query.
- `AddSink()`: registers the result-processing function.
- `Emit()`: sends a record into the stream.
- `WHERE temperature > 25`: the filter condition. Only records with a temperature above 25 degrees reach the sink.
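To make the filtering semantics concrete, here is the same predicate applied to the same test data in plain Go. It only illustrates which records `WHERE temperature > 25` selects. It is not how StreamSQL evaluates expressions. `whereTempAbove` is a hypothetical helper, and unlike StreamSQL it only recognizes `float64` temperatures.

```go
package main

import "fmt"

// whereTempAbove keeps only records whose "temperature" field is a float64
// greater than threshold, mirroring WHERE temperature > 25.
func whereTempAbove(records []map[string]interface{}, threshold float64) []map[string]interface{} {
	var out []map[string]interface{}
	for _, r := range records {
		t, ok := r["temperature"].(float64)
		if ok && t > threshold {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	testData := []map[string]interface{}{
		{"deviceId": "sensor001", "temperature": 23.5},
		{"deviceId": "sensor002", "temperature": 28.3},
		{"deviceId": "sensor003", "temperature": 31.2},
	}
	for _, r := range whereTempAbove(testData, 25) {
		fmt.Println(r["deviceId"]) // prints sensor002, then sensor003
	}
}
```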
Important Note: For aggregate queries (those using window functions), results are emitted only when a window closes. You need to wait for the window duration to elapse or manually call `ssql.Stream().Window.Trigger()` to trigger the window calculation.
### 2. Aggregation Analysis Example: Calculate Average Temperature
package main
import (
"fmt"
"math/rand"
"time"
"github.com/rulego/streamsql"
)
func main() {
ssql := streamsql.New()
defer ssql.Stop()
// Calculate average temperature for each device every 5 seconds
sql := `SELECT deviceId,
AVG(temperature) as avg_temp,
COUNT(*) as sample_count,
window_start() as window_start,
window_end() as window_end
FROM stream
GROUP BY deviceId, TumblingWindow('5s')`
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
// Process aggregation results
ssql.AddSink(func(results []map[string]interface{}) {
fmt.Printf("Aggregation results: %v\n", results)
})
// Simulate sensor data stream
devices := []string{"sensor001", "sensor002", "sensor003"}
for i := 0; i < 8; i++ {
for _, device := range devices {
data := map[string]interface{}{
"deviceId": device,
"temperature": 20.0 + rand.Float64()*15, // Random temperature 20-35 degrees
"timestamp": time.Now(),
}
ssql.Emit(data)
}
time.Sleep(300 * time.Millisecond)
}
// Wait for the 5-second window to close, plus a short margin for the sink to run
time.Sleep(5*time.Second + 500*time.Millisecond)
}
## Advanced Examples
### 3. Sliding Window Analysis
package main
import (
"fmt"
"math/rand"
"time"
"github.com/rulego/streamsql"
)
func main() {
ssql := streamsql.New()
defer ssql.Stop()
// 30-second sliding window, sliding every 10 seconds
sql := `SELECT deviceId,
AVG(temperature) as avg_temp,
MAX(temperature) as max_temp,
MIN(temperature) as min_temp
FROM stream
WHERE temperature > 0
GROUP BY deviceId, SlidingWindow('30s', '10s')`
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
ssql.AddSink(func(results []map[string]interface{}) {
fmt.Printf("Sliding window analysis: %v\n", results)
})
// Continuously send data
for i := 0; i < 10; i++ {
data := map[string]interface{}{
"deviceId": "sensor001",
"temperature": 20.0 + rand.Float64()*10,
"timestamp": time.Now(),
}
ssql.Emit(data)
time.Sleep(800 * time.Millisecond)
}
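// Sending takes about 8 seconds; keep the program running past the first 10-second slide so a window result can be emitted before Stop() is called
time.Sleep(5 * time.Second)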
}
### 4. Nested Field Access Example
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
)
func main() {
ssql := streamsql.New()
defer ssql.Stop()
// SQL query to access nested fields
sql := `SELECT device.info.name as device_name,
device.location.building as building,
sensor.temperature as temp,
UPPER(device.info.type) as device_type
FROM stream
WHERE sensor.temperature > 25 AND device.info.status = 'active'`
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
ssql.AddSink(func(results []map[string]interface{}) {
fmt.Printf("Nested field results: %v\n", results)
})
// Send nested structure data
complexData := map[string]interface{}{
"device": map[string]interface{}{
"info": map[string]interface{}{
"name": "Temperature Sensor 001",
"type": "temperature",
"status": "active",
},
"location": map[string]interface{}{
"building": "Building A",
"floor": "3F",
},
},
"sensor": map[string]interface{}{
"temperature": 28.5,
"humidity": 65.0,
},
}
ssql.Emit(complexData)
time.Sleep(500 * time.Millisecond)
}
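Dotted names such as `device.info.name` walk down through nested maps one segment at a time. Below is a minimal standard-library sketch of that lookup. It assumes records are nested `map[string]interface{}` values like the one above. `lookupPath` is a hypothetical helper, and StreamSQL's own field resolver may support more, such as array indexing.

```go
package main

import (
	"fmt"
	"strings"
)

// lookupPath resolves a dotted path such as "device.info.name" against nested
// map[string]interface{} values. It reports false if a segment is missing or
// an intermediate value is not a map.
func lookupPath(data map[string]interface{}, path string) (interface{}, bool) {
	var cur interface{} = data
	for _, key := range strings.Split(path, ".") {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		cur, ok = m[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

func main() {
	data := map[string]interface{}{
		"device": map[string]interface{}{
			"info": map[string]interface{}{"name": "Temperature Sensor 001"},
		},
	}
	name, ok := lookupPath(data, "device.info.name")
	fmt.Println(name, ok) // Temperature Sensor 001 true
	_, ok = lookupPath(data, "device.location.building")
	fmt.Println(ok) // false
}
```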
### 5. Custom Function Example
StreamSQL supports registering and using custom functions:
package main
import (
"fmt"
"math"
"time"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/functions"
"github.com/rulego/streamsql/utils/cast"
)
func main() {
// Register custom functions
registerCustomFunctions()
ssql := streamsql.New()
defer ssql.Stop()
// SQL query using custom functions
sql := `SELECT
device,
square(value) as squared_value,
f_to_c(temperature) as celsius,
circle_area(radius) as area
FROM stream
WHERE value > 0`
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
ssql.AddSink(func(results []map[string]interface{}) {
fmt.Printf("Custom function results: %v\n", results)
})
// Add test data
testData := []map[string]interface{}{
{
"device": "sensor1",
"value": 5.0,
"temperature": 68.0, // Fahrenheit
"radius": 3.0,
},
{
"device": "sensor2",
"value": 10.0,
"temperature": 86.0, // Fahrenheit
"radius": 2.5,
},
}
for _, data := range testData {
ssql.Emit(data)
time.Sleep(200 * time.Millisecond)
}
time.Sleep(500 * time.Millisecond)
}
// Register custom functions
func registerCustomFunctions() {
// Math function: square
functions.RegisterCustomFunction(
"square",
functions.TypeMath,
"Math Function",
"Calculate square",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
val := cast.ToFloat64(args[0])
return val * val, nil
},
)
// Fahrenheit to Celsius conversion function
functions.RegisterCustomFunction(
"f_to_c",
functions.TypeConversion,
"Temperature Conversion",
"Fahrenheit to Celsius",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
fahrenheit := cast.ToFloat64(args[0])
celsius := (fahrenheit - 32) * 5 / 9
return celsius, nil
},
)
// Circle area calculation function
functions.RegisterCustomFunction(
"circle_area",
functions.TypeMath,
"Geometric Calculation",
"Calculate circle area",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
radius := cast.ToFloat64(args[0])
if radius < 0 {
return nil, fmt.Errorf("radius must be non-negative")
}
area := math.Pi * radius * radius
return area, nil
},
)
}
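Because the custom functions are ordinary Go closures, their logic can be unit-tested before registration. The sketch below uses only the standard library and makes these assumptions:

- `executor`, `toFloat`, and `unary` are hypothetical names.
- The real callbacks also receive a `*functions.FunctionContext`, which is omitted here.
- `toFloat` is a simplified stand-in for `cast.ToFloat64` that reports an error for unsupported types.

```go
package main

import (
	"errors"
	"fmt"
	"math"
)

// executor mirrors the callback shape used above, minus the
// *functions.FunctionContext parameter (omitted to stay stdlib-only).
type executor func(args []interface{}) (interface{}, error)

// toFloat is a simplified, hypothetical stand-in for cast.ToFloat64.
func toFloat(v interface{}) (float64, error) {
	switch n := v.(type) {
	case float64:
		return n, nil
	case float32:
		return float64(n), nil
	case int:
		return float64(n), nil
	case int64:
		return float64(n), nil
	default:
		return 0, fmt.Errorf("unsupported argument type %T", v)
	}
}

// unary adapts plain float64 logic to the args-slice calling convention,
// checking the argument count and type first.
func unary(fn func(float64) (float64, error)) executor {
	return func(args []interface{}) (interface{}, error) {
		if len(args) != 1 {
			return nil, fmt.Errorf("expected 1 argument, got %d", len(args))
		}
		x, err := toFloat(args[0])
		if err != nil {
			return nil, err
		}
		y, err := fn(x)
		if err != nil {
			return nil, err
		}
		return y, nil
	}
}

var (
	fToC       = unary(func(f float64) (float64, error) { return (f - 32) * 5 / 9, nil })
	circleArea = unary(func(r float64) (float64, error) {
		if r < 0 {
			return 0, errors.New("radius must be non-negative")
		}
		return math.Pi * r * r, nil
	})
)

func main() {
	c, _ := fToC([]interface{}{68.0})
	fmt.Println(c) // 20
	_, err := circleArea([]interface{}{-1.0})
	fmt.Println(err) // radius must be non-negative
}
```

Keeping the arithmetic in plain `func(float64)` values separates it from argument handling. You can then test the conversion formulas directly and wrap them for registration afterwards.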
### 6. Performance Mode Example
StreamSQL provides multiple performance modes to adapt to different scenarios:
package main
import (
"fmt"
"github.com/rulego/streamsql"
)
func main() {
// High performance mode - suitable for high throughput scenarios
ssqlHighPerf := streamsql.New(streamsql.WithHighPerformance())
defer ssqlHighPerf.Stop()
// Low latency mode - suitable for real-time response scenarios
ssqlLowLatency := streamsql.New(streamsql.WithLowLatency())
defer ssqlLowLatency.Stop()
// Zero data loss mode - suitable for critical data scenarios
ssqlZeroLoss := streamsql.New(streamsql.WithZeroDataLoss())
defer ssqlZeroLoss.Stop()
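sql := "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5s')"
// Execute the same SQL on each instance, checking for errors
if err := ssqlHighPerf.Execute(sql); err != nil {
panic(err)
}
if err := ssqlLowLatency.Execute(sql); err != nil {
panic(err)
}
if err := ssqlZeroLoss.Execute(sql); err != nil {
panic(err)
}
// No data is emitted here; this program only shows how each mode is configured
fmt.Println("Different performance modes started")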
}
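The option names suggest different trade-offs, but this guide does not describe what each preset changes internally. As a generic illustration only, not StreamSQL's implementation, the standard-library sketch below shows one trade-off such presets commonly make. When a bounded input buffer is full, a system can either drop the record to keep latency low or block the producer so no data is lost. `offer` is a hypothetical helper.

```go
package main

import "fmt"

// offer tries a non-blocking send and returns false (the record is dropped)
// when the buffer is full, favoring latency over completeness.
func offer(buf chan int, v int) bool {
	select {
	case buf <- v:
		return true
	default:
		return false
	}
}

func main() {
	buf := make(chan int, 2) // small bounded buffer with no consumer running
	dropped := 0
	for i := 0; i < 5; i++ {
		if !offer(buf, i) {
			dropped++
		}
	}
	fmt.Println(len(buf), dropped) // 2 3
	// A zero-loss policy would instead block on `buf <- v`, applying
	// backpressure to the producer until a consumer frees space.
}
```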
## Performance Tips
- Choose an appropriate window size: windows that are too small increase computational overhead, while windows that are too large increase memory usage.
- Filter early: put filtering conditions in the WHERE clause so unneeded records are discarded before aggregation.
- Avoid complex nested queries: StreamSQL is optimized for simple, efficient queries.
- Monitor memory usage, especially in high-frequency data scenarios.
## Common Questions
### Q: Why is there no output for my data?
A: Check the following:
- Ensure `AddSink()` is called to register a result-processing function.
- If you use window functions, ensure the window has been triggered (either its time has elapsed or it was triggered manually).
- Check whether the WHERE conditions filter out all of the data.
### Q: When do window functions trigger?
A:
Tumbling window: Automatically triggers when window end time is reached
Sliding window: Tr