标签:listener listen header 空闲 ken html css hit 取出
就是告诉其它人自己还活着。在简易RPC框架中,采用的是TCP长连接,为了确保长连接有效,就需要客户端与服务端之间有一种通知机制告知对方的存活状态。
在状态空闲的时候定时给服务端发送消息类型为PING消息。
捕获通道空闲状态事件,如果接收客户端PING消息,则发送PONG消息给服务端。如果在一定时间内没有收到客户端的PING消息,则说明客户端已经不在线,此时关闭通道。
由于服务端会因为长时间接收不到服务端的PING消息而关闭通道,这就导致缓存在客户端的连接的可用性发生变化。需要将不可用的从可用列表中转移出去,并对不可用连接进行处理,比如直接丢弃或者是重新连接。
ChannelPipeline与handle的关系。netty中的这些handle和spring mvc中的filter作用是类似的,ChannelPipeline可以理解成handle的容器,里面可以被注册众多处理不同业务功能的事件处理器,比如:
可以利用netty提供的IdleStateHandler来发送PING-PONG消息。这个处理器主要是捕获通道超时事件,主要有三类
客户端捕获读写超时,如果事件触发就给服务端发送PING消息。
服务端只需要捕获读超时即可,当读超时触发后就关闭通道。
为什么在空闲状态才发送心跳消息
在正常客户端与服务端有交互的情况下,说明双方都在正常工作不需要额外的心跳来告知对方的存活。只有双方在一定时间内没有接收到对方的消息时才开始采用心跳消息来探测对方的存活,这也是一种提升效率的做法。
创建AbstractHeartbeatHandler,并继承ChannelInboundHandlerAdapter,服务于客户端与服务端的心跳处理器。在读取方法中判断消息类型:
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
if(!(msg instanceof RpcMessage)){
channelHandlerContext.fireChannelRead(msg);
return;
}
RpcMessage message=(RpcMessage)msg;
if(null==message||null==message.getMessageHeader()){
channelHandlerContext.fireChannelRead(msg);
return;
}
if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PONG){
logger.info("ClientHeartbeatHandler.channelRead0 ,pong data is:{}",message.getMessageBody());
}
else if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PING){
this.sendPong(channelHandlerContext);
}
else {
channelHandlerContext.fireChannelRead(msg);
}
}
空闲状态事件,可以根据不同的状态做不同的行为处理,定义三个可重写事件供客户端与服务端处理器具体确认处理事件。
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case READER_IDLE:
this.handleReaderIdle(ctx);
break;
case WRITER_IDLE:
this.handleWriterIdle(ctx);
break;
case ALL_IDLE:
this.handleAllIdle(ctx);
break;
default:
break;
}
}
}
继承抽象心跳处理器,并重写事件发送PING消息。
public class ClientHeartbeatHandler extends AbstractHeartbeatHandler {
@Override
protected void handleAllIdle(ChannelHandlerContext ctx) {
this.sendPing(ctx);
}
}
继承抽象心跳处理器,并重写事件关闭通道。
public class ServerHeartbeatHandler extends AbstractHeartbeatHandler {
@Override
protected void handleReaderIdle(ChannelHandlerContext ctx) {
logger.info("ServerHeartbeatHandler.handleReaderIdle reader timeout ,close channel");
ctx.close();
}
}
比如5秒内未写入或者读取通道数据就触发超时事件。
.addLast(new IdleStateHandler(0, 0, Constants.ALLIDLE_TIME_SECONDS));
比如10秒未接收到通道消息就触发读超时事件。
.addLast(new IdleStateHandler(Constants.READER_TIME_SECONDS, 0, 0))
正常情况下心跳消息显示如下图所示,消息的内容可以根据自己的情况自行定义。
停止客户端程序,然后服务端读超时事件触发,并关闭通道。
由于上述的服务端心跳处理器,在触发读超时后会关闭通信管道,这导致客户端缓存的连接状态会出现不可用的情况,为了让客户端一直只能取到可用连接就需要对从缓存中获取到的连接做状态判断,如果可用直接返回,如果不可用则将连接从可用列表中删除然后取下一个可用连接。
通过channel的isActive属性可以判断连接是否可用,如果不可以做删除并重新获取的操作。
public RpcClientInvoker getInvoker() {
// ...
int index = loadbalanceService.index(size);
RpcClientInvoker invoker= RpcClientInvokerCache.get(index);
if(invoker.getChannel().isActive()) {
return invoker;
}
else {
RpcClientInvokerCache.removeHandler(invoker);
logger.info("invoker is not active,so remove it and get next one");
return this.getInvoker();
}
}
启动一个每隔5秒执行一次任务的线程,定时取出不可用连接,然后重连,并将不可用连接删除。
这里我处理的重连是直接丢弃原有不可用连接,然后重新创建新连接。
private static final Logger logger = LoggerFactory.getLogger(RpcClientInvokerManager.class);
static {
executorService.schedule(new Runnable() {
@Override
public void run() {
while (true) {
List<RpcClientInvoker> notConnectedHandlers = RpcClientInvokerCache.getNotConnectedHandlers();
if (!CollectionUtils.isEmpty(notConnectedHandlers)) {
for (RpcClientInvoker invoker : notConnectedHandlers) {
RpcClientInvokerManager.getInstance(referenceConfig).connect();
}
RpcClientInvokerCache.clearNotConnectedHandler();
}
}
}
}, Constants.RECONNECT_TIME_SECONDS,TimeUnit.SECONDS);
}
https://github.com/jiangmin168168/jim-framework
文中代码是依赖上述项目的,如果有不明白的可下载源码
本文中的图取自于网格
标签:listener listen header 空闲 ken html css hit 取出
原文地址:http://www.cnblogs.com/ASPNET2008/p/7615973.html