标签:java nio serversocketchannel selector socketchannel 服务器传文件
说在前面:给我的需求是实现从服务器A将大量文件(大概几十TB)发送到服务器B,在A服务器生成文件的MD5码,并在服务器B进行md5验证,验证通过保存。
我的实现思路:
将待上传的所有文件的路径生成一个txt文件,格式如下。每行的前缀由三个字母和四位数字组成:当数字部分达到9999之后,下一条记录的数字归零,字母部分自动进位(例如AAA9999之后是AAB0000)。(此处加前缀的目的是为了整齐,而且失败了便于查询。)
AAA0000:D:\upload\addChannel.html
AAA0001:D:\upload\addChannel2.html
AAA0002:D:\upload\addContactPerson.html
AAA0003:D:\upload\admin.html
AAA0004:D:\upload\businessOfChannel.html
....
AAA9999:D:\upload\admin1.html
AAB0000:D:\upload\businessOfChannel1.html
...
每次读取一条目录,进行上传。
本地测试版尚未实现的部分:没有把上传成功和失败的路径分别写到文件中,也没有添加日志。
第一部分:将文件目录存储到文本中,文件夹不进行存储。
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.Arrays;

/**
 * Walks a directory tree and writes one record per regular file to a text file.
 *
 * <p>Each record has the form {@code AAA0000:<absolute path>} followed by {@code "\n"}.
 * The label in front of the colon consists of three upper-case letters and a
 * four-digit counter; when the counter passes 9999 it restarts at 0000 and the
 * letter part advances (AAA, AAB, ... AAZ, ABA, ... ZZZ). Directories themselves
 * are not recorded.
 */
public class ReadAllPaths {

    /** Default root directory to scan when no command-line argument is given. */
    private static final String rootPath = "D:/upload/";

    /** Default record file to (re)create when no second command-line argument is given. */
    private static final String filePath = "G:/temp/unUploadedFilePath.txt";

    /** Largest value of the four-digit counter before the letter prefix advances. */
    private static final int MAX_NUM = 9999;

    /*
     * prefix and num together form the record label, for example AAA0001.
     * The label only exists to make records easy to look up.
     */
    private String prefix = "AAA";
    private int num = 0;

    /** Set once the label following ZZZ9999 would be needed. */
    private boolean exhausted = false;

    /**
     * Scans the root directory and writes the record file, replacing any previous one.
     *
     * @param args optional: {@code args[0]} overrides the root directory and
     *             {@code args[1]} overrides the record file; the built-in defaults
     *             are used for any argument that is missing
     * @throws Exception if the record file cannot be written or the labels run out
     */
    public static void main(String[] args) throws Exception {
        String root = (args != null && args.length > 0) ? args[0] : rootPath;
        String target = (args != null && args.length > 1) ? args[1] : filePath;

        ReadAllPaths paths = new ReadAllPaths();
        // Opening without the append flag truncates an existing file, so no
        // separate delete step is needed and a failed delete cannot leave
        // stale records in front of the new ones.
        try (FileOutputStream out = new FileOutputStream(new File(target))) {
            paths.getAllPaths(root, out);
        }
    }

    /**
     * Writes a record for every regular file below {@code root} to {@code out}.
     *
     * @param root directory (or single file) to start from
     * @param out  destination stream; it is flushed but not closed
     * @throws Exception if writing fails or the labels run out
     */
    private void getAllPaths(String root, FileOutputStream out) throws Exception {
        // One unbuffered write per record is slow for large trees.
        OutputStream buffered = new BufferedOutputStream(out);
        writePaths(new File(root), buffered);
        buffered.flush();
    }

    /**
     * Recursive worker: descends into directories and emits a record for regular files.
     */
    private void writePaths(File file, OutputStream out) throws IOException {
        if (file.isDirectory()) {
            String[] children = file.list();
            if (children == null) {
                // list() returns null when the directory cannot be read; skip it.
                return;
            }
            // list() gives no ordering guarantee; sorting keeps labels stable across runs.
            Arrays.sort(children);
            for (String child : children) {
                writePaths(new File(file, child), out);
            }
        } else if (file.isFile()) {
            String line = getPathNum() + ":" + file.getAbsolutePath() + "\n";
            // The record file is written in the platform default charset.
            out.write(line.getBytes(Charset.defaultCharset()));
        }
    }

