@linux1s1s
2017-01-22T16:58:54.000000Z
字数 7401
阅读 2828
Java
2015-04
在基本了解了并发线程的主要类以后(如果你对这些类没有基本的概念,请阅读Java 并发编程框架(一))
这篇文章会对前一篇文章提及的主要类做进一步说明
CompletionService究竟是什么,如果一开始很难理解它,那么我们可以通过一个Demo来模仿一下ExecutorCompletionService这个实现类
Case.Java 测试用例
public class Case
{
public static void main(String[] args)
{
CommonCompletionService commonCompletionService = new CommonCompletionService();
commonCompletionService.submit(CompletionCase.COMPLETION_MANUL);
}
public static enum FutureCase
{
FUTURETASK_EXECUTOR, FUTURE_EXECUTOR, FUTURETASK_THREAD, COMPLETIONSERVICE
}
public static enum CompletionCase
{
COMPLETION_MANUL, COMPLETION_LIB
}
}
CommonCompletionService.java 业务逻辑类
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import com.executor.demo.Case.CompletionCase;
public class CommonCompletionService
{
public static class WorkBack implements Callable<String>
{
private String name;
public WorkBack(String name)
{
this.name = name;
}
@Override
public String call() throws Exception
{
try
{
Thread.sleep(new Random().nextInt(2000));
}
catch (Exception e)
{
e.printStackTrace();
}
return name;
}
}
private static final int TASK_TOTAL = 10;
public void submit(CompletionCase completionCase)
{
if (completionCase == null) return;
switch (completionCase)
{
case COMPLETION_MANUL:
ExecutorService pool = Executors.newFixedThreadPool(5);
BlockingQueue<Future<String>> queue = new LinkedBlockingQueue<Future<String>>();
for (int i = 0; i < TASK_TOTAL; i++)
{
Future<String> future = pool.submit(new WorkBack(Thread.currentThread().getName() + " " + i));
queue.add(future);
}
for (int i = 0; i < TASK_TOTAL; i++)
{
try
{
System.out.println("COMPLETION_MANUL:" + queue.take().get());
}
catch (InterruptedException e)
{
e.printStackTrace();
}
catch (ExecutionException e)
{
e.printStackTrace();
}
}
pool.shutdown();
break;
case COMPLETION_LIB:
ExecutorService pool2 = Executors.newFixedThreadPool(5);
CompletionService<String> completionService = new ExecutorCompletionService<String>(pool2);
for (int i = 0; i < TASK_TOTAL; i++)
{
completionService.submit(new WorkBack(Thread.currentThread().getName() + " " + i));
}
for (int i = 0; i < TASK_TOTAL; i++)
{
try
{
System.out.println("COMPLETION_LIB:" + completionService.take().get());
}
catch (InterruptedException e)
{
e.printStackTrace();
}
catch (ExecutionException e)
{
e.printStackTrace();
}
}
pool2.shutdown();
break;
default:
break;
}
}
}
运行结果:
COMPLETION_MANUL:main 0
COMPLETION_MANUL:main 1
COMPLETION_MANUL:main 2
COMPLETION_MANUL:main 3
COMPLETION_MANUL:main 4
COMPLETION_MANUL:main 5
COMPLETION_MANUL:main 6
COMPLETION_MANUL:main 7
COMPLETION_MANUL:main 8
COMPLETION_MANUL:main 9
接下来分析一下上面的程序,case COMPLETION_MANUL:这个部分的程序是模仿CompletionService的简单实现,而case COMPLETION_LIB:这个部分程序就是CompletionService简单Demo,所以对比上下这个两部分程序,可以看出,CompletionService的主要功能是封装了具有BlockingQueue的ExecutorService线程池,这样理解起来比较形象,其实看看CompletionServic的实现类,也是这样做的,感兴趣的话可以看看源代码,以便更深入的理解。
还记得Java 并发编程框架(一)这篇文章中提及的Executors 其中比较重要的静态方法,仔细看看代码会发现,这些静态方法都指向了同一个类ThreadPoolExecutor,很明显它是线程池的生成类,接下来如果想进一步了解线程池,那么不可避免的会和这个类打交道了。
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
...
}
我们重点看一下参数是5个的构造器即可,下面解释一下这5个参数分别代表神马意思。
这些个参数一开始很容易让人望文生义:线程池里保持corePoolSize个线程,如果不够用,就加线程入池直至maximumPoolSize大小,如果还不够就往workQueue里加,如果workQueue也不够就用RejectedExecutionHandler来做拒绝处理。
在你认真查看ThreadPoolExecutor源代码上面一大段英文注释过后,你会发现上面的理解是多么可笑,下面将原文翻译过后大概的理解说一下:(为了表述上的方便我们定义一下当前线程池线程数量为CurrentPoolSize)
上面解释了基本的参数含义,接下来看看上文提及的Executor静态方法
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
}
它将corePoolSize和maximumPoolSize都设定为了nThreads,这样便实现了线程池的大小的固定,不会动态地扩大,另外,keepAliveTime设定为了0,也就是说线程只要空闲下来,就会被移除线程池,关于LinkedBlockingQueue后面会给出详细说明。
public class Executors {
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
}
它将corePoolSize设定为0,而将maximumPoolSize设定为了Integer的最大值,线程空闲超过60秒,将会从线程池中移除。由于核心线程数为0,因此每次添加任务,都会先从线程池中找空闲线程,如果没有就会创建一个线程(SynchronousQueue决定的,后面会说)来执行新的任务,并将该线程加入到线程池中,而最大允许的线程数为Integer的最大值,因此这个线程池理论上可以不断扩大。
当任务源源不断的过来,而我们的系统又处理不过来的时候,我们要采取的策略是拒绝服务。RejectedExecutionHandler接口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略。
分别对这个四种策略说明如下:
线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)
处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException();
}
这个策略直接抛出异常,丢弃任务。(jdk默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是一种勇气,为了保证后续任务的正常进行,丢弃一些也是可以接收的,记得做好记录)
不能执行的任务将被删除。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。