标签:ecs service play def resolve last coder 技术分享 技术
在服务端定义了 IServer 和 ServerBase,负责服务端的启动、关闭等。Server 启动时需要:1. 开启端口监听;2. 注册服务;3. 通过注入的 IServerAddInsInitializer 接口执行额外的启动逻辑。具体代码如下:
using GP.RPC.Message;
using GP.RPC.Route.Manager;
using GP.RPC.Server.SerivceType;
using GP.RPC.Server.ServerInitializer;
using GP.RPC.Server.ServiceExecutor;
using GP.RPC.Server.ServiceRegistion;
using GP.RPC.Transport.Receiver;
using GP.RPC.Transport.Sender;
using Microsoft.Extensions.DependencyModel;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace GP.RPC.Server.Abstract
{
    /// <summary>
    /// Base class for servers. It fixes the start-up sequence (transport start,
    /// service registration, add-in initialization) and leaves the transport
    /// itself to derived classes.
    /// </summary>
    public abstract class ServerBase : IServer
    {
        /// <summary>
        /// Extra start-up logic run at the end of <see cref="StartAsync"/>.
        /// Settable so that a custom initializer can be injected before the
        /// server is started.
        /// </summary>
        public IServerAddInsInitializer ServerAddInsInitializer { get; set; }

        /// <summary>
        /// Service registration invoked with the port once the transport has started.
        /// </summary>
        public IServiceRegistion Registion { get; set; }

        /// <summary>
        /// Assigns the "Null" singletons to both collaborators so the server can
        /// be started without configuring either of them.
        /// </summary>
        public ServerBase()
        {
            Registion = NullServiceRegistion.Instance;
            // The default belongs here, not in StartAsync: assigning it during
            // start-up would discard whatever initializer had been injected.
            ServerAddInsInitializer = NullServerAddInsInitializer.Instance;
        }

        /// <summary>
        /// Starts the transport on <paramref name="port"/>, registers the
        /// service, then runs the add-in initializer.
        /// </summary>
        /// <param name="port">Port passed to the transport and to the registration.</param>
        public virtual async Task StartAsync(int port)
        {
            await StartCoreAsync(port);
            await Registion.RegisterAsync(port);

            // Use the injected initializer; only fall back to the Null
            // singleton when the property was explicitly cleared.
            var initializer = ServerAddInsInitializer ?? NullServerAddInsInitializer.Instance;
            initializer.Initialize();
        }

        /// <summary>
        /// Transport-specific start-up, awaited first by <see cref="StartAsync"/>.
        /// </summary>
        /// <param name="port">Port the transport should use.</param>
        protected abstract Task StartCoreAsync(int port);

        /// <summary>Stops the server.</summary>
        public abstract void ShutDown();

        /// <summary>Prints diagnostic information about the server.</summary>
        public abstract void Print();
    }
}
通信使用DotNetty完成,因此,服务端实际实现为NettyServer类,具体代码如下:
using DotNetty.Codecs;
using DotNetty.Handlers.Logging;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using GP.RPC.IOC;
using GP.RPC.Message;
using GP.RPC.Server;
using GP.RPC.Server.Abstract;
using GP.RPC.Server.ServiceExecutor;
using GP.RPC.Server.ServiceRegistion;
using GP.RPC.Transport.Netty.Codec;
using GP.RPC.Transport.Netty.Handler;
using GP.RPC.Transport.Netty.Sender;
using GP.RPC.Transport.Receiver;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace GP.RPC.Transport.Netty.Server
{
    /// <summary>
    /// DotNetty-based server: binds a TCP server channel, forwards inbound
    /// messages to the message receiver, executes each request through the
    /// service executor and sends the response back on the originating channel.
    /// </summary>
    public class NettyServer : ServerBase
    {
        // Arguments passed to ShutdownGracefullyAsync for both event loop groups.
        private static readonly TimeSpan ShutdownQuietPeriod = TimeSpan.FromMilliseconds(100);
        private static readonly TimeSpan ShutdownTimeout = TimeSpan.FromSeconds(1);

        private ServerBootstrap bootstrap;
        private MultithreadEventLoopGroup boss;
        private MultithreadEventLoopGroup worker;
        private IChannel bootstrapChannel;

        private IMessageReceiver _receiver;
        private IServiceExecutor _executor;

        /// <summary>
        /// Stores the collaborators and subscribes to the receiver's
        /// <c>Received</c> event.
        /// </summary>
        /// <param name="receiver">Receiver whose <c>Received</c> event delivers inbound messages.</param>
        /// <param name="executor">Executor that turns a request into a response.</param>
        public NettyServer(
            IMessageReceiver receiver,
            IServiceExecutor executor)
        {
            _executor = executor;
            _receiver = receiver;
            _receiver.Received += Message_Received;
        }

        /// <summary>
        /// Starts closing the bound channel and shutting down both event loop
        /// groups. Safe to call when the server was never started.
        /// </summary>
        public override void ShutDown()
        {
            // Any of these is still null if start-up never ran or failed.
            var pending = new List<Task>();
            if (bootstrapChannel != null)
            {
                pending.Add(bootstrapChannel.CloseAsync());
            }
            if (boss != null)
            {
                pending.Add(boss.ShutdownGracefullyAsync(ShutdownQuietPeriod, ShutdownTimeout));
            }
            if (worker != null)
            {
                pending.Add(worker.ShutdownGracefullyAsync(ShutdownQuietPeriod, ShutdownTimeout));
            }

            // ShutDown is synchronous by contract, so the shutdown is started
            // here but deliberately not awaited.
            Task.WhenAll(pending);
        }

        /// <summary>
        /// Creates the event loop groups, configures the channel pipeline and
        /// binds the server channel to <paramref name="port"/>.
        /// </summary>
        /// <param name="port">TCP port to bind.</param>
        protected override async Task StartCoreAsync(int port)
        {
            boss = new MultithreadEventLoopGroup(1);
            worker = new MultithreadEventLoopGroup();
            bootstrap = new ServerBootstrap();
            bootstrap.Group(boss, worker)
                .Channel<TcpServerSocketChannel>()
                .Option(ChannelOption.SoBacklog, 100)
                .Handler(new LoggingHandler(LogLevel.INFO))
                .ChildHandler(new ActionChannelInitializer<ISocketChannel>(channel =>
                {
                    IChannelPipeline pipeline = channel.Pipeline;

                    // One handler per accepted channel, each forwarding to the shared receiver.
                    var handler = new NettyServerHandler();
                    handler.AsyncServerReceiverHook += _receiver.OnReceived;

                    // Frames carry a 4-byte length prefix in both directions.
                    pipeline.AddLast(new LengthFieldPrepender(4));
                    pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
                    pipeline.AddLast(
                        IocManager.Instance.Resolve<NettyMessageEncoderAdapter>(),
                        IocManager.Instance.Resolve<NettyMessageDecoderAdapter>(),
                        handler);
                }));

            try
            {
                bootstrapChannel = await bootstrap.BindAsync(port);
            }
            catch
            {
                // Binding failed (e.g. the port is in use): release the event
                // loop groups created above instead of leaking them.
                await ReleaseEventLoopsAsync();
                throw;
            }
        }

        /// <summary>
        /// Shuts down both event loop groups and clears the fields so a later
        /// <see cref="ShutDown"/> does not touch them again.
        /// </summary>
        private async Task ReleaseEventLoopsAsync()
        {
            var bossGroup = boss;
            var workerGroup = worker;
            boss = null;
            worker = null;

            await Task.WhenAll(
                bossGroup.ShutdownGracefullyAsync(ShutdownQuietPeriod, ShutdownTimeout),
                workerGroup.ShutdownGracefullyAsync(ShutdownQuietPeriod, ShutdownTimeout));
        }

        /// <summary>
        /// Handles a received message: when it is a
        /// <see cref="ServerReceiverContext"/> carrying a
        /// <see cref="RequestMessage"/>, executes the request and sends the
        /// response on the context's channel. Anything else is ignored.
        /// </summary>
        /// <param name="message">The object raised by the receiver.</param>
        private async Task Message_Received(object message)
        {
            var context = message as ServerReceiverContext;
            if (context == null)
            {
                return;
            }

            var request = context.Message as RequestMessage;
            if (request == null)
            {
                return;
            }

            var response = await _executor.ExecuteAsync(request);
            var sender = new NettySeverMessageSender(context.Context);
            await sender.SendAsync(null, response);
        }

        /// <summary>
        /// Writes the bound channel's id, open state and active state to the
        /// console, or a short notice when no channel has been bound yet.
        /// </summary>
        public override void Print()
        {
            if (bootstrapChannel == null)
            {
                Console.WriteLine("Channel is not started");
                return;
            }

            Console.WriteLine($"Channel id is{this.bootstrapChannel.Id} State is {this.bootstrapChannel.Open} Active is{this.bootstrapChannel.Active}");
        }
    }
}
服务端接收到消息后,将消息解码为RequestMessage,并交给IServiceExecutor进行实际的服务调用;实现代码如下:
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using GP.RPC.Message;
using GP.RPC.Utilities;
using Microsoft.Extensions.DependencyModel;
using System.Linq;
using GP.RPC.Server.SerivceType;

