前言:JUC包,Exchanger源码学习
简介
Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,
如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。因此使用Exchanger的重点是成对的线程使用exchange()方法,当有一对线程达到了同步点,就会进行交换数据。
简单实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
public class MyThread extends Thread { private Exchanger<String> exchanger; private String data;
public MyThread() { }
public MyThread(String data, Exchanger<String> exchanger) { this.data = data; this.exchanger = exchanger; }
@Override public void run() { for (int i = 0; i < 3; i++) { try { TimeUnit.MILLISECONDS.sleep(10); System.out.println(Thread.currentThread().getName() + "第" + (i + 1) + "次交换前数据:" + data); data = exchanger.exchange(data); System.out.println(Thread.currentThread().getName() + "第" + (i + 1) + "次交换后数据:" + data); } catch (InterruptedException e) { e.printStackTrace(); } } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
public class Main { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); MyThread consumer = new MyThread("数据A",exchanger); MyThread producer = new MyThread("数据B",exchanger); consumer.setName("消费者"); producer.setName("生产者"); consumer.start(); producer.start(); } }
|
在上述例子中,两个线程中分别调用exchange方法进行数据的交换,当第一个线程执行后会进入等待状态,然后第二个线程调用exchange方法的时候,数据进行交换。
源码学习
Node
Exchanger 内部静态类,Node用于作为结点存储数据,注意一个Node对应两个线程数据,item、match分别存一个线程数据,当两个都不为null时,就进行交换。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @sun.misc.Contended static final class Node {
int index; int bound; int collides;
int hash; Object item; volatile Object match; volatile Thread parked; }
|
@sun.misc.Contended解决伪共享问题,减少内存消耗,提高性能
Participant
静态内部类,继承类ThreadLocal类。本身就一个方法,初始化创建Node,所以本质就是ThreadLocal类,用来保存线程本地变量Node
1 2 3
| static final class Participant extends ThreadLocal<Node> { public Node initialValue() { return new Node(); } }
|
属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| private static final int ASHIFT = 7;
private static final int MMASK = 0xff;
private static final int SEQ = MMASK + 1;
private static final int NCPU = Runtime.getRuntime().availableProcessors();
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
private static final int SPINS = 1 << 10;
private static final Object NULL_ITEM = new Object();
private static final Object TIMED_OUT = new Object();
private final Participant participant;
private volatile Node[] arena;
private volatile Node slot;
private volatile int bound;
|
在定位arena数组的有效槽位时,需要考虑缓存行的影响。由于高速缓存与内存之间是以缓存行为单位交换数据的,根据局部性原理,相邻地址空间的数据会被加载到高速缓存的同一个数据块上(缓存行),而数组是连续的(逻辑,涉及到虚拟内存)内存地址空间,因此,多个slot会被加载到同一个缓存行上,当一个slot改变时,会导致这个slot所在的缓存行上所有的数据(包括其他的slot)无效,需要从内存重新加载,影响性能。下面的属性就是用来解决这个问题的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
private static final sun.misc.Unsafe U; private static final long BOUND; private static final long SLOT; private static final long MATCH; private static final long BLOCKER; private static final int ABASE; static { int s; try { U = sun.misc.Unsafe.getUnsafe(); Class<?> ek = Exchanger.class; Class<?> nk = Node.class; Class<?> ak = Node[].class; Class<?> tk = Thread.class; BOUND = U.objectFieldOffset (ek.getDeclaredField("bound")); SLOT = U.objectFieldOffset (ek.getDeclaredField("slot")); MATCH = U.objectFieldOffset (nk.getDeclaredField("match")); BLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); s = U.arrayIndexScale(ak); ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
} catch (Exception e) { throw new Error(e); } if ((s & (s-1)) != 0 || s > (1 << ASHIFT)) throw new Error("Unsupported array scale"); }
|
构造函数
创建Participant类
1 2 3
| public Exchanger() { participant = new Participant(); }
|
exchange
1 2 3 4 5 6 7 8 9 10 11 12
| public V exchange(V x) throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; }
|
核心操作就是if的判断条件,我把这个判断条件格式化一下,如下
1 2 3 4 5 6 7 8 9 10 11 12 13
| ( arena != null || (v = slotExchange(item, false, 0L)) == null ) && ( ( Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null ) )
|
Exchanger有两种数据交换的方式,当并发量低的时候,内部采用“单槽位交换”;并发量高的时候会采用“多槽位交换”。
arena==null,表示未出现线程竞争,进行单槽位交换,执行slotExchange(item, false, 0L)方法进行数据交换,并将赋值给v
arena!=null或者单槽交换失败,进行多槽位交换,执行arenaExchange(item, false, 0L)方法进行数据交换,并将赋值给v
这个exchange重载方法的区别就是加了时间限制,当超过指定时间回抛出异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { Object v; Object item = (x == null) ? NULL_ITEM : x; long ns = unit.toNanos(timeout); if ((arena != null || (v = slotExchange(item, true, ns)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, true, ns)) == null))) throw new InterruptedException(); if (v == TIMED_OUT) throw new TimeoutException(); return (v == NULL_ITEM) ? null : (V)v; }
|
slotExchange
单槽交换,没有并发,一个线程先执行exchange方法,另一方再执行exchange方法,然后数据彼此交换
slotExchange的入参item表示当前线程携带的数据,返回值正常情况下为配对线程携带的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
|
private final Object slotExchange(Object item, boolean timed, long ns) { Node p = participant.get(); Thread t = Thread.currentThread(); if (t.isInterrupted()) return null; for (Node q;;) { if ((q = slot) != null) { if (U.compareAndSwapObject(this, SLOT, q, null)) { Object v = q.item; q.match = item; Thread w = q.parked; if (w != null) U.unpark(w); return v; } if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[(FULL + 2) << ASHIFT]; } else if (arena != null) return null; else { p.item = item; if (U.compareAndSwapObject(this, SLOT, null, p)) break; p.item = null; } }
int h = p.hash; long end = timed ? System.nanoTime() + ns : 0L; int spins = (NCPU > 1) ? SPINS : 1; Object v; while ((v = p.match) == null) { if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int)t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); } else if (slot != p) spins = SPINS; else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); p.parked = t; if (slot == p) U.park(false, ns); p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.compareAndSwapObject(this, SLOT, p, null)) { v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; return v; }
|
简单的说单槽交换,就是第一个进程抢占slot字段,然后自旋等待(自旋结束根据情况进行下一步操作,可能堵塞、结束、超时、重新自旋等等),然后第二个线程进来与第一个吸纳城交换数据并设置slot为null。
交换数据后,两个线程分别返回匹配数据(就是对方的数据,或者后交换后得到的数据)
arean可以看做是一个并发符号(当然本质是一个数组,用于存储N个线程结点),arean!=null说明并发量大,需要使用多槽交换,至于自旋次数、CPU释放等等,都是优化操作,不必深入探究。
这样,两个线程就实现了数据交换,整个过程都没有用到同步操作。
arenaExchange
多槽交换比单槽交换要复杂很多。当并发量大时使用多槽交换,但是如何判断并发量是否大?通过上文单槽交换的源码,我们可以知道,当单槽CAS操作失败,说明同时有多个配对线程竞争修改slot槽位,此时就会初始化arena数组并开始多槽交换,如下图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
| private final Object arenaExchange(Object item, boolean timed, long ns) { Node[] a = arena; Node p = participant.get(); for (int i = p.index;;) { int b, m, c; long j; Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); if (q != null && U.compareAndSwapObject(a, j, q, null)) { Object v = q.item; q.match = item; Thread w = q.parked; if (w != null) U.unpark(w); return v; } else if (i <= (m = (b = bound) & MMASK) && q == null) { p.item = item; if (U.compareAndSwapObject(a, j, null, p)) { long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); for (int h = p.hash, spins = SPINS;;) { Object v = p.match; if (v != null) { U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; return v; } else if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int)t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); } else if (U.getObjectVolatile(a, j) != p) spins = SPINS; else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); p.parked = t; if (U.getObjectVolatile(a, j) == p) U.park(false, ns); p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { if (m != 0) U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); p.item = null; p.hash = h; i = p.index >>>= 1; if (Thread.interrupted()) return null; if (timed && m == 0 && ns <= 0L) return TIMED_OUT; break; } } } else p.item = null; } else { if (p.bound != b) { p.bound = b; p.collides = 0; i = (i != m || m == 0) ? m : m - 1; } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { p.collides = c + 1; i = (i == 0) ? m : i - 1; } else i = m + 1; p.index = i; } } }
|
arenaExchange方法的整体流程和slotExchange方法类似,主要区别在于使用了arena槽数组,然后使用了需要计算命中。
如果槽位被占用,说明已经有线程先到了,之后的处理和slotExchange一样;
如果槽位有效且为null,说明当前线程是先到的,就占用槽位,然后按照:spin->yield->block这种锁升级的顺序进行优化的等待,等不到配对线程就会进入阻塞。
另外,由于arenaExchange利用了槽数组,所以涉及到槽数组的扩容和缩减问题,也就是CASE2与CASE3中涉及的操作,这里就不深入
总结
单槽交换示意图:
多槽交换示意图:
参考
Java多线程进阶(二一)—— J.U.C之synchronizer框架:Exchanger
Exchanger 源码分析