[关闭]
@linux1s1s 2017-01-22T16:58:54.000000Z 字数 7401 阅读 2867

Java 并发编程框架(二)

Java 2015-04

在基本了解了并发线程的主要类以后(如果你对这些类没有基本的概念,请阅读Java 并发编程框架(一)
这篇文章会对前一篇文章提及的主要类做进一步说明

CompletionService

CompletionService究竟是什么,如果一开始很难理解它,那么我们可以通过一个Demo来模仿一下ExecutorCompletionService这个实现类

Case.Java 测试用例

  1. public class Case
  2. {
  3. public static void main(String[] args)
  4. {
  5. CommonCompletionService commonCompletionService = new CommonCompletionService();
  6. commonCompletionService.submit(CompletionCase.COMPLETION_MANUL);
  7. }
  8. public static enum FutureCase
  9. {
  10. FUTURETASK_EXECUTOR, FUTURE_EXECUTOR, FUTURETASK_THREAD, COMPLETIONSERVICE
  11. }
  12. public static enum CompletionCase
  13. {
  14. COMPLETION_MANUL, COMPLETION_LIB
  15. }
  16. }

CommonCompletionService.java 业务逻辑类

  1. import java.util.Random;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.Callable;
  4. import java.util.concurrent.CompletionService;
  5. import java.util.concurrent.ExecutionException;
  6. import java.util.concurrent.ExecutorCompletionService;
  7. import java.util.concurrent.ExecutorService;
  8. import java.util.concurrent.Executors;
  9. import java.util.concurrent.Future;
  10. import java.util.concurrent.LinkedBlockingQueue;
  11. import com.executor.demo.Case.CompletionCase;
  12. public class CommonCompletionService
  13. {
  14. public static class WorkBack implements Callable<String>
  15. {
  16. private String name;
  17. public WorkBack(String name)
  18. {
  19. this.name = name;
  20. }
  21. @Override
  22. public String call() throws Exception
  23. {
  24. try
  25. {
  26. Thread.sleep(new Random().nextInt(2000));
  27. }
  28. catch (Exception e)
  29. {
  30. e.printStackTrace();
  31. }
  32. return name;
  33. }
  34. }
  35. private static final int TASK_TOTAL = 10;
  36. public void submit(CompletionCase completionCase)
  37. {
  38. if (completionCase == null) return;
  39. switch (completionCase)
  40. {
  41. case COMPLETION_MANUL:
  42. ExecutorService pool = Executors.newFixedThreadPool(5);
  43. BlockingQueue<Future<String>> queue = new LinkedBlockingQueue<Future<String>>();
  44. for (int i = 0; i < TASK_TOTAL; i++)
  45. {
  46. Future<String> future = pool.submit(new WorkBack(Thread.currentThread().getName() + " " + i));
  47. queue.add(future);
  48. }
  49. for (int i = 0; i < TASK_TOTAL; i++)
  50. {
  51. try
  52. {
  53. System.out.println("COMPLETION_MANUL:" + queue.take().get());
  54. }
  55. catch (InterruptedException e)
  56. {
  57. e.printStackTrace();
  58. }
  59. catch (ExecutionException e)
  60. {
  61. e.printStackTrace();
  62. }
  63. }
  64. pool.shutdown();
  65. break;
  66. case COMPLETION_LIB:
  67. ExecutorService pool2 = Executors.newFixedThreadPool(5);
  68. CompletionService<String> completionService = new ExecutorCompletionService<String>(pool2);
  69. for (int i = 0; i < TASK_TOTAL; i++)
  70. {
  71. completionService.submit(new WorkBack(Thread.currentThread().getName() + " " + i));
  72. }
  73. for (int i = 0; i < TASK_TOTAL; i++)
  74. {
  75. try
  76. {
  77. System.out.println("COMPLETION_LIB:" + completionService.take().get());
  78. }
  79. catch (InterruptedException e)
  80. {
  81. e.printStackTrace();
  82. }
  83. catch (ExecutionException e)
  84. {
  85. e.printStackTrace();
  86. }
  87. }
  88. pool2.shutdown();
  89. break;
  90. default:
  91. break;
  92. }
  93. }
  94. }

运行结果:

COMPLETION_MANUL:main 0
COMPLETION_MANUL:main 1
COMPLETION_MANUL:main 2
COMPLETION_MANUL:main 3
COMPLETION_MANUL:main 4
COMPLETION_MANUL:main 5
COMPLETION_MANUL:main 6
COMPLETION_MANUL:main 7
COMPLETION_MANUL:main 8
COMPLETION_MANUL:main 9

接下来分析一下上面的程序,case COMPLETION_MANUL:这个部分的程序是模仿CompletionService的简单实现,而case COMPLETION_LIB:这个部分程序就是CompletionService简单Demo,所以对比上下这个两部分程序,可以看出,CompletionService的主要功能是封装了具有BlockingQueue的ExecutorService线程池,这样理解起来比较形象,其实看看CompletionServic的实现类,也是这样做的,感兴趣的话可以看看源代码,以便更深入的理解。

ThreadPoolExecutor

还记得Java 并发编程框架(一)这篇文章中提及的Executors 其中比较重要的静态方法,仔细看看代码会发现,这些静态方法都指向了同一个类ThreadPoolExecutor,很明显它是线程池的生成类,接下来如果想进一步了解线程池,那么不可避免的会和这个类打交道了。

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. public ThreadPoolExecutor(int corePoolSize,
  3. int maximumPoolSize,
  4. long keepAliveTime,
  5. TimeUnit unit,
  6. BlockingQueue<Runnable> workQueue) {
  7. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  8. Executors.defaultThreadFactory(), defaultHandler);
  9. }
  10. public ThreadPoolExecutor(int corePoolSize,
  11. int maximumPoolSize,
  12. long keepAliveTime,
  13. TimeUnit unit,
  14. BlockingQueue<Runnable> workQueue,
  15. ThreadFactory threadFactory,
  16. RejectedExecutionHandler handler) {
  17. if (corePoolSize < 0 ||
  18. maximumPoolSize <= 0 ||
  19. maximumPoolSize < corePoolSize ||
  20. keepAliveTime < 0)
  21. throw new IllegalArgumentException();
  22. if (workQueue == null || threadFactory == null || handler == null)
  23. throw new NullPointerException();
  24. this.corePoolSize = corePoolSize;
  25. this.maximumPoolSize = maximumPoolSize;
  26. this.workQueue = workQueue;
  27. this.keepAliveTime = unit.toNanos(keepAliveTime);
  28. this.threadFactory = threadFactory;
  29. this.handler = handler;
  30. }
  31. ...
  32. }

