ConcurrentHashMap【核心源码讲解】
JUC并发集合
本文你现在看到的是我的博文初稿,后续会加上对
ConcurrentHashMap
的源码的思维导图目前还在更作中,但想想还是先发布出来,哈哈哈!(不过这初稿直接怼源码很多人一般感觉枯燥)
但通往顶峰的路终究是伴随着坎坷,加油,兄弟们,约好的顶峰见,哈哈!来学起来吧!
ConcurrentHashMap
是线程安全的HASHMAP
在JDK1.8是以CAS+Synchronized实现的线程安全
CAS:在没有Hash冲突时(Node放在数组上时)
使用Synchronized: 在出现hash冲突时(NOde存放的位置已经有数据了)
存储结构
数组+链表+红黑树
put
方法:
调用
put
方法时,会调用’putval’且其方法的第三个参数默认传递的是false在调用
putIfAbsent
时,会调用putval
方法,但是第三个参数传递为true若传为false:代表key一至时,直接覆盖数据
若为true:代表key一致时,什么也不做箱相当于Redis的setnx,key不存在时,正常添加
public V put(K key, V value) {return putVal(key, value, false);
}
putVal方法 散列算法
final V putVal(K key, V value, boolean onlyIfAbsent) {//不允许key或value为null的值,跟hashMAp的区别if (key == null || value == null) throw new NullPointerException();//根据key的hashcode计算一个hash值,后期得出当前key-value要存储哪个数组的索引位置int hash = spread(key.hashCode());//一个标识:int binCount = 0;//==========添加数据到数组&初始化数组================//将MAP数组赋值给tab,死循环for (Node<K,V>[] tab = table;;) {//声明一堆变量,//n:数组长度 //i:当前node需要存放的索引位置 //f:当前索引i位置的node对象//fh:当前数组i索引位置上数据hash值Node<K,V> f; int n, i, fh;//判断当前数组是否还没有初始化if (tab == null || (n = tab.length) == 0)//将数组初始化tab = initTable();//(f = tabAt(tab, i = (n - 1) & hash): 计算索引放到哪个位置的方法//(n - 1) & hash):计算当前Node需要存放在哪个索引位置//在基于tabAt获取到i位置的数据//n:数组长度else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//现在数组i位置上没有数据,基于CAS方式将数据存放在i位置上if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))//若成功.执行break跳出循环,插入数据成功break; }//判断当前数据是否在扩容else if ((fh = f.hash) == MOVED)//让当前插入数据的线程协助扩容tab = helpTransfer(tab, f);else {//声明变量V oldVal = null;//基于当前索引位置的node,作为锁对象synchronized (f) {//判断当前位置的数据还是之前的f吗?-->避免并发操作的安全问题if (tabAt(tab, i) == f) {//判断hash值是否大于0--->不是红黑树if (fh >= 0) {//binCount设置为1:在链表情况下:记录链表长度的一个标识binCount = 1;//死循环,每循环一次,对binCount加1for (Node<K,V> e = f;; ++binCount) {//声明标识:ekK ek;//当前索引i位置的数据,会否和当前put的key的hash值一致if (e.hash == hash &&//当前索引i位置的数据的key和put的key相等则返回true((ek = e.key) == key ||//或者equals相等(ek != null && key.equals(ek)))) {//key一致,可能需要覆盖数据//当前索引i位置的数据的value赋值给oldValueoldVal = e.val;//如果传入的是false,代表key一致,覆盖Value//若传入的true,代表key一致,什么也不做if (!onlyIfAbsent)//覆盖Valuee.val = value;break;}//拿到当前指向的node对象Node<K,V> pred = e;//将e只想下一个node对象,若next指向的是一个null可以挂在当前node下面if ((e = e.next) == null) {//将hash,key,value封装为node对象挂载在pred的next上pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}//bincount不为0if (binCount != 0) {//判断列表长度是否大于等于阈值8(根据泊松分布,链表长度达到8的几率很低 )if (binCount >= TREEIFY_THRESHOLD)//尝试转为红黑树或者扩容//基于treeifyBin方法和上面的if阈值判断,可以得知链表想要转为红黑树必须保证数组长度>=64,链表长度>=8;//若数组长度<64,首先会将数组进行扩容treeifyBin(tab, i);//如果出现数据覆盖,if (oldVal != null)//返回之前的值return oldVal;break;}}}addCount(1L, binCount);return null;
}
spread():
计算当前NOde的hash值方法
static final int spread(int h) {//将key的hashcode的值高低16位进行^运算,又与HASH_BITS进行&运算//将高位的hash也参与到计算索引位置的运算当中//也是为啥concurrentHashMap要求数组长度为2^n幂//HASH_BITS:让hash值的最高位符号位肯定是0,代表当前hash值默认情况一定是正数,因为hash值为负数有特定含义return (h ^ (h >>> 16)) & HASH_BITS;
}static final int MOVED = -1; // 代表当前hash位置数据正在扩容
static final int TREEBIN = -2; // 代表当前hash位置下挂载的是一个红黑树
static final int RESERVED = -3; // 预留当前索引位置
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
tabAt
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
initTable():
初始化数组方法
private final Node<K,V>[] initTable() {//声明标识Node<K,V>[] tab; int sc;//再次判断数组没有初始化,并且完成tab的赋值while ((tab = table) == null || tab.length == 0) {//将sizeCtl赋值给sc变量,并判断是否小于0//sizeCtl:数组初始化和数组扩容时一个控制变量 //(-1: 低16位代表当前数组正在初始化 小于-1: 当前数组正在扩容个数(一个线程扩容值为-2 两个线程扩容,值为-3))//0: 代表数组没有初始化 >0: 代表当前数组的扩容阈值或者当前数组的初始化大小if ((sc = sizeCtl) < 0)Thread.yield(); // lost initialization race; just spin//尝试初始化数组,线程会以CAS的方式,将sizeCtl修改为-1,代表当前线程可以初始化数组else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//尝试初始化try {//再次判断当前数组是否已经初始化if ((tab = table) == null || tab.length == 0) {//开始做初始化,//若sizeCtl>0,就初始化sizeCtl长度的数组//若sizeCtl==0,就初始化默认长度为16int n = (sc > 0) ? sc : DEFAULT_CAPACITY;//初始化数组nt,赋值给tab和tableNode<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;//sc赋值为数组长度减去数组长度右移两位//将sc赋值为下次扩容的阈值sc = n - (n >>> 2);}} finally {//将赋值好的sc,设置给sizeCtlsizeCtl = sc;}break;}}return tab;
}
判断列表长度是否大于等于阈值8(根据泊松分布,链表长度达到8的几率很低 )
ConcurrentHashMap扩容操作
treeifyBin:
在链表长度>=8时,尝试将链表转为红黑树
private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;//数组不为空判断if (tab != null) {//数组长度n,是否小于64if ((n = tab.length) < MIN_TREEIFY_CAPACITY)//若数组长度小于64,不能将链表转为红黑树,先尝试扩容操作tryPresize(n << 1);//转红黑树的操作else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {synchronized (b) {if (tabAt(tab, index) == b) {TreeNode<K,V> hd = null, tl = null;for (Node<K,V> e = b; e != null; e = e.next) {TreeNode<K,V> p =new TreeNode<K,V>(e.hash, e.key, e.val,null, null);if ((p.prev = tl) == null)hd = p;elsetl.next = p;tl = p;}setTabAt(tab, index, new TreeBin<K,V>(hd));}}}}
}
tryPresize
计算扩容戳
//size:是将之前数组长度,左移一位得到的结果
private final void tryPresize(int size) {// 如果扩容的长度达到最大值,就使用最大值,//否则需要保证数组长度为2^n次幂//这块的操作也是为了给初始化进行准备的,因为在调用putAll方法时,也会触发tryPresize方法//若刚刚new的ConcurrentHashMAP直接调用putAll方法,会通过tryPreSize方法进行初始化int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);//看下面的详细剖析//=========下面的代码和initable的方法一样//声明scint sc;//将sizeCtl的值赋给sc,并判断是否大于0,代表没有初始化操作也没有扩容操作while ((sc = sizeCtl) >= 0) {//将concurrentHashMap 的table赋值给tab,并声明数组长度nNode<K,V>[] tab = table; int n;//判断数组长度需要初始化if (tab == null || (n = tab.length) == 0) {//执行初始化: 若sc是初始化长度比计算出来的c大,直接使用sc,若没有,无法容纳putAll中传入的map,使用更大的数组长度n = (sc > c) ? sc : c;//设置sizeCtl为-1,代表初始化操作if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {//DCL操作,再次判断数组的引用有没有变化if (table == tab) {@SuppressWarnings("unchecked")//初始化数组Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//数组赋值table = nt;//计算扩容阈值sc = n - (n >>> 2);}} finally {//最终赋值给sizeCtlsizeCtl = sc;}}}//c(预计长度)<=sc(初始化或者扩容后的长度):满足直接退出循环结束方法//n >= MAXIMUM_CAPACITY: 数组长度大于等于最大长度,直接结束 退出else if (c <= sc || n >= MAXIMUM_CAPACITY)break;//=================对sizeCtl的修改以及条件判断的BUG//判断当前tab是否和table一致,防止并发else if (tab == table) {//计算扩容标识戳,根据当前数组长度计算一个16位的扩容戳//作用: 保证后面sizeCtl赋值时为小于-1的负数// 用来记录当前是从什么长度开始扩容的 00000000 00000000 10000000 00011010 int rs = resizeStamp(n);//若sc<0,代表有数组正在扩容if (sc < 0) {//协助扩容的代码,进不来得的Node<K,V>[] nt;//依然有BUGif ((//当前线程扩容时,老数组长度是否和当前线程扩容时老数组长度一致sc >>> RESIZE_STAMP_SHIFT) != rs //下面的两个判断都是有问题存在BUG:核心问题就应该现将rs左移16位,在追加当前值//判断当前扩容是否已经即将结束|| sc == rs + 1 //正确写法: sc== rs<<<16+ 1//判断当前扩容线程是否达到最大限度|| sc == rs + MAX_RESIZERS //正确写法: sc== rs<<<16+MAX_RESIZERS//扩容已经结束|| (nt = nextTable) == null //记录迁移的索引位置从高位往低位迁移,也代表扩容即将结束|| transferIndex <= 0)break;//若果线程需要协助扩容,首先对sizeCtl进行加一操作,代表当前进来的一个线程需要协助扩容if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))//上面的判断没进去,nt代表新数组transfer(tab, nt);}//没有其他数组扩容,当前为第一个扩容的 RESIZE_STAMP_SHIFT就是16//基于CAS将sizeCtl的值修改为 10000000 00011010 00000000 00000010 (左移16为加2)//将扩容戳左移16,符号为1,就代表这个值为负数//帝低16表示当前正在扩容的线程有多少个 //为什么低位值为2时,代表有一个线程正在扩容: //原因: 因为每一个线程扩容完毕后,会对低16位进行减一的操作,当最后一个线程扩容完毕后,减一的结果还是-1,当值为-1时,需要对老数组进行扫描,查看是否有遗漏的数据没有迁移到新数组else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))//调用transfer(),将第二个参数设置为null,就代表第一次来扩容transfer(tab, null);}}
}
tableSizeFor
将c的设置到最近的2的n次幂的值,
此处的c等于(size + (size >>> 1) + 1)
此处假设传入的c=17
size: 00000000 00000000 00000000 00010001
(size >>> 1): 00000000 00000000 00000000 00001000
1:00000000 00000000 00000000 00000001
最终三者相加: (size + (size >>> 1) + 1)=26: 00000000 00000000 00000000 00011010
private static final int tableSizeFor(int c) {int n = c - 1; //00000000 00000000 00000000 00011001//00000000 00000000 00000000 00011001 n//00000000 00000000 00000000 00001100 (n>>>1) 右移一位(最左边的1去掉了) 再将原先的n与(n>>>1)进行或运算//00000000 00000000 00000000 00011101 n|(n>>>1)n |= n >>> 1; //00000000 00000000 00000000 00011101 n//00000000 00000000 00000000 00000111 (n >>> 2)//00000000 00000000 00000000 00011111 n|(n >>> 2)n |= n >>> 2;//00000000 00000000 00000000 00011111 n//00000000 00000000 00000000 00000001 (n >>> 4)//00000000 00000000 00000000 00011111 n|(n >>> 4) 从此处开始都是1了,后面的结果都一样n |= n >>> 4;//00000000 00000000 00000000 00011111 n//00000000 00000000 00000000 00000000 (n >>> 8)//00000000 00000000 00000000 00011111 n|(n >>>8)n |= n >>> 8;//00000000 00000000 00000000 00011111 n//00000000 00000000 00000000 00000000 (n >>> 16)//00000000 00000000 00000000 00011111 n|(n >>>16)n |= n >>> 16;return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
resizeStamp
计算扩容标识戳
00000000 00000000 00000000 00100000 =32
Integer.numberOfLeadingZeros(32)=26 计算最高位1前面有多少个0
00000000 00000000 10000000 0000000 (1 << (RESIZE_STAMP_BITS - 1) --> 这个就是RESIZE_STAMP_BITS =16
00000000 00000000 00000000 00011010 26
00000000 00000000 10000000 00011010 Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)) 或运算的结果
static final int resizeStamp(int n) {return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
transfer()方法
扩容核心方法
//开始扩容 以32扩容到64长度为例
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {//===========//开始扩容: 计算每个线程迁移的长度//tab: 老数组 n:数组长度 stride: 每个线程一次性迁移多少数据到新数组int n = tab.length, stride;//基于CPU的内核数量来计算,每个线程一次性迁移多长的数据最合理//MIN_TRANSFER_STRIDE=16,为每个线程迁移数据的最小长度//就是: 根据CPU计算每个线程一次迁移多长的数据到新数组,若大于16,使用计算结果,小于16,使用最小长度16if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range//==============//第一个进来扩容的线程需要把我们的新数据构建出来if (nextTab == null) { // initiatingtry {//将原数组长度左移一位,构建新数组长度Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//赋值操作nextTab = nt;} catch (Throwable ex) { //说明已经达到数组长度最大的取值范围sizeCtl = Integer.MAX_VALUE;//设置sizeCtl后直接结束return;}//将成员变量的新数组赋值nextTable = nextTab;//迁移数据时用到德标识,默认值为老数组长度transferIndex = n;//32 }//新数组长度int nextn = nextTab.length; //等于64//在老数组迁移完数据后做的标识ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//迁移数据时需要用到的标识//advance等于true,代表当前线程需要接受任务,在执行任务 advance==false,代表已经接收完任务boolean advance = true;//finishing表示是否迁移结束boolean finishing = false; // to ensure sweep before committing nextTab//===============线程领取迁移任务的操作//i=-1: 代表当前线程迁移数据的索引值//bound=0:for (int i = 0, bound = 0;;) {//f=null//fh=0Node<K,V> f; int fh;//当前线程需要接受任务while (advance) {//nextIndex=0, nextBound=0int nextIndex, nextBound;//第一次进来,这两个判断肯定进不去//对i进行减减,并且判断当前任务是否处理完毕if (--i >= bound || finishing)advance = false;//判断transferIndex是否<=0, 若小于等于代表没有任务可领,结束//在线程领取任务后,会对transferIndex进行修改 ,修改为transferIndex - stride//在任务都领取完后,transferIndex肯定<=0,代表灭有迁移数据可以领取else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}//当前线程尝试领取任务else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {//对bound赋值bound = nextBound;//对i赋值i = nextIndex - 1;//设置advance = false,代表当前线程领取到任务advance = false;}}//=====//===判断扩容是否已经结束//i<0:当前线程没有接受到任务//i >= n:迁移的索引位置不可能大于数组的长度,不会成立//i + n >= nextn : 因为i的最大值就是数组索引的最大值,不会成立if (i < 0 || i >= n || i + n >= nextn) {//若进来,代表当前线程没有接收到任务int sc;//finishing=true,代表扩容结束if (finishing) {//nextTable新数组设为null,nextTable = null;//将当前数组的引用执行指向新数组table = nextTab;//重新计算扩容阈值sizeCtl = (n << 1) - (n >>> 1);//真正的结束扩容return;}//当前线程没有接收到任务让当前线程结束扩容操作//采用CAS方式将sizeCtl-1,代表当前并发扩容线程数减一if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {//sizeCtl高16位是基于数组长度计算的扩容戳,低16位代表正在扩容线程数//(sc - 2) != resizeStamp(n): 不等于代表当前线程不是最后一个推出扩容的线程,直接结束当前线程扩容if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;//如果是最后一个推出扩容的线程,将finishing和advance设置为truefinishing = advance = true;//i设置为老数组长度,让最后一个i线程从尾到头再检查一遍,是否数据全部迁移完毕,i = n; // recheck before commit}}//========//开始迁移数据,并且在迁移完数据完毕后,会将advance设置为trueelse if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {synchronized (f) {if (tabAt(tab, i) == f) {Node<K,V> ln, hn;if (fh >= 0) {int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}//===honelse if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}
}