记一次引入Elasticsearch的系统架构实战(四)

本文涉及的产品
Elasticsearch Serverless通用抵扣包,测试体验金 200元
简介: 记一次引入Elasticsearch的系统架构实战(四)

作品搜索实现细节


实体定义


SearchKey是原有SQL Server的数据,现需要同步到Elasticsearch,仍是继承抽象类ElasticsearchEntity实体定义,同时这里有三个细节点:

 

1. public string KeyName,我定义的是Text类型,在Elasticsearch使用Text类型才会分词。

 

2.在实体定义我没有给KeyName指定分词器,因为我会使用两个分词器:拼音和默认分词,而我会在批量写入数据创建Mapping时定义。

  

3.实体里的 public List<int> SysTagId 与SearchKey在SQL Server是两张不同的物理表是一对多的关系,在代码表示如下,但是在关系型数据库是无法与之对应和体现的,这就是咱们所说的“阻抗失配”,但是能在以文档型存储系统(MongoDB、Elasticsearch)里很好的解决这个问题,可以以一个聚合的方式写入,避免多次查询关联。


[ElasticsearchType(RelationName = "search_key")]
    public class SearchKey : ElasticsearchEntity
    {
        [Number(NumberType.Integer, Name = "key_id")]
        public int KeyId { get; set; }
        [Number(NumberType.Integer, Name = "entity_id")]
        public int EntityId { get; set; }
        [Number(NumberType.Integer, Name = "entity_type")]
        public int EntityType { get; set; }
        [Text(Name = "key_name")]
        public string KeyName { get; set; }
        [Number(NumberType.Integer, Name = "weight")]
        public int Weight { get; set; }
        [Boolean(Name = "is_subsidiary")]
        public bool IsSubsidiary { get; set; }
        [Date(Name = "active_date")]
        public DateTimeOffset? ActiveDate { get; set; }
        [Number(NumberType.Integer, Name = "sys_tag_id")]
        public List<int> SysTagId { get; set; }
    }

 

数据同步

  

数据同步我采用了Quartz.Net定时调度任务框架,因此时效不高,所以每4小时同步一次即可,有42W多的数据,分批进行同步,每次查询1000条数据同时进行一次批量写入。全量同步一次的时间大概2分钟。因此使用RPC调用[ES业务API服务]。

  

因为具体业务逻辑已经封装在[ES业务API服务],因此同步逻辑也相对简单,查询出SQL Server数据源、聚合整理、调用[ES业务API服务]的批量写入接口、重新绑定别名到新的Index。


[DisallowConcurrentExecution]
    public class SearchKeySynchronousJob : BaseJob
    {
        public override void Execute()
        {
            var rm = SFNovelReadManager.Instance();
            var maxId = 0;
            var size = 1000;
            string indexName = "";
            while (true)
            {
                //避免一次性全部查询出来,每1000条一次写入。
                var searchKey = sm.searchKey.GetList(size, maxId);
                if (!searchKey.Any())
                    break;
                var entityIds = searchKey.Select(a => a.EntityID).Distinct().ToList();
                var sysTagRecord = rm.Novel.GetSysTagRecord(entityIds);
                var items = searchKey.Select(a => new SearchKeyPostItem
                {
                    Weight = a.Weight,
                    EntityType = a.EntityType,
                    EntityId = a.EntityID,
                    IsSubsidiary = a.IsSubsidiary ?? false,
                    KeyName = a.KeyName,
                    ActiveDate = a.ActiveDate,
                    SysTagId = sysTagRecord.Where(c => c.EntityID == a.EntityID).Select(c => c.SysTagID).ToList(),
                    KeyID = a.KeyID
                }).ToList();
                //以一个聚合写入到ES
                var postResult = new SearchKeyPostRequest
                {
                    IndexName = indexName,
                    Items = items
                }.Excute();
                if (postResult.Success)
                {
                    indexName = (string)postResult.Data;
                    maxId = searchKey.Max(a => a.KeyID);
                }
            }
            //别名从旧Index指向新的Index,最后删除旧Index
            var renameResult = new SearchKeyRenameRequest
            {
                IndexName = indexName
            }.Excute();
        }
    }
}


