标签:
这本书介绍了一些多线程编程的模式,也就是会使用多线程的场景,以及可以使用.net中的什么技术实现--当然主要是TPL(Task parallel Library)和PLINQ(parallel LINQ)。TPL是.NET Framework 4中加的功能,目的是封装以前的thread和同步,线程池等概念,大家只要使用task和System.Threading.Tasks.Parallel,提供并行任务,并行和同步的细节交给库处理。 PLINQ是LINQ to Objects的并行版本。下面是这些结构的一个概览。
包括的模式有并行循环(parallel loops)和并行聚合(parallel aggregation)。并行循环强调并行循环之间没有数据依赖,不用控制并行顺序;并行聚合类似于map-reduce,任务有并发的部分,但数据也有需要控制同步的地方,.net提供的库很好的封装了同步,使用起来就像不需要同步一样简单。
包括的模式有并行任务parallel tasks、future模式、动态任务并行(dynamic task parallelism)、流水线pipelines。
int n = ...
for (int i = 0; i < n; i++)
{
// do some task
}
对应的并行版本:
int n = ...
Parallel.For(0, n, i =>
{
// ...
});
Foreach也有相应的并行版本
IEnumerable<MyObject> myEnumerable = ...
foreach (var obj in myEnumerable)
{
// ...
}
IEnumerable<MyObject> myEnumerable = ...
Parallel.ForEach(myEnumerable, obj =>
{
// ...
});
PLINQ多线程的例子
IEnumerable<MyObject> source = ...
// LINQ
var query1 = from i in source select Normalize(i);
// PLINQ
var query2 = from i in source.AsParallel()
select Normalize(i);
int n = ...
for (int i = 0; i < n; i++)
{
// ...
if (/* stopping condition is true */)
break;
}
采用多线程版本时可以这样退出循环:
int n = ...
Parallel.For(0, n, (i, loopState) =>
{
// ...
if (/* stopping condition is true */)
{
loopState.Break();
return;
}
});
签名:
Parallel.For(int fromInclusive,
int toExclusive,
Action<int, ParallelLoopState> body);
检查task状态是不是中断退出的方法:
int n = ...
var result = new double[n];
var loopResult = Parallel.For(0, n, (i, loopState) =>
{
if (/* break condition is true */)
{
loopState.Break();
return;
}
result[i] = DoWork(i);
});
if (!loopResult.IsCompleted &&
loopResult.LowestBreakIteration.HasValue)
{
Console.WriteLine("Loop encountered a break at {0}",
loopResult.LowestBreakIteration.Value);
}
var n = ...
var loopResult = Parallel.For(0, n, (i, loopState) =>
{
if (/* stopping condition is true */)
{
loopState.Stop();
return;
}
result[i] = DoWork(i);
});
if (!loopResult.IsCompleted &&
!loopResult.LowestBreakIteration.HasValue)
{
Console.WriteLine(“Loop was stopped”);
}
void DoLoop(CancellationTokenSource cts)
{
int n = ...
CancellationToken token = cts.Token;
var options = new ParallelOptions
{ CancellationToken = token };
try
{
Parallel.For(0, n, options, (i) =>
{
// ...
// ... optionally check to see if cancellation happened
if (token.IsCancellationRequested)
{
// ... optionally exit this iteration early
return;
}
});
}
catch (OperationCanceledException ex)
{
// ... handle the loop cancellation
}
}
函数签名:
Parallel.For(int fromInclusive,
int toExclusive,
ParallelOptions parallelOptions,
Action<int> body);
问题:如果使用cancel,是否还会调度执行剩下的任务?
int n = ...
double[] result = new double[n];
Parallel.ForEach(Partitioner.Create(0, n),
(range) =>
{
for (int i = range.Item1; i < range.Item2; i++)
{
// very small, equally sized blocks of work
result[i] = (double)(i * i);
}
});
函数签名:
Parallel.ForEach<TSource>(
Partitioner<TSource> source,
Action<TSource> body);
下面的设置每个任务执行50000个循环。
double[] result = new double[1000000];
Parallel.ForEach(Partitioner.Create(0, 1000000, 50000),
(range) =>
{
for (int i = range.Item1; i < range.Item2; i++)
{
// small, equally sized blocks of work
result[i] = (double)(i * i);
}
});
这里System.Collections.Concurrent.Partitioner将区间切割成IEnumerable<Tuple<int,int>>的形式。
var n = ...
var options = new ParallelOptions()
{ MaxDegreeOfParallelism = 2};
Parallel.For(0, n, options, i =>
{
// ...
});
函数签名:
Parallel.For(int fromInclusive,
int toExclusive,
ParallelOptions parallelOptions,
Action<int> body);
PLINQ使用示例:
IEnumerable<T> myCollection = // ...
myCollection.AsParallel()
.WithDegreeOfParallelism(8)
.ForAll(obj => /* ... */);
int numberOfSteps = 10000000;
double[] result = new double[numberOfSteps];
Parallel.ForEach(
Partitioner.Create(0, numberOfSteps),
new ParallelOptions(),
() => { return new Random(MakeRandomSeed()); },
(range, loopState, random) =>
{
for (int i = range.Item1; i < range.Item2; i++)
result[i] = random.NextDouble();
return random;
},
_ => {});
函数签名:
ForEach<TSource, TLocal>(
OrderablePartitioner<TSource> source,
ParallelOptions parallelOptions,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally)
Parallel.Invoke(DoLeft, DoRight);
等价于下面的方法:
Task t1 = Task.Factory.StartNew(DoLeft);
Task t2 = Task.Factory.StartNew(DoRight);
Task.WaitAll(t1, t2);
try
{
Task t = Task.Factory.StartNew( ... );
// ...
t.Wait();
}
catch (AggregateException ae)
{
ae.Handle(e =>
{
if (e is MyException)
{
// ... handle exception ...
return true;
}
else
{
return false;
}
});
}
由于异常有可能嵌套其它的异常,形成一个多级的树结构,可以使用Flatten压平树结构,然后调用handle,可以保证聚合的所有异常都可以被处理。
try
{
Task t1 = Task.Factory.StartNew(() =>
{
Task t2 = Task.Factory.StartNew(() =>
{
// ...
throw new MyException();
});
// ...
t2.Wait();
});
// ...
t1.Wait();
}
catch (AggregateException ae)
{
ae.Flatten().Handle(e =>
{
if (e is MyException)
{
// ... handle exception ...
return true;
}
else
{
return false;
}
});
}
var taskIndex = -1;
Task[] tasks = new Task[]
{
Task.Factory.StartNew(DoLeft),
Task.Factory.StartNew(DoRight),
Task.Factory.StartNew(DoCenter)
};
Task[] allTasks = tasks;
// Print completion notices one by one as tasks finish.
while (tasks.Length > 0)
{
taskIndex = Task.WaitAny(tasks);
Console.WriteLine("Finished task {0}.", taskIndex + 1);
tasks = tasks.Where((t) => t != tasks[taskIndex]).ToArray();
}
// Observe any exceptions that might have occurred.
try
{
Task.WaitAll(allTasks);
}
catch (AggregateException ae)
{
...
}
下面的例子等待第一个完成的任务,然后取消其它任务,处理取消操作异常,其它的异常会重新抛出。
public static void SpeculativeInvoke(
params Action<CancellationToken>[] actions)
{
var cts = new CancellationTokenSource();
var token = cts.Token;
var tasks =
(from a in actions
select Task.Factory.StartNew(() => a(token), token))
.ToArray();
// Wait for fastest task to complete.
Task.WaitAny(tasks);
// Cancel all of the slower tasks.
cts.Cancel();
// Wait for cancellation to finish and observe exceptions.
try
{
Task.WaitAll(tasks);
}
catch (AggregateException ae)
{
// Filter out the exception caused by cancellation itself.
ae.Flatten().Handle(e => e is OperationCanceledException);
}
finally
{
if (cts != null) cts.Dispose();
}
}
for (int i = 0; i < 4; i++)
{ // WARNING: BUGGY CODE, i has unexpected value
Task.Factory.StartNew(() => Console.WriteLine(i));
}
你可能希望输出数字1,2,3,4,只是打乱了顺序。实际上你很可能看到4,4,4,4. 因为几个Task其实访问了同一个变量i。可以使用下面的方法来避免这个问题。
for (int i = 0; i < 4; i++)
{
var tmp = i;
Task.Factory.StartNew(() => Console.WriteLine(tmp));
}
Task<string> t;
using (var file = new StringReader("text"))
{
t = Task<string>.Factory.StartNew(() => file.ReadLine());
}
// WARNING: BUGGY CODE, file has been disposed
Console.WriteLine(t.Result);
很可能任务执行的时候file已经dispose了。
标签:
原文地址:http://www.cnblogs.com/lingshf/p/5266616.html