码迷,mamicode.com
首页 > 其他好文 > 详细

Scut:SocketListener 的解析

时间:2016-08-14 20:41:46      阅读:265      评论:0      收藏:0      [点我收藏+]

标签:

  大致浏览了一遍,Scut 的网络模型采用的是 SAEA 模型, 它是 .NET Framework 3.5 开始支持的一种支持高性能 Socket 通信的实现。

  通过分析 Scut 的套接字监听控制,就能大致明白它是如何使用 SAEA 架构的。

 

1. 套接字缓冲区内存管理器

  先来看下 Scut 对套接字缓冲区的内存管理:

    class BufferManager
    {
        int capacity;
        byte[] bufferBlock;
        Stack<int> freeIndexPool;
        int currentIndex;
        int saeaSize;
    
        /* 
         * capacity 表示为所有套接字准备的内存容量
         * saeaSzie 表示单个套接字所需的内存量 
         */
        public BufferManager(int capacity, int saeaSize)    
        {
            this.capacity = capacity;
            this.saeaSize = saeaSize;
            this.freeIndexPool = new Stack<int>();
        }

        //申请整份的内存空间
        internal void InitBuffer()
        {
            this.bufferBlock = new byte[capacity];
        }

        //为每个 SAEA 向缓存管理器申请缓存
        internal bool SetBuffer(SocketAsyncEventArgs args)
        {
            if (this.freeIndexPool.Count > 0)    //用一个堆栈记录非顺序释放的内存块,优先使用这些内存块作为缓存
            {
                args.SetBuffer(this.bufferBlock, this.freeIndexPool.Pop(), this.saeaSize);
            }
            else
            {
                if ((capacity - this.saeaSize) < this.currentIndex)
                {
                    return false;
                }
                args.SetBuffer(this.bufferBlock, this.currentIndex, this.saeaSize);
                this.currentIndex += this.saeaSize;
            }
            return true;
        }

        //为SAEA将缓存还给缓存管理器
        internal void FreeBuffer(SocketAsyncEventArgs args)
        {
            this.freeIndexPool.Push(args.Offset);
            args.SetBuffer(null, 0, 0);
        }
    }

  使用一个堆栈来管理”碎片大小相同、随时取用与释放”的内存块,这段代码算是十分高效与简介了。

 

2. SocketListener 的初始化

        private void Init()
        {
            this.bufferManager.InitBuffer();

            for (int i = 0; i < this.socketSettings.MaxAcceptOps; i++)         //创建一个接受连接的SAEA池子
            {
                this.acceptEventArgsPool.Push(CreateAcceptEventArgs());
        private SocketAsyncEventArgs CreateAcceptEventArgs()
        {
            SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
            acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(Accept_Completed);  //这部分SAEA绑定的都是“完成连接”事件处理API
            return acceptEventArg;
        }
            }

            SocketAsyncEventArgs ioEventArgs;
            for (int i = 0; i < this.socketSettings.NumOfSaeaForRecSend; i++)  //创建一个处理IO的SAEA池子
            {
                ioEventArgs = new SocketAsyncEventArgs();
                this.bufferManager.SetBuffer(ioEventArgs);
                ioEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);    //这部分SAEA绑定的都是“IO”事件处理API
                DataToken dataToken = new DataToken();
                dataToken.bufferOffset = ioEventArgs.Offset;  //每个SAEA在缓存管理器中获取的内存块的起始偏移都是唯一的,可以用来做唯一标识
                ioEventArgs.UserToken = dataToken;
                this.ioEventArgsPool.Push(ioEventArgs);
            }
            _summaryTimer = new Timer(OnSummaryTrace, null, 600, 60000);
       
    public class SummaryStatus     //日志定时记录连接的状态
    {
        /// <summary>
        /// 
        /// </summary>
        public long TotalConnectCount;
        /// <summary>
        /// 
        /// </summary>
        public int CurrentConnectCount;
        /// <summary>
        /// 
        /// </summary>
        public int RejectedConnectCount;
        /// <summary>
        /// 
        /// </summary>
        public int CloseConnectCount;
    }
        }

 

3. 监听-连接-数据传输流程

  那么,这么多SAEA是如何工作的呢?

        public void StartListen()
        {
            listenSocket = new Socket(this.socketSettings.LocalEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);   //建立TCP监听套接字
            listenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);                       //进一步设置监听套接字参数
            listenSocket.Bind(this.socketSettings.LocalEndPoint);  //绑定端口
            listenSocket.Listen(socketSettings.Backlog);           //开始监听,并设置最大排队连接数
            _isStart = true;
            requestHandler.Bind(this);
            PostAccept();
        }

看一下 SocketOptionLevel 的作用:

  SocketOptionLevel.IP:仅适用于 IP 套接字;

  SocketOptionLevel.IPv6:仅适用于 IPv6 套接字;

  SocketOptionLevel.Socket:适用于所有套接字;

  SocketOptionLevel.Tcp、SocketOptionLevel.Udp:适用于TCP、UDP套接字;

