标签:main and msdn 数据量 reading 写入 adc dom class
最近在弄一个小项目,大概600w行的数据,要进行数据清洗,因数据量偏大,如果单线程去执行,会造成效率偏低,只能用多线程了,但采用多线程存在线程安全问题,于是查了下资料,发现有ConcurrentQueue<T>该数据结构,完美的解决了我目前问题。
采自msdn上面解释:表示线程安全的先进先出 (FIFO) 集合。
先说说简单的用法吧:(来自msdn)
1.Enqueue(T) 将对象添加到 ConcurrentQueue<T> 的结尾处。
2.TryDequeue(T) 尝试移除并返回位于并发队列开头处的对象。
3.Count 获取 ConcurrentQueue<T> 中包含的元素数
4.IsEmpty 获取一个值,该值指示 ConcurrentQueue<T> 是否为空。
下面是小项目的实现方案,采用最简单的方式(生产者/消费者模式),先将数据写入到队列中,再由消费者进行消费,以下是我写的一个小Demo,用于学习,不对的地方请各位多多指教!
using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace ThreadCQueue { class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); Console.WriteLine("ok"); Console.ReadKey(); } static async Task RunProgram() { var taskQueue = new ConcurrentQueue<CustomTask>(); //生产 var taskSource = Task.Run(() => TaskProducer(taskQueue)); await taskSource; //消费者 var processors = new Task[4]; for (var i = 1; i <= 4; i++) { string processordId = i.ToString(); processors[i - 1] = Task.Run(() => TaskProcessor(taskQueue, $"Processor {processordId}")); } await Task.WhenAll(processors); } static async Task TaskProducer(ConcurrentQueue<CustomTask> queue) { for (var i = 1; i <= 20; i++) { await Task.Delay(50); var workItem = new CustomTask { Id = i }; queue.Enqueue(workItem); } } static async Task TaskProcessor(ConcurrentQueue<CustomTask> queue, string name) { CustomTask workItem; await GetRandomDelay(); while (queue.TryDequeue(out workItem)) { Console.WriteLine($"消费 {workItem.Id}===>{name}"); await GetRandomDelay(); } } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } } class CustomTask { public int Id { get; set; } } }
标签:main and msdn 数据量 reading 写入 adc dom class
原文地址:https://www.cnblogs.com/SmallHan/p/11874867.html