组件连接复用
v0.24.0+网络连接类型的组件,可以把自身实例化的连接资源(客户端)共享出来,提供给其他组件复用,达到节省系统资源目的。
例如:多个相同组件复用同一个MQTT连接、同一个数据库连接或者http endpoint共用同一个端口等。
# 使用共享资源节点
endpoint
和 node
组件都支持共享资源节点,通过共享资源节点复用连接。
共享组件必须实现SharedNode
接口。官方提供的组件,网络连接类的基本支持这种方式。
复用同一连接资源通过以下方式:
- 初始化共享资源节点。通过提供一个规则链文件进行初始化,规则链定义的
endpoint
和node
组件客户端都会注册到共享资源节点中,被其他组件复用。例如:
node_pool.DefaultNodePool.Load(dsl []byte)
1
全局共享节点池规则链文件示例:
{
"ruleChain": {
"id": "default_node_pool",
"name": "全局共享节点池"
},
"metadata": {
"endpoints": [
{
"id": "local_endpoint_nats",
"type": "endpoint/nats",
"name": "本地nats连接池",
"configuration": {
"server": "nats://127.0.0.1:4222"
}
}
],
"nodes": [
{
"id": "local_mqtt_client",
"type": "mqttClient",
"name": "本地MQTT连接池",
"configuration": {
"server": "127.0.0.1:1883"
}
},
{
"id": "local_mysql_client",
"type": "dbClient",
"name": "本地MYSQL-test数据库连接池",
"configuration": {
"driverName": "mysql",
"dsn": "root:root@tcp(127.0.0.1:3306)/test"
}
},
{
"id": "local_nats",
"type": "x/natsClient",
"name": "本地nats连接池",
"configuration": {
"server": "nats://127.0.0.1:4222"
}
},
{
"id": "local_rabbitmq",
"type": "x/rabbitmqClient",
"name": "本地rabbitmq连接池",
"configuration": {
"autoDelete": true,
"durable": true,
"exchange": "rulego",
"exchangeType": "topic",
"server": "amqp://guest:guest@127.0.0.1:5672/"
}
},
{
"id": "local_redis",
"type": "x/redisClient",
"name": "本地redis连接池",
"configuration": {
"db": 0,
"server": "127.0.0.1:6379"
}
},
{
"id": "local_opengemini_write",
"type": "x/opengeminiWrite",
"name": "本地opengemini_write连接池",
"configuration": {
"database": "db0",
"server": "127.0.0.1:8086"
}
},
{
"id": "local_opengemini_query",
"type": "x/opengeminiQuery",
"name": "本地opengemini_query连接池",
"configuration": {
"database": "db0",
"server": "127.0.0.1:8086"
}
}
]
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
node_pool.DefaultNodePool 其他加载方法:参考node_pool.go (opens new window)
- 其他组件引用共享资源连接客户端,通过ref://{资源ID},方式:
{
"id": "node_2",
"type": "mqttClient",
"name": "测试",
"configuration": {
"maxReconnectInterval": 60,
"qOS": 0,
"server": "ref://local_mqtt_client",
"topic": "/device/msg"
}
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# 自定义共享资源节点组件
框架对共享节点做了封装,可以很方面把一个组件封装为共享资源节点,通过以下方式,下面是一个MQTT客户端节点的例子:
- 继承 base.SharedNode[T],T是可复用的资源的具体类型。 示例:
type MqttClientNode struct {
base.SharedNode[*mqtt.Client]
//节点配置
Config MqttClientNodeConfiguration
//topic 模板
topicTemplate str.Template
client *mqtt.Client
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
- 初始化SharedNode
// Init 初始化
func (x *MqttClientNode) Init(ruleConfig types.Config, configuration types.Configuration) error {
err := maps.Map2Struct(configuration, &x.Config)
if err == nil {
_ = x.SharedNode.Init(ruleConfig, x.Type(), x.Config.Server, true, func() (*mqtt.Client, error) {
return x.initClient()
})
x.topicTemplate = str.NewTemplate(x.Config.Topic)
}
return err
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
- 初始化具体类型客户端
// initClient 初始化客户端
func (x *MqttClientNode) initClient() (*mqtt.Client, error) {
if x.client != nil {
return x.client, nil
} else {
ctx, cancel := context.WithTimeout(context.TODO(), 4*time.Second)
x.Locker.Lock()
defer func() {
cancel()
x.Locker.Unlock()
}()
if x.client != nil {
return x.client, nil
}
var err error
x.client, err = mqtt.NewClient(ctx, x.Config.ToMqttConfig())
return x.client, err
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
通过
node_pool.DefaultNodePool
初始化全局复用节点通过ref://{resourceId}引用
# 共享资源节点组件和节点引用节点区别
在 GitHub 上编辑此页 (opens new window)
上次更新: 2024/09/24, 16:39:13