内容简介:我AY的总结:每个方法处理数据多的时候,大于10000吧,考虑用这种方式,否则不如顺序方式。不应该抢外面同一个资源,每个方法都可以单独执行。
我AY的总结:
1 Parallel.Invoke(N个需要同时执行的方法);
每个方法处理数据多的时候,大于10000吧,考虑用这种方式,否则不如顺序方式。
不应该抢外面同一个资源,每个方法都可以单独执行。
2Parallel.For(1,100, index=>{}) 只能整数的
跟for循环对比,Parallel.For是无序执行的。
10万循环对比
10万循环的并行for
没感觉到
1万对比
1万并行,
这个取决于CPU了,数据大,并行优势还是有的。
3Parallel.ForEach(集合,(model)=>{})
无序的,可以用来处理无顺序性的任务。
ParallelLoopState
ParallelLoopState该实例提供了以下两个方法用于停止 Parallel.For,Parallel.ForEach
Break-这个方法告诉并行循环应该在执行了当前迭代后尽快地停止执行。吐过调用Break时正在处理迭代100,那么循环仍然会处理所有小于100的迭代。
Stop-这个方法告诉并行循环应该尽快停止执行,如果调用Stop时迭代100正在被处理,那么循环无法保证处理完所有小于100的迭代
Parallel是无序的,所以,Break感觉像过滤器,Stop感觉立即关闭水管了,剩下的都不处理了。
示例代码
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
static void Main(string[] args)
{
Console.Title = "AY2019";
List<Product> productList = GetProcuctList_500();
Thread.Sleep(3000);
Parallel.For(0, productList.Count, (i, loopState) =>
{
if (i < 100)
{
Console.WriteLine("采用Stop index:{0}", i);
}
else
{
/* 满足条件后 尽快停止执行,无法保证小于100的索引数据全部输出*/
loopState.Stop();
return;
}
});
Thread.Sleep(3000);
Parallel.For(0, productList.Count, (i, loopState) =>
{
if (i < 100)
{
Console.WriteLine("采用Break index:{0}", i);
}
else
{
/* 满足条件后 尽快停止执行,保证小于100的索引数据全部输出*/
loopState.Break();
return;
}
});
Thread.Sleep(3000);
Parallel.ForEach(productList, (model, loopState) =>
{
if (model.SellPrice < 10)
{
Console.WriteLine("采用Stop index:{0}", model.SellPrice);
}
else
{
/* 满足条件后 尽快停止执行,无法保证满足条件的数据全部输出*/
loopState.Stop();
return;
}
});
Thread.Sleep(3000);
Parallel.ForEach(productList, (model, loopState) =>
{
if (model.SellPrice < 10)
{
Console.WriteLine("采用Break index:{0}", model.SellPrice);
}
else
{
/* 满足条件后 尽快停止执行,保证满足条件的数据全部输出*/
loopState.Break();
return;
}
});
Console.ReadLine();
}
private static List<Product> GetProcuctList_500()
{
List<Product> result = new List<Product>();
for (int index = 1; index < 500; index++)
{
Product model = new Product();
model.Category = "Category" + index;
model.Name = "Name" + index;
model.SellPrice = index;
result.Add(model);
}
return result;
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
}
System.Threading.Tasks.Task
一个Task表示一个异步操作,Task提供了很多方法和属性,通过这些方法和属性能够对Task的执行进行控制,并且能够获得其状态信息。
Task的创建和执行都是独立的,因此可以对关联操作的执行拥有完全的控制权。
使用Parallel.For、Parallel.ForEach的循环迭代的并行执行,TPL会在后台创建System.Threading.Tasks.Task的实例。
使用Parallel.Invoke时,TPL也会创建与调用的委托数目一致的System.Threading.Tasks.Task的实例。
Task生命周期
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
static void Main(string[] args)
{
Console.Title = "AY2019";
/* 创建一个任务 不调用 不执行 状态为Created */
Task tk = new Task(() =>
{
});
Console.WriteLine(tk.Status.ToString());
/* 创建一个任务 执行 状态为 WaitingToRun */
Task tk1 = new Task(() =>
{
});
tk1.Start();/*对于安排好的任务,就算调用Start方法也不会立马启动 此时任务的状态为WaitingToRun*/
Console.WriteLine(tk1.Status.ToString());
/* 创建一个主任务 */
Task mainTask = new Task(() =>
{
SpinWait.SpinUntil(() =>
{
return false;
}, 30000);
});
/* 将子任务加入到主任务完成之后执行 */
Task subTask = mainTask.ContinueWith((t1) =>
{
});
/* 启动主任务 */
mainTask.Start();
/* 此时子任务状态为 WaitingForActivation */
Console.WriteLine(subTask.Status.ToString());
/* 创建一个任务 执行 后 等待一段时间 并行未结束的情况下 状态为 Running */
Task tk2 = new Task(() =>
{
SpinWait.SpinUntil(() => false, 30000);
});
tk2.Start();/*对于安排好的任务,就算调用Start方法也不会立马启动*/
SpinWait.SpinUntil(() => false, 300);
Console.WriteLine(tk2.Status.ToString());
/* 创建一个任务 然后取消该任务 状态为Canceled */
CancellationTokenSource cts = new CancellationTokenSource();
Task tk3 = new Task(() =>
{
for (int i = 0; i < int.MaxValue; i++)
{
if (!cts.Token.IsCancellationRequested)
{
cts.Token.ThrowIfCancellationRequested();
}
}
}, cts.Token);
tk3.Start();/*启动任务*/
SpinWait.SpinUntil(() => false, 100);
cts.Cancel();/*取消该任务执行 但并非立马取消 所以对于Canceled状态也不会立马生效*/
SpinWait.SpinUntil(() => false, 1000);
Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled);
SpinWait.SpinUntil(() => false, 1000);
Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled);
SpinWait.SpinUntil(() => false, 1000);
Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled);
/*创建一个任务 让它成功的运行完成 会得到 RanToCompletion 状态*/
Task tk4 = new Task(() =>
{
SpinWait.SpinUntil(() => false, 10);
});
tk4.Start();
SpinWait.SpinUntil(() => false, 300);
Console.WriteLine(tk4.Status.ToString());
/*创建一个任务 让它运行失败 会得到 Faulted 状态*/
Task tk5 = new Task(() =>
{
throw new Exception();
});
tk5.Start();
SpinWait.SpinUntil(() => false, 300);
Console.WriteLine(tk5.Status.ToString());
Console.ReadLine();
}
}
}
异步执行多个任务
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
private static ConcurrentQueue<Product> queue = null;
static void Main(string[] args)
{
Console.Title = "AY2019";
queue = new ConcurrentQueue<Product>();
Task tk1 = new Task(() => { SetProduct(1); SetProduct(3); });
Task tk2 = new Task(() => SetProduct(2));
tk1.Start();
tk2.Start();
Console.ReadLine();
}
static void SetProduct(int index)
{
Parallel.For(0, 10000, (i) =>
{
Product model = new Product();
model.Name = "Name" + i;
model.SellPrice = i;
model.Category = "Category" + i;
queue.Enqueue(model);
});
Console.WriteLine("SetProduct {0} 执行完成", index);
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
}
Task.WaitAll(N个Task,逗号隔开)
Task.WaitAll 方法,这个方法是同步执行的,在Task作为参数被接受,所有Task结束其执行前,主线程不会继续执行下一条指令
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
private static ConcurrentQueue<Product> queue = null;
/* coder:释迦苦僧 */
static void Main(string[] args)
{
Console.Title = "AY2019";
queue = new ConcurrentQueue<Product>();
Task tk1 = new Task(() => { SetProduct(1); SetProduct(3); });
Task tk2 = new Task(() => SetProduct(2));
tk1.Start();
tk2.Start();
/*等待任务执行完成后再输出 ====== */
Task.WaitAll(tk1, tk2);
Console.WriteLine("等待任务执行完成后再输出 ======");
Task tk3 = new Task(() => { SetProduct(1); SetProduct(3); });
Task tk4 = new Task(() => SetProduct(2));
tk3.Start();
tk4.Start();
/*等待任务执行前输出 ====== */
Console.WriteLine("等待任务执行前输出 ======");
Task.WaitAll(tk3, tk4);
Console.ReadLine();
}
static void SetProduct(int index)
{
Parallel.For(0, 10000, (i) =>
{
Product model = new Product();
model.Name = "Name" + i;
model.SellPrice = i;
model.Category = "Category" + i;
queue.Enqueue(model);
});
Console.WriteLine("SetProduct {0} 执行完成", index);
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
}
等待超时
if (!Task.WaitAll(new Task[] { tk1, tk2 }, 10))
{
Console.WriteLine("tk1和tk2在10秒内就完成了");
}
CancellationTokenSource,CancellationToken取消任务
class Program
{
private static ConcurrentQueue<Product> queue = null;
static void Main(string[] args)
{
queue = new ConcurrentQueue<Product>();
System.Threading.CancellationTokenSource token = new CancellationTokenSource();
Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token));
Task tk2 = Task.Factory.StartNew(() => SetProduct(token.Token));
Thread.Sleep(10);
/*取消任务操作*/
token.Cancel();
try
{
/*等待完成*/
Task.WaitAll(new Task[] { tk1, tk2 });
}
catch (AggregateException ex)
{
/*如果当前的任务正在被取消,那么还会抛出一个TaskCanceledException异常,这个异常包含在AggregateException异常中*/
Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled);
Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled);
}
Thread.Sleep(2000);
Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled);
Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled);
Console.ReadLine();
}
static void SetProduct(System.Threading.CancellationToken ct)
{
/* 每一次循环迭代,都会有新的代码调用 ThrowIfCancellationRequested
* 这行代码能够对 OpreationCanceledException 异常进行观察
* 并且这个异常的标记与Task实例关联的那个标记进行比较,如果两者相同 ,而且IsCancelled属性为True,那么Task实例就知道存在一个要求取消的请求,并且会将状态转变为Canceled状态,中断任务执行。
* 如果当前的任务正在被取消,那么还会抛出一个TaskCanceledException异常,这个异常包含在AggregateException异常中
/*检查取消标记*/
ct.ThrowIfCancellationRequested();
for (int i = 0; i < 50000; i++)
{
Product model = new Product();
model.Name = "Name" + i;
model.SellPrice = i;
model.Category = "Category" + i;
queue.Enqueue(model);
ct.ThrowIfCancellationRequested();
}
Console.WriteLine("SetProduct 执行完成");
}
在调用的方法,传入System.Threading.CancellationTokenSource的示例
Task异常
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
private static ConcurrentQueue<Product> queue = null;
static void Main(string[] args)
{
queue = new ConcurrentQueue<Product>();
System.Threading.CancellationTokenSource token = new CancellationTokenSource();
Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token));
Thread.Sleep(2000);
if (tk1.IsFaulted)
{
/* 循环输出异常 */
foreach (Exception ex in tk1.Exception.InnerExceptions)
{
Console.WriteLine("tk1 Exception:{0}", ex.Message);
}
}
Console.ReadLine();
}
static void SetProduct(System.Threading.CancellationToken ct)
{
for (int i = 0; i < 5; i++)
{
throw new Exception(string.Format("Exception Index {0}", i));
}
Console.WriteLine("SetProduct 执行完成");
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
}
根据声明周期来的,通过CancellationTokenSource的实例知道状态
Task返回值
static void Main(string[] args)
{
Task<List<Product>> tk1 = Task<List<Product>>.Factory.StartNew(() => SetProduct());
Task.WaitAll(tk1);
Console.WriteLine(tk1.Result.Count);
Console.WriteLine(tk1.Result[0].Name);
Console.ReadLine();
}
static List<Product> SetProduct()
{
List<Product> result = new List<Product>();
for (int i = 0; i < 500; i++)
{
Product model = new Product();
model.Name = "Name" + i;
model.SellPrice = i;
model.Category = "Category" + i;
result.Add(model);
}
Console.WriteLine("SetProduct 执行完成");
return result;
}
Task的泛型T就是返回值类型,使用Result获得
ContinueWith 串联多个任务,有点linq。
Task t2 = t1.ContinueWith((t) =>
{
Console.WriteLine("执行 t2 任务");
SpinWait.SpinUntil(() =>
{
return false;
}, 2000);
});
/*创建任务t3 t3任务的执行 依赖与t2任务的执行完成*/
Task t3 = t2.ContinueWith((t) =>
{
Console.WriteLine("执行 t3 任务");
});
任务2执行完再执行任务3
TaskContinuationOptions
TaskContinuationOptions.NotOnFaulted 在上一个任务失败时候,不执行当前任务
TaskContinuationOptions.NotOnCanceled 同理上一个任务取消时候,不执行当前任务
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
static void Main(string[] args)
{
/*创建任务t1*/
Task t1 = Task.Factory.StartNew(() =>
{
Console.WriteLine("执行 t1 任务");
SpinWait.SpinUntil(() =>
{
return false;
}, 2000);
throw new Exception("异常");
});
/*创建任务t2 t2任务的执行 依赖与t1任务的执行完成*/
Task t2 = t1.ContinueWith((t) =>
{
Console.WriteLine(t.Status);
Console.WriteLine("执行 t2 任务");
SpinWait.SpinUntil(() =>
{
return false;
}, 2000);
}, TaskContinuationOptions.NotOnFaulted);
/*创建任务t3 t3任务的执行 依赖与t2任务的执行完成*/
Task t3 = t2.ContinueWith((t) =>
{
Console.WriteLine(t.Status);
Console.WriteLine("执行 t3 任务");
},TaskContinuationOptions.NotOnCanceled);
Console.WriteLine("结束");
Console.ReadLine();
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
}
1失败了,2被取消,所以3不执行。
改成longRunning以后
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
并行集合
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
private static object o = new object();
private static List<Product> _Products { get; set; }
/*
* 代码中 创建三个并发线程 来操作_Products 集合
* System.Collections.Generic.List 这个列表在多个线程访问下,不能保证是安全的线程,所以不能接受并发的请求,我们必须对ADD方法的执行进行串行化
*/
static void Main(string[] args)
{
_Products = new List<Product>();
/*创建任务 t1 t1 执行 数据集合添加操作*/
Task t1 = Task.Factory.StartNew(() =>
{
AddProducts();
});
/*创建任务 t2 t2 执行 数据集合添加操作*/
Task t2 = Task.Factory.StartNew(() =>
{
AddProducts();
});
/*创建任务 t3 t3 执行 数据集合添加操作*/
Task t3 = Task.Factory.StartNew(() =>
{
AddProducts();
});
Task.WaitAll(t1, t2, t3);
Console.WriteLine(_Products.Count);
Console.ReadLine();
}
/*执行集合数据添加操作*/
static void AddProducts()
{
Parallel.For(0, 1000, (i) =>
{
Product product = new Product();
product.Name = "name" + i;
product.Category = "Category" + i;
product.SellPrice = i;
_Products.Add(product);
});
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
}
代码中开启了三个并发操作,每个操作都向集合中添加1000条数据,在没有保障线程安全和串行化的运行下,实际得到的数据并没有3000条,结果如下
使用lock集合
锁带来性能损失
System.Collections.Concurrent
线程安全并不是没有代价的,比起System.Collenctions和System.Collenctions.Generic命名空间中的列表、集合和数组来说,并发集合会有更大的开销。因此,应该只在需要从多个任务中并发访问集合的时候才使用并发几个,在串行代码中使用并发集合是没有意义的,因为它们会增加无谓的开销。
为此,在.NET Framework中提供了System.Collections.Concurrent新的命名空间可以访问用于解决线程安全问题,通过这个命名空间能访问以下为并发做好了准备的集合。
1.BlockingCollection 与经典的阻塞队列数据结构类似,能够适用于多个任务添加和删除数据,提供阻塞和限界能力。
2.ConcurrentBag 提供对象的线程安全的无序集合
3.ConcurrentDictionary 提供可有多个线程同时访问的键值对的线程安全集合
4.ConcurrentQueue 提供线程安全的先进先出集合
5.ConcurrentStack 提供线程安全的后进先出集合
这些集合通过使用比较并交换和内存屏障等技术,避免使用典型的互斥重量级的锁,从而保证线程安全和性能。
ConcurrentQueue是完全无锁的,能够支持并发的添加元素,先进先出。下面贴代码,详解见注释:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
private static object o = new object();
/*定义 Queue*/
private static Queue<Product> _Products { get; set; }
private static ConcurrentQueue<Product> _ConcurrenProducts { get; set; }
/*
* 代码中 创建三个并发线程 来操作_Products 和 _ConcurrenProducts 集合,每次添加 10000 条数据 查看 一般队列Queue 和 多线程安全下的队列ConcurrentQueue 执行情况
*/
static void Main(string[] args)
{
Thread.Sleep(1000);
_Products = new Queue<Product>();
Stopwatch swTask = new Stopwatch();
swTask.Start();
/*创建任务 t1 t1 执行 数据集合添加操作*/
Task t1 = Task.Factory.StartNew(() =>
{
AddProducts();
});
/*创建任务 t2 t2 执行 数据集合添加操作*/
Task t2 = Task.Factory.StartNew(() =>
{
AddProducts();
});
/*创建任务 t3 t3 执行 数据集合添加操作*/
Task t3 = Task.Factory.StartNew(() =>
{
AddProducts();
});
Task.WaitAll(t1, t2, t3);
swTask.Stop();
Console.WriteLine("List<Product> 当前数据量为:" + _Products.Count);
Console.WriteLine("List<Product> 执行时间为:" + swTask.ElapsedMilliseconds);
Thread.Sleep(1000);
_ConcurrenProducts = new ConcurrentQueue<Product>();
Stopwatch swTask1 = new Stopwatch();
swTask1.Start();
/*创建任务 tk1 tk1 执行 数据集合添加操作*/
Task tk1 = Task.Factory.StartNew(() =>
{
AddConcurrenProducts();
});
/*创建任务 tk2 tk2 执行 数据集合添加操作*/
Task tk2 = Task.Factory.StartNew(() =>
{
AddConcurrenProducts();
});
/*创建任务 tk3 tk3 执行 数据集合添加操作*/
Task tk3 = Task.Factory.StartNew(() =>
{
AddConcurrenProducts();
});
Task.WaitAll(tk1, tk2, tk3);
swTask1.Stop();
Console.WriteLine("ConcurrentQueue<Product> 当前数据量为:" + _ConcurrenProducts.Count);
Console.WriteLine("ConcurrentQueue<Product> 执行时间为:" + swTask1.ElapsedMilliseconds);
Console.ReadLine();
}
/*执行集合数据添加操作*/
static void AddProducts()
{
Parallel.For(0, 30000, (i) =>
{
Product product = new Product();
product.Name = "name" + i;
product.Category = "Category" + i;
product.SellPrice = i;
lock (o)
{
_Products.Enqueue(product);
}
});
}
/*执行集合数据添加操作*/
static void AddConcurrenProducts()
{
Parallel.For(0, 30000, (i) =>
{
Product product = new Product();
product.Name = "name" + i;
product.Category = "Category" + i;
product.SellPrice = i;
_ConcurrenProducts.Enqueue(product);
});
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
}
提供了 TryPeek和TryDequeue方法
用法同Queue差不多
ConcurrentStack 是完全无锁的,能够支持并发的添加元素,后进先出
/*执行集合数据添加操作*/
static void AddConcurrenProducts()
{
Parallel.For(0, 30000, (i) =>
{
Product product = new Product();
product.Name = "name" + i;
product.Category = "Category" + i;
product.SellPrice = i;
_ConcurrenProducts.Push(product);
});
}
TryPop 尝试移除并返回 和 TryPeek 尝试返回但不移除
其他“http://msdn.microsoft.com/zh-cn/library/system.collections.concurrent(v=vs.110).aspx”
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
参考文章:https://www.cnblogs.com/woxpp/p/3935557.html
推荐您阅读更多有关于“C#多线程,”的文章
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- AY C#高级知识 - 线程 并行 Task 笔记2
- 你应该这样去开发接口:Java多线程并行计算
- C# 并行和多线程编程:认识和使用 Task
- 15分钟读懂进程线程、同步异步、阻塞非阻塞、并发并行,太实用了!
- sqltoy-orm-4.17.6 发版,支持 Greenplum、并行查询可设置并行数量
- PostgreSQL并行查询介绍
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Measure What Matters
John Doerr / Portfolio / 2018-4-24 / GBP 19.67
In the fall of 1999, John Doerr met with the founders of a start-up he’d just given $11.8 million, the biggest investment of his career. Larry Page and Sergey Brin had amazing technology, entrepreneur......一起来看看 《Measure What Matters》 这本书的介绍吧!