标签:ioc real option sync 失败 本地 dexp asto his
System.Net.Sockets.dll程序集中使用socket类:
服务器:
_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPAddress _ip = IPAddress.Parse(ip);_endPoint = new IPEndPoint(_ip, port);
_socket.Bind(_endPoint);
//绑定端口_socket.Listen(BACKLOG);
//开启监听,backlog是监听的最大数列Socket acceptSocket = _socket.Accept();
socket.RemoteEndPoint.ToString();
while (sInfo.isConnected){sInfo.socket.BeginReceive(sInfo.buffer, 0, sInfo.buffer.Length, SocketFlags.None, ReceiveCallBack, sInfo.socket.RemoteEndPoint);}
来接收客户端传来的消息。socket.Send(Encoding.ASCII.GetBytes(text));
receivebuffer默认值8192
异步套接字操作
ListenerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
ListenerSocket.Bind(e);
ListenerSocket.Listen(10);
Args = new SocketAsyncEventArgs();Args.Completed += new EventHandler<SocketAsyncEventArgs>(ProcessAccept);
ListenerSocket.AcceptAsync(Args);
var args = new SocketAsyncEventArgs();args.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);args.AcceptSocket = s;s.ReceiveAsync(args)
s.ReceiveAsync(args),s接收的socket的,新建一个异步套接字,并传入ReceiveAsync()方法。switch (e.LastOperation)case SocketAsyncOperation.Receive:
Socket.AcceptAsync(SocketAsyncEventArgs) 方法
返回:如果 I/O 操作挂起,则为 true
。 操作完成时,将引发 Completed 参数的 e
事件。
如果 I/O 操作同步完成,则为 false
。 将不会引发 Completed 参数的 e
事件,并且可能在方法调用返回后立即检查作为参数传递的 e
对象以检索操作的结果。
对AppServer和SocketSession的包装
服务参数配置,在serverbase基类SetUp中创建
/// <summary>
/// Setups with the specified ip and port.
/// </summary>
/// <param name="ip">The ip.</param>
/// <param name="port">The port.</param>
/// <param name="socketServerFactory">The socket server factory.</param>
/// <param name="receiveFilterFactory">The Receive filter factory.</param>
/// <param name="logFactory">The log factory.</param>
/// <param name="connectionFilters">The connection filters.</param>
/// <param name="commandLoaders">The command loaders.</param>
/// <returns>return setup result</returns>
public bool Setup(string ip, int port, ISocketServerFactory socketServerFactory = null, IReceiveFilterFactory<TRequestInfo> receiveFilterFactory = null, ILogFactory logFactory = null, IEnumerable<IConnectionFilter> connectionFilters = null, IEnumerable<ICommandLoader<ICommand<TAppSession, TRequestInfo>>> commandLoaders = null)
{
return Setup(new ServerConfig
{
Ip = ip,
Port = port
},
socketServerFactory,
receiveFilterFactory,
logFactory,
connectionFilters,
commandLoaders);
}
类图
监听节点
/// <summary>
/// Tries to load commands.
/// </summary>
/// <param name="commands">The commands.</param>
/// <returns></returns>
public override bool TryLoadCommands(out IEnumerable<TCommand> commands)
{
commands = null;
var commandAssemblies = new List<Assembly>();
if (m_AppServer.GetType().Assembly != this.GetType().Assembly)
commandAssemblies.Add(m_AppServer.GetType().Assembly);
string commandAssembly = m_AppServer.Config.Options.GetValue("commandAssembly");
if (!string.IsNullOrEmpty(commandAssembly))
{
OnError("The configuration attribute 'commandAssembly' is not in used, please try to use the child node 'commandAssemblies' instead!");
return false;
}
if (m_AppServer.Config.CommandAssemblies != null && m_AppServer.Config.CommandAssemblies.Any())
{
try
{
var definedAssemblies = AssemblyUtil.GetAssembliesFromStrings(m_AppServer.Config.CommandAssemblies.Select(a => a.Assembly).ToArray());
if (definedAssemblies.Any())
commandAssemblies.AddRange(definedAssemblies);
}
catch (Exception e)
{
OnError(new Exception("Failed to load defined command assemblies!", e));
return false;
}
}
if (!commandAssemblies.Any())
{
commandAssemblies.Add(Assembly.GetEntryAssembly());
}
var outputCommands = new List<TCommand>();
foreach (var assembly in commandAssemblies)
{
try
{
outputCommands.AddRange(assembly.GetImplementedObjectsByInterface<TCommand>());
}
catch (Exception exc)
{
OnError(new Exception(string.Format("Failed to get commands from the assembly {0}!", assembly.FullName), exc));
return false;
}
}
commands = outputCommands;
return true;
}
}
m_CommandContainer:命令容器
m_CommandLoaders
m_ConnectionFilters
m_GlobalCommandFilters
m_Listeners
m_SocketServerFactory:在SetupBas
ReceiveFilterBase
类图
在初始化里对AppSession产生依赖,同时维护Socket和SmartPool(SendingQueue[]),因为维护着socket所以发送接收数据都是通过这个类。
方法
Initialize()方法:
TrySend()方法:参数:IList<ArraySegment<byte>> segments:将segments压入sendingqueue队列并调用StartSend最终是调用SendAsync或SendSync,这个是由子类实现。
在子类中维护SocketAsyncEventArgs
在初始化中如果同步发送就使用m_SocketEventArgSend,并OnSendingCompleted方法绑定其Completed事件
在SendAsync()方法中将SendingQueue实例给m_SocketEventArgSend的UserToken属性,并调用m_SocketEventArgSend的SetBuffer和SendAsync方法,发送失败也调用OnSendingCompleted
SocketAsyncProxy中的Completed事件中调用ProcessReceive方法,再调用this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true);
方法
/// <summary>
/// Creates the socket server.
/// </summary>
/// <typeparam name="TRequestInfo">The type of the request info.</typeparam>
/// <param name="appServer">The app server.</param>
/// <param name="listeners">The listeners.</param>
/// <param name="config">The config.</param>
/// <returns></returns>
public ISocketServer CreateSocketServer<TRequestInfo>(IAppServer appServer, ListenerInfo[] listeners, IServerConfig config)
where TRequestInfo : IRequestInfo
{
if (appServer == null)
throw new ArgumentNullException("appServer");
if (listeners == null)
throw new ArgumentNullException("listeners");
if (config == null)
throw new ArgumentNullException("config");
switch(config.Mode)
{
case(SocketMode.Tcp):
return new AsyncSocketServer(appServer, listeners);
case(SocketMode.Udp):
return new UdpSocketServer<TRequestInfo>(appServer, listeners);
default:
throw new NotSupportedException("Unsupported SocketMode:" + config.Mode);
}
}
构造函数,父类
public TcpSocketServerBase(IAppServer appServer, ListenerInfo[] listeners)
: base(appServer, listeners)
{
var config = appServer.Config;
uint dummy = 0;
m_KeepAliveOptionValues = new byte[Marshal.SizeOf(dummy) * 3];
m_KeepAliveOptionOutValues = new byte[m_KeepAliveOptionValues.Length];
//whether enable KeepAlive
BitConverter.GetBytes((uint)1).CopyTo(m_KeepAliveOptionValues, 0);
//how long will start first keep alive
BitConverter.GetBytes((uint)(config.KeepAliveTime * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy));
//keep alive interval
BitConverter.GetBytes((uint)(config.KeepAliveInterval * 1000)).CopyTo(m_KeepAliveOptionValues, Marshal.SizeOf(dummy) * 2);
m_SendTimeOut = config.SendTimeOut;
m_ReceiveBufferSize = config.ReceiveBufferSize;
m_SendBufferSize = config.SendBufferSize;
}
public override bool Start()
{
try
{
int bufferSize = AppServer.Config.ReceiveBufferSize;
if (bufferSize <= 0)
bufferSize = 1024 * 4;
m_BufferManager = new BufferManager(bufferSize * AppServer.Config.MaxConnectionNumber, bufferSize);
try
{
m_BufferManager.InitBuffer();
}
catch (Exception e)
{
AppServer.Logger.Error("Failed to allocate buffer for async socket communication, may because there is no enough memory, please decrease maxConnectionNumber in configuration!", e);
return false;
}
// preallocate pool of SocketAsyncEventArgs objects
SocketAsyncEventArgs socketEventArg;
var socketArgsProxyList = new List<SocketAsyncEventArgsProxy>(AppServer.Config.MaxConnectionNumber);
for (int i = 0; i < AppServer.Config.MaxConnectionNumber; i++)
{
//Pre-allocate a set of reusable SocketAsyncEventArgs
socketEventArg = new SocketAsyncEventArgs();
m_BufferManager.SetBuffer(socketEventArg);
socketArgsProxyList.Add(new SocketAsyncEventArgsProxy(socketEventArg));
}
m_ReadWritePool = new ConcurrentStack<SocketAsyncEventArgsProxy>(socketArgsProxyList);
if (!base.Start())
return false;
IsRunning = true;
return true;
}
catch (Exception e)
{
AppServer.Logger.Error(e);
return false;
}
}
SocketAsyncEventArgs的代理
维护着一个SocketAsyncEventArgs对象,并订阅了该对象的Completed事件(异步完成事件)
IsRecyclable:是否可以循环使用
OrigOffset:原始偏移量
每当异步完成的时候调用SocketAsyncEventArgs实例中的UserToken属性,该属性实际上保存着SocketSession实例,并调用SocketSession的ProcessReceive()和AsyncRun()方法;socketSession.AsyncRun(() => socketSession.ProcessReceive(e));
UserToken属性是在SocketAsyncEventArgsProxy的初始化方法中定义的
public void Initialize(IAsyncSocketSession socketSession)
{
SocketEventArgs.UserToken = socketSession;
}
代理模式
引导配置文件并通过配置实例化各个server和factory,在CreateWorkItemInstance方法通过Activator.CreateInstance(serviceType)实例化
监听类,由三个事件:监听错误,监听停止,新的客户端连接
m_ListrnSocket:监听Socket
配置文件载入 LoadResult,载入配置的connectionFilter,logfactory,commandloaderfactory,将appserver转化成IworkItem接口,
此类创建一个大缓冲区,该缓冲区可以分配给每个套接字I / O操作使用,并分配给SocketAsyncEventArgs对象。 这使得bufffer可以轻松地重用,并且可以防止堆内存碎片化。
BufferManager类上公开的操作不是线程安全的。我觉得这个类不需要线程安全,因为每个socket获得数据基本不会并发执行。
主要提供两个方法:一个是SetBuffer和FreeBuffer
SetBuffer:
FreeBuffer:
方法:
IndexOf:T在所有缓存中的索引
维护ArraySegment<byte>[] globalQueue, globalQueue中包含着所有所有缓存
入栈,出战,开始入栈,开始出栈。
所有的发送队列内存片组成一个大的arraysegment,由SendingQueueSourceCreator创建,并由SmartPool维护
实际就是SmartPoolSourceCreator,发送队列创建者,默认有5个发送队列,其实每个连接一个发送队列,这边的所有sendingQueue组数是由SmartPool维护的
m_SendingQueueSize:发送队列大小,默认为5
/// <summary>
/// Creates the specified size.
/// </summary>
/// <param name="size">The size.</param>
/// <param name="poolItems">The pool items.</param>
/// <returns></returns>
public ISmartPoolSource Create(int size, out SendingQueue[] poolItems)
{
var source = new ArraySegment<byte>[size * m_SendingQueueSize];//256*5
poolItems = new SendingQueue[size];//size=256
for (var i = 0; i < size; i++)
{
poolItems[i] = new SendingQueue(source, i * m_SendingQueueSize, m_SendingQueueSize);//SendingQueue中的source是所有的队列缓存,发送队列偏移量和发送队列容量
}
return new SmartPoolSource(source, size);
}
其中维护了一个T(实际是SendingQueue)线程安全栈(m_GlobalStack)。由此看出SmartPool就是SendingQueue的池
m_MinPoolSize:Math.Max(config.MaxConnectionNumber / 6, 256)
m_MaxPoolSize:Math.Max(config.MaxConnectionNumber * 2, 256)
m_SourceCreator:new SendingQueueSourceCreator(config.SendingQueueSize)
m_ItemsSource:保存着SmartPoolSource[]对象,该对象实际上是所有的sendingqueue缓存。
m_GlobalStack:保存着单个SendingQueuep对象的数组
Initialize():初始化函数,初始化上面的变量
维护所有的发送队列缓存,并保存sendingQueue的个数
Source:是object类型,实际上是ArraySegment<byte>[],实际上是所有的sendingqueue的缓存,大小为size*sendingqueuesize=256*5
,
Count:为默认值5
表示异步套接字操作。
在订阅了NewRequestReceived事件之后,该事件会有两个参数,一个是appsession,一个是requestinfo,
appsession和socketsession完成,
在appsession的InteralSend函数中对sendtimeout进行限制。
在socketsession中将消息压入消息栈对消息进行校验,最终是通过socket.send和socket.sendasync两个方法将消息发送。
先调用stop再调用close
socketserver的stop,释放m_ReadWritePool中所有SocketAsyncEventArgs,所有listener的stop,释放其SocketAsyncEventArgs
socket‘session的closed,回收所有sendingqqueue到pool中
标签:ioc real option sync 失败 本地 dexp asto his
原文地址:https://www.cnblogs.com/lovexinyi/p/12237451.html