实现.Net程序中OpenTracing采样和上报配置的自动更新
前言
OpenTracing是一个链路跟踪的开放协议,已经有开源的.net实现:opentracing-csharp,同时支持.net framework和.net core,Github地址:https://github.com/opentracing/opentracing-csharp。
这个库支持多种链路跟踪模式,不过仅提供了最基础的功能,想用在实际项目中还需要做很多增强,还好也有人做了开源项目:opentracing-contrib,Github地址:https://github.com/opentracing-contrib/csharp-netcore。
opentracing-contrib中集成了一个名为Jaeger的类库,这个库实现了链路跟踪数据的采样和上报,支持将数据上传到Jaeger进行分析统计。
为了同时保障性能和跟踪关键数据,能够远程调整采样率是很重要的,Jaeger本身也提供了远程配置采样率的支持。
不过我这里用的阿里云链路跟踪不支持,配置的设计也和想要的不同,所以自己做了一个采样和上报配置的动态更新,也才有了这篇文章。
思路
使用Jaeger初始化Tracer大概是这样的:
// Build a tracer for the service named by serviceName, wiring in the given
// sampler and reporter, then register it as the global tracer.
var tracer = new Tracer.Builder(serviceName)
.WithSampler(sampler)
.WithReporter(reporter)
.Build();
GlobalTracer.Register(tracer);
首先是提供当前服务的名字,然后需要提供一个采样器,再提供一个上报器,Build下生成ITracer的一个实例,最后注册到全局。
可以分析得出,采样和上报配置的更新就是更新采样器和上报器。
不过Tracer并没有提供UpdateSampler和UpdateReporter的方法,被卡住了,怎么办呢?
前文提到Jaeger是支持采样率的动态调整的,看看它怎么做的:
// Constructor excerpt; "..." marks code omitted from this listing.
// Creates the polling timer whose callback is UpdateSampler(). The due time
// is TimeSpan.Zero and the period is builder.PollingInterval.
// NOTE(review): assuming Timer is System.Threading.Timer, this means the
// first poll is scheduled immediately and then repeats every PollingInterval.
private RemoteControlledSampler(Builder builder)
{
...
_pollTimer = new Timer(_ => UpdateSampler(), null, TimeSpan.Zero, builder.PollingInterval);
}
/// <summary>
/// Updates <see cref="Sampler"/> to a new sampler when it is different.
/// </summary>
/// <remarks>
/// Excerpt; "..." marks code omitted from this listing. Fetches the sampling
/// strategy for <c>_serviceName</c> from <c>_samplingManager</c> and passes the
/// response to <c>UpdateRateLimitingOrProbabilisticSampler</c>.
/// </remarks>
internal void UpdateSampler()
{
try
{
// Blocks the calling thread until the asynchronous fetch has completed.
SamplingStrategyResponse response = _samplingManager.GetSamplingStrategyAsync(_serviceName)
.ConfigureAwait(false).GetAwaiter().GetResult();
...
UpdateRateLimitingOrProbabilisticSampler(response);
}
catch (Exception ex)
{
// Failure handling is omitted from this listing.
...
}
}
// Excerpt; "..." marks code omitted from this listing.
// Replaces the current Sampler with the local `sampler` when the two are not
// equal, closing the old one first. The comparison and swap run under _lock.
// NOTE(review): `sampler` is created in the omitted part, presumably from
// `response` - confirm against the full source.
private void UpdateRateLimitingOrProbabilisticSampler(SamplingStrategyResponse response)
{
...
lock (_lock)
{
if (!Sampler.Equals(sampler))
{
// Close the outgoing sampler before installing the replacement.
Sampler.Close();
Sampler = sampler;
...
}
}
}
这里只留下关键代码,可以看到核心就是:通过一个Timer定时获取采样策略,然后替换原来的Sampler。
这是一个很好理解的办法,下边就按照这个思路来搞。
方案
分别提供一个可更新的Sampler和可更新的Reporter,Build Tracer时使用这两个可更新的类。这里延续开源项目中Sampler和Reporter的创建方式,给出这两个类。
可更新的Sampler:
/// <summary>
/// An <see cref="ISampler"/> wrapper that delegates to an inner sampler which
/// can be replaced at runtime through <see cref="UpdateSampler"/>.
/// </summary>
/// <remarks>
/// Access to the inner sampler is guarded by a <see cref="ReaderWriterLockSlim"/>:
/// sampling and <see cref="ToString"/> take the read lock, while replacing or
/// closing the sampler takes the write lock.
/// </remarks>
internal class UpdatableSampler : ValueObject, ISampler
{
    public const string Type = "updatable";

    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
    private readonly string _serviceName;
    private readonly ILoggerFactory _loggerFactory;
    private readonly ILogger _logger;
    private readonly IMetrics _metrics;

