说一下Redis的两个命令:
SETNX key value
setnx 是SET if Not eXists(如果不存在,则 SET)的简写。
用法如图,如果不存在set成功返回int的1,这个key存在了返回0。

SETEX key seconds value
将值 value 关联到 key ,并将 key 的生存韶光设为 seconds (以秒为单位)。
如果 key 已经存在,setex命令将覆写旧值。
有小伙伴肯定会迷惑万一set value 成功 set time失落败,那不就傻了么,这啊Redis官网想到了。
setex是一个原子性(atomic)操作,关联值和设置生存韶光两个动作会在同一韶光内完成。
我设置了10秒的失落效韶光,ttl命令可以查看倒计时,负的解释已经到期了。
跟大家讲这两个命名也是有缘故原由的,由于他们是Redis实现分布式锁的关键。
正文开始前还是看看场景:
我依然是创建了很多个线程去扣减库存inventory,不出意外的库存扣减顺序变了,终极的结果也是不对的。
单机加synchronized或者Lock这些常规操作我就不说了好吧,结果肯定是对的。
我先实现一个大略的Redis锁,然后我们再实现分布式锁,可能更方便大家的理解。
还记得上面我说过的命令么,实现一个单机的实在比较大略,你们先思考一下,别往下看。
setnx可以看到,第一个成功了,没开释锁,后面的都失落败了,至少顺序问题问题是办理了,只要加锁,缩放后面的拿到,开释如此循环,就能担保按照顺序实行。
但是你们也创造问题了,还是一样的,第一个仔set成功了,但是溘然挂了,那锁就一贯在那无法得到开释,后面的线程也永久得不到锁,又去世锁了。
以是....
setex知道我之前说这个命令的缘故原由了吧,设置一个过期韶光,就算线程1挂了,也会在失落效韶光到了,自动开释。
我这里就用到了nx和px的结合参数,便是set值并且加了过期韶光,这里我还设置了一个过期韶光,便是这韶光内如果第二个没拿到第一个的锁,就退出壅塞了,由于可能是客户端断连了。
加锁
整体加锁的逻辑比较大略,大家基本上都能看懂,不过我拿到当前韶光去减开始韶光的操作觉得有点笨, System.currentTimeMillis()花费很大的。
/加锁@paramid@return/publicbooleanlock(Stringid){Longstart=System.currentTimeMillis();try{for(;;){//SET命令返回OK,则证明获取锁成功Stringlock=jedis.set(LOCK_KEY,id,params);if("OK".equals(lock)){returntrue;}//否则循环等待,在timeout韶光内仍未获取到锁,则获取失落败longl=System.currentTimeMillis()-start;if(l>=timeout){returnfalse;}try{Thread.sleep(100);}catch(InterruptedExceptione){e.printStackTrace();}}}finally{jedis.close();}}
System.currentTimeMillis花费大,每个线程进来都这样,我之前写代码,就会在做事器启动的时候,开一个线程不断去拿,调用方直接获取值就好了,不过也不是最优解,日期类还是有很多好方法的。
@ServicepublicclassTimeServcie{privatestaticlongtime;static{newThread(newRunnable(){@Overridepublicvoidrun(){while(true){try{Thread.sleep(5);}catch(InterruptedExceptione){e.printStackTrace();}longcur=System.currentTimeMillis();setTime(cur);}}}).start();}publicstaticlonggetTime(){returntime;}publicstaticvoidsetTime(longtime){TimeServcie.time=time;}}
解锁
解锁的逻辑更加大略,便是一段Lua的拼装,把Key做了删除。
你们创造没,我上面加锁解锁都用了UUID,这便是为了担保,谁加锁了谁解锁,假如你删掉了我的锁,那不乱套了嘛。
LUA是原子性的,也比较大略,便是判断一下Key和我们参数是否相等,是的话就删除,返回成功1,0便是失落败。
/解锁@paramid@return/publicbooleanunlock(Stringid){Stringscript="ifredis.call('get',KEYS[1])==ARGV[1]then"+"returnredis.call('del',KEYS[1])"+"else"+"return0"+"end";try{Stringresult=jedis.eval(script,Collections.singletonList(LOCK_KEY),Collections.singletonList(id)).toString();return"1".equals(result)?true:false;}finally{jedis.close();}}
验证
我们可以用我们写的Redis锁试试效果,可以看到都按照顺序去实行了
思考
大家是不是以为完美了,但是上面的锁,有不少瑕疵的,我没思考很多点,你或容许以思考一下,源码我都开源到我的GItHub了。
而且,锁一样平常都是须要可重入行的,上面的线程都是实行完了就开释了,无法再次进入了,进去也是重新加锁了,对付一个锁的设计来说肯定不是很合理的。
我不打算手写,由于都有现成的,别人帮我们写好了。
redissonredisson的锁,就实现了可重入了,但是他的源码比较晦涩难懂。
利用起来很大略,由于他们底层都封装好了,你连接上你的Redis客户端,他帮你做了我上面写的统统,然后更完美。
大略看看他的利用吧,跟正常利用Lock没啥差异。
ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(inventory,inventory,10L,SECONDS,linkedBlockingQueue);longstart=System.currentTimeMillis();Configconfig=newConfig();config.useSingleServer().setAddress("redis://127.0.0.1:6379");finalRedissonClientclient=Redisson.create(config);finalRLocklock=client.getLock("lock1");for(inti=0;i<=NUM;i++){threadPoolExecutor.execute(newRunnable(){publicvoidrun(){lock.lock();inventory--;System.out.println(inventory);lock.unlock();}});}longend=System.currentTimeMillis();System.out.println("实行线程数:"+NUM+"总耗时:"+(end-start)+"库存数为:"+inventory);
上面可以看到我用到了getLock,实在便是获取一个锁的实例。
RedissionLock也没做啥,便是熟习的初始化。
publicRLockgetLock(Stringname){returnnewRedissonLock(connectionManager.getCommandExecutor(),name);}publicRedissonLock(CommandAsyncExecutorcommandExecutor,Stringname){super(commandExecutor,name);//命令实行器this.commandExecutor=commandExecutor;//UUID字符串this.id=commandExecutor.getConnectionManager().getId();//内部锁过期韶光this.internalLockLeaseTime=commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.entryName=id+":"+name;}
加锁
有没有创造很多跟Lock很多相似的地方呢?
考试测验加锁,拿到当前哨程,然后我开头说的ttl也看到了,是不是统统都是那么熟习?
publicvoidlockInterruptibly(longleaseTime,TimeUnitunit)throwsInterruptedException{//当前哨程IDlongthreadId=Thread.currentThread().getId();//考试测验获取锁Longttl=tryAcquire(leaseTime,unit,threadId);//如果ttl为空,则证明获取锁成功if(ttl==null){return;}//如果获取锁失落败,则订阅到对应这个锁的channelRFuture<RedissonLockEntry>future=subscribe(threadId);commandExecutor.syncSubscription(future);try{while(true){//再次考试测验获取锁ttl=tryAcquire(leaseTime,unit,threadId);//ttl为空,解释成功获取锁,返回if(ttl==null){break;}//ttl大于0则等待ttl韶光后连续考试测验获取if(ttl>=0){getEntry(threadId).getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);}else{getEntry(threadId).getLatch().acquire();}}}finally{//取消对channel的订阅unsubscribe(future,threadId);}//get(lockAsync(leaseTime,unit));}
获取锁
获取锁的时候,也比较大略,你可以看到,他也是不断刷新过期韶光,跟我上面不断去拿当前韶光,校验过期是一个道理,只是我比较粗糙。
private<T>RFuture<Long>tryAcquireAsync(longleaseTime,TimeUnitunit,finallongthreadId){//如果带有过期韶光,则按照普通办法获取锁if(leaseTime!=-1){returntryLockInnerAsync(leaseTime,unit,threadId,RedisCommands.EVAL_LONG);}//先按照30秒的过期韶光来实行获取锁的方法RFuture<Long>ttlRemainingFuture=tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS,threadId,RedisCommands.EVAL_LONG);//如果还持有这个锁,则开启定时任务不断刷新该锁的过期韶光ttlRemainingFuture.addListener(newFutureListener<Long>(){@OverridepublicvoidoperationComplete(Future<Long>future)throwsException{if(!future.isSuccess()){return;}LongttlRemaining=future.getNow();//lockacquiredif(ttlRemaining==null){scheduleExpirationRenewal(threadId);}}});returnttlRemainingFuture;}
底层加锁逻辑
你可能会想这么多操作,在一起不是原子性不还是有问题么?
大佬们肯定想得到呀,以是还是LUA,他利用了Hash的数据构造。
紧张是判断锁是否存在,存在就设置过期韶光,如果锁已经存在了,那比拟一下线程,线程是一个那就证明可以重入,锁在了,但是不是当前哨程,证明别人还没开释,那就把剩余韶光返回,加锁失落败。
是不是有点绕,多理解一遍。
<T>RFuture<T>tryLockInnerAsync(longleaseTime,TimeUnitunit,longthreadId,RedisStrictCommand<T>command){//过期韶光internalLockLeaseTime=unit.toMillis(leaseTime);returncommandExecutor.evalWriteAsync(getName(),LongCodec.INSTANCE,command,//如果锁不存在,则通过hset设置它的值,并设置过期韶光"if(redis.call('exists',KEYS[1])==0)then"+"redis.call('hset',KEYS[1],ARGV[2],1);"+"redis.call('pexpire',KEYS[1],ARGV[1]);"+"returnnil;"+"end;"+//如果锁已存在,并且锁的是当前哨程,则通过hincrby给数值递增1"if(redis.call('hexists',KEYS[1],ARGV[2])==1)then"+"redis.call('hincrby',KEYS[1],ARGV[2],1);"+"redis.call('pexpire',KEYS[1],ARGV[1]);"+"returnnil;"+"end;"+//如果锁已存在,但并非本线程,则返回过期韶光ttl"returnredis.call('pttl',KEYS[1]);",Collections.<Object>singletonList(getName()),internalLockLeaseTime,getLockName(threadId));}
解锁
锁的开释紧张是publish开释锁的信息,然后做校验,一样会判断是否当前哨程,成功就开释锁,还有个hincrby递减的操作,锁的值大于0解释是可重入锁,那就刷新过期韶光。
如果值小于0了,那删掉Key开释锁。
是不是又和AQS很像了?
AQS便是通过一个volatile润色status去看锁的状态,也会看数值判断是否是可重入的。
以是我说代码的设计,末了就万剑归一,都是一样的。
publicRFuture<Void>unlockAsync(finallongthreadId){finalRPromise<Void>result=newRedissonPromise<Void>();//解锁方法RFuture<Boolean>future=unlockInnerAsync(threadId);future.addListener(newFutureListener<Boolean>(){@OverridepublicvoidoperationComplete(Future<Boolean>future)throwsException{if(!future.isSuccess()){cancelExpirationRenewal(threadId);result.tryFailure(future.cause());return;}//获取返回值BooleanopStatus=future.getNow();//如果返回空,则证明解锁的线程和当前锁不是同一个线程,抛出非常if(opStatus==null){IllegalMonitorStateExceptioncause=newIllegalMonitorStateException("attempttounlocklock,notlockedbycurrentthreadbynodeid:"+id+"thread-id:"+threadId);result.tryFailure(cause);return;}//解锁成功,取消刷新过期韶光的那个定时任务if(opStatus){cancelExpirationRenewal(null);}result.trySuccess(null);}});returnresult;}protectedRFuture<Boolean>unlockInnerAsync(longthreadId){returncommandExecutor.evalWriteAsync(getName(),LongCodec.INSTANCE,EVAL,//如果锁已经不存在,发布锁开释的"if(redis.call('exists',KEYS[1])==0)then"+"redis.call('publish',KEYS[2],ARGV[1]);"+"return1;"+"end;"+//如果开释锁的线程和已存在锁的线程不是同一个线程,返回null"if(redis.call('hexists',KEYS[1],ARGV[3])==0)then"+"returnnil;"+"end;"+//通过hincrby递减1的办法,开释一次锁//若剩余次数大于0,则刷新过期韶光"localcounter=redis.call('hincrby',KEYS[1],ARGV[3],-1);"+"if(counter>0)then"+"redis.call('pexpire',KEYS[1],ARGV[2]);"+"return0;"+//否则证明锁已经开释,删除key并发布锁开释的"else"+"redis.call('del',KEYS[1]);"+"redis.call('publish',KEYS[2],ARGV[1]);"+"return1;"+"end;"+"returnnil;",Arrays.<Object>asList(getName(),getChannelName()),LockPubSub.unlockMessage,internalLockLeaseTime,getLockName(threadId));}
总结
这个写了比较久,但是不是由于繁芜什么的,是由于个人事情的缘故原由,最近事情很多嘛,还是那句话,程序员才是我的本职写文章只是个爱好,不能本末倒置了。
大家会创造,你学懂一个技能栈之后,学新的会很快,而且也能创造他们的设计思想和技巧真的很奥妙,也总能找到相似点,和让你惊叹的点。
就拿Doug Lea写的AbstractQueuedSynchronizer(AQS)来说,他写了一行代码,你可能看几天才能看懂,大佬们的思想是真的牛。
我看源码有时候也头疼,但是去谷歌一下,自己理解一下,溘然恍然大悟的时候以为统统又很值。
学习便是一条时而郁郁寡欢,时而开环大笑的路,大家加油,我们发展路上一起共勉。
我是敖丙,一个在互联网搪塞塞责的工具人。
最好的关系是相互造诣,大家的「三连」便是丙丙创作的最大动力,我们下期见!
注:如果本篇博客有任何缺点和建议,欢迎人才们留言,你快说句话啊!
你知道的越多,你不知道的越多