码迷,mamicode.com
首页 > Web开发 > 详细

.NET并行编程1 -

时间:2016-03-11 18:43:58      阅读:193      评论:0      收藏:0      [点我收藏+]

标签:

  1. 设计模式——.net并行编程,清华大学出版的中译本。
    1. 相关资源地址主页面: http://parallelpatterns.codeplex.com/
    2. 代码下载: http://parallelpatterns.codeplex.com/releases/view/50473
    3. 书籍在线地址: https://msdn.microsoft.com/en-us/library/ff963553.aspx
    4. 使用并行编程的一些示例: https://code.msdn.microsoft.com/ParExtSamples
    5. Task的介绍页面: https://msdn.microsoft.com/en-us/library/system.threading.tasks.task(v=vs.110).aspx

技术分享

这本书介绍了一些多线程编程的模式,也就是会使用多线程的场景,以及可以使用.net中的什么技术实现--当然主要是TPL(Task parallel Library)和PLINQ(parallel LINQ)。TPL是.NET Framework 4中加的功能,目的是封装以前的thread和同步,线程池等概念,大家只要使用task和System.Threading.Tasks.Parallel,提供并行任务,并行和同步的细节交给库处理。 PLINQ是LINQ to Objects的并行版本。下面是这些结构的一个概览。

技术分享

  1. 模式分类
    1. 数据并行data parallelism。对不同的数据执行相同的计算——比如for循环中的计算,这是属于数据并行。

包括的模式有并行循环(parallel loops)和并行聚合(parallel aggregation)。并行循环强调并行循环之间没有数据依赖,不用控制并行顺序;并行聚合类似于map-reduce,任务有并发的部分,但数据也有需要控制同步的地方,.net提供的库很好的封装了同步,使用起来就像不需要同步一样简单。

  1. 任务并行task parallelism。强调并行执行的任务不同,一般任务的输入数据也不同。

包括的模式有并行任务parallel tasks、future模式、动态任务并行(dynamic task parallelism)、流水线pipelines。

技术分享

  1. 并行循环。当需要对集合中的每个元素执行相同的独立操作时,可以使用并行循环模式,注意循环需要相互独立。
    1. 一般情况。例如我们可能有这么一个for循环

int n = ...

for (int i = 0; i < n; i++)

{

// do some task

}

对应的并行版本:

int n = ...

Parallel.For(0, n, i =>

{

// ...

});

Foreach也有相应的并行版本

IEnumerable<MyObject> myEnumerable = ...

foreach (var obj in myEnumerable)

{

// ...

}

IEnumerable<MyObject> myEnumerable = ...

Parallel.ForEach(myEnumerable, obj =>

{

// ...

});

PLINQ多线程的例子

IEnumerable<MyObject> source = ...

// LINQ

var query1 = from i in source select Normalize(i);

// PLINQ

var query2 = from i in source.AsParallel()

select Normalize(i);

  1. 控制循环过程。可以在任务执行过程中控制其它并行任务,可以使用break,stop和cancel。一般stop和cancel使用比较常见。
    1. 中断循环break。类似于for循环的break。注意break之后,依然会执行比break 的task的索引值小的任务,而且会保证所有索引值小的任务都会执行。如果索引值大的任务在break前开始执行,也会执行完毕。这种方式适用于任务顺序有依赖的情况,需要保证中断前的任务执行完毕。

int n = ...

for (int i = 0; i < n; i++)

{

// ...

if (/* stopping condition is true */)

break;

}

采用多线程版本时可以这样退出循环:

int n = ...

Parallel.For(0, n, (i, loopState) =>

{

// ...

if (/* stopping condition is true */)

{

loopState.Break();

return;

}

});

签名:

Parallel.For(int fromInclusive,

int toExclusive,

Action<int, ParallelLoopState> body);

检查task状态是不是中断退出的方法:

int n = ...

var result = new double[n];

var loopResult = Parallel.For(0, n, (i, loopState) =>

{

if (/* break condition is true */)

{

loopState.Break();

return;

}

result[i] = DoWork(i);

});

if (!loopResult.IsCompleted &&

loopResult.LowestBreakIteration.HasValue)

