码迷,mamicode.com
首页 > 编程语言 > 详细

JAVA NIO 服务器与客户端实现示例

时间:2014-11-22 23:20:18      阅读:319      评论:0      收藏:0      [点我收藏+]

标签:nio

以下代码只兼容Java 7及以上版本,对于一些关键地方请看注释说明。


公共类:

package com.stevex.app.nio;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;

public class CharsetHelper {
	private static final String UTF_8 = "UTF-8";
	private static CharsetEncoder encoder = Charset.forName(UTF_8).newEncoder();
	private static CharsetDecoder decoder = Charset.forName(UTF_8).newDecoder();
	
	public static ByteBuffer encode(CharBuffer in) throws CharacterCodingException{
		return encoder.encode(in);
	}

	public static CharBuffer decode(ByteBuffer in) throws CharacterCodingException{
		return decoder.decode(in);
	}
}


服务器代码:

package com.stevex.app.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class XiaoNa {
	private ByteBuffer readBuffer;
	private Selector selector;
	
	public static void main(String[] args){
		XiaoNa xiaona = new XiaoNa();
		xiaona.init();
		xiaona.listen();
	}
	
	private void init(){
		readBuffer = ByteBuffer.allocate(1024);
		ServerSocketChannel servSocketChannel;
		
		try {
			servSocketChannel = ServerSocketChannel.open();
			servSocketChannel.configureBlocking(false);
			//绑定端口
			servSocketChannel.socket().bind(new InetSocketAddress(8383));
			
			selector = Selector.open();
			servSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
		} catch (IOException e) {
			e.printStackTrace();
		}		
	}

	private void listen() {
		while(true){
			try{
				selector.select();				
				Iterator ite = selector.selectedKeys().iterator();
				
				while(ite.hasNext()){
					SelectionKey key = (SelectionKey) ite.next();					
					ite.remove();//确保不重复处理
					
					handleKey(key);
				}
			}
			catch(Throwable t){
				t.printStackTrace();
			}							
		}				
	}

	private void handleKey(SelectionKey key)
			throws IOException, ClosedChannelException {
		SocketChannel channel = null;
		
		try{
			if(key.isAcceptable()){
				ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
				channel = serverChannel.accept();//接受连接请求
				channel.configureBlocking(false);
				channel.register(selector, SelectionKey.OP_READ);
			}
			else if(key.isReadable()){
				channel = (SocketChannel) key.channel();
				readBuffer.clear();
				/*当客户端channel关闭后,会不断收到read事件,但没有消息,即read方法返回-1
				 * 所以这时服务器端也需要关闭channel,避免无限无效的处理*/				
				int count = channel.read(readBuffer);
				
				if(count > 0){
					//一定需要调用flip函数,否则读取错误数据
					readBuffer.flip();
					/*使用CharBuffer配合取出正确的数据
					String question = new String(readBuffer.array());  
					可能会出错,因为前面readBuffer.clear();并未真正清理数据
					只是重置缓冲区的position, limit, mark,
					而readBuffer.array()会返回整个缓冲区的内容。
					decode方法只取readBuffer的position到limit数据。
					例如,上一次读取到缓冲区的是"where", clear后position为0,limit为	1024,
					再次读取“bye"到缓冲区后,position为3,limit不变,
					flip后position为0,limit为3,前三个字符被覆盖了,但"re"还存在缓冲区中,
					所以 new String(readBuffer.array()) 返回 "byere",
					而decode(readBuffer)返回"bye"。				
					*/
					CharBuffer charBuffer = CharsetHelper.decode(readBuffer); 
					String question = charBuffer.toString(); 
					String answer = getAnswer(question);
					channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));
				}
				else{
					//这里关闭channel,因为客户端已经关闭channel或者异常了
					channel.close();				
				}						
			}
		}
		catch(Throwable t){
			t.printStackTrace();
			if(channel != null){
				channel.close();
			}
		}		
	}
	
	private String getAnswer(String question){
		String answer = null;
		
		switch(question){
		case "who":
			answer = "我是小娜\n";
			break;
		case "what":
			answer = "我是来帮你解闷的\n";
			break;
		case "where":
			answer = "我来自外太空\n";
			break;
		case "hi":
			answer = "hello\n";
			break;
		case "bye":
			answer = "88\n";
			break;
		default:
				answer = "请输入 who, 或者what, 或者where";
		}
		
		return answer;
	}
}


客户端代码:

package com.stevex.app.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;


public class Client implements Runnable{
	private BlockingQueue<String> words;
	private Random random;
	
	public static void main(String[] args) {		
		//种多个线程发起Socket客户端连接请求
		for(int i=0; i<10; i++){
			Client c = new Client();
			c.init();
			new Thread(c).start();
		}		
	}

	@Override
	public void run() {		
		SocketChannel channel = null;
		Selector selector = null;
		try {
			channel = SocketChannel.open();
			channel.configureBlocking(false);
			//请求连接
			channel.connect(new InetSocketAddress("localhost", 8383));
			selector = Selector.open();
			channel.register(selector, SelectionKey.OP_CONNECT);
			boolean isOver = false;
			
			while(! isOver){
				selector.select();
				Iterator ite = selector.selectedKeys().iterator();
				while(ite.hasNext()){
					SelectionKey key = (SelectionKey) ite.next();
					ite.remove();
					
					if(key.isConnectable()){
						if(channel.isConnectionPending()){
							if(channel.finishConnect()){
								//只有当连接成功后才能注册OP_READ事件
								key.interestOps(SelectionKey.OP_READ);
								
								channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));
								sleep();
							}
							else{
								key.cancel();
							}
						}												
					}
					else if(key.isReadable()){
						ByteBuffer byteBuffer = ByteBuffer.allocate(128);						
						channel.read(byteBuffer);
						byteBuffer.flip();
						CharBuffer charBuffer = CharsetHelper.decode(byteBuffer);
						String answer = charBuffer.toString(); 
						System.out.println(Thread.currentThread().getId() + "---" + answer);
						
						String word = getWord();
						if(word != null){
							channel.write(CharsetHelper.encode(CharBuffer.wrap(word)));
						}
						else{
							isOver = true;
						}
						sleep();						
					}
				}
			}							
		} catch (IOException e) {
			e.printStackTrace();
		}
		finally{
			if(channel != null){
				try {
					channel.close();
				} catch (IOException e) {						
					e.printStackTrace();
				}					
			}
			
			if(selector != null){
				try {
					selector.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}

	private void init() {
		words = new ArrayBlockingQueue<String>(5);
		try {
			words.put("hi");
			words.put("who");
			words.put("what");
			words.put("where");
			words.put("bye");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}	
		
		random = new Random();
	}
	
	private String getWord(){
		return words.poll();
	}

	private void sleep() {
		try {
			TimeUnit.SECONDS.sleep(random.nextInt(3));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}	
	
	private void sleep(long l) {
		try {
			TimeUnit.SECONDS.sleep(l);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

本文出自 “力量来源于赤诚的爱!” 博客,请务必保留此出处http://stevex.blog.51cto.com/4300375/1581376

JAVA NIO 服务器与客户端实现示例

标签:nio

原文地址:http://stevex.blog.51cto.com/4300375/1581376

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!