[关闭]
@liter 2015-12-11T15:14:00.000000Z 字数 3626 阅读 3490

Android网络通信框架LiteHttp 第十五节:并发调度控制器详解

litehttp2.x版本系列教程


官网: http://litesuits.com

QQ群: 大群 47357508,二群 42960650

本系列文章面向android开发者,展示开源网络通信框架LiteHttp的主要用法,并讲解其关键功能的运作原理,同时传达了一些框架作者在日常开发中的一些最佳实践和经验。

本系列文章目录总览: https://zybuluo.com/liter/note/186513


第十五节:LiteHttp之并发调度控制器详解

1. 基本简介和使用

框架内置了一枚并发调度器,即SmartExecutor,不仅用来支持lite-http的异步并发支持,更可以直接投入 Runnable、Callable、FutureTask 等类型的运行任务。

可以用来作为 App 内支持异步并发的重要组件,在一个 App 中 SmartExecutor 可以有多个实例,每个实例都有独立核心和等待线程数指标,每个实例都有独立的调度和满载处理策略,但它们 共享一个线程池。这种机制既满足不同木块对线程控制和任务调度的独立需求,又共享一个池资源。独立又共享,最大程度上节省资源,提升性能。

SmartExecutor具有一下特性:

可定义核心并发线程数,即同一时间并发的请求数量。

可定义等待排队线程数,即超出核心并发数后可排队请求数量。

可定义等待队列进入执行状态的策略:先来先执行,后来先执行。

可定义等待队列满载后处理新请求的策略:

  • 抛弃队列中最新的任务
  • 抛弃队列中最旧的任务
  • 抛弃当前新任务
  • 直接执行(阻塞当前线程)
  • 抛出异常(中断当前线程)

看一下实例代码,初始化:

  1. // 智能并发调度控制器:设置[最大并发数],和[等待队列]大小
  2. SmartExecutor smallExecutor = new SmartExecutor();
  3. // set temporary parameter just for test
  4. // 一下参数设置仅用来测试,具体设置看实际情况。
  5. // number of concurrent threads at the same time, recommended core size is CPU count
  6. smallExecutor.setCoreSize(2);
  7. // adjust maximum number of waiting queue size by yourself or based on phone performance
  8. smallExecutor.setQueueSize(2);
  9. // 任务数量超出[最大并发数]后,自动进入[等待队列],等待当前执行任务完成后按策略进入执行状态:后进先执行。
  10. smallExecutor.setSchedulePolicy(SchedulePolicy.LastInFirstRun);
  11. // 后续添加新任务数量超出[等待队列]大小时,执行过载策略:抛弃队列内最旧任务。
  12. smallExecutor.setOverloadPolicy(OverloadPolicy.DiscardOldTaskInQueue);

上述代码设计了一个可同时并发2个线程,并发满载后等待队列可容纳2个线程,排队队列中后进的任务先执行,等待队列装满后新任务来到将抛弃队列中最老的任务。

测试多个线程并发的情况:

  1. // 一次投入 4 个任务
  2. for (int i = 0; i < 4; i++) {
  3. final int j = i;
  4. smallExecutor.execute(new Runnable() {
  5. @Override
  6. public void run() {
  7. HttpLog.i(TAG, " TASK " + j + " is running now ----------->");
  8. SystemClock.sleep(j * 200);
  9. }
  10. });
  11. }
  12. // 再投入1个可能需要取消的任务
  13. Future future = smallExecutor.submit(new Runnable() {
  14. @Override
  15. public void run() {
  16. HttpLog.i(TAG, " TASK 4 will be canceled... ------------>");
  17. SystemClock.sleep(1000);
  18. }
  19. });
  20. // 合适的时机取消此任务
  21. future.cancel(false);
  22. ```
  23. 上述代码,一次依次投入 0123 四个任务,然后接着投入了新任务4,返回一个Future对象。
  24. 根据设置,01会立即执行,执行满载后23进入排队队列,排队满载后独立投入的任务4来到,队列中最老的任务2被移除,队列中为34
  25. 因为4随后被取消执行,所以最后输出:
  26. ```java
  27. TASK 0 is running now ----------->
  28. TASK 1 is running now ----------->
  29. TASK 3 is running now ----------->
  30. <div class="md-section-divider"></div>

向我在另一篇文章中建议的,同时并发的线程数量不要过多,可以保持在CPU核数左右,并发线程过多了CPU时间片过多的轮转分配造成吞吐量降低,过少不能充分利用CPU,并发数可以适当比CPU核数多一点没问题。

2. 基本原理

我们看 SmartExecutor 的几个主要方法:

  1. public Future<?> submit(Runnable task)
  2. public <T> Future<T> submit(Runnable task, T result)
  3. public <T> Future<T> submit(Callable<T> task)
  4. public <T> void submit(RunnableFuture<T> task)
  5. public void execute(final Runnable command)
  6. <div class="md-section-divider"></div>

最主要的是 execute 方法,其他几个方法是将任务封装为 FutureTask 投入到 execute 方法执行。因为 FutureTask 本质就是一个 RunnableFuture 对象,兼具 Runnable 和 Future 的特性和功能。

那么重点就是看 execute 方法了:

  1. @Override
  2. public void execute(final Runnable command) {
  3. if (command == null) {
  4. return;
  5. }
  6. WrappedRunnable scheduler = new WrappedRunnable() {
  7. @Override
  8. public Runnable getRealRunnable() {
  9. return command;
  10. }
  11. public Runnable realRunnable;
  12. @Override
  13. public void run() {
  14. try {
  15. command.run();
  16. } finally {
  17. scheduleNext(this);
  18. }
  19. }
  20. };
  21. boolean callerRun = false;
  22. synchronized (lock) {
  23. if (runningList.size() < coreSize) {
  24. runningList.add(scheduler);
  25. threadPool.execute(scheduler);
  26. } else if (waitingList.size() < queueSize) {
  27. waitingList.addLast(scheduler);
  28. } else {
  29. switch (overloadPolicy) {
  30. case DiscardNewTaskInQueue:
  31. waitingList.pollLast();
  32. waitingList.addLast(scheduler);
  33. break;
  34. case DiscardOldTaskInQueue:
  35. waitingList.pollFirst();
  36. waitingList.addLast(scheduler);
  37. break;
  38. case CallerRuns:
  39. callerRun = true;
  40. break;
  41. case DiscardCurrentTask:
  42. break;
  43. case ThrowExecption:
  44. throw new RuntimeException("Task rejected from lite smart executor. " + command.toString());
  45. default:
  46. break;
  47. }
  48. }
  49. //printThreadPoolInfo();
  50. }
  51. if (callerRun) {
  52. command.run();
  53. }
  54. }

可以看到整个过程简单概括为:

  1. 把任务封装为一个类似“链表”的结构体,执行完一个,调度下一个。
  2. 加锁防止并发时抢夺资源,判断当前运行任务数量。
  3. 当前任务数少于并发最大数量则投入运行,若满载则投入等待队列尾部。
  4. 若等待队列未满新任务进入排队,若满则执行满载处理策略。
  5. 当一个任务执行完,其尾部通过“链接”的方式调度一个新任务执行。若没有任务,则结束。

其中 加锁 和使用一个 WrappedRunnable 将任务包装成“链接”是重点。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注