码迷,mamicode.com
首页 > Web开发 > 详细

netty8---自定义编码解码器

时间:2018-05-20 14:12:46      阅读:251      评论:0      收藏:0      [点我收藏+]

标签:现在   写入   支持   数据读取   trace   from   big   break   协助   

技术分享图片

package com.cn.codc;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

import com.cn.constant.ConstantValue;
import com.cn.model.Request;

/**
 * Decodes inbound bytes into {@link Request} objects.
 * <pre>
 * Packet layout
 * +--------+--------+--------+--------+--------+
 * | flag   | module | cmd    | length | data   |
 * +--------+--------+--------+--------+--------+
 * </pre>
 * flag:   4 bytes
 * module: 2 bytes (short)
 * cmd:    2 bytes (short)
 * length: 4 bytes (number of bytes in the data section)
 */
public class RequestDecoder extends FrameDecoder{// FrameDecoder helps with packets that arrive merged together or split apart

    /**
     * Minimum packet length: the fixed header (flag + module + cmd + length) with no payload.
     */
    public static int BASE_LENTH = 4 + 2 + 2 + 4;

    /**
     * Tries to decode one {@link Request} from the accumulated bytes.
     *
     * @param arg0   handler context (unused)
     * @param arg1   channel (unused)
     * @param buffer accumulated inbound bytes; its reader index is advanced past
     *               whatever this call consumes or discards
     * @return the decoded request, or {@code null} when no complete packet is available
     */
    @Override
    protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception {

        // Not even a full header yet: wait for more bytes.
        if(buffer.readableBytes() < BASE_LENTH){
            return null;
        }

        // Guard against byte flooding: drop everything that has piled up.
        // Return right away -- nothing is left to read, so carrying on would
        // run past the end of the buffer.
        if(buffer.readableBytes() > 2048){
            buffer.skipBytes(buffer.readableBytes());
            return null;
        }

        // Scan forward one byte at a time until the packet flag is found.
        int beginReader;
        while(true){
            beginReader = buffer.readerIndex();
            if(buffer.readInt() == ConstantValue.FLAG){
                break;
            }

            // Not a flag here: step past a single byte and retry.
            buffer.readerIndex(beginReader + 1);

            // Too little left for a header; the skipped garbage stays discarded.
            if(buffer.readableBytes() < BASE_LENTH){
                return null;
            }
        }

        // The loop guarantees the remaining 8 header bytes are readable.
        short module = buffer.readShort();
        short cmd = buffer.readShort();
        int length = buffer.readInt();

        // A negative length can only come from a corrupt or hostile peer and
        // would make the array allocation below throw. The bad header has
        // already been consumed, so it is simply dropped.
        if(length < 0){
            return null;
        }

        // Payload not fully received: rewind to the start of the flag so the
        // whole packet is parsed again once more bytes arrive.
        if(buffer.readableBytes() < length){
            buffer.readerIndex(beginReader);
            return null;
        }

        byte[] data = new byte[length];
        buffer.readBytes(data);

        Request request = new Request();
        request.setModule(module);
        request.setCmd(cmd);
        request.setData(data);

        // Returning a non-null object hands it on to the next handler.
        return request;
    }

}
package com.cn.codc;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;

import com.cn.constant.ConstantValue;
import com.cn.model.Request;

/**
 * Encodes an outbound {@link Request} into its binary wire form.
 * <pre>
 * Packet layout
 * +--------+--------+--------+--------+--------+
 * | flag   | module | cmd    | length | data   |
 * +--------+--------+--------+--------+--------+
 * </pre>
 * flag:   4 bytes
 * module: 2 bytes (short)
 * cmd:    2 bytes (short)
 * length: 4 bytes (number of bytes in the data section)
 */
public class RequestEncoder extends OneToOneEncoder{

    /**
     * Writes the header fields followed by the payload into a fresh dynamic buffer.
     *
     * @param context handler context (unused)
     * @param channel channel (unused)
     * @param rs      the message to encode; cast to {@link Request}
     * @return the buffer holding the encoded packet
     */
    @Override
    protected Object encode(ChannelHandlerContext context, Channel channel, Object rs) throws Exception {
        final Request req = (Request) rs;
        final ChannelBuffer out = ChannelBuffers.dynamicBuffer();

        // Fixed header: flag marks the packet start, then module, cmd and payload length.
        out.writeInt(ConstantValue.FLAG);
        out.writeShort(req.getModule());
        out.writeShort(req.getCmd());
        out.writeInt(req.getDataLength());

        // The payload is optional; a null array was announced as length 0 above.
        final byte[] payload = req.getData();
        if(payload == null){
            return out;
        }
        out.writeBytes(payload);
        return out;
    }

}
package com.cn.codc;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import com.cn.constant.ConstantValue;
import com.cn.model.Response;

