RuleGo RuleGo
🏠Home
  • Quick Start
  • Rule Chain
  • Standard Components
  • Extension Components
  • Custom Components
  • Visualization
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • Trigger
  • Advanced Topics
  • Performance
  • Standard Components
  • Extension Components
  • Custom Components
  • Components Marketplace
  • Overview
  • Quick Start
  • Routing
  • DSL
  • API
  • Options
  • Components
🔥Editor (opens new window)
  • RuleGo Editor (opens new window)
  • RuleGo Server (opens new window)
  • StreamSQL
  • Github (opens new window)
  • Gitee (opens new window)
  • Changelog (opens new window)
  • English
  • 简体中文
🏠Home
  • Quick Start
  • Rule Chain
  • Standard Components
  • Extension Components
  • Custom Components
  • Visualization
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • Trigger
  • Advanced Topics
  • Performance
  • Standard Components
  • Extension Components
  • Custom Components
  • Components Marketplace
  • Overview
  • Quick Start
  • Routing
  • DSL
  • API
  • Options
  • Components
🔥Editor (opens new window)
  • RuleGo Editor (opens new window)
  • RuleGo Server (opens new window)
  • StreamSQL
  • Github (opens new window)
  • Gitee (opens new window)
  • Changelog (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • Quick Start

  • Rule Chain

  • Standard Components

  • Extension Components

  • Custom Components

  • Components marketplace

  • Visualization

  • AOP

  • Trigger

  • Advanced Topic

  • RuleGo-Server

  • FAQ

  • Endpoint Module

  • Support

  • StreamSQL

    • Overview
    • Quick Start
    • Core Concepts
    • SQL Reference
    • API Reference
      • Core API
        • Streamsql Main Class
        • Constructor
        • Execute
        • Emit
        • EmitSync
        • IsAggregationQuery
        • Stream
        • GetStats
        • GetDetailedStats
        • Stop
        • AddSink
        • Print
        • ToChannel
      • Configuration Options
        • Performance Configuration
        • WithHighPerformance
        • WithLowLatency
        • WithZeroDataLoss
        • WithCustomPerformance
        • Log Configuration
        • WithLogLevel
        • WithDiscardLog
        • Persistence Configuration
        • WithPersistence
        • WithZeroDataLossConfig
        • WithPersistencePerformanceConfig
        • WithCustomPersistence
        • Buffer Configuration
        • WithBufferSizes
        • Overflow Strategy Configuration
        • WithOverflowStrategy
        • Worker Pool Configuration
        • WithWorkerConfig
        • Monitoring Configuration
        • WithMonitoring
      • Stream Processing API
        • Stream Class
        • AddSink
        • GetWindow
        • Stop
      • Window API
        • Window Types
        • TumblingWindow
        • SlidingWindow
        • CountingWindow
        • SessionWindow
        • Window Interface
        • Window Interface
        • Add
        • Trigger
        • GetType
        • Stop
      • Function System API
        • Function Registration
        • RegisterCustomFunction
        • Register
        • Unregister
        • Get
        • GetByType
        • ListAll
        • Execute
        • Function Types
        • Function Handler
        • FunctionContext
        • WindowInfo
        • Utility Functions
        • ConvertToFloat64
        • ConvertToInt
        • ConvertToString
        • ConvertToTime
        • ConvertToFloat64Array
      • Aggregator API
        • Aggregate Types
        • Aggregator Interface
      • Expression API
        • Expression Interface
        • NewExpression
      • Log API
        • Logger Interface
        • Log Level
        • Creating Logger
        • New
        • NewDiscard
      • Type Definitions
        • Config
        • WindowConfig
        • FieldExpression
        • Projection
        • PerformanceConfig
        • BufferConfig
        • OverflowConfig
        • PersistenceConfig
        • ExpansionConfig
        • WorkerConfig
        • MonitoringConfig
        • WarningThresholds
      • Configuration Preset Functions
        • DefaultPerformanceConfig
        • HighPerformanceConfig
        • LowLatencyConfig
        • ZeroDataLossConfig
        • PersistencePerformanceConfig
        • WindowResult
      • Error Types
        • Common Errors
      • Usage Example
        • Complete Example
      • 📚 Related Documentation
    • RuleGo Integration
    • functions

    • case-studies

目录

API Reference

# API Reference

This chapter provides the complete API reference documentation for StreamSQL, including core interfaces, configuration options, function libraries, and other detailed information.

# Core API

# Streamsql Main Class

# Constructor

func New(options ...Option) *Streamsql
1

Creates a new StreamSQL instance.

Parameters:

  • options - Optional configuration items

Return Value:

  • *Streamsql - StreamSQL instance

Example:

// Default configuration
ssql := streamsql.New()

// High performance configuration
ssql := streamsql.New(streamsql.WithHighPerformance())

// Zero data loss configuration
ssql := streamsql.New(streamsql.WithZeroDataLoss())

// Custom configuration
ssql := streamsql.New(
    streamsql.WithLogLevel(logger.DEBUG),
    streamsql.WithDiscardLog(),
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# Execute

func (s *Streamsql) Execute(sql string) error
1

Executes SQL query and starts stream processing.

Parameters:

  • sql - SQL query statement

Return Value:

  • error - Execution error, nil on success

Example:

sql := "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5m')"
err := ssql.Execute(sql)
if err != nil {
    log.Fatal(err)
}
1
2
3
4
5

# Emit

func (s *Streamsql) Emit(data map[string]interface{})
1

Adds data to the stream asynchronously.

Parameters:

  • data - Data record, must be of type map[string]interface{}

Example:

data := map[string]interface{}{
    "deviceId": "sensor001",
    "temperature": 25.5,
    "timestamp": time.Now(),
}
ssql.Emit(data)
1
2
3
4
5
6

# EmitSync

func (s *Streamsql) EmitSync(data map[string]interface{}) (map[string]interface{}, error)
1

Processes data synchronously and returns results immediately, only supports non-aggregation queries.

Parameters:

  • data - Data record, must be of type map[string]interface{}

Return Value:

  • map[string]interface{} - Processing result, returns nil if filter conditions don't match
  • error - Processing error

Example:

data := map[string]interface{}{
    "deviceId": "sensor001",
    "temperature": 25.5,
    "timestamp": time.Now(),
}
result, err := ssql.EmitSync(data)
if err != nil {
    log.Printf("Processing error: %v", err)
} else if result != nil {
    fmt.Printf("Processing result: %v", result)
}
1
2
3
4
5
6
7
8
9
10
11

# IsAggregationQuery

func (s *Streamsql) IsAggregationQuery() bool
1

Checks whether the current query is an aggregation query.

Return Value:

  • bool - Whether it's an aggregation query

Example:

if ssql.IsAggregationQuery() {
    fmt.Println("Current query contains aggregation operations")
} else {
    fmt.Println("Current query is a simple query")
}
1
2
3
4
5

# Stream

func (s *Streamsql) Stream() *stream.Stream
1

Gets the underlying stream processing instance.

Return Value:

  • *stream.Stream - Stream processing instance

Example:

stream := ssql.Stream()
stream.AddSink(func(result interface{}) {
    fmt.Printf("Result: %v\n", result)
})
1
2
3
4

# GetStats

func (s *Streamsql) GetStats() map[string]int64
1

Gets stream processing statistics.

Return Value:

  • map[string]int64 - Statistics map

Example:

stats := ssql.GetStats()
fmt.Printf("Processed data count: %d\n", stats["processed_count"])
1
2

# GetDetailedStats

func (s *Streamsql) GetDetailedStats() map[string]interface{}
1

Gets detailed performance statistics.

Return Value:

  • map[string]interface{} - Detailed statistics

# Stop

func (s *Streamsql) Stop()
1

Stops stream processing and cleans up resources.

Example:

defer ssql.Stop()
1

# AddSink

func (s *Streamsql) AddSink(sink func([]map[string]interface{}))
1

Adds result processing callback function.

Parameters:

  • sink - Result processing callback function, receives result data of type []map[string]interface{}

Example:

ssql.AddSink(func(results []map[string]interface{}) {
    fmt.Printf("Processing results: %v\n", results)
})
1
2
3

# Print

func (s *Streamsql) Print()
1

Convenient method, automatically adds a sink function that prints results to console.

Example:

ssql.Print() // Equivalent to ssql.AddSink(func(result interface{}) { fmt.Println(result) })
1

# ToChannel

func (s *Streamsql) ToChannel() <-chan []map[string]interface{}
1

Returns result channel for asynchronously getting processing results.

Return Value:

  • <-chan []map[string]interface{} - Read-only result channel, returns nil if SQL hasn't been executed

Example:

// Get result channel
resultChan := ssql.ToChannel()
if resultChan != nil {
    go func() {
        for results := range resultChan {
            fmt.Printf("Async results: %v\n", results)
        }
    }()
}
1
2
3
4
5
6
7
8
9

# Configuration Options

# Performance Configuration

# WithHighPerformance

func WithHighPerformance() Option
1

Uses high-performance configuration, suitable for scenarios requiring maximum throughput.

Example:

ssql := streamsql.New(streamsql.WithHighPerformance())
1

# WithLowLatency

func WithLowLatency() Option
1

Uses low-latency configuration, suitable for real-time interactive applications.

Example:

ssql := streamsql.New(streamsql.WithLowLatency())
1

# WithZeroDataLoss

func WithZeroDataLoss() Option
1

Uses zero data loss configuration, suitable for critical business data.

Example:

ssql := streamsql.New(streamsql.WithZeroDataLoss())
1

# WithCustomPerformance

func WithCustomPerformance(config types.PerformanceConfig) Option
1

Uses custom performance configuration.

Parameters:

  • config - Custom performance configuration

Example:

config := types.DefaultPerformanceConfig()
config.BufferConfig.DataChannelSize = 2000
ssql := streamsql.New(streamsql.WithCustomPerformance(config))
1
2
3

# Log Configuration

# WithLogLevel

func WithLogLevel(level logger.Level) Option
1

Sets log level.

Parameters:

  • level - Log level (DEBUG, INFO, WARN, ERROR, OFF)

Example:

ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG))
1

