Java分布式跟踪系统Zipkin(四):Brave源码分析-HttpTracing


声明:本文转载自https://my.oschina.net/mozhu/blog/1585185,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

所有博文均在个人独立博客http://blog.mozhu.org首发,欢迎访问!

上一篇博文中,我们分析了Tracing的相关源代码,这一篇我们来看看Brave是如何在Web项目中使用的

我们先来看看普通的servlet项目中,如何使用Brave,这对我们后面分析和理解Brave和SpringMVC等框架整合有帮助

首先Chapter1/servlet25项目中配置了FrontServlet和BackendServlet以及TracingFilter

web.xml

<web-app xmlns="http://java.sun.com/xml/ns/javaee"     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"     xsi:schemaLocation="http://java.sun.com/xml/ns/javaee 	http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"     version="2.5">    <display-name>Servlet2.5 Application</display-name>    <filter>     <filter-name>TracingFilter</filter-name>     <filter-class>org.mozhu.zipkin.filter.BraveTracingFilter</filter-class>   </filter>   <filter-mapping>     <filter-name>TracingFilter</filter-name>     <url-pattern>/*</url-pattern>   </filter-mapping>    <servlet>     <servlet-name>BackendServlet</servlet-name>     <servlet-class>org.mozhu.zipkin.servlet.BackendServlet</servlet-class>     <load-on-startup>1</load-on-startup>   </servlet>   <servlet-mapping>     <servlet-name>BackendServlet</servlet-name>     <url-pattern>/api</url-pattern>   </servlet-mapping>    <servlet>     <servlet-name>FrontendServlet</servlet-name>     <servlet-class>org.mozhu.zipkin.servlet.FrontendServlet</servlet-class>     <load-on-startup>1</load-on-startup>   </servlet>   <servlet-mapping>     <servlet-name>FrontendServlet</servlet-name>     <url-pattern>/</url-pattern>   </servlet-mapping> </web-app> 

TracingFilter

我们使用自定义的BraveTracingFilter作为入口,其init方法中,我们初始化了Tracing,然后创建HttpTracing对象,最后调用TracingFilter.create(httpTracing)创建了tracingFilter。 doFilter方法中,所有请求将被tracingFilter来处理

BraveTracingFilter