namespace GP.RPC.Server.ServiceExecutor
{
    /// <summary>
    /// Executes a <see cref="RequestMessage"/> by resolving the service type and
    /// method through reflection, invoking the method on an instance obtained
    /// from the instance factory, and wrapping the outcome in a
    /// <see cref="ResponseMessage"/>.
    /// </summary>
    public class SimpleServiceExecutor : IServiceExecutor
    {
        private IServiceInstanceFactory _factory;
        private IServiceTypeManager _typeManager;

        /// <summary>Creates an executor over the given type manager and instance factory.</summary>
        /// <param name="typeManager">Resolves a service type from its name.</param>
        /// <param name="factory">Creates the service instance to invoke.</param>
        public SimpleServiceExecutor(IServiceTypeManager typeManager, IServiceInstanceFactory factory)
        {
            _factory = factory;
            _typeManager = typeManager;
        }

        /// <summary>
        /// Resolves and invokes the service method named by
        /// <paramref name="request"/>.
        /// </summary>
        /// <param name="request">The request to execute.</param>
        /// <returns>
        /// A successful response carrying the invocation result, or a response
        /// built by <c>ResponseMessage.FromException</c> when resolution or
        /// invocation fails.
        /// </returns>
        public async Task<ResponseMessage> ExecuteAsync(RequestMessage request)
        {
            // Checked outside the try block: the catch handlers read
            // request.Id, so a null request must not reach them.
            // (Presumably CheckNull throws on null - confirm in ContractUtility.)
            ContractUtility.CheckNull(request, "RequestMessage");

            try
            {
                ContractUtility.CheckEmptyOrNull(request.ServiceType, "ServiceType");
                ContractUtility.CheckEmptyOrNull(request.ServiceMethod, "ServiceMethod");

                // The service type is matched by name only.
                var type = GetServiceType(request.ServiceType);
                ContractUtility.Require(type != null, $"Not found matched ServiceType {request.ServiceType}");

                var parameterTypes = GetParameterTypes(request.Parameters);
                var method = GetServiceMethod(type, request.ServiceMethod, parameterTypes);
                ContractUtility.Require(method != null, $"Not found matched ServiceMethod {request.ServiceMethod} in SerivceType {request.ServiceType}");

                var result = await ExecuteCoreAsync(_factory.Create(type), method, method.ReturnType, request.Parameters);
                return new ResponseMessage
                {
                    RequestId = request.Id,
                    Success = true,
                    Result = result
                };
            }
            catch (TargetInvocationException ex) when (ex.InnerException != null)
            {
                // MethodInfo.Invoke wraps exceptions thrown by the service
                // method; report the real failure rather than the wrapper.
                return ResponseMessage.FromException(request.Id, ex.InnerException);
            }
            catch (Exception ex)
            {
                return ResponseMessage.FromException(request.Id, ex);
            }
        }

