标签:
等待信号和触发信号 - Signaling with Wait and Puls
前面讨论事件等待句柄--一个简单的信号机制,一个线程一直阻塞直到接收到另外一个线程的通知。
一个更强大的信号机制被Monitor类所经由静态函数Wait和Pluse(及PulseAll)提供。你自己编写通知逻辑,使用自定义标记和字段(加上锁),然后引入Wait和Pluse命令来阻止自旋。就lock语句和这些函数,你就可以完成AutoResetEvent,ManualResetEven和Semaplore的功能,同样你也可以完成WaitHandle静态函数WaitAll和WaitAny的功能。因此,Wait和Pluse能够在等待句柄使用的地方可以使用。
Wait和Pluse信号比起等待事件句柄也有不足的地方:
在文档中没有很明显地告诉你如何使用Wait和Pluse,即使你读完了它们是如何工作。Wait和Pluse对于业余的人来说特别讨厌。幸运的是,这里有一个很简单的模式来驯服Wait和Pluse。
Pluse大概耗费100ns,是调用Set函数的三分之一。在不竞争信号上等等的时间完全由你决定--因为你自己实现了逻辑。
How to Use Wait and Pluse
这种方式允许你让任何线程为任何条件等待任何时间。下面是一个简单的例子,工作线程等待直到_go字段为true:
class SimpleWaitPluse { static object _locker = new object(); static bool _go; static void Main() { new Thread(Work).Start(); Console.ReadLine(); lock(_locker) { _go=true; Monitor.Pluse(_locker); } } static void Work() { lock(_locker) { while(!go)Monitor.Wait(_locker); } Console.WriteLine("Woken!"); } }
为了线程安全,必须确保所有共享字段都加锁。因此,围绕着读/更新_go标记添加一行lock语句。这很关键(除非你想使用非阻塞同步原理)。
Work函数一直阻塞,直到_go为true。Monitor.Wait做了以下的事情,按顺序:
Monitor.Wait要求在lock语句内使用,否则将仍处一个异常。对于Monitor.Pluse也一样。
在Main中,通过设置_go为true且调用Pluse来通知工作线程。只要一释放锁,工作线程立马执行。
Pluse和PluseAll释放阻塞在Wait上的线程。Pluse最多释放一个线程;Pluse释放所有。这个例子中只有一个线程阻塞。如果多个线程使用我们的建议调用PluseAll更加安全。
为了让Wait和Pluse或PluseAll通讯,同步对象必须是相同的(这个例子是_locker)。
在我们的模式中,触发(Pluse)表示有些东西已经改变,等待线程必须重新检查阻塞条件。Work工作线程中通过while循环来检查。然后,等待者决定是否继续。
移除while循环就得到一个皮包骨头的例子:
class SimpleWaitPluse { static object _locker = new object(); static bool _go; static void Main() { new Thread(Work).Start(); Console.ReadLine(); lock(_locker) { _go=true; Monitor.Pluse(_locker); } } static void Work() { lock(_locker) { Monitor.Wait(_locker); } Console.WriteLine("Woken!"); } }
它的输出是不确定的。如果Wait先执行,那么工作正常。如果Pluse线执行,那么这个通知将丢失,工作线程永远卡住。这一点与AutoResetEvent是不同的,Set语句将有一块内存并锁住影响,所以即使在WaitOne之前调用它也是有效的。
Pluse不会锁住自己,所以必须用go标记。这就是Wait和Pluse的才艺:我们使用一个bool标记,可以使他像AutoResetEvent一样工作;使用一个整数字段,可以编写一个CountdownEvent Seamphore。使用更多的数据结构,可以编写一个生产/消耗者队列。
生产/消耗者队列
前面已经描述了生产/消耗者队列的概念,及如何使用AutoResetEvent来编写。现在使用Wait和Pluse来编写更强大的生产/消耗者队列。这次允许任意数量的工作项,每一个拥有自己的线程。使用数组来跟踪线程,这使得可以在关闭对列时使用Join选项。
每个工作线程执行一个Consume函数。创建和启动这些线程在一个循环中。将使用更灵活的方法而不是字符串来描述一个任务。使用System.Action委托,它可以匹配任何参数方法,不像ThreadStart委托。仍然调用带有参数的方法来表示任务,通过封装在一个匿名函数或lambda表达式中。使用Queue<T>集合来表示任务的队列。
下面是完整的代码:
using System; using System.Threading; using System.Collections.Generic; public class PCQueue { readonly object _locker = new object(); Thread[] _workers; Queue<Action> _itemQ = new Queue<Action>(); public PCQueue (int workerCount) { _workers = new Thread [workerCount]; // Create and start a separate thread for each worker for (int i = 0; i < workerCount; i++) (_workers [i] = new Thread (Consume)).Start(); } public void Shutdown (bool waitForWorkers) { // Enqueue one null item per worker to make each exit. foreach (Thread worker in _workers) EnqueueItem (null); // Wait for workers to finish if (waitForWorkers) foreach (Thread worker in _workers) worker.Join(); } public void EnqueueItem (Action item) { lock (_locker) { _itemQ.Enqueue (item); // We must pulse because we‘re Monitor.Pulse (_locker); // changing a blocking condition. } } void Consume() { while (true) // Keep consuming until { // told otherwise. Action item; lock (_locker) { while (_itemQ.Count == 0) Monitor.Wait (_locker); item = _itemQ.Dequeue(); } if (item == null) return; // This signals our exit. item(); // Execute item. } } }
标签:
原文地址:http://www.cnblogs.com/C-Sharp2/p/4259280.html