码迷,mamicode.com
首页 > Web开发 > 详细

<Netty>(十八)(中级篇)心跳连接

时间:2018-03-14 22:39:02      阅读:308      评论:0      收藏:0      [点我收藏+]

标签:alien   static   cep   tip   客户   count   beat   java序列化   receive   

一,客户端代码

 1 package bhz.netty.heartBeat;
 2 
 3 import io.netty.bootstrap.Bootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.EventLoopGroup;
 7 import io.netty.channel.nio.NioEventLoopGroup;
 8 import io.netty.channel.socket.SocketChannel;
 9 import io.netty.channel.socket.nio.NioSocketChannel;
10 import io.netty.handler.timeout.ReadTimeoutHandler;
11 
12 public class Client {
13 
14     
15     public static void main(String[] args) throws Exception{
16         
17         EventLoopGroup group = new NioEventLoopGroup();
18         Bootstrap b = new Bootstrap();
19         b.group(group)
20          .channel(NioSocketChannel.class)
21          .handler(new ChannelInitializer<SocketChannel>() {
22             @Override
23             protected void initChannel(SocketChannel sc) throws Exception {
24                 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
25                 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
26                 sc.pipeline().addLast(new ReadTimeoutHandler(30));
27                 sc.pipeline().addLast(new ClienHeartBeattHandler());
28             }
29         });
30         
31         ChannelFuture cf = b.connect("10.13.82.18", 8888).sync();
32 
33         cf.channel().closeFuture().sync();
34         group.shutdownGracefully();
35     }
36 }

 

二,客户端助手类代码

  1 package bhz.netty.heartBeat;
  2 
  3 import java.net.InetAddress;
  4 import java.util.HashMap;
  5 import java.util.concurrent.Executors;
  6 import java.util.concurrent.ScheduledExecutorService;
  7 import java.util.concurrent.ScheduledFuture;
  8 import java.util.concurrent.TimeUnit;
  9 
 10 import org.hyperic.sigar.CpuPerc;
 11 import org.hyperic.sigar.Mem;
 12 import org.hyperic.sigar.Sigar;
 13 
 14 import io.netty.channel.ChannelHandlerContext;
 15 import io.netty.channel.ChannelInboundHandlerAdapter;
 16 import io.netty.util.ReferenceCountUtil;
 17 
 18 
 19 public class ClienHeartBeattHandler extends ChannelInboundHandlerAdapter {
 20 
 21     private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
 22     
 23     private ScheduledFuture<?> heartBeat;
 24     //主动向服务器发送认证信息
 25     private InetAddress addr ;
 26     
 27     private static final String SUCCESS_KEY = "auth_success_key";
 28 
 29     @Override
 30     public void channelActive(ChannelHandlerContext ctx) throws Exception {
 31         addr = InetAddress.getLocalHost();
 32         String ip = addr.getHostAddress();
 33         System.err.println("ip:" + ip);
 34         String key = "1234";
 35         String auth = ip + "," + key;
 36         ctx.writeAndFlush(auth);
 37     }
 38     
 39     @Override
 40     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 41         try {
 42             if(msg instanceof String){
 43                 String ret = (String)msg;
 44                 if(SUCCESS_KEY.equals(ret)){
 45                     // 握手成功,主动发送心跳消息
 46                     this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 8, TimeUnit.SECONDS);
 47                     System.out.println(msg);                
 48                 }
 49                 else {
 50                     System.out.println(msg);
 51                 }
 52             }
 53         } finally {
 54             ReferenceCountUtil.release(msg);
 55         }
 56     }
 57 
 58     private class HeartBeatTask implements Runnable {
 59         private final ChannelHandlerContext ctx;
 60 
 61         public HeartBeatTask(final ChannelHandlerContext ctx) {
 62             this.ctx = ctx;
 63         }
 64     
 65         @Override
 66         public void run() {
 67             try {
 68                 RequestInfo info = new RequestInfo();
 69                 //ip
 70                 info.setIp(addr.getHostAddress());
 71                 Sigar sigar = new Sigar();
 72                 //cpu prec
 73                 CpuPerc cpuPerc = sigar.getCpuPerc();
 74                 HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
 75                 cpuPercMap.put("combined", cpuPerc.getCombined());
 76                 cpuPercMap.put("user", cpuPerc.getUser());
 77                 cpuPercMap.put("sys", cpuPerc.getSys());
 78                 cpuPercMap.put("wait", cpuPerc.getWait());
 79                 cpuPercMap.put("idle", cpuPerc.getIdle());
 80                 // memory
 81                 Mem mem = sigar.getMem();
 82                 HashMap<String, Object> memoryMap = new HashMap<String, Object>();
 83                 memoryMap.put("total", mem.getTotal() / 1024L);
 84                 memoryMap.put("used", mem.getUsed() / 1024L);
 85                 memoryMap.put("free", mem.getFree() / 1024L);
 86                 info.setCpuPercMap(cpuPercMap);
 87                 info.setMemoryMap(memoryMap);
 88                 ctx.writeAndFlush(info);
 89                 
 90             } catch (Exception e) {
 91                 e.printStackTrace();
 92             }
 93         }
 94 
 95         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 96             cause.printStackTrace();
 97             if (heartBeat != null) {
 98                 heartBeat.cancel(true);
 99                 heartBeat = null;
100             }
101             ctx.fireExceptionCaught(cause);
102         }
103         
104     }
105 }

 1,连接成功后会先执行客户端的public void channelActive(ChannelHandlerContext ctx) throws Exception {方法注意这个时候还没有去执行服务器端的代码。

所以先打印出来ip:10.13.82.18这行代码。

然后现在去执行服务器端的代码

 

3,第40行代码这时候会返回auth_success_key,

4,第46行代码会执行,由于是异步的所以会先走第47行代码,所以客户端会打印出来auth_success_key这个字段

6,这时候服务端返回的会执行第50行代码所以打印出来info received!

7,由于一直处于一个连接的状态而且使用的是心跳连接每过5秒就会连接一次,这时候由于是一直连接的状态,所以就会一直返回info received!

 

三,封装的实体类对象

 1 package bhz.netty.heartBeat;
 2 
 3 import java.io.Serializable;
 4 import java.util.HashMap;
 5 
 6 public class RequestInfo implements Serializable {
 7 
 8     private String ip ;
 9     private HashMap<String, Object> cpuPercMap ;
10     private HashMap<String, Object> memoryMap;
11     //.. other field
12     
13     public String getIp() {
14         return ip;
15     }
16     public void setIp(String ip) {
17         this.ip = ip;
18     }
19     public HashMap<String, Object> getCpuPercMap() {
20         return cpuPercMap;
21     }
22     public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
23         this.cpuPercMap = cpuPercMap;
24     }
25     public HashMap<String, Object> getMemoryMap() {
26         return memoryMap;
27     }
28     public void setMemoryMap(HashMap<String, Object> memoryMap) {
29         this.memoryMap = memoryMap;
30     }
31     
32     
33 }

 

