标签:
本例基于 NetworkComms 2.3.1 开源版本(GPLv3 协议)。
如果说 NetworkComms 是一顶皇冠,那么 CommsThreadPool(自定义线程池)就是皇冠上的明珠。这样说应该并不夸张:它优美、简洁、高效。
在《C#网络通信框架NetworkComms内核解析之六 处理接收到的二进制数据》中我们曾经提到,服务器收到数据后,如果是系统内部保留类型的数据或者是最高优先级的数据,系统会在主线程中处理,其他数据则交给自定义线程池进行处理。
作为服务器,处理成千上万的连接及数据,单线程性能肯定是不行的,所以我们的支持优先级的自定义线程池毫无疑问是多线程的:)
注释就不写了,她美丽的面纱还是由您来揭开好了
//  Copyright 2011-2013 Marc Fletcher, Matthew Dean
//
//  This program is free software: you can redistribute it and/or modify
//  it under the terms of the GNU General Public License as published by
//  the Free Software Foundation, either version 3 of the License, or
//  (at your option) any later version.
//
//  This program is distributed in the hope that it will be useful,
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//  GNU General Public License for more details.
//
//  You should have received a copy of the GNU General Public License
//  along with this program.  If not, see <http://www.gnu.org/licenses/>.
//
//  A commercial license of this software can also be purchased.
//  Please see <http://www.networkcomms.net/licensing/> for details.

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Text;
using System.Threading;

namespace NetworkCommsDotNet
{
    /// <summary>
    /// A compact thread pool used by NetworkComms.Net to run packet handlers.
    /// Callbacks are either handed directly to a newly created worker thread or placed on a
    /// priority queue from which worker threads take them.
    /// </summary>
    public class CommsThreadPool
    {
        /// <summary>
        /// A sync object to make things thread safe. Guards the thread and worker dictionaries,
        /// the idle counter and the shutdown flag.
        /// </summary>
        object SyncRoot = new object();

        /// <summary>
        /// Dictionary of threads, index is ThreadId
        /// </summary>
        Dictionary<int, Thread> threadDict = new Dictionary<int, Thread>();

        /// <summary>
        /// Dictionary of thread worker info, index is ThreadId
        /// </summary>
        Dictionary<int, WorkerInfo> workerInfoDict = new Dictionary<int, WorkerInfo>();

        /// <summary>
        /// The minimum timespan between thread wait sleep join updates
        /// </summary>
        TimeSpan ThreadWaitSleepJoinCountUpdateInterval = new TimeSpan(0, 0, 0, 0, 250);

        /// <summary>
        /// A quick lookup of the number of current threads which are idle and require jobs
        /// </summary>
        int requireJobThreadsCount = 0;

        /// <summary>
        /// Priority queue used to order call backs
        /// </summary>
        PriorityQueue<WaitCallBackWrapper> jobQueue = new PriorityQueue<WaitCallBackWrapper>();

        /// <summary>
        /// Set to true to ensure correct shutdown of worker threads.
        /// </summary>
        bool shutdown = false;

        /// <summary>
        /// The timespan after which an idle thread will close
        /// </summary>
        TimeSpan ThreadIdleTimeoutClose { get; set; }

        /// <summary>
        /// The maximum number of threads to create in the pool
        /// </summary>
        public int MaxTotalThreadsCount { get; private set; }

        /// <summary>
        /// The maximum number of active threads in the pool. This can be less than MaxTotalThreadsCount, taking account of waiting threads.
        /// </summary>
        public int MaxActiveThreadsCount { get; private set; }

        /// <summary>
        /// The minimum number of idle threads to maintain in the pool
        /// </summary>
        public int MinThreadsCount { get; private set; }

        /// <summary>
        /// The most recent count of pool threads which are waiting for IO
        /// </summary>
        public int CurrentNumWaitSleepJoinThreadsCache { get; private set; }

        /// <summary>
        /// The dateTime associated with the most recent count of pool threads which are waiting for IO
        /// </summary>
        public DateTime LastThreadWaitSleepJoinCountCacheUpdate { get; private set; }

        /// <summary>
        /// The total number of threads currently in the thread pool
        /// </summary>
        public int CurrentNumTotalThreads
        {
            get { lock (SyncRoot) return threadDict.Count; }
        }

