[关闭]
@boothsun 2017-06-25T11:19:22.000000Z 字数 5095 阅读 3990

从Java Future到Guava ListenableFuture实现异步调用

Java多线程


原文地址: http://blog.csdn.net/pistolove/article/details/51232004

Java Future

    通过Executors可以创建不同类似的线程池,常见的大概有下表几种类型,还有些可能为被列出。在实际应用中,个人感觉主要使用newCachedThreadPook和newFixedThreadPool来创建线程池。

Executors创建线程池源码

  1. //调用newCachedThreadPool方法,可以创建一个缓冲型线程池,而在改方法中通过传参创建一个ThreadPoolExecutor,也许你会很奇怪明明返回的是一个ExecutorService,怎么会创建了一个ThreadPoolExecutor呢?
  2. public static ExecutorService newCachedThreadPool() {
  3. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L,
  4. TimeUnit.SECONDS, new SynchronousQueue<Runnable());
  5. }
  6. // ThreadPoolExecutor继承了抽象的service类AbstractExecutorService
  7. public class ThreadPoolExecutor extends AbstractExecutorService {}
  8. //AbstractExecutorService实现了ExecutorService接口
  9. public abstract class AbstractExecutorService implements ExecutorService {}
  10. //所以ExecutorService其实是ThreadPoolExecutor的基类,这也就解释清楚了

ExecutorService(线程池)

ExecutorService是一个接口,它继承了Executor,在原有execute方法的基础上新增了submit方法,传入一个任务,该方法能够返回一个Future对象,可以获取异步计算结果。

  1. //ExecutorService继承了Executor,并扩展了新方法。
  2. public interface ExecutorService extends Executor { }
  3. //Executor中的方法
  4. void execute(Runnable command);
  5. //增加了submit方法,该方法传任务来获取Future对象,而Future对象中可以获取任务的执行结果
  6. <T> Future<T> submit(Callable<T> task);
  7. Future<?> submit(Runnable task);

Future(获取异步计算结果)

Future接口中有下表所示方法,可以获取当前正在执行的任务相关信息。

FutureTask

Executor框架利用FutureTask来完成异步任务,并可以用来进行任何潜在的耗时计算,一般FutureTask多用于耗时的计算,主线程可以在完成自己任务后,再去获取结果。

FutureTask包装了Callable和Runnable接口对象,提供了对Future接口的基本实现,开始、取消计算、查询结果是否完成、获取计算结果。仅当计算完成时才能检索结果,当计算没有完成时,该方法会一直阻塞直到任务转入完成状态。一旦完成计算,不能够重新开始或取消计算。通过Excutor(线程池)来执行,也可传递给Thread对象执行。如果在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。

  1. import java.util.Random;
  2. import java.util.concurrent.Callable;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. import java.util.concurrent.Future;
  7. public class TestFuture {
  8. // 创建线程池
  9. final static ExecutorService service = Executors.newCachedThreadPool();
  10. public static void main(String[] args) throws InterruptedException, ExecutionException {
  11. Long t1 = System.currentTimeMillis();
  12. // 任务1
  13. Future<Boolean> booleanTask = service.submit(new Callable<Boolean>() {
  14. @Override
  15. public Boolean call() throws Exception {
  16. return true;
  17. }
  18. });
  19. while (true) {
  20. if (booleanTask.isDone() && !booleanTask.isCancelled()) {
  21. //模拟耗时
  22. Thread.sleep(500);
  23. Boolean result = booleanTask.get();
  24. System.err.println("BooleanTask: " + result);
  25. break;
  26. }
  27. }
  28. // 任务2
  29. Future<String> stringTask = service.submit(new Callable<String>() {
  30. @Override
  31. public String call() throws Exception {
  32. return "Hello World";
  33. }
  34. });
  35. while (true) {
  36. if (stringTask.isDone() && !stringTask.isCancelled()) {
  37. String result = stringTask.get();
  38. System.err.println("StringTask: " + result);
  39. break;
  40. }
  41. }
  42. // 任务3
  43. Future<Integer> integerTask = service.submit(new Callable<Integer>() {
  44. @Override
  45. public Integer call() throws Exception {
  46. return new Random().nextInt(100);
  47. }
  48. });
  49. while (true) {
  50. if (integerTask.isDone() && !integerTask.isCancelled()) {
  51. Integer result = integerTask.get();
  52. System.err.println("IntegerTask: " + result);
  53. break;
  54. }
  55. }
  56. // 执行时间
  57. System.err.println("time: " + (System.currentTimeMillis() - t1));
  58. }
  59. }

