异步写入
对于异步写入有两个细节点:
1.该数据从RabbtiMQ订阅消费写入到Elasticsearch,从下面代码可以看出,我刻意以月的维度建立Index,格式为 userviewrecord-2021-12,这么做的目的是为了方便管理Index和资源利用,有需要的情况下会删除旧的Index。
2.消息订阅与WebAPI暂时集成到同一个进程,这样做主要是开发、部署都方便,如果后续订阅多了,在把消息订阅相关的业务抽离到独立的进程。
按需演变,避免过度设计
订阅消费逻辑
public class UserViewDurationConsumer : BaseConsumer<UserViewDurationMessage> { private readonly ElasticClient _elasticClient; public UserViewDurationConsumer(ElasticClient elasticClient) { _elasticClient = elasticClient; } public override void Excute(UserViewDurationMessage msg) { var document = msg.MapTo<Entity.UserViewDuration>(); var result = _elasticClient.Create(document, a => a.Index(typeof(Entity.UserViewDuration).GetRelationName() + "-" + msg.CreateDateTime.ToString("yyyy-MM"))).GetApiResult(); if (result.Failed) LoggerHelper.WriteToFile(result.Message); } } /// <summary> /// 订阅消费 /// </summary> public static class ConsumerExtension { public static IApplicationBuilder UseSubscribe<T, TConsumer>(this IApplicationBuilder appBuilder, IHostApplicationLifetime lifetime) where T : EasyNetQEntity, new() where TConsumer : BaseConsumer<T> { var bus = appBuilder.ApplicationServices.GetRequiredService<IBus>(); var consumer = appBuilder.ApplicationServices.GetRequiredService<TConsumer>(); lifetime.ApplicationStarted.Register(() => { bus.Subscribe<T>(msg => consumer.Excute(msg)); }); lifetime.ApplicationStopped.Register(() => bus?.Dispose()); return appBuilder; } } 订阅与注入 public class Startup { public Startup(IConfiguration configuration) { Configuration = configuration; } public IConfiguration Configuration { get; } public void ConfigureServices(IServiceCollection services) { ...... } public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime lifetime) { app.UseAllElasticApm(Configuration); app.UseHealthChecks("/health"); app.UseDeveloperExceptionPage(); app.UseSwagger(); app.UseSwaggerUI(c => { c.SwaggerEndpoint("/swagger/v1/swagger.json", "SF.ES.Api v1"); c.RoutePrefix = ""; }); app.UseRouting(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); app.UseSubscribe<UserViewDurationMessage, UserViewDurationConsumer>(lifetime); } }
查询接口
查询接口此处有两个细节点:
1.如果不确定月份,则使用通配符查询userviewrecord-*,当然有需要的也可以使用别名处理。
2.因为Elasticsearch是记录UTC时间,因此时间查询得指定TimeZone。
[HttpGet] [Route("record")] public ApiResult<List<UserMarkRecordGetRecordResponse>> GetRecord([FromQuery] UserViewDurationRecordGetRequest request) { var dataList = new List<UserMarkRecordGetRecordResponse>(); string dateTime; if (request.BeginDateTime.HasValue && request.EndDateTime.HasValue) { var month = request.EndDateTime.Value.DifferMonth(request.BeginDateTime.Value); if(month <= 0 ) dateTime = request.BeginDateTime.Value.ToString("yyyy-MM"); else dateTime = "*"; } else dateTime = "*"; var mustQuerys = new List<Func<QueryContainerDescriptor<UserViewDuration>, QueryContainer>>(); if (request.UserId.HasValue) mustQuerys.Add(a => a.Term(t => t.Field(f => f.UserId).Value(request.UserId.Value))); if (request.EntityType.HasValue) mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityType).Value(request.EntityType))); if (request.EntityId.HasValue) mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityId).Value(request.EntityId.Value))); if (request.CharpterId.HasValue) mustQuerys.Add(a => a.Term(t => t.Field(f => f.CharpterId).Value(request.CharpterId.Value))); if (request.BeginDateTime.HasValue) mustQuerys.Add(a => a.DateRange(dr => dr.Field(f => f.CreateDateTime).GreaterThanOrEquals(request.BeginDateTime.Value).TimeZone(EsConst.TimeZone))); if (request.EndDateTime.HasValue) mustQuerys.Add(a => a.DateRange(dr => dr.Field(f => f.CreateDateTime).LessThanOrEquals(request.EndDateTime.Value).TimeZone(EsConst.TimeZone))); var searchResult = _elasticClient.Search<UserViewDuration>(a => a.Index(typeof(UserViewDuration).GetRelationName() + "-" + dateTime) .Size(request.Size) .Query(q => q.Bool(b => b.Must(mustQuerys))) .SearchAfterTimestamp(request.Timestamp) .Sort(s => s.Field(f => f.Timestamp, SortOrder.Descending))); var apiResult = searchResult.GetApiResult<UserViewDuration, List<UserMarkRecordGetRecordResponse>>(); if (apiResult.Success) dataList.AddRange(apiResult.Data); return ApiResult<List<UserMarkRecordGetReco