标签:
一、报文队列的处理:
如果将多路复用器获取到的所有事件,阻塞式的同步处理,那恐怕会严重影响selector的性能,所以我把从客户端接收到的大部分消息,都放入了队列中,然后另外启动队列的消费线程对消息进行异步的处理;具体如下:
1.通讯报文队列消费者:在selector对read事件的处理过程中,我在最后都把客户端发送的报文放入了一个叫CQUEUE的队列中,具体定义如下,CQUEUE是所有客户端发送报文的队列,在CQUEUE队列中的消费者线程中,我又对M类报文进行了对垒处理,放入了另一个队列MQUEUE。
public class GVQueue { //通讯级别报文的队列 public static BlockingQueue CQUEUE = new ArrayBlockingQueue<Packet>(3000); //短消息级别报文的队列 public static BlockingQueue MQUEUE = new ArrayBlockingQueue<Packet>(3000); }
2.CQUEUE队列的消费者线程,专门针对通讯层面的消息进行处理,比如:客户端链路维护的回应等;如下:
public class CQueueConsumer extends Thread { private int waitTime; private static Logger logger = LogManager.getLogger(CQueueConsumer.class.getName()); public CQueueConsumer(int waitTime) { this.waitTime = waitTime; } public void run() { logger.info("通讯队列消费者线程启动……"); boolean isRunning = true; try { while (isRunning) { IPacket packet = (IPacket) GVQueue.CQUEUE.poll(1, TimeUnit.SECONDS); if (packet != null) { handleQueue(packet); /* if (logger.isDebugEnabled()) { logger.debug("读出消息队列收到的客户端消息:" + packet.getPacketStr()); }*/ logger.debug("读出消息队列收到的客户端消息:" + packet.getPacketStr()); } else { Thread.sleep(waitTime); if (logger.isDebugEnabled()) { logger.debug("消息队列中没有消息,休息一会儿……"); } } } } catch (InterruptedException e) { logger.info("通讯队列消费者处理线程终止……"); e.printStackTrace(); } } /** * 通讯层处理(对除了M报文之外的报文进行处理) * @param packet */ private void handleQueue(IPacket packet) { //如果是短消息类报文,则直接放入短消息队列等待短消息消费者处理; if (packet.getHeader().equals(MsgPacket.HEADER)){ GVQueue.MQUEUE.offer(packet); } if (!packet.getHeader().equals(ReplyPacket.HEADER)) { //需要更新通道的最后访问时间 GVConnection gvConn = GVConnTools.getConnByToken(packet.getClientToken()); if (gvConn!=null){ //更改最后访问时间 GVConnTools.updLastAccessTime(packet.getClientToken(),CommonTools.systemTimeUtc()); SocketChannel socketChannel = gvConn.getChannel(); //对客户端的报文做出R相应 if (socketChannel != null) { ReplyOrder replyOrder = (ReplyOrder)Config.getOrderInstance(ReplyOrder.HEADER); replyOrder.initReplyOrder(packet.getRid()); GVServer.write2Client(replyOrder, socketChannel); } } } } }
3.而MQUEUE队列的消费者线程,则专门针对M类报文进行处理,它的工作是拿出M报文,找到目标通道,然后将报文内容转入目标通道(目前离线存储尚未实现)。如下:
public class MQueueConsumer extends Thread { private static Logger logger = LogManager.getLogger(MQueueConsumer.class.getName()); public void run() { logger.info("短消息队列消费者线程启动……"); while (true) { try { Packet packet = (Packet) GVQueue.MQUEUE.poll(1, TimeUnit.SECONDS); if (packet != null) { // Logs.info("读出消息队列收到的客户端消息:" + packet.getPacketStr()); MsgInfo msgInfo = new MsgInfo(); msgInfo = msgInfo.generaterMsgInfo(packet.getPacketBody()); SocketChannel channel = GVConnTools.getChannelByUserId(msgInfo.getReceiver()); if(channel!=null && channel.isOpen()) { MsgOrder msgOrder = (MsgOrder) Config.getOrderInstance(MsgOrder.HEADER); msgOrder.initMsgOrder(packet.getPacketBody()); GVServer.write2Client(msgOrder, channel); if (logger.isDebugEnabled()) { logger.debug("短消息发送至:<" + msgInfo.getReceiver() + ">"); } }else{ /* 此处将数据放入离线存储队列 */ if(logger.isDebugEnabled()) { logger.debug("短消息放入离线短消息队列:<" + msgInfo.getReceiver() + ">"); } } } else { Thread.sleep(200); if(logger.isDebugEnabled()) { logger.debug("消息队列中没有消息,休息一会儿……"); } } } catch (InterruptedException e) { logger.info("短消息队列消费者处理线程终止……"); e.printStackTrace(); } } } }
标签:
原文地址:http://my.oschina.net/u/2397619/blog/497328