内容简介:我AY的总结:每个方法处理数据多的时候,大于10000吧,考虑用这种方式,否则不如顺序方式。不应该抢外面同一个资源,每个方法都可以单独执行。
我AY的总结:
1 Parallel.Invoke(N个需要同时执行的方法);
每个方法处理数据多的时候,大于10000吧,考虑用这种方式,否则不如顺序方式。
不应该抢外面同一个资源,每个方法都可以单独执行。
2Parallel.For(1,100, index=>{}) 只能整数的
跟for循环对比,Parallel.For是无序执行的。
10万循环对比
10万循环的并行for
没感觉到
1万对比
1万并行,
这个取决于CPU了,数据大,并行优势还是有的。
3Parallel.ForEach(集合,(model)=>{})
无序的,可以用来处理无顺序性的任务。
ParallelLoopState
ParallelLoopState该实例提供了以下两个方法用于停止 Parallel.For,Parallel.ForEach
Break-这个方法告诉并行循环应该在执行了当前迭代后尽快地停止执行。吐过调用Break时正在处理迭代100,那么循环仍然会处理所有小于100的迭代。
Stop-这个方法告诉并行循环应该尽快停止执行,如果调用Stop时迭代100正在被处理,那么循环无法保证处理完所有小于100的迭代
Parallel是无序的,所以,Break感觉像过滤器,Stop感觉立即关闭水管了,剩下的都不处理了。
示例代码
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
static void Main(string[] args)
{
Console.Title = "AY2019";
List<Product> productList = GetProcuctList_500();
Thread.Sleep(3000);
Parallel.For(0, productList.Count, (i, loopState) =>
{
if (i < 100)
{
Console.WriteLine("采用Stop index:{0}", i);
}
else
{
/* 满足条件后 尽快停止执行,无法保证小于100的索引数据全部输出*/
loopState.Stop();
return;
}
});
Thread.Sleep(3000);
Parallel.For(0, productList.Count, (i, loopState) =>
{
if (i < 100)
{
Console.WriteLine("采用Break index:{0}", i);
}
else
{
/* 满足条件后 尽快停止执行,保证小于100的索引数据全部输出*/
loopState.Break();
return;
}
});
Thread.Sleep(3000);
Parallel.ForEach(productList, (model, loopState) =>
{
if (model.SellPrice < 10)
{
Console.WriteLine("采用Stop index:{0}", model.SellPrice);
}
else
{
/* 满足条件后 尽快停止执行,无法保证满足条件的数据全部输出*/
loopState.Stop();
return;
}
});
Thread.Sleep(3000);
Parallel.ForEach(productList, (model, loopState) =>
{
if (model.SellPrice < 10)
{
Console.WriteLine("采用Break index:{0}", model.SellPrice);
}
else
{
/* 满足条件后 尽快停止执行,保证满足条件的数据全部输出*/
loopState.Break();
return;
}
});
Console.ReadLine();
}
private static List<Product> GetProcuctList_500()
{
List<Product> result = new List<Product>();
for (int index = 1; index < 500; index++)
{
Product model = new Product();
model.Category = "Category" + index;
model.Name = "Name" + index;
model.SellPrice = index;
result.Add(model);
}
return result;
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
}
System.Threading.Tasks.Task
一个Task表示一个异步操作,Task提供了很多方法和属性,通过这些方法和属性能够对Task的执行进行控制,并且能够获得其状态信息。
Task的创建和执行都是独立的,因此可以对关联操作的执行拥有完全的控制权。
使用Parallel.For、Parallel.ForEach的循环迭代的并行执行,TPL会在后台创建System.Threading.Tasks.Task的实例。
使用Parallel.Invoke时,TPL也会创建与调用的委托数目一致的System.Threading.Tasks.Task的实例。
Task生命周期
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
static void Main(string[] args)
{
Console.Title = "AY2019";
/* 创建一个任务 不调用 不执行 状态为Created */
Task tk = new Task(() =>
{
});
Console.WriteLine(tk.Status.ToString());
/* 创建一个任务 执行 状态为 WaitingToRun */
Task tk1 = new Task(() =>
{
});
tk1.Start();/*对于安排好的任务,就算调用Start方法也不会立马启动 此时任务的状态为WaitingToRun*/
Console.WriteLine(tk1.Status.ToString());
/* 创建一个主任务 */
Task mainTask = new Task(() =>
{
SpinWait.SpinUntil(() =>
{
return false;
}, 30000);
});
/* 将子任务加入到主任务完成之后执行 */
Task subTask = mainTask.ContinueWith((t1) =>
{
});
/* 启动主任务 */
mainTask.Start();
/* 此时子任务状态为 WaitingForActivation */
Console.WriteLine(subTask.Status.ToString());
/* 创建一个任务 执行 后 等待一段时间 并行未结束的情况下 状态为 Running */
Task tk2 = new Task(() =>
{
SpinWait.SpinUntil(() => false, 30000);
});
tk2.Start();/*对于安排好的任务,就算调用Start方法也不会立马启动*/
SpinWait.SpinUntil(() => false, 300);
Console.WriteLine(tk2.Status.ToString());
/* 创建一个任务 然后取消该任务 状态为Canceled */
CancellationTokenSource cts = new CancellationTokenSource();
Task tk3 = new Task(() =>
{
for (int i = 0; i < int.MaxValue; i++)
{
if (!cts.Token.IsCancellationRequested)
{
cts.Token.ThrowIfCancellationRequested();
}
}
}, cts.Token);
tk3.Start();/*启动任务*/
SpinWait.SpinUntil(() => false, 100);
cts.Cancel();/*取消该任务执行 但并非立马取消 所以对于Canceled状态也不会立马生效*/
SpinWait.SpinUntil(() => false, 1000);
Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled);
SpinWait.SpinUntil(() => false, 1000);
Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled);
SpinWait.SpinUntil(() => false, 1000);
Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled);
/*创建一个任务 让它成功的运行完成 会得到 RanToCompletion 状态*/
Task tk4 = new Task(() =>
{
SpinWait.SpinUntil(() => false, 10);
});
tk4.Start();
SpinWait.SpinUntil(() => false, 300);
Console.WriteLine(tk4.Status.ToString());
/*创建一个任务 让它运行失败 会得到 Faulted 状态*/
Task tk5 = new Task(() =>
{
throw new Exception();
});
tk5.Start();
SpinWait.SpinUntil(() => false, 300);
Console.WriteLine(tk5.Status.ToString());
Console.ReadLine();
}
}
}
异步执行多个任务
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
private static ConcurrentQueue<Product> queue = null;
static void Main(string[] args)
{
Console.Title = "AY2019";
queue = new ConcurrentQueue<Product>();
Task tk1 = new Task(() => { SetProduct(1); SetProduct(3); });
Task tk2 = new Task(() => SetProduct(2));
tk1.Start();
tk2.Start();
Console.ReadLine();
}
static void SetProduct(int index)
{
Parallel.For(0, 10000, (i) =>
{
Product model = new Product();
model.Name = "Name" + i;
model.SellPrice = i;
model.Category = "Category" + i;
queue.Enqueue(model);
});
Console.WriteLine("SetProduct {0} 执行完成", index);
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
}
Task.WaitAll(N个Task,逗号隔开)
Task.WaitAll 方法,这个方法是同步执行的,在Task作为参数被接受,所有Task结束其执行前,主线程不会继续执行下一条指令
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
private static ConcurrentQueue<Product> queue = null;
/* coder:释迦苦僧 */
static void Main(string[] args)
{
Console.Title = "AY2019";
queue = new ConcurrentQueue<Product>();
Task tk1 = new Task(() => { SetProduct(1); SetProduct(3); });
Task tk2 = new Task(() => SetProduct(2));
tk1.Start();
tk2.Start();
/*等待任务执行完成后再输出 ====== */
Task.WaitAll(tk1, tk2);
Console.WriteLine("等待任务执行完成后再输出 ======");
Task tk3 = new Task(() => { SetProduct(1); SetProduct(3); });
Task tk4 = new Task(() => SetProduct(2));
tk3.Start();
tk4.Start();
/*等待任务执行前输出 ====== */
Console.WriteLine("等待任务执行前输出 ======");
Task.WaitAll(tk3, tk4);
Console.ReadLine();
}
static void SetProduct(int index)
{
Parallel.For(0, 10000, (i) =>
{
Product model = new Product();
model.Name = "Name" + i;
model.SellPrice = i;
model.Category = "Category" + i;
queue.Enqueue(model);
});
Console.WriteLine("SetProduct {0} 执行完成", index);
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
}
等待超时
if (!Task.WaitAll(new Task[] { tk1, tk2 }, 10))
{
Console.WriteLine("tk1和tk2在10秒内就完成了");
}
CancellationTokenSource,CancellationToken取消任务
class Program
{
private static ConcurrentQueue<Product> queue = null;
static void Main(string[] args)
{
queue = new ConcurrentQueue<Product>();
System.Threading.CancellationTokenSource token = new CancellationTokenSource();
Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token));
Task tk2 = Task.Factory.StartNew(() => SetProduct(token.Token));
Thread.Sleep(10);
/*取消任务操作*/
token.Cancel();
try
{
/*等待完成*/
Task.WaitAll(new Task[] { tk1, tk2 });
}
catch (AggregateException ex)
{
/*如果当前的任务正在被取消,那么还会抛出一个TaskCanceledException异常,这个异常包含在AggregateException异常中*/
Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled);
Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled);
}
Thread.Sleep(2000);
Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled);
Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled);
Console.ReadLine();
}
static void SetProduct(System.Threading.CancellationToken ct)
{
/* 每一次循环迭代,都会有新的代码调用 ThrowIfCancellationRequested
* 这行代码能够对 OpreationCanceledException 异常进行观察
* 并且这个异常的标记与Task实例关联的那个标记进行比较,如果两者相同 ,而且IsCancelled属性为True,那么Task实例就知道存在一个要求取消的请求,并且会将状态转变为Canceled状态,中断任务执行。
* 如果当前的任务正在被取消,那么还会抛出一个TaskCanceledException异常,这个异常包含在AggregateException异常中
/*检查取消标记*/
ct.ThrowIfCancellationRequested();
for (int i = 0; i < 50000; i++)
{
Product model = new Product();
model.Name = "Name" + i;
model.SellPrice = i;
model.Category = "Category" + i;
queue.Enqueue(model);
ct.ThrowIfCancellationRequested();
}
Console.WriteLine("SetProduct 执行完成");
}
在调用的方法,传入System.Threading.CancellationTokenSource的示例
Task异常
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
private static ConcurrentQueue<Product> queue = null;
static void Main(string[] args)
{
queue = new ConcurrentQueue<Product>();
System.Threading.CancellationTokenSource token = new CancellationTokenSource();
Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token));
Thread.Sleep(2000);
if (tk1.IsFaulted)
{
/* 循环输出异常 */
foreach (Exception ex in tk1.Exception.InnerExceptions)
{
Console.WriteLine("tk1 Exception:{0}", ex.Message);
}
}
Console.ReadLine();
}
static void SetProduct(System.Threading.CancellationToken ct)
{
for (int i = 0; i < 5; i++)
{
throw new Exception(string.Format("Exception Index {0}", i));
}
Console.WriteLine("SetProduct 执行完成");
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
}
根据声明周期来的,通过CancellationTokenSource的实例知道状态
Task返回值
static void Main(string[] args)
{
Task<List<Product>> tk1 = Task<List<Product>>.Factory.StartNew(() => SetProduct());
Task.WaitAll(tk1);
Console.WriteLine(tk1.Result.Count);
Console.WriteLine(tk1.Result[0].Name);
Console.ReadLine();
}
static List<Product> SetProduct()
{
List<Product> result = new List<Product>();
for (int i = 0; i < 500; i++)
{
Product model = new Product();
model.Name = "Name" + i;
model.SellPrice = i;
model.Category = "Category" + i;
result.Add(model);
}
Console.WriteLine("SetProduct 执行完成");
return result;
}
Task的泛型T就是返回值类型,使用Result获得
ContinueWith 串联多个任务,有点linq。
Task t2 = t1.ContinueWith((t) =>
{
Console.WriteLine("执行 t2 任务");
SpinWait.SpinUntil(() =>
{
return false;
}, 2000);
});
/*创建任务t3 t3任务的执行 依赖与t2任务的执行完成*/
Task t3 = t2.ContinueWith((t) =>
{
Console.WriteLine("执行 t3 任务");
});
任务2执行完再执行任务3
TaskContinuationOptions
TaskContinuationOptions.NotOnFaulted 在上一个任务失败时候,不执行当前任务
TaskContinuationOptions.NotOnCanceled 同理上一个任务取消时候,不执行当前任务
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
static void Main(string[] args)
{
/*创建任务t1*/
Task t1 = Task.Factory.StartNew(() =>
{
Console.WriteLine("执行 t1 任务");
SpinWait.SpinUntil(() =>
{
return false;
}, 2000);
throw new Exception("异常");
});
/*创建任务t2 t2任务的执行 依赖与t1任务的执行完成*/
Task t2 = t1.ContinueWith((t) =>
{
Console.WriteLine(t.Status);
Console.WriteLine("执行 t2 任务");
SpinWait.SpinUntil(() =>
{
return false;
}, 2000);
}, TaskContinuationOptions.NotOnFaulted);
/*创建任务t3 t3任务的执行 依赖与t2任务的执行完成*/
Task t3 = t2.ContinueWith((t) =>
{
Console.WriteLine(t.Status);
Console.WriteLine("执行 t3 任务");
},TaskContinuationOptions.NotOnCanceled);
Console.WriteLine("结束");
Console.ReadLine();
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
}
1失败了,2被取消,所以3不执行。
改成longRunning以后
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
并行集合
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
private static object o = new object();
private static List<Product> _Products { get; set; }
/*
* 代码中 创建三个并发线程 来操作_Products 集合
* System.Collections.Generic.List 这个列表在多个线程访问下,不能保证是安全的线程,所以不能接受并发的请求,我们必须对ADD方法的执行进行串行化
*/
static void Main(string[] args)
{
_Products = new List<Product>();
/*创建任务 t1 t1 执行 数据集合添加操作*/
Task t1 = Task.Factory.StartNew(() =>
{
AddProducts();
});
/*创建任务 t2 t2 执行 数据集合添加操作*/
Task t2 = Task.Factory.StartNew(() =>
{
AddProducts();
});
/*创建任务 t3 t3 执行 数据集合添加操作*/
Task t3 = Task.Factory.StartNew(() =>
{
AddProducts();
});
Task.WaitAll(t1, t2, t3);
Console.WriteLine(_Products.Count);
Console.ReadLine();
}
/*执行集合数据添加操作*/
static void AddProducts()
{
Parallel.For(0, 1000, (i) =>
{
Product product = new Product();
product.Name = "name" + i;
product.Category = "Category" + i;
product.SellPrice = i;
_Products.Add(product);
});
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
}
代码中开启了三个并发操作,每个操作都向集合中添加1000条数据,在没有保障线程安全和串行化的运行下,实际得到的数据并没有3000条,结果如下
使用lock集合
锁带来性能损失
System.Collections.Concurrent
线程安全并不是没有代价的,比起System.Collenctions和System.Collenctions.Generic命名空间中的列表、集合和数组来说,并发集合会有更大的开销。因此,应该只在需要从多个任务中并发访问集合的时候才使用并发几个,在串行代码中使用并发集合是没有意义的,因为它们会增加无谓的开销。
为此,在.NET Framework中提供了System.Collections.Concurrent新的命名空间可以访问用于解决线程安全问题,通过这个命名空间能访问以下为并发做好了准备的集合。
1.BlockingCollection 与经典的阻塞队列数据结构类似,能够适用于多个任务添加和删除数据,提供阻塞和限界能力。
2.ConcurrentBag 提供对象的线程安全的无序集合
3.ConcurrentDictionary 提供可有多个线程同时访问的键值对的线程安全集合
4.ConcurrentQueue 提供线程安全的先进先出集合
5.ConcurrentStack 提供线程安全的后进先出集合
这些集合通过使用比较并交换和内存屏障等技术,避免使用典型的互斥重量级的锁,从而保证线程安全和性能。
ConcurrentQueue是完全无锁的,能够支持并发的添加元素,先进先出。下面贴代码,详解见注释:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace testmulity
{
class Program
{
private static object o = new object();
/*定义 Queue*/
private static Queue<Product> _Products { get; set; }
private static ConcurrentQueue<Product> _ConcurrenProducts { get; set; }
/*
* 代码中 创建三个并发线程 来操作_Products 和 _ConcurrenProducts 集合,每次添加 10000 条数据 查看 一般队列Queue 和 多线程安全下的队列ConcurrentQueue 执行情况
*/
static void Main(string[] args)
{
Thread.Sleep(1000);
_Products = new Queue<Product>();
Stopwatch swTask = new Stopwatch();
swTask.Start();
/*创建任务 t1 t1 执行 数据集合添加操作*/
Task t1 = Task.Factory.StartNew(() =>
{
AddProducts();
});
/*创建任务 t2 t2 执行 数据集合添加操作*/
Task t2 = Task.Factory.StartNew(() =>
{
AddProducts();
});
/*创建任务 t3 t3 执行 数据集合添加操作*/
Task t3 = Task.Factory.StartNew(() =>
{
AddProducts();
});
Task.WaitAll(t1, t2, t3);
swTask.Stop();
Console.WriteLine("List<Product> 当前数据量为:" + _Products.Count);
Console.WriteLine("List<Product> 执行时间为:" + swTask.ElapsedMilliseconds);
Thread.Sleep(1000);
_ConcurrenProducts = new ConcurrentQueue<Product>();
Stopwatch swTask1 = new Stopwatch();
swTask1.Start();
/*创建任务 tk1 tk1 执行 数据集合添加操作*/
Task tk1 = Task.Factory.StartNew(() =>
{
AddConcurrenProducts();
});
/*创建任务 tk2 tk2 执行 数据集合添加操作*/
Task tk2 = Task.Factory.StartNew(() =>
{
AddConcurrenProducts();
});
/*创建任务 tk3 tk3 执行 数据集合添加操作*/
Task tk3 = Task.Factory.StartNew(() =>
{
AddConcurrenProducts();
});
Task.WaitAll(tk1, tk2, tk3);
swTask1.Stop();
Console.WriteLine("ConcurrentQueue<Product> 当前数据量为:" + _ConcurrenProducts.Count);
Console.WriteLine("ConcurrentQueue<Product> 执行时间为:" + swTask1.ElapsedMilliseconds);
Console.ReadLine();
}
/*执行集合数据添加操作*/
static void AddProducts()
{
Parallel.For(0, 30000, (i) =>
{
Product product = new Product();
product.Name = "name" + i;
product.Category = "Category" + i;
product.SellPrice = i;
lock (o)
{
_Products.Enqueue(product);
}
});
}
/*执行集合数据添加操作*/
static void AddConcurrenProducts()
{
Parallel.For(0, 30000, (i) =>
{
Product product = new Product();
product.Name = "name" + i;
product.Category = "Category" + i;
product.SellPrice = i;
_ConcurrenProducts.Enqueue(product);
});
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
}
提供了 TryPeek和TryDequeue方法
用法同Queue差不多
ConcurrentStack 是完全无锁的,能够支持并发的添加元素,后进先出
/*执行集合数据添加操作*/
static void AddConcurrenProducts()
{
Parallel.For(0, 30000, (i) =>
{
Product product = new Product();
product.Name = "name" + i;
product.Category = "Category" + i;
product.SellPrice = i;
_ConcurrenProducts.Push(product);
});
}
TryPop 尝试移除并返回 和 TryPeek 尝试返回但不移除
其他“http://msdn.microsoft.com/zh-cn/library/system.collections.concurrent(v=vs.110).aspx”
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
参考文章:https://www.cnblogs.com/woxpp/p/3935557.html
推荐您阅读更多有关于“C#多线程,”的文章
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- AY C#高级知识 - 线程 并行 Task 笔记2
- 你应该这样去开发接口:Java多线程并行计算
- C# 并行和多线程编程:认识和使用 Task
- 15分钟读懂进程线程、同步异步、阻塞非阻塞、并发并行,太实用了!
- sqltoy-orm-4.17.6 发版,支持 Greenplum、并行查询可设置并行数量
- PostgreSQL并行查询介绍
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
XML Hacks
Michael Fitzgerald / O'Reilly Media, Inc. / 2004-07-27 / USD 24.95
Developers and system administrators alike are uncovering the true power of XML, the Extensible Markup Language that enables data to be sent over the Internet from one computer platform to another or ......一起来看看 《XML Hacks》 这本书的介绍吧!