    /**
     * Returns the current label and advances to the next one.
     *
     * @return label such as {@code AAA0000}
     * @throws IllegalStateException if every label up to ZZZ9999 has been used
     */
    private String getPathNum() {
        if (exhausted) {
            throw new IllegalStateException("Ran out of path labels after ZZZ9999");
        }
        String label = getPrefix() + getNum();
        setNum();
        return label;
    }

    /**
     * @return the three-letter part of the label
     */
    private String getPrefix() {
        return prefix;
    }

    /**
     * Advances the letter part: AAA, AAB, ... AAZ, ABA, ... AZZ, BAA, ...
     * Going past ZZZ marks the generator as exhausted instead of producing
     * non-letter characters.
     */
    private void setPrefix() {
        char[] ch = getPrefix().toCharArray();
        int i = ch.length - 1;
        while (i >= 0 && ch[i] == 'Z') {
            ch[i] = 'A';
            i--;
        }
        if (i < 0) {
            exhausted = true;
        } else {
            ch[i]++;
        }
        prefix = new String(ch);
    }

    /**
     * @return the counter as exactly four digits, zero padded
     */
    private String getNum() {
        return String.format("%04d", num);
    }

    /**
     * Advances the counter; after 9999 it restarts at 0000 and the letter part advances.
     */
    private void setNum() {
        if (num < MAX_NUM) {
            num++;
        } else {
            num = 0;
            setPrefix();
        }
    }
}
第二部分,服务器端代码
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.util.Iterator; public class Server { Selector selector = null; ServerSocketChannel serverSocketChannel = null; private NioserverHandler2 handler; public Server() throws IOException { selector = Selector.open(); // 打开服务器套接字通道 serverSocketChannel = ServerSocketChannel.open(); // 调整通道的阻塞模式非阻塞 serverSocketChannel.configureBlocking(false); //serverSocketChannel.socket().setReuseAddress(true); serverSocketChannel.socket().bind(new InetSocketAddress(9999)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } public Server(NioserverHandler2 handler) throws IOException { this(); this.handler = handler; while (selector.select() > 0) { Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey s = it.next(); it.remove(); this.handler.excute((ServerSocketChannel) s.channel()); } } } public static void main(String[] args) throws IOException { new Server(new NioserverHandler2()); } } public class NioserverHandler2 { private final static String DIRECTORY = "G:\\NioRequest\\"; /** * 这里边我们处理接收和发送 * * @param serverSocketChannel */ public void excute(ServerSocketChannel serverSocketChannel) { SocketChannel socketChannel = null; try { socketChannel = serverSocketChannel.accept(); // 等待客户端连接 RequestObject2 requestObject = receiveData(socketChannel);// 接数据 // logger.log(Level.INFO,requestObject.toString()); String md5 = DigestUtils.md5Hex(requestObject.getContents()); String response = ""; if (md5.equals(requestObject.getMd5())) { response = (new ResponseObject("succeed", requestObject.getAbsolutePath(), "")).toString(); File file = new File(DIRECTORY + requestObject.getRelativePath()); if (!file.exists()) { file.mkdirs(); } File file1 = new File(DIRECTORY + requestObject.getRelativePath() + requestObject.getFilename()); if (!file1.exists()) 
{ file1.createNewFile(); } FileOutputStream fos = new FileOutputStream(file1); fos.write(requestObject.getContents()); fos.close(); } else { response = (new ResponseObject("failed", requestObject.getAbsolutePath(), "md5验证失败")).toString(); } System.out.println(response); responseData(socketChannel, response); // logger.log(Level.INFO, response); } catch (IOException e) { e.printStackTrace(); } } /** * <p> * 读取通道中的数据到Object里去 * </p> * * @param socketChannel * @return * @throws IOException */ public RequestObject2 receiveData(SocketChannel socketChannel) throws IOException { // 文件名 String fileName = null; String relativePath = null; String absolutePath = null; String md5 = null; // 文件长度 int contentLength = 0; // 文件内容 byte[] contents = null; // 由于我们解析时前4个字节是文件名长度 int capacity = 4; ByteBuffer buf = ByteBuffer.allocate(capacity); int size = 0; byte[] bytes = null; // 拿到文件名的长度 size = socketChannel.read(buf); if (size >= 0) { buf.flip(); capacity = buf.getInt(); buf.clear(); } // 拿文件名,相信文件名一次能够读完,如果你文件名超过1K 你有病了 buf = ByteBuffer.allocate(capacity); size = socketChannel.read(buf); if (size >= 0) { buf.flip(); bytes = new byte[size]; buf.get(bytes); buf.clear(); } String fileInfo = new String(bytes); System.out.println(fileInfo); fileName = fileInfo.split(";")[0]; relativePath = fileInfo.split(";")[1]; absolutePath = fileInfo.split(";")[2]; md5 = fileInfo.split(";")[3]; // 拿到文件长度 capacity = 4; buf = ByteBuffer.allocate(capacity); size = socketChannel.read(buf); if (size >= 0) { buf.flip(); // 文件长度是可要可不要的,如果你要做校验可以留下 capacity = buf.getInt(); buf.clear(); } if (capacity == 0) { contents = new byte[] {}; } else { // 用于接收buffer中的字节数组 ByteArrayOutputStream baos = new ByteArrayOutputStream(); // 文件可能会很大 // capacity = 1024; buf = ByteBuffer.allocate(capacity); while ((size = socketChannel.read(buf)) >= 0) { buf.flip(); bytes = new byte[size]; buf.get(bytes); baos.write(bytes); buf.clear(); } contents = baos.toByteArray(); } RequestObject2 requestObject = new 
RequestObject2(fileName, relativePath, absolutePath, md5, contents); return requestObject; } private void responseData(SocketChannel socketChannel, String response) { ByteBuffer buffer = ByteBuffer.wrap(response.getBytes()); try { socketChannel.write(buffer); buffer.clear(); // 确认要发送的东西发送完了关闭output 不然它端接收时socketChannel.read(Buffer) // 很可能造成阻塞 ,可以把这个(L)注释掉,会发现客户端一直等待接收数据 socketChannel.socket().shutdownOutput();// (L) } catch (IOException e) { e.printStackTrace(); } } } import java.io.Serializable; public class RequestObject2 implements Serializable { private static final long serialVersionUID = 1L; private String filename; private String relativePath; private String absolutePath; private String md5; private byte[] contents; public RequestObject2(String filename, String relativePath, String absolutePath, String md5, byte[] contents) { this.filename = filename; this.relativePath = relativePath; this.absolutePath = absolutePath; this.md5 = md5; this.contents = contents; } public String getFilename() { return filename; } public String getRelativePath() { return relativePath; } public String getAbsolutePath() { return absolutePath; } public String getMd5() { return md5; } public byte[] getContents() { return contents; }
第三部分 客户端代码
// ---- Client2.java ----
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
 * Reads the record file line by line and uploads each listed file to the
 * server over its own connection, printing the server's response.
 */
public class Client2 {

    private static final String unpath = "G:\\temp\\unUploadedFilePath.txt";
    private static final String pathPre = "D:\\upload\\";
    private static final String IPADDR = "127.0.0.1";
    private static final int PORT = 9999;

    // Kept for compatibility. Upload channels are no longer registered with it:
    // nothing ever selects on it, so keys of closed channels were never removed.
    Selector selector;

    /**
     * Opens the selector and starts the upload thread.
     *
     * @throws IOException if the selector cannot be opened
     */
    public Client2() throws IOException {
        selector = Selector.open();
        new Thread(new SendDataRunnable()).start();
    }

    /** Uploads every file listed in the record file, one after another. */
    private class SendDataRunnable implements Runnable {

        private ClientHandler handler;

        public SendDataRunnable() {
            handler = new ClientHandler();
        }

        @Override
        public void run() {
            // The first failure stops the whole run.
            try (BufferedReader reader = new BufferedReader(new FileReader(new File(unpath)))) {
                String path = "";
                // Stops at end of file or at the first empty line.
                while ((path = reader.readLine()) != null && path.length() != 0) {
                    // The channel stays in blocking mode so that every write sends
                    // its data and the response read waits instead of spinning.
                    try (SocketChannel socketChannel = SocketChannel.open()) {
                        socketChannel.connect(new InetSocketAddress(IPADDR, PORT));
                        handler.sendData(socketChannel, path, pathPre);
                        String response = handler.receiveData(socketChannel);
                        System.out.println(response);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Client2 client = new Client2();
    }
}

// ---- ClientHandler.java ----
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import org.apache.commons.codec.digest.DigestUtils;

/**
 * Sends one file to the server and reads the textual response.
 *
 * <p>Wire format of a request: a 4-byte length, then that many bytes of header
 * text {@code fileName;relativePath;recordLine;md5}, then a 4-byte content
 * length, then the content bytes. The whole file is held in memory, and the
 * 4-byte length limits a single file to {@code Integer.MAX_VALUE} bytes.
 */
public class ClientHandler {

    /**
     * Reads the file named by a record line, sends it with its MD5 digest and
     * shuts down the output side of the connection.
     *
     * @param socketChannel connected channel
     * @param path          record line of the form {@code AAA0000:<absolute path>}
     * @param pathPre       upload root; the part of the absolute path after it is
     *                      sent as the relative path
     * @throws Exception if the file cannot be read or sending fails
     */
    public void sendData(SocketChannel socketChannel, String path, String pathPre) throws Exception {
        System.out.println(path);
        String absoluteFilePath = getAbsoluteFilePath(path);
        String fileName = getFileName(absoluteFilePath);
        String relativeFilePath = getRelativeFilePath(absoluteFilePath, pathPre, fileName);
        System.out.println(absoluteFilePath);

        byte[] bytes = makeFileToBytes(absoluteFilePath);
        System.out.println(bytes.length);
        String md5 = DigestUtils.md5Hex(bytes);

        String fileInfo = new StringBuilder()
                .append(fileName)
                .append(";")
                .append(relativeFilePath)
                .append(";")
                .append(path)
                .append(";")
                .append(md5)
                .toString();
        System.out.println(fileInfo);

        // Encoded with the platform default charset, matching how the server decodes it.
        byte[] infoBytes = fileInfo.getBytes();

        // Both length fields and the header text go into one small buffer; the
        // content is written from its own array, so the file is not copied again.
        ByteBuffer header = ByteBuffer.allocate(Math.addExact(8, infoBytes.length));
        header.putInt(infoBytes.length);
        header.put(infoBytes);
        header.putInt(bytes.length);
        header.flip();

        writeFully(socketChannel, header);
        writeFully(socketChannel, ByteBuffer.wrap(bytes));

        // Shutting down the output tells the receiver that this request is complete.
        socketChannel.socket().shutdownOutput();
    }

    /**
     * Writes the whole buffer. A single write may send only part of it, so
     * writing continues until nothing remains.
     */
    private void writeFully(SocketChannel socketChannel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            socketChannel.write(buffer);
        }
    }

    /**
     * Strips the label from a record line, e.g. {@code AAA0000:D:\a.txt} gives {@code D:\a.txt}.
     */
    private String getAbsoluteFilePath(String path) {
        // Everything after the first ':' is the path; the label itself contains none.
        return path.substring(path.indexOf(':') + 1);
    }

    /**
     * Returns the part of the absolute path between the upload root and the file name.
     */
    private String getRelativeFilePath(String absoluteFilePath, String pathPre, String fileName) {
        return absoluteFilePath.substring(pathPre.length(), absoluteFilePath.length() - fileName.length());
    }

    private String getFileName(String path) {
        return new File(path).getName();
    }

    /**
     * Reads a whole file into memory.
     *
     * @throws IOException if the file cannot be read or is too large for the
     *                     4-byte length field of the wire format
     */
    private byte[] makeFileToBytes(String filePath) throws IOException {
        File file = new File(filePath);
        long length = file.length();
        if (length > Integer.MAX_VALUE - 8) {
            throw new IOException("File too large to send in one request: " + filePath);
        }
        try (FileInputStream in = new FileInputStream(file);
             ByteArrayOutputStream out = new ByteArrayOutputStream((int) Math.max(length, 32))) {
            byte[] b = new byte[4096];
            int n;
            while ((n = in.read(b)) != -1) {
                out.write(b, 0, n);
            }
            return out.toByteArray();
        }
    }

    /**
     * Reads the server's response until the server closes its output side.
     *
     * @param socketChannel connected channel
     * @return the response decoded as UTF-8
     * @throws IOException if reading fails
     */
    public String receiveData(SocketChannel socketChannel) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int count;
        while ((count = socketChannel.read(buffer)) >= 0) {
            // allocate() gives an array-backed buffer, so the bytes can be copied directly.
            baos.write(buffer.array(), 0, count);
            buffer.clear();
        }
        return new String(baos.toByteArray(), "UTF-8");
    }
}
/*至此全部完成,注释不够多,部分代码是从网上找的。后期有时间会补全注释的,或者下次直接上最终使用的代码*/
本文出自 “枫叶还没红” 博客,请务必保留此出处http://itlearninger.blog.51cto.com/12572641/1945045
标签:java nio serversocketchannel selector socketchannel 服务器传文件
原文地址:http://itlearninger.blog.51cto.com/12572641/1945045