RuleGo RuleGo
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • 快速入门

  • 规则链

  • 标准组件

  • 扩展组件

    • 扩展组件概述
    • 过滤器

    • 转换器

    • 外部的

    • AI

    • CI

    • IoT

    • 流式计算

      • 流式计算
      • 流聚合器
      • 流转换器
        • 功能特点
        • 输入数据支持
          • 单条数据输入
          • 数组数据输入
        • 配置
        • SQL语法支持
        • 关系类型
        • 执行结果
          • Success链输出
          • Failure链输出
        • 配置示例
          • 基础字段转换
          • 数据过滤和计算
          • 字符串处理
        • 应用示例
          • 示例1:IoT数据预处理
          • 示例2:数据标准化处理
          • 示例3:数组数据批量处理
        • 注意事项
  • 自定义组件

  • 组件市场

  • 可视化

  • AOP

  • 触发器

  • 高级主题

  • RuleGo-Server

  • 问题

目录

流转换器

# streamTransform

节点类型: x/streamTransform

说明: 流转换器节点,基于 StreamSQL 引擎,使用SQL语法对实时数据流进行过滤、转换和字段处理。专门处理非聚合查询,如数据过滤、字段转换、格式变换等。支持单条数据和数组数据输入。

# 功能特点

  • SQL语法:使用标准SQL语法进行数据转换,学习成本低
  • 实时处理:同步处理单条数据和数组数据
  • 字段操作:支持字段选择、重命名、计算和条件过滤
  • 函数支持:内置60+函数,包括数学、字符串、时间等函数
  • 条件过滤:支持WHERE子句进行数据过滤
  • 数组处理:自动处理数组数据,逐个转换并合并结果

# 输入数据支持

该节点支持两种输入数据格式:

# 单条数据输入

直接处理单个JSON对象,转换成功则通过Success链输出,失败或不符合WHERE条件则通过Failure链输出:

