@huangyichun
2017-08-30T08:06:01.000000Z
字数 2698
阅读 938
多线程
线程池接口:
/*** 自定义线程池*/public interface ThreadPool <Job extends Runnable>{//执行一个job,这个job需要实现Runnable接口void execute(Job job);//关闭线程池void shutdown();//增加工作线程void addWorkers(int num);//减少工作线程void removeWorker(int num);}
默认线程池
package threadpool;import java.util.*;import java.util.concurrent.atomic.AtomicLong;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;/*** 默认线程池* 该线程池在初始化固定的线程数,用Worker进行封装* 且提供一个线程安全的队列,存储Worker。* 同时提供一个线程不安全的工作队列,让线程池中的线程互斥无限循环的获取队列中的Job* 如果队列为空,则等待。如果队列中有新的Job,则唤醒等待线程。*/public class DefaultThreadPool <Job extends Runnable> implements ThreadPool<Job> {//线程池最大限制数private static final int MAX_WORKER_NUMBERS = 20;//线程池默认个数private static final int DEFAULT_WORKER_NUMBERS = 5;//线程池最小个数private static final int MIN_WOKER_NUMBERS = 1;//记录线程个数private AtomicLong threadNum = new AtomicLong();//工作线程列表private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());//存储工作队列private final LinkedList<Job> jobs = new LinkedList<>();//默认工作线程个数private int workerNum = DEFAULT_WORKER_NUMBERS;//互斥访问工作队列private ReentrantLock mainLock = new ReentrantLock();private Condition condition = mainLock.newCondition();//互斥修改工作线程列表private ReentrantLock workLock = new ReentrantLock();public DefaultThreadPool(int num) {workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS :num < MIN_WOKER_NUMBERS ? MIN_WOKER_NUMBERS : num;initializerWorkers(workerNum);}private void initializerWorkers(int num){for(int i=0; i<num; ++i){Worker worker = new Worker();workers.add(worker);}}@Overridepublic void execute(Job job) {if(job != null){mainLock.lock();condition.signal();try {jobs.addLast(job);} finally {mainLock.unlock();}}}@Overridepublic void shutdown() {workers.forEach(Worker::shutdown);}@Overridepublic void addWorkers(int num) {workLock.lock();try {if(num + this.workerNum > MAX_WORKER_NUMBERS){num = MAX_WORKER_NUMBERS - this.workerNum;}initializerWorkers(num);this.workerNum += num;} finally {workLock.unlock();}}@Overridepublic void removeWorker(int num) {workLock.lock();try {if(num > workerNum){throw new IllegalArgumentException("删除的线程个数超出总个数");}int count = 0;while(count < num){Worker worker = workers.get(count);if(workers.remove(worker)){worker.shutdown();++count;}}} finally {workLock.unlock();}}private final class Worker implements Runnable{final Thread thread;private volatile boolean running = true;//控制线程运行public Worker() {thread = new Thread(this, "ThreadPool-Worker-"+threadNum.getAndIncrement());thread.start();}@Overridepublic void run() {while (running){Job job = null;mainLock.lock();try {while(jobs.isEmpty()){condition.await();}job = jobs.removeFirst();} catch (InterruptedException e) {//感知外接对WorkerThread进行中断任务Thread.currentThread().interrupt();System.out.println("停止当前线程");return;}finally {mainLock.unlock();}if(job != null){job.run();}}}/*** 关闭改线程*/public void shutdown(){running = false;}}}