标签:
线程池虽然好用,但限制也不少:
(1)总觉得默认的 MaxThread 小了一点,每次使用都要手工调大= =
(2)任务不能等待完成
(3)任务一旦加入不能取消,甚至不知道是正在排队/正在执行/执行完毕
(4)最恶心的一点:线程池里的线程全是 MTA 线程,很多依赖 STA 的 COM 组件都不能用,例如 Clipboard、WebBrowser 等。
实在不能忍了,自己写了个“任务池”,模拟线程池的功能。不同点有:
(1)没有数量上限,新进的任务不需要排队(但任务太多可能影响系统性能)
(2)任务的创建和开始可以分开,也可以创建时就开始。
(3)任务可等待
(4)任务可强制取消,虽然非常非常非常非常非常不建议这么做,这可能造成不可预知的问题
(5)可以选择任务线程的 ApartmentState (MTA / STA / Unknown = 无所谓)
相同点在于:
(1)保证(近乎绝对的)线程安全,逻辑上没有线程不安全的地方
(2)线程可重用
不足之处在于:
(1)还是应该添加限制线程数目的功能,避免对系统性能造成太大影响
(2)任务应该支持“尝试取消”操作
(3)有些 TTask 类里的字段应该对用户只读,但语法上实在做不到= =(没有 friend 关键字)
Version 1.0
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Net.Mime;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;


// ReSharper Disable InconsistentNaming

namespace Efficiency.Core {


    //+ delegate TTaskDelegate
    /// <summary>Signature of the work item executed by a pooled task thread.</summary>
    /// <param name="task">The task object that is being executed.</param>
    /// <param name="param">The user parameter supplied when the task was created.</param>
    public delegate void TTaskDelegate(TTask task, object param);


    //+ class TTask
    /// <summary>
    /// A unit of work for <see cref="TTaskPool"/>. It carries the callback, its
    /// parameter, the requested apartment state and the completion state.
    /// </summary>
    public class TTask {

        /// <summary>Signalled by the worker thread once the task has ended.</summary>
        public readonly AutoResetEvent Event = new AutoResetEvent(false);

        /// <summary>Apartment state requested for the thread that runs this task.</summary>
        public readonly ApartmentState ApartmentState;

        /// <summary>The callback to execute.</summary>
        public readonly TTaskDelegate Callback;

        /// <summary>The user parameter handed to <see cref="Callback"/>.</summary>
        public readonly object Param;

        /// <summary>
        /// The worker thread executing this task. Assigned by the pool before the
        /// callback starts; <c>null</c> until then.
        /// </summary>
        public Thread Thread { get; set; }

        //-- .ctor()
        /// <summary>Creates a task without scheduling it.</summary>
        /// <param name="callback">The callback to execute.</param>
        /// <param name="param">The user parameter passed to the callback.</param>
        /// <param name="state">The apartment state requested for the worker thread.</param>
        public TTask(TTaskDelegate callback, object param, ApartmentState state)
        {
            this.ApartmentState = state;
            this.Param = param;
            this.Callback = callback;
        }


        /// <summary>
        /// Completion state, updated with interlocked operations:
        /// 0 = working (or not started yet), 1 = marked finished, 2 = marked force exited.
        /// </summary>
        public int Status = 0;

        /// <summary>True once the task was marked finished or force exited.</summary>
        public bool IsFinished => (this.Status != 0);


        //-- bool Wait(int)
        /// <summary>Blocks until the task has ended or the timeout elapses.</summary>
        /// <param name="timeout">Timeout in milliseconds; -1 waits indefinitely.</param>
        /// <returns>True if the task ended, false if the wait timed out.</returns>
        public bool Wait(int timeout = -1)
        {
            if (!this.Event.WaitOne(timeout)) return false;

            // Event is auto-reset, so a successful wait consumes the signal.
            // Re-signal it so that repeated or concurrent waits on an already
            // ended task return immediately instead of blocking forever.
            this.Event.Set();
            return true;
        }


        //-- void ForceExit()
        /// <summary>
        /// Forcibly ends the task, ignoring any error raised while aborting the
        /// worker thread. Does nothing if the task has already ended.
        /// </summary>
        public void ForceExit()
        {
            Exception ignored;
            this.ForceExit(out ignored);
        }


