自定义组件概述
你可以把你的业务逻辑或者通用逻辑封装成组件,然后在规则链通过配置规则节点调用它。步骤:
- 实现组件接口。
- 把自定组件注册到注册器。
- 通过规则链配置调用组件。
# 自定义组件
# 组件接口
实现types.Node (opens new window)接口:
// Node 节点组件接口
//把业务封或者通用逻辑装成组件,然后通过规则链配置方式调用该组件
type Node interface {
//New 创建一个组件新实例
//每个规则链的规则节点都会创建一个新的实例,数据是隔离的
New() Node
//Type 组件类型,类型不能重复。
//用于规则链,node.type配置,初始化对应的组件
//建议使用`/`区分命名空间,防止冲突。例如:x/httpClient
Type() string
//Init 组件初始化,一般做一些组件参数配置或者客户端初始化操作
//规则链里的规则节点初始化会调用一次
Init(ruleConfig Config, configuration Configuration) error
//OnMsg 处理消息,并控制流向子节点的关系。每条流入组件的数据会经过该方法处理
//ctx:规则引擎处理消息上下文
//msg:消息
OnMsg(ctx RuleContext, msg RuleMsg)
//Destroy 销毁,做一些资源释放操作
Destroy()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 生命周期
- New: 用于创建节点组件实例时调用。
- Init: 用于解析当前节点的JSON的配置,初始化规则链时候初始化一次。
- OnMsg: 用于处理消息,并控制流向子节点的关系。每条流入组件的数据会经过该函数处理。
- Destroy: 用于节点销毁时调用。规则引擎销毁时、规则链更新或节点配置更新时会调用该方法。
如果动态更新节点配置,则先调用该节点新实例
Init
方法,如果初始化成功后,再调用旧实例的Destroy
方法。
# 处理消息
当前规则节点处理完后,把消息通过以下方法通知下一个或者多个节点。规则引擎会查询满足关系的子节点,并发触发其OnMsg
方法,进行链式数据流调用
//TellSuccess 通知规则引擎处理当前消息处理成功,并把消息通过`Success`关系发送到下一个节点
TellSuccess(msg RuleMsg)
//TellNext 使用指定的relationTypes,发送消息到下一个节点
TellNext(msg RuleMsg, relationTypes ...string)
//TellFailure 通知规则引擎处理当前消息处理失败,并把消息通过`Failure`关系发送到下一个节点
TellFailure(msg RuleMsg, err error)
1
2
3
4
5
6
2
3
4
5
6
# 详细节点执行控制上下文接口
节点接口中处理消息函数:OnMsg(ctx RuleContext, msg RuleMsg)
,其中执行上下文ctx
可以对规则引擎节点进行流转已经编排控制。
ctx
接口定义如下:
// RuleContext 是规则引擎中消息处理上下文的接口。
// 它将消息传递到下一个或多个节点。并触发它们的业务逻辑。
// 以及对本次执行实例的节点流程进行控制和编排
type RuleContext interface {
// TellSuccess 通知规则引擎当前消息已成功处理,并通过 'Success' 关系将消息发送到下一个节点。
TellSuccess(msg RuleMsg)
// TellFailure 通知规则引擎当前消息处理失败,并通过 'Failure' 关系将消息发送到下一个节点。
TellFailure(msg RuleMsg, err error)
// TellNext 使用指定的 relationTypes 将消息发送到下一个节点。
TellNext(msg RuleMsg, relationTypes ...string)
// TellSelf 在指定的延迟(毫秒)后将消息发送到当前节点。
TellSelf(msg RuleMsg, delayMs int64)
// TellNextOrElse 使用指定的 relationTypes 将消息发送到下一个节点。如果相应的 relationType 没有找到下一个节点,则使用 defaultRelationType 进行搜索。
TellNextOrElse(msg RuleMsg, defaultRelationType string, relationTypes ...string)
// TellFlow 执行子规则链。
TellFlow(ctx context.Context, ruleChainId string, msg RuleMsg, endFunc OnEndFunc, onAllNodeCompleted func())
// TellNode 从指定节点开始执行。如果 skipTellNext=true,则只执行当前节点,不通知下一个节点。
// onEnd 用于查看最终执行结果。
TellNode(ctx context.Context, nodeId string, msg RuleMsg, skipTellNext bool, onEnd OnEndFunc, onAllNodeCompleted func())
// NewMsg 创建一个新消息实例。
NewMsg(msgType string, metaData Metadata, data string) RuleMsg
// GetSelfId 检索当前节点 ID。
GetSelfId() string
// Self 检索当前节点实例。
Self() NodeCtx
// From 检索消息进入当前节点的节点实例。
From() NodeCtx
// RuleChain 检索当前节点所在的规则链实例。
RuleChain() NodeCtx
// Config 检索规则引擎的配置。
Config() Config
// SubmitTack 提交一个异步任务以供执行。
SubmitTack(task func())
// SetEndFunc 设置当前消息处理结束时的回调函数。
SetEndFunc(f OnEndFunc) RuleContext
// GetEndFunc 检索当前消息处理结束时的回调函数。
GetEndFunc() OnEndFunc
// SetContext 设置共享信号量或数据的上下文,跨不同组件实例。
SetContext(c context.Context) RuleContext
// GetContext 检索共享信号量或数据的上下文,跨不同组件实例。
GetContext() context.Context
// SetOnAllNodeCompleted 设置所有节点执行完成时的回调。
SetOnAllNodeCompleted(onAllNodeCompleted func())
// DoOnEnd 触发 OnEnd 回调函数。
DoOnEnd(msg RuleMsg, err error, relationType string)
// SetCallbackFunc 设置回调函数。
SetCallbackFunc(functionName string, f interface{})
// GetCallbackFunc 检索回调函数。
GetCallbackFunc(functionName string) interface{}
// OnDebug 调用配置的 OnDebug 回调函数。
OnDebug(ruleChainId string, flowType string, nodeId string, msg RuleMsg, relationType string, err error)
// SetExecuteNode 设置当前节点。
// 如果 relationTypes 为空,则执行当前节点;否则,
// 查找并执行当前节点的子节点。
SetExecuteNode(nodeId string, relationTypes ...string)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 示例
实现types.Node (opens new window)接口,定义组件:
//UpperNode A plugin that converts the message data to uppercase
type UpperNode struct{}
func (n *UpperNode) Type() string {
return "test/upper"
}
func (n *UpperNode) New() types.Node {
return &UpperNode{}
}
func (n *UpperNode) Init(ruleConfig types.Config, configuration types.Configuration) error {
// Do some initialization work
return nil
}
//处理消息
func (n *UpperNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg) {
msg.Data = strings.ToUpper(msg.Data)
// Send the modified message to the next node
ctx.TellSuccess(msg)
}
func (n *UpperNode) Destroy() {
// Do some cleanup work
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
把组件注册到注册器:
rulego.Registry.Register(&MyNode{})
1
然后在规则链文件使用你的自定义组件:
{
"ruleChain": {
"name": "测试规则链",
"root": true,
"debugMode": false
},
"metadata": {
"nodes": [
{
"id": "s1",
"type": "test/upper",
"name": "名称",
"debugMode": true,
"configuration": {
"field1": "组件定义的配置参数",
"....": "..."
}
}
],
"connections": [
{
"fromId": "s1",
"toId": "连接下一个组件ID",
"type": "与组件的连接关系"
}
]
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# go plugin
方式提供组件
这种方式支持动态加载组件,但是只能支持非windows系统。 一个插件可以提供多个组件。
实现types.PluginRegistry (opens new window)接口。
并导出变量名称:Plugins
,参考examples/plugin (opens new window)
# 示例
实现types.PluginRegistry (opens new window)接口
// 导出插件变量
var Plugins MyPlugins
type MyPlugins struct{}
func (p *MyPlugins) Init() error {
return nil
}
func (p *MyPlugins) Components() []types.Node {
//一个插件可以提供多个组件
return []types.Node{&UpperNode{}, &TimeNode{}, &FilterNode{}}
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
编译插件:
#编译插件,生成plugin.so文件,需要在mac或者linux环境下编译
go build -buildmode=plugin -o plugin.so plugin.go
1
2
2
把组件注册到注册器:
rulego.Registry.RegisterPlugin("test", "./plugin.so")
1
# 可视化
自定义组件可视化参考:获取组件配置表单 章节
# 贡献组件
RuleGo 的核心特性是组件化,所有业务逻辑都是组件,并能灵活配置和重用它们。目前RuleGo 已经内置了大量常用的组件。
但是,我们知道这些组件还远远不能满足所有用户的需求,所以我们希望能有更多的开发者为 RuleGo 贡献扩展组件,让 RuleGo 的生态更加丰富和强大。
- 如果您提交的组件代码没有第三方依赖或者是通用性组件请提交到主库下的标准组件components (opens new window)
- 常用组件提交到扩展组件仓库:rulego-components (opens new window)
- CI/CD相关组件提交到:rulego-components-ci (opens new window)
- AI相关组件提交到:rulego-components-ai (opens new window)
- IoT相关组件提交到:rulego-components-iot (opens new window)
在 GitHub 上编辑此页 (opens new window)
上次更新: 2024/09/02, 13:16:41