@myecho
2019-03-18T16:21:31.000000Z
字数 5634
阅读 1005
Java
1. 如果线程池中的线程数量少于corePoolSize(核心线程数量),那么会直接开启一个新的核心线程来执行任务,即使此时有空闲线程存在.
2. 如果线程池中线程数量大于等于corePoolSize(核心线程数量),那么任务会被插入到任务队列中排队,等待被执行.此时并不添加新的线程.
3. 如果在步骤2中由于任务队列已满导致无法将新任务进行排队,这个时候有两种情况:
* 线程数量 [未] 达到maximumPoolSize(线程池最大线程数) , 立刻启动一个非核心线程来执行任务.
* 线程数量 [已] 达到maximumPoolSize(线程池最大线程数) , 拒绝执行此任务.ThreadPoolExecutor会通过RejectedExecutionHandler,抛出RejectExecutionException异常.
4. keepAliveTime
非核心线程闲置时的超时时长,超过这个时长,非核心线程就会被回收.当ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true时, keepAliveTime同样会作用于核心线程.
5. handler:饱和策略,大家都很忙,咋办呢,有四种策略
CallerRunsPolicy:只要线程池没关闭,就直接用调用者所在线程来运行任务
AbortPolicy:直接抛出 RejectedExecutionException 异常
DiscardPolicy:悄悄把任务放生,不做了
DiscardOldestPolicy:把队列里待最久的那个任务扔了,然后再调用 execute() 试试看能行不
我们也可以实现自己的 RejectedExecutionHandler 接口自定义策略,比如如记录日志什么的
[线程池状态]:
RUNNING: 接收新任务,并执行队列中的任务
SHUTDOWN: 不接收新任务,但是执行队列中的任务
STOP: 不接收新任务,不执行队列中的任务,中断正在执行中的任务
TIDYING: 所有的任务都已结束,线程数量为0,处于该状态的线程池即将调用terminated()方法
TERMINATED: terminated()方法执行完成
图中是线程运行的基本状态:线程调用start()方法开始后,就进入到可运行状态,随着CPU的资源调度在运行和可运行之间切换;遇到阻塞则进入阻塞状态。
CachedThreadPool 用于并发执行大量短期的小任务,或者是负载较轻的服务器。大小可伸缩的线程池。如果当前没有可用线程,则创建一个线程。在执行结束后缓存60s,如果不被调用则移除线程。调用execute()方法时可以重用缓存中的线程。适用于很多短期异步任务的环境,可以提高程序性能。
FixedThreadPool 用于负载比较重的服务器,为了资源的合理利用,需要限制当前线程数量。
SingleThreadExecutor 用于串行执行任务的场景,每个任务必须按顺序执行,不需要并发执行。
ScheduledThreadPoolExecutor 用于需要多个后台线程执行周期任务,同时需要限制线程数量的场景。
在这之前,这些工作需要依靠Timer/TimerTask或者其它第三方工具来完成,线程任务周期性的获得调度。
ExecutorService 提供了两种提交任务的方法:
execute():提交不需要返回值的任务
submit():提交需要返回值的任务
设置核心池的数量为 CPU 数的两倍,一般是 4、8,好点的 16 个线程
最大线程数设置为 64
空闲线程的存活时间设置为 1 秒
== ThreadPoolExecutor 参数解析 ==
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}
队列
不使用JDK 自带的 Executors.newXxx 工厂方法. 一个重要原因是这些工厂方法容易让人忽略思考线程池相关的参数, 而这些参数对线程池的行为是及其重要的.
互联网场景下基本都要求 不排队、fail fast. 所以公司的大部分场景应该都适用 FenbiExecutors.newNonQueuedThreadPool
/**
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*
*
*
@author chentienan, xuhongfeng
*/
public class FenbiExecutors {
private static final RejectedExecutionHandler defaultRejectHandler = new ThreadPoolExecutor.AbortPolicy();
public static ThreadPoolExecutor newNonQueuedThreadPool(String threadPoolName, int coreSize, int maximumPoolSize, long keepAliveTimeMillis) {
SynchronousQueue workQueue = new SynchronousQueue<>();
return newThreadPool(threadPoolName, coreSize, maximumPoolSize, keepAliveTimeMillis, workQueue);
}
public static ThreadPoolExecutor newQueuedThreadPool(String threadPoolName, int coreSize, int maximumPoolSize, long keepAliveTimeMillis, int queueCapacity) {
ArrayBlockingQueue workQueue = new ArrayBlockingQueue<>(queueCapacity);
return newThreadPool(threadPoolName, coreSize, maximumPoolSize, keepAliveTimeMillis, workQueue);
}
public static ThreadPoolExecutor newUnlimitedQueuedThreadPool(String threadPoolName, int coreSize, long keepAliveTimeMillis) {
LinkedBlockingQueue workQueue = new LinkedBlockingQueue<>();
int maximumPoolSize = coreSize; // unlimited queued 模式下, maximumPoolSize 不起作用
return newThreadPool(threadPoolName, coreSize, maximumPoolSize, keepAliveTimeMillis, workQueue);
}
public static ThreadPoolExecutor newThreadPool(String threadPoolName, int coreSize, int maximumPoolSize, long keepAliveTimeMillis, BlockingQueue workQueue) {
return newThreadPool(threadPoolName, coreSize, maximumPoolSize, keepAliveTimeMillis, workQueue, defaultRejectHandler);
}
public static ThreadPoolExecutor newThreadPool(String threadPoolName, int coreSize, int maximumPoolSize, long keepAliveTimeMillis, BlockingQueue workQueue, RejectedExecutionHandler handler) {
DefaultThreadFactory threadFactory = new DefaultThreadFactory(threadPoolName);
return new ThreadPoolExecutor(coreSize, maximumPoolSize, keepAliveTimeMillis, TimeUnit.MILLISECONDS,
workQueue, threadFactory, handler);
}
@Deprecated
public static ExecutorService newFixedThreadPool(int nThreads) {
return Executors.newFixedThreadPool(nThreads, new DefaultThreadFactory());
}
@Deprecated
public static ExecutorService newSingleThreadExecutor() {
return Executors.newSingleThreadExecutor(new DefaultThreadFactory());
}
@Deprecated
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory());
}
@Deprecated
public static ExecutorService newCachedThreadPool() {
return Executors.newCachedThreadPool(new DefaultThreadFactory());
}
}