快速入门
- 首先定义路由,路由提供了流式的调用方式,包括输入端、处理函数和输出端。不同Endpoint其路由处理是
一致
的
router := endpoint.NewRouter().From("/api/v1/msg/").Process(func(exchange *endpoint.Exchange) bool {
//处理逻辑
return true
}).To("chain:default")
1
2
3
4
2
3
4
提示
不同Endpoint
类型,输入端From
代表的含义会有不同,但最终会根据From
值路由到该路由器:
- http/websocket endpoint:代表路径路由,根据
From
值创建指定的http服务。例如:From("/api/v1/msg/")表示创建/api/v1/msg/ http服务。 - mqtt/kafka endpoint:代表订阅的主题,根据
From
值订阅相关主题。例如:From("/api/v1/msg/")表示订阅/api/v1/msg/主题。 - schedule endpoint:代表cron表达式,根据
From
值创建相关定时任务。例如:From("*/1 * * * * *")表示每隔1秒触发该路由器。 - tpc/udp endpoint:代表正则表达式,根据
From
值把满足条件的消息转发到该路由。例如:From("^{.*")表示满足{
开头的数据。
- 然后创建Endpoint服务,创建接口也是
一致
的:
//例如:创建http 服务
restEndpoint, err := endpoint.Registry.New(rest.Type, config, rest.Config{Server: ":9090",})
// 或者使用map方式设置配置
restEndpoint, err := endpoint.Registry.New(rest.Type, config, types.Configuration{"server": ":9090",})
//例如:创建mqtt订阅 服务
mqttEndpoint, err := endpoint.Registry.New(mqtt.Type, config, mqtt.Config{Server: "127.0.0.1:1883",})
// 或者使用map方式设置配置
mqttEndpoint, err := endpoint.Registry.New(mqtt.Type, config, types.Configuration{"server": "127.0.0.1:1883",})
//例如:创建ws服务
wsEndpoint, err := endpoint.Registry.New(websocket.Type, config, websocket.Config{Server: ":9090"})
//例如:创建tcp服务
tcpEndpoint, err := endpoint.Registry.New(net.Type, config, Config{Protocol: "tcp", Server: ":8888",})
//例如: 创建schedule endpoint服务
scheduleEndpoint, err := endpoint.Registry.New(schedule.Type, config, nil)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- 把路由注册到endpoint服务中,并启动服务
//http endpoint注册路由
_, err = restEndpoint.AddRouter(router1,"POST")
_, err = restEndpoint.AddRouter(router2,"GET")
_ = restEndpoint.Start()
//mqtt endpoint注册路由
_, err = mqttEndpoint.AddRouter(router1)
_, err = mqttEndpoint.AddRouter(router2)
_ = mqttEndpoint.Start()
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
- Endpoint支持响应给调用方
router5 := endpoint.NewRouter().From("/api/v1/msgToComponent2/:msgType").Process(func(router endpoint.Router, exchange *endpoint.Exchange) bool {
//响应给客户端
exchange.Out.Headers().Set("Content-Type", "application/json")
exchange.Out.SetBody([]byte("ok"))
return true
})
//如果需要把规则链执行结果同步响应给客户端,则增加wait语义
router5 := endpoint.NewRouter().From("/api/v1/msg2Chain4/:chainId").
To("chain:${chainId}").
//必须增加Wait,异步转同步,http才能正常响应,如果不响应同步响应,不要加这一句,会影响吞吐量
Wait().
Process(func(router endpoint.Router, exchange *endpoint.Exchange) bool {
err := exchange.Out.GetError()
if err != nil {
//错误
exchange.Out.SetStatusCode(400)
exchange.Out.SetBody([]byte(exchange.Out.GetError().Error()))
} else {
//把处理结果响应给客户端,http endpoint 必须增加 Wait(),否则无法正常响应
outMsg := exchange.Out.GetMsg()
exchange.Out.Headers().Set("Content-Type", "application/json")
exchange.Out.SetBody([]byte(outMsg.Data))
}
return true
}).End()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
- 添加全局拦截器,用来进行权限校验等逻辑
restEndpoint.AddInterceptors(func(exchange *endpoint.Exchange) bool {
//权限校验逻辑
return true
})
1
2
3
4
2
3
4
在 GitHub 上编辑此页 (opens new window)
上次更新: 2024/12/22, 03:38:12
← Endpoint概述 路由→