RuleGo RuleGo
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • 概述
  • 快速开始
  • 核心概念
  • SQL参考
  • API参考
    • 核心API
      • Streamsql 主类
      • 构造函数
      • Execute
      • Emit
      • EmitSync
      • IsAggregationQuery
      • Stream
      • GetStats
      • GetDetailedStats
      • Stop
      • AddSink
      • Print
      • ToChannel
    • 配置选项
      • 性能配置
      • WithHighPerformance
      • WithLowLatency
      • WithZeroDataLoss
      • WithCustomPerformance
      • 日志配置
      • WithLogLevel
      • WithDiscardLog
      • 持久化配置
      • WithPersistence
      • WithZeroDataLossConfig
      • WithPersistencePerformanceConfig
      • WithCustomPersistence
      • 缓冲区配置
      • WithBufferSizes
      • 溢出策略配置
      • WithOverflowStrategy
      • 工作池配置
      • WithWorkerConfig
      • 监控配置
      • WithMonitoring
    • 流处理API
      • Stream 类
      • AddSink
      • GetWindow
      • Stop
    • 窗口API
      • 窗口类型
      • TumblingWindow
      • SlidingWindow
      • CountingWindow
      • SessionWindow
      • 窗口接口
      • Window 接口
      • Add
      • Trigger
      • GetType
      • Stop
    • 函数系统API
      • 函数注册
      • RegisterCustomFunction
      • Register
      • Unregister
      • Get
      • GetByType
      • ListAll
      • Execute
      • 函数类型
      • 函数处理器
      • FunctionContext
      • WindowInfo
      • 工具函数
      • ConvertToFloat64
      • ConvertToInt
      • ConvertToString
      • ConvertToTime
      • ConvertToFloat64Array
    • 聚合器API
      • 聚合类型
      • Aggregator 接口
    • 表达式API
      • Expression 接口
      • NewExpression
    • 日志API
      • Logger 接口
      • 日志级别
      • 创建日志器
      • New
      • NewDiscard
    • 类型定义
      • Config
      • WindowConfig
      • FieldExpression
      • Projection
      • PerformanceConfig
      • BufferConfig
      • OverflowConfig
      • PersistenceConfig
      • ExpansionConfig
      • WorkerConfig
      • MonitoringConfig
      • WarningThresholds
    • 配置预设函数
      • DefaultPerformanceConfig
      • HighPerformanceConfig
      • LowLatencyConfig
      • ZeroDataLossConfig
      • PersistencePerformanceConfig
      • WindowResult
    • 错误类型
      • 常见错误
    • 使用示例
      • 完整示例
    • 最佳实践
      • 错误处理
      • 资源管理
      • 并发安全
  • RuleGo集成
  • 函数

  • 案例集锦

目录

API参考

# API参考

本章提供了StreamSQL的完整API参考文档,包括核心接口、配置选项、函数库等详细信息。

# 核心API

# Streamsql 主类

# 构造函数

func New(options ...Option) *Streamsql
1

创建新的StreamSQL实例。

参数:

  • options - 可选配置项

返回值:

  • *Streamsql - StreamSQL实例

示例:

// 默认配置
ssql := streamsql.New()

// 高性能配置
ssql := streamsql.New(streamsql.WithHighPerformance())

// 零数据丢失配置
ssql := streamsql.New(streamsql.WithZeroDataLoss())