/**
 * Decodes inbound bytes into {@link Response} objects.
 * <pre>
 * Packet layout
 * +--------+--------+--------+------------+--------+--------+
 * | flag   | module | cmd    | state code | length | data   |
 * +--------+--------+--------+------------+--------+--------+
 * </pre>
 * flag:       4 bytes
 * module:     2 bytes (short)
 * cmd:        2 bytes (short)
 * state code: 4 bytes
 * length:     4 bytes (number of bytes in the data section)
 */
public class ResponseDecoder extends FrameDecoder{

    /**
     * Minimum packet length: the fixed header (flag + module + cmd + state code + length)
     * with no payload.
     */
    public static int BASE_LENTH = 4 + 2 + 2 + 4 + 4;

    /**
     * Tries to decode one {@link Response} from the accumulated bytes.
     *
     * @param arg0   handler context (unused)
     * @param arg1   channel (unused)
     * @param buffer accumulated inbound bytes; its reader index is advanced past
     *               whatever this call consumes or discards
     * @return the decoded response, or {@code null} when no complete packet is available
     */
    @Override
    protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception {

        // Not even a full header yet: wait for more bytes.
        if(buffer.readableBytes() < BASE_LENTH){
            return null;
        }

        // Scan forward one byte at a time until the packet flag is found, so a
        // flag that is not 4-byte aligned relative to the garbage before it is
        // still detected and the scan never reads past the end of the buffer.
        int beginReader;
        while(true){
            beginReader = buffer.readerIndex();
            if(buffer.readInt() == ConstantValue.FLAG){
                break;
            }

            // Not a flag here: step past a single byte and retry.
            buffer.readerIndex(beginReader + 1);

            // Too little left for a header; the skipped garbage stays discarded.
            if(buffer.readableBytes() < BASE_LENTH){
                return null;
            }
        }

        // The loop guarantees the remaining 12 header bytes are readable.
        short module = buffer.readShort();
        short cmd = buffer.readShort();
        int stateCode = buffer.readInt();
        int length = buffer.readInt();

        // A negative length can only come from a corrupt peer and would make
        // the array allocation below throw. The bad header has already been
        // consumed, so it is simply dropped.
        if(length < 0){
            return null;
        }

        // Payload not fully received: rewind to the start of the flag so the
        // whole packet is parsed again once more bytes arrive.
        if(buffer.readableBytes() < length){
            buffer.readerIndex(beginReader);
            return null;
        }

        byte[] data = new byte[length];
        buffer.readBytes(data);

        Response response = new Response();
        response.setModule(module);
        response.setCmd(cmd);
        response.setStateCode(stateCode);
        response.setData(data);

        // Returning a non-null object hands it on to the next handler.
        return response;
    }

}
package com.cn.codc;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
import com.cn.constant.ConstantValue;
import com.cn.model.Response;

/**
 * Encodes an outbound {@link Response} into its binary wire form.
 * <pre>
 * Packet layout
 * +--------+--------+--------+------------+--------+--------+
 * | flag   | module | cmd    | state code | length | data   |
 * +--------+--------+--------+------------+--------+--------+
 * </pre>
 * flag:       4 bytes
 * module:     2 bytes (short)
 * cmd:        2 bytes (short)
 * state code: 4 bytes
 * length:     4 bytes (number of bytes in the data section)
 */
public class ResponseEncoder extends OneToOneEncoder{

