8天玩转并行开发——第四天 同步机制(上)

时间:2019-05-31 14:18 作者:Mr.Pan 阅读数:650



 在并行计算中,不可避免的会碰到多个任务共享变量,实例,集合。虽然task自带了两个方法:task.ContinueWith()和Task.Factory

.ContinueWhenAll()来实现任务串行化,但是这些简单的方法远远不能满足我们实际的开发需要,从.net 4.0开始,类库给我们提供了很多

的类来帮助我们简化并行计算中复杂的数据同步问题。

 

大体上分为二种:

①   并发集合类:           这个在先前的文章中也用到了,他们的出现不再让我们过多的关注同步细节。

②  轻量级同步机制:      相对于老版本中那些所谓的重量级同步机制而言,新的机制更加节省cpu的额外开销。

 

关于并发集合类没什么好讲的,如果大家熟悉非线程安全的集合,那么这些并发的集合对你来说小菜一碟,这一篇和下一篇我们仔细来玩玩这

些轻量级的同步机制。

 

一:Barrier(屏障同步)

 

1:基本概念

    msdn对它的解释是:使多个任务能够采用并行方式依据某种算法在多个阶段协同工作。乍一看有点不懂,没关系,我们采取提干法。

”多个任务“,”多个阶段”,“协同”,仔细想想知道了,下一阶段的执行必须等待上一个阶段中多task全部执行完,那么我们实际中有这样

的需求吗?当然有的,比如我们数据库中有100w条数据需要导入excel,为了在数据库中加速load,我们需要开多个任务去跑,比如这

里的4个task,要想load产品表,必须等4个task都跑完用户表才行,那么你有什么办法可以让task为了你两肋插刀呢?它就是Barrier。

 

好,我们知道barrier叫做屏障,就像下图中的“红色线”,如果我们的屏障设为4个task就认为已经满了的话,那么执行中先到的task必须等待

后到的task,通知方式也就是barrier.SignalAndWait(),屏障中线程设置操作为new Barrier(4,(i)=>{})。

 

啰嗦了半天,还是上下代码说话:

复制代码

  System.Collections.Concurrent;
  System.Threading.Tasks;
  System;
  System.Diagnostics;
  System.Collections.Generic;
  System.Linq;
  System.Threading;
 
  Program
 {
   Task[] tasks =  Task[];
 
  Barrier barrier = ;
 
   Main([] args)
 {
 barrier =  Barrier(tasks.Length, (i) =>
 {
 Console.WriteLine();
 Console.WriteLine(, i.CurrentPhaseNumber);
 Console.WriteLine();
 });
 
  ( j = ; j < tasks.Length; j++)
 {
 tasks[j] = Task.Factory.StartNew((obj) =>
 {
  single = Convert.ToInt32(obj);
 
 LoadUser(single);
 barrier.SignalAndWait();
 
 LoadProduct(single);
 barrier.SignalAndWait();
 
 LoadOrder(single);
 barrier.SignalAndWait();
 }, j);
 }
 
 Task.WaitAll(tasks);
 
 Console.WriteLine();
 
 Console.Read();
 }
 
   LoadUser( num)
 {
 Console.WriteLine(, num);
 }
 
   LoadProduct( num)
 {
 Console.WriteLine(, num);
 }
 
   LoadOrder( num)
 {
 Console.WriteLine(, num);
 }
 }

复制代码


2:死锁问题

    先前的例子我们也知道,屏障必须等待4个task通过SignalAndWait()来告知自己已经到达,当4个task全部达到后,我们可以通过

barrier.ParticipantsRemaining来获取task到达状态,那么如果有一个task久久不能到达那会是怎样的情景呢?好,我举个例子。

复制代码

  System.Collections.Concurrent;
  System.Threading.Tasks;
  System;
  System.Diagnostics;
  System.Collections.Generic;
  System.Linq;
  System.Threading;
 
  Program
 {
   Task[] tasks =  Task[4];
 
  Barrier barrier = ;
 
   Main([] args)
 {
 barrier =  Barrier(tasks.Length, (i) =>
 {
 Console.WriteLine();
 Console.WriteLine(, i.CurrentPhaseNumber);
 Console.WriteLine();
 });
 
  ( j = ; j < tasks.Length; j++)
 {
 tasks[j] = Task.Factory.StartNew((obj) =>
 {
  single = Convert.ToInt32(obj);
 
 LoadUser(single);
 barrier.SignalAndWait();
 
 LoadProduct(single);
 barrier.SignalAndWait();
 
 LoadOrder(single);
 barrier.SignalAndWait();
 
 }, j);
 }
 
 Task.WaitAll(tasks);
 
 barrier.Dispose();
 
 Console.WriteLine();
 
 Console.Read();
 }
 
   LoadUser( num)
 {
 Console.WriteLine(, num);
 
  (num == )
 {
    SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == );
 }
 }
 
   LoadProduct( num)
 {
 Console.WriteLine(, num);
 }
 
   LoadOrder( num)
 {
 Console.WriteLine(, num);
 }
 }

复制代码

我们发现程序在加载User表的时候卡住了,出现了类似死循环,这句SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0)中

的ParticipantsRemaining==0 永远也不能成立,导致task0永远都不能退出,然而barrier还在一直等待task0调用SignalAndWait来结束屏障。