Guava Future

ListenableFuture是可以监听的Future,它是对Java原始Future的扩展增强。Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果希望计算完成时马上就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做会使得代码复杂,且效率低下。如果使用ListenableFuture,Guava会帮助检测Future是否完成了,如果完成就自动调用回调函数,这样可以减少并发编程的复杂度。

常用API

1. MoreExecutors

该类是final类型的工具类,提供了很多静态方法。比如ListeningDecorator方法初始化ListeningExecutorService方法,使用此实例submit方法即可初始化ListenableFuture对象。

2. ListeningExecutorService

该类是对ExecutorService的扩展,重新ExecutorService类中的submit方法,返回ListenableFuture对象。

3. ListenableFuture

该接口扩展了Future接口,增加了addListener方法,该方法在给定的executor上注册一个监听器,当计算完成时会马上调用该监听器。不能够确保监听器执行的顺序,但可以在计算完成时确保马上被调用。

4. FutureCallback

该接口提供了OnSuccess和OnFailure方法。获取异步计算的结果并回调。

5. Futures

该类提供了很多实用的静态方法以供实用。

6. ListenableFutureTask

该类扩展了FutureTask类并实现了ListenableFuture接口,增加了addListener方法。

7.

  1. public class TestListenableFuture2 {
  2. // 创建线程池
  3. final static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
  4. public static void main(String[] args) throws Exception {
  5. Long t1 = System.currentTimeMillis();
  6. // 任务1
  7. ListenableFuture<Boolean> booleanTask = service.submit(new Callable<Boolean>() {
  8. @Override
  9. public Boolean call() throws Exception {
  10. return true;
  11. }
  12. });
  13. Futures.addCallback(booleanTask, new FutureCallback<Boolean>() {
  14. @Override
  15. public void onSuccess(Boolean result) {
  16. System.err.println("BooleanTask: " + result);
  17. }
  18. @Override
  19. public void onFailure(Throwable t) {
  20. }
  21. });
  22. // 任务2
  23. ListenableFuture<String> stringTask = service.submit(new Callable<String>() {
  24. @Override
  25. public String call() throws Exception {
  26. return "Hello World";
  27. }
  28. });
  29. Futures.addCallback(stringTask, new FutureCallback<String>() {
  30. @Override
  31. public void onSuccess(String result) {
  32. System.err.println("StringTask: " + result);
  33. }
  34. @Override
  35. public void onFailure(Throwable t) {
  36. }
  37. });
  38. // 任务3
  39. ListenableFuture<Integer> integerTask = service.submit(new Callable<Integer>() {
  40. @Override
  41. public Integer call() throws Exception {
  42. return new Random().nextInt(100);
  43. }
  44. });
  45. Futures.addCallback(integerTask, new FutureCallback<Integer>() {
  46. @Override
  47. public void onSuccess(Integer result) {
  48. try {
  49. Thread.sleep(500);
  50. } catch (InterruptedException e) {
  51. e.printStackTrace();
  52. }
  53. System.err.println("IntegerTask: " + result);
  54. }
  55. @Override
  56. public void onFailure(Throwable t) {
  57. }
  58. });
  59. // 执行时间
  60. System.err.println("time: " + (System.currentTimeMillis() - t1));
  61. }
  62. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注