标签:
连接池实现
1 package com.ccqtgb; 2 3 import java.io.IOException; 4 import java.net.InetSocketAddress; 5 import java.util.concurrent.ArrayBlockingQueue; 6 import java.util.concurrent.ConcurrentHashMap; 7 import java.util.concurrent.TimeUnit; 8 9 import org.csource.fastdfs.ClientGlobal; 10 import org.csource.fastdfs.StorageClient; 11 import org.csource.fastdfs.StorageClient1; 12 import org.csource.fastdfs.StorageServer; 13 import org.csource.fastdfs.TrackerClient; 14 import org.csource.fastdfs.TrackerGroup; 15 import org.csource.fastdfs.TrackerServer; 16 17 public class ConnectionPool { 18 19 //最大连接数,可以写配置文件 20 private int size = 5; 21 //被使用的连接 22 private ConcurrentHashMap<StorageClient1,Object> busyConnectionPool = null; 23 //空闲的连接 24 private ArrayBlockingQueue<StorageClient1> idleConnectionPool = null; 25 26 private Object obj = new Object(); 27 28 private static ConnectionPool instance = new ConnectionPool(); 29 30 public static ConnectionPool getConnectionPool(){ 31 return instance; 32 } 33 34 //取出连接 35 public StorageClient1 checkout(int waitTime){ 36 StorageClient1 storageClient1 = null; 37 try { 38 storageClient1 = idleConnectionPool.poll(waitTime, TimeUnit.SECONDS); 39 System.out.println(storageClient1); 40 if(storageClient1 != null){ 41 busyConnectionPool.put(storageClient1, obj); 42 } 43 } catch (InterruptedException e) { 44 // TODO Auto-generated catch block 45 storageClient1 = null; 46 e.printStackTrace(); 47 } 48 return storageClient1; 49 } 50 51 //回收连接 52 public void checkin(StorageClient1 storageClient1){ 53 if(busyConnectionPool.remove(storageClient1) != null){ 54 idleConnectionPool.add(storageClient1); 55 } 56 } 57 58 //如果连接无效则抛弃,新建连接来补充到池里 59 public void drop(StorageClient1 storageClient1){ 60 if(busyConnectionPool.remove(storageClient1) != null){ 61 TrackerServer trackerServer = null; 62 TrackerClient trackerClient = new TrackerClient(); 63 try { 64 trackerServer = trackerClient.getConnection(); 65 StorageClient1 newStorageClient1 = new 
StorageClient1(trackerServer,null); 66 idleConnectionPool.add(newStorageClient1); 67 System.out.println("------------------------- :connection +1"); 68 } catch (IOException e) { 69 // TODO Auto-generated catch block 70 e.printStackTrace(); 71 }finally{ 72 if(trackerServer != null){ 73 try { 74 trackerServer.close(); 75 } catch (IOException e) { 76 // TODO Auto-generated catch block 77 e.printStackTrace(); 78 } 79 } 80 } 81 } 82 } 83 84 //单例 85 private ConnectionPool(){ 86 busyConnectionPool = new ConcurrentHashMap<StorageClient1, Object>(); 87 idleConnectionPool = new ArrayBlockingQueue<StorageClient1>(size); 88 init(size); 89 } 90 91 //初始化连接池 92 private void init(int size){ 93 initClientGlobal(); 94 TrackerServer trackerServer = null; 95 try { 96 TrackerClient trackerClient = new TrackerClient(); 97 //只需要一个tracker server连接 98 trackerServer = trackerClient.getConnection(); 99 StorageServer storageServer = null; 100 StorageClient1 storageClient1 = null; 101 for(int i=0; i<size; i++){ 102 storageClient1 = new StorageClient1(trackerServer,storageServer); 103 idleConnectionPool.add(storageClient1); 104 System.out.println("------------------------- :connection +1"); 105 } 106 107 } catch (IOException e) { 108 // TODO Auto-generated catch block 109 e.printStackTrace(); 110 }finally{ 111 if(trackerServer != null){ 112 try { 113 trackerServer.close(); 114 } catch (IOException e) { 115 // TODO Auto-generated catch block 116 e.printStackTrace(); 117 } 118 } 119 } 120 } 121 122 //初始化客户端 123 private void initClientGlobal(){ 124 //连接超时时间 125 ClientGlobal.setG_connect_timeout(2000); 126 //网络超时时间 127 ClientGlobal.setG_network_timeout(3000); 128 ClientGlobal.setG_anti_steal_token(false); 129 // 字符集 130 ClientGlobal.setG_charset("UTF-8"); 131 ClientGlobal.setG_secret_key(null); 132 // HTTP访问服务的端口号 133 ClientGlobal.setG_tracker_http_port(8080); 134 135 InetSocketAddress[] trackerServers = new InetSocketAddress[2]; 136 trackerServers[0] = new InetSocketAddress("10.64.2.171",22122); 
137 trackerServers[1] = new InetSocketAddress("10.64.2.172",22122); 138 TrackerGroup trackerGroup = new TrackerGroup(trackerServers); 139 //tracker server 集群 140 ClientGlobal.setG_tracker_group(trackerGroup); 141 } 142 143 144 }
客户端接口
package com.ccqtgb.client;

import java.io.File;

/**
 * Client-side entry point for uploading content to the file store.
 *
 * <p>Every method returns the path reported by the storage backend for the uploaded
 * content. The implementation shipped alongside this interface, {@code FileClientImpl},
 * returns {@code null} when the upload fails.
 */
public interface FileClient {

    /**
     * Uploads a file; the implementation derives the extension from the file's name.
     *
     * @param file the file to upload
     * @return the stored path, or {@code null} on failure
     */
    public String uploadFile(File file);

    /**
     * Uploads a file with an explicitly supplied name.
     *
     * @param file the file to upload
     * @param name NOTE(review): {@code FileClientImpl} treats this value as the file
     *             extension (it is passed on as {@code file_ext_name}), not as a full
     *             file name - confirm the intended meaning
     * @return the stored path, or {@code null} on failure
     */
    public String uploadFile(File file, String name);

    /**
     * Uploads in-memory content.
     *
     * @param buff the bytes to upload
     * @param name NOTE(review): treated as the file extension by {@code FileClientImpl}
     *             - confirm the intended meaning
     * @return the stored path, or {@code null} on failure
     */
    public String uploadFile(byte[] buff, String name);

}
客户端实现
1 package com.ccqtgb.client.impl; 2 3 import java.io.File; 4 import java.io.FileInputStream; 5 import java.io.FileNotFoundException; 6 import java.io.IOException; 7 8 import org.csource.common.NameValuePair; 9 import org.csource.fastdfs.StorageClient1; 10 11 import com.ccqtgb.ConnectionPool; 12 import com.ccqtgb.client.FileClient; 13 14 public class FileClientImpl implements FileClient { 15 16 @Override 17 public String uploadFile(File file) { 18 byte[] buff = getFileBuff(file); 19 String file_ext_name = getFileExtName(file.getName()); 20 return send(buff, file_ext_name, null); 21 } 22 23 @Override 24 public String uploadFile(File file, String file_ext_name) { 25 byte[] buff = getFileBuff(file); 26 return send(buff,file_ext_name,null); 27 } 28 29 @Override 30 public String uploadFile(byte[] buff,String file_ext_name) { 31 return send(buff,file_ext_name,null); 32 } 33 34 //上传缓存数据到storage服务器 35 private String send(byte[] buff, String file_ext_name, NameValuePair[] mate_list){ 36 String upPath = null; 37 StorageClient1 storageClient1 = null; 38 ConnectionPool pool = ConnectionPool.getConnectionPool(); 39 storageClient1 = pool.checkout(10); 40 try { 41 upPath = storageClient1.upload_file1(buff, file_ext_name, mate_list); 42 pool.checkin(storageClient1); 43 } catch (IOException e) { 44 //如果出现了IO异常应该销毁此连接 45 pool.drop(storageClient1); 46 e.printStackTrace(); 47 } catch (Exception e) { 48 pool.drop(storageClient1); 49 e.printStackTrace(); 50 } 51 52 return upPath; 53 } 54 55 //将文件缓存到字节数组中 56 private byte[] getFileBuff(File file){ 57 byte[] buff = null; 58 try { 59 FileInputStream inputStream = new FileInputStream(file); 60 buff = new byte[inputStream.available()]; 61 inputStream.read(buff); 62 } catch (FileNotFoundException e) { 63 // TODO Auto-generated catch block 64 e.printStackTrace(); 65 } catch (IOException e) { 66 // TODO Auto-generated catch block 67 e.printStackTrace(); 68 } 69 return buff; 70 } 71 72 //通过文件名称获取文件扩展名 73 private String getFileExtName(String name){ 
74 String ext_name = null; 75 if(name != null){ 76 if(name.contains(".")){ 77 ext_name = name.substring(name.indexOf(".")+1); 78 } 79 } 80 return ext_name; 81 } 82 83 }
标签:
原文地址:http://www.cnblogs.com/juanmao-zhang/p/4936568.html