记一次引入Elasticsearch的系统架构实战(三)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 记一次引入Elasticsearch的系统架构实战(三)

异步写入


对于异步写入有两个细节点:

  

1.该数据从RabbtiMQ订阅消费写入到Elasticsearch,从下面代码可以看出,我刻意以月的维度建立Index,格式为 userviewrecord-2021-12,这么做的目的是为了方便管理Index和资源利用,有需要的情况下会删除旧的Index。

  

2.消息订阅与WebAPI暂时集成到同一个进程,这样做主要是开发、部署都方便,如果后续订阅多了,在把消息订阅相关的业务抽离到独立的进程。


按需演变,避免过度设计


订阅消费逻辑


public class UserViewDurationConsumer : BaseConsumer<UserViewDurationMessage>
    {
        private readonly ElasticClient _elasticClient;
        public UserViewDurationConsumer(ElasticClient elasticClient)
        {
            _elasticClient = elasticClient;
        }
        public override void Excute(UserViewDurationMessage msg)
        {
            var document = msg.MapTo<Entity.UserViewDuration>();
            var result = _elasticClient.Create(document, a => a.Index(typeof(Entity.UserViewDuration).GetRelationName() + "-" + msg.CreateDateTime.ToString("yyyy-MM"))).GetApiResult();
            if (result.Failed)
                LoggerHelper.WriteToFile(result.Message);
        }
    }
/// <summary>
    /// 订阅消费
    /// </summary>
    public static class ConsumerExtension
    {
        public static IApplicationBuilder UseSubscribe<T, TConsumer>(this IApplicationBuilder appBuilder, IHostApplicationLifetime lifetime) where T : EasyNetQEntity, new() where TConsumer : BaseConsumer<T>
        {
            var bus = appBuilder.ApplicationServices.GetRequiredService<IBus>();
            var consumer = appBuilder.ApplicationServices.GetRequiredService<TConsumer>();
            lifetime.ApplicationStarted.Register(() =>
            {
                bus.Subscribe<T>(msg => consumer.Excute(msg));
            });
            lifetime.ApplicationStopped.Register(() => bus?.Dispose());
            return appBuilder;
        }
    }
订阅与注入
public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }
        public IConfiguration Configuration { get; }
        public void ConfigureServices(IServiceCollection services)
        {
            ......
        }
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime lifetime)
        {
            app.UseAllElasticApm(Configuration);
            app.UseHealthChecks("/health");
            app.UseDeveloperExceptionPage();
            app.UseSwagger();
            app.UseSwaggerUI(c =>
            {
                c.SwaggerEndpoint("/swagger/v1/swagger.json", "SF.ES.Api v1");
                c.RoutePrefix = "";
            });
            app.UseRouting();
            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
            app.UseSubscribe<UserViewDurationMessage, UserViewDurationConsumer>(lifetime);
        }
    }


查询接口


查询接口此处有两个细节点:

  

1.如果不确定月份,则使用通配符查询userviewrecord-*,当然有需要的也可以使用别名处理。

  

2.因为Elasticsearch是记录UTC时间,因此时间查询得指定TimeZone。


[HttpGet]
        [Route("record")]
        public ApiResult<List<UserMarkRecordGetRecordResponse>> GetRecord([FromQuery] UserViewDurationRecordGetRequest request)
        {
            var dataList = new List<UserMarkRecordGetRecordResponse>();
            string dateTime;
            if (request.BeginDateTime.HasValue && request.EndDateTime.HasValue)
            {
                var month = request.EndDateTime.Value.DifferMonth(request.BeginDateTime.Value);
                if(month <= 0 )
                    dateTime = request.BeginDateTime.Value.ToString("yyyy-MM");
                else
                    dateTime = "*";
            }
            else
                dateTime = "*";
            var mustQuerys = new List<Func<QueryContainerDescriptor<UserViewDuration>, QueryContainer>>();
            if (request.UserId.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.UserId).Value(request.UserId.Value)));
            if (request.EntityType.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityType).Value(request.EntityType)));
            if (request.EntityId.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityId).Value(request.EntityId.Value)));
            if (request.CharpterId.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.CharpterId).Value(request.CharpterId.Value)));
            if (request.BeginDateTime.HasValue)
                mustQuerys.Add(a => a.DateRange(dr =>
                    dr.Field(f => f.CreateDateTime).GreaterThanOrEquals(request.BeginDateTime.Value).TimeZone(EsConst.TimeZone)));
            if (request.EndDateTime.HasValue)
                mustQuerys.Add(a =>
                    a.DateRange(dr => dr.Field(f => f.CreateDateTime).LessThanOrEquals(request.EndDateTime.Value).TimeZone(EsConst.TimeZone)));
            var searchResult = _elasticClient.Search<UserViewDuration>(a =>
                a.Index(typeof(UserViewDuration).GetRelationName() + "-" + dateTime)
                    .Size(request.Size)
                    .Query(q => q.Bool(b => b.Must(mustQuerys)))
                    .SearchAfterTimestamp(request.Timestamp)
                    .Sort(s => s.Field(f => f.Timestamp, SortOrder.Descending)));
            var apiResult = searchResult.GetApiResult<UserViewDuration, List<UserMarkRecordGetRecordResponse>>();
            if (apiResult.Success)
                dataList.AddRange(apiResult.Data);
            return ApiResult<List<UserMarkRecordGetReco

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
2月前
|
存储 搜索推荐 数据挖掘
ElasticSearch架构介绍及原理解析
ElasticSearch架构介绍及原理解析
128 0
|
4天前
|
人工智能 自然语言处理 开发者
Langchain 与 Elasticsearch:创新数据检索的融合实战
Langchain 与 Elasticsearch:创新数据检索的融合实战
29 10
|
4天前
|
canal 自然语言处理 关系型数据库
Elasticsearch 线上实战问题及解决方案探讨
Elasticsearch 线上实战问题及解决方案探讨
16 0
|
4天前
|
监控 API 索引
实战问题:Elasticsearch 2.X 数据如何迁移到 7.X?
实战问题:Elasticsearch 2.X 数据如何迁移到 7.X?
10 0
|
4天前
|
存储 Serverless 定位技术
深度探索 Elasticsearch 8.X:function_score 参数解读与实战案例分析
深度探索 Elasticsearch 8.X:function_score 参数解读与实战案例分析
10 0
|
4天前
|
机器学习/深度学习 存储 计算机视觉
Elasticsearch 8.X “图搜图”实战
Elasticsearch 8.X “图搜图”实战
15 0
|
4天前
|
存储 机器学习/深度学习 API
高维向量搜索:在 Elasticsearch 8.X 中利用 dense_vector 的实战探索
高维向量搜索:在 Elasticsearch 8.X 中利用 dense_vector 的实战探索
19 0
高维向量搜索:在 Elasticsearch 8.X 中利用 dense_vector 的实战探索
|
4天前
|
存储 缓存 监控
干货 | Elasticsearch 8.X 性能优化实战
干货 | Elasticsearch 8.X 性能优化实战
18 2
|
4天前
|
存储 缓存 运维
Elasticsearch 8.X 检索实战调优锦囊 001
Elasticsearch 8.X 检索实战调优锦囊 001
12 0
|
12天前
|
安全 Java 数据安全/隐私保护
Spring Boot优雅实现多租户架构:概念与实战
【4月更文挑战第29天】在多租户系统中,一个应用实例服务于多个租户,每个租户享有独立的数据视图,而应用的基础设施被共享。这样的架构不仅优化了资源使用,还能降低维护和运营成本。本文将详细介绍如何在Spring Boot中实现多租户架构,并提供具体的实战案例。
39 2