基于Redis实现分布式应用限流


声明:本文转载自https://my.oschina.net/giegie/blog/1525931,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务。

前几天在DD的公众号,看了一篇关于使用 瓜娃 实现单应用限流的方案 --》原文,参考《redis in action》 实现了一个jedis版本的,都属于业务层次限制。 实际场景中常用的限流策略:

  • Nginx接入层限流
    按照一定的规则如帐号、IP、系统调用逻辑等在Nginx层面做限流

  • 业务应用系统限流
    通过业务代码控制流量这个流量可以被称为信号量,可以理解成是一种锁,它可以限制一项资源最多能同时被多少进程访问。

代码实现

import redis.clients.jedis.Jedis; import redis.clients.jedis.Transaction; import redis.clients.jedis.ZParams;  import java.util.List; import java.util.UUID;  /**  * @email wangiegie@gmail.com  * @data 2017-08  */ public class RedisRateLimiter {     private static final String BUCKET = "BUCKET";     private static final String BUCKET_COUNT = "BUCKET_COUNT";     private static final String BUCKET_MONITOR = "BUCKET_MONITOR";      static String acquireTokenFromBucket(             Jedis jedis, int limit, long timeout) {         String identifier = UUID.randomUUID().toString();         long now = System.currentTimeMillis();         Transaction transaction = jedis.multi();          //删除信号量         transaction.zremrangeByScore(BUCKET_MONITOR.getBytes(), "-inf".getBytes(), String.valueOf(now - timeout).getBytes());         ZParams params = new ZParams();         params.weightsByDouble(1.0,0.0);         transaction.zinterstore(BUCKET, params, BUCKET, BUCKET_MONITOR);          //计数器自增         transaction.incr(BUCKET_COUNT);         List<Object> results = transaction.exec();         long counter = (Long) results.get(results.size() - 1);          transaction = jedis.multi();         transaction.zadd(BUCKET_MONITOR, now, identifier);         transaction.zadd(BUCKET, counter, identifier);         transaction.zrank(BUCKET, identifier);         results = transaction.exec();         //获取排名,判断请求是否取得了信号量         long rank = (Long) results.get(results.size() - 1);         if (rank < limit) {             return identifier;         } else {//没有获取到信号量,清理之前放入redis 中垃圾数据             transaction = jedis.multi();             transaction.zrem(BUCKET_MONITOR, identifier);             transaction.zrem(BUCKET, identifier);             transaction.exec();         }         return null;     } } 

调用

测试接口调用 @GetMapping("/") public void index(HttpServletResponse response) throws IOException {     Jedis jedis = jedisPool.getResource();     String token = RedisRateLimiter.acquireTokenFromBucket(jedis, LIMIT, TIMEOUT);     if (token == null) {         response.sendError(500);     }else{         //TODO 你的业务逻辑     }     jedisPool.returnResource(jedis); } 

优化

使用拦截器 + 注解优化代码

拦截器

@Configuration static class WebMvcConfigurer extends WebMvcConfigurerAdapter {     private Logger logger = LoggerFactory.getLogger(WebMvcConfigurer.class);     @Autowired     private JedisPool jedisPool;      public void addInterceptors(InterceptorRegistry registry) {         registry.addInterceptor(new HandlerInterceptorAdapter() {             public boolean preHandle(HttpServletRequest request, HttpServletResponse response,                                      Object handler) throws Exception {                 HandlerMethod handlerMethod = (HandlerMethod) handler;                 Method method = handlerMethod.getMethod();                 RateLimiter rateLimiter = method.getAnnotation(RateLimiter.class);                  if (rateLimiter != null){                     int limit = rateLimiter.limit();                     int timeout = rateLimiter.timeout();                     Jedis jedis = jedisPool.getResource();                     String token = RedisRateLimiter.acquireTokenFromBucket(jedis, limit, timeout);                     if (token == null) {                         response.sendError(500);                         return false;                     }                     logger.debug("token -> {}",token);                 }                 return true;             }         }).addPathPatterns("/*");     } } 

定义注解

/**  * @email wangiegie@gmail.com  * @data 2017-08  * 限流注解  */  @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RateLimiter {     int limit() default 5;     int timeout() default 1000; } 

使用

@RateLimiter(limit = 2, timeout = 5000) @GetMapping("/test") public void test() { } 

并发测试

工具:apache-jmeter-3.2
说明: 没有获取到信号量的接口返回500,status是红色,获取到信号量的接口返回200,status是绿色。
当限制请求信号量为2,并发5个线程: image
当限制请求信号量为5,并发10个线程:
image

资料

基于reids + lua的实现

张开涛-聊聊高并发系统之限流特技-1

总结

  1. 对于信号量的操作,使用事务操作。
  2. 不要使用时间戳作为信号量的排序分数,因为在分布式环境中,各个节点的时间差的原因,会出现不公平信号量的现象。
  3. 可以使用把这块代码抽成@rateLimiter注解,然后再方法上使用就会很方便啦
  4. 源码http://git.oschina.net/boding1/pig-cloud

本文发表于2017年08月30日 12:36
(c)注:本文转载自https://my.oschina.net/giegie/blog/1525931,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 2071 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1