[关闭]
@Catyee 2017-06-14T08:53:28.000000Z 字数 8932 阅读 465

阻塞队列 DelayQueue

java 多线程 阻塞队列 DelayQUeue


阻塞队列

阻塞队列(BlockingQueue)是那些支持持阻塞的插入和移除的队列。
1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) ofer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

·抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queuefull")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。
·返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。
·一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
·超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。

注意:如果是无界阻塞队列,队列不可能会出现满的情况,所以使用put或offer方法永远不会被阻塞,而且使用offer方法时,该方法永远返回true。

JDK 7中的阻塞队列:

DelayQUeue

DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。DelayQueue非常有用,可以将DelayQueue运用在以下应用场景:

Delayed一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。此接口的实现必须定义一个 compareTo方法,该方法提供与此接口的getDelay方法一致的排序。

使用DelayQueue

一、实现Delayed接口:
DelayQueue队列的元素必须实现Delayed接口。我们可以参考ScheduledThreadPoolExecutor里ScheduledFutureTask类的实现,首先,在对象创建的时候,初始化基本数据。使用time记录当前对象延迟到什么时候可以使用,使用sequenceNumber来标识元素在队列中的先后顺序。代码如下:

  1. private static final AtomicLong sequencer = new AtomicLong(0);
  2. Scheduled FutureTask(Runnable r, V result, long ns, long period) {
  3. Scheduled FutureTask(Runnable r, V result, long ns, long period) {
  4. super(r, result);
  5. this.time = ns;
  6. this.period = period;
  7. this.sequence Number = sequencer.getAndIncrement();
  8. }

第二步:实现getDelay方法,该方法返回当前元素还需要延时多长时间,单位是纳秒,代码如下。

  1. public long getDelay(TimeUnit unit) {
  2. return unit.convert(time - now(), Time Unit.NANOSECONDS);
  3. }

通过构造函数可以看出延迟时间参数ns的单位是纳秒,自己设计的时候最好使用纳秒,因为实现getDelay()方法时可以指定任意单位,一旦以秒或分作为单位,而延时时间又精确不到纳秒就麻烦了。使用时请注意当time小于当前时间时,getDelay会返回负数。
第三步:实现compareTo方法来指定元素的顺序。例如,让延时时间最长的放在队列的末尾。实现代码如下:

  1. public int compareTo(Delayed other) {
  2. if (other == this)// compare zero ONLY if same object
  3. return 0;
  4. if (other instanceof Scheduled FutureTask) {
  5. Scheduled FutureTask<> x = (Scheduled FutureTask<>)other;
  6. long diff = time - x.time;
  7. if (diff < 0)
  8. return -1;
  9. else if (diff > 0)
  10. return 1;
  11. else if (sequence Number < x.sequence Number)
  12. return -1;
  13. else
  14. return 1;
  15. }
  16. long d = (getDelay(Time Unit.NANOSECONDS) -
  17. other.getDelay(Time Unit.NANOSECONDS));
  18. return (d == 0) 0 : ((d < 0) -1 : 1);
  19. }

二、实现延时阻塞队列
延时阻塞队列的实现很简单,当消费者从队列里获取元素时,如果元素没有达到延时时间,就阻塞当前线程。

  1. long delay = first.getDelay(Time Unit.NANOSECONDS);
  2. if (delay <= 0)
  3. return q.poll();
  4. else if (leader != null)
  5. available.await();
  6. else {
  7. Thread thisThread = Thread.currentThread();
  8. leader = thisThread;
  9. try {
  10. available.awaitNanos(delay);
  11. } finally {
  12. if (leader == thisThread)
  13. leader = null;
  14. }
  15. }

代码中的变量leader是一个等待获取队列头部元素的线程。如果leader不等于空,表示已经有线程在等待获取队列的头元素。所以,使用await()方法让当前线程等待信号。如果leader等于空,则把当前线程设置成leader,并使用awaitNanos()方法让当前线程等待接收信号或等待delay时间。

下面实现两种具体场景:

1、模拟一个考试的日子,考试时间为120分钟,30分钟后才可交卷,当时间到了,或学生都交完卷了考试结束。这个场景中几个点需要注意:

实现思想:用DelayQueue存储考生(Student类),每一个考生都有自己的名字和完成试卷的时间,Teacher线程对DelayQueue进行监控,收取完成试卷小于120分钟的学生的试卷。当考试时间120分钟到时,先关闭Teacher线程,然后强制DelayQueue中还存在的考生交卷。每一个考生交卷都会进行一次countDownLatch.countDown(),当countDownLatch.await()不再阻塞说明所有考生都交完卷了,而后结束考试。

Student类实现Runnable和Delayed接口,之后就可以存入DelayQueue中去了:

  1. class Student implements Runnable,Delayed{
  2. private String name;
  3. private long workTime;
  4. private long submitTime;
  5. private boolean isForce = false;
  6. private CountDownLatch countDownLatch;
  7. public Student(){}
  8. public Student(String name,long workTime,CountDownLatch countDownLatch){
  9. this.name = name;
  10. this.workTime = workTime;
  11. this.submitTime = TimeUnit.NANOSECONDS.convert(workTime,
  12. TimeUnit.NANOSECONDS)+System.nanoTime();
  13. this.countDownLatch = countDownLatch;
  14. }
  15. @Override
  16. public int compareTo(Delayed o) {
  17. // TODO Auto-generated method stub
  18. if(o == null || ! (o instanceof Student)) return 1;
  19. if(o == this) return 0;
  20. Student s = (Student)o;
  21. if (this.workTime > s.workTime) {
  22. return 1;
  23. }else if (this.workTime == s.workTime) {
  24. return 0;
  25. }else {
  26. return -1;
  27. }
  28. }
  29. @Override
  30. public long getDelay(TimeUnit unit) {
  31. // TODO Auto-generated method stub
  32. return unit.convert(submitTime - System.nanoTime(), TimeUnit.NANOSECONDS);
  33. }
  34. @Override
  35. public void run() {
  36. // TODO Auto-generated method stub
  37. if (isForce) {
  38. System.out.println(name + " 交卷, 希望用时" + workTime + "分钟"+" ,实际用时 120分钟" );
  39. }else {
  40. System.out.println(name + " 交卷, 希望用时" + workTime +
  41. "分钟"+" ,实际用时 "+workTime +" 分钟");
  42. }
  43. countDownLatch.countDown();
  44. }
  45. public boolean isForce() {
  46. return isForce;
  47. }
  48. public void setForce(boolean isForce) {
  49. this.isForce = isForce;
  50. }
  51. }

Teacher类用来收取DelayQueue中时间到了的学生的试卷。也就是说一个学生如果用时大于30分钟小于120分钟,那么当时间到了的时候Teacheer类就会从QelayQueue中取出这个学生。

  1. class Teacher implements Runnable{
  2. private DelayQueue<Student> students;
  3. public Teacher(DelayQueue<Student> students){
  4. this.students = students;
  5. }
  6. @Override
  7. public void run() {
  8. // TODO Auto-generated method stub
  9. try {
  10. System.out.println(" test start");
  11. while(!Thread.interrupted()){
  12. students.take().run();
  13. }
  14. } catch (Exception e) {
  15. // TODO: handle exception
  16. e.printStackTrace();
  17. }
  18. }
  19. }

EndExam类是强制交卷类,当考生用时超过120分钟就会强制从DelayQueue中取出来。

  1. class EndExam extends Student{
  2. private DelayQueue<Student> students;
  3. private CountDownLatch countDownLatch;
  4. private Thread teacherThread;
  5. public EndExam(DelayQueue<Student> students, long workTime,
  6. CountDownLatch countDownLatch,Thread teacherThread) {
  7. super("强制收卷", workTime,countDownLatch);
  8. this.students = students;
  9. this.countDownLatch = countDownLatch;
  10. this.teacherThread = teacherThread;
  11. }
  12. @Override
  13. public void run() {
  14. // TODO Auto-generated method stub
  15. teacherThread.interrupt();
  16. Student tmpStudent;
  17. for (Iterator<Student> iterator2 = students.iterator(); iterator2.hasNext();) {
  18. tmpStudent = iterator2.next();
  19. tmpStudent.setForce(true);
  20. tmpStudent.run();
  21. }
  22. countDownLatch.countDown();
  23. }
  24. }

Exam是考试主类,包含一个main方法:

  1. public class Exam {
  2. public static void main(String[] args) throws InterruptedException {
  3. // TODO Auto-generated method stub
  4. int studentNumber = 20;
  5. CountDownLatch countDownLatch = new CountDownLatch(studentNumber+1);
  6. DelayQueue< Student> students = new DelayQueue<Student>();
  7. Random random = new Random();
  8. for (int i = 0; i < studentNumber; i++) {
  9. students.put(new Student("student"+(i+1), 30+random.nextInt(120),countDownLatch));
  10. }
  11. Thread teacherThread =new Thread(new Teacher(students));
  12. students.put(new EndExam(students, 120,countDownLatch,teacherThread));
  13. teacherThread.start();
  14. countDownLatch.await();
  15. System.out.println(" 考试时间到,全部交卷!");
  16. }
  17. }

2、具有过期时间的缓存
向缓存添加内容时,给每一个key设定过期时间,系统自动将超过过期时间的key清除。这个场景中几个点需要注意:

Cache主类:

  1. public class Cache<K, V> {
  2. public ConcurrentHashMap<K, V> map = new ConcurrentHashMap<K, V>();
  3. public DelayQueue<DelayedItem<K>> queue = new DelayQueue<DelayedItem<K>>();
  4. public void put(K k,V v,long liveTime){
  5. V v2 = map.put(k, v);
  6. DelayedItem<K> tmpItem = new DelayedItem<K>(k, liveTime);
  7. if (v2 != null) {
  8. queue.remove(tmpItem);
  9. }
  10. queue.put(tmpItem);
  11. }
  12. public Cache(){
  13. Thread t = new Thread(){
  14. @Override
  15. public void run(){
  16. dameonCheckOverdueKey();
  17. }
  18. };
  19. t.setDaemon(true);
  20. t.start();
  21. }
  22. public void dameonCheckOverdueKey(){
  23. while (true) {
  24. DelayedItem<K> delayedItem = queue.poll();
  25. if (delayedItem != null) {
  26. map.remove(delayedItem.getT());
  27. System.out.println(System.nanoTime()+" remove "+
  28. delayedItem.getT() +" from cache");
  29. }
  30. try {
  31. Thread.sleep(300);
  32. } catch (Exception e) {
  33. // TODO: handle exception
  34. }
  35. }
  36. }
  37. public static void main(String[] args) throws InterruptedException {
  38. Random random = new Random();
  39. int cacheNumber = 10;
  40. int liveTime = 0;
  41. Cache<String, Integer> cache = new Cache<String, Integer>();
  42. for (int i = 0; i < cacheNumber; i++) {
  43. liveTime = random.nextInt(3000);
  44. System.out.println(i+" "+liveTime);
  45. cache.put(i+"", i, random.nextInt(liveTime));
  46. if (random.nextInt(cacheNumber) > 7) {
  47. liveTime = random.nextInt(3000);
  48. System.out.println(i+" "+liveTime);
  49. cache.put(i+"", i, random.nextInt(liveTime));
  50. }
  51. }
  52. Thread.sleep(3000);
  53. System.out.println();
  54. }
  55. }

DelayedItem类:

  1. class DelayedItem<T> implements Delayed{
  2. private T t;
  3. private long liveTime ;
  4. private long removeTime;
  5. public DelayedItem(T t,long liveTime){
  6. this.setT(t);
  7. this.liveTime = liveTime;
  8. this.removeTime = TimeUnit.NANOSECONDS.convert(liveTime, TimeUnit.NANOSECONDS) +
  9. System.nanoTime();
  10. }
  11. @Override
  12. public int compareTo(Delayed o) {
  13. if (o == null) return 1;
  14. if (o == this) return 0;
  15. if (o instanceof DelayedItem){
  16. DelayedItem<T> tmpDelayedItem = (DelayedItem<T>)o;
  17. if (liveTime > tmpDelayedItem.liveTime ) {
  18. return 1;
  19. }else if (liveTime == tmpDelayedItem.liveTime) {
  20. return 0;
  21. }else {
  22. return -1;
  23. }
  24. }
  25. long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
  26. return diff > 0 ? 1:diff == 0? 0:-1;
  27. }
  28. @Override
  29. public long getDelay(TimeUnit unit) {
  30. return unit.convert(removeTime - System.nanoTime(), unit);
  31. }
  32. public T getT() {
  33. return t;
  34. }
  35. public void setT(T t) {
  36. this.t = t;
  37. }
  38. @Override
  39. public int hashCode(){
  40. return t.hashCode();
  41. }
  42. @Override
  43. public boolean equals(Object object){
  44. if (object instanceof DelayedItem) {
  45. return object.hashCode() == hashCode() ?true:false;
  46. }
  47. return false;
  48. }
  49. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注