码迷,mamicode.com
首页 > 编程语言 > 详细

Super CSV 线程池高并发处理大批量数据

时间:2018-04-17 15:12:07      阅读:249      评论:0      收藏:0      [点我收藏+]

标签:CSV   super-csv   批量处理   并发处理   csv大数据   

Super CSV是一个用于处理CSV文件的Java开源项目。它完全围绕面向对象的思想进行设计,因此可以利用你的面向对象代码来使得处理CSV文件变得更加简易。它支持输入/输出类型转换、数据完整性校验,支持从任何地方以任何编码读写数据,只要提供相应的Reader与Writer对象。可配置分割符,空格符号和行结束符等。

一、下面先来看简单数据处理
引入依赖包:

<dependency>
    <groupId>net.sf.supercsv</groupId>
    <artifactId>super-csv</artifactId>
    <version>2.4.0</version>
</dependency>

下面来看一下官方文档中的代码示例。

  1. 根据头来读取CSV文件
    把文件中的每行记录读取出来转化为java对象,假设你有一个UserBean类,代码如下:

    public class UserBean {  
            int id;
            String username, password, street, town;  
            int zip;  
    
            public int getId() { return id;}
            public String getPassword() { return password; }  
            public String getStreet() { return street; }  
            public String getTown() { return town; }  
            public String getUsername() { return username; }  
            public int getZip() { return zip; }  
            public void setId(int id) { this.id = id; }  
            public void setPassword(String password) { this.password = password; }  
            public void setStreet(String street) { this.street = street; }  
            public void setTown(String town) { this.town = town; }  
            public void setUsername(String username) { this.username = username; }  
            public void setZip(int zip) { this.zip = zip; }  
        }  

    并且有一个CSV文件,包含一个文件头,假设文件内容如下:
    id,username,password,date,zip,town
    1,Klaus,qwexyKiks,17/1/2007,1111,New York
    2,Oufud,bobilop213,10/10/2007,4555,New York
    3,Oufud1,bobilop213,10/10/2007,4555,New York
    4,Oufud2,bobilop213,10/10/2007,4555,New York
    5,Oufud3,bobilop213,10/10/2007,4555,New York
    6,Oufud4,bobilop213,10/10/2007,4555,New York
    7,Oufud5,bobilop213,10/10/2007,4555,New York
    8,Oufud6,bobilop213,10/10/2007,4555,New York
    9,Oufud7,bobilop213,10/10/2007,4555,New York
    10,Oufud8,bobilop213,10/10/2007,4555,New York
    11,Oufud9,bobilop213,10/10/2007,4555,New York
    12,Oufud10,bobilop213,10/10/2007,4555,New York
    13,Oufud11,bobilop213,10/10/2007,4555,New York
    14,Oufud12,bobilop213,10/10/2007,4555,New York
    15,Oufud13,bobilop213,10/10/2007,4555,New York

    然后你可以使用一下代码来创建UserBean的实例对象,并打印出对象的属性值:

    class ReadingObjects {  
        public static void main(String[] args) throws Exception{  
            ICsvBeanReader inFile = new CsvBeanReader(new FileReader("foo.csv"), CsvPreference.STANDARD_PREFERENCE);  
            try {  
                final String[] header = inFile.getCSVHeader(true);  
                UserBean user;  
                while( (user = inFile.read(UserBean.class, header, processors)) != null) {  
                    System.out.println(user.getZip());  
                }  
            } finally {  
                inFile.close();  
            }  
        }  
    }  

    我们还剩下processors没有定义,通过名字我们可以看出是解析器,用来处理每列的数据,当然你也可以传入null,表示该列不做特殊处理,每个解析器可以被另外一个包含在内部,new Unique(new StrMinMax(5,20)),这个代码该列的值为唯一的,并且长度为8到20,具体处理细节我们先不讲,来看一下我们所需要的processors是如何定义的:

    final CellProcessor[] processors = new CellProcessor[] {  
        new Unique(new ParseInt()),
        new Unique(new StrMinMax(5, 20)),  
        new StrMinMax(8, 35),  
        new ParseDate("dd/MM/yyyy"),  
        new Optional(new ParseInt()),  
        null  
    };  

    上面的代码的具体意思为:
    第一列是一个字符串,并且值是唯一的,长度为5到20
    第二列是一个字符串,长度是8到35
    第三列为一个日期类型,格式为天/月/年(day/month/year)
    第四列是一个整型数字,但只有这列有值的时候ParseInt处理器才会去处理这个值(其实就是该列可以为空)
    第五列为一个字符串(默认),不使用处理器

