标签:控制 item wait pool 应用 symbol option 通知 arch
1.Task概述:Task是对操作系统线程的抽象,目的是使线程池能高效地管理线程的分配和回收,Task使用的底层线程属于一种共享资源,任务需要互相协作,并及时归还线程,以便用相同的共享资源(线程)满足其他请求。
2.Task.AsyncState:获取在创建 Task 时提供的状态对象,如果未提供,则为 null。若状态对象在task内部改变了,AsyncState的数据也是改变后的状态对象。可查看ThreadApply.TaskAsyncState()方法的代码。
3.Task.ContinueWith():它的作用是把任务链接起来,在父任务完成后会立即执行后续任务。若在父任务的实例上多次调用ContinueWith()方法,在父任务完成后,所添加的后续任务会并行执行。当然可以根据TaskContinuationOptions枚举来指定根据父任务的执行情况,来执行后续任务。
4.Task的异常:Task执行期间产生的未处理的异常后会被禁止,直到调用某个任务完成(Task)成员,如Wait()、Result、WaitAll()、WaitAny(),上述的成员都会引发在任务执行期间发生的未处理异常。可查看代码TreadApply.TaskException()。
5.Task的异常2:要在不使用try/catch块的情况下处理未处理的异常,另一个办法是使用ContinueWith()任务,利用ContinueWith()委托的task参数,可以评估父任务的Exception属性,可查看代码TreadApply.TaskException2()。
6.Task的取消:可查看代码ThreadApply.TaskCancellationToken()方法,需要用到CancellationTokenSource类,对CancellationTokenSource.Cancel()方法的调用,会在从CancellationTokenSource.Token复制的所有取消标志上设置IsCancellationRequested属性。此中需要注意的2点,
(1)取消标志:CancellationToken(而不是CancellationTokenSource)会在异步任务中求值,CancellationToken看起来和CancellationTokenSource差不多。但CancellationToken用于监视和响应一个取消请求,而CancellationTokenSource用于取消任务本身。
(2)复制的:CancellationToken 是一个struct,所以调用CancellationTokenSource.Token会创建标志的一个副本,这样一来所有取消标志的实例都是线程安全的。
7.长时间运行的任务:如果开发人员知道一个Task要长时间运行,会长时间“霸占”一个底层线程资源,开发人员应告诉线程池共享线程不会太快交还。这样一来,线程池更有可能为任务创建一个专用线程(而不是分配其中的一个共享线程),为此,在调用StartNew()时,要使用TaskCreationOption.LongRunning选项。
8.并行迭代:.net4.0中新增了2个并行迭代,分别为Parallel.For()与Parallel.Foreach()。API会判断同时执行多少个线程才是效率最高的,可查看ThreadApply.ParallelFor()与ThreadApply.ParallelForeach()方法的代码。
9.并行异常的处理:在并行处理时,在并行的内容中可能出现多个异常,那么其异常信息会归到AggregateException异常类型,它是包含了多个内部异常的一个异常。System.Threading.Task命名空间一致使用System.AggregateException对未处理的异常进行分组,因为对于并行操作,经常都可能产生多个异常。如ThreadApply.ParallelForeach()代码清单。
10.并行循环的取消:Task需要一个显式调用才能阻塞(它的调用线程,并一直等)到它完成,并行循环虽然和任务不同,它以并行方式执行迭代,但它仍会阻塞(它的调用线程,并一直等)到整个Parallel.For()或Parallel.ForEach()循环结束。所以为了取消并行循环,调用取消请求的那个线程通常不能是正在执行并行循环的那个线程。如代码ThreadApply.CancelParallelForeach()清单。
11.并行迭代的中断:和标准的for循环相似,Parallel的循环也支持中断(ParallelLoopState.Break())以退出循环并取消进一步迭代的概念。但是,在并行执行的上下文中,中断循环意味着中断迭代之后的新迭代不应开始,当前正在执行的迭代还是会继续运行直至完成的。要想知道执行了一次中断的最低的迭代,并了解中断是否阻止了一个或多个迭代启动,可查看并行For()/ForEach()方法返回的ParallelLoopResult对象(其含有IsCompleted、LowestBreakIteration属性)。
12.并行Linq查询:并行Linq的功能都在System.Linq.ParallelEnumerable类中,该类的方法形式与System.Linq.Enumerable类的方法形式基本一致,ParallelEnumerable类中有对IEnumerable<T>类型,进行转化为并行的处理类型的扩展方法,即IEnumerable<T>.AsParallel()。只有在执行此转化后,才能对集合进行并行处理。
13.并行Linq的异常:与并行For和Foreach一样,PLinq运算也可能因为完全相同的原因返回多个异常(不同的迭代同时执行),幸好,捕获异常的机制也是一样。PLinq异常可以通过AggregateException的InnerException属性来访问,因此,将一个Plinq查询包装到一个try/catch块中,并捕捉AggregateException类型的异常,就可以处理每一次迭代中的未处理异常。
14.取消PLinq查询:和并行循环相似,取消的PLinq查询会引发一个OperationCanceledException,另外,PLinq查询会阻塞调用线程,直到查询完成。所以应该把并行Linq包装在Task中。在使用取消PLinq功能前,需要对并行集合执行WithCancellation()方法,如代码TreadApply.CancelParallelLinq()清单。
15.异步类使用说明:在选择要使用的异步类时,按照优选顺序从高到低依次是Task、ThreadPool、Thread。换而言之,首选TPL,如果不合适,就是使用ThreadPool,如果还是不合适就用Thread。
16.AppDomain的未处理异常:在主应用程序上,如果要登记一个回调方法来接收关于未处理的异常通知,只需要向应用程序域的UnhandledException事件注册即可。可查看ThreadApply.AppDomainException()代码清单,应用程序域的线程(包括主线程)上发生任何未处理的异常,都会触发UnhandledException回调,这只是一个通知机制,而不是实际捕捉和处理异常以使应用程序能继续运行的机制。事件发生之后,应用程序会退出。
public class ThreadApply { public void TaskCreate() { int times = 1000; Task task = new Task(() => { for (int i = 0; i < times; i++) { Console.Write("-"); } }); task.Start(); for (int i = 0; i < times; i++) { Console.Write("."); } task.Wait(); } public void TaskStaticCreate() { Task<string> task = Task.Factory.StartNew(() => { Thread.Sleep(100); return "主神"; }); foreach (char busySymbol in Utility.BusySymbols()) { if (task.IsCompleted) { Console.Write(‘\b‘); break; } Console.Write(busySymbol); } Console.WriteLine(); Console.WriteLine(task.Result); Console.WriteLine(task.Status); Trace.Assert(task.IsCompleted); } public void TaskAsyncState() { Man man = new Man() { Name = "主神", Age = 1 }; Task task = Task.Factory.StartNew((_man) => { Man tempMan = (Man)_man; Console.WriteLine("执行前{0}", tempMan); tempMan.Name = "诸神"; tempMan.Age += 10; Console.WriteLine("执行后{0}", tempMan); }, man); task.Wait(); Console.WriteLine("原始{0},异步后{1}", man, task.AsyncState); } public void TaskContinueWith() { Task<string> task = Task<string>.Factory.StartNew(() => { Console.WriteLine("主任务"); Thread.Sleep(200); return "红星"; }); Task faultedTask = task.ContinueWith((antecedentTask) => { Trace.Assert(antecedentTask.IsFaulted); Console.WriteLine("Task State: Faulted"); }, TaskContinuationOptions.OnlyOnFaulted); Task cancelTask = task.ContinueWith((antecedentTask) => { Trace.Assert(antecedentTask.IsCanceled); Console.WriteLine("Task State: Canceled"); }, TaskContinuationOptions.OnlyOnCanceled); Task completedTask = task.ContinueWith((antecedentTask) => { Trace.Assert(antecedentTask.IsCompleted); Console.WriteLine("Task Result:{0}", antecedentTask.Result); Console.WriteLine("Task State: Completed"); }, TaskContinuationOptions.OnlyOnRanToCompletion); completedTask.Wait();//这句代码可以不用,仅仅是为了阻止控制台关闭 } public void TaskException() { //此处代码的演示了未处理的框架,如何将任务的未处理的异常传回给主线程,注意异常的数据类型是异常集合AggregateException, Task task = Task.Factory.StartNew(() => { throw new ApplicationException(); }); try { task.Wait(); } catch (AggregateException ex) { foreach (Exception item in ex.InnerExceptions) { Console.WriteLine("error : {0}", item.Message); } } } public void TaskException2() { /*Task task = Task.Factory.StartNew(() => { throw new ApplicationException(); }); Task faultedTask = task.ContinueWith(antecedentTask => { Console.WriteLine("父任务执行失败"); }, TaskContinuationOptions.OnlyOnFaulted); faultedTask.Wait(); if (task.IsFaulted) { Console.WriteLine("Error: {0}", task.Exception.Message);//task的异常类型为AggregateException }*/ Task task = Task.Factory.StartNew(() => { throw new ApplicationException(); }); Task faultedTask = task.ContinueWith(antecedentTask => { foreach (var item in antecedentTask.Exception.InnerExceptions) { Console.WriteLine("Error:{0}", item.Message); } Console.WriteLine("父任务执行失败"); }, TaskContinuationOptions.OnlyOnFaulted); faultedTask.Wait(); } public void TaskCancellationToken() { string stars = "*".PadRight(50, ‘*‘); Console.WriteLine("Push Enter to exit."); CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); Task task = Task.Factory.StartNew(() => Utility.WriteNaturalNumber(cancellationTokenSource.Token), cancellationTokenSource.Token); Thread.Sleep(2000); Console.WriteLine(); cancellationTokenSource.Cancel(); Console.WriteLine(stars); task.Wait(); Console.WriteLine("Task Status: {0}", task.Status); //在任务中是正常取消的,所以任务的状态是RanToCompletion。若要状态为Canceled,可是使用CancellationToken.ThrowIfCancellationRequested()方法,报告异常。 } public void ParallelFor() { Parallel.For(0, 100, (i) => { Console.WriteLine("第{0}次,threadId={1}", i, Thread.CurrentThread.ManagedThreadId); }); } public void ParallelForeach() { IEnumerable<string> files = Directory.GetFiles("D:\\", "*.txt", SearchOption.AllDirectories); try { Parallel.ForEach(files, (fileName) => { //Encrypt(fileName); }); } catch (AggregateException ex) { foreach (var item in ex.InnerExceptions) { Console.WriteLine(item.Message); } } } public void CancelParallelForeach() { IEnumerable<string> files = Directory.GetFiles("D:\\", "*.txt", SearchOption.AllDirectories); CancellationTokenSource cts = new CancellationTokenSource(); ParallelOptions parallelOptions = new ParallelOptions() { CancellationToken = cts.Token }; cts.Token.Register(() => Console.WriteLine("Cancelling...")); Console.WriteLine("Push Enter to exit."); Task task = Task.Factory.StartNew(() => { //注意,在内部,并行循环条件通过IsCancellationRequested属性阻止尚未开始的新迭代开始。 Parallel.ForEach(files, parallelOptions, (fileName, loopState) => { //Encrypt(fileName); }); }); Console.ReadLine(); cts.Cancel(); Console.WriteLine("---------分割线----------"); task.Wait(); } public void ParalleEncrypt(List<string> data) { ParallelQuery<string> result = data.AsParallel().Select(item => Utility.Encrypt(item)); } public void CancelParallelLinq() { IEnumerable<int> data = Enumerable.Range(0, 10000); CancellationTokenSource cts = new CancellationTokenSource(); Console.WriteLine("Push Enter to exit."); Task task = Task.Factory.StartNew(() => { data.AsParallel().WithCancellation(cts.Token).Select(item => { Thread.Sleep(100);//模拟损耗的时间 return item >> 1; }); }, cts.Token); /*CancellationToken除了要传给WithCancellation(),还要作为StartNew()的第二个参数传递,这会造成Task.Wait()引发一个AggregateException, 它的InnerException属性会被设为一个TaskCanceledAException*/ Console.ReadLine(); cts.Cancel(); Console.WriteLine("------------分割线----------------"); try { task.Wait(); } catch (AggregateException ex) { foreach (Exception item in ex.InnerExceptions) { Console.WriteLine(item.Message); } } } public void AppDomainException() { try { AppDomain.CurrentDomain.UnhandledException += Utility.OnUnHandledException; ThreadPool.QueueUserWorkItem(state => { throw new Exception("不可饶恕"); }); Thread.Sleep(3000); Console.WriteLine("Still Running..."); } finally { Console.WriteLine("Exiting...."); } } public class Man { public string Name { get; set; } public int Age { get; set; } public override string ToString() { return string.Format("Name:{0},Age:{1}", Name, Age); } } } public class Utility { public static IEnumerable<char> BusySymbols() { string busySymbols = @"-\|/-\|/"; int next = 0; while (true) { yield return busySymbols[next]; next = ++next % busySymbols.Length; yield return ‘\b‘; } } /// <summary> /// 输出自然数 /// </summary> /// <param name="cancellationToken">取消标记</param> public static void WriteNaturalNumber(CancellationToken cancellationToken) { int num = 0; while (!cancellationToken.IsCancellationRequested) { Thread.Sleep(100); Console.Write("{0},", num++); } } public static string Encrypt(string text) { throw new NotImplementedException(); } public static void OnUnHandledException(object sender, UnhandledExceptionEventArgs e) { Exception exception = (Exception)e.ExceptionObject; Console.WriteLine("Error {0}:{1} -->{2}", exception.GetType().Name, exception.Message, exception.InnerException.Message); } }
--------------以上内容根据《C#本质论 第三版》进行整理
标签:控制 item wait pool 应用 symbol option 通知 arch
原文地址:http://www.cnblogs.com/zwt-blog/p/6344764.html