业务API接口


批量新增接口这里有2个细节点:

 

1.在第一次有数据进来的时候需要创建Mapping,因为得对KeyName字段定义分词器,其余字段都可以使用AutoMap即可。

  

2.新创建的Index名称是精确到秒的 SearchKey-202112261121


/// <summary>
        /// 批量新增作品搜索列表(返回创建的indexName)
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        [HttpPost]
        public ApiResult Post(SearchKeyPostRequest request)
        {
            if (!request.Items.Any())
                return ApiResult.IsFailed("无传入数据");
            var date = DateTime.Now;
            var relationName = typeof(SearchKey).GetRelationName();
            var indexName = request.IndexName.IsNullOrWhiteSpace() ? (relationName + "-" + date.ToString("yyyyMMddHHmmss")) : request.IndexName;
            if (request.IndexName.IsNullOrWhiteSpace())
            {
                var createResult = _elasticClient.Indices.Create(indexName,
                    a =>
                        a.Map<SearchKey>(m => m.AutoMap().Properties(p =>
                            p.Custom(new TextProperty
                            {
                                Name = "key_name",
                                Analyzer = "standard",
                                Fields = new Properties(new Dictionary<PropertyName, IProperty>
                                {
                                    { new PropertyName("pinyin"),new TextProperty{ Analyzer = "pinyin"} },
                                    { new PropertyName("standard"),new TextProperty{ Analyzer = "standard"} }
                                })
                            }))));
                if (!createResult.IsValid && request.IndexName.IsNullOrWhiteSpace())
                    return ApiResult.IsFailed("创建索引失败");
            }
            var document = request.Items.MapTo<List<SearchKey>>();
            var result = _elasticClient.BulkAll(indexName, document);
            return result ? ApiResult.IsSuccess(data: indexName) : ApiResult.IsFailed();
        }

 

重新绑定别名接口这里有4个细节点:


1.别名使用searchkey,只会有一个Index[searchkey-yyyyMMddHHmmss]会跟searchkey绑定.

2.优先把已绑定的Index查询出来,方便解绑与删除。


3.别名绑定在Elasticsearch虽然是原子性的,但是不是数据一致性的,因此得先Add后Remove。


4.删除旧得Index免得占用过多资源。


/// <summary>
        /// 重新绑定别名
        /// </summary>
        /// <returns></returns>
        [HttpPut]
        public ApiResult Rename(SearchKeyRanameRequest request)
        {
            var aliasName = typeof(SearchKey).GetRelationName();
            var getAliasResult = _elasticClient.Indices.GetAlias(aliasName);
            //给新index指定别名
            var bulkAliasRequest = new BulkAliasRequest
            {
                Actions = new List<IAliasAction>
                {
                    new AliasAddDescriptor().Index(request.IndexName).Alias(aliasName)
                }
            };
            //移除别名里旧的索引
            if (getAliasResult.IsValid)
            {
                var indeNameList = getAliasResult.Indices.Keys;
                foreach (var indexName in indeNameList)
                {
                    bulkAliasRequest.Actions.Add(new AliasRemoveDescriptor().Index(indexName.Name).Alias(aliasName));
                }
            }
            var result = _elasticClient.Indices.BulkAlias(bulkAliasRequest);
            //删除旧的index
            if (getAliasResult.IsValid)
            {
                var indeNameList = getAliasResult.Indices.Keys;
                foreach (var indexName in indeNameList)
                {
                    _elasticClient.Indices.Delete(indexName);
                }
            }
            return result != null && result.ApiCall.Success ? ApiResult.IsSuccess() : ApiResult.IsFailed();
        }

   

查询接口这里跟前面细节得差不多:

  

但是这里有一个得特别注意的点,可以看到这个查询接口同时使用了should和must,这里得设置minimumShouldMatch才能正常像SQL过滤。

  

should可以理解成SQL的Or,Must可以理解成SQL的And。

  

