[关闭]
@boothsun 2018-03-06T20:36:57.000000Z 字数 3626 阅读 1468

Exchanger学习

Java多线程


  1. Java并发新构件之Exchanger
  2. JDK API

Exchaner 介绍

JDK API 解释

A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.
Sample Usage: Here are the highlights of a class that uses an Exchanger to swap buffers between threads so that the thread filling the buffer gets a freshly emptied one when it needs it, handing off the filled one to the thread emptying the buffer.

我的翻译:
Exchanger可以看成是一个同步点,在这个同步点上两个线程可以结对并且交换各自的数据。每个线程都可传入某个对象给exchange方法,然后与对应的伙伴线程匹配,并且接收伙伴线程交换的某个对象。Exchanger可以看成是一个双向的SynchronousQueue队列。Exchanger可以被用在诸如遗传算法和通道设计等应用里。

张孝祥老师的比喻
Exchaner好比两个毒贩要进行交易,一手交money,一手交drug,不管是谁先到接头地点后,就处于等待状态;当另外一个方也达到接头地点(所谓到达接头地点,也就是到达了准备接头的状态)时,两者的数据就立即交换了,然后就可以各忙各的了。

我的理解
Exchanger的作用:两个结对线程之前交换数据的工具类。

简单使用

  1. import java.util.concurrent.Exchanger;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. /**
  5. * Created by 58 on 2017-7-11.
  6. */
  7. public class ExchangerTest1 {
  8. private static final Exchanger<String> exchanger = new Exchanger<>();
  9. private static ExecutorService threadPool = Executors.newFixedThreadPool(3) ;
  10. public static void main(String[] args) {
  11. threadPool.execute(() -> {
  12. String A = "A" ;
  13. try {
  14. System.out.println("A--->" + exchanger.exchange(A)) ;
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. }
  18. });
  19. threadPool.execute(() -> {
  20. String B = "B" ;
  21. try {
  22. System.out.println("B--->" + exchanger.exchange(B));
  23. } catch (Exception e) {
  24. e.printStackTrace();
  25. }
  26. });
  27. threadPool.execute(() -> {
  28. String C = "C" ;
  29. try {
  30. System.out.println("C--->" + exchanger.exchange(C));
  31. } catch (Exception e) {
  32. e.printStackTrace();
  33. }
  34. });
  35. }
  36. }

应用场景

一个任务在创建对象,而这些对象的生产代价很高,另一个任务在消费这些对象。通过这种方式,可以有很多的对象被创建的同时被消费。

  1. import java.util.List;
  2. import java.util.concurrent.*;
  3. public class ExchangerDemo {
  4. static int size = 10;
  5. static int delay = 5; //秒
  6. public static void main(String[] args) throws Exception {
  7. ExecutorService exec = Executors.newCachedThreadPool();
  8. // 这个特定的List变体允许列表在被遍历的时候调用remove()方法,而不会抛出ConcurrentModifiedException异常
  9. List<Fat> producerList = new CopyOnWriteArrayList<>();
  10. List<Fat> consumerList = new CopyOnWriteArrayList<>();
  11. Exchanger<List<Fat>> exchanger = new Exchanger<>();
  12. exec.execute(new ExchangerProducer(exchanger, producerList));
  13. exec.execute(new ExchangerConsumer(exchanger, consumerList));
  14. TimeUnit.SECONDS.sleep(delay);
  15. exec.shutdownNow();
  16. }
  17. }
  18. class ExchangerProducer implements Runnable {
  19. private List<Fat> holder;
  20. private Exchanger<List<Fat>> exchanger;
  21. public ExchangerProducer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {
  22. this.exchanger = exchanger;
  23. this.holder = holder;
  24. }
  25. @Override
  26. public void run() {
  27. try {
  28. while(!Thread.interrupted()) {
  29. //生产对象
  30. for (int i = 0;i < ExchangerDemo.size; i++) {
  31. holder.add(new Fat());
  32. }
  33. //生产完毕后 等待进行交换
  34. holder = exchanger.exchange(holder);
  35. }
  36. } catch (InterruptedException e) {
  37. }
  38. System.out.println("Producer stopped.");
  39. }
  40. }
  41. class ExchangerConsumer implements Runnable {
  42. private List<Fat> holder;
  43. private Exchanger<List<Fat>> exchanger;
  44. private volatile Fat value;
  45. private static int num = 0;
  46. public ExchangerConsumer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {
  47. this.exchanger = exchanger;
  48. this.holder = holder;
  49. }
  50. @Override
  51. public void run() {
  52. try {
  53. while(!Thread.interrupted()) {
  54. //等待交换
  55. holder = exchanger.exchange(holder);
  56. //取到生产者生产的数据 开始模拟消费的场景
  57. for (Fat x : holder) {
  58. num ++;
  59. value = x;
  60. //消费 在循环内删除元素,这对于CopyOnWriteArrayList是没有问题的
  61. holder.remove(x);
  62. }
  63. if (num % 10000 == 0) {
  64. System.out.println("Exchanged count=" + num);
  65. }
  66. }
  67. } catch (InterruptedException e) {
  68. }
  69. System.out.println("Consumer stopped. Final value: " + value);
  70. }
  71. }
  72. class Fat {
  73. private volatile double d;
  74. private static int counter = 1;
  75. private final int id = counter++;
  76. public Fat() {
  77. //执行一段耗时的操作
  78. for (int i = 1; i<10000; i++) {
  79. d += (Math.PI + Math.E) / (double)i;
  80. }
  81. }
  82. public String toString() {return "Fat id=" + id;}
  83. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注