我们重点看一下参数是5个的构造器即可,下面解释一下这5个参数分别代表神马意思。

这些个参数一开始很容易让人望文生义:线程池里保持corePoolSize个线程,如果不够用,就加线程入池直至maximumPoolSize大小,如果还不够就往workQueue里加,如果workQueue也不够就用RejectedExecutionHandler来做拒绝处理。

在你认真查看ThreadPoolExecutor源代码上面一大段英文注释过后,你会发现上面的理解是多么可笑,下面将原文翻译过后大概的理解说一下:(为了表述上的方便我们定义一下当前线程池线程数量为CurrentPoolSize

上面解释了基本的参数含义,接下来看看上文提及的Executor静态方法

newFixedThreadPool

  1. public class Executors {
  2. public static ExecutorService newFixedThreadPool(int nThreads) {
  3. return new ThreadPoolExecutor(nThreads, nThreads,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>());
  6. }
  7. }

它将corePoolSize和maximumPoolSize都设定为了nThreads,这样便实现了线程池的大小的固定,不会动态地扩大,另外,keepAliveTime设定为了0,也就是说线程只要空闲下来,就会被移除线程池,关于LinkedBlockingQueue后面会给出详细说明。

newCachedThreadPool

  1. public class Executors {
  2. public static ExecutorService newCachedThreadPool() {
  3. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  4. 60L, TimeUnit.SECONDS,
  5. new SynchronousQueue<Runnable>());
  6. }
  7. }

它将corePoolSize设定为0,而将maximumPoolSize设定为了Integer的最大值,线程空闲超过60秒,将会从线程池中移除。由于核心线程数为0,因此每次添加任务,都会先从线程池中找空闲线程,如果没有就会创建一个线程(SynchronousQueue决定的,后面会说)来执行新的任务,并将该线程加入到线程池中,而最大允许的线程数为Integer的最大值,因此这个线程池理论上可以不断扩大。

Queue排队策略

RejectedExecutionHandler拒绝策略

当任务源源不断的过来,而我们的系统又处理不过来的时候,我们要采取的策略是拒绝服务。RejectedExecutionHandler接口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略。

  1. CallerRunsPolicy
  2. AbortPolicy
  3. DiscardPolicy
  4. DiscardOldestPolicy

分别对这个四种策略说明如下:

CallerRunsPolicy

线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  2. if (!e.isShutdown()) {
  3. r.run();
  4. }
  5. }

这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)

AbortPolicy

处理程序遭到拒绝将抛出运行时 RejectedExecutionException。

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  2. throw new RejectedExecutionException();
  3. }

这个策略直接抛出异常,丢弃任务。(jdk默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是一种勇气,为了保证后续任务的正常进行,丢弃一些也是可以接收的,记得做好记录)

DiscardPolicy

不能执行的任务将被删除。

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}

这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。

DiscardOldestPolicy

如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  2. if (!e.isShutdown()) {
  3. e.getQueue().poll();
  4. e.execute(r);
  5. }
  6. }

该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注