版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/77450039
注:本文是为了配合《Spark内核设计的艺术——架构设计与实现》一书的内容而编写,目的是为了节省成本、方便读者查阅。书中附录D的内容都在本文呈现。
Metrics是codahale提供的第三方度量仓库。Metrics作为一款监控指标的度量类库,可以为第三方库提供辅助统计信息,还可以将度量数据发送给Ganglia和Graphite以提供图形化的监控。
Metrics也采用了监听器模式,提供了Gauge、Counter、Meter、Histogram、Timer等度量工具类以及健康检查(HealthCheck)功能。想了解更多Metrics的内容,读者可以访问Metrics官网:http://metrics.dropwizard.io/3.2.2/
本文将对Metrics中的核心类进行介绍,方便读者对Spark度量系统更加细致深入的理解。
MetricRegistry
MetricRegistry是Metrics提供的度量容器,这里先列出MetricRegistry的主要结构。
public class MetricRegistry implements MetricSet {
private final ConcurrentMap<String, Metric> metrics;
private final List<MetricRegistryListener> listeners;
}
从上面代码可以看出MetricRegistry中会缓存各种度量和监听器,下面对MetricRegistry中的一些方法进行介绍。
1、name
功能描述:构建形如“字符串1.字符串2…字符串N-1.字符串N”这样的字符串。任何空值或空字符串都将被过滤。 public static String name(String name, String... names) {
final StringBuilder builder = new StringBuilder();
append(builder, name);
if (names != null) {
for (String s : names) {
append(builder, s);
}
}
return builder.toString();
}
2、notifyListenerOfAddedMetric
功能描述:当有新的Metric添加到ConcurrentMap<String, Metric> metrics时,调用此方法。根据Metric的子接口的不同,调用不同方法。例如:Gauge则调用监听器的onGaugeAdded;Counter则调用监听器的onCounterAdded;Histogram则调用监听器的onHistogramAdded。 private void notifyListenerOfAddedMetric(MetricRegistryListener listener, Metric metric, String name) {
if (metric instanceof Gauge) {
listener.onGaugeAdded(name, (Gauge<?>) metric);
} else if (metric instanceof Counter) {
listener.onCounterAdded(name, (Counter) metric);
} else if (metric instanceof Histogram) {
listener.onHistogramAdded(name, (Histogram) metric);
} else if (metric instanceof Meter) {
listener.onMeterAdded(name, (Meter) metric);
} else if (metric instanceof Timer) {
listener.onTimerAdded(name, (Timer) metric);
} else {
throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
}
}
3、onMetricAdded
功能描述:当有新的Metric添加到ConcurrentMap<String, Metric> metrics时,调用此方法。遍历调用监听器缓存List<MetricRegistryListener> listeners中的所有监听器,调用notifyListenerOfAddedMetric。 private void onMetricAdded(String name, Metric metric) {
for (MetricRegistryListener listener : listeners) {
notifyListenerOfAddedMetric(listener, metric, name);
}
}
4、register
功能描述:如果metric的类型是Metric并且metrics中还没有此metric,则将它添加到metrics;
如果Metric的类型是MetricSet,则MetricSet中包含的所有新的Metric添加到缓存ConcurrentMap<String, Metric> metrics;以上添加过程都伴随onMetricAdded的调用。 public <T extends Metric> T register(String name, T metric) throws IllegalArgumentException {
if (metric instanceof MetricSet) {
registerAll(name, (MetricSet) metric);
} else {
final Metric existing = metrics.putIfAbsent(name, metric);
if (existing == null) {
onMetricAdded(name, metric);
} else {
throw new IllegalArgumentException("A metric named " + name + " already exists");
}
}
return metric;
}
private void registerAll(String prefix, MetricSet metrics) throws IllegalArgumentException {
for (Map.Entry<String, Metric> entry : metrics.getMetrics().entrySet()) {
if (entry.getValue() instanceof MetricSet) {
registerAll(name(prefix, entry.getKey()), (MetricSet) entry.getValue());
} else {
register(name(prefix, entry.getKey()), entry.getValue());
}
}
}
Gauge
Gauge是Metrics提供的用于估计度量值的特质,其实现如下:public interface Gauge<T> extends Metric {
/**
* Returns the metric's current value.
*
* @return the metric's current value
*/
T getValue();
}
Slf4jReporter
Slf4jReporter 是Metrics提供的使用实现了Slf4j接口的实现类的方法,将度量输出到日志的类。1、report
功能描述:将度量输出到日志的方法。 @Override
public void report(SortedMap<String, Gauge> gauges,
SortedMap<String, Counter> counters,
SortedMap<String, Histogram> histograms,
SortedMap<String, Meter> meters,
SortedMap<String, Timer> timers) {
if (loggerProxy.isEnabled(marker)) {
for (Entry<String, Gauge> entry : gauges.entrySet()) {
logGauge(entry.getKey(), entry.getValue());
}
for (Entry<String, Counter> entry : counters.entrySet()) {
logCounter(entry.getKey(), entry.getValue());
}
for (Entry<String, Histogram> entry : histograms.entrySet()) {
logHistogram(entry.getKey(), entry.getValue());
}
for (Entry<String, Meter> entry : meters.entrySet()) {
logMeter(entry.getKey(), entry.getValue());
}
for (Entry<String, Timer> entry : timers.entrySet()) {
logTimer(entry.getKey(), entry.getValue());
}
}
}
2、logGauge
功能描述:将估计度量输出到日志的方法。(备注: Slf4jReporter的 report 方法中分别对 Gauge 、 Counter 、 Histogram 、 Meter 及 Timer 进行输出,为说明问题,附录 D 只挑选了对 Gauge 的输出作为介绍,其它种类度量的输出,读者可查阅相关文档或者阅读 Metrics 源码。) private void logGauge(String name, Gauge gauge) {
loggerProxy.log(marker, "type=GAUGE, name={}, value={}", prefix(name), gauge.getValue());
}
3、LoggerProxy
Slf4jReporter的日志输出依赖于LoggerProxy,根据LoggerProxy的类名,我们知道这是一个有关日志输出的代理类,其实现如下: /* private class to allow logger configuration */
static abstract class LoggerProxy {
protected final Logger logger;
public LoggerProxy(Logger logger) {
this.logger = logger;
}
abstract void log(Marker marker, String format, Object... arguments);
abstract boolean isEnabled(Marker marker);
}
可以看到LoggerProxy实际上不过是代理了org.slf4j.Logger接口对日志输出。
ScheduledReporter
ScheduledReporter是ConsoleReporter、CsvReporter、Slf4jReporter及GraphiteReporter的共同父类,ScheduledReporter中的很多方法被子类所共用。
1、start
功能描述:启动度量输出工作,实质为定时器不断地调用report方法输出。 public void start(long period, TimeUnit unit) {
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
report();
} catch (RuntimeException ex) {
LOG.error("RuntimeException thrown from {}#report. Exception was suppressed.", ScheduledReporter.this.getClass().getSimpleName(), ex);
}
}
}, period, period, unit);
}
2、stop
功能描述:停止度量输出工作,实质为停止定时器。 public void stop() {
executor.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
executor.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate");
}
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
关于《Spark内核设计的艺术 架构设计与实现》
经过近一年的准备,《 Spark内核设计的艺术 架构设计与实现 》一书现已出版发行,图书如图:
电子版售卖链接如下: