[关闭]
@huangyichun 2017-08-30T16:06:01.000000Z 字数 2698 阅读 750

自定义线程池

多线程


线程池接口:

  1. /**
  2. * 自定义线程池
  3. */
  4. public interface ThreadPool <Job extends Runnable>{
  5. //执行一个job,这个job需要实现Runnable接口
  6. void execute(Job job);
  7. //关闭线程池
  8. void shutdown();
  9. //增加工作线程
  10. void addWorkers(int num);
  11. //减少工作线程
  12. void removeWorker(int num);
  13. }

默认线程池

  1. package threadpool;
  2. import java.util.*;
  3. import java.util.concurrent.atomic.AtomicLong;
  4. import java.util.concurrent.locks.Condition;
  5. import java.util.concurrent.locks.ReentrantLock;
  6. /**
  7. * 默认线程池
  8. * 该线程池在初始化固定的线程数,用Worker进行封装
  9. * 且提供一个线程安全的队列,存储Worker。
  10. * 同时提供一个线程不安全的工作队列,让线程池中的线程互斥无限循环的获取队列中的Job
  11. * 如果队列为空,则等待。如果队列中有新的Job,则唤醒等待线程。
  12. */
  13. public class DefaultThreadPool <Job extends Runnable> implements ThreadPool<Job> {
  14. //线程池最大限制数
  15. private static final int MAX_WORKER_NUMBERS = 20;
  16. //线程池默认个数
  17. private static final int DEFAULT_WORKER_NUMBERS = 5;
  18. //线程池最小个数
  19. private static final int MIN_WOKER_NUMBERS = 1;
  20. //记录线程个数
  21. private AtomicLong threadNum = new AtomicLong();
  22. //工作线程列表
  23. private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
  24. //存储工作队列
  25. private final LinkedList<Job> jobs = new LinkedList<>();
  26. //默认工作线程个数
  27. private int workerNum = DEFAULT_WORKER_NUMBERS;
  28. //互斥访问工作队列
  29. private ReentrantLock mainLock = new ReentrantLock();
  30. private Condition condition = mainLock.newCondition();
  31. //互斥修改工作线程列表
  32. private ReentrantLock workLock = new ReentrantLock();
  33. public DefaultThreadPool(int num) {
  34. workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS :
  35. num < MIN_WOKER_NUMBERS ? MIN_WOKER_NUMBERS : num;
  36. initializerWorkers(workerNum);
  37. }
  38. private void initializerWorkers(int num){
  39. for(int i=0; i<num; ++i){
  40. Worker worker = new Worker();
  41. workers.add(worker);
  42. }
  43. }
  44. @Override
  45. public void execute(Job job) {
  46. if(job != null){
  47. mainLock.lock();
  48. condition.signal();
  49. try {
  50. jobs.addLast(job);
  51. } finally {
  52. mainLock.unlock();
  53. }
  54. }
  55. }
  56. @Override
  57. public void shutdown() {
  58. workers.forEach(Worker::shutdown);
  59. }
  60. @Override
  61. public void addWorkers(int num) {
  62. workLock.lock();
  63. try {
  64. if(num + this.workerNum > MAX_WORKER_NUMBERS){
  65. num = MAX_WORKER_NUMBERS - this.workerNum;
  66. }
  67. initializerWorkers(num);
  68. this.workerNum += num;
  69. } finally {
  70. workLock.unlock();
  71. }
  72. }
  73. @Override
  74. public void removeWorker(int num) {
  75. workLock.lock();
  76. try {
  77. if(num > workerNum){
  78. throw new IllegalArgumentException("删除的线程个数超出总个数");
  79. }
  80. int count = 0;
  81. while(count < num){
  82. Worker worker = workers.get(count);
  83. if(workers.remove(worker)){
  84. worker.shutdown();
  85. ++count;
  86. }
  87. }
  88. } finally {
  89. workLock.unlock();
  90. }
  91. }
  92. private final class Worker implements Runnable{
  93. final Thread thread;
  94. private volatile boolean running = true;//控制线程运行
  95. public Worker() {
  96. thread = new Thread(this, "ThreadPool-Worker-"+threadNum.getAndIncrement());
  97. thread.start();
  98. }
  99. @Override
  100. public void run() {
  101. while (running){
  102. Job job = null;
  103. mainLock.lock();
  104. try {
  105. while(jobs.isEmpty()){
  106. condition.await();
  107. }
  108. job = jobs.removeFirst();
  109. } catch (InterruptedException e) {
  110. //感知外接对WorkerThread进行中断任务
  111. Thread.currentThread().interrupt();
  112. System.out.println("停止当前线程");
  113. return;
  114. }finally {
  115. mainLock.unlock();
  116. }
  117. if(job != null){
  118. job.run();
  119. }
  120. }
  121. }
  122. /**
  123. * 关闭改线程
  124. */
  125. public void shutdown(){
  126. running = false;
  127. }
  128. }
  129. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注