[关闭]
@MrXiao 2018-03-15T13:49:48.000000Z 字数 6179 阅读 931

Java线程池的创建

Java Concurrent


1. Executor框架

从JDK 1.5 开始,java.util.concurrent包中的Executor执行器将用来管理Thread对象,从而简化并发编程。

Executor是一套线程池管理框架,接口里只有一个方法execute,执行Runnable任务。
ExecutorService接口扩展了Executor,添加了线程生命周期的管理,提供任务终止、返回任务结果等方法。
AbstractExecutorService实现了ExecutorService,提供例如submit方法的默认实现逻辑。

然后到今天的主题ThreadPoolExecutor,继承了AbstractExecutorService,提供线程池的具体实现。

2. ThreadPoolExecutor

2.1 构造

ThreadPoolExecutor的构造方法有四种,下面是参数最全的构造:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. }

2.2 定制线程池

ThreadPoolExecutor中有几个预先设置好的线程池,通过Executors的工厂方法创建。

2.2.1 newFixedThreadPool

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

newFixedThreadPool的corePoolSize和maximumPoolSize都设置为传入的固定数量,keepAliveTim设置为0。线程池创建后,线程数量将会固定不变,适合需要线程很稳定的场合。

2.2.2 newSingleThreadExecutor

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

newSingleThreadExecutor是线程数量固定为1的newFixedThreadPool版本,保证池内的任务串行。注意到返回的是FinalizableDelegatedExecutorService,来看看源码:

  1. static class FinalizableDelegatedExecutorService
  2. extends DelegatedExecutorService {
  3. FinalizableDelegatedExecutorService(ExecutorService executor) {
  4. super(executor);
  5. }
  6. protected void finalize() {
  7. super.shutdown();
  8. }
  9. }

FinalizableDelegatedExecutorService继承了DelegatedExecutorService,仅仅在gc时增加关闭线程池的操作,再来看看DelegatedExecutorService的源码:

  1. static class DelegatedExecutorService extends AbstractExecutorService {
  2. private final ExecutorService e;
  3. DelegatedExecutorService(ExecutorService executor) { e = executor; }
  4. public void execute(Runnable command) { e.execute(command); }
  5. public void shutdown() { e.shutdown(); }
  6. public List<Runnable> shutdownNow() { return e.shutdownNow(); }
  7. public boolean isShutdown() { return e.isShutdown(); }
  8. public boolean isTerminated() { return e.isTerminated(); }
  9. ...
  10. }

DelegatedExecutorService包装了ExecutorService,使其只暴露出ExecutorService的方法,因此不能再配置线程池的参数。本来,线程池创建的参数是可以调整的,ThreadPoolExecutor提供了set方法。使用newSingleThreadExecutor目的是生成单线程串行的线程池,如果还能配置线程池大小,那就没意思了。

Executors还提供了unconfigurableExecutorService方法,将普通线程池包装成不可配置的线程池。如果不想线程池被不明所以的后人修改,可以调用这个方法。

2.2.3 newCachedThreadPool

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

newCachedThreadPool生成一个会缓存的线程池,线程数量可以从0到Integer.MAX_VALUE,超时时间为1分钟。线程池用起来的效果是:如果有空闲线程,会复用线程;如果没有空闲线程,会新建线程;如果线程空闲超过1分钟,将会被回收。

2.2.4 newScheduledThreadPool

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  2. return new ScheduledThreadPoolExecutor(corePoolSize);
  3. }
  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  3. new DelayedWorkQueue());
  4. }

newScheduledThreadPool将会创建一个可定时执行任务的线程池。这个不打算在本文展开,后续会另开文章细讲。

2.3 等待队列

看完上面四个预设置的线程池,回发现在构造中都有一个 Queue 参数。

线程池的大小或有限(newFixedThreadPool)、或几乎无限(newCachedThreadPool),但是系统资源是有限的,任务的处理速度总有可能比不上任务的提交速度。因此,可以为ThreadPoolExecutor提供一个阻塞队列来保存因线程不足而等待的Runnable任务,这就是BlockingQueue。

Executor提供了以下几种等待队列(部分):

