码迷,mamicode.com
首页 > 数据库 > 详细

Spring-batch(ItemWriter)数据写入数据库,普通文件,xml文件,多文件分类写入

时间:2018-10-11 18:43:41      阅读:276      评论:0      收藏:0      [点我收藏+]

标签:bcp   tap   token   图片   RoCE   tab   processor   aging   do it   

Spring-batch学习总结(四)
一.ItemWriter简介
1.对于read读取数据时是一个item为单位的循环读取,而对于writer写入数据则是以chunk为单位,一块一块的进行写入
2.例(我们举一个小例子来认识其writer原理):
代码:
OutOverViewApplication

package com.dhcc.batch.batchDemo.output.outview;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class OutOverViewApplication {

    public static void main(String[] args) {
        SpringApplication.run(OutOverViewApplication.class, args);
    }
}

OutputViewItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.outview;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class OutputViewItemWriterConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("OutputViewItemWriter")
    private ItemWriter<? super String> outputViewItemWriter;

    @Bean
    public Job OutputViewItemWriterJob3() {
        return jobBuilderFactory.get("OutputViewItemWriterJob3")
                .start(OutputViewItemWriterStep3())
                .build();

    }

    @Bean
    public Step OutputViewItemWriterStep3() {
        return stepBuilderFactory.get("OutputViewItemWriterStep3")
                .<String, String>chunk(10)
                .reader(listViewItemRead())
                .writer(outputViewItemWriter)
                .build();
    }

    @Bean
    @StepScope
    public ListItemViewReader<String> listViewItemRead() {
        List<String> dataList=new ArrayList<>();
        for(int i=0;i<100;i++) {
            dataList.add("my name is zhongqiujie"+i);
        }
        return new ListItemViewReader<String>(dataList);
    }

}

ListItemViewReader

package com.dhcc.batch.batchDemo.output.outview;

import java.util.Iterator;
import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

@SuppressWarnings("hiding")
public class ListItemViewReader<String> implements ItemReader<String>{

    private final Iterator<String> iterator;

    public ListItemViewReader(List<String> data) {
        this.iterator = data.iterator();
    }

    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if (iterator.hasNext()) {
            return this.iterator.next();
        } else {
            return null;
        }
    }
}

OutputViewItemWriter implements

package com.dhcc.batch.batchDemo.output.outview;

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

@Component("OutputViewItemWriter")
public class OutputViewItemWriter implements ItemWriter<String> {

    @Override
    public void write(List<? extends String> items) throws Exception {
        System.out.println("writer chunk size is :" + items.size());
        for (String item : items) {
            System.out.println("writer data is:" + item);
        }
    }

}

运行结果:
技术分享图片

二.将数据写入到数据库
1.在spring batch中为我们提供了许多将数据写入到数据库中的writer
(1)Neo4jItemWriter;
(2)MongoItemWriter;
..........
2.此处我们只学习JdbcBatchItemWriter
例:我们先在数据库中建立数据表alipaytrando,结构如下:
技术分享图片
接下来我们将项目中的springbatchtest2文件读出并写入到数据库表alipaytrando中
Springbatchtest2文件结构如下:
技术分享图片
开始写代码:
AlipayTranDo

package com.dhcc.batch.batchDemo.output.db.entity;

public class AlipayTranDo {
        private String tranId;
        private String channel;
        private String tranType;
        private String counterparty;
        private String goods;
        private String amount;
        private String isDebitCredit;
        private String state;

        public AlipayTranDo(String tranId, String channel, String tranType, String counterparty, String goods,
                String amount, String isDebitCredit, String state) {
            super();
            this.tranId = tranId;
            this.channel = channel;
            this.tranType = tranType;
            this.counterparty = counterparty;
            this.goods = goods;
            this.amount = amount;
            this.isDebitCredit = isDebitCredit;
            this.state = state;
        }

        public String getTranId() {
            return tranId;
        }

        public void setTranId(String tranId) {
            this.tranId = tranId;
        }

        public String getChannel() {
            return channel;
        }

        public void setChannel(String channel) {
            this.channel = channel;
        }

        public String getTranType() {
            return tranType;
        }

        public void setTranType(String tranType) {
            this.tranType = tranType;
        }

