DSL
# Endpoint DSL v0.21.0+
允许通过DSL动态创建和更新,不同类型的接入端点(Endpoint),例如配置:mqtt、http、ws、schedule等接入端点触发器,以及配置路由和处理器。
Endpoint
的DSL配置文件是一个 JSON格式的文件,总体结构如下:
字段 | 类型 | 是否必填 | 说明 |
---|---|---|---|
id | string | 是 | 接入端ID |
name | string | 否 | 路由 |
type | string | 是 | 接入端类型,http、mqtt、ws、net、scheduler |
configuration | object | 否 | 接入端配置,不同的接入端类型有不同的配置字段,具体请参考Endpoint组件启动配置。 |
additionalInfo | object | 否 | 扩展字段,用于保存额外信息。 |
processor | string[] | 否 | Endpoint全局处理器列表,拦截器名称必须在平台注册。 |
routers | router[] | 是 | 路由列表。 |
# router: 接入端点路由
字段 | 类型 | 是否必填 | 说明 |
---|---|---|---|
id | string | 是 | 路由ID |
params | string[] | 否 | 路由参数,例如:HTTP Endpoint 路由参数是:POST/GET/PUT... |
from | From | 是 | 路由源 |
to | To | 否 | 路由目标,如果在规则链DSL配置,则路由目标是规则链ID自身,不需要额外指定 |
additionalInfo | object | 否 | 扩展字段,用于保存额外信息。 |
# router.from: 接入端点路由源
字段 | 类型 | 是否必填 | 说明 |
---|---|---|---|
path | string | 是 | 路由源路径,例如:http Endpoint /api/msg/:chainId scheduler Endpoint */1 * * * * * |
configuration | object | 否 | 路由源配置,非必填 |
processor | From | 是 | 路由源处理器列表,对路由源数据进行处理,拦截器名称必须在平台注册。 |
# router.to: 接入端点路由目标
字段 | 类型 | 是否必填 | 说明 |
---|---|---|---|
path | string | 是 | 路由目标路径,某个执行的规则链ID,也可以使用变量例如:${chainId} |
configuration | object | 否 | 路由目标配置,非必填 |
wait | boolean | 否 | 是否等待消息处理完成,默认为false。 |
processor | From | 是 | 路由目标处理器列表,To执行器执行完后执行该列表,拦截器名称必须在平台注册。 |
router.to.path
支持指定规则链的某节点作为起始节点,格式:chainId:nodeId,例如:b9ab7ada-97d0-4b73-8887-0cfca516a871:node_12router.to.path
支持使用metadata
变量,例如:${chainId},变量会替换成对应的值,例如:b9ab7ada-97d0-4b73-8887-0cfca516a871router.to.path
如果配置在规则链DSL中,可以不填,则表示路由到当前规则链的默认起点节点。
# 注册Processor
你可以内置一些处理器,然后在DSL使用它的name
来调用它,对数据进行转换、校验等操作。
- 通过
processor.InBuiltins.Register
注册From
处理器。 - 通过
processor.OutBuiltins.Register
注册To
处理器。
首先注册处理器:
//把http header放入消息元数据
processor.InBuiltins.Register("headersToMetadata", func(router endpoint.Router, exchange *endpoint.Exchange) bool {
msg := exchange.In.GetMsg()
headers := exchange.In.Headers()
for k := range headers {
msg.Metadata.PutValue(k, headers.Get(k))
}
//返回true执行一个处理器或者To操作
//返回true本次逻辑执行完毕,不往下执行
return true
})
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
然后:通过 "processors": ["responseToBody"] 指定,可以指定多个,如果处理器return false,则不会往下执行。
# 示例
{
"id": "e1",
"type": "http",
"name": "http server",
"configuration": {
"server": ":9090"
},
"routers": [
{
"id":"r1",
"params": [
"post"
],
"from": {
"path": "/api/v1/test/:chainId",
"configuration": {
}
},
"to": {
"path": "${chainId}",
"wait": true,
"processors": ["responseToBody"]
},
"additionalInfo": {
"aa":"aa"
}
}
]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
通过DSL启动Endpoint:
//初始化规则链
ruleDsl, err := os.ReadFile(testRulesFolder + "/filter_node.json")
_, err = engine.New("test01", ruleDsl)
if err != nil {
t.Fatal(err)
}
//读取Endpoint DSL文件
endpointBuf, err := os.ReadFile(testEndpointsFolder + "/http_01.json")
if err != nil {
t.Fatal(err)
}
//初始化endpoint
ep, err = endpoint.New("", endpointBuf, endpoint.DynamicEndpointOptions.WithConfig(config))
if err != nil {
t.Fatal(err)
}
//启动endpoint
err = ep.Start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
该DSL是启动一个端口为9090 的http服务,并提供/api/v1/test/:chainId API接口,处理逻辑交给:${chainId}规则链处理,并把规则链处理结果返回给客户端。
支持动态刷新服务和路由:
_ = ep.Reload([]byte(newDsl))
1
通过该方式,你可以方便实现以下需求,例如:
- 快速动态对外提供HTTP API,并让规则链处理API逻辑。
- 动态订阅MQTT、Kafka、Nats主题,并交给规则链处理订阅数据。
- 定时触发规则链逻辑。
- 动态提供TCP/UDP、websocket等服务。
- 你可以很容易扩展和自定义你的Endpoint,并以DSL方式管理它。
# 其他示例
订阅mqtt主题,并交给ID=default的规则链处理:
{
"id": "e_mqtt_01",
"type": "mqtt",
"name": "mqtt订阅触发",
"configuration": {
"server": "127.0.0.1:1883",
"username":"admin",
"password":"admin"
},
"routers": [
{
"id":"r1",
"from": {
"path": "#"
},
"to": {
"path": "default"
}
}
]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
每个1秒触发执行ID=default的规则链:
{
"id": "schedule_e1",
"type": "schedule",
"name": "schedule",
"routers": [
{
"from": {
"path": "*/1 * * * * *"
},
"to": {
"path": "default"
}
}
]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
在 GitHub 上编辑此页 (opens new window)
上次更新: 2024/09/02, 13:16:41