        //-- bool ForceExit(out Exception)
        /// <summary>
        /// Forcibly ends the task. A task that has not started yet is only marked;
        /// the worker thread skips its callback. A running task has its worker
        /// thread aborted.
        /// </summary>
        /// <param name="exception">Receives the exception thrown by the abort, or null.</param>
        /// <returns>
        /// True if the task had already ended, had not started yet, or the abort
        /// was issued; false if aborting the thread threw.
        /// </returns>
        public bool ForceExit(out Exception exception)
        {
            exception = null;
            if (Interlocked.CompareExchange(ref this.Status, 2, 0) != 0) return true;

            // The interlocked operation above is a full fence, so this read is
            // ordered after the status change. The worker publishes Thread and
            // then re-reads Status, so at least one side observes the other.
            Thread worker = this.Thread;
            if (worker == null) return true;

            try {
                worker.Abort();
                return true;
            }
            catch (Exception ex) {
                exception = ex;
                return false;
            }
        }

    } // class TTask


    //+ class TTaskPool
    /// <summary>
    /// A pool of reusable worker threads that can run tasks on STA or MTA threads.
    /// There is no upper bound on the number of threads: a new thread is created
    /// whenever no idle thread of the requested kind is available.
    /// </summary>
    public static class TTaskPool {


        //+ class TTaskPool.TTaskThreadContext
        /// <summary>Per-worker state shared between the worker thread and the scheduler.</summary>
        private sealed class TTaskThreadContext {

            // Non-zero asks workers to leave their loop. Nothing in this file sets it.
            public static volatile int CanExit = 0;

            // Signalled by the scheduler after it assigned a new task to an idle worker.
            public readonly AutoResetEvent WaitEvent = new AutoResetEvent(false);

            // The idle queue this worker returns to after finishing a task.
            public readonly Queue<TTaskThreadContext> IdleQueue;

            // The task to run next; written by the scheduler before WaitEvent is set.
            public TTask Task;

            //-- .ctor()
            public TTaskThreadContext(Queue<TTaskThreadContext> idleQueue, TTask task)
            {
                this.IdleQueue = idleQueue;
                this.Task = task;
            }

        } // class TTaskPool.TTaskThreadContext


        // Idle workers are queued by their context. A worker that dies (for
        // example after a forced exit) never re-enters its queue, so no
        // bookkeeping is left behind for it.
        private static readonly Queue<TTaskThreadContext> m_STAQueue = new Queue<TTaskThreadContext>();
        private static readonly Queue<TTaskThreadContext> m_MTAQueue = new Queue<TTaskThreadContext>();


        private static int s_PeakSTATaskCount = 0;
        /// <summary>Number of STA worker threads created so far.</summary>
        public static int PeakSTATaskCount => s_PeakSTATaskCount;

        private static int s_PeakMTATaskCount = 0;
        /// <summary>Number of MTA worker threads created so far.</summary>
        public static int PeakMTATaskCount => s_PeakMTATaskCount;


        /// <summary>Number of worker threads created so far, STA and MTA combined.</summary>
        public static int PeakTaskCount => PeakMTATaskCount + PeakSTATaskCount;


        //-- TTask CreateTask(TTaskDelegate, object, ApartmentState)
        /// <summary>Creates a task without starting it.</summary>
        /// <param name="callback">The callback to execute.</param>
        /// <param name="param">The user parameter passed to the callback.</param>
        /// <param name="state">Requested apartment state; Unknown means "any".</param>
        /// <returns>The new, not yet scheduled task.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
        [MethodImpl(0x100)] // 0x100 == MethodImplOptions.AggressiveInlining
        public static TTask CreateTask(
            TTaskDelegate callback,
            object param = null,
            ApartmentState state = ApartmentState.Unknown)
        {
            if (callback == null) {
                throw new ArgumentNullException(nameof(callback));
            }
            return new TTask(callback, param, state);
        }


        //-- bool TryInsertTask(TTask, Queue<TTaskThreadContext>, ApartmentState, bool)
        /// <summary>
        /// Hands the task to an idle worker from <paramref name="idleQueue"/>, or,
        /// when none is idle and <paramref name="force"/> is true, to a new thread.
        /// </summary>
        /// <returns>False only if no worker was idle and <paramref name="force"/> is false.</returns>
        [MethodImpl(0x100)] // 0x100 == MethodImplOptions.AggressiveInlining
        private static bool TryInsertTask(
            TTask task,
            Queue<TTaskThreadContext> idleQueue,
            ApartmentState state,
            bool force)
        {
            TTaskThreadContext context = null;
            lock (idleQueue) {
                if (idleQueue.Count > 0) {
                    context = idleQueue.Dequeue();
                }
            }

            if (context != null) {
                // Reuse an idle worker: publish the task, then wake the worker.
                context.Task = task;
                context.WaitEvent.Set();
                return true;
            }

            if (!force) return false;

            Thread thread = new Thread(TTaskThreadRoutine);
            if (state == ApartmentState.STA) {
                // An STA request must be honoured, so let a failure surface.
                thread.SetApartmentState(ApartmentState.STA);
                Interlocked.Increment(ref s_PeakSTATaskCount);
            }
            else {
                // Best effort: a platform without COM apartments rejects the
                // call, but the thread is still suitable for a non-STA task.
                thread.TrySetApartmentState(state);
                Interlocked.Increment(ref s_PeakMTATaskCount);
            }
            thread.IsBackground = true;
            thread.Start(new TTaskThreadContext(idleQueue, task));
            return true;
        }