默认情况下minimumShouldMatch是等于0的,等于0的意思是,should不命中任何的数据仍然会返回must命中的数据,也就是你们可能想搜索(keyname.pinyin=’chengong‘ or keyname.standard=’chengong‘) and id > 0,但是es里没有存keyname='chengong'的数据,会把id> 0 而且 keyname != 'chengong' 数据给查询出来。

  

因此我们得对minimumShouldMatch=1,就是should条件必须得任意命中一个才能返回结果。

  

在should和must混用的情况下必须得注意minimumShouldMatch的设置!!!


/// <summary>
        /// 作品搜索列表
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        [HttpPost]
        [Route("search")]
        public ApiResult<List<SearchKeyGetResponse>> Get(SearchKeyGetRequest request)
        {
            var shouldQuerys = new List<Func<QueryContainerDescriptor<SearchKey>, QueryContainer>>();
            int minimumShouldMatch = 0;
            if (!request.KeyName.IsNullOrWhiteSpace())
            {
                shouldQuerys.Add(a => a.MatchPhrase(m => m.Field("key_name.pinyin").Query(request.KeyName)));
                shouldQuerys.Add(a => a.MatchPhrase(m => m.Field("key_name.standard").Query(request.KeyName)));
                minimumShouldMatch = 1;
            }
            var mustQuerys = new List<Func<QueryContainerDescriptor<SearchKey>, QueryContainer>>
            {
                a => a.Range(t => t.Field(f => f.Weight).GreaterThanOrEquals(0))
            };
            if (request.IsSubsidiary.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.IsSubsidiary).Value(request.IsSubsidiary.Value)));
            if (request.SysTagIds != null && request.SysTagIds.Any())
                mustQuerys.Add(a => a.Terms(t => t.Field(f => f.SysTagId).Terms(request.SysTagIds)));
            if (request.EntityType.HasValue)
            {
                if (request.EntityType.Value == ESearchKey.EntityType.AllNovel)
                {
                    mustQuerys.Add(a => a.Terms(t => t.Field(f => f.EntityType).Terms(ESearchKey.EntityType.Novel, ESearchKey.EntityType.ChatNovel, ESearchKey.EntityType.FanNovel)));
                }
                else
                    mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityType).Value((int)request.EntityType.Value)));
            }
            var sortDescriptor = new SortDescriptor<SearchKey>();
            sortDescriptor = request.Sort == ESearchKey.Sort.Weight
                ? sortDescriptor.Field(f => f.Weight, SortOrder.Descending)
                : sortDescriptor.Field(f => f.ActiveDate, SortOrder.Descending);
            var searchResult = _elasticClient.Search<SearchKey>(a =>
                a.Index(typeof(SearchKey).GetRelationName())
                    .From(request.Size * request.Page)
                    .Size(request.Size)
                    .Query(q => q.Bool(b => b.Should(shouldQuerys).Must(mustQuerys).MinimumShouldMatch(minimumShouldMatch)))
                    .Sort(s => sortDescriptor));
            var apiResult = searchResult.GetApiResult<SearchKey, List<SearchKeyGetResponse>>();
            if (apiResult.Success)
                return apiResult;
            return ApiResult<List<SearchKeyGetResponse>>.IsSuccess("空集合数据");
        }


APM监控

  

虽然在上面我做了足够的实现准备,但是对于上生产后的实际使用效果我还是希望有一个直观的体现。我之前写了一篇文章《.Net微服务实战之可观测性》很好叙述了该种情况,有兴趣的可以移步去看看。

  

在之前公司做微服务的时候的APM选型我们使用了Skywalking,但是现在这家公司的运维没有接触过,但是对于Elastic Stack他相对比较熟悉,如同上文所说架构设计的输入核心为两点满足需求组织架构,秉着我的技术选型原则是基于团队架构,我们采用了Elastic APM + Kibana(7.4版本),如下图所示:


