标签:ISE ali print rup utf-8 trace equal conf png
本次利用Java+netty实现自定义rpc框架,共分为三个工程,公共模块+服务提供者+服务消费者: rpc-common工程 pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.rpc.common</groupId> <artifactId>rpc-common</artifactId> <version>0.0.1-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.16.Final</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.41</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> </configuration> </plugin> </plugins> </build> </project> RpcDecoder.java package com.rpc.decoder; import java.util.List; import com.rpc.util.SerializationUtil; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; /** * * @author linxu * */ public class RpcDecoder extends ByteToMessageDecoder { private Class<?> genericClass; public RpcDecoder(Class<?> genericClass) { this.genericClass = genericClass; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < 4) { return; } in.markReaderIndex(); int dataLength = in.readInt(); if (dataLength < 0) { ctx.close(); } if (in.readableBytes() < dataLength) { 
in.resetReaderIndex(); } byte[] data = new byte[dataLength]; in.readBytes(data); Object obj = SerializationUtil.toClass(genericClass, data); out.add(obj); } } RpcEncoder.java package com.rpc.decoder; import com.rpc.util.SerializationUtil; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; /** * * @author linxu * */ @SuppressWarnings("rawtypes") public class RpcEncoder extends MessageToByteEncoder { private Class<?> genericClass; public RpcEncoder(Class<?> genericClass) { this.genericClass = genericClass; } @Override public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { if (genericClass.isInstance(msg)) { byte[] data = SerializationUtil.toByte(msg); out.writeInt(data.length); out.writeBytes(data); } } } SrpcRequest.java package com.rpc.message; import java.io.Serializable; import java.util.Arrays; /** * * @author linxu * */ public class SrpcRequest implements Serializable{ private static final long serialVersionUID = 6132853628325824727L; // 请求Id private String requestId; // 远程调用接口名称 private String interfaceName; //远程调用方法名称 private String methodName; // 参数类型 private Class<?>[] parameterTypes; // 参数值 private Object[] parameters; public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public String getInterfaceName() { return interfaceName; } public void setInterfaceName(String interfaceName) { this.interfaceName = interfaceName; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Class<?>[] getParameterTypes() { return parameterTypes; } public void setParameterTypes(Class<?>[] parameterTypes) { this.parameterTypes = parameterTypes; } public Object[] getParameters() { return parameters; } public void setParameters(Object[] parameters) { this.parameters = parameters; } @Override public String 
toString() {
        return "SrpcRequest [requestId=" + requestId + ", interfaceName=" + interfaceName + ", methodName="
                + methodName + ", parameterTypes=" + Arrays.toString(parameterTypes) + ", parameters="
                + Arrays.toString(parameters) + "]";
    }
}

SrpcResponse.java

package com.rpc.message;

import java.io.Serializable;

/**
 * Response message sent from the provider back to the consumer.
 *
 * @author linxu
 */
public class SrpcResponse implements Serializable {

    private static final long serialVersionUID = -5934073769679010930L;

    // Id of the request
    private String requestId;
    // Error
    private Throwable error;
    // Result
    private Object result;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Throwable getError() {
        return error;
    }

    public void setError(Throwable error) {
        this.error = error;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }

    @Override
    public String toString() {
        return "SrpcResponse [requestId=" + requestId + ", error=" + error + ", result=" + result + "]";
    }
}

SerializationUtil.java

package com.rpc.util;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
 * Helpers that convert objects to and from bytes using Java's built-in object serialization.
 *
 * @author 86136
 */
public class SerializationUtil {