// 自定义配置
ssql := streamsql.New(
    streamsql.WithLogLevel(logger.DEBUG),
    streamsql.WithDiscardLog(),
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# Execute

func (s *Streamsql) Execute(sql string) error
1

执行SQL查询,启动流处理。

参数:

  • sql - SQL查询语句

返回值:

  • error - 执行错误,成功时为nil

示例:

sql := "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5m')"
err := ssql.Execute(sql)
if err != nil {
    log.Fatal(err)
}
1
2
3
4
5

# Emit

func (s *Streamsql) Emit(data interface{})
1

向数据流异步添加数据。

参数:

  • data - 数据记录,通常为map[string]interface{}

示例:

data := map[string]interface{}{
    "deviceId": "sensor001",
    "temperature": 25.5,
    "timestamp": time.Now(),
}
ssql.Emit(data)
1
2
3
4
5
6

# EmitSync

func (s *Streamsql) EmitSync(data interface{}) (interface{}, error)
1

同步处理数据并立即返回结果,仅支持非聚合查询。

参数:

  • data - 数据记录,通常为map[string]interface{}

返回值:

  • interface{} - 处理结果
  • error - 处理错误

示例:

data := map[string]interface{}{
    "deviceId": "sensor001",
    "temperature": 25.5,
    "timestamp": time.Now(),
}
result, err := ssql.EmitSync(data)
if err != nil {
    log.Printf("处理错误: %v", err)
} else {
    fmt.Printf("处理结果: %v", result)
}
1
2
3
4
5
6
7
8
9
10
11

# IsAggregationQuery

func (s *Streamsql) IsAggregationQuery() bool
1

检查当前查询是否为聚合查询。

返回值:

  • bool - 是否为聚合查询

示例:

if ssql.IsAggregationQuery() {
    fmt.Println("当前查询包含聚合操作")
} else {
    fmt.Println("当前查询为简单查询")
}
1
2
3
4
5

# Stream

func (s *Streamsql) Stream() *stream.Stream
1

获取底层流处理实例。

返回值:

  • *stream.Stream - 流处理实例

示例:

stream := ssql.Stream()
stream.AddSink(func(result interface{}) {
    fmt.Printf("结果: %v\n", result)
})
1
2
3
4

# GetStats

func (s *Streamsql) GetStats() map[string]int64
1

获取流处理统计信息。

返回值:

  • map[string]int64 - 统计信息映射

示例:

stats := ssql.GetStats()
fmt.Printf("处理数据量: %d\n", stats["processed_count"])
1
2

# GetDetailedStats

func (s *Streamsql) GetDetailedStats() map[string]interface{}
1

获取详细的性能统计信息。

返回值:

  • map[string]interface{} - 详细统计信息

# Stop

func (s *Streamsql) Stop()
1

停止流处理并清理资源。

示例:

defer ssql.Stop()
1

# AddSink

func (s *Streamsql) AddSink(sink func(interface{}))
1

添加结果处理回调函数。

参数:

  • sink - 结果处理回调函数

示例:

ssql.AddSink(func(result interface{}) {
    fmt.Printf("处理结果: %v\n", result)
})
1
2
3

# Print

func (s *Streamsql) Print()
1

便捷方法,自动添加一个打印结果到控制台的sink函数。

示例:

ssql.Print() // 等价于 ssql.AddSink(func(result interface{}) { fmt.Println(result) })
1

# ToChannel

func (s *Streamsql) ToChannel() <-chan interface{}
1

返回结果通道,用于异步获取处理结果。

返回值:

  • <-chan interface{} - 只读结果通道

示例:

resultChan := ssql.ToChannel()
go func() {
    for result := range resultChan {
        fmt.Printf("通道结果: %v\n", result)
    }
}()
1
2
3
4
5
6

# 配置选项

# 性能配置

# WithHighPerformance

func WithHighPerformance() Option
1

使用高性能配置,适用于需要最大吞吐量的场景。

示例:

ssql := streamsql.New(streamsql.WithHighPerformance())
1

# WithLowLatency

func WithLowLatency() Option
1

使用低延迟配置,适用于实时交互应用。

示例:

ssql := streamsql.New(streamsql.WithLowLatency())
1

# WithZeroDataLoss

func WithZeroDataLoss() Option
1

使用零数据丢失配置,适用于关键业务数据。

示例:

ssql := streamsql.New(streamsql.WithZeroDataLoss())
1

# WithCustomPerformance

func WithCustomPerformance(config types.PerformanceConfig) Option
1

使用自定义性能配置。

参数:

  • config - 自定义性能配置

示例:

config := types.DefaultPerformanceConfig()
config.BufferConfig.DataChannelSize = 2000
ssql := streamsql.New(streamsql.WithCustomPerformance(config))
1
2
3

# 日志配置

# WithLogLevel

func WithLogLevel(level logger.Level) Option
1

设置日志级别。

参数:

  • level - 日志级别(DEBUG, INFO, WARN, ERROR, OFF)

示例:

ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG))
1

# WithDiscardLog

func WithDiscardLog() Option
1

禁用日志输出(生产环境推荐)。

示例:

ssql := streamsql.New(streamsql.WithDiscardLog())
1

# 持久化配置

# WithPersistence

func WithPersistence() Option
1

使用持久化配置预设。

示例:

ssql := streamsql.New(streamsql.WithPersistence())
1

# WithZeroDataLossConfig

func WithZeroDataLossConfig() Option
1

使用零数据丢失配置预设,采用阻塞策略确保数据不丢失。

