Java中怎么使用ConcurrentHashMap实现线程安全的Map
本文小编为大家详细介绍“Java中怎么使用ConcurrentHashMap实现线程安全的Map”,内容详细,步骤清晰,细节处理妥当,希望这篇“Java中怎么使用ConcurrentHashMap实现线程安全的Map”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。
jdk1.7版本
数据结构
/***Thesegments,eachofwhichisaspecializedhashtable.*/finalSegment<K,V>[]segments;
可以看到主要就是一个Segment数组,注释也写了,每个都是一个特殊的hash table。
来看一下Segment是什么东西。
staticfinalclassSegment<K,V>extendsReentrantLockimplementsSerializable{....../***Theper-segmenttable.Elementsareaccessedvia*entryAt/setEntryAtprovidingvolatilesemantics.*/transientvolatileHashEntry<K,V>[]table;transientintthreshold;finalfloatloadFactor;//构造函数Segment(floatlf,intthreshold,HashEntry<K,V>[]tab){this.loadFactor=lf;this.threshold=threshold;this.table=tab;}......}
上面是部分代码,可以看到Segment继承了ReentrantLock,所以其实每个Segment就是一个锁。
里面存放着HashEntry数组,该变量用volatile修饰。HashEntry和hashmap的节点类似,也是一个链表的节点。
来看看具体的代码,可以看到和hashmap里面稍微不同的是,他的成员变量有用volatile修饰。
staticfinalclassHashEntry<K,V>{finalinthash;finalKkey;volatileVvalue;volatileHashEntry<K,V>next;HashEntry(inthash,Kkey,Vvalue,HashEntry<K,V>next){this.hash=hash;this.key=key;this.value=value;this.next=next;}......}
所以ConcurrentHashMap的数据结构差不多是下图这种样子的。
在构造的时候,Segment 的数量由所谓的 concurrentcyLevel 决定,默认是 16,也可以在相应构造函数直接指定。注意,Java 需要它是 2 的幂数值,如果输入是类似 15 这种非幂值,会被自动调整到 16 之类 2 的幂数值。
来看看源码,先从简单的get方法开始
get()
publicVget(Objectkey){Segment<K,V>s;//manuallyintegrateaccessmethodstoreduceoverheadHashEntry<K,V>[]tab;inth=hash(key);longu=(((h>>>segmentShift)&segmentMask)<<SSHIFT)+SBASE;//通过unsafe获取Segment数组的元素if((s=(Segment<K,V>)UNSAFE.getObjectVolatile(segments,u))!=null&&(tab=s.table)!=null){//还是通过unsafe获取HashEntry数组的元素for(HashEntry<K,V>e=(HashEntry<K,V>)UNSAFE.getObjectVolatile(tab,((long)(((tab.length-1)&h))<<TSHIFT)+TBASE);e!=null;e=e.next){Kk;if((k=e.key)==key||(e.hash==h&&key.equals(k)))returne.value;}}returnnull;}
get的逻辑很简单,就是找到Segment对应下标的HashEntry数组,再找到HashEntry数组对应下标的链表头,再遍历链表获取数据。
这个获取数组中的数据是使用UNSAFE.getObjectVolatile(segments, u),unsafe提供了像c语言的可以直接访问内存的能力。该方法可以获取对象的相应偏移量的数据。u就是计算好的一个偏移量,所以等同于segments[i],只是效率更高。
put()
publicVput(Kkey,Vvalue){Segment<K,V>s;if(value==null)thrownewNullPointerException();inthash=hash(key);intj=(hash>>>segmentShift)&segmentMask;if((s=(Segment<K,V>)UNSAFE.getObject//nonvolatile;recheck(segments,(j<<SSHIFT)+SBASE))==null)//inensureSegments=ensureSegment(j);returns.put(key,hash,value,false);}
而对于 put 操作,是以 Unsafe 调用方式,直接获取相应的 Segment,然后进行线程安全的 put 操作:
主要逻辑在Segment内部的put方法
finalVput(Kkey,inthash,Vvalue,booleanonlyIfAbsent){//scanAndLockForPut会去查找是否有key相同Node//无论如何,确保获取锁HashEntry<K,V>node=tryLock()?null:scanAndLockForPut(key,hash,value);VoldValue;try{HashEntry<K,V>[]tab=table;intindex=(tab.length-1)&hash;HashEntry<K,V>first=entryAt(tab,index);for(HashEntry<K,V>e=first;;){if(e!=null){Kk;//更新已有value...}else{//放置HashEntry到特定位置,如果超过阈值,进行rehash//...}}}finally{unlock();}returnoldValue;}
size()
来看一下主要的代码,
for(;;){//如果重试次数等于默认的2,就锁住所有的segment,来计算值if(retries++==RETRIES_BEFORE_LOCK){for(intj=0;j<segments.length;++j)ensureSegment(j).lock();//forcecreation}sum=0L;size=0;overflow=false;for(intj=0;j<segments.length;++j){Segment<K,V>seg=segmentAt(segments,j);if(seg!=null){sum+=seg.modCount;intc=seg.count;if(c<0||(size+=c)<0)overflow=true;}}//如果sum不再变化,就表示得到了一个确切的值if(sum==last)break;last=sum;}
这里其实就是计算所有segment的数量和,如果数量和跟上次获取到的值相等,就表示map没有进行操作,这个值是相对正确的。如果重试两次之后还是没法得到一个统一的值,就锁住所有的segment,再来获取值。
扩容
privatevoidrehash(HashEntry<K,V>node){HashEntry<K,V>[]oldTable=table;intoldCapacity=oldTable.length;//新表的大小是原来的两倍intnewCapacity=oldCapacity<<1;threshold=(int)(newCapacity*loadFactor);HashEntry<K,V>[]newTable=(HashEntry<K,V>[])newHashEntry[newCapacity];intsizeMask=newCapacity-1;for(inti=0;i<oldCapacity;i++){HashEntry<K,V>e=oldTable[i];if(e!=null){HashEntry<K,V>next=e.next;intidx=e.hash&sizeMask;if(next==null)//SinglenodeonlistnewTable[idx]=e;else{//Reuseconsecutivesequenceatsameslot//如果有多个节点HashEntry<K,V>lastRun=e;intlastIdx=idx;//这里操作就是找到末尾的一段索引值都相同的链表节点,这段的头结点是lastRun.for(HashEntry<K,V>last=next;last!=null;last=last.next){intk=last.hash&sizeMask;if(k!=lastIdx){lastIdx=k;lastRun=last;}}//然后将lastRun结点赋值给数组位置,这样lastRun后面的节点也跟着过去了。newTable[lastIdx]=lastRun;//之后就是复制开头到lastRun之间的节点//Cloneremainingnodesfor(HashEntry<K,V>p=e;p!=lastRun;p=p.next){Vv=p.value;inth=p.hash;intk=h&sizeMask;HashEntry<K,V>n=newTable[k];newTable[k]=newHashEntry<K,V>(h,p.key,v,n);}}}}intnodeIndex=node.hash&sizeMask;//addthenewnodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex]=node;table=newTable;}
jdk1.8版本
数据结构
1.8的版本的ConcurrentHashmap整体上和Hashmap有点像,但是去除了segment,而是使用node的数组。
transientvolatileNode<K,V>[]table;
1.8中还是有Segment这个内部类,但是存在的意义只是为了序列化兼容,实际已经不使用了。
来看一下node节点
staticclassNode<K,V>implementsMap.Entry<K,V>{finalinthash;finalKkey;volatileVval;volatileNode<K,V>next;Node(inthash,Kkey,Vval,Node<K,V>next){this.hash=hash;this.key=key;this.val=val;this.next=next;}......}
和HashMap中的node节点类似,也是实现Map.Entry,不同的是val和next加上了volatile修饰来保证可见性。
put()
finalVputVal(Kkey,Vvalue,booleanonlyIfAbsent){if(key==null||value==null)thrownewNullPointerException();inthash=spread(key.hashCode());intbinCount=0;for(Node<K,V>[]tab=table;;){Node<K,V>f;intn,i,fh;if(tab==null||(n=tab.length)==0)//初始化tab=initTable();elseif((f=tabAt(tab,i=(n-1)&hash))==null){//利用CAS去进行无锁线程安全操作,如果bin是空的if(casTabAt(tab,i,null,newNode<K,V>(hash,key,value,null)))break;//nolockwhenaddingtoemptybin}elseif((fh=f.hash)==MOVED)tab=helpTransfer(tab,f);else{VoldVal=null;synchronized(f){//细粒度的同步修改操作...if(tabAt(tab,i)==f){if(fh>=0){binCount=1;for(Node<K,V>e=f;;++binCount){Kek;//找到相同key就更新if(e.hash==hash&&((ek=e.key)==key||(ek!=null&&key.equals(ek)))){oldVal=e.val;if(!onlyIfAbsent)e.val=value;break;}Node<K,V>pred=e;//没有相同的就新增if((e=e.next)==null){pred.next=newNode<K,V>(hash,key,value,null);break;}}}//如果是树节点,进行树的操作elseif(finstanceofTreeBin){Node<K,V>p;binCount=2;if((p=((TreeBin<K,V>)f).putTreeVal(hash,key,value))!=null){oldVal=p.val;if(!onlyIfAbsent)p.val=value;}}}}//Bin超过阈值,进行树化if(binCount!=0){if(binCount>=TREEIFY_THRESHOLD)treeifyBin(tab,i);if(oldVal!=null)returnoldVal;break;}}}addCount(1L,binCount);returnnull;}
可以看到,在同步逻辑上,它使用的是 synchronized,而不是通常建议的 ReentrantLock 之类,这是为什么呢?现在 JDK1.8 中,synchronized 已经被不断优化,可以不再过分担心性能差异,另外,相比于 ReentrantLock,它可以减少内存消耗,这是个非常大的优势。
与此同时,更多细节实现通过使用 Unsafe 进行了优化,例如 tabAt 就是直接利用 getObjectAcquire,避免间接调用的开销。
那么,再来看看size是怎么操作的?
finallongsumCount(){CounterCell[]as=counterCells;CounterCella;longsum=baseCount;if(as!=null){for(inti=0;i<as.length;++i){if((a=as[i])!=null)sum+=a.value;}}returnsum;}
这里就是获取成员变量counterCells,遍历获取总数。
其实,对于 CounterCell 的操作,是基于 java.util.concurrent.atomic.LongAdder 进行的,是一种 JVM 利用空间换取更高效率的方法,利用了Striped64内部的复杂逻辑。这个东西非常小众,大多数情况下,建议还是使用 AtomicLong,足以满足绝大部分应用的性能需求。
扩容
privatefinalvoidtransfer(Node<K,V>[]tab,Node<K,V>[]nextTab){......//初始化if(nextTab==null){//initiatingtry{@SuppressWarnings("unchecked")Node<K,V>[]nt=(Node<K,V>[])newNode<?,?>[n<<1];nextTab=nt;}catch(Throwableex){//trytocopewithOOMEsizeCtl=Integer.MAX_VALUE;return;}nextTable=nextTab;transferIndex=n;}intnextn=nextTab.length;ForwardingNode<K,V>fwd=newForwardingNode<K,V>(nextTab);//是否继续处理下一个booleanadvance=true;//是否完成booleanfinishing=false;//toensuresweepbefore mittingnextTabfor(inti=0,bound=0;;){Node<K,V>f;intfh;while(advance){intnextIndex,nextBound;if(--i>=bound||finishing)advance=false;elseif((nextIndex=transferIndex)<=0){i=-1;advance=false;}//首次循环才会进来这里elseif(UpareAndSwapInt(this,TRANSFERINDEX,nextIndex,nextBound=(nextIndex>stride?nextIndex-stride:0))){bound=nextBound;i=nextIndex-1;advance=false;}}if(i<0||i>=n||i+n>=nextn){intsc;//扩容结束后做后续工作if(finishing){nextTable=null;table=nextTab;sizeCtl=(n<<1)-(n>>>1);return;}//每当一条线程扩容结束就会更新一次sizeCtl的值,进行减1操作if(UpareAndSwapInt(this,SIZECTL,sc=sizeCtl,sc-1)){if((sc-2)!=resizeStamp(n)<<RESIZE_STAMP_SHIFT)return;finishing=advance=true;i=n;//recheckbefore mit}}//如果是null,设置fwdelseif((f=tabAt(tab,i))==null)advance=casTabAt(tab,i,null,fwd);//说明该位置已经被处理过了,不需要再处理elseif((fh=f.hash)==MOVED)advance=true;//alreadyprocessedelse{//真正的处理逻辑synchronized(f){if(tabAt(tab,i)==f){Node<K,V>ln,hn;if(fh>=0){intrunBit=fh&n;Node<K,V>lastRun=f;for(Node<K,V>p=f.next;p!=null;p=p.next){intb=p.hash&n;if(b!=runBit){runBit=b;lastRun=p;}}if(runBit==0){ln=lastRun;hn=null;}else{hn=lastRun;ln=null;}for(Node<K,V>p=f;p!=lastRun;p=p.next){intph=p.hash;Kpk=p.key;Vpv=p.val;if((ph&n)==0)ln=newNode<K,V>(ph,pk,pv,ln);elsehn=newNode<K,V>(ph,pk,pv,hn);}setTabAt(nextTab,i,ln);setTabAt(nextTab,i+n,hn);setTabAt(tab,i,fwd);advance=true;}//树节点操作elseif(finstanceofTreeBin){......}}}}}}
}setTabAt(nextTab,i,ln);setTabAt(nextTab,i+n,hn);setTabAt(tab,i,fwd);advance=true;}//树节点操作elseif(finstanceofTreeBin){......}}}}}}
核心逻辑和HashMap一样也是创建两个链表,只是多了获取lastRun的操作。
读到这里,这篇“Java中怎么使用ConcurrentHashMap实现线程安全的Map”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注主机评测网行业资讯频道。
上一篇:vue3使用reactive赋值后页面不改变怎么解决
下一篇:Java?web访问localhost:8080/xx/xx.jsp报404错误如何解决