标签:hadoop hdfs api 合并小文件 copymerge
用fsck命令统计 查看HDFS上在某一天日志的大小,分块情况以及平均的块大小,即
[hduser@da-master jar]$ hadoop fsck /wcc/da/kafka/report/2015-01-11 DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it. 15/01/13 18:57:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Connecting to namenode via http://da-master:50070 FSCK started by hduser (auth:SIMPLE) from /172.21.101.30 for path /wcc/da/kafka/report/2015-01-11 at Tue Jan 13 18:57:24 CST 2015 .................................................................................................... .................................................................................................... ........................................Status: HEALTHY Total size: 9562516137 B Total dirs: 1 Total files: 240 Total symlinks: 0 Total blocks (validated): 264 (avg. block size 36221652 B) Minimally replicated blocks: 264 (100.0 %) Over-replicated blocks: 0 (0.0 %) Under-replicated blocks: 0 (0.0 %) Mis-replicated blocks: 0 (0.0 %) Default replication factor: 2 Average block replication: 2.0 Corrupt blocks: 0 Missing replicas: 0 (0.0 %) Number of data-nodes: 5 Number of racks: 1 FSCK ended at Tue Jan 13 18:57:24 CST 2015 in 14 milliseconds The filesystem under path '/wcc/da/kafka/report/2015-01-11' is HEALTHY
Date Time |
Total(GB) |
Total blocks |
AveBlockSize(MB) |
2014/12/21 |
9.39 |
268 |
36 |
2014/12/20 |
9.5 |
268 |
36 |
2014/12/19 |
8.89 |
268 |
34 |
2014/11/5 |
8.6 |
266 |
33 |
2014/10/1 |
9.31 |
268 |
36 |
分析问题的存在性:从表中可以看出,每天日志量的分块情况:总共大概有268左右的块数,平均块大小为36MB左右,远远不足128MB,这潜在的说明了一个问题。日志产生了很多小文件,大多数都不足128M,严重影响集群的扩展性和性能:首先,在HDFS中,任何block,文件或者目录在内存中均以对象的形式存储,每个对象约占150byte,如果有1000 0000个小文件,每个文件占用一个block,则namenode大约需要2G空间。如果存储1亿个文件,则namenode需要20G空间,这样namenode内存容量严重制约了集群的扩展; 其次,访问大量小文件速度远远小于访问几个大文件;HDFS最初是为流式访问大文件开发的,如果访问大量小文件,需要不断的从一个datanode跳到另一个datanode,严重影响性能;最后,处理大量小文件速度远远小于处理同等大小的大文件的速度,因为每一个小文件要占用一个slot,而task启动将耗费大量时间甚至大部分时间都耗费在启动task和释放task上,累计起来的总时长必然增加。我们采取的策略是先合并小文件,比如整理日志成user_report.tsv,client_report.tsv,AppLog_UserDevice.tsv, 再运行job。
可以调用API的 FileUtil工具类的方法copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource,Configuration conf, String addString);
但是此方法并不适用,因为某一天日志存在着三种类型的日志,即:
要分别合并成三个文件user_report.tsv,client_report.tsv和AppLog_UserDevice.tsv,故必须重新实现copyMerge函数,先分析copyMerge源码:
/** Copy all files in a directory to one output file (merge). */ public static boolean copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String addString) throws IOException { //生成合并后的目标文件路径dstFile,文件名为srcDir.getName(),即源路径的目录名,因此这里我们不能自定义生成的日志文件名,非常不方便 dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false); //判断源路径是否为文件目录 if (!srcFS.getFileStatus(srcDir).isDirectory()) return false; //创建输出流,准备写文件 OutputStream out = dstFS.create(dstFile); try { // 得到每个源路径目录下的每个文件 FileStatus contents[] = srcFS.listStatus(srcDir); //排序操作 Arrays.sort(contents); for (int i = 0; i < contents.length; i++) { if (contents[i].isFile()) { //创建输入流,读文件 InputStream in = srcFS.open(contents[i].getPath()); try { //执行复制操作,写入到目标文件中 IOUtils.copyBytes(in, out, conf, false); if (addString!=null) out.write(addString.getBytes("UTF-8")); } finally { in.close(); } } } } finally { out.close(); } //若deleteSource为true,删除源路径目录下的每个文件 if (deleteSource) { return srcFS.delete(srcDir, true); } else { return true; } }
/** Copy corresponding files in a directory to related output file (merge). */ @SuppressWarnings("unchecked") public static boolean merge(FileSystem hdfs, Path srcDir, Path dstFile, boolean deleteSource, Configuration conf) throws IOException { if (!hdfs.getFileStatus(srcDir).isDirectory()) return false; // 得到每个源目录下的每个文件; FileStatus[] fileStatus = hdfs.listStatus(srcDir); // 三种不同类型的文件各自的文件路径存入不同的list; ArrayList<Path> userPaths = new ArrayList<Path>(); ArrayList<Path> clientPaths = new ArrayList<Path>(); ArrayList<Path> appPaths = new ArrayList<Path>(); for (FileStatus fileStatu : fileStatus) { Path filePath = fileStatu.getPath(); if (filePath.getName().startsWith("user_report")) { userPaths.add(filePath); } else if (filePath.getName().startsWith("client_report")) { clientPaths.add(filePath); } else if (filePath.getName().startsWith("AppLog_UserDevice")) { appPaths.add(filePath); } } // 分别写入到目标文件:user_report.tsv中 if (userPaths.size() > 0) { Path userDstFile = new Path(dstFile.toString() + "/user_report.tsv"); OutputStream out = hdfs.create(userDstFile); Collections.sort(userPaths); try { Iterator<Path> iterator = userPaths.iterator(); while (iterator.hasNext()) { InputStream in = hdfs.open(iterator.next()); try { IOUtils.copyBytes(in, out, conf, false); } finally { in.close(); } } } finally { out.close(); } } // 分别写入到目标文件:client_report.tsv中 if (clientPaths.size() > 0) { Path clientDstFile = new Path(dstFile.toString() + "/client_report.tsv"); OutputStream out = hdfs.create(clientDstFile); Collections.sort(clientPaths); try { Iterator<Path> iterator = clientPaths.iterator(); while (iterator.hasNext()) { InputStream in = hdfs.open(iterator.next()); try { IOUtils.copyBytes(in, out, conf, false); } finally { in.close(); } } } finally { out.close(); } } // 分别写入到目标文件:AppLog_UserDevice.tsv中 if (appPaths.size() > 0) { Path appDstFile = new Path(dstFile.toString() + "/AppLog_UserDevice.tsv"); OutputStream out = hdfs.create(appDstFile); Collections.sort(appPaths); try { Iterator<Path> iterator = appPaths.iterator(); while (iterator.hasNext()) { InputStream in = hdfs.open(iterator.next()); try { IOUtils.copyBytes(in, out, conf, false); } finally { in.close(); } } } finally { out.close(); } } if (deleteSource) { return hdfs.delete(srcDir, true); } return true; }
public static boolean mergeFiles(FileSystem hdfs, Path srcDir, Path dstFile, boolean deleteSource, Configuration conf) throws IOException { if (!hdfs.getFileStatus(srcDir).isDirectory()) return false; // 得到每个源目录下的每个文件; FileStatus[] fileStatus = hdfs.listStatus(srcDir); // 三种不同类型的文件各自合并 for (FileStatus fileStatu : fileStatus) { Path filePath = fileStatu.getPath(); Path dstPath = new Path(""); if (filePath.getName().startsWith("user_report")) { dstPath = new Path(dstFile.toString() + "/user_report.tsv"); } else if (filePath.getName().startsWith("client_report")) { dstPath = new Path(dstFile.toString() + "/client_report.tsv"); } else if (filePath.getName().startsWith("AppLog_UserDevice")) { dstPath = new Path(dstFile.toString() + "/client_report.tsv"); }else{ dstPath=new Path( "/error.tsv"); } OutputStream out = hdfs.create(dstPath); try { InputStream in = hdfs.open(filePath); try { IOUtils.copyBytes(in, out, conf, false); } finally { in.close(); } } finally { out.close(); } } if (deleteSource) { return hdfs.delete(srcDir, true); } return true; }
标签:hadoop hdfs api 合并小文件 copymerge
原文地址:http://blog.csdn.net/lviiii/article/details/42680249