本文主要介绍redisson中对于可重入锁、读写锁、公平锁的实现,并利用spring AOP封装成基于方法级别的注解使用方式。
关于redisson的介绍及其spring boot starter的封装参考:spring boot redisson starter的封装和使用
redisson是一个非常强大的redis客户端,封装了很多针对分布式场景的工具,很多工具都使用了大量的Lua脚本来实现,其中分布式锁也是如此。需要明确的是: Redis使用单个Lua解释器去运行所有脚本,并且,Redis也保证脚本会以原子性(atomic)的方式执行,即:当某个脚本正在运行的时候,不会有其他脚本或Redis命令被执行 。(摘自:EVAL-Redis命令参考)
redisson中的分布式锁实现了jdk中锁的规范,顶层接口主要是RLock和RReadWriteLock,分别继承自jdk中的Lock和ReadWriteLock,redisson中的代码结构如下:


从类名上可以看出各种锁对应的类
可重入锁 RedissonLock
可重入锁是一种特殊的互斥锁,同一个线程可以重复获取到锁而不会阻塞(相应的,多次获取之后也需要相等次数的释放锁的操作)。不同线程获取同一个锁的时候是互斥的。
获取锁
具体实现是在RedissonLock中的tryLockInnerAsync方法,源码如下:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
主要内容是在Lua脚本当中,从中可以看出Lua脚本的实现逻辑,逻辑流程图如下: 
释放锁
具体实现是在RedissonLock中的unlockInnerAsync方法,源码如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
从Lua脚本中可以看出以下几点逻辑:
- 线程标识:线程是使用uuid和线程id结合起来做唯一标识的,其中uuid是Redisson对象(RedissonClient接口的实现)的属性,即在创建Redisson对象时就生成了。在同一个应用节点当中,线程标识的uuid是相同的,而线程id是不同的;在不同的应用节点当中,线程标识的uuid是不同的,而线程id是可能相同的。总之,在分布式环境当中,线程标识是不会重复的。
- 释放锁的时候只会释放当前线程自己的锁。利用hashKey判断(hashKey是线程标识)
- 一个线程如果多次获取锁,必须有相等次数的释放锁,否则锁不会释放。利用value的增长值做判断
- 释放锁之后,会发布一个消息到指定的channel,channel名称格式为:redisson_lock__channel:{key}
读写锁 ReadWriteLock
读写锁是将资源的操作者分为两类:读者、写者。它允许多个读者同时获取到资源,但只允许一个写者对资源进行操作。
- redisson中的读写锁顶层接口是:RReadWriteLock,继承了jdk中的读写锁接口:ReadWriteLock
- 读锁的实现类是 RedissonReadLock,写锁的实现类是 RedissonWriteLock
- 获取锁方法都是 tryLockInnerAsync,释放锁方法都是 unlockInnerAsync
- 在Redis中都是利用HASH数据结构,锁的key、hashKey与可重入锁类似,释放锁时也与上面几点可重入锁的逻辑类似
- 不同于可重入锁的是,读写锁增加了一个hashKey为mode的数据,值为read或write,用于标识锁的类型
- 获取锁的方式
// 获取读锁 redissonClient.getReadWriteLock(key).readLock() // 获取写锁 redissonClient.getReadWriteLock(key).writeLock()
公平锁 FairLock
公平锁和可重入锁一样继承了jdk中的 java.util.concurrent.locks.Lock 接口,在提供了自动过期解锁功能的同时,保证了当多个线程同时请求加锁时,优先分配给先发出请求的线程。公平锁的实现类是 RedissonFairLock。主要用到的Redis的两种数据结构及其作用
- redisson_lock_queue:{key},List列表,用于存储线程列表,线程重复获取锁时会存在多个元素,保证获取锁的线程的顺序,实现优先分配给先发出请求的线程的功能。
- redisson_lock_timeout:{key},SortedSet有序集合,用于存储线程获取锁时等待的超时时间,SortedSet中的score存储的是获取锁的等待超时时间,值越小说明越先请求获取锁,因此List中的线程顺序和SortedSet中的线程顺序是一致的(但并没有强行要求顺序必须一致)。线程重复获取锁时会重置score的值
获取锁
具体实现的方法是:tryLockInnerAsync,获取锁之前会清理等待超时的线程,Lua脚本如下:
// remove stale threads while true do local firstThreadId2 = redis.call("lindex", KEYS[2], 0); if firstThreadId2 == false then break; end; local timeout = tonumber(redis.call("zscore", KEYS[3], firstThreadId2)); if timeout <= tonumber(ARGV[4]) then redis.call("zrem", KEYS[3], firstThreadId2); redis.call("lpop", KEYS[2]); else break; end; end;
处理逻辑:

加锁处理的Lua源码:
if (redis.call("exists", KEYS[1]) == 0) and ((redis.call("exists", KEYS[2]) == 0) or (redis.call("lindex", KEYS[2], 0) == ARGV[2])) then redis.call("lpop", KEYS[2]); redis.call("zrem", KEYS[3], ARGV[2]); redis.call("hset", KEYS[1], ARGV[2], 1); redis.call("pexpire", KEYS[1], ARGV[1]); return nil; end; if (redis.call("hexists", KEYS[1], ARGV[2]) == 1) then redis.call("hincrby", KEYS[1], ARGV[2], 1); redis.call("pexpire", KEYS[1], ARGV[1]); return nil; end; local firstThreadId = redis.call("lindex", KEYS[2], 0); local ttl; if firstThreadId ~= false and firstThreadId ~= ARGV[2] then ttl = tonumber(redis.call("zscore", KEYS[3], firstThreadId)) - tonumber(ARGV[4]); else ttl = redis.call("pttl", KEYS[1]); end; local timeout = ttl + tonumber(ARGV[3]); if redis.call("zadd", KEYS[3], timeout, ARGV[2]) == 1 then redis.call("rpush", KEYS[2], ARGV[2]); end; return ttl;
释放锁
公平锁的释放同样也遵循可重入锁介绍的几点逻辑,这里就不过多介绍,可以通过源码中的Lua脚本看到具体的实现。释放锁的时候也同样会先移除已等待超时的线程,处理逻辑与获取锁时一样。
以上就是redisson中可重入锁、读写锁、公平锁的一些实现方式。
spring boot starter的封装(基于spring AOP)
了解了以上几种锁的实现方式之后,我们可以结合spring AOP封装成spring boot starter,这样使用起来就会更加方便。此封装是在redisson-spring-boot-starter的基础之上进行的,请先阅读文章开头提到的 spring boot redisson starter的封装和使用
- 首先,引入主要的依赖包,spring AOP支持和redisson-spring-boot-starter
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>com.itopener</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency>
- 既然是基于AOP的封装,需要定义一个方法级别的注解,注解的属性与redisson中分布式锁需要的参数保持一致,但额外增加一个锁类型的枚举,便于支持多种分布式锁源码如下:
import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import java.util.concurrent.TimeUnit; import org.springframework.core.annotation.AliasFor; /** * @author fuwei.deng * @date 2018年1月10日 上午10:47:50 * @version 1.0.0 */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface LockAction { /** 锁的资源,key。支持spring El表达式*/ @AliasFor("key") String value() default "'default'"; @AliasFor("value") String key() default "'default'"; /** 锁类型*/ LockType lockType() default LockType.REENTRANT_LOCK; /** 获取锁等待时间,默认3秒*/ long waitTime() default 3000L; /** 锁自动释放时间,默认30秒*/ long leaseTime() default 30000L; /** 时间单位(获取锁等待时间和持锁时间都用此单位)*/ TimeUnit unit() default TimeUnit.MILLISECONDS; }
关联的枚举定义如下:
/** * @author fuwei.deng * @date 2018年1月9日 上午11:25:59 * @version 1.0.0 */ public enum LockType { /** 可重入锁*/ REENTRANT_LOCK, /** 公平锁*/ FAIR_LOCK, /** 读锁*/ READ_LOCK, /** 写锁*/ WRITE_LOCK; }
- 定义AOP配置和实现。对于加锁的资源,支持spring EL表达式,方便灵活的根据方法参数进行加锁
import java.lang.reflect.Method; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.annotation.Configuration; import org.springframework.core.LocalVariableTableParameterNameDiscoverer; import org.springframework.expression.EvaluationContext; import org.springframework.expression.ExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; import com.itopener.redisson.spring.boot.autoconfigure.RedissonAutoConfiguration; /** * @author fuwei.deng * @date 2017年6月14日 下午3:11:22 * @version 1.0.0 */ @Aspect @Configuration @ConditionalOnBean(RedissonClient.class) @AutoConfigureAfter(RedissonAutoConfiguration.class) public class RedissonDistributedLockAspectConfiguration { private final Logger logger = LoggerFactory.getLogger(RedissonDistributedLockAspectConfiguration.class); @Autowired private RedissonClient redissonClient; private ExpressionParser parser = new SpelExpressionParser(); private LocalVariableTableParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer(); @Pointcut("@annotation(com.itopener.lock.redisson.spring.boot.autoconfigure.LockAction)") private void lockPoint(){ } @Around("lockPoint()") public Object around(ProceedingJoinPoint pjp) throws Throwable{ Method method = ((MethodSignature) pjp.getSignature()).getMethod(); LockAction lockAction = method.getAnnotation(LockAction.class); String key = lockAction.value(); Object[] args = pjp.getArgs(); key = parse(key, method, args); RLock lock = getLock(key, lockAction); if(!lock.tryLock(lockAction.waitTime(), lockAction.leaseTime(), lockAction.unit())) { logger.debug("get lock failed [{}]", key); return null; } //得到锁,执行方法,释放锁 logger.debug("get lock success [{}]", key); try { return pjp.proceed(); } catch (Exception e) { logger.error("execute locked method occured an exception", e); } finally { lock.unlock(); logger.debug("release lock [{}]", key); } return null; } /** * @description 解析spring EL表达式 * @author fuwei.deng * @date 2018年1月9日 上午10:41:01 * @version 1.0.0 * @param key 表达式 * @param method 方法 * @param args 方法参数 * @return */ private String parse(String key, Method method, Object[] args) { String[] params = discoverer.getParameterNames(method); EvaluationContext context = new StandardEvaluationContext(); for (int i = 0; i < params.length; i ++) { context.setVariable(params[i], args[i]); } return parser.parseExpression(key).getValue(context, String.class); } private RLock getLock(String key, LockAction lockAction) { switch (lockAction.lockType()) { case REENTRANT_LOCK: return redissonClient.getLock(key); case FAIR_LOCK: return redissonClient.getFairLock(key); case READ_LOCK: return redissonClient.getReadWriteLock(key).readLock(); case WRITE_LOCK: return redissonClient.getReadWriteLock(key).writeLock(); default: throw new RuntimeException("do not support lock type:" + lockAction.lockType().name()); } } }
- 最后就在resources/META-INF/spring.factories中配置这个类,以便spring boot自动加载配置
# Auto Configure org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.itopener.lock.redisson.spring.boot.autoconfigure.RedissonDistributedLockAspectConfiguration
- 这样,就可以在方法上加LockAction注解来使用分布式锁了,示例如下:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import com.itopener.demo.lock.redisson.vo.UserVO; import com.itopener.lock.redisson.spring.boot.autoconfigure.LockAction; import com.itopener.lock.redisson.spring.boot.autoconfigure.LockType; @Service public class RedissonLockService { private final Logger logger = LoggerFactory.getLogger(RedissonLockService.class); @LockAction("'updateKey'") public void update(String key){ try { Thread.sleep(5000); } catch (InterruptedException e) { logger.error("exp", e); } } @LockAction("#userVO.id") public void spel(UserVO userVO){ try { Thread.sleep(5000); } catch (InterruptedException e) { logger.error("exp", e); } } @LockAction(value = "#userVO.id", lockType = LockType.WRITE_LOCK, waitTime = 30000) public void update(UserVO userVO){ logger.info("write user : {}", userVO.getId()); try { Thread.sleep(500); } catch (InterruptedException e) { logger.error("exp", e); } } @LockAction(value = "#userVO.id", lockType = LockType.READ_LOCK, waitTime = 30000) public UserVO read(UserVO userVO) { logger.info("read user : {}", userVO.getId()); try { Thread.sleep(10000); } catch (InterruptedException e) { logger.error("exp", e); } return userVO; } }
参考资料:
源码:
https://gitee.com/itopener/springboot
- starter目录:itopener-parent / spring-boot-starters-parent / lock-redisson-spring-boot-starter-parent
- demo目录:itopener-parent / demo-parent / demo-lock-redisson