业务需求: 发送特定的请求,根据返回的信息执行特定的事件。
目前的做法:把我的请求放入一个容器内,然后待到某一条件,就从这个容器把请求发送出去,等客户返回信息时,查询容器中对应请求中特定的事件。开始的时候我使用 List .其中遇到一些问题,纪录一下
1 namespace CollSecExp 2 { 3 class Program 4 { 5 static void Main(string[] args) 6 { 7 List<int> list = new List<int>(); 8 for (int i = 0; i < 10; i++) 9 { 10 list.Add(i); 11 } 12 13 Thread t1 = new Thread(() => 14 { 15 foreach (var item in list) 16 { 17 Console.WriteLine("t1.item:{0}", item); 18 Thread.Sleep(1000); 19 } 20 }); 21 t1.Start(); 22 23 Thread t2 = new Thread(() => 24 { 25 Thread.Sleep(1000); 26 list.RemoveAt(1); 27 list.RemoveAt(3); 28 foreach (var item in list) 29 { 30 Console.WriteLine("t2.item:{0}", item); 31 } 32 }); 33 t2.Start(); 34 } 35 } 36 }运行会抛出InvalidOperationException异常,提示“集合已修改;可能无法执行枚举操作。”
这是因为,线程2移除index=1,3的元素导致集合被修改。使用加锁
1 namespace CollSecExp 2 { 3 class Program 4 { 5 static void Main(string[] args) 6 { 7 object sycObj = new object(); 8 List<int> list = new List<int>(); 9 for (int i = 0; i < 10; i++) 10 { 11 list.Add(i); 12 } 13 14 Thread t1 = new Thread(() => 15 { 16 lock (sycObj) 17 { 18 foreach (var item in list) 19 { 20 Console.WriteLine("t1.item:{0}", item); 21 Thread.Sleep(1000); 22 } 23 } 24 }); 25 t1.Start(); 26 27 Thread t2 = new Thread(() => 28 { 29 Thread.Sleep(1000); 30 lock (sycObj) 31 { 32 list.RemoveAt(1); 33 list.RemoveAt(3); 34 foreach (var item in list) 35 { 36 Console.WriteLine("t2.item:{0}", item); 37 } 38 } 39 }); 40 t2.Start(); 41 } 42 } 43 }加锁就可以解决了。
后来使用了想起了对于请求,我们可以使用队列,最简单的请求Queue如下
1 static void Main(string[] args) 2 { 3 int count = 0; 4 Queue<int> queue = new Queue<int>(); 5 Task.Factory.StartNew(() => 6 { 7 while (true) 8 { 9 Thread.Sleep(3000); 10 Console.WriteLine("生产的元素是: " + count); 11 queue.Enqueue(count); 12 count++; 13 14 } 15 16 }); 17 18 ; 19 Task.Factory.StartNew(() => 20 { 21 while (true) 22 { 23 if (queue.Count > 0) 24 { 25 int value = queue.Dequeue(); 26 Console.WriteLine("worker1 " + ": 消费的元素是: " + value); 27 } 28 } 29 30 }); 31 32 Task.Factory.StartNew(() => 33 { 34 while (true) 35 { 36 if (queue.Count > 0) 37 { 38 int value = queue.Dequeue(); 39 Console.WriteLine(Thread.CurrentThread + " : 消费的元素是: " + value); 40 } 41 } 42 43 }); 44 45 46 Console.ReadKey(); 47 48 }由于没有加锁或是其他机制,那么很容易出错,于是改进了版本。
1 static void Main(string[] args) 2 { 3 int count = 0; 4 var queue = new ConcurrentQueue<int>(); 5 Task.Factory.StartNew(() => 6 { 7 while (true) 8 { 9 Thread.Sleep(1000); 10 Console.WriteLine("生产的元素是: " + count); 11 queue.Enqueue(count); 12 count++; 13 14 } 15 16 }); 17 18 ; 19 Task.Factory.StartNew(() => 20 { 21 while (true) 22 { 23 24 int value; 25 if (queue.TryDequeue(out value)) 26 { 27 Console.WriteLine("worker1 " + ": 消费的元素是: " + value); 28 } 29 30 } 31 32 }); 33 34 Task.Factory.StartNew(() => 35 { 36 while (true) 37 { 38 int value; 39 if (queue.TryDequeue(out value)) 40 { 41 Console.WriteLine("worker2 " + " : 消费的元素是: " + value); 42 } 43 } 44 45 });
执行这段代码,可以工作,但是有点不太优雅,能不能不要去判断集合是否为空?集合当自己没有元素的时候自己Block一下可以吗?答案当然是可以的,使用BlockingCollection即可:
1 static void Main(string[] args) 2 { 3 int count = 0; 4 var queue = new BlockingCollection<int>(); 5 var product = Task.Factory.StartNew(() => 6 { 7 for (int i = 20 - 1; i >= 0; i--) 8 { 9 Console.WriteLine("生产的元素是: " + count); 10 queue.Add(count); 11 count++; 12 } 13 14 }); 15 16 17 var consumer1 = Task.Factory.StartNew(() => 18 { 19 foreach (int value in queue.GetConsumingEnumerable()) 20 { 21 Console.WriteLine("worker1 " + ": 消费的元素是: " + value); 22 23 } 24 25 }); 26 27 var consumer2 = Task.Factory.StartNew(() => 28 { 29 foreach (int value in queue.GetConsumingEnumerable()) 30 { 31 if (value % 2 == 0) 32 { 33 Console.WriteLine("worker2 " + " : 消费的元素是: " + value); 34 } 35 else 36 { 37 queue.Add(value); 38 } 39 } 40 }); 41 Task.WaitAny(product, consumer2, consumer1); 42 Console.ReadKey(); 43 }
当我需要控制Queue停止反复遍历获取queue是否存在元素时,可以使用如下方法,同时在构造方法中可以指定队列的大小
1 blockingCollection.CompleteAdding();
参考文章: