8天玩转并行开发——第三天 plinq的使用

时间:2019-05-31 14:17 作者:Mr.Pan 阅读数:592


 相信在.net平台下,我们都玩过linq,是的,linq让我们的程序简洁优美,简直玩的是爱不释手,但是传统的linq只是串行代码,在并行的

年代如果linq不支持并行计算那该是多么遗憾的事情啊。

   当然linq有很多种方式,比如linq to sql ,xml,object 等等,如果要将linq做成并行还是很简单的,这里我就举一个比较实际一点的例子,

我们知道为了更快的响应用户操作,码农们想尽了各种办法,绞尽了脑汁,其中有一个办法就是将数据库数据预加载到内存中,然后通过各种

数据结构的手段来加速CURD,是的,比如一个排序地球人只能做到N(lgN),那么如果我还想再快一点的话该怎么办呢?那么现在的并行就能发

挥巨大的优势,尤其是现在的服务器配置都是在8个硬件线程的情况下,你简直会狂笑好几天啊,好,不乱扯了。

 

1:AsParallel(并行化)

下面我们模拟给ConcurrentDictionary灌入1500w条记录,看看串行和并行效率上的差异,注意我的老爷机是2个硬件线程。

复制代码

  System;
  System.Threading;
  System.Threading.Tasks;
  System.Diagnostics;
  System.Collections.Concurrent;
  System.Collections.Generic;
 
  System.Linq;
 
  Program
 {
       Main([] args)
     {
          dic = LoadData();
 
         Stopwatch watch =  Stopwatch();
 
         watch.Start();
 
                   query1 = ( n  dic.Values
                        n.Age >  && n.Age < 
                        n).ToList();
 
         watch.Stop();
 
         Console.WriteLine(, watch.ElapsedMilliseconds);
 
         watch.Restart();
 
          query2 = ( n  dic.Values.AsParallel()
                        n.Age >  && n.Age < 
                        n).ToList();
 
         watch.Stop();
 
         Console.WriteLine(, watch.ElapsedMilliseconds);
 
         Console.Read();
     }
 
       ConcurrentDictionary<, Student> LoadData()
     {
         ConcurrentDictionary<, Student> dic =  ConcurrentDictionary<, Student>();
 
                  Parallel.For(, , (i) =>
         {
              single =  Student()
             {
                 ID = i,
                 Name =  + i,
                 Age = i % ,
                 CreateTime = DateTime.Now.AddSeconds(i)
             };
             dic.TryAdd(i, single);
         });
 
          dic;
     }
 
       Student
     {
           ID { ; ; }
 
           Name { ; ; }
 
           Age { ; ; }
 
          DateTime CreateTime { ; ; }
     }
 }

复制代码

执行的结果还是比较震撼的,将近7倍,这是因为plinq的查询引擎会尽量利用cpu的所有硬件线程。

 

2:常用方法的使用

<1> orderby 

      有时候我们并不是简单的select一下就ok了,可能需要将结果进行orderby操作,并行化引擎会把要遍历的数据分区,然后在每个区上进行

orderby操作,最后来一个总的orderby,这里很像算法中的“归并排序”。

复制代码

  System;
  System.Threading;
  System.Threading.Tasks;
  System.Diagnostics;
  System.Collections.Concurrent;
  System.Collections.Generic;
 
  System.Linq;
 
  Program
 {
       Main([] args)
     {
          dic = LoadData();
 
          query1 = ( n  dic.Values.AsParallel()
                        n.Age >  && n.Age < 
                        n).ToList();
 
 
         Console.WriteLine();
         query1.Take().ToList().ForEach((i) =>
         {
             Console.WriteLine(i.CreateTime);
         });
 
          query2 = ( n  dic.Values.AsParallel()
                        n.Age >  && n.Age < 
                        n.CreateTime descending
                        n).ToList();
 
         Console.WriteLine();
         query2.Take().ToList().ForEach((i) =>
         {
             Console.WriteLine(i.CreateTime);
         });
 
         Console.Read();
     }
 
       ConcurrentDictionary<, Student> LoadData()
     {
         ConcurrentDictionary<, Student> dic =  ConcurrentDictionary<, Student>();
 
                  Parallel.For(, , (i) =>
         {
              single =  Student()
             {
                 ID = i,
                 Name =  + i,
                 Age = i % ,
                 CreateTime = DateTime.Now.AddSeconds(i)
             };
             dic.TryAdd(i, single);
         });
 
          dic;
     }
 
       Student
     {
           ID { ; ; }
 
           Name { ; ; }
 
           Age { ; ; }
 
          DateTime CreateTime { ; ; }
     }
 }

