[关闭]
@linux1s1s 2017-01-22T16:58:36.000000Z 字数 7784 阅读 4512

Java 并发编程框架(一)

Java 2015-04

在Java1.5之前,编写多线程并非易事,那么编写多线程为啥不想想象的那么简单,为什么需要线程池?先来回答这个问题。

Why Thread Pool

在Java中,如果每当一个请求到达就创建一个新线程,开销是相当大的。在实际使用中,每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在实际处理实际的用户请求的时间和资源要多的多。除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。如果在一个JVM中创建太多的线程,可能会导致系统由于过度消耗内存或者“切换过度”而导致系统资源不足。为了防止资源不足,服务器应用程序需要一些办法来限制任何给定时刻处理的请求数目,尽可能减少创建和销毁线程的次数,特别是一些资源耗费比较大的线程的创建和销毁,尽量利用已有对象来进行服务,这就是“池化资源”技术产生的原因。

线程池主要用来解决线程生命周期开销问题和资源不足问题,通过对多个任务重用线程,线程创建的开销被分摊到多个任务上了,而且由于在请求到达时线程已经存在,所以消除了创建所带来的延迟。这样,就可以立即请求服务,使应用程序响应更快。另外,通过适当的调整线程池中的线程数据可以防止出现资源不足的情况。

Java1.5推出线程池Executor框架以后,编写多线程变得简单易行,既然好用,那么很值得对Executor框架做个简单的了解,在了解之前,需要先了解一下Thread基本概念

Thread

我们知道生成一个子线程有两种方法,一种是直接生成一个匿名内部类,重写run方法,另外一种是新建一个runable类实现runable接口,然后作为参数生成Thread实例

方式1:

  1. new Thread()
  2. {
  3. @Override
  4. public void run()
  5. {
  6. super.run();
  7. //TODO
  8. }
  9. }.start();

方式2:

  1. private class Worker implements Runnable
  2. {
  3. @Override
  4. public void run()
  5. {
  6. //TODO
  7. }
  8. }
  9. new Thread(new Worker()).start();

Tips: 方式2 灵活性更高,因为Worker类还可以继承其他父类或者实现其他接口,而方式1除了可以继承Thread外不可以再继承别的类,虽然可以实现其他接口,灵活性毕竟不如方式2
Note: 即使方式2比较灵活,但是如果我们想重用Thread,很容易想到Java1.5为我们提供的Executor框架,所以接下来一起来看看这个Executor框架长什么样?

Executor框架

在了解这个框架之前,首先看看这个框架有哪些相关的主要类

然后分别对上面列出来的主要类做个简单的代码了解

Executor

  1. public interface Executor {
  2. void execute(Runnable command);
  3. }

这是个顶层的接口,只有一个execute方法,接收Runnable实例作为参数。

ExecutorService

  1. public interface ExecutorService extends Executor {
  2. void shutdown();
  3. boolean isShutdown();
  4. <T> Future<T> submit(Callable<T> task);
  5. Future<?> submit(Runnable task);
  6. ...
  7. }

这是个顶层Executor接口的子接口,这个子接口扩张了顶层Executor接口,这些接口明显增强了开发者对Thread的控制能力。

Executors

  1. public class Executors {
  2. public static ExecutorService newFixedThreadPool(int nThreads) {
  3. return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
  4. }
  5. public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  6. return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);
  7. }
  8. public static ExecutorService newSingleThreadExecutor() {
  9. return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
  10. }
  11. public static ExecutorService newCachedThreadPool() {
  12. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
  13. }
  14. ...
  15. }

这个类定义成ExecutorUtil更合适,因为它是个彻头彻尾的工具类,通过Executors类中的静态方法可以获得十分丰富的线程池,对于这些线程池具体有什么区别,后面会逐一介绍

Future

  1. public interface Future<V> {
  2. boolean isDone();
  3. boolean cancel(boolean mayInterruptIfRunning);
  4. boolean isCancelled();
  5. V get() throws InterruptedException, ExecutionException;
  6. V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
  7. ...
  8. }

这是个顶层接口,这个接口明显的弥补了Thread的缺陷,因为Thread并不能对自身进行控制,更不能获得线程执行的结果(显示的结果,当然如果想进一步将这个Result返回给其他线程,只需要通过线程间通信将Result传递过去亦可,但是过程稍显冗余)。

RunnableFuture

  1. public interface RunnableFuture<V> extends Runnable, Future<V> {
  2. void run();
  3. }

这是个FutureRunnable接口的子接口,这个接口只是将Runnable接口的唯一run方法缩小的可见范围,为啥要缩小可见范围?尚不清楚,那么如果这么改写这个类,可以行得通否?。

  1. public interface RunnableFuture<V> extends Runnable, Future<V> {
  2. }

FutureTask

  1. public class FutureTask<V> implements RunnableFuture<V> {
  2. public FutureTask(Callable<V> callable) {
  3. if (callable == null)
  4. throw new NullPointerException();
  5. this.callable = callable;
  6. this.state = NEW; // ensure visibility of callable
  7. }
  8. public FutureTask(Runnable runnable, V result) {
  9. this.callable = Executors.callable(runnable, result);
  10. this.state = NEW; // ensure visibility of callable
  11. }
  12. public V get() throws InterruptedException, ExecutionException {
  13. int s = state;
  14. if (s <= COMPLETING)
  15. s = awaitDone(false, 0L);
  16. return report(s);
  17. }
  18. public boolean isCancelled() {
  19. return state >= CANCELLED;
  20. }
  21. public boolean isDone() {
  22. return state != NEW;
  23. }
  24. ...
  25. }

这个类是RunnableFuture接口实现类,从构造器可以看出,接收Callable实例或者Runnable并带有V泛型的Result实例。

Callable

  1. public interface Executor {
  2. V call() throws Exception;
  3. }

这是个顶层的接口,只有一个call方法,并且返回一个*V泛型作为Result,这个是所有可控制,可取得线程执行结果的顶层类,一般作为参数生成线程实例或者传给线程池。

CompletionService

  1. public interface CompletionService<V> {
  2. Future<V> submit(Callable<V> task);
  3. Future<V> submit(Runnable task, V result);
  4. Future<V> take() throws InterruptedException;
  5. Future<V> poll();
  6. Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
  7. }

这是个顶层的接口,那么这个接口有神马用呢?这里简单的给一点提示。

Tips: 当向Executor提交批处理任务时,并且希望在它们完成后获得结果,如果用FutureTask,你可以循环获取task,并用future.get()去获取结果,但是如果这个task没有完成,你就得阻塞在这里,这个实效性不高,其实在很多场合,其实你拿第一个任务结果时,此时结果并没有生成并阻塞,其实在阻塞在第一个任务时,第二个task的任务已经早就完成了,显然这种情况用future task不合适的,效率也不高。
Note: 那么自己维护list和CompletionService的区别?
1: 从list中遍历的每个Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住,如果系统是设计成每个线程完成后就能根据其结果继续做后面的事,这样对于处于list后面的但是先完成的线程就会增加了额外的等待时间。
2: 而CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。

ExecutorCompletionService

  1. public class ExecutorCompletionService<V> implements CompletionService<V> {
  2. private class QueueingFuture extends FutureTask<Void> {
  3. QueueingFuture(RunnableFuture<V> task) {
  4. super(task, null);
  5. this.task = task;
  6. }
  7. protected void done() { completionQueue.add(task); }
  8. private final Future<V> task;
  9. }
  10. public Future<V> submit(Callable<V> task) {
  11. if (task == null) throw new NullPointerException();
  12. RunnableFuture<V> f = newTaskFor(task);
  13. executor.execute(new QueueingFuture(f));
  14. return f;
  15. }
  16. ...
  17. }

这是类是CompletionService实现类,这里需要特别注意的是completionQueue这个队列的维护。

Example1

Case.Java 测试用例

  1. public class Case
  2. {
  3. public static void main(String[] args)
  4. {
  5. CommonExecutor commonExecutor = new CommonExecutor();
  6. commonExecutor.submit(FutureCase.FUTURETASK_THREAD);
  7. }
  8. public static enum FutureCase
  9. {
  10. //分别是三种方式,通过Future和Executor线程池
  11. //FutureTask和Executor线程池
  12. //FutureTask和Thread
  13. FUTURE_EXECUTOR, FUTURETASK_EXECUTOR, FUTURETASK_THREAD;
  14. }
  15. }

CommonExecutor.java 业务逻辑类

  1. public class CommonExecutor
  2. {
  3. public static class Worker implements Callable<Integer>
  4. {
  5. private int mNumber;
  6. Worker(int number)
  7. {
  8. mNumber = number;
  9. }
  10. @Override
  11. public Integer call() throws Exception
  12. {
  13. try
  14. {
  15. Thread.sleep(new Random().nextInt(2000));
  16. System.out.println("Current Thread is: " + Thread.currentThread().getName() + " " + mNumber);
  17. }
  18. catch (Exception e)
  19. {
  20. e.printStackTrace();
  21. }
  22. return mNumber;
  23. }
  24. }
  25. public void submit(FutureCase futureCase)
  26. {
  27. ExecutorService executor = Executors.newCachedThreadPool();
  28. Worker worker = new Worker(1);
  29. switch (futureCase)
  30. {
  31. case FUTURE_EXECUTOR:
  32. Future<Integer> future = executor.submit(worker);
  33. executor.shutdown();
  34. System.out.println("Wait......");
  35. try
  36. {
  37. System.out.println("Result: " + future.get());
  38. }
  39. catch (InterruptedException e)
  40. {
  41. e.printStackTrace();
  42. }
  43. catch (ExecutionException e)
  44. {
  45. e.printStackTrace();
  46. }
  47. break;
  48. case FUTURETASK_THREAD:
  49. FutureTask<Integer> futureTask2 = new FutureTask<Integer>(worker);
  50. new Thread(futureTask2).start();
  51. System.out.println("Wait......");
  52. try
  53. {
  54. System.out.println("Result: " + futureTask2.get());
  55. }
  56. catch (InterruptedException e)
  57. {
  58. e.printStackTrace();
  59. }
  60. catch (ExecutionException e)
  61. {
  62. e.printStackTrace();
  63. }
  64. break;
  65. case FUTURETASK_EXECUTOR:
  66. FutureTask<Integer> futureTask = new FutureTask<Integer>(worker);
  67. executor.submit(futureTask);
  68. executor.shutdown();
  69. System.out.println("Wait......");
  70. try
  71. {
  72. System.out.println("Result: " + futureTask.get());
  73. }
  74. catch (InterruptedException e)
  75. {
  76. e.printStackTrace();
  77. }
  78. catch (ExecutionException e)
  79. {
  80. e.printStackTrace();
  81. }
  82. break;
  83. default:
  84. break;
  85. }
  86. }
  87. }

运行结果

Wait......
Current Thread is: Thread-0 1
Result: 1

Example2

我们将上面的FutureCase再增加一种情况,专门测试CompletionService
Case.java 测试用例类

  1. public static enum FutureCase
  2. {
  3. FUTURETASK_EXECUTOR, FUTURE_EXECUTOR, FUTURETASK_THREAD, COMPLETIONSERVICE
  4. }

CommonExecutor.java 业务逻辑类

  1. case COMPLETIONSERVICE:
  2. ExecutorService pool = Executors.newFixedThreadPool(5);
  3. CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(pool);
  4. for (int i = 0; i < 5; i++)
  5. {
  6. completionService.submit(new Worker(i));
  7. }
  8. for (int i = 0; i < 5; i++)
  9. {
  10. try
  11. {
  12. System.out.println("Result: " + completionService.take().get());
  13. }
  14. catch (InterruptedException e)
  15. {
  16. e.printStackTrace();
  17. }
  18. catch (ExecutionException e)
  19. {
  20. e.printStackTrace();
  21. }
  22. }
  23. break;

运行结果

Current Thread is: pool-2-thread-2 1
Result: 1
Current Thread is: pool-2-thread-4 3
Result: 3
Current Thread is: pool-2-thread-5 4
Result: 4
Current Thread is: pool-2-thread-1 0
Result: 0
Current Thread is: pool-2-thread-3 2
Result: 2
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注