[关闭]
@adamhand 2018-12-22T11:22:02.000000Z 字数 8410 阅读 978

Java并发--阻塞队列


什么是阻塞队列?

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

Java中的阻塞队列

JDK7提供了7个阻塞队列。分别是:

队列 有界性 数据结构
ArrayBlockingQueue 有界 加锁 数组
LinkedBlockingQueue 有界 加锁 单链表
PriorityBlockingQueue 无界 加锁
DelayQueue 无界 加锁
SynchronousQueue 有界 无锁(CAS) -
LinkedTransferQueue 无界 无锁(CAS) 单链表
LinkedBlockingDeque 无界 加锁 双链表
  1. public int compareTo(Delayed other) {
  2. if (other == this) // compare zero ONLY if same object
  3. return 0;
  4. if (other instanceof ScheduledFutureTask) {
  5. ScheduledFutureTask x = (ScheduledFutureTask)other;
  6. long diff = time - x.time;
  7. if (diff < 0)
  8. return -1;
  9. else if (diff > 0)
  10. return 1;
  11. else if (sequenceNumber < x.sequenceNumber)
  12. return -1;
  13. else
  14. return 1;
  15. }
  16. long d = (getDelay(TimeUnit.NANOSECONDS) -
  17. other.getDelay(TimeUnit.NANOSECONDS));
  18. return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
  19. }
  1. private static final int NOW = 0; // for untimed poll, tryTransfer
  2. private static final int ASYNC = 1; // for offer, put, add
  3. private static final int SYNC = 2; // for transfer, take
  4. private static final int TIMED = 3; // for timed poll, tryTransfer

① NOW :在取数据的时候,如果没有数据,则直接返回,无需阻塞等待。
② ASYNC:入队的操作都不会阻塞,也就是说,入队后线程会立即返回,不需要等到消费者线程来取数据。
③ SYNC :取数据的时候,如果没有数据,则会进行阻塞等待。
④ TIMED : 取数据的时候,如果没有数据,则会进行超时阻塞等待。

阻塞队列的实现原理

由上面的分析可以看到,阻塞队列有两种:加锁不加锁

加锁的队列是使用ReentrantLock的Condition的await()和signal()方法来实现生产者和消费者之间通信的,当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。而await()方法调用的是LockSupport.park()方法,这个park方法调用的又是调用的unsafe.park()方法实现队列的阻塞的。

不加锁的队列使用的是CAS算法+LockSupport.park()/unpark()方法来实现的。

下面以ArrayBlockingQueue为例看一下。通过查看JDK源码发现ArrayBlockingQueue使用了Condition来实现,代码如下:

  1. private final Condition notFull;
  2. private final Condition notEmpty;
  3. public ArrayBlockingQueue(int capacity, boolean fair) {
  4. //省略其他代码
  5. notEmpty = lock.newCondition();
  6. notFull = lock.newCondition();
  7. }
  8. public void put(E e) throws InterruptedException {
  9. checkNotNull(e);
  10. final ReentrantLock lock = this.lock;
  11. lock.lockInterruptibly();
  12. try {
  13. while (count == items.length)
  14. notFull.await();
  15. insert(e);
  16. } finally {
  17. lock.unlock();
  18. }
  19. }
  20. public E take() throws InterruptedException {
  21. final ReentrantLock lock = this.lock;
  22. lock.lockInterruptibly();
  23. try {
  24. while (count == 0)
  25. notEmpty.await();
  26. return extract();
  27. } finally {
  28. lock.unlock();
  29. }
  30. }
  31. private void insert(E x) {
  32. items[putIndex] = x;
  33. putIndex = inc(putIndex);
  34. ++count;
  35. notEmpty.signal();
  36. }

当往队列里插入一个元素时,如果队列不可用,阻塞生产者主要通过LockSupport.park(this);来实现:

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. Node node = addConditionWaiter();
  5. int savedState = fullyRelease(node);
  6. int interruptMode = 0;
  7. while (!isOnSyncQueue(node)) {
  8. LockSupport.park(this);
  9. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  10. break;
  11. }
  12. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  13. interruptMode = REINTERRUPT;
  14. if (node.nextWaiter != null) // clean up if cancelled
  15. unlinkCancelledWaiters();
  16. if (interruptMode != 0)
  17. reportInterruptAfterWait(interruptMode);
  18. }

继续进入源码,发现调用setBlocker先保存下将要阻塞的线程,然后调用unsafe.park阻塞当前线程。

  1. public static void park(Object blocker) {
  2. Thread t = Thread.currentThread();
  3. setBlocker(t, blocker);
  4. unsafe.park(false, 0L);
  5. setBlocker(t, null);
  6. }

unsafe.park是个native方法,这个方法会阻塞当前线程,只有以下四种情况中的一种发生时,该方法才会返回。

