[关闭]
@zsh-o 2018-11-05T15:53:33.000000Z 字数 18417 阅读 1106

从头理解理解future模式

Java


问题

首先给出一个同步问题的定义
未命名文件 (1).png-16.6kB
这里就要提到python的动态类型的tuple的好处,可以实现统一形式的多输入和多输出

接下来从最简单的开始一步一步进行优化

顺序结构

首先从最基本的顺序结构执行,要保证结果的正确性,需要按照上述有向图的一种拓扑排序顺序来组织代码
顺序结构我们只能按照其一种拓扑排序的方法组织其运行,这里按照[A,B,C,D,E,F]顺序运行即可

  1. package com.zsh_o.future;
  2. /**
  3. * 顺序结构执行
  4. * */
  5. public class SequentialBase {
  6. public static void main(String[] args) {
  7. SequentialBase app = new SequentialBase();
  8. try {
  9. app.run();
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. void run() throws InterruptedException {
  15. long startTime = System.currentTimeMillis();
  16. int a = getA();
  17. int b = getB(a);
  18. int c = getC();
  19. int d = getD();
  20. int e = getE(c, d);
  21. int f = getF(b, e);
  22. long endTime = System.currentTimeMillis();
  23. System.out.printf("Final Result: %d\n", f);
  24. System.out.printf("Total Time: %d\n", endTime - startTime);
  25. }
  26. /**
  27. * Define Functions A -> F
  28. * */
  29. int getA() throws InterruptedException {
  30. System.out.println("A: Running");
  31. Thread.sleep(1000);
  32. System.out.println("A: Returned");
  33. return 1;
  34. }
  35. int getB(int a) throws InterruptedException {
  36. System.out.println("B: Running");
  37. Thread.sleep(1000);
  38. System.out.println("B: Returned");
  39. return a + 1;
  40. }
  41. int getC() throws InterruptedException {
  42. System.out.println("C: Running");
  43. Thread.sleep(1000);
  44. System.out.println("C: Returned");
  45. return 10;
  46. }
  47. int getD() throws InterruptedException {
  48. System.out.println("D: Running");
  49. Thread.sleep(1000);
  50. System.out.println("D: Returned");
  51. return 20;
  52. }
  53. int getE(int c, int d) throws InterruptedException {
  54. System.out.println("E: Running");
  55. Thread.sleep(1000);
  56. System.out.println("E: Returned");
  57. return c + d;
  58. }
  59. int getF(int b, int e) throws InterruptedException {
  60. System.out.println("F: Running");
  61. Thread.sleep(1000);
  62. System.out.println("F: Returned");
  63. return b * e;
  64. }
  65. }

执行结果:

  1. A: Running
  2. A: Returned
  3. B: Running
  4. B: Returned
  5. C: Running
  6. C: Returned
  7. D: Running
  8. D: Returned
  9. E: Running
  10. E: Returned
  11. F: Running
  12. F: Returned
  13. Final Result: 60
  14. Total Time: 6003

可以看到执行效果还是非常不错的,加上输入输出也只多了3ms,再复杂的代码都是由最基础的代码一步一步按照场景优化来的,所以先来优化这个顺序结构

先从小功能开始,记录时间的代码太长了,如果要计时的太多会产生很多的计时变量,把其封装一下:

  1. package com.zsh_o.util;
  2. /**
  3. * 用以记录程序运行时间
  4. * */
  5. public class CountTimer {
  6. private long startTime;
  7. private long endTime;
  8. private long time;
  9. public CountTimer() {
  10. startTime = 0;
  11. endTime = 0;
  12. time = 0;
  13. }
  14. public void start() {
  15. startTime = System.currentTimeMillis();
  16. }
  17. public void end() {
  18. endTime = System.currentTimeMillis();
  19. time = endTime - startTime;
  20. }
  21. public long getTime() {
  22. return time;
  23. }
  24. }

第二个是,为了查看效果我们强行家了sleep一秒,但sleep是要catch异常的,所以每sleep一次都要处理下异常,在这个里面是完全没有必要的,我们单拿出来并处理下异常

  1. package com.zsh_o.util;
  2. public class Util {
  3. public static void sleep(long ms) {
  4. try {
  5. Thread.sleep(ms);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. }
  10. }

接下来我们考虑如何对代码进行复用,首先发现可以抽象出函数的调用,这样用户只需要关心函数体的内容就可以了,所以抽象出函数的调用,增加Callable接口

  1. package com.zsh_o.future;
  2. public interface Callable<T> {
  3. T call();
  4. }

这个地方可以看到我们定义的接口没有传递参数,是由于我们不知道上层用户想要传递几个参数,也不知道每个参数是什么类型,所以最好的办法就是不写参数。。。
python里面传递参数相当于直接传递tuple和dict,然后在函数里面检验参数正确性,但对于Java这种静态编译语言这种方法太影响性能,直接用Object[]在函数里面类型转换,这样对函数参数的约束太弱,很容易崩溃。Java也用泛型实现Tuple,但无法保证长度,所以这个地方直接不加参数,用闭包实现功能。

  1. package com.zsh_o.future;
  2. import com.zsh_o.util.CountTimer;
  3. import com.zsh_o.util.Util;
  4. import java.util.ArrayList;
  5. public class SequentialCase1 {
  6. int a, b, c, d, e, f;
  7. public static void main(String[] args) {
  8. SequentialCase1 app = new SequentialCase1();
  9. app.run();
  10. }
  11. void run() {
  12. CountTimer timer = new CountTimer();
  13. timer.start();
  14. ArrayList<Callable> array = new ArrayList<>();
  15. array.add(() -> {System.out.println("A: Running"); Util.sleep(1000); a = 1; System.out.println("A: Returned"); return a;});
  16. array.add(() -> {System.out.println("B: Running"); Util.sleep(1000); b = a + 1; System.out.println("B: Returned"); return b;});
  17. array.add(() -> {System.out.println("C: Running"); Util.sleep(1000); c = 10; System.out.println("C: Returned"); return c;});
  18. array.add(() -> {System.out.println("D: Running"); Util.sleep(1000); d = 20; System.out.println("D: Returned"); return d;});
  19. array.add(() -> {System.out.println("E: Running"); Util.sleep(1000); e = c + d; System.out.println("E: Returned"); return e;});
  20. array.add(() -> {System.out.println("F: Running"); Util.sleep(1000); f = b * e; System.out.println("F: Returned"); return f;});
  21. for(var e : array) {
  22. e.call();
  23. }
  24. System.out.printf("Final Result: %d\n", f);
  25. timer.end();
  26. System.out.printf("Total Time: %d\n", timer.getTime());
  27. }
  28. }

结果如下

  1. A: Running
  2. A: Returned
  3. B: Running
  4. B: Returned
  5. C: Running
  6. C: Returned
  7. D: Running
  8. D: Returned
  9. E: Running
  10. E: Returned
  11. F: Running
  12. F: Returned
  13. Final Result: 60
  14. Total Time: 6046

可以看到ArrayList还是蛮耗时的

接下来我们发现,为了查看方法的执行效果,输出了每个方法的运行状态,但该运行状态在每个方法上都是相似的,包括sleep一秒也是相似的,所以,上面两个部分相当于对想实现功能的结果的附加和扩展,所以这里我们用装饰器模式对其扩展附加功能,首先定义该装饰器

  1. package com.zsh_o.future;
  2. /**
  3. * 采用装饰器对Call进行装饰,为Call增加附加功能
  4. * */
  5. public class CallDecorator<T> implements Callable<T> {
  6. Callable<T> callable;
  7. String name;
  8. public CallDecorator(String name, Callable<T> callable) {
  9. this.callable = callable;
  10. this.name = name;
  11. }
  12. @Override
  13. public T call() {
  14. System.out.printf("%s: Running\n", name);
  15. try {
  16. Thread.sleep(1000);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. T value = callable.call();
  21. System.out.printf("%s: Returned\n", name);
  22. return value;
  23. }
  24. }

这里有一个问题,为什么是装饰器而不是代理?代理强调的是对代理对象的访问和执行控制,而装饰器强调对行为的扩展,这个地方只是加了输出和暂停一秒,所以叫装饰器更为合适

  1. package com.zsh_o.future;
  2. import com.zsh_o.util.CountTimer;
  3. import java.util.ArrayList;
  4. public class SequentialCase2{
  5. int a, b, c, d, e, f;
  6. public static void main(String[] args) {
  7. SequentialCase2 app = new SequentialCase2();
  8. try {
  9. app.run();
  10. } catch (InterruptedException e1) {
  11. e1.printStackTrace();
  12. }
  13. }
  14. void run() throws InterruptedException {
  15. CountTimer timer = new CountTimer();
  16. timer.start();
  17. ArrayList<CallDecorator> array = new ArrayList<>();
  18. array.add(new CallDecorator("A", () -> {a = 1; return a;}));
  19. array.add(new CallDecorator("B", () -> {b = a + 1; return b;}));
  20. array.add(new CallDecorator("C", () -> {c = 10; return c;}));
  21. array.add(new CallDecorator("D", () -> {d = 20; return d;}));
  22. array.add(new CallDecorator("E", () -> {e = c + d; return e;}));
  23. array.add(new CallDecorator("F", () -> {f = b * e; return f;}));
  24. for(var e : array) {
  25. e.call();
  26. }
  27. timer.end();
  28. System.out.printf("Final Result: %d\n", f);
  29. System.out.printf("Total Time: %d\n", timer.getTime());
  30. }
  31. }

结果:

  1. A: Running
  2. A: Returned
  3. B: Running
  4. B: Returned
  5. C: Running
  6. C: Returned
  7. D: Running
  8. D: Returned
  9. E: Running
  10. E: Returned
  11. F: Running
  12. F: Returned
  13. Final Result: 60
  14. Total Time: 6055

并发

我们用并发的方式优化该代码,该程序理论上最少的运行时间是3s,每一列的代码可以一起执行。在最一开始我们采用了6把锁来完成这个同步问题

  1. package com.zsh_o.future;
  2. import com.zsh_o.util.CountTimer;
  3. import com.zsh_o.util.Util;
  4. import java.util.ArrayList;
  5. import java.util.concurrent.CountDownLatch;
  6. import java.util.concurrent.locks.ReentrantLock;
  7. /**
  8. * 多线程解决,可能会出现的问题是还没来得及加锁,其他的线程已经开始运行
  9. * */
  10. public class ThreadBase1 {
  11. private int a, b, c, d, e, f;
  12. private ReentrantLock lockA, lockB, lockC, lockD, lockE, lockF;
  13. public ThreadBase1() {
  14. this.lockA = new ReentrantLock();
  15. this.lockB = new ReentrantLock();
  16. this.lockC = new ReentrantLock();
  17. this.lockD = new ReentrantLock();
  18. this.lockE = new ReentrantLock();
  19. this.lockF = new ReentrantLock();
  20. }
  21. void run() throws InterruptedException {
  22. CountTimer timer = new CountTimer();
  23. timer.start();
  24. // 用CountDownLatch确保主线程在所有子线程执行完再执行
  25. CountDownLatch latch = new CountDownLatch(6);
  26. ArrayList<Thread> array = new ArrayList<>();
  27. array.add(new Thread(() -> {
  28. lockA.lock();
  29. System.out.println("A: Running");
  30. Util.sleep(1000);a = 1;
  31. System.out.println("A: Returned");
  32. lockA.unlock();
  33. latch.countDown();
  34. }));
  35. array.add(new Thread(() -> {
  36. lockB.lock();lockA.lock();
  37. System.out.println("B: Running");
  38. Util.sleep(1000);b = a + 1;
  39. System.out.println("B: Returned");
  40. lockA.unlock();lockB.unlock();
  41. latch.countDown();
  42. }));
  43. array.add(new Thread(() -> {
  44. lockC.lock();
  45. System.out.println("C: Running");
  46. Util.sleep(1000);c = 10;
  47. System.out.println("C: Returned");
  48. lockC.unlock();
  49. latch.countDown();
  50. }));
  51. array.add(new Thread(() -> {
  52. lockD.lock();
  53. System.out.println("D: Running");
  54. Util.sleep(1000);d = 20;
  55. System.out.println("D: Returned");
  56. lockD.unlock();
  57. latch.countDown();
  58. }));
  59. array.add(new Thread(() -> {
  60. lockE.lock();lockC.lock();lockD.lock();
  61. System.out.println("E: Running");
  62. Util.sleep(1000);e = c + d;
  63. System.out.println("E: Returned");
  64. lockD.unlock();lockC.unlock();lockE.unlock();
  65. latch.countDown();
  66. }));
  67. array.add(new Thread(() -> {
  68. lockF.lock();lockB.lock();lockE.lock();
  69. System.out.println("F: Running");
  70. Util.sleep(1000);f = b * e;
  71. System.out.println("F: Returned");
  72. lockE.unlock();lockB.unlock();lockF.unlock();
  73. latch.countDown();
  74. }));
  75. for(var e: array) {
  76. e.start();
  77. }
  78. latch.await();
  79. timer.end();
  80. System.out.printf("Final Result: %d\n", f);
  81. System.out.printf("Total Time: %d\n", timer.getTime());
  82. }
  83. public static void main(String[] args) {
  84. ThreadBase1 app = new ThreadBase1();
  85. try {
  86. app.run();
  87. } catch (InterruptedException e1) {
  88. e1.printStackTrace();
  89. }
  90. }
  91. }

结果:

  1. D: Running
  2. A: Running
  3. C: Running
  4. D: Returned
  5. C: Returned
  6. A: Returned
  7. E: Running
  8. B: Running
  9. E: Returned
  10. B: Returned
  11. F: Running
  12. F: Returned
  13. Final Result: 60
  14. Total Time: 3030

这个可能会出现的问题是,可能会出现还没来得及加锁后面的就运行了,则会出现错误结果,所以我们用CountDownLatch来修改该代码

  1. package com.zsh_o.future;
  2. import com.zsh_o.util.CountTimer;
  3. import com.zsh_o.util.Util;
  4. import java.util.ArrayList;
  5. /**
  6. * 多线程解决
  7. * 用CountDownLatch完善
  8. * */
  9. public class ThreadCase1 {
  10. private int a, b, c, d, e, f;
  11. private CountDownLatch latchB, latchE, latchF;
  12. public ThreadCase1() {
  13. latchB = new CountDownLatch(1);
  14. latchE = new CountDownLatch(2);
  15. latchF = new CountDownLatch(2);
  16. }
  17. void run() throws InterruptedException {
  18. CountTimer timer = new CountTimer();
  19. timer.start();
  20. CountDownLatch latchMain = new CountDownLatch(1);
  21. ArrayList<Thread> array = new ArrayList<>();
  22. array.add(new Thread(() -> {
  23. System.out.println("A: Running");
  24. Util.sleep(1000);a = 1;
  25. System.out.println("A: Returned");
  26. latchB.countDown();
  27. }));
  28. array.add(new Thread(() -> {
  29. try {
  30. latchB.await();
  31. System.out.println("B: Running");
  32. Util.sleep(1000);b = a + 1;
  33. System.out.println("B: Returned");
  34. latchF.countDown();
  35. } catch (InterruptedException e1) {
  36. e1.printStackTrace();
  37. }
  38. }));
  39. array.add(new Thread(() -> {
  40. System.out.println("C: Running");
  41. Util.sleep(1000);c = 10;
  42. System.out.println("C: Returned");
  43. latchE.countDown();
  44. }));
  45. array.add(new Thread(() -> {
  46. System.out.println("D: Running");
  47. Util.sleep(1000);d = 20;
  48. System.out.println("D: Returned");
  49. latchE.countDown();
  50. }));
  51. array.add(new Thread(() -> {
  52. try {
  53. latchE.await();
  54. System.out.println("E: Running");
  55. Util.sleep(1000);e = c + d;
  56. System.out.println("E: Returned");
  57. latchF.countDown();
  58. } catch (InterruptedException e1) {
  59. e1.printStackTrace();
  60. }
  61. }));
  62. array.add(new Thread(() -> {
  63. try {
  64. latchF.await();
  65. System.out.println("F: Running");
  66. Util.sleep(1000);f = b * e;
  67. System.out.println("F: Returned");
  68. latchMain.countDown();
  69. } catch (InterruptedException e1) {
  70. e1.printStackTrace();
  71. }
  72. }));
  73. for(var e: array) {
  74. e.start();
  75. }
  76. latchMain.await();
  77. System.out.printf("Final Result: %d\n", f);
  78. timer.end();
  79. System.out.printf("Total Time: %d\n", timer.getTime());
  80. }
  81. public static void main(String[] args) {
  82. ThreadCase1 app = new ThreadCase1();
  83. try {
  84. app.run();
  85. } catch (InterruptedException e1) {
  86. e1.printStackTrace();
  87. }
  88. }
  89. }

执行结果与上面相同,可以看到CountownLatch非常好用,而且原理非常简单,那么我们来实现个吧
原理:await()阻断线程,并且MaxCount个线程执行countDown()之后才打开阻断,需要是不同的线程,需要注意的是countDown()函数不截断执行的线程,只截断执行await()的线程,如果想同时截断需要配合使用

  1. package com.zsh_o.future;
  2. import com.zsh_o.util.CountTimer;
  3. import com.zsh_o.util.Util;
  4. import javax.swing.event.MouseInputListener;
  5. import java.util.concurrent.locks.ReentrantLock;
  6. public class CountDownLatch {
  7. private int count;
  8. final private Object gobal;
  9. private ReentrantLock lockCount;
  10. public CountDownLatch(int count) {
  11. this.count = count;
  12. lockCount = new ReentrantLock();
  13. gobal = new Object();
  14. }
  15. public void countDown() {
  16. lockCount.lock();
  17. if(count > 0)
  18. count--;
  19. else return;
  20. if(count == 0)
  21. synchronized (gobal) {
  22. gobal.notifyAll();
  23. }
  24. lockCount.unlock();
  25. }
  26. public void await() throws InterruptedException {
  27. synchronized (gobal) {
  28. if (count > 0)
  29. gobal.wait();
  30. }
  31. }
  32. }

这里用了java的Object的wait和noifyAll()函数,用以任意数量的线程等待和恢复

Future

上面的问题是需要用户自己来考虑同步机制,如何把同步方法也封装到代码里面,让用户只关心代码逻辑即可,这个地方用的是Future模式,顾名思义,Future表示的是未来的一种意愿,表示未来会发生的事情,这个地方表示该实体在未来会得到返回值,调用future.get的表示期望得到给结果,如果结果还没有准备好则等待结果,例如上面的E会执行int e = futureC.get() + futureD.get()则E会等待C和D执行完之后继续执行。这个地方涉及到两个点:一,如何进入等待,二,如何从等待中恢复;这里的原则是等待过程不能占用计算资源,也就是不能用while一直判断的方法实现,那么实现方法就应该是事件驱动,进入等待释放计算资源,从等待恢复再重新可计算。

这个地方需要用代理模式,用代理形成逻辑,最后具体的执行由代理控制
首先定义futurable接口

  1. package com.zsh_o.future;
  2. public interface Futurable<T> {
  3. T get();
  4. }

下面是Future抽象类

  1. package com.zsh_o.future;
  2. public abstract class Future<T> implements Futurable<T>, Runnable {
  3. protected Callable<T> callable;
  4. protected boolean finished;
  5. public Future() {
  6. this.callable = null;
  7. this.finished = false;
  8. }
  9. public void register(Callable<T> callable) {
  10. this.callable = callable;
  11. }
  12. }

接下来是以线程的方式实现Future,当然也可以实现分布式的方式,核心思想相似

  1. package com.zsh_o.future;
  2. import com.zsh_o.util.CountTimer;
  3. import com.zsh_o.util.Util;
  4. public class ThreadFuture<T> extends Future<T> {
  5. private CountDownLatch latch;
  6. private T value;
  7. private boolean finished;
  8. private final Object lock;
  9. public ThreadFuture() {
  10. latch = new CountDownLatch(1);
  11. value = null;
  12. finished = false;
  13. lock = new Object();
  14. }
  15. @Override
  16. public T get() {
  17. try {
  18. if (callable == null)
  19. throw new Exception("Unregistered Callable");
  20. if (!finished){
  21. run();
  22. latch.await();
  23. }
  24. } catch (Exception e) {
  25. e.printStackTrace();
  26. }
  27. return value;
  28. }
  29. @Override
  30. public void run() {
  31. new Thread(()->{
  32. synchronized (lock) {
  33. if (!finished) {
  34. value = callable.call();
  35. finished = true;
  36. latch.countDown();
  37. }
  38. }
  39. }).start();
  40. }
  41. public void reset() {
  42. synchronized (lock) {
  43. finished = false;
  44. }
  45. }
  46. public boolean getState() {
  47. return finished;
  48. }
  49. }

实现的功能是,用register注册函数体逻辑Callable,需要注意的是Callable中需要调用代理的future.get()方法,以形成执行逻辑,get()执行await进入等待,等待其他线程执行完run(),并且保证只执行一次

  1. package com.zsh_o.future;
  2. import com.zsh_o.util.CountTimer;
  3. import com.zsh_o.util.Util;
  4. public class ThreadCase2 {
  5. private ThreadFuture<Integer> futureA, futureB, futureC, futureD, futureE, futureF;
  6. public ThreadCase2() {
  7. this.futureA = new ThreadFuture<>();
  8. this.futureB = new ThreadFuture<>();
  9. this.futureC = new ThreadFuture<>();
  10. this.futureD = new ThreadFuture<>();
  11. this.futureE = new ThreadFuture<>();
  12. this.futureF = new ThreadFuture<>();
  13. }
  14. void run() {
  15. CountTimer timer = new CountTimer();
  16. timer.start();
  17. futureA.register(()->{System.out.println("A: Runing");int a = 1;Util.sleep(1000);System.out.println("A: Over");return a;});
  18. futureB.register(()->{System.out.println("B: Runing");int b = futureA.get()+1;Util.sleep(1000);System.out.println("B: Over");return b;});
  19. futureC.register(()->{System.out.println("C: Runing");int c = 10;Util.sleep(1000);System.out.println("C: Over");return c;});
  20. futureD.register(()->{System.out.println("D: Runing");int d = 20;Util.sleep(1000);System.out.println("D: Over");return d;});
  21. futureE.register(()->{System.out.println("E: Runing");int e = futureC.get()+futureD.get();Util.sleep(1000);System.out.println("E: Over");return e;});
  22. futureF.register(()->{System.out.println("F: Runing");int f = futureB.get()*futureE.get();Util.sleep(1000);System.out.println("F: Over");return f;});
  23. futureA.run();futureB.run();futureC.run();futureD.run();futureE.run();futureF.run();
  24. System.out.printf("Final Result: %d\n", futureF.get());
  25. timer.end();
  26. System.out.printf("Total Time: %d\n", timer.getTime());
  27. }
  28. public static void main(String[] args) {
  29. ThreadCase2 app = new ThreadCase2();
  30. app.run();
  31. }
  32. }

结果:

  1. A: Runing
  2. B: Runing
  3. C: Runing
  4. D: Runing
  5. E: Runing
  6. F: Runing
  7. C: Over
  8. D: Over
  9. A: Over
  10. E: Over
  11. B: Over
  12. F: Over
  13. Final Result: 60
  14. Total Time: 3051

最后,如果并发的非常多可能会崩掉,所以我们加了一个限制线程执行状态最大值的方法

  1. package com.zsh_o.future;
  2. import java.util.concurrent.ConcurrentHashMap;
  3. import java.util.concurrent.ConcurrentLinkedDeque;
  4. public class ThreadPoolFuture<T> {
  5. private class Item<D> extends Future<D> {
  6. Thread thread;
  7. D value;
  8. CountDownLatch latch;
  9. final Object lock;
  10. boolean finished;
  11. public Item() {
  12. this.value = null;
  13. this.latch = new CountDownLatch(1);
  14. this.lock = new Object();
  15. this.finished = false;
  16. }
  17. @Override
  18. public D get() {
  19. return value;
  20. }
  21. @Override
  22. public void run() {
  23. if (!thread.isAlive()) {
  24. thread.start();
  25. }
  26. }
  27. }
  28. private int maxPool;
  29. private final Object lockObject;
  30. private ConcurrentHashMap<String, Item<T>> pool;
  31. private ConcurrentLinkedDeque<Item<T>> poolList;
  32. private int currentRuning;
  33. private CountDownLatch lathGobal;
  34. public ThreadPoolFuture(int maxPool) {
  35. this.maxPool = maxPool;
  36. this.pool = new ConcurrentHashMap<>();
  37. poolList = new ConcurrentLinkedDeque<>();
  38. this.lockObject = new Object();
  39. this.currentRuning = 0;
  40. }
  41. public void register(String name, Callable<T> callable) {
  42. Item<T> item = new Item<>();
  43. item.thread = new Thread(()->{
  44. synchronized (item.lock) {
  45. if (!item.finished) {
  46. synchronized (lockObject) {
  47. while (currentRuning >= maxPool) {
  48. try {
  49. lockObject.wait();
  50. } catch (InterruptedException e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. currentRuning++;
  55. System.out.println(name + " Run: " + currentRuning);
  56. }
  57. item.value = callable.call();
  58. synchronized (lockObject) {
  59. currentRuning--;
  60. lockObject.notifyAll();
  61. }
  62. item.latch.countDown();
  63. lathGobal.countDown();
  64. item.finished = true;
  65. }
  66. }
  67. });
  68. pool.put(name, item);
  69. poolList.push(item);
  70. }
  71. public T get(String name) {
  72. if (!pool.containsKey(name)) {
  73. try {
  74. throw new Exception("Unregister Callable");
  75. } catch (Exception e) {
  76. e.printStackTrace();
  77. }
  78. }
  79. Item<T> item = pool.get(name);
  80. if (!item.finished) {
  81. try {
  82. synchronized (lockObject) {
  83. lockObject.notifyAll();
  84. currentRuning--;
  85. System.out.println(name + " Get: " + currentRuning);
  86. }
  87. item.run();
  88. item.latch.await();
  89. synchronized (lockObject) {
  90. while (currentRuning >= maxPool) {
  91. try {
  92. lockObject.wait();
  93. } catch (InterruptedException e) {
  94. e.printStackTrace();
  95. }
  96. }
  97. currentRuning++;
  98. System.out.println(name + " Get: " + currentRuning);
  99. }
  100. } catch (InterruptedException e) {
  101. e.printStackTrace();
  102. return null;
  103. }
  104. }
  105. return item.value;
  106. }
  107. public void start() {
  108. lathGobal = new CountDownLatch(poolList.size());
  109. for(var e: poolList) {
  110. e.run();
  111. }
  112. }
  113. public void await() throws InterruptedException {
  114. if (lathGobal == null)
  115. lathGobal = new CountDownLatch(poolList.size());
  116. lathGobal.await();
  117. }
  118. }

基本思路是,由于上面我们用消息的方法执行等待和恢复执行,所以进入等待和恢复执行对于我们来说是可见的,所以只需要监控当前运行的最大线程即可,只是执行状态的数量不包括阻塞和等待

  1. package com.zsh_o.future;
  2. import com.zsh_o.util.CountTimer;
  3. import com.zsh_o.util.Util;
  4. /**
  5. * Future模式解决问题,限制最大运行线程数
  6. * */
  7. public class ThreadCase3 {
  8. private ThreadPoolFuture<Integer> pool;
  9. public ThreadCase3() {
  10. this.pool = new ThreadPoolFuture<>(2);
  11. }
  12. void run() throws Exception {
  13. CountTimer timer = new CountTimer();
  14. timer.start();
  15. pool.register("A", ()->{
  16. System.out.println("A: Running");
  17. int a = 1;
  18. Util.sleep(1000);
  19. System.out.println("A: Over");
  20. return a;
  21. });
  22. pool.register("B", ()->{
  23. System.out.println("B: Running");
  24. int b = pool.get("A") + 1;
  25. Util.sleep(1000);
  26. System.out.println("B: Over");
  27. return b;
  28. });
  29. pool.register("C", ()->{
  30. System.out.println("C: Running");
  31. int c = 10;
  32. Util.sleep(1000);
  33. System.out.println("C: Over");
  34. return c;
  35. });
  36. pool.register("D", ()->{
  37. System.out.println("D: Running");
  38. Util.sleep(1000);
  39. System.out.println("D: Over");
  40. return 20;
  41. });
  42. pool.register("E",()->{
  43. System.out.println("E: Running");
  44. Util.sleep(1000);
  45. int e = pool.get("C") + pool.get("D");
  46. System.out.println("E: Over");
  47. return e;
  48. });
  49. pool.register("F", ()->{
  50. System.out.println("F: Running");
  51. int f = pool.get("B") * pool.get("E");
  52. Util.sleep(1000);
  53. System.out.println("F: Over");
  54. return f;
  55. });
  56. pool.start();
  57. pool.await();
  58. System.out.printf("Final Result: %d\n", pool.get("F"));
  59. timer.end();
  60. System.out.printf("Total Time: %d\n", timer.getTime());
  61. }
  62. public static void main(String[] args) {
  63. ThreadCase3 app = new ThreadCase3();
  64. try {
  65. app.run();
  66. } catch (Exception e) {
  67. e.printStackTrace();
  68. }
  69. }
  70. }

结果:

  1. maxCount = 2
  2. F Run: 1
  3. F: Running
  4. B Get: 0
  5. A Run: 1
  6. A: Running
  7. B Run: 2
  8. B: Running
  9. A Get: 1
  10. C Run: 2
  11. C: Running
  12. A: Over
  13. C: Over
  14. E Run: 2
  15. E: Running
  16. D Run: 2
  17. D: Running
  18. D Get: 1
  19. D: Over
  20. A Get: 2
  21. D Get: 2
  22. E: Over
  23. B: Over
  24. B Get: 1
  25. F: Over
  26. Final Result: 60
  27. Total Time: 4072
  28. ------------------------------
  29. maxCount = 1
  30. F Run: 1
  31. F: Running
  32. B Get: 0
  33. A Run: 1
  34. A: Running
  35. A: Over
  36. E Run: 1
  37. E: Running
  38. C Get: 0
  39. B Run: 1
  40. B: Running
  41. B: Over
  42. D Run: 1
  43. D: Running
  44. D: Over
  45. B Get: 1
  46. E Get: 0
  47. C Run: 1
  48. C: Running
  49. C: Over
  50. C Get: 1
  51. E: Over
  52. E Get: 1
  53. F: Over
  54. Final Result: 60
  55. Total Time: 6098
  56. ---------------------
  57. maxCount = 3
  58. F Run: 1
  59. F: Running
  60. A Run: 2
  61. A: Running
  62. B Run: 3
  63. B: Running
  64. A Get: 2
  65. E Run: 3
  66. E: Running
  67. B Get: 2
  68. D Run: 3
  69. D: Running
  70. A: Over
  71. C Run: 3
  72. D: Over
  73. C Get: 2
  74. C: Running
  75. A Get: 3
  76. C: Over
  77. C Get: 2
  78. E: Over
  79. B: Over
  80. B Get: 1
  81. F: Over
  82. Final Result: 60
  83. Total Time: 3088
  84. -------------------------
  85. maxCount = 4
  86. F Run: 1
  87. F: Running
  88. A Run: 2
  89. A: Running
  90. B Run: 3
  91. B: Running
  92. C Run: 4
  93. C: Running
  94. A Get: 3
  95. B Get: 2
  96. E Run: 3
  97. E: Running
  98. D Run: 4
  99. D: Running
  100. A: Over
  101. C: Over
  102. D: Over
  103. C Get: 2
  104. A Get: 2
  105. C Get: 2
  106. E: Over
  107. B: Over
  108. B Get: 1
  109. F: Over
  110. Final Result: 60
  111. Total Time: 3078
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注