码迷,mamicode.com
首页 > 其他好文 > 详细

Hadoop小文件问题及解决方案

时间:2015-12-15 12:11:21      阅读:248      评论:0      收藏:0      [点我收藏+]

标签:

1.概述

小文件是指文件size小于HDFS上block大小的文件。这样的文件会给hadoop的扩展性和性能带来严重问题。首先,在HDFS中,任何block,文件或者目录在内存中均以对象的形式存储,每个对象约占150byte,如果有1千万个小文件,每个文件占用一个block,则NameNode大约需要2G空间。如果存储一亿个文件,则NameNode需要20G空间。这样NameNode内存容量严重制约了集群的扩展。其次,访问大量小文件速度远远小于访问几个大文件。HDFS最初是为流式访问大文件开发的,如果访问大量小文件,需要不断的从一个DataNode跳到另外一个DataNode,严重影响性能。最后,处理大量小文件速度远远小于处理同等大小的大文件的速度。每一个小文件要占用一个slot,而task启动将耗费大量时间甚至大部分时间都耗费在启动task和释放task上。

 

2.HDFS文件读写流程

在正式介绍HDFS小文件存储方案之前,我们先介绍一下当前HDFS上文件存取的基本流程。

2.1 读文件流程

A.client端发送读文件请求给NameNode,如果文件不存在,返回错误信息,否则,将该文件对应的block机器所在DataNode位置发送给client。

B.client收到文件位置信息后,与不同DataNode建立socket连接并行获取数据。

2.2 写文件流程

A.client端发送写文件请求,NameNode检查文件是否存在,如果已经存在,直接返回错误信息,否则,发送给client一些可用节点。

B.client将文件分块,并行存储到不同DataNode节点上,发送完成以后,client同时发送信息给NameNode和DataNode。

C.NameNode收到client的信息后,发送信息给DataNode。

D.DataNode同时收到NameNode和DataNode的确认信息后,提交写操作。

 

3 解决小文件的方案

3.1 编写应用程序实现

 

[java] view plaincopy
 
  1. public class AppForSmallFile {  
  2.   
  3.     //定义文件读取的路径  
  4.     private static final String OUTPATH = "hdfs://liaozhongmin:9000";  
  5.           
  6.     public static void main(String[] args) {  
  7.           
  8.         //定义FSDataOutputStream对象  
  9.         FSDataOutputStream fsDataoutputStream = null;  
  10.         //定义输入流读文件  
  11.         InputStreamReader inputStreamReader = null;  
  12.         try {  
  13.             //创建合并后文件存储的的路径  
  14.             Path path = new Path(OUTPATH + "/combinedFile");  
  15.               
  16.             //创建FSDataOutputStream对象  
  17.             fsDataoutputStream =  FileSystem.get(path.toUri(), new Configuration()).create(path);  
  18.               
  19.             //创建要合并的小文件路径  
  20.             File sourceDir = new File("C:\\Windows\\System32\\drivers\\etc");  
  21.               
  22.             //遍历小文件  
  23.             for (File fileName : sourceDir.listFiles()){  
  24.                   
  25.                 //创建输入流  
  26.                 //fileInputStream = new FileInputStream(fileName.getAbsolutePath());  
  27.                 //只有这样才可以制定字符编码(没办法,Window是默认GBK的,Hadoop是默认UTF-8的,所以读的时候就会乱码)  
  28.                 inputStreamReader = new InputStreamReader(new FileInputStream(fileName), "gbk");  
  29.                 //一行一行的读取  
  30.                 List<String> readLines = IOUtils.readLines(inputStreamReader);  
  31.                   
  32.                 //然后再写出去  
  33.                 for (String line : readLines){  
  34.                     //写入一行  
  35.                     fsDataoutputStream.write(line.getBytes());  
  36.                     //写入一个换行符  
  37.                     fsDataoutputStream.write("\n".getBytes());  
  38.                 }  
  39.                   
  40.             }  
  41.               
  42.             System.out.println("合并成功");  
  43.         } catch (Exception e) {  
  44.             e.printStackTrace();  
  45.         } finally{  
  46.             try {  
  47.                 inputStreamReader.close();  
  48.                 fsDataoutputStream.close();  
  49.             } catch (IOException e) {  
  50.                 e.printStackTrace();  
  51.             }  
  52.               
  53.         }  
  54.           
  55.     }  
  56. }  

