内容简介:.NET Framework 4在现在的System.Threading命名空间中提供了6个同步原语,通过这个命名空间可以访问遗留的线程类、类型和枚举,还提供了新的基于任务的编程模型及特定情形紧密相关的数据结构Barrier使多个任务能够采用并行方式依据某种算法在多个阶段中协同工作 通过屏障CountdownEvent表示在计数变为0时处于有信号状态的同步基元 通过信号机制
同步原语
.NET Framework 4在现在的System.Threading命名空间中提供了6个同步原语,通过这个命名空间可以访问遗留的线程类、类型和枚举,还提供了新的基于任务的编程模型及特定情形紧密相关的数据结构
Barrier使多个任务能够采用并行方式依据某种算法在多个阶段中协同工作 通过屏障
CountdownEvent表示在计数变为0时处于有信号状态的同步基元 通过信号机制
ManualResetEventSlim允许很多任务等待直到另一个任务手工发出事件句柄,当预计等待时间很短的时候,ManualResetEventSlim 的性能比对应的重量级ManualResetEvent的性能要高。通过信号机制
SemaphoreSlim限制对可同时访问资源或资源池的线程数,比对应的Semaphore性能要高 通过信号机制
SpinLock 提供一个相互排斥锁基元,在该基元中,尝试获得锁的线程将在重复检查的循环中等待,直至该锁变为可用为止。
SpinWait 提供对基于自旋的等待的支持。
Barrier
当在需要一组任务并行地运行一连串的阶段,但是每一个阶段都要等待其他任务完成前一阶段之后才能开始时,您可以通过使用Barrier类的实例来同步这一类协同工作
生产东西 demo,普通生产,
一个产品经过 工序1+工序2+工序3
现在有machines台机器 //每台生产4个产品
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { //private static Task[] _CookTasks { get; set; } //private static Barrier _barrier { get; set; } ///*获取当前计算机处理器数*/ private static int machines = Environment.ProcessorCount; private static int requestCount = 4; static void Main(string[] args) { Console.Title = "AY2019"; //一个产品经过 工序1+工序2+工序3 //现在有machines台机器 //每台生产4个产品 Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); for (int i = 1; i <= machines; i++) { for (int j = 1; j <= requestCount; j++) { Procedure1(i,j); Procedure2(i, j); Procedure3(i, j); } } swTask1.Stop(); Console.WriteLine("生产完成,时长:"+ swTask1.ElapsedMilliseconds); Console.ReadLine(); } //工序1 public static void Procedure1(int toolIndex, int productIndex) { Console.WriteLine($"第{toolIndex}台机器{productIndex}生产了零件1"); Thread.Sleep(100); } //工序2 public static void Procedure2(int toolIndex, int productIndex) { Console.WriteLine($"第{toolIndex}台机器{productIndex}生产了零件2"); Thread.Sleep(100); } //工序3 public static void Procedure3(int toolIndex, int productIndex) { Console.WriteLine($"第{toolIndex}台机器{productIndex}生产了零件3"); Thread.Sleep(100); } } }
组装起来 4*8台机器,可以有32个产品,一共32*3=96个零件,生产时间有点长
因为机器是等待上台机器完成的,就感觉电压不行,同时带不动8台机器,或者每台机器生产完就要休息1天的样子
修改代码:
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { private static Task[] Tasks { get; set; } private static Barrier _barrier { get; set; } ///*获取当前计算机处理器数*/ private static int machines = Environment.ProcessorCount; private static int requestCount = 4; static void Main(string[] args) { Console.Title = "AY2019"; Tasks = new Task[machines]; _barrier = new Barrier(machines, (barrier) => { Console.WriteLine("当前阶段:{0}", barrier.CurrentPhaseNumber); }); Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); for (int i = 0; i < machines; i++) { Tasks[i] = Task.Factory.StartNew((num) => { int index = Convert.ToInt32(num); for (int j = 1; j <= requestCount; j++) { Procedure1(index+1, j); Procedure2(index+1, j); Procedure3(index+1, j); } }, i); } var finalTask = Task.Factory.ContinueWhenAll(Tasks, (tasks) => { /*等待任务完成*/ Task.WaitAll(Tasks); swTask1.Stop(); Console.WriteLine("生产完成,时长:" + swTask1.ElapsedMilliseconds); /*释放资源*/ _barrier.Dispose(); }); //一个产品经过 工序1+工序2+工序3 //现在有machines台机器 //每台生产4个产品 //Stopwatch swTask1 = new Stopwatch(); //swTask1.Start(); //for (int i = 1; i <= machines; i++) //{ // for (int j = 1; j <= requestCount; j++) // { // Procedure1(i,j); // Procedure2(i, j); // Procedure3(i, j); // } //} //swTask1.Stop(); //Console.WriteLine("生产完成,时长:"+ swTask1.ElapsedMilliseconds); Console.ReadLine(); } //工序1 public static void Procedure1(int toolIndex, int productIndex) { Console.WriteLine($"第{toolIndex}台机器执行第{productIndex}次任务生产了零件1"); Thread.Sleep(100); _barrier.SignalAndWait(); } //工序2 public static void Procedure2(int toolIndex, int productIndex) { Console.WriteLine($"第{toolIndex}台机器执行第{productIndex}次任务生产了零件2"); Thread.Sleep(100); _barrier.SignalAndWait(); } //工序3 public static void Procedure3(int toolIndex, int productIndex) { Console.WriteLine($"第{toolIndex}台机器执行第{productIndex}次任务生产了零件3"); Thread.Sleep(100); _barrier.SignalAndWait(); } } }
现在8个机器同时生产 零件1,零件2,零件3 以此类推完成4次。
ContinueWhenAll是一组task完成后,执行啥
异常处理
try { if (!_barrier.SignalAndWait(2000)) { /*抛出超时异常*/ throw new OperationCanceledException("等待超时,抛出异常"); } } catch (Exception ex) { /*处理异常*/ Console.WriteLine(ex.Message); continue; } Procedure2(index+1, j); _barrier.SignalAndWait();
完成后,手动SignalAndWait()
互斥和可见性。互斥指的是一次只允许一个线程持有某个特定的锁,因此可以保证共享数据内容的一致性;
可见性指的是必须确保锁被释放之前对共享数据的修改,随后获得锁的另一个线程能够知道该行为。
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { private static Task[] _CookTasks { get; set; } private static object o = new object(); private static StringBuilder AppendStrUnLock = new StringBuilder(); private static StringBuilder AppendStrLock = new StringBuilder(); private static StringBuilder AppendStrMonitorLock = new StringBuilder(); /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; static void Main(string[] args) { _CookTasks = new Task[_particpants]; Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); for (int task_index = 0; task_index < _particpants; task_index++) { _CookTasks[task_index] = Task.Factory.StartNew((num) => { Parallel.For(1, 1000, (i) => { string str = "append message " + i; AppendStrUnLock.Append(str); }); }, task_index); } /*ContinueWhenAll 提供一组任务完成后 延续方法*/ var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) => { /*等待任务完成*/ Task.WaitAll(_CookTasks); swTask1.Stop(); Console.WriteLine("不采用Lock操作,字符串长度:{0},耗时:{1}", AppendStrUnLock.Length, swTask1.ElapsedMilliseconds); /*释放资源*/ }); Console.ReadLine(); } } }
采用lock
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { private static Task[] _CookTasks { get; set; } private static object o = new object(); private static StringBuilder AppendStrUnLock = new StringBuilder(); private static StringBuilder AppendStrLock = new StringBuilder(); private static StringBuilder AppendStrMonitorLock = new StringBuilder(); /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; static void Main(string[] args) { _CookTasks = new Task[_particpants]; Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); for (int task_index = 0; task_index < _particpants; task_index++) { _CookTasks[task_index] = Task.Factory.StartNew((num) => { Parallel.For(1, 1000, (i) => { string str = "append message " + i; lock (o) { AppendStrLock.Append(str); } }); }, task_index); } /*ContinueWhenAll 提供一组任务完成后 延续方法*/ var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) => { /*等待任务完成*/ Task.WaitAll(_CookTasks); swTask1.Stop(); Console.WriteLine("采用Lock操作,字符串长度:{0},耗时:{1}", AppendStrLock.Length, swTask1.ElapsedMilliseconds); /*释放资源*/ }); Console.ReadLine(); } } }
采用互斥锁代码下:
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { private static Task[] _CookTasks { get; set; } private static object o = new object(); private static StringBuilder AppendStrUnLock = new StringBuilder(); private static StringBuilder AppendStrLock = new StringBuilder(); private static StringBuilder AppendStrMonitorLock = new StringBuilder(); /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; /* coder:释迦苦僧 */ static void Main(string[] args) { _CookTasks = new Task[_particpants]; Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); for (int task_index = 0; task_index < _particpants; task_index++) { _CookTasks[task_index] = Task.Factory.StartNew((num) => { Parallel.For(1, 1000, (i) => { string str = "append message " + i; bool lockTaken = false; try { Monitor.Enter(o, ref lockTaken); AppendStrMonitorLock.Append(str); } finally { if (lockTaken) Monitor.Exit(o); } }); }, task_index); } /*ContinueWhenAll 提供一组任务完成后 延续方法*/ var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) => { /*等待任务完成*/ Task.WaitAll(_CookTasks); swTask1.Stop(); Console.WriteLine("使用互斥锁,字符串长度:{0},耗时:{1}", AppendStrMonitorLock.Length, swTask1.ElapsedMilliseconds); /*释放资源*/ }); Console.ReadLine(); } } }
锁超时 Monitor.TryEnter(o, 2000, ref lockTaken);
在多任务中,很多任务试图获得锁从而进入临界区,如果其中一个参与者不能释放锁,那么其他所有的任务都要在Monitor.Enter的方法内永久的等待下去。Monitor.TryEnter方法则提供了超时机制
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { private static Task[] _CookTasks { get; set; } private static object o = new object(); private static StringBuilder AppendStrUnLock = new StringBuilder(); private static StringBuilder AppendStrLock = new StringBuilder(); private static StringBuilder AppendStrMonitorLock = new StringBuilder(); /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; static void Main(string[] args) { _CookTasks = new Task[_particpants]; Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); for (int task_index = 0; task_index < _particpants; task_index++) { _CookTasks[task_index] = Task.Factory.StartNew((num) => { try { Parallel.For(1, 200000, (i) => { string str = "append message " + i; bool lockTaken = false; try { Monitor.TryEnter(o, 2000, ref lockTaken); if (!lockTaken) { throw new OperationCanceledException("锁超时...."); } if (i == 2) { Thread.Sleep(10000); } AppendStrMonitorLock.Append(str); } catch (Exception ex) { throw ex; } finally { if (lockTaken) Monitor.Exit(o); } }); } catch (Exception ex) { throw ex; } }, task_index); } /*ContinueWhenAll 提供一组任务完成后 延续方法*/ var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) => { /*等待任务完成*/ Task.WaitAll(_CookTasks); swTask1.Stop(); foreach (Task task in _CookTasks) { if (task.Exception != null) { /*任务执行完成后 输出所有异常 打印异常报表*/ foreach (Exception exception in task.Exception.InnerExceptions) { Console.WriteLine("异常信息:{0}", exception.Message); } } } Console.WriteLine("不采用Lock操作,字符串长度:{0},耗时:{1}", AppendStrMonitorLock.Length, swTask1.ElapsedMilliseconds); /*释放资源*/ }); Console.ReadLine(); } } }
需要注意,上述代码中,异常并没有被捕捉到,因此每一个不能获得锁的任务都会出错退出并停止执行。
System.Threading.Monitor类还提供了以下三个方法,大家可以参考MSND:
自旋锁 - System.Threading.SpinLock
如果持有锁的时间非常短,锁的粒度很精细,那么自旋锁可以获得比其他锁机制更好的性能,互斥锁System.Threading.Monitor的开销非常大。
下述代码展现System.Threading.Monitor和System.Threading.SpinLock的性能:
使用Monitor
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { private static Task[] _CookTasks { get; set; } private static object o = new object(); private static StringBuilder AppendStrUnLock = new StringBuilder(); private static StringBuilder AppendStrLock = new StringBuilder(); private static StringBuilder AppendStrMonitorLock = new StringBuilder(); /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; static void Main(string[] args) { SpinLock sl = new SpinLock(); _CookTasks = new Task[_particpants]; Thread.Sleep(4000); Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); for (int task_index = 0; task_index < _particpants; task_index++) { _CookTasks[task_index] = Task.Factory.StartNew((num) => { Parallel.For(1, 200000, (i) => { string str = "append message " + i; bool lockTaken = false; try { Monitor.Enter(o, ref lockTaken); AppendStrMonitorLock.Append(str); } finally { if (lockTaken) Monitor.Exit(o); } }); }, task_index); } /*ContinueWhenAll 提供一组任务完成后 延续方法*/ var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) => { /*等待任务完成*/ Task.WaitAll(_CookTasks); swTask1.Stop(); Console.WriteLine("采用Monitor操作,字符串长度:{0},耗时:{1}", AppendStrMonitorLock.Length, swTask1.ElapsedMilliseconds); /*释放资源*/ }); Console.ReadLine(); } } }
使用SpinLock
class Program { private static Task[] _CookTasks { get; set; } private static object o = new object(); private static StringBuilder AppendStrUnLock = new StringBuilder(); private static StringBuilder AppendStrLock = new StringBuilder(); private static StringBuilder AppendStrMonitorLock = new StringBuilder(); /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; static void Main(string[] args) { SpinLock sl = new SpinLock(); _CookTasks = new Task[_particpants]; Thread.Sleep(4000); Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); for (int task_index = 0; task_index < _particpants; task_index++) { _CookTasks[task_index] = Task.Factory.StartNew((num) => { Parallel.For(1, 200000, (i) => { string str = "append message " + i; bool lockTaken = false; try { sl.Enter(ref lockTaken); AppendStrMonitorLock.Append(str); } finally { if (lockTaken) sl.Exit(); } }); }, task_index); } /*ContinueWhenAll 提供一组任务完成后 延续方法*/ var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) => { /*等待任务完成*/ Task.WaitAll(_CookTasks); swTask1.Stop(); Console.WriteLine("采用SpinLock操作,字符串长度:{0},耗时:{1}", AppendStrMonitorLock.Length, swTask1.ElapsedMilliseconds); /*释放资源*/ }); Console.ReadLine(); } }
SpinLock不要声明readlony的
基于自旋锁的等待-System.Threading.SpinWait
如果等待某个条件满足需要的时间很短,而且不希望发生昂贵的上下文切换,那么基于自旋的等待时一种很好的替换方案,SpinWait不仅提供了基本自旋功能,而且还提供了SpinWait.SpinUntil方法,使用这个方法能够自旋直到满足某个条件为止,此外SpinWait是一个Struct,从内存的角度上说,开销很小。SpinLock是对SpinWait的简单封装。
需要注意的是:长时间的自旋不是很好的做法,因为自旋会阻塞更高级的线程及其相关的任务,还会阻塞垃圾回收机制。SpinWait并没有设计为让多个任务或线程并发使用,因此多个任务或线程通过SpinWait方法进行自旋,那么每一个任务或线程都应该使用自己的SpinWait实例。
当一个线程自旋时,会将一个内核放入到一个繁忙的循环中,而不会让出当前处理器时间片剩余部分,当一个任务或者线程调用Thread.Sleep方法时,底层线程可能会让出当前处理器时间片的剩余部分,这是一个大开销的操作。
因此,在大部分情况下,不要在循环内调用Thread.Sleep方法等待特定的条件满足。
下面贴代码,方便大家理解,如有错误请指正:
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { private static Task[] _CookTasks { get; set; } /*定义一个变量 该变量指示是否可以进行下一步操作*/ private static bool _stepbool = false; /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; /* coder:释迦苦僧 */ static void Main(string[] args) { _CookTasks = new Task[_particpants]; for (int task_index = 0; task_index < _particpants; task_index++) { _CookTasks[task_index] = Task.Factory.StartNew((num) => { CookStep1(); /*等待5秒钟 _stepbool变为true ,如果1秒钟内没有淘好米 则提示超时*/ if (!SpinWait.SpinUntil(() => (_stepbool), 1000)) { Console.WriteLine("淘个米都花这么长时间...."); } else { /*按时淘好米开始煮饭*/ Console.WriteLine("淘好米煮饭...."); } }, task_index); } /*主线程创造超时条件*/ Thread.Sleep(3000); _stepbool = true; Console.ReadLine(); } static void CookStep1() { Console.WriteLine("淘米...."); } } }
if (!SpinWait.SpinUntil(() => (_stepbool), 1000))
volatile
volatile关键字能够保证;当这个共享变量被不同线程访问和更新且没有锁和原子操作的时候,最新的值总能在共享变量中表示出来。
volatile变量可以看作是“轻量级lock”。当出于简单编码和可伸缩性考虑时,我们可能会选择使用volatile变量而不是锁机制。某些情况下,如果读操作远多于写操作,也会比锁机制带来更高性能。
volatile变量具有“lock”的可见性,却不具备原子特性。也就是说线程能够自动发现volatile变量的最新值。volatile变量可以实现线程安全,但其应用有限。使用volatile变量的主要原因在于它使用非常简单,至少比使用锁机制要简单的多;其次便是性能原因了,某些情况下,它的性能要优于锁机制。此外,volatile操作不会造成阻塞。
参考: http://www.cnblogs.com/lucifer1982/archive/2008/03/23/1116981.html 大家可以看下 写的不错
private volatile bool stopped; public void Stop() { stopped = true; } public void FindFiles() { while (!stopped) { // searching files } }
ManualResetEventSlim
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { static void Main() { MRES_SetWaitReset(); } static void MRES_SetWaitReset() { ManualResetEventSlim mres1 = new ManualResetEventSlim(false); var observer = Task.Factory.StartNew(() => { Console.WriteLine("阻塞当前线程,使 mres1 处于等待状态...!"); mres1.Wait(); while (true) { if (mres1.IsSet) { /*等待 mres1 Set()信号 当有信号时 在执行后面代码*/ Console.WriteLine("得到mres1信号,执行后续代码....!"); } Thread.Sleep(100); } }); Thread.Sleep(2000); Console.WriteLine("取消mres1等待状态"); mres1.Set(); Console.WriteLine("当前信号状态:{0}", mres1.IsSet); Thread.Sleep(300); mres1.Reset(); Console.WriteLine("当前信号状态:{0}", mres1.IsSet); Thread.Sleep(300); mres1.Set(); Console.WriteLine("当前信号状态:{0}", mres1.IsSet); Thread.Sleep(300); mres1.Reset(); Console.WriteLine("当前信号状态:{0}", mres1.IsSet); observer.Wait(); mres1.Dispose(); Console.ReadLine(); } } }
SemaphoreSlim
有时候,需要对访问一个资源或一个资源池的并发任务或者线程的数量做限制时,采用System.Threading.SemaphoreSlim类非常有用。
该了表示一个Windows内核信号量对象,如果等待的时间非常短,System.Threading.SemaphoreSlim类带来的额外开销会更少,而且更适合对任务处理,System.Threading.SemaphoreSlim提供的计数信号量没有使用Windows内核的信号量。
计数信号量:通过跟踪进入和离开任务或线程来协调对资源的访问,信号量需要知道能够通过信号量协调机制所访问共享资源的最大任务数,然后,信号量使用了一个计数器,根据任务进入或离开信号量控制区对计数器进行加减。
需要注意的是:信号量会降低可扩展型,而且信号量的目的就是如此。SemaphoreSlim实例并不能保证等待进入信号量的任务或线程的顺序。
下面贴代码,方便大家理解:
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { static void Main() { SemaphoreSlim ss = new SemaphoreSlim(3); // 创建SemaphoreSlim 初始化信号量最多计数为3次 Console.WriteLine("创建SemaphoreSlim 初始化信号量最多计数为{0}次", ss.CurrentCount); // Launch an asynchronous Task that releases the semaphore after 100 ms Task t1 = Task.Factory.StartNew(() => { while (true) { /*阻止当前线程,直至它可进入 SemaphoreSlim 为止。*/ /*阻塞当前任务或线程,直到信号量几首大于0时它才能进入信号量*/ ss.Wait(); Console.WriteLine("允许进入 SemaphoreSlim 的线程的数量:{0}", ss.CurrentCount); Thread.Sleep(10); } }); Thread.Sleep(3000); /*当前Task只能进入3次*/ /*退出一次信号量 并递增信号量的计数*/ Console.WriteLine("退出一次信号量 并递增信号量的计数"); ss.Release(); Thread.Sleep(3000); /*退出3次信号量 并递增信号量的计数*/ Console.WriteLine("退出三次信号量 并递增信号量的计数"); ss.Release(3); /*等待任务完成*/ Task.WaitAll(t1); /*释放*/ ss.Dispose(); Console.ReadLine(); } } }
CountdownEvent
有时候,需要对数目随时间变化的任务进行跟踪,CountdownEvent是一个非轻量级的同步原语,与Task.WaitAll或者TaskFactory.ContinueWhenAll 等待其他任务完成执行而运行代码相比,CountdownEvent的开销要小得多。
CountdownEvent实例带有一个初始的信号计数,在典型的fork/join场景下,每当一个任务完成工作的时候,这个任务都会发出一个CountdownEvent实例的信号,并将其信号计数递减1,调用CountdownEvent的wait方法的任务将会阻塞,直到信号计数达到0.
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { static void Main() { CountdownEvent cde = new CountdownEvent(3); // 创建SemaphoreSlim 初始化信号量最多计数为3次 Console.WriteLine(" InitialCount={0}, CurrentCount={1}, IsSet={2}", cde.InitialCount, cde.CurrentCount, cde.IsSet); // Launch an asynchronous Task that releases the semaphore after 100 ms Task t1 = Task.Factory.StartNew(() => { while (true) { Thread.Sleep(1000); if (!cde.IsSet) { cde.Signal(); Console.WriteLine(" InitialCount={0}, CurrentCount={1}, IsSet={2}", cde.InitialCount, cde.CurrentCount, cde.IsSet); } } }); cde.Wait(); /*将 CurrentCount 重置为 InitialCount 的值。*/ Console.WriteLine("将 CurrentCount 重置为 InitialCount 的值。"); cde.Reset(); cde.Wait(); /*将 CurrentCount 重置为 5*/ Console.WriteLine("将 CurrentCount 重置为 5"); cde.Reset(5); cde.AddCount(2); cde.Wait(); /*等待任务完成*/ Task.WaitAll(t1); Console.WriteLine("任务执行完成"); /*释放*/ cde.Dispose(); Console.ReadLine(); } } }
修改代码
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { static void Main() { Console.Title = "AY2019"; CountdownEvent cde = new CountdownEvent(3); // 创建SemaphoreSlim 初始化信号量最多计数为3次 Console.WriteLine(" InitialCount={0}, CurrentCount={1}, IsSet={2}", cde.InitialCount, cde.CurrentCount, cde.IsSet); /*创建任务执行计数*/ Task t1 = Task.Factory.StartNew(() => { for (int index = 0; index <= 5; index++) { /*重置计数器*/ cde.Reset(); /*创建任务执行计数*/ while (true) { Thread.Sleep(1000); if (!cde.IsSet) { cde.Signal(); Console.WriteLine("第{0}轮计数 CurrentCount={1}", index, cde.CurrentCount); } else { Console.WriteLine("第{0}轮计数完成", index); break; } } /*等待计数完成*/ cde.Wait(); } }); t1.Wait(); /*释放*/ cde.Dispose(); Console.ReadLine(); } } }
reset()重置为初始的值,这里是3,然后执行3次,不是最后一个IsReset=false,是最后1个IsReset=true,通过手动Signal方法减去1
Wait就是阻塞到IsReset=true,或者值=0了;
PLINQ
并行是无序的。
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { static void Main() { Console.Title = "AY2019"; ConcurrentQueue<Product> products = new ConcurrentQueue<Product>(); /*向集合中添加多条数据 可以修改数据量查看Linq和Plinq的性能*/ Parallel.For(0, 600000, (num) => { products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num }); }); /*采用LINQ查询符合条件的数据*/ Stopwatch sw = new Stopwatch(); sw.Restart(); var productListLinq = from product in products where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2")) select product; Console.WriteLine("采用Linq 查询得出数量为:{0}", productListLinq.Count()); sw.Stop(); Console.WriteLine("采用Linq 耗时:{0}", sw.ElapsedMilliseconds); /*采用PLINQ查询符合条件的数据*/ sw.Restart(); var productListPLinq = from product in products.AsParallel() /*AsParallel 试图利用运行时所有可用的逻辑内核,从而使运行的速度比串行的版本要快 但是需要注意开销所带来的性能损耗*/ where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2")) select product; Console.WriteLine("采用PLinq 查询得出数量为:{0}", productListPLinq.Count()); sw.Stop(); Console.WriteLine("采用PLinq 耗时:{0}", sw.ElapsedMilliseconds); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } } }
AsOrdered()与orderby
AsOrdered:保留查询的结果按源序列排序,在并行查询中,多条数据会被分在多个区域中进行查询,查询后再将多个区的数据结果合并到一个结果集中并按源序列顺序返回。
orderby:将返回的结果集按指定顺序进行排序
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { /*code:释迦苦僧*/ static void Main() { ConcurrentQueue<string> products = new ConcurrentQueue<string>(); products.Enqueue("E"); products.Enqueue("F"); products.Enqueue("B"); products.Enqueue("G"); products.Enqueue("A"); products.Enqueue("C"); products.Enqueue("SS"); products.Enqueue("D"); /*不采用并行化 其数据输出结果 不做任何处理 */ var productListLinq = from product in products where (product.Length == 1) select product; string appendStr = string.Empty; foreach (string str in productListLinq) { appendStr += str + " "; } Console.WriteLine("不采用并行化 输出:{0}", appendStr); /*不采用任何 排序 策略 其数据输出结果 是直接将分区数据结果合并起来 不做任何处理 */ var productListPLinq = from product in products.AsParallel() where (product.Length == 1) select product; appendStr = string.Empty; foreach (string str in productListPLinq) { appendStr += str + " "; } Console.WriteLine("不采用AsOrdered 输出:{0}", appendStr); /*采用 AsOrdered 排序策略 其数据输出结果 是直接将分区数据结果合并起来 并按原始数据顺序排序*/ var productListPLinq1 = from product in products.AsParallel().AsOrdered() where (product.Length == 1) select product; appendStr = string.Empty; foreach (string str in productListPLinq1) { appendStr += str + " "; } Console.WriteLine("采用AsOrdered 输出:{0}", appendStr); /*采用 orderby 排序策略 其数据输出结果 是直接将分区数据结果合并起来 并按orderby要求进行排序*/ var productListPLinq2 = from product in products.AsParallel() where (product.Length == 1) orderby product select product; appendStr = string.Empty; foreach (string str in productListPLinq2) { appendStr += str + " "; } Console.WriteLine("采用orderby 输出:{0}", appendStr); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } } }
指定执行模式 WithExecutionMode,下面求平均数
ConcurrentQueue<int> products = new ConcurrentQueue<int>(); /*向集合中添加多条数据*/ Parallel.For(0, 6000000, (num) => { products.Enqueue(num); }); /*采用LINQ 返回 IEumerable<int>*/ var productListLinq = (from product in products select product).Average(); Console.WriteLine("采用Average计算平均值:{0}", productListLinq); /*采用PLINQ 返回 ParallelQuery<int>*/ var productListPLinq = (from product in products.AsParallel() select product).Average(); Console.WriteLine("采用Average计算平均值:{0}", productListPLinq); Console.ReadLine();
PLINQ查询中使用类似于Average,Max,Min,Sum之类的聚合函数就可以充分利用并行所带来好处
在LINQ版本中,该方法会返回一个 IEumerable<int>,即调用 Eumerable.Range方法生成指定范围整数序列的结果,
如果想对特定数据源进行LINQ查询时,可以定义为 private IEquatable<int> products
在PLINQ版本中,该方法会返回一个 ParallelQuery<int>,即调用并行版本中System.Linq.ParallelEumerable的ParallelEumerable.Range方法,通过这种方法得到的结果序列也是并行序列,可以再PLINQ中并行运行。
如果想对特定数据源进行PLINQ查询时,可以定义为 private ParallelQuery<int> products
并发PLINQ任务
如代码所示tk1,tk2,tk3三个任务,tk2,tk3任务的运行需要基于tk1任务的结果,因此,参数中指定了TaskContinuationOptions.OnlyOnRanToCompletion,通过这种方式,每个被串联的任务都会等待之前的任务完成之后才开始执行,tk2,tk3在tk1执行完成后,这两个任务的PLINQ查询可以并行运行,并将会可能地使用多个逻辑内核。
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { /*code:释迦苦僧*/ static void Main() { ConcurrentQueue<Product> products = new ConcurrentQueue<Product>(); /*向集合中添加多条数据*/ Parallel.For(0, 600000, (num) => { products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num }); }); CancellationTokenSource cts = new CancellationTokenSource(); /*创建tk1 任务 查询 符合 条件的数据*/ Task<ParallelQuery<Product>> tk1 = new Task<ParallelQuery<Product>>((ct) => { Console.WriteLine("开始执行 tk1 任务", products.Count); Console.WriteLine("tk1 任务中 数据结果集数量为:{0}", products.Count); var result = products.AsParallel().Where(p => p.Name.Contains("1") && p.Name.Contains("2")); return result; }, cts.Token); /*创建tk2 任务,在执行tk1任务完成 基于tk1的结果查询 符合 条件的数据*/ Task<ParallelQuery<Product>> tk2 = tk1.ContinueWith<ParallelQuery<Product>>((tk) => { Console.WriteLine("开始执行 tk2 任务", products.Count); Console.WriteLine("tk2 任务中 数据结果集数量为:{0}", tk.Result.Count()); var result = tk.Result.Where(p => p.Category.Contains("1") && p.Category.Contains("2")); return result; }, TaskContinuationOptions.OnlyOnRanToCompletion); /*创建tk3 任务,在执行tk1任务完成 基于tk1的结果查询 符合 条件的数据*/ Task<ParallelQuery<Product>> tk3 = tk1.ContinueWith<ParallelQuery<Product>>((tk) => { Console.WriteLine("开始执行 tk3 任务", products.Count); Console.WriteLine("tk3 任务中 数据结果集数量为:{0}", tk.Result.Count()); var result = tk.Result.Where(p => p.SellPrice > 1111 && p.SellPrice < 222222); return result; }, TaskContinuationOptions.OnlyOnRanToCompletion); tk1.Start(); Task.WaitAll(tk1, tk2, tk3); Console.WriteLine("tk2任务结果输出,筛选后记录总数为:{0}", tk2.Result.Count()); Console.WriteLine("tk3任务结果输出,筛选后记录总数为:{0}", tk3.Result.Count()); tk1.Dispose(); tk2.Dispose(); tk3.Dispose(); cts.Dispose(); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } } }
取消PLINQ WithCancellation
通过WithCancellation取消当前PLINQ正在执行的查询操作,代码如下
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { /*code:释迦苦僧*/ static void Main() { ConcurrentQueue<Product> products = new ConcurrentQueue<Product>(); /*向集合中添加多条数据*/ Parallel.For(0, 600000, (num) => { products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num }); }); CancellationTokenSource cts = new CancellationTokenSource(); CancellationToken token = cts.Token; /*创建tk1 任务 查询 符合 条件的数据*/ Task<ParallelQuery<Product>> tk1 = new Task<ParallelQuery<Product>>((ct) => { var result = products.AsParallel(); try { Console.WriteLine("开始执行 tk1 任务", products.Count); Console.WriteLine("tk1 任务中 数据结果集数量为:{0}", products.Count); result = products.AsParallel().WithCancellation(token).Where(p => p.Name.Contains("1") && p.Name.Contains("2")); } catch (AggregateException ex) { foreach (Exception e in ex.InnerExceptions) { Console.WriteLine("tk3 错误:{0}", e.Message); } } return result; }, cts.Token); /*创建tk2 任务,在执行tk1任务完成 基于tk1的结果查询 符合 条件的数据*/ Task<ParallelQuery<Product>> tk2 = tk1.ContinueWith<ParallelQuery<Product>>((tk) => { var result = tk.Result; try { Console.WriteLine("开始执行 tk2 任务", products.Count); Console.WriteLine("tk2 任务中 数据结果集数量为:{0}", tk.Result.Count()); result = tk.Result.WithCancellation(token).Where(p => p.Category.Contains("1") && p.Category.Contains("2")); } catch (AggregateException ex) { foreach (Exception e in ex.InnerExceptions) { Console.WriteLine("tk3 错误:{0}", e.Message); } } return result; }, TaskContinuationOptions.OnlyOnRanToCompletion); /*创建tk3 任务,在执行tk1任务完成 基于tk1的结果查询 符合 条件的数据*/ Task<ParallelQuery<Product>> tk3 = tk1.ContinueWith<ParallelQuery<Product>>((tk) => { var result = tk.Result; try { Console.WriteLine("开始执行 tk3 任务", products.Count); Console.WriteLine("tk3 任务中 数据结果集数量为:{0}", tk.Result.Count()); result = tk.Result.WithCancellation(token).Where(p => p.SellPrice > 1111 && p.SellPrice < 222222); } catch (AggregateException ex) { foreach (Exception e in ex.InnerExceptions) { Console.WriteLine("tk3 错误:{0}", e.Message); } } return result; }, TaskContinuationOptions.OnlyOnRanToCompletion); tk1.Start(); try { Thread.Sleep(10); cts.Cancel();//取消任务 Task.WaitAll(tk1, tk2, tk3); Console.WriteLine("tk2任务结果输出,筛选后记录总数为:{0}", tk2.Result.Count()); Console.WriteLine("tk3任务结果输出,筛选后记录总数为:{0}", tk3.Result.Count()); } catch (AggregateException ex) { foreach (Exception e in ex.InnerExceptions) { Console.WriteLine("错误:{0}", e.Message); } } tk1.Dispose(); tk2.Dispose(); tk3.Dispose(); cts.Dispose(); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } } }
指定查询时所需的并行度 WithDegreeOfParallelism
默认情况下,PLINQ总是会试图利用所有的可用逻辑内核达到最佳性能,在程序中我们可以利用WithDegreeOfParallelism方法指定一个不同最大并行度。
下面贴代码:
/*tk1任务 采用所有可用处理器*/ result = products.AsParallel().WithCancellation(token).WithDegreeOfParallelism(Environment.ProcessorCount).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2")); /*tk1任务 采用1个可用处理器*/ result = products.AsParallel().WithCancellation(token).WithDegreeOfParallelism(1).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2"));
ForAll
products.AsParallel().Where(P => P.Name.Contains("1") && P.Name.Contains("2") && P.Name.Contains("3")).ForAll(product => { Console.WriteLine("Name:{0}", product.Name); });
ForAll是并行,foreach是串行,如果需要以特定的顺序处理数据,那么必须使用上述串行循环或方法。
通过WithMergeOptions扩展方法提示PLINQ应该优先使用哪种方式合并并行结果片段,如下:
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { static void Main() { Console.WriteLine("当前计算机处理器数:{0}", Environment.ProcessorCount); ConcurrentQueue<Product> products = new ConcurrentQueue<Product>(); /*向集合中添加多条数据*/ Parallel.For(0, 600000, (num) => { products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num }); }); Stopwatch sw = new Stopwatch(); Thread.Sleep(1000); sw.Restart(); int count = 0; Task tk1 = Task.Factory.StartNew(() => { var result = products.AsParallel().WithMergeOptions(ParallelMergeOptions.AutoBuffered).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2")); count = result.Count(); }); Task.WaitAll(tk1); sw.Stop(); Console.WriteLine("ParallelMergeOptions.AutoBuffered 耗时:{0},数量:{1}", sw.ElapsedMilliseconds, count); sw.Restart(); int count1 = 0; Task tk2 = Task.Factory.StartNew(() => { var result = products.AsParallel().WithMergeOptions(ParallelMergeOptions.Default).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2")); count1 = result.Count(); }); Task.WaitAll(tk2); sw.Stop(); Console.WriteLine("ParallelMergeOptions.Default 耗时:{0},数量:{1}", sw.ElapsedMilliseconds, count1); sw.Restart(); int count2 = 0; Task tk3 = Task.Factory.StartNew(() => { var result = products.AsParallel().WithMergeOptions(ParallelMergeOptions.FullyBuffered).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2")); count2 = result.Count(); }); Task.WaitAll(tk3); sw.Stop(); Console.WriteLine("ParallelMergeOptions.FullyBuffered 耗时:{0},数量:{1}", sw.ElapsedMilliseconds, count2); sw.Restart(); int count3 = 0; Task tk4 = Task.Factory.StartNew(() => { var result = products.AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Where(p => p.Name.Contains("1") && p.Name.Contains("2") && p.Category.Contains("1") && p.Category.Contains("2")); count3 = result.Count(); }); Task.WaitAll(tk4); sw.Stop(); Console.WriteLine("ParallelMergeOptions.NotBuffered 耗时:{0},数量:{1}", sw.ElapsedMilliseconds, count3); tk4.Dispose(); tk3.Dispose(); tk2.Dispose(); tk1.Dispose(); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } } }
需要注意的是:每一个选项都有其优点和缺点,因此一定奥测量显示第一个结果的时间以及完成整个查询所需要的时间,这点很重要 。
使用PLINQ执行MapReduce算法 ILookup IGrouping
mapreduce ,也称为Map/reduce 或者Map&Reduce ,是一种非常流行的框架,能够充分利用并行化处理巨大的数据集,MapReduce的基本思想非常简单:将数据处理问题分解为以下两个独立且可以并行执行的操作:
映射(Map)-对数据源进行操作,为每一个数据项计算出一个键值对。运行的结果是一个键值对的集合,根据键进行分组。
规约(Reduce)-对映射操作产生的根据键进行分组的所有键值对进行操作,对每一个组执行归约操作,这个操作可以返回一个或多个值。
下面贴代码,方便大家理解,但是该案列所展示的并不是一个纯粹的MapReduce算法实现:
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace testmulity { class Program { static void Main() { ConcurrentQueue<string> list = new ConcurrentQueue<string>(); list.Enqueue("A"); list.Enqueue("B"); list.Enqueue("C"); list.Enqueue("D"); list.Enqueue("A"); list.Enqueue("D"); Console.WriteLine("Select......."); list.AsParallel().Select(p => new { Name = p, Count = 1 }).ForAll((p) => { Console.WriteLine("{0}\t{1}", p.Name, p.Count); }); Console.WriteLine("ILookup......."); /*map操作生成的键值对由一个单词和数量1组成,该代码意在将每个单词作为键并将1作为值加入*/ ILookup<string, int> map = list.AsParallel().ToLookup(p => p, k => 1); foreach (var v in map) { Console.Write(v.Key); foreach (int val in v) Console.WriteLine("\t{0}", val); } /*reduce操作单词出现的次数*/ var reduce = from IGrouping<string, int> reduceM in map.AsQueryable() select new { key = reduceM.Key, count = reduceM.Count() }; Console.WriteLine("IGrouping......."); foreach (var v in reduce) { Console.Write(v.key); Console.WriteLine("\t{0}", v.count); } Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } } }
参考文章: https://www.cnblogs.com/woxpp/p/3951096.html
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang===============
推荐您阅读更多有关于“C#,C#多线程,”的文章
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- AY C# 线程 并行 Task 笔记
- 你应该这样去开发接口:Java多线程并行计算
- C# 并行和多线程编程:认识和使用 Task
- 15分钟读懂进程线程、同步异步、阻塞非阻塞、并发并行,太实用了!
- sqltoy-orm-4.17.6 发版,支持 Greenplum、并行查询可设置并行数量
- PostgreSQL并行查询介绍
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。