0%

Exchanger源码学习

前言:JUC包,Exchanger源码学习

简介

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,

image-20200105194725264

如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。因此使用Exchanger的重点是成对的线程使用exchange()方法,当有一对线程达到了同步点,就会进行交换数据。

简单实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @author shency
* @description: Exchanger学习
* @date: 2019/10/30
*/
public class MyThread extends Thread {
private Exchanger<String> exchanger;
private String data;

public MyThread() {
}

public MyThread(String data, Exchanger<String> exchanger) {
this.data = data;
this.exchanger = exchanger;
}

@Override
public void run() {
for (int i = 0; i < 3; i++) {
try {
TimeUnit.MILLISECONDS.sleep(10);
System.out.println(Thread.currentThread().getName() + "第" + (i + 1) + "次交换前数据:" + data);
data = exchanger.exchange(data);
System.out.println(Thread.currentThread().getName() + "第" + (i + 1) + "次交换后数据:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* @author shency
* @description: 测试
* @date: 2019/10/30
*/
public class Main {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
MyThread consumer = new MyThread("数据A",exchanger);
MyThread producer = new MyThread("数据B",exchanger);
consumer.setName("消费者");
producer.setName("生产者");
consumer.start();
producer.start();
}
}

image-20200105194738499

在上述例子中,两个线程中分别调用exchange方法进行数据的交换,当第一个线程执行后会进入等待状态,然后第二个线程调用exchange方法的时候,数据进行交换。

源码学习

Node

Exchanger 内部静态类,Node用于作为结点存储数据,注意一个Node对应两个线程数据,item、match分别存一个线程数据,当两个都不为null时,就进行交换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 表示线程交换数据结点
@sun.misc.Contended static final class Node {
/**
* 多槽交换使用
*/
// 索引
int index;
// 上次记录的Exchanger.bound值
int bound;
// CAS失败次数
int collides;

/**
* 单槽、多槽交换使用
*/
// hash值
int hash;
// 线程存储对象
Object item;
// 匹配的对象
volatile Object match;
// 设置线程park
volatile Thread parked;
}

@sun.misc.Contended解决伪共享问题,减少内存消耗,提高性能

Participant

静态内部类,继承类ThreadLocal类。本身就一个方法,初始化创建Node,所以本质就是ThreadLocal类,用来保存线程本地变量Node

1
2
3
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}
属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// arena数组中两个已使用的slot 之间的索引距离,将它们分开以避免错误的共享,1<<ASHIFT是至少应为缓存大小。
private static final int ASHIFT = 7;

// arena数组大小限制为MMASK+1,MMASK一定是2的N次方减1,初始为0xff(255)
private static final int MMASK = 0xff;

// 绑定字段吧版本号
private static final int SEQ = MMASK + 1;

// JVM 的 CPU 核数,用于自旋和扩容控制
private static final int NCPU = Runtime.getRuntime().availableProcessors();

// arena 的最大索引值:原则上可以让所有线程不发生竞争
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;

// 当前线程阻塞等待匹配节点前的自旋次数,CPU==1时不进行自旋
private static final int SPINS = 1 << 10;

// 支持null值作为交换数据
private static final Object NULL_ITEM = new Object();

// 交换超时的返回值对象
private static final Object TIMED_OUT = new Object();

// Participant对象,本质就是TreadLocal
private final Participant participant;

// arena英文是竞技场的意思,这里代表多槽,即一个结点数组,当发生竞争时,多槽交换使用
private volatile Node[] arena;

// slot英文有窄缝、单槽的意思,这里表示单槽交换结点,未发生竞争时,单槽交换使用。slot字段最终会指向首个到达的线程的自身Node结点,表示线程占用了槽位。
private volatile Node slot;

// 最大有效arena数组位置的索引
private volatile int bound;

在定位arena数组的有效槽位时,需要考虑缓存行的影响。由于高速缓存与内存之间是以缓存行为单位交换数据的,根据局部性原理,相邻地址空间的数据会被加载到高速缓存的同一个数据块上(缓存行),而数组是连续的(逻辑,涉及到虚拟内存)内存地址空间,因此,多个slot会被加载到同一个缓存行上,当一个slot改变时,会导致这个slot所在的缓存行上所有的数据(包括其他的slot)无效,需要从内存重新加载,影响性能。下面的属性就是用来解决这个问题的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

// 对内存直接操作,不安全
private static final sun.misc.Unsafe U;
private static final long BOUND;
private static final long SLOT;
private static final long MATCH;
private static final long BLOCKER;
private static final int ABASE;
static {
int s;
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> ek = Exchanger.class;
Class<?> nk = Node.class;
Class<?> ak = Node[].class;
Class<?> tk = Thread.class;
BOUND = U.objectFieldOffset
(ek.getDeclaredField("bound"));
SLOT = U.objectFieldOffset
(ek.getDeclaredField("slot"));
MATCH = U.objectFieldOffset
(nk.getDeclaredField("match"));
BLOCKER = U.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
s = U.arrayIndexScale(ak);
// ABASE absorbs padding in front of element 0
ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);

} catch (Exception e) {
throw new Error(e);
}
if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
throw new Error("Unsupported array scale");
}
构造函数

创建Participant类

1
2
3
public Exchanger() {
participant = new Participant();
}
exchange
1
2
3
4
5
6
7
8
9
10
11
12
public V exchange(V x) throws InterruptedException {
Object v;
// 判断是否为空,如果是null转换为NULL_ITEM用于交换
Object item = (x == null) ? NULL_ITEM : x;
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
// 返回对象V
return (v == NULL_ITEM) ? null : (V)v;
}

核心操作就是if的判断条件,我把这个判断条件格式化一下,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
(
arena != null
||
(v = slotExchange(item, false, 0L)) == null
)
&&
(
(
Thread.interrupted()
||
(v = arenaExchange(item, false, 0L)) == null
)
)

Exchanger有两种数据交换的方式,当并发量低的时候,内部采用“单槽位交换”;并发量高的时候会采用“多槽位交换”。

arena==null,表示未出现线程竞争,进行单槽位交换,执行slotExchange(item, false, 0L)方法进行数据交换,并将赋值给v

arena!=null或者单槽交换失败,进行多槽位交换,执行arenaExchange(item, false, 0L)方法进行数据交换,并将赋值给v

image-20200105195101559

这个exchange重载方法的区别就是加了时间限制,当超过指定时间回抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public V exchange(V x, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
Object v;
Object item = (x == null) ? NULL_ITEM : x;
long ns = unit.toNanos(timeout);
if ((arena != null ||
(v = slotExchange(item, true, ns)) == null) &&
((Thread.interrupted() ||
(v = arenaExchange(item, true, ns)) == null)))
throw new InterruptedException();
if (v == TIMED_OUT)
throw new TimeoutException();
return (v == NULL_ITEM) ? null : (V)v;
}
slotExchange

单槽交换,没有并发,一个线程先执行exchange方法,另一方再执行exchange方法,然后数据彼此交换

slotExchange的入参item表示当前线程携带的数据,返回值正常情况下为配对线程携带的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/**

*单槽交换,在arenas为null时使用
*
*@param item代表要交换的数据
*@param timed为true表示定时
*@param 如果为定时,NS表示最大等待时间,否则0L
*@return 其它配对线程的数据; 如果多槽交换被激活或被中断返回null, 如果超时返回TIMED_OUT(一个Obejct对象)
*/
private final Object slotExchange(Object item, boolean timed, long ns) {
// 当前线程交换结点
Node p = participant.get();
Thread t = Thread.currentThread();
// 线程的中断状态检查
if (t.isInterrupted())
return null;
// CAS循环
for (Node q;;) {
// slot!=null 代表已经有线程先抢占了slot,此时就可以与这个线程交换数据
if ((q = slot) != null) {
// CAS操作,将slot从p设为null
if (U.compareAndSwapObject(this, SLOT, q, null)) {
// 获取交换值
Object v = q.item;
// 设置交换值
q.match = item;
Thread w = q.parked;
// 如果先来的线程堵塞了,则唤醒在此槽位等待的线程
if (w != null)
U.unpark(w);
// 交换成功, 返回结果
return v;
}
// CAS操作失败,CPU核数多于1个, 且bound为0时创建arena数组,并将bound设置为SEQ大小
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
else if (arena != null)
// slot == null && arena != null,说明在第一个线程单槽交换时,初始化了arena,结束单槽交换并才是多槽交换
return null;
else {
// slot==null&&arena==null,CAS操作将slot从null设为p,成功就break
p.item = item;
if (U.compareAndSwapObject(this, SLOT, null, p))
break;
// CAS操作失败, 继续下一次自旋
p.item = null;
}
}

// 执行到这, 说明当前线程先到达, 且已经占用了slot槽(就是else中CAS操作成功然后break跳出循环), 需要等待配对线程到达
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0L;
// 根据处理器数目,决定自旋次数,如果NCPU==1,自旋1次(也就是不自旋),NCPU>1,就自旋SPINS次(1 << 10)
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
while ((v = p.match) == null) {
// 自旋
if (spins > 0) {
// 随机释放CPU
h ^= h << 1;
h ^= h >>> 3;
h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
// spins==0&&slot!=p,说明有新的线程到达并抢占了slot,此时就可以与其交换数据, 但是还未完全准备好, 所以重新设置自旋次数让其再自旋等一下
else if (slot != p)
spins = SPINS;
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
// 自旋结束,但没有等不到配对, 此时才阻塞当前线程
U.putObject(t, BLOCKER, this);
p.parked = t;
// 堵塞线程
if (slot == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
// 自旋结束,遇到超时等情况,腾出slot给其他线程使用
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
return v;
}

image-20200105195203584

简单的说单槽交换,就是第一个进程抢占slot字段,然后自旋等待(自旋结束根据情况进行下一步操作,可能堵塞、结束、超时、重新自旋等等),然后第二个线程进来与第一个吸纳城交换数据并设置slot为null。

交换数据后,两个线程分别返回匹配数据(就是对方的数据,或者后交换后得到的数据)

arean可以看做是一个并发符号(当然本质是一个数组,用于存储N个线程结点),arean!=null说明并发量大,需要使用多槽交换,至于自旋次数、CPU释放等等,都是优化操作,不必深入探究。

这样,两个线程就实现了数据交换,整个过程都没有用到同步操作。

arenaExchange

多槽交换比单槽交换要复杂很多。当并发量大时使用多槽交换,但是如何判断并发量是否大?通过上文单槽交换的源码,我们可以知道,当单槽CAS操作失败,说明同时有多个配对线程竞争修改slot槽位,此时就会初始化arena数组并开始多槽交换,如下图

image-20200105195214394

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
private final Object arenaExchange(Object item, boolean timed, long ns) {
// arena多槽交换数组
Node[] a = arena;
// 当前线程携带的交换结点
Node p = participant.get();
// 当前线程的arena索引
for (int i = p.index;;) {
int b, m, c;
long j;
// 从arena数组中选出偏移地址为(i << ASHIFT) + ABASE的元素, 即真正可用的Node,这里是直接对内存操作
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
// CASE1: 槽不为空,说明已经有线程到达并在等待了
if (q != null && U.compareAndSwapObject(a, j, q, null)) {
// 获取已经到达的线程所携带的值
Object v = q.item;
// 把当前线程携带的值交换给已经到达的线程
q.match = item;
// q.parked指向已经到达的线程,如果堵塞,则唤醒已经到达的线程
Thread w = q.parked;
if (w != null)
U.unpark(w);
// 返回交换后的数据
return v;
}
// CASE2: 槽位有效并且为空
else if (i <= (m = (b = bound) & MMASK) && q == null) {
p.item = item;
// CAS操作,将该槽位从null设为p
if (U.compareAndSwapObject(a, j, null, p)) {
// 如果限定时间,则需要计算结束时间
long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
// 获取当前线程
Thread t = Thread.currentThread();
// 自旋,期间查看有没其它配对线程到达该槽位
for (int h = p.hash, spins = SPINS;;) {
Object v = p.match;
// 有配对线程到达了该槽位
if (v != null) {
//交换结束并清除,注意交换操作在CASE1中,这里负责换后的清除操作
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
// 返回配对线程交换过来的值
return v;
}
else if (spins > 0) {
// 随机释放CPU
h ^= h << 1;
h ^= h >>> 3;
h ^= h << 10;
// 初始化Hash
if (h == 0)
h = SPINS | (int)t.getId();
// 50%的可能h<0,如果整个条件为true,就释放CPU
else if (h < 0 &&
(--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
//spins==0&&slot!=p,说明有新的线程到达并抢占了slot,此时就可以与其交换数据, 但是还未完全准备好, 所以重新设置自旋次数让其再自旋等一下
else if (U.getObjectVolatile(a, j) != p)
spins = SPINS; // releaser hasn't set match yet
else if (!t.isInterrupted() && m == 0 &&
(!timed ||
(ns = end - System.nanoTime()) > 0L)) {
// 等不到配对线程了, 阻塞当前线程
U.putObject(t, BLOCKER, this);
// 在结点引用当前线程,以便配对线程到达后唤醒我
p.parked = t;
if (U.getObjectVolatile(a, j) == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.getObjectVolatile(a, j) == p &&
U.compareAndSwapObject(a, j, p, null)) {
// 尝试缩减arena槽数组的大小
if (m != 0)
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
p.item = null;
p.hash = h;
i = p.index >>>= 1;
if (Thread.interrupted())
return null;
if (timed && m == 0 && ns <= 0L)
return TIMED_OUT;
break;
}
}
}
// CAS操作失败,clear
else
p.item = null;
}
// CASE3: 无效槽位位置, 需要扩容
else {
if (p.bound != b) {
p.bound = b;
p.collides = 0;
i = (i != m || m == 0) ? m : m - 1;
}
else if ((c = p.collides) < m || m == FULL ||
!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
p.collides = c + 1;
i = (i == 0) ? m : i - 1;
}
else
// 扩容
i = m + 1;
p.index = i;
}
}
}

arenaExchange方法的整体流程和slotExchange方法类似,主要区别在于使用了arena槽数组,然后使用了需要计算命中。

如果槽位被占用,说明已经有线程先到了,之后的处理和slotExchange一样;

如果槽位有效且为null,说明当前线程是先到的,就占用槽位,然后按照:spin->yield->block这种锁升级的顺序进行优化的等待,等不到配对线程就会进入阻塞。

另外,由于arenaExchange利用了槽数组,所以涉及到槽数组的扩容和缩减问题,也就是CASE2与CASE3中涉及的操作,这里就不深入

总结

单槽交换示意图:

image-20200105195225981

多槽交换示意图:

image-20200105195236526
参考

Java多线程进阶(二一)—— J.U.C之synchronizer框架:Exchanger

Exchanger 源码分析

-------------本文结束感谢您的阅读-------------