A Look at Flink's MetricQueryServiceGateway


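Note: this article is reposted from https://my.oschina.net/go4it/blog/3023586 to share information, for study and discussion only. In case of any infringement, please contact me and I will remove it promptly.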

This post takes a look at Flink's MetricQueryServiceGateway.

MetricQueryServiceGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java

public interface MetricQueryServiceGateway {

	CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout);

	String getAddress();
}
  • MetricQueryServiceGateway defines two methods, queryMetrics and getAddress; it has one implementation class, AkkaQueryServiceGateway
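To make the contract of the interface concrete, here is a minimal sketch of how a caller might consume such a gateway. It does not use Flink classes: SimpleMetricGateway, FixedMetricGateway and GatewayUsageSketch are hypothetical stand-ins, the result type is simplified to byte[], and the timeout is a plain long in milliseconds rather than Flink's Time.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for MetricQueryServiceGateway. The real interface returns
// MetricDumpSerialization.MetricSerializationResult and takes a Flink Time.
interface SimpleMetricGateway {
    CompletableFuture<byte[]> queryMetrics(long timeoutMillis);

    String getAddress();
}

// Hypothetical in-memory implementation, used only for illustration.
class FixedMetricGateway implements SimpleMetricGateway {
    private final String address;
    private final byte[] payload;

    FixedMetricGateway(String address, byte[] payload) {
        this.address = address;
        this.payload = payload.clone();
    }

    @Override
    public CompletableFuture<byte[]> queryMetrics(long timeoutMillis) {
        // The answer is already known, so the timeout is ignored in this sketch.
        return CompletableFuture.completedFuture(payload.clone());
    }

    @Override
    public String getAddress() {
        return address;
    }
}

class GatewayUsageSketch {
    // Builds "<address>: <n> bytes" once the asynchronous query completes.
    static String describe(SimpleMetricGateway gateway, long timeoutMillis) {
        return gateway.queryMetrics(timeoutMillis)
            .thenApply(bytes -> gateway.getAddress() + ": " + bytes.length + " bytes")
            .join();
    }

    public static void main(String[] args) {
        SimpleMetricGateway gateway = new FixedMetricGateway("local://metrics", new byte[] {1, 2, 3});
        System.out.println(describe(gateway, 1000)); // prints "local://metrics: 3 bytes"
    }
}
```

The point of the sketch is only that queryMetrics is asynchronous (the caller chains on the returned future) while getAddress is a plain synchronous accessor.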

AkkaQueryServiceGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java

public class AkkaQueryServiceGateway implements MetricQueryServiceGateway {

	private final ActorRef queryServiceActorRef;

	public AkkaQueryServiceGateway(ActorRef queryServiceActorRef) {
		this.queryServiceActorRef = Preconditions.checkNotNull(queryServiceActorRef);
	}

	@Override
	public CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout) {
		return FutureUtils.toJava(
			Patterns.ask(queryServiceActorRef, MetricQueryService.getCreateDump(), timeout.toMilliseconds())
				.mapTo(ClassTag$.MODULE$.apply(MetricDumpSerialization.MetricSerializationResult.class))
		);
	}

	@Override
	public String getAddress() {
		return queryServiceActorRef.path().toString();
	}
}
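  • AkkaQueryServiceGateway implements the MetricQueryServiceGateway interface, and its constructor requires a queryServiceActorRef; queryMetrics uses ask to send a message of type MetricQueryService.CreateDump and converts the resulting Scala future into a CompletableFuture; getAddress returns queryServiceActorRef.path() as a string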
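The request/reply shape of ask can be approximated without Akka. The sketch below is an assumption-laden analogy, not Akka's implementation: AskSketch is a hypothetical class in which a single-threaded executor plays the role of the actor's mailbox, and a scheduled task fails the reply future with a TimeoutException if no answer arrives in time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical, simplified "ask": hand a message to a single-threaded mailbox and
// return a future that fails if no reply arrives before the timeout.
class AskSketch implements AutoCloseable {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Function<Object, Object> handler;

    AskSketch(Function<Object, Object> handler) {
        this.handler = handler;
    }

    CompletableFuture<Object> ask(Object message, long timeoutMillis) {
        CompletableFuture<Object> reply = new CompletableFuture<>();
        // The handler runs on the mailbox thread, one message at a time.
        mailbox.execute(() -> {
            try {
                reply.complete(handler.apply(message));
            } catch (RuntimeException e) {
                reply.completeExceptionally(e);
            }
        });
        // Whichever completion happens first wins; the later one is ignored.
        timer.schedule(() -> {
            reply.completeExceptionally(
                new TimeoutException("no reply within " + timeoutMillis + " ms"));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return reply;
    }

    @Override
    public void close() {
        mailbox.shutdownNow();
        timer.shutdownNow();
    }

    public static void main(String[] args) {
        try (AskSketch actor = new AskSketch(msg -> "dump-for-" + msg)) {
            System.out.println(actor.ask("CreateDump", 1000).join()); // prints "dump-for-CreateDump"
        }
    }
}
```

In the Flink code above, the same roles are filled by Patterns.ask (send and wait for the reply), timeout.toMilliseconds() (the deadline) and FutureUtils.toJava (conversion to a CompletableFuture).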

MetricQueryService

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java

public class MetricQueryService extends UntypedActor {
	private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);

	public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";
	private static final String SIZE_EXCEEDED_LOG_TEMPLATE =  "{} will not be reported as the metric dump would exceed the maximum size of {} bytes.";

	private static final CharacterFilter FILTER = new CharacterFilter() {
		@Override
		public String filterCharacters(String input) {
			return replaceInvalidChars(input);
		}
	};

	private final MetricDumpSerializer serializer = new MetricDumpSerializer();

	private final Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
	private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
	private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
	private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();

	private final long messageSizeLimit;

	//......

	@Override
	public void onReceive(Object message) {
		try {
			if (message instanceof AddMetric) {
				AddMetric added = (AddMetric) message;

				String metricName = added.metricName;
				Metric metric = added.metric;
				AbstractMetricGroup group = added.group;

				QueryScopeInfo info = group.getQueryServiceMetricInfo(FILTER);

				if (metric instanceof Counter) {
					counters.put((Counter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
				} else if (metric instanceof Gauge) {
					gauges.put((Gauge<?>) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
				} else if (metric instanceof Histogram) {
					histograms.put((Histogram) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
				} else if (metric instanceof Meter) {
					meters.put((Meter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
				}
			} else if (message instanceof RemoveMetric) {
				Metric metric = (((RemoveMetric) message).metric);
				if (metric instanceof Counter) {
					this.counters.remove(metric);
				} else if (metric instanceof Gauge) {
					this.gauges.remove(metric);
				} else if (metric instanceof Histogram) {
					this.histograms.remove(metric);
				} else if (metric instanceof Meter) {
					this.meters.remove(metric);
				}
			} else if (message instanceof CreateDump) {
				MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);

				dump = enforceSizeLimit(dump);

				getSender().tell(dump, getSelf());
			} else {
				LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
				getSender().tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message. " + message.toString())), getSelf());
			}
		} catch (Exception e) {
			LOG.warn("An exception occurred while processing a message.", e);
		}
	}

	public static Object getCreateDump() {
		return CreateDump.INSTANCE;
	}

	private static class CreateDump implements Serializable {
		private static final CreateDump INSTANCE = new CreateDump();
	}
	//......
}
  • MetricQueryService extends UntypedActor. Its onReceive method checks the message type; for CreateDump it calls MetricDumpSerialization.MetricDumpSerializer.serialize(counters, gauges, histograms, meters) to serialize the metrics into a MetricDumpSerialization.MetricSerializationResult, passes the result through enforceSizeLimit, and then replies with getSender().tell(dump, getSelf())
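The dispatch logic in onReceive can be sketched as a plain class, with no actor system involved. All names here (AddCounter, RemoveCounter, CreateDumpRequest, DispatchSketch) are hypothetical simplifications: only counters are modelled, they are keyed by name rather than by metric object, and the reply is returned from the method instead of being sent with getSender().tell(...).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical message types that loosely mirror AddMetric / RemoveMetric / CreateDump.
final class AddCounter {
    final String name;
    final LongSupplier counter;

    AddCounter(String name, LongSupplier counter) {
        this.name = name;
        this.counter = counter;
    }
}

final class RemoveCounter {
    final String name;

    RemoveCounter(String name) {
        this.name = name;
    }
}

enum CreateDumpRequest { INSTANCE }

class DispatchSketch {
    // Registered counters, keyed by name (the real actor keys its maps by the metric object).
    private final Map<String, LongSupplier> counters = new LinkedHashMap<>();

    // Returns the reply the actor would send back, or null when no reply is needed.
    Object onReceive(Object message) {
        if (message instanceof AddCounter) {
            AddCounter added = (AddCounter) message;
            counters.put(added.name, added.counter);
            return null;
        } else if (message instanceof RemoveCounter) {
            counters.remove(((RemoveCounter) message).name);
            return null;
        } else if (message instanceof CreateDumpRequest) {
            // Snapshot the current values at the moment the dump is requested.
            Map<String, Long> dump = new LinkedHashMap<>();
            counters.forEach((name, counter) -> dump.put(name, counter.getAsLong()));
            return dump;
        } else {
            // The real actor replies with a Status.Failure wrapping an IOException.
            return new IllegalArgumentException("invalid message: " + message);
        }
    }

    public static void main(String[] args) {
        DispatchSketch service = new DispatchSketch();
        service.onReceive(new AddCounter("numRecordsIn", () -> 42L));
        service.onReceive(new AddCounter("numRecordsOut", () -> 7L));
        service.onReceive(new RemoveCounter("numRecordsOut"));
        System.out.println(service.onReceive(CreateDumpRequest.INSTANCE)); // prints {numRecordsIn=42}
    }
}
```

Because an actor processes one message at a time, the real MetricQueryService can keep its metrics in plain HashMaps without extra synchronization; the sketch relies on the caller for the same guarantee.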

MetricDumpSerialization

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java

public class MetricDumpSerialization {
	//......

	public static class MetricSerializationResult implements Serializable {

		private static final long serialVersionUID = 6928770855951536906L;

		public final byte[] serializedCounters;
		public final byte[] serializedGauges;
		public final byte[] serializedMeters;
		public final byte[] serializedHistograms;

		public final int numCounters;
		public final int numGauges;
		public final int numMeters;
		public final int numHistograms;

		public MetricSerializationResult(
			byte[] serializedCounters,
			byte[] serializedGauges,
			byte[] serializedMeters,
			byte[] serializedHistograms,
			int numCounters,
			int numGauges,
			int numMeters,
			int numHistograms) {

			Preconditions.checkNotNull(serializedCounters);
			Preconditions.checkNotNull(serializedGauges);
			Preconditions.checkNotNull(serializedMeters);
			Preconditions.checkNotNull(serializedHistograms);
			Preconditions.checkArgument(numCounters >= 0);
			Preconditions.checkArgument(numGauges >= 0);
			Preconditions.checkArgument(numMeters >= 0);
			Preconditions.checkArgument(numHistograms >= 0);
			this.serializedCounters = serializedCounters;
			this.serializedGauges = serializedGauges;
			this.serializedMeters = serializedMeters;
			this.serializedHistograms = serializedHistograms;
			this.numCounters = numCounters;
			this.numGauges = numGauges;
			this.numMeters = numMeters;
			this.numHistograms = numHistograms;
		}
	}

	public static class MetricDumpSerializer {

		private DataOutputSerializer countersBuffer = new DataOutputSerializer(1024 * 8);
		private DataOutputSerializer gaugesBuffer = new DataOutputSerializer(1024 * 8);
		private DataOutputSerializer metersBuffer = new DataOutputSerializer(1024 * 8);
		private DataOutputSerializer histogramsBuffer = new DataOutputSerializer(1024 * 8);

		public MetricSerializationResult serialize(
			Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
			Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
			Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
			Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {

			countersBuffer.clear();
			int numCounters = 0;
			for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
				try {
					serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
					numCounters++;
				} catch (Exception e) {
					LOG.debug("Failed to serialize counter.", e);
				}
			}

			gaugesBuffer.clear();
			int numGauges = 0;
			for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
				try {
					serializeGauge(gaugesBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
					numGauges++;
				} catch (Exception e) {
					LOG.debug("Failed to serialize gauge.", e);
				}
			}

			histogramsBuffer.clear();
			int numHistograms = 0;
			for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
				try {
					serializeHistogram(histogramsBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
					numHistograms++;
				} catch (Exception e) {
					LOG.debug("Failed to serialize histogram.", e);
				}
			}

			metersBuffer.clear();
			int numMeters = 0;
			for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
				try {
					serializeMeter(metersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
					numMeters++;
				} catch (Exception e) {
					LOG.debug("Failed to serialize meter.", e);
				}
			}

			return new MetricSerializationResult(
				countersBuffer.getCopyOfBuffer(),
				gaugesBuffer.getCopyOfBuffer(),
				metersBuffer.getCopyOfBuffer(),
				histogramsBuffer.getCopyOfBuffer(),
				numCounters,
				numGauges,
				numMeters,
				numHistograms);
		}

		public void close() {
			countersBuffer = null;
			gaugesBuffer = null;
			metersBuffer = null;
			histogramsBuffer = null;
		}
	}

	//......
}
  • MetricDumpSerialization has several static nested classes: MetricSerializationResult, MetricDumpSerializer and MetricDumpDeserializer. MetricDumpSerializer provides a serialize method that serializes the counters, gauges, histograms and meters into a MetricSerializationResult
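The "byte array plus entry count" shape of MetricSerializationResult can be illustrated with the JDK's own data streams. This is a sketch under assumptions, not Flink's wire format: CounterDumpSketch and its Result class are hypothetical, only counters are handled, and each entry is written as a UTF name followed by a long value.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

class CounterDumpSketch {
    // Hypothetical result holder: the serialized bytes plus the number of entries,
    // similar in spirit to serializedCounters / numCounters.
    static final class Result {
        final byte[] serializedCounters;
        final int numCounters;

        Result(byte[] serializedCounters, int numCounters) {
            this.serializedCounters = serializedCounters;
            this.numCounters = numCounters;
        }
    }

    // Writes every counter as (name, value) and counts the entries written.
    static Result serialize(Map<String, Long> counters) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        int numCounters = 0;
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (Map.Entry<String, Long> entry : counters.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeLong(entry.getValue());
                numCounters++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new Result(bytes.toByteArray(), numCounters);
    }

    // Reads back exactly numCounters entries; the count tells the reader when to stop.
    static Map<String, Long> deserialize(Result result) {
        Map<String, Long> counters = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(result.serializedCounters))) {
            for (int i = 0; i < result.numCounters; i++) {
                String name = in.readUTF();
                counters.put(name, in.readLong());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return counters;
    }

    public static void main(String[] args) {
        Map<String, Long> counters = new LinkedHashMap<>();
        counters.put("numRecordsIn", 42L);
        counters.put("numRecordsOut", 7L);
        Result result = serialize(counters);
        System.out.println(result.numCounters + " counters, " + result.serializedCounters.length + " bytes");
        System.out.println(deserialize(result));
    }
}
```

One difference worth noting: the Flink serializer shown above wraps each entry in its own try/catch and logs failures at debug level, so a single failing metric does not abort the whole dump, whereas this sketch fails as a whole.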

Summary

  • MetricQueryServiceGateway defines two methods, queryMetrics and getAddress; it has one implementation class, AkkaQueryServiceGateway
  • AkkaQueryServiceGateway implements the MetricQueryServiceGateway interface, and its constructor requires a queryServiceActorRef; queryMetrics uses ask to send a message of type MetricQueryService.CreateDump; getAddress returns queryServiceActorRef.path() as a string
  • MetricQueryService extends UntypedActor. Its onReceive method checks the message type; for CreateDump it calls MetricDumpSerialization.MetricDumpSerializer.serialize(counters, gauges, histograms, meters) to serialize the metrics into a MetricDumpSerialization.MetricSerializationResult, and then replies with getSender().tell(dump, getSelf())
  • MetricDumpSerialization has several static nested classes: MetricSerializationResult, MetricDumpSerializer and MetricDumpDeserializer. MetricDumpSerializer provides a serialize method that serializes the counters, gauges, histograms and meters into a MetricSerializationResult

Published on 2019-03-17 12:00