标签:
1. 在本地文件系统生成一个文本文件,读入文件,将其第101-120字节的内容写入HDFS成为一个新文件
2. 在HDFS中生成文本文件,读入这个文件,将其第101-120字节的内容写入本地文件系统成为一个新文件
环境部署:http://www.cnblogs.com/dopeter/p/4630791.html
FileBuilder.java
生成文件的工具类,包含在本地生成文件,在Hadoop生成文件,读取Hadoop指定目录的文件
1 package story; 2 3 import java.io.ByteArrayInputStream; 4 import java.io.ByteArrayOutputStream; 5 import java.io.FileNotFoundException; 6 import java.io.FileWriter; 7 import java.io.IOException; 8 import java.io.InputStream; 9 import java.io.OutputStream; 10 import java.io.PrintWriter; 11 import java.io.UnsupportedEncodingException; 12 import java.net.URI; 13 14 import org.apache.hadoop.conf.Configuration; 15 import org.apache.hadoop.fs.FileSystem; 16 import org.apache.hadoop.fs.Path; 17 import org.apache.hadoop.io.IOUtils; 18 import org.apache.hadoop.util.Progressable; 19 20 public class FileBuilder { 21 22 //build default test data 23 public static String BuildTestFileContent() 24 { 25 StringBuilder contentBuilder=new StringBuilder(); 26 27 for(int loop=0;loop<100;loop++) 28 contentBuilder.append(String.valueOf(loop)); 29 30 String content =contentBuilder.toString(); 31 32 return content; 33 } 34 35 //build local file 36 public static void BuildLocalFile(String buildPath,String content) throws FileNotFoundException, UnsupportedEncodingException 37 { 38 /* 39 FileWriter fileWriter; 40 try { 41 fileWriter = new FileWriter(buildPath); 42 43 fileWriter.write(content); 44 fileWriter.close(); 45 } catch (IOException e) { 46 e.printStackTrace(); 47 } 48 */ 49 50 51 52 PrintWriter out = new java.io.PrintWriter(new java.io.File(buildPath), "UTF-8"); 53 String text = new java.lang.String(content); 54 out.print(text); 55 out.flush(); 56 out.close(); 57 58 } 59 60 //upload file to hadoop 61 public static void BuildHdfsFile(String buildPath,byte[] fileContent) throws IOException 62 { 63 //convert to inputstream 64 InputStream inputStream=new ByteArrayInputStream(fileContent); 65 66 //hdfs upload 67 Configuration conf = new Configuration(); 68 69 FileSystem fs = FileSystem.get(URI.create(buildPath), conf); 70 OutputStream outputStream = fs.create(new Path(buildPath), new Progressable() { 71 public void progress() { 72 System.out.print("."); 73 } 74 }); 75 76 
IOUtils.copyBytes(inputStream, outputStream, fileContent.length, true); 77 } 78 79 //wrapper for upload file 80 public static void BuildHdfsFile(String buildPath,String fileContent) throws IOException 81 { 82 BuildHdfsFile(buildPath,fileContent.getBytes()); 83 } 84 85 //download file from hadoop 86 public static byte[] ReadHdfsFile(String readPath)throws IOException 87 { 88 byte[] fileBuffer; 89 Configuration conf = new Configuration(); 90 FileSystem fs = FileSystem.get(URI.create(readPath), conf); 91 InputStream in = null; 92 ByteArrayOutputStream out=new ByteArrayOutputStream(); 93 try { 94 in = fs.open(new Path(readPath)); 95 IOUtils.copyBytes(in, out, 4096, false); 96 97 fileBuffer=out.toByteArray(); 98 } finally { 99 IOUtils.closeStream(in); 100 } 101 102 return fileBuffer; 103 } 104 105 }
FileContentHandler.java
文件内容的处理类,读取本地文件时设置起始Position与截取的长度,读取从Hadoop下载的文件时设置起始Position与截取的长度
1 package story; 2 3 import java.io.IOException; 4 import java.io.RandomAccessFile; 5 import java.io.UnsupportedEncodingException; 6 7 public class FileContentHandler { 8 public static byte[] GetContentByLocalFile(String filePath,long beginPosition,int readLength) 9 { 10 int readBufferSize=readLength; 11 byte[] readBuffer=new byte[readBufferSize]; 12 13 RandomAccessFile accessFile; 14 try { 15 accessFile=new RandomAccessFile (filePath,"r"); 16 long length=accessFile.length(); 17 System.out.println(length); 18 19 if(length>beginPosition&&length>beginPosition+readBufferSize) 20 { 21 accessFile.seek(beginPosition); 22 accessFile.read(readBuffer); 23 accessFile.close(); 24 } 25 } catch ( IOException e) { 26 // TODO Auto-generated catch block 27 e.printStackTrace(); 28 } 29 30 return readBuffer; 31 } 32 33 public static String GetContentByBuffer(byte[] buffer,int beginPosition,int readLength) throws UnsupportedEncodingException 34 { 35 String content; 36 byte[] subBuffer=new byte[readLength]; 37 for(int position=0;position<readLength;position++) 38 subBuffer[position]=buffer[beginPosition+position]; 39 40 buffer=null; 41 42 content=new String(subBuffer,"UTF-8"); 43 System.out.println(content); 44 45 return content; 46 } 47 48 }
UploadStory.java
任务1(本地文件 → HDFS)的流程代码
1 package story; 2 3 public class UploadStory { 4 5 //public static void main(String[] args) throws Exception {} 6 7 public static void main(String[] args) throws Exception { 8 //also define value of parameter from arguments. 9 String localFilePath="F:/bulid.txt"; 10 String hdfsFilePath="hdfs://hmaster0:9000/user/14699_000/input/build.txt"; 11 int readBufferSize=20; 12 long fileBeginReadPosition=101; 13 14 //upload story begin. 15 16 //build local file 17 FileBuilder.BuildLocalFile(localFilePath,FileBuilder.BuildTestFileContent()); 18 //read file 19 byte[] uploadBuffer=FileContentHandler.GetContentByLocalFile(localFilePath, fileBeginReadPosition, readBufferSize); 20 //upload 21 if(uploadBuffer!=null&&uploadBuffer.length>0) 22 FileBuilder.BuildHdfsFile(hdfsFilePath, uploadBuffer); 23 24 } 25 26 }
DownloadStory.java
任务2(HDFS → 本地文件)的流程代码
1 package story; 2 3 public class DownloadStory { 4 5 //public static void main(String[] args) throws Exception { } 6 7 8 public static void main(String[] args) throws Exception { 9 //also define value of parameter from arguments. 10 String localFilePath="F:/bulid.txt"; 11 String hdfsFilePath="hdfs://hmaster0:9000/user/14699_000/input/build2.txt"; 12 int readBufferSize=20; 13 int fileBeginReadPosition=101; 14 15 //build file to hadoop 16 FileBuilder.BuildHdfsFile(hdfsFilePath, FileBuilder.BuildTestFileContent()); 17 18 //download file 19 byte[] readBuffer=FileBuilder.ReadHdfsFile(hdfsFilePath); 20 21 //handle buffer 22 String content=FileContentBuilder.GetContentByBuffer(readBuffer, fileBeginReadPosition, readBufferSize); 23 24 //write to local file 25 FileBuilder.BuildLocalFile(localFilePath, content); 26 } 27 28 }
标签:
原文地址:http://www.cnblogs.com/dopeter/p/4631840.html