标签:c# action blog add 示意图 tar 重复 com 线程安全
在上一篇文章演示了并行的流水线操作(生产者和消费者并行同时执行),C#是通过BlockingCollection这个线程安全的对象作为Buffer,并且结合Task来实现的。但是上一篇文章有个缺陷,在整个流水线上,生产者和消费者是唯一的。本文将演示多个消费者多个生产者同时并行执行。
与前一篇文章演示的流水线思想类似,不同之处就是本文的topic:消费者和生产者有多个。以buffer1为例,其生产者有两个,消费者也有两个,现在有三个维度的并行:
/// <summary>
/// Demonstrates a three-stage pipeline in which every stage is run by two
/// concurrent tasks that share bounded <see cref="BlockingCollection{T}"/> buffers:
/// two producers fill buffer1, two workers square each value into buffer2,
/// and two workers copy the values into buffer3, which is finally printed.
/// </summary>
class PiplelineDemo
{
    // Number of items each producer emits; also used to size the buffers.
    private readonly int seed;

    public PiplelineDemo()
    {
        seed = 10;
    }

    /// <summary>First producer: adds the numbers 0..seed-1 (as strings) to <paramref name="output"/>.</summary>
    /// <param name="output">Buffer that receives the generated items (buffer1).</param>
    public void Action11(BlockingCollection<string> output)
    {
        Produce(output);
    }

    /// <summary>Second producer: adds the numbers 0..seed-1 (as strings) to <paramref name="output"/>.</summary>
    /// <param name="output">Buffer that receives the generated items (buffer1).</param>
    public void Action12(BlockingCollection<string> output)
    {
        Produce(output);
    }

    /// <summary>First stage-2 worker: squares every item taken from <paramref name="input"/>.</summary>
    /// <param name="input">Buffer to consume from (buffer1).</param>
    /// <param name="output">Buffer that receives the squared values (buffer2).</param>
    public void Action21(BlockingCollection<string> input, BlockingCollection<string> output)
    {
        SquareItems(input, output);
    }

    /// <summary>Second stage-2 worker: squares every item taken from <paramref name="input"/>.</summary>
    /// <param name="input">Buffer to consume from (buffer1).</param>
    /// <param name="output">Buffer that receives the squared values (buffer2).</param>
    public void Action22(BlockingCollection<string> input, BlockingCollection<string> output)
    {
        SquareItems(input, output);
    }

    /// <summary>First stage-3 worker: forwards every item from <paramref name="input"/> unchanged.</summary>
    /// <param name="input">Buffer to consume from (buffer2).</param>
    /// <param name="output">Buffer that receives the forwarded items (buffer3).</param>
    public void Action31(BlockingCollection<string> input, BlockingCollection<string> output)
    {
        CopyItems(input, output);
    }

    /// <summary>Second stage-3 worker: forwards every item from <paramref name="input"/> unchanged.</summary>
    /// <param name="input">Buffer to consume from (buffer2).</param>
    /// <param name="output">Buffer that receives the forwarded items (buffer3).</param>
    public void Action32(BlockingCollection<string> input, BlockingCollection<string> output)
    {
        CopyItems(input, output);
    }

    /// <summary>
    /// Builds the three buffers, starts two tasks per stage, prints everything that
    /// reaches buffer3 to the console, and then waits for all tasks to finish.
    /// </summary>
    /// <exception cref="AggregateException">Thrown by <c>Task.WaitAll</c> if any stage task faulted.</exception>
    public void Pipeline()
    {
        using (var buffer1 = new BlockingCollection<string>(seed * 2))
        using (var buffer2 = new BlockingCollection<string>(seed * 2))
        using (var buffer3 = new BlockingCollection<string>(seed * 2))
        {
            var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

            // A buffer may only be marked complete once BOTH of its writers are done,
            // otherwise the slower writer's Add would throw.
            var stage11 = taskFactory.StartNew(() => Action11(buffer1));
            var stage12 = taskFactory.StartNew(() => Action12(buffer1));
            var close1 = Task.Factory.ContinueWhenAll(
                new Task[] { stage11, stage12 },
                (tasks) => { buffer1.CompleteAdding(); });

            var stage21 = taskFactory.StartNew(() => Action21(buffer1, buffer2));
            var stage22 = taskFactory.StartNew(() => Action22(buffer1, buffer2));
            var close2 = Task.Factory.ContinueWhenAll(
                new Task[] { stage21, stage22 },
                (tasks) => { buffer2.CompleteAdding(); });

            var stage31 = taskFactory.StartNew(() => Action31(buffer2, buffer3));
            var stage32 = taskFactory.StartNew(() => Action32(buffer2, buffer3));
            var close3 = Task.Factory.ContinueWhenAll(
                new Task[] { stage31, stage32 },
                (tasks) => { buffer3.CompleteAdding(); });

            // Drain buffer3 BEFORE waiting on the stages: buffer3 is bounded, so waiting
            // first only works while the total item count (2 * seed) fits its capacity
            // (seed * 2). Draining first keeps the pipeline deadlock-free if either changes.
            foreach (var item in buffer3.GetConsumingEnumerable())
            {
                Console.WriteLine(item);
            }

            // Surface any stage exception and make sure every continuation has finished
            // touching its buffer before the buffers are disposed.
            Task.WaitAll(stage11, stage12, stage21, stage22, stage31, stage32, close1, close2, close3);
        }
    }

    // Adds 0..seed-1 as invariant-culture strings to the given buffer.
    private void Produce(BlockingCollection<string> output)
    {
        for (var i = 0; i < seed; i++)
        {
            output.Add(i.ToString(System.Globalization.CultureInfo.InvariantCulture));
        }
    }

    // Consumes input until it is completed and empty, writing the square of each value to output.
    private static void SquareItems(BlockingCollection<string> input, BlockingCollection<string> output)
    {
        foreach (var item in input.GetConsumingEnumerable())
        {
            var value = int.Parse(item, System.Globalization.CultureInfo.InvariantCulture);
            output.Add((value * value).ToString(System.Globalization.CultureInfo.InvariantCulture));
        }
    }

    // Consumes input until it is completed and empty, forwarding each item unchanged to output.
    private static void CopyItems(BlockingCollection<string> input, BlockingCollection<string> output)
    {
        foreach (var item in input.GetConsumingEnumerable())
        {
            output.Add(item);
        }
    }
}
ParallelProgramming-多消费者,多生产者同时运行并行
标签:c# action blog add 示意图 tar 重复 com 线程安全
原文地址:http://www.cnblogs.com/endv/p/6781507.html