        public String getCounterparty() {
            return counterparty;
        }

        public void setCounterparty(String counterparty) {
            this.counterparty = counterparty;
        }

        public String getGoods() {
            return goods;
        }

        public void setGoods(String goods) {
            this.goods = goods;
        }

        public String getAmount() {
            return amount;
        }

        public void setAmount(String amount) {
            this.amount = amount;
        }

        public String getIsDebitCredit() {
            return isDebitCredit;
        }

        public void setIsDebitCredit(String isDebitCredit) {
            this.isDebitCredit = isDebitCredit;
        }

        public String getState() {
            return state;
        }

        public void setState(String state) {
            this.state = state;
        }

        @Override
        public String toString() {
            return "AlipayTranDO{" +
                    "tranId=‘" + tranId + ‘\‘‘ +
                    ", channel=‘" + channel + ‘\‘‘ +
                    ", tranType=‘" + tranType + ‘\‘‘ +
                    ", counterparty=‘" + counterparty + ‘\‘‘ +
                    ", goods=‘" + goods + ‘\‘‘ +
                    ", amount=‘" + amount + ‘\‘‘ +
                    ", isDebitCredit=‘" + isDebitCredit + ‘\‘‘ +
                    ", state=‘" + state + ‘\‘‘ +
                    ‘}‘;
        }
    }

AlipayTranDoFileMapper

package com.dhcc.batch.batchDemo.output.db.util;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;

public class AlipayTranDoFileMapper implements FieldSetMapper<AlipayTranDo> {

    @Override
    public AlipayTranDo mapFieldSet(FieldSet fieldSet) throws BindException {
        return new AlipayTranDo(fieldSet.readString("tranId")
                , fieldSet.readString("channel")
                ,fieldSet.readString("tranType")
                , fieldSet.readString("counterparty")
                , fieldSet.readString("goods")
                ,fieldSet.readString("amount")
                , fieldSet.readString("isDebitCredit")
                , fieldSet.readString("state")
                );
    }

}

OutputItemWriterDBApplication

package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterDBApplication {
    public static void main(String[] args) {
        SpringApplication.run(OutputItemWriterDBApplication.class, args);

    }
}

*OutputItemWriterDBConfiguration

package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;

@Configuration
public class OutputItemWriterDBConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    @Qualifier("outputDBItemReader")
    private ItemReader<? extends AlipayTranDo> outputDBItemReader;
    @Autowired
    @Qualifier("outputDBItemWriter")
    private ItemWriter<? super AlipayTranDo> outputDBItemWriter;

    @Autowired
    private MyProcess myProcess;
    @Bean
    public Job OutputItemWriterDBJob2() {
        return jobBuilderFactory.get("OutputItemWriterDBJob2").start(OutputItemWriterDBStep2()).build();

    }

    @Bean
    public Step OutputItemWriterDBStep2() {
        return stepBuilderFactory.get("OutputItemWriterDBStep2").<AlipayTranDo, AlipayTranDo>chunk(50)
                .reader(outputDBItemReader)
                .processor(myProcess)
                .writer(outputDBItemWriter)
                .build();
    }

}

OutputItemWriterDBItemReaderConfiguration

package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;
import com.dhcc.batch.batchDemo.output.db.util.AlipayTranDoFileMapper;
@Configuration
public class OutputItemWriterDBItemReaderConfiguration {

    @Bean
    public FlatFileItemReader<AlipayTranDo> outputDBItemReader(){
        FlatFileItemReader<AlipayTranDo> reader=new FlatFileItemReader<AlipayTranDo>();
        reader.setEncoding("UTF-8");
        reader.setResource(new ClassPathResource("/data/init/springbatchtest2.csv"));
        reader.setLinesToSkip(5);

        DelimitedLineTokenizer tokenizer=new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] 
                {"tranId","channel","tranType","counterparty","goods","amount","isDebitCredit","state"}
        );
        DefaultLineMapper<AlipayTranDo> lineMapper=new DefaultLineMapper<AlipayTranDo>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new AlipayTranDoFileMapper());
        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }

}

MyProcess

package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;

@Component
public class MyProcess implements ItemProcessor<AlipayTranDo, AlipayTranDo> {