示例:

ssql := streamsql.New(streamsql.WithZeroDataLossConfig())
1

# WithPersistencePerformanceConfig

func WithPersistencePerformanceConfig() Option
1

使用持久化性能配置预设,当缓冲区溢出时将数据持久化到磁盘。

示例:

ssql := streamsql.New(streamsql.WithPersistencePerformanceConfig())
1

# WithCustomPersistence

func WithCustomPersistence(dataDir string, maxFileSize int64, flushInterval time.Duration) Option
1

使用自定义持久化配置。

参数:

  • dataDir - 数据目录
  • maxFileSize - 最大文件大小
  • flushInterval - 刷新间隔

示例:

ssql := streamsql.New(streamsql.WithCustomPersistence("/data", 100*1024*1024, 5*time.Second))
1

# 缓冲区配置

# WithBufferSizes

func WithBufferSizes(dataChannelSize, resultChannelSize, windowOutputSize int) Option
1

设置自定义缓冲区大小。

参数:

  • dataChannelSize - 数据通道大小
  • resultChannelSize - 结果通道大小
  • windowOutputSize - 窗口输出大小

示例:

ssql := streamsql.New(streamsql.WithBufferSizes(2000, 1000, 500))
1

# 溢出策略配置

# WithOverflowStrategy

func WithOverflowStrategy(strategy string, blockTimeout time.Duration) Option
1

设置溢出策略。

参数:

  • strategy - 溢出策略("drop", "block", "persist")
  • blockTimeout - 阻塞超时时间

示例:

ssql := streamsql.New(streamsql.WithOverflowStrategy("drop", 5*time.Second))
1

# 工作池配置

# WithWorkerConfig

func WithWorkerConfig(sinkPoolSize, sinkWorkerCount, maxRetryRoutines int) Option
1

设置工作池配置。

参数:

  • sinkPoolSize - 结果处理池大小
  • sinkWorkerCount - 工作线程数
  • maxRetryRoutines - 最大重试协程数

示例:

ssql := streamsql.New(streamsql.WithWorkerConfig(100, 10, 5))
1

# 监控配置

# WithMonitoring

func WithMonitoring(updateInterval time.Duration, enableDetailedStats bool) Option
1

启用详细监控。

参数:

  • updateInterval - 统计更新间隔
  • enableDetailedStats - 是否启用详细统计

示例:

ssql := streamsql.New(streamsql.WithMonitoring(10*time.Second, true))
1

# 流处理API

# Stream 类

# AddSink

func (s *Stream) AddSink(sink func(interface{}))
1

添加结果处理函数。

参数:

  • sink - 结果处理回调函数

示例:

ssql.Stream().AddSink(func(result interface{}) {
    // 处理结果
    fmt.Printf("处理结果: %v\n", result)
})
1
2
3
4

# GetWindow

func (s *Stream) GetWindow() window.Window
1

获取窗口实例。

返回值:

  • window.Window - 窗口接口

# Stop

func (s *Stream) Stop()
1

停止流处理。

# 窗口API

# 窗口类型

# TumblingWindow

func NewTumblingWindow(size time.Duration, timeUnit time.Duration, tsProp string) *TumblingWindow
1

创建滚动窗口。

参数:

  • size - 窗口大小
  • timeUnit - 时间单位
  • tsProp - 时间戳字段名

# SlidingWindow

func NewSlidingWindow(size, slide time.Duration, timeUnit time.Duration, tsProp string) *SlidingWindow
1

创建滑动窗口。

参数:

  • size - 窗口大小
  • slide - 滑动间隔
  • timeUnit - 时间单位
  • tsProp - 时间戳字段名

# CountingWindow

func NewCountingWindow(count int) *CountingWindow
1

创建计数窗口。

参数:

  • count - 触发计数

# SessionWindow

func NewSessionWindow(timeout time.Duration, groupByKey string) *SessionWindow
1

创建会话窗口。

参数:

  • timeout - 会话超时时间
  • groupByKey - 分组字段

# 窗口接口

# Window 接口

type Window interface {
    Add(data interface{})
    Trigger() []types.WindowResult
    GetType() WindowType
    Stop()
}
1
2
3
4
5
6

方法说明:

# Add
Add(data interface{})
1

向窗口添加数据。

# Trigger
Trigger() []types.WindowResult
1

手动触发窗口计算。

返回值:

  • []types.WindowResult - 窗口结果列表
