标签:CSV super-csv 批量处理 并发处理 csv大数据
Super CSV是一个用于处理CSV文件的Java开源项目。它完全围绕面向对象的思想进行设计,因此可以利用你的面向对象代码来使得处理CSV文件变得更加简易。它支持输入/输出类型转换、数据完整性校验,支持从任何地方以任何编码读写数据,只要提供相应的Reader与Writer对象。可配置分割符,空格符号和行结束符等。一、下面先来看简单数据处理
引入依赖包:
<dependency>
<groupId>net.sf.supercsv</groupId>
<artifactId>super-csv</artifactId>
<version>2.4.0</version>
</dependency>
下面来看一下官方文档中的代码示例。
根据头来读取CSV文件
把文件中的每行记录读取出来转化为java对象,假设你有一个UserBean类,代码如下:
public class UserBean {
int id;
String username, password, street, town;
int zip;
public int getId() { return id;}
public String getPassword() { return password; }
public String getStreet() { return street; }
public String getTown() { return town; }
public String getUsername() { return username; }
public int getZip() { return zip; }
public void setId(int id) { this.id = id; }
public void setPassword(String password) { this.password = password; }
public void setStreet(String street) { this.street = street; }
public void setTown(String town) { this.town = town; }
public void setUsername(String username) { this.username = username; }
public void setZip(int zip) { this.zip = zip; }
}
并且有一个CSV文件,包含一个文件头,假设文件内容如下:
id,username,password,date,zip,town
1,Klaus,qwexyKiks,17/1/2007,1111,New York
2,Oufud,bobilop213,10/10/2007,4555,New York
3,Oufud1,bobilop213,10/10/2007,4555,New York
4,Oufud2,bobilop213,10/10/2007,4555,New York
5,Oufud3,bobilop213,10/10/2007,4555,New York
6,Oufud4,bobilop213,10/10/2007,4555,New York
7,Oufud5,bobilop213,10/10/2007,4555,New York
8,Oufud6,bobilop213,10/10/2007,4555,New York
9,Oufud7,bobilop213,10/10/2007,4555,New York
10,Oufud8,bobilop213,10/10/2007,4555,New York
11,Oufud9,bobilop213,10/10/2007,4555,New York
12,Oufud10,bobilop213,10/10/2007,4555,New York
13,Oufud11,bobilop213,10/10/2007,4555,New York
14,Oufud12,bobilop213,10/10/2007,4555,New York
15,Oufud13,bobilop213,10/10/2007,4555,New York
然后你可以使用一下代码来创建UserBean的实例对象,并打印出对象的属性值:
class ReadingObjects {
public static void main(String[] args) throws Exception{
ICsvBeanReader inFile = new CsvBeanReader(new FileReader("foo.csv"), CsvPreference.STANDARD_PREFERENCE);
try {
final String[] header = inFile.getCSVHeader(true);
UserBean user;
while( (user = inFile.read(UserBean.class, header, processors)) != null) {
System.out.println(user.getZip());
}
} finally {
inFile.close();
}
}
}
我们还剩下processors没有定义,通过名字我们可以看出是解析器,用来处理每列的数据,当然你也可以传入null,表示该列不做特殊处理,每个解析器可以被另外一个包含在内部,new Unique(new StrMinMax(5,20)),这个代码该列的值为唯一的,并且长度为8到20,具体处理细节我们先不讲,来看一下我们所需要的processors是如何定义的:
final CellProcessor[] processors = new CellProcessor[] {
new Unique(new ParseInt()),
new Unique(new StrMinMax(5, 20)),
new StrMinMax(8, 35),
new ParseDate("dd/MM/yyyy"),
new Optional(new ParseInt()),
null
};
上面的代码的具体意思为:
第一列是一个字符串,并且值是唯一的,长度为5到20
第二列是一个字符串,长度是8到35
第三列为一个日期类型,格式为天/月/年(day/month/year)
第四列是一个整型数字,但只有这列有值的时候ParseInt处理器才会去处理这个值(其实就是该列可以为空)
第五列为一个字符串(默认),不使用处理器
如果你的CSV文件没有头,你也可以定义个数组来替代:
final String[] header = new String[] { "username", "password", "date", "zip", "town"};
如果你想忽略某一列,和定义处理器类似,直接在头数组中使用null。
全部代码如下:
import Java.io.FileReader;
import Java.io.IOException;
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseDate;
import org.supercsv.cellprocessor.ParseInt;
import org.supercsv.cellprocessor.constraint.StrMinMax;
import org.supercsv.cellprocessor.constraint.Unique;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;
class ReadingObjects {
static final CellProcessor[] userProcessors = new CellProcessor[] {
new Unique(new ParseInt()),
new Unique(new StrMinMax(5, 20)),
new StrMinMax(8, 35),
new ParseDate("dd/MM/yyyy"),
new Optional(new ParseInt()),
null
};
public static void main(String[] args) throws Exception {
ICsvBeanReader inFile = new CsvBeanReader(new FileReader("foo.csv"), CsvPreference.STANDARD_PREFERENCE);
try {
final String[] header = inFile.getCSVHeader(true);
UserBean user;
while( (user = inFile.read(UserBean.class, header, userProcessors)) != null) {
System.out.println(user.getZip());
}
} finally {
inFile.close();
}
}
}
public class UserBean {
String username, password, town;
Date date;
int zip;
public Date getDate() {
return date;
}
public String getPassword() {
return password;
}
public String getTown() {
return town;
}
public String getUsername() {
return username;
}
public int getZip() {
return zip;
}
public void setDate(final Date date) {
this.date = date;
}
public void setPassword(final String password) {
this.password = password;
}
public void setTown(final String town) {
this.town = town;
}
public void setUsername(final String username) {
this.username = username;
}
public void setZip(final int zip) {
this.zip = zip;
}
}
如果你在读取文件之前根本不知道文件的具体格式,你可以选择CsvListReader.read()方法,把每行读出出来的数据放在一个List里面。
读取文件的代码我们看到了,下面来看一下写的操作,也很简单。
import Java.util.HashMap;
import org.supercsv.io.*;
import org.supercsv.prefs.CsvPreference;
class WritingMaps {
main(String[] args) throws Exception {
ICsvMapWriter writer = new CsvMapWriter(new FileWriter(...), CsvPreference.STANDARD_PREFERENCE);
try {
final String[] header = new String[] { "name", "city", "zip" };
// set up some data to write
final HashMap<String, ? super Object> data1 = new HashMap<String, Object>();
data1.put(header[0], "Karl");
data1.put(header[1], "Tent city");
data1.put(header[2], 5565);
final HashMap<String, ? super Object> data2 = new HashMap<String, Object>();
data2.put(header[0], "Banjo");
data2.put(header[1], "River side");
data2.put(header[2], 5551);
// the actual writing
writer.writeHeader(header);
writer.write(data1, header);
writer.write(data2, header);
} finally {
writer.close();
}
}
}
二、并发分批处理大数据量的数据更新
代码如下
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseDate;
import org.supercsv.cellprocessor.ParseInt;
import org.supercsv.cellprocessor.constraint.StrMinMax;
import org.supercsv.cellprocessor.constraint.Unique;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
class ThreadReadingObjects {
static final CellProcessor[] userProcessors = new CellProcessor[] {
new Unique(new ParseInt()),//唯一的,int id
new Unique(new StrMinMax(5, 20)),//唯一的,长度为5到20
new StrMinMax(8, 35), //长度是8到35
new ParseDate("dd/MM/yyyy"), //格式为天/月/年(day/month/year)
new Optional(new ParseInt()), //整型数字,但只有这列有值的时候ParseInt处理器才会去处理这个值(其实就是该列可以为空)
null //不使用处理器
};
public static void main(String[] args) throws Exception {
// InputStreamReader freader = new InputStreamReader(inputStream,"UTF-8");
// ICsvBeanReader inFile = new CsvBeanReader(freader, CsvPreference.STANDARD_PREFERENCE);
ICsvBeanReader inFile = new CsvBeanReader(new FileReader("D:\\foo.csv"), CsvPreference.STANDARD_PREFERENCE);
ExecutorService executorService = null;
try {
//如果你的CSV文件没有头,你也可以定义个数组来替代:
// final String[] header = new String[] { "username", "password", "date", "zip", "town"};
final String[] header = inFile.getHeader(true);
//创建缓存线程池
List<Future<String>> futureList = new ArrayList<Future<String>>();
executorService = Executors.newCachedThreadPool();
//分页读取数据后,加入线程池处理
while (getPageUserList(executorService,futureList,inFile, header)) {}
//获取线程处理结果
for (Future<String> future : futureList) {
while (true) {
if (future.isDone() && !future.isCancelled()) {
System.out.println("future result: "+future.get());
break;
} else {
Thread.sleep(1000);
}
}
}
} finally {
inFile.close();
executorService.shutdown();
}
}
private static boolean getPageUserList(ExecutorService executorService, List<Future<String>> futureList, ICsvBeanReader inFile, String[] header) throws IOException {
int index = 0;
boolean status = false;
List<UserBean> userBeans = new ArrayList<UserBean>();
UserBean user;
while ((user = inFile.read(UserBean.class, header, userProcessors)) != null) {// 这里从第一行开始取数据
userBeans.add(user);
index++;
//读取的行数,每个线程处理的记录数,根据实际情况修改
if (index == 10) {
status = true;
break;
}
}
//添加到线程集合
if(!userBeans.isEmpty()){
Future<String> future = executorService.submit(getUpdateDbJob(futureList.size(),userBeans));
futureList.add(future);
}
return status;
}
private static Callable<String> getUpdateDbJob(int threadNo,List<UserBean> userBeans) {
return new Callable<String>() {
@Override
public String call() throws Exception {
//分批量写入数据库
List<UserBean> userList = new ArrayList<UserBean>();
for(int i=0;i<userBeans.size();i++){
userList.add(userBeans.get(i));
//如果数据量比较大再次分批commit,第一次提交3条,后面每次提交2条
//取 % 条数根据实际情况修改
if (i > 0 && i % 3 == 0) {
System.out.println("线程"+threadNo+"更新用户:"+userList.size()+" 个成功");
//采用jdbcTemplate 批量写入数据库
// TODO 写入数据中
userList = new ArrayList<UserBean>();
} else if (i == userBeans.size() - 1) {
//处理最后一批数据提交
System.out.println("线程"+threadNo+"更新用户:"+userList.size()+" 个成功");
// TODO 写入数据中
userList = new ArrayList<UserBean>();
}
}
return String.valueOf(userBeans.size());
}
};
}
}
运行后返回结果:
线程0更新用户:4 个成功
线程0更新用户:3 个成功
线程0更新用户:3 个成功
线程1更新用户:4 个成功
线程1更新用户:1 个成功
future result: 10
future result: 5
标签:CSV super-csv 批量处理 并发处理 csv大数据
原文地址:http://blog.51cto.com/ciyorecord/2104385