View Code
主要执行流程为:
1、preHandle():job前置操作
2、init():初始化reader和writer
3、prepare():执行插件的prepare操作
4、split():切分任务
5、schedule():执行任务
6、post():执行插件的post操作
7、postHandle():job后置操作
8、invokeHooks():调用hook
9、输出统计结果
上述任务流是顺序执行的,第5步会将切分的task分配到多个taskGroup中并发执行,
其中preHandle、postHandle、invokeHooks不影响整体执行,可以先忽略,
下面主要介绍关键步骤
1)init()
关键代码主要是初始化reader和writer插件
reader插件初始化的时候使用了自定义classLoader,这样可以做到插件级别的隔离,插件初始化完成之后调用了job的init函数,用于初始化插件的Job,writer插件初始化同理
2)prepare():
prepare操作比较简单,分别执行reader和writer插件Job中的prepare函数即可,同样,每次执行前都会先加载对应的classLoader用于隔离
private void prepare() {
this.prepareJobReader();
this.prepareJobWriter();
}
3)split():切分任务task
上面函数主要分为3步:
1、计算限速和并发,即实际的channel数和每个channel的限速,主要在adjustChannelNumber()中,这里不做过多说明
2、根据实际的channel数,切分reader端,具体的切分逻辑reader插件可以自行实现
3、根据reader端切分的数目切分writer端,达到reader:writer=1:1,这样每个task中都包含一个reader和一个writer
4)schedule():执行切分出来的task
schedule执行过程主要分为以下几步:
1、计算taskGroup个数
2、将切分的task分配到taskGroup中
3、启动线程池执行taskGroup,具体代码流程为scheduler.schedule(taskGroupConfigs) -> AbstractScheduler.schedule -> startAllTaskGroup -> ProcessInnerScheduler.startAllTaskGroup -> this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner) -> TaskGroupContainerRunner.run() -> this.taskGroupContainer.start()
4、收集taskGroup汇报的信息
5)post():执行插件的post操作
private void post() {
this.postJobWriter();
this.postJobReader();
}