[关闭]
@TryLoveCatch 2022-04-20T15:09:23.000000Z 字数 67748 阅读 2257

Java知识体系之线程

Java知识体系


基础知识

实现线程主要有3种方式:
1)使用内核线程实现
2)使用用户线程实现
3)使用用户线程轻量级进程混合实现

内核线程(Kernel-Level Thread,KLT)

内核线程(Kernel-Level Thread): 就是直接由操作系统内核(下称内核 Kernel)支持的线程,这种线程由内核来完成线程切换,内核通过操纵调度器(Scheduler)对线程进行调度,并负责将线程的任务映射到各个处理器上。

多线程内核(Multi-Threads Kernel):每个内核线程可以视为内核的一个分身,这样操作系统就有能力同时处理多个事情,支持多线程的内核就叫多线程内核。

轻量级进程(Light Weight Process,LWP)

程序一般不会直接去使用内核线程,而是去使用内核线程的一种高级接口——轻量级进程(Light Weight Process,LWP)轻量级进程就是我们通常意义上所讲的线程,由于每个轻量级进程都由一个内核线程支持,因此只有先支持内核线程,才能有轻量级进程。这种轻量级进程内核线程之间1:1的关系称为一对一的线程模型。

轻量级进程内核线程之间1:1的关系:

劣势:

用户线程(User Thread,UT)

用户线程(User Thread,UT),广义上,一个线程只要不是内核线程,就是用户线程轻量级进程也属于用户线程,但轻量级进程的实现始终是建立在内核之上的,许多操作都要进行系统调用,效率会受到限制。
狭义上,用户线程指的是完全建立在用户空间的线程库上,系统内核不能感知线程存在的实现。

这种进程与用户线程之间1:N的关系称为一对多的线程模型:

优势:

劣势:

用户线程加轻量级进程混合实现

在这种混合实现下,即存在用户线程,也存在轻量级进程。在这种混合模式中,用户线程与轻量级进程的数量比是不定的,即为N:M的关系:

优势:

java线程的实现

Java线程JDK1.2之前,是基于称为绿色线程(Green Threads)用户线程实现的,后来放弃了;而在JDK1.2中,线程模型替换为基于操作系统原生线程模型来实现。因此,在目前的JDK版本中,操作系统支持怎样的线程模型,在很大程度上决定了Java虚拟机的线程是怎样映射的。

对于Sun JDK来说,它的Windows版Linux版都是使用一对一的线程模型实现的,一条Java线程就映射到一条轻量级进程之中,因为WindowsLinux系统提供的线程模型就是一对一的。


线程状态

Java 中,线程的状态底层操作系统中线程的状态并不是一一对应的关系,我们所能见到的是 JVM 虚拟机层面暴露的状态。

操作系统线程状态

通常,在操作系统这一层面,线程存在五类状态,状态的转换关系如下图:

java线程状态

注意:这里说的线程状态,指的是JVM层面暴露给我们的状态,与操作系统底层的线程状态是两个不同层面的事儿。我们下面说的线程状态都是Thread.State明确定义的。

先上图:

我们来看一下 Thread.State的代码:

  1. public enum State {
  2. /**
  3. * Thread state for a thread which has not yet started.
  4. */
  5. NEW,
  6. /**
  7. * Thread state for a runnable thread. A thread in the runnable
  8. * state is executing in the Java virtual machine but it may
  9. * be waiting for other resources from the operating system
  10. * such as processor.
  11. */
  12. RUNNABLE,
  13. /**
  14. * Thread state for a thread blocked waiting for a monitor lock.
  15. * A thread in the blocked state is waiting for a monitor lock
  16. * to enter a synchronized block/method or
  17. * reenter a synchronized block/method after calling
  18. * {@link Object#wait() Object.wait}.
  19. */
  20. BLOCKED,
  21. /**
  22. * Thread state for a waiting thread.
  23. * A thread is in the waiting state due to calling one of the
  24. * following methods:
  25. * <ul>
  26. * <li>{@link Object#wait() Object.wait} with no timeout</li>
  27. * <li>{@link #join() Thread.join} with no timeout</li>
  28. * <li>{@link LockSupport#park() LockSupport.park}</li>
  29. * </ul>
  30. *
  31. * <p>A thread in the waiting state is waiting for another thread to
  32. * perform a particular action.
  33. *
  34. * For example, a thread that has called <tt>Object.wait()</tt>
  35. * on an object is waiting for another thread to call
  36. * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
  37. * that object. A thread that has called <tt>Thread.join()</tt>
  38. * is waiting for a specified thread to terminate.
  39. */
  40. WAITING,
  41. /**
  42. * Thread state for a waiting thread with a specified waiting time.
  43. * A thread is in the timed waiting state due to calling one of
  44. * the following methods with a specified positive waiting time:
  45. * <ul>
  46. * <li>{@link #sleep Thread.sleep}</li>
  47. * <li>{@link Object#wait(long) Object.wait} with timeout</li>
  48. * <li>{@link #join(long) Thread.join} with timeout</li>
  49. * <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
  50. * <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
  51. * </ul>
  52. */
  53. TIMED_WAITING,
  54. /**
  55. * Thread state for a terminated thread.
  56. * The thread has completed execution.
  57. */
  58. TERMINATED;
  59. }

New

新建状态,新创建了一个线程对象。

Runnable

javadoc中的注释:

A thread in the runnable state is executing in the Java virtual machine but it may be waiting for other resources from the operating system such as processor.
处于 runnable 状态下的线程正在 Java 虚拟机中执行,但它可能正在等待来自于操作系统的其它资源,比如处理器。

实际上, Java 线程状态Runnable包括了传统的 readyrunning状态,并且还包含了一部分的waiting状态。

java中为什么没有区分ready和running呢?

操作系统架构通常都是用所谓的时间分片(time quantum or time slice)方式进行抢占式线程调度(Preemptive Threads-Scheduling)。正常情况下,这个时间分片通常是很小的,一个线程一次最多只能在 cpu 上运行比如10-20ms 的时间(此时处于 running状态),也即大概只有0.01秒这一量级,时间片用后就要被切换下来放入调度队列的末尾等待再次调度(也即回到 ready 状态)。
所以,线程切换得是如此之快,那么区分 readyrunning 就没什么太大意义了。

为什么Runnable还包含一部分waiting状态?
这里的部分waiting,指的就是I/O阻塞
先说结论: I/O阻塞的时候,操作系统的线程状态处于waiting,但是java线程状态处于Runnable

再次看一下操作系统的线程状态图:

我们知道传统的I/O都是阻塞式(blocked)的,原因是I/O操作比起cpu来实在是太慢了,可能差到好几个数量级都说不定。如果让 cpu去等I/O 的操作,很可能时间片都用完了,I/O操作还没完成呢,不管怎样,它会导致 cpu 的利用率极低。

所以,解决办法就是:一旦线程中执行到 I/O 有关的代码,相应线程立马被切走,然后调度 ready 队列中另一个线程来运行。

这时执行了 I/O 的线程就不再运行,即所谓的被阻塞了。它也不会被放到调度队列中去,因为很可能再次调度到它时,I/O可能仍没有完成。

线程会被放到所谓的等待队列中,处于上图中的 waiting 状态。

当然了,我们所谓阻塞只是指这段时间 cpu 暂时不会理它了,但另一个部件比如硬盘则在努力地为它服务。cpu与硬盘间是并发的。当在 cpu 上是 waiting 时,在硬盘上却处于 running,只是我们在操作系统层面讨论线程状态时通常是围绕着 cpu这一中心去述说的。

而当 I/O 完成时,则用一种叫中断(interrupt)的机制来通知 cpu
cpu 会收到一个比如说来自硬盘中断信号,并进入中断处理流程,手头正在执行的线程因此被打断,回到 ready 队列。而先前因 I/Owaiting 的线程,随着 I/O 的完成也再次回到 ready 队列,这时 cpu 可能会选择它来执行。

理解Runnable状态

当进行阻塞式的 IO 操作时,或许底层的操作系统线程确实处在阻塞状态,但我们关心的是 JVM 的线程状态JVM 并不关心底层的实现细节,什么时间分片也好,什么 IO 时就要切换也好,它并不关心。

前面说到:

处于 runnable 状态下的线程正在 Java 虚拟机中执行,但它可能正在等待来自于操作系统的其它资源,比如处理器

JVM 把哪些都视作资源呢?

cpu 也好,硬盘,网卡也罢,有东西在为线程服务,它就认为线程在“执行”。

处于 IO 阻塞,只是说 cpu 不执行线程了,但网卡可能还在监听呀,虽然可能暂时没有收到数据;或者硬盘可能在执行啊,所以 JVM 认为线程还在执行。

操作系统的线程状态是围绕着 cpu 这一核心去述说的,这与 JVM 的侧重点是有所不同的。

总结:
RUNNABLE 状态对应了传统的 readyrunning 以及部分的 waiting 状态。如下图:

Blocked

BLOCKEDI/O 阻塞是不同的,它不是一般意义上的阻塞,而是特指被 synchronized 块阻塞

首先我们要明白java中对象锁的模型,JVM会为一个使用内部锁(synchronized)的对象维护两个集合,Entry Set和Wait Set,也有人翻译为锁池和等待池,意思基本一致。

对于Entry Set:如果线程A已经持有了对象锁,此时如果有其他线程也想获得该对象锁的话,它只能进入Entry Set,并且处于线程的BLOCKED状态。

对于Wait Set:如果线程A调用了wait()方法,那么线程A会释放该对象的锁,进入到Wait Set,并且处于线程的WAITING状态。

还有需要注意的是,某个线程B想要获得对象锁,一般情况下有两个先决条件,一是对象锁已经被释放了(如曾经持有锁的前任线程A执行完了synchronized代码块或者调用了wait()方法等等),二是线程B已处于RUNNABLE状态。

那么这两类集合中的线程都是在什么条件下可以转变为RUNNABLE呢?

对于Entry Set中的线程,当对象锁被释放的时候,JVM会唤醒处于Entry Set中的某一个线程,这个线程的状态就从BLOCKED转变为RUNNABLE。

对于Wait Set中的线程,当对象的notify()方法被调用时,JVM会唤醒处于Wait Set中的某一个线程,这个线程的状态就从WAITING转变为RUNNABLE;或者当notifyAll()方法被调用时,Wait Set中的全部线程会转变为RUNNABLE状态。所有Wait Set中被唤醒的线程会被转移到Entry Set中。

然后,每当对象的锁被释放后,那些所有处于RUNNABLE状态的线程会共同去竞争获取对象的锁,最终会有一个线程(具体哪一个取决于JVM实现,队列里的第一个?随机的一个?)真正获取到对象的锁,而其他竞争失败的线程继续在Entry Set中等待下一次机会。

作者:A_客
链接:https://www.jianshu.com/p/25e243850bd2
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

对上面的理解:

线程A得到锁之后,线程B和线程C就会进入锁对象的enrty set中去,当线程A wait()之后,进入锁对象的wait set中,然后假设线程B获取了锁,然后调用notifyAll(),这时,线程A会被唤醒进入到锁对象的enrty set中去,线程B退出同步代码块的时候,释放锁,这个时候,会唤醒entry set里面的某一个线程,这个线程就竞争到了锁。

javadoc中的注释:

1、A thread in the blocked state is waiting for a monitor lock to enter a synchronized block/method.
2、A thread in the blocked state is waiting for a monitor lock to reenter a synchronized block/method after calling Object.wait.

1、一个处于 blocked 状态的线程正在等待一个监视器锁以进入一个同步的块或方法。
2、一个处于 blocked 状态的线程正在等待一个监视器锁,在其调用 Object.wait 方法之后,以再次进入一个同步的块或方法。

先看第一句:
监视器锁用于同步访问,以达到多线程间的互斥。

所以一旦一个线程获取进入同步块,在其出来之前,如果其它线程想进入,就会因为获取不到而阻塞在同步块之外,这时的状态就是 BLOCKED

注意,这一状态的进入及解除都不受我们控制,当可用时,线程即从阻塞状态中恢复。

再来看第二句:

一个处于 blocked 状态的线程正在等待一个监视器锁,在其调用 Object.wait 方法之后,以再次进入一个同步的块或方法。

这是一个Reenter的过程:

WAITING

javadoc中的注释:

A thread that is waiting indefinitely for another thread to perform a particular action is in this state.
一个正在无限期等待另一个线程执行一个特别的动作的线程处于这一状态。

那么,这个"特别的动作"指的是什么呢?继续看javadoc:

A thread is in the waiting state due to calling one of the following methods:

Object.wait with no timeout
Thread.join with no timeout
LockSupport.park
A thread in the waiting state is waiting for another thread to perform a particular action. For example, a thread that has called Object.wait() on an object is waiting for another thread to call Object.notify() or Object.notifyAll() on that object. A thread that has called Thread.join() is waiting for a specified thread to terminate.

一个线程进入 WAITING 状态是因为调用了以下方法:
1、不带时限的 Object.wait()
2、不带时限的 Thread.join()
3、LockSupport.park()

然后会等其它线程执行一个特别的动作,比如:

1、一个调用了某个对象的 Object.wait()的线程会等待另一个线程调用此对象的 Object.notify()Object.notifyAll()
2、一个调用了 Thread.join()的线程会等待指定的线程结束。

很显然,WAITING 状态所涉及的不是单独一个线程,相反,它涉及多线程,具体地讲,这是多个线程间的一种协作机制(cooperate)。当然,多个线程间也会有竞争(race)

首先,为什么要 wait 呢?简单讲,是因为条件(condition)不满足。那么什么是条件呢?为方便理解,我们设想一个场景:

有一节列车车厢,有很多乘客,每个乘客相当于一个线程;里面有个厕所,这是一个公共资源,且一次只允许一个线程进去访问(毕竟没人希望在上厕所期间还与他人共享~)。

竞争(race)

假如有多个乘客同时上厕所,那么这里首先就会存在竞争的关系。如果将厕所视为一个对象,它有一把,想上厕所乘客线程需要先获取到,然后才能进入厕所

Java中的同步机制,也即是 synchronized 关键字:

synchronized(expression) {……}

它的机制是这样的:对表达式(expresssion)求值(值的类型须是引用类型(reference type)),获取它所代表的对象,然后尝试获取这个对象的

说一下这个阻塞,当一个乘客线程还在厕所期间,也就是它已经获取到了,其它同时想上厕所乘客线程就被阻塞了,处在该对象的 entry set 中,处于 BLOCKED 状态。

完事之后,退出厕所,归还

之后,系统再在 entry set 中挑选一个乘客线程,将给到它。

这个整体流程,如下所示:

条件(condition)

现在,假设有个女乘客线程,她抢到了,进去之后裤子脱了一半,发现纸没了,于是拒绝尿。

那么这时,条件出现了:有纸没纸,这就是某种条件。

那么,现在条件不满足,这位女乘客线程该怎么办呢?如果只是在里面干等,显然是不行的:

所以,当条件不满足时,需要出来,要把还回去,以使得诸如乘务员线程的能进去增加纸张。

等待是必要的吗?

这里所谓“等待”,指的是使线程处于不再活动的状态,即是从调度队列中剔除。说白了就是不调用wait(),进入waiting状态,而是直接归还

如果不等待,只是简单归还,也就是说,用一个反复的循环来判断条件是否满足,不满足,就归还,因为只是简单的归还了,那么还是可以再次回到调度队列,然后还是有机会被选中,继续获取到,但是这个时候条件依然还是有可能不满足。

现在让我们考虑一种比较极端的情况:厕所外一大堆的女乘客线程想进去方便,同时还有一个焦急的乘务员线程想进去增加厕纸。

如果线程都不等待,而厕所又是一个公共资源,无法并发访问。调度器每次挑一个线程进去,挑中乘务员线程的几率降低了。

所以,这会干扰正常工作的线程,挤占了资源,反而影响了自身条件的满足。

另外,乘务员线程可能这段时间根本没有启动,此时,不愿等待的女乘客线程不过是徒劳地进进出出,占用了 CPU 资源却没有办成正事。

协作机制(cooperate)

综上,等待还是有必要的,我们需要一种更高效的机制,也即是 wait/notify协作机制

条件不满足时,应该调用 wait(),这时线程释放,并进入所谓的 wait set中,具体的讲,是进入这个厕所对象的 wait set 中:

这时,线程不再活动,不再参与调度,因此不会浪费 CPU 资源,也不会去竞争了,这时的线程状态即是 WAITING

现在的问题是:她们什么时候才能再次活动呢?显然,最佳的时机是当条件满足的时候。

之后,乘务员线程进去增加厕纸,当然,此时,它也不能只是简单加完厕纸就完了,它还要执行一个特别的动作,也即是通知(notify)在这个对象上等待女乘客线程

大概就是向她们喊一声:“有纸啦!赶紧去尿吧!”

所谓通知,也即是把女乘客线程wait set 中释放出来,重新进入到调度队列(ready queue)中。

整个过程如下图所示:

对于上述过程,我们也给出以下 gif 动图演示:

注意:哪怕只通知了一个等待的线程被通知线程也不能立即恢复执行,因为她当初中断的地方是在同步块内,而此刻她已经不持有,所以她需要再次尝试去获取(很可能面临其它线程竞争),成功后才能在当初调用 wait()之后的地方恢复执行。(这也即是所谓的 reenter after calling Object.wait,在Blocked中曾详细的讨论了这一过程。)
1、如果能获取线程就从 WAITING 状态变成 RUNNABLE 状态;
2、否则,从 wait set 出来,又进入 entry set线程就从 WAITING 状态又变成 BLOCKED 状态。

综上,这是一个协作机制女乘客线程乘务员线程间存在一个协作关系。显然,这种协作关系的存在,女乘客线程可以避免在条件不满足时的盲目尝试,也为乘务员线程的顺利执行腾出了资源;同时,在条件满足时,又能及时得到通知协作关系的存在使得彼此都能受益。

join 的场景
从定义中可知,除了 wait/notify 外,调用 join()也会让线程处于 WAITING 状态。

join 的机制中并没有显式的 wait/notify 的调用,但可以视作是一种特殊的,隐式的 wait/notify 机制。

假如有 ab 两个线程,在 a 线程中执行 b.join(),相当于让 a 去等待 b,此时 a 停止执行,等 b 执行完了,系统内部会隐式地通知 a,使 a 解除等待状态,恢复执行。

换言之,a 等待的条件b 执行完毕b完成后,系统会自动通知 a

总结:
Thread.State.WAITING 状态与传统的 waiting 状态基本类似:

Timed_waiting

javadoc中的注释:

A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.
一个正在限时等待另一个线程执行一个动作的线程处于这一状态。

继续看详细的注释:

Thread state for a waiting thread with a specified waiting time. A thread is in the timed waiting state due to calling one of the following methods with a specified positive waiting time:

  • Thread.sleep
  • Object.wait with timeout
  • Thread.join with timeout
  • LockSupport.parkNanos
  • LockSupport.parkUntil

带指定的等待时间的等待线程所处的状态。一个线程处于这一状态是因为用一个指定的正的等待时间(为参数)调用了以下方法中的其一:

  • Thread.sleep
  • 带时限(timeout)的 Object.wait
  • 带时限(timeout)的 Thread.join
  • LockSupport.parkNanos
  • LockSupport.parkUntil

不难看出,TIMED_WAITINGWAITING 间的联系还是很紧密的,主要差异在于:

注意:上面Waiting我们说的没有参数的wait(),等同于wait(0)wait(0) 它不是等0毫秒,恰恰相反,它的意思是永久的等下去,到天荒地老,除非收到通知

场景

还是我们上面的例子,现在设想一种情况:

乘务员线程增加了厕纸,正当它准备执行 notify 时,乘务员线程因某种原因被杀死了(持有的也随之释放)。这种情况下,条件已经满足了,但等待的线程却没有收到通知,还在傻乎乎地等待

简而言之,就是存在通知失效的情况。

如果有个乘务员线程,她考虑得比较周全,她不是调用 wait(),而是调用 wait(1000)

wait(1000)相当于自带倒计时 1000 毫秒的闹钟,换言之,她在同时等待两个通知,并取决于哪个先到:

这样,在通知失效的情况下,她还是有机会自我唤醒的,进而完成尿尿动作。

虚假唤醒(spurious wakeup)

我们举个例子来说明:

女乘客线程A女乘客线程B,来竞争 厕所这个公共资源,而乘务员线程来负责给厕所添加厕纸。

女乘客线程的判断逻辑如下:

  1. synchronization(厕所){
  2. if(厕所没纸){
  3. wait();
  4. }
  5. doing();
  6. }

接下来,
1、女乘客线程A先获取到,然后发现厕所没纸,所以进入到等待队列,处于waiting状态;
2、而女乘客线程B执行的时候,发现乘务员线程已经获取到了,所以进入了阻塞线程,处于blocked状态;
3、乘务员线程执行完毕之后,释放并且调用notify()唤醒女乘客线程A
4、我们知道,女乘客线程A虽然被唤醒了,但是她只是退出了waiting状态,还是需要竞争,才能进入runnable状态,否则进入blocked状态。
5、假设,女乘客线程A竞争失败,没有获取,进入blocked状态。
6、女乘客线程B获取到了,而且这个时候,条件也满足,已经有纸了,所以女乘客线程B顺利执行,并且用完了厕纸(爱干净,没办法),这个时候厕所又没有纸了。
7、女乘客线程A终于获取了,可以继续往下执行了,然而,这个时候其实已经没有厕纸了,所以就出现问题了,这就是虚假唤醒(spurious wakeup)

解决方法很简单,上面的代码修改一下:

  1. synchronization(厕所){
  2. while(厕所没纸){
  3. wait();
  4. }
  5. doing();
  6. }

if改成while,这样,每次被唤醒的时候,都需要判断一下条件是否满足,满足就继续执行,不满足就继续等待,避免虚假唤醒(spurious wakeup)的发生。

虚假唤醒(spurious wakeup)的根本原因,在于被唤醒的线程,不会立马进入runnable状态,而是需要去竞争锁,失败了,则会进入blocked状态

Thread.sleep()

sleep()无关。

sleep(time)的时候,线程处于Timed_waiting,在这个time时间内,此线程不处于调度队列中,不会被调度,直到time时间到了,重新进入调度队列中,不过这个时候,线程是否运行,还是得等到被再次调度

如果sleep()是在同步块里面调用了,和wait()不一样,它并不会释放。也就是说,如果线程调用 sleep 时是带了sleep 期间则还是这个线程所拥有。

另外,sleep(0)wait(0) 是不一样的,sleep 不存在无限等待的情况,sleep(0) 相当于几乎不等待。

join(timeout) 的情景与 wait(timeout) 原理类似,这里不再展开叙述。

总结
在说完了 BLOCKEDWAITINGTIMED_WAITING 后,我们可以综合来看看它们。
显然,BLOCKED 同样可以视作是一种特殊的,隐式的 wait/nofity 机制。等待的条件就是有锁还是没锁
可用时,其中的一个线程会被系统隐式通知,并被赋予,从而获得在同步块中的执行权。等待锁的线程系统同步机制形成了一个协作关系
WAITING 状态属于主动地显式地申请的阻塞BLOCKED 则属于被动的阻塞

Terminated

线程 run()执行完毕或者异常退出,则该线程的生命周期终结,不能再次使用。

停止线程

Java多线程初探——正确停止线程

stop()

