码迷,mamicode.com
首页 > 其他好文 > 详细

SuperSocket1.6Code解析

时间:2020-01-28 10:54:37      阅读:103      评论:0      收藏:0      [点我收藏+]

标签:ioc   real   option   sync   失败   本地   dexp   asto   his   

SuperSocket1.6Code解析

Normal Socket

System.Net.Sockets.dll程序集中使用socket类:

服务器:

  1. 创建socket:_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  2. 创建IPIPAddress _ip = IPAddress.Parse(ip);_endPoint = new IPEndPoint(_ip, port);
  3. 绑定IP地址: _socket.Bind(_endPoint); //绑定端口
  4. 服务开启监听:_socket.Listen(BACKLOG); //开启监听,backlog是监听的最大数列
  5. 开启监听线程:创建新的监听线程,在监听线程中while调用Socket acceptSocket = _socket.Accept();
    1. 一旦acceptSocket 不为空,说明有客户端连接成功,保存客户端socket,并查看该socket的isConnected属性是否连接socket.RemoteEndPoint.ToString();
    2. 一旦连接创建接收线程,并启动线程,在该线程中创建whilewhile (sInfo.isConnected){sInfo.socket.BeginReceive(sInfo.buffer, 0, sInfo.buffer.Length, SocketFlags.None, ReceiveCallBack, sInfo.socket.RemoteEndPoint);}来接收客户端传来的消息。
  6. BeginReceive()有一个回调函数ReceiveCallBack()通过读取byte[]buffer
  7. 向客户端发送信息socket.Send(Encoding.ASCII.GetBytes(text));

receivebuffer默认值8192

SocketAsyncEventArgs

异步套接字操作

  1. 创建IPEndPoint
  2. 创建socketListenerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  3. 绑定IP地址ListenerSocket.Bind(e);
  4. 开始监听ListenerSocket.Listen(10);
  5. 创建异步套接字,并绑定异步完成事件Args = new SocketAsyncEventArgs();Args.Completed += new EventHandler<SocketAsyncEventArgs>(ProcessAccept);
  6. 调用socket的AcceptAsync(Args)方法ListenerSocket.AcceptAsync(Args);
  7. 在异步套接字完成事件的回调函数中,创建新的异步套接字用于接收客户端传入消息的异步操作。var args = new SocketAsyncEventArgs();args.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);args.AcceptSocket = s;s.ReceiveAsync(args)s.ReceiveAsync(args),s接收的socket的,新建一个异步套接字,并传入ReceiveAsync()方法。
  8. switch (e.LastOperation)case SocketAsyncOperation.Receive:

Socket.AcceptAsync(SocketAsyncEventArgs) 方法

返回:如果 I/O 操作挂起,则为 true。 操作完成时,将引发 Completed 参数的 e 事件。

如果 I/O 操作同步完成,则为 false。 将不会引发 Completed 参数的 e 事件,并且可能在方法调用返回后立即检查作为参数传递的 e 对象以检索操作的结果。

SuperSocket Architecture

SuperSocket 层次示意图

技术图片

  • Reusable IO Buffer Pool:BufferManager类

SuperSocket 对象模型图示意图

技术图片

SuperSocket 请求处理模型示意图

技术图片

SuperSocket 隔离模型示意图

技术图片

Config

Command Filters

Log/LogFactory

Command Loaders

技术图片

ReceiveFilterFactory

技术图片

ReceiveFilter

技术图片

Connection Filters

技术图片

SocketBase.dll

ISessionBase

技术图片

AppSession

技术图片

技术图片

对AppServer和SocketSession的包装

ServerConfig

服务参数配置,在serverbase基类SetUp中创建