# WithDiscardLog

func WithDiscardLog() Option
1

Disables log output (recommended for production environment).

Example:

ssql := streamsql.New(streamsql.WithDiscardLog())
1

# Persistence Configuration

# WithPersistence

func WithPersistence() Option
1

Uses persistence configuration preset.

Example:

ssql := streamsql.New(streamsql.WithPersistence())
1

# WithZeroDataLossConfig

func WithZeroDataLossConfig() Option
1

Uses zero data loss configuration preset, adopts blocking strategy to ensure no data loss.

Example:

ssql := streamsql.New(streamsql.WithZeroDataLossConfig())
1

# WithPersistencePerformanceConfig

func WithPersistencePerformanceConfig() Option
1

Uses persistence performance configuration preset, persists data to disk when buffer overflows.

Example:

ssql := streamsql.New(streamsql.WithPersistencePerformanceConfig())
1

# WithCustomPersistence

func WithCustomPersistence(dataDir string, maxFileSize int64, flushInterval time.Duration) Option
1

Uses custom persistence configuration.

Parameters:

  • dataDir - Data directory
  • maxFileSize - Maximum file size
  • flushInterval - Flush interval

Example:

ssql := streamsql.New(streamsql.WithCustomPersistence("/data", 100*1024*1024, 5*time.Second))
1