{"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2}
1

# 数组数据输入

自动处理JSON数组,遍历每个元素进行转换,将成功转换的结果合并成新数组输出:

[
  {"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2},
  {"deviceId": "sensor002", "temperature": 28.3, "humidity": 55.8},
  {"deviceId": "sensor003", "temperature": 22.1, "humidity": 65.4}
]
1
2
3
4
5

数组处理说明

  • 数组中的每个元素都会逐个进行SQL转换处理
  • 只有转换成功且符合WHERE条件的元素才会包含在输出数组中
  • 如果至少有一个元素转换成功,则通过Success链输出合并后的数组
  • 如果所有元素都转换失败或被WHERE条件过滤,则通过Failure链输出错误信息
  • 消息元数据中会包含处理统计信息:originalCount、transformedCount、failedCount

# 配置

字段 类型 说明 默认值
sql string 转换SQL查询语句,必须是非聚合查询(不能包含GROUP BY、聚合函数等) 无

# SQL语法支持

详细语法参考

完整的 SQL 语法说明请参考:StreamSQL SQL语法参考

# 关系类型

  • Success: 数据转换成功后,通过此关系链传递转换后的数据
  • Failure: 转换失败时,通过此关系链传递错误信息

# 执行结果

# Success链输出

转换后的数据,格式根据SQL查询结果确定:

{
  "field1": "transformed_value1",
  "field2": "transformed_value2",
  "calculated_field": 123.45
}
1
2
3
4
5

# Failure链输出

错误信息,包含具体的错误描述。

# 配置示例

# 基础字段转换

{
  "id": "s1",
  "type": "x/streamTransform",
  "name": "温度单位转换",
  "configuration": {
    "sql": "SELECT deviceId, temperature, humidity, temperature * 1.8 + 32 as temp_fahrenheit FROM stream WHERE temperature IS NOT NULL",
    "debug": false
  }
}
1
2
3
4
5
6
7
8
9

# 数据过滤和计算

{
  "id": "s2",
  "type": "x/streamTransform",
  "name": "高温数据处理",
  "configuration": {
    "sql": "SELECT deviceId, temperature, CASE WHEN temperature > 30 THEN 'HIGH' WHEN temperature < 10 THEN 'LOW' ELSE 'NORMAL' END as temp_level FROM stream WHERE temperature > 20",
    "debug": true
  }
}
1
2
3
4
5
6
7
8
9

# 字符串处理

{
  "id": "s3",
  "type": "x/streamTransform",
  "name": "设备信息格式化",
  "configuration": {
    "sql": "SELECT UPPER(deviceId) as device_id, CONCAT(location, '-', deviceType) as device_info, ROUND(temperature, 2) as temp FROM stream",
    "debug": false
  }
}
1
2
3
4
5
6
7
8
9

# 应用示例

# 示例1:IoT数据预处理

场景: 对IoT设备上报的原始数据进行清洗和格式转换。

规则链配置:

{
  "ruleChain": {
    "id": "iot_data_preprocessing",
    "name": "IoT数据预处理",
    "root": true
  },
  "metadata": {
    "nodes": [
      {
        "id": "s1",
        "type": "x/streamTransform",
        "name": "数据清洗",
        "configuration": {
          "sql": "SELECT deviceId, temperature, humidity, pressure, CASE WHEN temperature > 50 OR temperature < -20 THEN 'INVALID' ELSE 'VALID' END as data_quality FROM stream WHERE deviceId IS NOT NULL"
        }
      },
      {
        "id": "s2",
        "type": "jsFilter",
        "name": "有效数据过滤",
        "configuration": {
          "jsScript": "return msg.data_quality === 'VALID';"
        }
      },
      {
        "id": "s3",
        "type": "x/streamTransform",
        "name": "单位转换",
        "configuration": {
          "sql": "SELECT deviceId, ROUND(temperature, 2) as temperature_c, ROUND(temperature * 1.8 + 32, 2) as temperature_f, ROUND(humidity, 1) as humidity_percent, pressure FROM stream"
        }
      },
      {
        "id": "s4",
        "type": "log",
        "name": "处理结果",
        "configuration": {
          "jsScript": "return 'Processed: ' + JSON.stringify(msg);"
        }
      },
      {
        "id": "s5",
        "type": "log",
        "name": "无效数据",
        "configuration": {
          "jsScript": "return 'Invalid data: ' + JSON.stringify(msg);"
        }
      }
    ],
    "connections": [
      {
        "fromId": "s1",
        "toId": "s2",
        "type": "Success"
      },
      {
        "fromId": "s2",
        "toId": "s3",
        "type": "True"
      },
      {
        "fromId": "s2",
        "toId": "s5",
        "type": "False"
      },
      {
        "fromId": "s3",
        "toId": "s4",
        "type": "Success"
      }
    ]
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

输入数据:

{"deviceId": "sensor001", "temperature": 25.678, "humidity": 65.432, "pressure": 1013.25}
1

输出结果:

{
  "deviceId": "sensor001",
  "temperature_c": 25.68,
  "temperature_f": 78.22,
  "humidity_percent": 65.4,
  "pressure": 1013.25
}
1
2
3
4
5
6
7

# 示例2:数据标准化处理

场景: 将不同格式的设备数据标准化为统一格式。

规则链配置:

{
  "ruleChain": {
    "id": "data_standardization",
    "name": "数据标准化",
    "root": true
  },
  "metadata": {
    "nodes": [
      {
        "id": "s1",
        "type": "x/streamTransform",
        "name": "字段标准化",
        "configuration": {
          "sql": "SELECT UPPER(COALESCE(device_id, deviceId, id)) as device_id, COALESCE(temp, temperature, t) as temperature, COALESCE(hum, humidity, h) as humidity, CONCAT(COALESCE(location, 'unknown'), '-', COALESCE(building, 'default')) as location_info FROM stream"
        }
      },
      {
        "id": "s2",
        "type": "x/streamTransform",
        "name": "数据分类",
        "configuration": {
          "sql": "SELECT *, CASE WHEN temperature > 25 AND humidity > 60 THEN 'HOT_HUMID' WHEN temperature > 25 THEN 'HOT_DRY' WHEN humidity > 60 THEN 'COOL_HUMID' ELSE 'COMFORTABLE' END as environment_type FROM stream"
        }
      },
      {
        "id": "s3",
        "type": "log",
        "name": "标准化结果",
        "configuration": {
          "jsScript": "return 'Standardized: ' + JSON.stringify(msg);"
        }
      }
    ],
    "connections": [
      {
        "fromId": "s1",
        "toId": "s2",
        "type": "Success"
      },
      {
        "fromId": "s2",
        "toId": "s3",
        "type": "Success"
      }
    ]
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 示例3:数组数据批量处理

场景: 处理包含多个传感器数据的数组消息,进行批量温度转换和过滤。

输入数据:

[
  {"sensorId": "s001", "value": 23.5, "unit": "C", "status": "active"},
  {"sensorId": "s002", "value": 45.2, "unit": "C", "status": "active"},
  {"sensorId": "s003", "value": 18.7, "unit": "C", "status": "inactive"},
  {"sensorId": "s004", "value": 35.8, "unit": "C", "status": "active"}
]
1
2
3
4
5
6

规则链配置:

{
  "ruleChain": {
    "id": "batch_transform",
    "name": "批量数据转换",
    "root": true
  },
  "metadata": {
    "nodes": [
      {
        "id": "s1",
        "type": "x/streamTransform",
        "name": "批量温度转换",
        "configuration": {
          "sql": "SELECT sensorId, value as celsius, ROUND(value * 1.8 + 32, 1) as fahrenheit, CASE WHEN value > 30 THEN 'HIGH' ELSE 'NORMAL' END as temp_status FROM stream WHERE status = 'active'"
        }
      },
      {
        "id": "s2",
        "type": "log",
        "name": "转换成功",
        "configuration": {
          "jsScript": "return 'Transformed ' + metadata.getValue('transformedCount') + ' out of ' + metadata.getValue('originalCount') + ' items: ' + JSON.stringify(msg);"
        }
      },
      {
        "id": "s3",
        "type": "log",
        "name": "转换失败",
        "configuration": {
          "jsScript": "return 'Failed to transform: ' + metadata.getValue('failedCount') + ' out of ' + metadata.getValue('originalCount') + ' items';"
        }
      }
    ],
    "connections": [
      {
        "fromId": "s1",
        "toId": "s2",
        "type": "Success"
      },
      {
        "fromId": "s1",
        "toId": "s3",
        "type": "Failure"
      }
    ]
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

Success链输出结果:

[
  {"sensorId": "s001", "celsius": 23.5, "fahrenheit": 74.3, "temp_status": "NORMAL"},
  {"sensorId": "s002", "celsius": 45.2, "fahrenheit": 113.4, "temp_status": "HIGH"},
  {"sensorId": "s004", "celsius": 35.8, "fahrenheit": 96.4, "temp_status": "HIGH"}
]
1
2
3
4
5

消息元数据:

{
  "match": "true",
  "originalCount": "4",
  "transformedCount": "3",
  "failedCount": "1"
}
1
2
3
4
5
6

处理说明

在这个示例中:

  • 原始数组包含4个元素
  • WHERE条件过滤掉了status为'inactive'的s003传感器
  • 最终输出数组包含3个转换成功的元素
  • 元数据记录了详细的处理统计信息

# 注意事项

  1. SQL语法限制: 只支持非聚合查询,不能包含GROUP BY、聚合函数等
  2. 数据类型: 仅支持JSON数据类型输入
  3. 同步处理: 转换处理是同步的,会阻塞当前消息的处理
  4. 数组处理特性:
    • 数组中的每个元素都会逐个进行SQL转换处理
    • 只有转换成功且符合WHERE条件的元素才会包含在输出数组中
    • 部分元素转换失败不会影响整体结果,只影响最终数组的元素数量
    • 消息元数据会自动添加处理统计信息:originalCount、transformedCount、failedCount
  5. WHERE条件: 不符合WHERE条件的数据会被过滤掉,不包含在输出结果中
  6. 性能考虑: 对于大数组,建议考虑数据量对处理性能的影响
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/07/27, 15:17:27
流聚合器
自定义组件概述

← 流聚合器 自定义组件概述→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式