标签:server 网络框架 add dia 原因 java editor 定义 read
大四毕业准研一的项目,项目主要用于接收udp,tcp,dns等数据,进行分析存盘。存盘后用于数据挖掘试着找出有异常行为的僵尸网络主机。底层网络框架使用netty。
netty的简介:
netty的maven:
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
netty的性能测试:
项目测试时用700Mb/s(忘记是MB还是Mb了,应该是Mb)的数据跑了一晚上没有掉包。(之前有掉包的情况出现,原因是由于路由器的最大帧长度为1500字节,而传输的测试数据帧长2000+字节,甚至有4000的,即巨帧)。用1GMb+/s(几乎已经达到测试网线的最大速率)的速度跑过几分钟,没有掉包。性能可以说是非常高了。
客户端是由C++编写,代码这里略。(需要注意的是C++传递的结构体时会进行数据对齐,java按字节读取时补齐的字段会读为0)
接受线程服务器端源码:
1 /* 2 * To change this license header, choose License Headers in Project Properties. 3 * To change this template file, choose Tools | Templates 4 * and open the template in the editor. 5 */ 6 package packetserver; 7 8 9 import commonclasses.RawData;//原始数据 10 import commonclasses.ServerProperties;//配置文件 11 import commonclasses.StatisticData;//统计数据 12 import io.netty.bootstrap.ServerBootstrap; 13 import io.netty.channel.ChannelInitializer; 14 import io.netty.channel.ChannelPipeline; 15 import io.netty.channel.socket.SocketChannel; 16 17 import java.nio.ByteOrder;//java和linux默认的小端大端顺序不一样,需要设置 18 import io.netty.handler.codec.ByteToMessageDecoder; 19 20 import io.netty.buffer.ByteBuf; 21 import io.netty.channel.ChannelFuture; 22 import io.netty.channel.ChannelHandlerContext; 23 import io.netty.channel.ChannelInboundHandlerAdapter; 24 import io.netty.channel.ChannelOption; 25 import io.netty.channel.EventLoopGroup; 26 import io.netty.channel.nio.NioEventLoopGroup; 27 import io.netty.channel.socket.nio.NioServerSocketChannel; 28 import io.netty.handler.codec.MessageToByteEncoder; 29 import java.net.InetAddress; 30 import java.net.NetworkInterface; 31 import java.util.Enumeration; 32 import java.util.List; 33 34 /** 35 * 36 * @author gaoxiang 37 */ 38 public class RawDataNetAgent implements Runnable { 39 40 static int port = ServerProperties.rawDataServerPort; 41 static String hostadd = ServerProperties.recevierServerIP; 42 RawDataServer s1 = new RawDataServer(); 43 44 RawDataAgent fileAgent; 45 private Thread workerThread; 46 47 RawDataNetAgent(RawDataAgent fileAgent) { 48 this.fileAgent = fileAgent; 49 50 workerThread = new Thread(this); 51 workerThread.start(); 52 } 53 54 @Override 55 public void run() { 56 try { 57 s1.bind(); 58 System.out.println("NetAgent ended..."); 59 } catch (Exception e) { 60 // TODO Auto-generated catch block 61 e.printStackTrace(); 62 } 63 } 64 65 class RawDataServerInitializer extends ChannelInitializer<SocketChannel> { 66 67 @Override 68 protected void initChannel(SocketChannel ch) throws Exception { 69 ChannelPipeline pipeline = ch.pipeline(); 70 pipeline.addLast(new RawDataServerDecoder()); 71 // pipeline.addLast(new ServerEncoder()); 72 pipeline.addLast(new RawDataServerHandler()); 73 74 } 75 } 76 77 class RawDataServerEncoder extends MessageToByteEncoder { 78 79 @Override 80 protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { 81 byte[] body = (byte[]) (msg); 82 int dataLength = body.length; 83 out.writeInt(dataLength); 84 out.writeBytes(body); 85 } 86 87 } 88 89 class RawDataServerDecoder extends ByteToMessageDecoder { 90 91 final int socketHeaderLength = 16; 92 final int rawHeaderLength = 74; 93 94 @Override 95 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 96 if (in.readableBytes() < socketHeaderLength) { 97 return; 98 } 99 in.markWriterIndex(); 100 ByteBuf buf = in.order(ByteOrder.LITTLE_ENDIAN); 101 102 int ssyn = buf.readInt(); 103 if (ssyn != 0xfae9dafc) { 104 in.readerIndex(in.readerIndex() - 3); 105 in.discardReadBytes(); 106 System.out.println("ssyn != 0xfae9dafc"); 107 return; 108 } 109 110 int totallength = buf.readInt(); 111 if (totallength > in.readableBytes() + 8) { 112 in.resetReaderIndex(); 113 return; 114 } 115 out.add(in.readBytes(totallength - socketHeaderLength + 8)); 116 in.discardReadBytes(); 117 118 } 119 120 } 121 122 class RawDataServerHandler extends ChannelInboundHandlerAdapter { 123 124 int num = 0; 125 126 @Override 127 public void channelActive(ChannelHandlerContext ctx) throws Exception { 128 System.out.println("channelActive work"); 129 } 130 131 @Override 132 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 133 cause.printStackTrace(); 134 ctx.close(); 135 } 136 137 public int verseInt(int i) { 138 int j; 139 j = (i >> 24) & 0xff | (i >> 8) & 0xff00 | (i << 8) & 0Xff0000 | (i << 24) & 0xff000000; 140 return j; 141 } 142 143 @Override 144 public void channelRead(ChannelHandlerContext ctx, Object _msg) throws Exception { 145 ByteBuf bytes = ((ByteBuf) _msg).order(ByteOrder.LITTLE_ENDIAN); 146 int type = bytes.readUnsignedShort(); 147 int code = bytes.readUnsignedShort(); 148 long capip = bytes.readUnsignedInt(); 149 bytes.discardReadBytes(); 150 if (type == 1000 && code == 1001) { 151 SoKeyRz msg = new SoKeyRz(bytes); 152 RawData rf = new RawData(); 153 rf.protocolNumber = (byte) msg.tuple5.protocol; 154 rf.sPort = (short) msg.tuple5.source; 155 rf.dPort = (short) msg.tuple5.dest; 156 rf.sIP = verseInt((int) msg.tuple5.saddr); 157 rf.dIP = verseInt((int) msg.tuple5.daddr); 158 rf.timeStamp = msg.t_s * 1000 + msg.t_ms / 1000;// Caution, just for 159 // test!!!!!!! 160 rf.payload = msg.data; 161 /** 162 * have been changed to fit mapdb 163 */ 164 Object[] object = new Object[]{ 165 rf.sIP, rf.dIP, rf.sPort, rf.dPort, rf.protocolNumber, rf.timeStamp, 166 }; 167 AddRawDataToQueue(rf); 168 169 } 170 bytes.release(); 171 } 172 173 public void AddRawDataToQueue(RawData rf) { 174 fileAgent.offer(rf); 175 } 176 } 177 178 class RawDataServer { 179 180 ChannelFuture f; 181 182 public void closeServer() { 183 f.channel().close(); 184 } 185 186 public void bind() throws Exception { 187 EventLoopGroup bossGroup = new NioEventLoopGroup(); 188 EventLoopGroup workerGroup = new NioEventLoopGroup(); 189 try { 190 ServerBootstrap b = new ServerBootstrap(); 191 b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)// .localAddress(inetHost, 192 // inetPort) 193 .childHandler(new RawDataServerInitializer()).option(ChannelOption.SO_BACKLOG, 2048) 194 .childOption(ChannelOption.SO_KEEPALIVE, true); 195 196 f = b.bind(RawDataNetAgent.hostadd, RawDataNetAgent.port).sync(); 197 f.channel().closeFuture().sync(); 198 } finally { 199 bossGroup.shutdownGracefully(); 200 workerGroup.shutdownGracefully(); 201 } 202 } 203 204 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { 205 206 @Override 207 protected void initChannel(SocketChannel arg0) throws Exception { 208 System.out.println("server initChannel.."); 209 arg0.pipeline().addLast(new RawDataServerHandler()); 210 } 211 } 212 213 public String getAddr(String rcvName) { 214 Enumeration<NetworkInterface> netInterfaces = null; 215 try { 216 netInterfaces = NetworkInterface.getNetworkInterfaces(); 217 218 while (netInterfaces.hasMoreElements()) { 219 220 NetworkInterface ni = netInterfaces.nextElement(); 221 222 if (ni.getDisplayName().equals(rcvName)) { 223 Enumeration<InetAddress> ips = ni.getInetAddresses(); 224 ips.nextElement(); 225 return ips.nextElement().getHostAddress(); 226 } 227 } 228 } catch (Exception e) { 229 e.printStackTrace(); 230 } 231 return "localhost"; 232 233 } 234 235 } 236 237 class SoKeyRz {//包的格式定义,包名公司定的 238 239 long soid; 240 long cap_ip; 241 int cap_port; 242 long t_s; 243 long t_ms; 244 byte[] srcmac = new byte[6]; 245 byte[] dstmac = new byte[6]; 246 Tuple5 tuple5 = new Tuple5(); 247 char src_dep[] = new char[16]; 248 long datalen; 249 char reserve[] = new char[8]; 250 byte data[] = null; 251 252 public SoKeyRz(ByteBuf buf) { 253 soid = buf.readUnsignedInt(); 254 cap_ip = buf.readUnsignedInt(); 255 cap_port = buf.readUnsignedShort(); 256 t_s = buf.readUnsignedInt(); 257 t_ms = buf.readUnsignedInt(); 258 ///////////// buf.readCharSequence(); 259 for (int i = 0; i < 6; i++) { 260 srcmac[i] = buf.readByte(); 261 } 262 // buf.readCharSequence(6, srcmac); 263 for (int i = 0; i < 6; i++) { 264 dstmac[i] = buf.readByte(); 265 } 266 tuple5.protocol = buf.readInt(); 267 tuple5.source = buf.readUnsignedShort(); 268 tuple5.dest = buf.readUnsignedShort(); 269 tuple5.saddr = buf.readUnsignedInt(); 270 tuple5.daddr = buf.readUnsignedInt(); 271 for (int i = 0; i < 16; i++) { 272 src_dep[i] = (char) buf.readByte(); 273 } 274 datalen = buf.readUnsignedInt(); 275 for (int i = 0; i < 8; i++) { 276 reserve[i] = (char) buf.readByte(); 277 } 278 data = new byte[(int) datalen]; 279 buf.readBytes(data); 280 } 281 ; 282 283 } 284 285 class Tuple5 {//tcp五元组 286 287 int protocol; 288 int source; 289 int dest; 290 long saddr; 291 long daddr; 292 } 293 }
标签:server 网络框架 add dia 原因 java editor 定义 read
原文地址:http://www.cnblogs.com/theWeirdCode/p/7420721.html