API参考
# API参考
本章提供了StreamSQL的完整API参考文档,包括核心接口、配置选项、函数库等详细信息。
# 核心API
# Streamsql 主类
# 构造函数
func New(options ...Option) *Streamsql
创建新的StreamSQL实例。
参数:
options
- 可选配置项
返回值:
*Streamsql
- StreamSQL实例
示例:
// 默认配置
ssql := streamsql.New()
// 高性能配置
ssql := streamsql.New(streamsql.WithHighPerformance())
// 零数据丢失配置
ssql := streamsql.New(streamsql.WithZeroDataLoss())
// 自定义配置
ssql := streamsql.New(
streamsql.WithLogLevel(logger.DEBUG),
streamsql.WithDiscardLog(),
)
2
3
4
5
6
7
8
9
10
11
12
13
14
# Execute
func (s *Streamsql) Execute(sql string) error
执行SQL查询,启动流处理。
参数:
sql
- SQL查询语句
返回值:
error
- 执行错误,成功时为nil
示例:
sql := "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5m')"
err := ssql.Execute(sql)
if err != nil {
log.Fatal(err)
}
2
3
4
5
# Emit
func (s *Streamsql) Emit(data interface{})
向数据流异步添加数据。
参数:
data
- 数据记录,通常为map[string]interface{}
示例:
data := map[string]interface{}{
"deviceId": "sensor001",
"temperature": 25.5,
"timestamp": time.Now(),
}
ssql.Emit(data)
2
3
4
5
6
# EmitSync
func (s *Streamsql) EmitSync(data interface{}) (interface{}, error)
同步处理数据并立即返回结果,仅支持非聚合查询。
参数:
data
- 数据记录,通常为map[string]interface{}
返回值:
interface{}
- 处理结果error
- 处理错误
示例:
data := map[string]interface{}{
"deviceId": "sensor001",
"temperature": 25.5,
"timestamp": time.Now(),
}
result, err := ssql.EmitSync(data)
if err != nil {
log.Printf("处理错误: %v", err)
} else {
fmt.Printf("处理结果: %v", result)
}
2
3
4
5
6
7
8
9
10
11
# IsAggregationQuery
func (s *Streamsql) IsAggregationQuery() bool
检查当前查询是否为聚合查询。
返回值:
bool
- 是否为聚合查询
示例:
if ssql.IsAggregationQuery() {
fmt.Println("当前查询包含聚合操作")
} else {
fmt.Println("当前查询为简单查询")
}
2
3
4
5
# Stream
func (s *Streamsql) Stream() *stream.Stream
获取底层流处理实例。
返回值:
*stream.Stream
- 流处理实例
示例:
stream := ssql.Stream()
stream.AddSink(func(result interface{}) {
fmt.Printf("结果: %v\n", result)
})
2
3
4
# GetStats
func (s *Streamsql) GetStats() map[string]int64
获取流处理统计信息。
返回值:
map[string]int64
- 统计信息映射
示例:
stats := ssql.GetStats()
fmt.Printf("处理数据量: %d\n", stats["processed_count"])
2
# GetDetailedStats
func (s *Streamsql) GetDetailedStats() map[string]interface{}
获取详细的性能统计信息。
返回值:
map[string]interface{}
- 详细统计信息
# Stop
func (s *Streamsql) Stop()
停止流处理并清理资源。
示例:
defer ssql.Stop()
# AddSink
func (s *Streamsql) AddSink(sink func(interface{}))
添加结果处理回调函数。
参数:
sink
- 结果处理回调函数
示例:
ssql.AddSink(func(result interface{}) {
fmt.Printf("处理结果: %v\n", result)
})
2
3
func (s *Streamsql) Print()
便捷方法,自动添加一个打印结果到控制台的sink函数。
示例:
ssql.Print() // 等价于 ssql.AddSink(func(result interface{}) { fmt.Println(result) })
# ToChannel
func (s *Streamsql) ToChannel() <-chan interface{}
返回结果通道,用于异步获取处理结果。
返回值:
<-chan interface{}
- 只读结果通道
示例:
resultChan := ssql.ToChannel()
go func() {
for result := range resultChan {
fmt.Printf("通道结果: %v\n", result)
}
}()
2
3
4
5
6
# 配置选项
# 性能配置
# WithHighPerformance
func WithHighPerformance() Option
使用高性能配置,适用于需要最大吞吐量的场景。
示例:
ssql := streamsql.New(streamsql.WithHighPerformance())
# WithLowLatency
func WithLowLatency() Option
使用低延迟配置,适用于实时交互应用。
示例:
ssql := streamsql.New(streamsql.WithLowLatency())
# WithZeroDataLoss
func WithZeroDataLoss() Option
使用零数据丢失配置,适用于关键业务数据。
示例:
ssql := streamsql.New(streamsql.WithZeroDataLoss())
# WithCustomPerformance
func WithCustomPerformance(config types.PerformanceConfig) Option
使用自定义性能配置。
参数:
config
- 自定义性能配置
示例:
config := types.DefaultPerformanceConfig()
config.BufferConfig.DataChannelSize = 2000
ssql := streamsql.New(streamsql.WithCustomPerformance(config))
2
3
# 日志配置
# WithLogLevel
func WithLogLevel(level logger.Level) Option
设置日志级别。
参数:
level
- 日志级别(DEBUG, INFO, WARN, ERROR, OFF)
示例:
ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG))
# WithDiscardLog
func WithDiscardLog() Option
禁用日志输出(生产环境推荐)。
示例:
ssql := streamsql.New(streamsql.WithDiscardLog())
# 持久化配置
# WithPersistence
func WithPersistence() Option
使用持久化配置预设。
示例:
ssql := streamsql.New(streamsql.WithPersistence())
# WithZeroDataLossConfig
func WithZeroDataLossConfig() Option
使用零数据丢失配置预设,采用阻塞策略确保数据不丢失。
示例:
ssql := streamsql.New(streamsql.WithZeroDataLossConfig())
# WithPersistencePerformanceConfig
func WithPersistencePerformanceConfig() Option
使用持久化性能配置预设,当缓冲区溢出时将数据持久化到磁盘。
示例:
ssql := streamsql.New(streamsql.WithPersistencePerformanceConfig())
# WithCustomPersistence
func WithCustomPersistence(dataDir string, maxFileSize int64, flushInterval time.Duration) Option
使用自定义持久化配置。
参数:
dataDir
- 数据目录maxFileSize
- 最大文件大小flushInterval
- 刷新间隔
示例:
ssql := streamsql.New(streamsql.WithCustomPersistence("/data", 100*1024*1024, 5*time.Second))
# 缓冲区配置
# WithBufferSizes
func WithBufferSizes(dataChannelSize, resultChannelSize, windowOutputSize int) Option
设置自定义缓冲区大小。
参数:
dataChannelSize
- 数据通道大小resultChannelSize
- 结果通道大小windowOutputSize
- 窗口输出大小
示例:
ssql := streamsql.New(streamsql.WithBufferSizes(2000, 1000, 500))
# 溢出策略配置
# WithOverflowStrategy
func WithOverflowStrategy(strategy string, blockTimeout time.Duration) Option
设置溢出策略。
参数:
strategy
- 溢出策略("drop", "block", "persist")blockTimeout
- 阻塞超时时间
示例:
ssql := streamsql.New(streamsql.WithOverflowStrategy("drop", 5*time.Second))
# 工作池配置
# WithWorkerConfig
func WithWorkerConfig(sinkPoolSize, sinkWorkerCount, maxRetryRoutines int) Option
设置工作池配置。
参数:
sinkPoolSize
- 结果处理池大小sinkWorkerCount
- 工作线程数maxRetryRoutines
- 最大重试协程数
示例:
ssql := streamsql.New(streamsql.WithWorkerConfig(100, 10, 5))
# 监控配置
# WithMonitoring
func WithMonitoring(updateInterval time.Duration, enableDetailedStats bool) Option
启用详细监控。
参数:
updateInterval
- 统计更新间隔enableDetailedStats
- 是否启用详细统计
示例:
ssql := streamsql.New(streamsql.WithMonitoring(10*time.Second, true))
# 流处理API
# Stream 类
# AddSink
func (s *Stream) AddSink(sink func(interface{}))
添加结果处理函数。
参数:
sink
- 结果处理回调函数
示例:
ssql.Stream().AddSink(func(result interface{}) {
// 处理结果
fmt.Printf("处理结果: %v\n", result)
})
2
3
4
# GetWindow
func (s *Stream) GetWindow() window.Window
获取窗口实例。
返回值:
window.Window
- 窗口接口
# Stop
func (s *Stream) Stop()
停止流处理。
# 窗口API
# 窗口类型
# TumblingWindow
func NewTumblingWindow(size time.Duration, timeUnit time.Duration, tsProp string) *TumblingWindow
创建滚动窗口。
参数:
size
- 窗口大小timeUnit
- 时间单位tsProp
- 时间戳字段名
# SlidingWindow
func NewSlidingWindow(size, slide time.Duration, timeUnit time.Duration, tsProp string) *SlidingWindow
创建滑动窗口。
参数:
size
- 窗口大小slide
- 滑动间隔timeUnit
- 时间单位tsProp
- 时间戳字段名
# CountingWindow
func NewCountingWindow(count int) *CountingWindow
创建计数窗口。
参数:
count
- 触发计数
# SessionWindow
func NewSessionWindow(timeout time.Duration, groupByKey string) *SessionWindow
创建会话窗口。
参数:
timeout
- 会话超时时间groupByKey
- 分组字段
# 窗口接口
# Window 接口
type Window interface {
Add(data interface{})
Trigger() []types.WindowResult
GetType() WindowType
Stop()
}
2
3
4
5
6
方法说明:
# Add
Add(data interface{})
向窗口添加数据。
# Trigger
Trigger() []types.WindowResult
手动触发窗口计算。
返回值:
[]types.WindowResult
- 窗口结果列表
# GetType
GetType() WindowType
获取窗口类型。
返回值:
WindowType
- 窗口类型枚举
# Stop
Stop()
停止窗口处理。
# 函数系统API
# 函数注册
# RegisterCustomFunction
func RegisterCustomFunction(
name string,
funcType FunctionType,
category string,
description string,
minArgs int,
maxArgs int,
handler FunctionHandler,
) error
2
3
4
5
6
7
8
9
注册自定义函数。
参数:
name
- 函数名funcType
- 函数类型category
- 函数分类description
- 函数描述minArgs
- 最小参数数量maxArgs
- 最大参数数量handler
- 函数处理器
返回值:
error
- 注册错误
示例:
err := functions.RegisterCustomFunction(
"my_function",
functions.TypeMath,
"数学计算",
"自定义数学函数",
2, 2,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
// 函数实现
return result, nil
},
)
2
3
4
5
6
7
8
9
10
11
# Register
func Register(function Function) error
注册函数实例。
参数:
function
- 函数实例
返回值:
error
- 注册错误
# Unregister
func Unregister(name string)
注销函数。
参数:
name
- 函数名
示例:
functions.Unregister("my_function")
# Get
func Get(name string) (Function, bool)
获取函数实例。
参数:
name
- 函数名
返回值:
Function
- 函数实例bool
- 是否存在
# GetByType
func GetByType(funcType FunctionType) []Function
根据类型获取函数列表。
参数:
funcType
- 函数类型
返回值:
[]Function
- 函数实例列表
# ListAll
func ListAll() map[string]Function
列出所有已注册的函数。
返回值:
map[string]Function
- 函数名到函数实例的映射
# Execute
func Execute(name string, args []interface{}) (interface{}, error)
执行指定名称的函数。
参数:
name
- 函数名args
- 函数参数
返回值:
interface{}
- 执行结果error
- 执行错误
# 函数类型
type FunctionType string
const (
TypeMath FunctionType = "math"
TypeString FunctionType = "string"
TypeConversion FunctionType = "conversion"
TypeDateTime FunctionType = "datetime"
TypeAggregation FunctionType = "aggregation"
TypeAnalytical FunctionType = "analytical"
TypeWindow FunctionType = "window"
TypeCustom FunctionType = "custom"
)
2
3
4
5
6
7
8
9
10
11
12
# 函数处理器
type FunctionHandler func(ctx *FunctionContext, args []interface{}) (interface{}, error)
# FunctionContext
type FunctionContext struct {
CurrentRow map[string]interface{}
WindowInfo *WindowInfo
CustomData map[string]interface{}
}
2
3
4
5
字段说明:
CurrentRow
- 当前处理的数据行WindowInfo
- 窗口信息CustomData
- 自定义数据
# WindowInfo
type WindowInfo struct {
StartTime time.Time
EndTime time.Time
Size time.Duration
Type string
}
2
3
4
5
6
# 工具函数
# ConvertToFloat64
func ConvertToFloat64(value interface{}) (float64, error)
转换值为float64类型。
# ConvertToInt
func ConvertToInt(value interface{}) (int, error)
转换值为int类型。
# ConvertToString
func ConvertToString(value interface{}) (string, error)
转换值为string类型。
# ConvertToTime
func ConvertToTime(value interface{}) (time.Time, error)
转换值为time.Time类型。
# ConvertToFloat64Array
func ConvertToFloat64Array(value interface{}) ([]float64, error)
转换值为float64数组。
# 聚合器API
# 聚合类型
type AggregateType string
const (
AggregateSum AggregateType = "sum"
AggregateAvg AggregateType = "avg"
AggregateMin AggregateType = "min"
AggregateMax AggregateType = "max"
AggregateCount AggregateType = "count"
AggregateStddev AggregateType = "stddev"
AggregateMedian AggregateType = "median"
AggregatePercentile AggregateType = "percentile"
// ... 其他聚合类型
)
2
3
4
5
6
7
8
9
10
11
12
13
# Aggregator 接口
type Aggregator interface {
Add(value interface{}) error
GetResult() interface{}
Reset()
GetType() AggregateType
}
2
3
4
5
6
# 表达式API
# Expression 接口
type Expression interface {
Evaluate(data map[string]interface{}) (interface{}, error)
GetFields() []string
}
2
3
4
# NewExpression
func NewExpression(expr string) (Expression, error)
创建表达式实例。
参数:
expr
- 表达式字符串
返回值:
Expression
- 表达式实例error
- 创建错误
# 日志API
# Logger 接口
type Logger interface {
Debug(format string, args ...interface{})
Info(format string, args ...interface{})
Warn(format string, args ...interface{})
Error(format string, args ...interface{})
SetLevel(level Level)
SetOutput(writer io.Writer)
}
2
3
4
5
6
7
8
# 日志级别
type Level int
const (
DEBUG Level = iota
INFO
WARN
ERROR
OFF
)
2
3
4
5
6
7
8
9
# 创建日志器
# New
func New(output io.Writer, level Level) Logger
创建新的日志器。
# NewDiscard
func NewDiscard() Logger
创建丢弃日志器。
# 类型定义
# Config
type Config struct {
WindowConfig WindowConfig
GroupFields []string
SelectFields map[string]aggregator.AggregateType
FieldAlias map[string]string
SimpleFields []string
FieldExpressions map[string]FieldExpression
Where string
Having string
NeedWindow bool
Distinct bool
Limit int
Projections []Projection
PerformanceConfig PerformanceConfig
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# WindowConfig
type WindowConfig struct {
Type window.WindowType
Params map[string]interface{}
TsProp string
TimeUnit time.Duration
GroupByKey string
}
2
3
4
5
6
7
# FieldExpression
type FieldExpression struct {
Field string
Expression string
Fields []string
}
2
3
4
5
# Projection
type Projection struct {
OutputName string
SourceType ProjectionSourceType
InputName string
}
type ProjectionSourceType int
const (
SourceGroupKey ProjectionSourceType = iota
SourceAggregateResult
SourceWindowProperty
)
2
3
4
5
6
7
8
9
10
11
12
13
# PerformanceConfig
type PerformanceConfig struct {
BufferConfig BufferConfig
OverflowConfig OverflowConfig
WorkerConfig WorkerConfig
MonitoringConfig MonitoringConfig
}
2
3
4
5
6
# BufferConfig
type BufferConfig struct {
DataChannelSize int
ResultChannelSize int
WindowOutputSize int
EnableDynamicResize bool
MaxBufferSize int
UsageThreshold float64
}
2
3
4
5
6
7
8
# OverflowConfig
type OverflowConfig struct {
Strategy string
BlockTimeout time.Duration
AllowDataLoss bool
PersistenceConfig *PersistenceConfig
ExpansionConfig ExpansionConfig
}
2
3
4
5
6
7
# PersistenceConfig
type PersistenceConfig struct {
DataDir string
MaxFileSize int64
FlushInterval time.Duration
MaxRetries int
RetryInterval time.Duration
}
2
3
4
5
6
7
# ExpansionConfig
type ExpansionConfig struct {
GrowthFactor float64
MinIncrement int
TriggerThreshold float64
ExpansionTimeout time.Duration
}
2
3
4
5
6
# WorkerConfig
type WorkerConfig struct {
SinkPoolSize int
SinkWorkerCount int
MaxRetryRoutines int
}
2
3
4
5
# MonitoringConfig
type MonitoringConfig struct {
EnableMonitoring bool
StatsUpdateInterval time.Duration
EnableDetailedStats bool
WarningThresholds WarningThresholds
}
2
3
4
5
6
# WarningThresholds
type WarningThresholds struct {
DropRateWarning float64
DropRateCritical float64
BufferUsageWarning float64
BufferUsageCritical float64
}
2
3
4
5
6
# 配置预设函数
# DefaultPerformanceConfig
func DefaultPerformanceConfig() PerformanceConfig
返回默认性能配置,平衡性能和资源使用。
# HighPerformanceConfig
func HighPerformanceConfig() PerformanceConfig
返回高性能配置预设,适用于需要最大吞吐量的场景。
# LowLatencyConfig
func LowLatencyConfig() PerformanceConfig
返回低延迟配置预设,适用于实时交互应用。
# ZeroDataLossConfig
func ZeroDataLossConfig() PerformanceConfig
返回零数据丢失配置预设,采用阻塞策略确保数据不丢失。
# PersistencePerformanceConfig
func PersistencePerformanceConfig() PerformanceConfig
返回持久化性能配置预设,当缓冲区溢出时将数据持久化到磁盘。
# WindowResult
type WindowResult struct {
GroupKey string
Results []map[string]interface{}
StartTime time.Time
EndTime time.Time
}
2
3
4
5
6
# 错误类型
# 常见错误
var (
ErrInvalidSQL = errors.New("无效的SQL语句")
ErrUnsupportedOperation = errors.New("不支持的操作")
ErrInvalidParameter = errors.New("无效的参数")
ErrFunctionNotFound = errors.New("函数未找到")
ErrTypeConversion = errors.New("类型转换失败")
ErrWindowNotInitialized = errors.New("窗口未初始化")
)
2
3
4
5
6
7
8
# 使用示例
# 完整示例
package main
import (
"fmt"
"log"
"time"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/functions"
"github.com/rulego/streamsql/logger"
)
func main() {
// 1. 创建StreamSQL实例
ssql := streamsql.New(
streamsql.WithLogLevel(logger.INFO),
)
defer ssql.Stop()
// 2. 注册自定义函数
err := functions.RegisterCustomFunction(
"celsius_to_fahrenheit",
functions.TypeConversion,
"温度转换",
"摄氏度转华氏度",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
celsius, err := functions.ConvertToFloat64(args[0])
if err != nil {
return nil, err
}
fahrenheit := celsius*9/5 + 32
return fahrenheit, nil
},
)
if err != nil {
log.Fatal(err)
}
// 3. 执行SQL查询
sql := `SELECT deviceId,
AVG(temperature) as avg_celsius,
AVG(celsius_to_fahrenheit(temperature)) as avg_fahrenheit,
COUNT(*) as sample_count,
window_start() as window_start
FROM stream
WHERE temperature > 0
GROUP BY deviceId, TumblingWindow('1m')`
err = ssql.Execute(sql)
if err != nil {
log.Fatal(err)
}
// 4. 添加结果处理
ssql.AddSink(func(result interface{}) {
fmt.Printf("聚合结果: %v\n", result)
})
// 5. 发送数据
devices := []string{"sensor001", "sensor002", "sensor003"}
go func() {
for i := 0; i < 100; i++ {
for _, device := range devices {
data := map[string]interface{}{
"deviceId": device,
"temperature": 20.0 + rand.Float64()*15,
"timestamp": time.Now(),
}
ssql.Emit(data)
}
time.Sleep(5 * time.Second)
}
}()
// 6. 等待结果
time.Sleep(5 * time.Minute)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# 最佳实践
# 错误处理
// 检查SQL执行错误
err := ssql.Execute(sql)
if err != nil {
log.Printf("SQL执行失败: %v", err)
return
}
// 检查函数注册错误
err = functions.RegisterCustomFunction(...)
if err != nil {
log.Printf("函数注册失败: %v", err)
return
}
2
3
4
5
6
7
8
9
10
11
12
13
# 资源管理
// 确保资源释放
ssql := streamsql.New()
defer ssql.Stop()
// 函数注册和注销
err := functions.RegisterCustomFunction(...)
if err == nil {
defer functions.Unregister("function_name")
}
2
3
4
5
6
7
8
9
# 并发安全
// StreamSQL实例是并发安全的
var ssql = streamsql.New()
go func() {
for {
ssql.Emit(generateData())
time.Sleep(100 * time.Millisecond)
}
}()
go func() {
for {
ssql.Emit(generateData())
time.Sleep(200 * time.Millisecond)
}
}()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16