标签:step 项目 wait org pat toc schema 执行 任务
企业中经常会需要批处理才能处理完成的业务操作,比如:自动化地处理大批量复杂数据,如月结计算;重复性地处理大批量数据,如费率计算;充当内部系统和外部系统的数据纽带,中间需要对数据进行格式化,校验,转换处理等.
在pom中导入Spring Batch,Mysql,和JDBC依赖,
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cc.mrbird</groupId>
<artifactId>spring-batch-start</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-batch-start</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
在编写代码前,了解一下Spring Batch的构成
? spring-batch里最基本的单元就是人物Job,一个job由若干个步骤step组成,任务启动器job launcher负责运行job,任务存储仓库job Repository存贮者job的执行状态,参数,和日志等信息.
? job可以分为三大类:
数据读取item Reader
数据中间处理item Processor
数据输出item Writer
任务存储仓库可以是关系型数据库MySQL,菲关系型数据库MongoDB或直接存储字内存中
在springboot的入口类上添加@EnableBatchProcessing注解,表示开启SpringBatch批处理功能
步骤step是由若干个小人物taskLet组成的,所以我们通过tasklet方法创建,tasklet方法接受了一个tasklet类型参数,tasklet是一个函数式接口,源码如下
public interface Tasklet {
@Nullable
RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}
我们可以使用lambda表达式创建一个匿名实现:
(contribution, chunkContext) -> {
System.out.println("执行步骤....");
return RepeatStatus.FINISHED;
}
一个复杂的任务一般包含多个步骤,下面举个多步骤任务的例子。在job包下新建MultiStepJobDemo
类:
@Component
public class MultiStepJobDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job multiStepJob() {
return jobBuilderFactory.get("multiStepJob")
.start(step1())
.next(step2())
.next(step3())
.build();
}
private Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤一操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤二操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤三操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
}
上面代码中,我们通过step1()
、step2()
和step3()
三个方法创建了三个步骤。Job里要使用这些步骤,只需要通过JobBuilderFactory
的start
方法指定第一个步骤,然后通过next
方法不断地指定下一个步骤即可。
启动项目,控制台打印日志如下:
2020-03-06 13:52:52.188 INFO 18472 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=multiStepJob]] launched with the following parameters: [{}]
2020-03-06 13:52:52.222 INFO 18472 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
执行步骤一操作。。。
2020-03-06 13:52:52.251 INFO 18472 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 29ms
2020-03-06 13:52:52.292 INFO 18472 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2]
执行步骤二操作。。。
2020-03-06 13:52:52.323 INFO 18472 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step2] executed in 30ms
2020-03-06 13:52:52.375 INFO 18472 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step3]
执行步骤三操作。。。
2020-03-06 13:52:52.405 INFO 18472 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step3] executed in 29ms
2020-03-06 13:52:52.428 INFO 18472 --- [ main] o.s.b.c.l.support.SimpleJobLauncher
三个步骤依次执行成功。
多个步骤在执行过程中也可以通过上一个步骤的执行状态来决定是否执行下一个步骤,修改上面的代码:
@Component
public class MultiStepJobDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job multiStepJob() {
return jobBuilderFactory.get("multiStepJob2")
.start(step1())
.on(ExitStatus.COMPLETED.getExitCode()).to(step2())
.from(step2())
.on(ExitStatus.COMPLETED.getExitCode()).to(step3())
.from(step3()).end()
.build();
}
private Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤一操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤二操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤三操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
}
multiStepJob()
方法的含义是:multiStepJob2任务先执行step1,当step1状态为完成时,接着执行step2,当step2的状态为完成时,接着执行step3。ExitStatus.COMPLETED
常量表示任务顺利执行完毕,正常退出,该类还包含以下几种退出状态:
public class ExitStatus implements Serializable, Comparable<ExitStatus> {
/**
* Convenient constant value representing unknown state - assumed not
* continuable.
*/
public static final ExitStatus UNKNOWN = new ExitStatus("UNKNOWN");
/**
* Convenient constant value representing continuable state where processing
* is still taking place, so no further action is required. Used for
* asynchronous execution scenarios where the processing is happening in
* another thread or process and the caller is not required to wait for the
* result.
*/
public static final ExitStatus EXECUTING = new ExitStatus("EXECUTING");
/**
* Convenient constant value representing finished processing.
*/
public static final ExitStatus COMPLETED = new ExitStatus("COMPLETED");
/**
* Convenient constant value representing job that did no processing (e.g.
* because it was already complete).
*/
public static final ExitStatus NOOP = new ExitStatus("NOOP");
/**
* Convenient constant value representing finished processing with an error.
*/
public static final ExitStatus FAILED = new ExitStatus("FAILED");
/**
* Convenient constant value representing finished processing with
* interrupted status.
*/
public static final ExitStatus STOPPED = new ExitStatus("STOPPED");
...
}
Flow的作用就是可以将多个步骤Step组合在一起然后再组装到任务job中
@Component
public class FlowJobDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job flowJob() {
return jobBuilderFactory.get("flowJob")
.start(flow())
.next(step3())
.end()
.build();
}
private Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤一操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤二操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤三操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
// 创建一个flow对象,包含若干个step
private Flow flow() {
return new FlowBuilder<Flow>("flow")
.start(step1())
.next(step2())
.build();
}
}
通过FlowBuilder将step1和step2组合在一起,创建了一个名为flow的Flow,然后将其赋给任务job,使用flow和step构建job的区别是,job流程中包含flow类型的时候需要在build方法前调用end()方法
? 任务中的步骤可以串行执行外,还可以并行执行,并行执行在特定的业务需求下可以提供任务执行效率
? 将任务并行化只需要简单步骤:
@Component
public class SplitJobDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job splitJob() {
return jobBuilderFactory.get("splitJob")
.start(flow1())
.split(new SimpleAsyncTaskExecutor()).add(flow2())
.end()
.build();
}
private Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤一操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤二操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤三操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Flow flow1() {
return new FlowBuilder<Flow>("flow1")
.start(step1())
.next(step2())
.build();
}
private Flow flow2() {
return new FlowBuilder<Flow>("flow2")
.start(step3())
.build();
}
}
上面的例子中,我们创建了两个Flow:flow(包含step1和step2)和flow2(包含step3)然后通过jobBuilderFactory的split方法,指定一个异步执行器,将flow1和flow2异步执行
? 决策器的作用就是可以指定程序在不同的情况下运行不同的流程,比如今天是周末,则让任务执行step1和step2,如果是工作日,则选择step1和step3
? 使用决策器前,我们需要自定义一个决策器的实现
@Component
public class MyDecider implements JobExecutionDecider {
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
LocalDate now = LocalDate.now();
DayOfWeek dayOfWeek = now.getDayOfWeek();
if (dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY) {
return new FlowExecutionStatus("weekend");
} else {
return new FlowExecutionStatus("workingDay");
}
}
}
MyDecider
实现JobExecutionDecider
接口的decide
方法,该方法返回FlowExecutionStatus
。上面的逻辑是:判断今天是否是周末,如果是,返回FlowExecutionStatus("weekend")
状态,否则返回FlowExecutionStatus("workingDay")
状态。
下面演示如何在任务Job里使用决策器。在job包下新建DeciderJobDemo
:
@Component
public class DeciderJobDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private MyDecider myDecider;
@Bean
public Job deciderJob() {
return jobBuilderFactory.get("deciderJob")
.start(step1())
.next(myDecider)
.from(myDecider).on("weekend").to(step2())
.from(myDecider).on("workingDay").to(step3())
.from(step3()).on("*").to(step4())
.end()
.build();
}
private Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤一操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤二操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤三操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step4() {
return stepBuilderFactory.get("step4")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤四操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
上面代码中,我们注入了自定义决策器MyDecider
,然后在jobDecider()
方法里使用了该决策器:
@Bean
public Job deciderJob() {
return jobBuilderFactory.get("deciderJob")
.start(step1())
.next(myDecider)
.from(myDecider).on("weekend").to(step2())
.from(myDecider).on("workingDay").to(step3())
.from(step3()).on("*").to(step4())
.end()
.build();
}
这段代码的含义是:任务deciderJob首先执行step1,然后指定自定义决策器,如果决策器返回weekend,那么执行step2,如果决策器返回workingDay,那么执行step3,那么无论step3 的结果如何都将执行step4
任务job除了可以由step或者Flow构成外,我们还可以将多个job转换为特殊的step.然后再赋给另一个任务job,这就是任务的嵌套
@Component
public class NestedJobDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobRepository jobRepository;
@Autowired
private PlatformTransactionManager platformTransactionManager;
// 父任务
@Bean
public Job parentJob() {
return jobBuilderFactory.get("parentJob")
.start(childJobOneStep())
.next(childJobTwoStep())
.build();
}
// 将任务转换为特殊的步骤
private Step childJobOneStep() {
return new JobStepBuilder(new StepBuilder("childJobOneStep"))
.job(childJobOne())
.launcher(jobLauncher)
.repository(jobRepository)
.transactionManager(platformTransactionManager)
.build();
}
// 将任务转换为特殊的步骤
private Step childJobTwoStep() {
return new JobStepBuilder(new StepBuilder("childJobTwoStep"))
.job(childJobTwo())
.launcher(jobLauncher)
.repository(jobRepository)
.transactionManager(platformTransactionManager)
.build();
}
// 子任务一
private Job childJobOne() {
return jobBuilderFactory.get("childJobOne")
.start(
stepBuilderFactory.get("childJobOneStep")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("子任务一执行步骤。。。");
return RepeatStatus.FINISHED;
}).build()
).build();
}
// 子任务二
private Job childJobTwo()
return jobBuilderFactory.get("childJobTwo")
.start(
stepBuilderFactory.get("childJobTwoStep")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("子任务二执行步骤。。。");
return RepeatStatus.FINISHED;
}).build()
).build();
}
}
上面的代码中,我们通过childJobOne()和childJobTwo()方法创建两个任务Job,关键在于childJobOne()和childJobTwo(),在childJobOne()方法中,我们通过JobStepBuilder构建了一个名称为childJobOneStep的Step,顾名思义,他是一个任务型Step的构造工厂,可以将任务转换为特殊的步骤,在构建过程中,我们还需要传入任务执行器JobLauncher,任务仓库JobRepository和事务管理器PlantformTransactionManager.将任务转换为特殊步骤后,将其赋给父任务parentJob即可
标签:step 项目 wait org pat toc schema 执行 任务
原文地址:https://www.cnblogs.com/evilposeidon/p/14111729.html