结果就是造成了相互等待的尴尬局面,我们下个断点看看情况。

 

3:超时机制

    当我们coding的时候遇到了这种问题还是很纠结的,所以我们必须引入一种“超时机制”,如果在指定的时候内所有的参与者(task)都

没有到达屏障的话,我们就需要取消这些参与者的后续执行,幸好SignalAndWait给我们提供了超时的重载,为了能够取消后续执行,我们

还要采用CancellationToken机制。

复制代码

  System.Collections.Concurrent;
  System.Threading.Tasks;
  System;
  System.Diagnostics;
  System.Collections.Generic;
  System.Linq;
  System.Threading;
 
  Program
 {
   Task[] tasks =  Task[];
 
  Barrier barrier = ;
 
   Main([] args)
 {
 CancellationTokenSource cts =  CancellationTokenSource();
 
 CancellationToken ct = cts.Token;
 
 barrier =  Barrier(tasks.Length, (i) =>
 {
 Console.WriteLine();
 Console.WriteLine(, i.CurrentPhaseNumber);
 Console.WriteLine();
 });
 
  ( j = ; j < tasks.Length; j++)
 {
 tasks[j] = Task.Factory.StartNew((obj) =>
 {
  single = Convert.ToInt32(obj);
 
 LoadUser(single);
 
  (!barrier.SignalAndWait())
 {
    OperationCanceledException(.Format(, single), ct);
 }
 
 LoadProduct(single);
 barrier.SignalAndWait();
 
 LoadOrder(single);
 barrier.SignalAndWait();
 
 }, j, ct);
 }
 
  Task.WaitAll(tasks, );
 
 
 {
  ( i = ; i < tasks.Length; i++)
 {
  (tasks[i].Status == TaskStatus.Faulted)
 {
   ( single  tasks[i].Exception.InnerExceptions)
 {
 Console.WriteLine(single.Message);
 }
 }
 }
 
 barrier.Dispose();
 }
  (AggregateException e)
 {
 Console.WriteLine(, e.Message);
 }
 
 Console.Read();
 }
 
   LoadUser( num)
 {
 Console.WriteLine(, num);
 
  (num == )
 {
   (!SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == , ))
 ;
 }
 
 Console.WriteLine(, num);
 }
 
   LoadProduct( num)
 {
 Console.WriteLine(, num);
 }
 
   LoadOrder( num)
 {
 Console.WriteLine(, num);
 }
 }

复制代码


二:spinLock(自旋锁)

    我们初识多线程或者多任务时,第一个想到的同步方法就是使用lock或者Monitor,然而在4.0 之后给我们提供了另一把武器spinLock,

如果你的任务持有锁的时间非常短,具体短到什么时候msdn也没有给我们具体的答案,但是有一点值得确定的时,如果持有锁的时候比较

短,那么它比那些重量级别的Monitor具有更小的性能开销,它的用法跟Monitor很相似,下面举个例子,Add2方法采用自旋锁。

复制代码

  System.Collections.Concurrent;
  System.Threading.Tasks;
  System;
  System.Diagnostics;
  System.Collections.Generic;
  System.Linq;
  System.Threading;
 
  Program
 {
  SpinLock slock =  SpinLock();
 
   sum1 = ;
 
   sum2 = ;
 
   Main([] args)
 {
 Task[] tasks =  Task[];
 
  ( i = ; i <= ; i++)
 {
 tasks[i - ] = Task.Factory.StartNew((num) =>
 {
 Add1(()num);
 
 Add2(()num);
 
 }, i);
 }
 
 Task.WaitAll(tasks);
 
 Console.WriteLine(, sum1);
 
 Console.WriteLine(, sum2);
 
 Console.Read();
 }
 
    Add1( num)
 {
 Thread.Sleep();
 
 sum1 += num;
 }
 
    Add2( num)
 {
  lockTaken = ;
 
 Thread.Sleep();
 
 
 {
 slock.Enter( lockTaken);
 sum2 += num;
 }
 
 {
  (lockTaken)
 slock.Exit();
 }
 }
 }

复制代码


文章转自:https://www.cnblogs.com/huangxincheng/archive/2012/04/07/2437110.html


推荐文章

[推荐] C# 设置Excel数据自适应行高、列宽的2种情况 (转)

[推荐] C# 图片处理帮助类(压缩,裁剪,缩略图)

[推荐] C#创建Word并填充内容

[推荐] C# 通过AForge.dll 调用摄像头设备

[推荐] C# UDP 通讯 简单例子

[推荐] 8天玩转并行开发——第七天 简要分析任务与线程池

[推荐] 8天玩转并行开发——第六天 异步编程模型

[推荐] 8天玩转并行开发——第四天 同步机制(上)

[推荐] 8天玩转并行开发——第一天 Parallel的使用

[推荐] c# 个人总结

[推荐] 8天玩转并行开发——第五天 同步机制(下)

[推荐] 将DataRow[]转换成DataTable的两种写法

[推荐] 8天玩转并行开发——第三天 plinq的使用

[推荐] Bitmap与byte[] 互转

[推荐] 关于.net登录前端加密传输


评论列表



注册用户登录后才能发表评论,请登录注册,访问 博客首页