继续看一下JVM是如何实现park方法的,park在不同的操作系统使用不同的方式实现,在linux下是使用的是系统方法pthread_cond_wait实现。实现代码在JVM源码路径src/os/linux/vm/os_linux.cpp里的 os::PlatformEvent::park方法,代码如下:

  1. void os::PlatformEvent::park() {
  2. int v ;
  3. for (;;) {
  4. v = _Event ;
  5. if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
  6. }
  7. guarantee (v >= 0, "invariant") ;
  8. if (v == 0) {
  9. // Do this the hard way by blocking ...
  10. int status = pthread_mutex_lock(_mutex);
  11. assert_status(status == 0, status, "mutex_lock");
  12. guarantee (_nParked == 0, "invariant") ;
  13. ++ _nParked ;
  14. while (_Event < 0) {
  15. status = pthread_cond_wait(_cond, _mutex);
  16. // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
  17. // Treat this the same as if the wait was interrupted
  18. if (status == ETIME) { status = EINTR; }
  19. assert_status(status == 0 || status == EINTR, status, "cond_wait");
  20. }
  21. -- _nParked ;
  22. // In theory we could move the ST of 0 into _Event past the unlock(),
  23. // but then we'd need a MEMBAR after the ST.
  24. _Event = 0 ;
  25. status = pthread_mutex_unlock(_mutex);
  26. assert_status(status == 0, status, "mutex_unlock");
  27. }
  28. guarantee (_Event >= 0, "invariant") ;
  29. }
  30. }

pthread_cond_wait是一个多线程的条件变量函数,cond是condition的缩写,字面意思可以理解为线程在等待一个条件发生,这个条件是一个全局变量。这个方法接收两个参数,一个共享变量_cond,一个互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal实现的。park 在windows下则是使用WaitForSingleObject实现的。

补充:使用DelayQueue实现本地的延迟队列

DelayQueue能做什么?

在我们的业务中通常会有一些需求是这样的:

那么这类业务我们可以总结出一个特点:需要延迟工作。由此的情况,就是我们的DelayQueue应用需求的产生。

应用举例

我们在网咖或者网吧上网时会用到一个网吧综合系统,其中有一个主要功能就是给每一位网民计时,用户充值一定金额会有相应的上网时常,这里我们用DelayQueue模拟实现一下:用DelayQueue存储网民(Wangmin类),每一个考生都有自己的名字和完成试卷的时间,Wangba线程对DelayQueue进行监控,从队列中取出到时间的网民执行下机操作。

实现了Delayed接口的网民类,并实现CompareTo()方法

  1. public class Wangmin implements Delayed {
  2. private String name;
  3. //身份证
  4. private String id;
  5. //截止时间
  6. private long endTime;
  7. //定义时间工具类
  8. private TimeUnit timeUnit = TimeUnit.SECONDS;
  9. public Wangmin(String name,String id,long endTime){
  10. this.name=name;
  11. this.id=id;
  12. this.endTime = endTime;
  13. }
  14. public String getName(){
  15. return this.name;
  16. }
  17. public String getId(){
  18. return this.id;
  19. }
  20. /**
  21. * 用来判断是否到了截止时间
  22. */
  23. @Override
  24. public long getDelay(TimeUnit unit) {
  25. //return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  26. return endTime - System.currentTimeMillis();
  27. }
  28. /**
  29. * 相互批较排序用
  30. */
  31. @Override
  32. public int compareTo(Delayed delayed) {
  33. Wangmin w = (Wangmin)delayed;
  34. return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;
  35. }
  36. }

实现网吧类

  1. public class WangBa implements Runnable {
  2. private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>();
  3. public boolean yingye =true;
  4. /**
  5. * 上机
  6. */
  7. public void shangji(String name,String id,int money){
  8. Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());
  9. System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"交钱"+money+"块,开始上机...");
  10. this.queue.add(man);
  11. }
  12. // 下机
  13. public void xiaji(Wangmin man){
  14. System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"时间到下机...");
  15. }
  16. @Override
  17. public void run() {
  18. while(yingye){
  19. try {
  20. Wangmin man = queue.take();
  21. xiaji(man);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. }
  27. public static void main(String args[]){
  28. try{
  29. System.out.println("网吧开始营业");
  30. WangBa siyu = new WangBa();
  31. Thread shangwang = new Thread(siyu);
  32. shangwang.start();
  33. siyu.shangji("路人甲", "123", 1);
  34. siyu.shangji("路人乙", "234", 10);
  35. siyu.shangji("路人丙", "345", 5);
  36. }
  37. catch(Exception e){
  38. e.printStackTrace();
  39. }
  40. }
  41. }

参考

聊聊并发(七)——Java中的阻塞队列
Java 并发 --- 阻塞队列总结
Java并发编程-阻塞队列(BlockingQueue)的实现原理
java并发之SynchronousQueue实现原理
使用delayedQueue实现你本地的延迟队列
DelayedQueue学习笔记

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注