上述的问题产生的缘故原由紧张便是从Redis拿数去到扣减优惠券数量不是原子性操作,在以往的单体项目中,办理这种问题可以采取synchronized同步锁的办法去实现:
@Slf4j@RestControllerpublic class MarketCouponController { @Resource private Redisson redisson; @Resource private StringRedisTemplate redisTemplate; @RequestMapping("/deductCouponTicket") public String deductCouponTicket(String couponCode) { String lockKey = "coupon:ticket" + couponCode; synchronized (this) { int couponTicketCount = Integer.parseInt(redisTemplate.opsForValue().get(lockKey)); if (couponTicketCount > 0) { int realCouponTicketCount = couponTicketCount - 1; log.info("用户抢券成功,扣减优惠券数量,剩余量:" + realCouponTicketCount); redisTemplate.opsForValue().set(lockKey, realCouponTicketCount + ""); } else { log.error("用户抢券失落败,优惠券已被抢光!
"); } } return "end"; }}复制代码
代码思考剖析:当多线程并发访问的时候,实行到synchronized(this)位置的时候,只会有一个线程得到锁,进入synchronized代码块中实行业务逻辑,当该线程实行完成之后才会开释锁,等下一个线程进来又会重新上锁实行该段代码,大略来说,通过synchronized锁机制实现多线程排队实行代码块的代码,从而担保了线程的安全。
以上的做法,如果是后端做事仅支配在一台机器的情形下(也便是单机***,这样的实现是没有问题的,但是在如今的互联网公司,面对这些高并发的场景,后端做事肯定是不仅支配到做事器上的,最好都有两台乃至更多机子来构建集群的架构,那么采取synchronized加锁的办法很明显办理不了当前的资源访问掌握的问题了。可以通过jmeter去仿照并发要求,在集群模式并发场景下,还是会涌现数据不一致的问题。

若后端是两个微做事构成的做事集群,由于synchronize代码块只能在同一个JVM进程中生效,两个要求能够同时进两个做事,以是上面代码中的synchronized就一点浸染没有了。synchronized和juc包下个那些锁都是只能用于JVM进程维度的锁,并不能利用在集群或分布式支配的环境中。
4.2 集群模式办理并发问题通过上面的实验很随意马虎创造通过synchronized等JVM进程级别的锁并不能办理分布式场景的并发问题,而正由于须要办理这种场景下的问题,才有了分布式锁的涌现。
可以通过Redis的SETNX命令(只在键key不存在的情形下,将键key的值设置为value。若键key已经存在,则SETNX命令不做任何动作)来办理,紧张办理上面 集议论况下锁不唯一的问题。
@Slf4j@RestControllerpublic class MarketCouponController { @Resource private Redisson redisson; @Resource private StringRedisTemplate redisTemplate; @RequestMapping("/deductCouponTicket") public String deductCouponTicket(String couponCode) { String lockKey = "coupon:ticket" + couponCode; // redis setnx操作 Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "123456"); if (Boolean.FALSE.equals(result)) { return "error"; } int couponTicketCount = Integer.parseInt(redisTemplate.opsForValue().get(lockKey)); if (couponTicketCount > 0) { int realCouponTicketCount = couponTicketCount - 1; log.info("用户抢券成功,扣减优惠券数量,剩余量:" + realCouponTicketCount); redisTemplate.opsForValue().set(lockKey, realCouponTicketCount + ""); } else { log.error("用户抢券失落败,优惠券已被抢光!
"); } redisTemplate.delete(lockKey); return "end"; }}复制代码
代码思考剖析:看到上述代码的实现,通过redis setnx分布式锁的实现,实际上该段代码还是存在问题的,便是当实行扣减余票操作时,若业务代码报了非常,那么就会导致后面的删除Redis的key代码没有实行到,就会使Redis的key没有删掉的情形,那么Redis的这个key就会一贯存在Redis中,后面的线程再进来实行下面这行代码都是实行不堪利的,就会 导致线程去世锁,会导致后面进来的线程一贯拿不到锁,一贯要求不到资源,那么问题就会很严重了。
为理解决上述问题实在很大略,只要加上一个try...finally即可,这样业务代码纵然抛了非常也可以正常的开释锁。setnx + try ... finally办理,详细代码如下:
@Slf4j@RestControllerpublic class MarketCouponController { @Resource private Redisson redisson; @Resource private StringRedisTemplate redisTemplate; @RequestMapping("/deductCouponTicket") public String deductCouponTicket(String couponCode) { String lockKey = "coupon:ticket" + couponCode; try { // redis setnx操作 Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "123456"); if (Boolean.FALSE.equals(result)) { return "error"; } int couponTicketCount = Integer.parseInt(redisTemplate.opsForValue().get(lockKey)); if (couponTicketCount > 0) { int realCouponTicketCount = couponTicketCount - 1; log.info("用户抢券成功,扣减优惠券数量,剩余量:" + realCouponTicketCount); redisTemplate.opsForValue().set(lockKey, realCouponTicketCount + ""); } else { log.error("用户抢券失落败,优惠券已被抢光!
"); } } catch (Exception e) { e.printStackTrace(); } finally { //开释锁 redisTemplate.delete(lockKey); } return "end"; }}复制代码
代码思考剖析:上述业务代码实行报错的问题办理了,但是又会有新的问题,当程序实行到try代码块中某个位置做事宕机或者做事重新发布,这样就还是会有上述的Redis的key没有删掉导致去世锁的情形。这样可以利用Redis的过期韶光来进行设置key,setnx + 过期韶光办理,如下代码所示:
@Slf4j@RestControllerpublic class MarketCouponController { @Resource private Redisson redisson; @Resource private StringRedisTemplate redisTemplate; @RequestMapping("/deductCouponTicket") public String deductCouponTicket(String couponCode) { String lockKey = "coupon:ticket" + couponCode; try { // redis setnx操作 Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "123456"); //设置过期韶光 redisTemplate.expire(lockKey, 3, TimeUnit.SECONDS); if (Boolean.FALSE.equals(result)) { return "error"; } int couponTicketCount = Integer.parseInt(redisTemplate.opsForValue().get(lockKey)); if (couponTicketCount > 0) { int realCouponTicketCount = couponTicketCount - 1; log.info("用户抢券成功,扣减优惠券数量,剩余量:" + realCouponTicketCount); redisTemplate.opsForValue().set(lockKey, realCouponTicketCount + ""); } else { log.error("用户抢券失落败,优惠券已被抢光!
"); } } catch (Exception e) { e.printStackTrace(); } finally { //开释锁 redisTemplate.delete(lockKey); } return "end"; }}复制代码
代码思考剖析:上述代码办理了由于程序实行过程中宕机导致的锁没有开释导致的去世锁问题,但是如果代码像上述的这种写法仍旧还是会有问题,当程序实行到第 redisTemplate.opsForValue().setIfAbsent(lockKey, "austin");行时,系统溘然宕机了,此时Redis的过期韶光并没有设置,也会导致线程去世锁的征象。可以用了Redis设置的原子命设置过期韶光的命令,原子性过期韶光的setnx命令,如下代码所示:
@Slf4j@RestControllerpublic class MarketCouponController { @Resource private Redisson redisson; @Resource private StringRedisTemplate redisTemplate; @RequestMapping("/deductCouponTicket") public String deductCouponTicket(String couponCode) { String lockKey = "coupon:ticket" + couponCode; try { // redis setnx + 过期韶光 Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "123456",3,TimeUnit.SECONDS); if (Boolean.FALSE.equals(result)) { return "error"; } int couponTicketCount = Integer.parseInt(redisTemplate.opsForValue().get(lockKey)); if (couponTicketCount > 0) { int realCouponTicketCount = couponTicketCount - 1; log.info("用户抢券成功,扣减优惠券数量,剩余量:" + realCouponTicketCount); redisTemplate.opsForValue().set(lockKey, realCouponTicketCount + ""); } else { log.error("用户抢券失落败,优惠券已被抢光!
"); } } catch (Exception e) { e.printStackTrace(); } finally { //开释锁 redisTemplate.delete(lockKey); } return "end"; }}复制代码
代码思考剖析:通过设置原子性过期韶光命令可以很好的办理上述这种程序实行过程中溘然宕机的情形。这种Redis分布式锁的实现看似已经没有问题了,一样平常软件公司并发量不是很高的情形下,这种实现分布式锁的办法已经够用了,纵然出了些小的数据不一致的问题,也是能够接管的,但是 如果是在高并发的场景下,上述的这种实现办法还是会存在很大问题。
这里我们举个大略解释一下该办法存在的毛病问题:
线程A抢占锁线程A抢占了锁,并设置了这个锁的过期韶光为10秒,也便是10秒后自动开锁,锁的编号为123456。10秒往后,线程A业务还在实行,此时锁就被自动打开了。
线程B抢占锁线程B进来创造锁已经被打开了,于是抢占了锁,设置锁的编号为123456,并设置锁的过期韶光为10秒。 线程A在15秒的时候,完成了任务,但此时线程B还在实行任务。线程A主动打开了编号为123456的锁,刚好这个锁也是线程B的锁。。线程B还在实行任务,创造自己的锁已经被打开了。。线程B:我都还没实行完任务呢,为什么我的锁开了?
线程C抢占锁线程B的锁在线程A实行完的时候开释了,此时,线程B还在实行任务。 线程C抢占到了锁,线程C开始实行任务 此时涌现线程B和线程C同时实行一个任务,在实行任务上产生了冲突。
从上面的剖析可以知道,当当前哨程处理 实行业务逻辑所须要的韶光大于锁的过期韶光 ,这时候锁会自动开释,又会被其他线程抢占到锁,当前哨程实行完之后,会把其他线程抢占到的锁误开释。然而,为什么会误打开别的线程的锁呢?由于锁的唯一性,每个锁的编号都是123456,线程只认锁的编号,瞥见编号为123456的就开,结果把别人的锁打开了,这种情形便是锁 没确保唯一性导致的。
办理上述问题实在也很大略,让每个线程加的锁时给Redis设置一个唯一id的value,每次开释锁的时候先判断一下线程的唯一id与Redis 存的值是否相同,若相同即可开释锁。设置线程id的原子性过期韶光的setnx命令, 详细代码如下:
@Slf4j@RestControllerpublic class MarketCouponController { @Resource private Redisson redisson; @Resource private StringRedisTemplate redisTemplate; @RequestMapping("/deductCouponTicket") public String deductCouponTicket(String couponCode) { String lockKey = "coupon:ticket" + couponCode; String threadUniqueKey = UUID.randomUUID().toString(); try { // redis setnx + 唯一编号 + 过期韶光 Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, threadUniqueKey, 3, TimeUnit.SECONDS); if (Boolean.FALSE.equals(result)) { return "error"; } int couponTicketCount = Integer.parseInt(redisTemplate.opsForValue().get(lockKey)); if (couponTicketCount > 0) { int realCouponTicketCount = couponTicketCount - 1; log.info("用户抢券成功,扣减优惠券数量,剩余量:" + realCouponTicketCount); redisTemplate.opsForValue().set(lockKey, realCouponTicketCount + ""); } else { log.error("用户抢券失落败,优惠券已被抢光!
"); } } catch (Exception e) { e.printStackTrace(); } finally { //开释锁 redisTemplate.delete(lockKey); } return "end"; }}复制代码
代码思考剖析:实际上述实现的Redis分布式锁已经能够知足大部分运用处景了,但是还是略有不敷,比如当线程进来须要的实行韶光超过了Redis key的过期韶光,此时锁就已经被开释了,其他线程就可以立马得到锁实行代码,就又会产生bug了。
分布式锁的key过期韶光不管设置成多少都不得当,比如设置为10秒,如果业务代码实行链路较长,亦或是代码存在慢SQL、数据查询量大的情形,那么过期韶光就不好设置,那么这里有没有什么更好的方案呢?答案是有的:锁续命。
那么锁续命方案的原来就在于当线程加锁成功时,会开一个分线程,取锁过期韶光的1/3韶光点定时实行任务,每10s判断一次锁是否存在(即Redis的key),若锁还存在那么就直接重新设置锁的过期韶光,若锁已经不存在了那么就直接结束当前的分线程。
五、Redission框架实现分布式锁上面说的续命锁看起来大略,但是实际上实现还是有一定的难度的,于是类似 Redission 开源框架已经帮我们实现好了,以是不须要再重复造轮子自己去写一个分布式锁了,下面会拿Redission框架举例,学习一下Redission分布式锁的设计思想。
5.1 Redission分布式锁的实现事理Redission实现分布式锁的事理流程图如下图所示,当线程一加锁成功获取到锁,并开启实行业务代码时,Redission框架会开启一个后台线程(所谓的Watch Dog看门狗),每隔锁过期的1/3韶光去定时判断一次是否还持有锁(Redis的key是否还存在),若不持有那么久实行结束当前的后台线程,若还持有锁,那么久重新设置锁的过期韶光,当线程一加锁成功后,那么线程二就自然获取不到锁,此时线程二就会做类似CAS的自旋操作,一贯等待线程一开释锁之后才能加锁成功。
其余,Redison底层实现分布式锁时利用了大量的lua脚本担保了其加锁操作的各种原子性。Redison实现分布式锁利用lua脚本的好处紧张是能担保Redis的操作是原子性的,Redis会将全体脚本作为一个整体实行,中间不会被其他命令插入。
5.2 Redission分布式锁的实现
@Slf4j@RestControllerpublic class MarketCouponController { @Resource private Redisson redisson; @Resource private StringRedisTemplate redisTemplate; / Redission分布式锁实现 @author: jacklin @since 2022/8/18 11:25 / @RequestMapping("/deductCouponTicket") public String deductCouponTicket(String couponCode) { String lockKey = "coupon:ticket" + couponCode; RLock rlock = redisson.getLock(lockKey); try { rlock.lock(); int couponTicketCount = Integer.parseInt(redisTemplate.opsForValue().get(lockKey)); if (couponTicketCount > 0) { int realCouponTicketCount = couponTicketCount - 1; log.info("用户抢券成功,扣减优惠券数量,剩余量:" + realCouponTicketCount); redisTemplate.opsForValue().set(lockKey, realCouponTicketCount + ""); } else { log.error("用户抢券失落败,优惠券已被抢光!
5.3 Redission利用lua脚本加锁核心源码剖析
"); } } catch (Exception e) { e.printStackTrace(); } finally { //开释锁 rlock.unlock(); } return "end"; }}复制代码
方法名为tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command):
// 利用lua脚本加锁// getName()传入KEYS[1],表示传入解锁的keyName,这里是 String lockKey = coupon:ticket" + couponCode;// internalLockLeaseTime传入ARGV[1],表示锁的超时时间,默认是30秒// getLockName(threadId)传入ARGV[2],表示锁的唯一标识线程id<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { //当第一个线程要求进来时会直接实行这段逻辑 return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command, //判断传入的Redis的key是否存在,即String lockKey = "coupon:ticket" + couponCode; "if (redis.call('exists', KEYS[1]) == 0) then " + //如果不存在那么就设置这个key为传入值、当前哨程id 即参数ARGV[2]的值(即getLockName(threahId),并且将线程id的value值设置为1) "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //给这个key设置有效韶光,有效韶光即参数ARGV[1](即internalLockLeaseTime的值)的韶光 "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //当第二个线程进来,Redis中的key已经存在(锁已经存在),那么直接进这段逻辑 //判断这个Redis key是否存在且当前的这个key是否是当前哨程设置的 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + //如果是的话,那么就进入重入锁的逻辑,利用hincrby指令将第一个线程进来将线程id的value值设置为1再加1 //然后每次开释锁的时候就会减1,直到这个值为0,这把锁就开释了,这点与juc的可重锁类似 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});}复制代码
六、总结
本文通过单机环境本地锁->集议论况锁演化->分布式锁的延伸,深入浅出的先容了不同办法实现分布式锁的问题和改进之处,通过不同的例子详细展开解释,并且剖析了Redission分布式锁的实现事理和实现。
链接:https://juejin.cn/post/7133062101387444260