API Reference
# API Reference
This chapter provides the complete API reference documentation for StreamSQL, including core interfaces, configuration options, function libraries, and other detailed information.
# Core API
# Streamsql Main Class
# Constructor
func New(options ...Option) *Streamsql
Creates a new StreamSQL instance.
Parameters:
options
- Optional configuration items
Return Value:
*Streamsql
- StreamSQL instance
Example:
// Default configuration
ssql := streamsql.New()
// High performance configuration
ssql := streamsql.New(streamsql.WithHighPerformance())
// Zero data loss configuration
ssql := streamsql.New(streamsql.WithZeroDataLoss())
// Custom configuration
ssql := streamsql.New(
streamsql.WithLogLevel(logger.DEBUG),
streamsql.WithDiscardLog(),
)
2
3
4
5
6
7
8
9
10
11
12
13
14
# Execute
func (s *Streamsql) Execute(sql string) error
Executes SQL query and starts stream processing.
Parameters:
sql
- SQL query statement
Return Value:
error
- Execution error, nil on success
Example:
sql := "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5m')"
err := ssql.Execute(sql)
if err != nil {
log.Fatal(err)
}
2
3
4
5
# Emit
func (s *Streamsql) Emit(data map[string]interface{})
Adds data to the stream asynchronously.
Parameters:
data
- Data record, must be of typemap[string]interface{}
Example:
data := map[string]interface{}{
"deviceId": "sensor001",
"temperature": 25.5,
"timestamp": time.Now(),
}
ssql.Emit(data)
2
3
4
5
6
# EmitSync
func (s *Streamsql) EmitSync(data map[string]interface{}) (map[string]interface{}, error)
Processes data synchronously and returns results immediately, only supports non-aggregation queries.
Parameters:
data
- Data record, must be of typemap[string]interface{}
Return Value:
map[string]interface{}
- Processing result, returns nil if filter conditions don't matcherror
- Processing error
Example:
data := map[string]interface{}{
"deviceId": "sensor001",
"temperature": 25.5,
"timestamp": time.Now(),
}
result, err := ssql.EmitSync(data)
if err != nil {
log.Printf("Processing error: %v", err)
} else if result != nil {
fmt.Printf("Processing result: %v", result)
}
2
3
4
5
6
7
8
9
10
11
# IsAggregationQuery
func (s *Streamsql) IsAggregationQuery() bool
Checks whether the current query is an aggregation query.
Return Value:
bool
- Whether it's an aggregation query
Example:
if ssql.IsAggregationQuery() {
fmt.Println("Current query contains aggregation operations")
} else {
fmt.Println("Current query is a simple query")
}
2
3
4
5
# Stream
func (s *Streamsql) Stream() *stream.Stream
Gets the underlying stream processing instance.
Return Value:
*stream.Stream
- Stream processing instance
Example:
stream := ssql.Stream()
stream.AddSink(func(result interface{}) {
fmt.Printf("Result: %v\n", result)
})
2
3
4
# GetStats
func (s *Streamsql) GetStats() map[string]int64
Gets stream processing statistics.
Return Value:
map[string]int64
- Statistics map
Example:
stats := ssql.GetStats()
fmt.Printf("Processed data count: %d\n", stats["processed_count"])
2
# GetDetailedStats
func (s *Streamsql) GetDetailedStats() map[string]interface{}
Gets detailed performance statistics.
Return Value:
map[string]interface{}
- Detailed statistics
# Stop
func (s *Streamsql) Stop()
Stops stream processing and cleans up resources.
Example:
defer ssql.Stop()
# AddSink
func (s *Streamsql) AddSink(sink func([]map[string]interface{}))
Adds result processing callback function.
Parameters:
sink
- Result processing callback function, receives result data of type[]map[string]interface{}
Example:
ssql.AddSink(func(results []map[string]interface{}) {
fmt.Printf("Processing results: %v\n", results)
})
2
3
func (s *Streamsql) Print()
Convenient method, automatically adds a sink function that prints results to console.
Example:
ssql.Print() // Equivalent to ssql.AddSink(func(result interface{}) { fmt.Println(result) })
# ToChannel
func (s *Streamsql) ToChannel() <-chan []map[string]interface{}
Returns result channel for asynchronously getting processing results.
Return Value:
<-chan []map[string]interface{}
- Read-only result channel, returns nil if SQL hasn't been executed
Example:
// Get result channel
resultChan := ssql.ToChannel()
if resultChan != nil {
go func() {
for results := range resultChan {
fmt.Printf("Async results: %v\n", results)
}
}()
}
2
3
4
5
6
7
8
9
# Configuration Options
# Performance Configuration
# WithHighPerformance
func WithHighPerformance() Option
Uses high-performance configuration, suitable for scenarios requiring maximum throughput.
Example:
ssql := streamsql.New(streamsql.WithHighPerformance())
# WithLowLatency
func WithLowLatency() Option
Uses low-latency configuration, suitable for real-time interactive applications.
Example:
ssql := streamsql.New(streamsql.WithLowLatency())
# WithZeroDataLoss
func WithZeroDataLoss() Option
Uses zero data loss configuration, suitable for critical business data.
Example:
ssql := streamsql.New(streamsql.WithZeroDataLoss())
# WithCustomPerformance
func WithCustomPerformance(config types.PerformanceConfig) Option
Uses custom performance configuration.
Parameters:
config
- Custom performance configuration
Example:
config := types.DefaultPerformanceConfig()
config.BufferConfig.DataChannelSize = 2000
ssql := streamsql.New(streamsql.WithCustomPerformance(config))
2
3
# Log Configuration
# WithLogLevel
func WithLogLevel(level logger.Level) Option
Sets log level.
Parameters:
level
- Log level (DEBUG, INFO, WARN, ERROR, OFF)
Example:
ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG))
# WithDiscardLog
func WithDiscardLog() Option
Disables log output (recommended for production environment).
Example:
ssql := streamsql.New(streamsql.WithDiscardLog())
# Persistence Configuration
# WithPersistence
func WithPersistence() Option
Uses persistence configuration preset.
Example:
ssql := streamsql.New(streamsql.WithPersistence())
# WithZeroDataLossConfig
func WithZeroDataLossConfig() Option
Uses zero data loss configuration preset, adopts blocking strategy to ensure no data loss.
Example:
ssql := streamsql.New(streamsql.WithZeroDataLossConfig())
# WithPersistencePerformanceConfig
func WithPersistencePerformanceConfig() Option
Uses persistence performance configuration preset, persists data to disk when buffer overflows.
Example:
ssql := streamsql.New(streamsql.WithPersistencePerformanceConfig())
# WithCustomPersistence
func WithCustomPersistence(dataDir string, maxFileSize int64, flushInterval time.Duration) Option
Uses custom persistence configuration.
Parameters:
dataDir
- Data directorymaxFileSize
- Maximum file sizeflushInterval
- Flush interval
Example:
ssql := streamsql.New(streamsql.WithCustomPersistence("/data", 100*1024*1024, 5*time.Second))
# Buffer Configuration
# WithBufferSizes
func WithBufferSizes(dataChannelSize, resultChannelSize, windowOutputSize int) Option
Sets custom buffer sizes.
Parameters:
dataChannelSize
- Data channel sizeresultChannelSize
- Result channel sizewindowOutputSize
- Window output size
Example:
ssql := streamsql.New(streamsql.WithBufferSizes(2000, 1000, 500))
# Overflow Strategy Configuration
# WithOverflowStrategy
func WithOverflowStrategy(strategy string, blockTimeout time.Duration) Option
Sets overflow strategy.
Parameters:
strategy
- Overflow strategy ("drop", "block", "persist")blockTimeout
- Block timeout duration
Example:
ssql := streamsql.New(streamsql.WithOverflowStrategy("drop", 5*time.Second))
# Worker Pool Configuration
# WithWorkerConfig
func WithWorkerConfig(sinkPoolSize, sinkWorkerCount, maxRetryRoutines int) Option
Sets worker pool configuration.
Parameters:
sinkPoolSize
- Result processing pool sizesinkWorkerCount
- Worker thread countmaxRetryRoutines
- Maximum retry goroutines
Example:
ssql := streamsql.New(streamsql.WithWorkerConfig(100, 10, 5))
# Monitoring Configuration
# WithMonitoring
func WithMonitoring(updateInterval time.Duration, enableDetailedStats bool) Option
Enables detailed monitoring.
Parameters:
updateInterval
- Statistics update intervalenableDetailedStats
- Whether to enable detailed statistics
Example:
ssql := streamsql.New(streamsql.WithMonitoring(10*time.Second, true))
# Stream Processing API
# Stream Class
# AddSink
func (s *Stream) AddSink(sink func(interface{}))
Adds result processing function.
Parameters:
sink
- Result processing callback function
Example:
ssql.Stream().AddSink(func(result interface{}) {
// Process result
fmt.Printf("Processing result: %v\n", result)
})
2
3
4
# GetWindow
func (s *Stream) GetWindow() window.Window
Gets window instance.
Return Value:
window.Window
- Window interface
# Stop
func (s *Stream) Stop()
Stops stream processing.
# Window API
# Window Types
# TumblingWindow
func NewTumblingWindow(size time.Duration, timeUnit time.Duration, tsProp string) *TumblingWindow
Creates a tumbling window.
Parameters:
size
- Window sizetimeUnit
- Time unittsProp
- Timestamp field name
# SlidingWindow
func NewSlidingWindow(size, slide time.Duration, timeUnit time.Duration, tsProp string) *SlidingWindow
Creates a sliding window.
Parameters:
size
- Window sizeslide
- Slide intervaltimeUnit
- Time unittsProp
- Timestamp field name
# CountingWindow
func NewCountingWindow(count int) *CountingWindow
Creates a counting window.
Parameters:
count
- Trigger count
# SessionWindow
func NewSessionWindow(timeout time.Duration, groupByKey string) *SessionWindow
Creates a session window.
Parameters:
timeout
- Session timeout durationgroupByKey
- Group by field
# Window Interface
# Window Interface
type Window interface {
Add(data interface{})
Trigger() []types.WindowResult
GetType() WindowType
Stop()
}
2
3
4
5
6
Method Descriptions:
# Add
Add(data interface{})
Adds data to the window.
# Trigger
Trigger() []types.WindowResult
Manually triggers window calculation.
Return Value:
[]types.WindowResult
- Window result list
# GetType
GetType() WindowType
Gets window type.
Return Value:
WindowType
- Window type enumeration
# Stop
Stop()
Stops window processing.
# Function System API
# Function Registration
# RegisterCustomFunction
func RegisterCustomFunction(
name string,
funcType FunctionType,
category string,
description string,
minArgs int,
maxArgs int,
handler FunctionHandler,
) error
2
3
4
5
6
7
8
9
Registers a custom function.
Parameters:
name
- Function namefuncType
- Function typecategory
- Function categorydescription
- Function descriptionminArgs
- Minimum argument countmaxArgs
- Maximum argument counthandler
- Function handler
Return Value:
error
- Registration error
Example:
err := functions.RegisterCustomFunction(
"my_function",
functions.TypeMath,
"Mathematical Calculation",
"Custom mathematical function",
2, 2,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
// Function implementation
return result, nil
},
)
2
3
4
5
6
7
8
9
10
11
# Register
func Register(function Function) error
Registers function instance.
Parameters:
function
- Function instance
Return Value:
error
- Registration error
# Unregister
func Unregister(name string)
Unregisters function.
Parameters:
name
- Function name
Example:
functions.Unregister("my_function")
# Get
func Get(name string) (Function, bool)
Gets function instance.
Parameters:
name
- Function name
Return Value:
Function
- Function instancebool
- Whether it exists
# GetByType
func GetByType(funcType FunctionType) []Function
Gets function list by type.
Parameters:
funcType
- Function type
Return Value:
[]Function
- Function instance list
# ListAll
func ListAll() map[string]Function
Lists all registered functions.
Return Value:
map[string]Function
- Map from function name to function instance
# Execute
func Execute(name string, args []interface{}) (interface{}, error)
Executes function by specified name.
Parameters:
name
- Function nameargs
- Function arguments
Return Value:
interface{}
- Execution resulterror
- Execution error
# Function Types
type FunctionType string
const (
TypeMath FunctionType = "math"
TypeString FunctionType = "string"
TypeConversion FunctionType = "conversion"
TypeDateTime FunctionType = "datetime"
TypeAggregation FunctionType = "aggregation"
TypeAnalytical FunctionType = "analytical"
TypeWindow FunctionType = "window"
TypeCustom FunctionType = "custom"
)
2
3
4
5
6
7
8
9
10
11
12
# Function Handler
type FunctionHandler func(ctx *FunctionContext, args []interface{}) (interface{}, error)
# FunctionContext
type FunctionContext struct {
CurrentRow map[string]interface{}
WindowInfo *WindowInfo
CustomData map[string]interface{}
}
2
3
4
5
Field Descriptions:
CurrentRow
- Currently processed data rowWindowInfo
- Window informationCustomData
- Custom data
# WindowInfo
type WindowInfo struct {
StartTime time.Time
EndTime time.Time
Size time.Duration
Type string
}
2
3
4
5
6
# Utility Functions
# ConvertToFloat64
func ConvertToFloat64(value interface{}) (float64, error)
Converts value to float64 type.
# ConvertToInt
func ConvertToInt(value interface{}) (int, error)
Converts value to int type.
# ConvertToString
func ConvertToString(value interface{}) (string, error)
Converts value to string type.
# ConvertToTime
func ConvertToTime(value interface{}) (time.Time, error)
Converts value to time.Time type.
# ConvertToFloat64Array
func ConvertToFloat64Array(value interface{}) ([]float64, error)
Converts value to float64 array.
# Aggregator API
# Aggregate Types
type AggregateType string
const (
AggregateSum AggregateType = "sum"
AggregateAvg AggregateType = "avg"
AggregateMin AggregateType = "min"
AggregateMax AggregateType = "max"
AggregateCount AggregateType = "count"
AggregateStddev AggregateType = "stddev"
AggregateMedian AggregateType = "median"
AggregatePercentile AggregateType = "percentile"
// ... Other aggregate types
)
2
3
4
5
6
7
8
9
10
11
12
13
# Aggregator Interface
type Aggregator interface {
Add(value interface{}) error
GetResult() interface{}
Reset()
GetType() AggregateType
}
2
3
4
5
6
# Expression API
# Expression Interface
type Expression interface {
Evaluate(data map[string]interface{}) (interface{}, error)
GetFields() []string
}
2
3
4
# NewExpression
func NewExpression(expr string) (Expression, error)
Creates expression instance.
Parameters:
expr
- Expression string
Return Value:
Expression
- Expression instanceerror
- Creation error
# Log API
# Logger Interface
type Logger interface {
Debug(format string, args ...interface{})
Info(format string, args ...interface{})
Warn(format string, args ...interface{})
Error(format string, args ...interface{})
SetLevel(level Level)
SetOutput(writer io.Writer)
}
2
3
4
5
6
7
8
# Log Level
type Level int
const (
DEBUG Level = iota
INFO
WARN
ERROR
OFF
)
2
3
4
5
6
7
8
9
# Creating Logger
# New
func New(output io.Writer, level Level) Logger
Creates new logger.
# NewDiscard
func NewDiscard() Logger
Creates discard logger.
# Type Definitions
# Config
type Config struct {
WindowConfig WindowConfig
GroupFields []string
SelectFields map[string]aggregator.AggregateType
FieldAlias map[string]string
SimpleFields []string
FieldExpressions map[string]FieldExpression
Where string
Having string
NeedWindow bool
Distinct bool
Limit int
Projections []Projection
PerformanceConfig PerformanceConfig
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# WindowConfig
type WindowConfig struct {
Type window.WindowType
Params map[string]interface{}
TsProp string
TimeUnit time.Duration
GroupByKey string
}
2
3
4
5
6
7
# FieldExpression
type FieldExpression struct {
Field string
Expression string
Fields []string
}
2
3
4
5
# Projection
type Projection struct {
OutputName string
SourceType ProjectionSourceType
InputName string
}
type ProjectionSourceType int
const (
SourceGroupKey ProjectionSourceType = iota
SourceAggregateResult
SourceWindowProperty
)
2
3
4
5
6
7
8
9
10
11
12
13
# PerformanceConfig
type PerformanceConfig struct {
BufferConfig BufferConfig
OverflowConfig OverflowConfig
WorkerConfig WorkerConfig
MonitoringConfig MonitoringConfig
}
2
3
4
5
6
# BufferConfig
type BufferConfig struct {
DataChannelSize int
ResultChannelSize int
WindowOutputSize int
EnableDynamicResize bool
MaxBufferSize int
UsageThreshold float64
}
2
3
4
5
6
7
8
# OverflowConfig
type OverflowConfig struct {
Strategy string
BlockTimeout time.Duration
AllowDataLoss bool
PersistenceConfig *PersistenceConfig
ExpansionConfig ExpansionConfig
}
2
3
4
5
6
7
# PersistenceConfig
type PersistenceConfig struct {
DataDir string
MaxFileSize int64
FlushInterval time.Duration
MaxRetries int
RetryInterval time.Duration
}
2
3
4
5
6
7
# ExpansionConfig
type ExpansionConfig struct {
GrowthFactor float64
MinIncrement int
TriggerThreshold float64
ExpansionTimeout time.Duration
}
2
3
4
5
6
# WorkerConfig
type WorkerConfig struct {
SinkPoolSize int
SinkWorkerCount int
MaxRetryRoutines int
}
2
3
4
5
# MonitoringConfig
type MonitoringConfig struct {
EnableMonitoring bool
StatsUpdateInterval time.Duration
EnableDetailedStats bool
WarningThresholds WarningThresholds
}
2
3
4
5
6
# WarningThresholds
type WarningThresholds struct {
DropRateWarning float64
DropRateCritical float64
BufferUsageWarning float64
BufferUsageCritical float64
}
2
3
4
5
6
# Configuration Preset Functions
# DefaultPerformanceConfig
func DefaultPerformanceConfig() PerformanceConfig
Returns default performance configuration, balancing performance and resource usage.
# HighPerformanceConfig
func HighPerformanceConfig() PerformanceConfig
Returns high-performance configuration preset, suitable for scenarios requiring maximum throughput.
# LowLatencyConfig
func LowLatencyConfig() PerformanceConfig
Returns low-latency configuration preset, suitable for real-time interactive applications.
# ZeroDataLossConfig
func ZeroDataLossConfig() PerformanceConfig
Returns zero data loss configuration preset, adopts blocking strategy to ensure no data loss.
# PersistencePerformanceConfig
func PersistencePerformanceConfig() PerformanceConfig
Returns persistence performance configuration preset, persists data to disk when buffer overflows.
# WindowResult
type WindowResult struct {
GroupKey string
Results []map[string]interface{}
StartTime time.Time
EndTime time.Time
}
2
3
4
5
6
# Error Types
# Common Errors
var (
ErrInvalidSQL = errors.New("invalid SQL statement")
ErrUnsupportedOperation = errors.New("unsupported operation")
ErrInvalidParameter = errors.New("invalid parameter")
ErrFunctionNotFound = errors.New("function not found")
ErrTypeConversion = errors.New("type conversion failed")
ErrWindowNotInitialized = errors.New("window not initialized")
)
2
3
4
5
6
7
8
# Usage Example
# Complete Example
package main
import (
"fmt"
"log"
"time"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/functions"
"github.com/rulego/streamsql/logger"
)
func main() {
// 1. Create StreamSQL instance
ssql := streamsql.New(
streamsql.WithLogLevel(logger.INFO),
)
defer ssql.Stop()
// 2. Register custom function
err := functions.RegisterCustomFunction(
"celsius_to_fahrenheit",
functions.TypeConversion,
"Temperature Conversion",
"Convert Celsius to Fahrenheit",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
celsius, err := functions.ConvertToFloat64(args[0])
if err != nil {
return nil, err
}
fahrenheit := celsius*9/5 + 32
return fahrenheit, nil
},
)
if err != nil {
log.Fatal(err)
}
// 3. Execute SQL query
sql := "SELECT deviceId, celsius_to_fahrenheit(temperature) as fahrenheit FROM stream"
err = ssql.Execute(sql)
if err != nil {
log.Fatal(err)
}
// 4. Add result processor
ssql.AddSink(func(results []map[string]interface{}) {
fmt.Printf("Processing results: %v\n", results)
})
// 5. Send test data
data := map[string]interface{}{
"deviceId": "sensor001",
"temperature": 25.0,
}
ssql.Emit(data)
// 6. Wait for processing
time.Sleep(1 * time.Second)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# 📚 Related Documentation
- SQL Reference - View complete SQL syntax reference
- Function Reference - View all built-in functions
- Performance Optimization - Learn about performance optimization techniques
- Custom Functions - Learn how to develop custom functions