SpringCloud 之 Zuul 源代码详细笔记


声明:本文转载自https://my.oschina.net/alexqdjay/blog/1802503,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

1. Zuul 介绍

ZuulSpring Cloud 微服务体系中担任很重要的角色--服务网关,是基于JVM的路由器和负载均衡器。

Zuul 的基本使用以及 Filter 的介绍就不在这说明了,本文主要介绍 Zuul 的原理。

2. Zuul 处理流程

处理流程如下:

 Request => ZuulHandlerMapping => ZuulController => ZuulServlet  

主要的接收逻辑都在 ZuulServlet 中,执行 Filter 的逻辑,根据 Filter 的类型依次执行,如下代码:

try {     preRoute(); } catch (ZuulException e) {     error(e);     postRoute();     return; } try {     route(); } catch (ZuulException e) {     error(e);     postRoute();     return; } try {     postRoute(); } catch (ZuulException e) {     error(e);     return; }  

接收的代码已经清楚了,其实 Zuul 组件的功能就到这边了,剩下对请求进行路由其实主要使用了Ribbon 组件进行的,因此下面与其说是介绍 Zuul 到不如说是 Ribbon 的介绍。

路由的逻辑处理主要是 route()Route Filter 进行的。

3. Route Filter

Zuul 中 Route FilterSimpleHostRoutingFilterRibbonRoutingFilter, 有人说还有 SendForwardFilter(本地的先不关注)。

3.1 SimpleHostRoutingFilter

当你配置路由时,直接配置 Url 而不是 serviceId ,那么就是使用的 SimpleHostRoutingFilter,相反就是用的 RibbonRoutingFilter

主要逻辑:

