码迷,mamicode.com
首页 > Web开发 > 详细

mina的编码和解码以及断包的处理,发送自定义协议,仿qq聊天,发送xml或json和

时间:2014-06-15 18:16:25      阅读:376      评论:0      收藏:0      [点我收藏+]

标签:mina协议开发   mina仿qq   mina通讯心跳   mina发xml消息   mina发json消息   

最近一段时间以来,mina很火,和移动开发一样,异常的火爆。前面写了几篇移动开发的文章,都还不错,你们的鼓励就是我最大的动力。好了,废话少说。我们来看下tcp通讯吧。
tcp通讯对于java来说是很简单的。就是socket,也就是大家常说的套接字。大家不要把它看的很难。说白了tcp通讯其实就是数据流的读写。一条输入流,一条输出流。分别复杂发消息和接收消息。
明白了这些,ok,我们来看看我写的例子吧。先看服务器端的测试类的源码:
package com.minaqq.test;

import com.minaqq.server.ServerMsgProtocol;
import com.minaqq.worker.ServerSendMsgThread;

public class MsgServerTest {
	public static void main(String[] args) {
		if(ServerMsgProtocol.serverStart()){
			System.out.println("服务器启动成功......");
			ServerSendMsgThread ssmt=new ServerSendMsgThread();
			ssmt.start();
			System.out.println("工作线程启动成功......");
		}
	}
}

服务端连接代码:

package com.minaqq.server;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

import com.minaqq.protocol.MsgProtocol;

/**
 * @see 服务器启动类,字符串消息测试类
 * @author Herman.Xiong
 * @date 2013年12月6日 09:23:31
 * @file MinaServer.java
 * @package com.minaqq.server
 * @project MINA_QQ
 * @version 1.0
 * @since jdk1.6,mina 2.0
 */
public class ServerMsgProtocol {
	
	//30秒后超时 
    private static final int IDELTIMEOUT = 30;
    //15秒发送一次心跳包
    private static final int HEARTBEATRATE = 15;
    
    private static SocketAcceptor acceptor;

    private ServerMsgProtocol() {}
    
    public static SocketAcceptor getAcceptor(){
    	if(null==acceptor){
    		// 创建非阻塞的server端的Socket连接
    		acceptor = new NioSocketAcceptor();
    	}
    	return acceptor;
    }

    public static boolean serverStart() {
        DefaultIoFilterChainBuilder filterChain = getAcceptor().getFilterChain();
        // 添加编码过滤器 处理乱码、编码问题
        filterChain.addLast("codec", new ProtocolCodecFilter(new MsgProtocol()));
        LoggingFilter loggingFilter = new LoggingFilter();
        loggingFilter.setMessageReceivedLogLevel(LogLevel.INFO);
        loggingFilter.setMessageSentLogLevel(LogLevel.INFO);
        // 添加日志过滤器
        filterChain.addLast("loger", loggingFilter);
        // 设置核心消息业务处理器
        getAcceptor().setHandler(new ServerMessageHandler());
        //KeepAliveMessageFactory heartBeatFactory = new KeepAliveMessageFactoryImpl();
        //KeepAliveRequestTimeoutHandler heartBeatHandler = new KeepAliveRequestTimeoutHandlerImpl();
        //KeepAliveFilter heartBeat = new KeepAliveFilter(heartBeatFactory,IdleStatus.BOTH_IDLE, heartBeatHandler);
        // 是否回发 
        //heartBeat.setForwardEvent(false);
        // 发送频率 
        //heartBeat.setRequestInterval(HEARTBEATRATE);
        //getAcceptor().getFilterChain().addLast("heartbeat", heartBeat);
        getAcceptor().getSessionConfig().setReceiveBufferSize(2048*5000);//接收缓冲区1M
        getAcceptor().getSessionConfig().setBothIdleTime(30);
        //getAcceptor().getSessionConfig().setKeepAlive(true);
        // 设置session配置,30秒内无操作进入空闲状态
        getAcceptor().getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, IDELTIMEOUT);
        try {
            // 绑定端口3456
        	getAcceptor().bind(new InetSocketAddress(8888));
        	return true;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

}
服务器的消息处理:
package com.minaqq.server;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

import com.minaqq.domain.MsgPack;
/**
 * @see 处理服务器端消息
 * @author Herman.Xiong
 * @date 2012-6-26 下午01:12:34
 * @file ServerMessageHandler.java
 * @package com.minaqq.server
 * @project MINA_QQ
 * @version 1.0
 * @since jdk1.6,mina 2.0
 */
public class ServerMessageHandler implements IoHandler{

    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        System.out.println("服务器发生异常:"+ cause.toString());
    }

