package com.pool; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; public class MessagePool { public static Queue<String> clintmessageQueue = new LinkedBlockingQueue<String>(); public static Queue<String> serverMessageQueue = new LinkedBlockingQueue<String>(); }接口
package com.pool; public interface IMessagePool { public void addMessage(String message); public String pollMessage(); public boolean isEmpty(); }
package com.pool.impl;

import com.pool.IMessagePool;
import com.pool.MessagePool;

/**
 * {@link IMessagePool} view over the shared {@code MessagePool.clintmessageQueue}.
 */
public class ClientMessagePool implements IMessagePool {

    /**
     * Appends {@code message} to the shared client queue.
     *
     * @param message the message to enqueue
     */
    @Override
    public void addMessage(String message) {
        MessagePool.clintmessageQueue.add(message);
    }

    /**
     * Removes and returns the head of the shared client queue.
     *
     * @return the head message, or {@code null} when the queue has none
     */
    @Override
    public String pollMessage() {
        return MessagePool.clintmessageQueue.poll();
    }

    /**
     * WARNING: despite its name this returns {@code true} when the queue
     * contains at least one message and {@code false} when it is empty.
     * {@code MySocket.checkWriteable()} polls only when this returns
     * {@code true}, so the inverted meaning is kept on purpose here.
     *
     * @return {@code true} if the queue size is greater than zero
     */
    @Override
    public boolean isEmpty() {
        int queued = MessagePool.clintmessageQueue.size();
        return queued > 0;
    }
}
package com.socket; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import org.apache.commons.lang.ArrayUtils; import com.pool.IMessagePool; import com.pool.impl.ClientMessagePool; import com.util.PackageUtil; public class MySocket { private SocketChannel mSocketChannel; private SelectionKey key; public static String CHARSET = "utf-8"; public static String ADDRESS = "127.0.0.1"; public static int HOST = 34521; protected Selector mSelector; protected IMessagePool messagePool = new ClientMessagePool();; ByteBuffer buffer; public MySocket() { try { mSelector = Selector.open(); initSocketChannel(); initBassiness(); } catch (Exception e) { e.printStackTrace(); } finally { try { key.channel().close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 业务逻辑 * * @throws Exception */ private void initBassiness() throws Exception { while (true) { checkWriteable(); // 瞬时检测 if (mSelector.select(100) > 0) { Iterator<SelectionKey> keys = mSelector.selectedKeys() .iterator(); while (keys.hasNext()) { SelectionKey key = keys.next(); if (key.isReadable()) { dispose4Readable(key); } if (key.isValid() && key.isWritable()) { dispose4Writable(key); } keys.remove(); } } } } /** * 可读请求 * * @param key * @throws Exception */ protected void dispose4Readable(SelectionKey key) throws Exception { SocketChannel mSocketChannel = ((SocketChannel) key.channel()); buffer = ByteBuffer.allocate(1024); mSocketChannel.read(buffer); buffer.flip(); this.unPacket(buffer.array(), key); } /** * 可写请求 * * @param key * @throws Exception */ protected void dispose4Writable(SelectionKey key) throws Exception { SocketChannel mSocketChannel = ((SocketChannel) key.channel()); int value = 0; do{ value = mSocketChannel.write(buffer); }while(value!=0); key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); 
key.interestOps(SelectionKey.OP_READ); } /** * 解包 * * @param buf * @return */ public byte[] unPacket(byte[] buf, SelectionKey key) { int len = buf.length;// 37 int i; for (i = 0; i < len; i++) { if (len < i + PackageUtil.PACKAGEHEADERLENGTH + PackageUtil.PACKAGESAVEDATALENGTH) { break; } String tmp = new String(ArrayUtils.subarray(buf, i, i + PackageUtil.PACKAGEHEADERLENGTH)); if (tmp.equals(PackageUtil.PACKAGEHEADER)) { int messageLength = PackageUtil.byte2Int(ArrayUtils.subarray( buf, i + PackageUtil.PACKAGEHEADERLENGTH, i + PackageUtil.PACKAGEHEADERLENGTH + PackageUtil.PACKAGESAVEDATALENGTH)); if (len < i + PackageUtil.PACKAGEHEADERLENGTH + PackageUtil.PACKAGESAVEDATALENGTH + messageLength) { break; } byte[] data = ArrayUtils.subarray(buf, i + PackageUtil.PACKAGEHEADERLENGTH + PackageUtil.PACKAGESAVEDATALENGTH, i + PackageUtil.PACKAGEHEADERLENGTH + PackageUtil.PACKAGESAVEDATALENGTH + messageLength); String message = new String(data); System.out.println(message); // Filter.filterRead(message, key, messagePool); i += PackageUtil.PACKAGEHEADERLENGTH + PackageUtil.PACKAGESAVEDATALENGTH + messageLength - 1; } } if (i == len) { return new byte[0]; } return ArrayUtils.subarray(buf, i, buf.length); } void initSocketChannel() throws Exception { mSocketChannel = SocketChannel.open(); mSocketChannel.connect(new InetSocketAddress(ADDRESS, HOST)); mSocketChannel.configureBlocking(false); key = mSocketChannel.register(mSelector, SelectionKey.OP_CONNECT|SelectionKey.OP_READ); } void checkWriteable() { if (messagePool.isEmpty()) { String values = messagePool.pollMessage(); System.out.println(" "+values); buffer = ByteBuffer.wrap(PackageUtil.packet(values.getBytes())); key.interestOps(SelectionKey.OP_WRITE); } } }
package com.socket; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import org.apache.commons.lang.ArrayUtils; import com.filter.Filter; import com.pool.IMessagePool; import com.pool.impl.ServerMessagePoll; import com.util.PackageUtil; public class MyServerSocket { private ServerSocketChannel mServerSocketChannel; private static MyServerSocket serverSocket; public static String CHARSET = "utf-8"; public static String ADDRESS = "127.0.0.1"; public static int HOST = 34521; protected Selector mSelector; protected IMessagePool messagePool = new ServerMessagePoll();; ByteBuffer buffer; private MyServerSocket() throws Exception { try { mSelector = Selector.open(); initSocketChannel(); initBassiness(); } catch (Exception e) { e.printStackTrace(); } finally { Set<SelectionKey> keys = mSelector.keys(); { for (SelectionKey key : keys) { try { key.channel().close(); } catch (IOException e) { e.printStackTrace(); continue; } } } } } /** * 业务逻辑 * * @throws Exception */ private void initBassiness() throws Exception { while (true) { checkWriteable(); // 瞬时检测 if (mSelector.select() > 0) { Iterator<SelectionKey> keys = mSelector.selectedKeys() .iterator(); while (keys.hasNext()) { SelectionKey key = keys.next(); if (key.isAcceptable()) { dispose4Acceptable(key); } if (key.isReadable()) { dispose4Readable(key); } if (key.isValid() && key.isWritable()) { dispose4Writable(key); } keys.remove(); } } } } /** * 响应读 * @param key * @throws Exception */ protected void dispose4Readable(SelectionKey key) throws Exception { SocketChannel mSocketChannel = ((SocketChannel) key.channel()); buffer = ByteBuffer.allocate(1024); mSocketChannel.read(buffer); buffer.flip(); this.unPacket(buffer.array(), key); } /** * 可写请求 * * @param key * @throws 
Exception */ protected void dispose4Writable(SelectionKey key) throws Exception { SocketChannel mSocketChannel = ((SocketChannel) key.channel()); if(mSocketChannel.write(buffer)!=-1){ buffer.clear(); } key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); // key.interestOps(SelectionKey.OP_READ); } /** * 解包 * * @param buf * @return */ private byte[] unPacket(byte[] buf, SelectionKey key) { int len = buf.length;// 37 int i; for (i = 0; i < len; i++) { if (len < i + PackageUtil.PACKAGEHEADERLENGTH + PackageUtil.PACKAGESAVEDATALENGTH) { break; } String tmp = new String(ArrayUtils.subarray(buf, i, i + PackageUtil.PACKAGEHEADERLENGTH)); if (tmp.equals(PackageUtil.PACKAGEHEADER)) { int messageLength = PackageUtil.byte2Int(ArrayUtils.subarray( buf, i + PackageUtil.PACKAGEHEADERLENGTH, i + PackageUtil.PACKAGEHEADERLENGTH + PackageUtil.PACKAGESAVEDATALENGTH)); if (len < i + PackageUtil.PACKAGEHEADERLENGTH + PackageUtil.PACKAGESAVEDATALENGTH + messageLength) { break; } byte[] data = ArrayUtils.subarray(buf, i + PackageUtil.PACKAGEHEADERLENGTH + PackageUtil.PACKAGESAVEDATALENGTH, i + PackageUtil.PACKAGEHEADERLENGTH + PackageUtil.PACKAGESAVEDATALENGTH + messageLength); String message = new String(data); System.out.println("server read message" + message); Filter.filterRead(message, key, messagePool); i += PackageUtil.PACKAGEHEADERLENGTH + PackageUtil.PACKAGESAVEDATALENGTH + messageLength - 1; } } if (i == len) { return new byte[0]; } return ArrayUtils.subarray(buf, i, buf.length); } public static MyServerSocket newInstence() throws Exception { if (serverSocket == null) { return new MyServerSocket(); } return serverSocket; } /** * SocketChannel初始化 * @throws Exception */ void initSocketChannel() throws Exception { mServerSocketChannel = ServerSocketChannel.open(); mServerSocketChannel.configureBlocking(false); mServerSocketChannel.bind(new InetSocketAddress(ADDRESS, HOST)); mServerSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT); } void 
dispose4Acceptable(SelectionKey key) throws Exception { SocketChannel mSocketChannel = ((ServerSocketChannel) key.channel()) .accept(); mSocketChannel.configureBlocking(false); mSocketChannel.register(mSelector, SelectionKey.OP_READ); } void checkWriteable() { if (messagePool.isEmpty()) { String value = messagePool.pollMessage(); String result = Filter.filterWrite(value, mSelector); if (result != null) { System.out.println("server:" + result); buffer = ByteBuffer.wrap(PackageUtil.packet(result.getBytes())); } } } }
package com.filter; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Set; import com.model.BaseModule; import com.model.Chat; import com.model.User; import com.pool.IMessagePool; import com.util.StringUtil; public class Filter { private static final String LOGIN = "login"; private static BaseModule modul = new BaseModule(); private static SelectionKey selectionKey=null; private static Selector selector = null; /** * TODO 线程启动 * * @param message * @return */ public static void filterRead(String message, SelectionKey key, IMessagePool messagePool) { selectionKey = key; try { BaseModule filterModul = (BaseModule) StringUtil.string2Bean(modul, message); if(filterType(filterModul.getType())){ if(filterValue(filterModul.getMessage())){ // messagePool.addMessage(message); }else{ } }else{ messagePool.addMessage(message); } } catch (Exception e) { return; } } public static String filterWrite(String message,Selector mSelector){ selector = mSelector; return filter(message); } private static String filter(String message){ BaseModule filterModul = (BaseModule) StringUtil.string2Bean(modul, message); Chat chat = (Chat) StringUtil.string2Bean(new Chat(), filterModul.getMessage()); Set<SelectionKey> keys=selector.keys(); for(SelectionKey key:keys){ String markString=key.attachment()!=null?key.attachment().toString():null; if(markString!=null && markString.equals(chat.getTo())){ key.interestOps(SelectionKey.OP_WRITE); return chat.getMessage(); } } return null; } /** * 过滤类型 * @param value * @return */ private static boolean filterType(String value) { if (LOGIN.equals(value)) { return true; } return false; } /** * 过滤内容 * @param value * @return */ private static boolean filterValue(String value) { return filterLogin(value); } private static boolean filterLogin(String value) { User user = (User) StringUtil.string2Bean(new User(), value); if (user.getUserName() != null) { selectionKey.attach(user.getUserName()); return true; } return false; } }
package com.service; public interface IMessageService { void doMessage(String message); }util
package com.util; import java.io.UnsupportedEncodingException; import org.apache.commons.lang.ArrayUtils; public class PackageUtil { public static final String PACKAGEHEADER = "?-?";//消息长度 public static final int PACKAGEHEADERLENGTH = 7; //数据头长?? public static final int PACKAGESAVEDATALENGTH = 4; //数据长度站的位数 /** * 打包 * @param pkg 要打包的字节数组 * @return */ public static byte[] packet(byte[] pkg) { int intValue = pkg.length; byte[] b = new byte[4]; for (int i = 0; i < 4; i++) { b[i] = (byte) (intValue >> 8 * (3 - i) & 0xFF); // System.out.print(Integer.toBinaryString(b[i])+" "); //System.out.println((b[i] & 0xFF) + " "); } try { byte[] newPkg = ArrayUtils.addAll(PackageUtil.PACKAGEHEADER.getBytes("utf-8"), b); newPkg = ArrayUtils.addAll(newPkg, pkg); return newPkg; } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return null; } /** * 字节数组转整形 * @param b * @return */ public static int byte2Int(byte[] b) { int intValue = 0; for (int i = 0; i < b.length; i++) { intValue += (b[i] & 0xFF) << (8 * (3 - i)); // System.out.print(Integer.toBinaryString(intValue)+" "); } return intValue; } /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub } }
package com.util;

import com.google.gson.Gson;

/**
 * JSON conversion helpers built on a shared {@link Gson} instance.
 */
public class StringUtil {

    private static Gson json = new Gson();

    /**
     * Parses a JSON string into an object of the same class as {@code clazz}.
     *
     * @param clazz   a sample object; only its runtime class is used
     * @param message the JSON text
     * @return the object produced by Gson for that class
     */
    public static Object string2Bean(Object clazz, String message) {
        Class<?> targetType = clazz.getClass();
        return json.fromJson(message, targetType);
    }

    /**
     * Serializes an object to its JSON text.
     *
     * @param clazz the object to serialize
     * @return the JSON produced by Gson
     */
    public static String bean2Json(Object clazz) {
        String serialized = json.toJson(clazz);
        return serialized;
    }
}
package com.model; /** * 默认包装 * @author Administrator * */ public class BaseModule { String type ; String message; public String getType() { return type; } public void setType(String type) { this.type = type; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
// Original article: http://blog.csdn.net/qzshiyongjie123/article/details/44117391