码迷,mamicode.com
首页 > 其他好文 > 详细

TPL Part 4 -- Task的协同

时间:2015-05-01 21:17:52      阅读:177      评论:0      收藏:0      [点我收藏+]

标签:

简单的Continuation

Task.ContinueWith(Task): 当指定的Task执行完毕时。

void Main()
{
Task rootTask = new Task(()=>{
Console.WriteLine("root task completed");
});
root Task.ContinueWith((Task previousTask)=>{
Console.WriteLine("continute task completed");
});
 
rootTask.Start();
 
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
 
}


Task.ContinueWhenAll(Task[]):当指定的所有Task都执行完毕时,示例代码:

 

Task continuation = Task.Factory.ContinueWhenAll<int>(tasks, antecedents =>{
foreach(Task<int> t in antecedents) {
// dosomething
}
});


 

 

TaskFactory.ContinueWhenAny(Task[]):当指定的所有Task的任意1个执行完毕时,代码与ContinueWhenAll类似(以下代码中,打印出前1个Task的执行时间):

Task continuation = Task.Factory.ContinueWhenAny<int>(tasks,
(Task<int>antecedent) => {
//write out a message using the antecedent result
Console.WriteLine("The first task slept for {0} milliseconds",
antecedent.Result);
});


Continue 选项

OnlyOnRanToCompletion仅当执行完

NotOnRanToCompletion:没有执行完(被取消或出现异常)

OnlyOnFaulted:仅当出现异常

NotOnFaulted:没有出现异常

OnlyOnCancelled:仅当被取消

NotOnCancelled:没有被取消

处理异常

void Main()
{
Task rootTask = new Task(()=>{
Console.WriteLine("root task completed");
throw new Exception("root throwed exception");
});
rootTask.ContinueWith((Task previousTask)=>{
Console.WriteLine("even root throw exception , I still run");
});
 
rootTask.Start();
 
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
 
}


以上代码中,第一个task中抛出了异常,Continue的Task仍然会继续执行。可是Task被Finalized时异常就会抛出。

解决方案:

void Main()
{
Task rootTask = new Task(()=>{
Console.WriteLine("root task completed");
throw new Exception("root throwed exception");
});
var t2 = rootTask.ContinueWith((Task previousTask)=>{
//
if(previousTask.Status== TaskStatus.Faulted){
throw previousTask.Exception.InnerException;
}
Console.WriteLine("even root throw exception , I still run");
});
 
rootTask.Start();
 
try{
t2.Wait();
}
catch(AggregateException ex){
ex.Handle(inner=>{Console.WriteLine("exception handled in main thread"); return true;});
}
 
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
 
}


在Task中冒泡抛出异常,在主线程中等待最后那个Task的执行并对AggregateException进行处理。

创建子Task

创建子Task并附加在父Task上:

void Main()
{
 
Task parentTask = new Task(() => {
Console.WriteLine("parent task started");
//create the first child task
Task childTask = new Task(() => {
// writeout a message and wait
Console.WriteLine("Child task running");
Thread.Sleep(1000);
Console.WriteLine("Child task throwed exception");
throw new Exception();
} ,TaskCreationOptions.AttachedToParent);
Console.WriteLine("start child task...");
childTask.Start();
 
Console.WriteLine("parent task ended");
});
// startthe parent task
parentTask.Start();
 
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}


1. 父Task会抛出子Task中的异常

2. 父Task的状态会受到所附加的子Task状态的影响

Barrier的使用

 

class BankAccount {
public int Balance {
get;
set;
}
} ;
 
void Main()
{
//create the array of bank accounts
BankAccount[] accounts = new BankAccount[6];
for(int i = 0;i < accounts.Length; i++) {
accounts[i] = new BankAccount();
}
//create the total balance counter
int totalBalance = 0;
//create the barrier
Barrier barrier = new Barrier(3, (myBarrier) => {
 
// zerothe balance
totalBalance= 0;
// sumthe account totals
foreach(BankAccount account in accounts) {
totalBalance+= account.Balance;
}
// writeout the balance
Console.WriteLine("[From barrier :] Total balance: {0}",totalBalance);
});
//define the tasks array
Task[] tasks = new Task[3];
// loopto create the tasks
for(int i = 0;i < tasks.Length; i++) {
tasks[i]= new Task((stateObj) => {
//create a typed reference to the account
BankAccount account = (BankAccount)stateObj;
// startof phase
Random rnd = new Random();
for(int j = 0;j < 1000; j++) {
account.Balance+= 2;
}
 
Thread.Sleep(new Random().Next(3000));
 
Console.WriteLine("Task {0} waiting, phase {1} ",
Task.CurrentId,barrier.CurrentPhaseNumber);
//signal the barrier
 
barrier.SignalAndWait();
 
account.Balance-= 1000;
Console.WriteLine("barrier finished .");
// endof phase
Console.WriteLine("Task {0}, phase {1} ended",
Task.CurrentId,barrier.CurrentPhaseNumber);
//signal the barrier
barrier.SignalAndWait();
},
accounts[i]);
}
 
 
// startthe task
foreach(Task t in tasks) {
t.Start();
}
// waitfor all of the tasks to complete
Task.WaitAll(tasks);
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
 
}


在以上代码中,打开了3个barrier和三个Task,在Task中为每个账户添加2000,然后给barrier发出同步信号,当barrier收到3个信号时,对账号进行求和并保存;当barrier完成逻辑后,控制权交给了每个Task,此时每个Task对account减1000,再次求和,最后结果为3000。

