记一次引入Elasticsearch的系统架构实战(四)

本文涉及的产品
Elasticsearch Serverless通用抵扣包,测试体验金 200元
简介: 记一次引入Elasticsearch的系统架构实战(四)

作品搜索实现细节


实体定义


SearchKey是原有SQL Server的数据,现需要同步到Elasticsearch,仍是继承抽象类ElasticsearchEntity实体定义,同时这里有三个细节点:

 

1. public string KeyName,我定义的是Text类型,在Elasticsearch使用Text类型才会分词。

 

2.在实体定义我没有给KeyName指定分词器,因为我会使用两个分词器:拼音和默认分词,而我会在批量写入数据创建Mapping时定义。

  

3.实体里的 public List<int> SysTagId 与SearchKey在SQL Server是两张不同的物理表是一对多的关系,在代码表示如下,但是在关系型数据库是无法与之对应和体现的,这就是咱们所说的“阻抗失配”,但是能在以文档型存储系统(MongoDB、Elasticsearch)里很好的解决这个问题,可以以一个聚合的方式写入,避免多次查询关联。


[ElasticsearchType(RelationName = "search_key")]
    public class SearchKey : ElasticsearchEntity
    {
        [Number(NumberType.Integer, Name = "key_id")]
        public int KeyId { get; set; }
        [Number(NumberType.Integer, Name = "entity_id")]
        public int EntityId { get; set; }
        [Number(NumberType.Integer, Name = "entity_type")]
        public int EntityType { get; set; }
        [Text(Name = "key_name")]
        public string KeyName { get; set; }
        [Number(NumberType.Integer, Name = "weight")]
        public int Weight { get; set; }
        [Boolean(Name = "is_subsidiary")]
        public bool IsSubsidiary { get; set; }
        [Date(Name = "active_date")]
        public DateTimeOffset? ActiveDate { get; set; }
        [Number(NumberType.Integer, Name = "sys_tag_id")]
        public List<int> SysTagId { get; set; }
    }

 

数据同步

  

数据同步我采用了Quartz.Net定时调度任务框架,因此时效不高,所以每4小时同步一次即可,有42W多的数据,分批进行同步,每次查询1000条数据同时进行一次批量写入。全量同步一次的时间大概2分钟。因此使用RPC调用[ES业务API服务]。

  

因为具体业务逻辑已经封装在[ES业务API服务],因此同步逻辑也相对简单,查询出SQL Server数据源、聚合整理、调用[ES业务API服务]的批量写入接口、重新绑定别名到新的Index。


[DisallowConcurrentExecution]
    public class SearchKeySynchronousJob : BaseJob
    {
        public override void Execute()
        {
            var rm = SFNovelReadManager.Instance();
            var maxId = 0;
            var size = 1000;
            string indexName = "";
            while (true)
            {
                //避免一次性全部查询出来,每1000条一次写入。
                var searchKey = sm.searchKey.GetList(size, maxId);
                if (!searchKey.Any())
                    break;
                var entityIds = searchKey.Select(a => a.EntityID).Distinct().ToList();
                var sysTagRecord = rm.Novel.GetSysTagRecord(entityIds);
                var items = searchKey.Select(a => new SearchKeyPostItem
                {
                    Weight = a.Weight,
                    EntityType = a.EntityType,
                    EntityId = a.EntityID,
                    IsSubsidiary = a.IsSubsidiary ?? false,
                    KeyName = a.KeyName,
                    ActiveDate = a.ActiveDate,
                    SysTagId = sysTagRecord.Where(c => c.EntityID == a.EntityID).Select(c => c.SysTagID).ToList(),
                    KeyID = a.KeyID
                }).ToList();
                //以一个聚合写入到ES
                var postResult = new SearchKeyPostRequest
                {
                    IndexName = indexName,
                    Items = items
                }.Excute();
                if (postResult.Success)
                {
                    indexName = (string)postResult.Data;
                    maxId = searchKey.Max(a => a.KeyID);
                }
            }
            //别名从旧Index指向新的Index,最后删除旧Index
            var renameResult = new SearchKeyRenameRequest
            {
                IndexName = indexName
            }.Excute();
        }
    }
}