    /**
     * Serializes an object.
     *
     * @param t the object to serialize
     * @return the serialized bytes, or {@code null} if an {@link IOException} occurred (the
     *         exception's stack trace is printed)
     */
    public static <T> byte[] toByte(T t) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // try-with-resources closes the object stream on every path.
        try (ObjectOutputStream objects = new ObjectOutputStream(bytes)) {
            objects.writeObject(t);
            // Push any buffered data into the byte array before taking the snapshot.
            objects.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Deserializes an object.
     *
     * <p>NOTE(review): this uses Java native deserialization, and the decoder feeds it bytes read
     * from the network. That is unsafe for untrusted peers; consider an {@code ObjectInputFilter}
     * or a data-only format.
     *
     * @param clazz  the expected type of the result; when non-null the result is checked against it
     * @param buffer the serialized bytes
     * @return the deserialized object, or {@code null} if an {@link IOException} occurred (the
     *         exception's stack trace is printed)
     * @throws ClassCastException if the deserialized object is not an instance of {@code clazz}
     * @throws Exception          if the object's class cannot be resolved
     */
    @SuppressWarnings("unchecked")
    public static <T> T toClass(Class<T> clazz, byte[] buffer) throws Exception {
        try (ObjectInputStream objects = new ObjectInputStream(new ByteArrayInputStream(buffer))) {
            Object value = objects.readObject();
            // Use the type token for a real runtime check instead of an erased cast.
            return clazz == null ? (T) value : clazz.cast(value);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
}

DeptService.java

package com.user.service;

public interface DeptService {
    public String selectDept(String d);
}

UserService.java

package com.user.service;

public interface UserService {
    String sayHello(String word);
}

rpc-consumer工程

ClientBootstrap.java

package com.rpc;

import com.user.service.DeptService;
import com.user.service.UserService;

/**
 * Invocation test.
 *
 * @author linxu
 */
public class ClientBootstrap {

    public static void main(String[] args) throws InterruptedException {
        test1();
        test2();
    }

    public static void test1() {
        RpcConsumer consumer = new RpcConsumer();
        UserService service = (UserService) consumer.createProxy(UserService.class);
        System.out.println(service.sayHello("are you ok 001 ?"));
    }

    public static void test2() {
        RpcConsumer consumer = new RpcConsumer();
        DeptService service = (DeptService) consumer.createProxy(DeptService.class);
        System.out.println(service.selectDept("are you ok 002 ?"));
    }
}

ClientHandler.java

package com.rpc;

import java.util.concurrent.Callable;

import com.rpc.message.SrpcRequest;
import com.rpc.message.SrpcResponse;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {

    private ChannelHandlerContext context;
    private SrpcResponse result;
    private SrpcRequest para;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        context = ctx;
    }

    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) {
        result = (SrpcResponse) msg;
        notify();
    }
@Override public synchronized Object call() throws InterruptedException { context.writeAndFlush(para); wait(); return result; } void setPara(SrpcRequest para) { this.para = para; } } RpcConsumer.java package com.rpc; import java.lang.reflect.Proxy; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.lang3.StringUtils; import com.rpc.decoder.RpcDecoder; import com.rpc.decoder.RpcEncoder; import com.rpc.message.SrpcRequest; import com.rpc.message.SrpcResponse; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class RpcConsumer { private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static ClientHandler client; /** * 代理对象去执行了一个socket连接请求, */ public Object createProxy(final Class<?> interfaceClass) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[] { interfaceClass }, (proxy, method, arguments) -> { if (client == null) { initClient(); } //请求封装 SrpcRequest request = new SrpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setInterfaceName(interfaceClass.getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(arguments); client.setPara(request); //请求结果 SrpcResponse response = (SrpcResponse) executor.submit(client).get(); if (response == null || !StringUtils.equals(request.getRequestId(), response.getRequestId())) { return null; } if (response.getError() != null) { throw response.getError(); } return response.getResult(); }); } private static void initClient() { client = new 
ClientHandler(); EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new RpcEncoder(SrpcRequest.class)); p.addLast(new RpcDecoder(SrpcResponse.class)); p.addLast(client); } }); try { b.connect("localhost", 8888).sync(); } catch (InterruptedException e) { e.printStackTrace(); } } } pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.rpc.consumer</groupId> <artifactId>rpc-consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.rpc.common</groupId> <artifactId>rpc-common</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> </configuration> </plugin> </plugins> </build> </project> rpc-provider工程 Bootstrap.java package com.rpc.bootstrap; import java.util.HashMap; import java.util.Map; import com.rpc.decoder.RpcDecoder; import com.rpc.decoder.RpcEncoder; import com.rpc.message.SrpcRequest; import com.rpc.message.SrpcResponse; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import 
io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; @SuppressWarnings({ }) public class Bootstrap { // 服务器接口容器 @SuppressWarnings({ }) public static final Map<String, Object> serviceRegistry = new HashMap<String, Object>(); public static void startServer(String hostName, int port) { try { NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new RpcDecoder(SrpcRequest.class)); p.addLast(new RpcEncoder(SrpcResponse.class)); p.addLast(new RpcServerHandler()); } }); bootstrap.bind(hostName, port).sync(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { //服务注册 registryService(); //netty启动 startServer("localhost", 8888); } public static void registryService() { final Map<String, String> serviceInterfaceConfiguration = ServiceRegistry.registry(); if (serviceInterfaceConfiguration != null && !serviceInterfaceConfiguration.isEmpty()) { serviceInterfaceConfiguration.forEach((k, v) -> { try { @SuppressWarnings("deprecation") Object object = Class.forName(v).newInstance(); serviceRegistry.put(k, object); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InstantiationException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IllegalAccessException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } } } RpcServerHandler.java package com.rpc.bootstrap; import java.lang.reflect.Method; import com.rpc.message.SrpcRequest; import com.rpc.message.SrpcResponse; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class RpcServerHandler extends 
SimpleChannelInboundHandler<SrpcRequest> { @Override protected void channelRead0(ChannelHandlerContext ctx, SrpcRequest msg) throws Exception { SrpcResponse response = new SrpcResponse(); response.setRequestId(msg.getRequestId()); try { response.setResult(handle(msg)); } catch (Exception e) { response.setError(e); e.printStackTrace(); } ctx.writeAndFlush(response); } /** * 执行服务接口方法 * * @param request * @return * @throws Exception */ private Object handle(SrpcRequest request) throws Exception { Object service = Bootstrap.serviceRegistry.get(request.getInterfaceName()); Method method = service.getClass().getMethod(request.getMethodName(), request.getParameterTypes()); return method.invoke(service, request.getParameters()); } } ServiceRegistry.java package com.rpc.bootstrap; import java.util.HashMap; import java.util.Map; public class ServiceRegistry { public static Map<String,String>serviceInterfaceConfiguration=new HashMap<String, String>(); public static void put(String k,String v) { serviceInterfaceConfiguration.put(k, v); } public static Map<String,String> registry() { //服务接口注册 put("com.user.service.UserService", "com.user.serviceimp.UserServiceImpl"); put("com.user.service.DeptService", "com.user.serviceimp.DeptServiceImpl"); return serviceInterfaceConfiguration; } } DeptServiceImpl.java package com.user.serviceimp; import com.user.service.DeptService; public class DeptServiceImpl implements DeptService{ @Override public String selectDept(String d) { return d+":successful"; } } UserServiceImpl.java package com.user.serviceimp; import com.user.service.UserService; public class UserServiceImpl implements UserService{ @Override public String sayHello(String word) { // TODO Auto-generated method stub System.err.println("调用成功"+word); return "调用成功"+word; } } pom,xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
<modelVersion>4.0.0</modelVersion> <groupId>com.rpc.provider</groupId> <artifactId>rpc-provider</artifactId> <version>0.0.1-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.rpc.common</groupId> <artifactId>rpc-common</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> </configuration> </plugin> </plugins> </build> </project>
标签:ISE ali print rup utf-8 trace equal conf png
原文地址:https://www.cnblogs.com/mature1021/p/13354230.html