方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用



队列 有界性 数据结构
ArrayBlockingQueue 有界 加锁 数组
LinkedBlockingQueue 有界 加锁 单链表
PriorityBlockingQueue 无界 加锁
DelayQueue 无界 加锁
SynchronousQueue 有界 无锁(CAS) -
LinkedTransferQueue 无界 无锁(CAS) 单链表
LinkedBlockingDeque 无界 加锁 双链表
  1. public int compareTo(Delayed other) {
  2. if (other == this) // compare zero ONLY if same object
  3. return 0;
  4. if (other instanceof ScheduledFutureTask) {
  5. ScheduledFutureTask x = (ScheduledFutureTask)other;
  6. long diff = time - x.time;
  7. if (diff < 0)
  8. return -1;
  9. else if (diff > 0)
  10. return 1;
  11. else if (sequenceNumber < x.sequenceNumber)
  12. return -1;
  13. else
  14. return 1;
  15. }
  16. long d = (getDelay(TimeUnit.NANOSECONDS) -
  17. other.getDelay(TimeUnit.NANOSECONDS));
  18. return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
  19. }
  1. private static final int NOW = 0; // for untimed poll, tryTransfer
  2. private static final int ASYNC = 1; // for offer, put, add
  3. private static final int SYNC = 2; // for transfer, take
  4. private static final int TIMED = 3; // for timed poll, tryTransfer

① NOW :在取数据的时候,如果没有数据,则直接返回,无需阻塞等待。
② ASYNC:入队的操作都不会阻塞,也就是说,入队后线程会立即返回,不需要等到消费者线程来取数据。
③ SYNC :取数据的时候,如果没有数据,则会进行阻塞等待。
④ TIMED : 取数据的时候,如果没有数据,则会进行超时阻塞等待。






  1. private final Condition notFull;
  2. private final Condition notEmpty;
  3. public ArrayBlockingQueue(int capacity, boolean fair) {
  4. //省略其他代码
  5. notEmpty = lock.newCondition();
  6. notFull = lock.newCondition();
  7. }
  8. public void put(E e) throws InterruptedException {
  9. checkNotNull(e);
  10. final ReentrantLock lock = this.lock;
  11. lock.lockInterruptibly();
  12. try {
  13. while (count == items.length)
  14. notFull.await();
  15. insert(e);
  16. } finally {
  17. lock.unlock();
  18. }
  19. }
  20. public E take() throws InterruptedException {
  21. final ReentrantLock lock = this.lock;
  22. lock.lockInterruptibly();
  23. try {
  24. while (count == 0)
  25. notEmpty.await();
  26. return extract();
  27. } finally {
  28. lock.unlock();
  29. }
  30. }
  31. private void insert(E x) {
  32. items[putIndex] = x;
  33. putIndex = inc(putIndex);
  34. ++count;
  35. notEmpty.signal();
  36. }


  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. Node node = addConditionWaiter();
  5. int savedState = fullyRelease(node);
  6. int interruptMode = 0;
  7. while (!isOnSyncQueue(node)) {
  8. LockSupport.park(this);
  9. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  10. break;
  11. }
  12. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  13. interruptMode = REINTERRUPT;
  14. if (node.nextWaiter != null) // clean up if cancelled
  15. unlinkCancelledWaiters();
  16. if (interruptMode != 0)
  17. reportInterruptAfterWait(interruptMode);
  18. }


  1. public static void park(Object blocker) {
  2. Thread t = Thread.currentThread();
  3. setBlocker(t, blocker);
  4. unsafe.park(false, 0L);
  5. setBlocker(t, null);
  6. }


继续看一下JVM是如何实现park方法的,park在不同的操作系统使用不同的方式实现,在linux下是使用的是系统方法pthread_cond_wait实现。实现代码在JVM源码路径src/os/linux/vm/os_linux.cpp里的 os::PlatformEvent::park方法,代码如下:

  1. void os::PlatformEvent::park() {
  2. int v ;
  3. for (;;) {
  4. v = _Event ;
  5. if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
  6. }
  7. guarantee (v >= 0, "invariant") ;
  8. if (v == 0) {
  9. // Do this the hard way by blocking ...
  10. int status = pthread_mutex_lock(_mutex);
  11. assert_status(status == 0, status, "mutex_lock");
  12. guarantee (_nParked == 0, "invariant") ;
  13. ++ _nParked ;
  14. while (_Event < 0) {
  15. status = pthread_cond_wait(_cond, _mutex);
  16. // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
  17. // Treat this the same as if the wait was interrupted
  18. if (status == ETIME) { status = EINTR; }
  19. assert_status(status == 0 || status == EINTR, status, "cond_wait");
  20. }
  21. -- _nParked ;
  22. // In theory we could move the ST of 0 into _Event past the unlock(),
  23. // but then we'd need a MEMBAR after the ST.
  24. _Event = 0 ;
  25. status = pthread_mutex_unlock(_mutex);
  26. assert_status(status == 0, status, "mutex_unlock");
  27. }
  28. guarantee (_Event >= 0, "invariant") ;
  29. }
  30. }

pthread_cond_wait是一个多线程的条件变量函数,cond是condition的缩写,字面意思可以理解为线程在等待一个条件发生,这个条件是一个全局变量。这个方法接收两个参数,一个共享变量_cond,一个互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal实现的。park 在windows下则是使用WaitForSingleObject实现的。








  1. public class Wangmin implements Delayed {
  2. private String name;
  3. //身份证
  4. private String id;
  5. //截止时间
  6. private long endTime;
  7. //定义时间工具类
  8. private TimeUnit timeUnit = TimeUnit.SECONDS;
  9. public Wangmin(String name,String id,long endTime){
  10. this.name=name;
  11. this.id=id;
  12. this.endTime = endTime;
  13. }
  14. public String getName(){
  15. return this.name;
  16. }
  17. public String getId(){
  18. return this.id;
  19. }
  20. /**
  21. * 用来判断是否到了截止时间
  22. */
  23. @Override
  24. public long getDelay(TimeUnit unit) {
  25. //return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  26. return endTime - System.currentTimeMillis();
  27. }
  28. /**
  29. * 相互批较排序用
  30. */
  31. @Override
  32. public int compareTo(Delayed delayed) {
  33. Wangmin w = (Wangmin)delayed;
  34. return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;
  35. }
  36. }


  1. public class WangBa implements Runnable {
  2. private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>();
  3. public boolean yingye =true;
  4. /**
  5. * 上机
  6. */
  7. public void shangji(String name,String id,int money){
  8. Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());
  9. System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"交钱"+money+"块,开始上机...");
  10. this.queue.add(man);
  11. }
  12. // 下机
  13. public void xiaji(Wangmin man){
  14. System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"时间到下机...");
  15. }
  16. @Override
  17. public void run() {
  18. while(yingye){
  19. try {
  20. Wangmin man = queue.take();
  21. xiaji(man);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. }
  27. public static void main(String args[]){
  28. try{
  29. System.out.println("网吧开始营业");
  30. WangBa siyu = new WangBa();
  31. Thread shangwang = new Thread(siyu);
  32. shangwang.start();
  33. siyu.shangji("路人甲", "123", 1);
  34. siyu.shangji("路人乙", "234", 10);
  35. siyu.shangji("路人丙", "345", 5);
  36. }
  37. catch(Exception e){
  38. e.printStackTrace();
  39. }
  40. }
  41. }


