延迟
delay
组件:延迟组件。用于实现消息的延迟处理、定时任务、调度、时间敏感操作和流量削峰等场景。
组件工作原理:
- 接收到消息后,将消息放入延迟队列,等待指定的延迟时间
- 当延迟时间到达后,从队列中取出消息,通过成功链路(
Success
)路由到下一个节点 - 如果队列中的消息数量超过最大限制(maxPendingMsgs),新消息会通过失败链路(
Failure
)路由 - 当overwrite=true时,延迟期内的新消息会覆盖旧消息,直到第一条消息处理完成后才会接收新消息
主要应用场景:
- 消息延迟处理:如订单超时取消
- 定时任务:如定期数据清理
- 流量削峰:控制消息处理速率
- 时间窗口聚合:在固定时间窗口内处理消息
- 条件防抖:避免频繁触发,如以下示例
应用示例:温度报警防抖
假设有一个温度传感器每秒上报一次数据,需要实现:
- 温度 > 30℃ 持续10秒时触发高温报警
- 温度 < 25℃ 持续10秒时解除报警
- 避免温度波动导致报警频繁触发
配置延迟组件:
# 配置
字段 | 类型 | 说明 | 默认值 |
---|---|---|---|
periodInSeconds | int | 延迟时间,单位秒 | 60 |
maxPendingMsgs | int | 最大允许挂起消息的数量 | 1000 |
periodInSecondsPattern | int | 通过${metadataKey}方式从metadata变量中获取延迟时间,如果有值,优先取该值。 | 无 |
overwrite | bool | 是否覆盖周期内的消息。true:周期内只保留一条消息,新的消息会覆盖之前的消息。直到队列里的消息被处理后,才会再次进入延迟队。 | false |
# Relation Type
- Success: periodInSeconds时间过后,把消息发送到
Success
链 - Failure: 等待消息数量>maxPendingMsgs,把消息发送到
Failure
链
# 执行结果
无
# 配置示例
{
"id": "s1",
"type": "delay",
"name": "执行延迟组件",
"configuration": {
"periodInSeconds": 1,
"maxPendingMsgs": 1
}
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 应用示例
在 GitHub 上编辑此页 (opens new window)
上次更新: 2024/12/22, 03:38:12