复制代码

 

<2> sum(),average()等等这些聚合函数的效果跟orderby类型一样,都是实现了类型归并排序的效果,这里就不举例子了。

 

3:指定并行度,这个我在前面文章也说过,为了不让并行计算占用全部的硬件线程,或许可能要留一个线程做其他事情。

          query2 = ( n  dic.Values.AsParallel()
                      
                        n.Age >  && n.Age < 
                        n.CreateTime descending
                        n).ToList();

 

4: 了解ParallelEnumerable类

   首先这个类是Enumerable的并行版本,提供了很多用于查询实现的一组方法,截个图,大家看看是不是很熟悉,要记住,他们都是并行的。

下面列举几个简单的例子。

复制代码

  Program
 {
       Main([] args)
     {
         ConcurrentBag<> bag =  ConcurrentBag<>();
 
          list = ParallelEnumerable.Range(, );
 
         list.ForAll((i) =>
         {
             bag.Add(i);
         });
 
         Console.WriteLine(, bag.Count);
 
         Console.WriteLine(, list.Sum());
 
         Console.WriteLine(, list.Max());
 
         Console.WriteLine(, list.FirstOrDefault());
 
         Console.Read();
     }
 }

复制代码

 

5: plinq实现MapReduce算法

  mapReduce是一个非常流行的编程模型,用于大规模数据集的并行计算,非常的牛X啊,记得mongodb中就用到了这个玩意。

map:  也就是“映射”操作,可以为每一个数据项建立一个键值对,映射完后会形成一个键值对的集合。

reduce:“化简”操作,我们对这些巨大的“键值对集合“进行分组,统计等等。

具体大家可以看看百科:http://baike.baidu.com/view/2902.htm

 

下面我举个例子,用Mapreduce来实现一个对age的分组统计。

复制代码

 System;
 System.Threading;
 System.Threading.Tasks;
 System.Diagnostics;
 System.Collections.Concurrent;

 System.Collections.Generic;

 System.Linq;

 Program
{
      Main([] args)
    {
        List<Student> list =  List<Student>()
        {
             Student(){ ID=, Name=, Age=},
             Student(){ ID=, Name=, Age=},
             Student(){ ID=, Name=, Age=},
             Student(){ ID=, Name=, Age=},
        };

                 map = list.AsParallel().ToLookup(i => i.Age, count => );

                 reduce =  IGrouping<, > singleMap
                      map.AsParallel()
                      
                     {
                         Age = singleMap.Key,
                         Count = singleMap.Count()
                     };

                reduce.ForAll(i =>
        {
            Console.WriteLine(, i.Age, i.Count);
        });
    }

      Student
    {
          ID { ; ; }

          Name { ; ; }

          Age { ; ; }

         DateTime CreateTime { ; ; }
    }
}

复制代码

 

文章转自:https://www.cnblogs.com/huangxincheng/archive/2012/04/04/2431616.html


推荐文章

[推荐] C# 设置Excel数据自适应行高、列宽的2种情况 (转)

[推荐] C# 图片处理帮助类(压缩,裁剪,缩略图)

[推荐] C# 通过AForge.dll 调用摄像头设备

[推荐] C#创建Word并填充内容

[推荐] C# UDP 通讯 简单例子

[推荐] 8天玩转并行开发——第七天 简要分析任务与线程池

[推荐] 8天玩转并行开发——第六天 异步编程模型

[推荐] 8天玩转并行开发——第四天 同步机制(上)

[推荐] 8天玩转并行开发——第一天 Parallel的使用

[推荐] c# 个人总结

[推荐] 8天玩转并行开发——第五天 同步机制(下)

[推荐] 将DataRow[]转换成DataTable的两种写法

[推荐] 8天玩转并行开发——第三天 plinq的使用

[推荐] Bitmap与byte[] 互转

[推荐] 关于.net登录前端加密传输


评论列表



注册用户登录后才能发表评论,请登录注册,访问 博客首页