Quick Start
- First, define the route, which provides a stream-like way of calling, including the input end, the processing function and the output end. The route processing is
consistent
for different Endpoints
router := endpoint.NewRouter().From("/api/v1/msg/").Process(func(exchange *endpoint.Exchange) bool {
//Processing logic
return true
}).To("chain:default")
1
2
3
4
2
3
4
TIP
For different Endpoint
types, the meaning of the input end From
will be different, but it will eventually route to the router according to the From
value:
- http/websocket endpoint: represents path routing, creates the specified http service according to the
From
value. For example: From("/api/v1/msg/") means to create /api/v1/msg/ http service. - mqtt/kafka endpoint: represents the subscribed topic, subscribes to the relevant topic according to the
From
value. For example: From("/api/v1/msg/") means to subscribe to the /api/v1/msg/ topic. - schedule endpoint: represents the cron expression, creates the relevant scheduled task according to the
From
value. For example: From("*/1 * * * * *") means to trigger the router every 1 second. - tpc/udp endpoint: represents the regular expression, forwards the message that meets the condition to the router according to the
From
value. For example: From("^{.*") means that the data starting with{
is satisfied.
- Then create the Endpoint service, the creation interface is also
consistent
:
//For example: create http service
restEndpoint, err := endpoint.Registry.New(rest.Type, config, rest.Config{Server: ":9090",})
// or use map to set configuration
restEndpoint, err := endpoint.Registry.New(rest.Type, config, types.Configuration{"server": ":9090",})
//For example: create mqtt subscription service
mqttEndpoint, err := endpoint.Registry.New(mqtt.Type, config, mqtt.Config{Server: "127.0.0.1:1883",})
// or use map to set configuration
mqttEndpoint, err := endpoint.Registry.New(mqtt.Type, config, types.Configuration{"server": "127.0.0.1:1883",})
//For example: create ws service
wsEndpoint, err := endpoint.Registry.New(websocket.Type, config, websocket.Config{Server: ":9090"})
//For example: create tcp service
tcpEndpoint, err := endpoint.Registry.New(net.Type, config, Config{Protocol: "tcp", Server: ":8888",})
//For example: create schedule endpoint service
scheduleEndpoint, err := endpoint.Registry.New(schedule.Type, config, nil)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- Register the route to the endpoint service and start the service
//http endpoint register route
_, err = restEndpoint.AddRouter(router1,"POST")
_, err = restEndpoint.AddRouter(router2,"GET")
_ = restEndpoint.Start()
//mqtt endpoint register route
_, err = mqttEndpoint.AddRouter(router1)
_, err = mqttEndpoint.AddRouter(router2)
_ = mqttEndpoint.Start()
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
- Endpoint supports response to the caller
router5 := endpoint.NewRouter().From("/api/v1/msgToComponent2/:msgType").Process(func(router endpoint.Router, exchange *endpoint.Exchange) bool {
//Response to the client
exchange.Out.Headers().Set("Content-Type", "application/json")
exchange.Out.SetBody([]byte("ok"))
return true
})
//If you need to synchronize the rule chain execution result to the client, add the wait semantics
router5 := endpoint.NewRouter().From("/api/v1/msg2Chain4/:chainId").
To("chain:${chainId}").
//Must add Wait, asynchronous to synchronous, http can respond normally, if you do not respond synchronously, do not add this sentence, it will affect the throughput
Wait().
Process(func(router endpoint.Router, exchange *endpoint.Exchange) bool {
err := exchange.Out.GetError()
if err != nil {
//error
exchange.Out.SetStatusCode(400)
exchange.Out.SetBody([]byte(exchange.Out.GetError().Error()))
} else {
//Respond the processing result to the client, http endpoint must add Wait(), otherwise it cannot respond normally
outMsg := exchange.Out.GetMsg()
exchange.Out.Headers().Set("Content-Type", "application/json")
exchange.Out.SetBody([]byte(outMsg.Data))
}
return true
}).End()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
- Add global interceptors to perform authorization verification and other logic
restEndpoint.AddInterceptors(func(exchange *endpoint.Exchange) bool {
//Authorization verification logic
return true
})
1
2
3
4
2
3
4
Edit this page on GitHub (opens new window)
Last Updated: 2024/12/22, 03:38:12