注:这种方案是使用java文件相关操作,将众多的小文件写到一个文件中。

 

 

3.2 使用archive工具

 

[java] view plaincopy
 
  1. 创建文件 hadoop archive -archiveName xxx.har -p  /src  /dest  
  2. 查看内部结构 hadoop fs -lsr /dest/xxx.har  
  3. 查看内容 hadoop fs -lsr har:///dest/xxx.har  

 

 

3.3 使用SequenceFile或者MapFile(以SequenceFile为例)

 

提供两种将小文件打成SequenceFile的方法:
方法一:

 

[java] view plaincopy
 
  1. public class WriteSequenceMapReduce {  
  2.     // 定义输入路径  
  3.         private static final String INPUT_PATH = "hdfs://master:9000/files";  
  4.         // 定义输出路径  
  5.         private static final String OUT_PATH = "hdfs://master:9000/seq/";  
  6.         //定义文件系统  
  7.         private static FileSystem fileSystem = null;  
  8.           
  9.         public static void main(String[] args) {  
  10.   
  11.             try {  
  12.                 // 创建配置信息  
  13.                 Configuration conf = new Configuration();  
  14.   
  15.                 // 创建文件系统  
  16.                 fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
  17.                 // 如果输出目录存在,我们就删除  
  18.                 if (fileSystem.exists(new Path(OUT_PATH))) {  
  19.                     fileSystem.delete(new Path(OUT_PATH), true);  
  20.                 }  
  21.   
  22.                 // 创建任务  
  23.                 Job job = new Job(conf, WriteSequenceMapReduce.class.getName());  
  24.   
  25.                 // 1.1 设置输入目录和设置输入数据格式化的类  
  26.                 FileInputFormat.setInputPaths(job, INPUT_PATH);  
  27.                 // 1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型  
  28.                 job.setMapperClass(WriteSequenceMapper.class);  
  29.                 // 2.3 指定输出的路径和设置输出的格式化类  
  30.                 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
  31.                 // 提交作业 退出  
  32.                 System.exit(job.waitForCompletion(true) ? 0 : 1);  
  33.   
  34.             } catch (Exception e) {  
  35.                 e.printStackTrace();  
  36.             }  
  37.         }  
  38.   
  39.       
  40.     public static class WriteSequenceMapper extends Mapper<LongWritable, Text, Text, BytesWritable> {  
  41.         // 定义SequenceFile.Reader对象用于读文件  
  42.         private static SequenceFile.Writer writer = null;  
  43.         // 定义配置信息  
  44.         private static Configuration conf = null;  
  45.         // 定义最终输出的key和value  
  46.         private Text outkey = new Text();  
  47.         private BytesWritable outValue = new BytesWritable();  
  48.         //定义要合并的文件(存放在数组中)  
  49.         private FileStatus[] files = null;  
  50.         //定义输入流和一个字节数组  
  51.         private InputStream inputStream = null;  
  52.         private byte[] buffer = null;  
  53.           
  54.         @Override  
  55.         protected void setup(Mapper<LongWritable, Text, Text, BytesWritable>.Context context) throws IOException, InterruptedException {  
  56.             try {  
  57.                 // 创建配置信息  
  58.                 conf = new Configuration();  
  59.                 // 创建Path对象  
  60.                 Path path = new Path(INPUT_PATH);  
  61.                 // 创建SequenceFile.Writer对象,并指定压缩格式  
  62.                 writer = SequenceFile.createWriter(fileSystem,conf, new Path(OUT_PATH+"/total.seq"), Text.class, BytesWritable.class, CompressionType.BLOCK, new BZip2Codec());  
  63.                 //writer = SequenceFile.createWriter(fileSystem,conf, new Path(OUT_PATH+"/total.seq"), Text.class, BytesWritable.class);  
  64.                 //获取要合并的文件数组  
  65.                 files = fileSystem.listStatus(path);  
  66.                   
  67.             } catch (Exception e) {  
  68.                 e.printStackTrace();  
  69.             }  
  70.         }  
  71.   
  72.         @Override  
  73.         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, BytesWritable>.Context context) throws IOException, InterruptedException {  
  74.               
  75.   
  76.             //遍历文件数组  
  77.             for (int i=0; i<files.length; i++){  
  78.                 //将文件名作为输出的key  
  79.                 outkey.set(files[i].getPath().toString());  
  80.                   
  81.                 //创建输入流  
  82.                 inputStream = fileSystem.open(files[i].getPath());  
  83.                 //创建字节数组  
  84.                 buffer = new byte[(int) files[i].getLen()];  
  85.                 //通过工具类将文件读到字节数组中  
  86.                 IOUtils.readFully(inputStream, buffer, 0, buffer.length);  
  87.                 //将字节数组中的内容及单个文件的内容作为value输出  
  88.                 outValue.set(new BytesWritable(buffer));  
  89.                   
  90.                 //关闭输入流  
  91.                 IOUtils.closeStream(inputStream);  
  92.                   
  93.                 //将结果写到Sequencefile中  
  94.                 writer.append(outkey, outValue);  
  95.                   
  96.             }  
  97.               
  98.             //关闭流  
  99.             IOUtils.closeStream(writer);  
  100.               
  101.             //System.exit(0);  
  102.         }  
  103.   
  104.     }  
  105. }  


