博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
cas 实现无锁栈和队列
阅读量:3943 次
发布时间:2019-05-24

本文共 7935 字,大约阅读时间需要 26 分钟。

文章目录

什么是CAS?能做什么事情?

CAS(Compare-and-Swap),即比较并替换,是一种实现并发算法时常用到的技术

在并发程序中,如果多个线程共享一个变量,通过CAS可以在不加锁的情况下实现对共享变量的安全的访问。

CAS和Linux普通的锁有什么区别?

Linux 普通锁:

在并发编程中,当一个线程访问共享内存前,会先对共享区加锁,当其他进程想要访问共享区的时候,会试图对共享区加锁,如果此时已经有其他线程对共享区加了锁,该进程会进入阻塞状态,直到被其他释放锁的线程唤醒为止,进入阻塞状态时cpu要切换线程执行,所以要保存当前进程的执行状态.(高速缓存中要重新缓存其他线程的资源。)

(进入阻塞状态的线程不占用cpu的资源)

CAS:

CAS不通过阻塞线程来实现对共享内存的安全访问,通过将对共享内存的读写操作实现为原子操作,cas的操作原型

CAS(void *ptr,type oldVal,type newVal){
if(*ptr == oldVal) {
*ptr = newVal; }}

如果两个线程都获得了oldVal,当一个线程成功地将oldVal替换为newVal的时候,另一个线程的操作就会失败.

为什么原子操作就能保证线程安全?
这里先说一下原子操作的实现原理

原子操作

1、使用总线锁保证原子性:

在x86的平台上,cpu提供了在指令执行期间对指令加锁的手段,CPU芯片上有一条引线#LOCK pin, 如果汇编语言的程序中在一条指令前面加上前缀"LOCK",经过汇编以后的机器代码就使CPU在执行这条指令的时候把#LOCK pin的电位拉低, 持续到这条指令结束时放开, 从而把总线锁住,这样同一总线上别的CPU就暂时不能通过总线访问内存了,保证了这条指令在多处理器环境中的原子性

  在同一时刻我们只需保证对某个内存地址的操作是原子性即可,但总线锁定把CPU和内存之间通信锁住了,这使得锁定期间,其他处理器不能操作其他内存地址的数据,所以总线锁定的开销比较大.
  
2、使用缓存锁保证原子性:

缓存锁:所谓“缓存锁定”是指内存区域如果被缓存在处理器的缓存行中,并且在Lock操作期间被锁定,那么当它执行锁操作回写到内存时,处理器不在总线上声言LOCK#信号,而是修改内部的内存地址,并允许它的缓存一致性机制来保证操作的原子性,因为缓存一致性机制会阻止同时修改由两个以上处理器缓存的内存区域数据,当其他处理器回写已被锁定的缓存行数据时,会使缓存行无效。

缓存一致性协议:

M:Modified 被修改的

处于这一状态的数据,只能在本CPU中有缓存数据,其他CPU中都没有。相对内存中的值来说,是已经被修改的且没有被更新到主存中
E:Exclusive 独占的
处于这一种状态的数据 只有本CPU中有缓存 且其数据没有修改
S:Shared 共享的
处于该种状态的数据在多个CPU中都有缓存
I:Invalid 无效的
本CPU中的这份缓存已经无效

每种状态对应的监听:

  1. 一个处于M状态的缓存行 必须时刻监听所有试图读取该缓存行对应的主存地址的操作,如果监听到,就必须在此操作执行之前把缓存行中的数据写回内存

  2. 一个处于E状态的缓存行 必须时刻监听其他试图读取该缓存行对应的主存地址的操作,如果监听到,必须把缓存行状态设置为S

  3. 一个处于S状态的缓存行 必须时刻监听使该缓存行无效或者独享该缓存行的请求,如果监听到,就把缓存行的状态设置为I