业务API接口


批量新增接口这里有2个细节点:

 

1.在第一次有数据进来的时候需要创建Mapping,因为得对KeyName字段定义分词器,其余字段都可以使用AutoMap即可。

  

2.新创建的Index名称是精确到秒的 SearchKey-202112261121


/// <summary>
        /// 批量新增作品搜索列表(返回创建的indexName)
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        [HttpPost]
        public ApiResult Post(SearchKeyPostRequest request)
        {
            if (!request.Items.Any())
                return ApiResult.IsFailed("无传入数据");
            var date = DateTime.Now;
            var relationName = typeof(SearchKey).GetRelationName();
            var indexName = request.IndexName.IsNullOrWhiteSpace() ? (relationName + "-" + date.ToString("yyyyMMddHHmmss")) : request.IndexName;
            if (request.IndexName.IsNullOrWhiteSpace())
            {
                var createResult = _elasticClient.Indices.Create(indexName,
                    a =>
                        a.Map<SearchKey>(m => m.AutoMap().Properties(p =>
                            p.Custom(new TextProperty
                            {
                                Name = "key_name",
                                Analyzer = "standard",
                                Fields = new Properties(new Dictionary<PropertyName, IProperty>
                                {
                                    { new PropertyName("pinyin"),new TextProperty{ Analyzer = "pinyin"} },
                                    { new PropertyName("standard"),new TextProperty{ Analyzer = "standard"} }
                                })
                            }))));
                if (!createResult.IsValid && request.IndexName.IsNullOrWhiteSpace())
                    return ApiResult.IsFailed("创建索引失败");
            }
            var document = request.Items.MapTo<List<SearchKey>>();
            var result = _elasticClient.BulkAll(indexName, document);
            return result ? ApiResult.IsSuccess(data: indexName) : ApiResult.IsFailed();
        }

 

重新绑定别名接口这里有4个细节点:


1.别名使用searchkey,只会有一个Index[searchkey-yyyyMMddHHmmss]会跟searchkey绑定.

2.优先把已绑定的Index查询出来,方便解绑与删除。


3.别名绑定在Elasticsearch虽然是原子性的,但是不是数据一致性的,因此得先Add后Remove。


4.删除旧得Index免得占用过多资源。


/// <summary>
        /// 重新绑定别名
        /// </summary>
        /// <returns></returns>
        [HttpPut]
        public ApiResult Rename(SearchKeyRanameRequest request)
        {
            var aliasName = typeof(SearchKey).GetRelationName();
            var getAliasResult = _elasticClient.Indices.GetAlias(aliasName);
            //给新index指定别名
            var bulkAliasRequest = new BulkAliasRequest
            {
                Actions = new List<IAliasAction>
                {
                    new AliasAddDescriptor().Index(request.IndexName).Alias(aliasName)
                }
            };
            //移除别名里旧的索引
            if (getAliasResult.IsValid)
            {
                var indeNameList = getAliasResult.Indices.Keys;
                foreach (var indexName in indeNameList)
                {
                    bulkAliasRequest.Actions.Add(new AliasRemoveDescriptor().Index(indexName.Name).Alias(aliasName));
                }
            }
            var result = _elasticClient.Indices.BulkAlias(bulkAliasRequest);
            //删除旧的index
            if (getAliasResult.IsValid)
            {
                var indeNameList = getAliasResult.Indices.Keys;
                foreach (var indexName in indeNameList)
                {
                    _elasticClient.Indices.Delete(indexName);
                }
            }
            return result != null && result.ApiCall.Success ? ApiResult.IsSuccess() : ApiResult.IsFailed();
        }

   

查询接口这里跟前面细节得差不多:

  

但是这里有一个得特别注意的点,可以看到这个查询接口同时使用了should和must,这里得设置minimumShouldMatch才能正常像SQL过滤。

  

should可以理解成SQL的Or,Must可以理解成SQL的And。

  

默认情况下minimumShouldMatch是等于0的,等于0的意思是,should不命中任何的数据仍然会返回must命中的数据,也就是你们可能想搜索(keyname.pinyin=’chengong‘ or keyname.standard=’chengong‘) and id > 0,但是es里没有存keyname='chengong'的数据,会把id> 0 而且 keyname != 'chengong' 数据给查询出来。

  

因此我们得对minimumShouldMatch=1,就是should条件必须得任意命中一个才能返回结果。

  

在should和must混用的情况下必须得注意minimumShouldMatch的设置!!!


/// <summary>
        /// 作品搜索列表
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        [HttpPost]
        [Route("search")]
        public ApiResult<List<SearchKeyGetResponse>> Get(SearchKeyGetRequest request)
        {
            var shouldQuerys = new List<Func<QueryContainerDescriptor<SearchKey>, QueryContainer>>();
            int minimumShouldMatch = 0;
            if (!request.KeyName.IsNullOrWhiteSpace())
            {
                shouldQuerys.Add(a => a.MatchPhrase(m => m.Field("key_name.pinyin").Query(request.KeyName)));
                shouldQuerys.Add(a => a.MatchPhrase(m => m.Field("key_name.standard").Query(request.KeyName)));
                minimumShouldMatch = 1;
            }
            var mustQuerys = new List<Func<QueryContainerDescriptor<SearchKey>, QueryContainer>>
            {
                a => a.Range(t => t.Field(f => f.Weight).GreaterThanOrEquals(0))
            };
            if (request.IsSubsidiary.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.IsSubsidiary).Value(request.IsSubsidiary.Value)));
            if (request.SysTagIds != null && request.SysTagIds.Any())
                mustQuerys.Add(a => a.Terms(t => t.Field(f => f.SysTagId).Terms(request.SysTagIds)));
            if (request.EntityType.HasValue)
            {
                if (request.EntityType.Value == ESearchKey.EntityType.AllNovel)
                {
                    mustQuerys.Add(a => a.Terms(t => t.Field(f => f.EntityType).Terms(ESearchKey.EntityType.Novel, ESearchKey.EntityType.ChatNovel, ESearchKey.EntityType.FanNovel)));
                }
                else
                    mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityType).Value((int)request.EntityType.Value)));
            }
            var sortDescriptor = new SortDescriptor<SearchKey>();
            sortDescriptor = request.Sort == ESearchKey.Sort.Weight
                ? sortDescriptor.Field(f => f.Weight, SortOrder.Descending)
                : sortDescriptor.Field(f => f.ActiveDate, SortOrder.Descending);
            var searchResult = _elasticClient.Search<SearchKey>(a =>
                a.Index(typeof(SearchKey).GetRelationName())
                    .From(request.Size * request.Page)
                    .Size(request.Size)
                    .Query(q => q.Bool(b => b.Should(shouldQuerys).Must(mustQuerys).MinimumShouldMatch(minimumShouldMatch)))
                    .Sort(s => sortDescriptor));
            var apiResult = searchResult.GetApiResult<SearchKey, List<SearchKeyGetResponse>>();
            if (apiResult.Success)
                return apiResult;
            return ApiResult<List<SearchKeyGetResponse>>.IsSuccess("空集合数据");
        }


APM监控

  

虽然在上面我做了足够的实现准备,但是对于上生产后的实际使用效果我还是希望有一个直观的体现。我之前写了一篇文章《.Net微服务实战之可观测性》很好叙述了该种情况,有兴趣的可以移步去看看。

  

在之前公司做微服务的时候的APM选型我们使用了Skywalking,但是现在这家公司的运维没有接触过,但是对于Elastic Stack他相对比较熟悉,如同上文所说架构设计的输入核心为两点满足需求组织架构,秉着我的技术选型原则是基于团队架构,我们采用了Elastic APM + Kibana(7.4版本),如下图所示:


image.png

