标签:
本文介绍如何使用C#实现并行执行的流水线(生产者消费者):
上图演示了流水线,action1接收input,然后产生结果保存在buffer1中,action2读取buffer1中由action1产生的数据,以此类推指导action4完成产生Output。
以上也是典型的生产者消费者模式。
上面的模式如果使用普通常规的串行执行是很简单的,按部就班按照流程图一步一步执行即可。如果为了提高效率,想使用并行执行,也就是说生产者和消费者同时并行执行,该怎么办么?
class PiplelineDemo
{
private int seed;
public PiplelineDemo()
{
seed = 10;
}
public void Action1(BlockingCollection<string> output)
{
try
{
for (var i = 0; i < seed; i++)
{
output.Add(i.ToString());//initialize data to buffer1
}
}
finally
{
output.CompleteAdding();
}
}
public void Action2(BlockingCollection<string> input, BlockingCollection<string> output)
{
try
{
foreach (var item in input.GetConsumingEnumerable())
{
var itemToInt = int.Parse(item);
output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2
}
}
finally
{
output.CompleteAdding();
}
}
public void Action3(BlockingCollection<string> input, BlockingCollection<string> output)
{
try
{
foreach (var item in input.GetConsumingEnumerable())
{
output.Add(item);//set data into buffer3
}
}
finally
{
output.CompleteAdding();
}
}
public void Pipeline()
{
var buffer1 = new BlockingCollection<string>(seed);
var buffer2 = new BlockingCollection<string>(seed);
var buffer3 = new BlockingCollection<string>(seed);
var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
var stage1 = taskFactory.StartNew(() => Action1(buffer1));
var stage2 = taskFactory.StartNew(() => Action2(buffer1, buffer2));
var stage3 = taskFactory.StartNew(() => Action3(buffer2, buffer3));
Task.WaitAll(stage1, stage2, stage3);
foreach(var item in buffer3.GetConsumingEnumerable())//print data in buffer3
{
Console.WriteLine(item);
}
}
}
class Program
{
static void Main(string[] args)
{
new PiplelineDemo().Pipeline();
Console.Read();
}
}
预期打印出了0-9自我相乘的结果。
代码本身的逻辑和本文开始的流程图是一一对应的。
BlockingCollection<T>是.Net里面的一个线程安全集合。实现了IProducerConsumerCollection<T>.
GetConsumingEnumberable是一个非常强大的东东,专门写一片文章介绍介绍。
Parallel Programming-实现并行操作的流水线(生产者、消费者)
标签:
原文地址:http://www.cnblogs.com/kmpp/p/Parallel_Programming_Pipeline.html