    /// <summary>The sampler that calls are currently delegated to.</summary>
    internal ISampler Sampler { get; private set; }

    /// <summary>Creates the wrapper from the values collected by <paramref name="builder"/>.</summary>
    private UpdatableSampler(Builder builder)
    {
        _serviceName = builder.ServiceName;
        _loggerFactory = builder.LoggerFactory;
        _logger = _loggerFactory.CreateLogger<UpdatableSampler>();
        _metrics = builder.Metrics;
        Sampler = builder.InitialSampler;
    }

    /// <summary>
    /// Updates <see cref="Sampler"/> to a new sampler when it is different.
    /// </summary>
    /// <param name="sampler">The replacement sampler; must not be <c>null</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="sampler"/> is <c>null</c>.</exception>
    /// <remarks>
    /// Any other failure is logged as a warning and counted in
    /// <c>SamplerQueryFailure</c> instead of being thrown.
    /// </remarks>
    public void UpdateSampler(ISampler sampler)
    {
        // A null sampler would make every later Sample() call throw, so reject it up front.
        if (sampler == null)
        {
            throw new ArgumentNullException(nameof(sampler));
        }

        try
        {
            ISampler previous = null;

            // Acquire the lock before the inner try so the finally block only
            // releases a lock that is actually held.
            _lock.EnterWriteLock();
            try
            {
                if (!Sampler.Equals(sampler))
                {
                    previous = Sampler;
                    Sampler = sampler;
                }
            }
            finally
            {
                _lock.ExitWriteLock();
            }

            if (previous != null)
            {
                _metrics.SamplerUpdated.Inc(1);

                // The write lock has drained every in-flight Sample() call, so the
                // old sampler is no longer in use. Closing it after the swap means
                // a failing Close() cannot leave a half-closed sampler installed.
                previous.Close();
            }
        }
        catch (System.Exception ex)
        {
            _logger.LogWarning(ex, "Updating sampler failed");
            _metrics.SamplerQueryFailure.Inc(1);
        }
    }