    /**
     * Writes the header fields followed by the payload into a fresh dynamic buffer.
     *
     * @param context handler context (unused)
     * @param channel channel (unused)
     * @param rs      the message to encode; cast to {@link Response}
     * @return the buffer holding the encoded packet
     */
    @Override
    protected Object encode(ChannelHandlerContext context, Channel channel, Object rs) throws Exception {
        final Response resp = (Response) rs;
        final ChannelBuffer out = ChannelBuffers.dynamicBuffer();

        // Fixed header: flag, module, cmd, state code and payload length.
        out.writeInt(ConstantValue.FLAG);
        out.writeShort(resp.getModule());
        out.writeShort(resp.getCmd());
        out.writeInt(resp.getStateCode());
        out.writeInt(resp.getDataLength());

        // The payload is optional; a null array was announced as length 0 above.
        final byte[] payload = resp.getData();
        if(payload == null){
            return out;
        }
        out.writeBytes(payload);
        return out;
    }

}
package com.cn.constant;

public interface ConstantValue {

    /**
     * Packet header flag.
     */
    // Interface fields are implicitly public, static and final.
    int FLAG = -32523523;

}
package com.cn.model;
/**
 * Message a client sends to the server.
 */
public class Request {

    /** Module the request is addressed to. */
    private short module;

    /** Command number. */
    private short cmd;

    /** Payload bytes; may be {@code null}. */
    private byte[] data;

    /** @return the module number */
    public short getModule() {
        return this.module;
    }

    /** @param module the module number */
    public void setModule(short module) {
        this.module = module;
    }

    /** @return the command number */
    public short getCmd() {
        return this.cmd;
    }

    /** @param cmd the command number */
    public void setCmd(short cmd) {
        this.cmd = cmd;
    }

    /** @return the payload array as stored (not copied); may be {@code null} */
    public byte[] getData() {
        return this.data;
    }

    /** @param data the payload array; stored as given (not copied) */
    public void setData(byte[] data) {
        this.data = data;
    }

    /** @return the payload length in bytes, or 0 when no payload is set */
    public int getDataLength(){
        return this.data == null ? 0 : this.data.length;
    }
}
package com.cn.model;
/**
 * Message the server sends back to a client.
 */
public class Response {

    /** Module the response belongs to. */
    private short module;

    /** Command number. */
    private short cmd;

    /** State code. */
    private int stateCode;

    /** Payload bytes; may be {@code null}. */
    private byte[] data;

    /** @return the module number */
    public short getModule() {
        return this.module;
    }

    /** @param module the module number */
    public void setModule(short module) {
        this.module = module;
    }

    /** @return the command number */
    public short getCmd() {
        return this.cmd;
    }

    /** @param cmd the command number */
    public void setCmd(short cmd) {
        this.cmd = cmd;
    }

    /** @return the state code */
    public int getStateCode() {
        return this.stateCode;
    }

    /** @param stateCode the state code */
    public void setStateCode(int stateCode) {
        this.stateCode = stateCode;
    }

    /** @return the payload array as stored (not copied); may be {@code null} */
    public byte[] getData() {
        return this.data;
    }

    /** @param data the payload array; stored as given (not copied) */
    public void setData(byte[] data) {
        this.data = data;
    }

    /** @return the payload length in bytes, or 0 when no payload is set */
    public int getDataLength(){
        return this.data == null ? 0 : this.data.length;
    }
}
package com.cn.model;

public interface StateCode {

    // Interface fields are implicitly public, static and final.

    /**
     * Success.
     */
    int SUCCESS = 0;

    /**
     * Failure.
     */
    int FAIL = 1;

}
package com.cn.module.fuben.request;

import com.cn.serial.Serializer;

/**
 * Fight request payload: a dungeon id followed by a count, each serialized as an int.
 */
public class FightRequest extends Serializer{

    /** Dungeon id. */
    private int fubenId;

    /** Number of times. */
    private int count;

    /** @return the dungeon id */
    public int getFubenId() {
        return this.fubenId;
    }

    /** @param fubenId the dungeon id */
    public void setFubenId(int fubenId) {
        this.fubenId = fubenId;
    }

    /** @return the count */
    public int getCount() {
        return this.count;
    }

    /** @param count the count */
    public void setCount(int count) {
        this.count = count;
    }

    /** Reads the fields in wire order: dungeon id, then count. */
    @Override
    protected void read() {
        fubenId = readInt();
        count = readInt();
    }

    /** Writes the fields in wire order: dungeon id, then count. */
    @Override
    protected void write() {
        writeInt(fubenId).writeInt(count);
    }

}
package com.cn.module.fuben.response;

import com.cn.serial.Serializer;

/**
 * Fight response payload: the amount of gold obtained, serialized as a single int.
 */