如果你的CSV文件没有头,你也可以定义个数组来替代:

final String[] header = new String[] { "username", "password", "date", "zip", "town"}; 

如果你想忽略某一列,和定义处理器类似,直接在头数组中使用null。

全部代码如下:

import Java.io.FileReader;  
import Java.io.IOException;  
import org.supercsv.cellprocessor.Optional;  
import org.supercsv.cellprocessor.ParseDate;  
import org.supercsv.cellprocessor.ParseInt;  
import org.supercsv.cellprocessor.constraint.StrMinMax;  
import org.supercsv.cellprocessor.constraint.Unique;  
import org.supercsv.cellprocessor.ift.CellProcessor;  
import org.supercsv.io.CsvBeanReader;  
import org.supercsv.io.ICsvBeanReader;  
import org.supercsv.prefs.CsvPreference;  

class ReadingObjects {  

    static final CellProcessor[] userProcessors = new CellProcessor[] {  
            new Unique(new ParseInt()),
        new Unique(new StrMinMax(5, 20)),  
        new StrMinMax(8, 35),  
        new ParseDate("dd/MM/yyyy"),  
        new Optional(new ParseInt()),  
        null  
    };  

    public static void main(String[] args) throws Exception {  
        ICsvBeanReader inFile = new CsvBeanReader(new FileReader("foo.csv"), CsvPreference.STANDARD_PREFERENCE);  
        try {  
          final String[] header = inFile.getCSVHeader(true);  
          UserBean user;  
          while( (user = inFile.read(UserBean.class, header, userProcessors)) != null) {  
            System.out.println(user.getZip());  
          }  
        } finally {  
          inFile.close();  
        }  
   }  
}  

public class UserBean {  
    String username, password, town;  
    Date date;  
    int zip;  

    public Date getDate() {  
        return date;  
    }  

    public String getPassword() {  
        return password;  
    }  

    public String getTown() {  
        return town;  
    }  

    public String getUsername() {  
        return username;  
    }  

    public int getZip() {  
        return zip;  
    }  

    public void setDate(final Date date) {  
        this.date = date;  
    }  

    public void setPassword(final String password) {  
        this.password = password;  
    }  

    public void setTown(final String town) {  
        this.town = town;  
    }  

    public void setUsername(final String username) {  
        this.username = username;  
    }  

    public void setZip(final int zip) {  
        this.zip = zip;  
    }  

}  

如果你在读取文件之前根本不知道文件的具体格式,你可以选择CsvListReader.read()方法,把每行读出出来的数据放在一个List里面。

读取文件的代码我们看到了,下面来看一下写的操作,也很简单。

import Java.util.HashMap;  
import org.supercsv.io.*;  
import org.supercsv.prefs.CsvPreference;  

class WritingMaps {  
  main(String[] args) throws Exception {  
    ICsvMapWriter writer = new CsvMapWriter(new FileWriter(...), CsvPreference.STANDARD_PREFERENCE);  
    try {  
      final String[] header = new String[] { "name", "city", "zip" };  
      // set up some data to write  
      final HashMap<String, ? super Object> data1 = new HashMap<String, Object>();  
      data1.put(header[0], "Karl");  
      data1.put(header[1], "Tent city");  
      data1.put(header[2], 5565);  
      final HashMap<String, ? super Object> data2 = new HashMap<String, Object>();  
      data2.put(header[0], "Banjo");  
      data2.put(header[1], "River side");  
      data2.put(header[2], 5551);  
      // the actual writing  
      writer.writeHeader(header);  
      writer.write(data1, header);  
      writer.write(data2, header);  
    } finally {  
      writer.close();  
    }  
  }  
}  

二、并发分批处理大数据量的数据更新
代码如下

import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseDate;
import org.supercsv.cellprocessor.ParseInt;
import org.supercsv.cellprocessor.constraint.StrMinMax;
import org.supercsv.cellprocessor.constraint.Unique;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;

