核心概念
# 核心概念
理解StreamSQL的核心概念是高效使用它的关键。本章将详细介绍流处理、窗口、聚合等重要概念。
# 流处理基础
# 什么是数据流
数据流(Stream)是一系列连续产生的数据记录,具有以下特征:
- 无界性:数据持续不断地产生,没有明确的结束
- 时序性:数据按时间顺序到达
- 实时性:需要快速处理,不能等待所有数据
- 不可变性:历史数据不可修改
# 流处理 vs 批处理
特征 | 流处理 | 批处理 |
---|---|---|
数据边界 | 无界 | 有界 |
处理延迟 | 毫秒级 | 分钟/小时级 |
数据完整性 | 近似结果 | 精确结果 |
资源使用 | 持续占用 | 周期性占用 |
应用场景 | 实时监控、告警 | 报表、分析 |
# 流处理模型
StreamSQL采用微批处理模型:
# 时间语义
# 时间类型
StreamSQL支持两种时间概念:
# 1. 事件时间 (Event Time)
- 数据产生时的真实时间
- 使用
with (TIMESTAMP='field_name')
指定 - 适用于需要精确时序的场景
- 重要:设置了事件时间后,窗口时间基于消息内的时间戳字段,而非消息到达时间
SELECT deviceId, AVG(temperature)
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
WITH (TIMESTAMP='event_time')
1
2
3
4
2
3
4
# 2. 处理时间 (Processing Time)
- 数据到达处理系统的时间
- 默认使用系统当前时间
- 处理简单,但可能不准确
-- 使用处理时间(默认)
SELECT deviceId, AVG(temperature)
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
1
2
3
4
2
3
4
# 时间格式
支持多种时间单位:
-- 时间单位配置
WITH (TIMEUNIT='ss') -- 秒
WITH (TIMEUNIT='ms') -- 毫秒
WITH (TIMEUNIT='mi') -- 分钟
WITH (TIMEUNIT='hh') -- 小时
WITH (TIMEUNIT='dd') -- 天
1
2
3
4
5
6
2
3
4
5
6
# 窗口概念
窗口是流处理中的核心概念,用于将无界流分割成有界的数据集进行聚合操作。
# 窗口类型
# 1. 滚动窗口 (Tumbling Window)
固定大小、无重叠的时间窗口:
-- 每5分钟计算一次平均值
SELECT AVG(temperature)
FROM stream
GROUP BY TumblingWindow('5m')
1
2
3
4
2
3
4
特点:
- 每条数据只属于一个窗口
- 窗口之间无重叠
- 适合周期性统计
# 2. 滑动窗口 (Sliding Window)
固定大小、有重叠的时间窗口:
-- 5分钟窗口,每2分钟滑动一次
SELECT AVG(temperature)
FROM stream
GROUP BY SlidingWindow('5m', '2m')
1
2
3
4
2
3
4
特点:
- 每条数据可能属于多个窗口
- 提供更平滑的分析结果
- 计算开销相对较大
# 3. 计数窗口 (Counting Window)
基于数据条数的窗口:
-- 每100条数据计算一次
SELECT AVG(temperature)
FROM stream
GROUP BY CountingWindow(100)
1
2
3
4
2
3
4
特点:
- 基于数据量而非时间
- 窗口大小固定
- 适合数据量稳定的场景
# 4. 会话窗口 (Session Window)
基于数据活跃度的动态窗口:
-- 超时5分钟关闭会话
SELECT user_id, COUNT(*)
FROM stream
GROUP BY user_id, SessionWindow('5m')
1
2
3
4
2
3
4
特点:
- 窗口大小动态变化
- 基于数据间隔判断会话
- 适合用户行为分析
# 窗口生命周期
# 聚合操作
# 聚合函数分类
# 1. 统计聚合
SELECT deviceId,
COUNT(*) as data_count, -- 计数
SUM(temperature) as total_temp, -- 求和
AVG(temperature) as avg_temp, -- 平均值
MIN(temperature) as min_temp, -- 最小值
MAX(temperature) as max_temp -- 最大值
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 2. 高级统计
SELECT deviceId,
STDDEV(temperature) as std_temp, -- 标准差
MEDIAN(temperature) as med_temp, -- 中位数
PERCENTILE(temperature, 0.95) as p95 -- 95分位数
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
1
2
3
4
5
6
2
3
4
5
6
# 3. 集合聚合
SELECT deviceId,
COLLECT(temperature) as temp_list, -- 收集到数组
LAST_VALUE(temperature) as last_temp -- 最后一个值
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
1
2
3
4
5
2
3
4
5
# 聚合状态管理
StreamSQL自动管理聚合状态:
# 表达式系统
# 算术表达式
SELECT deviceId,
temperature * 1.8 + 32 as fahrenheit, -- 温度转换
(humidity + moisture) / 2 as avg_wet -- 平均湿度
FROM stream
1
2
3
4
2
3
4
# 逻辑表达式
SELECT deviceId,
temperature > 30 AND humidity > 80 as alert_condition
FROM stream
WHERE temperature IS NOT NULL
1
2
3
4
2
3
4
# 字符串表达式
SELECT deviceId,
CONCAT(deviceId, '_', status) as device_status,
UPPER(location) as location_upper
FROM stream
1
2
3
4
2
3
4
# 条件表达式
SELECT deviceId,
CASE
WHEN temperature > 35 THEN '高温'
WHEN temperature > 25 THEN '正常'
ELSE '低温'
END as temp_level
FROM stream
1
2
3
4
5
6
7
2
3
4
5
6
7
# 数据类型
# 基本类型
类型 | 说明 | 示例 |
---|---|---|
数值 | 整数、浮点数 | 25 , 3.14 , -10 |
字符串 | 文本数据 | "sensor001" , 'active' |
布尔 | 逻辑值 | true , false |
时间 | 时间戳 | time.Now() |
# 复合类型
// 支持嵌套结构
data := map[string]interface{}{
"deviceId": "sensor001",
"location": map[string]interface{}{
"building": "A",
"floor": 3,
},
"readings": []float64{23.5, 24.1, 25.2},
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 类型转换
StreamSQL提供自动类型转换:
-- 自动转换字符串到数字
SELECT deviceId, temperature + '5' as adjusted_temp
FROM stream
-- 显式转换
SELECT deviceId, CAST(temperature AS STRING) as temp_str
FROM stream
1
2
3
4
5
6
7
2
3
4
5
6
7
# 执行模型
# 数据流向
# 处理阶段
- 解析阶段:SQL语句解析为抽象语法树
- 规划阶段:生成执行计划和配置
- 执行阶段:创建流处理管道
- 运行阶段:持续处理数据流
# 资源管理
// 正确的资源管理
ssql := streamsql.New()
defer ssql.Stop() // 确保资源释放
// 错误处理
err := ssql.Execute(sql)
if err != nil {
log.Printf("执行失败: %v", err)
return
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 性能考虑
# 内存使用
- 窗口大小:较大窗口占用更多内存
- 聚合状态:复杂聚合需要更多状态存储
- 数据类型:避免不必要的大型对象
# 计算复杂度
- 滑动窗口 > 滚动窗口 > 无窗口
- 复杂表达式 > 简单表达式
- 多重GROUP BY > 单一GROUP BY
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/27, 12:06:54