    /// <summary>Delegates the sampling decision to the current <see cref="Sampler"/>.</summary>
    /// <param name="operation">Operation name passed through to the inner sampler.</param>
    /// <param name="id">Trace id passed through to the inner sampler.</param>
    /// <returns>The status returned by the inner sampler.</returns>
    public SamplingStatus Sample(string operation, TraceId id)
    {
        _lock.EnterReadLock();
        try
        {
            return Sampler.Sample(operation, id);
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    /// <summary>Returns the wrapper name together with the current inner sampler.</summary>
    public override string ToString()
    {
        _lock.EnterReadLock();
        try
        {
            return $"{nameof(UpdatableSampler)}(Sampler={Sampler})";
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    /// <summary>Closes the current inner sampler while holding the write lock.</summary>
    public void Close()
    {
        _lock.EnterWriteLock();
        try
        {
            Sampler.Close();
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    /// <summary>Yields the inner sampler as the only value supplied to the <c>ValueObject</c> base class.</summary>
    protected override IEnumerable<object> GetAtomicValues()
    {
        yield return Sampler;
    }

    /// <summary>Fluent builder for <see cref="UpdatableSampler"/>.</summary>
    public sealed class Builder
    {
        internal string ServiceName { get; }
        internal ILoggerFactory LoggerFactory { get; private set; }
        internal ISampler InitialSampler { get; private set; }
        internal IMetrics Metrics { get; private set; }

        /// <param name="serviceName">Service name; must not be <c>null</c>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="serviceName"/> is <c>null</c>.</exception>
        public Builder(string serviceName)
        {
            ServiceName = serviceName ?? throw new ArgumentNullException(nameof(serviceName));
        }

        /// <summary>Sets the logger factory used to create the sampler's logger.</summary>
        public Builder WithLoggerFactory(ILoggerFactory loggerFactory)
        {
            LoggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
            return this;
        }

        /// <summary>Sets the sampler used until the first update.</summary>
        public Builder WithInitialSampler(ISampler initialSampler)
        {
            InitialSampler = initialSampler ?? throw new ArgumentNullException(nameof(initialSampler));
            return this;
        }

        /// <summary>Sets the metrics sink used to count updates and failures.</summary>
        public Builder WithMetrics(IMetrics metrics)
        {
            Metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
            return this;
        }

        /// <summary>
        /// Builds the sampler. Anything not configured falls back to a default:
        /// <c>NullLoggerFactory.Instance</c>, a new <c>ProbabilisticSampler</c>,
        /// and a <c>MetricsImpl</c> over <c>NoopMetricsFactory.Instance</c>.
        /// </summary>
        public UpdatableSampler Build()
        {
            if (LoggerFactory == null)
            {
                LoggerFactory = NullLoggerFactory.Instance;
            }

            if (InitialSampler == null)
            {
                InitialSampler = new ProbabilisticSampler();
            }

            if (Metrics == null)
            {
                Metrics = new MetricsImpl(NoopMetricsFactory.Instance);
            }

            return new UpdatableSampler(this);
        }
    }
}
可更新的Reporter:
/// <summary>
/// An <see cref="IReporter"/> wrapper that delegates to an inner reporter which
/// can be replaced at runtime through <see cref="UpdateReporter"/>.
/// </summary>
/// <remarks>
/// Access to the inner reporter is guarded by a <see cref="ReaderWriterLockSlim"/>.
/// The lock is only held for short synchronous sections and never across an
/// <c>await</c>, because the lock must be released on the thread that took it.
/// </remarks>
internal class UpdatableReporter : IReporter
{
    public const string Type = "updatable";

    private readonly string _serviceName;
    private readonly ILoggerFactory _loggerFactory;
    private readonly ILogger _logger;
    private readonly IMetrics _metrics;
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();

    /// <summary>The reporter that calls are currently delegated to.</summary>
    internal IReporter Reporter { get; private set; }

    /// <summary>Creates the wrapper from the values collected by <paramref name="builder"/>.</summary>
    private UpdatableReporter(Builder builder)
    {
        _serviceName = builder.ServiceName;
        _loggerFactory = builder.LoggerFactory;
        _logger = _loggerFactory.CreateLogger<UpdatableReporter>();
        _metrics = builder.Metrics;
        Reporter = builder.InitialReporter;
    }

    /// <summary>
    /// Updates <see cref="Reporter"/> to a new reporter when it is different.
    /// </summary>
    /// <param name="reporter">The replacement reporter; must not be <c>null</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="reporter"/> is <c>null</c>.</exception>
    /// <remarks>
    /// Any other failure is logged as a warning and counted in
    /// <c>ReporterFailure</c> instead of being thrown. The old reporter is closed
    /// synchronously, so this call blocks until that close has finished.
    /// </remarks>
    public void UpdateReporter(IReporter reporter)
    {
        // A null reporter would make every later Report() call throw, so reject it up front.
        if (reporter == null)
        {
            throw new ArgumentNullException(nameof(reporter));
        }

        try
        {
            IReporter previous = null;

            // Acquire the lock before the inner try so the finally block only
            // releases a lock that is actually held.
            _lock.EnterWriteLock();
            try
            {
                if (!Reporter.Equals(reporter))
                {
                    previous = Reporter;
                    Reporter = reporter;
                }
            }
            finally
            {
                _lock.ExitWriteLock();
            }

            if (previous != null)
            {
                // The write lock has drained every in-flight Report() call, so the
                // old reporter is no longer in use. Closing it outside the lock
                // keeps Report() from stalling while the old reporter shuts down.
                // No sampler metric is incremented here: this is a reporter swap.
                previous.CloseAsync(CancellationToken.None).ConfigureAwait(false).GetAwaiter().GetResult();
            }
        }
        catch (System.Exception ex)
        {
            _logger.LogWarning(ex, "Updating reporter failed");
            _metrics.ReporterFailure.Inc(1);
        }
    }

    /// <summary>Forwards <paramref name="span"/> to the current <see cref="Reporter"/>.</summary>
    public void Report(Span span)
    {
        _lock.EnterReadLock();
        try
        {
            Reporter.Report(span);
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    /// <summary>Returns the wrapper name together with the current inner reporter.</summary>
    public override string ToString()
    {
        _lock.EnterReadLock();
        try
        {
            return $"{nameof(UpdatableReporter)}(Reporter={Reporter})";
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    /// <summary>Closes the current inner reporter.</summary>
    /// <param name="cancellationToken">Token forwarded to the inner reporter's <c>CloseAsync</c>.</param>
    public async Task CloseAsync(CancellationToken cancellationToken)
    {
        IReporter current;

        // Take the write lock only to wait for in-flight Report() calls and to
        // read the current reporter. It is released before awaiting because the
        // continuation may resume on a different thread, and ReaderWriterLockSlim
        // must be exited on the thread that entered it.
        _lock.EnterWriteLock();
        try
        {
            current = Reporter;
        }
        finally
        {
            _lock.ExitWriteLock();
        }

        await current.CloseAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>Fluent builder for <see cref="UpdatableReporter"/>.</summary>
    public sealed class Builder
    {
        internal string ServiceName { get; }
        internal ILoggerFactory LoggerFactory { get; private set; }
        internal IReporter InitialReporter { get; private set; }
        internal IMetrics Metrics { get; private set; }

        /// <param name="serviceName">Service name; must not be <c>null</c>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="serviceName"/> is <c>null</c>.</exception>
        public Builder(string serviceName)
        {
            ServiceName = serviceName ?? throw new ArgumentNullException(nameof(serviceName));
        }

        /// <summary>Sets the logger factory used to create the reporter's logger.</summary>
        public Builder WithLoggerFactory(ILoggerFactory loggerFactory)
        {
            LoggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
            return this;
        }

        /// <summary>Sets the reporter used until the first update.</summary>
        public Builder WithInitialReporter(IReporter initialReporter)
        {
            InitialReporter = initialReporter ?? throw new ArgumentNullException(nameof(initialReporter));
            return this;
        }

        /// <summary>Sets the metrics sink used to count failures.</summary>
        public Builder WithMetrics(IMetrics metrics)
        {
            Metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
            return this;
        }

        /// <summary>
        /// Builds the reporter. Anything not configured falls back to a default:
        /// <c>NullLoggerFactory.Instance</c>, a new <c>NoopReporter</c>, and a
        /// <c>MetricsImpl</c> over <c>NoopMetricsFactory.Instance</c>.
        /// </summary>
        public UpdatableReporter Build()
        {
            if (LoggerFactory == null)
            {
                LoggerFactory = NullLoggerFactory.Instance;
            }

            if (InitialReporter == null)
            {
                InitialReporter = new NoopReporter();
            }

            if (Metrics == null)
            {
                Metrics = new MetricsImpl(NoopMetricsFactory.Instance);
            }

            return new UpdatableReporter(this);
        }
    }
}
注意这里边用到了读写锁,因为要做到不停止服务的更新,而且大部分情况下都是读;如果使用lock,读操作之间也会互相阻塞,锁的粒度就太粗了。
现在初始化Tracer大概是这样的:
// Wrap the sampler built from the current configuration in an UpdatableSampler.
sampler = new UpdatableSampler.Builder(serviceName)
.WithInitialSampler(BuildSampler(configuration))
.Build();
// Wrap the reporter built from the current configuration in an UpdatableReporter.
reporter = new UpdatableReporter.Builder(serviceName)
.WithInitialReporter(BuildReporter(configuration))
.Build();
// Hand the two wrappers to the tracer builder in place of a plain sampler and reporter.
var tracer = new Tracer.Builder(serviceName)
.WithSampler(sampler)
.WithReporter(reporter)
.Build();
当配置发生改变时,调用sampler和reporter的更新方法:
// Configuration-change handler excerpt; "..." marks code omitted from this listing.
// Builds a new reporter and a new sampler from newConfiguration and passes them
// to the update methods of the wrappers held in _reporter and _sampler.
private void OnTracingConfigurationChanged(TracingConfiguration newConfiguration, TracingConfigurationChangedInfo changedInfo)
{
...
// The casts throw InvalidCastException if the fields do not hold the updatable wrapper types.
((UpdatableReporter)_reporter).UpdateReporter(BuildReporter(newConfiguration));
((UpdatableSampler)_sampler).UpdateSampler(BuildSampler(newConfiguration));
...
}
这里就不写如何监听配置的改变了,使用Timer或者阻塞查询等等都可以。
后记
opentracing-contrib这个项目只支持.NET Core,如果想用在.NET Framework中还需要自己搞,这个方法会单独写一篇文章,这里就不做介绍了。