从锁的原理到构建分布式锁


声明:本文转载自https://my.oschina.net/u/3070387/blog/1559876,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

最近在工作中遇到一个复杂的业务,必须进行加锁,使逻辑串行化,但是在负载均衡下java传统的锁是没有意义的,必须使用分布式锁。分布式锁的方案在网络上有许多,这里更想探讨这些方案背后的原理。

如何避免竞争条件

两个或多个进程读写某些共享数据,而最后的结果取决于进程运行的精确时序,称为竞争条件。——《现代操作系统》

而锁正是避免竞争条件的解决方案之一。 再列出《现代操作系统》一书中,所成为一个解决竞争条件的好方案的条件:

  1. 任何两个进程不能同时处于其临界区
  2. 不应对CPU的速度和数量做任何假设
  3. 临界区外运行的进程不得阻塞其他进程
  4. 不得使进程无限期等待进入临界区

互斥量

互斥量是一个可以处于两态之一的变量:解锁和加锁。如,用0表示解锁,其他值表示加锁。当一个线程或是进程需要访问临界区时,如果互斥量当前是解锁状态,即线程可以自由进入临界区,并更改互斥量的值,当然这个过程需要是原子性的。 如果互斥量已经加锁,调用线程被阻塞,直到获得锁的线程将互斥量重置。 而我们平时使用的java的ReentrantLock也是基于互斥量的原理,使用CAS更新互斥量,更新成功即获得锁,否则进入阻塞。 根据互斥量的定义和ReentrantLock的实现原理,这里总结一下互斥量的性质:

  1. 互斥量状态判断到变更的过程必须是原子性的
  2. 当无法获取互斥量时,阻塞线程
  3. 鲁棒性

第3点是我加上去的,必须保证拥有鲁棒性,即不会无端发生锁丢失,锁状态的变更等异常情况。这里对应的是为了避免违反上述提到的设计避免竞争条件方案的第4个条件不得使进程无限期等待进入临界区。若然不具备鲁棒性,这种情况必定会发生。

使用redis设计分布式锁

根据上面提到的互斥量概念,我们依此来利用redis设计出。

互斥量状态判断到变更的过程必须是原子性的

这点可以利用redis的一些原子性命令来实现,例如不存在即插入的SETNX命令。 而第二点

当无法获取互斥量时,阻塞线程

如果所实现的仅仅是同一时间只允许一个线程(进程)进入临界区,那么在SETNX命令失败马上返回,即实现tryLock功能。 如果要实现阻塞功能的话,比较困难,因为在多进程情况下,必须跨进程唤醒被阻塞的线程,所以这里利用自旋的方式去多次尝试获取锁,直到超时,为了防止过多的空转浪费CPU资源,可以在自旋过程中加入sleep操作。 最后一点

鲁棒性

这个其实是最难实现的,原子性的操作,大多组件都会有提供,但是要提供一个组件的鲁棒性,必须要考虑到所有情况,并且给出具体的解决方案。关于使用redis实现分布式锁这点,可以参考http://www.cnblogs.com/0201zcr/p/5942748.html

对于分布式锁来说,一般可以跨线程调用锁,即在线程A加锁,在线程B解锁,因为分布式锁覆盖的范围更大,锁的可是进程级。这里要注意是否应该加以限制,因为临界区外运行的进程不得阻塞其他进程,虽然提到的是进程,但在线程级是否应该支持,需要进行考虑。 线程A加锁,而不进入临界区,而创建线程B,由线程B去执行逻辑并且释放锁,这样临界区外的线程A将会阻塞后续到来的线程C、D。当然这样并不会违反临界区外运行的进程不得阻塞其他进程,但是要不要缩小这个范围,限制到线程,就交由实现者去判断。 最后贴出我所写的用Redis实现分布式锁的代码