        /// <summary>
        /// The total number of idle threads currently in the thread pool
        /// </summary>
        public int CurrentNumIdleThreads
        {
            get { lock (SyncRoot) return requireJobThreadsCount; }
        }

        /// <summary>
        /// The total number of items currently waiting to be collected by a thread
        /// </summary>
        public int QueueCount
        {
            get { return jobQueue.Count; }
        }

        /// <summary>
        /// Create a new comms thread pool
        /// </summary>
        /// <param name="minThreadsCount">Minimum number of idle threads to maintain in the pool</param>
        /// <param name="maxActiveThreadsCount">The maximum number of active (i.e. not waiting for IO) threads</param>
        /// <param name="maxTotalThreadsCount">Maximum number of threads to create in the pool</param>
        /// <param name="threadIdleTimeoutClose">Timespan after which an idle thread will close</param>
        public CommsThreadPool(int minThreadsCount, int maxActiveThreadsCount, int maxTotalThreadsCount, TimeSpan threadIdleTimeoutClose)
        {
            MinThreadsCount = minThreadsCount;
            MaxTotalThreadsCount = maxTotalThreadsCount;
            MaxActiveThreadsCount = maxActiveThreadsCount;
            ThreadIdleTimeoutClose = threadIdleTimeoutClose;
        }

        /// <summary>
        /// Prevent any additional threads from starting. Returns immediately.
        /// </summary>
        public void BeginShutdown()
        {
            lock (SyncRoot)
                shutdown = true;
        }

        /// <summary>
        /// Prevent any additional threads from starting and return once all existing workers have completed.
        /// Once every worker has exited (or has been aborted) the job queue is cleared and the
        /// shutdown flag is reset.
        /// </summary>
        /// <param name="threadShutdownTimeoutMS">The time in milliseconds to wait for each worker thread to
        /// finish before that thread is aborted</param>
        public void EndShutdown(int threadShutdownTimeoutMS = 1000)
        {
            List<Thread> allWorkerThreads = new List<Thread>();
            lock (SyncRoot)
            {
                //Workers only exit when they observe the shutdown flag, so it must be set here
                //even if BeginShutdown() was never called. Otherwise the signalled workers simply
                //go back to idle and every one of them ends up being aborted below.
                shutdown = true;

                foreach (var thread in threadDict)
                {
                    workerInfoDict[thread.Key].ThreadSignal.Set();
                    allWorkerThreads.Add(thread.Value);
                }
            }

            //Threads for which an abort was successfully requested. These never reach the
            //clean-up code in ThreadWorker so their book-keeping has to be removed here.
            List<Thread> abortedThreads = new List<Thread>();

            //Wait for all threads to finish
            foreach (Thread thread in allWorkerThreads)
            {
                try
                {
                    if (!thread.Join(threadShutdownTimeoutMS))
                    {
                        thread.Abort();
                        abortedThreads.Add(thread);
                    }
                }
                catch (Exception ex)
                {
                    NetworkComms.LogError(ex, "ManagedThreadPoolShutdownError");
                }
            }

            lock (SyncRoot)
            {
                //Remove any trace of aborted workers. Leaving them behind would count dead threads
                //towards MaxTotalThreadsCount and allow EnqueueItem to 'wake' a dead idle thread.
                foreach (Thread thread in abortedThreads)
                {
                    WorkerInfo abortedInfo;
                    if (workerInfoDict.TryGetValue(thread.ManagedThreadId, out abortedInfo))
                    {
                        if (abortedInfo.ThreadIdle && requireJobThreadsCount > 0)
                            requireJobThreadsCount--;

                        workerInfoDict.Remove(thread.ManagedThreadId);
                    }

                    threadDict.Remove(thread.ManagedThreadId);
                }

                jobQueue.Clear();
                shutdown = false;
            }
        }