{

Console.WriteLine("Loop encountered a break at {0}",

loopResult.LowestBreakIteration.Value);

}

  1. 中断停止stop。类似于break,不同的是stop以后不会再执行索引值小的任务,也就是正在执行的执行完毕,其它的就不再执行。任务之间完全没有依赖,只要有一个任务stop,那么不再调度剩下的任务。

var n = ...

var loopResult = Parallel.For(0, n, (i, loopState) =>

{

if (/* stopping condition is true */)

{

loopState.Stop();

return;

}

result[i] = DoWork(i);

});

if (!loopResult.IsCompleted &&

!loopResult.LowestBreakIteration.HasValue)

{

Console.WriteLine(“Loop was stopped”);

}

  1. 外部循环取消。对于一些执行时间比较长的任务,可以使用取消操作。任务执行期间检查是否取消的标识。

void DoLoop(CancellationTokenSource cts)

{

int n = ...

CancellationToken token = cts.Token;

var options = new ParallelOptions

{ CancellationToken = token };

try

{

Parallel.For(0, n, options, (i) =>

{

// ...

// ... optionally check to see if cancellation happened

if (token.IsCancellationRequested)

{

// ... optionally exit this iteration early

return;

}

});

}

catch (OperationCanceledException ex)

{

// ... handle the loop cancellation

}

}

函数签名:

Parallel.For(int fromInclusive,

int toExclusive,

ParallelOptions parallelOptions,

Action<int> body);

问题:如果使用cancel,是否还会调度执行剩下的任务?

  1. 异常处理。如果有一个任务中抛出了异常,后面不会再调度新的任务,已经调度的会执行完成。最后所有任务可能的异常会打包放在一个异常AggregationException里面抛出来。
  2. 分批执行小循环体。有的循环体执行时间较少,如果每次循环都调度一个任务,显然得不偿失。可以讲循环的执行过程分区,比如每100个循环调度一次。下面的例子根据cpu的核数自动分配每批任务的数量。

int n = ...

double[] result = new double[n];

Parallel.ForEach(Partitioner.Create(0, n),

(range) =>

{

for (int i = range.Item1; i < range.Item2; i++)

{

// very small, equally sized blocks of work

result[i] = (double)(i * i);

}

});

函数签名:

Parallel.ForEach<TSource>(

Partitioner<TSource> source,

Action<TSource> body);

下面的设置每个任务执行50000个循环。

double[] result = new double[1000000];
Parallel.ForEach(Partitioner.Create(0, 1000000, 50000),
(range) =>
{
for (int i = range.Item1; i < range.Item2; i++)
{
// small, equally sized blocks of work
result[i] = (double)(i * i);
}
});

这里System.Collections.Concurrent.Partitioner将区间切割成IEnumerable<Tuple<int,int>>的形式。

  1. 控制并行度。一般TPL会自动根据CPU内核数控制同时执行的任务数,你也可以通过ParallelOption的MaxDegreeOfParallelism来控制最大的并行任务数。

var n = ...

var options = new ParallelOptions()

{ MaxDegreeOfParallelism = 2};

Parallel.For(0, n, options, i =>

{

// ...

});

函数签名:

Parallel.For(int fromInclusive,

int toExclusive,

ParallelOptions parallelOptions,

Action<int> body);

PLINQ使用示例:

IEnumerable<T> myCollection = // ...

myCollection.AsParallel()
.WithDegreeOfParallelism(8)
.ForAll(obj => /* ... */);

  1. 在循环体中使用局部任务状态。

int numberOfSteps = 10000000;

double[] result = new double[numberOfSteps];

Parallel.ForEach(

Partitioner.Create(0, numberOfSteps),

new ParallelOptions(),

() => { return new Random(MakeRandomSeed()); },

(range, loopState, random) =>

{

for (int i = range.Item1; i < range.Item2; i++)

result[i] = random.NextDouble();

return random;

},

_ => {});

函数签名:

ForEach<TSource, TLocal>(

OrderablePartitioner<TSource> source,

ParallelOptions parallelOptions,

Func<TLocal> localInit,

Func<TSource, ParallelLoopState, TLocal, TLocal> body,

Action<TLocal> localFinally)

  1. 并行任务。如果有多个一步任务可以同时执行,可以使用并行任务模式。例如

Parallel.Invoke(DoLeft, DoRight);

等价于下面的方法:

Task t1 = Task.Factory.StartNew(DoLeft);

Task t2 = Task.Factory.StartNew(DoRight);

Task.WaitAll(t1, t2);

  1. 处理异常( https://msdn.microsoft.com/en-us/library/dd997415(v=vs.110).aspx )。使用Wait和WaitAll可以观察任务抛出来的异常,WaitAny不能。收到的异常会包装到AggregateException里面,可以使用Handle方法来处理里面的异常。

try

{

Task t = Task.Factory.StartNew( ... );

// ...

t.Wait();

}

catch (AggregateException ae)

{

ae.Handle(e =>

{

if (e is MyException)

{

// ... handle exception ...

return true;

}

else

{

return false;

}

});

}

由于异常有可能嵌套其它的异常,形成一个多级的树结构,可以使用Flatten压平树结构,然后调用handle,可以保证聚合的所有异常都可以被处理。

try

{

Task t1 = Task.Factory.StartNew(() =>

{

Task t2 = Task.Factory.StartNew(() =>

{

// ...

throw new MyException();

});

// ...

t2.Wait();

});

// ...

t1.Wait();

}

catch (AggregateException ae)

{

ae.Flatten().Handle(e =>

{

if (e is MyException)

{

// ... handle exception ...

return true;

}

else

{

return false;

}

});

}

  1. 等待第一个任务完成。可以使用WaitAny等待第一个任务完成。注意WaitAny不会观察到异常,后面加了WaitAll来处理异常。

var taskIndex = -1;

Task[] tasks = new Task[]

{

Task.Factory.StartNew(DoLeft),

Task.Factory.StartNew(DoRight),

Task.Factory.StartNew(DoCenter)

};

Task[] allTasks = tasks;

// Print completion notices one by one as tasks finish.

while (tasks.Length > 0)

{

taskIndex = Task.WaitAny(tasks);

Console.WriteLine("Finished task {0}.", taskIndex + 1);

tasks = tasks.Where((t) => t != tasks[taskIndex]).ToArray();

}

// Observe any exceptions that might have occurred.

try

{

Task.WaitAll(allTasks);

}

catch (AggregateException ae)

{

...

}

下面的例子等待第一个完成的任务,然后取消其它任务,处理取消操作异常,其它的异常会重新抛出。

public static void SpeculativeInvoke(

params Action<CancellationToken>[] actions)

{

var cts = new CancellationTokenSource();

var token = cts.Token;

var tasks =

(from a in actions

select Task.Factory.StartNew(() => a(token), token))

.ToArray();

// Wait for fastest task to complete.

Task.WaitAny(tasks);

// Cancel all of the slower tasks.

cts.Cancel();

// Wait for cancellation to finish and observe exceptions.

try

{

Task.WaitAll(tasks);

}

catch (AggregateException ae)

{

// Filter out the exception caused by cancellation itself.

ae.Flatten().Handle(e => e is OperationCanceledException);

}

finally

{

if (cts != null) cts.Dispose();

}

}

  1. 新手易犯的错误。
    1. 闭包捕获的变量问题。考虑下面这段代码。

for (int i = 0; i < 4; i++)

{ // WARNING: BUGGY CODE, i has unexpected value

Task.Factory.StartNew(() => Console.WriteLine(i));

}

你可能希望输出数字1,2,3,4,只是打乱了顺序。实际上你很可能看到4,4,4,4. 因为几个Task其实访问了同一个变量i。可以使用下面的方法来避免这个问题。

for (int i = 0; i < 4; i++)

{

var tmp = i;

Task.Factory.StartNew(() => Console.WriteLine(tmp));

}

  1. 错误的时间清理任务所需资源的问题。考虑下面这段代码。

Task<string> t;

using (var file = new StringReader("text"))

{

t = Task<string>.Factory.StartNew(() => file.ReadLine());

}

// WARNING: BUGGY CODE, file has been disposed

Console.WriteLine(t.Result);

很可能任务执行的时候file已经dispose了。

  1. 任务的生命周期。

技术分享

  1. 任务调度机制。一个Task Scheduler的例子:"How to: Create a Task Scheduler That Limits the Degree of Concurrency."
  1. 并行合并计算。

.NET并行编程1 -

标签:

原文地址:http://www.cnblogs.com/lingshf/p/5266616.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!