    public void messageReceived(IoSession session, Object message) throws Exception {
    	MsgPack mp=(MsgPack)message;
        System.out.println("收到客户端数据messageReceived----------:"+ mp.toString());
		/*//请求协议
		mp.setMsgMethod(3000);
		mp.setMsgPack("我是服务器发的消息");
		mp.setMsgLength(mp.getMsgPack().getBytes().length);
        session.write(mp);*/
        /*String content = mp.toString();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        String datetime = sdf.format(new Date());
        System.out.println(datetime+"服务器接收到数据的内容为messageReceived----------: " + content);*/
        // 拿到所有的客户端Session
        /*Collection<IoSession> sessions = session.getService().getManagedSessions().values();
        // 向所有客户端发送数据
        for (IoSession sess : sessions) {
            sess.write(datetime + "\t" + content);
        }*/
        
    }

    public void messageSent(IoSession session, Object message) throws Exception {
       /* System.out.println("服务器发送消息messageSent----------: "+ message);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        String datetime = sdf.format(new Date());
        System.out.println(datetime+"服务器发送消息messageSent----------: "+message.toString());*/
    }
 
    public void sessionClosed(IoSession session) throws Exception {
        System.out.println("关闭当前session: "+session.getId()+session.getRemoteAddress());
        CloseFuture closeFuture = session.close(true);
        closeFuture.addListener(new IoFutureListener<IoFuture>() {
            public void operationComplete(IoFuture future) {
                if (future instanceof CloseFuture) {
                    ((CloseFuture) future).setClosed();
                    System.out.println("sessionClosed CloseFuture setClosed-->"+ future.getSession().getId());
                }
            }
        });
    }

    public void sessionCreated(IoSession session) throws Exception {
        System.out.println("创建一个新连接:"+ session.getRemoteAddress()+"  id:  "+session.getId());
        session.write("welcome to the chat room !");
    }

    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        System.out.println("当前连接处于空闲状态:"+ session.getRemoteAddress()+ status);
    }

    public void sessionOpened(IoSession session) throws Exception {
        System.out.println("打开一个session id:"+ session.getId()+"  空闲连接个数IdleCount:  "+ session.getBothIdleCount());
    }
}
自定义协议类:

/**
 * @see 自定义协议
 * @author Herman.Xiong
 * @date 2014年6月11日 10:30:40
 */
public class MsgProtocol implements ProtocolCodecFactory{
	private static final Charset charset=Charset.forName("UTF-8");
  
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {  
        return new MsgProtocolDecoder(charset);
    }  
  
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {  
        return new MsgProtocolEncoder(charset);
    }
}
协议解码类:
package com.minaqq.charset;

import java.nio.ByteOrder;
import java.nio.charset.Charset;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

import com.minaqq.domain.MsgPack;
/**
 * @see 协议解码
 * @author Herman.Xiong
 * @date 2014年6月11日 16:47:24
 */
public class MsgProtocolDecoder extends CumulativeProtocolDecoder  {  
    private Charset charset=null;  
  
    public MsgProtocolDecoder() {  
        this(Charset.defaultCharset());  
    }  
    
    public MsgProtocolDecoder(Charset charset) {  
        this.charset = charset;  
    }
    
	public void decode1(IoSession is, IoBuffer buf, ProtocolDecoderOutput out)
			throws Exception {
		buf.order(ByteOrder.LITTLE_ENDIAN);
		MsgPack mp=new MsgPack();
		//获取消息的内容长度
		mp.setMsgLength(buf.getInt());
		//获取消息的功能函数
		mp.setMsgMethod(buf.getInt());
		byte[] msg=new byte[mp.getMsgLength()];
		buf.get(msg);
		mp.setMsgPack(new String(msg,charset));
		buf.flip();
		out.write(mp);
	}
	
