RuleGo RuleGo
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • 快速入门

  • 规则链

  • 标准组件

  • 扩展组件

  • 自定义组件

    • 自定义组件概述
      • 自定义组件
        • 组件接口
        • 生命周期
        • 初始化
        • 处理消息
        • 详细节点执行控制上下文接口
        • 示例
      • go plugin 方式提供组件
        • 示例
      • 可视化
      • 贡献组件
  • 组件市场

  • 可视化

  • AOP

  • 触发器

  • 高级主题

  • RuleGo-Server

  • 问题

目录

自定义组件概述

你可以把你的业务逻辑或者通用逻辑封装成组件,然后在规则链通过配置规则节点调用它。步骤:

  1. 实现组件接口。
  2. 把自定组件注册到注册器。
  3. 通过规则链配置调用组件。

# 自定义组件

# 组件接口

实现types.Node (opens new window)接口:

// Node 节点组件接口
//把业务封或者通用逻辑装成组件,然后通过规则链配置方式调用该组件
type Node interface {
  //New 创建一个组件新实例
  //每个规则链的规则节点都会创建一个新的实例,数据是隔离的
  New() Node
  //Type 组件类型,类型不能重复。
  //用于规则链,node.type配置,初始化对应的组件
  //建议使用`/`区分命名空间,防止冲突。例如:x/httpClient
  Type() string
  //Init 组件初始化,一般做一些组件参数配置或者客户端初始化操作
  //规则链里的规则节点初始化会调用一次
  //Configuration 参数为节点配置数据
  Init(ruleConfig Config, configuration Configuration) error
  //OnMsg 处理消息,并控制流向子节点的关系。每条流入组件的数据会经过该方法处理
  //ctx:规则引擎处理消息上下文
  //msg:消息
  OnMsg(ctx RuleContext, msg RuleMsg)
  //Destroy 销毁,做一些资源释放操作
  Destroy()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 生命周期

  • New: 用于创建节点组件实例时调用。
  • Init: 用于解析当前节点的JSON的配置,初始化规则链时候初始化一次。
  • OnMsg: 用于处理消息,并控制流向子节点的关系。每条流入组件的数据会经过该函数处理。
  • Destroy: 用于节点销毁时调用。规则引擎销毁时、规则链更新或节点配置更新时会调用该方法。

如果动态更新节点配置,则先调用该节点新实例Init方法,如果初始化成功后,再调用旧实例的Destroy方法。

# 初始化

一般我们先定义该组件的配置结构体,配置结构体支持嵌套自定义结构体,嵌套结构体的字段只支持基础类型,另外需要使用json标签定义字段的编解码,字段格式使用小驼峰命名方式。 例如以下MQTT客户端配置结构体:

type MqttClientNodeConfiguration struct {
	Server   string `json:"server"`
	Username string `json:"username"`
	Password string `json:"password"`
	// Topic 发布主题 可以使用 ${metadata.key} 读取元数据中的变量或者使用 ${msg.key} 读取消息负荷中的变量进行替换
	Topic string `json:"topic"`
	//MaxReconnectInterval 重连间隔 单位秒
	MaxReconnectInterval int `json:"maxReconnectInterval"`
	QOS                  uint8 `json:"qos"`
	CleanSession         bool `json:"cleanSession"`
	ClientID             string `json:"clientID"`
	CAFile               string `json:"cAFile"`
	CertFile             string `json:"certFile"`
	CertKeyFile          string `json:"certKeyFile"`
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

组件使用Config字段自定义组件配置(可视化编辑器会根据Config字段的结构体生成可视化节点配置):

type MqttClientNode struct {
	//节点配置
	Config MqttClientNodeConfiguration
	//topic 模板
	topicTemplate str.Template
	//客户端
	client        *mqtt.Client
}
1
2
3
4
5
6
7
8

global,vars,secrets,label,desc ,icon 为保留字段。

然后在组件初始化时,把该节点的实例化配置转成组件配置结构体,例如以下节点配置:

{
     "id": "s3",
     "type": "mqttClient",
     "name": "mqtt推送数据",
     "debugMode": false,
     "configuration": {
       "Server": "127.0.0.1:1883",
       "Topic": "/device/msg"
     }
}
1
2
3
4
5
6
7
8
9
10

Init方法 configuration 参数传递的就是该节点配置的configuration数据

// Init 初始化
func (x *MqttClientNode) Init(ruleConfig types.Config, configuration types.Configuration) error {
	//把节点配置转成组件配置结构体
    err := maps.Map2Struct(configuration, &x.Config)
    //其他初始化
    return err
}
1
2
3
4
5
6
7

最后在OnMsg中,或者其他使用该配置

// OnMsg 处理消息
func (x *MqttClientNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg) {
	//使用配置
    topic:= x.Config.Topic,
}
1
2
3
4
5

# 处理消息

当前规则节点处理完后,把消息通过以下方法通知下一个或者多个节点。规则引擎会查询满足关系的子节点,并发触发其OnMsg方法,进行链式数据流调用

//TellSuccess 通知规则引擎处理当前消息处理成功,并把消息通过`Success`关系发送到下一个节点
TellSuccess(msg RuleMsg)
//TellNext 使用指定的relationTypes,发送消息到下一个节点
TellNext(msg RuleMsg, relationTypes ...string)
//TellFailure 通知规则引擎处理当前消息处理失败,并把消息通过`Failure`关系发送到下一个节点
TellFailure(msg RuleMsg, err error)
1
2
3
4
5
6

示例:

// OnMsg 处理消息
func (x *MqttClientNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg) {
	//使用模板替换topic
	topic := x.topicTemplate.ExecuteFn(func() map[string]any {
		return base.NodeUtils.GetEvnAndMetadata(ctx, msg)
	})
	//获取mqtt客户端
	if client, err := x.SharedNode.Get(); err != nil {
		ctx.TellFailure(msg, err)
	} else {
		//把上一个节点的输出,发布到mqtt broker
		if err := client.Publish(topic, x.Config.QOS, []byte(msg.Data)); err != nil {
			//失败则通过失败链发送到下一个节点
			ctx.TellFailure(msg, err)
		} else {
			//成功则通过成功链发送到下一个节点,不改消息负荷
			ctx.TellSuccess(msg)
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 详细节点执行控制上下文接口

节点接口中处理消息函数:OnMsg(ctx RuleContext, msg RuleMsg),其中执行上下文ctx可以对规则引擎节点进行流转已经编排控制。

ctx接口定义如下:

// RuleContext 是规则引擎中消息处理上下文的接口。
// 它将消息传递到下一个或多个节点。并触发它们的业务逻辑。
// 以及对本次执行实例的节点流程进行控制和编排
type RuleContext interface {
	// TellSuccess 通知规则引擎当前消息已成功处理,并通过 'Success' 关系将消息发送到下一个节点。
	TellSuccess(msg RuleMsg)
	// TellFailure 通知规则引擎当前消息处理失败,并通过 'Failure' 关系将消息发送到下一个节点。
	TellFailure(msg RuleMsg, err error)
	// TellNext 使用指定的 relationTypes 将消息发送到下一个节点。
	TellNext(msg RuleMsg, relationTypes ...string)
	// TellSelf 在指定的延迟(毫秒)后将消息发送到当前节点。
	TellSelf(msg RuleMsg, delayMs int64)
	// TellNextOrElse 使用指定的 relationTypes 将消息发送到下一个节点。如果相应的 relationType 没有找到下一个节点,则使用 defaultRelationType 进行搜索。
	TellNextOrElse(msg RuleMsg, defaultRelationType string, relationTypes ...string)
	// TellFlow 执行子规则链。
	TellFlow(ctx context.Context, ruleChainId string, msg RuleMsg, endFunc OnEndFunc, onAllNodeCompleted func())
	// TellNode 从指定节点开始执行。如果 skipTellNext=true,则只执行当前节点,不通知下一个节点。
	// onEnd 用于查看最终执行结果。
	TellNode(ctx context.Context, nodeId string, msg RuleMsg, skipTellNext bool, onEnd OnEndFunc, onAllNodeCompleted func())
	// NewMsg 创建一个新消息实例。
	NewMsg(msgType string, metaData Metadata, data string) RuleMsg
	// GetSelfId 检索当前节点 ID。
	GetSelfId() string
	// Self 检索当前节点实例。
	Self() NodeCtx
	// From 检索消息进入当前节点的节点实例。
	From() NodeCtx
	// RuleChain 检索当前节点所在的规则链实例。
	RuleChain() NodeCtx
	// Config 检索规则引擎的配置。
	Config() Config
	// SubmitTack 提交一个异步任务以供执行。
	SubmitTack(task func())
	// SetEndFunc 设置当前消息处理结束时的回调函数。
	SetEndFunc(f OnEndFunc) RuleContext
	// GetEndFunc 检索当前消息处理结束时的回调函数。
	GetEndFunc() OnEndFunc
	// SetContext 设置共享信号量或数据的上下文,跨不同组件实例。
	SetContext(c context.Context) RuleContext
	// GetContext 检索共享信号量或数据的上下文,跨不同组件实例。
	GetContext() context.Context
	// SetOnAllNodeCompleted 设置所有节点执行完成时的回调。
	SetOnAllNodeCompleted(onAllNodeCompleted func())
	// DoOnEnd 触发 OnEnd 回调函数。
	DoOnEnd(msg RuleMsg, err error, relationType string)
	// SetCallbackFunc 设置回调函数。
	SetCallbackFunc(functionName string, f interface{})
	// GetCallbackFunc 检索回调函数。
	GetCallbackFunc(functionName string) interface{}
	// OnDebug 调用配置的 OnDebug 回调函数。
	OnDebug(ruleChainId string, flowType string, nodeId string, msg RuleMsg, relationType string, err error)
	// SetExecuteNode 设置当前节点。
	// 如果 relationTypes 为空,则执行当前节点;否则,
	// 查找并执行当前节点的子节点。
	SetExecuteNode(nodeId string, relationTypes ...string)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

# 示例

实现types.Node (opens new window)接口,定义组件:

//UpperNode A plugin that converts the message data to uppercase
type UpperNode struct{}

func (n *UpperNode) Type() string {
   return "test/upper"
}
func (n *UpperNode) New() types.Node {
  return &UpperNode{}
}
func (n *UpperNode) Init(ruleConfig types.Config, configuration types.Configuration) error {
  // Do some initialization work
  return nil
}
//处理消息
func (n *UpperNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg)  {
  msg.Data = strings.ToUpper(msg.Data)
  // Send the modified message to the next node
  ctx.TellSuccess(msg)
}

func (n *UpperNode) Destroy() {
// Do some cleanup work
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

把组件注册到注册器:

rulego.Registry.Register(&MyNode{})
1

然后在规则链文件使用你的自定义组件:

{
  "ruleChain": {
    "name": "测试规则链",
    "root": true,
    "debugMode": false
  },
  "metadata": {
    "nodes": [
      {
        "id": "s1",
        "type": "test/upper",
        "name": "名称",
        "debugMode": true,
        "configuration": {
          "field1": "组件定义的配置参数",
          "....": "..."
        }
      }
    ],
    "connections": [
      {
        "fromId": "s1",
        "toId": "连接下一个组件ID",
        "type": "与组件的连接关系"
      }
    ]
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# go plugin 方式提供组件

这种方式支持动态加载组件,但是只能支持非windows系统。 一个插件可以提供多个组件。

实现types.PluginRegistry (opens new window)接口。 并导出变量名称:Plugins,参考examples/plugin (opens new window)

# 示例

实现types.PluginRegistry (opens new window)接口

// 导出插件变量
var Plugins MyPlugins

type MyPlugins struct{}

func (p *MyPlugins) Init() error {
    return nil
}
func (p *MyPlugins) Components() []types.Node {
  //一个插件可以提供多个组件
  return []types.Node{&UpperNode{}, &TimeNode{}, &FilterNode{}}
}
1
2
3
4
5
6
7
8
9
10
11
12

编译插件:

#编译插件,生成plugin.so文件,需要在mac或者linux环境下编译
go build -buildmode=plugin -o plugin.so plugin.go
1
2

把组件注册到注册器:

rulego.Registry.RegisterPlugin("test", "./plugin.so")
1

# 可视化

  • 组件配置表单约定
  • 自定义组件可视化参考:获取组件配置表单 章节

# 贡献组件

RuleGo 的核心特性是组件化,所有业务逻辑都是组件,并能灵活配置和重用它们。目前RuleGo 已经内置了大量常用的组件。
但是,我们知道这些组件还远远不能满足所有用户的需求,所以我们希望能有更多的开发者为 RuleGo 贡献扩展组件,让 RuleGo 的生态更加丰富和强大。

  • 如果您提交的组件代码没有第三方依赖或者是通用性组件请提交到主库下的标准组件components (opens new window)
  • 常用组件提交到扩展组件仓库:rulego-components (opens new window)
  • CI/CD相关组件提交到:rulego-components-ci (opens new window)
  • AI相关组件提交到:rulego-components-ai (opens new window)
  • IoT相关组件提交到:rulego-components-iot (opens new window)
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/03/31, 01:52:11
Modbus Node
动态组件

← Modbus Node 动态组件→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式