@adamhand
2018-12-22T11:22:02.000000Z
字数 8410
阅读 996
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
阻塞队列提供了四种处理方法:
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
IllegalStateException(“Queue full”)
异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException
异常 。true
。移除方法,则是从队列里拿出一个元素,如果没有则返回null
。put
元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take
元素,队列也会阻塞消费者线程,直到队列可用。JDK7提供了7个阻塞队列。分别是:
队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | 有界 | 加锁 | 数组 |
LinkedBlockingQueue | 有界 | 加锁 | 单链表 |
PriorityBlockingQueue | 无界 | 加锁 | 堆 |
DelayQueue | 无界 | 加锁 | 堆 |
SynchronousQueue | 有界 | 无锁(CAS) | - |
LinkedTransferQueue | 无界 | 无锁(CAS) | 单链表 |
LinkedBlockingDeque | 无界 | 加锁 | 双链表 |
ReentrantLock
默认实现非公平锁,但是可以使用一个带参的构造函数实现公平锁。
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask x = (ScheduledFutureTask)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
① NOW :在取数据的时候,如果没有数据,则直接返回,无需阻塞等待。
② ASYNC:入队的操作都不会阻塞,也就是说,入队后线程会立即返回,不需要等到消费者线程来取数据。
③ SYNC :取数据的时候,如果没有数据,则会进行阻塞等待。
④ TIMED : 取数据的时候,如果没有数据,则会进行超时阻塞等待。
由上面的分析可以看到,阻塞队列有两种:加锁和不加锁。
加锁的队列是使用ReentrantLock的Condition的await()和signal()方法来实现生产者和消费者之间通信的,当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。而await()方法调用的是LockSupport.park()方法,这个park方法调用的又是调用的unsafe.park()方法实现队列的阻塞的。
不加锁的队列使用的是CAS算法+LockSupport.park()/unpark()方法来实现的。
下面以ArrayBlockingQueue为例看一下。通过查看JDK源码发现ArrayBlockingQueue使用了Condition来实现,代码如下:
private final Condition notFull;
private final Condition notEmpty;
public ArrayBlockingQueue(int capacity, boolean fair) {
//省略其他代码
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
当往队列里插入一个元素时,如果队列不可用,阻塞生产者主要通过LockSupport.park(this);来实现:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
继续进入源码,发现调用setBlocker先保存下将要阻塞的线程,然后调用unsafe.park阻塞当前线程。
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
unsafe.park(false, 0L);
setBlocker(t, null);
}
unsafe.park是个native方法,这个方法会阻塞当前线程,只有以下四种情况中的一种发生时,该方法才会返回。
继续看一下JVM是如何实现park方法的,park在不同的操作系统使用不同的方式实现,在linux下是使用的是系统方法pthread_cond_wait实现。实现代码在JVM源码路径src/os/linux/vm/os_linux.cpp里的 os::PlatformEvent::park方法,代码如下:
void os::PlatformEvent::park() {
int v ;
for (;;) {
v = _Event ;
if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
}
guarantee (v >= 0, "invariant") ;
if (v == 0) {
// Do this the hard way by blocking ...
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "mutex_lock");
guarantee (_nParked == 0, "invariant") ;
++ _nParked ;
while (_Event < 0) {
status = pthread_cond_wait(_cond, _mutex);
// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
// Treat this the same as if the wait was interrupted
if (status == ETIME) { status = EINTR; }
assert_status(status == 0 || status == EINTR, status, "cond_wait");
}
-- _nParked ;
// In theory we could move the ST of 0 into _Event past the unlock(),
// but then we'd need a MEMBAR after the ST.
_Event = 0 ;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "mutex_unlock");
}
guarantee (_Event >= 0, "invariant") ;
}
}
pthread_cond_wait是一个多线程的条件变量函数,cond是condition的缩写,字面意思可以理解为线程在等待一个条件发生,这个条件是一个全局变量。这个方法接收两个参数,一个共享变量_cond,一个互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal实现的。park 在windows下则是使用WaitForSingleObject实现的。
在我们的业务中通常会有一些需求是这样的:
那么这类业务我们可以总结出一个特点:需要延迟工作。由此的情况,就是我们的DelayQueue应用需求的产生。
我们在网咖或者网吧上网时会用到一个网吧综合系统,其中有一个主要功能就是给每一位网民计时,用户充值一定金额会有相应的上网时常,这里我们用DelayQueue模拟实现一下:用DelayQueue存储网民(Wangmin类),每一个考生都有自己的名字和完成试卷的时间,Wangba线程对DelayQueue进行监控,从队列中取出到时间的网民执行下机操作。
public class Wangmin implements Delayed {
private String name;
//身份证
private String id;
//截止时间
private long endTime;
//定义时间工具类
private TimeUnit timeUnit = TimeUnit.SECONDS;
public Wangmin(String name,String id,long endTime){
this.name=name;
this.id=id;
this.endTime = endTime;
}
public String getName(){
return this.name;
}
public String getId(){
return this.id;
}
/**
* 用来判断是否到了截止时间
*/
@Override
public long getDelay(TimeUnit unit) {
//return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return endTime - System.currentTimeMillis();
}
/**
* 相互批较排序用
*/
@Override
public int compareTo(Delayed delayed) {
Wangmin w = (Wangmin)delayed;
return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;
}
}
public class WangBa implements Runnable {
private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>();
public boolean yingye =true;
/**
* 上机
*/
public void shangji(String name,String id,int money){
Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());
System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"交钱"+money+"块,开始上机...");
this.queue.add(man);
}
// 下机
public void xiaji(Wangmin man){
System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"时间到下机...");
}
@Override
public void run() {
while(yingye){
try {
Wangmin man = queue.take();
xiaji(man);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]){
try{
System.out.println("网吧开始营业");
WangBa siyu = new WangBa();
Thread shangwang = new Thread(siyu);
shangwang.start();
siyu.shangji("路人甲", "123", 1);
siyu.shangji("路人乙", "234", 10);
siyu.shangji("路人丙", "345", 5);
}
catch(Exception e){
e.printStackTrace();
}
}
}
聊聊并发(七)——Java中的阻塞队列
Java 并发 --- 阻塞队列总结
Java并发编程-阻塞队列(BlockingQueue)的实现原理
java并发之SynchronousQueue实现原理
使用delayedQueue实现你本地的延迟队列
DelayedQueue学习笔记