# Buffer Configuration

# WithBufferSizes

func WithBufferSizes(dataChannelSize, resultChannelSize, windowOutputSize int) Option
1

Sets custom buffer sizes.

Parameters:

  • dataChannelSize - Data channel size
  • resultChannelSize - Result channel size
  • windowOutputSize - Window output size

Example:

ssql := streamsql.New(streamsql.WithBufferSizes(2000, 1000, 500))
1

# Overflow Strategy Configuration

# WithOverflowStrategy

func WithOverflowStrategy(strategy string, blockTimeout time.Duration) Option
1

Sets overflow strategy.

Parameters:

  • strategy - Overflow strategy ("drop", "block", "persist")
  • blockTimeout - Block timeout duration

Example:

ssql := streamsql.New(streamsql.WithOverflowStrategy("drop", 5*time.Second))
1

# Worker Pool Configuration

# WithWorkerConfig

func WithWorkerConfig(sinkPoolSize, sinkWorkerCount, maxRetryRoutines int) Option
1

Sets worker pool configuration.

Parameters:

  • sinkPoolSize - Result processing pool size
  • sinkWorkerCount - Worker thread count
  • maxRetryRoutines - Maximum retry goroutines

Example:

ssql := streamsql.New(streamsql.WithWorkerConfig(100, 10, 5))
1