    @Override
    public AlipayTranDo process(AlipayTranDo item) throws Exception {
        System.out.println(item);
        return item;
    }

}

OutputItemWriterDBItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.db.jdbcout;

import javax.sql.DataSource;

import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;

@Configuration
public class OutputItemWriterDBItemWriterConfiguration {
    @Autowired
    private DataSource dataSource;

    @Bean
    public JdbcBatchItemWriter<AlipayTranDo> outputDBItemWriter() {
        System.out.println();
        JdbcBatchItemWriter<AlipayTranDo> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql(
                "insert into alipaytrando"
                + "(tranId,channel,tranType,counterparty,goods,amount,isDebitCredit,state) values"
                + "(:tranId,:channel,:tranType,:counterparty,:goods,:amount,:isDebitCredit,:state) ");
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<AlipayTranDo>());
        return writer;

    }

}

运行结果:
技术分享图片
观察控制台可得我们的项目运行成功,接下来我们再到数据中观察数据是否成功插入
技术分享图片
发现表中数据已经插入成功
技术分享图片
三.将数据写入到普通文件中
1.FlatFileItemWriter可以将任何一个类型为T的对象数据写入到普通文件中
2.例:我们将数据库中的alipaytrando中的数据读出并且写入到普通文件中接下里我们开始编写代码:
实体类AlipayTranDo与上一个例子一样,我们不在重复展示
AlipayTranDoFileMapper

package com.dhcc.batch.batchDemo.output.flatfile;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

public class AlipayTranDoFileMapper implements RowMapper<AlipayTranDo> {

    @Override
    public AlipayTranDo mapRow(ResultSet rs, int rowNum) throws SQLException {
        return new AlipayTranDo(rs.getString("tranId"), rs.getString("channel"), rs.getString("tranType"),
                rs.getString("counterparty"), rs.getString("goods"), rs.getString("amount"),
                rs.getString("isDebitCredit"), rs.getString("state"));
    }

}

AlipayTranDoLineAggregator

package com.dhcc.batch.batchDemo.output.flatfile;

import org.springframework.batch.item.file.transform.LineAggregator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class AlipayTranDoLineAggregator implements LineAggregator<AlipayTranDo> {
    //JSON
    private ObjectMapper mapper=new ObjectMapper();

    @Override
    public String aggregate(AlipayTranDo alipayTranDo) {
        try {
            return mapper.writeValueAsString(alipayTranDo);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("unable to writer...",e);
        }
    }

}

FlatFileOutputFromDBConfiguration

package com.dhcc.batch.batchDemo.output.flatfile;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FlatFileOutputFromDBConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileOutputFromDBItemReader")
    private ItemReader<? extends AlipayTranDo> flatFileOutputFromDBItemReader;

    @Autowired
    @Qualifier("flatFileOutputFromDBItemWriter")
    private ItemWriter<? super AlipayTranDo> flatFileOutputFromDBItemWriter;

    @Bean
    public Job FlatFileOutputFromDBJob() {
        return jobBuilderFactory.get("FlatFileOutputFromDBJob").start(FlatFileOutputFromDBStep()).build();

    }

    @Bean
    public Step FlatFileOutputFromDBStep() {
        return stepBuilderFactory.get("FlatFileOutputFromDBStep").<AlipayTranDo, AlipayTranDo>chunk(100)
                .reader(flatFileOutputFromDBItemReader).writer(flatFileOutputFromDBItemWriter).build();
    }

}

FlatFileOutputFromDBItemReaderConfiguration

package com.dhcc.batch.batchDemo.output.flatfile;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FlatFileOutputFromDBItemReaderConfiguration {
    @Autowired
    private DataSource dataSource;

    @Bean
    public JdbcPagingItemReader<AlipayTranDo> flatFileOutputFromDBItemReader() {
        JdbcPagingItemReader<AlipayTranDo> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource); // 设置数据源
        reader.setFetchSize(100); // 设置一次最大读取条数
        reader.setRowMapper(new AlipayTranDoFileMapper()); // 把数据库中的每条数据映射到AlipaytranDo对像中
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("tranId,channel,tranType,counterparty,goods,amount,isDebitCredit,state"); // 设置查询的列
        queryProvider.setFromClause("from alipaytrando"); // 设置要查询的表
        Map<String, Order> sortKeys = new HashMap<String, Order>();// 定义一个集合用于存放排序列
        sortKeys.put("tranId", Order.ASCENDING);// 按照升序排序
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);// 设置排序列
        return reader;
    }
}

FlatFileOutputFromDBItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.flatfile;

import java.io.File;

import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

@Configuration
public class FlatFileOutputFromDBItemWriterConfiguration {

    @Bean
    public FlatFileItemWriter<AlipayTranDo> flatFileOutputFromDBItemWriter(){
        FlatFileItemWriter<AlipayTranDo> writer=new FlatFileItemWriter<AlipayTranDo>();
        try {
            File path=new File("D:"+File.separator+"alipayTranDo.data").getAbsoluteFile();
//          String path=File.createTempFile("alipayTranDo", ".data").getAbsolutePath();
            System.out.println("file is create in :"+path);
            writer.setResource(new FileSystemResource(path));
            writer.setLineAggregator(new AlipayTranDoLineAggregator());
            writer.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return writer;

    }

}

OutputItemWriterFlatFileApplication

package com.dhcc.batch.batchDemo.output.flatfile;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterFlatFileApplication {
    public static void main(String[] args) {
        SpringApplication.run(OutputItemWriterFlatFileApplication.class, args);

    }
}

运行结果:
技术分享图片
控制台显示文件读取写入成功,我们根据文件地址,观察写入后的普通文件
技术分享图片
四.将数据写入到xml文件中
1.将数据写入到xml文件中,我们必须用到StaxEventItemWriter;
2.我们也会用到XStreamMarshall来序列文件
例:我们将数据库表alipaytrando中的数据写入到本地磁盘中
代码(此处我们只展示writer,用来写入的类,其他的均与上一个例子相同):

XMLFileOutputFromDBItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.xmlfile;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

@Configuration
public class XMLFileOutputFromDBItemWriterConfiguration {

    @Bean
    public StaxEventItemWriter<AlipayTranDo> xmlFileOutputFromDBItemWriter() throws Exception {
        XStreamMarshaller marshaller = new XStreamMarshaller();
        @SuppressWarnings("rawtypes")
        Map<String, Class> aliases = new HashMap<>();
        aliases.put("alipayTranDo", AlipayTranDo.class);
        marshaller.setAliases(aliases);

        StaxEventItemWriter<AlipayTranDo> writer = new StaxEventItemWriter<>();
        writer.setRootTagName("alipaytrandos");
        writer.setMarshaller(marshaller);
        File path = new File("D:" + File.separator + "alipayTranDo.xml").getAbsoluteFile();
        System.out.println("file is create in :" + path);
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;

    }

}

运行结果:
技术分享图片
根据地址观察写入后的xml文件
技术分享图片
五.将数据写入到多文件
1.将数据写入多个文件,我们使用CompositItemWriter<T>或者使用ClassifierCompositItemWriter<T>
2.例(1):我们将数据表alipaytrandao中的数据分别写入到xml文件和json文件中
此处我们只展示writer(其余代码与上例相同):
mutipleFileOutputFromDBItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.mutiple.composit;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

@Configuration
public class mutipleFileOutputFromDBItemWriterConfiguration {

    @Bean
    public FlatFileItemWriter<AlipayTranDo> jsonFileItemWriter(){
        FlatFileItemWriter<AlipayTranDo> writer=new FlatFileItemWriter<AlipayTranDo>();
        try {
            File path=new File("D:"+File.separator+"alipayTranDo1.json").getAbsoluteFile();
//          String path=File.createTempFile("alipayTranDo", ".json").getAbsolutePath();
            System.out.println("file is create in :"+path);
            writer.setResource(new FileSystemResource(path));
            writer.setLineAggregator(new AlipayTranDoLineAggregator());
            writer.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return writer;

    }

    @Bean 
    public StaxEventItemWriter<AlipayTranDo> xmlFileItemWriter() throws Exception{
        XStreamMarshaller marshaller=new XStreamMarshaller();
        @SuppressWarnings("rawtypes")
        Map<String, Class> aliases=new HashMap<>();
        aliases.put("alipayTranDo", AlipayTranDo.class);
        marshaller.setAliases(aliases);

        StaxEventItemWriter<AlipayTranDo> writer=new StaxEventItemWriter<>();
        writer.setRootTagName("alipaytrandos");
        writer.setMarshaller(marshaller);
        File path=new File("D:"+File.separator+"alipayTranDo1.xml").getAbsoluteFile();
        System.out.println("file is create in :"+path);
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;

    }

    @Bean
    public CompositeItemWriter<AlipayTranDo> alipayTranDoFileOutputFromDBItemWriter() throws Exception{
        CompositeItemWriter<AlipayTranDo> itemWriter=new CompositeItemWriter<>();
        itemWriter.setDelegates(Arrays.asList(xmlFileItemWriter(),jsonFileItemWriter()));
        itemWriter.afterPropertiesSet();

        return itemWriter;

    }

}

运行结果:
技术分享图片
观察文件:
Json:
技术分享图片
Xml:
技术分享图片
3.例(2):我们将同一个文件进行分类写入:
首先我们观察数据库表person_buf的数据结构(数据总数是10001):
技术分享图片

技术分享图片

我们的目标是将数据从数据库读出按照id的奇偶分别写入不同类型的文件中
接下来上代码:
Person

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.util.Date;

public class Person {
    private Integer id;
    private String name;
    private String perDesc;
    private Date createTime;
    private Date updateTime;
    private String sex;
    private Float score;
    private Double price;

    public Person() {
        super();
    }

    public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score,
            Double price) {
        super();
        this.id = id;
        this.name = name;
        this.perDesc = perDesc;
        this.createTime = createTime;
        this.updateTime = updateTime;
        this.sex = sex;
        this.score = score;
        this.price = price;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public String getPerDesc() {
        return perDesc;
    }

    public void setPerDesc(String perDesc) {
        this.perDesc = perDesc;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public Float getScore() {
        return score;
    }

    public void setScore(Float score) {
        this.score = score;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "Person [id=" + id + ", name=" + name + ", perDesc=" + perDesc + ", createTime=" + createTime + ", updateTime="
                + updateTime + ", sex=" + sex + ", score=" + score + ", price=" + price + "]";
    }

}

PersonLineAggregator

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import org.springframework.batch.item.file.transform.LineAggregator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PersonLineAggregator implements LineAggregator<Person> {
    //JSON
    private ObjectMapper mapper=new ObjectMapper();

    @Override
    public String aggregate(Person person) {
        try {
            return mapper.writeValueAsString(person);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("unable to writer...",e);
        }
    }

}

PersonRowMapper

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;
/**
 * 实现将数据库中的每条数据映射到Person对象中
 * @author Administrator
 *
 */
public class PersonRowMapper implements RowMapper<Person> {

    /**
     * rs一条结果集,rowNum代表当前行
     */
    @Override
    public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
        return new Person(rs.getInt("id")
                ,rs.getString("name")
                ,rs.getString("per_desc")
                ,rs.getDate("create_time")
                ,rs.getDate("update_time")
                ,rs.getString("sex")
                ,rs.getFloat("score")
                ,rs.getDouble("price"));
    }

}

OutputItemWriterMutipleClassFileApplication

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterMutipleClassFileApplication {
    public static void main(String[] args) {
        SpringApplication.run(OutputItemWriterMutipleClassFileApplication.class, args);

    }
}

ClassifierMutipleFileOutputFromDBConfiguration

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ClassifierMutipleFileOutputFromDBConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("mutipleFileOutputFromDBItemReader")
    private ItemReader<? extends Person> mutipleFileOutputFromDBItemReader;

    @Autowired
    @Qualifier("alipayTranDoFileOutputFromDBItemWriter")
    private ItemWriter<? super Person> alipayTranDoFileOutputFromDBItemWriter;

    @Autowired
    @Qualifier("jsonFileItemWriter")
    private ItemStream jsonFileItemWriter;

    @Autowired
    @Qualifier("xmlFileItemWriter")
    private ItemStream xmlFileItemWriter;

    @Bean
    public Job mutipleFileOutputFromDBJob1() {
        return jobBuilderFactory.get("mutipleFileOutputFromDBJob1")
                .start(mutipleFileOutputFromDBStep1())
                .build();

    }