# GetType
GetType() WindowType
1

获取窗口类型。

返回值:

  • WindowType - 窗口类型枚举
# Stop
Stop()
1

停止窗口处理。

# 函数系统API

# 函数注册

# RegisterCustomFunction

func RegisterCustomFunction(
    name string,
    funcType FunctionType,
    category string,
    description string,
    minArgs int,
    maxArgs int,
    handler FunctionHandler,
) error
1
2
3
4
5
6
7
8
9

注册自定义函数。

参数:

  • name - 函数名
  • funcType - 函数类型
  • category - 函数分类
  • description - 函数描述
  • minArgs - 最小参数数量
  • maxArgs - 最大参数数量
  • handler - 函数处理器

返回值:

  • error - 注册错误

示例:

err := functions.RegisterCustomFunction(
    "my_function",
    functions.TypeMath,
    "数学计算",
    "自定义数学函数",
    2, 2,
    func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
        // 函数实现
        return result, nil
    },
)
1
2
3
4
5
6
7
8
9
10
11

# Register

func Register(function Function) error
1

注册函数实例。

参数:

  • function - 函数实例

返回值:

  • error - 注册错误

# Unregister

func Unregister(name string)
1

注销函数。

参数:

  • name - 函数名

示例:

functions.Unregister("my_function")
1

# Get

func Get(name string) (Function, bool)
1

获取函数实例。

参数:

  • name - 函数名

返回值:

  • Function - 函数实例
  • bool - 是否存在

# GetByType

func GetByType(funcType FunctionType) []Function
1

根据类型获取函数列表。

参数:

  • funcType - 函数类型

返回值:

  • []Function - 函数实例列表

# ListAll

func ListAll() map[string]Function
1

列出所有已注册的函数。

返回值:

  • map[string]Function - 函数名到函数实例的映射

# Execute

func Execute(name string, args []interface{}) (interface{}, error)
1

执行指定名称的函数。

参数:

  • name - 函数名
  • args - 函数参数

返回值:

  • interface{} - 执行结果
  • error - 执行错误

# 函数类型

type FunctionType string

const (
    TypeMath        FunctionType = "math"
    TypeString      FunctionType = "string"
    TypeConversion  FunctionType = "conversion"
    TypeDateTime    FunctionType = "datetime"
    TypeAggregation FunctionType = "aggregation"
    TypeAnalytical  FunctionType = "analytical"
    TypeWindow      FunctionType = "window"
    TypeCustom      FunctionType = "custom"
)
1
2
3
4
5
6
7
8
9
10
11
12

# 函数处理器

type FunctionHandler func(ctx *FunctionContext, args []interface{}) (interface{}, error)
1

# FunctionContext

type FunctionContext struct {
    CurrentRow   map[string]interface{}
    WindowInfo   *WindowInfo
    CustomData   map[string]interface{}
}
1
2
3
4
5

字段说明:

  • CurrentRow - 当前处理的数据行
  • WindowInfo - 窗口信息
  • CustomData - 自定义数据

# WindowInfo

type WindowInfo struct {
    StartTime time.Time
    EndTime   time.Time
    Size      time.Duration
    Type      string
}
1
2
3
4
5
6

# 工具函数

# ConvertToFloat64

func ConvertToFloat64(value interface{}) (float64, error)
1

转换值为float64类型。

# ConvertToInt

func ConvertToInt(value interface{}) (int, error)
1

转换值为int类型。

# ConvertToString

func ConvertToString(value interface{}) (string, error)
1

转换值为string类型。

# ConvertToTime

func ConvertToTime(value interface{}) (time.Time, error)
1

转换值为time.Time类型。

# ConvertToFloat64Array

func ConvertToFloat64Array(value interface{}) ([]float64, error)
1

转换值为float64数组。

# 聚合器API

# 聚合类型

type AggregateType string

const (
    AggregateSum        AggregateType = "sum"
    AggregateAvg        AggregateType = "avg"
    AggregateMin        AggregateType = "min"
    AggregateMax        AggregateType = "max"
    AggregateCount      AggregateType = "count"
    AggregateStddev     AggregateType = "stddev"
    AggregateMedian     AggregateType = "median"
    AggregatePercentile AggregateType = "percentile"
    // ... 其他聚合类型
)
1
2
3
4
5
6
7
8
9
10
11
12
13

# Aggregator 接口

