RuleGo RuleGo
🏠Home
  • Quick Start
  • Rule Chain
  • Standard Components
  • Extension Components
  • Custom Components
  • Visualization
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • Trigger
  • Advanced Topics
  • Performance
  • Standard Components
  • Extension Components
  • Custom Components
  • Components Marketplace
  • Overview
  • Quick Start
  • Routing
  • DSL
  • API
  • Options
  • Components
🔥Editor (opens new window)
  • RuleGo Editor (opens new window)
  • RuleGo Server (opens new window)
  • StreamSQL
  • Github (opens new window)
  • Gitee (opens new window)
  • Changelog (opens new window)
  • English
  • 简体中文
🏠Home
  • Quick Start
  • Rule Chain
  • Standard Components
  • Extension Components
  • Custom Components
  • Visualization
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • Trigger
  • Advanced Topics
  • Performance
  • Standard Components
  • Extension Components
  • Custom Components
  • Components Marketplace
  • Overview
  • Quick Start
  • Routing
  • DSL
  • API
  • Options
  • Components
🔥Editor (opens new window)
  • RuleGo Editor (opens new window)
  • RuleGo Server (opens new window)
  • StreamSQL
  • Github (opens new window)
  • Gitee (opens new window)
  • Changelog (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • Quick Start

  • Rule Chain

  • Standard Components

  • Extension Components

  • Custom Components

  • Components marketplace

  • Visualization

  • AOP

  • Trigger

  • Advanced Topic

  • RuleGo-Server

  • FAQ

  • Endpoint Module

  • Support

  • StreamSQL

    • Overview
    • Quick Start
    • Core Concepts
    • SQL Reference
    • API Reference
    • RuleGo Integration
    • functions

      • Aggregate Functions
      • Analytical Functions
      • Window Functions
      • Math Functions
      • String Functions
      • Conversion Functions
      • DateTime Functions
      • JSON Functions
      • Hash Functions
      • Array Functions
      • Type Check Functions
      • Conditional Functions
      • Multi-row Functions
      • Expression Functions
      • Custom Functions
        • Core Features
        • Architecture Diagram
        • Function Type System
        • Basic Usage
          • 1. Custom Aggregate Function
          • 2. Using in SQL
          • 3. Function Management
        • Function Development Details
          • 1. Mathematical Functions
          • 2. String Functions
          • 3. Conversion Functions
          • 4. Time Functions
          • 5. Aggregate Functions
          • 6. Analytic Functions
        • Advanced Features
          • 1. Context Usage
          • 2. Error Handling
          • 3. Parameter Validation
        • Practical Application Cases
          • 1. IoT Device Monitoring
          • 2. Financial Data Analysis
        • 📚 Related Documentation
    • case-studies

目录

Custom Functions

# StreamSQL Custom Functions

StreamSQL provides a powerful custom function system, allowing users to extend SQL functionality according to their needs.

# Core Features

  • Rich Function Types: Support for scalar functions, aggregate functions, table functions, and window functions
  • Multiple Languages: Support for development in Go, JavaScript, and other languages
  • Hot Deployment: Dynamic loading and unloading without service restart
  • Performance Optimization: Support for incremental calculation and cache optimization
  • Security Isolation: Secure execution environment, preventing system impact

# Architecture Diagram

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   SQL Parser    │───▶│ Function Router │───▶│ Function Engine │
│                 │    │                 │    │                 │
│ - Parse SQL     │    │ - Route based   │    │ - Execute       │
│ - Validate      │    │   on function   │    │   functions     │
│   functions     │    │   type          │    │ - Manage state  │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                │
                                ▼
                       ┌─────────────────┐
                       │ Function Store  │
                       │                 │
                       │ - Store         │
                       │   definitions   │
                       │ - Version       │
                       │   management    │
                       └─────────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# Function Type System

Function Type Description Example
Scalar Function Single input, single output UPPER(), LENGTH()
Aggregate Function Multiple inputs, single output SUM(), AVG()
Table Function Single input, multiple outputs UNNEST()
Window Function Windowed calculation ROW_NUMBER()

# Basic Usage

# 1. Custom Aggregate Function

Define a custom aggregate function:

// CustomSum calculates the sum of values
type CustomSum struct {
    sum float64
}

func (s *CustomSum) Add(value interface{}) {
    if v, ok := value.(float64); ok {
        s.sum += v
    }
}

func (s *CustomSum) Result() interface{} {
    return s.sum
}

func (s *CustomSum) Merge(other interface{}) {
    if o, ok := other.(*CustomSum); ok {
        s.sum += o.sum
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 2. Using in SQL

Register and use the custom function:

-- Register custom function
CREATE AGGREGATE FUNCTION custom_sum AS 'com.example.CustomSum';

-- Use in query
SELECT custom_sum(value) FROM events GROUP BY device_id;
1
2
3
4
5

# 3. Function Management

  • Register Function: CREATE FUNCTION
  • Delete Function: DROP FUNCTION
  • Update Function: CREATE OR REPLACE FUNCTION
  • View Functions: SHOW FUNCTIONS

# Function Development Details

# 1. Mathematical Functions

Develop mathematical calculation functions:

type PowerFunction struct{}

func (f *PowerFunction) Call(args ...interface{}) (interface{}, error) {
    if len(args) != 2 {
        return nil, errors.New("power function requires 2 arguments")
    }
    
    base, ok1 := args[0].(float64)
    exponent, ok2 := args[1].(float64)
    
    if !ok1 || !ok2 {
        return nil, errors.New("arguments must be numbers")
    }
    
    return math.Pow(base, exponent), nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 2. String Functions

Develop string processing functions:

type ReverseFunction struct{}

func (f *ReverseFunction) Call(args ...interface{}) (interface{}, error) {
    if len(args) != 1 {
        return nil, errors.New("reverse function requires 1 argument")
    }
    
    str, ok := args[0].(string)
    if !ok {
        return nil, errors.New("argument must be string")
    }
    
    runes := []rune(str)
    for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 {
        runes[i], runes[j] = runes[j], runes[i]
    }
    
    return string(runes), nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 3. Conversion Functions

Develop type conversion functions:

type ToBooleanFunction struct{}

func (f *ToBooleanFunction) Call(args ...interface{}) (interface{}, error) {
    if len(args) != 1 {
        return nil, errors.New("to_boolean function requires 1 argument")
    }
    
    switch v := args[0].(type) {
    case bool:
        return v, nil
    case string:
        return strings.ToLower(v) == "true", nil
    case int, int64, float64:
        return v != 0, nil
    default:
        return false, nil
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 4. Time Functions

Develop time processing functions:

type FormatTimestampFunction struct{}

func (f *FormatTimestampFunction) Call(args ...interface{}) (interface{}, error) {
    if len(args) != 2 {
        return nil, errors.New("format_timestamp function requires 2 arguments")
    }
    
    timestamp, ok1 := args[0].(int64)
    format, ok2 := args[1].(string)
    
    if !ok1 || !ok2 {
        return nil, errors.New("invalid arguments")
    }
    
    t := time.Unix(timestamp, 0)
    return t.Format(format), nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 5. Aggregate Functions

Develop custom aggregate functions:

type MedianAggregate struct {
    values []float64
}

func (a *MedianAggregate) Add(value interface{}) {
    if v, ok := value.(float64); ok {
        a.values = append(a.values, v)
    }
}

func (a *MedianAggregate) Result() interface{} {
    if len(a.values) == 0 {
        return nil
    }
    
    sort.Float64s(a.values)
    n := len(a.values)
    
    if n%2 == 0 {
        return (a.values[n/2-1] + a.values[n/2]) / 2
    }
    
    return a.values[n/2]
}

func (a *MedianAggregate) Merge(other interface{}) {
    if o, ok := other.(*MedianAggregate); ok {
        a.values = append(a.values, o.values...)
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 6. Analytic Functions

Develop window calculation functions:

type MovingAverageFunction struct {
    windowSize int
}

func (f *MovingAverageFunction) Call(args ...interface{}) (interface{}, error) {
    // Implement moving average calculation logic
    return nil, nil
}
1
2
3
4
5
6
7
8

# Advanced Features

# 1. Context Usage

Access query context in functions:

type ContextFunction struct{}

func (f *ContextFunction) Call(ctx context.Context, args ...interface{}) (interface{}, error) {
    // Get query information from context
    queryID := ctx.Value("query_id")
    return queryID, nil
}
1
2
3
4
5
6
7

# 2. Error Handling

Robust error handling:

func (f *SafeDivideFunction) Call(args ...interface{}) (interface{}, error) {
    if len(args) != 2 {
        return nil, fmt.Errorf("divide function requires 2 arguments, got %d", len(args))
    }
    
    dividend, ok1 := args[0].(float64)
    divisor, ok2 := args[1].(float64)
    
    if !ok1 || !ok2 {
        return nil, errors.New("arguments must be numbers")
    }
    
    if divisor == 0 {
        return nil, errors.New("division by zero")
    }
    
    return dividend / divisor, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 3. Parameter Validation

Parameter type and range validation:

func validateRange(value, min, max float64) error {
    if value < min || value > max {
        return fmt.Errorf("value must be between %.2f and %.2f", min, max)
    }
    return nil
}
1
2
3
4
5
6

# Practical Application Cases

# 1. IoT Device Monitoring

Custom function for device status analysis:

type DeviceStatusFunction struct{}

func (f *DeviceStatusFunction) Call(args ...interface{}) (interface{}, error) {
    temperature, _ := args[0].(float64)
    humidity, _ := args[1].(float64)
    
    if temperature > 40 || humidity > 80 {
        return "ALERT", nil
    } else if temperature > 30 || humidity > 60 {
        return "WARNING", nil
    }
    
    return "NORMAL", nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 2. Financial Data Analysis

Custom function for financial indicators:

type RSIFunction struct {
    gains  []float64
    losses []float64
}

func (f *RSIFunction) Add(value interface{}) {
    // Implement RSI calculation logic
}

func (f *RSIFunction) Result() interface{} {
    // Calculate and return RSI value
    return 0.0
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 📚 Related Documentation

  • Function Reference - View all built-in functions
  • SQL Reference - View complete SQL syntax reference
  • Performance Optimization - Learn about performance optimization techniques
Edit this page on GitHub (opens new window)
Last Updated: 2025/08/05, 02:24:31
Expression Functions
Case Studies Overview

← Expression Functions Case Studies Overview→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式