@boothsun
2018-04-28T06:22:32.000000Z
字数 8553
阅读 1717
Java
文章大部分内容为以下优秀博文 学习笔记:
1. 深入并发包 ConcurrentHashMap
2. Java进阶(六)从ConcurrentHashMap的演进看Java多线程核心技术
3. ConcurrentHashMap源码分析(JDK8版本)
4. 深入浅出ConcurrentHashMap1.8
JDK1.7下 ConcurrentHashMap 是通过分段锁(锁是 ReentrantLock)的形式来保证线程安全的,底层采用的是数组+链表的存储结构。
JDK1.8下ConcurrentHashMap是通过CAS+Synchronized的形式来保证线程安全的,底层采用的是数组+链表+红黑树的存储结构。
JDK1.7 中的ConcurrentHashMap的底层数据结构仍然是数组和链表。与HashMap不同的是,ConcurrentHashMap最外层不是一个大的数组,而是一个Segment的数组。每个Segment包含一个与HashMap数据结构差不多的链表数组。整个数据结构如下图所示:

另一张,我觉得还可以的图:

其包含两个核心静态内部类 Segment和HashEntry。
JDK1.8的实现已经抛弃了Segment分段锁机制,利用CAS+Synchronized来保证并发更新的安全性,底层采用数组+链表+红黑树的存储结构。默认情况下,hash冲突时产生的是链表,但当链表长度超过一定阈值(默认是8)时会自动将链表(寻址时间复杂度为O(n) )转换为红黑树(寻址时间复杂度为O() )。其数据结构如下图,所示:

另一张,我觉得还可以的图:

所以,整个ConcurrentHashMap看起来就像是优化过且线程安全的HashMap,虽然在JDK1.8中还能看到Segment的数据结构,但是已经是简化了属性,只是为了兼容就版本。
// node数组最大容量:2^30=1073741824private static final int MAXIMUM_CAPACITY = 1 << 30;// 默认初始值,必须是2的幕数private static final int DEFAULT_CAPACITY = 16;//数组可能最大值,需要与toArray()相关方法关联static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;//并发级别,遗留下来的,为兼容以前的版本private static final int DEFAULT_CONCURRENCY_LEVEL = 16;// 负载因子private static final float LOAD_FACTOR = 0.75f;// 链表转红黑树阀值,> 8 链表转换为红黑树static final int TREEIFY_THRESHOLD = 8;//树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))static final int UNTREEIFY_THRESHOLD = 6;static final int MIN_TREEIFY_CAPACITY = 64;private static final int MIN_TRANSFER_STRIDE = 16;private static int RESIZE_STAMP_BITS = 16;// 2^15-1,help resize的最大线程数private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;// 32-16=16,sizeCtl中记录size大小的偏移量private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;// forwarding nodes的hash值static final int MOVED = -1;// 树根节点的hash值static final int TREEBIN = -2;// ReservationNode的hash值static final int RESERVED = -3;// 可用处理器数量static final int NCPU = Runtime.getRuntime().availableProcessors();//存放node的数组transient volatile Node<K,V>[] table;/** 控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义* 当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容* 当为0时:代表当时的table还没有被初始化* 当为正数时:表示初始化或者下一次进行扩容的大小private transient volatile int sizeCtl;
基本属性定义了ConcurrentHashMap的一些边界以及操作时的一些控制标志。下面,我们看一下内部的一些结构组成,这些是ConcurrentHashMap整个数据结构的核心。
Node是ConcurrentHashMap存储结构的基本单元,继承于HashMap中的Entry,用于存储数据,源代码如下:
static class Node<K,V> implements Map.Entry<K,V> {//链表的数据结构final int hash;final K key;//val和next都会在扩容时发生变化,所以加上volatile来保持可见性和禁止重排序volatile V val;volatile Node<K,V> next;Node(int hash, K key, V val, Node<K,V> next) {this.hash = hash;this.key = key;this.val = val;this.next = next;}public final K getKey() { return key; }public final V getValue() { return val; }public final int hashCode() { return key.hashCode() ^ val.hashCode(); }public final String toString(){ return key + "=" + val; }//不允许更新valuepublic final V setValue(V value) {throw new UnsupportedOperationException();}public final boolean equals(Object o) {Object k, v, u; Map.Entry<?,?> e;return ((o instanceof Map.Entry) &&(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&(v = e.getValue()) != null &&(k == key || k.equals(key)) &&(v == (u = val) || v.equals(u)));}//用于map中的get()方法,子类重写Node<K,V> find(int h, Object k) {Node<K,V> e = this;if (k != null) {do {K ek;if (e.hash == h &&((ek = e.key) == k || (ek != null && k.equals(ek))))return e;} while ((e = e.next) != null);}return null;}}
Node的数据结构很简单,它是链表中的一个节点,但是只允许对数据进行查找,不允许进行修改。
TreeNode继承于Node,但是数据结构换成了二叉树结构,它是红黑树的数据的存储结构,用于红黑树中存储数据,当链表的节点数大于8时会转换为红黑树的结构,此时就是通过ThreeNode作为存储结构代替Node来转换成红黑树。具体源代码如下:
static final class TreeNode<K,V> extends Node<K,V> {//树形结构的属性定义TreeNode<K,V> parent; // red-black tree linksTreeNode<K,V> left;TreeNode<K,V> right;TreeNode<K,V> prev; // needed to unlink next upon deletionboolean red; //标志红黑树的红节点TreeNode(int hash, K key, V val, Node<K,V> next,TreeNode<K,V> parent) {super(hash, key, val, next);this.parent = parent;}Node<K,V> find(int h, Object k) {return findTreeNode(h, k, null);}//根据key查找 从根节点开始找出相应的TreeNode,final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {if (k != null) {TreeNode<K,V> p = this;do {int ph, dir; K pk; TreeNode<K,V> q;TreeNode<K,V> pl = p.left, pr = p.right;if ((ph = p.hash) > h)p = pl;else if (ph < h)p = pr;else if ((pk = p.key) == k || (pk != null && k.equals(pk)))return p;else if (pl == null)p = pr;else if (pr == null)p = pl;else if ((kc != null ||(kc = comparableClassFor(k)) != null) &&(dir = compareComparables(kc, k, pk)) != 0)p = (dir < 0) ? pl : pr;else if ((q = pr.findTreeNode(h, k, kc)) != null)return q;elsep = pl;} while (p != null);}return null;}}
TreeBin可以看成是封装了操作TreeNode的工具类,它提供了一些转换红黑树的条件判断和锁控制的工具方法。
static final class TreeBin<K,V> extends Node<K,V> {//指向TreeNode列表和根节点TreeNode<K,V> root;volatile TreeNode<K,V> first;volatile Thread waiter;volatile int lockState;// 读写锁状态static final int WRITER = 1; // 获取写锁的状态static final int WAITER = 2; // 等待写锁的状态static final int READER = 4; // 增加数据时读锁的状态/*** 初始化红黑树*/TreeBin(TreeNode<K,V> b) {super(TREEBIN, null, null, null);this.first = b;TreeNode<K,V> r = null;for (TreeNode<K,V> x = b, next; x != null; x = next) {next = (TreeNode<K,V>)x.next;x.left = x.right = null;if (r == null) {x.parent = null;x.red = false;r = x;}else {K k = x.key;int h = x.hash;Class<?> kc = null;for (TreeNode<K,V> p = r;;) {int dir, ph;K pk = p.key;if ((ph = p.hash) > h)dir = -1;else if (ph < h)dir = 1;else if ((kc == null &&(kc = comparableClassFor(k)) == null) ||(dir = compareComparables(kc, k, pk)) == 0)TreeNode<K,V> xp = p;if ((p = (dir <= 0) ? p.left : p.right) == null) {x.parent = xp;if (dir <= 0)xp.left = x;elsexp.right = x;r = balanceInsertion(r, x);break;}}}}this.root = r;assert checkInvariants(root);}......}
public V put(K key, V value) {return putVal(key, value, false);}/** Implementation for put and putIfAbsent */final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode()); //两次hash,减少hash冲突,可以均匀分布int binCount = 0;for (Node<K,V>[] tab = table;;) { //对这个table进行迭代Node<K,V> f; int n, i, fh;//这里就是上面构造方法没有进行初始化,在这里进行判断,为null就调用initTable进行初始化,属于懒汉模式初始化if (tab == null || (n = tab.length) == 0)tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置没有数据,就直接无锁插入if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED)//如果在进行扩容,则先进行扩容操作tab = helpTransfer(tab, f);else {V oldVal = null;//如果以上条件都不满足,那就要进行加锁操作,也就是存在hash冲突,锁住链表或者红黑树的头结点synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) { //表示该节点是链表结构binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;//这里涉及到相同的key进行put就会覆盖原先的valueif (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) { //插入链表尾部pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) {//红黑树结构Node<K,V> p;binCount = 2;//红黑树结构旋转插入if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) { //如果链表的长度大于8时就会进行红黑树的转换if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);//统计size,并且检查是否需要扩容return null;}
这个put的过程很清晰,首先会进入无条件自循直到put成功,可以用以下六步流程来概括:
initTable()方法来进行初始化过程。addCount()方法统计size,并且检查是否需要扩容。现在我们来对每一步的细节进行源码分析,在第一步中,符合条件会进行初始化操作,我们来看看initTable()方法:
/*** Initializes table, using the size recorded in sizeCtl.*/private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {//空的table才能进入初始化操作if ((sc = sizeCtl) < 0) //sizeCtl<0表示其他线程已经在初始化了或者扩容了,挂起当前线程Thread.yield(); // lost initialization race; just spinelse if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//CAS操作SIZECTL为-1,表示初始化状态try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//初始化table = tab = nt;sc = n - (n >>> 2);//记录下次扩容的大小}} finally {sizeCtl = sc;}break;}}return tab;}
上面的初始化过程非常简单,首先会验证sizeCtl的值是否小于0,如果是,则表示其他线程正在进行扩容或者已经在初始化了,则会挂起当前线程。否则,则表明当前还没有进行过初始化table的过程,就会进入初始化流程。初始化流程也非常简单,就是new了指定大小的Node数组,然后计算下次扩容的大小。
在第二步中没有hash冲突就直接调用Unsafe的方法CAS插入该元素,这步很简单,没有太多可说的。
第三步,如果容器正在扩容,则会调用helpTransfer()方法帮助扩容,现在我们来看看helpTransfer()方法看看。
/*** 帮助从旧的table的元素复制到新的table中*/final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { //新的table nextTba已经存在前提下才能帮助扩容int rs = resizeStamp(tab.length);while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {transfer(tab, nextTab);//调用扩容方法break;}}return nextTab;}return table;}
其实helpTransfer()方法的目的就是调用多个工作线程一起帮助进行扩容,这样的效率就会更高,而不是只有检查到要扩容的那个线程进行扩容操作,其他线程就要等待扩容操作完成才能工作。
后面继续参考写:
ConcurrentHashMap源码分析(JDK8版本)
加锁代码段:
f = tabAt(tab, i = (n - 1) & hash)synchronized (f) {