	public void dispose(IoSession arg0) throws Exception {
		
	}
	
	public void finishDecode(IoSession arg0, ProtocolDecoderOutput arg1)
			throws Exception {
		
	}

	public void decode0(IoSession arg0, IoBuffer arg1, ProtocolDecoderOutput arg2)
			throws Exception {
		int limit = arg1.limit();
		byte[] bytes = new byte[limit];
		arg1.get(bytes);
		arg2.write(bytes);
	}

	protected boolean doDecode(IoSession session, IoBuffer ioBuffer, ProtocolDecoderOutput out) throws Exception {
		ioBuffer.order(ByteOrder.LITTLE_ENDIAN); 
		MsgPack mp = (MsgPack) session.getAttribute("nac-msg-pack"); // 从session对象中获取“xhs-upload”属性值 
		if(null==mp){
			 if (ioBuffer.remaining() >= 8) {
				 //取消息体长度
				 int msgLength = ioBuffer.getInt(); 
				 int msgMethod = ioBuffer.getInt();
				 mp=new MsgPack();
				 mp.setMsgLength(msgLength);
				 mp.setMsgMethod(msgMethod);
				 session.setAttribute("nac-msg-pack",mp);
				 return true;
			 }
			 return false;
		}
		if(ioBuffer.remaining()>=mp.getMsgLength()){
			byte [] msgPack=new byte[mp.getMsgLength()];
			ioBuffer.get(msgPack);
			mp.setMsgPack(new String(msgPack,charset));
			session.removeAttribute("nac-msg-pack");
			out.write(mp);
			return true;
		}
		return false;
	}   

}
协议编码类:
package com.minaqq.charset;

import java.io.NotSerializableException;
import java.io.Serializable;
import java.nio.ByteOrder;
import java.nio.charset.Charset;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

import com.minaqq.domain.MsgPack;

public class MsgProtocolEncoder extends ProtocolEncoderAdapter{
	private Charset charset=null;

    public MsgProtocolEncoder(Charset charset) {
        this.charset = charset;     
    }     
    //在此处实现对MsgProtocolEncoder包的编码工作,并把它写入输出流中     
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { 
        if(message instanceof MsgPack){
        	 MsgPack mp = (MsgPack) message; 
        	 IoBuffer buf = IoBuffer.allocate(mp.getMsgLength());
        	 buf.order(ByteOrder.LITTLE_ENDIAN);
             buf.setAutoExpand(true);    
             //设置消息内容的长度
             buf.putInt(mp.getMsgLength()); 
             //设置消息的功能函数
             buf.putInt(mp.getMsgMethod());
             if (null != mp.getMsgPack()) {
            	 buf.put(mp.getMsgPack().getBytes(charset));
             }   
             buf.flip();     
             out.write(buf);  
             out.flush();
             buf.free();
        }
    }     
    public void dispose() throws Exception {     
    }
    
	public void encode0(IoSession arg0, Object arg1, ProtocolEncoderOutput arg2)
			throws Exception {
		if (!(arg1 instanceof Serializable)) {
	        throw new NotSerializableException();
	    }
	    IoBuffer buf = IoBuffer.allocate(64);
	    buf.setAutoExpand(true);
	    buf.putObject(arg1);

	    int objectSize = buf.position() - 4;
	    if (objectSize > 1024) {
	        throw new IllegalArgumentException("The encoded object is too big: " + objectSize + " (> " + 1024
	                + ')');
	    }

	    buf.flip();
	    arg2.write(buf);
	}
	
}
协议实体类:

package com.minaqq.domain;

import java.io.Serializable;

/**
 * @see 自定义数据包
 * @author Herman.Xiong
 * @date 2014年6月11日 11:31:45
 */
public class MsgPack implements Serializable{
	/**
	 * 序列化和反序列化的版本号
	 */
	private static final long serialVersionUID = 1L;
	//消息长度
	private int msgLength;
	//消息方法
	private int msgMethod;
	//消息包内容
	private String msgPack;
	
	public MsgPack() {}

	public int getMsgLength() {
		return msgLength;
	}

	public void setMsgLength(int msgLength) {
		this.msgLength = msgLength;
	}

	public int getMsgMethod() {
		return msgMethod;
	}

