[关闭]
@huangyichun 2017-08-01T16:37:54.000000Z 字数 6524 阅读 1284

Java线程池总结

多线程


  1. /**
  2. * 一.线程池:提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应速度
  3. * 二.线程池的体系结构
  4. * java.util.concurrent.Executor:负责线程使用和调度的根接口
  5. * |--**ExecutorService 子接口:线程池的主要接口
  6. * |--ThreadPoolExecutor 线程池实现类
  7. * |--ScheduledExecutorService 子接口:负责线程的调度
  8. * |--ScheduledThreadPoolExecutor: 继承了ThreadPoolExecutor,实现了ScheduledExecutorService
  9. * 三、工具类: Executors
  10. * ExecutorService newFixedThreadPool()创建固定的线程池
  11. * ExecutorService newCachedThreadPool():缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。
  12. * ExecutorService newSingleThreadExecutor() 创建单个线程池。线程池中只有一个线程
  13. *
  14. * ScheduledExecutorService newScheduledThreadPool():创建固定大小的线程,可以延迟或者定时的执行任务。
  15. */

image.png-34.7kB

线程池使用好处:

创建一个线程池需要输入几个参数:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.corePoolSize = corePoolSize;
  16. this.maximumPoolSize = maximumPoolSize;
  17. this.workQueue = workQueue;
  18. this.keepAliveTime = unit.toNanos(keepAliveTime);
  19. this.threadFactory = threadFactory;
  20. this.handler = handler;
  21. }

向线程池提交任务

我们可以使用execute提交的任务,但是execute方法没有返回值,所以无法判断任务知否被线程池执行成功。通过以下代码可知execute方法输入的任务是一个Runnable类的实例。

  1. ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10,20,2, TimeUnit.SECONDS,
  2. new LinkedBlockingQueue<Runnable>(50));
  3. poolExecutor.execute(new Runnable() {
  4. @Override
  5. public void run() {
  6. System.out.println(Thread.currentThread().getName());
  7. }
  8. });

我们也可以使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。

  1. Future<Integer> future = poolExecutor.submit(new Callable<Integer>() {
  2. @Override
  3. public Integer call() throws Exception {
  4. int sum = 0;
  5. for(int i=0; i<1000; ++i){
  6. sum += i;
  7. }
  8. return sum;
  9. }
  10. });
  11. int i = 0;
  12. try {
  13. i = future.get();
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. } catch (ExecutionException e) {
  17. e.printStackTrace();
  18. }finally {
  19. poolExecutor.shutdown();
  20. }
  21. System.out.println(i);

线程池的关闭

线程池的分析

此处输入图片的描述
从上图我们可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:

  1. 首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
  2. 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
  3. 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。
  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. //如果线程数小于基本线程数,则创建线程执行当前任务
  6. if (workerCountOf(c) < corePoolSize) {
  7. //调用addWorker原子的检查runState和workerCount防止创建出错
  8. if (addWorker(command, true))
  9. return;
  10. c = ctl.get();
  11. }
  12. //如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中
  13. if (isRunning(c) && workQueue.offer(command)) {
  14. int recheck = ctl.get();
  15. if (! isRunning(recheck) && remove(command))
  16. reject(command);
  17. else if (workerCountOf(recheck) == 0)
  18. addWorker(null, false);
  19. }
  20. //如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务。
  21. else if (!addWorker(command, false))
  22. reject(command);
  23. }

工作线程: 线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会无限循环获取工作队列里的任务来执行。我们可以从Worker的run方法里看到这点:

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // allow interrupts
  6. boolean completedAbruptly = true;
  7. try {
  8. //getTask()获取任务队列
  9. while (task != null || (task = getTask()) != null) {
  10. w.lock();
  11. // If pool is stopping, ensure thread is interrupted;
  12. // if not, ensure thread is not interrupted. This
  13. // requires a recheck in second case to deal with
  14. // shutdownNow race while clearing interrupt
  15. if ((runStateAtLeast(ctl.get(), STOP) ||
  16. (Thread.interrupted() &&
  17. runStateAtLeast(ctl.get(), STOP))) &&
  18. !wt.isInterrupted())
  19. wt.interrupt();
  20. try {
  21. beforeExecute(wt, task);
  22. Throwable thrown = null;
  23. try {
  24. task.run();
  25. } catch (RuntimeException x) {
  26. thrown = x; throw x;
  27. } catch (Error x) {
  28. thrown = x; throw x;
  29. } catch (Throwable x) {
  30. thrown = x; throw new Error(x);
  31. } finally {
  32. afterExecute(task, thrown);
  33. }
  34. } finally {
  35. task = null;
  36. w.completedTasks++;
  37. w.unlock();
  38. }
  39. }
  40. completedAbruptly = false;
  41. } finally {
  42. processWorkerExit(w, completedAbruptly);
  43. }
  44. }

合理的配置线程池

要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:

- 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
- 任务的优先级:高,中和低。
- 任务的执行时间:长,中和短。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。

任务性质不同的任务可以用不同规模的线程池分开处理。


建议使用有界队列

有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然我们的系统所有的任务是用的单独的服务器部署的,而我们使用不同规模的线程池跑不同类型的任务,但是出现这样问题时也会影响到其他任务。

参考文档:
http://ifeve.com/java-threadpool/
Java并发编程的艺术

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注