Websocket Client Endpoint
Websocket Client Endpoint 主动连接远程 WebSocket 服务器,接收服务端推送的消息并路由到规则链处理。支持自动重连、心跳保活、自定义请求头和消息类型过滤。适用于实时数据订阅、WebSocket 协议代理、IoT 设备对接等场景。
# Type
endpoint/ws_client
# 核心特性
# 🔄 自动重连
连接断开后,按配置的重连间隔自动尝试重新连接服务器,确保连接可靠性。
# 💓 心跳保活
支持三种心跳模式:
- 默认: 发送 WebSocket 协议级 Ping 帧
- 自定义内容: 通过 TextMessage 发送自定义心跳数据
- 回调函数: 通过代码完全控制心跳发送逻辑(Go API)
# 📨 消息类型过滤
支持按消息类型过滤接收到的数据:
- TextMessage: 文本消息
- BinaryMessage: 二进制消息
- 可分别开关,灵活控制需要处理的消息类型
# 🔗 自定义请求头
支持在连接时携带自定义 HTTP Header,用于认证、鉴权等场景。
# 启动配置
| 字段 | 类型 | 是否必填 | 说明 | 默认值 |
|---|---|---|---|---|
| server | string | 是 | WebSocket 服务器地址,如 ws://127.0.0.1:8080/ws 或 wss:// | - |
| headers | object | 否 | 连接时携带的自定义 Header,JSON 格式 | - |
| reconnectInterval | int | 否 | 断线重连间隔(秒),0 表示不重连 | 5 |
| heartbeatInterval | int | 否 | 心跳发送间隔(秒),0 表示不发送心跳 | 0 |
| heartbeatData | string | 否 | 心跳包内容,空字符串默认发送 Ping 帧,非空则发送 TextMessage | "" |
| allowText | bool | 否 | 是否接收 TextMessage | true |
| allowBinary | bool | 否 | 是否接收 BinaryMessage | true |
# 完整配置示例
# 基础 WebSocket 客户端
{
"id": "ws_client",
"type": "endpoint/ws_client",
"configuration": {
"server": "ws://127.0.0.1:8080/ws",
"reconnectInterval": 5
},
"routers": [
{
"from": {
"path": ""
},
"to": {
"path": "chain:processMessage"
}
}
]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 带认证和心跳的客户端
{
"id": "ws_auth_client",
"type": "endpoint/ws_client",
"configuration": {
"server": "wss://api.example.com/v1/stream",
"headers": {
"Authorization": "Bearer token123"
},
"reconnectInterval": 3,
"heartbeatInterval": 30,
"heartbeatData": "{\"type\":\"ping\"}"
},
"routers": [
{
"from": {
"path": ""
},
"to": {
"path": "chain:handleStream"
}
}
]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 仅接收二进制消息
{
"id": "ws_binary_client",
"type": "endpoint/ws_client",
"configuration": {
"server": "ws://192.168.1.100:9090/binary",
"allowText": false,
"allowBinary": true,
"reconnectInterval": 5,
"heartbeatInterval": 60
},
"routers": [
{
"from": {
"path": ""
},
"to": {
"path": "chain:binaryHandler"
}
}
]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Go 代码示例
config := engine.NewConfig()
client := &websocket.WsClient{}
client.Init(config, types.Configuration{
"server": "ws://127.0.0.1:8080/ws",
"reconnectInterval": 5,
"heartbeatInterval": 30,
"heartbeatData": "",
})
router := impl.NewRouter().From("").Process(func(router endpoint.Router, exchange *endpoint.Exchange) bool {
// 处理接收到的消息
return true
}).End()
client.AddRouter(router)
client.Start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 应用场景
# 📡 实时数据订阅
- 行情数据推送
- 实时通知订阅
- 日志流收集
# 🔗 服务对接
- 第三方 WebSocket API 集成
- 微服务间通信
- IoT 平台数据对接
# 🌐 协议代理
- WebSocket 转 MQTT 网关
- WebSocket 转 TCP 代理
- 消息路由分发
# 示例代码
参考完整示例:
在 GitHub 上编辑此页 (opens new window)
上次更新: 2026/05/13, 04:03:23