        /// <summary>
        /// Enqueue a callback to the thread pool.
        /// </summary>
        /// <param name="priority">The priority with which to enqueue the provided callback</param>
        /// <param name="callback">The callback to execute</param>
        /// <param name="state">The state parameter to pass to the callback when executed</param>
        /// <returns>Returns the managed threadId running the callback if one was available, otherwise -1</returns>
        public int EnqueueItem(QueueItemPriority priority, WaitCallback callback, object state)
        {
            int chosenThreadId = -1;

            lock (SyncRoot)
            {
                UpdateThreadWaitSleepJoinCountCache();

                //Threads which are neither blocked inside a callback nor idle waiting for a job
                int numInJobActiveThreadsCount = Math.Max(0, threadDict.Count - CurrentNumWaitSleepJoinThreadsCache - requireJobThreadsCount);

                if (!shutdown && requireJobThreadsCount == 0 && numInJobActiveThreadsCount < MaxActiveThreadsCount && threadDict.Count < MaxTotalThreadsCount)
                {
                    //Launch a new thread. The callback is handed to it directly rather than via the queue.
                    Thread newThread = new Thread(ThreadWorker);
                    newThread.Name = "ManagedThreadPool_" + newThread.ManagedThreadId.ToString();

                    WorkerInfo info = new WorkerInfo(newThread.ManagedThreadId, new WaitCallBackWrapper(callback, state));

                    chosenThreadId = newThread.ManagedThreadId;
                    threadDict.Add(newThread.ManagedThreadId, newThread);
                    workerInfoDict.Add(newThread.ManagedThreadId, info);

                    newThread.Start(info);
                }
                else if (!shutdown && requireJobThreadsCount > 0 && numInJobActiveThreadsCount < MaxActiveThreadsCount)
                {
                    jobQueue.TryAdd(new KeyValuePair<QueueItemPriority, WaitCallBackWrapper>(priority, new WaitCallBackWrapper(callback, state)));

                    int checkCount = 0;

                    foreach (var info in workerInfoDict)
                    {
                        //Trigger the first idle thread
                        checkCount++;

                        if (info.Value.ThreadIdle)
                        {
                            info.Value.ClearThreadIdle();
                            requireJobThreadsCount--;

                            info.Value.ThreadSignal.Set();
                            chosenThreadId = info.Value.ThreadId;

                            break;
                        }

                        //Reaching the last worker without finding an idle one means the idle counter is out of sync
                        if (checkCount == workerInfoDict.Count)
                            throw new Exception("IdleThreads count is " + requireJobThreadsCount.ToString() + " but unable to locate thread marked as idle.");
                    }
                }
                else if (!shutdown)
                {
                    //If there are no idle threads and we can't start any new ones we just have to enqueue the item
                    jobQueue.TryAdd(new KeyValuePair<QueueItemPriority, WaitCallBackWrapper>(priority, new WaitCallBackWrapper(callback, state)));
                }
            }

            return chosenThreadId;
        }