newFixedThreadPool和newSingleThreadExecutor在默认情况下使用一个无界的LinkedBlockingQueue。要注意的是,如果任务一直提交,但线程池又不能及时处理,等待队列将会无限制地加长,系统资源总会有消耗殆尽的一刻。所以,推荐使用有界的等待队列,避免资源耗尽。但解决一个问题,又会带来新问题:队列填满之后,再来新任务,这个时候怎么办?后文会介绍如何处理队列饱和。

newCachedThreadPool使用的SynchronousQueue十分有趣,看名称是个队列,但它却不能存储元素。要将一个任务放进队列,必须有另一个线程去接收这个任务,一个进就有一个出,队列不会存储任何东西。因此,SynchronousQueue是一种移交机制,不能算是队列。newCachedThreadPool生成的是一个没有上限的线程池,理论上提交多少任务都可以,使用SynchronousQueue作为等待队列正合适。

2.4 饱和策略

在上一小节中说道,等待队列不能无限制的增长,但若队列填满了该如何处理?此时就是饱和策略在发挥作用。ThreadPoolExecutor的饱和策略通过传入RejectedExecutionHandler来实现。如果没有为构造函数传入,将会使用默认的defaultHandler。

  1. private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
  1. public static class AbortPolicy implements RejectedExecutionHandler {
  2. public AbortPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
  5. }
  6. }

AbortPolicy是默认的实现,直接抛出一个RejectedExecutionException异常,让调用者自己处理。除此之外,还有几种饱和策略,来看一下

  1. public static class DiscardPolicy implements RejectedExecutionHandler {
  2. public DiscardPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. }
  5. }

DiscardPolicy的rejectedExecution直接是空方法,什么也不干。如果队列满了,后续的任务都抛弃掉。

  1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  2. public DiscardOldestPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. if (!e.isShutdown()) {
  5. e.getQueue().poll();
  6. e.execute(r);
  7. }
  8. }
  9. }

DiscardOldestPolicy会将等待队列里最旧的任务踢走,让新任务得以执行。

  1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  2. public CallerRunsPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. if (!e.isShutdown()) {
  5. r.run();
  6. }
  7. }
  8. }

CallerRunsPolicy它既不抛弃新任务,也不抛弃旧任务,而是直接在当前线程运行这个任务。当前线程一般就是主线程啊,让主线程运行任务,说不定就阻塞了。如果不是想清楚了整套方案,还是少用这种策略为妙。

2.5 ThreadFactory

每当线程池需要创建一个新线程,都是通过线程工厂获取。如果不为ThreadPoolExecutor设定一个线程工厂,就会使用默认的defaultThreadFactory:

  1. public static ThreadFactory defaultThreadFactory() {
  2. return new DefaultThreadFactory();
  3. }
  1. static class DefaultThreadFactory implements ThreadFactory {
  2. private static final AtomicInteger poolNumber = new AtomicInteger(1);
  3. private final ThreadGroup group;
  4. private final AtomicInteger threadNumber = new AtomicInteger(1);
  5. private final String namePrefix;
  6. DefaultThreadFactory() {
  7. SecurityManager s = System.getSecurityManager();
  8. group = (s != null) ? s.getThreadGroup() :
  9. Thread.currentThread().getThreadGroup();
  10. namePrefix = "pool-" +
  11. poolNumber.getAndIncrement() +
  12. "-thread-";
  13. }
  14. public Thread newThread(Runnable r) {
  15. Thread t = new Thread(group, r,
  16. namePrefix + threadNumber.getAndIncrement(),
  17. 0);
  18. if (t.isDaemon())
  19. t.setDaemon(false);
  20. if (t.getPriority() != Thread.NORM_PRIORITY)
  21. t.setPriority(Thread.NORM_PRIORITY);
  22. return t;
  23. }
  24. }

平时打印线程池里线程的name时,会输出形如pool-1-thread-1之类的名称,就是在这里设置的。这个默认的线程工厂,创建的线程是普通的非守护线程,如果需要定制,实现ThreadFactory后传给ThreadPoolExecutor即可。

参考文章链接

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注