# Monitoring Configuration

# WithMonitoring

func WithMonitoring(updateInterval time.Duration, enableDetailedStats bool) Option
1

Enables detailed monitoring.

Parameters:

  • updateInterval - Statistics update interval
  • enableDetailedStats - Whether to enable detailed statistics

Example:

ssql := streamsql.New(streamsql.WithMonitoring(10*time.Second, true))
1

# Stream Processing API

# Stream Class

# AddSink

func (s *Stream) AddSink(sink func(interface{}))
1

Adds result processing function.

Parameters:

  • sink - Result processing callback function

Example:

ssql.Stream().AddSink(func(result interface{}) {
    // Process result
    fmt.Printf("Processing result: %v\n", result)
})
1
2
3
4

# GetWindow

func (s *Stream) GetWindow() window.Window
1

Gets window instance.

Return Value:

  • window.Window - Window interface

# Stop

func (s *Stream) Stop()
1

Stops stream processing.

# Window API

# Window Types

# TumblingWindow

func NewTumblingWindow(size time.Duration, timeUnit time.Duration, tsProp string) *TumblingWindow
1

Creates a tumbling window.

Parameters:

  • size - Window size
  • timeUnit - Time unit
  • tsProp - Timestamp field name

# SlidingWindow

func NewSlidingWindow(size, slide time.Duration, timeUnit time.Duration, tsProp string) *SlidingWindow
1

Creates a sliding window.

Parameters:

  • size - Window size
  • slide - Slide interval
  • timeUnit - Time unit
  • tsProp - Timestamp field name

# CountingWindow

func NewCountingWindow(count int) *CountingWindow
1

Creates a counting window.

Parameters:

  • count - Trigger count

# SessionWindow

func NewSessionWindow(timeout time.Duration, groupByKey string) *SessionWindow
1

Creates a session window.

Parameters:

  • timeout - Session timeout duration
  • groupByKey - Group by field

# Window Interface

# Window Interface

type Window interface {
    Add(data interface{})
    Trigger() []types.WindowResult
    GetType() WindowType
    Stop()
}
1
2
3
4
5
6

Method Descriptions:

# Add
Add(data interface{})
1

Adds data to the window.

# Trigger
Trigger() []types.WindowResult
1

Manually triggers window calculation.

Return Value:

  • []types.WindowResult - Window result list
# GetType
GetType() WindowType
1

Gets window type.

Return Value:

  • WindowType - Window type enumeration
# Stop
Stop()
1

Stops window processing.

# Function System API

# Function Registration

# RegisterCustomFunction

func RegisterCustomFunction(
    name string,
    funcType FunctionType,
    category string,
    description string,
    minArgs int,
    maxArgs int,
    handler FunctionHandler,
) error
1
2
3
4
5
6
7
8
9

