> 原文:http://www.itmuch.com/spring-cloud-sum/hystrix-threadlocal/
> 采用 CC BY 3.0 CN 许可协议。可自由转载、引用,但需署名作者且注明文章原文地址。
目前,Spring Cloud已在南京公司推广开来,不仅如此,深圳那边近期也要基于Spring Cloud新开微服务了。
于是,领导要求我出一套基于Spring Cloud的快速开发脚手架(近期开源)。在编写脚手架的过程中,也顺带总结一下以前在项目中遇到的问题:
> 使用Hystrix时,如何传播ThreadLocal对象?
我们知道,Hystrix有隔离策略:THREAD以及SEMAPHORE。
> 如果你不知道Hystrix的隔离策略,可以阅读我的书籍《Spring Cloud与Docker微服务架构实战》,或者参考文档:<https://github.com/Netflix/Hystrix/wiki/Configuration#executionisolationstrategy>
引子
当隔离策略为 THREAD
时,是没办法拿到 ThreadLocal
中的值的。
举个例子,使用Feign调用某个远程API,这个远程API需要传递一个Header,这个Header是动态的,跟你的HttpRequest相关,我们选择编写一个拦截器来实现Header的传递(当然也可以在Feign Client接口的方法上加RequestHeader
)。
示例代码:
public class KeycloakRequestInterceptor implements RequestInterceptor { private static final String AUTHORIZATION_HEADER = "Authorization"; @Override public void apply(RequestTemplate template) { ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); Principal principal = attributes.getRequest().getUserPrincipal(); if (principal != null && principal instanceof KeycloakPrincipal) { KeycloakSecurityContext keycloakSecurityContext = ((KeycloakPrincipal) principal) .getKeycloakSecurityContext(); if (keycloakSecurityContext instanceof RefreshableKeycloakSecurityContext) { RefreshableKeycloakSecurityContext.class.cast(keycloakSecurityContext) .refreshExpiredToken(true); template.header(AUTHORIZATION_HEADER, "Bearer " + keycloakSecurityContext.getTokenString()); } } // 否则啥都不干 } }
你可能不知道Keycloak是什么,不过没有关系,相信这段代码并不难阅读,该拦截器做了几件事:
- 使用
RequestContextHolder.getRequestAttributes()
静态方法获得Request。 - 从Request获得当前用户的身份,然后使用Keycloak的API拿到Token,并扔到Header里。
- 这样,Feign使用这个拦截器时,就会用你这个Header去请求了。
> 注:Keycloak是一个非常容易上手,并且功能强大的单点认证平台。
现实很骨感
以上代码可完美运行——但仅限于Feign不开启Hystrix支持时。
> 注:Spring Cloud Dalston以及更高版可使用 feign.hystrix.enabled=true
为Feign开启Hystrix支持。
当Feign开启Hystrix支持时,
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
是 null
。
原因在于,Hystrix的默认隔离策略是THREAD
。而 RequestContextHolder
源码中,使用了两个血淋淋的ThreadLocal
。
解决方案一:调整隔离策略
将隔离策略设为SEMAPHORE即可:
hystrix.command.default.execution.isolation.strategy: SEMAPHORE
这样配置后,Feign可以正常工作。
但该方案不是特别好。原因是Hystrix官方强烈建议使用THREAD作为隔离策略! 参考文档:
> ##### Thread or Semaphore > > The default, and the recommended setting, is to run HystrixCommand
s using thread isolation (THREAD
) and HystrixObservableCommand
s using semaphore isolation (SEMAPHORE
). > > Commands executed in threads have an extra layer of protection against latencies beyond what network timeouts can offer. > > Generally the only time you should use semaphore isolation for HystrixCommand
s is when the call is so high volume (hundreds per second, per instance) that the overhead of separate threads is too high; this typically only applies to non-network calls.
于是,那么有没有更好的方案呢?
解决方案二:自定义并发策略
既然Hystrix不太建议使用SEMAPHORE作为隔离策略,那么是否有其他方案呢?答案是自定义并发策略,目前,Spring Cloud Sleuth以及Spring Security都通过该方式传递 ThreadLocal
对象。
下面我们来编写自定义的并发策略。
编写自定义并发策略
编写自定义并发策略比较简单,只需编写一个类,让其继承HystrixConcurrencyStrategy
,并重写wrapCallable
方法即可。
代码示例:
@Component public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { private static final Log log = LogFactory.getLog(RequestHystrixConcurrencyStrategy.class); public RequestHystrixConcurrencyStrategy() { HystrixPlugins.reset(); HystrixPlugins.getInstance().registerConcurrencyStrategy(this); } @Override public <T> Callable<T> wrapCallable(Callable<T> callable) { RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(); return new WrappedCallable<>(callable, requestAttributes); } static class WrappedCallable<T> implements Callable<T> { private final Callable<T> target; private final RequestAttributes requestAttributes; public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) { this.target = target; this.requestAttributes = requestAttributes; } @Override public T call() throws Exception { try { RequestContextHolder.setRequestAttributes(requestAttributes); return target.call(); } finally { RequestContextHolder.resetRequestAttributes(); } } } }
如代码所示,我们编写了一个RequestHystrixConcurrencyStrategy
,在其中:
wrapCallable
方法拿到 RequestContextHolder.getRequestAttributes()
,也就是我们想传播的对象; - 在
WrappedCallable
类中,我们将要传播的对象作为成员变量,并在其中的call方法中,为静态方法设值。 - 这样,在Hystrix包裹的方法中,就可以使用
RequestContextHolder.getRequestAttributes()
获取到相关属性——也就是说,可以拿到RequestContextHolder
中的ThreadLocal
属性。
经过测试,代码能正常工作。
新的问题
至此,我们已经实现了ThreadLocal
属性的传递,然而Hystrix只允许有一个并发策略!这意味着——如果不做任何处理,Sleuth、Spring Security将无法正常拿到上下文!(上文说过,目前Sleuth、Spring Security都是通过自定义并发策略的方式来传递ThreadLocal对象的。)
如何解决这个问题呢?
我们知道,Spring Cloud中,Spring Cloud Security与Spring Cloud Sleuth是可以共存的!我们不妨参考下Sleuth以及Spring Security的实现:
- Sleuth:
org.springframework.cloud.sleuth.instrument.hystrix.SleuthHystrixConcurrencyStrategy
- Spring Security:
org.springframework.cloud.netflix.hystrix.security.SecurityContextConcurrencyStrategy
阅读完后,你将恍然大悟——于是,我们可以模仿它们的写法,改写上文编写的并发策略:
public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { private static final Log log = LogFactory.getLog(RequestAttributeHystrixConcurrencyStrategy.class); private HystrixConcurrencyStrategy delegate; public RequestAttributeHystrixConcurrencyStrategy() { try { this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy(); if (this.delegate instanceof RequestAttributeHystrixConcurrencyStrategy) { // Welcome to singleton hell... return; } HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins .getInstance().getCommandExecutionHook(); HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance() .getEventNotifier(); HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance() .getMetricsPublisher(); HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance() .getPropertiesStrategy(); this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher, propertiesStrategy); HystrixPlugins.reset(); HystrixPlugins.getInstance().registerConcurrencyStrategy(this); HystrixPlugins.getInstance() .registerCommandExecutionHook(commandExecutionHook); HystrixPlugins.getInstance().registerEventNotifier(eventNotifier); HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher); HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy); } catch (Exception e) { log.error("Failed to register Sleuth Hystrix Concurrency Strategy", e); } } private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier, HystrixMetricsPublisher metricsPublisher, HystrixPropertiesStrategy propertiesStrategy) { if (log.isDebugEnabled()) { log.debug("Current Hystrix plugins configuration is [" + "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier [" + eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "]," + "propertiesStrategy [" + propertiesStrategy + "]," + "]"); log.debug("Registering Sleuth Hystrix Concurrency Strategy."); } } @Override public <T> Callable<T> wrapCallable(Callable<T> callable) { RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(); return new WrappedCallable<>(callable, requestAttributes); } @Override public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties); } @Override public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) { return this.delegate.getBlockingQueue(maxQueueSize); } @Override public <T> HystrixRequestVariable<T> getRequestVariable( HystrixRequestVariableLifecycle<T> rv) { return this.delegate.getRequestVariable(rv); } static class WrappedCallable<T> implements Callable<T> { private final Callable<T> target; private final RequestAttributes requestAttributes; public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) { this.target = target; this.requestAttributes = requestAttributes; } @Override public T call() throws Exception { try { RequestContextHolder.setRequestAttributes(requestAttributes); return target.call(); } finally { RequestContextHolder.resetRequestAttributes(); } } } }
简单讲解下:
- 将现有的并发策略作为新并发策略的成员变量
- 在新并发策略中,返回现有并发策略的线程池、Queue。
Pull Request
笔者已将该实现方式Pull Request:<https://github.com/spring-cloud/spring-cloud-netflix/pull/2509> ,希望官方能够接纳,也希望在不久的将来,能够更舒服、更爽地使用Spring Cloud。
PS. Pull Request的代码跟博客中的代码略有区别,有少量简单的优化,主要是增加了一个开关。
灵感来自