        /// <summary>Looks up the service type registered under <paramref name="serviceType"/>.</summary>
        /// <param name="serviceType">Service type name taken from the request.</param>
        /// <returns>Whatever the type manager returns for that name.</returns>
        protected Type GetServiceType(string serviceType)
        {
            return _typeManager.GetServiceType(serviceType);
        }

        /// <summary>
        /// Derives the parameter types used for overload resolution from the
        /// runtime types of the supplied arguments.
        /// </summary>
        /// <param name="parameters">Arguments from the request; may be null or empty.</param>
        /// <returns>
        /// <see cref="Type.EmptyTypes"/> for null or empty input, otherwise one
        /// type per argument. A null element makes this throw, because its
        /// runtime type cannot be determined.
        /// </returns>
        protected Type[] GetParameterTypes(object[] parameters)
        {
            if (null == parameters || 0 == parameters.Length)
            {
                return Type.EmptyTypes;
            }

            return parameters.Select(p => p.GetType()).ToArray();
        }

        /// <summary>
        /// Finds the method called <paramref name="serviceMethod"/> on
        /// <paramref name="type"/> matching the given parameter types.
        /// </summary>
        /// <returns>The method, or null when none matches or the match is generic.</returns>
        protected MethodInfo GetServiceMethod(Type type, string serviceMethod, Type[] parameterTypes)
        {
            var method = type.GetMethod(serviceMethod, parameterTypes);
            if (method == null || method.IsGenericMethod)
            {
                return null;
            }

            return method;
        }