    @Bean
    public Step mutipleFileOutputFromDBStep1() {
        return stepBuilderFactory.get("mutipleFileOutputFromDBStep1").<Person, Person>chunk(100)
                .reader(mutipleFileOutputFromDBItemReader)
                .writer(alipayTranDoFileOutputFromDBItemWriter)
                .stream(jsonFileItemWriter)
                .stream(xmlFileItemWriter)
                .build();
    }

}

mutipleFileOutputFromDBItemReaderConfiguration

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class mutipleFileOutputFromDBItemReaderConfiguration {
    @Autowired
    private DataSource dataSource;

    @Bean
    public JdbcPagingItemReader<Person> mutipleFileOutputFromDBItemReader() {
        JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource); // 设置数据源
        reader.setFetchSize(100); // 设置一次最大读取条数
        reader.setRowMapper(new PersonRowMapper()); // 把数据库中的每条数据映射到AlipaytranDo对像中
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // 设置查询的列
        queryProvider.setFromClause("from person_buf"); // 设置要查询的表
        Map<String, Order> sortKeys = new HashMap<String, Order>();// 定义一个集合用于存放排序列
        sortKeys.put("id", Order.ASCENDING);// 按照升序排序
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);// 设置排序列
        return reader;
    }
}

mutipleFileOutputFromDBItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

@Configuration
public class mutipleFileOutputFromDBItemWriterConfiguration {

    @Bean
    public FlatFileItemWriter<Person> jsonFileItemWriter(){
        FlatFileItemWriter<Person> writer=new FlatFileItemWriter<Person>();
        try {
            File path=new File("D:"+File.separator+"person.json").getAbsoluteFile();
            System.out.println("file is create in :"+path);
            writer.setResource(new FileSystemResource(path));
            writer.setLineAggregator(new PersonLineAggregator());
            writer.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return writer;

    }

    @Bean 
    public StaxEventItemWriter<Person> xmlFileItemWriter() throws Exception{
        XStreamMarshaller marshaller=new XStreamMarshaller();
        @SuppressWarnings("rawtypes")
        Map<String, Class> aliases=new HashMap<>();
        aliases.put("person", Person.class);
        marshaller.setAliases(aliases);

        StaxEventItemWriter<Person> writer=new StaxEventItemWriter<>();
        writer.setRootTagName("persons");
        writer.setMarshaller(marshaller);
        File path=new File("D:"+File.separator+"person.xml").getAbsoluteFile();
        System.out.println("file is create in :"+path);
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;

    }

    @Bean
    public ClassifierCompositeItemWriter<Person> alipayTranDoFileOutputFromDBItemWriter() throws Exception{
        ClassifierCompositeItemWriter<Person> itemWriter=new ClassifierCompositeItemWriter<Person>();

        itemWriter.setClassifier(new MyWriterClassifier(jsonFileItemWriter(),xmlFileItemWriter()));
        return itemWriter;

    }

}

MyWriterClassifier

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;

public class MyWriterClassifier implements Classifier<Person, ItemWriter<? super Person>> {
    private ItemWriter<Person> jsonWriter;
    private ItemWriter<Person> xmlWriter;

    /**
     * 
     */
    private static final long serialVersionUID = -2911015707834323846L;

    public MyWriterClassifier(ItemWriter<Person> jsonWriter, ItemWriter<Person> xmlWriter) {
        this.jsonWriter = jsonWriter;
        this.xmlWriter = xmlWriter;

    }

    @Override
    public ItemWriter<? super Person> classify(Person classifiable) {
        if (classifiable.getId()%2==0) {
            return jsonWriter;
        }else {
            return xmlWriter;
        }
    }

}

运行结果:
技术分享图片
观察文件:
Person.json:(我们可以看出id为偶数的都写在了json文件中)
技术分享图片
Person.xml:(我们可以看出id为奇数的都写在了xml文件中)

技术分享图片

Spring-batch(ItemWriter)数据写入数据库,普通文件,xml文件,多文件分类写入

标签:bcp   tap   token   图片   RoCE   tab   processor   aging   do it   

原文地址:http://blog.51cto.com/13501268/2298822

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!