码迷,mamicode.com
首页 > 其他好文 > 详细

How to: 使用 数据流 实现生产者-消费者模式

时间:2015-09-25 13:09:48      阅读:287      评论:0      收藏:0      [点我收藏+]

标签:

?

producer把消息发送到消息块,consumer从块读取消息。

安装:

  1. Install-Package Microsoft.Tpl.Dataflow
  2. ?
  3. using System.Threading.Tasks.Dataflow;

?

解释:

Produce方法随机生成字节,并Post到ITargetBlock对象;

Consumer方法从ISourceBlock对象读取字节;

可以使用BufferBlock来同时扮演源和目标对象。

Post():同步发送消息。

Complete():表明当前块(source block)已经没有数据更多的数据了。

Consumer方法使用await和async操作符异步地计算总的字节数。

OutputAvailableAsync():从source block收到一个通知(接收到Complete的通知),表明没有更多的数据可用。

?

  1. public static class DataflowProducerConsumer
  2. {
  3. ????// Demonstrates the production end of the producer and consumer pattern.
  4. ????static void Produce(ITargetBlock<byte[]> target)
  5. ????{
  6. ????????Random rand = new Random();
  7. ?
  8. ????????// In a loop, fill a buffer with random data and
  9. ????????// post the buffer to the target block.
  10. ????????for (int i = 0; i < 100; i++)
  11. ????????{
  12. ????????????// Create an array to hold random byte data.
  13. ????????????byte[] buffer = new byte[1024];
  14. ?
  15. ????????????// Fill the buffer with random bytes.
  16. ????????????rand.NextBytes(buffer);
  17. ?
  18. ????????????// Post the result to the message block.
  19. ????????????target.Post(buffer);
  20. ????????}
  21. ?
  22. ????????// Set the target to the completed state to signal to the consumer
  23. ????????// that no more data will be available.
  24. ????????target.Complete();
  25. ????}
  26. ?
  27. ????// Demonstrates the consumption end of the producer and consumer pattern.
  28. ????static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
  29. ????{
  30. ????????// Initialize a counter to track the number of bytes that are processed.
  31. ????????int bytesProcessed = 0;
  32. ?
  33. ????????// Read from the source buffer until the source buffer has no
  34. ????????// available output data.
  35. ????????while (await source.OutputAvailableAsync())
  36. ????????{
  37. ????????????byte[] data = source.Receive();
  38. ?
  39. ????????????// Increment the count of bytes received.
  40. ????????????bytesProcessed += data.Length;
  41. ????????}
  42. ?
  43. ????????return bytesProcessed;
  44. ????}
  45. ?
  46. ????static void Run(string[] args)
  47. ????{
  48. ????????// Create a BufferBlock<byte[]> object. This object serves as the
  49. ????????// target block for the producer and the source block for the consumer.
  50. ????????var buffer = new BufferBlock<byte[]>();
  51. ?
  52. ????????// Start the consumer. The Consume method runs asynchronously.
  53. ????????var consumer = ConsumeAsync(buffer);
  54. ?
  55. ????????// Post source data to the dataflow block.
  56. ????????Produce(buffer);
  57. ?
  58. ????????// Wait for the consumer to process all data.
  59. ????????consumer.Wait();
  60. ?
  61. ????????// Print the count of bytes processed to the console.
  62. ????????Console.WriteLine("Processed {0} bytes.", consumer.Result);
  63. ????}
  64. }

?

参考:https://msdn.microsoft.com/en-us/library/hh228601(v=vs.110).aspx

How to: 使用 数据流 实现生产者-消费者模式

标签:

原文地址:http://www.cnblogs.com/pengzhen/p/4837597.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!