标签:conf 需要 定义 connect cal read 执行 time nec
本案例旨在让新手从0开始完成一个批量任务的开发
CREATE TABLE `music_info` (
`id` int(10) NOT NULL AUTO_INCREMENT COMMENT ‘主键id‘,
`singer_name` varchar(100) NOT NULL COMMENT ‘歌手名‘,
`music_size` varchar(100) NOT NULL COMMENT ‘歌曲大小‘,
`music_name` varchar(100) NOT NULL COMMENT ‘歌曲名‘,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8;
insert into `music_info`(`id`,`singer_name`,`music_size`,`music_name`) values (1,‘小三‘,‘3.2M‘,‘起风了‘),(2,‘刘德华‘,‘3.0M‘,‘忘情水‘),(3,‘猪点点‘,‘5.0M‘,‘会写程序的小猪‘);
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
@SpringBootApplication
@EnableBatchProcessing
public class SpringbatchApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbatchApplication.class, args);
}
}
#开发配置
#数据库连接参数的配置
spring.datasource.driverClassName = com.mysql.jdbc.Driver
spring.datasource.url = jdbc:mysql://localhost:3306/test?serverTimezone=UTC
spring.datasource.username = root
spring.datasource.password = 123456
#项目启动时的建表sql脚本,该脚本由Spring Batch提供
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
#在项目启动时进行执行建表sql
#是否生成执行状态记录的表结构
spring.batch.initialize-schema=always
#禁止Spring Batch自动执行,既需要用户触发才能执行
spring.batch.job.enabled=true
public class Music {
// 主键id
private Integer id;
// 歌手名
private String singer_name;
// 歌曲大小
private String music_size;
// 歌曲名
private String music_name;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getSinger_name() {
return singer_name;
}
public void setSinger_name(String singer_name) {
this.singer_name = singer_name;
}
public String getMusic_size() {
return music_size;
}
public void setMusic_size(String music_size) {
this.music_size = music_size;
}
public String getMusic_name() {
return music_name;
}
public void setMusic_name(String music_name) {
this.music_name = music_name;
}
@Override
public String toString() {
return "Music{" +
"id=" + id +
", singer_name=‘" + singer_name + ‘\‘‘ +
", music_size=‘" + music_size + ‘\‘‘ +
", music_name=‘" + music_name + ‘\‘‘ +
‘}‘;
}
}
@Mapper
public interface MusicDao {
//通过id查询数据库记录
@Select("select id , singer_name , music_size , music_name from music_info where id = #{id};")
public List<Music> queryInfoById(Map<String , Integer> map);
}
@Configuration
public class BatchJobDemo {
@Autowired
JobBuilderFactory jobBuilderFactory;
@Autowired
StepBuilderFactory stepBuilderFactory;
@Autowired
SqlSessionFactory sqlSessionFactory;
private static final String JOB = "job";
private static final String STEP = "step";
//配置一个Job
@Bean(name = JOB)
Job job() {
return jobBuilderFactory.get(JOB)
.start(step())
.build();
}
//配置一个Step
@Bean(name = STEP)
Step step() {
return stepBuilderFactory.get(STEP)
.<Music, Music>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.build();
}
//配置itemReader
@Bean("itemReader")
@StepScope
MyBatisCursorItemReader<Music> itemReader() {
System.out.println("开始查询数据库");
MyBatisCursorItemReader<Music> reader = new MyBatisCursorItemReader<>();
Map<String , Object> map = new HashMap<>();
map.put("id" , 2);
reader.setQueryId("com.example.springbatch.dao.MusicDao.queryInfoById");
reader.setSqlSessionFactory(sqlSessionFactory);
reader.setParameterValues(map);
return reader;
}
//配置itemWriter
@Bean("itemWriter")
@StepScope
FlatFileItemWriter<Music> itemWriter() {
System.out.println("开始写入文件中");
FlatFileItemWriter<Music> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource("F:\\music.txt"));//系统目录
//将Music对象转换成字符串,并输出到文件
writer.setLineAggregator(new LineAggregator<Music>() {
@Override
public String aggregate(Music music) {
ObjectMapper mapper = new ObjectMapper();
String str = null;
try {
str = mapper.writeValueAsString(music);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return str;
}
});
return writer;
}
}
{"id":2,"singer_name":"刘德华","music_size":"3.0M","music_name":"忘情水"}
技术总结
1、 什么是SpringBatch?
是Spring提供的一个数据处理框架,是一个轻量级,全面的批处理框架,旨在开发对企业系统日常运营至关重要的强大批处理应用程序
2、 批处理应用程序大致流程
从数据库、文件、队列中读取大量记录
以某种方式处理数据
以修改之后的形式写回数据
3、 SpringBatch总体架构
在SpringBatch中一个Job可以定义很多的步骤Step,在每一个Step中可以定义其专属的ItemReader用于读取数据,ItemProcesseor用于处理数据,ItemWriter用于写入数据,而每一个定义的Job则都在JobRepository中,我们可以通过JobLauncher来启动某个Job
4、 什么是Job?
Job是一个封装整个批处理过程的概念,在SpringBatch体系中是一个最顶层的抽象概念,体现在代码中则是一个最上层的接口
5、 什么是JobLauncher?
该接口主要用于启动指定了JobParameters的Job,JobParameters和Job一起才能组成一次Job的执行
6、 chunk处理流程
由于一次batch任务可能会有很多数据读写操作,一条一条的处理并向数据库提交的话效率不会很高,因此SpringBatch提出了chunk的概念,设定一个chunk size,SpraingBatch将会一条条处理数据,但是不会提交到数据库,只有当处理的数据达到了chunk size设定值,才会一起commit
例如:在一个Step中,chunk size设置为10,当ItemReader读的数据达到10的时候,这一批次就一起传到ItemWriter,同时Transaction被提交
7、 skip策略和失败处理
skipLimit():该方法的作用是我们可以设定一个我们允许的Step跳过异常的数量,加入设定为10,那么整个Step运行时,只要出现异常数量不超过10,整个Step就不会失败
skip():用来指定跳过的异常,因为有些异常的出现, 我们可以忽略
noSkip():指定某些异常出现时,无需跳过,一旦出现,计数器就会累加一次,直到达到上限
8、JobLauncher何时使用?
问题:
1、 如果一个类实现了Job接口,会怎样,实际开发中没有出现实现Job接口的情况?
2、 什么是JobInstance、什么是JobParameters、什么是JobExecution?Springboot整合SpringBatch完成基本案例--从数据库读取数据并写入文件
Springboot整合SpringBatch完成基本案例--从数据库读取数据并写入文件
标签:conf 需要 定义 connect cal read 执行 time nec
原文地址:https://www.cnblogs.com/codeXi/p/12951277.html