        /// <summary>
        /// The worker object for the thread pool. Runs the callback it was started with, then keeps
        /// taking jobs from the queue. When no job is available it marks itself idle and waits on its
        /// signal, closing once it has been idle for longer than ThreadIdleTimeoutClose provided the
        /// pool holds more than MinThreadsCount threads.
        /// </summary>
        /// <param name="state">The <see cref="WorkerInfo"/> associated with this worker thread</param>
        private void ThreadWorker(object state)
        {
            WorkerInfo threadInfo = (WorkerInfo)state;

            do
            {
                //While there are jobs in the queue process the jobs
                while (true)
                {
                    if (threadInfo.CurrentCallBackWrapper == null)
                    {
                        KeyValuePair<QueueItemPriority, WaitCallBackWrapper> packetQueueItem;
                        lock (SyncRoot)
                        {
                            UpdateThreadWaitSleepJoinCountCache();
                            int numInJobActiveThreadsCount = Math.Max(0, threadDict.Count - CurrentNumWaitSleepJoinThreadsCache - requireJobThreadsCount);

                            if (shutdown || threadDict.Count > MaxTotalThreadsCount) //If we are shutting down or have too many threads in total
                            {
                                //This thread is closing so it must no longer be counted as idle
                                if (threadInfo.ThreadIdle && requireJobThreadsCount > 0)
                                    requireJobThreadsCount--;

                                threadInfo.ClearThreadIdle();

                                threadDict.Remove(threadInfo.ThreadId);
                                workerInfoDict.Remove(threadInfo.ThreadId);

                                UpdateThreadWaitSleepJoinCountCache();
                                return;
                            }
                            else if (numInJobActiveThreadsCount > MaxActiveThreadsCount) //If we have too many active threads
                            {
                                //We wont close here to prevent thread creation/destruction thrashing.
                                //We will instead act as if there is no work and wait to potentially be timed out
                                if (!threadInfo.ThreadIdle)
                                {
                                    threadInfo.SetThreadIdle();
                                    requireJobThreadsCount++;
                                }

                                break;
                            }
                            else
                            {
                                //Try to get a job
                                if (!jobQueue.TryTake(out packetQueueItem)) //We fail to get a new job
                                {
                                    //If we failed to get a job we switch to idle and wait to be triggered
                                    if (!threadInfo.ThreadIdle)
                                    {
                                        threadInfo.SetThreadIdle();
                                        requireJobThreadsCount++;
                                    }

                                    break;
                                }
                                else
                                {
                                    if (threadInfo.ThreadIdle && requireJobThreadsCount > 0)
                                        requireJobThreadsCount--;

                                    threadInfo.UpdateCurrentCallBackWrapper(packetQueueItem.Value);
                                    threadInfo.ClearThreadIdle();
                                }
                            }
                        }
                    }

                    //Perform the waitcallBack
                    try
                    {
                        threadInfo.SetInsideCallBack();
                        threadInfo.CurrentCallBackWrapper.WaitCallBack(threadInfo.CurrentCallBackWrapper.State);
                    }
                    catch (Exception ex)
                    {
                        NetworkComms.LogError(ex, "ManagedThreadPoolCallBackError", "An unhandled exception was caught while processing a callback. Make sure to catch errors in callbacks to prevent this error file being produced.");
                    }
                    finally
                    {
                        threadInfo.ClearInsideCallBack();
                    }

                    threadInfo.UpdateLastActiveTime();
                    threadInfo.ClearCallBackWrapper();
                }

                //As soon as the queue is empty we wait until perhaps close time
                if (!threadInfo.ThreadSignal.WaitOne(250))
                {
                    //While we are waiting we check to see if we need to close
                    if (DateTime.Now - threadInfo.LastActiveTime > ThreadIdleTimeoutClose)
                    {
                        lock (SyncRoot)
                        {
                            //We have timed out but we don't go below the minimum
                            if (threadDict.Count > MinThreadsCount)
                            {
                                if (threadInfo.ThreadIdle && requireJobThreadsCount > 0)
                                    requireJobThreadsCount--;

                                threadInfo.ClearThreadIdle();

                                threadDict.Remove(threadInfo.ThreadId);
                                workerInfoDict.Remove(threadInfo.ThreadId);

                                UpdateThreadWaitSleepJoinCountCache();
                                return;
                            }
                        }
                    }
                }

                //We only leave via one of our possible breaks
            } while (true);
        }

        /// <summary>
        /// Refreshes <see cref="CurrentNumWaitSleepJoinThreadsCache"/> with the number of pool threads which
        /// are inside a callback and in the WaitSleepJoin state. The count is only recalculated if more than
        /// ThreadWaitSleepJoinCountUpdateInterval has passed since the previous update.
        /// </summary>
        private void UpdateThreadWaitSleepJoinCountCache()
        {
            lock (SyncRoot)
            {
                if (DateTime.Now - LastThreadWaitSleepJoinCountCacheUpdate > ThreadWaitSleepJoinCountUpdateInterval)
                {
                    int returnValue = 0;

                    foreach (var thread in threadDict)
                    {
                        if (workerInfoDict[thread.Key].InsideCallBack && thread.Value.ThreadState == ThreadState.WaitSleepJoin)
                            returnValue++;
                    }

                    CurrentNumWaitSleepJoinThreadsCache = returnValue;
                    LastThreadWaitSleepJoinCountCacheUpdate = DateTime.Now;
                }
            }
        }

        /// <summary>
        /// Provides a brief string summarisation the state of the thread pool
        /// </summary>
        /// <returns>A string containing the total, idle and sleeping thread counts and the queue length</returns>
        public override string ToString()
        {
            lock (SyncRoot)
            {
                UpdateThreadWaitSleepJoinCountCache();
                return "TotalTs:" + CurrentNumTotalThreads.ToString() + ", IdleTs:" + CurrentNumIdleThreads.ToString() + ", SleepTs:" + CurrentNumWaitSleepJoinThreadsCache.ToString() + ", Q:" + QueueCount.ToString();
            }
        }
    }

