@boothsun
2018-03-06T12:36:57.000000Z
字数 3626
阅读 1613
Java多线程
A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.
Sample Usage: Here are the highlights of a class that uses an Exchanger to swap buffers between threads so that the thread filling the buffer gets a freshly emptied one when it needs it, handing off the filled one to the thread emptying the buffer.
我的翻译:
Exchanger可以看成是一个同步点,在这个同步点上两个线程可以结对并且交换各自的数据。每个线程都可传入某个对象给exchange方法,然后与对应的伙伴线程匹配,并且接收伙伴线程交换的某个对象。Exchanger可以看成是一个双向的SynchronousQueue队列。Exchanger可以被用在诸如遗传算法和通道设计等应用里。
张孝祥老师的比喻
Exchaner好比两个毒贩要进行交易,一手交money,一手交drug,不管是谁先到接头地点后,就处于等待状态;当另外一个方也达到接头地点(所谓到达接头地点,也就是到达了准备接头的状态)时,两者的数据就立即交换了,然后就可以各忙各的了。
我的理解
Exchanger的作用:两个结对线程之前交换数据的工具类。
import java.util.concurrent.Exchanger;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/*** Created by 58 on 2017-7-11.*/public class ExchangerTest1 {private static final Exchanger<String> exchanger = new Exchanger<>();private static ExecutorService threadPool = Executors.newFixedThreadPool(3) ;public static void main(String[] args) {threadPool.execute(() -> {String A = "A" ;try {System.out.println("A--->" + exchanger.exchange(A)) ;} catch (Exception e) {e.printStackTrace();}});threadPool.execute(() -> {String B = "B" ;try {System.out.println("B--->" + exchanger.exchange(B));} catch (Exception e) {e.printStackTrace();}});threadPool.execute(() -> {String C = "C" ;try {System.out.println("C--->" + exchanger.exchange(C));} catch (Exception e) {e.printStackTrace();}});}}
一个任务在创建对象,而这些对象的生产代价很高,另一个任务在消费这些对象。通过这种方式,可以有很多的对象被创建的同时被消费。
import java.util.List;import java.util.concurrent.*;public class ExchangerDemo {static int size = 10;static int delay = 5; //秒public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();// 这个特定的List变体允许列表在被遍历的时候调用remove()方法,而不会抛出ConcurrentModifiedException异常List<Fat> producerList = new CopyOnWriteArrayList<>();List<Fat> consumerList = new CopyOnWriteArrayList<>();Exchanger<List<Fat>> exchanger = new Exchanger<>();exec.execute(new ExchangerProducer(exchanger, producerList));exec.execute(new ExchangerConsumer(exchanger, consumerList));TimeUnit.SECONDS.sleep(delay);exec.shutdownNow();}}class ExchangerProducer implements Runnable {private List<Fat> holder;private Exchanger<List<Fat>> exchanger;public ExchangerProducer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {this.exchanger = exchanger;this.holder = holder;}@Overridepublic void run() {try {while(!Thread.interrupted()) {//生产对象for (int i = 0;i < ExchangerDemo.size; i++) {holder.add(new Fat());}//生产完毕后 等待进行交换holder = exchanger.exchange(holder);}} catch (InterruptedException e) {}System.out.println("Producer stopped.");}}class ExchangerConsumer implements Runnable {private List<Fat> holder;private Exchanger<List<Fat>> exchanger;private volatile Fat value;private static int num = 0;public ExchangerConsumer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {this.exchanger = exchanger;this.holder = holder;}@Overridepublic void run() {try {while(!Thread.interrupted()) {//等待交换holder = exchanger.exchange(holder);//取到生产者生产的数据 开始模拟消费的场景for (Fat x : holder) {num ++;value = x;//消费 在循环内删除元素,这对于CopyOnWriteArrayList是没有问题的holder.remove(x);}if (num % 10000 == 0) {System.out.println("Exchanged count=" + num);}}} catch (InterruptedException e) {}System.out.println("Consumer stopped. Final value: " + value);}}class Fat {private volatile double d;private static int counter = 1;private final int id = counter++;public Fat() {//执行一段耗时的操作for (int i = 1; i<10000; i++) {d += (Math.PI + Math.E) / (double)i;}}public String toString() {return "Fat id=" + id;}}