大家都知道在天猫、京东、苏宁等等电商网站上有很多秒杀活动,例如在某一个时候抢购一个原价1999现在秒杀价只要999的手机时,会迎来一个用户要求的高峰期,会有几十万几百万的并发量,来抢这个手机,在高并发的环境下会队数据库做事器或者是文件做事器运用做事器造成巨大的压力,严重时说不定就宕机了。
另一个问题是,秒杀的东西都是有量的,例如一款手机只有10台的量秒杀,那么,在高并发的情形下,成千上万条数据更新数据库(例如10台的量被人抢一台就会在数据集某些记录下 减1),那次这个时候的先后顺序是很乱的,很随意马虎涌现10台的量,抢到的人就不止10个这种严重的问题。那么,往后所说的问题我们该如何去办理呢?接下来我所分享的技能就可以拿来处理以上的问题:分布式锁 和 任务行列步队。
二、实现思路

1.Redis实现分布式锁思路
思路很大略,紧张用到的redis函数是setnx(),这个该当是实现分布式锁最紧张的函数。首先是将某一任务标识名(这里用Lock:order作为标识名的例子)作为键存到redis里,并为其设个过期韶光,如果是还有Lock:order要求过来,先是通过setnx()看看是否能将Lock:order插入到redis里,可以的话就返回true,不可以就返回false。当然,在我的代码里会比这个思路繁芜一些,我会在剖析代码时进一步解释。
2.Redis实现任务行列步队
这里的实现会用到上面的Redis分布式的锁机制,紧张是用到了Redis里的有序凑集这一数据构造。例如入队时,通过zset的add()函数进行入队,而出对时,可以用到zset的getScore()函数。其余还可以弹召盘部的几个任务。
以上便是实现 分布式锁 和 任务行列步队 的大略思路,如果你看完有点模棱两可,那请看接下来的代码实现。
三、代码剖析
1.先来剖析Redis分布式锁的代码实现
1)为避免分外缘故原由导致锁无法开释,在加锁成功后,锁会被授予一个生存韶光(通过lock方法的参数设置或者利用默认值),超出生存韶光锁会被自动开释锁的生存韶光默认比较短(秒级),因此,若须要永劫光加锁,可以通过expire方法延长锁的生存韶光为适当韶光,比如在循环内。
2)系统级的锁当进程无论何种缘故原由时涌现crash时,操作系统会自己回收锁,以是不会涌现资源丢失,但分布式锁不用,若一次性设置很永劫光,一旦由于各种缘故原由涌现进程crash 或者其他非常导致unlock未被调用时,则该锁在剩下的韶光就会变成垃圾锁,导致其他进程或者进程重启后无法进入加锁区域。
先看加锁的实当代码:这里须要紧张两个参数,一个是$timeout,这个是循环获取锁的等待韶光,在这个韶光内会一贯考试测验获取锁知道超时,如果为0,则表示获取锁失落败后直接返回而不再等待;另一个主要参数的$expire,这个参数指当前锁的最大生存韶光,以秒为单位的,它必须大于0,如果超过生存韶光锁仍未被开释,则系统会自动逼迫开释。这个参数的最要浸染请看上面的(1)里的阐明。
这里先取得当前韶光,然后再获取到锁失落败时的等待超时的时候(是个韶光戳),再获取到锁的最大生存时候是多少。这里redis的key用这种格式:"Lock:锁的标识名",这里就开始进入循环了,先是插入数据到redis里,利用setnx()函数,这函数的意思是,如果该键不存在则插入数据,将最大生存时候作为值存储,如果插入成功,则对该键进行失落效韶光的设置,并将该键放在$lockedName数组里,返回true,也便是上锁成功;
如果该键存在,则不会插入操作了,这里有一步严谨的操作,那便是取得当前键的剩余韶光,如果这个韶光小于0,表示key上没有设置生存韶光(key是不会不存在的,由于前面setnx会自动创建)如果涌现这种状况,那便是进程的某个实例setnx成功后 crash 导致紧随着的expire没有被调用,这时可以直接设置expire并把锁纳为己用。如果没设置锁失落败的等待韶光 或者 已超过最大等待韶光了,那就退出循环,反之则 隔 $waitIntervalUs 后连续 要求。这便是加锁的整一个代码剖析。
/加锁@param[type]$name锁的标识名@paraminteger$timeout循环获取锁的等待超时时间,在此韶光内会一贯考试测验获取锁直到超时,为0表示失落败后直接返回不等待@paraminteger$expire当前锁的最大生存韶光(秒),必须大于0,如果超过生存韶光锁仍未被开释,则系统会自动逼迫开释@paraminteger$waitIntervalUs获取锁失落败后挂起再试的韶光间隔(微秒)@return[type][description]/publicfunctionlock($name,$timeout=0,$expire=15,$waitIntervalUs=100000){if($name==null)returnfalse;//取得当前韶光$now=time();//获取锁失落败时的等待超时时刻$timeoutAt=$now+$timeout;//锁的最大生存时候$expireAt=$now+$expire;$redisKey="Lock:{$name}";while(true){//将rediskey的最大生存时候存到redis里,过了这个时候该锁会被自动开释$result=$this->redisString->setnx($redisKey,$expireAt);if($result!=false){//设置key的失落效韶光$this->redisString->expire($redisKey,$expireAt);//将锁标志放到lockedNames数组里$this->lockedNames[$name]=$expireAt;returntrue;}//以秒为单位,返回给定key的剩余生存韶光$ttl=$this->redisString->ttl($redisKey);//ttl小于0表示key上没有设置生存韶光(key是不会不存在的,由于前面setnx会自动创建)//如果涌现这种状况,那便是进程的某个实例setnx成功后crash导致紧随着的expire没有被调用//这时可以直接设置expire并把锁纳为己用if($ttl<0){$this->redisString->set($redisKey,$expireAt);$this->lockedNames[$name]=$expireAt;returntrue;}/循环要求锁部分///如果没设置锁失落败的等待韶光或者已超过最大等待韶光了,那就退出if($timeout<=0||$timeoutAt<microtime(true))break;//隔$waitIntervalUs后连续要求usleep($waitIntervalUs);}returnfalse;}
接着看解锁的代码剖析:解锁就大略多了,传入参数便是锁标识,先是判断是否存在该锁,存在的话,就从redis里面通过deleteKey()函数删除掉锁标识即可。
/解锁@param[type]$name[description]@return[type][description]/publicfunctionunlock($name){//先判断是否存在此锁if($this->isLocking($name)){//删除锁if($this->redisString->deleteKey("Lock:$name")){//清掉lockedNames里的锁标志unset($this->lockedNames[$name]);returntrue;}}returnfalse;}
在贴上删除掉所有锁的方法,实在都一个样,多了个循环遍历而已。
/开释当前所有得到的锁@return[type][description]/publicfunctionunlockAll(){//此标志是用来标志是否开释所有锁成功$allSuccess=true;foreach($this->lockedNamesas$name=>$expireAt){if(false===$this->unlock($name)){$allSuccess=false;}}return$allSuccess;}
以上便是用Redis实现分布式锁的整一套思路和代码实现的总结和分享,这里我附上正一个实现类的代码,代码里我基本上对每一行进行了注释,方便大家快速看懂并且能仿照运用。想要深入理解的请看全体类的代码:
/在redis上实现分布式锁/classRedisLock{private$redisString;private$lockedNames=[];publicfunction__construct($param=NULL){$this->redisString=RedisFactory::get($param)->string;}/加锁@param[type]$name锁的标识名@paraminteger$timeout循环获取锁的等待超时时间,在此韶光内会一贯考试测验获取锁直到超时,为0表示失落败后直接返回不等待@paraminteger$expire当前锁的最大生存韶光(秒),必须大于0,如果超过生存韶光锁仍未被开释,则系统会自动逼迫开释@paraminteger$waitIntervalUs获取锁失落败后挂起再试的韶光间隔(微秒)@return[type][description]/publicfunctionlock($name,$timeout=0,$expire=15,$waitIntervalUs=100000){if($name==null)returnfalse;//取得当前韶光$now=time();//获取锁失落败时的等待超时时刻$timeoutAt=$now+$timeout;//锁的最大生存时候$expireAt=$now+$expire;$redisKey="Lock:{$name}";while(true){//将rediskey的最大生存时候存到redis里,过了这个时候该锁会被自动开释$result=$this->redisString->setnx($redisKey,$expireAt);if($result!=false){//设置key的失落效韶光$this->redisString->expire($redisKey,$expireAt);//将锁标志放到lockedNames数组里$this->lockedNames[$name]=$expireAt;returntrue;}//以秒为单位,返回给定key的剩余生存韶光$ttl=$this->redisString->ttl($redisKey);//ttl小于0表示key上没有设置生存韶光(key是不会不存在的,由于前面setnx会自动创建)//如果涌现这种状况,那便是进程的某个实例setnx成功后crash导致紧随着的expire没有被调用//这时可以直接设置expire并把锁纳为己用if($ttl<0){$this->redisString->set($redisKey,$expireAt);$this->lockedNames[$name]=$expireAt;returntrue;}/循环要求锁部分///如果没设置锁失落败的等待韶光或者已超过最大等待韶光了,那就退出if($timeout<=0||$timeoutAt<microtime(true))break;//隔$waitIntervalUs后连续要求usleep($waitIntervalUs);}returnfalse;}/解锁@param[type]$name[description]@return[type][description]/publicfunctionunlock($name){//先判断是否存在此锁if($this->isLocking($name)){//删除锁if($this->redisString->deleteKey("Lock:$name")){//清掉lockedNames里的锁标志unset($this->lockedNames[$name]);returntrue;}}returnfalse;}/开释当前所有得到的锁@return[type][description]/publicfunctionunlockAll(){//此标志是用来标志是否开释所有锁成功$allSuccess=true;foreach($this->lockedNamesas$name=>$expireAt){if(false===$this->unlock($name)){$allSuccess=false;}}return$allSuccess;}/给当前所增加指定生存韶光,必须大于0@param[type]$name[description]@return[type][description]/publicfunctionexpire($name,$expire){//先判断是否存在该锁if($this->isLocking($name)){//所指定的生存韶光必须大于0$expire=max($expire,1);//增加锁生存韶光if($this->redisString->expire("Lock:$name",$expire)){returntrue;}}returnfalse;}/判断当前是否拥有指定名字的所@param[type]$name[description]@returnboolean[description]/publicfunctionisLocking($name){//先看lonkedName[$name]是否存在该锁标志名if(isset($this->lockedNames[$name])){//从redis返回该锁的生存韶光return(string)$this->lockedNames[$name]=(string)$this->redisString->get("Lock:$name");}returnfalse;}}Redis实现分布式锁
2.用Redis实现任务行列步队的代码剖析
1)任务行列步队,用于将业务逻辑中可以异步处理的操作放入行列步队中,在其他线程中处理后出
2)行列步队中利用了分布式锁和其他逻辑,担保入队和出队的同等性
3)这个行列步队和普通行列步队不一样,入队时的id是用来区分重复入队的,行列步队里面只会有一条记录,同一个id后入的覆盖前入的,而不是追加, 如果需求哀求重复入队当做不用的任务,请利用不同的id区分
先看入队的代码剖析:首先当然是对参数的合法性检测,接着就用到上面加锁机制的内容了,便是开始加锁,入队时我这里选择当前韶光戳作为score,接着便是入队了,利用的是zset数据构造的add()方法,入队完成后,就对该任务解锁,即完成了一个入队的操作。
/入队一个Task@param[type]$name行列步队名称@param[type]$id任务id(或者其数组)@paraminteger$timeout入队超时时间(秒)@paraminteger$afterInterval[description]@return[type][description]/publicfunctionenqueue($name,$id,$timeout=10,$afterInterval=0){//合法性检测if(empty($name)||empty($id)||$timeout<=0)returnfalse;//加锁if(!$this->_redis->lock->lock("Queue:{$name}",$timeout)){Logger::get('queue')->error("enqueuefaildbecouseoflockfailure:name=$name,id=$id");returnfalse;}//入队时以当前韶光戳作为score$score=microtime(true)+$afterInterval;//入队foreach((array)$idas$item){//先判断下是否已经存在该id了if(false===$this->_redis->zset->getScore("Queue:$name",$item)){$this->_redis->zset->add("Queue:$name",$score,$item);}}//解锁$this->_redis->lock->unlock("Queue:$name");returntrue;}
接着来看一下出队的代码剖析:出队一个Task,须要指定它的$id 和 $score,如果$score与行列步队中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失落败处理。首先和对参数进行合法性检测,接着又用到加锁的功能了,然后及时出队了;
先利用getScore()从Redis里获取到该id的score,然后将传入的$score和Redis里存储的score进行比拟,如果两者相等就进行出队操作,也便是利用zset里的delete()方法删掉该任务id,末了当前便是解锁了。这便是出队的代码剖析。
/出队一个Task,须要指定$id和$score如果$score与行列步队中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失落败处理@param[type]$name行列步队名称@param[type]$id任务标识@param[type]$score任务对应score,从行列步队中获取任务时会返回一个score,只有$score和行列步队中的值匹配时Task才会被出队@paraminteger$timeout超时时间(秒)@return[type]Task是否成功,返回false可能是redis操作失落败,也有可能是$score与行列步队中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过)/publicfunctiondequeue($name,$id,$score,$timeout=10){//合法性检测if(empty($name)||empty($id)||empty($score))returnfalse;//加锁if(!$this->_redis->lock->lock("Queue:$name",$timeout)){Logger:get('queue')->error("dequeuefaildbecouseoflocklailure:name=$name,id=$id");returnfalse;}//出队//先取出redis的score$serverScore=$this->_redis->zset->getScore("Queue:$name",$id);$result=false;//先判断传进来的score和redis的score是否是一样if($serverScore==$score){//删掉该$id$result=(float)$this->_redis->zset->delete("Queue:$name",$id);if($result==false){Logger::get('queue')->error("dequeuefaildbecauseofredisdeletefailure:name=$name,id=$id");}}//解锁$this->_redis->lock->unlock("Queue:$name");return$result;}
学过数据构造的朋友都该当知道,行列步队操作还有弹召盘部某个值的方法等等,这里处理入队出队操作,我还实现了 获取行列步队顶部多少个Task 并将其出队的方法,想理解的朋友可以看这段代码,如果看不太明白就留言,这里我不再对其进行剖析了。
/获取行列步队顶部多少个Task并将其出队@param[type]$name行列步队名称@paraminteger$count数量@paraminteger$timeout超时时间@return[type]返回数组[0=>['id'=>,'score'=>],1=>['id'=>,'score'=>],2=>['id'=>,'score'=>]]/publicfunctionpop($name,$count=1,$timeout=10){//合法性检测if(empty($name)||$count<=0)return[];//加锁if(!$this->_redis->lock->lock("Queue:$name")){Log::get('queue')->error("popfaildbecauseofpopfailure:name=$name,count=$count");returnfalse;}//取出多少的Task$result=[];$array=$this->_redis->zset->getByScore("Queue:$name",false,microtime(true),true,false,[0,$count]);//将其放在$result数组里并删除掉redis对应的idforeach($arrayas$id=>$score){$result[]=['id'=>$id,'score'=>$score];$this->_redis->zset->delete("Queue:$name",$id);}//解锁$this->_redis->lock->unlock("Queue:$name");return$count==1?(empty($result)?false:$result[0]):$result;}
以上便是用Redis实现任务行列步队的整一套思路和代码实现的总结和分享,这里我附上正一个实现类的代码,代码里我基本上对每一行进行了注释,方便大家快速看懂并且能仿照运用。想要深入理解的请看全体类的代码:
/任务行列步队/classRedisQueue{private$_redis;publicfunction__construct($param=null){$this->_redis=RedisFactory::get($param);}/入队一个Task@param[type]$name行列步队名称@param[type]$id任务id(或者其数组)@paraminteger$timeout入队超时时间(秒)@paraminteger$afterInterval[description]@return[type][description]/publicfunctionenqueue($name,$id,$timeout=10,$afterInterval=0){//合法性检测if(empty($name)||empty($id)||$timeout<=0)returnfalse;//加锁if(!$this->_redis->lock->lock("Queue:{$name}",$timeout)){Logger::get('queue')->error("enqueuefaildbecouseoflockfailure:name=$name,id=$id");returnfalse;}//入队时以当前韶光戳作为score$score=microtime(true)+$afterInterval;//入队foreach((array)$idas$item){//先判断下是否已经存在该id了if(false===$this->_redis->zset->getScore("Queue:$name",$item)){$this->_redis->zset->add("Queue:$name",$score,$item);}}//解锁$this->_redis->lock->unlock("Queue:$name");returntrue;}/出队一个Task,须要指定$id和$score如果$score与行列步队中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失落败处理@param[type]$name行列步队名称@param[type]$id任务标识@param[type]$score任务对应score,从行列步队中获取任务时会返回一个score,只有$score和行列步队中的值匹配时Task才会被出队@paraminteger$timeout超时时间(秒)@return[type]Task是否成功,返回false可能是redis操作失落败,也有可能是$score与行列步队中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过)/publicfunctiondequeue($name,$id,$score,$timeout=10){//合法性检测if(empty($name)||empty($id)||empty($score))returnfalse;//加锁if(!$this->_redis->lock->lock("Queue:$name",$timeout)){Logger:get('queue')->error("dequeuefaildbecouseoflocklailure:name=$name,id=$id");returnfalse;}//出队//先取出redis的score$serverScore=$this->_redis->zset->getScore("Queue:$name",$id);$result=false;//先判断传进来的score和redis的score是否是一样if($serverScore==$score){//删掉该$id$result=(float)$this->_redis->zset->delete("Queue:$name",$id);if($result==false){Logger::get('queue')->error("dequeuefaildbecauseofredisdeletefailure:name=$name,id=$id");}}//解锁$this->_redis->lock->unlock("Queue:$name");return$result;}/获取行列步队顶部多少个Task并将其出队@param[type]$name行列步队名称@paraminteger$count数量@paraminteger$timeout超时时间@return[type]返回数组[0=>['id'=>,'score'=>],1=>['id'=>,'score'=>],2=>['id'=>,'score'=>]]/publicfunctionpop($name,$count=1,$timeout=10){//合法性检测if(empty($name)||$count<=0)return[];//加锁if(!$this->_redis->lock->lock("Queue:$name")){Logger::get('queue')->error("popfaildbecauseofpopfailure:name=$name,count=$count");returnfalse;}//取出多少的Task$result=[];$array=$this->_redis->zset->getByScore("Queue:$name",false,microtime(true),true,false,[0,$count]);//将其放在$result数组里并删除掉redis对应的idforeach($arrayas$id=>$score){$result[]=['id'=>$id,'score'=>$score];$this->_redis->zset->delete("Queue:$name",$id);}//解锁$this->_redis->lock->unlock("Queue:$name");return$count==1?(empty($result)?false:$result[0]):$result;}/获取行列步队顶部的多少个Task@param[type]$name行列步队名称@paraminteger$count数量@return[type]返回数组[0=>['id'=>,'score'=>],1=>['id'=>,'score'=>],2=>['id'=>,'score'=>]]/publicfunctiontop($name,$count=1){//合法性检测if(empty($name)||$count<1)return[];//取错多少个Task$result=[];$array=$this->_redis->zset->getByScore("Queue:$name",false,microtime(true),true,false,[0,$count]);//将Task存放在数组里foreach($arrayas$id=>$score){$result[]=['id'=>$id,'score'=>$score];}//返回数组return$count==1?(empty($result)?false:$result[0]):$result;}}Redis实现任务行列步队
到此,这两大块功能基本讲解完毕,对付任务行列步队,你可以写一个shell脚本,让做事器定时运行某些程序,实现入队出队等操作,这里我就不在将其与实际运用结合起来去实现了,大家理解好这两大功能的实现思路即可;
由于代码用的是PHP措辞来写的,如果你理解了实现思路,你完备可以利用java或者是.net等等其他措辞去实现这两个功能。这两大功能的运用处景十分多,特殊是秒杀,另一个便是春运抢火车票,这两个是最光鲜的例子了。当然还有很多地方用到,这里我不再逐一列举。
领取办法:点赞关注领取办法:点赞关注小编后私信【资料】获取资料领取办法!