C#中怎么使用CAS实现无锁算法
本篇内容介绍了“C#中怎么使用CAS实现无锁算法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
CAS 的基本概念
CASpare-and-Swap)是一种多线程并发编程中常用的原子操作,用于实现多线程间的同步和互斥访问。 它操作通常包含三个参数:一个内存地址(通常是一个共享变量的地址)、期望的旧值和新值。
<ppareAndSwap(内存地址,期望的旧值,新值)
CAS 操作会比较内存地址处的值与期望的旧值是否相等,如果相等,则将新值写入该内存地址; 如果不相等,则不进行任何操作。这个比较和交换的操作是一个原子操作,不会被其他线程中断。
CAS 通常是通过硬件层面的CPU指令实现的,其原子性是由硬件保证的。具体的实现方式根据环境会有所不同。
CAS 操作通常会有一个返回值,用于表示操作是否成功。返回结果可能是true或false,也可能是内存地址处的旧值。
相比于传统的锁机制,CAS 有一些优势:
原子性:CAS 操作是原子的,不需要额外的锁来保证多线程环境下的数据一致性,避免了锁带来的性能开销和竞争条件。
无阻塞:CAS 操作是无阻塞的,不会因为资源被锁定而导致线程的阻塞和上下文切换,提高了系统的并发性和可伸缩性。
适用性:CAS 操作可以应用于广泛的数据结构和算法,如自旋锁、计数器、队列等,使得它在实际应用中具有较大的灵活性和适用性。
C# 中如何使用 CAS
在 C# 中,我们可以使用 Interlocked 类来实现 CAS 操作。
Interlocked 类提供了一组pareExchange 的重载方法,用于实现不同类型的数据的 CAS 操作。
publicstaticint pareExchange(refintlocation1,intvalue,int parand);publicstaticlong pareExchange(reflonglocation1,longvalue,long parand);//...省略其他重载方法publicstaticobject pareExchange(refobjectlocation1,objectvalue,object parand);publicstaticT pareExchange<T>(refTlocation1,Tvalue,T parand)whereT:class;<ppareExchange 方法将 location1 内存地址处的值与parand 比较,如果相等,则将 value 写入 location1 内存地址处,否则不进行任何操作。
该方法返回 location1 内存地址处的值。
通过判断方法返回值与parand 是否相等,我们就可以知道pareExchange 方法是否执行成功。
算法示例
在使用 CAS 实现无锁算法时,通常我们不光是为了比较和更新一个数据,还需要在更新成功后进行下一步的操作。结合 while(true) 循环,我们可以不断地尝试更新数据,直到更新成功为止。
伪代码如下:
while(true){//读取数据oldValue=...;//计算新值newValue=...;//CAS更新数据result= pareExchange(reflocation,newValue,oldValue);//判断CAS是否成功if(result==oldValue){//CAS成功,执行后续操作break;}}
在复杂的无锁算法中,因为每一步操作都是独立的,连续的操作并非原子,所以我们不光要借助 CAS,每一步操作前都应判断是否有其他线程已经修改了数据。
示例1:计数器
下面是一个简单的计数器类,它使用 CAS 实现了一个线程安全的自增操作。
publicclassCounter{privateint_value;publicintIncrement(){while(true){intoldValue=_value;intnewValue=oldValue+1;intresult=InterlockedpareExchange(ref_value,newValue,oldValue);if(result==oldValue){returnnewValue;}}}}
CLR 底层源码中,我们也会经常看到这样的代码,比如 ThreadPool 增加线程时的计数器。github/dotnet/runtime/blob/release/6.0/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs#L446
internalvoidEnsureThreadRequested(){////Ifwehavenotyetrequested#procsthreads,thenrequestanewthread.////CoreCLR:NotethatthereisaseparatecountintheVMwhichhasalreadybeenincremented//bytheVMbythetimewereachthispoint.//intcount=_separated.numOutstandingThreadRequests;while(count<Environment.ProcessorCount){intprev=InterlockedpareExchange(ref_separated.numOutstandingThreadRequests,count+1,count);if(prev==count){ThreadPool.RequestWorkerThread();break;}count=prev;}}
示例2:队列
下面是一个简单的队列类,它使用 CAS 实现了一个线程安全的入队和出队操作。相较于上面的计数器,这里的操作更加复杂,我们每一步都需要考虑是否有其他线程已经修改了数据。
这样的算法有点像薛定谔的猫,你不知道它是死是活,只有当你试图去观察它的时候,它才可能会变成死或者活。
publicclassConcurrentQueue<T>{//_head和_tail是两个伪节点,_head._next指向队列的第一个节点,_tail指向队列的最后一个节点。//_head和_tail会被多个线程修改和访问,所以要用volatile修饰。privatevolatileNode_head;privatevolatileNode_tail;publicConcurrentQueue(){_head=newNode(default);//_tail指向_head时,队列为空。_tail=_head;}publicvoidEnqueue(Titem){varnode=newNode(item);while(true){Nodetail=_tail;Nodenext=tail._next;//判断给next赋值的这段时间,是否有其他线程修改过_tailif(tail==_tail){//如果next为null,则说明从给tail赋值到给next赋值这段时间,没有其他线程修改过tail._next,if(next==null){//如果tail._next为null,则说明从给tail赋值到这里,没有其他线程修改过tail._next,//tail依旧是队列的最后一个节点,我们就可以直接将node赋值给tail._next。if(InterlockedpareExchange(reftail._next,node,null)==null){//如果_tail==tail,则说明从上一步CAS操作到这里,没有其他线程修改过_tail,也就是没有其他线程执行过Enqueue操作。//那么当前线程Enqueue的node就是队列的最后一个节点,我们就可以直接将node赋值给_tail。InterlockedpareExchange(ref_tail,node,tail);break;}}//如果next不为null,则说明从给tail赋值到给next赋值这段时间,有其他线程修改过tail._next,else{//如果没有其他线程修改过_tail,那么next就是队列的最后一个节点,我们就可以直接将next赋值给_tail。InterlockedpareExchange(ref_tail,next,tail);}}}}publicboolTryDequeue(outTitem){while(true){Nodehead=_head;Nodetail=_tail;Nodenext=head._next;//判断_head是否被修改过//如果没有被修改过,说明从给head赋值到给next赋值这段时间,没有其他线程执行过Dequeue操作。if(head==_head){//如果head==tail,说明队列为空if(head==tail){//虽然上面已经判断过队列是否为空,但是在这里再判断一次//是为了防止在给tail赋值到给next赋值这段时间,有其他线程执行过Enqueue操作。if(next==null){item=default;returnfalse;}//如果next不为null,则说明从给tail赋值到给next赋值这段时间,有其他线程修改过tail._next,也就是有其他线程执行过Enqueue操作。//那么next就可能是队列的最后一个节点,我们尝试将next赋值给_tail。InterlockedpareExchange(ref_tail,next,tail);}//如果head!=tail,说明队列不为空else{item=next._item;if(InterlockedpareExchange(ref_head,next,head)==head){//如果_head没有被修改过//说明从给head赋值到这里,没有其他线程执行过Dequeue操作,上面的item就是队列的第一个节点的值。//我们就可以直接返回。break;}//如果_head被修改过//说明从给head赋值到这里,有其他线程执行过Dequeue操作,上面的item就不是队列的第一个节点的值。//我们就需要重新执行Dequeue操作。}}}returntrue;}privateclassNode{publicreadonlyT_item;publicNode_next;publicNode(Titem){_item=item;}}}
我们可以通过以下代码来进行测试
usingSystem.Collections.Concurrent;varqueue=newConcurrentQueue<int>();varresults=newConcurrentBag<int>();intdequeueRetryCount=0;varenqueueTask=Task.Run(()=>{//确保Enqueue前dequeueTask已经开始运行Thread.Sleep(10);Console.WriteLine("Enqueuestart");Parallel.For(0,100000,i=>queue.Enqueue(i));Console.WriteLine("Enqueuedone");});vardequeueTask=Task.Run(()=>{Thread.Sleep(10);Console.WriteLine("Dequeuestart");Parallel.For(0,100000,i=>{while(true){if(queue.TryDequeue(outintresult)){results.Add(result);break;}Interlocked.Increment(refdequeueRetryCount);}});Console.WriteLine("Dequeuedone");});awaitTask.WhenAll(enqueueTask,dequeueTask);Console.WriteLine($"Enqueueanddequeuedone,totaldatacount:{results.Count},dequeueretrycount:{dequeueRetryCount}");varhashSet=results.ToHashSet();for(inti=0;i<100000;i++){if(!hashSet.Contains(i)){Console.WriteLine("Error,missing"+i);break;}}Console.WriteLine("Done");
输出结果:
Dequeue start
Enqueue start
Enqueue done
Dequeue done
Enqueue and dequeue done, total data count: 100000, dequeue retry count: 10586
Done
上述的 retry count 为 797,说明在 100000 次的 Dequeue 操作中,有 10586 次的 Dequeue 操作需要重试,那是因为在 Dequeue 操作中,可能暂时没有数据可供 Dequeue,需要等待其他线程执行 Enqueue 操作。
当然这个 retry count 是不稳定的,因为在多线程环境下,每次执行的结果都可能不一样。
“C#中怎么使用CAS实现无锁算法”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注主机评测网网站,小编将为大家输出更多高质量的实用文章!
下一篇:elementPlus修改主题色及皮肤设置的方法是什么