Custom Functions
# StreamSQL Custom Functions
StreamSQL provides a powerful custom function system, allowing users to extend SQL functionality according to their needs.
# Core Features
- Rich Function Types: Support for scalar functions, aggregate functions, table functions, and window functions
- Multiple Languages: Support for development in Go, JavaScript, and other languages
- Hot Deployment: Dynamic loading and unloading without service restart
- Performance Optimization: Support for incremental calculation and cache optimization
- Security Isolation: Secure execution environment, preventing system impact
# Architecture Diagram
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ SQL Parser │───▶│ Function Router │───▶│ Function Engine │
│ │ │ │ │ │
│ - Parse SQL │ │ - Route based │ │ - Execute │
│ - Validate │ │ on function │ │ functions │
│ functions │ │ type │ │ - Manage state │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ Function Store │
│ │
│ - Store │
│ definitions │
│ - Version │
│ management │
└─────────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Function Type System
Function Type | Description | Example |
---|---|---|
Scalar Function | Single input, single output | UPPER() , LENGTH() |
Aggregate Function | Multiple inputs, single output | SUM() , AVG() |
Table Function | Single input, multiple outputs | UNNEST() |
Window Function | Windowed calculation | ROW_NUMBER() |
# Basic Usage
# 1. Custom Aggregate Function
Define a custom aggregate function:
// CustomSum calculates the sum of values
type CustomSum struct {
sum float64
}
func (s *CustomSum) Add(value interface{}) {
if v, ok := value.(float64); ok {
s.sum += v
}
}
func (s *CustomSum) Result() interface{} {
return s.sum
}
func (s *CustomSum) Merge(other interface{}) {
if o, ok := other.(*CustomSum); ok {
s.sum += o.sum
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 2. Using in SQL
Register and use the custom function:
-- Register custom function
CREATE AGGREGATE FUNCTION custom_sum AS 'com.example.CustomSum';
-- Use in query
SELECT custom_sum(value) FROM events GROUP BY device_id;
1
2
3
4
5
2
3
4
5
# 3. Function Management
- Register Function:
CREATE FUNCTION
- Delete Function:
DROP FUNCTION
- Update Function:
CREATE OR REPLACE FUNCTION
- View Functions:
SHOW FUNCTIONS
# Function Development Details
# 1. Mathematical Functions
Develop mathematical calculation functions:
type PowerFunction struct{}
func (f *PowerFunction) Call(args ...interface{}) (interface{}, error) {
if len(args) != 2 {
return nil, errors.New("power function requires 2 arguments")
}
base, ok1 := args[0].(float64)
exponent, ok2 := args[1].(float64)
if !ok1 || !ok2 {
return nil, errors.New("arguments must be numbers")
}
return math.Pow(base, exponent), nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 2. String Functions
Develop string processing functions:
type ReverseFunction struct{}
func (f *ReverseFunction) Call(args ...interface{}) (interface{}, error) {
if len(args) != 1 {
return nil, errors.New("reverse function requires 1 argument")
}
str, ok := args[0].(string)
if !ok {
return nil, errors.New("argument must be string")
}
runes := []rune(str)
for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 {
runes[i], runes[j] = runes[j], runes[i]
}
return string(runes), nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 3. Conversion Functions
Develop type conversion functions:
type ToBooleanFunction struct{}
func (f *ToBooleanFunction) Call(args ...interface{}) (interface{}, error) {
if len(args) != 1 {
return nil, errors.New("to_boolean function requires 1 argument")
}
switch v := args[0].(type) {
case bool:
return v, nil
case string:
return strings.ToLower(v) == "true", nil
case int, int64, float64:
return v != 0, nil
default:
return false, nil
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 4. Time Functions
Develop time processing functions:
type FormatTimestampFunction struct{}
func (f *FormatTimestampFunction) Call(args ...interface{}) (interface{}, error) {
if len(args) != 2 {
return nil, errors.New("format_timestamp function requires 2 arguments")
}
timestamp, ok1 := args[0].(int64)
format, ok2 := args[1].(string)
if !ok1 || !ok2 {
return nil, errors.New("invalid arguments")
}
t := time.Unix(timestamp, 0)
return t.Format(format), nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 5. Aggregate Functions
Develop custom aggregate functions:
type MedianAggregate struct {
values []float64
}
func (a *MedianAggregate) Add(value interface{}) {
if v, ok := value.(float64); ok {
a.values = append(a.values, v)
}
}
func (a *MedianAggregate) Result() interface{} {
if len(a.values) == 0 {
return nil
}
sort.Float64s(a.values)
n := len(a.values)
if n%2 == 0 {
return (a.values[n/2-1] + a.values[n/2]) / 2
}
return a.values[n/2]
}
func (a *MedianAggregate) Merge(other interface{}) {
if o, ok := other.(*MedianAggregate); ok {
a.values = append(a.values, o.values...)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 6. Analytic Functions
Develop window calculation functions:
type MovingAverageFunction struct {
windowSize int
}
func (f *MovingAverageFunction) Call(args ...interface{}) (interface{}, error) {
// Implement moving average calculation logic
return nil, nil
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# Advanced Features
# 1. Context Usage
Access query context in functions:
type ContextFunction struct{}
func (f *ContextFunction) Call(ctx context.Context, args ...interface{}) (interface{}, error) {
// Get query information from context
queryID := ctx.Value("query_id")
return queryID, nil
}
1
2
3
4
5
6
7
2
3
4
5
6
7
# 2. Error Handling
Robust error handling:
func (f *SafeDivideFunction) Call(args ...interface{}) (interface{}, error) {
if len(args) != 2 {
return nil, fmt.Errorf("divide function requires 2 arguments, got %d", len(args))
}
dividend, ok1 := args[0].(float64)
divisor, ok2 := args[1].(float64)
if !ok1 || !ok2 {
return nil, errors.New("arguments must be numbers")
}
if divisor == 0 {
return nil, errors.New("division by zero")
}
return dividend / divisor, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 3. Parameter Validation
Parameter type and range validation:
func validateRange(value, min, max float64) error {
if value < min || value > max {
return fmt.Errorf("value must be between %.2f and %.2f", min, max)
}
return nil
}
1
2
3
4
5
6
2
3
4
5
6
# Practical Application Cases
# 1. IoT Device Monitoring
Custom function for device status analysis:
type DeviceStatusFunction struct{}
func (f *DeviceStatusFunction) Call(args ...interface{}) (interface{}, error) {
temperature, _ := args[0].(float64)
humidity, _ := args[1].(float64)
if temperature > 40 || humidity > 80 {
return "ALERT", nil
} else if temperature > 30 || humidity > 60 {
return "WARNING", nil
}
return "NORMAL", nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
# 2. Financial Data Analysis
Custom function for financial indicators:
type RSIFunction struct {
gains []float64
losses []float64
}
func (f *RSIFunction) Add(value interface{}) {
// Implement RSI calculation logic
}
func (f *RSIFunction) Result() interface{} {
// Calculate and return RSI value
return 0.0
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 📚 Related Documentation
- Function Reference - View all built-in functions
- SQL Reference - View complete SQL syntax reference
- Performance Optimization - Learn about performance optimization techniques
Edit this page on GitHub (opens new window)
Last Updated: 2025/08/05, 02:24:31