@TryLoveCatch
2022-04-21T15:58:58.000000Z
字数 8388
阅读 676
Java知识体系
并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。这两大问题,管程都是能够解决的。Java SDK并发包通过Lock和Condition两个接口来实现管程,其中Lock用于解决互斥问题,Condition用于解决同步问题。
Java语言本身提供的synchronized也是管程的一种实现,既然Java从语言层面已经实现了管程了,那为什么还要在SDK里提供另外一种实现呢?
你也许曾经听到过很多这方面的传说,例如在Java的1.5版本中,synchronized性能不如SDK里面的Lock,但1.6版本之后,synchronized做了很多优化,将性能追了上来,所以1.6之后的版本又有人推荐使用synchronized了。那性能是否可以成为“重复造轮子”的理由呢?显然不能。因为性能问题优化一下就可以了,完全没必要“重复造轮子”。
到这里,关于这个问题,你是否能够想出一条理由来呢?如果你细心的话,也许能想到一点。那就是我们前面在介绍死锁问题的时候,提出了一个破坏不可抢占条件方案,但是这个方案synchronized没有办法解决。原因是synchronized申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。但我们希望的是:
如果我们重新设计一把互斥锁去解决这个问题,那该怎么设计呢?我觉得有三种方案。
这三种方案可以全面弥补synchronized的问题。到这里相信你应该也能理解了,这三个方案就是“重复造轮子”的主要原因,体现在API上,就是Lock接口的三个方法。详情如下:
// 支持中断的API
void lockInterruptibly() throws InterruptedException;
// 支持超时的API
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 支持非阻塞获取锁的API
boolean tryLock();
Java SDK里面Lock的使用,有一个经典的范例,就是try{}finally{},需要重点关注的是在finally里面释放锁。这个范例无需多解释,你看一下下面的代码就明白了:
class X {
private final Lock rtl = new ReentrantLock();
int value;
public void addOne() {
// 获取锁
rtl.lock();
try {
value+=1;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}
但是有一点需要解释一下,那就是可见性是怎么保证的。你已经知道Java里多线程的可见性是通过Happens-Before规则保证的,而synchronized之所以能够保证可见性,也是因为有一条synchronized相关的规则:synchronized的解锁 Happens-Before 于后续对这个锁的加锁。
那Java SDK里面Lock靠什么保证可见性呢?
例如在下面的代码中,线程T1对value进行了+=1操作,那后续的线程T2能够看到value的正确结果吗?
答案必须是肯定的。
Java SDK里面锁的实现非常复杂,这里我就不展开细说了,但是原理还是需要简单介绍一下:它是利用了volatile相关的Happens-Before规则。
Java SDK里面的ReentrantLock,内部持有一个volatile 的成员变量state,获取锁的时候,会读写state的值;解锁的时候,也会读写state的值(简化后的代码如下面所示):
class SampleLock {
volatile int state;
// 加锁
lock() {
// 省略代码无数
state = 1;
}
// 解锁
unlock() {
// 省略代码无数
state = 0;
}
}
也就是说,在执行value+=1之前,程序先读写了一次volatile变量state,在执行value+=1之后,又读写了一次volatile变量state。根据相关的Happens-Before规则:
所以说,后续线程T2能够看到value的正确结果。
如果你细心观察,会发现我们创建的锁的具体类名是ReentrantLock,这个翻译过来叫可重入锁,这个概念前面我们一直没有介绍过。
所谓可重入锁,顾名思义,指的是线程可以重复获取同一把锁。
例如下面代码中:
class X {
private final Lock rtl =
new ReentrantLock();
int value;
public int get() {
// 获取锁
rtl.lock(); ②
try {
return value;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
public void addOne() {
// 获取锁
rtl.lock();
try {
value = 1 + get(); ①
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}
当线程T1执行到 ① 处时,已经获取到了锁 rtl ,当在 ① 处调用 get()方法时,会在 ② 再次对锁 rtl 执行加锁操作。
除了可重入锁,可能你还听说过可重入函数,可重入函数怎么理解呢?指的是线程可以重复调用?
显然不是,所谓可重入函数,指的是多个线程可以同时调用该函数,每个线程都能得到正确结果;同时在一个线程内支持线程切换,无论被切换多少次,结果都是正确的。多线程可以同时执行,还支持线程切换,这意味着什么呢?线程安全啊。所以,可重入函数是线程安全的。
在使用ReentrantLock的时候,你会发现ReentrantLock这个类有两个构造函数,一个是无参构造函数,一个是传入fair参数的构造函数。fair参数代表的是锁的公平策略,如果传入true就表示需要构造一个公平锁,反之则表示要构造一个非公平锁。
//无参构造函数:默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//根据公平策略参数创建锁
public ReentrantLock(boolean fair){
sync = fair ? new FairSync()
: new NonfairSync();
}
在前面我们介绍过入口等待队列,锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。
你已经知道,用锁虽然能解决很多并发问题,但是风险也是挺高的。可能会导致死锁,也可能影响性能。这方面有是否有相关的最佳实践呢?有,还很多。但是我觉得最值得推荐的是并发大师Doug Lea《Java并发编程:设计原则与模式》一书中,推荐的三个用锁的最佳实践,它们分别是:
这三条规则,前两条估计你一定会认同,最后一条你可能会觉得过于严苛。但是我还是倾向于你去遵守,因为调用其他对象的方法,实在是太不安全了,也许“其他”方法里面有线程sleep()的调用,也可能会有奇慢无比的I/O操作,这些都会严重影响性能。更可怕的是,“其他”类的方法可能也会加锁,然后双重加锁就可能导致死锁。
并发问题,本来就难以诊断,所以你一定要让你的代码尽量安全,尽量简单,哪怕有一点可能会出问题,都要努力避免。
Java SDK 并发包里的Lock接口里面的每个方法,你可以感受到,都是经过深思熟虑的。除了支持类似synchronized隐式加锁的lock()方法外,还支持超时、非阻塞、可中断的方式获取锁,这三种方式为我们编写更加安全、健壮的并发程序提供了很大的便利。希望你以后在使用锁的时候,一定要仔细斟酌。
除了并发大师Doug Lea推荐的三个最佳实践外,你也可以参考一些诸如:减少锁的持有时间、减小锁的粒度等业界广为人知的规则,其实本质上它们都是相通的,不过是在该加锁的地方加锁而已。你可以自己体会,自己总结,最终总结出自己的一套最佳实践来。
接着再来详细聊聊Java SDK并发包里的Condition,Condition实现了管程模型里面的条件变量。
在《08 | 管程:并发编程的万能钥匙》里我们提到过Java 语言内置的管程里只有一个条件变量,而Lock&Condition实现的管程是支持多个条件变量的,这是二者的一个重要区别。
在很多并发场景下,支持多个条件变量能够让我们的并发程序可读性更好,实现起来也更容易。例如,实现一个阻塞队列,就需要两个条件变量。
一个阻塞队列,需要两个条件变量,一个是队列不空(空队列不允许出队),另一个是队列不满(队列已满不允许入队),这个例子我们前面在介绍管程的时候详细说过,这里就不再赘述。相关的代码,我这里重新列了出来,你可以温故知新一下。
public class BlockedQueue{
final Lock lock =
new ReentrantLock();
// 条件变量:队列不满
final Condition notFull =
lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty =
lock.newCondition();
// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满){
// 等待队列不满
notFull.await();
}
// 省略入队操作...
//入队后,通知可出队
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出队
void deq(){
lock.lock();
try {
while (队列已空){
// 等待队列不空
notEmpty.await();
}
// 省略出队操作...
//出队后,通知可入队
notFull.signal();
}finally {
lock.unlock();
}
}
}
不过,这里你需要注意,Lock和Condition实现的管程,线程等待和通知需要调用await()、signal()、signalAll(),它们的语义和wait()、notify()、notifyAll()是相同的。
但是不一样的是,Lock&Condition实现的管程里只能使用前面的await()、signal()、signalAll(),而后面的wait()、notify()、notifyAll()只有在synchronized实现的管程里才能使用。
如果一不小心在Lock&Condition实现的管程里调用了wait()、notify()、notifyAll(),那程序可就彻底玩儿完了。
Java SDK并发包里的Lock和Condition不过就是管程的一种实现而已,管程你已经很熟悉了,那Lock和Condition的使用自然是小菜一碟。下面我们就来看看在知名项目Dubbo中,Lock和Condition是怎么用的。不过在开始介绍源码之前,我还先要介绍两个概念:同步和异步。
我们平时写的代码,基本都是同步的。但最近几年,异步编程大火。那同步和异步的区别到底是什么呢?通俗点来讲就是调用方是否需要等待结果,如果需要等待结果,就是同步;如果不需要等待结果,就是异步。
比如在下面的代码里,有一个计算圆周率小数点后100万位的方法pai1M(),这个方法可能需要执行俩礼拜,如果调用pai1M()之后,线程一直等着计算结果,等俩礼拜之后结果返回,就可以执行 printf("hello world")了,这个属于同步;如果调用pai1M()之后,线程不用等待计算结果,立刻就可以执行 printf("hello world"),这个就属于异步。
// 计算圆周率小说点后100万位
String pai1M() {
//省略代码无数
}
pai1M()
printf("hello world")
同步,是Java代码默认的处理方式。如果你想让你的程序支持异步,可以通过下面两种方式来实现:
调用方创建一个子线程,在子线程中执行方法调用,这种调用我们称为异步调用;
方法实现的时候,创建一个新的线程执行主要逻辑,主线程直接return,这种方法我们一般称为异步方法。
Dubbo源码分析
其实在编程领域,异步的场景还是挺多的,比如TCP协议本身就是异步的,我们工作中经常用到的RPC调用,在TCP协议层面,发送完RPC请求后,线程是不会等待RPC的响应结果的。可能你会觉得奇怪,平时工作中的RPC调用大多数都是同步的啊?这是怎么回事呢?
其实很简单,一定是有人帮你做了异步转同步的事情。例如目前知名的RPC框架Dubbo就给我们做了异步转同步的事情,那它是怎么做的呢?下面我们就来分析一下Dubbo的相关源码。
对于下面一个简单的RPC调用,默认情况下sayHello()方法,是个同步方法,也就是说,执行service.sayHello(“dubbo”)的时候,线程会停下来等结果。
DemoService service = 初始化部分省略
String message =
service.sayHello("dubbo");
System.out.println(message);
如果此时你将调用线程dump出来的话,会是下图这个样子,你会发现调用线程阻塞了,线程状态是TIMED_WAITING。本来发送请求是异步的,但是调用线程却阻塞了,说明Dubbo帮我们做了异步转同步的事情。通过调用栈,你能看到线程是阻塞在DefaultFuture.get()方法上,所以可以推断:Dubbo异步转同步的功能应该是通过DefaultFuture这个类实现的。
调用栈信息
不过为了理清前后关系,还是有必要分析一下调用DefaultFuture.get()之前发生了什么。DubboInvoker的108行调用了DefaultFuture.get(),这一行很关键,我稍微修改了一下列在了下面。这一行先调用了request(inv, timeout)方法,这个方法其实就是发送RPC请求,之后通过调用get()方法等待RPC返回结果。
public class DubboInvoker{
Result doInvoke(Invocation inv){
// 下面这行就是源码中108行
// 为了便于展示,做了修改
return currentClient
.request(inv, timeout)
.get();
}
}
DefaultFuture这个类是很关键,我把相关的代码精简之后,列到了下面。不过在看代码之前,你还是有必要重复一下我们的需求:当RPC返回结果之前,阻塞调用线程,让调用线程等待;当RPC返回结果后,唤醒调用线程,让调用线程重新执行。不知道你有没有似曾相识的感觉,这不就是经典的等待-通知机制吗?这个时候想必你的脑海里应该能够浮现出管程的解决方案了。有了自己的方案之后,我们再来看看Dubbo是怎么实现的。
// 创建锁与条件变量
private final Lock lock
= new ReentrantLock();
private final Condition done
= lock.newCondition();
// 调用方通过该方法等待结果
Object get(int timeout){
long start = System.nanoTime();
lock.lock();
try {
while (!isDone()) {
done.await(timeout);
long cur=System.nanoTime();
if (isDone() ||
cur-start > timeout){
break;
}
}
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
return returnFromResponse();
}
// RPC结果是否已经返回
boolean isDone() {
return response != null;
}
// RPC结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
}
调用线程通过调用get()方法等待RPC返回结果,这个方法里面,你看到的都是熟悉的“面孔”:调用lock()获取锁,在finally里面调用unlock()释放锁;获取锁后,通过经典的在循环中调用await()方法来实现等待。
当RPC结果返回时,会调用doReceived()方法,这个方法里面,调用lock()获取锁,在finally里面调用unlock()释放锁,获取锁后通过调用signal()来通知调用线程,结果已经返回,不用继续等待了。
至此,Dubbo里面的异步转同步的源码就分析完了,有没有觉得还挺简单的?最近这几年,工作中需要异步处理的越来越多了,其中有一个主要原因就是有些API本身就是异步API。例如websocket也是一个异步的通信协议,如果基于这个协议实现一个简单的RPC,你也会遇到异步转同步的问题。现在很多公有云的API本身也是异步的,例如创建云主机,就是一个异步的API,调用虽然成功了,但是云主机并没有创建成功,你需要调用另外一个API去轮询云主机的状态。如果你需要在项目内部封装创建云主机的API,你也会面临异步转同步的问题,因为同步的API更易用。
总结
Lock&Condition是管程的一种实现,所以能否用好Lock和Condition要看你对管程模型理解得怎么样。管程的技术前面我们已经专门用了一篇文章做了介绍,你可以结合着来学,理论联系实践,有助于加深理解。
Lock&Condition实现的管程相对于synchronized实现的管程来说更加灵活、功能也更丰富。
结合我自己的经验,我认为了解原理比了解实现更能让你快速学好并发编程,所以没有介绍太多Java SDK并发包里锁和条件变量是如何实现的。但如果你对实现感兴趣,可以参考《Java并发编程的艺术》一书的第5章《Java中的锁》,里面详细介绍了实现原理,我觉得写得非常好。
另外,专栏里对DefaultFuture的代码缩减了很多,如果你感兴趣,也可以去看看完整版。
Dubbo的源代码在Github上,DefaultFuture的路径是:incubator-dubbo/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java。