四,编解码工具类

 1 package bhz.netty.heartBeat;
 2 
 3 import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
 4 import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
 5 import io.netty.handler.codec.marshalling.MarshallerProvider;
 6 import io.netty.handler.codec.marshalling.MarshallingDecoder;
 7 import io.netty.handler.codec.marshalling.MarshallingEncoder;
 8 import io.netty.handler.codec.marshalling.UnmarshallerProvider;
 9 
10 import org.jboss.marshalling.MarshallerFactory;
11 import org.jboss.marshalling.Marshalling;
12 import org.jboss.marshalling.MarshallingConfiguration;
13 
14 /**
15  * Marshalling工厂
16  * @author(alienware)
17  * @since 2014-12-16
18  */
19 public final class MarshallingCodeCFactory {
20 
21     /**
22      * 创建Jboss Marshalling解码器MarshallingDecoder
23      * @return MarshallingDecoder
24      */
25     public static MarshallingDecoder buildMarshallingDecoder() {
26         //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
27         final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
28         //创建了MarshallingConfiguration对象,配置了版本号为5 
29         final MarshallingConfiguration configuration = new MarshallingConfiguration();
30         configuration.setVersion(5);
31         //根据marshallerFactory和configuration创建provider
32         UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
33         //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
34         MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
35         return decoder;
36     }
37 
38     /**
39      * 创建Jboss Marshalling编码器MarshallingEncoder
40      * @return MarshallingEncoder
41      */
42     public static MarshallingEncoder buildMarshallingEncoder() {
43         final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
44         final MarshallingConfiguration configuration = new MarshallingConfiguration();
45         configuration.setVersion(5);
46         MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
47         //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
48         MarshallingEncoder encoder = new MarshallingEncoder(provider);
49         return encoder;
50     }
51 }

 