/**  * Created by BingZhong on 2017/7/29.  *  * 基于Redis实现的分布式锁  */ public final class RedisLockHelper {      private static Logger logger = LoggerFactory.getLogger(RedisLockHelper.class);      /**      * redis操作帮助类,可以是其他封装了redis操作的类      */     private RedisHelper redisHelper;      public static final long DEFAULT_TIMEOUT = 30 * 1000;      public static final long DEFAULT_SLEEP_TIME = 100;      private RedisLockHelper(RedisHelper redisHelper) {         this.redisHelper = redisHelper;     }      public static RedisLockHelper getInstance(RedisHelper redisHelper) {         return new RedisLockHelper(redisHelper);     }      /**      * 创建锁      *      * @param mutex     互斥量      * @param timeout   锁的超时时间      * @param sleepTime 线程自旋尝试获取锁时的休眠时间      * @param timeUnit  时间单位      */     public RedisLock newLock(String mutex, long timeout, long sleepTime, TimeUnit timeUnit) {         logger.info("创建分布式锁,互斥量为{}", mutex);         return new RedisLock(mutex, timeout, sleepTime, timeUnit);     }      public RedisLock newLock(String mutex, long timeout, TimeUnit timeUnit) {         return newLock(mutex, timeout, DEFAULT_SLEEP_TIME, timeUnit);     }      public RedisLock newLock(String mutex) {         return newLock(mutex, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);     }      public class RedisLock {         /**          * 用于创建redis健值对的键,相当于互斥量          */         private final String mutex;          /**          * 锁过期的绝对时间          */         private volatile long lockExpiresTime = 0;          /**          * 锁的超时时间          */         private final long timeout;          /**          * 每次循环获取锁的休眠时间          */         private final long sleepTime;          /**          * 锁的线程持有者          */         private volatile Thread lockHolder = null;          private final ReentrantLock threadLock = new ReentrantLock();          public RedisLock(String mutex, long timeout, long sleepTime, TimeUnit timeUnit) {             this.mutex = mutex;             this.timeout = timeUnit.toMillis(timeout);             this.sleepTime = timeUnit.toMillis(sleepTime);         }          /**          * 加锁,将会一直尝试获取锁,直到超时          */         public boolean lock(long acquireTimeout, TimeUnit timeUnit) throws InterruptedException {             acquireTimeout = timeUnit.toMillis(acquireTimeout);             long acquireTime = acquireTimeout + System.currentTimeMillis();             threadLock.tryLock(acquireTimeout, timeUnit);             try {                 while (true) {                     boolean hasLock = tryLock();                     if (hasLock) {                         //获取锁成功                         return true;                     } else if (acquireTime < System.currentTimeMillis()) {                         break;                     }                     Thread.sleep(sleepTime);                 }             } finally {                 if (threadLock.isHeldByCurrentThread()) {                     threadLock.unlock();                 }             }              return false;         }          /**          * 尝试获取锁,无论是否获取到锁都将直接返回而不会阻塞          * 不支持重入锁          */         public boolean tryLock() {             if (lockHolder == Thread.currentThread()) {                 throw new IllegalMonitorStateException("不支持重入锁");             }             long currentTime = System.currentTimeMillis();             String expires = String.valueOf(timeout + currentTime);             //尝试设置互斥量             if (redisHelper.setNx(mutex, expires) > 0) {                 setLockStatus(expires);                 return true;             } else {                 String currentLockTime = redisHelper.get(mutex);                 //检查锁是否超时                 if (Objects.nonNull(currentLockTime) && Long.parseLong(currentLockTime) < currentTime) {                     //获取旧的锁时间并设置互斥量                     String oldLockTime = redisHelper.getSet(mutex, expires);                     //判断获取到的旧值是否一致,不一致证明已经有另外的进程(线程)成功获取到了锁                     if (Objects.nonNull(oldLockTime) && Objects.equals(oldLockTime, currentLockTime)) {                         setLockStatus(expires);                         return true;                     }                 }                  return false;             }         }          /**          * 该锁是否被锁住          */         public boolean isLock() {             String currentLockTime = redisHelper.get(mutex);             //存在互斥量且锁还为过时即锁住             return Objects.nonNull(currentLockTime) && Long.parseLong(currentLockTime) > System.currentTimeMillis();         }          public String getMutex() {             return mutex;         }          /**          * 解锁          */         public boolean unlock() {             //只有锁的持有线程才能解锁             if (lockHolder == Thread.currentThread()) {                 //判断锁是否超时,没有超时才将互斥量删除                 if (lockExpiresTime > System.currentTimeMillis()) {                     redisHelper.del(mutex);                     logger.info("删除互斥量[{}]", mutex);                 }                 lockHolder = null;                 logger.info("释放[{}]锁成功", mutex);                  return true;             } else {                 throw new IllegalMonitorStateException("没有获取到锁的线程无法执行解锁操作");             }         }          private void setLockStatus(String expires) {             lockExpiresTime = Long.parseLong(expires);             lockHolder = Thread.currentThread();             logger.info("获取[{}]锁成功", mutex);         }     } }  

总结

很多功能组件,其实都是基于一些概念来进行设计,对于避免竞争条件的方案,还有信号量,管程等,了解这些概念,才能在需要时设计出满足自己需求的组件。

本文发表于2017年11月03日 06:34
(c)注:本文转载自https://my.oschina.net/u/3070387/blog/1559876,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1955 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1