/// <summary>
/// Setups with the specified ip and port.
/// </summary>
/// <param name="ip">The ip.</param>
/// <param name="port">The port.</param>
/// <param name="socketServerFactory">The socket server factory.</param>
/// <param name="receiveFilterFactory">The Receive filter factory.</param>
/// <param name="logFactory">The log factory.</param>
/// <param name="connectionFilters">The connection filters.</param>
/// <param name="commandLoaders">The command loaders.</param>
/// <returns>return setup result</returns>
public bool Setup(string ip, int port, ISocketServerFactory socketServerFactory = null, IReceiveFilterFactory<TRequestInfo> receiveFilterFactory = null, ILogFactory logFactory = null, IEnumerable<IConnectionFilter> connectionFilters = null, IEnumerable<ICommandLoader<ICommand<TAppSession, TRequestInfo>>> commandLoaders = null)
{
    return Setup(new ServerConfig
                    {
                        Ip = ip,
                        Port = port
                    },
                    socketServerFactory,
                    receiveFilterFactory,
                    logFactory,
                    connectionFilters,
                    commandLoaders);
}

技术图片

RootConfig

技术图片

  • MaxWorkingThreads:最大工作线程数量
  • MaxCompletionPortThreads:线程池中异步 I/O 线程的最大数目。
  • PerformanceDataCollectInterval:性能数据收集间隔

RequestInfo

类图

技术图片

  • 基类是RequestInfo,提供了两个方法Key和Body,Body是模板,由子类确定具体类型
  • StringRequestInfo,在父类基础上提供了一个参数,String[] Parameters
  • RequestInfo<TRequestHeader, TRequestBody>:提供了请求头和请求体类型的模板。
  • 三个接口,key属性,body属性,heater属性

ListenerInfo

技术图片

监听节点

ListenerConfig

技术图片

ReflectCommandLoader

技术图片

  • ReflectCommandLoader:通过TryLoadCommands方法反射出程序集中的所有命令
/// <summary>
/// Tries to load commands.
/// </summary>
/// <param name="commands">The commands.</param>
/// <returns></returns>
public override bool TryLoadCommands(out IEnumerable<TCommand> commands)
{
    commands = null;
    var commandAssemblies = new List<Assembly>();
    if (m_AppServer.GetType().Assembly != this.GetType().Assembly)
        commandAssemblies.Add(m_AppServer.GetType().Assembly);
    string commandAssembly = m_AppServer.Config.Options.GetValue("commandAssembly");
    if (!string.IsNullOrEmpty(commandAssembly))
    {
        OnError("The configuration attribute 'commandAssembly' is not in used, please try to use the child node 'commandAssemblies' instead!");
        return false;
    }
    if (m_AppServer.Config.CommandAssemblies != null && m_AppServer.Config.CommandAssemblies.Any())
    {
        try
        {
            var definedAssemblies = AssemblyUtil.GetAssembliesFromStrings(m_AppServer.Config.CommandAssemblies.Select(a => a.Assembly).ToArray());

            if (definedAssemblies.Any())
                commandAssemblies.AddRange(definedAssemblies);
        }
        catch (Exception e)
        {
            OnError(new Exception("Failed to load defined command assemblies!", e));
            return false;
        }
    }
    if (!commandAssemblies.Any())
    {
        commandAssemblies.Add(Assembly.GetEntryAssembly());
    }
    var outputCommands = new List<TCommand>();
    foreach (var assembly in commandAssemblies)
    {
        try
        {
            outputCommands.AddRange(assembly.GetImplementedObjectsByInterface<TCommand>());
        }
        catch (Exception exc)
        {
            OnError(new Exception(string.Format("Failed to get commands from the assembly {0}!", assembly.FullName), exc));
            return false;
        }
    }
    commands = outputCommands;
    return true;
}
}

StatusInfoCollection

技术图片

AppServerBase

技术图片

技术图片

AppSeverBase<TAppSession,TRequestInfo>

m_CommandContainer:命令容器

m_CommandLoaders

m_ConnectionFilters

m_GlobalCommandFilters

m_Listeners

m_SocketServerFactory:在SetupBas

