标签:现在 写入 支持 数据读取 trace from big break 协助
package com.cn.codc; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.frame.FrameDecoder; import com.cn.constant.ConstantValue; import com.cn.model.Request; /** * 请求解码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+ * | 包头 | 模块号 | 命令号 | 长度 | 数据 | * +——----——+——-----——+——----——+——----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) */ public class RequestDecoder extends FrameDecoder{// FrameDecoder 这个decoder可以协助我们解决粘包分包问题 /** * 数据包基本长度 */ public static int BASE_LENTH = 4 + 2 + 2 + 4; //ChannelBuffer里面有一个读指针和写指针。读指针和写指针初始值是0,写多少数据写指针就移动多少 //调用readShort方法,readInt方法就会移动读指针, 0 =< readerIndex =< writerIndex @Override protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception { //可读长度readableBytes必须大于基本长度才处理 if(buffer.readableBytes() >= BASE_LENTH){ //防止socket字节流攻击 if(buffer.readableBytes() > 2048){ buffer.skipBytes(buffer.readableBytes()); } //记录包头开始的index int beginReader; while(true){//循环读取,直到包头读取完毕 beginReader = buffer.readerIndex();//获取读指针 buffer.markReaderIndex(); if(buffer.readInt() == ConstantValue.FLAG){ break; } //未读到包头,略过一个字节 buffer.resetReaderIndex(); buffer.readByte(); //长度又变得不满足 if(buffer.readableBytes() < BASE_LENTH){ return null; } } //包头读取完毕,读取模块号 short module = buffer.readShort(); //读取命令号 short cmd = buffer.readShort(); //读取长度 int length = buffer.readInt(); //readableBytes现在可读的长度小于数据的长度。判断请求数据包数据部分是否到齐 if(buffer.readableBytes() < length){ //还原读指针,已经读取了12个字节,但是没用,所以要还原buffer的读指针, buffer.readerIndex(beginReader); return null;//等待后面的数据包来 } //比length要长,就读取data数据 byte[] data = new byte[length]; buffer.readBytes(data);//数据读取完毕 //封装request对象继续向下传递 Request request = new Request(); request.setModule(module); request.setCmd(cmd); request.setData(data); //继续往下传递 ,调用sendUpStreamEvent方法向下传递 return request; } //长度短了,数据包不完整,需要等待后面的包来 return null; //FrameDecoder: return null就是等待后面的包,return一个解码的对象就是向下传递。 } }
package com.cn.codc; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; import com.cn.constant.ConstantValue; import com.cn.model.Request; /** * 请求编码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+ * | 包头 | 模块号 | 命令号 | 长度 | 数据 | * +——----——+——-----——+——----——+——----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) */ public class RequestEncoder extends OneToOneEncoder{ //把一个request对象转换成了一个ChannelBuffer二进制数据 @Override protected Object encode(ChannelHandlerContext context, Channel channel, Object rs) throws Exception { Request request = (Request)(rs); ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); //包头,确定数据包的开始 buffer.writeInt(ConstantValue.FLAG); //module buffer.writeShort(request.getModule()); //cmd buffer.writeShort(request.getCmd()); //长度 buffer.writeInt(request.getDataLength()); //data if(request.getData() != null){ buffer.writeBytes(request.getData()); } return buffer;//返回一个ChannelBuffer继续向下传递。 } }
package com.cn.codc; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.frame.FrameDecoder; import com.cn.constant.ConstantValue; import com.cn.model.Response; /** * response解码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * | 包头 | 模块号 | 命令号 | 状态码 | 长度 | 数据 | * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) */ public class ResponseDecoder extends FrameDecoder{ /** * 数据包基本长度 */ public static int BASE_LENTH = 4 + 2 + 2 + 4; @Override protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception { //可读长度必须大于基本长度 if(buffer.readableBytes() >= BASE_LENTH){ //记录包头开始的index int beginReader = buffer.readerIndex(); while(true){ if(buffer.readInt() == ConstantValue.FLAG){ break; } } //模块号 short module = buffer.readShort(); //命令号 short cmd = buffer.readShort(); //状态码 int stateCode = buffer.readInt(); //长度 int length = buffer.readInt(); if(buffer.readableBytes() < length){ //还原读指针 buffer.readerIndex(beginReader); return null; } byte[] data = new byte[length]; buffer.readBytes(data); //封装Response对象 Response response = new Response(); response.setModule(module); response.setCmd(cmd); response.setStateCode(stateCode); response.setData(data); //继续往下传递 return response; } //数据包不完整,需要等待后面的包来 return null; } }
package com.cn.codc; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; import com.cn.constant.ConstantValue; import com.cn.model.Response; /** * 请求编码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * | 包头 | 模块号 | 命令号 | 状态码 | 长度 | 数据 | * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) */ public class ResponseEncoder extends OneToOneEncoder{ @Override protected Object encode(ChannelHandlerContext context, Channel channel, Object rs) throws Exception { Response response = (Response)(rs); ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); //包头 buffer.writeInt(ConstantValue.FLAG); //module buffer.writeShort(response.getModule()); //cmd buffer.writeShort(response.getCmd()); //状态码 buffer.writeInt(response.getStateCode()); //长度 buffer.writeInt(response.getDataLength()); //data if(response.getData() != null){ buffer.writeBytes(response.getData()); } return buffer; } }
package com.cn.constant; public interface ConstantValue { /** * 包头 */ public static final int FLAG = -32523523; }
package com.cn.model; /** * 客户端请求服务端的对象 */ public class Request { /** * 请求模块 */ private short module; /** * 命令号 */ private short cmd; /** * 数据部分 */ private byte[] data; public short getModule() { return module; } public void setModule(short module) { this.module = module; } public short getCmd() { return cmd; } public void setCmd(short cmd) { this.cmd = cmd; } public byte[] getData() { return data; } public void setData(byte[] data) { this.data = data; } public int getDataLength(){ if(data == null){ return 0; } return data.length; } }
package com.cn.model; /** * 服务端返回给客户端的对象 */ public class Response { /** * 请求模块 */ private short module; /** * 命令号 */ private short cmd; /** * 状态码 */ private int stateCode; /** * 数据部分 */ private byte[] data; public short getModule() { return module; } public void setModule(short module) { this.module = module; } public short getCmd() { return cmd; } public void setCmd(short cmd) { this.cmd = cmd; } public int getStateCode() { return stateCode; } public void setStateCode(int stateCode) { this.stateCode = stateCode; } public byte[] getData() { return data; } public void setData(byte[] data) { this.data = data; } public int getDataLength(){ if(data == null){ return 0; } return data.length; } }
package com.cn.model; public interface StateCode { /** * 成功 */ public static int SUCCESS = 0; /** * 失败 */ public static int FAIL = 1; }
package com.cn.module.fuben.request; import com.cn.serial.Serializer; //FightRequest是模块名 public class FightRequest extends Serializer{ /** * 副本id */ private int fubenId; /** * 次数 */ private int count; public int getFubenId() { return fubenId; } public void setFubenId(int fubenId) { this.fubenId = fubenId; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } @Override protected void read() { this.fubenId = readInt(); this.count = readInt(); } @Override protected void write() { writeInt(fubenId); writeInt(count); } }
package com.cn.module.fuben.response; import com.cn.serial.Serializer; public class FightResponse extends Serializer{ /** * 获取金币 */ private int gold; public int getGold() { return gold; } public void setGold(int gold) { this.gold = gold; } @Override protected void read() { this.gold = readInt(); } @Override protected void write() { writeInt(gold); } }
package com.cn.serial; import java.nio.ByteOrder; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; /** * buff工厂 */ public class BufferFactory { public static ByteOrder BYTE_ORDER = ByteOrder.BIG_ENDIAN; /** * 获取一个buffer */ public static ChannelBuffer getBuffer() { ChannelBuffer dynamicBuffer = ChannelBuffers.dynamicBuffer(); return dynamicBuffer; } /** * 将数据写入buffer */ public static ChannelBuffer getBuffer(byte[] bytes) { ChannelBuffer copiedBuffer = ChannelBuffers.copiedBuffer(bytes); return copiedBuffer; } }
package com.cn.serial; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.jboss.netty.buffer.ChannelBuffer; /** * 自定义序列化接口 */ public abstract class Serializer { public static final Charset CHARSET = Charset.forName("UTF-8"); protected ChannelBuffer writeBuffer; protected ChannelBuffer readBuffer; /** * 反序列化具体实现 */ protected abstract void read(); /** * 序列化具体实现 */ protected abstract void write(); /** * 从byte数组获取数据 * @param bytes 读取的数组 */ public Serializer readFromBytes(byte[] bytes) { readBuffer = BufferFactory.getBuffer(bytes); read(); readBuffer.clear(); return this; } /** * 从buff获取数据 * @param readBuffer */ public void readFromBuffer(ChannelBuffer readBuffer) { this.readBuffer = readBuffer; read(); } /** * 写入本地buff * @return */ public ChannelBuffer writeToLocalBuff(){ writeBuffer = BufferFactory.getBuffer(); write(); return writeBuffer; } /** * 写入目标buff * @param buffer * @return */ public ChannelBuffer writeToTargetBuff(ChannelBuffer buffer){ writeBuffer = buffer; write(); return writeBuffer; } /** * 返回buffer数组 * * @return */ public byte[] getBytes() { writeToLocalBuff(); byte[] bytes = null; if (writeBuffer.writerIndex() == 0) { bytes = new byte[0]; } else { bytes = new byte[writeBuffer.writerIndex()]; writeBuffer.readBytes(bytes); } writeBuffer.clear(); return bytes; } public byte readByte() { return readBuffer.readByte(); } public short readShort() { return readBuffer.readShort(); } public int readInt() { return readBuffer.readInt(); } public long readLong() { return readBuffer.readLong(); } public float readFloat() { return readBuffer.readFloat(); } public double readDouble() { return readBuffer.readDouble(); } public String readString() { int size = readBuffer.readShort(); if (size <= 0) { return ""; } byte[] bytes = new byte[size]; readBuffer.readBytes(bytes); return new String(bytes, CHARSET); } public <T> List<T> readList(Class<T> clz) { List<T> list = new ArrayList<>(); int size = readBuffer.readShort(); for (int i = 0; i < size; i++) { list.add(read(clz)); } return list; } public <K,V> Map<K,V> readMap(Class<K> keyClz, Class<V> valueClz) { Map<K,V> map = new HashMap<>(); int size = readBuffer.readShort(); for (int i = 0; i < size; i++) { K key = read(keyClz); V value = read(valueClz); map.put(key, value); } return map; } @SuppressWarnings("unchecked") public <I> I read(Class<I> clz) { Object t = null; if ( clz == int.class || clz == Integer.class) { t = this.readInt(); } else if (clz == byte.class || clz == Byte.class){ t = this.readByte(); } else if (clz == short.class || clz == Short.class){ t = this.readShort(); } else if (clz == long.class || clz == Long.class){ t = this.readLong(); } else if (clz == float.class || clz == Float.class){ t = readFloat(); } else if (clz == double.class || clz == Double.class){ t = readDouble(); } else if (clz == String.class ){ t = readString(); } else if (Serializer.class.isAssignableFrom(clz)){ try { byte hasObject = this.readBuffer.readByte(); if(hasObject == 1){ Serializer temp = (Serializer)clz.newInstance(); temp.readFromBuffer(this.readBuffer); t = temp; }else{ t = null; } } catch (Exception e) { e.printStackTrace(); } } else { throw new RuntimeException(String.format("不支持类型:[%s]", clz)); } return (I) t; } public Serializer writeByte(Byte value) { writeBuffer.writeByte(value); return this; } public Serializer writeShort(Short value) { writeBuffer.writeShort(value); return this; } public Serializer writeInt(Integer value) { writeBuffer.writeInt(value); return this; } public Serializer writeLong(Long value) { writeBuffer.writeLong(value); return this; } public Serializer writeFloat(Float value) { writeBuffer.writeFloat(value); return this; } public Serializer writeDouble(Double value) { writeBuffer.writeDouble(value); return this; } public <T> Serializer writeList(List<T> list) { if (isEmpty(list)) { writeBuffer.writeShort((short) 0); return this; } writeBuffer.writeShort((short) list.size()); for (T item : list) { writeObject(item); } return this; } public <K,V> Serializer writeMap(Map<K, V> map) { if (isEmpty(map)) { writeBuffer.writeShort((short) 0); return this; } writeBuffer.writeShort((short) map.size()); for (Entry<K, V> entry : map.entrySet()) { writeObject(entry.getKey()); writeObject(entry.getValue()); } return this; } public Serializer writeString(String value) { if (value == null || value.isEmpty()) { writeShort((short) 0); return this; } byte data[] = value.getBytes(CHARSET); short len = (short) data.length; writeBuffer.writeShort(len); writeBuffer.writeBytes(data); return this; } public Serializer writeObject(Object object) { if(object == null){ writeByte((byte)0); }else{ if (object instanceof Integer) { writeInt((int) object); return this; } if (object instanceof Long) { writeLong((long) object); return this; } if (object instanceof Short) { writeShort((short) object); return this; } if (object instanceof Byte) { writeByte((byte) object); return this; } if (object instanceof String) { String value = (String) object; writeString(value); return this; } if (object instanceof Serializer) { writeByte((byte)1); Serializer value = (Serializer) object; value.writeToTargetBuff(writeBuffer); return this; } throw new RuntimeException("不可序列化的类型:" + object.getClass()); } return this; } private <T> boolean isEmpty(Collection<T> c) { return c == null || c.size() == 0; } public <K,V> boolean isEmpty(Map<K,V> c) { return c == null || c.size() == 0; } }
标签:现在 写入 支持 数据读取 trace from big break 协助
原文地址:https://www.cnblogs.com/yaowen/p/9063053.html