在1.0版本的jdk中,提供了一个stop方法来停止线程,但是这个方法现在已经被废弃了,因为使用这个方法停止线程,将会使线程戛然而止,我们甚至不知道程序执行到了哪里,资源是否已经释放,会出现一些不可预料的结果。

  1. public class YieldRunnable implements Runnable{
  2. public volatile boolean isRunning = true;
  3. @Override
  4. public void run() {
  5. String name = Thread.currentThread().getName();
  6. System.out.println(name + "开始执行!");
  7. while(isRunning) {
  8. for(int i = 1; i < 6; i++) {
  9. System.out.println(name + "执行了[" + i + "]次");
  10. //注意,yield是静态方法
  11. Thread.yield();
  12. }
  13. }
  14. System.out.println(name + "执行结束!");
  15. }
  16. }
  17. public static void main(String[] args) {
  18. System.out.println("主线程开始执行!");
  19. YieldRunnable runnable = new YieldRunnable();
  20. Thread thread = new Thread(runnable, "线程1");
  21. thread.start();
  22. try {
  23. Thread.sleep(1000);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. thread.stop();
  28. System.out.println("主线程执行结束!");
  29. }

执行结果:

  1. 线程1执行了[1]次
  2. 线程1执行了[2]次
  3. 线程1执行了[3]次
  4. 线程1执行了[4]次
  5. 线程1执行了[5]次
  6. 线程1执行了[1]次
  7. 主线程执行结束!

在调用了stop方法之后,线程中的循环语句还没有执行结束线程就戛然而止了,甚至连最后执行结束的输出都没有打印出来,我们甚至不知道程序执行到哪里了,所以stop()方法是不推荐使用的。

标记位

还是上一个例子,修改main方法:

  1. public static void main(String[] args) {
  2. System.out.println("主线程开始执行!");
  3. YieldRunnable runnable = new YieldRunnable();
  4. Thread thread = new Thread(runnable, "线程1");
  5. thread.start();
  6. try {
  7. Thread.sleep(1000);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. runnable.isRunning = false;
  12. System.out.println("主线程执行结束!");
  13. }

执行结果:

  1. 线程1执行了[4]次
  2. 线程1执行了[5]次
  3. 线程1执行了[1]次
  4. 线程1执行了[2]次
  5. 线程1执行了[3]次
  6. 线程1执行了[4]次
  7. 线程1执行了[5]次
  8. 主线程执行结束!
  9. 线程1执行结束!

注意:isRunning时volatile的

interrupt()

在Java中还提供了一个方法,interrupt()方法,在jdk文档中这样描述:中断线程,这个方法真的能将线程停止吗?现在用一个实例尝试一下。

  1. public class YieldRunnable implements Runnable{
  2. @Override
  3. public void run() {
  4. String name = Thread.currentThread().getName();
  5. System.out.println(name + "开始执行!");
  6. while(true) {
  7. for(int i = 1; i < 6; i++) {
  8. System.out.println(name + "执行了[" + i + "]次");
  9. //注意,yield是静态方法
  10. Thread.yield();
  11. }
  12. long time = System.currentTimeMillis();
  13. while(System.currentTimeMillis() - time < 1000) {
  14. /*
  15. * 使用while循环模拟 sleep 方法,这里不要使用sleep,具体原因下面再说
  16. */
  17. }
  18. }
  19. }
  20. }
  21. public static void main(String[] args) {
  22. System.out.println("主线程开始执行!");
  23. YieldRunnable runnable = new YieldRunnable();
  24. Thread thread = new Thread(runnable, "线程1");
  25. thread.start();
  26. try {
  27. Thread.sleep(1000);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. thread.interrupt();
  32. System.out.println("主线程执行结束!");
  33. }

运行之后会发现,虽然执行了interrupt()方法,但是子线程并没有停下来,反而是一直执行下去,由此可以得知,

interrupt()方法单独使用不能停止线程。

我们发现线程有一个状态叫做中断状态,在调用interrupt()之后,线程的中断状态为true,Java提供两种方法获取线程的中断状态:

  1. public static boolean interrupted();
  2. public boolean isInterrupted()

这两个方法的区别:

  1. public class YieldRunnable implements Runnable{
  2. @Override
  3. public void run() {
  4. String name = Thread.currentThread().getName();
  5. System.out.println(name + "开始执行!");
  6. while(!Thread.currentThread().isInterrupted()) {
  7. for(int i = 1; i < 6; i++) {
  8. System.out.println(name + "执行了[" + i + "]次");
  9. //注意,yield是静态方法
  10. Thread.yield();
  11. }
  12. long time = System.currentTimeMillis();
  13. while(System.currentTimeMillis() - time < 1000) {
  14. /*
  15. * 使用while循环模拟 sleep 方法,这里不要使用sleep,具体原因下面再说
  16. */
  17. }
  18. }
  19. System.out.println(name + "执行结束!");
  20. }
  21. }

这样就可以正确停止线程了,要想使线程停止,其实还是判断的线程的中断状态,那么它与标志位的方法有什么区别呢?
在上面的代码中,我们注意到:

  1. while(System.currentTimeMillis() - time < 1000) {
  2. }

这是用来模拟Thread的sleep(long l)方法的,为什么不直接使用sleep方法呢?

当线程调用了interrupt()方法后,如果线程在调用 Object 类的 wait()、wait(long) 或 wait(long, int) 方法,或者该类的 join()、join(long)、join(long,
int)、sleep(long) 或 sleep(long, int) 方法过程中受阻,则其中断状态将被清除,它还将收到一个 InterruptedException。
也就是说线程不仅不会被中断,系统还会抛出一个异常。

我们来试一下:

  1. public class YieldRunnable implements Runnable{
  2. @Override
  3. public void run() {
  4. String name = Thread.currentThread().getName();
  5. System.out.println(name + "开始执行!");
  6. while(!Thread.currentThread().isInterrupted()) {
  7. for(int i = 1; i < 6; i++) {
  8. System.out.println(name + "执行了[" + i + "]次");
  9. //注意,yield是静态方法
  10. Thread.yield();
  11. }
  12. try {
  13. Thread.sleep(1000);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. System.out.println(name + "执行结束!");
  19. }
  20. }

执行结果:

  1. 主线程开始执行!
  2. 线程1开始执行!
  3. 线程1执行了[1]次
  4. 线程1执行了[2]次
  5. 线程1执行了[3]次
  6. 线程1执行了[4]次
  7. 线程1执行了[5]次
  8. 线程1执行了[1]次java.lang.InterruptedException: sleep interrupted
  9. 主线程执行结束!
  10. 线程1执行了[2]次
  11. 线程1执行了[3]次
  12. ……

不仅线程没有结束,在执行sleep()方法的时候还抛出了一个异常。那么对于这种情形下该如何处理才能使线程正确停止呢?
比较通用的终止线程的形式如下:

  1. @Override
  2. public void run() {
  3. try {
  4. // 1. isInterrupted()保证,只要中断标记为true就终止线程。
  5. while (!isInterrupted()) {
  6. // 执行任务...
  7. }
  8. } catch (InterruptedException ie) {
  9. // 2. InterruptedException异常保证,当InterruptedException异常产生时,线程被终止。
  10. }
  11. }

线程池

尽量使用Exector而不是直接用Thread类进行并发编程。

为什么使用线程池?

线程池ThreadPoolExecutor

Java里面线程池的顶级接口是 Executor,不过真正的线程池接口是 ExecutorServiceExecutorService 的默认实现是 ThreadPoolExecutor;类 Executors 里面调用的就是 ThreadPoolExecutor

  1. public interface Executor {
  2. void execute(Runnable command);
  3. }
  4. public interface ExecutorService extends Executor {
  5. void shutdown();
  6. List<Runnable> shutdownNow();
  7. boolean isShutdown();
  8. boolean isTerminated();
  9. <T> Future<T> submit(Callable<T> task);
  10. <T> Future<T> submit(Runnable task, T result);
  11. Future<?> submit(Runnable task);
  12. ...
  13. }

线程池通过ThreadPoolExector类构造,这个构造函数参数比较多:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler)

那么,现在,当一个新任务来到的时候,线程池是如何调度的呢?

举例

假设workQueue大小为 4corePoolSize2maximumPoolSize6,那么当加入15个任务时,线程的调度是什么呢?

我们来分析一下:

  • 首先执行任务 1、2,启动核心线程,直接执行。这时,corePoolSize已经满了。
  • 执行任务3~6,corePoolSize已经满了,所以放入队列,这时,workQueue也已经满了。
  • 执行任务7、8、9、10 ,因为corePoolSizeworkQueue都已经满了,所以它们会被马上执行,然后,达到了maximumPoolSize
  • 执行任务 11~15 则会执行默认饱和策略,抛出异常。

最终顺序是:1、2、7、8、9、10、3、4、5、6。

当然这个过程是针对指定大小的ArrayBlockingQueue<Runnable>来说,如果是LinkedBlockingQueue<Runnable>,因为该队列无大小限制,所以不存在上述问题。

总结

当有新的任务要处理时,先看线程池中的线程数量是否大于 corePoolSize,再看缓冲队列 workQueue 是否满,最后看线程池中的线程数量是否大于 maximumPoolSize。另外,当线程池中的线程数量大于 corePoolSize 时,如果里面有线程的空闲时间超过了 keepAliveTime,就将其移除线程池,这样,可以动态地调整线程池中线程的数量。

等待队列

并发编程4:Java 阻塞队列源码分析(上)

咱们上面说的workQueue,对应的就是BlockingQueue接口,JDKBlockingQueue提供了几种实现方式,常用的有:

饱和策略

我们在上面提过:

handler: 当线程池由于线程数目和队列限制而导致后续任务阻塞的时候,线程池的处理方式,也就是饱和策略

什么时候执行饱和策略?我们在上面也说过:

如果工作队列workQueue满了,并且正在运行的线程数量大于或等于 maximumPoolSize,那么就会用最后一个构造参数handler处理;默认的处理方式是直接丢掉任务,然后抛出一个异常。

而这个handler对应的就是RejectedExecutionHandler接口。JDKRejectedExecutionHandler提供了几种实现方式,常用的有:

Executors

Executors是系统提供的工具,提供了四种线程池:

1、newFixedThreadPool

创建固定大小的线程池。它只有核心线程,线程池创建后,线程数量将会固定不变,适合需要线程很稳定的场合。

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }
  • corePoolSize: 传入的线程个数nThreads
  • maximumPoolSize:等于corePoolSize,也就是说所有线程都为核心线程。所以就不存在超时,因为最大线程恒等于核心线程
  • keepAliveTime:无用
  • workQueue:LinkedBlockingQueue,无界队列,所以不会执行拒绝策略
  • handler:无用

2、newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的核心线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }
  • corePoolSize: 1
  • maximumPoolSize:1
  • keepAliveTime:无用
  • workQueue:LinkedBlockingQueue
  • handler:无用

3、newCachedThreadPool
创建一个可缓存线程池。它只有非核心线程线程池用起来的效果是:如果有空闲线程,会复用线程;如果没有空闲线程,会新建线程;如果线程空闲超过1分钟,将会被回收。主要用在吞吐量不固定的情况下,可以自动调节吞吐量。

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }
  • corePoolSize: 0
  • maximumPoolSize:Integer.MAX_VALUE 相当于无限大
  • keepAliveTime:60秒
  • workQueue:SynchronousQueue,移交机制,put一个,take一个
  • handler:无用

CachedThreadPool为何使用的是SynchronousQueue?
https://github.com/huanzhiyazi/articles/blob/master/%E6%8A%80%E6%9C%AF/Java/Executors.newCachedThreadPool%E5%A6%82%E4%BD%95%E5%81%9A%E5%88%B0%E7%BA%BF%E7%A8%8B%E7%BC%93%E5%AD%98%E7%9A%84/Executors.newCachedThreadPool%E5%A6%82%E4%BD%95%E5%81%9A%E5%88%B0%E7%BA%BF%E7%A8%8B%E7%BC%93%E5%AD%98%E7%9A%84.md
https://blog.csdn.net/qq_41634872/article/details/109393730

不能使用size大于0的arrayblockingqueue的原因在于,只来了一个任务,根据线程池运行原理,那个任务会被放入工作队列,如果后续一直没有任务,那么他就会一
直躺尸。
那size==0呢,这时候每个任务过来,阻塞队列为0已经满了,都会新创建一个线程,线程池失去了节省线程创建与销毁的次数的意义。
而SynchronousQueue不同:
1. 任务1来了,调用SynchronousQueue#offer(往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false),这个时候没有线程来poll,所以任务1 put失败,直接新起一个线程1来执行任务1
2. 任务2来了,跟任务1情况一样,继续新起一个线程2执行
3. 这时,任务1执行完毕了,就会去队列里面看是否有需要执行的任务,就执行了SynchronousQueue#poll(keepAliveTime),此时keepAliveTime为60s,所以会等待60s
4. 任务3来了,如果是在60s之内,那么就会唤醒线程1,并把任务3交给它来执行;如果是在60s之后,那么任务3就继续新起一个线程3执行,而线程1就会被释放掉。

4、newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  2. return new ScheduledThreadPoolExecutor(corePoolSize);
  3. }
  4. public ScheduledThreadPoolExecutor(int corePoolSize) {
  5. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  6. new DelayedWorkQueue());
  7. }
  • corePoolSize: corePoolSize
  • maximumPoolSize:Integer.MAX_VALUE
  • keepAliveTime:无用
  • workQueue:DelayedWorkQueue,延时获取元素的无界阻塞队列
  • handler:无用

线程池大小

https://www.hellojava.com/a/87734.html

  • CPU 密集型
    CPU 密集型是指那种包含大量运算、在持有的 CPU 分配的时间片上一直在执行任务、几乎不需要依赖或等待其他任何东西。 这样的任务,一直占有 CPU 进行执行,才是最好的方式。 设置线程数时,针对单台机器,最好就是有几个 CPU ,就创建几个线程,然后每个线程都在执行这种任务,永不停歇。
  • IO 密集型
    我们所接触到的 IO ,大致可以分成两种:磁盘 IO和网络 IO。
    磁盘 IO ,大多都是一些针对磁盘的读写操作,最常见的就是文件的读写,假如你的数据库、 Redis 也是在本地的话,那么这个也属于磁盘 IO。
    网络 IO ,这个应该是大家更加熟悉的,我们会遇到各种网络请求,比如 http 请求、远程数据库读写、远程 Redis 读写等等。
    IO 操作的特点就是需要等待,我们请求一些数据,由对方将数据写入缓冲区,在这段时间中,需要读取数据的线程根本无事可做,因此可以把 CPU 时间片让出去,直到缓冲区写满。

一般来说:

假设机器有N个CPU:
对于计算密集型的任务,应该设置线程数为N+1;
对于IO密集型的任务,应该设置线程数为2N;
对于同时有计算工作和IO工作的任务,应该考虑使用两个线程池,一个处理计算任务,一个处理IO任务,分别对两个线程池按照计算密集型和IO密集型来设置线程数。

N+1和2N是怎么来的?

是个经验值。

经验值吗?那为什么不是N+2或者N+3,而非得是N+1呢?

无语凝噎

那假如在一个请求中,计算操作需要5ms,DB操作需要100ms,对于一台8个CPU的服务器,怎么设置线程数呢?

这是一个计算和IO混合型的任务,可以将其分解为两个线程池来处理。一个线程池处理计算操作,设置N+1=9个线程,一个线程处理IO操作,设置2N=16个线程。

如果一个任务同时包含了一个计算操作和DB操作呢,不能拆分怎么设置?你能讲一下具体的计算过程吗?

首先这个任务整体上是一个IO密集型的任务。在处理一个请求的过程中,总共耗时100+5=105ms,而其中只有5ms是用于计算操作的,CPU利用率为5/(100+5)。使用线程池是为了尽量提高CPU的利用率,减少对CPU资源的浪费,假设以100%的CPU利用率来说,要达到100%的CPU利用率,对于一个CPU就要设置其利用率的倒数个数的线程数,也即1/(5/(100+5)),8个CPU的话就乘以8。那么算下来的话,就是……168,对,这个线程池要设置168个线程数。

如果实际的任务差异较大,不同任务实际的CPU操作耗时和IO操作耗时有所不同,那么怎么设置线程数呢?

那对所有任务的CPU操作耗时和IO操作耗时求个平均值就好了。

那如果现在这个IO操作是DB操作,而DB的QPS上限是1000,这个线程池又该设置为多大呢?

按比例来减少就可以了,按照之前的计算过程,可以计算出来当线程数设置为168的时候,DB操作的QPS为,168*(1000/(100+5))=1600,如果现在DB的QPS最大为1000,那么对应的,最大只能设置168*(1000/1600)=105个线程。

那设置线程池的时候除了考虑这些,还需要考虑哪些内容呢?

除了考虑任务CPU操作耗时、IO操作耗时之外,还需要服务器的内存资源、硬盘资源、网络带宽等等的。

线程清理

工作池中任意一个 worker(工作线程)完成其任务后,会马上向任务队列请求新的任务。在任务队列返回新任务之前,worker 中的闲置线程有一个存活时间——keepAliveTime:

工作池尝试清理过程如下:

关闭线程池

https://segmentfault.com/a/1190000021302247
https://www.cnblogs.com/liuyishi/p/10508596.html

ExecutorService 接口提供了三个方法用于手动关闭线程池,分别是shutdown(),shutdownNow()和awaitTermination()。我们最经常使用的 ThreadPoolExecutor 正是 ExecutorService 的实现类,自然也实现了这些方法。

线程池的状态

