1. Overview
A small file is a file whose size is smaller than the HDFS block size. Large numbers of such files cause serious scalability and performance problems for Hadoop. First, every block, file, and directory in HDFS is kept in the NameNode's memory as an object of roughly 150 bytes. Ten million small files, each occupying its own block, therefore mean about 20 million objects, or roughly 3 GB of NameNode heap (10,000,000 × 2 objects × 150 B); one hundred million files push that to around 30 GB, so NameNode memory becomes a hard limit on how far the cluster can scale. Second, accessing a large number of small files is far slower than accessing a few large files of the same total size: HDFS was designed for streaming access to large files, and reading many small ones forces the client to hop from one DataNode to the next, which hurts performance badly. Finally, processing many small files is much slower than processing the same volume of data held in large files, because each small file occupies its own task slot and most of the job's time is spent starting and releasing tasks rather than doing useful work.
2. HDFS File Read/Write Flow
Before introducing the small-file storage solutions, let's first walk through how files are currently read from and written to HDFS.
2.1 Read flow
A. The client sends a read request to the NameNode. If the file does not exist, an error is returned; otherwise the NameNode returns the locations of the DataNodes holding the file's blocks.
B. After receiving the block locations, the client opens socket connections to the different DataNodes and fetches the data in parallel (see the client-side sketch below).
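Seen from client code, this lookup-then-fetch sequence is hidden behind the FileSystem API. A minimal read sketch (the NameNode address and file path here are illustrative, not from the article):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsReadExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The FileSystem client asks the NameNode for block locations behind the scenes.
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf);
        FSDataInputStream in = null;
        try {
            // open() returns a stream that pulls the blocks from the DataNodes.
            in = fs.open(new Path("/files/sample.txt"));
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}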
2.2 Write flow
A. The client sends a write request. The NameNode checks whether the file already exists; if it does, an error is returned, otherwise the NameNode sends the client a list of available DataNodes.
B. The client splits the file into blocks and stores them on different DataNodes in parallel; once the data has been sent, the client notifies both the NameNode and the DataNodes.
C. After receiving the client's notification, the NameNode sends a confirmation to the DataNodes.
D. Once a DataNode has received confirmation from both the NameNode and the client, it commits the write (see the client-side sketch below).
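Again from the client's point of view, the block placement and acknowledgement handshake described above are hidden behind FileSystem.create(). A minimal write sketch (host, path, and content are illustrative, not from the article):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf);
        // create() asks the NameNode for target DataNodes and returns a stream
        // that ships the data to them; close() completes the write.
        FSDataOutputStream out = fs.create(new Path("/files/hello.txt"));
        try {
            out.write("hello hdfs\n".getBytes("UTF-8"));
        } finally {
            out.close();
        }
    }
}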
3. Solutions for Small Files
3.1 Merging files with a custom application
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AppForSmallFile {

    // Target HDFS namespace; the merged file is written under it.
    private static final String OUTPATH = "hdfs://liaozhongmin:9000";

    public static void main(String[] args) {
        FSDataOutputStream fsDataOutputStream = null;
        try {
            // Create one output file on HDFS that will hold all the small files.
            Path path = new Path(OUTPATH + "/combinedFile");
            fsDataOutputStream = FileSystem.get(path.toUri(), new Configuration()).create(path);

            // Local directory containing the small files to merge.
            File sourceDir = new File("C:\\Windows\\System32\\drivers\\etc");
            for (File file : sourceDir.listFiles()) {
                InputStreamReader inputStreamReader =
                        new InputStreamReader(new FileInputStream(file), "gbk");
                try {
                    // Read the small file line by line and append it to the merged file.
                    List<String> readLines = IOUtils.readLines(inputStreamReader);
                    for (String line : readLines) {
                        fsDataOutputStream.write(line.getBytes());
                        fsDataOutputStream.write("\n".getBytes());
                    }
                } finally {
                    inputStreamReader.close();
                }
            }
            System.out.println("Merge finished");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (fsDataOutputStream != null) {
                try {
                    fsDataOutputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
Note: this approach uses plain Java file I/O to append the contents of many small files into one HDFS file; the original file names and boundaries are not preserved.
3.2 Using the archive tool (HAR)
- Create an archive: hadoop archive -archiveName xxx.har -p /src /dest
- Inspect its internal structure: hadoop fs -lsr /dest/xxx.har
- List its contents via the har filesystem: hadoop fs -lsr har:///dest/xxx.har
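Files inside the archive remain individually readable through the har:// scheme, e.g. hadoop fs -cat har:///dest/xxx.har/some-small-file.txt (the file name here is only illustrative). Keep in mind that a HAR lowers the NameNode's object count but adds an extra index lookup on every read, and an archive cannot be changed once it has been built.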
3.3 Using SequenceFile or MapFile (SequenceFile as the example)
Two ways of packing small files into a SequenceFile are shown below.
Method one:
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WriteSequenceMapReduce {

    // Directory holding the small files, and the directory for the merged SequenceFile.
    private static final String INPUT_PATH = "hdfs://master:9000/files";
    private static final String OUT_PATH = "hdfs://master:9000/seq/";

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();

            // Remove the output directory if it already exists.
            FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
            if (fileSystem.exists(new Path(OUT_PATH))) {
                fileSystem.delete(new Path(OUT_PATH), true);
            }

            Job job = new Job(conf, WriteSequenceMapReduce.class.getName());

            FileInputFormat.setInputPaths(job, INPUT_PATH);
            job.setMapperClass(WriteSequenceMapper.class);
            // Map-only job: the mapper writes the SequenceFile itself.
            job.setNumReduceTasks(0);
            FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // The mapper ignores its input records: it lists the input directory itself and
    // appends every small file to one SequenceFile. It is meant to run as a single
    // map task (e.g. in local mode); multiple map tasks would collide on total.seq.
    public static class WriteSequenceMapper extends Mapper<LongWritable, Text, Text, BytesWritable> {

        private FileSystem fileSystem = null;
        private SequenceFile.Writer writer = null;
        private Text outKey = new Text();
        private BytesWritable outValue = new BytesWritable();
        private FileStatus[] files = null;
        private boolean merged = false;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            try {
                Configuration conf = context.getConfiguration();
                // Acquire the FileSystem here rather than relying on a static field
                // initialized in the driver, which would be null on a cluster.
                fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);

                // Block-compressed SequenceFile that will hold all the small files.
                writer = SequenceFile.createWriter(fileSystem, conf, new Path(OUT_PATH + "/total.seq"),
                        Text.class, BytesWritable.class, CompressionType.BLOCK, new BZip2Codec());

                files = fileSystem.listStatus(new Path(INPUT_PATH));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // map() is called once per input record; do the merge only once.
            if (merged) {
                return;
            }
            merged = true;

            for (FileStatus file : files) {
                // Key: full path of the small file; value: its raw bytes.
                outKey.set(file.getPath().toString());

                InputStream inputStream = fileSystem.open(file.getPath());
                byte[] buffer = new byte[(int) file.getLen()];
                IOUtils.readFully(inputStream, buffer, 0, buffer.length);
                outValue.set(buffer, 0, buffer.length);
                IOUtils.closeStream(inputStream);

                writer.append(outKey, outValue);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Close the writer once, after all files have been appended.
            IOUtils.closeStream(writer);
        }
    }
}
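Once the job finishes, the packed file can be spot-checked with hadoop fs -text hdfs://master:9000/seq/total.seq, which understands SequenceFiles and prints each record's key (the original file path) followed by its value bytes.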
Method two: a custom InputFormat and RecordReader
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// An InputFormat that hands each small file to the mapper as a single record:
// the key is NullWritable and the value is the whole file content.
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }

    // Never split a file: each small file must stay in one record.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Reads one whole file as a single (NullWritable, BytesWritable) record.
public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;   // has the single record been emitted yet?

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // Read the entire file backing this split into memory.
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fileSystem = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fileSystem.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // Nothing to release: the stream is closed in nextKeyValue().
    }
}
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class SmallFilesToSequenceFileConverter {

    // Small files to pack, and the job output directory for the SequenceFile.
    private static final String INPUT_PATH = "hdfs://master:9000/files/*";
    private static final String OUT_PATH = "hdfs://master:9000/seq/total.seq";

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();

            // Remove the output directory if it already exists.
            FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
            if (fileSystem.exists(new Path(OUT_PATH))) {
                fileSystem.delete(new Path(OUT_PATH), true);
            }

            Job job = new Job(conf, SmallFilesToSequenceFileConverter.class.getName());

            // Each small file becomes one (NullWritable, BytesWritable) record.
            FileInputFormat.addInputPaths(job, INPUT_PATH);
            job.setInputFormatClass(WholeFileInputFormat.class);

            job.setMapperClass(SequenceFileMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(BytesWritable.class);

            job.setPartitionerClass(HashPartitioner.class);

            // Write the (path, bytes) records into a SequenceFile under OUT_PATH.
            FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

        private Text fileNameKey = null;

        // Before map() is called, record the path of the file backing this split;
        // it becomes the key of the SequenceFile record.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            fileNameKey = new Text(path.toString());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(fileNameKey, value);
        }
    }
}
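Note that OUT_PATH is passed to FileOutputFormat as the job output directory, so the packed records actually land in a part file underneath it (for example /seq/total.seq/part-r-00000 with the default single reducer); the read job below still works because its INPUT_PATH points at that directory.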
Note: together, these three classes pack the small files into a single SequenceFile.
Reading the SequenceFile back:
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class ReadSequenceMapReduce {

    // SequenceFile produced above, and the directory for the extracted text.
    private static final String INPUT_PATH = "hdfs://master:9000/seq/total.seq";
    private static final String OUT_PATH = "hdfs://master:9000/seq/out";

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();

            // Remove the output directory if it already exists.
            FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
            if (fileSystem.exists(new Path(OUT_PATH))) {
                fileSystem.delete(new Path(OUT_PATH), true);
            }

            Job job = new Job(conf, ReadSequenceMapReduce.class.getName());

            // SequenceFileInputFormat turns each SequenceFile record into a
            // (key, value) pair, so no explicit SequenceFile.Reader is needed.
            FileInputFormat.setInputPaths(job, INPUT_PATH);
            job.setInputFormatClass(SequenceFileInputFormat.class);

            job.setMapperClass(ReadSequenceMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setPartitionerClass(HashPartitioner.class);
            // Map-only job: mapper output is written directly as text.
            job.setNumReduceTasks(0);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
            job.setOutputFormatClass(TextOutputFormat.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class ReadSequenceMapper extends Mapper<Text, BytesWritable, Text, Text> {

        private Text outValue = new Text();

        @Override
        protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
            // Key: original file path; value: the file's bytes, re-emitted as text.
            if (key.getLength() > 0 && value.getLength() > 0) {
                outValue.set(new String(value.getBytes(), 0, value.getLength()));
                context.write(key, outValue);
            }
        }
    }
}
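Since the job runs with zero reduce tasks, the extracted data lands as plain text under /seq/out in part-m-* files, each record written as the original file path, a tab, and that file's contents.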
3.4 Using CombineFileInputFormat
For details, see: http://shiyanjun.cn/archives/299.html
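In short, CombineFileInputFormat packs multiple small files into a single input split (up to a configurable maximum split size), so one map task processes many files instead of one. A minimal sketch of wiring it into a text-processing job, assuming a Hadoop version where the concrete CombineTextInputFormat class is available (the paths and split size are illustrative, and the mapper/reducer classes are set as in any ordinary job):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombineSmallFilesJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, CombineSmallFilesJob.class.getName());

        // Pack small files into splits of at most 128 MB each,
        // so one map task handles many small files.
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 128 * 1024 * 1024L);

        // Illustrative paths; set mapper/reducer classes here as usual.
        FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/files"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/combine/out"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}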
Original article: http://www.cnblogs.com/thinkpad/p/5047700.html