Registers a custom function.

Parameters:

  • name - Function name
  • funcType - Function type
  • category - Function category
  • description - Function description
  • minArgs - Minimum argument count
  • maxArgs - Maximum argument count
  • handler - Function handler

Return Value:

  • error - Registration error

Example:

err := functions.RegisterCustomFunction(
    "my_function",
    functions.TypeMath,
    "Mathematical Calculation",
    "Custom mathematical function",
    2, 2,
    func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
        // Function implementation
        return result, nil
    },
)
1
2
3
4
5
6
7
8
9
10
11

# Register

func Register(function Function) error
1

Registers function instance.

Parameters:

  • function - Function instance

Return Value:

  • error - Registration error

# Unregister

func Unregister(name string)
1

Unregisters function.

Parameters:

  • name - Function name

Example:

functions.Unregister("my_function")
1

# Get

func Get(name string) (Function, bool)
1

Gets function instance.

Parameters:

  • name - Function name

Return Value:

  • Function - Function instance
  • bool - Whether it exists

# GetByType

func GetByType(funcType FunctionType) []Function
1

Gets function list by type.

Parameters:

  • funcType - Function type

Return Value:

  • []Function - Function instance list

# ListAll

func ListAll() map[string]Function
1

Lists all registered functions.

Return Value:

  • map[string]Function - Map from function name to function instance

# Execute

func Execute(name string, args []interface{}) (interface{}, error)
1

Executes function by specified name.

Parameters:

  • name - Function name
  • args - Function arguments

Return Value:

  • interface{} - Execution result
  • error - Execution error

# Function Types

type FunctionType string

const (
    TypeMath        FunctionType = "math"
    TypeString      FunctionType = "string"
    TypeConversion  FunctionType = "conversion"
    TypeDateTime    FunctionType = "datetime"
    TypeAggregation FunctionType = "aggregation"
    TypeAnalytical  FunctionType = "analytical"
    TypeWindow      FunctionType = "window"
    TypeCustom      FunctionType = "custom"
)
1
2
3
4
5
6
7
8
9
10
11
12

# Function Handler

type FunctionHandler func(ctx *FunctionContext, args []interface{}) (interface{}, error)
1

# FunctionContext

type FunctionContext struct {
    CurrentRow   map[string]interface{}
    WindowInfo   *WindowInfo
    CustomData   map[string]interface{}
}
1
2
3
4
5

Field Descriptions:

  • CurrentRow - Currently processed data row
  • WindowInfo - Window information
  • CustomData - Custom data

# WindowInfo

type WindowInfo struct {
    StartTime time.Time
    EndTime   time.Time
    Size      time.Duration
    Type      string
}
1
2
3
4
5
6

# Utility Functions

# ConvertToFloat64

func ConvertToFloat64(value interface{}) (float64, error)
1

Converts value to float64 type.

# ConvertToInt

func ConvertToInt(value interface{}) (int, error)
1

Converts value to int type.

# ConvertToString

func ConvertToString(value interface{}) (string, error)
1

Converts value to string type.

# ConvertToTime

func ConvertToTime(value interface{}) (time.Time, error)
1

Converts value to time.Time type.

# ConvertToFloat64Array

func ConvertToFloat64Array(value interface{}) ([]float64, error)
1

Converts value to float64 array.

# Aggregator API

# Aggregate Types

type AggregateType string

const (
    AggregateSum        AggregateType = "sum"
    AggregateAvg        AggregateType = "avg"
    AggregateMin        AggregateType = "min"
    AggregateMax        AggregateType = "max"
    AggregateCount      AggregateType = "count"
    AggregateStddev     AggregateType = "stddev"
    AggregateMedian     AggregateType = "median"
    AggregatePercentile AggregateType = "percentile"
    // ... Other aggregate types
)
1
2
3
4
5
6
7
8
9
10
11
12
13