相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。 &nbsp;
目录
相关文章
|
1月前
|
缓存 监控 前端开发
顺企网 API 开发实战:搜索 / 详情接口从 0 到 1 落地(附 Elasticsearch 优化 + 错误速查)
企业API开发常陷参数、缓存、错误处理三大坑?本指南拆解顺企网双接口全流程,涵盖搜索优化、签名验证、限流应对,附可复用代码与错误速查表,助你2小时高效搞定开发,提升响应速度与稳定性。
|
3月前
|
人工智能 监控 前端开发
支付宝 AI 出行助手高效研发指南:4 人团队的架构迁移与提效实战
支付宝「AI 出行助手」是一款集成公交、地铁、火车票、机票、打车等多项功能的智能出行产品。
623 21
支付宝 AI 出行助手高效研发指南:4 人团队的架构迁移与提效实战
|
3月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
235 7
|
3月前
|
监控 Java API
Spring Boot 3.2 结合 Spring Cloud 微服务架构实操指南 现代分布式应用系统构建实战教程
Spring Boot 3.2 + Spring Cloud 2023.0 微服务架构实践摘要 本文基于Spring Boot 3.2.5和Spring Cloud 2023.0.1最新稳定版本,演示现代微服务架构的构建过程。主要内容包括: 技术栈选择:采用Spring Cloud Netflix Eureka 4.1.0作为服务注册中心,Resilience4j 2.1.0替代Hystrix实现熔断机制,配合OpenFeign和Gateway等组件。 核心实操步骤: 搭建Eureka注册中心服务 构建商品
679 3
|
3月前
|
消息中间件 Java 数据库
Java 基于 DDD 分层架构实战从基础到精通最新实操全流程指南
本文详解基于Java的领域驱动设计(DDD)分层架构实战,结合Spring Boot 3.x、Spring Data JPA 3.x等最新技术栈,通过电商订单系统案例展示如何构建清晰、可维护的微服务架构。内容涵盖项目结构设计、各层实现细节及关键技术点,助力开发者掌握DDD在复杂业务系统中的应用。
661 0
|
4月前
|
存储 SQL 监控
数据中台架构解析:湖仓一体的实战设计
在数据量激增的数字化时代,企业面临数据分散、使用效率低等问题。数据中台作为统一管理与应用数据的核心平台,结合湖仓一体架构,打通数据壁垒,实现高效流转与分析。本文详解湖仓一体的设计与落地实践,助力企业构建统一、灵活的数据底座,驱动业务决策与创新。
|
4月前
|
存储 设计模式 人工智能
AI Agent安全架构实战:基于LangGraph的Human-in-the-Loop系统设计​
本文深入解析Human-in-the-Loop(HIL)架构在AI Agent中的核心应用,探讨其在高风险场景下的断点控制、状态恢复与安全管控机制,并结合LangGraph的创新设计与金融交易实战案例,展示如何实现效率与安全的平衡。
747 0
|
1月前
|
Cloud Native Serverless API
微服务架构实战指南:从单体应用到云原生的蜕变之路
🌟蒋星熠Jaxonic,代码为舟的星际旅人。深耕微服务架构,擅以DDD拆分服务、构建高可用通信与治理体系。分享从单体到云原生的实战经验,探索技术演进的无限可能。
微服务架构实战指南:从单体应用到云原生的蜕变之路
|
1月前
|
监控 Cloud Native Java
Spring Boot 3.x 微服务架构实战指南
🌟蒋星熠Jaxonic,技术宇宙中的星际旅人。深耕Spring Boot 3.x与微服务架构,探索云原生、性能优化与高可用系统设计。以代码为笔,在二进制星河中谱写极客诗篇。关注我,共赴技术星辰大海!(238字)
Spring Boot 3.x 微服务架构实战指南
|
2月前
|
消息中间件 数据采集 NoSQL
秒级行情推送系统实战:从触发、采集到入库的端到端架构
本文设计了一套秒级实时行情推送系统,涵盖触发、采集、缓冲、入库与推送五层架构,结合动态代理IP、Kafka/Redis缓冲及WebSocket推送,实现金融数据低延迟、高并发处理,适用于股票、数字货币等实时行情场景。
290 3
秒级行情推送系统实战:从触发、采集到入库的端到端架构