SocketOptionName.ReuseAddress:允许将套接字绑定到已在使用中的地址。

        private void PostAccept()
        {
            try
            {
                if (!_isStart)
                {
                    return;
                }
                SocketAsyncEventArgs acceptEventArgs = acceptEventArgsPool.Pop() ?? CreateAcceptEventArgs();   //从accept SAEA池中取出一个SAEA交给监听套接字去获取连接参数
                bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs);  
                if (!willRaiseEvent)                  //直接同步取得连接则顺序执行,异步取得则触发 Accept_Completed 事件处理函数
                {
                    ProcessAccept(acceptEventArgs);   //处理连接
                }
            }
            catch (Exception ex)
            {
                TraceLog.WriteError("Post accept listen error:{0}", ex);
            }
        }

  我们可以看到在 Accept_Completed 中也是同样调用了 ProcessAccept;

        private void Accept_Completed(object sender, SocketAsyncEventArgs acceptEventArgs)
        {
            try
            {
                ProcessAccept(acceptEventArgs);
            }
            catch (Exception ex)
            {  
                ... ...
            }
        }        

  继续看 ProcessAccept 是如何工作的:

        private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
        {
            try
            {
                Interlocked.Increment(ref _summaryStatus.TotalConnectCount);    //向监控器提交“总连接数+1”
                maxConnectionsEnforcer.WaitOne();                    //堵塞一个信号量,由此可知,信号量的总数控制了可并发处理的accept连接数

                if (acceptEventArgs.SocketError != SocketError.Success)
                {
                    Interlocked.Increment(ref _summaryStatus.RejectedConnectCount);   //向监控器提交“被拒绝连接数+1”
                    HandleBadAccept(acceptEventArgs);
                }
                else
                {
                    Interlocked.Increment(ref _summaryStatus.CurrentConnectCount);   //向监控器提交“当前连接数+1”

                    SocketAsyncEventArgs ioEventArgs = this.ioEventArgsPool.Pop();     //获取IO SAEA 池中的一个SAEA
                    ioEventArgs.AcceptSocket = acceptEventArgs.AcceptSocket;           //将 accept 建立的 io 套接字交给该 SAEA
                    var dataToken = (DataToken)ioEventArgs.UserToken;
                    ioEventArgs.SetBuffer(dataToken.bufferOffset, socketSettings.BufferSize);  //为 io SAEA 提供缓存
                    var exSocket = new ExSocket(ioEventArgs.AcceptSocket);             // 将 io 套接字用 ExSocket 管理起来
                    exSocket.LastAccessTime = DateTime.Now;
                    dataToken.Socket = exSocket;
                    acceptEventArgs.AcceptSocket = null;

                    //release connect when socket has be closed.
                    ReleaseAccept(acceptEventArgs, false);   //该 accept SAEA 已经完成任务,释放其资源                
                    try
                    {
                        OnConnected(new ConnectionEventArgs { Socket = exSocket });  //OnConnected 是 SocketListener 的“连接事件订阅器”,成功连接时触发该订阅
                    }
                    catch (Exception ex)
                    {
                        TraceLog.WriteError("OnConnected error:{0}", ex);
                    }
                    PostReceive(ioEventArgs);
                }

            }
            finally
            {
                PostAccept();  //处理完毕后又重新开始监听
            }
     }

  可以看到这个api 做的最重要的事情:1. 将建立连接的socket交给ioSAEA;2. ioSAEA去底层获取消息;3. 继续监听;

  疑问:如果只有1个监听套接字,为什么要做一个 acceptpool? 

  再来看下 ioSAEA 的工作流程:

        private void PostReceive(SocketAsyncEventArgs ioEventArgs)
        {
            if (ioEventArgs.AcceptSocket == null) return;

            bool willRaiseEvent = ioEventArgs.AcceptSocket.ReceiveAsync(ioEventArgs);   //异步接收io数据

            if (!willRaiseEvent)    //如果同步获得直接处理,异步获得则由异步回调处理
            {
                ProcessReceive(ioEventArgs);
            }
        }

  无论哪种处理方式,都是调用 ProcessReceive,其中比较重要的部分:

bool needPostAnother = requestHandler.TryReceiveMessage(ioEventArgs, out messages, out hasHandshaked);

  在 SocketListener 启动的时候我们注意到:

requestHandler.Bind(this);

  监听套接字管理器自带 requestHandle,这是个什么东西?

    public class RequestHandler
    {
        public RequestHandler(BaseMessageProcessor messageProcessor)
        {
            MessageProcessor = messageProcessor;
        }

        internal virtual void Bind(ISocket appServer)
        {
            AppServer = appServer;
        }

        public ISocket AppServer { get; private set; }
        ... ...
    }
        protected GameSocketHost()
            : this(new RequestHandler(new MessageHandler()))
        {
        }

        protected GameWebSocketHost(bool isSecurity = false)
            : this(new WebSocketRequestHandler(isSecurity))
        {
        }

  从 Scut 的以上代码应该可以得知:什么类型的套接字宿主应该绑定相应类型的消息处理API

  进一步观察,websocket 与 socket 的“消息发送API”是一致的,而“消息读取”API则完全不同,有空再回来研究这块内容。

  继续回到 ProcessReceive:

                        switch (message.OpCode)
                        {
                            case OpCode.Close:
                                var statusCode = requestHandler.MessageProcessor != null
                                    ? requestHandler.MessageProcessor.GetCloseStatus(message.Data)
                                    : OpCode.Empty;
                                if (statusCode != OpCode.Empty)
                                {
                                    DoClosedStatus(exSocket, statusCode);
                                }
                                Closing(ioEventArgs, OpCode.Empty);
                                needPostAnother = false;
                                break;
                            case OpCode.Ping:
                                DoPing(new ConnectionEventArgs { Socket = exSocket, Meaage = message });
                                break;
                            case OpCode.Pong:
                                DoPong(new ConnectionEventArgs { Socket = exSocket, Meaage = message });
                                break;
                            default:
                                OnDataReceived(new ConnectionEventArgs { Socket = exSocket, Meaage = message });
                                break;
                        }

  如果是常规数据,则调用 OnDataReceived,这是更上一层注册的逻辑消息处理API,正常来说,到了进入“应用消息分发器”-IActionDispatcher 的节奏了。

  继续往上查,果然不出意料。

  

 

Scut:SocketListener 的解析

标签:

原文地址:http://www.cnblogs.com/Daniel-Liang/p/5769179.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!