背景
本文只是提出一个工作流思路,目前开源工作流有几款,但针对的模型大多数都是审批流程,并不是配置下发的流程。配置工作流的思想基于两个前提:
1. 在配置进入数据库后,那么即认为数据库中的数据都是正确的,如果不正确,不能进行数据的本地化
2. 入库的数据需要下发给转发设备的配置,只要符合给转发下发的条件,那么必须要成功,以达到配置的一致。如果中间流程出错,例如网络抖动,机器服务重启等,需要工作流在每一步中进行重试,如果长时间不恢复,可以人工处理,将工作流取消,或者修复环境自恢复。
需求
配置下发流程可以按需定义每一个步骤的处理流程 ,工作流可以限定调度次数,也可以无限次重试。
- step by step进行数据流程处理
- 每个步骤开发人员需要保证代码级可重入
- 每个步骤都可进行重新调度
- 步骤出错可根据业务流程将当前步骤重新调度或者卡住,并体现在工作流任务的状态上
- 工作流可以有无限个步骤
- 工作流基于配置的方式,定义工作流中的步骤与顺序
- 工作流中可以嵌入子工作流,当所有的子工作流下发完毕时,再进父工作流的调度
- 工作流可通过运维接口进行重新调度与取消
状态机设计(任务与Step同状态机)
状态机定义
Init: 工作流初始状态,未被调度状态
Success: 工作流处理成功状态
Cancel: 工作流人工取消状态
Failed: 工作流处理失败状态
Error: 工作流发生错误状态
Processing:工作流正在调度状态
状态机调度流程
Step编程要求
- 入口需检查当前是否有子任务,如果有,需要等子任务结束,更新当前步骤状态,此步骤己封装在主流程中
- 每一个流程需要可重入,即下一次重试时,有容错处理
运维接口
- 取消接口,如果event出错,或者达到最大的调度次数,需要有运维接口将正在处理或者出错的步骤包括子event取消掉
- 重新调度接口,需要有将cancel掉的event,接着出错的步骤进行重新调度
- 获取接口,根据event id查询event的状态与相关信息
配置文件
## 线程池配置
core.pool.size=2
core.pool.max.size=2000
core.pool.keepalive.second=3
core.pool.queue.capacity=2000
core.pool.delay.milliseconds=20
## 工作流任务的namespace定义
namespaces=event,test
## 工作流任务中每个step定义
# 最大步骤
event.step.max = 6
# namespace + step + index = 处理的对应的java 类名
event.step.1 = com.ctcc.event.start.flow.EventStep1
event.step.2 = com.ctcc.event.start.flow.EventStep1
event.step.3 = com.ctcc.event.start.flow.EventStep1
event.step.4 = com.ctcc.event.start.flow.EventStep1
event.step.5 = com.ctcc.event.start.flow.EventStep1
event.step.6 = com.ctcc.event.start.flow.EventStep1
test.step.max = 2
test.step.1 = com.ctcc.event.start.flow.EventStep2
test.step.2 = com.ctcc.event.start.flow.EventStep1