如果希望通过Cancel来控制barrier的行为,还可以在barrier中传入tokenSource.Token:barrier.SignalAndWait(tokenSource.Token);并在Task中执行Cancel:tokenSource.Cancel()。

可以通过调用barrier.RemoveParticipant();来减少barrier的count。

CountEventDown

作用和Barrier类似,累计信号数量,当信号量达到指定数量,set event。

void Main()
{
 
CountdownEvent cdevent = new CountdownEvent(5);
//create a Random that we will use to generate
// sleepintervals
Random rnd = new Random();
//create 5 tasks, each of which will wait for
// arandom period and then signal the event
Task[] tasks = new Task[6];
for(int i = 0;i < tasks.Length; i++) {
//create the new task
tasks[i]= new Task(() => {
// putthe task to sleep for a random period
// up toone second
Thread.Sleep(rnd.Next(500, 1000));
//signal the event
Console.WriteLine("Task {0} signalling event",Task.CurrentId);
cdevent.Signal();
});
};
//create the final task, which will rendezous with the other 5
// usingthe count down event
tasks[5] = new Task(()=> {
// waiton the event
Console.WriteLine("Rendezvous task waiting");
cdevent.Wait();
Console.WriteLine("CountDownEvent has been set");
});
 
// startthe tasks
foreach(Task t in tasks) {
t.Start();
}
Task.WaitAll(tasks);
 
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
 
}


在以上代码中,开启了5个Task和1个count为5的CountDownEvent对象,每个Task中完成任务后分别对CountDownEvent发信号,当凑齐5个信号后,会打印出CountDownEvent has been set。

ManualResetEvent 和 AutoResetEvent

熟悉.net之前版本的应该都对它们很熟悉,用于在多线程环境中完成线程同步。区别在于,前者必须调用reset才能恢复信号;而AutoResetEvent则会自动reset。在此不再赘述。

SemaphoreSlim

void Main()
{
    SemaphoreSlim semaphore = new SemaphoreSlim(3);
//create the cancellation token source
CancellationTokenSource tokenSource
= new CancellationTokenSource();
 
//create and start the task that will wait on the event
for(int i = 0;i < 10; i++) {
Task.Factory.StartNew((obj)=> {
 
semaphore.Wait(tokenSource.Token);
// printout a message when we are released
Console.WriteLine("Task {0} released", obj);
 
},i,tokenSource.Token);
}
 
//create and start the signalling task
Task signallingTask = Task.Factory.StartNew(() => {
// loopwhile the task has not been cancelled
while(!tokenSource.Token.IsCancellationRequested) {
// go tosleep for a random period
tokenSource.Token.WaitHandle.WaitOne(500);
//signal the semaphore
semaphore.Release(3);
Console.WriteLine("Semaphore released");
}
// if wereach this point, we know the task has been cancelled
tokenSource.Token.ThrowIfCancellationRequested();
},tokenSource.Token);
// askthe user to press return before we cancel
// thetoken and bring the tasks to an end
Console.WriteLine("Press enter to cancel tasks");
Console.ReadLine();
//cancel the token source and wait for the tasks
tokenSource.Cancel();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
 
}


在以上代码中,new了1个SemaphoreSlim对象并传入3,开了10个Task线程,每当有信号从Semaphore传来时,打印Task[i]被release。同时开1个信号线程,每500毫秒release3个Task。

可见,Semaphore的作用主要是可以选择一次release多少个Task。

 

Producer / Consumer(生产者/消费者模式)

以下代码中,new了1个BlockingCollection,类型为Deposit。开了3个生产者Task,每个生产者中创建20个Deposit对象并给Amount赋值为100。在主线程中等待生产者Task执行完毕,调用blockingCollection.CompleteAdding()方法。之后开1个消费者Task用于操作账户对象,循环判断blockingCollection.IsCompleted属性(生产者是否完成工作),从集合拿出存款对象,增加账户余额。

示例代码:

class BankAccount {
public int Balance {
get;
set;
}
}
class Deposit {
public int Amount {
get;
set;
}
}
 
void Main()
{
BlockingCollection<Deposit> blockingCollection
= new BlockingCollection<Deposit>();
 
var producers = new List<Task>();
for(int i = 0;i < 3; i++) {
var producer = Task.Factory.StartNew((obj) => {
//create a series of deposits
for(int j = 0;j < 20; j++) {
//create the transfer
var randAmount = new Random().Next(100);
Deposit deposit = new Deposit { Amount = randAmount};
Thread.Sleep(newRandom().Next(200));
// placethe transfer in the collection
blockingCollection.Add(deposit);
Console.WriteLine(string.Format("Amount: {0} deposit Processed, index: {1}",randAmount, int.Parse(obj.ToString()) +j));
 
}
}, i*20);
producers.Add(producer);
};
//create a many to one continuation that will signal
// theend of production to the consumer
Task.Factory.ContinueWhenAll(producers.ToArray(),antecedents => {
//signal that production has ended
Console.WriteLine("Signalling production end");
blockingCollection.CompleteAdding();
});
//create a bank account
BankAccount account = new BankAccount();
//create the consumer, which will update
// thebalance based on the deposits
Task consumer = Task.Factory.StartNew(() => {
while(!blockingCollection.IsCompleted) {
Deposit deposit;
// tryto take the next item
if(blockingCollection.TryTake(outdeposit)) {
//update the balance with the transfer amount
account.Balance+= deposit.Amount;
}
}
// printout the final balance
Console.WriteLine("Final Balance: {0}", account.Balance);
});
// waitfor the consumer to finish
consumer.Wait();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
 
}


TPL Part 4 -- Task的协同

标签:

原文地址:http://blog.csdn.net/lan_liang/article/details/45421919

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!