一,客户端代码
1 package bhz.netty.heartBeat; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.EventLoopGroup; 7 import io.netty.channel.nio.NioEventLoopGroup; 8 import io.netty.channel.socket.SocketChannel; 9 import io.netty.channel.socket.nio.NioSocketChannel; 10 import io.netty.handler.timeout.ReadTimeoutHandler; 11 12 public class Client { 13 14 15 public static void main(String[] args) throws Exception{ 16 17 EventLoopGroup group = new NioEventLoopGroup(); 18 Bootstrap b = new Bootstrap(); 19 b.group(group) 20 .channel(NioSocketChannel.class) 21 .handler(new ChannelInitializer<SocketChannel>() { 22 @Override 23 protected void initChannel(SocketChannel sc) throws Exception { 24 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); 25 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); 26 sc.pipeline().addLast(new ReadTimeoutHandler(30)); 27 sc.pipeline().addLast(new ClienHeartBeattHandler()); 28 } 29 }); 30 31 ChannelFuture cf = b.connect("10.13.82.18", 8888).sync(); 32 33 cf.channel().closeFuture().sync(); 34 group.shutdownGracefully(); 35 } 36 }
二,客户端助手类代码
1 package bhz.netty.heartBeat; 2 3 import java.net.InetAddress; 4 import java.util.HashMap; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.ScheduledExecutorService; 7 import java.util.concurrent.ScheduledFuture; 8 import java.util.concurrent.TimeUnit; 9 10 import org.hyperic.sigar.CpuPerc; 11 import org.hyperic.sigar.Mem; 12 import org.hyperic.sigar.Sigar; 13 14 import io.netty.channel.ChannelHandlerContext; 15 import io.netty.channel.ChannelInboundHandlerAdapter; 16 import io.netty.util.ReferenceCountUtil; 17 18 19 public class ClienHeartBeattHandler extends ChannelInboundHandlerAdapter { 20 21 private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); 22 23 private ScheduledFuture<?> heartBeat; 24 //主动向服务器发送认证信息 25 private InetAddress addr ; 26 27 private static final String SUCCESS_KEY = "auth_success_key"; 28 29 @Override 30 public void channelActive(ChannelHandlerContext ctx) throws Exception { 31 addr = InetAddress.getLocalHost(); 32 String ip = addr.getHostAddress(); 33 System.err.println("ip:" + ip); 34 String key = "1234"; 35 String auth = ip + "," + key; 36 ctx.writeAndFlush(auth); 37 } 38 39 @Override 40 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 41 try { 42 if(msg instanceof String){ 43 String ret = (String)msg; 44 if(SUCCESS_KEY.equals(ret)){ 45 // 握手成功,主动发送心跳消息 46 this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 8, TimeUnit.SECONDS); 47 System.out.println(msg); 48 } 49 else { 50 System.out.println(msg); 51 } 52 } 53 } finally { 54 ReferenceCountUtil.release(msg); 55 } 56 } 57 58 private class HeartBeatTask implements Runnable { 59 private final ChannelHandlerContext ctx; 60 61 public HeartBeatTask(final ChannelHandlerContext ctx) { 62 this.ctx = ctx; 63 } 64 65 @Override 66 public void run() { 67 try { 68 RequestInfo info = new RequestInfo(); 69 //ip 70 info.setIp(addr.getHostAddress()); 71 Sigar sigar = new Sigar(); 72 //cpu prec 73 CpuPerc cpuPerc = sigar.getCpuPerc(); 74 HashMap<String, Object> cpuPercMap = new HashMap<String, Object>(); 75 cpuPercMap.put("combined", cpuPerc.getCombined()); 76 cpuPercMap.put("user", cpuPerc.getUser()); 77 cpuPercMap.put("sys", cpuPerc.getSys()); 78 cpuPercMap.put("wait", cpuPerc.getWait()); 79 cpuPercMap.put("idle", cpuPerc.getIdle()); 80 // memory 81 Mem mem = sigar.getMem(); 82 HashMap<String, Object> memoryMap = new HashMap<String, Object>(); 83 memoryMap.put("total", mem.getTotal() / 1024L); 84 memoryMap.put("used", mem.getUsed() / 1024L); 85 memoryMap.put("free", mem.getFree() / 1024L); 86 info.setCpuPercMap(cpuPercMap); 87 info.setMemoryMap(memoryMap); 88 ctx.writeAndFlush(info); 89 90 } catch (Exception e) { 91 e.printStackTrace(); 92 } 93 } 94 95 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 96 cause.printStackTrace(); 97 if (heartBeat != null) { 98 heartBeat.cancel(true); 99 heartBeat = null; 100 } 101 ctx.fireExceptionCaught(cause); 102 } 103 104 } 105 }
1,连接成功后会先执行客户端的public void channelActive(ChannelHandlerContext ctx) throws Exception {方法注意这个时候还没有去执行服务器端的代码。
所以先打印出来ip:10.13.82.18这行代码。
然后现在去执行服务器端的代码
3,第40行代码这时候会返回auth_success_key,
4,第46行代码会执行,由于是异步的所以会先走第47行代码,所以客户端会打印出来auth_success_key这个字段
6,这时候服务端返回的会执行第50行代码所以打印出来info received!
7,由于一直处于一个连接的状态而且使用的是心跳连接每过5秒就会连接一次,这时候由于是一直连接的状态,所以就会一直返回info received!
三,封装的实体类对象
1 package bhz.netty.heartBeat; 2 3 import java.io.Serializable; 4 import java.util.HashMap; 5 6 public class RequestInfo implements Serializable { 7 8 private String ip ; 9 private HashMap<String, Object> cpuPercMap ; 10 private HashMap<String, Object> memoryMap; 11 //.. other field 12 13 public String getIp() { 14 return ip; 15 } 16 public void setIp(String ip) { 17 this.ip = ip; 18 } 19 public HashMap<String, Object> getCpuPercMap() { 20 return cpuPercMap; 21 } 22 public void setCpuPercMap(HashMap<String, Object> cpuPercMap) { 23 this.cpuPercMap = cpuPercMap; 24 } 25 public HashMap<String, Object> getMemoryMap() { 26 return memoryMap; 27 } 28 public void setMemoryMap(HashMap<String, Object> memoryMap) { 29 this.memoryMap = memoryMap; 30 } 31 32 33 }
四,编解码工具类
1 package bhz.netty.heartBeat; 2 3 import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; 4 import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; 5 import io.netty.handler.codec.marshalling.MarshallerProvider; 6 import io.netty.handler.codec.marshalling.MarshallingDecoder; 7 import io.netty.handler.codec.marshalling.MarshallingEncoder; 8 import io.netty.handler.codec.marshalling.UnmarshallerProvider; 9 10 import org.jboss.marshalling.MarshallerFactory; 11 import org.jboss.marshalling.Marshalling; 12 import org.jboss.marshalling.MarshallingConfiguration; 13 14 /** 15 * Marshalling工厂 16 * @author(alienware) 17 * @since 2014-12-16 18 */ 19 public final class MarshallingCodeCFactory { 20 21 /** 22 * 创建Jboss Marshalling解码器MarshallingDecoder 23 * @return MarshallingDecoder 24 */ 25 public static MarshallingDecoder buildMarshallingDecoder() { 26 //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。 27 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); 28 //创建了MarshallingConfiguration对象,配置了版本号为5 29 final MarshallingConfiguration configuration = new MarshallingConfiguration(); 30 configuration.setVersion(5); 31 //根据marshallerFactory和configuration创建provider 32 UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); 33 //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度 34 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1); 35 return decoder; 36 } 37 38 /** 39 * 创建Jboss Marshalling编码器MarshallingEncoder 40 * @return MarshallingEncoder 41 */ 42 public static MarshallingEncoder buildMarshallingEncoder() { 43 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); 44 final MarshallingConfiguration configuration = new MarshallingConfiguration(); 45 configuration.setVersion(5); 46 MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); 47 //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组 48 MarshallingEncoder encoder = new MarshallingEncoder(provider); 49 return encoder; 50 } 51 }
五,服务端代码
1 package bhz.netty.heartBeat; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 import io.netty.handler.logging.LogLevel; 12 import io.netty.handler.logging.LoggingHandler; 13 import io.netty.handler.timeout.ReadTimeoutHandler; 14 15 public class Server { 16 17 public static void main(String[] args) throws Exception{ 18 19 EventLoopGroup pGroup = new NioEventLoopGroup(); 20 EventLoopGroup cGroup = new NioEventLoopGroup(); 21 22 ServerBootstrap b = new ServerBootstrap(); 23 b.group(pGroup, cGroup) 24 .channel(NioServerSocketChannel.class) 25 .option(ChannelOption.SO_BACKLOG, 1024) 26 //设置日志 27 .handler(new LoggingHandler(LogLevel.INFO)) 28 .childHandler(new ChannelInitializer<SocketChannel>() { 29 protected void initChannel(SocketChannel sc) throws Exception { 30 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); 31 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); 32 sc.pipeline().addLast(new ReadTimeoutHandler(30)); 33 sc.pipeline().addLast(new ServerHeartBeatHandler()); 34 } 35 }); 36 37 ChannelFuture cf = b.bind(8888).sync(); 38 39 cf.channel().closeFuture().sync(); 40 pGroup.shutdownGracefully(); 41 cGroup.shutdownGracefully(); 42 43 } 44 }
,
六,服务端助手类代码
1 package bhz.netty.heartBeat; 2 3 import java.util.HashMap; 4 5 import io.netty.channel.ChannelFutureListener; 6 import io.netty.channel.ChannelHandlerContext; 7 import io.netty.channel.ChannelInboundHandlerAdapter; 8 9 public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter { 10 11 /** key:ip value:auth */ 12 private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>(); 13 private static final String SUCCESS_KEY = "auth_success_key"; 14 15 static { 16 AUTH_IP_MAP.put("10.13.82.18", "1234"); 17 } 18 19 private boolean auth(ChannelHandlerContext ctx, Object msg){ 20 //System.out.println(msg); 21 String [] ret = ((String) msg).split(","); 22 String auth = AUTH_IP_MAP.get(ret[0]); 23 if(auth != null && auth.equals(ret[1])){ 24 ctx.writeAndFlush(SUCCESS_KEY); 25 return true; 26 } else { 27 ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE); 28 return false; 29 } 30 } 31 32 @Override 33 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 34 if(msg instanceof String){ 35 auth(ctx, msg); 36 } else if (msg instanceof RequestInfo) { 37 38 RequestInfo info = (RequestInfo) msg; 39 System.out.println("--------------------------------------------"); 40 System.out.println("当前主机ip为: " + info.getIp()); 41 System.out.println("当前主机cpu情况: "); 42 HashMap<String, Object> cpu = info.getCpuPercMap(); 43 System.out.println("总使用率: " + cpu.get("combined")); 44 System.out.println("用户使用率: " + cpu.get("user")); 45 System.out.println("系统使用率: " + cpu.get("sys")); 46 System.out.println("等待率: " + cpu.get("wait")); 47 System.out.println("空闲率: " + cpu.get("idle")); 48 49 System.out.println("当前主机memory情况: "); 50 HashMap<String, Object> memory = info.getMemoryMap(); 51 System.out.println("内存总量: " + memory.get("total")); 52 System.out.println("当前内存使用量: " + memory.get("used")); 53 System.out.println("当前内存剩余量: " + memory.get("free")); 54 System.out.println("--------------------------------------------"); 55 56 ctx.writeAndFlush("info received!"); 57 } else { 58 ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE); 59 } 60 } 61 62 63 }
2,第35行代码执行,这时候会返回auth_success_key,所以客户端会打印出来auth_success_key这个字段
5,由46行代码触发的回调函数,会触发38-55行的代码,这时候会返回客户端
七,结果
客户端结果
1 ip:10.13.82.18 2 auth_success_key 3 info received! 4 info received! 5 info received! 6 info received! 7 info received! 8 info received!
服务端结果
1 当前主机ip为: 10.13.82.18 2 当前主机cpu情况: 3 总使用率: 0.15106815869786366 4 用户使用率: 0.1185147507629705 5 系统使用率: 0.03255340793489318 6 等待率: 0.0 7 空闲率: 0.8489318413021363 8 当前主机memory情况: 9 内存总量: 3865836 10 当前内存使用量: 3537528 11 当前内存剩余量: 328308 12 -------------------------------------------- 13 -------------------------------------------- 14 当前主机ip为: 10.13.82.18 15 当前主机cpu情况: 16 总使用率: 0.15106815869786366 17 用户使用率: 0.1190233977619532 18 系统使用率: 0.03204476093591048 19 等待率: 0.0 20 空闲率: 0.8413021363173957 21 当前主机memory情况: 22 内存总量: 3865836 23 当前内存使用量: 3518000 24 当前内存剩余量: 347836 25 -------------------------------------------- 26 -------------------------------------------- 27 当前主机ip为: 10.13.82.18 28 当前主机cpu情况: 29 总使用率: 0.16954164613109907 30 用户使用率: 0.16165598817151305 31 系统使用率: 0.007885657959586003 32 等待率: 0.0 33 空闲率: 0.830458353868901 34 当前主机memory情况: 35 内存总量: 3865836 36 当前内存使用量: 3521124 37 当前内存剩余量: 344712 38 -------------------------------------------- 39 -------------------------------------------- 40 当前主机ip为: 10.13.82.18 41 当前主机cpu情况: 42 总使用率: 0.163265306122449 43 用户使用率: 0.14095870906502136 44 系统使用率: 0.022306597057427623 45 等待率: 0.0 46 空闲率: 0.8367346938775511 47 当前主机memory情况: 48 内存总量: 3865836 49 当前内存使用量: 3495956 50 当前内存剩余量: 369880 51 -------------------------------------------- 52 -------------------------------------------- 53 当前主机ip为: 10.13.82.18 54 当前主机cpu情况: 55 总使用率: 0.16366366366366367 56 用户使用率: 0.14814814814814814 57 系统使用率: 0.015515515515515516 58 等待率: 0.0 59 空闲率: 0.8363363363363363 60 当前主机memory情况: 61 内存总量: 3865836 62 当前内存使用量: 3493664 63 当前内存剩余量: 372172 64 -------------------------------------------- 65 -------------------------------------------- 66 当前主机ip为: 10.13.82.18 67 当前主机cpu情况: 68 总使用率: 0.17497456765005087 69 用户使用率: 0.1515768056968464 70 系统使用率: 0.023397761953204477 71 等待率: 0.0 72 空闲率: 0.8250254323499492 73 当前主机memory情况: 74 内存总量: 3865836 75 当前内存使用量: 3489476 76 当前内存剩余量: 376360 77 -------------------------------------------- 78 -------------------------------------------- 79 当前主机ip为: 10.13.82.18 80 当前主机cpu情况: 81 总使用率: 0.14637752587481517 82 用户使用率: 0.10004928536224741 83 系统使用率: 0.046328240512567766 84 等待率: 0.0 85 空闲率: 0.8457368161655988 86 当前主机memory情况: 87 内存总量: 3865836 88 当前内存使用量: 3472776 89 当前内存剩余量: 393060 90 -------------------------------------------- 91 -------------------------------------------- 92 当前主机ip为: 10.13.82.18 93 当前主机cpu情况: 94 总使用率: 0.22200488997555012 95 用户使用率: 0.1374083129584352 96 系统使用率: 0.08459657701711491 97 等待率: 0.0 98 空闲率: 0.7779951100244499 99 当前主机memory情况: 100 内存总量: 3865836 101 当前内存使用量: 3467608 102 当前内存剩余量: 398228 103 -------------------------------------------- 104 -------------------------------------------- 105 当前主机ip为: 10.13.82.18 106 当前主机cpu情况: 107 总使用率: 0.1416267942583732 108 用户使用率: 0.13444976076555024 109 系统使用率: 0.007177033492822967 110 等待率: 0.0 111 空闲率: 0.8583732057416268 112 当前主机memory情况: 113 内存总量: 3865836 114 当前内存使用量: 3470100 115 当前内存剩余量: 395736
注意红字的顺序及解释