Facility.dll

PolicyReceiveFilterFactory

技术图片

PolicyRecieveFilter

技术图片

Protocol

ReceiveFilterBase

类图

技术图片

技术图片

技术图片

  • 在SuperSocket.SocketBase.Protocol程序集中
  • IReceiveFilter<TRequestInfo>接口,接收解析接口
    • Filter方法,解析会话请求的信息,参数包括,读取缓冲,偏移量,长度,是否copy,没有被解析的长度
    • LeftBufferSize属性:空余的缓冲区长度
    • NextReceiveFilter属性,下一个接收解析器
    • Reset方法,恢复初始化
    • State:解析器状态,正常和错误状态
  • ArraySegmentEx<T>数段类
    • T为数组模板
    • Array数组,count:数量,Offset偏移量,From从,To到
  • ArraySegmentList<T>数段列表
    • 实现了一个数组段列表
    • m_PrevSegment:当前的数段
    • m_PrevSegmentIndex,数段所在的index
  • ReceiveFilterBase<TRequestInfo>
    • BufferSegments属性

SocketEngine.dll

PerformanceMonitor

技术图片

SocketSession

技术图片

在初始化里对AppSession产生依赖,同时维护Socket和SmartPool(SendingQueue[]),因为维护着socket所以发送接收数据都是通过这个类。

  • 设置状态:AddStateFlag()TryAddStateFlag()RemoveStateFlag(),AddStateFlag:自旋设置m_State状态,线程安全的
  • m_Client:Socket
  • SessionID:new guid
  • LocalEndPoint:本地Id端
  • RemoteEndPoint:远程终结点
  • m_SendingQueuePool:实际是SmartPool类的实例,该实例维护者sendingQueue数组
  • m_SendingQueue:从SmarlPool中获取一个SendingQueue实例。

方法

Initialize()方法:

  • 初始化m_SendingQueuePool和m_SendingQueue

TrySend()方法:参数:IList<ArraySegment<byte>> segments:将segments压入sendingqueue队列并调用StartSend最终是调用SendAsyncSendSync,这个是由子类实现。

AsyncSocketSession

在子类中维护SocketAsyncEventArgs

  • SocketAsyncProxy:维护着SocketAsyncEventArgs
  • m_SocketEventArgSend:发送的SocketAsyncEventArgs实例

在初始化中如果同步发送就使用m_SocketEventArgSend,并OnSendingCompleted方法绑定其Completed事件

在SendAsync()方法中将SendingQueue实例给m_SocketEventArgSend的UserToken属性,并调用m_SocketEventArgSend的SetBufferSendAsync方法,发送失败也调用OnSendingCompleted

SocketAsyncProxy中的Completed事件中调用ProcessReceive方法,再调用this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true);方法

AsyncStreamSocketSession

SocketFactory

技术图片

/// <summary>
/// Creates the socket server.
/// </summary>
/// <typeparam name="TRequestInfo">The type of the request info.</typeparam>
/// <param name="appServer">The app server.</param>
/// <param name="listeners">The listeners.</param>
/// <param name="config">The config.</param>
/// <returns></returns>
public ISocketServer CreateSocketServer<TRequestInfo>(IAppServer appServer, ListenerInfo[] listeners, IServerConfig config)
    where TRequestInfo : IRequestInfo
{
    if (appServer == null)
        throw new ArgumentNullException("appServer");
    if (listeners == null)
        throw new ArgumentNullException("listeners");
    if (config == null)
        throw new ArgumentNullException("config");
    switch(config.Mode)
    {
        case(SocketMode.Tcp):
            return new AsyncSocketServer(appServer, listeners);
        case(SocketMode.Udp):
            return new UdpSocketServer<TRequestInfo>(appServer, listeners);
        default:
            throw new NotSupportedException("Unsupported SocketMode:" + config.Mode);
    }
}

SocketServers

技术图片