public class FightResponse extends Serializer{

    /** Gold obtained. */
    private int gold;

    /** @return the gold obtained */
    public int getGold() {
        return this.gold;
    }

    /** @param gold the gold obtained */
    public void setGold(int gold) {
        this.gold = gold;
    }

    /** Reads the single gold field. */
    @Override
    protected void read() {
        gold = readInt();
    }

    /** Writes the single gold field. */
    @Override
    protected void write() {
        writeInt(gold);
    }
}
package com.cn.serial;


import java.nio.ByteOrder;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
/**
 * Factory for the {@link ChannelBuffer} instances used by the serializer.
 */
public class BufferFactory {

    /** Byte order constant; not referenced by the factory methods in this class. */
    public static ByteOrder BYTE_ORDER = ByteOrder.BIG_ENDIAN;

    /**
     * Creates a buffer via {@link ChannelBuffers#dynamicBuffer()}.
     *
     * @return a new dynamic buffer
     */
    public static ChannelBuffer getBuffer() {
        return ChannelBuffers.dynamicBuffer();
    }

    /**
     * Creates a buffer via {@link ChannelBuffers#copiedBuffer(byte[])}.
     *
     * @param bytes the bytes the buffer should contain
     * @return a buffer holding the given bytes
     */
    public static ChannelBuffer getBuffer(byte[] bytes) {
        return ChannelBuffers.copiedBuffer(bytes);
    }

}
package com.cn.serial;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.jboss.netty.buffer.ChannelBuffer;
/**
 * Base class of the hand-written binary serialization format.
 *
 * <p>Subclasses implement {@link #read()} and {@link #write()} by calling the
 * {@code readXxx}/{@code writeXxx} helpers in the same field order.
 *
 * <p>Wire conventions, as implemented by the helpers below:
 * <ul>
 *   <li>strings, lists and maps are prefixed with a 2-byte length/size, so at
 *       most {@link Short#MAX_VALUE} bytes/elements can be written;</li>
 *   <li>a nested {@code Serializer} is prefixed with one marker byte:
 *       1 = present, 0 = null.</li>
 * </ul>
 *
 * <p>NOTE: {@link #writeObject(Object)} writes the null marker byte for ANY null
 * value, but {@link #read(Class)} only consumes a marker for {@code Serializer}
 * types. Null elements of other types therefore do not round-trip; avoid them.
 */
public abstract class Serializer {


    public static final Charset CHARSET = Charset.forName("UTF-8");

    /** Largest value that fits the 2-byte length/size prefix. */
    private static final int MAX_PREFIXED_LENGTH = Short.MAX_VALUE;

    protected ChannelBuffer writeBuffer;

    protected ChannelBuffer readBuffer;

    /**
     * Deserialization hook: populate this object's fields from {@link #readBuffer}.
     */
    protected abstract void read();

    /**
     * Serialization hook: write this object's fields to {@link #writeBuffer}.
     */
    protected abstract void write();

    /**
     * Populates this object from a byte array.
     *
     * @param bytes the serialized form
     * @return this object
     */
    public Serializer readFromBytes(byte[] bytes) {
        readBuffer = BufferFactory.getBuffer(bytes);
        read();
        readBuffer.clear();
        return this;
    }

    /**
     * Populates this object from the given buffer, starting at its current reader index.
     *
     * @param readBuffer the buffer to read from
     */
    public void readFromBuffer(ChannelBuffer readBuffer) {
        this.readBuffer = readBuffer;
        read();
    }

    /**
     * Serializes this object into a newly created buffer.
     *
     * @return the buffer that was written
     */
    public ChannelBuffer writeToLocalBuff(){
        writeBuffer = BufferFactory.getBuffer();
        write();
        return writeBuffer;
    }

    /**
     * Serializes this object into the supplied buffer.
     *
     * @param buffer the buffer to append to
     * @return the same buffer
     */
    public ChannelBuffer writeToTargetBuff(ChannelBuffer buffer){
        writeBuffer = buffer;
        write();
        return writeBuffer;
    }