image.png

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
打赏
0
0
0
0
633
分享
相关文章
RocketMQ实战—3.基于RocketMQ升级订单系统架构
本文主要介绍了基于MQ实现订单系统核心流程的异步化改造、基于MQ实现订单系统和第三方系统的解耦、基于MQ实现将订单数据同步给大数据团队、秒杀系统的技术难点以及秒杀商详页的架构设计和基于MQ实现秒杀系统的异步化架构。
328 64
RocketMQ实战—3.基于RocketMQ升级订单系统架构
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
从理论到落地:MCP 实战解锁 AI 应用架构新范式
本文旨在从 MCP 的技术原理、降低 MCP Server 构建复杂度、提升 Server 运行稳定性等方面出发,分享我们的一些实践心得。
1626 101
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
471 7
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
微服务架构下的电商API接口设计:策略、方法与实战案例
本文探讨了微服务架构下的电商API接口设计,旨在打造高效、灵活与可扩展的电商系统。通过服务拆分(如商品、订单、支付等模块)和标准化设计(RESTful或GraphQL风格),确保接口一致性与易用性。同时,采用缓存策略、负载均衡及限流技术优化性能,并借助Prometheus等工具实现监控与日志管理。微服务架构的优势在于支持敏捷开发、高并发处理和独立部署,满足电商业务快速迭代需求。未来,电商API设计将向智能化与安全化方向发展。
Google揭秘Agent架构三大核心:工具、模型与编排层实战指南
本文为Google发布的Agent白皮书全文翻译。本文揭示了智能体如何突破传统AI边界,通过模型、工具与编排层的三位一体架构,实现自主推理与现实交互。它不仅详解了ReAct、思维树等认知框架的运作逻辑,更通过航班预订、旅行规划等案例,展示了智能体如何调用Extensions、Functions和Data Stores,将抽象指令转化为真实世界操作。文中提出的“智能体链式组合”概念,预示了未来多智能体协作解决复杂问题的革命性潜力——这不仅是技术升级,更是AI赋能产业的范式颠覆。
599 1
百万级URL重定向工程:大规模网站架构设计与性能优化实战
本文深入探讨了大规模重定向系统的核心挑战与解决方案,涵盖技术瓶颈分析、分布式架构设计、十亿级URL处理策略、全球化部署方案及全链路监控体系。通过数学建模与性能优化,提出三层架构模型,并结合一致性哈希分片算法实现高效路由。同时,对比不同架构的吞吐量与容灾能力,分享某电商平台实践案例,展示性能显著提升。最后展望重定向即服务(RaaS)未来趋势,包括AI动态路由、量子安全跳转和边缘智能等关键技术,为企业提供扩展性强、稳定性高的系统设计参考。
119 25
301重定向进阶实战:从性能优化到未来架构演进
本文探讨了百万级流量动态重定向的架构设计与优化方案,结合全球电商平台迁移案例,展示基于Nginx+Lua的动态规则引擎及流量分级策略。同时,深入分析性能优化与安全加固技术,如零延迟跳转、智能熔断机制,并提出混合云环境下的跨平台解决方案。此外,针对SEO数据继承与流量恢复提供三维权重映射模型和自动化监测工具链。最后,展望边缘计算、区块链及量子安全等下一代重定向技术,为企业构建面向未来的体系提供参考。
114 7
让搜索引擎“更懂你”:AI × Elasticsearch MCP Server 开源实战
本文介绍基于Model Context Protocol (MCP)标准的Elasticsearch MCP Server,它为AI助手(如Claude、Cursor等)提供与Elasticsearch数据源交互的能力。文章涵盖MCP概念、Elasticsearch MCP Server的功能特性及实际应用场景,例如数据探索、开发辅助。通过自然语言处理,用户无需掌握复杂查询语法即可操作Elasticsearch,显著降低使用门槛并提升效率。项目开源地址:&lt;https://github.com/awesimon/elasticsearch-mcp&gt;,欢迎体验与反馈。
845 1
Python 高级编程与实战:深入理解设计模式与软件架构
本文深入探讨了Python中的设计模式与软件架构,涵盖单例、工厂、观察者模式及MVC、微服务架构,并通过实战项目如插件系统和Web应用帮助读者掌握这些技术。文章提供了代码示例,便于理解和实践。最后推荐了进一步学习的资源,助力提升Python编程技能。

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问