本节内容
软件开发过程中, 多线程编码方面的工作看起来比较棘手的;由多线程的 传值、取值、资源同步、线程暂停、取消等操作会困扰每一个尝试编写此类代码的程序员。微软也做了许多努力,现在在FCL中有丰富的API可供选择,以便编写多线程代码;
我们通过具体的例子来熟悉异步,多线程,任务和并行。要了解到异步的本质,任务的实质,以及为什么有了任务还需要一个并行类(Parallel)等问题。 同时要学习到如何优雅的控制线程, 并且处理任务和并行中的异常;
多线程是开发人员进阶的一个坎,尝试着学习克服它;
一些建议
建议71:区分异步和多线程应用场景
对于有一个WinForm界面, 上面有一个按钮,单击这个按钮就可以获取网页的内容并显示出来; 如果该网页内容很多,或者网络状况不好, 这个获取的过程耗时比较长, 这个时候如果顺序地处理界面有可能就会被卡住, 那我们可能会想到新起工作线程的方法来完成这项工作,这样在等待网友内容返回的过程中WinForm就不会被卡住了;写下如下的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
using System;
using System.IO;
using System.Net;
using System.Threading;
using System.Windows.Forms;
namespace WindowsFormsApp1
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private void buttonGetPage_Click(object sender, EventArgs e)
{
Thread t = new Thread(() =>
{
var url = "https://www.cnblogs.com/luminji/";
var request = HttpWebRequest.Create(url);
var response = request.GetResponse();
var stream = response.GetResponseStream();
using (StreamReader reader = new StreamReader(stream))
{
var content = reader.ReadLine();
textBoxPage.Text = content;
}
});
t.Start();
}
}
}
|
的确,上面的代码解决了页面阻滞的问题, 但是, 它高效吗? 答案是否定的;
要理解这一点, 我们需要先来了解IO操作的DMA模式(Direct Memory Access),即直接内存访问, 是一种不经过CPU而直接进行内存数据存储的数据交换模式。通过DMA的数据交换几乎可以不损耗CPU的资源。在硬件中,硬盘,网卡,网卡,声卡,显卡等都有DMA功能。CLR所提供的异步编程模型就是让我们充分利用硬件的DMA功能来释放CPU的压力;
为了获取网页, CLR新起了一个工作线程, 然后在读取网页的整个过程中,该工作线程始终阻滞着,直到获取网页完毕为止;在整个过程中,工作线程被占用着,这意味着系统的资源始终被消耗着,等待着;
如果我们修改代码, 使用异步模式来实现, 代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
using System;
using System.IO;
using System.Net;
using System.Threading;
using System.Windows.Forms;
namespace WindowsFormsApp1
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private void buttonGetPage_Click(object sender, EventArgs e)
{
var url = "https://www.cnblogs.com/luminji/";
var request = HttpWebRequest.Create(url);
request.BeginGetResponse(this.AsyncCallbackImpl, request);
}
private void AsyncCallbackImpl(IAsyncResult ar)
{
WebRequest request = ar.AsyncState as WebRequest;
var response = request.EndGetResponse(ar);
var stream = response.GetResponseStream();
using (StreamReader reader = new StreamReader(stream))
{
var content = reader.ReadLine();
textBoxPage.Text = content;
}
}
}
}
|
上述代码的工作机制如下图所示:
经过修改的代码采用了异步的模式, 它使用线程池来管理。新起异步操作后, CLR会将工作丢给线程池的某个工作线程来完成。当开始I/O操作的时候,异步会将工作线程归还给线程池, 这时候就像等于获取网页的工作不会占用任何CPU了。直到异步完成,可见,异步模式借助于线程池, 极大地节约了CPU的资源;
通过上述示例,大概明白了异步和多线程的区别, 进一步我们来确定两者的应用场景:
- 计算密集型工作,采用多线程;
- IO密集型工作,采用异步机制;(涉及到网络、磁盘IO的任务都是IO密集型任务)
建议72:在线程同步中使用信号量
线程同步就是指多个线程在某个对象上执行等待(也可理解为锁定该对象),直到该对象被解除锁定。C#中对象的类型分为引用类型和值类型, CLR在这两种类型上的等待是不一样的;简单的理解就是, 在CLR中,值类型是不能被锁定的,即不能再一个值类型对象上进行等待;而再引用类型上的等待机制,又分为两类: 锁定和信号同步;
锁定使用关键字lock和Monitor, 两者实质没区别, 后者其实是语法糖, 这是常用的同步技术;
信号同步机制中涉及的类型都继承自抽象类WaitHandle, 这些类型有EventWaitHandle(类型化为AutoResetEvent, ManualResetESvent)、Semaphore以及Mutex。
EventWaitHandle子类为AutoResetEvent, ManualResetESvent, Semaphore以及Mutex都继承自WaitHandle,所以它们底层原理是一致的,维护的都是一个系统内核句柄,我们需要简单地为这3个类型做个区分:
- EventWaitHandle维护一个由内核产生的布尔类型对象(称为"阻滞状态"),如果其值为false, 那么在它上面等待的线程就阻塞。可以调用类型的Set方将其值设置为true, 解除阻塞; EventWaitHandle子类为AutoResetEvent, ManualResetESvent, 它们的区别不大;
- Semaphore维护一个内核产生的整型变量,如果其值为0,则在它上面等待的线程就会阻塞;如果其值大于0,则解除阻塞, 同时, 每解除一个线程阻塞,其值就减1;
- EventWaitHandle和Semaphore提供的都是单应用程序域内的线程同步功能,Mutex则不同,它为我们提供了跨应用程序域阻塞和解除阻塞线程的能力;
下面看一个例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
AutoResetEvent autoResetEvent = new AutoResetEvent(false);
private void buttonStartAThread_Click(object sender, EventArgs e)
{
Thread tWork = new Thread(() =>
{
label1.Text = "线程启动..." + Environment.NewLine;
label1.Text += "开始处理一些实际的工作" + Environment.NewLine;
//省略工作代码
label1.Text += "我开始等待别的线程给我信号,才愿意继续下去" + Environment.NewLine;
autoResetEvent.WaitOne();
label1.Text += "我继续做一些工作,然后结束了!";
//省略工作代码
});
tWork.IsBackground = true;
tWork.Start();
}
|
AutoResetEvent autoResetEvent = new AutoResetEvent(false);
这段代码创建了一个同步类型对象autoResetEvent, 它设置自己的默认阻滞状态是false。这意味着任何在它上面进行等待的进程都将被阻滞,所谓等待,就是在线程中应用:
autoResetEvent.WaitOne();
AutoResetEvent和ManualResetESvent的区别是, 前者在发送信号完毕后(即调用Set方法),会自动将自己的阻滞状态设置为false, 而后者则需要进行手动设定。
建议73:避免锁定不恰当的同步对象
在C#中, 让线程同步的另一种编码的方式就是使用线程锁。线程锁的原理,就是锁住一个资源,使得应用程序在此刻只有一个线程访问该资源。通俗的说,就是多线程变成单线程了,在C#中,可以把锁住的资源理解为new出来的普通的CLR对象;
既然需要锁定的对象就是C#中的一个对象, 那么什么样的对象能够成为锁对象(同步对象)呢?在选择同步对象的时候,应当始终注意以下几点:
- 同步对象在需要同步的多个线程中是可见的同一个对象;
- 在非静态方法中,静态变量不应作为同步对象;
- 值类型对象不能作为同步对象;
- 避免将字符串作为同步对象;
- 降低同步对象的可见性;
Note 1: 需要锁定的对象是在多个线程中是可见的, 而且是同一个对象。模拟一下啊必须用到锁的场景: 在遍历一个集合的过程中, 同时在另外一个线程中删除集合中的某项。(同时对集合又读又写),如果没有lock语句,将会抛出异常,System.InvalidOperationException 集合已修改;可能无法执行枚举操作
Note 2: 在非静态方法中,静态变量不应作为同步对象,要修正第一个note里面的示例问题(生成的两个实例不是同一个对象),这个时候只需要把syncObject变成static。
在编写多线程代码时, 要遵循这样的原则: 类型的静态方法应当保证线程安全,非静态方法不需要实现线程安全;
Note 3:值类型对象不能作为同步对象。值类型在传递给另一个线程的时候,会创建一个副本,这相当于每个线程锁定的也是两个对象。因此,值类型对象不能作为同步对象;
Note 4:锁定字符串完全没有必要, 且很危险;这个过程看起来和值类型正好相反,字符串在CLR中会被暂存到内存中,如果有两个变量分配了相同内容的字符串,那么这两个引用会被指向同一块内存, 所以两个地方同时使用了lock, 其实锁定的是同一个对象,这会导致整个程序被阻滞;
Note 5: 降低同步对象的可见性;可见范围最广的一种同步对象是typeof(SampleClass)
, typeof方法所返回的结果是SampleClass所有实例共有的,所有实例的type都指typeof方法的结果;在编写代码的时候,应当始终考虑降低同步对象的可见性, 将同步对象隐藏起来, 只开放给自己或者自己的子类(实际上需要开放给自己的子类的情况都不多);
建议74:警惕线程的IsBackground
在CLR中, 线程分为前台线程和后台线程, 即每个线程都有一个IsBackground属性。两者在表现形式上的唯一区别是:如果前台线程不退出,应用程序的进程就会一直存在,必须所有的前台线程全部退出,应用程序才算退出。而后台线程则没有这个方面的限制,如果应用程序退出,后台线程也会一并退出;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
using System;
using System.Threading;
namespace Advice74
{
internal class Program
{
static void Main(string[] args)
{
Thread t = new Thread(() =>
{
Console.WriteLine("线程开始工作...");
// 省略工作代码
Console.ReadKey();
Console.WriteLine("线程结束");
});
// 注意, 默认为false
t.IsBackground = false;
t.Start();
Console.WriteLine("主线程完毕!");
}
}
}
|
用Thread创建的线程默认是前台线程, 也就是IsBackground默认属性为false, 上面的代码运行起来要敲一个按键应用程序才会结束; 而如果设置IsBackground为true, 应用程序就会立刻结束;
代码demo使用的是Thread, 但我们要注意线程池中的线程默认都是后台线程;
基于前后台的线程的区别, 在实际编码中应该更多地使用后台线程。只有在非常关键的工作中, 如线程正在执行事务或占有的某些非托管资源需要释放时,才使用前台线程;
建议75:警惕线程不会立即启动
现代的大多数操作系统都不是实时的操作系统, Windows操作系统亦是如此;因此,线程并不是立即启动的,Windows内部会实现特殊的算法以进行线程之间的调度,在某个具体的时刻,它会决定当前应该运行哪个线程,反映到最底层就是某个线程分配到了一定的CPU时间, 可用来执行一小段工作(由于被分配的CPU时间很短,即使操作系统中运行了上千个线程, 我们也会觉得这些应用程序是在同时执行的)。Windows会选择在适当时间根据自己的算法决定下一段CPU时间如何调度。
线程的调度是相当复杂的一个过程,我们需要理解的是: 线程之间的调度占有一定的时间和空间开销,并且不是实时的;demo(将0-9分别传给10个不同的线程)如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
using System;
using System.Threading;
namespace Advice75
{
internal class Program
{
static int _id = 0;
static void Main(string[] args)
{
for (int i = 0; i < 10; i++, _id++)
{
Thread t = new Thread(() =>
{
Console.WriteLine(string.Format("{0}:{1}", Thread.CurrentThread.Name, _id));
});
t.Name = string.Format("Thread{0}", i);
t.IsBackground = true;
t.Start();
}
Console.ReadLine();
}
}
}
// 代码可能输出的结果是:
Thread0:1
Thread2:2
Thread1:2
Thread3:4
Thread4:5
Thread5:5
Thread6:7
Thread7:8
Thread8:9
Thread9:9
|
这段代码从两个方面验证了线程不是立即启动的;
首先, 线程并没有按照顺序启动;在代码逻辑中, 前面Start的那个线程也许会迟于后面Start的那个线程执行;
其次,传入线程内部的ID值,不再是for循环执行中当前的ID值。以Thread7为例, 在for循环中, 其当前的值为7, 而Thread7真正得到执行的时候,ID却已经跳出循环,早已经变成8了;
如果我们想要得到需求正确的代码, 需要把上面的for循环修改成为一段同步代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
using System;
using System.Threading;
namespace Advice75
{
internal class Program
{
static int _id = 0;
static void Main(string[] args)
{
for (int i = 0; i < 10; i++, _id++)
{
NewMethod(i, _id);
}
Console.ReadLine();
}
private static void NewMethod(int i, int realTimeID)
{
Thread t = new Thread(() =>
{
Console.WriteLine(string.Format("{0}:{1}", Thread.CurrentThread.Name, realTimeID));
});
t.Name = string.Format("Thread{0}", i);
t.IsBackground = true;
t.Start();
}
}
}
// 可能的一个运行结果是:
Thread0:0
Thread2:2
Thread1:1
Thread3:3
Thread4:4
Thread5:5
Thread6:6
Thread7:7
Thread8:8
Thread9:9
|
虽然线程还是保持了不会立即启动的特点, 但是传入线程的ID值, 由于在for循环内部变成了同步代码, 所以能正确传入;
建议76:警惕线程的优先级
线程在C#中有5个优先级, Highest, AboveNormal, Normal, BelowNormal和Lowest。讲到线程的优先级, 就会涉及线程的调度。Windows是一个基于优先级的抢占式调度系统。在系统中, 如果有一个线程的优先级较高,并且它正好处在就绪状态, 系统总是会优先运行该线程;换句话说, 高优先级的线程总是在系统调度算法中获取更多的CPU执行时间;
我们在一个单CPU系统中测试下面的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
using System;
using System.Threading;
namespace Advice76
{
internal class Program
{
static void Main(string[] args)
{
long t1Num = 0;
long t2Num = 0;
CancellationTokenSource cts = new CancellationTokenSource();
Thread t1 = new Thread(() =>
{
while (true && !cts.Token.IsCancellationRequested)
{
t1Num++;
}
});
t1.IsBackground = true;
t1.Priority = ThreadPriority.Highest;
t1.Start();
Thread t2 = new Thread(() =>
{
while (true && !cts.Token.IsCancellationRequested)
{
t2Num++;
}
});
t2.IsBackground = true;
t2.Start();
Console.ReadLine();
// 停止线程
cts.Cancel();
Console.WriteLine("t1Num:" + t1Num.ToString());
Console.WriteLine("t2Num:" + t2Num.ToString());
}
}
}
|
(结论摘抄)结果我们会发现, 如果这段程序运行在单核计算机中, 优先级为Highest的线程t1, 其输出值几乎总是会大于优先级为Normal(默认)的线程t2;
在C#中, 使用Thread和ThreadPool新起的线程, 默认的优先级都是Normal。虽然可以像上述代码去修改线程的优先,但是一般不建议这么做;当然, 如果是一些非常关键的线程, 还是可以提升线程的优先级的; 这些关键线程应当具有运行时间短,能即刻进入等待状态等特征。
建议77:正确停止线程
devloper总希望对代码有更多的一些控制, 例如, ”让那个还在工作的线程马上停止下来“, 但是实际上, 不是我们想怎么样的就能怎么样, 至少得处理好这两个问题;
第一个问题 正如线程不能立即启动一样, 线程也不是说停就停的。无论采用何种方式通知工作线程需要停止, 工作线程都会忙完手头最紧要的活,然后在它觉得合适的时候退出。以最传统的Thread.Abort方法为例, 如果线程当前正在执行的是一段非托管的代码, 那么CLR就不会抛出ThreadAbortException,只有在代码继续回到CLR中时,只有当代码继续回到CLR中时,才会引发ThreadAbortException。当然, 即便是在CLR环境中, ThreadAbortException也不会立即触发;
第二个问题 要正确停止线程,不在于调用者采取了什么行为(如最开始的Thread.Abort()方法),而更多依赖于工作线程是否能主动响应调用者的停止请求。大体机制是,如果线程需要被停止, 那么线程自身就应该给调用者开放这样的接口: Canceled。线程在工作的同时, 还要以某种频率检测Canceled标识,若检测到Canceled,线程自己才会负责退出;
FCL现在为我们提供了标准的取消模式: 协作式取消(Cooperative Cancellation)。协作式取消的机制就是上文第二个问题所提到的机制。下面是一个最基础的协作式取消的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
using System;
using System.Threading;
namespace Advice77
{
internal class Program
{
static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
Thread t = new Thread(() =>
{
while (true)
{
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("线程被终止! ");
break;
}
Console.WriteLine(DateTime.Now.ToString());
Thread.Sleep(1000);
}
});
t.Start();
Console.ReadLine();
cts.Cancel();
}
}
}
|
调用者使用CancellationTokenSource的Cancel方法通知工作线程退出。工作线程则以1000ms
的频率一边工作,一边检查是否有外界传入的Cancel信号,若有这样的信号,则退出;可以看到,在正确停止线程的机制中,真正起到重要作用的是线程本身。示例中工作代码比较简单,但足以说明问题;更复杂的计算式工作,也应该用这样一种方式,妥善而正确地处理问题;
协作式取消中的关键类型是CancellationTokenSource, 它有一个关键的属性Token, Token是一个名为CancellationToken的值类型, CancellationToken继而进一步提供了bool值的属性IsCancellationRequested
作为需要取消工作的标识, CancellationToken还有一个方法尤其值得注意,那就是Register方法。它负责传递一个Action委托,在线程停止的时候被回调, 使用方法如下:
1
2
3
4
|
cts.Token.Register(() =>
{
Console.WriteLine("工作线程被终止了.");
});
|
例子中使用了Thread进行了演示, 如果使用ThreadPool也是一样的模式。对于任务的Task, 它依赖于CancellationTokenSource和 CancellationToken完成了所有的取消控制;
建议78:应避免线程数量过多
多数情况下, 创建过多的线程意味者应用程序的架构设计可能存在着缺陷。那么可能有这么一个问题,一个应用程序中应该有多少线程是合理的。以PC机为例, 打开Windows的任务管理器,看看操作系统中正在运行的程序有多少个线程;
从本人的PC机来看, 线程数教多的应用软件是360安全卫士软件, 一共有152个线程数;Windows自身的System进程,当前有167个线程;接着是WPS的124个线程, 剩下的是80多, 60多个线程的进程, 还有60多个进程, 每个进程平均10来个线程的样子。 因此来讲,大部分应用程序的线程数不会太多;
错误地创建过多线程的一个典型的例子是:
为每一个Socket连接建立一个线程去管理。每个连接一个线程,意味着在32位系统的服务器不能同时管理超过约1000台的客户机。CLR为每个线程分配的内存会超过1MB
。约1000个线程,加上.NET进程启动本身所占用的一些内存,即刻就耗尽了系统能分配给进程的最大可用地址空间2GB
。即便应用程序在设计之初的需求设计书中说明,生产环境中客户端数目不会超过500台,在管理这500台客户端时进行线程上下文切换,也会损耗相当多的CPU时间。这类I/O密集型场合应该使用异步去完成(请参考建议71的相关阐述)。
过多的线程还会带来另外的问题:新起的线程可能需要等待相当长的时间才会真正运行。这是一个相当无奈的结果,在多数情况下,我们都不能忍受等待这么长时间。以下的这段测试代码,在我的系统中,经过了大概5s的时间,才运行到了线程T201:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
using System;
using System.Threading;
namespace Advice78
{
internal class Program
{
static void Main(string[] args)
{
for (int i = 0; i < 200; i++)
{
Thread t = new Thread(() =>
{
int j = i;
while (true)
{
j++;
}
t.IsBackground = true;
t.Start();
});
}
Thread.Sleep(5000);
Thread T201 = new Thread(() =>
{
while (true)
{
Console.WriteLine("T201正在执行!");
}
});
T201.Start();
Console.ReadKey();
}
}
}
|
除了启动问题外,线程之间的切换也存在同样的问题,T201的下一次执行,还会等待相当长的时间。
所以,不要滥用线程,尤其不要滥用过多的线程。当新起线程的时候,需要仔细思考这项工作是否真的需要新起线程去完成。即使真的需要线程也应该考虑使用线程池技术。例如本建议所提到的Socket连接这样的I/O密集型场合,应当始终考虑使用异步来完成。异步会在后台使用线程池进行管理。1000台客户端在使用了异步技术后,实际只要几个线程就能完成所有的管理工作(具体取决于“心跳频率”)。
建议79:使用ThreadPool或BackgroundWorker代替Thread
使用线程能极大地提升用户体验度,但是作为开发者应该注意到,线程的开销是很大的。
线程的空间开销来自:
-
线程内核对象(Thread Kernel Object)。每个线程都会创建一个这样的对象,它主要包含线程上下文信息,在32位系统中,它所占用的内存在700字节左右。
-
线程环境块(Thread Environment Block)。TEB包括线程的异常处理链,32位系统中占用4KB
内存。
-
用户模式栈(User Mode Stack),即线程栈。线程栈用于保存方法的参数、局部变量和返回值。每个线程栈占用1024KB
的内存。要用完这些内存很简单,写一个不能结束的递归方法,让方法参数和返回值不停地消耗内存,很快就会发生OutOfMemoryException。
-
内核模式栈(Kernel Mode Stack)。当调用操作系统的内核模式函数时,系统会将函数参数从用户模式栈复制到内核模式栈。在32位系统中,内核模式栈会占用12KB
内存。
线程的时间开销来自:
- 线程创建的时候,系统相继初始化以上这些内存空间。
- 接着CLR会调用所有加载
DLL
的DLLMain
方法,并传递连接标志(线程终止的时候,也会调用DLL
的DLLMain
方法,并传递分离标志)。
- 线程上下文切换。一个系统中会加载很多的进程,而一个进程又包含若干个线程。但是一个CPU在任何时候都只能有一个线程在执行。为了让每个线程看上去都在运行,系统会不断地切换“线程上下文”:每个线程大概得到几十毫秒的执行时间片,然后就会切换到下一个线程了。这个过程大概又分为以下5个步骤:
-
步骤1 进入内核模式。
-
步骤2 将上下文信息(主要是一些CPU 寄存器信息)保存到正在执行的线程内核对象上。
-
步骤3 系统获取一个 Spinlock,并确定下一个要执行的线程,然后释放 Spinlock。如果下一个线程不在同一个进程内,则需要进行虚拟地址交换。
-
步骤4 从将被执行的线程内核对象上载入上下文信息。
-
步骤5 离开内核模式。
由于要进行如此多的工作, 所以创建和销毁一个线程的代价是昂贵的, 为了程序员无节制地使用线程, 微软开发了”线程池“的技术。简单来说, 线程池就是替开发人员管理工作线程。当一项工作完毕时,CLR不会摧毁这个线程,而是会保留这个线程一段时间, 看看有没有别的工作需要这个线程。至于何时销毁或新起线程,由CLR根据自身的算法来做这个决定。所以,如果我们要多线程编码,不应想到:
1
2
3
4
5
6
|
Thread t = new Thread(() =>
{
// work code
t.IsBackground = true;
t.Start();
});
|
应该首先想到:
1
2
3
4
|
ThreadPool.QueueUserWorkItem((objState) =>
{
//工作代码
}, null);
|
线程池能让我们更关注到业务的实现, 而不是线程的性能测试。
书中的此条建议还提到一个类型 BackgroundWorker, BackgroundWorker是在内部使用了线程池的技术;同时,在Winform或WPF编码中,它还给工作线程和UI线程提供了交互的能力。如果我们稍加注意,就会发现:Thread和ThreadPool默认都没有提供这种交互能力,而BackgroundWorker却通过事件提供了这种能力。这种能力包括:报告进度、支持完成回调、取消任务、暂停任务等。一个使用BackgroundWorker的简单示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
using System.ComponentModel;
using System.Threading;
using System.Windows.Forms;
namespace Advice79
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private BackgroundWorker worker;
private void startAsyncButton_Click(System.Object sender,
System.EventArgs e)
{
worker.DoWork += new DoWorkEventHandler(worker_DoWork);
worker.ProgressChanged += new ProgressChangedEventHandler(worker_ProgressChanged);
worker.RunWorkerAsync();
}
private void worker_DoWork(object sender, DoWorkEventArgs e)
{
BackgroundWorker worker = sender as BackgroundWorker;
for (int i = 0; i < 10; i++)
{
worker.ReportProgress(i);
Thread.Sleep(100);
}
}
private void worker_ProgressChanged(object sender, ProgressChangedEventArgs e)
{
this.label1.Text = e.ProgressPercentage.ToString();
}
}
}
|
此示例是一个Winform
程序, 从事Winform
或WPF开发的程序员,可考虑使用BackgroundWorker;
建议80:用Task代替ThreadPool
ThreadPool相对于Thread来说有很多优势,但是ThreadPool使用起来却存在一定的不方便, 比如:
- ThreadPool不支持线程的取消、完成、失败通知等交互性操作。
- ThreadPool不支持线程执行的先后次序。
以往,如果开发者要实现上述功能,需要完成很多额外的工作。现在,FCL中提供了一个功能更强大的概念:Task。Task在线程池的基础上进行了优化,并提供了更多的API。在FCL 4.0中,如果我们要编写多线程程序,Task显然已经优于传统的方式了。
下面看一个简单的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Advice80
{
internal class Program
{
static void Main(string[] args)
{
Task t = new Task(() =>
{
Console.WriteLine("任务开始工作...");
// 模拟工作过程
Thread.Sleep(5000);
});
t.Start();
t.ContinueWith((task) =>
{
Console.WriteLine("任务完成, 完成时候的状态为: ");
Console.WriteLine("IsCanceled={0}\tIsCompleted={1}\tIsFaulted={2}", task.IsCanceled, task.IsCompleted, task.IsFaulted);
});
Console.ReadLine();
}
}
}
|
任务Task具备以下属性, 可以让我们查询任务完成时的状态:
IsCanceled
因为被取消而完成
IsCompleted
成功完成
IsFaulted
因为发生异常而完成
需要注意的是, 任务没有提供回调事件来通知完成(像BackgroundWorker一样),它是通过启用一个新任务的方式来完成类似的功能。ContinueWith
方法可以在一个任务完成的时候发起一个新任务,这种方式天然就支持了任务的完成通知:我们可以在新任务中获取原任务的结果值。
下面是一个稍微复杂的例子, 同时支持完成通知、取消、获取任务返回值等功能:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Advice80
{
internal class Program
{
static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
Task<int> t = new Task<int>(() => Add(cts.Token), cts.Token);
t.Start();
t.ContinueWith(TaskEnd);
// 等待按任意键取消任务
Console.ReadKey();
cts.Cancel();
Console.ReadKey();
}
private static void TaskEnd(Task<int> task)
{
Console.WriteLine("任务完成,完成时候的状态为: ");
Console.WriteLine("IsCanceled={0}\tIsCompleted={1}\tIsFaulted={2}", task.IsCanceled, task.IsCompleted, task.IsFaulted);
Console.WriteLine("任务的返回值为: {0}", task.Result);
}
private static int Add(CancellationToken ct)
{
Console.WriteLine("任务开始...");
int result = 0;
while (!ct.IsCancellationRequested)
{
result++;
Thread.Sleep(1000);
}
return result;
}
}
}
// 开始几秒后,按下按键,运行结果是:
任务开始...
任务完成,完成时候的状态为:
IsCanceled=False IsCompleted=True IsFaulted=False
任务的返回值为: 9
|
也许我们会奇怪, 不是通过按键取消的么, 为什么完成的状态IsCanceled
还是为False, 因为在工作任务中,我们对IsCancellationRequested
进行了业务逻辑上的处理, 但是并没有通过ThrowIfCancellationRequested
来处理,如果采用ThrowIfCancellationRequested
, 代码应当如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Advice80
{
internal class Program
{
static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
Task<int> t = new Task<int>(() => AddCancelByThrow(cts.Token), cts.Token);
t.Start();
t.ContinueWith(TaskEndByCatch);
// 等待按任意键取消任务
Console.ReadKey();
cts.Cancel();
Console.ReadKey();
}
private static void TaskEndByCatch(Task<int> task)
{
Console.WriteLine("任务完成,完成时候的状态为: ");
Console.WriteLine("IsCanceled={0}\tIsCompleted={1}\tIsFaulted={2}", task.IsCanceled, task.IsCompleted, task.IsFaulted);
try
{
Console.WriteLine("任务的返回值为: {0}", task.Result);
}
catch (AggregateException e)
{
e.Handle((err) => err is OperationCanceledException);
}
}
private static int AddCancelByThrow(CancellationToken ct)
{
Console.WriteLine("任务开始...");
int result = 0;
while (true)
{
ct.ThrowIfCancellationRequested();
result++;
Thread.Sleep(1000);
}
return result;
}
}
}
// 运行结果是:
任务开始...
任务完成,完成时候的状态为:
IsCanceled=True IsCompleted=True IsFaulted=False
|
在任务结束求值的方法TaskEndedByCatch
中,如果任务是通过ThrowIfCancellation
Requested方法结束的,对任务求结果值将会抛出异常OperationCanceledException
,而不是得到抛出异常前的结果值。这意味着任务是通过异常的方式被取消掉的,所以可以注意到上面代码的输出中,状态IsCanceled
为True。
接着来看上面的输出,我们注意到取消是通过异常的方式实现的,而表示任务中发生了异常的IsFaulted
状态却还是为False,为什么呢?这是因为ThrowIfCancellation
Requested是协作式取消方式的类型CancellationTokenSource中的一个方法,CLR对其进行了特殊的处理。CLR知道这一行程序是开发者有意为之,所以不把它看做是一个异常(它被理解为取消)。要得到IsFaulted
等于True的状态,我们可以修改While循环,模拟一个异常出来,具体方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
private static int AddCancelByThrow(CancellationToken ct)
{
Console.WriteLine("任务开始...");
int result = 0;
while (true)
{
// ct.ThrowIfCancellationRequested();
if (result == 5)
{
throw new Exception("error");
}
result++;
Thread.Sleep(1000);
}
return result;
}
// 运行结果是:
任务开始...
任务完成,完成时候的状态为:
IsCanceled=False IsCompleted=True IsFaulted=True
|
Task还支持任务工厂的概念, 任务工厂支持多个任务之间共享相同的状态,如取消类型CancellationTokenSource是可以被共享的, 通过使用任务工厂, 可以同时取消一组任务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Advice80
{
internal class Program
{
static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
TaskFactory taskFactory = new TaskFactory();
Task[] tasks = new Task[]
{
taskFactory.StartNew(() => Add(cts.Token)),
taskFactory.StartNew(() => Add(cts.Token)),
taskFactory.StartNew(() => Add(cts.Token)),
};
// CancellationToken.None, 指示TasksEnded不能被取消
taskFactory.ContinueWhenAll(tasks, TaskEnded, CancellationToken.None);
Console.ReadKey();
cts.Cancel();
Console.ReadKey();
}
private static int Add(CancellationToken ct)
{
Console.WriteLine("任务开始...");
int result = 0;
while (!ct.IsCancellationRequested)
{
result++;
Thread.Sleep(1000);
}
return result;
}
private static void TaskEnded(Task[] task)
{
Console.WriteLine("所有任务已完成!");
}
}
}
|
该条建议演示了Task(任务)和TaskFactory(任务工厂)的使用方法, Task进一步优化了后台线程的调度,加快了线程的处理速度。所以在FCL4.x
时代, 如果要使用多线程, 我们应该更多的使用Task;
建议81:使用Parallel简化同步状态下的Task的使用
在命名空间System.threading.Tasks
中, 有一个静态类Parallel简化了在同步状态下的Task的操作, Parallel主要有3个有用的方法:For、ForEach、Invoke
。
1.For方法主要用于处理针对数组元素的并行操作, 如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
using System;
using System.Threading.Tasks;
namespace Advice81
{
internal class Program
{
static void Main(string[] args)
{
int[] nums = new int[] { 1, 2, 3, 4, 5};
Parallel.For(0, nums.Length, i =>
{
Console.WriteLine("ArrayIndex{0} -> Element{1}", i, nums[i]);
});
Console.ReadKey();
}
}
}
|
可以看出,打印的结果并不是按数组的索引次序进行遍历的,这是因为我们的遍历是并行的,不是顺序的;因此,如果我们是想要的输出必须是同步的或者说是顺序输出的, 则不应该使用Parallel的形式;
使用For和ForEach
方法,Parallel类型会自动为我们分配Task来完成针对元素的一些工作。当然我们也可以直接使用Task,但是上面的这种形式在语法上更为简洁。
Parallel的Invoke方法为我们简化了启动一组并行操作,它隐式启动的就是Task。该方法接受Params Action[ ]
参数,如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
using System;
using System.Threading.Tasks;
namespace Advice81
{
internal class Program
{
static void Main(string[] args)
{
Parallel.Invoke(
() =>
{
Console.WriteLine("任务1......");
},
() =>
{
Console.WriteLine("任务2......");
},
() =>
{
Console.WriteLine("任务3......");
},
() =>
{
Console.WriteLine("任务4......");
});
Console.ReadKey();
}
}
}
// 可能的输出结果是:
任务1......
任务3......
任务2......
任务4......
|
同样的,由于所有的任务都是并行的, 所以它不保证先后顺序;
建议82:Parallel简化但不等同于Task默认行为
建议81说到了Parallel的使用方法,不知道大家是否注意到文中使用的字眼:在同步状态下简化了Task的使用。也就是说,在运行Parallel中的For、ForEach
方法时,调用者线程(在示例中就是主线程)是被阻滞的。Parallel虽然将任务交给Task去处理,即交给CLR线程池去处理,不过调用者会一直等到线程池中的相关工作全部完成。表示并行的静态类Parallel甚至只提供了Invoke方法,而没有同时提供一个BeginInvoke
方法,这也从一定程度上说明了这个问题。
从使用Task时, 我们最常使用的是Start方法(Task也提供了RunSynchronously), 它不会阻滞调用者线程。如下所示:
1
2
3
4
5
6
7
8
9
10
|
Task t = new Task(() =>
{
while (true)
{
}
});
t.Start();
Console.WriteLine("主线程即将结束");
Console.ReadKey();
|
如果执行这段代码, 永远不会有输出;
并行编程,意味者运行时在后台将任务分配到尽量多的CPU上,虽然它在后台使用Task进行管理,但这并不意味者它等同于异步;
建议83:小心Parallel中的陷阱
Parallel的For和ForEach
方法还支持一些相对复杂的应用。在这些应用中,它允许在每个任务启动时执行一些初始化的操作,在每个任务结束后,又执行一些后续操作;同时,还允许我们监视**线程(而不是任务)**的状态。
我们需要深刻理解这些具体的操作和应用, 下面去体会这段代码的输出是什么?:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Advice83
{
internal class Program
{
static void Main(string[] args)
{
int[] nums = { 1, 2, 3, 4 };
int total = 0;
Parallel.For<int>(0, nums.Length, () =>
{
return 1;
}, (i, loopState, subtotal) =>
{
subtotal += nums[i];
return subtotal;
},
(x) => Interlocked.Add(ref total, x));
Console.WriteLine("total={0}", total);
Console.ReadKey();
}
}
}
|
这段代码有可能输出11, 较少的情况下输出12,虽然理论上有可能输出13和14,但是我们应该很少有机会观察到。要明白为什么会有这样的输出,首先必须详细了解For方法的各个参数。上面这个For方法的声明如下:
1
2
|
[__DynamicallyInvokable]
public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
|
前面的两个参数相对容易理解, 分别时起始索引和结束索引;
参数body也比较容易理解, 即任务体本身, 其中subtotal为单个任务的返回值;
localInit
和localFinally
就比较难以理解了, 并且陷阱也在这里; 我们需要先去理解Parallel.For
方法的运作模式。For方法采用并发的方式来启动循环体中的每个任务,这意味着,任务是交给线程池去管理的。在上面的代码中,循环次数共计4次,实际运行时调度启动的后台线程也就只有一个或两个。这就是并发的优势,也是线程池的优势,Parallel通过内部的调度算法,最大化地节约了线程的消耗。localInit
的作用是如果Parallel为我们新起了一个线程,它就会执行一些初始化的任务在上面的例子中:
1
2
3
4
|
() =>
{
return 1;
}
|
它会将任务体中的subtotal这个值初始化为1。
localFinally
的作用是,在每个线程结束的时候,它执行一些收尾工作:
(x) => Interlocked.Add(ref total, x)
这行代码所代表的收尾工作实际就是:
totaltotal = total + subtotal;
其中的x,其实代表的就是任务体中的返回值,具体在这个例子中就是subtotal在返回时的值。使用Interlocked是对total使用原子操作,以避免并发所带来的问题。
现在,我们应该很好理解为什么上面这段代码的输出会不确定了。Parallel一共启动了4个任务,但是我们不能确定Parallel到底为我们启动了多少个线程,那是运行时根据自己的调度算法决定的。如果所有的并发任务只用了一个线程,则输出为11;如果用了两个线程,那么根据程序的逻辑来看,输出就是12了。
在这段代码中,如果让localInit
返回的值为0,也许你就永远不会注意到这个陷阱:
1
2
3
4
|
() =>
{
return 0;
}
|
为了更加清晰的理解这个问题, 使用下面这个更好理解的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
using System;
using System.Threading.Tasks;
namespace Advice83
{
internal class Program
{
static void Main(string[] args)
{
string[] stringArr = new string[] { "aa", "bb", "cc", "dd", "ee", "ff", "gg", "hh"};
string result = string.Empty;
Parallel.For(0, stringArr.Length, () => "-", (i, loopState, subResult) =>
{
return subResult += stringArr[i];
}, (threadEndString) =>
{
result += threadEndString;
Console.WriteLine("Inner:" + threadEndString);
});
Console.WriteLine(result);
Console.ReadKey();
}
}
}
|
建议84:使用PLINQ
LINQ的基本功能就是对集合进行遍历查询, 并在此基础上对元素进行操作。仔细推敲就能发现, 并行编程特别适合这一类应用; 微软特别的为LINQ拓展了一个类ParallelEnumerable(该类型也在命名空间下System.Linq
中),它所提供的扩展方法会让Linq
支持并行计算, 这就是PLINQ;
传统的LINQ计算是单线程的,PLINQ则是并发的、多线程的,我们通过下面这个示例就可以看出这个区别:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
using System;
using System.Collections.Generic;
using System.Linq;
namespace Advice84
{
internal class Program
{
static void Main(string[] args)
{
List<int> intlist = new List<int>() { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
var query = from p in intlist select p;
Console.WriteLine("以下是LINQ的输出: ");
foreach (var item in query)
{
Console.WriteLine(item.ToString());
}
Console.WriteLine("以下是PLINQ的输出: ");
var queryParallel = from p in intlist.AsParallel() select p;
foreach (var item in queryParallel)
{
Console.WriteLine(item.ToString());
}
}
}
}
|
LINQ的输出会按照intList中的索引顺序打印出来。而PLINQ的输出是杂乱无章的。
并行输出还有另外一种方式可以处理,那就是对queryParallel求ForAll
:
1
2
3
4
|
queryParallel.ForAll((item) =>
{
Console.WriteLine(item.ToString());
});
|
但是这种方法会带来一个问题,如果要将并行输出后的结果进行排序,ForAll会忽略掉查询的AsOrdered请求。如下所示, AsOrdered方法可以对并行计算后的队列进行重新组合,以便保持顺序。可是在ForAll方法中,它所完成的输出仍是无序的。如果要保持AsOrdered方法的需求,我们应当始终使用第一种并行方式,即:
1
2
3
4
5
6
7
8
9
10
11
|
var queryParallel = from p in intlist.AsParallel().AsOrdered() select p;
foreach (var item in queryParallel) // 有序
{
Console.WriteLine(item.ToString());
}
Console.WriteLine("**************Split Line****************");
queryParallel.ForAll((item) => // 无序
{
Console.WriteLine(item.ToString());
});
|
在并行查询后再进行排序,会牺牲掉一定的性能。一些扩展方法默认会对元素进行排序,这些方法包括:OrderBy、OrderByDescending、ThenBy和ThenByDescending。在实际的使用中,一定要注意到各种方式之间的差别,以便程序按照我们的设想运行。
还有一些其他的查询方法,比如Take。如果我们这样编码:
1
2
3
4
|
foreach (int item in queryParallel.Take(5))
{
Console.WriteLine(item.ToString());
}
|
在顺序查询中,会返回前5个元素。但是在PLINQ中,会选出5个无序的元素。
建议在对集合中的元素项进行操作的时候使用PLINQ代替LINQ。但是要记住,不是所有并行查询的速度都会比顺序查询快,在对集合执行某些方法时,顺序查询的速度会更快一点,如方法ElementAt
等。在开发中,我们应该仔细辨别这方面的需求,以便找到最佳的解决方案。
建议85:Task中的异常处理
在任何时候,异常处理都是非常重要的一个环节。多线程与并行编程中尤其是这样。如果不处理这些后台任务中的异常,应用程序将会莫名其妙的退出。处理那些不是主线程(如果是窗体程序,那就是UI主线程)产生的异常,最终的办法都是将其包装到主线程上。
在任务并行库中,如果对任务运行Wait、WaitAny、WaitAll
等方法,或者求Result属性,都能捕获到AggregateException
异常。可以将AggregateException
异常看做是任务并行库编程中最上层的异常。在任务中捕获的异常,最终都应该包装到AggregateException
中。一个任务并行库异常的简单处理示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
using System;
using System.Threading.Tasks;
namespace Advice85
{
internal class Program
{
static void Main(string[] args)
{
Task t = new Task(() =>
{
throw new Exception("任务并行时候产生的异常");
});
t.Start();
try
{
// 若有Result, 可求Result
t.Wait();
}
catch (AggregateException e)
{
foreach (var item in e.InnerExceptions)
{
Console.WriteLine("ExceptionType: {0}{1} from {2}{3} Exception Content:{4}", item.GetType(), Environment.NewLine, item.Source, Environment.NewLine, item.Message);
}
}
Console.WriteLine("主线程马上结束!");
Console.ReadKey();
}
}
}
// 运行结果是:
ExceptionType: System.Exception
from Advice85
Exception Content:任务并行时候产生的异常
主线程马上结束!
|
大家也许已经注意到,虽然运行Wait、WaitAny、WaitAll
方法,或者求Result属性能得到任务的异常信息,但是这会阻滞当前线程。这往往不是我们所希望看到的,岂能为了得到一个异常就故意等待?这时可以考虑任务并行库中Task类型的一个功能:新起一个后续任务,就可以解决等待的问题:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
using System;
using System.Threading.Tasks;
namespace Advice85
{
internal class Program
{
static void Main(string[] args)
{
Task t = new Task(() =>
{
throw new Exception("任务并行编码时候产生的未知异常");
});
t.Start();
Task ttEnd = t.ContinueWith((task) =>
{
foreach (var item in task.Exception.InnerExceptions)
{
Console.WriteLine("ExceptionType: {0}{1} from {2}{3} Exception Content:{4}", item.GetType(), Environment.NewLine, item.Source, Environment.NewLine, item.Message);
}
});
Console.WriteLine("主线程马上结束!");
Console.ReadKey();
}
}
}
|
以上方法解决了主线程等待的问题,但是仔细研究我们会发现,异常处理没有回到主线程中,它还是在线程池中。在某些场合,比如对于业务逻辑上特定异常的处理,需要采取这种方式,而且我们也鼓励这种用法。但很明显,更多时候我们还需要更进一步将异常处理封装到主线程。
Task没有提供将任务中的异常包装到主线程的接口。一个可行的办法是,仍旧使用类似Wait的方法来达到此目的。在本建议一开始的代码中,我们对于主工作任务采用Wait的方法,这是不可取的。因为主工作任务也许会持续一段较长的时间,那样会阻塞调用者,并让调用者觉得不能忍受。而本建议的第二段代码中,新任务只完成了处理异常,这意味着新任务不会延续较长时间,所以,在这个新任务上维持等待对于调用者来说,是可以忍受的。所以,我们可以采用这个方法将异常包装到主线程中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
using System;
using System.Threading.Tasks;
namespace Advice85
{
internal class Program
{
static void Main(string[] args)
{
Task t = new Task(() =>
{
throw new Exception("任务并行编码时候产生的未知异常");
});
t.Start();
Task ttEnd = t.ContinueWith((task) =>
{
throw task.Exception;
}, TaskContinuationOptions.OnlyOnFaulted);
try
{
ttEnd.Wait();
}
catch (AggregateException err)
{
foreach (var item in err.InnerExceptions)
{
Console.WriteLine("ExceptionType: {0} {1} from {2} {3} Exception Content:{4}", item.InnerException.GetType(), Environment.NewLine, item.InnerException.Source, Environment.NewLine, item.InnerException.Message);
}
}
Console.WriteLine("主线程马上结束!");
Console.ReadKey();
}
}
}
// 程序运行的结果是:
ExceptionType: System.Exception
from Advice85
Exception Content:任务并行编码时候产生的未知异常
主线程马上结束!
|
故事并没有到此结束。
对线程调用Wait方法(或者求Result)不是最好的办法,因为它会阻滞主线程,并且CLR在后台会新起线程池线程来完成额外的工作。如果要包装异常到主线程,另外一个方法就是使用事件通知的方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
using System;
using System.Threading.Tasks;
namespace Advice85
{
public class Program
{
public class AggregateExceptionArgs : EventArgs
{
public AggregateException AggregateException { get; set; }
}
static event EventHandler<AggregateExceptionArgs> AggregateExceptionCatched;
static void Main(string[] args)
{
AggregateExceptionCatched += EventHandler<AggregateExceptionArgs>(Program_AggregateExceptionCatched);
Task t = new Task(() =>
{
try
{
throw new Exception("任务并行编码时候产生的未知异常");
}
catch (Exception err)
{
AggregateExceptionArgs errArgs = new AggregateExceptionArgs { AggregateException = new AggregateException(err) };
AggregateExceptionCatched(null, errArgs);
}
});
t.Start();
Console.WriteLine("主线程马上结束!");
Console.ReadKey();
}
private static EventHandler<T> EventHandler<T>(Action<object, T> program_AggregateExceptionCatched)
{
throw new NotImplementedException();
}
static void Program_AggregateExceptionCatched(object sender, AggregateExceptionArgs e)
{
foreach (var item in e.AggregateException.InnerExceptions)
{
Console.WriteLine("ExceptionType: {0} {1} from {2} {3} Exception Content:{4}", item.GetType(), Environment.NewLine, item.Source, Environment.NewLine, item.Message);
}
}
}
}
|
在这个实例中, 我们声明了一个委托AggregateExceptionCatchHandler
,它接受两个参数,一个是事件的通知者;另一个是事件变量AggregateExceptionArgs。AggregateExceptionArgs是为了包装异常而新建的一个类型。
在主线程中,我们为事件AggregateExceptionCatched分配了事件处理方法Program_AggregateExceptionCatched,当任务Task捕获到异常时,代码引发事件。这种方式完全没有阻滞主线程。如果是在Winform或WPF
窗体程序中,要在事件处理方法中处理UI界面,还可以将异常信息交给窗体的线程模型去处理。所以,最终建议大家采用事件通知的模型处理Task中的异常。
注意 任务调度器TaskScheduler
提供了这样一个功能,它有一个静态事件用于处理未捕获到的异常。一般不建议这样使用,因为事件回调是在进行垃圾回收的时候才发生的。如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
using System;
using System.Threading.Tasks;
namespace Advice85
{
public class Program
{
static void Main()
{
TaskScheduler.UnobservedTaskException += new EventHandler<
UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
Task t = new Task(() =>
{
throw new Exception("任务并行编码中产生的未知异常");
});
t.Start();
Console.ReadKey();
t.Dispose();
t = null;
GC.Collect(0);
Console.WriteLine("主线程马上结束");
Console.ReadKey();
}
static void TaskScheduler_UnobservedTaskException(object sender,
UnobservedTaskExceptionEventArgs e)
{
foreach (Exception item in e.Exception.InnerExceptions)
{
Console.WriteLine("异常类型:{0}{1}来自:{2}{3}异常内容:{4}",
item.GetType(), Environment.NewLine, item.Source,
Environment.NewLine, item.Message);
}
//将异常标识为已经观察到
e.SetObserved();
}
}
}
|
上面的这段代码运行的结果中并不会输出异常信息,因为发生异常的时刻,并没有发生垃圾回收(垃圾回收时机由CLR决定)。必须要将GC.Collect(0)
的注释去掉,强制执行垃圾回收,才会观察到异常信息。这也正是此种方式的局限性。
建议86:Parallel
中的异常处理
上一个建议探讨了如何处理Task的异常, 由于Task的Start方法是异步启动的,因此需要额外的技术来完成异常的处理;Parallel相对来说会简单的多,因为Parallel的调用者线程会等到所有的任务全部完成后,再继续自己的工作;简而言之, 它具有同步的特性, 下面示例的代码就可以实现将并发异常包装到主线程中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace Advice86
{
internal class Program
{
static void Main(string[] args)
{
try
{
var parallelExceptions = new ConcurrentQueue<Exception>();
Parallel.For(0, 1, (i) =>
{
try
{
throw new InvalidOperationException("并行任务中的出现的异常");
}
catch (Exception e)
{
parallelExceptions.Enqueue(e);
}
if (parallelExceptions.Count > 0)
{
throw new AggregateException(parallelExceptions);
}
});
}
catch (AggregateException err)
{
foreach (Exception item in err.InnerExceptions)
{
Console.WriteLine("异常类型:{0}{1}来自:{2}{3}异常内容:{4}",
item.InnerException.GetType(), Environment.NewLine, item.InnerException.Source,
Environment.NewLine, item.InnerException.Message);
}
Console.WriteLine("主线程马上结束!");
Console.ReadLine();
}
}
}
}
|
在Parallel的异常处理中,使用了一个线程安全的泛型集合ConcurrentQueue<T>
来处理并发中可能会遇到的集合线程安全性的问题;
WPF和WinForm窗体应用程序都有一个要求,那就是UI元素(如Button
、TextBox
等)必须由创建它的那个线程进行更新。WinForm在这方面的限制并不是很严格,所以像下面这样的代码,在WinForm中大部分情况下还能运行(本建议后面会详细解释为什么会出现这种现象):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
private void buttonStartAsync_Click(object sender, EventArgs e)
{
Task t = new Task(() =>
{
while (true)
{
label1.Text = DateTime.Now.ToString();
Thread.Sleep(1000);
}
});
//如果有异常,就启动一个新任务
t.ContinueWith((task) =>
{
try
{
task.Wait();
}
catch (AggregateException ex)
{
foreach (Exception inner in ex.InnerExceptions)
{
MessageBox.Show(string.Format("异常类型:{0}{1}来自于:{2}{3}异常内容:{4}", inner.GetType(), Environment.NewLine, inner.Source, Environment.NewLine, inner.Message));
}
}
}, TaskContinuationOptions.OnlyOnFaulted);
t.Start();
}
|
上面这段代码如果放在WPF中,肯定会抛System.InvalidOperationException
异常。
理论上,WinForm和WPF的线程模型非常接近,它们最后都是调用API(GetMessage或PeekMessage)来处理其他线程发送过来的消息,这些消息存储在系统的一个消息队列中。在WinForm和WPF中,创建主界面的线程就是主线程,也就是UI线程,UI线程负责处理该消息队列。只是两者在处理消息队列的上层机制上稍微有一些不同,这就造成了同样的代码得到不同的结果。
在WinForm框架中有一个ISynchronizeInvoke
接口,所有的UI元素(表现为Control)都继承了该接口。其中,接口中的InvokdRequired
属性表示了当前线程是否是创建它的线程。接口中的Invoke和BeginInvoke
方法负责将消息发送到消息队列中,这样,UI线程就能够正确处理它了。那么,上面的这段代码在WinForm上的改进版本为(仅列出While循环部分):
1
2
3
4
5
6
7
8
9
10
11
|
while (true)
{
if (label1.InvokeRequired)
label1.BeginInvoke(new Action(() =>
{
label1.Text = DateTime.Now.ToString();
}));
else
label1.Text = DateTime.Now.ToString();
Thread.Sleep(1000);
}
|
BeginInvoke
方法接受的是一个Delegate类型的参数,在这里我们用一个Action来实现。
WPF应用程序的线程模型则完全依赖于DispatcherObject
类型。所有的WPF控件都继承自一个抽象类Visual,而这个抽象类又最终继承自DispatcherObject
类型。在这个DispatcherObject
类型中有一个属性,两个方法。属性Dispatcher完成所有的工作线程和UI线程之间的调度任务。CheckAccess
方法负责检测工作线程是否可以访问控件,如果是,则返回True;否则返回False。VerifyAccess
方法则负责检测工作线程是否具有控件的访问权限,如果不能访问则抛出异常InvalidOperationException
。
WinForm应用程序用类似CheckAccess
的方式进行访问权限的判断;WPF应用程序则进行了改进,所有的UI控件都采用VerifyAccess
的方式进行工作线程访问权限的判断。这直接决定了本建议开头处那个例子的输出,WPF只要判断出工作线程和UI线程不是同一个线程的,则直接抛出异常,而WinForm却有成功执行的余地。但是,WinForm的这种机制直接造成了程序的不稳定,因为即使在大部分情况下代码能很好的工作,可是在不确定的情况下,那样的代码中工作线程会直接操作UI元素,这样还是会抛出异常的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
//用于表示主线程,在本例中就是UI线程
Thread mainThread;
bool CheckAccess()
{
return mainThread == Thread.CurrentThread;
}
void VerifyAccess()
{
if (!CheckAccess())
throw new InvalidOperationException("调用线程无法访问此对象,因为另一个线程拥有此对象");
}
private void buttonStartAsync_Click(object sender, EventArgs e)
{
//当前线程就是主线程
mainThread = Thread.CurrentThread;
Task t = new Task(() =>
{
while (true)
{
if (!CheckAccess())
label1.BeginInvoke(new Action(() =>
{
label1.Text = DateTime.Now.ToString();
}));
else
label1.Text = DateTime.Now.ToString();
Thread.Sleep(1000);
}
});
// 如果有异常,就启动一个新任务
t.ContinueWith((task) =>
{
try
{
task.Wait();
}
catch (AggregateException ex)
{
foreach (Exception inner in ex.InnerExceptions)
{
MessageBox.Show(string.Format("异常类型:{0}{1}来自于:{2}{3}异常内容:{4}", inner.GetType(), Environment.NewLine, inner.Source, Environment.NewLine, inner.Message));
}
}
}, TaskContinuationOptions.OnlyOnFaulted);
t.Start();
}
|
在这段代码中, 模拟WPF中DispatcherObject的两个方法CheckAccess和VerifyAccess对线程模型进行了重新处理,增强了系统的稳定性。在实际工作中,我们也可以提取这两个方法为扩展方法,以便项目中的所有UI类型都能使用到。
WPF支持这两个方法,其全部代码如下所示(注意查看While循环部分)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
private void buttonStart_Click(object sender, RoutedEventArgs e)
{
Task t = new Task(() =>
{
while (true)
{
this.Dispatcher.BeginInvoke(new Action(() =>
{
textBlock1.Text = DateTime.Now.ToString();
}));
Thread.Sleep(1000);
}
});
//为了捕获异常,启动了一个新任务
t.ContinueWith((task) =>
{
try
{
task.Wait();
}
catch (AggregateException ex)
{
foreach (Exception inner in ex.InnerExceptions)
{
MessageBox.Show(string.Format("异常类型:{0}{1}来自:{2}{3}异常内容:{4}", inner.GetType(), Environment.NewLine,
inner.Source, Environment.NewLine, inner.Message));
}
}
}, TaskContinuationOptions.OnlyOnFaulted);
t.Start();
}
|
注意: 为了演示方便, 本建议中的异常没有传递到主线程;实际编码中,应该始终考虑将异常包装到主线程;
建议88:并行并不意味着速度更快
并行所带来的后台任务以及任务的管理, 都会带来一定的开销,如果一项工作本来就能很快的完成,或者说其循环体就很小,那么如果采用并行可能会比非并行慢;
看下面的一个例子, 比较一下同步和并行状态下的时间消耗:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace Advice88
{
internal class Program
{
static void Main(string[] args)
{
Stopwatch watch = new Stopwatch();
watch.Start();
DoInFor();
watch.Stop();
Console.WriteLine("Sync Cost time:{0}", watch.Elapsed);
watch.Restart();
DoInParalleFor();
watch.Stop();
Console.WriteLine("Parallel Cost time:{0}", watch.Elapsed);
}
static void DoSometing()
{
for (int i = 0; i < 10000000; i++)
{
i++;
}
}
static void DoInFor()
{
for (int i = 0; i < 200; i++)
{
DoSometing();
}
}
static void DoInParalleFor()
{
Parallel.For(0, 200, (i) =>
{
DoSometing();
});
}
}
}
|
这个是在测试的四核PC机上的运行结果是:
可见,同步只用了0.1084ms,
而并行用了7.8ms才完成工作;
现在,为了模拟让循环体做更多事情,将DoSomething
方法中的循环体由10变为10000000。运行的结果为:
当循环体需要做更多工作的时候,我们发现,同步需要3.77秒才能完成工作,而并行则仅使用1.03秒就完成了工作。
建议89:在并行方法体中谨慎使用锁
除建议88里面提到的场合, 要谨慎使用并行的情况还包括:某些本身就需要同步运行的场合,或者需要较长时间锁定共享资源的场合;在对整型数据进行同步操作时,可以使用静态类InterLocked
的Add方法,这就极大地避免了由于进行原子操作长时间锁定某个共享资源所带来的同步性能损耗。建议83的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Advice83
{
internal class Program
{
static void Main(string[] args)
{
int[] nums = { 1, 2, 3, 4 };
int total = 0;
Parallel.For<int>(0, nums.Length, () =>
{
return 1;
}, (i, loopState, subtotal) =>
{
subtotal += nums[i];
return subtotal;
},
(x) => Interlocked.Add(ref total, x));
Console.WriteLine("total={0}", total);
Console.ReadKey();
}
}
}
|
理论上,针对total的加法操作,需要使用一个同步锁,否则就无法避免一次torn read(即两次mov
操作所导致的字段内存地址边界对齐问题)。FCL通过提供Interlocked类型解决了这个问题。FCL用来解决简单类型的原子性操作还提供了volatile关键字。不过这些都不是本建议所要讨论的重点。FCL现有的原子性操作为我们同步整型数据的时候,带来了性能上的提高。但是,在其他一些场合,我们却不得不考虑因为同步锁带来的损耗。
来看一个例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
using System;
using System.Threading.Tasks;
namespace Advice89
{
internal class Program
{
static void Main(string[] args)
{
SampleClass sample = new SampleClass();
Parallel.For(0, 10000000, (i) =>
{
sample.SimpleAdd();
});
Console.WriteLine(sample.SomeCount);
}
}
internal class SampleClass
{
public long SomeCount { get; private set; }
public void SimpleAdd()
{
SomeCount++;
}
}
}
|
可能输出的一个结果是: 3318274
;
显然, 这和我们期待输出的10000000
有较大的差距。为了保证输出正确, 必须为并行中的方法体加锁(假设SampelClass是外部提供的API,无权进行源码修改在其内部加锁):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
using System;
using System.Threading.Tasks;
namespace Advice89
{
internal class Program
{
static void Main(string[] args)
{
SampleClass sample = new SampleClass();
object syncObj = new object();
Parallel.For(0, 10000000, (i) =>
{
lock (syncObj)
{
sample.SimpleAdd();
}
});
Console.WriteLine(sample.SomeCount);
}
}
internal class SampleClass
{
public long SomeCount { get; private set; }
public void SimpleAdd()
{
SomeCount++;
}
}
}
|
加锁之后,代码的输出变正确了。但是,这段代码会带来其他的问题;由于锁的存在,系统的开销也增加了,同步带来的线程上下文切换,使我们牺牲了CPU时间与空间性能。简单来说,这段代码还不如不用并行,要确保锁定恰当的同步对象,锁其实就是让多线程变成单线程(因为同时只允许一个线程访问资源)。所以我们需要谨慎的对待并行方法中的同步问题。如果方法体的全部内容都需要同步运行,就完全不应该使用并行;