[关闭]
@Yano 2017-08-07T08:01:47.000000Z 字数 4061 阅读 1841

JDK源码分析 线程池

Java


说明

对于JDK源码分析的文章,仅仅记录我认为重要的地方。源码的细节实在太多,不可能面面俱到地写清每个逻辑。所以我的JDK源码分析,着重在JDK的体系架构层面,具体源码可以参考:http://www.cnblogs.com/skywang12345/category/455711.html

架构图

Executor 函数接口

Executor:提供一种将"任务提交"与"任务如何运行"分离开来的机制。

void execute(Runnable command)

ExecutorService

ExecutorService提供了"将任务提交给执行者的接口(submit方法)","让执行者执行任务(invokeAll, invokeAny方法)"的接口等等。

AbstractExecutorService

抽象类,为ExecutorService中的函数接口提供了默认实现。

ThreadPoolExecutor

大名鼎鼎的“线程池”。

ScheduledExecutorService

接口,提供了“延时”和“周期执行”接口。

ScheduledThreadPoolExecutor

继承于ThreadPoolExecutor,并且实现了ScheduledExecutorService接口。它相当于提供了"延时"和"周期执行"功能的ScheduledExecutorService。

Executors

静态工厂类。它通过静态工厂方法返回 ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等类的对象。

示例

  1. class MyThreadPool extends Thread {
  2. @Override
  3. public void run() {
  4. System.out.println(Thread.currentThread().getName() + " is running... ");
  5. }
  6. }
  7. @Test
  8. public void testThreadPoolExecutor() throws InterruptedException {
  9. ExecutorService pool = Executors.newFixedThreadPool(2);
  10. for (int i = 0; i < 5; i++) {
  11. TimeUnit.SECONDS.sleep(2);
  12. pool.execute(new MyThreadPool());
  13. }
  14. pool.shutdown();
  15. }

输出:

pool-1-thread-1 is running... 
pool-1-thread-2 is running... 
pool-1-thread-1 is running... 
pool-1-thread-2 is running... 
pool-1-thread-1 is running... 

分析:

示例代码中,新建了一个大小固定为2的线程池,并将5个线程依次放入线程池。从输出结果中可以看出,是线程pool-1-thread-1和线程pool-1-thread-2相互交替,并没有新建多余的线程。

静态工厂类

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

创建一个线程池,线程池的容量是nThreads。已提交但是没有执行的任务,会被阻塞,直到有任务运行完成。

newFixedThreadPool()在调用ThreadPoolExecutor()时,会传递一个LinkedBlockingQueue()对象,而LinkedBlockingQueue单向链表实现的阻塞队列。在线程池中,就是通过该阻塞队列来实现"当线程池中任务数量超过允许的任务数量时,部分任务会阻塞等待"。

ThreadFactory

  1. public static ThreadFactory defaultThreadFactory() {
  2. return new DefaultThreadFactory();
  3. }
  1. static class DefaultThreadFactory implements ThreadFactory {
  2. private static final AtomicInteger poolNumber = new AtomicInteger(1);
  3. private final ThreadGroup group;
  4. private final AtomicInteger threadNumber = new AtomicInteger(1);
  5. private final String namePrefix;
  6. DefaultThreadFactory() {
  7. SecurityManager s = System.getSecurityManager();
  8. group = (s != null) ? s.getThreadGroup() :
  9. Thread.currentThread().getThreadGroup();
  10. namePrefix = "pool-" +
  11. poolNumber.getAndIncrement() +
  12. "-thread-";
  13. }
  14. // 提供创建线程的API。
  15. public Thread newThread(Runnable r) {
  16. // 线程对应的任务是Runnable对象r
  17. Thread t = new Thread(group, r,
  18. namePrefix + threadNumber.getAndIncrement(),
  19. 0);
  20. // 设为“非守护线程”
  21. if (t.isDaemon())
  22. t.setDaemon(false);
  23. // 将优先级设为“Thread.NORM_PRIORITY”
  24. if (t.getPriority() != Thread.NORM_PRIORITY)
  25. t.setPriority(Thread.NORM_PRIORITY);
  26. return t;
  27. }
  28. }

excute 添加任务到“线程池”

  1. public void execute(Runnable command) {
  2. // 如果任务为null,则抛出异常。
  3. if (command == null)
  4. throw new NullPointerException();
  5. // 获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息
  6. int c = ctl.get();
  7. // 当线程池中的任务数量 < "核心池大小"时,即线程池中少于corePoolSize个任务。
  8. // 则通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
  9. if (workerCountOf(c) < corePoolSize) {
  10. if (addWorker(command, true))
  11. return;
  12. c = ctl.get();
  13. }
  14. // 当线程池中的任务数量 >= "核心池大小"时,
  15. // 而且,"线程池处于允许状态"时,则尝试将任务添加到阻塞队列中。
  16. if (isRunning(c) && workQueue.offer(command)) {
  17. // 再次确认“线程池状态”,若线程池异常终止了,则删除任务;然后通过reject()执行相应的拒绝策略的内容。
  18. int recheck = ctl.get();
  19. if (! isRunning(recheck) && remove(command))
  20. reject(command);
  21. // 否则,如果"线程池中任务数量"为0,则通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null。
  22. else if (workerCountOf(recheck) == 0)
  23. addWorker(null, false);
  24. }
  25. // 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
  26. // 如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
  27. else if (!addWorker(command, false))
  28. reject(command);
  29. }

shutdown 关闭线程池

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. // 获取锁
  4. mainLock.lock();
  5. try {
  6. // 检查终止线程池的“线程”是否有权限。
  7. checkShutdownAccess();
  8. // 设置线程池的状态为关闭状态。
  9. advanceRunState(SHUTDOWN);
  10. // 中断线程池中空闲的线程。
  11. interruptIdleWorkers();
  12. // 钩子函数,在ThreadPoolExecutor中没有任何动作。
  13. onShutdown(); // hook for ScheduledThreadPoolExecutor
  14. } finally {
  15. // 释放锁
  16. mainLock.unlock();
  17. }
  18. // 尝试终止线程池
  19. tryTerminate();
  20. }

线程池状态

  1. rivate final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl是一个AtomicInteger类型的原子对象。ctl记录了"线程池中的任务数量"和"线程池状态"2个信息。
ctl共包括32位。其中,高3位表示"线程池状态",低29位表示"线程池中的任务数量"。

RUNNING    -- 对应的高3位值是111。
SHUTDOWN   -- 对应的高3位值是000。
STOP       -- 对应的高3位值是001。
TIDYING    -- 对应的高3位值是010。
TERMINATED -- 对应的高3位值是011。
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注