# Aggregator Interface

type Aggregator interface {
    Add(value interface{}) error
    GetResult() interface{}
    Reset()
    GetType() AggregateType
}
1
2
3
4
5
6

# Expression API

# Expression Interface

type Expression interface {
    Evaluate(data map[string]interface{}) (interface{}, error)
    GetFields() []string
}
1
2
3
4

# NewExpression

func NewExpression(expr string) (Expression, error)
1

Creates expression instance.

Parameters:

  • expr - Expression string

Return Value:

  • Expression - Expression instance
  • error - Creation error

# Log API

# Logger Interface

type Logger interface {
    Debug(format string, args ...interface{})
    Info(format string, args ...interface{})
    Warn(format string, args ...interface{})
    Error(format string, args ...interface{})
    SetLevel(level Level)
    SetOutput(writer io.Writer)
}
1
2
3
4
5
6
7
8

# Log Level

type Level int

const (
    DEBUG Level = iota
    INFO
    WARN
    ERROR
    OFF
)
1
2
3
4
5
6
7
8
9

# Creating Logger

# New

func New(output io.Writer, level Level) Logger
1

Creates new logger.

# NewDiscard

func NewDiscard() Logger
1

Creates discard logger.

# Type Definitions

# Config

type Config struct {
    WindowConfig      WindowConfig
    GroupFields       []string
    SelectFields      map[string]aggregator.AggregateType
    FieldAlias        map[string]string
    SimpleFields      []string
    FieldExpressions  map[string]FieldExpression
    Where             string
    Having            string
    NeedWindow        bool
    Distinct          bool
    Limit             int
    Projections       []Projection
    PerformanceConfig PerformanceConfig
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# WindowConfig

type WindowConfig struct {
    Type       window.WindowType
    Params     map[string]interface{}
    TsProp     string
    TimeUnit   time.Duration
    GroupByKey string
}
1
2
3
4
5
6
7

# FieldExpression

type FieldExpression struct {
    Field      string
    Expression string
    Fields     []string
}
1
2
3
4
5

# Projection

type Projection struct {
    OutputName string
    SourceType ProjectionSourceType
    InputName  string
}

type ProjectionSourceType int

const (
    SourceGroupKey ProjectionSourceType = iota
    SourceAggregateResult
    SourceWindowProperty
)
1
2
3
4
5
6
7
8
9
10
11
12
13

# PerformanceConfig

type PerformanceConfig struct {
    BufferConfig     BufferConfig
    OverflowConfig   OverflowConfig
    WorkerConfig     WorkerConfig
    MonitoringConfig MonitoringConfig
}
1
2
3
4
5
6

# BufferConfig

type BufferConfig struct {
    DataChannelSize     int
    ResultChannelSize   int
    WindowOutputSize    int
    EnableDynamicResize bool
    MaxBufferSize       int
    UsageThreshold      float64
}
1
2
3
4
5
6
7
8

# OverflowConfig

type OverflowConfig struct {
    Strategy          string
    BlockTimeout      time.Duration
    AllowDataLoss     bool
    PersistenceConfig *PersistenceConfig
    ExpansionConfig   ExpansionConfig
}
1
2
3
4
5
6
7

# PersistenceConfig

type PersistenceConfig struct {
    DataDir       string
    MaxFileSize   int64
    FlushInterval time.Duration
    MaxRetries    int
    RetryInterval time.Duration
}
1
2
3
4
5
6
7

# ExpansionConfig

type ExpansionConfig struct {
    GrowthFactor     float64
    MinIncrement     int
    TriggerThreshold float64
    ExpansionTimeout time.Duration
}
1
2
3
4
5
6

# WorkerConfig

type WorkerConfig struct {
    SinkPoolSize     int
    SinkWorkerCount  int
    MaxRetryRoutines int
}
1
2
3
4
5

# MonitoringConfig

type MonitoringConfig struct {
    EnableMonitoring    bool
    StatsUpdateInterval time.Duration
    EnableDetailedStats bool
    WarningThresholds   WarningThresholds
}
1
2
3
4
5
6

# WarningThresholds

type WarningThresholds struct {
    DropRateWarning     float64
    DropRateCritical    float64
    BufferUsageWarning  float64
    BufferUsageCritical float64
}
1
2
3
4
5
6

# Configuration Preset Functions

# DefaultPerformanceConfig

func DefaultPerformanceConfig() PerformanceConfig
1

Returns default performance configuration, balancing performance and resource usage.

# HighPerformanceConfig

func HighPerformanceConfig() PerformanceConfig
1

Returns high-performance configuration preset, suitable for scenarios requiring maximum throughput.

# LowLatencyConfig

func LowLatencyConfig() PerformanceConfig
1

Returns low-latency configuration preset, suitable for real-time interactive applications.

# ZeroDataLossConfig

func ZeroDataLossConfig() PerformanceConfig
1

Returns zero data loss configuration preset, adopts blocking strategy to ensure no data loss.

# PersistencePerformanceConfig

func PersistencePerformanceConfig() PerformanceConfig
1

Returns persistence performance configuration preset, persists data to disk when buffer overflows.

# WindowResult

type WindowResult struct {
    GroupKey  string
    Results   []map[string]interface{}
    StartTime time.Time
    EndTime   time.Time
}
1
2
3
4
5
6

# Error Types

# Common Errors

var (
    ErrInvalidSQL           = errors.New("invalid SQL statement")
    ErrUnsupportedOperation = errors.New("unsupported operation")
    ErrInvalidParameter     = errors.New("invalid parameter")
    ErrFunctionNotFound     = errors.New("function not found")
    ErrTypeConversion       = errors.New("type conversion failed")
    ErrWindowNotInitialized = errors.New("window not initialized")
)
1
2
3
4
5
6
7
8

# Usage Example

# Complete Example

package main

import (
    "fmt"
    "log"
    "time"
    "github.com/rulego/streamsql"
    "github.com/rulego/streamsql/functions"
    "github.com/rulego/streamsql/logger"
)

func main() {
    // 1. Create StreamSQL instance
    ssql := streamsql.New(
        streamsql.WithLogLevel(logger.INFO),
    )
    defer ssql.Stop()
    
    // 2. Register custom function
    err := functions.RegisterCustomFunction(
        "celsius_to_fahrenheit",
        functions.TypeConversion,
        "Temperature Conversion",
        "Convert Celsius to Fahrenheit",
        1, 1,
        func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
            celsius, err := functions.ConvertToFloat64(args[0])
            if err != nil {
                return nil, err
            }
            fahrenheit := celsius*9/5 + 32
            return fahrenheit, nil
        },
    )
    if err != nil {
        log.Fatal(err)
    }
    
    // 3. Execute SQL query
    sql := "SELECT deviceId, celsius_to_fahrenheit(temperature) as fahrenheit FROM stream"
    err = ssql.Execute(sql)
    if err != nil {
        log.Fatal(err)
    }
    
    // 4. Add result processor
    ssql.AddSink(func(results []map[string]interface{}) {
        fmt.Printf("Processing results: %v\n", results)
    })
    
    // 5. Send test data
    data := map[string]interface{}{
        "deviceId": "sensor001",
        "temperature": 25.0,
    }
    ssql.Emit(data)
    
    // 6. Wait for processing
    time.Sleep(1 * time.Second)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

# 📚 Related Documentation

  • SQL Reference - View complete SQL syntax reference
  • Function Reference - View all built-in functions
  • Performance Optimization - Learn about performance optimization techniques
  • Custom Functions - Learn how to develop custom functions
Edit this page on GitHub (opens new window)
Last Updated: 2025/08/05, 02:24:31
SQL Reference
RuleGo Integration

← SQL Reference RuleGo Integration→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式