AsyncSocketServer

  • 缓存管理器m_BufferManager
  • 线程安全的SocketAsyncEventArgsProxy栈

构造函数,父类

public TcpSocketServerBase(IAppServer appServer, ListenerInfo[] listeners)
    : base(appServer, listeners)
{
    var config = appServer.Config;

    uint dummy = 0;
    m_KeepAliveOptionValues = new byte[Marshal.SizeOf(dummy) * 3];
    m_KeepAliveOptionOutValues = new byte[m_KeepAliveOptionValues.Length];
    //whether enable KeepAlive
    BitConverter.GetBytes((uint)1).CopyTo(m_KeepAliveOptionValues, 0);
    //how long will start first keep alive
    BitConverter.GetBytes((uint)(config.KeepAliveTime * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy));
    //keep alive interval
    BitConverter.GetBytes((uint)(config.KeepAliveInterval * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy) * 2);

    m_SendTimeOut = config.SendTimeOut;
    m_ReceiveBufferSize = config.ReceiveBufferSize;
    m_SendBufferSize = config.SendBufferSize;
}
public override bool Start()
{
    try
    {
        int bufferSize = AppServer.Config.ReceiveBufferSize;

        if (bufferSize <= 0)
            bufferSize = 1024 * 4;

        m_BufferManager = new BufferManager(bufferSize * AppServer.Config.MaxConnectionNumber, bufferSize);

        try
        {
            m_BufferManager.InitBuffer();
        }
        catch (Exception e)
        {
            AppServer.Logger.Error("Failed to allocate buffer for async socket communication, may because there is no enough memory, please decrease maxConnectionNumber in configuration!", e);
            return false;
        }

        // preallocate pool of SocketAsyncEventArgs objects
        SocketAsyncEventArgs socketEventArg;

        var socketArgsProxyList = new List<SocketAsyncEventArgsProxy>(AppServer.Config.MaxConnectionNumber);

        for (int i = 0; i < AppServer.Config.MaxConnectionNumber; i++)
        {
            //Pre-allocate a set of reusable SocketAsyncEventArgs
            socketEventArg = new SocketAsyncEventArgs();
            m_BufferManager.SetBuffer(socketEventArg);

            socketArgsProxyList.Add(new SocketAsyncEventArgsProxy(socketEventArg));
        }

        m_ReadWritePool = new ConcurrentStack<SocketAsyncEventArgsProxy>(socketArgsProxyList);

        if (!base.Start())
            return false;

        IsRunning = true;
        return true;
    }
    catch (Exception e)
    {
        AppServer.Logger.Error(e);
        return false;
    }
}

SocketAsyncEventArgsProxy

技术图片

SocketAsyncEventArgs的代理

维护着一个SocketAsyncEventArgs对象,并订阅了该对象的Completed事件(异步完成事件)

IsRecyclable:是否可以循环使用

OrigOffset:原始偏移量

每当异步完成的时候调用SocketAsyncEventArgs实例中的UserToken属性,该属性实际上保存着SocketSession实例,并调用SocketSession的ProcessReceive()AsyncRun()方法;socketSession.AsyncRun(() => socketSession.ProcessReceive(e));

UserToken属性是在SocketAsyncEventArgsProxy的初始化方法中定义的

public void Initialize(IAsyncSocketSession socketSession)
{
    SocketEventArgs.UserToken = socketSession;
}

代理模式

技术图片

BootstrapFactory

技术图片

DefaultBootStrap

技术图片

引导配置文件并通过配置实例化各个server和factory,在CreateWorkItemInstance方法通过Activator.CreateInstance(serviceType)实例化

ConfigurationWatcher

技术图片

SocketListenerBase

技术图片

TcpAsyncSocketListener

监听类,由三个事件:监听错误,监听停止,新的客户端连接

m_ListrnSocket:监听Socket

WorkItemFactoryInfoLoader

技术图片