CPU需要读数据时:

  1. 如果其缓存行状态是I,则需要从内存中读取并把自己的状态设置成S

  2. 如果不是I,则可以直接从缓存中读取数值 但是在此之前 需要等待其他CPU的监听结果,如果其他CPU也有该数据的缓存且状态是M,则要等其他CPU把缓存更新要内存后再读;

CPU需要写数据时:

只有在状态是M或者E的时候才能执行,否则需要发出特殊的RFO指令(Read Or Ownership,这是一种总线事务),通知其他CPU置缓存无效(I),这种情况下会性能开销是相对较大的.在写入完成后,修改其缓存状态为M.

所以如果一个变量在某段时间只被一个线程频繁地修改,则使用其内部缓存就完全可以办到,(其他线程没有访问,所以短时间内不用将缓存的数据写入内存中)不涉及到总线事务,如果缓存一会被这个CPU独占、一会被那个CPU 独占,这时才会不断产生RFO指令影响到并发性能.这里说的缓存频繁被独占并不是指线程越多越容易触发,而是这里的CPU协调机制,这有点类似于有时多线程并不一定提高效率,原因是线程挂起、调度的开销比执行任务的开销还要大,这里的多CPU也是一样,如果在CPU间调度不合理,也会形成RFO指令的开销比任务开销还要大.(cpu的亲和性)

并非所有情况都会使用缓存一致性的,如被操作的数据不能被缓存在CPU内部或操作数据跨越多个缓存行(状态无法标识),则处理器会调用总线锁定;

CAS 操作的缺点

(1)、循环时间长,开销大

(2)、只能保证一个共享变量的原子操作
(3)、ABA问题:

CAS 实现无锁栈

