快速开始
# 快速开始
本指南将在5分钟内带您体验StreamSQL的基本功能,从安装到运行第一个流处理程序。
# 环境要求
Go 1.18 或更高版本
基本的Go语言开发经验
了解SQL基础语法(可选,但有助于理解)
# 安装
# 1. 创建新项目
mkdir my-streamsql-app
cd my-streamsql-app
go mod init my-streamsql-app
1
2
3
2
3
# 2. 添加依赖
go get github.com/rulego/streamsql
1
# 3. 验证安装
创建一个简单的测试文件验证安装:
package main
import (
"fmt"
"github.com/rulego/streamsql"
)
func main() {
ssql := streamsql.New()
fmt.Println("StreamSQL 安装成功!")
ssql.Stop()
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 核心概念速览
在开始编写代码之前,了解几个核心概念:
流(Stream):连续的数据序列,类似于数据库中的表
窗口(Window):将无界流分割成有界数据集的机制
聚合(Aggregation):对窗口内数据进行统计计算
Sink:处理查询结果的回调函数
# 第一个StreamSQL程序
# 1. 基础示例 - 简单数据过滤
这个示例展示如何过滤实时数据流:
创建 main.go
文件:
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
)
func main() {
// 1. 创建StreamSQL实例
ssql := streamsql.New()
defer ssql.Stop()
// 2. 定义SQL查询 - 过滤温度大于25度的数据
sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 25"
// 3. 执行SQL查询
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
// 4. 添加结果处理函数
ssql.AddSink(func(result interface{}) {
fmt.Printf("高温告警: %v\n", result)
})
// 5. 发送测试数据
testData := []map[string]interface{}{
{"deviceId": "sensor001", "temperature": 23.5}, // 不会触发告警
{"deviceId": "sensor002", "temperature": 28.3}, // 会触发告警
{"deviceId": "sensor003", "temperature": 31.2}, // 会触发告警
}
for _, data := range testData {
ssql.AddData(data)
time.Sleep(100 * time.Millisecond)
}
// 等待处理完成
time.Sleep(1 * time.Second)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
运行程序:
go run main.go
1
期望输出:
高温告警: [map[deviceId:sensor002 temperature:28.3]]
高温告警: [map[deviceId:sensor003 temperature:31.2]]
1
2
2
代码解析:
streamsql.New()
- 创建StreamSQL实例Execute(sql)
- 解析并执行SQL查询AddSink()
- 注册结果处理函数AddData()
- 向流中添加数据WHERE temperature > 25
- 过滤条件,只处理温度大于25度的数据
重要提示: 对于聚合查询(使用窗口函数),需要等待窗口时间到达或手动调用 ssql.Stream().Window.Trigger()
来触发窗口计算。
# 2. 聚合分析示例 - 计算平均温度
package main
import (
"fmt"
"math/rand"
"time"
"github.com/rulego/streamsql"
)
func main() {
ssql := streamsql.New()
defer ssql.Stop()
// 每5秒计算一次各设备的平均温度
sql := `SELECT deviceId,
AVG(temperature) as avg_temp,
COUNT(*) as sample_count,
window_start() as window_start,
window_end() as window_end
FROM stream
GROUP BY deviceId, TumblingWindow('5s')`
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
// 处理聚合结果
ssql.AddSink(func(result interface{}) {
fmt.Printf("聚合结果: %v\n", result)
})
// 模拟传感器数据流
devices := []string{"sensor001", "sensor002", "sensor003"}
for i := 0; i < 8; i++ {
for _, device := range devices {
data := map[string]interface{}{
"deviceId": device,
"temperature": 20.0 + rand.Float64()*15, // 20-35度随机温度
"timestamp": time.Now(),
}
ssql.AddData(data)
}
time.Sleep(300 * time.Millisecond)
}
// 等待窗口触发
time.Sleep(5 * time.Second)
ssql.Stream().Window.Trigger() // 手动触发窗口
time.Sleep(500 * time.Millisecond)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# 进阶示例
# 3. 滑动窗口分析
package main
import (
"fmt"
"math/rand"
"time"
"github.com/rulego/streamsql"
)
func main() {
ssql := streamsql.New()
defer ssql.Stop()
// 30秒滑动窗口,每10秒滑动一次
sql := `SELECT deviceId,
AVG(temperature) as avg_temp,
MAX(temperature) as max_temp,
MIN(temperature) as min_temp
FROM stream
WHERE temperature > 0
GROUP BY deviceId, SlidingWindow('30s', '10s')`
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
ssql.AddSink(func(result interface{}) {
fmt.Printf("滑动窗口分析: %v\n", result)
})
// 持续发送数据
for i := 0; i < 10; i++ {
data := map[string]interface{}{
"deviceId": "sensor001",
"temperature": 20.0 + rand.Float64()*10,
"timestamp": time.Now(),
}
ssql.AddData(data)
time.Sleep(800 * time.Millisecond)
}
time.Sleep(1 * time.Second)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 4. 嵌套字段访问示例
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
)
func main() {
ssql := streamsql.New()
defer ssql.Stop()
// 访问嵌套字段的SQL查询
sql := `SELECT device.info.name as device_name,
device.location.building as building,
sensor.temperature as temp,
UPPER(device.info.type) as device_type
FROM stream
WHERE sensor.temperature > 25 AND device.info.status = 'active'`
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
ssql.AddSink(func(result interface{}) {
fmt.Printf("嵌套字段结果: %v\n", result)
})
// 发送嵌套结构数据
complexData := map[string]interface{}{
"device": map[string]interface{}{
"info": map[string]interface{}{
"name": "温度传感器001",
"type": "temperature",
"status": "active",
},
"location": map[string]interface{}{
"building": "A栋",
"floor": "3F",
},
},
"sensor": map[string]interface{}{
"temperature": 28.5,
"humidity": 65.0,
},
}
ssql.AddData(complexData)
time.Sleep(500 * time.Millisecond)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# 5. 自定义函数示例
StreamSQL支持注册和使用自定义函数:
package main
import (
"fmt"
"math"
"time"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/functions"
"github.com/rulego/streamsql/utils/cast"
)
func main() {
// 注册自定义函数
registerCustomFunctions()
ssql := streamsql.New()
defer ssql.Stop()
// 使用自定义函数的SQL查询
sql := `SELECT
device,
square(value) as squared_value,
f_to_c(temperature) as celsius,
circle_area(radius) as area
FROM stream
WHERE value > 0`
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
ssql.AddSink(func(result interface{}) {
fmt.Printf("自定义函数结果: %v\n", result)
})
// 添加测试数据
testData := []map[string]interface{}{
{
"device": "sensor1",
"value": 5.0,
"temperature": 68.0, // 华氏度
"radius": 3.0,
},
{
"device": "sensor2",
"value": 10.0,
"temperature": 86.0, // 华氏度
"radius": 2.5,
},
}
for _, data := range testData {
ssql.AddData(data)
time.Sleep(200 * time.Millisecond)
}
time.Sleep(500 * time.Millisecond)
}
// 注册自定义函数
func registerCustomFunctions() {
// 数学函数:平方
functions.RegisterCustomFunction(
"square",
functions.TypeMath,
"数学函数",
"计算平方",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
val := cast.ToFloat64(args[0])
return val * val, nil
},
)
// 华氏度转摄氏度函数
functions.RegisterCustomFunction(
"f_to_c",
functions.TypeConversion,
"温度转换",
"华氏度转摄氏度",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
fahrenheit := cast.ToFloat64(args[0])
celsius := (fahrenheit - 32) * 5 / 9
return celsius, nil
},
)
// 圆面积计算函数
functions.RegisterCustomFunction(
"circle_area",
functions.TypeMath,
"几何计算",
"计算圆的面积",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
radius := cast.ToFloat64(args[0])
if radius < 0 {
return nil, fmt.Errorf("半径必须为正数")
}
area := math.Pi * radius * radius
return area, nil
},
)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# 6. 性能模式示例
StreamSQL提供多种性能模式以适应不同场景:
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
)
func main() {
// 高性能模式 - 适合高吞吐量场景
ssqlHighPerf := streamsql.New(streamsql.WithHighPerformance())
defer ssqlHighPerf.Stop()
// 低延迟模式 - 适合实时响应场景
ssqlLowLatency := streamsql.New(streamsql.WithLowLatency())
defer ssqlLowLatency.Stop()
// 零数据丢失模式 - 适合关键数据场景
ssqlZeroLoss := streamsql.New(streamsql.WithZeroDataLoss())
defer ssqlZeroLoss.Stop()
sql := "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5s')"
// 为每个实例执行相同的SQL
ssqlHighPerf.Execute(sql)
ssqlLowLatency.Execute(sql)
ssqlZeroLoss.Execute(sql)
fmt.Println("不同性能模式已启动")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 性能提示
选择合适的窗口大小:过小的窗口会增加计算开销,过大的窗口会增加内存使用
合理使用过滤条件:在WHERE子句中尽早过滤数据可以提高性能
避免复杂的嵌套查询:StreamSQL针对简单高效的查询进行了优化
监控内存使用:在高频数据场景下注意监控内存使用情况
# 常见问题
# Q: 数据没有输出结果?
A: 检查以下几点:
- 确保调用了
AddSink()
添加结果处理函数 - 如果使用窗口函数,确保窗口已触发(时间已到或手动触发)
- 检查WHERE条件是否过滤了所有数据
# Q: 窗口函数何时触发?
A:
滚动窗口:到达窗口结束时间时自动触发
滑动窗口:每个滑动间隔触发一次
计数窗口:累积到指定数量时触发
会话窗口:会话超时后触发
# Q: 如何处理异常数据?
A: 使用WHERE子句过滤异常数据:
SELECT * FROM stream
WHERE temperature IS NOT NULL
AND temperature BETWEEN -50 AND 100
1
2
3
2
3
# 与RuleGo集成示例
# 完整的数据处理流水线
// 1. 使用RuleGo接收MQTT数据
mqttEndpoint := mqtt.NewEndpoint()
// 2. 在RuleGo规则链中集成StreamSQL处理
ruleChain := `{
"ruleChain": {
"nodes": [
{
"id": "s1",
"type": "streamSqlNode",
"configuration": {
"sql": "SELECT device_id, AVG(temperature) as avg_temp FROM stream GROUP BY device_id, TumblingWindow('5m')"
}
},
{
"id": "s2",
"type": "restApiCallNode",
"configuration": {
"restEndpointUrlPattern": "http://alert-service/api/alerts",
"requestMethod": "POST"
}
}
],
"connections": [
{
"fromId": "s1",
"toId": "s2",
"type": "Success"
}
]
}
}`
// 3. 启动规则引擎
engine := rulego.New([]byte(ruleChain))
// 4. 处理数据
var msg = rulego.NewMsg(0, "deviceData", types.JSON,nil,`{"device_id": "dev1", "temperature": 25.5}`)
engine.OnMsg(msg)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 优势
- 数据接入:利用RuleGo的MQTT、HTTP、WebSocket等组件接入数据
- 流处理:使用StreamSQL进行实时数据分析和计算
- 结果输出:通过RuleGo的输出组件发送到数据库、API、消息队列等
- 规则管理:动态配置和热更新流处理规则
- 监控告警:集成RuleGo的监控和告警机制
# 完整示例代码
所有示例代码都可以在项目的 examples/
目录中找到:
examples/simple-custom-functions/
(opens new window) - 基础用法示例examples/custom-functions-demo/
(opens new window) - 完整功能演示examples/function-integration-demo/
(opens new window) - 集成使用案例examples/advanced-functions/
(opens new window) - 高级特性展示examples/comprehensive-test/
(opens new window) - 本教材综合测试演示
# 快速验证安装
如果您想快速验证StreamSQL的各种功能,推荐运行综合测试示例:
cd examples/comprehensive-test
go run main.go
1
2
2
这个示例包含了本文档中提到的所有功能特性,是验证安装和学习使用的最佳起点。
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/27, 12:06:54