package org.mozhu.zipkin.filter;  import brave.Tracing; import brave.context.log4j2.ThreadContextCurrentTraceContext; import brave.http.HttpTracing; import brave.propagation.B3Propagation; import brave.propagation.ExtraFieldPropagation; import brave.servlet.TracingFilter; import zipkin2.codec.SpanBytesEncoder; import zipkin2.reporter.AsyncReporter; import zipkin2.reporter.Sender; import zipkin2.reporter.okhttp3.OkHttpSender;  import javax.servlet.*; import java.io.IOException; import java.util.concurrent.TimeUnit;  public class BraveTracingFilter implements Filter {     Filter tracingFilter;      @Override     public void init(FilterConfig filterConfig) throws ServletException {         Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");         AsyncReporter asyncReporter = AsyncReporter.builder(sender)                 .closeTimeout(500, TimeUnit.MILLISECONDS)                 .build(SpanBytesEncoder.JSON_V2);          Tracing tracing = Tracing.newBuilder()                 .localServiceName(System.getProperty("zipkin.service", "servlet25-demo"))                 .spanReporter(asyncReporter)                 .propagationFactory(ExtraFieldPropagation.newFactory(B3Propagation.FACTORY, "user-name"))                 .currentTraceContext(ThreadContextCurrentTraceContext.create())                 .build();          HttpTracing httpTracing = HttpTracing.create(tracing);         filterConfig.getServletContext().setAttribute("TRACING", httpTracing);         tracingFilter = TracingFilter.create(httpTracing);         tracingFilter.init(filterConfig);     }      @Override     public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {         tracingFilter.doFilter(servletRequest, servletResponse, filterChain);     }      @Override     public void destroy() {         tracingFilter.destroy();     }  } 

TracingFilter

TracingFilter在brave-instrumentation-servlet包中

public final class TracingFilter implements Filter {   public static Filter create(Tracing tracing) {     return new TracingFilter(HttpTracing.create(tracing));   }    public static Filter create(HttpTracing httpTracing) {     return new TracingFilter(httpTracing);   }    final ServletRuntime servlet = ServletRuntime.get();   final Tracer tracer;   final HttpServerHandler<HttpServletRequest, HttpServletResponse> handler;   final TraceContext.Extractor<HttpServletRequest> extractor;    TracingFilter(HttpTracing httpTracing) {     tracer = httpTracing.tracing().tracer();     handler = HttpServerHandler.create(httpTracing, new HttpServletAdapter());     extractor = httpTracing.tracing().propagation().extractor(HttpServletRequest::getHeader);   } } 

TracingFilter中几个重要的类

  • HttpTracing - 包含Http处理相关的组件,clientParser,serverParser,clientSampler,serverSampler
  • ServletRuntime - Servlet运行时类,包含根据环境来判断是否支持Servlet3异步调用等方法
  • HttpServerHandler - Http处理的核心组件,基本上所有和trace相关的操作均在此类中完成
  • HttpServletAdapter - HttpServlet的适配器接口,此类的引入可以让httpServerHandler类变得更为通用,因为它是一个泛型接口,跟具体的request和response无关,能和更多框架进行整合
  • TraceContext.Extractor - TraceContext的数据提取器

doFilter方法

@Override   public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)       throws IOException, ServletException {     HttpServletRequest httpRequest = (HttpServletRequest) request;     HttpServletResponse httpResponse = servlet.httpResponse(response);      Span span = handler.handleReceive(extractor, httpRequest);     Throwable error = null;     try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {       chain.doFilter(httpRequest, httpResponse); // any downstream filters see Tracer.currentSpan     } catch (IOException | ServletException | RuntimeException | Error e) {       error = e;       throw e;     } finally {       if (servlet.isAsync(httpRequest)) { // we don't have the actual response, handle later         servlet.handleAsync(handler, httpRequest, span);       } else { // we have a synchronous response, so we can finish the span         handler.handleSend(httpResponse, error, span);       }     }   } 
  • 首先调用handler.handleReceive(extractor, httpRequest)从request中提取Span信息
  • 然后调用tracer.withSpanInScope(span)将Span包装成Tracer.SpanInScope,而Tracer.SpanInScope和前面博文中分析的CurrentTraceContext.Scope比较像,都实现了Closeable接口,这里的目的也一样,都是为了利用JDK7的try-with-resources的特性,JVM会自动调用close方法,做一些线程对象的清理工作。其区别是后者是SPI(Service Provider Interface),不适合暴露给真正的使用者。 这样使得chain.doFilter(httpRequest, httpResponse)里的代码能用Tracer.currentSpan拿到从请求中提取(extract)的Span信息。
  • 最后调用handler.handleSend(httpResponse, error, span)

下面来仔细分析下handler中handleReceive和handleSend两个方法 handleReceive方法

  public Span handleReceive(TraceContext.Extractor<Req> extractor, Req request) {     return handleReceive(extractor, request, request);   }    public <C> Span handleReceive(TraceContext.Extractor<C> extractor, C carrier, Req request) {     Span span = nextSpan(extractor.extract(carrier), request);     if (span.isNoop()) return span;      // all of the parsing here occur before a timestamp is recorded on the span     span.kind(Span.Kind.SERVER);      // Ensure user-code can read the current trace context     Tracer.SpanInScope ws = tracer.withSpanInScope(span);     try {       parser.request(adapter, request, span);     } finally {       ws.close();     }      boolean parsedEndpoint = false;     if (Platform.get().zipkinV1Present()) {       zipkin.Endpoint.Builder deprecatedEndpoint = zipkin.Endpoint.builder().serviceName("");       if ((parsedEndpoint = adapter.parseClientAddress(request, deprecatedEndpoint))) {         span.remoteEndpoint(deprecatedEndpoint.build());       }     }     if (!parsedEndpoint) {       Endpoint.Builder remoteEndpoint = Endpoint.newBuilder();       if (adapter.parseClientAddress(request, remoteEndpoint)) {         span.remoteEndpoint(remoteEndpoint.build());       }     }     return span.start();   } 
  • 首先调用nextSpan(extractor.extract(carrier), request)从request中提取TraceContextOrSamplingFlags,并创建Span,并将Span的kind类型设置为SERVER
  • 然后调用parser.request(adapter, request, span),将request的内容,将span的name改为request的method即GET或者POST,而且会将当前请求的路径以Tag(http.path)写入Span中,这样我们就能在Zipkin的UI界面中能清晰的看出某个Span是发起了什么请求。
  • 最后为Span设置Endpoint信息,并调用start设置开始时间

handleSend方法

  public void handleSend(@Nullable Resp response, @Nullable Throwable error, Span span) {     if (span.isNoop()) return;      // Ensure user-code can read the current trace context     Tracer.SpanInScope ws = tracer.withSpanInScope(span);     try {       parser.response(adapter, response, error, span);     } finally {       ws.close();       span.finish();     }   } 

handleSend比较简单,调用parser.response(adapter, response, error, span),会将HTTP状态码写入Span的Tag(http.status_code)中,如果有出错,则会将错误信息写入Tag(error)中 最后会调用Span的finish方法,而finish方法中,会调用Reporter的report方法将Span信息上报到Zipkin。

接着看下nextSpan方法

  Span nextSpan(TraceContextOrSamplingFlags extracted, Req request) {     if (extracted.sampled() == null) { // Otherwise, try to make a new decision       extracted = extracted.sampled(sampler.trySample(adapter, request));     }     return extracted.context() != null         ? tracer.joinSpan(extracted.context())         : tracer.nextSpan(extracted);   } 

从请求里提取的对象extracted(TraceContextOrSamplingFlags),如果没有sampled信息,则由HttpSampler的trySample方法来决定是否采样 如果extracted中含有TraceContext信息,则由tracer调用joinSpan,加入已存在的trace,这种情况一般是客户端代码使用将trace信息放入header,而服务端收到请求后,则自动加入客户端发起的trace中,所以当backend的请求运行到这段代码,会joinSpan 如果extracted中不含TraceContext信息,则由tracer调用nextSpan,这种情况一般是我们用户发起的请求,比如浏览器发起,则请求header中肯定是没有trace信息的,所以当frontend的请求运行到这段代码,会新建一个span

joinSpan方法

  public final Span joinSpan(TraceContext context) {     if (context == null) throw new NullPointerException("context == null");     if (!supportsJoin) return newChild(context);     // If we are joining a trace, we are sharing IDs with the caller     // If the sampled flag was left unset, we need to make the decision here     TraceContext.Builder builder = context.toBuilder();     if (context.sampled() == null) {       builder.sampled(sampler.isSampled(context.traceId()));     } else {       builder.shared(true);     }     return toSpan(builder.build());   }    public Span newChild(TraceContext parent) {     if (parent == null) throw new NullPointerException("parent == null");     return nextSpan(TraceContextOrSamplingFlags.create(parent));   } 

在joinSpan方法中,会共享调用方的traceId,如果调用者没有传入sampled信息,则由服务端自己决定是否采样,即sampler.isSampled(context.traceId())

nextSpan方法

  public Span nextSpan(TraceContextOrSamplingFlags extracted) {     TraceContext parent = extracted.context();     if (extracted.samplingFlags() != null) {       TraceContext implicitParent = currentTraceContext.get();       if (implicitParent == null) {         return toSpan(newRootContext(extracted.samplingFlags(), extracted.extra()));       }       // fall through, with an implicit parent, not an extracted one       parent = appendExtra(implicitParent, extracted.extra());     }     long nextId = Platform.get().randomLong();     if (parent != null) {       return toSpan(parent.toBuilder() // copies "extra" from the parent           .spanId(nextId)           .parentId(parent.spanId())           .shared(false)           .build());     }     TraceIdContext traceIdContext = extracted.traceIdContext();     if (extracted.traceIdContext() != null) {       Boolean sampled = traceIdContext.sampled();       if (sampled == null) sampled = sampler.isSampled(traceIdContext.traceId());       return toSpan(TraceContext.newBuilder()           .sampled(sampled)           .debug(traceIdContext.debug())           .traceIdHigh(traceIdContext.traceIdHigh()).traceId(traceIdContext.traceId())           .spanId(nextId)           .extra(extracted.extra()).build());     }     // TraceContextOrSamplingFlags is a union of 3 types, we've checked all three     throw new AssertionError("should not reach here");   } 

在nextSpan方法中,首先找出合适的parent,当parent存在时,则新建一个child Span,否则返回new Span

到这里服务端接受到请求后,是如何记录Span信息的代码已经分析完毕,接下来我们看看作为客户端,我们是如何上报Span信息

FrontServlet

首先我们看到FrontServet中init方法里,我们初始化了OkHttpClient,并将TracingInterceptor拦截器添加到OkHttpClient的NetworkInterceptor拦截器栈中,然后还用CurrentTraceContext中的ExecutorService的包装方法,将Dispatcher中的ExecutorService包装后设置到OkHttpClient中。

package org.mozhu.zipkin.servlet;  import brave.http.HttpTracing; import brave.okhttp3.TracingInterceptor; import okhttp3.Dispatcher; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory;  import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.PrintWriter;  public class FrontendServlet extends HttpServlet {      private final static Logger LOGGER = LoggerFactory.getLogger(FrontendServlet.class);      private OkHttpClient client;      @Override     public void init(ServletConfig config) throws ServletException {         super.init(config);         HttpTracing httpTracing = (HttpTracing) config.getServletContext().getAttribute("TRACING");         client = new OkHttpClient.Builder()                 .dispatcher(new Dispatcher(                         httpTracing.tracing().currentTraceContext()                                 .executorService(new Dispatcher().executorService())                 ))                 .addNetworkInterceptor(TracingInterceptor.create(httpTracing))                 .build();     }      @Override     protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {         LOGGER.info("frontend receive request");         Request request = new Request.Builder()                 .url("http://localhost:9000/api")                 .build();          Response response = client.newCall(request).execute();         if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);          PrintWriter writer = resp.getWriter();         writer.write(response.body().string());         writer.flush();         writer.close();     }  } 
public final class TracingInterceptor implements Interceptor {   // ...    final Tracer tracer;   final String remoteServiceName;   final HttpClientHandler<Request, Response> handler;   final TraceContext.Injector<Request.Builder> injector;    TracingInterceptor(HttpTracing httpTracing) {     if (httpTracing == null) throw new NullPointerException("HttpTracing == null");     tracer = httpTracing.tracing().tracer();     remoteServiceName = httpTracing.serverName();     handler = HttpClientHandler.create(httpTracing, new HttpAdapter());     injector = httpTracing.tracing().propagation().injector(SETTER);   } } 

TracingInterceptor中依赖Tracer,TraceContext.Injector,HttpClientHandler,HttpAdapter。

  • TraceContext.Injector - 将Trace信息注入到HTTP Request中,即放到Http headers中
  • HttpClientHandler - 和HttpServerHandler对应,也是Http处理的核心组件,基本上所有和trace相关的操作均在此类中完成
  • HttpAdapter - 能从Http request中获得各种数据,比如method,请求Path,header值等
  @Override public Response intercept(Chain chain) throws IOException {     Request request = chain.request();     Request.Builder requestBuilder = request.newBuilder();      Span span = handler.handleSend(injector, requestBuilder, request);     parseServerAddress(chain.connection(), span);     Response response = null;     Throwable error = null;     try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {       return response = chain.proceed(requestBuilder.build());     } catch (IOException | RuntimeException | Error e) {       error = e;       throw e;     } finally {       handler.handleReceive(response, error, span);     }   } 

这里代码和TracingFilter中doFilter比较相似,是一个相反的过程

  • 首先将trace信息注入到request中,并创建Span对象
  • 然后调用chain.proceed(requestBuilder.build())来执行发送http请求
  • 最后handler.handleReceive(response, error, span)

接下来看看HttpClientHandler的handleSend方法和handleReceive方法 handleSend方法

  public Span handleSend(TraceContext.Injector<Req> injector, Req request, Span span) {     return handleSend(injector, request, request, span);   }    public <C> Span handleSend(TraceContext.Injector<C> injector, C carrier, Req request, Span span) {     injector.inject(span.context(), carrier);     if (span.isNoop()) return span;      // all of the parsing here occur before a timestamp is recorded on the span     span.kind(Span.Kind.CLIENT);      // Ensure user-code can read the current trace context     Tracer.SpanInScope ws = tracer.withSpanInScope(span);     try {       parser.request(adapter, request, span);     } finally {       ws.close();     }      boolean parsedEndpoint = false;     if (Platform.get().zipkinV1Present()) {       zipkin.Endpoint.Builder deprecatedEndpoint = zipkin.Endpoint.builder()           .serviceName(serverNameSet ? serverName : "");       if ((parsedEndpoint = adapter.parseServerAddress(request, deprecatedEndpoint))) {         span.remoteEndpoint(deprecatedEndpoint.serviceName(serverName).build());       }     }     if (!parsedEndpoint) {       Endpoint.Builder remoteEndpoint = Endpoint.newBuilder().serviceName(serverName);       if (adapter.parseServerAddress(request, remoteEndpoint) || serverNameSet) {         span.remoteEndpoint(remoteEndpoint.build());       }     }     return span.start();   } 
  • 首先调用injector.inject(span.context(), carrier)将Trace信息注入request中,并将Span的kind类型设置为CLIENT
  • 然后调用parser.request(adapter, request, span),将request的内容,将span的name改为request的method即GET或者POST,而且会将当前请求的路径以Tag(http.path)写入Span中,这样我们就能在Zipkin的UI界面中能清晰的看出某个Span是发起了什么请求。
  • 最后为Span设置Endpoint信息,并调用start设置开始时间

handleReceive方法

  public void handleReceive(@Nullable Resp response, @Nullable Throwable error, Span span) {     if (span.isNoop()) return;     Tracer.SpanInScope ws = tracer.withSpanInScope(span);     try {       parser.response(adapter, response, error, span);     } finally {       ws.close();       span.finish();     }   } 

handleReceive比较简单,当客户端收到服务端的响应后handleReceive方法会被调用,即调用parser.response(adapter, response, error, span),会将HTTP状态码写入Span的Tag(http.status_code)中,如果有出错,则会将错误信息写入Tag(error)中 最后会调用Span的finish方法,而finish方法中,会调用Reporter的report方法将Span信息上报到Zipkin。

BackendServlet

最后看看BackendServlet,在收到请求后,将请求的header中参数user-name取出,添加到时间戳字符串尾部,并返回。 在上一篇博文中,我们看到如果我们向Frontend发送的请求中带有header user-name参数,Frontend会将这个值传递给Backend,然后backend会将它放到响应字符串中返回,以表明接收到该header。

package org.mozhu.zipkin.servlet;  import org.slf4j.Logger; import org.slf4j.LoggerFactory;  import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.PrintWriter; import java.util.Date;  public class BackendServlet extends HttpServlet {      private final static Logger LOGGER = LoggerFactory.getLogger(BackendServlet.class);      @Override     protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {         LOGGER.info("backend receive request");         String username = req.getHeader("user-name");         String result;         if (username != null) {             result = new Date().toString() + " " + username;         } else {             result = new Date().toString();         }         PrintWriter writer = resp.getWriter();         writer.write(result);         writer.flush();         writer.close();     }  } 

至此,我们已经分析完Brave是如何在普通的web项目中使用的,分析了TracingFilter拦截请求处理请求的逻辑,也分析了OkHttpClient是如何将Trace信息放入request中的。 后面博文中,我们还会继续分析Brave和Spring Web项目的整合方法。

本文发表于2017年12月05日 18:33
(c)注:本文转载自https://my.oschina.net/mozhu/blog/1585185,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 2298 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1