    /// <summary>
    /// The state CommsThreadPool keeps for a single worker thread
    /// </summary>
    class WorkerInfo
    {
        /// <summary>
        /// The managed thread id of the worker thread
        /// </summary>
        public int ThreadId { get; private set; }

        /// <summary>
        /// The signal the worker waits on while it has no work. Set by the pool to wake the worker.
        /// </summary>
        public AutoResetEvent ThreadSignal { get; private set; }

        /// <summary>
        /// True if the worker is marked as idle
        /// </summary>
        public bool ThreadIdle { get; private set; }

        /// <summary>
        /// The time at which this worker was created or last completed a callback
        /// </summary>
        public DateTime LastActiveTime { get; private set; }

        /// <summary>
        /// The callback the worker will run next or is currently running, null if there is none
        /// </summary>
        public WaitCallBackWrapper CurrentCallBackWrapper { get; private set; }

        /// <summary>
        /// True while the worker is executing a callback
        /// </summary>
        public bool InsideCallBack { get; private set; }

        /// <summary>
        /// Create a new worker info
        /// </summary>
        /// <param name="threadId">The managed thread id of the worker thread</param>
        /// <param name="initialisationCallBackWrapper">The first callback the worker should run</param>
        public WorkerInfo(int threadId, WaitCallBackWrapper initialisationCallBackWrapper)
        {
            ThreadSignal = new AutoResetEvent(false);
            ThreadIdle = false;
            ThreadId = threadId;
            LastActiveTime = DateTime.Now;
            this.CurrentCallBackWrapper = initialisationCallBackWrapper;
        }

        /// <summary>
        /// Set the callback the worker should run next
        /// </summary>
        /// <param name="waitCallBackWrapper">The callback wrapper to run</param>
        public void UpdateCurrentCallBackWrapper(WaitCallBackWrapper waitCallBackWrapper)
        {
            CurrentCallBackWrapper = waitCallBackWrapper;
        }

        /// <summary>
        /// Set LastActiveTime to the current time
        /// </summary>
        public void UpdateLastActiveTime()
        {
            LastActiveTime = DateTime.Now;
        }

        /// <summary>
        /// Set CurrentCallBackWrapper to null
        /// </summary>
        public void ClearCallBackWrapper()
        {
            CurrentCallBackWrapper = null;
        }

        /// <summary>
        /// Set InsideCallBack to true
        /// </summary>
        public void SetInsideCallBack()
        {
            InsideCallBack = true;
        }

        /// <summary>
        /// Set InsideCallBack to false
        /// </summary>
        public void ClearInsideCallBack()
        {
            InsideCallBack = false;
        }

        /// <summary>
        /// Set threadIdle to true
        /// </summary>
        public void SetThreadIdle()
        {
            this.ThreadIdle = true;
        }

        /// <summary>
        /// Set threadIdle to false
        /// </summary>
        public void ClearThreadIdle()
        {
            this.ThreadIdle = false;
        }
    }

    /// <summary>
    /// A private wrapper used by CommsThreadPool
    /// </summary>
    class WaitCallBackWrapper
    {
        /// <summary>
        /// The callback to execute
        /// </summary>
        public WaitCallback WaitCallBack { get; private set; }

        /// <summary>
        /// The state object passed to the callback when it is executed
        /// </summary>
        public object State { get; private set; }

        /// <summary>
        /// Create a new wrapper for the provided callback and state
        /// </summary>
        /// <param name="waitCallBack">The callback to execute</param>
        /// <param name="state">The state object to pass to the callback</param>
        public WaitCallBackWrapper(WaitCallback waitCallBack, object state)
        {
            this.WaitCallBack = waitCallBack;
            this.State = state;
        }
    }
}
本例基于 NetworkComms 2.3.1 开源版本(GPLv3 协议)。
www.networkcomms.cn
www.cnblogs.com/networkcomms
C#网络通信框架NetworkComms内核解析之十 支持优先级的自定义线程池
标签:
原文地址:http://www.cnblogs.com/networkcomms/p/4292185.html