方法二:自定义InputFormat和RecordReader实现

 

 

[java] view plaincopy
 
  1. public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>{  
  2.   
  3.     @Override  
  4.     public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {  
  5.         //创建自定义的RecordReader  
  6.         WholeFileRecordReader reader = new WholeFileRecordReader();  
  7.           
  8.         reader.initialize(split, context);  
  9.           
  10.         return reader;  
  11.     }  
  12.       
  13.     @Override  
  14.     protected boolean isSplitable(JobContext context, Path filename) {  
  15.           
  16.         return false;  
  17.     }  
  18.   
  19. }  

 

[java] view plaincopy
 
  1. public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable>{  
  2.   
  3.     private FileSplit fileSplit;  
  4.     private Configuration conf;  
  5.     private BytesWritable value = new BytesWritable();  
  6.     private boolean processed = false;  
  7.       
  8.       
  9.     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException{  
  10.         this.fileSplit = (FileSplit) split;  
  11.         this.conf = context.getConfiguration();  
  12.           
  13.     }  
  14.       
  15.     /** 
  16.      * process表示记录是否已经被处理过了 
  17.      */  
  18.     @Override  
  19.     public boolean nextKeyValue() throws IOException, InterruptedException {  
  20.         if (!processed){  
  21.             byte[] contents = new byte[(int) fileSplit.getLength()];  
  22.             //获取路径  
  23.             Path file = fileSplit.getPath();  
  24.             //创建文件系统  
  25.             FileSystem fileSystem = file.getFileSystem(conf);  
  26.             FSDataInputStream in = null;  
  27.             try {  
  28.                 //打开文件  
  29.                 in = fileSystem.open(file);  
  30.                 //将file文件中的内容放入contents数组中。使用了IOUtils工具类的readFully()方法,将in流中的内容读到contents字节数组中  
  31.                 IOUtils.readFully(in, contents, 0, contents.length);  
  32.                 //BytesWritable是一个可用做key或value的字节序列,而ByteWritable是单个字节  
  33.                 //将value的内容设置为contents的值  
  34.                 value.set(contents, 0, contents.length);  
  35.                   
  36.             } catch (Exception e) {  
  37.                 e.printStackTrace();  
  38.             } finally{  
  39.                 IOUtils.closeStream(in);  
  40.             }  
  41.               
  42.             processed = true;  
  43.             return true;  
  44.         }  
  45.         return false;  
  46.     }  
  47.   
  48.     @Override  
  49.     public NullWritable getCurrentKey() throws IOException, InterruptedException {  
  50.         return NullWritable.get();  
  51.     }  
  52.   
  53.     @Override  
  54.     public BytesWritable getCurrentValue() throws IOException, InterruptedException {  
  55.         return value;  
  56.     }  
  57.   
  58.     @Override  
  59.     public float getProgress() throws IOException, InterruptedException {  
  60.           
  61.         return processed ? 1.0f : 0.0f;  
  62.     }  
  63.   
  64.     @Override  
  65.     public void close() throws IOException {  
  66.   
  67.         //do nothing  
  68.     }  
  69.   
  70.   
  71.   
  72. }  

 