#include 
#include
#include
using namespace std;class CStack{
public: CStack(int size); ~CStack(); void push(int &val); void pop(); void show(); bool empty()const; bool full()const;private: int *mdata; int mtop; int msize;};CStack::CStack(int size = 10){
msize = size; mtop = 0; mdata = new int[size];}CStack::~CStack(){
delete []mdata;}//push操作与pop操作不能同时进行,否则会产生线程安全问题void CStack::push(int &val){
int *ptmp = mdata; while(1) {
/* ************************************************************ 当mdata = newdata 执行后, msize = 2*msize之前,有一个新进程执行了 int *tmp = mdata;并且进入while循环,会重复修改mdata 的值 所以必须将msize = 2*msize;与mdata = newdata;改为同时操作 ************************************************************ */ while(mtop >= msize) {
//当__sync_bool_compare_and_swap(&mdata,tmp,NULL)刚刚执行完其他线程就执行 int *tmp = mdata; //if(flag && ptmp != NULL) if(ptmp != NULL) {
/* ******************************************************************** 当mdata = newdata 执行后, msize = 2*msize之前,有一个新进程执行了 int *tmp = mdata;并且进入while循h环 如果没有if(msize == mtop) 会再次扩容 mdata,并且访问未初始化的数据 ******************************************************************** */ if(msize == mtop) {
/* ************************************************************************** mdata 与 msize 如果只修改了其中一个,而其他线程就再次进行扩容操作,这样就可能 连续扩容两次,flag 标志位防止在两个数据完全修改之前,其他线程再次扩容 ************************************************************************** */ if(__sync_bool_compare_and_swap(&mdata,ptmp,NULL)) {
int *newdata = new int[2*msize]; memcpy(newdata,ptmp,sizeof(int)*mtop); mdata = newdata; msize = 2*msize; delete []ptmp; } } } } do {
//循环获取最新的栈顶指针 int tmp = mtop; /* ****************************************************************** 多个线程同时执行push操作,可能一个线程push之后,栈为满了,这时其他进程 就跳出本层循环,进入外层循环,进行扩容操作 ****************************************************************** */ if(tmp >= msize) {
break; } if(__sync_bool_compare_and_swap(&mtop,tmp,tmp+1)) {
/* *********************************************************** 因为是先将mtop 的值加一,所以将val写入mtop-1的位置即可, 也就是tmp的位置 *********************************************************** */ mdata[tmp] = val; return ; } }while(1); }}bool CStack::empty()const{
return mtop == 0;}bool CStack::full()const{
return mtop == msize;}void CStack::pop(){
if(empty()) {
return ; } do {
int tmp = mtop; if(tmp == 0) {
return ; } if(__sync_bool_compare_and_swap(&mtop,tmp,tmp-1)) {
break; } }while(1);}void CStack::show(){
for(int i = 0; i < mtop; i++) {
//cout << mdata[i] << " "; } cout << endl; cout << mtop << endl;}/*int main(int argc, char* argv[]){ CStack st; for(int i = 0; i < 1000000; i++) { st.push(i); } st.show(); cout <

CAS 实现无锁队列

#include 
#include
#include
//#include
using namespace std;struct Node{
Node(int data = 0):mdata(data),mpnext(NULL){
} int mdata; Node *mpnext;};class CQueue{
public: CQueue(); ~CQueue(); void push(int val); void pop(); void show();private: Node *mfront; Node *mrear;};//构造函数CQueue::CQueue(){
mfront = new Node(); mrear = mfront;}//析构函数CQueue::~CQueue(){
Node *ptmp; while(mfront != NULL) {
ptmp = mfront; mfront = mfront->mpnext; delete ptmp; }}//插入元素void CQueue::push(int val){
Node *newNode = new Node(val); do {
if(__sync_bool_compare_and_swap(&mrear->mpnext,NULL,newNode)) {
mrear = mrear->mpnext; break; } //在尾节点更新之前线程挂掉 }while( 1 );}//删除一个元素void CQueue::pop(){
Node *pdel = mfront->mpnext; if(pdel == NULL) {
return ; } do {
if(__sync_bool_compare_and_swap(&mfront->mpnext,pdel,pdel->mpnext)) {
//若删除的是最后一个节点,重置尾节点 /**************************************************************************************** 当一个线程进行删除的时候,另一个进程要进行插入操作,如果队列中的节点数多于1个 不会产生线程安全的问题,因为插入和删除操作的不是同一个共享变量 如果队列中只剩下一个节点,则存在线程安全的问题, ****************************************************************************************/ if(pdel->mpnext == NULL) {
mrear = mfront; } delete pdel; break; } //有进程删除失败,说明节点已被删除,将pdel置为下一个节点 pdel = mfront->mpnext; //pdel为空,说明已经被全部删除 if(pdel == NULL) {
break; } }while(1);}//打印函数void CQueue::show(){
Node *pcur = mfront->mpnext; int count = 0; while(pcur != NULL) {
//cout << pcur->mdata << " "; pcur = pcur->mpnext; count++; } cout << endl; cout << count << endl;}/*int main(){ CQueue queue; for(int i = 0; i < 100000; i++) { queue.push(i); } for(int i = 0; i < 100000; i++) { queue.pop(); } queue.show(); return 0; }*/

转载地址:http://otnwi.baihongyu.com/

你可能感兴趣的文章
window安装PHP的redis扩展
查看>>
给网站选择一个好的jquery库远程调…
查看>>
flash&nbsp;as&nbsp;与js通信(转)
查看>>
Linux系统手动安装rzsz&nbsp;软件包
查看>>
PHP的事务处理机制
查看>>
JS&nbsp;moveStart和moveEnd方法
查看>>
thrift的lua实现
查看>>
编写高性能的Lua代码
查看>>
Python正则表达式指南
查看>>
LUA--thrift--lib库的创建生成
查看>>
Shell开启扩展模式匹配shopt -s extglob
查看>>
浅谈 URI 及其转义
查看>>
nginx 优化
查看>>
openresty+lua在反向代理服务中的玩法
查看>>
ClickHouse集群搭建从0到1
查看>>
nginx实现请求的负载均衡 + keepalived实现nginx的高可用
查看>>
linux shell 中数组的定义和for循环遍历的方法
查看>>
求1!+2!+3!....+20!(java代码)
查看>>
VMware安装Ubuntu系统无法选择语言
查看>>
QT5.12安装
查看>>