        /// <summary>
        /// Invokes <paramref name="method"/> on <paramref name="instance"/> and
        /// unwraps asynchronous results.
        /// </summary>
        /// <param name="instance">Service instance to invoke the method on.</param>
        /// <param name="method">Method to invoke.</param>
        /// <param name="returnType">Declared return type of <paramref name="method"/>.</param>
        /// <param name="parameters">Arguments passed to the method.</param>
        /// <returns>
        /// The return value for a synchronous method; null once a
        /// <see cref="Task"/> has completed; the awaited value for a
        /// <see cref="Task{TResult}"/>.
        /// </returns>
        protected async Task<object> ExecuteCoreAsync(object instance, MethodInfo method, Type returnType, object[] parameters)
        {
            var result = method.Invoke(instance, parameters);
            if (!IsAsync(returnType))
            {
                return result;
            }

            if (returnType == typeof(Task))
            {
                await ProcessAsyncTask((Task)result);
                return null;
            }

            // Task<T>: the reflective helper returns the task boxed as object.
            // It must be awaited and its Result read; returning it directly
            // would hand the Task itself to the caller as the "result".
            var resultType = returnType.GenericTypeArguments[0];
            var pending = (Task)CallProcessAsyncTaskWithResult(resultType, result);
            await pending;

            return typeof(Task<>)
                .MakeGenericType(resultType)
                .GetProperty("Result")
                .GetValue(pending);
        }

        /// <summary>
        /// Tells whether <paramref name="returnType"/> is exactly
        /// <see cref="Task"/> or a constructed <see cref="Task{TResult}"/>.
        /// </summary>
        protected bool IsAsync(Type returnType)
        {
            return (
                returnType == typeof(Task) ||
                (returnType.GetTypeInfo().IsGenericType && returnType.GetGenericTypeDefinition() == typeof(Task<>))
            );
        }

        /// <summary>Awaits a task that produces no result.</summary>
        protected async Task ProcessAsyncTask(Task actualTask)
        {
            await actualTask;
        }

        /// <summary>Awaits <paramref name="actualTask"/> and returns its result.</summary>
        protected async Task<T> ProcessAsyncTaskWithResult<T>(Task<T> actualTask)
        {
            return await actualTask;
        }

        /// <summary>
        /// Calls <see cref="ProcessAsyncTaskWithResult{T}"/> through reflection
        /// with <c>T</c> bound to <paramref name="taskReturnType"/>.
        /// </summary>
        /// <param name="taskReturnType">Result type of the task.</param>
        /// <param name="actualTaskValue">The <c>Task&lt;T&gt;</c> instance, boxed as object.</param>
        /// <returns>The resulting <c>Task&lt;T&gt;</c>, boxed as object.</returns>
        protected object CallProcessAsyncTaskWithResult(Type taskReturnType, object actualTaskValue)
        {
            // The target is protected, so NonPublic is required: a lookup with
            // BindingFlags.Instance alone never matches and returns null.
            return typeof(SimpleServiceExecutor)
                .GetMethod(nameof(ProcessAsyncTaskWithResult), BindingFlags.Instance | BindingFlags.NonPublic)
                .MakeGenericMethod(taskReturnType)
                .Invoke(this, new object[] { actualTaskValue });
        }
    }
}
服务调用完成后,生成相应的响应消息 ResponseMessage,并交给 Server 端的 MessageSender 发送回客户端。至此,服务端完成整个处理流程。
标签:ecs service play def resolve last coder 技术分享 技术
原文地址:https://www.cnblogs.com/fisher3/p/10003290.html