[java] view plaincopy
 
  1. public class SmallFilesToSequenceFileConverter {  
  2.       
  3.       
  4.     // 定义输入路径  
  5.     private static final String INPUT_PATH = "hdfs://master:9000/files/*";  
  6.     // 定义输出路径  
  7.     private static final String OUT_PATH = "hdfs://<span style="font-family: Arial, Helvetica, sans-serif;">master</span>:9000/seq/total.seq";  
  8.       
  9.     public static void main(String[] args) {  
  10.         try {  
  11.             // 创建配置信息  
  12.             Configuration conf = new Configuration();  
  13.   
  14.             // 创建文件系统  
  15.             FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
  16.             // 如果输出目录存在,我们就删除  
  17.             if (fileSystem.exists(new Path(OUT_PATH))) {  
  18.                 fileSystem.delete(new Path(OUT_PATH), true);  
  19.             }  
  20.   
  21.             // 创建任务  
  22.             Job job = new Job(conf, SmallFilesToSequenceFileConverter.class.getName());  
  23.   
  24.             //1.1   设置输入目录和设置输入数据格式化的类  
  25.             FileInputFormat.addInputPaths(job, INPUT_PATH);  
  26.             job.setInputFormatClass(WholeFileInputFormat.class);  
  27.   
  28.             //1.2   设置自定义Mapper类和设置map函数输出数据的key和value的类型  
  29.             job.setMapperClass(SequenceFileMapper.class);  
  30.             job.setMapOutputKeyClass(Text.class);  
  31.             job.setMapOutputValueClass(BytesWritable.class);  
  32.   
  33.             //1.3   设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)  
  34.             job.setPartitionerClass(HashPartitioner.class);  
  35.             //千万不要有这句话,否则单个小文件的内容会输出到单独的一个Sequencefile文件中(简直内伤)  
  36.             //job.setNumReduceTasks(0);  
  37.   
  38.             FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
  39.             job.setOutputFormatClass(SequenceFileOutputFormat.class);  
  40.               
  41.             // 此处的设置是最终输出的key/value,一定要注意!  
  42.             job.setOutputKeyClass(Text.class);  
  43.             job.setOutputValueClass(BytesWritable.class);  
  44.               
  45.             // 提交作业 退出  
  46.             System.exit(job.waitForCompletion(true) ? 0 : 1);  
  47.           
  48.         } catch (Exception e) {  
  49.             e.printStackTrace();  
  50.         }  
  51.     }  
  52.   
  53.     public static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {  
  54.         // 定义文件的名称作为key  
  55.         private Text fileNameKey = null;  
  56.   
  57.         /**  
  58.          * task调用之前,初始化fileNameKey  
  59.          */  
  60.         @Override  
  61.         protected void setup(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException {  
  62.             // 获取分片  
  63.             InputSplit split = context.getInputSplit();  
  64.             // 获取输入目录  
  65.             Path path = ((FileSplit) split).getPath();  
  66.             // 设置fileNameKey  
  67.             fileNameKey = new Text(path.toString());  
  68.         }  
  69.   
  70.         @Override  
  71.         protected void map(NullWritable key, BytesWritable value, Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context) throws IOException,  
  72.                 InterruptedException {  
  73.             // 将fileNameKey作为输出的key(文件名),value作为输出的value(单个小文件的内容)  
  74.             System.out.println(fileNameKey.toString());  
  75.             context.write(fileNameKey, value);  
  76.         }  
  77.     }  
  78. }  

注:方法二的这三个类可以实现将小文件写到一个SequenceFile中。

 

 

读取SequenceFile文件:

 

[java] view plaincopy
 
  1. public class ReadSequenceMapReduce {  
  2.     // 定义输入路径  
  3.     private static final String INPUT_PATH = "hdfs://master:9000/seq/total.seq";  
  4.     // 定义输出路径  
  5.     private static final String OUT_PATH = "hdfs://<span style="font-family: Arial, Helvetica, sans-serif;">master</span>:9000/seq/out";  
  6.     //定义文件系统  
  7.     private static FileSystem fileSystem = null;  
  8.       
  9.     public static void main(String[] args) {  
  10.   
  11.         try {  
  12.             // 创建配置信息  
  13.             Configuration conf = new Configuration();  
  14.   
  15.             // 创建文件系统  
  16.             fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
  17.             // 如果输出目录存在,我们就删除  
  18.             if (fileSystem.exists(new Path(OUT_PATH))) {  
  19.                 fileSystem.delete(new Path(OUT_PATH), true);  
  20.             }  
  21.   
  22.             // 创建任务  
  23.             Job job = new Job(conf, ReadSequenceMapReduce.class.getName());  
  24.   
  25.             // 1.1 设置输入目录和设置输入数据格式化的类  
  26.             FileInputFormat.setInputPaths(job, INPUT_PATH);  
  27.             // 这个很重要,指定使用SequenceFileInputFormat类来处理我们的输入文件  
  28.             job.setInputFormatClass(SequenceFileInputFormat.class);  
  29.   
  30.             // 1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型  
  31.             job.setMapperClass(ReadSequenceMapper.class);  
  32.             job.setMapOutputKeyClass(Text.class);  
  33.             job.setMapOutputValueClass(Text.class);  
  34.   
  35.             // 1.3 设置分区和reduce数量  
  36.             job.setPartitionerClass(HashPartitioner.class);  
  37.             job.setNumReduceTasks(0);  
  38.   
  39.             // 最终输出的类型  
  40.             job.setOutputKeyClass(Text.class);  
  41.             job.setOutputValueClass(Text.class);  
  42.   
  43.             // 2.3 指定输出的路径和设置输出的格式化类  
  44.             FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
  45.             job.setOutputFormatClass(TextOutputFormat.class);  
  46.   
  47.             // 提交作业 退出  
  48.             System.exit(job.waitForCompletion(true) ? 0 : 1);  
  49.   
  50.         } catch (Exception e) {  
  51.             e.printStackTrace();  
  52.         }  
  53.     }  
  54.   
  55.     public static class ReadSequenceMapper extends Mapper<Text, BytesWritable, Text, Text> {  
  56.   
  57.         //定义SequenceFile.Reader对象用于读文件  
  58.         private static SequenceFile.Reader reader = null;  
  59.         //定义配置信息  
  60.         private static Configuration conf = null;  
  61.         //定义最终输出的value  
  62.         private Text outValue = new Text();  
  63.           
  64.         /** 
  65.          * 在setUp()函数中初始化相关对象 
  66.          */  
  67.         @Override  
  68.         protected void setup(Mapper<Text, BytesWritable, Text, Text>.Context context) throws IOException, InterruptedException {  
  69.             try {  
  70.                 // 创建配置信息  
  71.                 conf = new Configuration();  
  72.                 // 创建文件系统  
  73.                 //FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf);  
  74.                 // 创建Path对象  
  75.                 Path path = new Path(INPUT_PATH);  
  76.                 // 创建SequenceFile.Reader对象  
  77.                 reader = new SequenceFile.Reader(fileSystem, path, conf);  
  78.                   
  79.             } catch (Exception e) {  
  80.                 e.printStackTrace();  
  81.             }  
  82.   
  83.         }  
  84.   
  85.         @Override  
  86.         protected void map(Text key, BytesWritable value, Mapper<Text, BytesWritable, Text, Text>.Context context) throws IOException, InterruptedException {  
  87.               
  88.             if (!"".equals(key.toString())  && !"".equals(value.get())){  
  89.                   
  90.                 //设置输出的value  
  91.                 outValue.set(new String(value.getBytes(), 0, value.getLength()));  
  92.                 //把结果写出去  
  93.                 context.write(key, outValue);  
  94.             }  
  95.                   
  96.           
  97.         }  
  98.     }  
  99. }  

 

 

3.4 使用CombineFileInputFormat

 

详细内容见:http://shiyanjun.cn/archives/299.html

Hadoop小文件问题及解决方案

标签:

原文地址:http://www.cnblogs.com/thinkpad/p/5047700.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!