        //-- void InsertTask(TTask)
        /// <summary>
        /// Schedules the task on a thread of its requested apartment state. For
        /// ApartmentState.Unknown an idle MTA worker is preferred, then an idle
        /// STA worker, and finally a new MTA thread.
        /// </summary>
        /// <param name="task">The task to run.</param>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        public static void InsertTask(TTask task)
        {
            if (task == null) {
                throw new ArgumentNullException(nameof(task));
            }

            switch (task.ApartmentState) {
                case (ApartmentState.STA):
                    TryInsertTask(task, m_STAQueue, ApartmentState.STA, true);
                    break;

                case (ApartmentState.MTA):
                    TryInsertTask(task, m_MTAQueue, ApartmentState.MTA, true);
                    break;

                default:
                    if (TryInsertTask(task, m_MTAQueue, ApartmentState.MTA, false)) return;
                    if (TryInsertTask(task, m_STAQueue, ApartmentState.STA, false)) return;
                    TryInsertTask(task, m_MTAQueue, ApartmentState.MTA, true);
                    break;
            }
        }


        //-- TTask CreateInsertTask(TTaskDelegate, object, ApartmentState)
        /// <summary>Creates a task and schedules it immediately.</summary>
        /// <param name="callback">The callback to execute.</param>
        /// <param name="param">The user parameter passed to the callback.</param>
        /// <param name="state">Requested apartment state; Unknown means "any".</param>
        /// <returns>The scheduled task.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
        [MethodImpl(0x100)] // 0x100 == MethodImplOptions.AggressiveInlining
        public static TTask CreateInsertTask(
            TTaskDelegate callback,
            object param = null,
            ApartmentState state = ApartmentState.Unknown)
        {
            if (callback == null) {
                throw new ArgumentNullException(nameof(callback));
            }
            TTask task = new TTask(callback, param, state);
            InsertTask(task);
            return task;
        }


        //-- void TTaskThreadRoutine(object)
        /// <summary>
        /// Worker loop: runs the assigned task, signals its completion, then parks
        /// the worker in its idle queue until the scheduler assigns another task.
        /// </summary>
        /// <param name="threadContext">The worker's <see cref="TTaskThreadContext"/>.</param>
        private static void TTaskThreadRoutine(object threadContext)
        {
            TTaskThreadContext context = (TTaskThreadContext)threadContext;
            Thread thisThread = Thread.CurrentThread;
            while (true) {
                if (TTaskThreadContext.CanExit != 0) return;
                thisThread.Priority = ThreadPriority.Normal;

                TTask task = context.Task;
                bool forced;
                try {
                    // Publish the thread BEFORE running the callback; otherwise
                    // ForceExit has nothing to abort while the task is running.
                    task.Thread = thisThread;

                    // Interlocked read (full fence): a ForceExit issued before
                    // the task started is observed here and the callback skipped.
                    if (Interlocked.CompareExchange(ref task.Status, 0, 0) != 2) {
                        task.Callback.Invoke(task, task.Param);
                    }
                }
                finally {
                    // Runs on normal return, on an exception and on a thread
                    // abort, so waiters are always released. The status is
                    // updated first so IsFinished is true once Wait returns.
                    forced = (Interlocked.CompareExchange(ref task.Status, 1, 0) == 2);
                    task.Event.Set();
                }

                // A force-exited task may still have an abort in flight for this
                // thread. Retire the thread so the abort cannot hit another task.
                if (forced) return;

                // The callback may have changed this; restore the pool default.
                thisThread.IsBackground = true;
                if (TTaskThreadContext.CanExit != 0) return;

                // Do not keep the finished task (and its parameter) alive while idle.
                context.Task = null;
                lock (context.IdleQueue) {
                    context.IdleQueue.Enqueue(context);
                }
                context.WaitEvent.WaitOne(-1);
            }
        }


    } // class TTaskPool

} // namespace Efficiency.Core
标签:
原文地址:http://www.cnblogs.com/catchyrime/p/4438510.html