配置文件载入 LoadResult,载入配置的connectionFilter,logfactory,commandloaderfactory,将appserver转化成IworkItem接口,

Common.dll

BufferManager

技术图片

此类创建一个大缓冲区,该缓冲区可以分配给每个套接字I / O操作使用,并分配给SocketAsyncEventArgs对象。 这使得bufffer可以轻松地重用,并且可以防止堆内存碎片化

BufferManager类上公开的操作不是线程安全的。我觉得这个类不需要线程安全,因为每个socket获得数据基本不会并发执行。

  • m_buffer:所有的字节缓存
  • m_bufferSize:单个片段的缓存大小
  • m_currentIndex:当前字节在总缓存中的索引
  • m_freeIndexPool:空闲索引池
  • m_numBytes:缓存片段的数目

主要提供两个方法:一个是SetBuffer和FreeBuffer

SetBuffer:

  • 检查空闲索引栈中是否有值,有值就直接使用空闲索引栈中的值,并将其值从栈中推出,
  • 如果没有空闲栈的值就先检查剩余的缓存是否有一个片段大小,有的化就设置并改变m_currentIndex索引,没有返回false

FreeBuffer:

  • 将当前索引添加到空闲索引栈中,并释放SocketAsyncEventArgs中用的缓存片段。

ArraySegmentList

技术图片

方法:

IndexOf:T在所有缓存中的索引

ArraySegmentEx

  • 数组,是保存着所有缓存,T[]
  • 偏移,该片段在缓存中的位置
  • 数量,该片段的长度

SendingQueue

技术图片

维护ArraySegment<byte>[] globalQueue, globalQueue中包含着所有所有缓存

入栈,出战,开始入栈,开始出栈。

所有的发送队列内存片组成一个大的arraysegment,由SendingQueueSourceCreator创建,并由SmartPool维护

SendingQueueSourceCreator

实际就是SmartPoolSourceCreator,发送队列创建者,默认有5个发送队列,其实每个连接一个发送队列,这边的所有sendingQueue组数是由SmartPool维护的

m_SendingQueueSize:发送队列大小,默认为5

/// <summary>
/// Creates the specified size.
/// </summary>
/// <param name="size">The size.</param>
/// <param name="poolItems">The pool items.</param>
/// <returns></returns>
public ISmartPoolSource Create(int size, out SendingQueue[] poolItems)
{
    var source = new ArraySegment<byte>[size * m_SendingQueueSize];//256*5
    poolItems = new SendingQueue[size];//size=256
    for (var i = 0; i < size; i++)
    {
        poolItems[i] = new SendingQueue(source, i * m_SendingQueueSize, m_SendingQueueSize);//SendingQueue中的source是所有的队列缓存,发送队列偏移量和发送队列容量
    }
    return new SmartPoolSource(source, size);
}

SmartPool

技术图片

其中维护了一个T(实际是SendingQueue)线程安全栈(m_GlobalStack)。由此看出SmartPool就是SendingQueue的池

m_MinPoolSize:Math.Max(config.MaxConnectionNumber / 6, 256)

m_MaxPoolSize:Math.Max(config.MaxConnectionNumber * 2, 256)

m_SourceCreator:new SendingQueueSourceCreator(config.SendingQueueSize)

m_ItemsSource:保存着SmartPoolSource[]对象,该对象实际上是所有的sendingqueue缓存。

m_GlobalStack:保存着单个SendingQueuep对象的数组

Initialize():初始化函数,初始化上面的变量

SmartPoolSource

维护所有的发送队列缓存,并保存sendingQueue的个数

Source:是object类型,实际上是ArraySegment<byte>[],实际上是所有的sendingqueue的缓存,大小为size*sendingqueuesize=256*5

Count:为默认值5

Other.dll

SocketAsyncEventArgs

表示异步套接字操作。

