RuleGo RuleGo
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • 概述
  • 快速开始
    • 环境要求
    • 安装
      • 1. 创建新项目
      • 2. 添加依赖
      • 3. 验证安装
    • 核心概念速览
    • 第一个StreamSQL程序
      • 1. 基础示例 - 简单数据过滤
      • 2. 聚合分析示例 - 计算平均温度
    • 进阶示例
      • 3. 滑动窗口分析
      • 4. 嵌套字段访问示例
      • 5. 自定义函数示例
      • 6. 性能模式示例
    • 性能提示
    • 常见问题
      • Q: 数据没有输出结果?
      • Q: 窗口函数何时触发?
      • Q: 如何处理异常数据?
    • 与RuleGo集成示例
      • 完整的数据处理流水线
      • 优势
    • 完整示例代码
      • 快速验证安装
  • 核心概念
  • SQL参考
  • API参考
  • RuleGo集成
  • 函数

  • 案例集锦

目录

快速开始

# 快速开始

本指南将在5分钟内带您体验StreamSQL的基本功能,从安装到运行第一个流处理程序。

# 环境要求

  • Go 1.18 或更高版本

  • 基本的Go语言开发经验

  • 了解SQL基础语法(可选,但有助于理解)

# 安装

# 1. 创建新项目

mkdir my-streamsql-app
cd my-streamsql-app
go mod init my-streamsql-app
1
2
3

# 2. 添加依赖

go get github.com/rulego/streamsql
1

# 3. 验证安装

创建一个简单的测试文件验证安装:

package main

import (
    "fmt"
    "github.com/rulego/streamsql"
)

func main() {
    ssql := streamsql.New()
    fmt.Println("StreamSQL 安装成功!")
    ssql.Stop()
}
1
2
3
4
5
6
7
8
9
10
11
12

# 核心概念速览

在开始编写代码之前,了解几个核心概念:

  • 流(Stream):连续的数据序列,类似于数据库中的表

  • 窗口(Window):将无界流分割成有界数据集的机制

  • 聚合(Aggregation):对窗口内数据进行统计计算

  • Sink:处理查询结果的回调函数

# 第一个StreamSQL程序

# 1. 基础示例 - 简单数据过滤

这个示例展示如何过滤实时数据流:

创建 main.go 文件:

package main

import (
    "fmt"
    "time"
    "github.com/rulego/streamsql"
)