type Aggregator interface {
    Add(value interface{}) error
    GetResult() interface{}
    Reset()
    GetType() AggregateType
}
1
2
3
4
5
6

# 表达式API

# Expression 接口

type Expression interface {
    Evaluate(data map[string]interface{}) (interface{}, error)
    GetFields() []string
}
1
2
3
4

# NewExpression

func NewExpression(expr string) (Expression, error)
1

创建表达式实例。

参数:

  • expr - 表达式字符串

返回值:

  • Expression - 表达式实例
  • error - 创建错误

# 日志API

# Logger 接口

type Logger interface {
    Debug(format string, args ...interface{})
    Info(format string, args ...interface{})
    Warn(format string, args ...interface{})
    Error(format string, args ...interface{})
    SetLevel(level Level)
    SetOutput(writer io.Writer)
}
1
2
3
4
5
6
7
8

# 日志级别

type Level int

const (
    DEBUG Level = iota
    INFO
    WARN
    ERROR
    OFF
)
1
2
3
4
5
6
7
8
9

# 创建日志器

# New

func New(output io.Writer, level Level) Logger
1

创建新的日志器。

# NewDiscard

func NewDiscard() Logger
1

创建丢弃日志器。

# 类型定义

# Config

type Config struct {
    WindowConfig      WindowConfig
    GroupFields       []string
    SelectFields      map[string]aggregator.AggregateType
    FieldAlias        map[string]string
    SimpleFields      []string
    FieldExpressions  map[string]FieldExpression
    Where             string
    Having            string
    NeedWindow        bool
    Distinct          bool
    Limit             int
    Projections       []Projection
    PerformanceConfig PerformanceConfig
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# WindowConfig

type WindowConfig struct {
    Type       window.WindowType
    Params     map[string]interface{}
    TsProp     string
    TimeUnit   time.Duration
    GroupByKey string
}
1
2
3
4
5
6
7

# FieldExpression

type FieldExpression struct {
    Field      string
    Expression string
    Fields     []string
}
1
2
3
4
5

# Projection

type Projection struct {
    OutputName string
    SourceType ProjectionSourceType
    InputName  string
}

type ProjectionSourceType int

const (
    SourceGroupKey ProjectionSourceType = iota
    SourceAggregateResult
    SourceWindowProperty
)
1
2
3
4
5
6
7
8
9
10
11
12
13

# PerformanceConfig

type PerformanceConfig struct {
    BufferConfig     BufferConfig
    OverflowConfig   OverflowConfig
    WorkerConfig     WorkerConfig
    MonitoringConfig MonitoringConfig
}
1
2
3
4
5
6

# BufferConfig

type BufferConfig struct {
    DataChannelSize     int
    ResultChannelSize   int
    WindowOutputSize    int
    EnableDynamicResize bool
    MaxBufferSize       int
    UsageThreshold      float64
}
1
2
3
4
5
6
7
8

# OverflowConfig

type OverflowConfig struct {
    Strategy          string
    BlockTimeout      time.Duration
    AllowDataLoss     bool
    PersistenceConfig *PersistenceConfig
    ExpansionConfig   ExpansionConfig
}
1
2
3
4
5
6
7

# PersistenceConfig

type PersistenceConfig struct {
    DataDir       string
    MaxFileSize   int64
    FlushInterval time.Duration
    MaxRetries    int
    RetryInterval time.Duration
}
1
2
3
4
5
6
7

# ExpansionConfig

type ExpansionConfig struct {
    GrowthFactor     float64
    MinIncrement     int
    TriggerThreshold float64
    ExpansionTimeout time.Duration
}
1
2
3
4
5
6

# WorkerConfig

type WorkerConfig struct {
    SinkPoolSize     int
    SinkWorkerCount  int
    MaxRetryRoutines int
}
1
2
3
4
5

# MonitoringConfig

type MonitoringConfig struct {
    EnableMonitoring    bool
    StatsUpdateInterval time.Duration
    EnableDetailedStats bool
    WarningThresholds   WarningThresholds
}
1
2
3
4
5
6

# WarningThresholds

type WarningThresholds struct {
    DropRateWarning     float64
    DropRateCritical    float64
    BufferUsageWarning  float64
    BufferUsageCritical float64
}
1
2
3
4
5
6

# 配置预设函数

# DefaultPerformanceConfig

func DefaultPerformanceConfig() PerformanceConfig
1

返回默认性能配置,平衡性能和资源使用。

# HighPerformanceConfig

func HighPerformanceConfig() PerformanceConfig
1

返回高性能配置预设,适用于需要最大吞吐量的场景。

# LowLatencyConfig

func LowLatencyConfig() PerformanceConfig
1

返回低延迟配置预设,适用于实时交互应用。

# ZeroDataLossConfig

func ZeroDataLossConfig() PerformanceConfig
1

返回零数据丢失配置预设,采用阻塞策略确保数据不丢失。

# PersistencePerformanceConfig

func PersistencePerformanceConfig() PerformanceConfig
1

返回持久化性能配置预设,当缓冲区溢出时将数据持久化到磁盘。

# WindowResult

type WindowResult struct {
    GroupKey  string
    Results   []map[string]interface{}
    StartTime time.Time
    EndTime   time.Time
}
1
2
3
4
5
6

# 错误类型

# 常见错误

var (
    ErrInvalidSQL           = errors.New("无效的SQL语句")
    ErrUnsupportedOperation = errors.New("不支持的操作")
    ErrInvalidParameter     = errors.New("无效的参数")
    ErrFunctionNotFound     = errors.New("函数未找到")
    ErrTypeConversion       = errors.New("类型转换失败")
    ErrWindowNotInitialized = errors.New("窗口未初始化")
)
1
2
3
4
5
6
7
8

# 使用示例

# 完整示例

package main

import (
    "fmt"
    "log"
    "time"
    "github.com/rulego/streamsql"
    "github.com/rulego/streamsql/functions"
    "github.com/rulego/streamsql/logger"
)

func main() {
    // 1. 创建StreamSQL实例
    ssql := streamsql.New(
        streamsql.WithLogLevel(logger.INFO),
    )
    defer ssql.Stop()
    
    // 2. 注册自定义函数
    err := functions.RegisterCustomFunction(
        "celsius_to_fahrenheit",
        functions.TypeConversion,
        "温度转换",
        "摄氏度转华氏度",
        1, 1,
        func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
            celsius, err := functions.ConvertToFloat64(args[0])
            if err != nil {
                return nil, err
            }
            fahrenheit := celsius*9/5 + 32
            return fahrenheit, nil
        },
    )
    if err != nil {
        log.Fatal(err)
    }
    
	// 3. 执行SQL查询
    sql := `SELECT deviceId,
                   AVG(temperature) as avg_celsius,
                   AVG(celsius_to_fahrenheit(temperature)) as avg_fahrenheit,
                   COUNT(*) as sample_count,
                   window_start() as window_start
            FROM stream
            WHERE temperature > 0
            GROUP BY deviceId, TumblingWindow('1m')`
    
    err = ssql.Execute(sql)
    if err != nil {
        log.Fatal(err)
    }
    
    // 4. 添加结果处理
    ssql.AddSink(func(result interface{}) {
        fmt.Printf("聚合结果: %v\n", result)
    })
    
    // 5. 发送数据
    devices := []string{"sensor001", "sensor002", "sensor003"}
    go func() {
        for i := 0; i < 100; i++ {
            for _, device := range devices {
                data := map[string]interface{}{
                    "deviceId":    device,
                    "temperature": 20.0 + rand.Float64()*15,
                    "timestamp":   time.Now(),
                }
                ssql.Emit(data)
            }
            time.Sleep(5 * time.Second)
        }
    }()
    
    // 6. 等待结果
    time.Sleep(5 * time.Minute)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

# 最佳实践

# 错误处理

// 检查SQL执行错误
err := ssql.Execute(sql)
if err != nil {
    log.Printf("SQL执行失败: %v", err)
    return
}

// 检查函数注册错误
err = functions.RegisterCustomFunction(...)
if err != nil {
    log.Printf("函数注册失败: %v", err)
    return
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 资源管理

// 确保资源释放
ssql := streamsql.New()
defer ssql.Stop()

// 函数注册和注销
err := functions.RegisterCustomFunction(...)
if err == nil {
    defer functions.Unregister("function_name")
}
1
2
3
4
5
6
7
8
9

# 并发安全

// StreamSQL实例是并发安全的
var ssql = streamsql.New()

go func() {
    for {
        ssql.Emit(generateData())
        time.Sleep(100 * time.Millisecond)
    }
}()

go func() {
    for {
        ssql.Emit(generateData())
        time.Sleep(200 * time.Millisecond)
    }
}()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/27, 15:17:27
SQL参考
RuleGo集成

← SQL参考 RuleGo集成→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式