设置IP和Port调用流程

  1. 创建ServerConfig实例,RootConfig实例
  2. 设置m_State状态,线程安全的,通过Interlocked.CompareExchange方法设置
  3. 在setbasic中设置RootConfig,m_Name,Config,设置currentculture,设置线程池参数,设置m_socketfactory,设置textencoding,
  4. 设置logfactory
  5. 在setMedium中设置ReceiveFilterFactory,m_ConnectionFilters,m_CommandLoaders(add ReflectCommandLoader
  6. 在SetupAdvanced中设置BaseSecurity和Certificate,设置listners(ListenerInfo) 设置CommandFilterAttribute,遍历m_CommandLoaders,订阅Error,Updated事件,调用Initialize方法,通过TryLoadCommands方法获取命令集合commands,遍历命令集合添加命令到discoveredCommands集合中
  7. 遍历discoveredCommands集合,将其添加到命令容器 m_CommandContainer中,使用Interlocked.Exchange方法保证线程安全
  8. 在SetupFinal中设置ReceiveFilterFactory=new CommandLineReceiveFilterFactory(TextEncoding),设置m_ServerStatus,通过socketfactory获得serverfactory。

start调用流程

  1. 调用SuperSocket.SocketBase.AppServer中start()方法,调用基类AppServerBase的start()方法,该方法中调用socketserver的start方法
  2. 在socketserver的start方法中设置BufferManager,创建SocketAsyncEventArg,并通过buffermanager设置其buffer,并创建SocketAsyncEventArgProxys, SocketAsyncEventArgProxys集合赋值给m_ReadWritePool。调用SocketServer基类中的start
  3. 在socketserver基类的start中创建SendingQueuePool并初始化,实际是初始化队列池中的sendingqueue队列;通过遍历ListenerInfo集合创建TcpAsyncSocketListener监听者,订阅监听者的stop,error,NewClientAccepted事件,并开始监听Listener.Start,也添加到容器中。
  4. Listener.Start中创建一个监听Listen_socket和new异步套接字SocketAsyncEventArgs,并订阅Compeleted事件,启用socket监听,并调用AcceptAsync方法,异步完成触发compeleted事件,调用ProcessAccept方法,原来的方法异步已经触发重新调用一下AcceptAsync方法,通过函数递归实现while,判定acceptsocket是否正常,触发NewClientAccepted事件,
  5. 事件触发AsyncSocketServer 类中的ProcessNewClient方法,从m_ReadWritePool池中取一个空闲的SocketAsyncEventArgProxy,并通过代理,socket创建AsyncSocketSession,并通过socketsession创建Appsession,在创建过程中做连接过滤,初始化app‘session,通过receivefactory创建receivefilter,同时初始化socketsession,主要是订阅SocketAsyncEventArgProxy中的compeleted事件。调用socketsession的start方法
  6. 在socketsession中调用startreceive方法,中调用socket.ReceiveAsync方法,当异步完成时调用socketProxy的SocketEventArgs_Completed方法,该方法调用SocketSession的ProcessReceive方法,在该方法中执行过滤FilterRequest,执行命令,再一次调用startReceive方法,如此不停通过异步直接实现接收循环

send调用流程

在订阅了NewRequestReceived事件之后,该事件会有两个参数,一个是appsession,一个是requestinfo,

appsession和socketsession完成,

在appsession的InteralSend函数中对sendtimeout进行限制。

在socketsession中将消息压入消息栈对消息进行校验,最终是通过socket.send和socket.sendasync两个方法将消息发送。

Stop调用流程

先调用stop再调用close

socketserver的stop,释放m_ReadWritePool中所有SocketAsyncEventArgs,所有listener的stop,释放其SocketAsyncEventArgs

socket‘session的closed,回收所有sendingqqueue到pool中

SuperSocket1.6Code解析

标签:ioc   real   option   sync   失败   本地   dexp   asto   his   

原文地址:https://www.cnblogs.com/lovexinyi/p/12237451.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!