码迷,mamicode.com
首页 > Web开发 > 详细

Netty5 序列化方式(Jboss Marshalling)

时间:2017-09-02 15:40:51      阅读:169      评论:0      收藏:0      [点我收藏+]

标签:java netty 序列化


Netty作为很多高性能的底层通讯工具,被很多开发框架应用再底层,今天来说说常用的序列化工具,用Jboss的Marshalling。


直接上代码,Marshalling的工厂类

package com.netty.serialize.marshalling;

import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Created by sdc on 2017/8/28.
 */
public class MarshallingCodeCFactory {

    /**
     * 解码
     * @return
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        //创建了MarshallingConfiguration对象,配置了版本号为5
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        //根据marshallerFactory和configuration创建provider
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }

    /**
     * 编码
     * @return
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }

}

这个是Marshalling的序列化方式,Marshalling自带编解码,所以不用担心中途编解码半包的问题。


服务端的Server实现:

package com.netty.serialize.server;

import com.netty.serialize.coder.MsgDecoder;
import com.netty.serialize.coder.MsgEncoder;
import com.netty.serialize.handler.ServerHandler;
import com.netty.serialize.marshalling.MarshallingCodeCFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgServer {

    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
//                    .childHandler(new ChildChannelHandler())
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            channel.pipeline().addLast(new ServerHandler());
                        }
                    })
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture cf = sb.bind(port).sync();
            System.out.println("服务端已启动");

            cf.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static class ChildChannelHandler extends ChannelInitializer {

        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            channel.pipeline().addLast(new ServerHandler());
        }

    }

    public static void main(String[] args){
        try {
            new MsgServer().bind(8080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.netty.serialize.handler;

import com.netty.serialize.message.Message;
import com.netty.serialize.message.MsgHeader;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * 用于测试服务端实现的
 * Created by sdc on 2017/8/29.
 */
public class ServerHandler extends ChannelHandlerAdapter{

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
//        System.out.println("active");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
//        ctx.close();
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message newMsg = (Message)msg;
//        String msgStrClient = (String)msg;
        System.out.println("获取客户端里的内容:" + newMsg);

        Message message = new Message();
        String msgStr = "客户端接受到通知";
        MsgHeader header = new MsgHeader();
        header.setStartTag(new Byte("0"));
        header.setCmdCode("1234".getBytes());
        header.setLength(msgStr.length());
        header.setVersion("11".getBytes());

        message.setBody(msgStr);
        message.setHeader(header);

        ctx.writeAndFlush(message);
    }

}


客户端的实现:

package com.netty.serialize.client;

import com.netty.serialize.coder.MsgDecoder;
import com.netty.serialize.coder.MsgEncoder;
import com.netty.serialize.handler.ClientHandler;
import com.netty.serialize.handler.ServerHandler;
import com.netty.serialize.marshalling.MarshallingCodeCFactory;
import com.netty.serialize.message.Message;
import com.netty.serialize.message.MsgHeader;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgClient {

    public void connect(String ip, int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

//        Message message = new Message();
//        String msgStr = "我想发送一条消息";
//        MsgHeader header = new MsgHeader();
//        header.setStartTag(new Byte("0"));
//        header.setCmdCode("1234".getBytes());
//        header.setLength(msgStr.length());
//        header.setVersion("11".getBytes());
//
//        message.setBody(msgStr);
//        message.setHeader(header);
        try {
            Bootstrap bs = new Bootstrap();
            bs.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)//
                    .handler(new ChildChannelHandler());

            ChannelFuture f = bs.connect(ip,port).sync();

            //写入消息
//            f.channel().writeAndFlush(message).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public static class ChildChannelHandler extends ChannelInitializer {
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            channel.pipeline().addLast(new ClientHandler());
        }
    }

    public static void main(String[] args){
        try {
            new MsgClient().connect("127.0.0.1", 8080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
package com.netty.serialize.handler;

import com.netty.serialize.message.Message;
import com.netty.serialize.message.MsgHeader;
import io.netty.channel.*;
import io.netty.util.ReferenceCountUtil;

/**
 * Created by sdc on 2017/8/29.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Message message = new Message();
        String msgStr = "我想发送一条消息";
        MsgHeader header = new MsgHeader();
        header.setStartTag(new Byte("0"));
        header.setCmdCode("1234".getBytes());
        header.setLength(msgStr.length());
        header.setVersion("11".getBytes());

        message.setBody(msgStr);
        message.setHeader(header);
        ctx.writeAndFlush(message).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    // do sth
                    System.out.println("成功发送到服务端消息");
                } else {
                    // do sth
                    System.out.println("失败服务端消息失败");
                }
            }
        });
//        ctx.writeAndFlush(message);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//        ctx.close();
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Message newMsg = (Message) msg;
            System.out.println("收到服务端的内容" + newMsg);
        }finally {
            ReferenceCountUtil.release(msg);
        }
    }

}


传输的POJO的类,是自定义的封装好的信息。

package com.netty.serialize.message;

import java.io.Serializable;

/**
 * Created by sdc on 2017/8/26.
 */
public class Message implements Serializable{

    /**
     *
     */
    private static final long serialVersionUID = 4923081103118853877L;

    private MsgHeader header;

    private Object body;

    //检验和
//    private byte crcCode;

//    public byte getCrcCode() {
//        return crcCode;
//    }
//
//    public void setCrcCode(byte crcCode) {
//        this.crcCode = crcCode;
//    }

    public MsgHeader getHeader() {
        return header;
    }

    public void setHeader(MsgHeader header) {
        this.header = header;
    }

    public Object getBody() {
        return body;
    }

    public void setBody(Object body) {
        this.body = body;
    }

    @Override
    public String toString() {
        return "Message{" +
                "header=" + header +
                ", body=" + body +
//                ", crcCode=" + crcCode +
                ‘}‘;
    }
}
package com.netty.serialize.message;

import java.io.Serializable;
import java.util.Arrays;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgHeader implements Serializable{

    /**
     *
     */
    private static final long serialVersionUID = 4923081103118853877L;

    //固定头
    private byte startTag;

    //命令码,4位
    private byte[] cmdCode;

    //版本 2位
    private byte[] version;

    private int length;

    public byte[] getVersion() {
        return version;
    }

    public void setVersion(byte[] version) {
        this.version = version;
    }

    public byte[] getCmdCode() {
        return cmdCode;
    }

    public void setCmdCode(byte[] cmdCode) {
        this.cmdCode = cmdCode;
    }

    public byte getStartTag() {
        return startTag;
    }

    public void setStartTag(byte startTag) {
        this.startTag = startTag;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    @Override
    public String toString() {
        return "MsgHeader{" +
                "startTag=" + startTag +
                ", cmdCode=" + Arrays.toString(cmdCode) +
                ", version=" + Arrays.toString(version) +
                ", length=" + length +
                ‘}‘;
    }
}


到此就完事了,netty的版本,和marshalling的版本,其他的版本我不清楚会不会有什么错误。

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
</dependency>

<!--netty -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>

<!--jboss-marshalling -->
<dependency>
    <groupId>org.jboss.marshalling</groupId>
    <artifactId>jboss-marshalling-serial</artifactId>
    <version>2.0.0.Beta2</version>
</dependency>



Netty5 序列化方式(Jboss Marshalling)

标签:java netty 序列化

原文地址:http://shangdc.blog.51cto.com/10093778/1962121

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!