记一次引入Elasticsearch的系统架构实战(三)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 记一次引入Elasticsearch的系统架构实战(三)

异步写入


对于异步写入有两个细节点:

  

1.该数据从RabbtiMQ订阅消费写入到Elasticsearch,从下面代码可以看出,我刻意以月的维度建立Index,格式为 userviewrecord-2021-12,这么做的目的是为了方便管理Index和资源利用,有需要的情况下会删除旧的Index。

  

2.消息订阅与WebAPI暂时集成到同一个进程,这样做主要是开发、部署都方便,如果后续订阅多了,在把消息订阅相关的业务抽离到独立的进程。


按需演变,避免过度设计


订阅消费逻辑


public class UserViewDurationConsumer : BaseConsumer<UserViewDurationMessage>
    {
        private readonly ElasticClient _elasticClient;
        public UserViewDurationConsumer(ElasticClient elasticClient)
        {
            _elasticClient = elasticClient;
        }
        public override void Excute(UserViewDurationMessage msg)
        {
            var document = msg.MapTo<Entity.UserViewDuration>();
            var result = _elasticClient.Create(document, a => a.Index(typeof(Entity.UserViewDuration).GetRelationName() + "-" + msg.CreateDateTime.ToString("yyyy-MM"))).GetApiResult();
            if (result.Failed)
                LoggerHelper.WriteToFile(result.Message);
        }
    }
/// <summary>
    /// 订阅消费
    /// </summary>
    public static class ConsumerExtension
    {
        public static IApplicationBuilder UseSubscribe<T, TConsumer>(this IApplicationBuilder appBuilder, IHostApplicationLifetime lifetime) where T : EasyNetQEntity, new() where TConsumer : BaseConsumer<T>
        {
            var bus = appBuilder.ApplicationServices.GetRequiredService<IBus>();
            var consumer = appBuilder.ApplicationServices.GetRequiredService<TConsumer>();
            lifetime.ApplicationStarted.Register(() =>
            {
                bus.Subscribe<T>(msg => consumer.Excute(msg));
            });
            lifetime.ApplicationStopped.Register(() => bus?.Dispose());
            return appBuilder;
        }
    }
订阅与注入
public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }
        public IConfiguration Configuration { get; }
        public void ConfigureServices(IServiceCollection services)
        {
            ......
        }
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime lifetime)
        {
            app.UseAllElasticApm(Configuration);
            app.UseHealthChecks("/health");
            app.UseDeveloperExceptionPage();
            app.UseSwagger();
            app.UseSwaggerUI(c =>
            {
                c.SwaggerEndpoint("/swagger/v1/swagger.json", "SF.ES.Api v1");
                c.RoutePrefix = "";
            });
            app.UseRouting();
            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
            app.UseSubscribe<UserViewDurationMessage, UserViewDurationConsumer>(lifetime);
        }
    }


查询接口


查询接口此处有两个细节点:

  

1.如果不确定月份,则使用通配符查询userviewrecord-*,当然有需要的也可以使用别名处理。

  

2.因为Elasticsearch是记录UTC时间,因此时间查询得指定TimeZone。


