[redis分布式锁]redisson分布式锁的实现及spring-boot-starter封装


声明:本文转载自https://my.oschina.net/dengfuwei/blog/1604975,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

本文主要介绍redisson中对于可重入锁、读写锁、公平锁的实现,并利用spring AOP封装成基于方法级别的注解使用方式。

关于redisson的介绍及其spring boot starter的封装参考:spring boot redisson starter的封装和使用

redisson是一个非常强大的redis客户端,封装了很多针对分布式场景的工具,很多工具都使用了大量的Lua脚本来实现,其中分布式锁也是如此。需要明确的是: Redis使用单个Lua解释器去运行所有脚本,并且,Redis也保证脚本会以原子性(atomic)的方式执行,即:当某个脚本正在运行的时候,不会有其他脚本或Redis命令被执行 。(摘自:EVAL-Redis命令参考

redisson中的分布式锁实现了jdk中锁的规范,顶层接口主要是RLock和RReadWriteLock,分别继承自jdk中的Lock和ReadWriteLock,redisson中的代码结构如下:

  • RLock接口实现类的代码结构

RLock接口实现类的代码结构

  • RReadWriteLock接口实现类的代码结构

RReadWriteLock接口实现类的代码结构

从类名上可以看出各种锁对应的类

可重入锁 RedissonLock


可重入锁是一种特殊的互斥锁,同一个线程可以重复获取到锁而不会阻塞(相应的,多次获取之后也需要相等次数的释放锁的操作)。不同线程获取同一个锁的时候是互斥的。

获取锁

具体实现是在RedissonLock中的tryLockInnerAsync方法,源码如下:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {         internalLockLeaseTime = unit.toMillis(leaseTime);         return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,                   "if (redis.call('exists', KEYS[1]) == 0) then " +                       "redis.call('hset', KEYS[1], ARGV[2], 1); " +                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +                       "return nil; " +                   "end; " +                   "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                       "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +                       "return nil; " +                   "end; " +                   "return redis.call('pttl', KEYS[1]);",                     Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));     } 

主要内容是在Lua脚本当中,从中可以看出Lua脚本的实现逻辑,逻辑流程图如下: Redisson可重入锁Lua脚本逻辑流程

释放锁

具体实现是在RedissonLock中的unlockInnerAsync方法,源码如下:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {     return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,         "if (redis.call('exists', KEYS[1]) == 0) then " +             "redis.call('publish', KEYS[2], ARGV[1]); " +             "return 1; " +         "end;" +         "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +             "return nil;" +         "end; " +         "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +         "if (counter > 0) then " +             "redis.call('pexpire', KEYS[1], ARGV[2]); " +             "return 0; " +         "else " +             "redis.call('del', KEYS[1]); " +             "redis.call('publish', KEYS[2], ARGV[1]); " +             "return 1; "+         "end; " +         "return nil;",             Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); } 

从Lua脚本中可以看出以下几点逻辑:

  • 线程标识:线程是使用uuid和线程id结合起来做唯一标识的,其中uuid是Redisson对象(RedissonClient接口的实现)的属性,即在创建Redisson对象时就生成了。在同一个应用节点当中,线程标识的uuid是相同的,而线程id是不同的;在不同的应用节点当中,线程标识的uuid是不同的,而线程id是可能相同的。总之,在分布式环境当中,线程标识是不会重复的。
  • 释放锁的时候只会释放当前线程自己的锁。利用hashKey判断(hashKey是线程标识)
  • 一个线程如果多次获取锁,必须有相等次数的释放锁,否则锁不会释放。利用value的增长值做判断
  • 释放锁之后,会发布一个消息到指定的channel,channel名称格式为:redisson_lock__channel:{key}

读写锁 ReadWriteLock


读写锁是将资源的操作者分为两类:读者、写者。它允许多个读者同时获取到资源,但只允许一个写者对资源进行操作。

  • redisson中的读写锁顶层接口是:RReadWriteLock,继承了jdk中的读写锁接口:ReadWriteLock
  • 读锁的实现类是 RedissonReadLock,写锁的实现类是 RedissonWriteLock
  • 获取锁方法都是 tryLockInnerAsync,释放锁方法都是 unlockInnerAsync
  • 在Redis中都是利用HASH数据结构,锁的key、hashKey与可重入锁类似,释放锁时也与上面几点可重入锁的逻辑类似
  • 不同于可重入锁的是,读写锁增加了一个hashKey为mode的数据,值为read或write,用于标识锁的类型
  • 获取锁的方式
// 获取读锁 redissonClient.getReadWriteLock(key).readLock()  // 获取写锁 redissonClient.getReadWriteLock(key).writeLock() 

公平锁 FairLock


公平锁和可重入锁一样继承了jdk中的 java.util.concurrent.locks.Lock 接口,在提供了自动过期解锁功能的同时,保证了当多个线程同时请求加锁时,优先分配给先发出请求的线程。公平锁的实现类是 RedissonFairLock。主要用到的Redis的两种数据结构及其作用

  • redisson_lock_queue:{key},List列表,用于存储线程列表,线程重复获取锁时会存在多个元素,保证获取锁的线程的顺序,实现优先分配给先发出请求的线程的功能。
  • redisson_lock_timeout:{key},SortedSet有序集合,用于存储线程获取锁时等待的超时时间,SortedSet中的score存储的是获取锁的等待超时时间,值越小说明越先请求获取锁,因此List中的线程顺序和SortedSet中的线程顺序是一致的(但并没有强行要求顺序必须一致)。线程重复获取锁时会重置score的值

获取锁

具体实现的方法是:tryLockInnerAsync,获取锁之前会清理等待超时的线程,Lua脚本如下:

// remove stale threads while true do     local firstThreadId2 = redis.call("lindex", KEYS[2], 0);     if firstThreadId2 == false then         break;     end;     local timeout = tonumber(redis.call("zscore", KEYS[3], firstThreadId2));     if timeout <= tonumber(ARGV[4]) then         redis.call("zrem", KEYS[3], firstThreadId2);         redis.call("lpop", KEYS[2]);     else         break;     end; end; 

处理逻辑:

公平锁清理过期线程处理流程

加锁处理的Lua源码:

if (redis.call("exists", KEYS[1]) == 0) and ((redis.call("exists", KEYS[2]) == 0)  or (redis.call("lindex", KEYS[2], 0) == ARGV[2])) then     redis.call("lpop", KEYS[2]);     redis.call("zrem", KEYS[3], ARGV[2]);     redis.call("hset", KEYS[1], ARGV[2], 1);     redis.call("pexpire", KEYS[1], ARGV[1]);     return nil; end; if (redis.call("hexists", KEYS[1], ARGV[2]) == 1) then     redis.call("hincrby", KEYS[1], ARGV[2], 1);     redis.call("pexpire", KEYS[1], ARGV[1]);     return nil; end;                              local firstThreadId = redis.call("lindex", KEYS[2], 0); local ttl; if firstThreadId ~= false and firstThreadId ~= ARGV[2] then     ttl = tonumber(redis.call("zscore", KEYS[3], firstThreadId)) - tonumber(ARGV[4]); else     ttl = redis.call("pttl", KEYS[1]); end;                              local timeout = ttl + tonumber(ARGV[3]); if redis.call("zadd", KEYS[3], timeout, ARGV[2]) == 1 then     redis.call("rpush", KEYS[2], ARGV[2]); end; return ttl; 

释放锁

公平锁的释放同样也遵循可重入锁介绍的几点逻辑,这里就不过多介绍,可以通过源码中的Lua脚本看到具体的实现。释放锁的时候也同样会先移除已等待超时的线程,处理逻辑与获取锁时一样。

以上就是redisson中可重入锁、读写锁、公平锁的一些实现方式。

spring boot starter的封装(基于spring AOP)


了解了以上几种锁的实现方式之后,我们可以结合spring AOP封装成spring boot starter,这样使用起来就会更加方便。此封装是在redisson-spring-boot-starter的基础之上进行的,请先阅读文章开头提到的 spring boot redisson starter的封装和使用

  • 首先,引入主要的依赖包,spring AOP支持和redisson-spring-boot-starter
<dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-aop</artifactId> </dependency>  <dependency>     <groupId>com.itopener</groupId>     <artifactId>redisson-spring-boot-starter</artifactId>     <version>1.0.0-SNAPSHOT</version> </dependency> 
  • 既然是基于AOP的封装,需要定义一个方法级别的注解,注解的属性与redisson中分布式锁需要的参数保持一致,但额外增加一个锁类型的枚举,便于支持多种分布式锁源码如下:
import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import java.util.concurrent.TimeUnit;  import org.springframework.core.annotation.AliasFor;  /**  * @author fuwei.deng  * @date 2018年1月10日 上午10:47:50  * @version 1.0.0  */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface LockAction {  	/** 锁的资源,key。支持spring El表达式*/ 	@AliasFor("key") 	String value() default "'default'"; 	 	@AliasFor("value") 	String key() default "'default'"; 	 	/** 锁类型*/ 	LockType lockType() default LockType.REENTRANT_LOCK; 	 	/** 获取锁等待时间,默认3秒*/ 	long waitTime() default 3000L; 	 	/** 锁自动释放时间,默认30秒*/ 	long leaseTime() default 30000L; 	 	/** 时间单位(获取锁等待时间和持锁时间都用此单位)*/ 	TimeUnit unit() default TimeUnit.MILLISECONDS; } 

关联的枚举定义如下:

/**    * @author fuwei.deng  * @date 2018年1月9日 上午11:25:59  * @version 1.0.0  */ public enum LockType {  	/** 可重入锁*/ 	REENTRANT_LOCK, 	 	/** 公平锁*/ 	FAIR_LOCK, 	 	/** 读锁*/ 	READ_LOCK, 	 	/** 写锁*/ 	WRITE_LOCK; } 
  • 定义AOP配置和实现。对于加锁的资源,支持spring EL表达式,方便灵活的根据方法参数进行加锁
