流转换器
# streamTransform
节点类型: x/streamTransform
说明: 流转换器节点,基于 StreamSQL 引擎,使用SQL语法对实时数据流进行过滤、转换和字段处理。专门处理非聚合查询,如数据过滤、字段转换、格式变换等。支持单条数据和数组数据输入。
# 功能特点
- SQL语法:使用标准SQL语法进行数据转换,学习成本低
- 实时处理:同步处理单条数据和数组数据
- 字段操作:支持字段选择、重命名、计算和条件过滤
- 函数支持:内置60+函数,包括数学、字符串、时间等函数
- 条件过滤:支持WHERE子句进行数据过滤
- 数组处理:自动处理数组数据,逐个转换并合并结果
# 输入数据支持
该节点支持两种输入数据格式:
# 单条数据输入
直接处理单个JSON对象,转换成功则通过Success链输出,失败或不符合WHERE条件则通过Failure链输出:
{"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2}
1
# 数组数据输入
自动处理JSON数组,遍历每个元素进行转换,将成功转换的结果合并成新数组输出:
[
{"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2},
{"deviceId": "sensor002", "temperature": 28.3, "humidity": 55.8},
{"deviceId": "sensor003", "temperature": 22.1, "humidity": 65.4}
]
1
2
3
4
5
2
3
4
5
数组处理说明
- 数组中的每个元素都会逐个进行SQL转换处理
- 只有转换成功且符合WHERE条件的元素才会包含在输出数组中
- 如果至少有一个元素转换成功,则通过Success链输出合并后的数组
- 如果所有元素都转换失败或被WHERE条件过滤,则通过Failure链输出错误信息
- 消息元数据中会包含处理统计信息:originalCount、transformedCount、failedCount
# 配置
字段 | 类型 | 说明 | 默认值 |
---|---|---|---|
sql | string | 转换SQL查询语句,必须是非聚合查询(不能包含GROUP BY、聚合函数等) | 无 |
# SQL语法支持
详细语法参考
完整的 SQL 语法说明请参考:StreamSQL SQL语法参考
# 关系类型
- Success: 数据转换成功后,通过此关系链传递转换后的数据
- Failure: 转换失败时,通过此关系链传递错误信息
# 执行结果
# Success链输出
转换后的数据,格式根据SQL查询结果确定:
{
"field1": "transformed_value1",
"field2": "transformed_value2",
"calculated_field": 123.45
}
1
2
3
4
5
2
3
4
5
# Failure链输出
错误信息,包含具体的错误描述。
# 配置示例
# 基础字段转换
{
"id": "s1",
"type": "x/streamTransform",
"name": "温度单位转换",
"configuration": {
"sql": "SELECT deviceId, temperature, humidity, temperature * 1.8 + 32 as temp_fahrenheit FROM stream WHERE temperature IS NOT NULL",
"debug": false
}
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 数据过滤和计算
{
"id": "s2",
"type": "x/streamTransform",
"name": "高温数据处理",
"configuration": {
"sql": "SELECT deviceId, temperature, CASE WHEN temperature > 30 THEN 'HIGH' WHEN temperature < 10 THEN 'LOW' ELSE 'NORMAL' END as temp_level FROM stream WHERE temperature > 20",
"debug": true
}
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 字符串处理
{
"id": "s3",
"type": "x/streamTransform",
"name": "设备信息格式化",
"configuration": {
"sql": "SELECT UPPER(deviceId) as device_id, CONCAT(location, '-', deviceType) as device_info, ROUND(temperature, 2) as temp FROM stream",
"debug": false
}
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 应用示例
# 示例1:IoT数据预处理
场景: 对IoT设备上报的原始数据进行清洗和格式转换。
规则链配置:
{
"ruleChain": {
"id": "iot_data_preprocessing",
"name": "IoT数据预处理",
"root": true
},
"metadata": {
"nodes": [
{
"id": "s1",
"type": "x/streamTransform",
"name": "数据清洗",
"configuration": {
"sql": "SELECT deviceId, temperature, humidity, pressure, CASE WHEN temperature > 50 OR temperature < -20 THEN 'INVALID' ELSE 'VALID' END as data_quality FROM stream WHERE deviceId IS NOT NULL"
}
},
{
"id": "s2",
"type": "jsFilter",
"name": "有效数据过滤",
"configuration": {
"jsScript": "return msg.data_quality === 'VALID';"
}
},
{
"id": "s3",
"type": "x/streamTransform",
"name": "单位转换",
"configuration": {
"sql": "SELECT deviceId, ROUND(temperature, 2) as temperature_c, ROUND(temperature * 1.8 + 32, 2) as temperature_f, ROUND(humidity, 1) as humidity_percent, pressure FROM stream"
}
},
{
"id": "s4",
"type": "log",
"name": "处理结果",
"configuration": {
"jsScript": "return 'Processed: ' + JSON.stringify(msg);"
}
},
{
"id": "s5",
"type": "log",
"name": "无效数据",
"configuration": {
"jsScript": "return 'Invalid data: ' + JSON.stringify(msg);"
}
}
],
"connections": [
{
"fromId": "s1",
"toId": "s2",
"type": "Success"
},
{
"fromId": "s2",
"toId": "s3",
"type": "True"
},
{
"fromId": "s2",
"toId": "s5",
"type": "False"
},
{
"fromId": "s3",
"toId": "s4",
"type": "Success"
}
]
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
输入数据:
{"deviceId": "sensor001", "temperature": 25.678, "humidity": 65.432, "pressure": 1013.25}
1
输出结果:
{
"deviceId": "sensor001",
"temperature_c": 25.68,
"temperature_f": 78.22,
"humidity_percent": 65.4,
"pressure": 1013.25
}
1
2
3
4
5
6
7
2
3
4
5
6
7
# 示例2:数据标准化处理
场景: 将不同格式的设备数据标准化为统一格式。
规则链配置:
{
"ruleChain": {
"id": "data_standardization",
"name": "数据标准化",
"root": true
},
"metadata": {
"nodes": [
{
"id": "s1",
"type": "x/streamTransform",
"name": "字段标准化",
"configuration": {
"sql": "SELECT UPPER(COALESCE(device_id, deviceId, id)) as device_id, COALESCE(temp, temperature, t) as temperature, COALESCE(hum, humidity, h) as humidity, CONCAT(COALESCE(location, 'unknown'), '-', COALESCE(building, 'default')) as location_info FROM stream"
}
},
{
"id": "s2",
"type": "x/streamTransform",
"name": "数据分类",
"configuration": {
"sql": "SELECT *, CASE WHEN temperature > 25 AND humidity > 60 THEN 'HOT_HUMID' WHEN temperature > 25 THEN 'HOT_DRY' WHEN humidity > 60 THEN 'COOL_HUMID' ELSE 'COMFORTABLE' END as environment_type FROM stream"
}
},
{
"id": "s3",
"type": "log",
"name": "标准化结果",
"configuration": {
"jsScript": "return 'Standardized: ' + JSON.stringify(msg);"
}
}
],
"connections": [
{
"fromId": "s1",
"toId": "s2",
"type": "Success"
},
{
"fromId": "s2",
"toId": "s3",
"type": "Success"
}
]
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 示例3:数组数据批量处理
场景: 处理包含多个传感器数据的数组消息,进行批量温度转换和过滤。
输入数据:
[
{"sensorId": "s001", "value": 23.5, "unit": "C", "status": "active"},
{"sensorId": "s002", "value": 45.2, "unit": "C", "status": "active"},
{"sensorId": "s003", "value": 18.7, "unit": "C", "status": "inactive"},
{"sensorId": "s004", "value": 35.8, "unit": "C", "status": "active"}
]
1
2
3
4
5
6
2
3
4
5
6
规则链配置:
{
"ruleChain": {
"id": "batch_transform",
"name": "批量数据转换",
"root": true
},
"metadata": {
"nodes": [
{
"id": "s1",
"type": "x/streamTransform",
"name": "批量温度转换",
"configuration": {
"sql": "SELECT sensorId, value as celsius, ROUND(value * 1.8 + 32, 1) as fahrenheit, CASE WHEN value > 30 THEN 'HIGH' ELSE 'NORMAL' END as temp_status FROM stream WHERE status = 'active'"
}
},
{
"id": "s2",
"type": "log",
"name": "转换成功",
"configuration": {
"jsScript": "return 'Transformed ' + metadata.getValue('transformedCount') + ' out of ' + metadata.getValue('originalCount') + ' items: ' + JSON.stringify(msg);"
}
},
{
"id": "s3",
"type": "log",
"name": "转换失败",
"configuration": {
"jsScript": "return 'Failed to transform: ' + metadata.getValue('failedCount') + ' out of ' + metadata.getValue('originalCount') + ' items';"
}
}
],
"connections": [
{
"fromId": "s1",
"toId": "s2",
"type": "Success"
},
{
"fromId": "s1",
"toId": "s3",
"type": "Failure"
}
]
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
Success链输出结果:
[
{"sensorId": "s001", "celsius": 23.5, "fahrenheit": 74.3, "temp_status": "NORMAL"},
{"sensorId": "s002", "celsius": 45.2, "fahrenheit": 113.4, "temp_status": "HIGH"},
{"sensorId": "s004", "celsius": 35.8, "fahrenheit": 96.4, "temp_status": "HIGH"}
]
1
2
3
4
5
2
3
4
5
消息元数据:
{
"match": "true",
"originalCount": "4",
"transformedCount": "3",
"failedCount": "1"
}
1
2
3
4
5
6
2
3
4
5
6
处理说明
在这个示例中:
- 原始数组包含4个元素
- WHERE条件过滤掉了status为'inactive'的s003传感器
- 最终输出数组包含3个转换成功的元素
- 元数据记录了详细的处理统计信息
# 注意事项
- SQL语法限制: 只支持非聚合查询,不能包含GROUP BY、聚合函数等
- 数据类型: 仅支持JSON数据类型输入
- 同步处理: 转换处理是同步的,会阻塞当前消息的处理
- 数组处理特性:
- 数组中的每个元素都会逐个进行SQL转换处理
- 只有转换成功且符合WHERE条件的元素才会包含在输出数组中
- 部分元素转换失败不会影响整体结果,只影响最终数组的元素数量
- 消息元数据会自动添加处理统计信息:originalCount、transformedCount、failedCount
- WHERE条件: 不符合WHERE条件的数据会被过滤掉,不包含在输出结果中
- 性能考虑: 对于大数组,建议考虑数据量对处理性能的影响
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/27, 15:17:27