    /**
     * Serializes this object and returns the result as a byte array.
     *
     * @return the serialized bytes; an empty array when nothing was written
     */
    public byte[] getBytes() {
        writeToLocalBuff();
        byte[] bytes = null;
        if (writeBuffer.writerIndex() == 0) {
            bytes = new byte[0];
        } else {
            bytes = new byte[writeBuffer.writerIndex()];
            writeBuffer.readBytes(bytes);
        }
        writeBuffer.clear();
        return bytes;
    }


    public byte readByte() {
        return readBuffer.readByte();
    }

    public short readShort() {
        return readBuffer.readShort();
    }

    public int readInt() {
        return readBuffer.readInt();
    }

    public long readLong() {
        return readBuffer.readLong();
    }

    public float readFloat() {
        return readBuffer.readFloat();
    }

    public double readDouble() {
        return readBuffer.readDouble();
    }

    /**
     * Reads a string written by {@link #writeString(String)}.
     *
     * @return the decoded string; the empty string when the length prefix is zero or negative
     */
    public String readString() {
        int size = readBuffer.readShort();
        if (size <= 0) {
            return "";
        }

        byte[] bytes = new byte[size];
        readBuffer.readBytes(bytes);

        return new String(bytes, CHARSET);
    }

    /**
     * Reads a list written by {@link #writeList(List)}.
     *
     * @param clz element type
     * @return a new list holding the decoded elements
     */
    public <T> List<T> readList(Class<T> clz) {
        List<T> list = new ArrayList<>();
        int size = readBuffer.readShort();
        for (int i = 0; i < size; i++) {
            list.add(read(clz));
        }
        return list;
    }

    /**
     * Reads a map written by {@link #writeMap(Map)}.
     *
     * @param keyClz   key type
     * @param valueClz value type
     * @return a new map holding the decoded entries
     */
    public <K,V> Map<K,V> readMap(Class<K> keyClz, Class<V> valueClz) {
        Map<K,V> map = new HashMap<>();
        int size = readBuffer.readShort();
        for (int i = 0; i < size; i++) {
            K key = read(keyClz);
            V value = read(valueClz);
            map.put(key, value);
        }
        return map;
    }

    /**
     * Reads one value of the requested type from {@link #readBuffer}.
     *
     * @param clz a primitive or wrapper numeric type, {@code String}, or a
     *            {@code Serializer} subclass with a no-argument constructor
     * @return the decoded value; {@code null} only for a {@code Serializer}
     *         whose marker byte says it is absent
     * @throws RuntimeException if the type is unsupported or the
     *                          {@code Serializer} subclass cannot be instantiated
     */
    @SuppressWarnings("unchecked")
    public <I> I read(Class<I> clz) {
        Object t = null;
        if ( clz == int.class || clz == Integer.class) {
            t = this.readInt();
        } else if (clz == byte.class || clz == Byte.class){
            t = this.readByte();
        } else if (clz == short.class || clz == Short.class){
            t = this.readShort();
        } else if (clz == long.class || clz == Long.class){
            t = this.readLong();
        } else if (clz == float.class || clz == Float.class){
            t = readFloat();
        } else if (clz == double.class || clz == Double.class){
            t = readDouble();
        } else if (clz == String.class ){
            t = readString();
        } else if (Serializer.class.isAssignableFrom(clz)){
            byte hasObject = this.readBuffer.readByte();
            if(hasObject == 1){
                Serializer temp;
                try {
                    temp = (Serializer) clz.getDeclaredConstructor().newInstance();
                } catch (ReflectiveOperationException e) {
                    // Fail loudly: returning null here would leave the nested
                    // object's bytes unread and corrupt every following field.
                    throw new RuntimeException(String.format("cannot instantiate: [%s]", clz), e);
                }
                // Errors while reading the nested object propagate to the caller
                // instead of being swallowed.
                temp.readFromBuffer(this.readBuffer);
                t = temp;
            }
        } else {
            throw new RuntimeException(String.format("不支持类型:[%s]", clz));
        }
        return (I) t;
    }


    public Serializer writeByte(Byte value) {
        writeBuffer.writeByte(value);
        return this;
    }

    public Serializer writeShort(Short value) {
        writeBuffer.writeShort(value);
        return this;
    }

    public Serializer writeInt(Integer value) {
        writeBuffer.writeInt(value);
        return this;
    }

    public Serializer writeLong(Long value) {
        writeBuffer.writeLong(value);
        return this;
    }

    public Serializer writeFloat(Float value) {
        writeBuffer.writeFloat(value);
        return this;
    }