ThreadPoolExecutor 使用 runState (运行状态)这个变量对线程池的生命周期进行控制,线程池关闭过程会有频繁的运行状态转化,所以我们首先需要了解线程池的各种运行状态及其之间的转化关系,runState 一共有以下5种取值:

void shutdown()

先来看一下源码的注释:

Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down. This method does not wait for previously submitted tasks to complete execution. Use {@link #awaitTermination awaitTermination} to do that.
开始有序关闭线程池,与此同时,已提交的任务将继续执行,但不再接收新的任务,如果线程池已关闭,此方法调用不会产生额外影响。此方法不会等待已提交的任务执行完毕,要等待已提交任务执行完毕可以使用 awaitTermination() 方法。

刚开始看这段注释的时候真是百思不得其解,前面说已提交的任务会继续执行,后面又说此方法不会等待已提交的任务执行完毕,这不是自相矛盾吗?
原来后半句说此方法不会等任务执行完毕的意思是这个方法不会阻塞,也就是说任务队列的任务会继续执行,但是其他的线程(例如主线程)可能会抢在这些任务之前执行,而awaitTermination()方法可以实现阻塞并等待任务执行完毕,具体使用方法参考链接中的例子。

List shutdownNow()

Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution. These tasks are drained (removed) from the task queue upon return from this method. This method does not wait for actively executing tasks to terminate. Use {@link #awaitTermination awaitTermination} to do that. There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. This implementation cancels tasks via {@link Thread#interrupt}, so any task that fails to respond to interrupts may never terminate.
尝试终止所有正在执行的任务,并停止处理等待队列中的的任务,最后将所有未执行的任务列表的形式返回,此方法会将任务队列中的任务移除并以列表形式返回。此方法不会等待正在执行的任务执行完毕,要等待任务执行完毕可以使用awaitTermination()方法。此方法会尽最大努力终止正在执行的任务,除此之外不做其他保证,因为此方法底层实现是通过 Thread 类的interrupt()方法终止任务的,所以interrupt()未能终止的任务可能无法结束。

它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它也可能必须要等待所有正在执行的任务都执行完成了才能退出。但是大多数时候是能立即退出的。

boolean awaitTermination(long timeOut, TimeUnit unit)

会阻塞当前线程,接收timeout和TimeUnit两个参数,用于设定超时时间及单位。
s

这个方法并不是用来关闭线程池的,而是调用后等待timeout时间后,反馈线程池的状态,有三种情况:

awaitTermination会等待一段时间并查看线程池是否已经关闭了,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。

优雅的关闭线程池

  1. service.shutdown();
  2. try {
  3. // 注意会阻塞当前线程,所以最好关闭线程池在子线程里面调用
  4. if(!service.awaitTermination(2, TimeUnit.SECONDS)) {
  5. service.shutdownNow();
  6. }
  7. } catch (InterruptedException e) {
  8. service.shutdownNow();
  9. }

或者,增加对等待队列中任务处理

  1. //第一步,调用shutdown等待在执行的任务和提交等待的任务执行,同时不允许提交任务
  2. service.shutdown();
  3. try {
  4. if(!service.awaitTermination(2, TimeUnit.SECONDS)){
  5. //如果等待一段时间后还有任务在执行中被中断或者有任务提交了未执行
  6. //1.正在执行被中断的任务需要编写任务代码的时候响应中断
  7. List<Runnable> waitToExecuteTaskList = service.shutdownNow();
  8. //2.处理提交了未执行的任务,一般情况不会出现
  9. for(Runnable runnable:waitToExecuteTaskList){
  10. //todo
  11. }
  12. }
  13. }catch (InterruptedException e){//如果被中断了
  14. //1.正在执行被中断的任务需要编写任务代码的时候响应中断
  15. List<Runnable> waitToExecuteTaskList = service.shutdownNow();
  16. //2.处理提交了未执行的任务,一般情况不会出现
  17. for(Runnable runnable:waitToExecuteTaskList){
  18. //todo
  19. }
  20. }

小结

方法 描述 --
shutdown 将线程池状态置为SHUTDOWN,并不会立即停止线程池 --
停止接收外部submit的任务 --
内部正在跑的任务和队列里等待的任务,会执行完 --
等到第二步完成后,才真正停止 --
shutdownNow 将线程池状态置为STOP。尝试立即停止线程池,事实上不一定 --
停止接收外部submit的任务 --
忽略队列里等待的任务 --
尝试将正在跑的任务interrupt中断 --
返回未执行的任务列表 --

shutdown() 和 shutdownNow()的区别:

自己实现线程池

Android程序员会遇到的算法(part 4 消息队列的应用)
如何实现一个线程池

  1. public class ThreadPool{
  2. //用一个Set或者其他数据结构把创建的线程保存起来,为的是方便以后获取线程的handle,做其他操作。
  3. Set<WorkerThread> set = null;
  4. private Queue<Runnable> queue;
  5. //初始化线程池,创建内部类WorkerThread并且启动它
  6. public ThreadPool(int size){
  7. for( int i = 0 ;i < size ;i++ ){
  8. WorkerThread t = new WorkerThread();
  9. t.start();
  10. set.add( t );
  11. }
  12. queue = new LinkedList<Runnable>();
  13. }
  14. //submit一个runnable进线程池
  15. public void submit(Runnable runnable){
  16. synchronized (queue){
  17. queue.add(runnable);
  18. }
  19. }
  20. //WorkerThread用一个死循环不停的去向Runnable队列拿Runnable执行。
  21. public class WorkerThread extends Thread{
  22. @Override
  23. public void run() {
  24. super.run();
  25. while(true){
  26. synchronized (queue){
  27. if( !queue.isEmpty() ){
  28. Runnable current = queue.poll();
  29. current.run();
  30. }
  31. }
  32. }
  33. }
  34. }
  35. }
  36. ThreadPool pool = new ThreadPool(5);
  37. pool.submit(new Runnable(...))

ThreadLocal

ThreadLocal-面试必问深度解析


从上面的结构图,我们已经窥见ThreadLocal的核心机制:

使用例子

  1. public class Test {
  2. ThreadLocal<String> stringLocal = new ThreadLocal<String>();
  3. public void set(String str) {
  4. stringLocal.set(str);
  5. }
  6. public String getString() {
  7. return stringLocal.get();
  8. }
  9. public static void main(String[] args) throws InterruptedException {
  10. final Test test = new Test();
  11. test.set("11111");
  12. System.out.println("父线程 main :");
  13. System.out.println(test.getString());
  14. Thread thread1 = new Thread() {
  15. public void run() {
  16. test.set("22222");
  17. System.out.println("\n子线程 Thread-0 :");
  18. System.out.println(test.getString());
  19. };
  20. };
  21. thread1.start();
  22. }
  23. }
  24. /* Output:
  25. 父线程 main :
  26. 11111
  27. 子线程 Thread-0 :
  28. 22222
  29. *///:~

同一个ThreadLocal对象,但是在两个线程里面存储的值是不一样的。

源码

get()

  1. public T get() {
  2. Thread t = Thread.currentThread();
  3. ThreadLocalMap map = getMap(t);
  4. if (map != null) {
  5. ThreadLocalMap.Entry e = map.getEntry(this);
  6. if (e != null)
  7. return (T)e.value;
  8. }
  9. return setInitialValue();
  10. }
  11. ThreadLocalMap getMap(Thread t) {
  12. return t.threadLocals;
  13. }
  14. private T setInitialValue() {
  15. T value = initialValue();
  16. Thread t = Thread.currentThread();
  17. ThreadLocalMap map = getMap(t);
  18. if (map != null)
  19. map.set(this, value);
  20. else
  21. createMap(t, value);
  22. return value;
  23. }
  24. protected T initialValue() {
  25. return null;
  26. }

set()

  1. public void set(T value) {
  2. Thread t = Thread.currentThread();
  3. ThreadLocalMap map = getMap(t);
  4. if (map != null)
  5. map.set(this, value);
  6. else
  7. createMap(t, value);
  8. }
  9. ThreadLocalMap getMap(Thread t) {
  10. return t.threadLocals;
  11. }
  12. void createMap(Thread t, T firstValue) {
  13. t.threadLocals = new ThreadLocalMap(this, firstValue);
  14. }

remove()

  1. public void remove() {
  2. ThreadLocalMap m = getMap(Thread.currentThread());
  3. if (m != null)
  4. m.remove(this);
  5. }
  6. ThreadLocalMap getMap(Thread t) {
  7. return t.threadLocals;
  8. }

ThreadLocalMap

ThreadLocalMap是ThreadLocal的内部类,没有实现Map接口,用独立的方式实现了Map的功能,其内部的Entry也独立实现。

在ThreadLocalMap中,也是用Entry来保存K-V结构数据的。但是Entry中key只能是ThreadLocal对象

  1. static class Entry extends WeakReference<ThreadLocal> {
  2. /** The value associated with this ThreadLocal. */
  3. Object value;
  4. Entry(ThreadLocal k, Object v) {
  5. super(k);
  6. value = v;
  7. }
  8. }

Entry继承自WeakReference(弱引用,生命周期只能存活到下次GC前),但只有Key是弱引用类型的,Value并非弱引用。

Hash冲突怎么解决

ThreadLocalMap解决Hash冲突的方式就是简单的步长加1或减1,寻找下一个相邻的位置。

  1. private static int nextIndex(int i, int len) {
  2. return ((i + 1 < len) ? i + 1 : 0);
  3. }
  4. private static int prevIndex(int i, int len) {
  5. return ((i - 1 >= 0) ? i - 1 : len - 1);
  6. }

显然ThreadLocalMap采用线性探测的方式解决Hash冲突的效率很低,如果有大量不同的ThreadLocal对象放入map中时发送冲突,或者发生二次冲突,则效率很低。

ThreadLocal内存泄露

ThreadLocal内存泄漏真因探究

ThreadLocal的原理:每个Thread内部维护着一个ThreadLocalMap,它是一个Map。这个映射表的Key是一个弱引用,其实就是ThreadLocal本身,Value是真正存的线程变量Object。注意上图的虚线,它代表一个弱引用类型,而弱引用的生命周期只能存活到下次GC前。

内存泄露是怎么发生的呢?

ThreadLocal在ThreadLocalMap中是以一个弱引用身份被Entry中的Key引用的,因此如果ThreadLocal没有外部强引用来引用它,那么ThreadLocal会在下次JVM垃圾收集时被回收。
这个时候就会出现Entry中Key已经被回收,出现一个null Key的情况,外部读取ThreadLocalMap中的元素是无法通过null Key来找到Value的。
因此如果当前线程的生命周期很长,一直存在,那么其内部的ThreadLocalMap对象也一直生存下来,这些null key就存在一条强引用链的关系一直存在:Thread --> ThreadLocalMap-->Entry-->Value,这条强引用链会导致Entry不会回收,Value也不会回收,但Entry中的Key却已经被回收的情况,造成内存泄漏。

JVM团队已经考虑到这样的情况,并做了一些措施来保证ThreadLocal尽量不会内存泄漏:

在ThreadLocal的get()、set()、remove()方法调用的时候会清除掉线程ThreadLocalMap中所有Entry中Key为null的Value,并将整个Entry设置为null,利于下次内存回收。

我们以get()方法为例来说明:

  1. public T get() {
  2. Thread t = Thread.currentThread();
  3. ThreadLocalMap map = getMap(t);
  4. if (map != null) {
  5. ThreadLocalMap.Entry e = map.getEntry(this);
  6. if (e != null)
  7. return (T)e.value;
  8. }
  9. return setInitialValue();
  10. }

在调用map.getEntry(this)时,内部会判断key是否为null

  1. private Entry getEntry(ThreadLocal key) {
  2. int i = key.threadLocalHashCode & (table.length - 1);
  3. Entry e = table[i];
  4. if (e != null && e.get() == key)
  5. return e;
  6. else
  7. return getEntryAfterMiss(key, i, e);
  8. }
  1. private Entry getEntryAfterMiss(ThreadLocal key, int i, Entry e) {
  2. Entry[] tab = table;
  3. int len = tab.length;
  4. while (e != null) {
  5. ThreadLocal k = e.get();
  6. if (k == key)
  7. return e;
  8. if (k == null)
  9. expungeStaleEntry(i);
  10. else
  11. i = nextIndex(i, len);
  12. e = tab[i];
  13. }
  14. return null;
  15. }

注意k == null这里,继续调用了expungeStaleEntry(i)方法,expunge的意思是擦除,删除的意思,见名知意,在来看expungeStaleEntry方法的内部实现:

  1. private int expungeStaleEntry(int staleSlot) {
  2. Entry[] tab = table;
  3. int len = tab.length;
  4. // expunge entry at staleSlot(意思是,删除value,设置为null便于下次回收)
  5. tab[staleSlot].value = null;
  6. tab[staleSlot] = null;
  7. size--;
  8. // Rehash until we encounter null
  9. Entry e;
  10. int i;
  11. for (i = nextIndex(staleSlot, len);
  12. (e = tab[i]) != null;
  13. i = nextIndex(i, len)) {
  14. ThreadLocal k = e.get();
  15. if (k == null) {
  16. e.value = null;
  17. tab[i] = null;
  18. size--;
  19. } else {
  20. int h = k.threadLocalHashCode & (len - 1);
  21. if (h != i) {
  22. tab[i] = null;
  23. // Unlike Knuth 6.4 Algorithm R, we must scan until
  24. // null because multiple entries could have been stale.
  25. while (tab[h] != null)
  26. h = nextIndex(h, len);
  27. tab[h] = e;
  28. }
  29. }
  30. }
  31. return i;
  32. }

将当前Entry删除后,会继续循环往下检查是否有key为null的节点,如果有则一并删除,防止内存泄漏。

但这样也并不能保证ThreadLocal不会发生内存泄漏:

分配使用了ThreadLocal,又不再调用get()、set()、remove()方法,那么就会导致内存泄漏。

从表面上看,发生内存泄漏,是因为Key使用了弱引用类型。我们来对比一下使用和不使用弱引用的情况:

比较两种情况,我们可以发现:由于ThreadLocalMap的生命周期跟Thread一样长,如果都没有手动删除对应key,都会导致内存泄漏。
但是使用弱引用可以多一层保障:弱引用ThreadLocal不会内存泄漏,对应的value在下一次ThreadLocalMap调用set,get,remove的时候会被清除。

因此,ThreadLocal内存泄漏的根源是:

ThreadLocalMap的生命周期跟Thread一样长,如果没有手动删除对应key的value就会导致内存泄漏,而不是因为弱引用。
所以,每次使用完ThreadLocal,都调用它的remove()方法,清除数据。

AsyncTask

AsyncTask不过是封装了线程池Handler,用线程池来处理后台任务,用Handler来处理与UI的交互。

AsyncTask中的线程池

AsyncTask里面有两个线程池;一个THREAD_POOL_EXECUTOR,一个SERIAL_EXECUTOR。而SERIAL_EXECUTOR也使用THREAD_POOL_EXECUTOR实现的,只不过加了一个队列弄成了串行而已,那么这个THREAD_POOL_EXECUTOR是如何构造的呢?

  1. private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
  2. private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
  3. private static final int KEEP_ALIVE = 1;
  4. private static final BlockingQueue<Runnable> sPoolWorkQueue =
  5. new LinkedBlockingQueue<Runnable>(128);
  6. public static final Executor THREAD_POOL_EXECUTOR
  7. = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
  8. TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

可以看到,AsyncTask里面线程池是:

那么,就会有以下问题:

AsyncTask中的Handler

  1. private static InternalHandler sHandler;
  2. private static Handler getHandler() {
  3. synchronized (AsyncTask.class) {
  4. if (sHandler == null) {
  5. sHandler = new InternalHandler();
  6. }
  7. return sHandler;
  8. }
  9. }
  10. private static class InternalHandler extends Handler {
  11. public InternalHandler() {
  12. super(Looper.getMainLooper());
  13. }
  14. }

可以看出来,AsyncTask中的Handler用法很简单。
这里面也是有问题的:

AsyncTask并行还是串行

Android 1.6之前,AsyncTask串行执行任务;Android 1.6的时候AsyncTask开始采用线程池来处理并发任务;但是,Android 3.0开始,为了避免AsyncTask所带来的并发错误,AsyncTask有采用了一个线程串行执行任务。当然,我们可以通过AsyncTask的executeOnExecutor()来并发执行任务。

所以,Android 3.0以上,AsyncTask默认并不是并行执行的;

为什么不并行?

由于一个进程内所有的AsyncTask都是使用的同一个线程池执行任务;如果同时有几个AsyncTask一起并行执行的话,恰好AysncTask的使用者在doInbackgroud里面访问了相同的资源,但是自己没有处理同步问题;那么就有可能导致灾难性的后果!

当然,我们可以通过AsyncTask的executeOnExecutor()来并发执行任务。但是,这个方法是Android 3.0之后才有的。

总结

HandlerThread

HandlerThread封装了Looper,和UI线程类似,是一个具有消息循环的线程。

HandlerThread继承了Thread,它是一种可以使用HandlerThread。它的实现也很简单,就是在run()的时候,通过Looper.prepare()来创建消息队列,然后通过Looper.loop()来开启消息循环,代码如下所示:

  1. @Override
  2. public void run() {
  3. mTid = Process.myTid();
  4. Looper.prepare();
  5. synchronized (this) {
  6. mLooper = Looper.myLooper();
  7. notifyAll();
  8. }
  9. Process.setThreadPriority(mPriority);
  10. onLooperPrepared();
  11. Looper.loop();
  12. mTid = -1;
  13. }

基本用法

  1. private Handler mHandlerMain = new Handler();
  2. private Handler mHandler;
  3. private HandlerThread mThread;
  4. mThread = new HandlerThread();
  5. mThread.start();
  6. mHandler = new Handler(mThread.getLooper());
  7. mHandler.post(new Runnable(){
  8. doSomething();//耗时操作
  9. mHandlerMain.sendMessage();
  10. });

总结

IntentService

IntentService是一个Service,内部采用了HandlerThread来执行任务,当任务执行完毕的时候,IntentService会自动退出。它的作用很像一个后台线程,但是它是一个Service,所以不容易被系统杀死,从而可以尽量保证任务的执行。

基本用法

IntentService是一个抽象类,需要实现它的抽象方法onHandleIntent():

  1. public class MyIntentService extends IntentService{
  2. public MyIntentService(){
  3. super("MyIntentService")
  4. }
  5. protected void onHandleIntent(Intent intent){
  6. int tTypeAction = intent.getIntExtra("action_type");
  7. switch(tTypeAction){
  8. case 1:
  9. //doTask1();
  10. Log.d(TAG, "action1");
  11. break;
  12. case 2:
  13. //doTask2();
  14. Log.d(TAG, "action2");
  15. break;
  16. case 3:
  17. //doTask3();
  18. Log.d(TAG, "action3");
  19. break;
  20. default:
  21. Log.d(TAG, "action default");
  22. break;
  23. }
  24. }
  25. public void onDestory(){
  26. Log.d(TAG, "onDestory");
  27. super.onDestory();
  28. }
  29. }

处理任务的IntentService简单实现了,接下来,我们需要发起任务:

  1. Intent tIntent = new Intent(this, MyIntentService.class);
  2. tIntent.putExtra("action_type", 2);
  3. startService(tIntent);
  4. tIntent = new Intent(this, MyIntentService.class);
  5. tIntent.putExtra("action_type", 1);
  6. startService(tIntent);
  7. tIntent = new Intent(this, MyIntentService.class);
  8. tIntent.putExtra("action_type", 3);
  9. startService(tIntent);

运行程序,打印日志如下:

  1. action2
  2. action1
  3. action3
  4. onDestory

由此可见,这三个任务是串行的,action3执行完毕之后,IntentService就停止了。

源码分析

IntentService是一个Service,所以在调用startService()的时候,会首先执行onCreate(),如下所示:

  1. @Override
  2. public void onCreate() {
  3. super.onCreate();
  4. HandlerThread thread = new HandlerThread("IntentService[" + mName + "]");
  5. thread.start();
  6. mServiceLooper = thread.getLooper();
  7. mServiceHandler = new ServiceHandler(mServiceLooper);
  8. }

可以看出来,基本上面HandlerThread 基本用法说的一样。
创建了一个HandlerThread,然后用他的looper生成一个Handler,这样通过mServiceHandler发送的消息,就会在HandlerThread里面执行了。

然后,和Service一样,onCreate()仅仅执行一次,接下来执行onStartCommand(),可以被调用很多次。每一次调用,都会通过mServiceHandler发送一个消息:

  1. Override
  2. public int onStartCommand(Intent intent, int flags, int startId) {
  3. onStart(intent, startId);
  4. return mRedelivery ? START_REDELIVER_INTENT : START_NOT_STICKY;
  5. }
  6. @Override
  7. public void onStart(Intent intent, int startId) {
  8. Message msg = mServiceHandler.obtainMessage();
  9. msg.arg1 = startId;
  10. msg.obj = intent;
  11. mServiceHandler.sendMessage(msg);
  12. }

那么,我们来看一下ServiceHandler是如何处理这些任务的?

  1. private final class ServiceHandler extends Handler {
  2. public ServiceHandler(Looper looper) {
  3. super(looper);
  4. }
  5. @Override
  6. public void handleMessage(Message msg) {
  7. onHandleIntent((Intent)msg.obj);
  8. stopSelf(msg.arg1);
  9. }
  10. }

直接调用了抽象方法onHandleIntent(),这个是需要我们自己实现的,毕竟如何处理任务,只有我们自己知道。
onHandleIntent()执行完毕后,IntentService都会调用stopSelf(int startId),来尝试终止服务。startId类似于一个唯一标识,每发送一次,都会生成一个,因为任务都是串行的,所以只有getLastStartId == startId,才认为任务全部执行完成了,才会去终止这个服务。
stopSelf()会立马终止服务,不管有没有未完成的任务。

总结

内存模型

java的多线程并发问题最终都会反映在java的内存模型上,所谓线程安全无非是要控制多个线程对某个资源的有序访问或修改。

什么是Java内存模型?

Java内存模型(JMM),Java虚拟机的规范,用来屏蔽掉各种硬件和操作系统的内存访问差异,以实现让Java程序在各个平台下都能达到一致的并发效果。

Java内存模型的目标?

JMM定义程序中各个变量的访问规则,即在虚拟机中将变量存储到内存和从内存中取出这样的底层细节。此处的变量包括实例字段、静态字段和构成数组对象的元素,但是不包括局部变量和方法参数,因为这些是线程私有的,不会被共享,所以不存在竞争问题。

主内存与工作内存

注意:线程的工作内存是cpu寄存器和高速缓存的抽象描述,并不是java内存模型里面的虚拟机栈

JMM规定了jvm有主内存(Main Memory)和工作内存(Working Memory) ,主内存存放程序中所有的类实例、静态数据等变量,是多个线程共享的,而工作内存存放的是该线程从主内存中拷贝过来的变量以及访问方法所取得的局部变量,是每个线程私有的其他线程不能访问,每个线程对变量的操作都是以先从主内存将其拷贝到工作内存再对其进行操作的方式进行,多个线程之间不能直接互相传递数据通信,只能通过共享变量来进行。。

内存间的交互操作

一个变量如何从主内存拷贝到工作内存、如何从工作内存同步回主内存,JMM定义了8种操作:

这8中操作都是原子的/不可再分的,JMM还规定了执行上述 8 种基本操作时必须满足如下规则::
1. 不允许read和load、store和write操作之一单独出现。
2. 不允许一个线程丢弃最近的assign操作,变量在工作内存中改变了之后必须把该变化同步回主内存中。
3. 不允许一个线程没有发生过任何assign操作把数据从线程的工作内存同步回主内存中。
4. 一个新的变量只能在主内存中诞生。
5. 一个变量在同一时刻只允许一条线程对其进行lock操作,但可以被同一条线程重复执行多次。
6. 如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前,需要重新执行read、load操作。
7. 如果一个变量事先没有被lock操作锁定,则不允许对它执行unlock操作。
8. 对一个变量执行unlock操作前,必须先把该变量同步回主内存中。

线程要引用某变量,如果线程工作内存中没有该个变量,通过read-load从主内存中拷贝一个副本到工作内存中,完成后线程会引用该副本,当同一个线程再次引用该变量时,有可能重新从主存中获取变量副本(read-load-use),也有可能直接引用原来的副本(use),也就是说read、load、use顺序可以由jvm实现系统决定。
线程要写入某变量,它会将值指定给工作内存中的变量副本(assign),完成后这个变量副本会同步到主存(store-write),至于何时同步过去,即assign,store,write顺序由jvm实现系统决定。

并发编程

现在的计算机,cpu在计算的时候,并不总是从内存读取数据,它的数据读取顺序优先级 是:寄存器-高速缓存-内存。线程耗费的是CPU,线程计算的时候,原始的数据来自内存,在计算过程中,有些数据可能被频繁读取,这些数据被存储在寄存器 和高速缓存中,当线程计算完后,这些缓存的数据在适当的时候应该写回内存。当个多个线程同时读写某个内存数据时,就会产生多线程并发问题

在并发编程中,我们通常会遇到以下三个问题:原子性问题可见性问题有序性问题。我们先看具体看一下这三个概念:

原子性

一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。

一个很经典的例子就是银行账户转账问题:
比如从账户A向账户B转1000元,那么必然包括2个操作:从账户A减去1000元,往账户B加上1000元。
必须要保证这两个步骤是原子操作的。

在Java中,对基本数据类型的变量的读取和赋值操作是原子性操作,即这些操作是不可被中断的,要么执行,要么不执行。

  1. x = 10; //语句1
  2. y = x; //语句2
  3. x++; //语句3
  4. x = x + 1; //语句4

上面四条语句哪些是原子操作,哪些不是呢?
这四条语句只有语句1的操作具备原子性。

1、语句1是直接将数值10赋值给x,也就是说线程执行这个语句的会直接将数值10写入到工作内存中。
2、语句2实际上包含2个操作,它先要去读取x的值,再将x的值写入工作内存,虽然读取x的值以及 将x的值写入工作内存 这2个操作都是原子性操作,但是合起来就不是原子性操作了。
3、x++和 x = x+1包括3个操作:读取x的值,进行加1操作,写入新的值。

也就是说:

只有简单的读取、赋值(而且必须是将数字赋值给某个变量,变量之间的相互赋值不是原子操作)才是原子操作。
Java内存模型只保证了基本读取和赋值是原子性操作,如果要实现更大范围操作的原子性,可以通过synchronized和Lock来实现。由于synchronized和Lock能够保证任一时刻只有一个线程执行该代码块,那么自然就不存在原子性问题了,从而保证了原子性。

可见性

当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。

举个简单的例子,看下面这段代码:

  1. //共享变量
  2. int i = 0;
  3. //线程1执行的代码
  4. i = 10;
  5. //线程2执行的代码
  6. j = i;

效果一:

或者,效果二:

Java内存模型是通过在变量修改后将新值同步回主内存,在变量读取之前从主内存刷新变量值来实现可见性的。但是,无法保证什么时候来同步。

普通的共享变量不能保证可见性,因为普通共享变量被修改之后,什么时候被写入主存是不确定的,当其他线程去读取时,此时内存中可能还是原来的旧值,因此无法保证可见性。
而volatile的特殊规则保证了新值能够立即同步到主内存,每次使用前立即从主内存刷新。

synchronized和final也能实现可见性。final修饰的字段在构造器中一旦被初始化完成,并且构造器没有把this的引用传递出去,那么其他线程中就能看见final字段的值。

有序性

程序执行的顺序按照代码的先后顺序执行

  1. int i = 0;
  2. boolean flag = false;
  3. i = 1; //语句1
  4. flag = true; //语句2

语句1是在语句2前面的,那么JVM在真正执行这段代码的时候会保证语句1一定会在语句2前面执行吗?不一定,为什么呢?这里可能会发生指令重排序(Instruction Reorder)

一般来说,处理器为了提高程序运行效率,可能会对输入代码进行优化,它不保证程序中各个语句的执行先后顺序同代码中的顺序一致,但是它会保证程序最终执行结果和代码顺序执行的结果是一致的。

比如上面的代码中,语句1和语句2谁先执行对最终的程序结果并没有影响,那么就有可能在执行过程中,语句2先执行而语句1后执行。

但是要注意,虽然处理器会对指令进行重排序,但是它会保证程序最终结果会和代码顺序执行结果相同,在来看一个例子:

  1. int a = 10; //语句1
  2. int r = 2; //语句2
  3. a = a + 3; //语句3
  4. r = a*a; //语句4

可能的顺序是:2-1-3-4

那么,有没有可能是2-1-4-3呢?
不可能,因为处理器在进行重排序时是会考虑指令之间的数据依赖性,如果一个指令Instruction 2必须用到Instruction 1的结果,那么处理器会保证Instruction 1会在Instruction 2之前执行。

虽然重排序不会影响单个线程内程序执行的结果,但是多线程呢?下面看一个例子:

  1. //线程1:
  2. context = loadContext(); //语句1
  3. inited = true; //语句2
  4. //线程2:
  5. while(!inited ){
  6. sleep()
  7. }
  8. doSomething(context);

上面代码中,由于语句1和语句2没有数据依赖性,因此可能会被重排序。假如发生了重排序,在线程1执行过程中先执行语句2,而此是线程2会以为初始化工作已经完成,那么就会跳出while循环,去执行doSomething(context)方法,而此时context并没有被初始化,就会导致程序出错。

从上面可以看出,指令重排序不会影响单个线程的执行,但是会影响到线程并发执行的正确性。

Java内存模型具备一些先天的“有序性”,即不需要通过任何手段就能够得到保证的有序性,这个通常也称为 happens-before 原则(先行发生原则)。如果两个操作的执行次序无法从happens-before原则推导出来,那么它们就不能保证它们的有序性,虚拟机可以随意地对它们进行重排序。

程序次序规则

是一段程序代码的执行在单个线程中看起来是有序的。注意,虽然这条规则中提到“书写在前面的操作先行发生于书写在后面的操作”,这个应该是程序看起来执行的顺序是按照代码顺序执行的,因为虚拟机可能会对程序代码进行指令重排序。虽然进行重排序,但是最终执行的结果是与程序顺序执行的结果一致的,它只会对不存在数据依赖性的指令进行重排序。因此,在单个线程中,程序执行看起来是有序执行的,这一点要注意理解。事实上,这个规则是用来保证程序在单线程中执行结果的正确性,但无法保证程序在多线程中执行的正确性。

锁定规则

无论在单线程中还是多线程中,同一个锁如果出于被锁定的状态,那么必须先对锁进行了释放操作,后面才能继续进行lock操作。

volatile变量规则

如果一个线程先去写一个变量,然后一个线程去进行读取,那么写入操作肯定会先行发生于读操作。

  1. private volatile boolean flag;
  2. public void setFlag(boolean flag) {
  3. this.flag = flag;
  4. }
  5. public void getFlag() {
  6. return flag;
  7. }

假设线程A来调用setFlag(true),线程B同时来调用getFlag,对于一般的变量,是无法保证B能读到A设置的值的,因为它们执行的顺序是未知的。但是像上面,加上volatile修饰以后,虚拟机会保证,线程A的写操作在线程B的读操作之前完成,换句话,B能读到最新的值。当然了,用锁机制也能达到同样的效果,比如在方法前面都加上synchronized关键字,但是性能会远不如使用volatile。

synchronized

一个线程执行临界区代码过程如下:

1 获得同步锁
2 清空工作内存
3 从主存拷贝变量副本到工作内存
4 对这些变量计算
5 将变量从工作内存写回到主存
6 释放锁

synchronized是一个独占锁、非公平锁、可重入锁、悲观锁。

重点说下非公平锁

公平锁和非公平锁

参考:公平锁和非公平锁

  • 公平锁就是在获取锁之前会先判断等待队列是否为空或者自己是否位于队列头部,该条件通过才能继续获取锁。
  • 若在释放锁的时候总是没有新的兔子来打扰,则非公平锁等于公平锁;
  • 若释放锁的时候,正好一个兔子来喝水,而此时位于队列头的兔子还没有被唤醒(因为线程上下文切换是需要不少开销的),此时后来的兔子则优先获得锁,成功打破公平,成为非公平锁;
  • 对于非公平锁,只要线程进入了等待队列,队列里面依然是FIFO的原则,跟公平锁的顺序是一样的

综上所述:

队列中的永远都是FIFO的顺序,公平不公平,并不是指的队列中,而是指新来的线程是否可以直接尝试获取锁,如果是公平锁,那么新线程会先进入队列尾部,然后从队列头的线程会尝试获取锁;而非公平锁,新线程会直接尝试获取锁,如果获取到就直接执行,打破公平。

非公平锁性能高于公平锁性能。

上面这个是基于ReentryLock来说的,对于synchronized来说,这个队列其实就是锁对象的entry set,后面会详细说明。

使用

synchronized是Java提供的一个并发控制的关键字。主要有两种用法,分别是同步方法和同步代码块。也就是说,synchronized既可以修饰方法也可以修饰代码块。

  1. public class SynchronizedDemo {
  2. //同步方法
  3. public synchronized void doSth(){
  4. System.out.println("Hello World");
  5. }
  6. //同步代码块
  7. public void doSth1(){
  8. synchronized (SynchronizedDemo.class){
  9. System.out.println("Hello World");
  10. }
  11. }
  12. }

被synchronized修饰的代码块及方法,在同一时间,只能被单个线程访问。

synchronzed的执行过程

一个线程执行临界区代码过程如下:

  • 获得同步锁
  • 清空工作内- 存
    从主存拷贝变量副本到工作内存
  • 对这些变量计算
  • 将变量从工作内存写回到主存
  • 释放锁

可以保证原子性、可见性和有序性。

原理

再有人问你synchronized是什么,就把这篇文章发给他。
我们对使用里面的代码进行Javap反编译,可以得到如下代码:

  1. public synchronized void doSth();
  2. descriptor: ()V
  3. flags: ACC_PUBLIC, ACC_SYNCHRONIZED
  4. Code:
  5. stack=2, locals=1, args_size=1
  6. 0: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream;
  7. 3: ldc #3 // String Hello World
  8. 5: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
  9. 8: return
  10. public void doSth1();
  11. descriptor: ()V
  12. flags: ACC_PUBLIC
  13. Code:
  14. stack=2, locals=3, args_size=1
  15. 0: ldc #5 // class com/hollis/SynchronizedTest
  16. 2: dup
  17. 3: astore_1
  18. 4: monitorenter
  19. 5: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream;
  20. 8: ldc #3 // String Hello World
  21. 10: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
  22. 13: aload_1
  23. 14: monitorexit
  24. 15: goto 23
  25. 18: astore_2
  26. 19: aload_1
  27. 20: monitorexit
  28. 21: aload_2
  29. 22: athrow
  30. 23: return

我们发现,JVM对于同步方法和同步代码块的处理方式不同:

同步方法

方法级的同步是隐式的。
同步方法的常量池中会有一个ACC_SYNCHRONIZED标志。
当某个线程要访问某个方法的时候,会检查是否有ACC_SYNCHRONIZED,如果有设置,则需要先获得监视器锁,然后开始执行方法,方法执行之后再释放监视器锁;如果获取不到,会因为无法获得监视器锁而被阻断住。
值得注意的是,如果在方法执行过程中,发生了异常,并且方法内部并没有处理该异常,那么在异常被抛到方法外面之前监视器锁会被自动释放。

同步代码块

同步代码块使用monitorenter和monitorexit两个指令实现。
执行monitorenter指令理解为加锁,执行monitorexit理解为释放锁。
每个对象维护着一个记录着被锁次数的计数器。未被锁定的对象的该计数器为0,当一个线程获得锁(执行monitorenter)后,该计数器自增变为 1 ,当同一个线程再次获得该对象的锁的时候,计数器再次自增。
当同一个线程释放锁(执行monitorexit指令)的时候,计数器再自减。
当计数器为0的时候,锁将被释放,其他线程便可以获得锁。

Monitor

Monitor是一种同步机制,这个机制保证了在同一时刻只有一个线程能访问共享资源。

Monitor是基于C++实现的,由ObjectMonitor实现的,其主要数据结构如下:

  1. ObjectMonitor() {
  2. _header = NULL;
  3. _count = 0;
  4. _waiters = 0,
  5. _recursions = 0;
  6. _object = NULL;
  7. _owner = NULL;
  8. _WaitSet = NULL;
  9. _WaitSetLock = 0 ;
  10. _Responsible = NULL ;
  11. _succ = NULL ;
  12. _cxq = NULL ;
  13. FreeNext = NULL ;
  14. _EntryList = NULL ;
  15. _SpinFreq = 0 ;
  16. _SpinClock = 0 ;
  17. OwnerIsThread = 0 ;
  18. }

ObjectMonitor中有几个关键属性:

  1. _owner:指向持有ObjectMonitor对象的线程
  2. _WaitSet:存放处于wait状态的线程队列
  3. _EntryList:存放处于等待锁block状态的线程队列
  4. _recursions:锁的重入次数
  5. _count:用来记录该线程获取锁的次数

当多个线程同时访问一段同步代码时,首先会进入_EntryList队列中,当某个线程获取到对象的monitor后进入_Owner区域并把monitor中的_owner变量设置为当前线程,同时monitor中的计数器_count加1。即获得对象锁。
若持有monitor的线程调用wait()方法,将释放当前持有的monitor,_owner变量恢复为null,_count自减1,同时该线程进入_WaitSet集合中等待被唤醒。若当前线程执行完毕也将释放monitor(锁)并复位变量的值,以便其他线程进入获取monitor(锁)。

锁优化

聊聊并发(二)——Java SE1.6中的Synchronized
深入理解多线程(五)—— Java虚拟机的锁优化技术


这个例子中,每个顾客是一个线程。 柜台前面的那把椅子,就是锁。 柜台后面的柜员,就是共享资源。 你发现无法直接办理业务,要取号等待的过程叫做阻塞。 当你听到叫你的号码的时候,你起身去办业务,这就是唤醒。 当你坐在椅子上开始办理业务的时候,你就获得锁。 当你办完业务离开的时候,你就释放锁。

自旋锁

如果要阻塞或唤醒一个线程就需要操作系统的帮忙,这就要从用户态转换到内核态,因此状态转换需要花费很多的处理器时间。

自旋锁和阻塞锁最大的区别就是,到底要不要放弃处理器的执行时间。对于阻塞锁和自旋锁来说,都是要等待获得共享资源。但是阻塞锁是放弃了CPU时间,进入了等待区,等待被唤醒。而自旋锁是一直“自旋”在那里,时刻的检查共享资源是否可以被访问。

自旋锁只是将当前线程不停地执行循环体,不进行线程状态的改变,所以响应速度更快。但当线程数不停增加时,性能下降明显,因为每个线程都需要执行,占用CPU时间。
如果线程竞争不激烈,并且保持锁的时间段。适合使用自旋锁。

锁消除

  1. public void f() {
  2. Object hollis = new Object();
  3. synchronized(hollis) {
  4. System.out.println(hollis);
  5. }
  6. }
  1. public void f() {
  2. Object hollis = new Object();
  3. System.out.println(hollis);
  4. }

我们经常在代码中使用StringBuffer作为局部变量,而StringBuffer中的append是线程安全的,有synchronized修饰的,这种情况开发者可能会忽略。这时候,JIT就可以帮忙优化,进行锁消除。

锁粗化

  1. for(int i=0;i<100000;i++){
  2. synchronized(this){
  3. do();
  4. }
  1. synchronized(this){
  2. for(int i=0;i<100000;i++){
  3. do();
  4. }

Java SE1.6为了减少获得锁和释放锁所带来的性能消耗,引入了“偏向锁”和“轻量级锁”,所以在Java SE1.6里锁一共有四种状态,无锁状态,偏向锁状态,轻量级锁状态和重量级锁状态,它会随着竞争情况逐渐升级。锁可以升级但不能降级,意味着偏向锁升级成轻量级锁后不能降级成偏向锁。这种锁升级却不能降级的策略,目的是为了提高获得锁和释放锁的效率

偏向锁

研究发现大多数情况下锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁。

它会偏向于第一个访问锁的线程,如果在运行过程中,同步锁只有一个线程访问,不存在多线程争用的情况,则线程是不需要触发同步的,这种情况下,就会给线程加一个偏向锁。
如果在运行过程中,遇到了其他线程抢占锁,则持有偏向锁的线程会被挂起,JVM会消除它身上的偏向锁,将锁升级到轻量级锁。

获取过程
  1. 访问Mark Word中偏向锁的标识是否设置成1,锁标志位是否为01,确认为可偏向状态。
  2. 如果为可偏向状态,则测试线程ID是否指向当前线程,如果是,进入步骤5,否则进入步骤3。
  3. 如果线程ID并未指向当前线程,则通过CAS操作竞争锁。如果竞争成功,则将Mark Word中线程ID设置为当前线程ID,然后执行5;如果竞争失败,执行4。
  4. 如果CAS获取偏向锁失败,则表示有竞争。当到达全局安全点(safepoint)时获得偏向锁的线程被挂起,偏向锁升级为轻量级锁,然后被阻塞在安全点的线程继续往下执行同步代码。(撤销偏向锁的时候会导致stop the word)
  5. 执行同步代码。

偏向锁的释放

偏向锁的撤销在上述第四步骤中有提到。偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动去释放偏向锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态,撤销偏向锁后恢复到未锁定(标志位为“01”)或轻量级锁(标志位为“00”)的状态。

轻量级锁

轻量级锁原理非常简单,如果持有锁的线程能在很短时间内释放锁资源,那么那些等待竞争锁的线程就不需要做内核态和用户态之间的切换进入阻塞挂起状态,它们只需要等一等(自旋),等持有锁的线程释放锁后即可立即获取锁,这样就避免用户线程和内核的切换的消耗。

优缺点

volatile

volatile关键字的两层语义

一旦一个共享变量(类的成员变量、类的静态成员变量)被volatile修饰之后,那么就具备了两层语义:

  1. 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
  2. 禁止进行指令重排序。

那么经过volatile修饰之后:

线程写volatile变量过程:

  • 改变线程工作内存中volatile变量副本的值;
  • 将改变后的副本的值从工作内存刷新到主内存。

线程读volatile变量过程:

  • 从主内存中读取volatile变量的最新值到工作内存中;
  • 从工作内存中读取volatile变量副本。
  1. //共享变量
  2. boolean stop = false;
  3. //线程1
  4. while(!stop){
  5. doSomething();
  6. }
  7. //线程2
  8. stop = true;

这段代码是很典型的一段代码,很多人在中断线程时可能都会采用这种标记办法。但是事实上,这段代码会完全运行正确么?即一定会将线程中断么?不一定,也许在大多数时候,这个代码能够把线程中断,但是也有可能会导致无法中断线程。

但是用volatile修饰之后就变得不一样了:
1. 线程2在写的时候,会强制将修改的值立即写入主存;
2. 线程1在读的时候,会去主存读取,然后使用。

volatile保证原子性吗?

  1. public class Test {
  2. public volatile int i = 0;
  3. public void increase() {
  4. i++;
  5. }
  6. }

多个线程调用increase(),结果会正确吗?

我们上面说过,i++是三个步骤:读取i的值,进行加1操作,写入新的值

假设某个时刻i=10:
1. 线程1读取i的值,因为是volatile,所以会从主存中读取到10,然后线程1挂起
2. 线程2执行自增操作,并且执行完成了,因为volatile会立即将i=11刷新进主存中
3. 线程1继续执行加1操作,因为之前已经从主存中都去过了,所以直接+1,i=11,并写入主存中

自增操作不是原子性操作,而且volatile也无法保证对变量的任何操作都是原子性的。

要实现原子性,可以:
使用synchronized:

  1. public class Test {
  2. public int i = 0;
  3. public synchronized void increase() {
  4. i++;
  5. }
  6. }

使用Lock:

  1. public class Test {
  2. public int i = 0;
  3. Lock lock = new ReentrantLock();
  4. public void increase() {
  5. lock.lock();
  6. try {
  7. i++;
  8. } finally{
  9. lock.unlock();
  10. }
  11. }
  12. }

使用AtomicInteger:

  1. public class Test {
  2. public AtomicInteger i = new AtomicInteger();
  3. public void increase() {
  4. i.getAndIncrement();
  5. }
  6. }

volatile能保证有序性吗?

在前面提到volatile关键字能禁止指令重排序,所以volatile能在一定程度上保证有序性。
volatile关键字禁止指令重排序有两层意思:

举个例子说明一下:

  1. //x、y为非volatile变量
  2. //flag为volatile变量
  3. x = 2; //语句1
  4. y = 0; //语句2
  5. flag = true; //语句3
  6. x = 4; //语句4
  7. y = -1; //语句5

flag变量为volatile变量,那么在进行指令重排序的过程的时候,不会将语句3放到语句1、语句2前面,也不会讲语句3放到语句4、语句5后面。但是要注意语句1和语句2的顺序、语句4和语句5的顺序是不作任何保证的。
并且volatile关键字能保证,执行到语句3时,语句1和语句2必定是执行完毕了的,且语句1和语句2的执行结果对语句3、语句4、语句5是可见的。

那么我们再说下前面举的一个例子:

  1. //线程1:
  2. context = loadContext(); //语句1
  3. inited = true; //语句2
  4. //线程2:
  5. while(!inited ){
  6. sleep()
  7. }
  8. doSomething(context);

前面说过,语句2会在语句1之前执行,那么久可能导致context还没被初始化,而线程2中就使用未初始化的context去进行操作,导致程序出错。

这里可以用volatile关键字对inited变量进行修饰,就不会出现这种问题了,因为当执行到语句2时,必定能保证context已经初始化完毕。

volatile的原理和实现机制

下面这段话摘自《深入理解Java虚拟机》:

观察加入volatile关键字和没有加入volatile关键字时所生成的汇编代码发现,加入volatile关键字时,会多出一个lock前缀指令

lock前缀指令实际上相当于一个内存屏障(也成内存栅栏),内存屏障会提供3个功能:

使用volatile关键字的场景

通常来说,使用volatile必须具备以下2个条件:

单例模式为什么需要volatile?

  1. class Singleton {
  2. private volatile static Singleton instance;
  3. public static Singleton getInstance() {
  4. if (instance == null) {
  5. synchronized (Singleton.class) {
  6. if (instance == null)
  7. instance = new Singleton();
  8. }
  9. }
  10. return instance;
  11. }
  12. public static void main(String[] args) {
  13. Singleton.getInstance();
  14. }
  15. }

我们知道,实例化一个对象其实可以分为三个步骤:
1. 分配内存空间。
2. 初始化对象。
3. 将内存空间的地址赋值给对应的引用。

所以,new Singleton是分为三个步骤的:
1. 给instance指向的对象分配内存,并设置初始值为null(根据JVM类加载机制的原理,对于静态变量这一步应该在new Singleton之前就已经完成了)。
2. 执行构造函数真正初始化instance
3. 将instance指向对象分配内存空间(分配内存空间之后instance就是非null了)

经过重排序,步骤2, 3之间的顺序是可以颠倒的,如果线程1在执行步骤3之后并没有执行步骤2,但是被线程2抢占了,线程2得到的instance是非null,但是instance却还没有初始化,如果使用instance,就会导致不可预料的问题。而使用volatile则可以保证程序的有序性。

引用自知乎:

这里volatile的作用仅仅是阻止指令重排序,不涉及可见性问题,可见性已经由synchronized来保证了。
为什么要用到volatile的指令重排序功能呢?
初始化对象的指令可能会被重排成:  
(1)分配内存空间。  
(2)将内存空间的地址赋值给对应的引用。  
(3)初始化对象
问题的关键在于,如果A线程正好初始化到了第(2)步的时候,正好有其它线程B来获取这个对象,
那线程B能不能看到这个由A初始化但是还没初始化完毕的对象呢?答案是可能会看到这个未完全初始化的对象, 因为这里初始化的是一个共享变量。
为什么初始化共享变量会带来问题呢?
如果共享变量的引用已经和构造器的调用内联了, 即使构造器未完成初始化, 共享变量也可能会立即被更新,也就是说其它线程可能会看到一个未经完全初始化的对象。
所以,这里使用的是volatile禁止重排的功能。

Lock

Java 并发:Lock 框架详解

synchronized已经很优秀了,为什么还会有Lock呢?

我们考虑以下三个场景:

Lock接口

  1. public interface Lock {
  2. void lock();
  3. void lockInterruptibly() throws InterruptedException;
  4. boolean tryLock();
  5. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  6. void unlock();
  7. Condition newCondition();
  8. }
  1. lock.lock();
  2. try{
  3. //处理任务
  4. }catch(Exception ex){
  5. }finally{
  6. lock.unlock(); //释放锁
  7. }
  1. if(lock.tryLock()) {
  2. try{
  3. //处理任务
  4. }catch(Exception ex){
  5. }finally{
  6. lock.unlock(); //释放锁
  7. }
  8. }else {
  9. //如果不能获取锁,则直接做其他事情
  10. }
  1. public void method() throws InterruptedException {
  2. if(lock.tryLock(4, TimeUnit.SECONDS)) {
  3. try{
  4. //处理任务
  5. }catch(Exception ex){
  6. }finally{
  7. lock.unlock(); //释放锁
  8. }
  9. }else {
  10. //如果不能获取锁,则直接做其他事情
  11. }
  12. }
  1. public void method() throws InterruptedException {
  2. lock.lockInterruptibly();
  3. try {
  4. //.....
  5. }
  6. finally {
  7. lock.unlock();
  8. }
  9. }

ReentrantLock

ReentrantLock,即可重入锁。ReentrantLock是唯一实现了Lock接口的类

ReadWriteLock

读写锁,ReadWriteLock也是一个接口,在它里面只定义了两个方法:

  1. public interface ReadWriteLock {
  2. /**
  3. * Returns the lock used for reading.
  4. *
  5. * @return the lock used for reading
  6. */
  7. Lock readLock();
  8. /**
  9. * Returns the lock used for writing.
  10. *
  11. * @return the lock used for writing
  12. */
  13. Lock writeLock();
  14. }

一个用来获取读锁,一个用来获取写锁。也就是说,将对临界资源的读写操作分成两个锁来分配给线程,从而使得多个线程可以同时进行读操作。

ReentrantReadWriteLock

ReentrantReadWriteLock 实现了 ReadWriteLock 接口( 注意,ReentrantReadWriteLock 并没有实现 Lock 接口 ),其包含两个很重要的方法:readLock() 和 writeLock() 分别用来获取读锁和写锁,并且这两个锁实现了Lock接口。

举个例子,假如有多个线程要同时进行读操作
先看一下synchronized达到的效果

  1. public class Test {
  2. public static void main(String[] args) {
  3. final Test test = new Test();
  4. new Thread("A"){
  5. public void run() {
  6. test.get(Thread.currentThread());
  7. };
  8. }.start();
  9. new Thread("B"){
  10. public void run() {
  11. test.get(Thread.currentThread());
  12. };
  13. }.start();
  14. }
  15. public synchronized void get(Thread thread) {
  16. long start = System.currentTimeMillis();
  17. System.out.println("线程"+ thread.getName()+"开始读操作...");
  18. while(System.currentTimeMillis() - start <= 1) {
  19. System.out.println("线程"+ thread.getName()+"正在进行读操作...");
  20. }
  21. System.out.println("线程"+ thread.getName()+"读操作完毕...");
  22. }
  23. }
  24. /* Output:
  25. 线程A开始读操作...
  26. 线程A正在进行读操作...
  27. ...
  28. 线程A正在进行读操作...
  29. 线程A读操作完毕...
  30. 线程B开始读操作...
  31. 线程B正在进行读操作...
  32. ...
  33. 线程B正在进行读操作...
  34. 线程B读操作完毕...
  35. *///:~

可以看出来,直到线程A执行完读操作之后,才会打印线程B执行读操作的信息。
再看一下ReentrantReadWriteLock的效果:

  1. public class Test {
  2. private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  3. public static void main(String[] args) {
  4. final Test test = new Test();
  5. new Thread("A") {
  6. public void run() {
  7. test.get(Thread.currentThread());
  8. };
  9. }.start();
  10. new Thread("B") {
  11. public void run() {
  12. test.get(Thread.currentThread());
  13. };
  14. }.start();
  15. }
  16. public void get(Thread thread) {
  17. rwl.readLock().lock(); // 在外面获取锁
  18. try {
  19. long start = System.currentTimeMillis();
  20. System.out.println("线程" + thread.getName() + "开始读操作...");
  21. while (System.currentTimeMillis() - start <= 1) {
  22. System.out.println("线程" + thread.getName() + "正在进行读操作...");
  23. }
  24. System.out.println("线程" + thread.getName() + "读操作完毕...");
  25. } finally {
  26. rwl.readLock().unlock();
  27. }
  28. }
  29. }
  30. /* Output:
  31. 线程A开始读操作...
  32. 线程B开始读操作...
  33. 线程A正在进行读操作...
  34. 线程A正在进行读操作...
  35. 线程B正在进行读操作...
  36. ...
  37. 线程A读操作完毕...
  38. 线程B读操作完毕...
  39. *///:~

我们可以看到,线程A和线程B在同时进行读操作,这样就大大提升了读操作的效率。

不过要注意的是,如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程也会一直等待释放写锁。

synchronized 和lock的区别

在性能上来说,如果竞争资源不激烈,两者的性能是差不多的。而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择。

CAS & AQS

CAS

CAS 是 compare and swap 的简写,即比较并交换。它是原子操作函数。CAS 在 Java 的原子类和并发包中有大量使用。

它是指一种操作机制,而不是某个具体的类或方法。在 Java 平台上对这种操作进行了包装。在 Unsafe 类中,调用代码如下:

  1. unsafe.compareAndSwapInt(this, valueOffset, expect, update);

CAS 是基于乐观锁的原理进行操作的。它总是认为自己可以成功完成操作。当多个线程同时使用 CAS 操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试或放弃操作。

CAS 就是通过这种方式实现比较和交换操作的原子性的。值得注意的是, CAS 只是保证了操作的原子性,并不保证变量的可见性,因此变量需要加上 volatile 关键字。

缺点

ABA

线程1 从内存位置 V 中取出 A,这时候另一个线程2 也从内存中取出 A,并且线程2 进行了一些操作变成了 B,然后 线程2 又将 V 位置的数据变成 A,这时候线程1 进行 CAS 操作发现内存中仍然是 A,然后 线程1 操作成功。尽管线程 线程1 的 CAS 操作成功,但是不代表这个过程就是没有问题的。

部分乐观锁的实现是通过版本号(version)的方式来解决 ABA 问题,乐观锁每次在执行数据的修改操作时,都会带上一个版本号,一旦版本号和数据的版本号一致就可以执行修改操作并对版本号执行+1 操作,否则就执行失败。因为每次操作的版本号都会随之增加,所以不会出现 ABA 问题,因为版本号只会增加不会减少。

AQS

AQS即AbstractQueuedSynchronizer (抽象的队列式的同步器),AQS 定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch。

AQS 维护了一个 volatile int state(代表共享资源)和一个 FIFO 线程等待队列(多线程争用资源被阻塞时会进入此队列)。

AQS 中有两个重要的成员:

请求锁时有三种可能:

拾遗

拾遗一

生产者消费者

  1. /**
  2. * 生产者和消费者,wait()和notify()的实现
  3. * @author ZGJ
  4. * @date 2017年6月22日
  5. */
  6. public class Test1 {
  7. private static Integer count = 0;
  8. private static final Integer FULL = 10;
  9. private static String LOCK = "lock";
  10. public static void main(String[] args) {
  11. Test1 test1 = new Test1();
  12. new Thread(test1.new Producer()).start();
  13. new Thread(test1.new Consumer()).start();
  14. new Thread(test1.new Producer()).start();
  15. new Thread(test1.new Consumer()).start();
  16. new Thread(test1.new Producer()).start();
  17. new Thread(test1.new Consumer()).start();
  18. new Thread(test1.new Producer()).start();
  19. new Thread(test1.new Consumer()).start();
  20. }
  21. class Producer implements Runnable {
  22. @Override
  23. public void run() {
  24. for (int i = 0; i < 10; i++) {
  25. try {
  26. Thread.sleep(3000);
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. }
  30. synchronized (LOCK) {
  31. while (count == FULL) {
  32. try {
  33. LOCK.wait();
  34. } catch (Exception e) {
  35. e.printStackTrace();
  36. }
  37. }
  38. count++;
  39. System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count);
  40. LOCK.notifyAll();
  41. }
  42. }
  43. }
  44. }
  45. class Consumer implements Runnable {
  46. @Override
  47. public void run() {
  48. for (int i = 0; i < 10; i++) {
  49. try {
  50. Thread.sleep(3000);
  51. } catch (InterruptedException e) {
  52. e.printStackTrace();
  53. }
  54. synchronized (LOCK) {
  55. while (count == 0) {
  56. try {
  57. LOCK.wait();
  58. } catch (Exception e) {
  59. }
  60. }
  61. count--;
  62. System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count);
  63. LOCK.notifyAll();
  64. }
  65. }
  66. }
  67. }
  68. }

拾遗一

wait()方法外面为什么是while循环而不是if判断

我们上面说的虚假唤醒,这个是其中一个原因;还有一个,就是生产者可以唤醒生产者,消费者可以唤醒消费者。
如果只有一个生产者线程,一个消费者线程,那其实是可以用if代替while的,因为线程调度的行为是开发者可以预测的,生产者线程只有可能被消费者线程唤醒,反之亦然,因此被唤醒时条件始终满足,程序不会出错。但是这种情况只是多线程情况下极为简单的一种,更普遍的是多个线程生产,多个线程消费,那么就极有可能出现唤醒生产者的是另一个生产者或者唤醒消费者的是另一个消费者,这样的情况下用if就必然会现类似过度生产或者过度消费的情况了,典型如IndexOutOfBoundsException的异常。所以所有的java书籍都会建议开发者永远都要把wait()放到循环语句里面。

拾遗二

生产者消费者中为什么要用notifyAll()方法,用notify()行吗

耐心看下面这个两个生产者两个消费者的场景,如果我们代码中使用了notify()而非notifyAll(),假设消费者线程1拿到了锁,判断buffer为空,那么wait(),释放锁;然后消费者2拿到了锁,同样buffer为空,wait(),也就是说此时Wait Set中有两个线程;然后生产者1拿到锁,生产,buffer满,notify()了,那么可能消费者1被唤醒了,但是此时还有另一个线程生产者2在Entry Set中盼望着锁,并且最终抢占到了锁,但因为此时buffer是满的,因此它要wait();然后消费者1拿到了锁,消费,notify();这时就有问题了,此时生产者2和消费者2都在Wait Set中,buffer为空,如果唤醒生产者2,没毛病;但如果唤醒了消费者2,因为buffer为空,它会再次wait(),这就尴尬了,万一生产者1已经退出不再生产了,没有其他线程在竞争锁了,只有生产者2和消费者2在Wait Set中互相等待,那传说中的死锁就发生了。

拾遗三

如何实现生产者只通知消费者,消费者只通知生产者呢?

可以使用ReentrantLock和Condition

  1. import java.util.LinkedList;
  2. import java.util.Queue;
  3. import java.util.Random;
  4. import java.util.concurrent.locks.Condition;
  5. import java.util.concurrent.locks.Lock;
  6. import java.util.concurrent.locks.ReentrantLock;
  7. /**
  8. * Java Program to demonstrate how to use Lock and Condition variable in Java by
  9. * solving Producer consumer problem. Locks are more flexible way to provide
  10. * mutual exclusion and synchronization in Java, a powerful alternative of
  11. * synchronized keyword.
  12. *
  13. * @author Javin Paul
  14. */
  15. public class ProducerConsumerSolutionUsingLock {
  16. public static void main(String[] args) {
  17. // Object on which producer and consumer thread will operate
  18. ProducerConsumerImpl sharedObject = new ProducerConsumerImpl();
  19. // creating producer and consumer threads
  20. Producer p = new Producer(sharedObject);
  21. Consumer c = new Consumer(sharedObject);
  22. // starting producer and consumer threads
  23. p.start();
  24. c.start();
  25. }
  26. }
  27. class ProducerConsumerImpl {
  28. // producer consumer problem data
  29. private static final int CAPACITY = 10;
  30. private final Queue queue = new LinkedList<>();
  31. private final Random theRandom = new Random();
  32. // lock and condition variables
  33. private final Lock aLock = new ReentrantLock();
  34. private final Condition bufferNotFull = aLock.newCondition();
  35. private final Condition bufferNotEmpty = aLock.newCondition();
  36. public void put() throws InterruptedException {
  37. aLock.lock();
  38. try {
  39. while (queue.size() == CAPACITY) {
  40. System.out.println(Thread.currentThread().getName()
  41. + " : Buffer is full, waiting");
  42. bufferNotEmpty.await();
  43. }
  44. int number = theRandom.nextInt();
  45. boolean isAdded = queue.offer(number);
  46. if (isAdded) {
  47. System.out.printf("%s added %d into queue %n", Thread
  48. .currentThread().getName(), number);
  49. // signal consumer thread that, buffer has element now
  50. System.out.println(Thread.currentThread().getName()
  51. + " : Signalling that buffer is no more empty now");
  52. bufferNotFull.signalAll();
  53. }
  54. } finally {
  55. aLock.unlock();
  56. }
  57. }
  58. public void get() throws InterruptedException {
  59. aLock.lock();
  60. try {
  61. while (queue.size() == 0) {
  62. System.out.println(Thread.currentThread().getName()
  63. + " : Buffer is empty, waiting");
  64. bufferNotFull.await();
  65. }
  66. Integer value = queue.poll();
  67. if (value != null) {
  68. System.out.printf("%s consumed %d from queue %n", Thread
  69. .currentThread().getName(), value);
  70. // signal producer thread that, buffer may be empty now
  71. System.out.println(Thread.currentThread().getName()
  72. + " : Signalling that buffer may be empty now");
  73. bufferNotEmpty.signalAll();
  74. }
  75. } finally {
  76. aLock.unlock();
  77. }
  78. }
  79. }
  80. class Producer extends Thread {
  81. ProducerConsumerImpl pc;
  82. public Producer(ProducerConsumerImpl sharedObject) {
  83. super("PRODUCER");
  84. this.pc = sharedObject;
  85. }
  86. @Override
  87. public void run() {
  88. try {
  89. pc.put();
  90. } catch (InterruptedException e) {
  91. e.printStackTrace();
  92. }
  93. }
  94. }
  95. class Consumer extends Thread {
  96. ProducerConsumerImpl pc;
  97. public Consumer(ProducerConsumerImpl sharedObject) {
  98. super("CONSUMER");
  99. this.pc = sharedObject;
  100. }
  101. @Override
  102. public void run() {
  103. try {
  104. pc.get();
  105. } catch (InterruptedException e) {
  106. // TODO Auto-generated catch block
  107. e.printStackTrace();
  108. }
  109. }
  110. }
  111. 输出:
  112. CONSUMER : Buffer is empty, waiting
  113. PRODUCER added 279133501 into queue
  114. PRODUCER : Signalling that buffer is no more empty now
  115. CONSUMER consumed 279133501 from queue
  116. CONSUMER : Signalling that buffer may be empty now

我们可以理解为,例如,一个ReentrantLock有ConditionA和ConditionB,调用ConditionA的await(),释放锁,然后保存到ConditionA的锁队列去,调用ConditionA的signalAll(),会通知ConditionA的锁队列中的所有线程,而不会通知ConditionB锁队列中的线程,这样这些线程就可以和等待队列中线程一起抢夺锁。

  1. 线程1调用reentrantLock.lock时,线程被加入到AQS的等待队列中。
  2. 线程1调用await方法被调用时,该线程从AQS中移除,对应操作是锁的释放。
  3. 接着马上被加入到Condition的等待队列中,等待着该线程需要signal信号。
  4. 线程2,因为线程1释放锁的关系,被唤醒,并判断可以获取锁,于是线程2获取锁,并被加入到AQS的等待队列中。
  5. 线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。 注意,这个时候,线程1 并没有被唤醒。
  6. signal方法执行完毕,线程2调用reentrantLock.unLock()方法,释放锁。这个时候因为AQS中只有线程1,于是,AQS释放锁后按从头到尾的顺序唤醒线程时,线程1被唤醒,于是线程1回复执行。
  7. 直到释放所整个过程执行完毕。

拾遗四

自己实现一个简易的线程池
Android程序员会遇到的算法(part 4 消息队列的应用)

  1. public class ThreadPool{
  2. //用一个Set或者其他数据结构把创建的线程保存起来,为的是方便以后获取线程的handle,做其他操作。
  3. Set<WorkerThread> set = null;
  4. private Queue<Runnable> queue;
  5. //初始化线程池,创建内部类WorkerThread并且启动它
  6. public ThreadPool(int size){
  7. for( int i = 0 ;i < size ;i++ ){
  8. WorkerThread t = new WorkerThread();
  9. t.start();
  10. set.add( t );
  11. }
  12. queue = new LinkedList<Runnable>();
  13. }
  14. //submit一个runnable进线程池
  15. public void submit(Runnable runnable){
  16. synchronized (queue){
  17. queue.add(runnable);
  18. }
  19. }
  20. //WorkerThread用一个死循环不停的去向Runnable队列拿Runnable执行。
  21. public class WorkerThread extends Thread{
  22. @Override
  23. public void run() {
  24. super.run();
  25. while(true){
  26. synchronized (queue){
  27. if( !queue.isEmpty() ){
  28. Runnable current = queue.poll();
  29. current.run();
  30. }
  31. }
  32. }
  33. }
  34. }
  35. }
  36. ThreadPool pool = new ThreadPool(5);
  37. pool.submit(new Runnable(...))

拾遗五

死锁

  1. public class LockTest {
  2. public static String obj1 = "obj1";
  3. public static String obj2 = "obj2";
  4. public static void main(String[] args) {
  5. LockA la = new LockA();
  6. new Thread(la).start();
  7. LockB lb = new LockB();
  8. new Thread(lb).start();
  9. }
  10. }
  11. class LockA implements Runnable{
  12. public void run() {
  13. try {
  14. System.out.println(new Date().toString() + " LockA 开始执行");
  15. while(true){
  16. synchronized (LockTest.obj1) {
  17. System.out.println(new Date().toString() + " LockA 锁住 obj1");
  18. Thread.sleep(3000); // 此处等待是给B能锁住机会
  19. synchronized (LockTest.obj2) {
  20. System.out.println(new Date().toString() + " LockA 锁住 obj2");
  21. Thread.sleep(60 * 1000); // 为测试,占用了就不放
  22. }
  23. }
  24. }
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }
  30. class LockB implements Runnable{
  31. public void run() {
  32. try {
  33. System.out.println(new Date().toString() + " LockB 开始执行");
  34. while(true){
  35. synchronized (LockTest.obj2) {
  36. System.out.println(new Date().toString() + " LockB 锁住 obj2");
  37. Thread.sleep(3000); // 此处等待是给A能锁住机会
  38. synchronized (LockTest.obj1) {
  39. System.out.println(new Date().toString() + " LockB 锁住 obj1");
  40. Thread.sleep(60 * 1000); // 为测试,占用了就不放
  41. }
  42. }
  43. }
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }

参考

Java实现线程的三种方式
Java多线程编程总结
Java 线程状态
Java 线程状态之 RUNNABLE
你真的了解AsyncTask?
分析Java线程池的创建
谈谈 Java 线程状态相关的几个方法
Java 虚拟机面试题全面解析
Java并发编程:volatile关键字解析
深入理解Java内存模型(六)——final
java线程安全总结
【Java线程】Java内存模型总结
你真的懂wait、notify和notifyAll吗
Java多线程(九)之ReentrantLock与Condition
如何优雅的使用线程池
深入浅出CAS
https://www.cnblogs.com/weechang/p/12545271.html
https://blog.csdn.net/weixin_44552215/article/details/108650823
https://blog.csdn.net/q1105441883/article/details/118693603

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注