import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThreadReadingObjects {

    static final CellProcessor[] userProcessors = new CellProcessor[] {
            new Unique(new ParseInt()),//唯一的,int id
            new Unique(new StrMinMax(5, 20)),//唯一的,长度为5到20
            new StrMinMax(8, 35), //长度是8到35
            new ParseDate("dd/MM/yyyy"), //格式为天/月/年(day/month/year)
            new Optional(new ParseInt()), //整型数字,但只有这列有值的时候ParseInt处理器才会去处理这个值(其实就是该列可以为空)
            null //不使用处理器
    };

    public static void main(String[] args) throws Exception {
        // InputStreamReader freader = new InputStreamReader(inputStream,"UTF-8");
        // ICsvBeanReader inFile = new CsvBeanReader(freader, CsvPreference.STANDARD_PREFERENCE);

        ICsvBeanReader inFile = new CsvBeanReader(new FileReader("D:\\foo.csv"), CsvPreference.STANDARD_PREFERENCE);

        ExecutorService executorService = null;
        try {
            //如果你的CSV文件没有头,你也可以定义个数组来替代:
            // final String[] header = new String[] { "username", "password", "date", "zip", "town"};
            final String[] header = inFile.getHeader(true);

            //创建缓存线程池
            List<Future<String>> futureList = new ArrayList<Future<String>>();
            executorService = Executors.newCachedThreadPool();

            //分页读取数据后,加入线程池处理
            while (getPageUserList(executorService,futureList,inFile, header)) {}

                //获取线程处理结果
                for (Future<String> future : futureList) {
                    while (true) {
                        if (future.isDone() && !future.isCancelled()) {
                            System.out.println("future result: "+future.get());
                            break;
                        } else {
                            Thread.sleep(1000);
                        }
                    }
                }

        } finally {
            inFile.close();
            executorService.shutdown();
        }
    }

    private static boolean getPageUserList(ExecutorService executorService, List<Future<String>> futureList, ICsvBeanReader inFile, String[] header) throws IOException {
        int index = 0;
        boolean status = false;
        List<UserBean> userBeans = new ArrayList<UserBean>();
        UserBean user;
        while ((user = inFile.read(UserBean.class, header, userProcessors)) != null) {// 这里从第一行开始取数据
            userBeans.add(user);
            index++;
            //读取的行数,每个线程处理的记录数,根据实际情况修改
            if (index == 10) {
                status = true;
                break;
            }
        }
        //添加到线程集合
        if(!userBeans.isEmpty()){
            Future<String> future = executorService.submit(getUpdateDbJob(futureList.size(),userBeans));
            futureList.add(future);
        }

        return status;
    }

    private static Callable<String> getUpdateDbJob(int threadNo,List<UserBean> userBeans) {
        return new Callable<String>() {
            @Override
            public String call() throws Exception {
                //分批量写入数据库
                List<UserBean> userList = new ArrayList<UserBean>();
                for(int i=0;i<userBeans.size();i++){
                    userList.add(userBeans.get(i));
                    //如果数据量比较大再次分批commit,第一次提交3条,后面每次提交2条
                    //取 % 条数根据实际情况修改
                    if (i > 0 && i % 3 == 0) {
                        System.out.println("线程"+threadNo+"更新用户:"+userList.size()+" 个成功");
                        //采用jdbcTemplate 批量写入数据库
                        // TODO 写入数据中
                        userList = new ArrayList<UserBean>();
                    } else if (i == userBeans.size() - 1) {
                        //处理最后一批数据提交
                        System.out.println("线程"+threadNo+"更新用户:"+userList.size()+" 个成功");
                        // TODO 写入数据中
                        userList = new ArrayList<UserBean>();
                    }
                }
                return String.valueOf(userBeans.size());
            }
        };
    }
}

运行后返回结果:

线程0更新用户:4 个成功
线程0更新用户:3 个成功
线程0更新用户:3 个成功
线程1更新用户:4 个成功
线程1更新用户:1 个成功
future result: 10
future result: 5

Super CSV 线程池高并发处理大批量数据

标签:CSV   super-csv   批量处理   并发处理   csv大数据   

原文地址:http://blog.51cto.com/ciyorecord/2104385

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!