    public Serializer writeDouble(Double value) {
        writeBuffer.writeDouble(value);
        return this;
    }

    /**
     * Writes a 2-byte element count followed by each element.
     *
     * @param list the list to write; {@code null} or empty is written as count 0
     * @return this object
     * @throws IllegalArgumentException if the list has more than
     *                                  {@link Short#MAX_VALUE} elements
     */
    public <T> Serializer writeList(List<T> list) {
        if (isEmpty(list)) {
            writeBuffer.writeShort((short) 0);
            return this;
        }
        writeBuffer.writeShort(toLengthPrefix(list.size(), "list size"));
        for (T item : list) {
            writeObject(item);
        }
        return this;
    }

    /**
     * Writes a 2-byte entry count followed by each key and value.
     *
     * @param map the map to write; {@code null} or empty is written as count 0
     * @return this object
     * @throws IllegalArgumentException if the map has more than
     *                                  {@link Short#MAX_VALUE} entries
     */
    public <K,V> Serializer writeMap(Map<K, V> map) {
        if (isEmpty(map)) {
            writeBuffer.writeShort((short) 0);
            return this;
        }
        writeBuffer.writeShort(toLengthPrefix(map.size(), "map size"));
        for (Entry<K, V> entry : map.entrySet()) {
            writeObject(entry.getKey());
            writeObject(entry.getValue());
        }
        return this;
    }

    /**
     * Writes a 2-byte byte-length followed by the string encoded with {@link #CHARSET}.
     *
     * @param value the string to write; {@code null} or empty is written as length 0
     * @return this object
     * @throws IllegalArgumentException if the encoded form is longer than
     *                                  {@link Short#MAX_VALUE} bytes
     */
    public Serializer writeString(String value) {
        if (value == null || value.isEmpty()) {
            writeShort((short) 0);
            return this;
        }

        byte data[] = value.getBytes(CHARSET);
        short len = toLengthPrefix(data.length, "string byte length");
        writeBuffer.writeShort(len);
        writeBuffer.writeBytes(data);
        return this;
    }

    /**
     * Writes one value, choosing the encoding from its runtime type.
     *
     * @param object an Integer, Long, Short, Byte, Float, Double, String,
     *               Serializer, or {@code null} (written as a single 0 byte)
     * @return this object
     * @throws RuntimeException if the type is not supported
     */
    public Serializer writeObject(Object object) {

        if(object == null){
            writeByte((byte)0);
        }else{
            if (object instanceof Integer) {
                writeInt((int) object);
                return this;
            }

            if (object instanceof Long) {
                writeLong((long) object);
                return this;
            }

            if (object instanceof Short) {
                writeShort((short) object);
                return this;
            }

            if (object instanceof Byte) {
                writeByte((byte) object);
                return this;
            }

            // Float and Double are readable via read(Class), so they must be writable too.
            if (object instanceof Float) {
                writeFloat((Float) object);
                return this;
            }

            if (object instanceof Double) {
                writeDouble((Double) object);
                return this;
            }

            if (object instanceof String) {
                String value = (String) object;
                writeString(value);
                return this;
            }
            if (object instanceof Serializer) {
                // Marker byte 1 tells read(Class) that an object follows.
                writeByte((byte)1);
                Serializer value = (Serializer) object;
                value.writeToTargetBuff(writeBuffer);
                return this;
            }

            throw new RuntimeException("不可序列化的类型:" + object.getClass());
        }

        return this;
    }

    /**
     * Converts a length/size to the 2-byte prefix, refusing values that would
     * overflow and silently corrupt the stream.
     */
    private static short toLengthPrefix(int length, String what) {
        if (length > MAX_PREFIXED_LENGTH) {
            throw new IllegalArgumentException(
                    what + " " + length + " exceeds the 2-byte prefix limit of " + MAX_PREFIXED_LENGTH);
        }
        return (short) length;
    }

    private <T> boolean isEmpty(Collection<T> c) {
        return c == null || c.size() == 0;
    }
    public <K,V> boolean isEmpty(Map<K,V> c) {
        return c == null || c.size() == 0;
    }
}

 

netty8---自定义编码解码器

标签:现在   写入   支持   数据读取   trace   from   big   break   协助   

原文地址:https://www.cnblogs.com/yaowen/p/9063053.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!