public Object run() { 	// 省略没用逻辑 ... 	 	String uri = this.helper.buildZuulRequestURI(request); 	this.helper.addIgnoredHeaders();  	try { 	   // forward 主要逻辑 		CloseableHttpResponse response = forward(this.httpClient, verb, uri, request, headers, params, requestEntity); 		setResponse(response); 	} 	catch (Exception ex) { 		throw new ZuulRuntimeException(ex); 	} }  // 从返回就能看出来调用 httpClient 完成的http请求 private CloseableHttpResponse forward () { // ... 	// forwardRequest 	CloseableHttpResponse zuulResponse = forwardRequest(httpclient, httpHost, httpRequest); 	return zuulResponse;  // ... }  // 通过 httpClient 发请求 private CloseableHttpResponse forwardRequest(CloseableHttpClient httpclient, 			HttpHost httpHost, HttpRequest httpRequest) throws IOException { 	return httpclient.execute(httpHost, httpRequest); }  

总结:构建 Request 然后通过 httpClient 进行请求。

3.2 RibbonRoutingFilter

public Object run() { 	// ... 	// 构建请求上下文,其实就是保护一下参数,如serviceId, retryable, url, 原request等 	RibbonCommandContext commandContext = buildCommandContext(context); 	// forward 	ClientHttpResponse response = forward(commandContext); 	setResponse(response); 	return response; }  protected ClientHttpResponse forward(RibbonCommandContext context) { 	// ... 	// 这里的重点转到 command 上了,主要逻辑都是 command 中执行 	RibbonCommand command = this.ribbonCommandFactory.create(context); 	ClientHttpResponse response = command.execute(); 	return response; } 

4. Command

4.1 继承关系

HystrixCommand RibbonCommand 	<- AbstractRibbonCommand  		<- HttpClientRibbonCommand / RestClientRibbonCommand / OkHttpRibbonCommand 

主要的逻辑都是 AbstractRibbonCommand 中,子类是不同选型 HttpClient, OkHttpHttpURLConnection

4.2 AbstractRibbonCommand

由于继承的 HystrixCommand 所以需要实现 run() 方法,上面调用execute() 是来自 HystrixCommand 主体逻辑需要 run() 中实现。

protected ClientHttpResponse run() throws Exception { 	final RequestContext context = RequestContext.getCurrentContext();  	// 根据不同实现创建不同的Request 	RQ request = createRequest(); 	// 执行负载均衡逻辑,其中 client 是 ribbonCommandFactory.create 中置入的 	RS response = this.client.executeWithLoadBalancer(request, config);  	context.set("ribbonResponse", response);  	if (this.isResponseTimedOut()) { 		if (response != null) { 			response.close(); 		} 	}  	return new RibbonHttpResponse(response); } 

4.3 RibbonCommandFactory

CommandFactory 观察下 client 是什么类,做什么事情。

ribbonCommandFactory 的继承关系:

RibbonCommandFactory    	<- AbstractRibbonCommandFactory    		<- HttpClientRibbonCommandFactory 		   OkHttpRibbonCommandFactory 		   RestClientRibbonCommandFactory 

以默认的 HttpClientRibbonCommandFactory 为例,代码如下:

public HttpClientRibbonCommand create(final RibbonCommandContext context) {     // 获取降级处理 	ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId()); 	// serviceId 是根据请求的url来比对配置的路由得到的 	final String serviceId = context.getServiceId(); 	// clientFactory 根据 ServiceId 获取相关的组件,包括 IRule, IClientConfig, ILoadBalancer 等组件 	final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient( 			serviceId, RibbonLoadBalancingHttpClient.class); 	client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId)); 	// 不同类型的 Factory 会获取不同的 client,生成不同的 Command 	return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider, 			clientFactory.getClientConfig(serviceId)); } 

由此可见 client 就是 RibbonLoadBalancingHttpClient ,当然其他实现也会对应不一样的 client

4.4 Client

client 的继承比较复杂,从主要的继承看:

LoadBalancerContext  	<- AbstractLoadBalancerAwareClient  		<- AbstractLoadBalancingClient  			<- RibbonLoadBalancingHttpClient 			   OkHttpLoadBalancingClient 

4.2 中 this.client.executeWithLoadBalancer 执行的是:

// AbstractLoadBalancerAwareClient.executeWithLoadBalancer() public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {     // 重试策略处理类,要判断是不是重试可以重写这个     RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);     // 专门用于失败切换其他服务端进行重试的 Command     LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()             .withLoadBalancerContext(this)             .withRetryHandler(handler)             .withLoadBalancerURI(request.getUri())             .build();      try {         // 见下面分析         return command.submit(             new ServerOperation<T>() {                 @Override                 public Observable<T> call(Server server) {                     URI finalUri = reconstructURIWithServer(server, request.getUri());                     S requestForServer = (S) request.replaceUri(finalUri);                     try {                     	// 真实动作,执行 execute                         return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));                     }                      catch (Exception e) {                         return Observable.error(e);                     }                 }             })             .toBlocking()             .single();     } catch (Exception e) {         Throwable t = e.getCause();         if (t instanceof ClientException) {             throw (ClientException) t;         } else {             throw new ClientException(e);         }     } } 

这里有两个地方需要注重看的:

  1. LoadBalancerCommand 实现
  2. execute() 逻辑

先看 execute()LoadBalancerCommand 单独介绍

4.4.1 execute()

以默认的 RibbonLoadBalancingHttpClient 为例。(RibbonLoadBalancingHttpClient 将会被改名为ApacheHttpLoadBalancingClient,因为它的兄弟叫OkHttpLoadBalancingClient这样看上去比较像比较对称)。

public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request,final IClientConfig configOverride) { 	// ... 	 	final HttpUriRequest httpUriRequest = request.toRequest(requestConfig); 	// delegate == httpClient, 是通过 createDelegate 创建的,那么 okHttp 的相应的地方就是 okHttp 对应的 client 	// 所以,这里就是发送普通的 http 请求 	final HttpResponse httpResponse = this.delegate.execute(httpUriRequest); 	return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI()); } 

4.5 LoadBalancerCommand

主要逻辑都是在 submit 中, 使用了 rxJava 的特性进行重试, 下面删除了很多细节代码,剩下主干重试逻辑。

public Observable<T> submit(final ServerOperation<T> operation) {     // ...          // 外层的 observable 为了不同目标的重试     // selectServer() 是进行负载均衡,返回的是一个 observable,可以重试,重试时再重新挑选一个目标server     Observable<T> o = selectServer().concatMap(server -> {     	// 这里又开启一个 observable 主要是为了同机重试     	Observable<T> o = Observable 	      .just(server) 	      .concatMap(server -> { 	          return operation.call(server).doOnEach(new Observer<T>() { 	          	 @Override                  public void onCompleted() {                  	// server 状态的统计,譬如消除联系异常,抵消activeRequest等                  }                                    @Override                  public void onError() {                  	// server 状态的统计,错误统计等                  }                                    @Override                  public void onNext() {                     // 获取 entity, 返回内容                  } 	          }); 	    }) 	    // 如果设置了同机重试,进行重试 	    if (maxRetrysSame > 0)  	        // retryPolicy 判断是否重试,具体分析看下面 	        o = o.retry(retryPolicy(maxRetrysSame, true)); 	    return o;     })          // 设置了异机重试,进行重试     if (maxRetrysNext > 0)          o = o.retry(retryPolicy(maxRetrysNext, false));          return o.onErrorResumeNext(exp -> {     	return Observable.error(e);     }); } 

主要的重试逻辑如上,但是细节需要分析的:

  1. retryPolicy()
  2. selectServer()
  3. 目标 Server 状态记录

4.5.1 retryPolicy

private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {     return new Func2<Integer, Throwable, Boolean>() {         @Override         public Boolean call(Integer tryCount, Throwable e) {             if (e instanceof AbortExecutionException) {                 return false;             } 			  // 重构总次数还是会放弃重试的             if (tryCount > maxRetrys) {                 return false;             }                          if (e.getCause() != null && e instanceof RuntimeException) {                 e = e.getCause();             }                          // 判断 exception 是否进行重试,可以自定义 handler 进行定制化             return retryHandler.isRetriableException(e, same);         }     }; } 

4.5.2 selectServer

private Observable<Server> selectServer() {     return Observable.create(new OnSubscribe<Server>() {         @Override         public void call(Subscriber<? super Server> next) {             try {                 // 通过 loadBalancerContext.getServerFromLoadBalancer 来进行负载均衡                 Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);                    next.onNext(server);                 next.onCompleted();             } catch (Exception e) {                 next.onError(e);             }         }     }); } 

loadBalancerContext.getServerFromLoadBalancer 进行负载均衡选择下一个请求目标,整个方法比较大,不列出了,把调用关系列出后分析主要的逻辑类。

loadBalancerContext.getServerFromLoadBalancer ()  	> lb.chooseServer()  

实际在作用的是 ILoadBalancer.chooseServer 方法。

4.5.3 ILoadBalancer

ILoadBalancer 继承关系:

ILoadBalancer 	<- AbstractLoadBalancer 		<- BaseLoadBalancer 			<- DynamicServerListLoadBalancer 				<- ZoneAwareLoadBalancer 

ILoadBalancer 接口:

public interface ILoadBalancer { 	void addServers(List<Server> newServers); 	Server chooseServer(Object key); // 主要逻辑 	void markServerDown(Server server); 	@Deprecated 	List<Server> getServerList(boolean availableOnly); 	List<Server> getReachableServers(); 	List<Server> getAllServers(); } 

实现负载均衡的逻辑的类 BaseLoadBalancer, DynamicServerListLoadBalancer 加入动态 ServerList 的功能,负载均衡逻辑并没有补充。

BaseLoadBalancer.chooseServer 主要逻辑代码:

public Server chooseServer(Object key) {     if (counter == null) {         counter = createCounter();     }     counter.increment();     if (rule == null) {         return null;     } else {         try {         	  // Rule 执行挑选逻辑             return rule.choose(key);         } catch (Exception e) {             logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);             return null;         }     } } 

4.6 IRule

IRule <- AbstractLoadBalancerRule 	<- ClientConfigEnabledRoundRobinRule // abstract 		<- BestAvailableRule // 最小连接优先轮询 		   PredicateBasedRule // abstract 		   		<- AvailabilityFilteringRule  		   		<- ZoneAvoidanceRule 	<- RoundRobinRule 		<- WeightedResponseTimeRule 	<- RandomRule 	<- RetryRule 

4.6.1 PredicateBasedRule

基于逻辑断言进行判断是否选择的 Rule, 具体 Predicate 继承如下:

Predicate 	<- AbstractServerPredicate 		<- AvailabilityPredicate // 可用性判断 		   ZoneAvoidancePredicate // 区域选择 		   CompositePredicate    // 复合判断自身没有逻辑,组合其他 Predicate 		   

AvailabilityPredicate

public boolean apply(@Nullable PredicateKey input) {     LoadBalancerStats stats = getLBStats();     if (stats == null) {         return true;     }     // 判断是否跳过     return !shouldSkipServer(stats.getSingleServerStat(input.getServer())); }           private boolean shouldSkipServer(ServerStats stats) { 	 // 如果处于不可用 或者 当前请求大于最大限制 时跳过该目标             if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())              || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {         return true;     }     return false; } 

ZoneAvoidancePredicate

public boolean apply(@Nullable PredicateKey input) { 	// ... 	// 选择出可用区域,具体逻辑在 ZoneAvoidanceRule 中解析 	Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());    if (availableZones != null) {    		return availableZones.contains(input.getServer().getZone()); 	} else { 	    return false; 	} } 

CompositePredicate 组合逻辑断言

CompositePredicate  // 使用多个 Predicate 组成判断的 And 逻辑链 // 类似 if xx && yy & oo  Predicate<PredicateKey> chain = Predicates.<PredicateKey>and(primaryPredicates);  // 获取可用列表时使用到回退逻辑 public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {     List<Server> result = super.getEligibleServers(servers, loadBalancerKey);     Iterator<AbstractServerPredicate> i = fallbacks.iterator();     // 当筛选下来的server个数不符合配置中的最小个数时,会进行回退重选,一直回退到符合要求或者没有回退逻辑     while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))             && i.hasNext()) {         AbstractServerPredicate predicate = i.next();         result = predicate.getEligibleServers(servers, loadBalancerKey);     }     return result; }  // AbstractServerPredicate 上面 super.getEligibleServers  public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {     if (loadBalancerKey == null) {         return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));                 } else {         List<Server> results = Lists.newArrayList();         for (Server server: servers) {             // 每个 server 经过逻辑断言进行判断进行筛选             if (this.apply(new PredicateKey(loadBalancerKey, server))) {                 results.add(server);             }         }         return results;                 } }  

三大 Predicate 已经介绍完毕,回到主题。

PredicateBasedRule 主要逻辑:

public Server choose(Object key) {     ILoadBalancer lb = getLoadBalancer();     // 基于逻辑断言进行轮询 Predicate 由子类决定     Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);     if (server.isPresent()) {         return server.get();     } else {         return null;     }        } 
// AbstractServerPredicate public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) { 	 // 过滤可用结果, getEligibleServers 上面已经解析     List<Server> eligible = getEligibleServers(servers, loadBalancerKey);     if (eligible.size() == 0) {         return Optional.absent();     }     // 标准轮询     return Optional.of(eligible.get(nextIndex.getAndIncrement() % eligible.size())); } 

4.6.2 AvailabilityFilteringRule

AvailabilityFilteringRule 目标可用性轮询

public Server choose(Object key) {     int count = 0;     Server server = roundRobinRule.choose(key);     while (count++ <= 10) {         // 逻辑判断         if (predicate.apply(new PredicateKey(server))) {             return server;         }         // 轮询         server = roundRobinRule.choose(key);     }     return super.choose(key); }  // 其中 predicate // CompositePredicate 组合逻辑,这里只有 AvailabilityPredicate 可用性判断 predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))                 .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())                 .build();  

4.6.3 ZoneAvoidanceRule

ZoneAvoidanceRule 没有重写 choose 方法,所以还是继承了 PredicateBasedRule,所以过滤逻辑其实就是 compositePredicate. getEligibleServers,而经过上面的解析,getEligibleServers 其实就是所有 server 进行逻辑判断,把通过的返回。

// Predicate 组合了 zonePredicate 和 availabilityPredicate compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate); 

可见主要是 zonePredicateavailabilityPredicate 的逻辑判断。

zonePredicate 上面分析主要调用 ZoneAvoidanceRule.getAvailableZones

// getAvailableZones 主要逻辑 for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {     String zone = zoneEntry.getKey();     ZoneSnapshot zoneSnapshot = zoneEntry.getValue();     int instanceCount = zoneSnapshot.getInstanceCount();     // 没有实例 即排除     if (instanceCount == 0) {         availableZones.remove(zone);         limitedZoneAvailability = true;     } else {         double loadPerServer = zoneSnapshot.getLoadPerServer();         // 不可用率超过阀值 或者 区域本来就不可用,即排除         if (((double) zoneSnapshot.getCircuitTrippedCount())                 / instanceCount >= triggeringBlackoutPercentage                 || loadPerServer < 0) {             availableZones.remove(zone);             limitedZoneAvailability = true;         } else {             // 过滤出 负载最高的几个区域              if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {                 // they are the same considering double calculation                 // round error                 worstZones.add(zone);             } else if (loadPerServer > maxLoadPerServer) {                 maxLoadPerServer = loadPerServer;                 worstZones.clear();                 worstZones.add(zone);             }         }     } }  // 没有排除 并且 最高负载没有超过限制,返回 if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {     // zone override is not needed here     return availableZones; } // 否则 随机排除一个负载高的区域 String zoneToAvoid = randomChooseZone(snapshot, worstZones); if (zoneToAvoid != null) {     availableZones.remove(zoneToAvoid); } return availableZones; 

这里有个问题:为啥当存在排除时即便没有超过限制负载也要排除一个区域?

4.6.4 RoundRobinRule

比较简单,如下:

public Server choose(ILoadBalancer lb, Object key) {     if (lb == null) {         log.warn("no load balancer");         return null;     }      Server server = null;     int count = 0;     while (server == null && count++ < 10) {         List<Server> reachableServers = lb.getReachableServers();         List<Server> allServers = lb.getAllServers();         int upCount = reachableServers.size();         int serverCount = allServers.size();          if ((upCount == 0) || (serverCount == 0)) {             log.warn("No up servers available from load balancer: " + lb);             return null;         }  		  // 累加取模,标准轮询         int nextServerIndex = incrementAndGetModulo(serverCount);         server = allServers.get(nextServerIndex);                  // 非线程安全list,可能会导致size有了对应索引处元素没有同步过来         if (server == null) {             /* Transient. */             Thread.yield();             continue;         }          // 可用即返回,不然下一轮         if (server.isAlive() && (server.isReadyToServe())) {             return (server);         }          // Next.         server = null;     }      // 超过10次没有获取到可用的server     if (count >= 10) {         log.warn("No available alive servers after 10 tries from load balancer: "                 + lb);     }     return server; } 

4.6.5 WeightedResponseTimeRule

// 这里会启动一个维持 使用响应时间计算比重系数 的任务 DynamicServerWeightTask // 主要公式 // totalResponseTime 为所有server 平均响应时间的和,由下公式知,响应越快 weight 越大 // weight = totalResponseTime - ss.getResponseTimeAvg(); // weightSoFar += weight; // finalWeights.add(weightSoFar);  // 0 - maxTotalWeight 的概率假设是平均的,那么 weight 越大区间就越大被选中的概率就越大 // 如 Aw(10) Bw(30) Cw(40) Dw(20) // weightSoFar: 10, 40, 80, 100 // 那么 0-10, 10-40, 40-80, 80-100 可以加 40-80区间最大,概率就越大 double randomWeight = random.nextDouble() * maxTotalWeight; // pick the server index based on the randomIndex int n = 0; for (Double d : currentWeights) {     if (d >= randomWeight) {         serverIndex = n;         break;     } else {         n++;     } } 

4.7 DynamicServerListLoadBalancer

继承自 BaseLoadBalancerBaseLoadBalancer 不同的是它持有 ServerList 对象来进行动态的获取 Server 列表。

4.7.1 ServerList

ServerList 	<- DynamicServerList 	<- DiscoveryEnabledNIWSServerList 
public interface ServerList<T extends Server> {     public List<T> getInitialListOfServers();     public List<T> getUpdatedListOfServers();    } 
  1. DynamicServerList: 定时地从一个 RouteStore 中获取
  2. DiscoveryEnabledNIWSServerList: 从 Eureka 中获取

4.8 Server 状态

怎么轮询怎么选择过滤都已经分析了,但是过滤和选择中使用到 Server Status 是怎么统计的,接下去看。

ServerStats 类记录了 server 的所有状态。

4.8.1 判断是否跳过

下面是判断是否跳过 server 上面已经分析,其中 stats.isCircuitBreakerTripped 是判断的关键

// AvailabilityPredicate shouldSkipServer(ServerStats stats) { 	if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())                  || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {         return true;     }     return false; } 
public boolean isCircuitBreakerTripped(long currentTime) {     // 获取故障的到期时间点     long circuitBreakerTimeout = getCircuitBreakerTimeout();     if (circuitBreakerTimeout <= 0) {         return false;     }     // 大于当前时间表示还在出于故障     return circuitBreakerTimeout > currentTime; }  private long getCircuitBreakerTimeout() {     long blackOutPeriod = getCircuitBreakerBlackoutPeriod();     if (blackOutPeriod <= 0) {         return 0;     }     // 上次失败的时间点 + 需要断路的时间长度     return lastConnectionFailedTimestamp + blackOutPeriod; }  private long getCircuitBreakerBlackoutPeriod() {     int failureCount = successiveConnectionFailureCount.get();     int threshold = connectionFailureThreshold.get();     // 小于阀值(默认3)即不断路     if (failureCount < threshold) {         return 0;     }     // diff 超过阀值的次数,最大16     int diff = (failureCount - threshold) > 16 ? 16 : (failureCount - threshold);     // blackOutSeconds 最大 2^16 * 基数时间     int blackOutSeconds = (1 << diff) * circuitTrippedTimeoutFactor.get();     // 再次进行限制,断路总时间不超过 maxCircuitTrippedTimeout.get     if (blackOutSeconds > maxCircuitTrippedTimeout.get()) {         blackOutSeconds = maxCircuitTrippedTimeout.get();     }     return blackOutSeconds * 1000L; } 

判断是否在断路不可用状态就这样,下面看一些状态是怎么进去的。

4.8.2 记录状态

LoadBalancerCommand 中有状态的记录

// 这里开始 loadBalancerContext.noteOpenConnection(stats);   @Override public void onCompleted() {      // 记录准确状态      recordStats(tracer, stats, entity, null); }  @Override public void onError(Throwable e) { 	 // 记录错误状态     recordStats(tracer, stats, null, e);     logger.debug("Got error {} when executed on server {}", e, server);     if (listenerInvoker != null) {         listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());     } }  @Override public void onNext(T entity) {     this.entity = entity;     if (listenerInvoker != null) {         listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());     } }                                                    private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {     tracer.stop();     // 这里介绍     loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler); }  // loadBalancerContext public void noteOpenConnection(ServerStats serverStats) {     if (serverStats == null) {         return;     }     try {         serverStats.incrementActiveRequestsCount();     } catch (Exception ex) {         logger.error("Error noting stats for client {}", clientName, ex);     }             }  // serverStats // 各种记录 public void incrementActiveRequestsCount() {             activeRequestsCount.incrementAndGet();     requestCountInWindow.increment();     long currentTime = System.currentTimeMillis();     lastActiveRequestsCountChangeTimestamp = currentTime;     lastAccessedTimestamp = currentTime;     if (firstConnectionTimestamp == 0) {         firstConnectionTimestamp = currentTime;     } }  // loadBalancerContext public void noteRequestCompletion(ServerStats stats, Object response, Throwable e, long responseTime, RetryHandler errorHandler) { 	if (stats == null) { 		return; 	}     try {         recordStats(stats, responseTime);         RetryHandler callErrorHandler = errorHandler == null ? getRetryHandler() : errorHandler;         if (callErrorHandler != null && response != null) {             // 没有错误时,清除连续错误标识             stats.clearSuccessiveConnectionFailureCount();         } else if (callErrorHandler != null && e != null) {             // 判断是否需要断路的exception             if (callErrorHandler.isCircuitTrippingException(e)) {                 // 有错误时开始连续错误计数                 stats.incrementSuccessiveConnectionFailureCount();                   // 增加错误数                                   stats.addToFailureCount();             } else {             	// 非断路错误时清除连续标识                 stats.clearSuccessiveConnectionFailureCount();             }         }     } catch (Exception ex) {         logger.error("Error noting stats for client {}", clientName, ex);     }             }  // 退场专用 private void recordStats(ServerStats stats, long responseTime) { 	if (stats == null) { 		return; 	} 	// 活动请求数减一     stats.decrementActiveRequestsCount();     // 增加请求统计     stats.incrementNumRequests();     // 记录响应时间,有些负载策略需要响应时间     stats.noteResponseTime(responseTime); } 

5. 回顾

5.1 调用路径

// 调用路径 1.HandleMapping -> 2.ZuulController -> 3.ZuulServlet.service() -> 4.RibbonRoutingFilter -> 5.HystrixCommand.execute() -> 6.AbstractRibbonCommand.run() -> 7.RibbonLoadBalancingHttpClient.executeWithLoadBalance() -> 8.LoadBalancerCommand.submit() -> 9.RibbonLoadBalancingHttpClient.execute() -> 10.HttpClient.execute()  

1-4 都比较容易,5是为了有熔断效果所以用 Hystrix 进行包装,实际的逻辑都是对应的 Command 完成,7是不同的 Command 持有一个对应的 Client,执行 executeWithLoadBalance() 为了达到负载均衡和重试的效果,这个效果就交给 8.LoadBalancerCommand 完成,但是 LoadBalancerCommand 也只负责重试和负载均衡,具体执行的远程 http 请求还是由 9 来完成,而每个 BalancingClient 都是持有个真实的 client, 如: HttpClient, OKHttp,由这些 client 执行。

5.2 分支逻辑

5.2.1 负载均衡

分析了怎么进行 selectServer 的过程,以及常用的 ILoadBalancer 类型,对应的 IRule 即真实挑选和负载轮询逻辑实现。

5.2.2 状态记录

负载轮询的挑选逻辑中使用到 Server 的状态,所以分析了状态的记录以及怎么判断是否在断路状态的主要逻辑。

5.3 总结

Zuul 的主要代码并不是很大,即请求进来然后进行 Filter 处理,路由到上游服务器都是 Ribbon 的逻辑。

本文发表于2018年04月28日 22:38
(c)注:本文转载自https://my.oschina.net/alexqdjay/blog/1802503,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 2531 讨论 0 喜欢 1

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1