func main() {
    // 1. 创建StreamSQL实例
    ssql := streamsql.New()
    defer ssql.Stop()
    
    // 2. 定义SQL查询 - 过滤温度大于25度的数据
    sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 25"
    
    // 3. 执行SQL查询
    err := ssql.Execute(sql)
    if err != nil {
        panic(err)
    }
    
    // 4. 添加结果处理函数
    ssql.AddSink(func(result interface{}) {
        fmt.Printf("高温告警: %v\n", result)
    })
    
    // 5. 发送测试数据
    testData := []map[string]interface{}{
        {"deviceId": "sensor001", "temperature": 23.5}, // 不会触发告警
        {"deviceId": "sensor002", "temperature": 28.3}, // 会触发告警
        {"deviceId": "sensor003", "temperature": 31.2}, // 会触发告警
    }
    
    for _, data := range testData {
        ssql.AddData(data)
        time.Sleep(100 * time.Millisecond)
    }
    
    // 等待处理完成
    time.Sleep(1 * time.Second)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

运行程序:

go run main.go
1

期望输出:

高温告警: [map[deviceId:sensor002 temperature:28.3]]
高温告警: [map[deviceId:sensor003 temperature:31.2]]
1
2

代码解析:

  1. streamsql.New() - 创建StreamSQL实例
  2. Execute(sql) - 解析并执行SQL查询
  3. AddSink() - 注册结果处理函数
  4. AddData() - 向流中添加数据
  5. WHERE temperature > 25 - 过滤条件,只处理温度大于25度的数据

重要提示: 对于聚合查询(使用窗口函数),需要等待窗口时间到达或手动调用 ssql.Stream().Window.Trigger() 来触发窗口计算。

# 2. 聚合分析示例 - 计算平均温度

package main

import (
    "fmt"
    "math/rand"
    "time"
    "github.com/rulego/streamsql"
)

func main() {
    ssql := streamsql.New()
    defer ssql.Stop()
    
    // 每5秒计算一次各设备的平均温度
    sql := `SELECT deviceId, 
                   AVG(temperature) as avg_temp,
                   COUNT(*) as sample_count,
                   window_start() as window_start,
                   window_end() as window_end
            FROM stream 
            GROUP BY deviceId, TumblingWindow('5s')`
    
    err := ssql.Execute(sql)
    if err != nil {
        panic(err)
    }
    
    // 处理聚合结果
    ssql.AddSink(func(result interface{}) {
        fmt.Printf("聚合结果: %v\n", result)
    })
    
    // 模拟传感器数据流
    devices := []string{"sensor001", "sensor002", "sensor003"}
    for i := 0; i < 8; i++ {
        for _, device := range devices {
            data := map[string]interface{}{
                "deviceId":    device,
                "temperature": 20.0 + rand.Float64()*15, // 20-35度随机温度
                "timestamp":   time.Now(),
            }
            ssql.AddData(data)
        }
        time.Sleep(300 * time.Millisecond)
    }
    
    // 等待窗口触发
    time.Sleep(5 * time.Second)
    ssql.Stream().Window.Trigger() // 手动触发窗口
    time.Sleep(500 * time.Millisecond)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

# 进阶示例

# 3. 滑动窗口分析

package main

import (
    "fmt"
    "math/rand"
    "time"
    "github.com/rulego/streamsql"
)

func main() {
    ssql := streamsql.New()
    defer ssql.Stop()
    
    // 30秒滑动窗口,每10秒滑动一次
    sql := `SELECT deviceId,
                   AVG(temperature) as avg_temp,
                   MAX(temperature) as max_temp,
                   MIN(temperature) as min_temp
            FROM stream 
            WHERE temperature > 0
            GROUP BY deviceId, SlidingWindow('30s', '10s')`
    
    err := ssql.Execute(sql)
    if err != nil {
        panic(err)
    }
    
    ssql.AddSink(func(result interface{}) {
        fmt.Printf("滑动窗口分析: %v\n", result)
    })
    
    // 持续发送数据
    for i := 0; i < 10; i++ {
        data := map[string]interface{}{
            "deviceId":    "sensor001",
            "temperature": 20.0 + rand.Float64()*10,
            "timestamp":   time.Now(),
        }
        ssql.AddData(data)
        time.Sleep(800 * time.Millisecond)
    }
    
    time.Sleep(1 * time.Second)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

# 4. 嵌套字段访问示例

package main

import (
    "fmt"
    "time"
    "github.com/rulego/streamsql"
)

func main() {
    ssql := streamsql.New()
    defer ssql.Stop()
    
    // 访问嵌套字段的SQL查询
    sql := `SELECT device.info.name as device_name,
                   device.location.building as building,
                   sensor.temperature as temp,
                   UPPER(device.info.type) as device_type
            FROM stream 
            WHERE sensor.temperature > 25 AND device.info.status = 'active'`
    
    err := ssql.Execute(sql)
    if err != nil {
        panic(err)
    }
    
    ssql.AddSink(func(result interface{}) {
        fmt.Printf("嵌套字段结果: %v\n", result)
    })
    
    // 发送嵌套结构数据
    complexData := map[string]interface{}{
        "device": map[string]interface{}{
            "info": map[string]interface{}{
                "name":   "温度传感器001",
                "type":   "temperature",
                "status": "active",
            },
            "location": map[string]interface{}{
                "building": "A栋",
                "floor":    "3F",
            },
        },
        "sensor": map[string]interface{}{
            "temperature": 28.5,
            "humidity":    65.0,
        },
    }
    
    ssql.AddData(complexData)
    time.Sleep(500 * time.Millisecond)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

# 5. 自定义函数示例

StreamSQL支持注册和使用自定义函数:

package main

import (
    "fmt"
    "math"
    "time"
    "github.com/rulego/streamsql"
    "github.com/rulego/streamsql/functions"
    "github.com/rulego/streamsql/utils/cast"
)

func main() {
    // 注册自定义函数
    registerCustomFunctions()
    
    ssql := streamsql.New()
    defer ssql.Stop()
    
    // 使用自定义函数的SQL查询
    sql := `SELECT 
                device,
                square(value) as squared_value,
                f_to_c(temperature) as celsius,
                circle_area(radius) as area
            FROM stream
            WHERE value > 0`
    
    err := ssql.Execute(sql)
    if err != nil {
        panic(err)
    }
    
    ssql.AddSink(func(result interface{}) {
        fmt.Printf("自定义函数结果: %v\n", result)
    })
    
    // 添加测试数据
    testData := []map[string]interface{}{
        {
            "device":      "sensor1",
            "value":       5.0,
            "temperature": 68.0, // 华氏度
            "radius":      3.0,
        },
        {
            "device":      "sensor2",
            "value":       10.0,
            "temperature": 86.0, // 华氏度
            "radius":      2.5,
        },
    }
    
    for _, data := range testData {
        ssql.AddData(data)
        time.Sleep(200 * time.Millisecond)
    }
    
    time.Sleep(500 * time.Millisecond)
}

// 注册自定义函数
func registerCustomFunctions() {
    // 数学函数:平方
    functions.RegisterCustomFunction(
        "square",
        functions.TypeMath,
        "数学函数",
        "计算平方",
        1, 1,
        func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
            val := cast.ToFloat64(args[0])
            return val * val, nil
        },
    )
    
    // 华氏度转摄氏度函数
    functions.RegisterCustomFunction(
        "f_to_c",
        functions.TypeConversion,
        "温度转换",
        "华氏度转摄氏度",
        1, 1,
        func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
            fahrenheit := cast.ToFloat64(args[0])
            celsius := (fahrenheit - 32) * 5 / 9
            return celsius, nil
        },
    )
    
    // 圆面积计算函数
    functions.RegisterCustomFunction(
        "circle_area",
        functions.TypeMath,
        "几何计算",
        "计算圆的面积",
        1, 1,
        func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
            radius := cast.ToFloat64(args[0])
            if radius < 0 {
                return nil, fmt.Errorf("半径必须为正数")
            }
            area := math.Pi * radius * radius
            return area, nil
        },
    )
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106

# 6. 性能模式示例

StreamSQL提供多种性能模式以适应不同场景:

package main

import (
    "fmt"
    "time"
    "github.com/rulego/streamsql"
)

func main() {
    // 高性能模式 - 适合高吞吐量场景
    ssqlHighPerf := streamsql.New(streamsql.WithHighPerformance())
    defer ssqlHighPerf.Stop()
    
    // 低延迟模式 - 适合实时响应场景
    ssqlLowLatency := streamsql.New(streamsql.WithLowLatency())
    defer ssqlLowLatency.Stop()
    
    // 零数据丢失模式 - 适合关键数据场景
    ssqlZeroLoss := streamsql.New(streamsql.WithZeroDataLoss())
    defer ssqlZeroLoss.Stop()
    
    sql := "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5s')"
    
    // 为每个实例执行相同的SQL
    ssqlHighPerf.Execute(sql)
    ssqlLowLatency.Execute(sql)
    ssqlZeroLoss.Execute(sql)
    
    fmt.Println("不同性能模式已启动")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 性能提示

  • 选择合适的窗口大小:过小的窗口会增加计算开销,过大的窗口会增加内存使用

  • 合理使用过滤条件:在WHERE子句中尽早过滤数据可以提高性能

  • 避免复杂的嵌套查询:StreamSQL针对简单高效的查询进行了优化

  • 监控内存使用:在高频数据场景下注意监控内存使用情况

# 常见问题

# Q: 数据没有输出结果?

A: 检查以下几点:

  1. 确保调用了 AddSink() 添加结果处理函数
  2. 如果使用窗口函数,确保窗口已触发(时间已到或手动触发)
  3. 检查WHERE条件是否过滤了所有数据

# Q: 窗口函数何时触发?

A:

  • 滚动窗口:到达窗口结束时间时自动触发

  • 滑动窗口:每个滑动间隔触发一次

  • 计数窗口:累积到指定数量时触发

  • 会话窗口:会话超时后触发

# Q: 如何处理异常数据?

A: 使用WHERE子句过滤异常数据:

SELECT * FROM stream 
WHERE temperature IS NOT NULL 
  AND temperature BETWEEN -50 AND 100
1
2
3

# 与RuleGo集成示例

# 完整的数据处理流水线

// 1. 使用RuleGo接收MQTT数据
mqttEndpoint := mqtt.NewEndpoint()

// 2. 在RuleGo规则链中集成StreamSQL处理
ruleChain := `{
  "ruleChain": {
    "nodes": [
      {
        "id": "s1",
        "type": "streamSqlNode",
        "configuration": {
          "sql": "SELECT device_id, AVG(temperature) as avg_temp FROM stream GROUP BY device_id, TumblingWindow('5m')"
        }
      },
      {
        "id": "s2",
        "type": "restApiCallNode",
        "configuration": {
          "restEndpointUrlPattern": "http://alert-service/api/alerts",
          "requestMethod": "POST"
        }
      }
    ],
    "connections": [
      {
        "fromId": "s1",
        "toId": "s2",
        "type": "Success"
      }
    ]
  }
}`

// 3. 启动规则引擎
engine := rulego.New([]byte(ruleChain))
// 4. 处理数据
var msg = rulego.NewMsg(0, "deviceData", types.JSON,nil,`{"device_id": "dev1", "temperature": 25.5}`)
engine.OnMsg(msg)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# 优势

  • 数据接入:利用RuleGo的MQTT、HTTP、WebSocket等组件接入数据
  • 流处理:使用StreamSQL进行实时数据分析和计算
  • 结果输出:通过RuleGo的输出组件发送到数据库、API、消息队列等
  • 规则管理:动态配置和热更新流处理规则
  • 监控告警:集成RuleGo的监控和告警机制

# 完整示例代码

所有示例代码都可以在项目的 examples/ 目录中找到:

  • examples/simple-custom-functions/ (opens new window) - 基础用法示例
  • examples/custom-functions-demo/ (opens new window) - 完整功能演示
  • examples/function-integration-demo/ (opens new window) - 集成使用案例
  • examples/advanced-functions/ (opens new window) - 高级特性展示
  • examples/comprehensive-test/ (opens new window) - 本教材综合测试演示

# 快速验证安装

如果您想快速验证StreamSQL的各种功能,推荐运行综合测试示例:

cd examples/comprehensive-test
go run main.go
1
2

这个示例包含了本文档中提到的所有功能特性,是验证安装和学习使用的最佳起点。

在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/27, 12:06:54
概述
核心概念

← 概述 核心概念→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式