[HttpGet]
        [Route("record")]
        public ApiResult<List<UserMarkRecordGetRecordResponse>> GetRecord([FromQuery] UserViewDurationRecordGetRequest request)
        {
            var dataList = new List<UserMarkRecordGetRecordResponse>();
            string dateTime;
            if (request.BeginDateTime.HasValue && request.EndDateTime.HasValue)
            {
                var month = request.EndDateTime.Value.DifferMonth(request.BeginDateTime.Value);
                if(month <= 0 )
                    dateTime = request.BeginDateTime.Value.ToString("yyyy-MM");
                else
                    dateTime = "*";
            }
            else
                dateTime = "*";
            var mustQuerys = new List<Func<QueryContainerDescriptor<UserViewDuration>, QueryContainer>>();
            if (request.UserId.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.UserId).Value(request.UserId.Value)));
            if (request.EntityType.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityType).Value(request.EntityType)));
            if (request.EntityId.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityId).Value(request.EntityId.Value)));
            if (request.CharpterId.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.CharpterId).Value(request.CharpterId.Value)));
            if (request.BeginDateTime.HasValue)
                mustQuerys.Add(a => a.DateRange(dr =>
                    dr.Field(f => f.CreateDateTime).GreaterThanOrEquals(request.BeginDateTime.Value).TimeZone(EsConst.TimeZone)));
            if (request.EndDateTime.HasValue)
                mustQuerys.Add(a =>
                    a.DateRange(dr => dr.Field(f => f.CreateDateTime).LessThanOrEquals(request.EndDateTime.Value).TimeZone(EsConst.TimeZone)));
            var searchResult = _elasticClient.Search<UserViewDuration>(a =>
                a.Index(typeof(UserViewDuration).GetRelationName() + "-" + dateTime)
                    .Size(request.Size)
                    .Query(q => q.Bool(b => b.Must(mustQuerys)))
                    .SearchAfterTimestamp(request.Timestamp)
                    .Sort(s => s.Field(f => f.Timestamp, SortOrder.Descending)));
            var apiResult = searchResult.GetApiResult<UserViewDuration, List<UserMarkRecordGetRecordResponse>>();
            if (apiResult.Success)
                dataList.AddRange(apiResult.Data);
            return ApiResult<List<UserMarkRecordGetReco

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
3月前
|
存储 分布式计算 大数据
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
大数据-169 Elasticsearch 索引使用 与 架构概念 增删改查
81 3
|
19天前
|
存储 JavaScript 开发工具
基于HarmonyOS 5.0(NEXT)与SpringCloud架构的跨平台应用开发与服务集成研究【实战】
本次的.HarmonyOS Next ,ArkTS语言,HarmonyOS的元服务和DevEco Studio 开发工具,为开发者提供了构建现代化、轻量化、高性能应用的便捷方式。这些技术和工具将帮助开发者更好地适应未来的智能设备和服务提供方式。
50 8
基于HarmonyOS 5.0(NEXT)与SpringCloud架构的跨平台应用开发与服务集成研究【实战】
|
29天前
|
弹性计算 Java 数据库
Web应用上云经典架构实战
本课程详细介绍了Web应用上云的经典架构实战,涵盖前期准备、配置ALB、创建服务器组和监听、验证ECS公网能力、环境配置(JDK、Maven、Node、Git)、下载并运行若依框架、操作第二台ECS以及验证高可用性。通过具体步骤和命令,帮助学员快速掌握云上部署的全流程。
|
28天前
|
搜索推荐 API 定位技术
一文看懂Elasticsearch的技术架构:高效、精准的搜索神器
Elasticsearch 是一个基于 Lucene 的开源搜索引擎,以其强大的全文本搜索功能和快速的倒排索引技术著称。它不仅支持数字、文本、地理位置等多类型数据,还提供了可调相关度分数、高级查询 DSL 等功能。Elasticsearch 的核心技术流程包括数据导入、解析、索引化、查询处理、得分计算及结果返回,确保高效处理大规模数据并提供准确的搜索结果。通过 RESTful API、Logstash 和 Filebeat 等工具,Elasticsearch 可以从多种数据源中导入和解析数据,支持复杂的查询需求。
103 0
|
28天前
|
存储 负载均衡 监控
揭秘 Elasticsearch 集群架构,解锁大数据处理神器
Elasticsearch 是一个强大的分布式搜索和分析引擎,广泛应用于大数据处理、实时搜索和分析。本文深入探讨了 Elasticsearch 集群的架构和特性,包括高可用性和负载均衡,以及主节点、数据节点、协调节点和 Ingest 节点的角色和功能。
49 0
|
2月前
|
消息中间件 Java Kafka
实时数仓Kappa架构:从入门到实战
【11月更文挑战第24天】随着大数据技术的不断发展,企业对实时数据处理和分析的需求日益增长。实时数仓(Real-Time Data Warehouse, RTDW)应运而生,其中Kappa架构作为一种简化的数据处理架构,通过统一的流处理框架,解决了传统Lambda架构中批处理和实时处理的复杂性。本文将深入探讨Kappa架构的历史背景、业务场景、功能点、优缺点、解决的问题以及底层原理,并详细介绍如何使用Java语言快速搭建一套实时数仓。
271 4
|
2月前
|
存储 索引
Elasticsearch分布式架构
【11月更文挑战第2天】
47 1
|
2月前
|
运维 NoSQL Java
后端架构演进:微服务架构的优缺点与实战案例分析
【10月更文挑战第28天】本文探讨了微服务架构与单体架构的优缺点,并通过实战案例分析了微服务架构在实际应用中的表现。微服务架构具有高内聚、低耦合、独立部署等优势,但也面临分布式系统的复杂性和较高的运维成本。通过某电商平台的实际案例,展示了微服务架构在提升系统性能和团队协作效率方面的显著效果,同时也指出了其带来的挑战。
107 4
|
3月前
|
存储 监控 分布式数据库
百亿级存储架构: ElasticSearch+HBase 海量存储架构与实现
本文介绍了百亿级数据存储架构的设计与实现,重点探讨了ElasticSearch和HBase的结合使用。通过ElasticSearch实现快速检索,HBase实现海量数据存储,解决了大规模数据的高效存储与查询问题。文章详细讲解了数据统一接入、元数据管理、数据一致性及平台监控等关键模块的设计思路和技术细节,帮助读者理解和掌握构建高性能数据存储系统的方法。
百亿级存储架构: ElasticSearch+HBase 海量存储架构与实现
|
3月前
|
存储 前端开发 API
DDD领域驱动设计实战-分层架构
DDD分层架构通过明确各层职责及交互规则,有效降低了层间依赖。其基本原则是每层仅与下方层耦合,分为严格和松散两种形式。架构演进包括传统四层架构与改良版四层架构,后者采用依赖反转设计原则优化基础设施层位置。各层职责分明:用户接口层处理显示与请求;应用层负责服务编排与组合;领域层实现业务逻辑;基础层提供技术基础服务。通过合理设计聚合与依赖关系,DDD支持微服务架构灵活演进,提升系统适应性和可维护性。

热门文章

最新文章