五,服务端代码

 1 package bhz.netty.heartBeat;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 import io.netty.handler.logging.LogLevel;
12 import io.netty.handler.logging.LoggingHandler;
13 import io.netty.handler.timeout.ReadTimeoutHandler;
14 
15 public class Server {
16 
17     public static void main(String[] args) throws Exception{
18         
19         EventLoopGroup pGroup = new NioEventLoopGroup();
20         EventLoopGroup cGroup = new NioEventLoopGroup();
21         
22         ServerBootstrap b = new ServerBootstrap();
23         b.group(pGroup, cGroup)
24          .channel(NioServerSocketChannel.class)
25          .option(ChannelOption.SO_BACKLOG, 1024)
26          //设置日志
27          .handler(new LoggingHandler(LogLevel.INFO))
28          .childHandler(new ChannelInitializer<SocketChannel>() {
29             protected void initChannel(SocketChannel sc) throws Exception {
30                 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
31                 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
32                 sc.pipeline().addLast(new ReadTimeoutHandler(30));
33                 sc.pipeline().addLast(new ServerHeartBeatHandler());
34             }
35         });
36         
37         ChannelFuture cf = b.bind(8888).sync();
38         
39         cf.channel().closeFuture().sync();
40         pGroup.shutdownGracefully();
41         cGroup.shutdownGracefully();
42         
43     }
44 }

,

六,服务端助手类代码

 1 package bhz.netty.heartBeat;
 2 
 3 import java.util.HashMap;
 4 
 5 import io.netty.channel.ChannelFutureListener;
 6 import io.netty.channel.ChannelHandlerContext;
 7 import io.netty.channel.ChannelInboundHandlerAdapter;
 8 
 9 public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter {
10     
11     /** key:ip value:auth */
12     private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();
13     private static final String SUCCESS_KEY = "auth_success_key";
14     
15     static {
16         AUTH_IP_MAP.put("10.13.82.18", "1234");
17     }
18     
19     private boolean auth(ChannelHandlerContext ctx, Object msg){
20             //System.out.println(msg);
21             String [] ret = ((String) msg).split(",");
22             String auth = AUTH_IP_MAP.get(ret[0]);
23             if(auth != null && auth.equals(ret[1])){
24                 ctx.writeAndFlush(SUCCESS_KEY);
25                 return true;
26             } else {
27                 ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
28                 return false;
29             }
30     }
31     
32     @Override
33     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
34         if(msg instanceof String){
35             auth(ctx, msg);
36         } else if (msg instanceof RequestInfo) {
37             
38             RequestInfo info = (RequestInfo) msg;
39             System.out.println("--------------------------------------------");
40             System.out.println("当前主机ip为: " + info.getIp());
41             System.out.println("当前主机cpu情况: ");
42             HashMap<String, Object> cpu = info.getCpuPercMap();
43             System.out.println("总使用率: " + cpu.get("combined"));
44             System.out.println("用户使用率: " + cpu.get("user"));
45             System.out.println("系统使用率: " + cpu.get("sys"));
46             System.out.println("等待率: " + cpu.get("wait"));
47             System.out.println("空闲率: " + cpu.get("idle"));
48             
49             System.out.println("当前主机memory情况: ");
50             HashMap<String, Object> memory = info.getMemoryMap();
51             System.out.println("内存总量: " + memory.get("total"));
52             System.out.println("当前内存使用量: " + memory.get("used"));
53             System.out.println("当前内存剩余量: " + memory.get("free"));
54             System.out.println("--------------------------------------------");
55             
56             ctx.writeAndFlush("info received!");
57         } else {
58             ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
59         }
60     }
61 
62 
63 }

 2,第35行代码执行,这时候会返回auth_success_key,所以客户端会打印出来auth_success_key这个字段

5,由46行代码触发的回调函数,会触发38-55行的代码,这时候会返回客户端

七,结果

客户端结果

1 ip:10.13.82.18
2 auth_success_key
3 info received!
4 info received!
5 info received!
6 info received!
7 info received!
8 info received!

 

服务端结果

  1 当前主机ip为: 10.13.82.18
  2 当前主机cpu情况: 
  3 总使用率: 0.15106815869786366
  4 用户使用率: 0.1185147507629705
  5 系统使用率: 0.03255340793489318
  6 等待率: 0.0
  7 空闲率: 0.8489318413021363
  8 当前主机memory情况: 
  9 内存总量: 3865836
 10 当前内存使用量: 3537528
 11 当前内存剩余量: 328308
 12 --------------------------------------------
 13 --------------------------------------------
 14 当前主机ip为: 10.13.82.18
 15 当前主机cpu情况: 
 16 总使用率: 0.15106815869786366
 17 用户使用率: 0.1190233977619532
 18 系统使用率: 0.03204476093591048
 19 等待率: 0.0
 20 空闲率: 0.8413021363173957
 21 当前主机memory情况: 
 22 内存总量: 3865836
 23 当前内存使用量: 3518000
 24 当前内存剩余量: 347836
 25 --------------------------------------------
 26 --------------------------------------------
 27 当前主机ip为: 10.13.82.18
 28 当前主机cpu情况: 
 29 总使用率: 0.16954164613109907
 30 用户使用率: 0.16165598817151305
 31 系统使用率: 0.007885657959586003
 32 等待率: 0.0
 33 空闲率: 0.830458353868901
 34 当前主机memory情况: 
 35 内存总量: 3865836
 36 当前内存使用量: 3521124
 37 当前内存剩余量: 344712
 38 --------------------------------------------
 39 --------------------------------------------
 40 当前主机ip为: 10.13.82.18
 41 当前主机cpu情况: 
 42 总使用率: 0.163265306122449
 43 用户使用率: 0.14095870906502136
 44 系统使用率: 0.022306597057427623
 45 等待率: 0.0
 46 空闲率: 0.8367346938775511
 47 当前主机memory情况: 
 48 内存总量: 3865836
 49 当前内存使用量: 3495956
 50 当前内存剩余量: 369880
 51 --------------------------------------------
 52 --------------------------------------------
 53 当前主机ip为: 10.13.82.18
 54 当前主机cpu情况: 
 55 总使用率: 0.16366366366366367
 56 用户使用率: 0.14814814814814814
 57 系统使用率: 0.015515515515515516
 58 等待率: 0.0
 59 空闲率: 0.8363363363363363
 60 当前主机memory情况: 
 61 内存总量: 3865836
 62 当前内存使用量: 3493664
 63 当前内存剩余量: 372172
 64 --------------------------------------------
 65 --------------------------------------------
 66 当前主机ip为: 10.13.82.18
 67 当前主机cpu情况: 
 68 总使用率: 0.17497456765005087
 69 用户使用率: 0.1515768056968464
 70 系统使用率: 0.023397761953204477
 71 等待率: 0.0
 72 空闲率: 0.8250254323499492
 73 当前主机memory情况: 
 74 内存总量: 3865836
 75 当前内存使用量: 3489476
 76 当前内存剩余量: 376360
 77 --------------------------------------------
 78 --------------------------------------------
 79 当前主机ip为: 10.13.82.18
 80 当前主机cpu情况: 
 81 总使用率: 0.14637752587481517
 82 用户使用率: 0.10004928536224741
 83 系统使用率: 0.046328240512567766
 84 等待率: 0.0
 85 空闲率: 0.8457368161655988
 86 当前主机memory情况: 
 87 内存总量: 3865836
 88 当前内存使用量: 3472776
 89 当前内存剩余量: 393060
 90 --------------------------------------------
 91 --------------------------------------------
 92 当前主机ip为: 10.13.82.18
 93 当前主机cpu情况: 
 94 总使用率: 0.22200488997555012
 95 用户使用率: 0.1374083129584352
 96 系统使用率: 0.08459657701711491
 97 等待率: 0.0
 98 空闲率: 0.7779951100244499
 99 当前主机memory情况: 
100 内存总量: 3865836
101 当前内存使用量: 3467608
102 当前内存剩余量: 398228
103 --------------------------------------------
104 --------------------------------------------
105 当前主机ip为: 10.13.82.18
106 当前主机cpu情况: 
107 总使用率: 0.1416267942583732
108 用户使用率: 0.13444976076555024
109 系统使用率: 0.007177033492822967
110 等待率: 0.0
111 空闲率: 0.8583732057416268
112 当前主机memory情况: 
113 内存总量: 3865836
114 当前内存使用量: 3470100
115 当前内存剩余量: 395736

 注意红字的顺序及解释

<Netty>(十八)(中级篇)心跳连接

标签:alien   static   cep   tip   客户   count   beat   java序列化   receive   

原文地址:https://www.cnblogs.com/qingruihappy/p/8570524.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!