	public void setMsgMethod(int msgMethod) {
		this.msgMethod = msgMethod;
	}

	public String getMsgPack() {
		return msgPack;
	}

	public void setMsgPack(String msgPack) {
		this.msgPack = msgPack;
	}

	public MsgPack(int msgLength, int msgMethod, String msgPack) {
		this.msgLength = msgLength;
		this.msgMethod = msgMethod;
		this.msgPack = msgPack;
	}

	public String toString() {
		return "MsgPack [msgLength=" + msgLength + ", msgMethod=" + msgMethod
				+ ", msgPack=" + msgPack + "]";
	}
	
}
心跳信息工厂类:
package com.minaqq.server;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
/**
 * @see 发送心跳包的内容
 * getResponse()---->isResponse();获取数据判断心跳事件(目的是判断是否触发心跳超时异常)
 * isRequest()----->getRequest(); 写回数据是心跳事件触发的数据(目的写回给服务器(客户端)心跳包)
 * @author Herman.Xiong
 */
public class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory{
	
	//心跳包内容
    private static final String HEARTBEATREQUEST = "HEARTBEATREQUEST";
    private static final String HEARTBEATRESPONSE = "HEARTBEATRESPONSE";
    
	/**
     * @see 返回给客户端的心跳包数据 return 返回结果才是客户端收到的心跳包数据
     * @author Herman.Xiong
     */
    public Object getRequest(IoSession session) {
        return HEARTBEATREQUEST;
    }

    /**
     * @see 接受到的客户端数据包
     * @author Herman.Xiong
     */
    public Object getResponse(IoSession session, Object request) {
        return request;
    }

    /**
     * @see 判断是否是客户端发送来的的心跳包此判断影响 KeepAliveRequestTimeoutHandler实现类判断是否心跳包发送超时
     * @author Herman.Xiong
     */
    public boolean isRequest(IoSession session, Object message) {
        if(message.equals(HEARTBEATRESPONSE)){
            System.out.println("接收到客户端心数据包引发心跳事件                 心跳数据包是》》" + message);
	        return true;
	    }
        return false;
    }

    /**
     * @see  判断发送信息是否是心跳数据包此判断影响 KeepAliveRequestTimeoutHandler实现类 判断是否心跳包发送超时
     * @author Herman.Xiong
     */
    public boolean isResponse(IoSession session, Object message) {
        if(message.equals(HEARTBEATREQUEST)){
            System.out.println("服务器发送数据包中引发心跳事件: " + message);
            return true;
        }
        return false;
    }
}
心跳业务处理类:
package com.minaqq.server;

import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;
/**
 * @see 当心跳超时时的处理,也可以用默认处理 这里like
 * KeepAliveRequestTimeoutHandler.LOG的处理
 * @author Herman.Xiong
 */
public class KeepAliveRequestTimeoutHandlerImpl  implements KeepAliveRequestTimeoutHandler {
	/**
     * @see org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler心跳超时处理
     * @author Herman.Xiong
     */
    public void keepAliveRequestTimedOut(KeepAliveFilter filter,
            IoSession session) throws Exception {
        System.out.println("服务器端心跳包发送超时处理(即长时间没有发送(接受)心跳包)---关闭当前长连接");
        CloseFuture closeFuture = session.close(true);
        closeFuture.addListener(new IoFutureListener<IoFuture>() {
            public void operationComplete(IoFuture future) {
                if (future instanceof CloseFuture) {
                    ((CloseFuture) future).setClosed();
                    System.out.println("sessionClosed CloseFuture setClosed-->"+ future.getSession().getId());
                }
            }
        });
    }
}
服务器发送数据包的线程类:
package com.minaqq.worker;

import java.util.Map;

import org.apache.mina.core.session.IoSession;

import com.minaqq.domain.MsgPack;
import com.minaqq.server.ServerMsgProtocol;
import com.minaqq.utils.XmlUtils;
/**
 * @see 服务器端发送数据
 * @author Herman.Xiong
 * @date 2014年6月9日 10:38:59
 */
public class ServerSendMsgThread extends Thread{
	