import java.lang.reflect.Method;  import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.annotation.Configuration; import org.springframework.core.LocalVariableTableParameterNameDiscoverer; import org.springframework.expression.EvaluationContext; import org.springframework.expression.ExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext;  import com.itopener.redisson.spring.boot.autoconfigure.RedissonAutoConfiguration;  /**  * @author fuwei.deng  * @date 2017年6月14日 下午3:11:22  * @version 1.0.0  */ @Aspect @Configuration @ConditionalOnBean(RedissonClient.class) @AutoConfigureAfter(RedissonAutoConfiguration.class) public class RedissonDistributedLockAspectConfiguration { 	 	private final Logger logger = LoggerFactory.getLogger(RedissonDistributedLockAspectConfiguration.class); 	 	@Autowired 	private RedissonClient redissonClient;  	private ExpressionParser parser = new SpelExpressionParser();  	private LocalVariableTableParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer();  	@Pointcut("@annotation(com.itopener.lock.redisson.spring.boot.autoconfigure.LockAction)") 	private void lockPoint(){ 		 	} 	 	@Around("lockPoint()") 	public Object around(ProceedingJoinPoint pjp) throws Throwable{ 		Method method = ((MethodSignature) pjp.getSignature()).getMethod(); 		LockAction lockAction = method.getAnnotation(LockAction.class); 		String key = lockAction.value(); 		Object[] args = pjp.getArgs(); 		key = parse(key, method, args); 		 		RLock lock = getLock(key, lockAction); 		if(!lock.tryLock(lockAction.waitTime(), lockAction.leaseTime(), lockAction.unit())) { 			logger.debug("get lock failed [{}]", key); 			return null; 		} 		 		//得到锁,执行方法,释放锁 		logger.debug("get lock success [{}]", key); 		try { 			return pjp.proceed(); 		} catch (Exception e) { 			logger.error("execute locked method occured an exception", e); 		} finally { 			lock.unlock(); 			logger.debug("release lock [{}]", key); 		} 		return null; 	}  	/** 	 * @description 解析spring EL表达式 	 * @author fuwei.deng 	 * @date 2018年1月9日 上午10:41:01 	 * @version 1.0.0 	 * @param key 表达式 	 * @param method 方法 	 * @param args 方法参数 	 * @return 	 */ 	private String parse(String key, Method method, Object[] args) { 		String[] params = discoverer.getParameterNames(method); 		EvaluationContext context = new StandardEvaluationContext(); 		for (int i = 0; i < params.length; i ++) { 			context.setVariable(params[i], args[i]); 		} 		return parser.parseExpression(key).getValue(context, String.class); 	} 	 	private RLock getLock(String key, LockAction lockAction) { 		switch (lockAction.lockType()) { 			case REENTRANT_LOCK: 				return redissonClient.getLock(key); 			 			case FAIR_LOCK: 				return redissonClient.getFairLock(key); 				 			case READ_LOCK: 				return redissonClient.getReadWriteLock(key).readLock(); 			 			case WRITE_LOCK: 				return redissonClient.getReadWriteLock(key).writeLock(); 				 			default: 				throw new RuntimeException("do not support lock type:" + lockAction.lockType().name()); 			} 	} } 
  • 最后就在resources/META-INF/spring.factories中配置这个类,以便spring boot自动加载配置
# Auto Configure org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.itopener.lock.redisson.spring.boot.autoconfigure.RedissonDistributedLockAspectConfiguration 
  • 这样,就可以在方法上加LockAction注解来使用分布式锁了,示例如下:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service;  import com.itopener.demo.lock.redisson.vo.UserVO; import com.itopener.lock.redisson.spring.boot.autoconfigure.LockAction; import com.itopener.lock.redisson.spring.boot.autoconfigure.LockType;  @Service public class RedissonLockService {  	private final Logger logger = LoggerFactory.getLogger(RedissonLockService.class); 	 	@LockAction("'updateKey'") 	public void update(String key){ 		try { 			Thread.sleep(5000); 		} catch (InterruptedException e) { 			logger.error("exp", e); 		} 	} 	 	@LockAction("#userVO.id") 	public void spel(UserVO userVO){ 		try { 			Thread.sleep(5000); 		} catch (InterruptedException e) { 			logger.error("exp", e); 		} 	} 	 	@LockAction(value = "#userVO.id", lockType = LockType.WRITE_LOCK, waitTime = 30000) 	public void update(UserVO userVO){ 		logger.info("write user : {}", userVO.getId()); 		try { 			Thread.sleep(500); 		} catch (InterruptedException e) { 			logger.error("exp", e); 		} 	} 	 	@LockAction(value = "#userVO.id", lockType = LockType.READ_LOCK, waitTime = 30000) 	public UserVO read(UserVO userVO) { 		logger.info("read user : {}", userVO.getId()); 		try { 			Thread.sleep(10000); 		} catch (InterruptedException e) { 			logger.error("exp", e); 		} 		return userVO; 	} } 

参考资料:

源码:

https://gitee.com/itopener/springboot

  • starter目录:itopener-parent / spring-boot-starters-parent / lock-redisson-spring-boot-starter-parent
  • demo目录:itopener-parent / demo-parent / demo-lock-redisson

本文发表于2018年01月10日 12:32
(c)注:本文转载自https://my.oschina.net/dengfuwei/blog/1604975,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 8498 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1