实现一个Redis分布式锁

实现一个Redis分布式锁

作者:佚名 2020-07-30 09:35:09

运维

数据库运维

分布式

Redis 在我们日常开发中,难免会遇到要加锁的情景。例如扣除产品库存,首先要从数据库中取出库存,进行库存判断,再减去库存。这一波操作明显不符合原子性,如果代码块不加锁,很容易因为并发导致超卖问题。

[[335642]]

前言

在我们日常开发中,难免会遇到要加锁的情景。例如扣除产品库存,首先要从数据库中取出库存,进行库存判断,再减去库存。这一波操作明显不符合原子性,如果代码块不加锁,很容易因为并发导致超卖问题。咱们的系统如果是单体架构,那我们使用本地锁就可以解决问题。如果是分布式架构,就需要使用分布式锁。

方案

使用 SETNX 和 EXPIRE 命令 

  1. SETNX key value  
  2. EXPIRE key seconds  
  3. DEL key  
  4. if (setnx("item_1_lock", 1)) {  
  5.     expire("item_1_lock", 30);  
  6.     try {  
  7.         ... 逻辑  
  8.     } catch {  
  9.         ...  
  10.     } finally {  
  11.         del("item_1_lock");  
  12.     }  

这种方法看起来可以解决问题,但是有一定的风险,因为 SETNX 和 EXPIRE 这波操作是非原子性的,如果 SETNX 成功之后,出现错误,导致 EXPIRE 没有执行,导致锁没有设置超时时间形成死锁。

针对这种情况,我们可以使用 lua 脚本来保持操作原子性,保证 SETNX 和 EXPIRE 两个操作要么都成功,要么都不成功。 

  1. if (redis.call('setnx', KEYS[1], ARGV[1]) < 1 
  2. then return 0;  
  3. end;  
  4. redis.call('expire', KEYS[1], tonumber(ARGV[2]));  
  5. return 1; 

通过这样的方法,我们初步解决了竞争锁的原子性问题,虽然其他功能还未实现,但是应该不会造成死锁🤪🤪🤪。

Redis 2.6.12 以上可灵活使用 SET 命令 

  1. SET key value NX EX 30  
  2. DEL key  
  3. if (set("item_1_lock", 1, "NX", "EX", 30)) {  
  4.     try {  
  5.         ... 逻辑  
  6.     } catch {  
  7.         ...  
  8.     } finally {  
  9.         del("item_1_lock");  
  10.     }  

改进后的方法不需要借助 lua 脚本就解决了 SETNX 和 EXPIRE 的原子性问题。现在我们再仔细琢磨琢磨,如果 A 拿到了锁顺利进入代码块执行逻辑,但是由于各种原因导致超时自动释放锁。在这之后 B 成功拿到了锁进入代码块执行逻辑,但此时如果 A 执行逻辑完毕再来释放锁,就会把 B 刚获得的锁释放了。就好比用自己家的钥匙开了别家的门,这是不可接受的。

为了解决这个问题我们可以尝试在 SET 的时候设置一个锁标识,然后在 DEL 的时候验证当前锁是否为自己的锁。 

  1. String value = UUID.randomUUID().toString().replaceAll("-", "");  
  2. if (set("item_1_lock", value, "NX", "EX", 30)) {  
  3.     try {  
  4.         ... 逻辑  
  5.     } catch {  
  6.         ...  
  7.     } finally {  
  8.         ... lua 脚本保证原子性  
  9.     }  
  10.  
  11. if (redis.call('get', KEYS[1]) == ARGV[1])  
  12. then return redis.call('del', KEYS[1])  
  13. else return 0  
  14. end 

到这里,我们终于解决了竞争锁的原子性问题和误删锁问题。但是锁一般还需要支持可重入、循环等待和超时自动续约等功能点。下面我们学习使用一个非常好用的包来解决这些问题。

入门 Redisson

Redission 的锁,实现了可重入和超时自动续约功能,它都帮我们封装好了,我们只要按照自己的需求调用它的 API 就可以轻松实现上面所提到的几个功能点。详细功能可以查看 Redisson 文档

在项目中安装 Redisson 

  1. <dependency>  
  2.     <groupId>org.redisson</groupId>  
  3.     <artifactId>redisson</artifactId>  
  4.     <version>3.13.2</version>  
  5. </dependency>  
  1. implementation 'org.redisson:redisson:3.13.2' 

用 Maven 或者 Gradle 构建,目前最新版本为 3.13.2,也可以在这里 Redisson 找到你需要的版本。

简单尝试 

  1. RedissonClient redissonClient = Redisson.create();  
  2. RLock lock = redissonClient.getLock("lock");  
  3. boolean res = lock.lock();  
  4. if (res) {  
  5.    try {  
  6.      ... 逻辑  
  7.    } finally {  
  8.        lock.unlock();  
  9.    }  

Redisson 将底层逻辑全部做了一个封装 ,我们无需关心具体实现,几行代码就能使用一把完美的锁。下面我们简单折腾折腾源码 🤔🤔🤔。

加锁 

  1. private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {  
  2.     long threadId = Thread.currentThread().getId();  
  3.     Long ttl = tryAcquire(leaseTime, unit, threadId);  
  4.     if (ttl == null) { 
  5.          return;  
  6.     }  
  7.     RFuture<RedissonLockEntry> future = subscribe(threadId);  
  8.     if (interruptibly) {  
  9.         commandExecutor.syncSubscriptionInterrupted(future);  
  10.     } else {  
  11.         commandExecutor.syncSubscription(future);  
  12.     }  
  13.     try {  
  14.         while (true) {  
  15.             ttl = tryAcquire(leaseTime, unit, threadId);  
  16.             if (ttl == null) {  
  17.                 break;  
  18.             }  
  19.             if (ttl >= 0) {  
  20.                 try {  
  21.                     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);  
  22.                 } catch (InterruptedException e) {  
  23.                     if (interruptibly) {  
  24.                         throw e;  
  25.                     }  
  26.                     future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);  
  27.                 }  
  28.             } else {  
  29.                 if (interruptibly) {  
  30.                     future.getNow().getLatch().acquire();  
  31.                 } else {  
  32.                     future.getNow().getLatch().acquireUninterruptibly();  
  33.                 }  
  34.             } 
  35.         }  
  36.     } finally {  
  37.         unsubscribe(future, threadId);  
  38.     }  

获取锁 

  1. private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {  
  2.     if (leaseTime != -1) {  
  3.         return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);  
  4.     }  
  5.     RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); 
  6.      ttlRemainingFuture.onComplete((ttlRemaining, e) -> {  
  7.         if (e != null) {  
  8.             return;  
  9.         }  
  10.         if (ttlRemaining == null) {  
  11.             scheduleExpirationRenewal(threadId);  
  12.         }  
  13.     });  
  14.     return ttlRemainingFuture;  
  15.  
  16. <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {  
  17.     internalLockLeaseTime = unit.toMillis(leaseTime);  
  18.     return evalWriteAsync(getName(), LongCodec.INSTANCE, command,  
  19.             "if (redis.call('exists', KEYS[1]) == 0) then " +  
  20.                     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  
  21.                     "redis.call('pexpire', KEYS[1], ARGV[1]); " +  
  22.                     "return nil; " +  
  23.                     "end; " +  
  24.                     "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  
  25.                     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  
  26.                     "redis.call('pexpire', KEYS[1], ARGV[1]); " +  
  27.                     "return nil; " +  
  28.                     "end; " +  
  29.                     "return redis.call('pttl', KEYS[1]);",  
  30.             Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));  

删除锁 

  1. public RFuture<Void> unlockAsync(long threadId) {  
  2.     RPromise<Void> result = new RedissonPromise<Void>();  
  3.     RFuture<Boolean> future = unlockInnerAsync(threadId);  
  4.     future.onComplete((opStatus, e) -> {  
  5.         cancelExpirationRenewal(threadId);  
  6.         if (e != null) { 
  7.              result.tryFailure(e);  
  8.             return;  
  9.         }  
  10.         if (opStatus == null) { 
  11.              IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "  
  12.                     + id + " thread-id: " + threadId);  
  13.             result.tryFailure(cause);  
  14.             return; 
  15.          }  
  16.         result.trySuccess(null);  
  17.     });  
  18.     return result;  
  19.  
  20. protected RFuture<Boolean> unlockInnerAsync(long threadId) {  
  21.     return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,  
  22.             "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +  
  23.                     "return nil;" +  
  24.                     "end; " +  
  25.                     "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +  
  26.                     "if (counter > 0) then " +  
  27.                     "redis.call('pexpire', KEYS[1], ARGV[2]); " +  
  28.                     "return 0; " +  
  29.                     "else " +  
  30.                     "redis.call('del', KEYS[1]); " +  
  31.                     "redis.call('publish', KEYS[2], ARGV[1]); " +  
  32.                     "return 1; " +  
  33.                     "end; " +  
  34.                     "return nil;",  
  35.             Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));  

总结

使用 Redis 做分布式锁来解决并发问题仍存在一些困难,也有很多需要注意的点,我们应该正确评估系统的体量,不能为了使用某项技术而用。要完全解决并发问题,仍需要在数据库层面做功夫。 

 

文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/239341.html<

(0)
运维的头像运维
上一篇2025-04-23 08:59
下一篇 2025-04-23 09:00

相关推荐

  • 个人主题怎么制作?

    制作个人主题是一个将个人风格、兴趣或专业领域转化为视觉化或结构化内容的过程,无论是用于个人博客、作品集、社交媒体账号还是品牌形象,核心都是围绕“个人特色”展开,以下从定位、内容规划、视觉设计、技术实现四个维度,详细拆解制作个人主题的完整流程,明确主题定位:找到个人特色的核心主题定位是所有工作的起点,需要先回答……

    2025-11-20
    0
  • 社群营销管理关键是什么?

    社群营销的核心在于通过建立有温度、有价值、有归属感的社群,实现用户留存、转化和品牌传播,其管理需贯穿“目标定位-内容运营-用户互动-数据驱动-风险控制”全流程,以下从五个维度展开详细说明:明确社群定位与目标社群管理的首要任务是精准定位,需明确社群的核心价值(如行业交流、产品使用指导、兴趣分享等)、目标用户画像……

    2025-11-20
    0
  • 香港公司网站备案需要什么材料?

    香港公司进行网站备案是一个涉及多部门协调、流程相对严谨的过程,尤其需兼顾中国内地与香港两地的监管要求,由于香港公司注册地与中国内地不同,其网站若主要服务内地用户或使用内地服务器,需根据服务器位置、网站内容性质等,选择对应的备案路径(如工信部ICP备案或公安备案),以下从备案主体资格、流程步骤、材料准备、注意事项……

    2025-11-20
    0
  • 如何企业上云推广

    企业上云已成为数字化转型的核心战略,但推广过程中需结合行业特性、企业痛点与市场需求,构建系统性、多维度的推广体系,以下从市场定位、策略设计、执行落地及效果优化四个维度,详细拆解企业上云推广的实践路径,精准定位:明确目标企业与核心价值企业上云并非“一刀切”的方案,需先锁定目标客户群体,提炼差异化价值主张,客户分层……

    2025-11-20
    0
  • PS设计搜索框的实用技巧有哪些?

    在PS中设计一个美观且功能性的搜索框需要结合创意构思、视觉设计和用户体验考量,以下从设计思路、制作步骤、细节优化及交互预览等方面详细说明,帮助打造符合需求的搜索框,设计前的规划明确使用场景:根据网站或APP的整体风格确定搜索框的调性,例如极简风适合细线条和纯色,科技感适合渐变和发光效果,电商类则可能需要突出搜索……

    2025-11-20
    0

发表回复

您的邮箱地址不会被公开。必填项已用 * 标注