	public void run() {
		while(true){
			if(null!=ServerMsgProtocol.getAcceptor()){
				System.out.println("MinaServer.getAcceptor().getManagedSessionCount() is "+ServerMsgProtocol.getAcceptor().getManagedSessionCount());
				
				Map<Long, IoSession> map=ServerMsgProtocol.getAcceptor().getManagedSessions();
				for (Long key : map.keySet()) {
					IoSession is = map.get(key);
					//SocketAddress sa=is.getRemoteAddress();
					//InetSocketAddress isa=(InetSocketAddress)sa;
					//is.write("我是中文测试"+"session id is "+key+"  hostName:"+isa.getHostName()+"   address:"+isa.getAddress()+"   port:"+isa.getPort()+"        isa.toString:"+isa.toString());
					MsgPack mp=new MsgPack();
					//请求协议
					mp.setMsgMethod(1000);
					//mp.setMsgPack("我是服务端");
					String str="";
					for (int i = 0; i < 100; i++) {
						str+=XmlUtils.getXml();
					}
					mp.setMsgPack(str);
					mp.setMsgLength(mp.getMsgPack().getBytes().length);
					try {
						is.write(mp);
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}else {
				System.out.println("MinaServer.getAcceptor is null ");
			}
			try {
				Thread.sleep(3000);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}
客户端测试类:
package com.minaqq.test;

import com.minaqq.client.ClientMsgProtocol;
import com.minaqq.domain.MsgPack;
import com.minaqq.worker.ClientSendMsgThread;

public class MsgClientTest {
	public static void main(String[] args) {
		ClientMsgProtocol.clientStart();
		System.out.println("客户端启动成功......");
		ClientSendMsgThread csmt=new ClientSendMsgThread();
		csmt.start();
		/*MsgPack mp=new MsgPack();
		//请求协议
		mp.setMsgMethod(2000);
		mp.setMsgPack("我是客户端");
		mp.setMsgLength(mp.getMsgPack().getBytes().length);
		ClientMsgProtocol.getIoSession().write(mp);*/
		System.out.println("客户端工作线程启动成功......");
	}
}
客户端创建连接类:
package com.minaqq.client;

import java.net.InetSocketAddress;

import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

import com.minaqq.protocol.MsgProtocol;

/**
 * @see 模拟客户端;
 * 用于连接服务端,并向服务端发送消息
 * @author Herman.Xiong
 * @date 2013年11月26日 11:27:50
 * @version 1.0
 * @serial jdk 1.6
 */
public class ClientMsgProtocol {
	
	private static NioSocketConnector connector ;
	
	private static IoSession is;
	
	public static NioSocketConnector getConnector(){
    	if(null==connector){
    		// 创建非阻塞的server端的Socket连接
    		connector = new NioSocketConnector();
    	}
    	return connector;
    }
	
	public static IoSession getIoSession(){
    	return is;
	}
	
	public static void clientStart(){
		// 创建客户端连接器
		NioSocketConnector connector = getConnector(); 
		connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MsgProtocol()));
		LoggingFilter loggingFilter = new LoggingFilter();
        loggingFilter.setMessageReceivedLogLevel(LogLevel.INFO);
        loggingFilter.setMessageSentLogLevel(LogLevel.INFO);
		connector.getFilterChain().addLast("logger", loggingFilter); 
		connector.getSessionConfig().setReceiveBufferSize(2048*5000);//接收缓冲区1M
		connector.setConnectTimeoutMillis(30000); // 设置连接超时
		connector.setHandler(new TimeClientHandler());// 设置消息处理器
		ConnectFuture cf = connector.connect(new InetSocketAddress("10.10.2.136",8888));// 建立连接
		cf.awaitUninterruptibly();// 等待连接创建完成
		try {
			is=cf.getSession();
			//getIoSession().write(new String(XmlUtils.getXml().getBytes("UTF-8")));// 发送消息
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}
客户端消息处理事件类:
package com.minaqq.client;

import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

import com.minaqq.domain.MsgPack;


/**
 * @see 处理接收客户端消的息事件
 * @author Herman.Xiong
 * @date 2013年11月26日 11:23:32
 * @version 1.0
 * @since jdk1.6
 */
public class TimeClientHandler implements IoHandler{
	
	/**
	 * 接收客户端发送的消息
	 */
	public void messageReceived(IoSession session, Object message) throws Exception { 
		MsgPack mp=(MsgPack)message;
		System.out.println("收到服务端发来的消息:"+mp.toString());// 显示接收到的消息
	}

	public void exceptionCaught(IoSession arg0, Throwable arg1)
			throws Exception {
		
	}

	public void messageSent(IoSession arg0, Object arg1) throws Exception {
		
	}

	public void sessionClosed(IoSession arg0) throws Exception {
		
	}

	public void sessionCreated(IoSession arg0) throws Exception {
		
	}

	public void sessionIdle(IoSession arg0, IdleStatus arg1) throws Exception {
		
	}

	public void sessionOpened(IoSession arg0) throws Exception {
		
	}
}
客户端发送消息线程类:
package com.minaqq.worker;

import com.minaqq.client.ClientMsgProtocol;
import com.minaqq.domain.MsgPack;
/**
 * @see 模拟客户端发送数据
 * @author Herman.Xiong
 * @date 2014年6月9日 10:38:59
 */
public class ClientSendMsgThread extends Thread{
	public void run() {
		while(true){
			if(null!=ClientMsgProtocol.getConnector()){
				try {
					//ClientMsgProtocol.getIoSession().write(new String("我是客户端".getBytes("UTF-8")));
					MsgPack mp=new MsgPack();
					//请求协议
					mp.setMsgMethod(2000);
					mp.setMsgPack("我是客户端");
					mp.setMsgLength(mp.getMsgPack().getBytes().length);
					ClientMsgProtocol.getIoSession().write(mp);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}else {
				System.out.println("MinaServer.getAcceptor is null ");
			}
			try {
				Thread.sleep(3000);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}
发送xml消息的工具类:
package com.minaqq.utils;

import com.minaqq.domain.Address;
import com.minaqq.domain.House;
import com.minaqq.domain.Person;
import com.minaqq.domain.PhoneNumber;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.xml.DomDriver;

public class XmlUtils {
	
	public static void testXStream(){
		XStream xstream=new XStream(new DomDriver());
		xstream.alias("PERSON", Person.class);
		xstream.alias("ADDRESS",Address.class);
		xstream.alias("PHONENUMBER", PhoneNumber.class);
		xstream.alias("HOUSE", House.class);
		Person person=(Person)xstream.fromXML(XmlUtils.getXml());
		System.out.println(person.toString());
	}
	
	public static String getXml(){
		StringBuffer sb=new StringBuffer("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");//
		sb.append("<PERSON firstName=\"Herman\">");
		sb.append("<lastName>Xiong</lastName>  ");
		sb.append("<phonex>");
		sb.append("<code>0</code>");
		sb.append("<number>1234567</number>");
		sb.append("</phonex>");
		sb.append("<fax>");
		sb.append("<code>0</code>");
		sb.append("<number>7654321</number>");
		sb.append("</fax>");
		sb.append("<addList>");
		sb.append("<ADDRESS>");
		sb.append("<add>上海市</add>");
		sb.append("<zipCode>123456</zipCode>");
		sb.append("</ADDRESS>");
		sb.append("</addList>");
		sb.append("<house>");
		sb.append("<HOUSE>");
		sb.append("<size>300万</size>");
		sb.append("<price>120平方米</price>");
		sb.append("</HOUSE>");
		sb.append("<HOUSE>");
		sb.append("<size>500万</size>");
		sb.append("<price>130平方米</price>");
		sb.append("</HOUSE>");
		sb.append("<HOUSE>");
		sb.append("<size>160万</size>");
		sb.append("<price>61.5平方米</price>");
		sb.append("</HOUSE>");
		sb.append("</house>");
		sb.append("</PERSON>\n");
		return sb.toString();
	}
}
xml有关的实体类:
package com.minaqq.domain;

import java.util.Arrays;
import java.util.List;

import com.thoughtworks.xstream.annotations.XStreamAsAttribute;

public class Person {
	@XStreamAsAttribute
	private String firstName;
	private String lastName;
	private PhoneNumber phonex;
	private PhoneNumber fax;
	private List<Address> addList;
	private House[] house;
	public String getFirstName() {
		return firstName;
	}
	public void setFirstName(String firstName) {
		this.firstName = firstName;
	}
	public String getLastName() {
		return lastName;
	}
	public void setLastName(String lastName) {
		this.lastName = lastName;
	}
	public PhoneNumber getPhonex() {
		return phonex;
	}
	public void setPhonex(PhoneNumber phonex) {
		this.phonex = phonex;
	}
	public PhoneNumber getFax() {
		return fax;
	}
	public void setFax(PhoneNumber fax) {
		this.fax = fax;
	}
	public List<Address> getAddList() {
		return addList;
	}
	public void setAddList(List<Address> addList) {
		this.addList = addList;
	}
	public House[] getHouse() {
		return house;
	}
	public void setHouse(House[] house) {
		this.house = house;
	}
	public Person() {
	}
	public Person(String firstName, String lastName) {
		this.firstName=firstName;
		this.lastName=lastName;
	}
	public Person(String firstName, String lastName, PhoneNumber phonex,
			PhoneNumber fax) {
		this.firstName = firstName;
		this.lastName = lastName;
		this.phonex = phonex;
		this.fax = fax;
	}
	public Person(String firstName, String lastName, PhoneNumber phonex,
			PhoneNumber fax, List<Address> addList) {
		this.firstName = firstName;
		this.lastName = lastName;
		this.phonex = phonex;
		this.fax = fax;
		this.addList = addList;
	}
	public Person(String firstName, String lastName, PhoneNumber phonex,
			PhoneNumber fax, List<Address> addList, House[] house) {
		this.firstName = firstName;
		this.lastName = lastName;
		this.phonex = phonex;
		this.fax = fax;
		this.addList = addList;
		this.house = house;
	}
	@Override
	public String toString() {
		return "Person [addList=" + addList + ", fax=" + fax + ", firstName="
				+ firstName + ", house=" + Arrays.toString(house)
				+ ", lastName=" + lastName + ", phonex=" + phonex + "]";
	}
}
xml有关的实体类:
package com.minaqq.domain;

public class Address {
	private String add;
	private String zipCode;
	public String getAdd() {
		return add;
	}
	public void setAdd(String add) {
		this.add = add;
	}
	public String getZipCode() {
		return zipCode;
	}
	public void setZipCode(String zipCode) {
		this.zipCode = zipCode;
	}
	public Address() {
	}
	public Address(String add, String zipCode) {
		this.add = add;
		this.zipCode = zipCode;
	}
	@Override
	public String toString() {
		return "Address [add=" + add + ", zipCode=" + zipCode + "]";
	}
}
xml有关的实体类:
package com.minaqq.domain;

public class PhoneNumber {
	private int code;
	private int number;
	public int getCode() {
		return code;
	}
	public void setCode(int code) {
		this.code = code;
	}
	public int getNumber() {
		return number;
	}
	public void setNumber(int number) {
		this.number = number;
	}
	public PhoneNumber(){}
	public PhoneNumber(int code,int number){
		this.code=code;
		this.number=number;
	}
	@Override
	public String toString() {
		return "PhoneNumber [code=" + code + ", number=" + number + "]";
	}
}
xml有关的实体类:
package com.minaqq.domain;

public class House {
	private String size;
	private String price;
	public String getSize() {
		return size;
	}
	public void setSize(String size) {
		this.size = size;
	}
	public String getPrice() {
		return price;
	}
	public void setPrice(String price) {
		this.price = price;
	}
	public House() {
	}
	public House(String size, String price) {
		this.size = size;
		this.price = price;
	}
	@Override
	public String toString() {
		return "House [price=" + price + ", size=" + size + "]";
	}
}

运行效果图,服务图片:

bubuko.com,布布扣

客户端图片:bubuko.com,布布扣

OK,到此结束了,欢迎大家关注我的个人博客。

学习资料已经源码下载请点击:http://download.csdn.net/download/xmt1139057136/7487611

如有不懂,请大家加入qq群:135430763共同学习!
















mina的编码和解码以及断包的处理,发送自定义协议,仿qq聊天,发送xml或json和,布布扣,bubuko.com

mina的编码和解码以及断包的处理,发送自定义协议,仿qq聊天,发送xml或json和

标签:mina协议开发   mina仿qq   mina通讯心跳   mina发xml消息   mina发json消息   

原文地址:http://blog.csdn.net/xmtblog/article/details/30272785

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!