记一次引入Elasticsearch的系统架构实战(四)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 记一次引入Elasticsearch的系统架构实战(四)

作品搜索实现细节


实体定义


SearchKey是原有SQL Server的数据,现需要同步到Elasticsearch,仍是继承抽象类ElasticsearchEntity实体定义,同时这里有三个细节点:

 

1. public string KeyName,我定义的是Text类型,在Elasticsearch使用Text类型才会分词。

 

2.在实体定义我没有给KeyName指定分词器,因为我会使用两个分词器:拼音和默认分词,而我会在批量写入数据创建Mapping时定义。

  

3.实体里的 public List<int> SysTagId 与SearchKey在SQL Server是两张不同的物理表是一对多的关系,在代码表示如下,但是在关系型数据库是无法与之对应和体现的,这就是咱们所说的“阻抗失配”,但是能在以文档型存储系统(MongoDB、Elasticsearch)里很好的解决这个问题,可以以一个聚合的方式写入,避免多次查询关联。


[ElasticsearchType(RelationName = "search_key")]
    public class SearchKey : ElasticsearchEntity
    {
        [Number(NumberType.Integer, Name = "key_id")]
        public int KeyId { get; set; }
        [Number(NumberType.Integer, Name = "entity_id")]
        public int EntityId { get; set; }
        [Number(NumberType.Integer, Name = "entity_type")]
        public int EntityType { get; set; }
        [Text(Name = "key_name")]
        public string KeyName { get; set; }
        [Number(NumberType.Integer, Name = "weight")]
        public int Weight { get; set; }
        [Boolean(Name = "is_subsidiary")]
        public bool IsSubsidiary { get; set; }
        [Date(Name = "active_date")]
        public DateTimeOffset? ActiveDate { get; set; }
        [Number(NumberType.Integer, Name = "sys_tag_id")]
        public List<int> SysTagId { get; set; }
    }

 

数据同步

  

数据同步我采用了Quartz.Net定时调度任务框架,因此时效不高,所以每4小时同步一次即可,有42W多的数据,分批进行同步,每次查询1000条数据同时进行一次批量写入。全量同步一次的时间大概2分钟。因此使用RPC调用[ES业务API服务]。

  

因为具体业务逻辑已经封装在[ES业务API服务],因此同步逻辑也相对简单,查询出SQL Server数据源、聚合整理、调用[ES业务API服务]的批量写入接口、重新绑定别名到新的Index。


[DisallowConcurrentExecution]
    public class SearchKeySynchronousJob : BaseJob
    {
        public override void Execute()
        {
            var rm = SFNovelReadManager.Instance();
            var maxId = 0;
            var size = 1000;
            string indexName = "";
            while (true)
            {
                //避免一次性全部查询出来,每1000条一次写入。
                var searchKey = sm.searchKey.GetList(size, maxId);
                if (!searchKey.Any())
                    break;
                var entityIds = searchKey.Select(a => a.EntityID).Distinct().ToList();
                var sysTagRecord = rm.Novel.GetSysTagRecord(entityIds);
                var items = searchKey.Select(a => new SearchKeyPostItem
                {
                    Weight = a.Weight,
                    EntityType = a.EntityType,
                    EntityId = a.EntityID,
                    IsSubsidiary = a.IsSubsidiary ?? false,
                    KeyName = a.KeyName,
                    ActiveDate = a.ActiveDate,
                    SysTagId = sysTagRecord.Where(c => c.EntityID == a.EntityID).Select(c => c.SysTagID).ToList(),
                    KeyID = a.KeyID
                }).ToList();
                //以一个聚合写入到ES
                var postResult = new SearchKeyPostRequest
                {
                    IndexName = indexName,
                    Items = items
                }.Excute();
                if (postResult.Success)
                {
                    indexName = (string)postResult.Data;
                    maxId = searchKey.Max(a => a.KeyID);
                }
            }
            //别名从旧Index指向新的Index,最后删除旧Index
            var renameResult = new SearchKeyRenameRequest
            {
                IndexName = indexName
            }.Excute();
        }
    }
}


业务API接口


批量新增接口这里有2个细节点:

 

1.在第一次有数据进来的时候需要创建Mapping,因为得对KeyName字段定义分词器,其余字段都可以使用AutoMap即可。

  

2.新创建的Index名称是精确到秒的 SearchKey-202112261121


/// <summary>
        /// 批量新增作品搜索列表(返回创建的indexName)
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        [HttpPost]
        public ApiResult Post(SearchKeyPostRequest request)
        {
            if (!request.Items.Any())
                return ApiResult.IsFailed("无传入数据");
            var date = DateTime.Now;
            var relationName = typeof(SearchKey).GetRelationName();
            var indexName = request.IndexName.IsNullOrWhiteSpace() ? (relationName + "-" + date.ToString("yyyyMMddHHmmss")) : request.IndexName;
            if (request.IndexName.IsNullOrWhiteSpace())
            {
                var createResult = _elasticClient.Indices.Create(indexName,
                    a =>
                        a.Map<SearchKey>(m => m.AutoMap().Properties(p =>
                            p.Custom(new TextProperty
                            {
                                Name = "key_name",
                                Analyzer = "standard",
                                Fields = new Properties(new Dictionary<PropertyName, IProperty>
                                {
                                    { new PropertyName("pinyin"),new TextProperty{ Analyzer = "pinyin"} },
                                    { new PropertyName("standard"),new TextProperty{ Analyzer = "standard"} }
                                })
                            }))));
                if (!createResult.IsValid && request.IndexName.IsNullOrWhiteSpace())
                    return ApiResult.IsFailed("创建索引失败");
            }
            var document = request.Items.MapTo<List<SearchKey>>();
            var result = _elasticClient.BulkAll(indexName, document);
            return result ? ApiResult.IsSuccess(data: indexName) : ApiResult.IsFailed();
        }

 

重新绑定别名接口这里有4个细节点:


1.别名使用searchkey,只会有一个Index[searchkey-yyyyMMddHHmmss]会跟searchkey绑定.

2.优先把已绑定的Index查询出来,方便解绑与删除。


3.别名绑定在Elasticsearch虽然是原子性的,但是不是数据一致性的,因此得先Add后Remove。


4.删除旧得Index免得占用过多资源。


/// <summary>
        /// 重新绑定别名
        /// </summary>
        /// <returns></returns>
        [HttpPut]
        public ApiResult Rename(SearchKeyRanameRequest request)
        {
            var aliasName = typeof(SearchKey).GetRelationName();
            var getAliasResult = _elasticClient.Indices.GetAlias(aliasName);
            //给新index指定别名
            var bulkAliasRequest = new BulkAliasRequest
            {
                Actions = new List<IAliasAction>
                {
                    new AliasAddDescriptor().Index(request.IndexName).Alias(aliasName)
                }
            };
            //移除别名里旧的索引
            if (getAliasResult.IsValid)
            {
                var indeNameList = getAliasResult.Indices.Keys;
                foreach (var indexName in indeNameList)
                {
                    bulkAliasRequest.Actions.Add(new AliasRemoveDescriptor().Index(indexName.Name).Alias(aliasName));
                }
            }
            var result = _elasticClient.Indices.BulkAlias(bulkAliasRequest);
            //删除旧的index
            if (getAliasResult.IsValid)
            {
                var indeNameList = getAliasResult.Indices.Keys;
                foreach (var indexName in indeNameList)
                {
                    _elasticClient.Indices.Delete(indexName);
                }
            }
            return result != null && result.ApiCall.Success ? ApiResult.IsSuccess() : ApiResult.IsFailed();
        }

   

查询接口这里跟前面细节得差不多:

  

但是这里有一个得特别注意的点,可以看到这个查询接口同时使用了should和must,这里得设置minimumShouldMatch才能正常像SQL过滤。

  

should可以理解成SQL的Or,Must可以理解成SQL的And。

  

默认情况下minimumShouldMatch是等于0的,等于0的意思是,should不命中任何的数据仍然会返回must命中的数据,也就是你们可能想搜索(keyname.pinyin=’chengong‘ or keyname.standard=’chengong‘) and id > 0,但是es里没有存keyname='chengong'的数据,会把id> 0 而且 keyname != 'chengong' 数据给查询出来。

  

因此我们得对minimumShouldMatch=1,就是should条件必须得任意命中一个才能返回结果。

  

在should和must混用的情况下必须得注意minimumShouldMatch的设置!!!


/// <summary>
        /// 作品搜索列表
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        [HttpPost]
        [Route("search")]
        public ApiResult<List<SearchKeyGetResponse>> Get(SearchKeyGetRequest request)
        {
            var shouldQuerys = new List<Func<QueryContainerDescriptor<SearchKey>, QueryContainer>>();
            int minimumShouldMatch = 0;
            if (!request.KeyName.IsNullOrWhiteSpace())
            {
                shouldQuerys.Add(a => a.MatchPhrase(m => m.Field("key_name.pinyin").Query(request.KeyName)));
                shouldQuerys.Add(a => a.MatchPhrase(m => m.Field("key_name.standard").Query(request.KeyName)));
                minimumShouldMatch = 1;
            }
            var mustQuerys = new List<Func<QueryContainerDescriptor<SearchKey>, QueryContainer>>
            {
                a => a.Range(t => t.Field(f => f.Weight).GreaterThanOrEquals(0))
            };
            if (request.IsSubsidiary.HasValue)
                mustQuerys.Add(a => a.Term(t => t.Field(f => f.IsSubsidiary).Value(request.IsSubsidiary.Value)));
            if (request.SysTagIds != null && request.SysTagIds.Any())
                mustQuerys.Add(a => a.Terms(t => t.Field(f => f.SysTagId).Terms(request.SysTagIds)));
            if (request.EntityType.HasValue)
            {
                if (request.EntityType.Value == ESearchKey.EntityType.AllNovel)
                {
                    mustQuerys.Add(a => a.Terms(t => t.Field(f => f.EntityType).Terms(ESearchKey.EntityType.Novel, ESearchKey.EntityType.ChatNovel, ESearchKey.EntityType.FanNovel)));
                }
                else
                    mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityType).Value((int)request.EntityType.Value)));
            }
            var sortDescriptor = new SortDescriptor<SearchKey>();
            sortDescriptor = request.Sort == ESearchKey.Sort.Weight
                ? sortDescriptor.Field(f => f.Weight, SortOrder.Descending)
                : sortDescriptor.Field(f => f.ActiveDate, SortOrder.Descending);
            var searchResult = _elasticClient.Search<SearchKey>(a =>
                a.Index(typeof(SearchKey).GetRelationName())
                    .From(request.Size * request.Page)
                    .Size(request.Size)
                    .Query(q => q.Bool(b => b.Should(shouldQuerys).Must(mustQuerys).MinimumShouldMatch(minimumShouldMatch)))
                    .Sort(s => sortDescriptor));
            var apiResult = searchResult.GetApiResult<SearchKey, List<SearchKeyGetResponse>>();
            if (apiResult.Success)
                return apiResult;
            return ApiResult<List<SearchKeyGetResponse>>.IsSuccess("空集合数据");
        }


APM监控

  

虽然在上面我做了足够的实现准备,但是对于上生产后的实际使用效果我还是希望有一个直观的体现。我之前写了一篇文章《.Net微服务实战之可观测性》很好叙述了该种情况,有兴趣的可以移步去看看。

  

在之前公司做微服务的时候的APM选型我们使用了Skywalking,但是现在这家公司的运维没有接触过,但是对于Elastic Stack他相对比较熟悉,如同上文所说架构设计的输入核心为两点满足需求组织架构,秉着我的技术选型原则是基于团队架构,我们采用了Elastic APM + Kibana(7.4版本),如下图所示:


image.png

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
3月前
|
运维 Oracle 容灾
Oracle dataguard 容灾技术实战(笔记),教你一种更清晰的Linux运维架构
Oracle dataguard 容灾技术实战(笔记),教你一种更清晰的Linux运维架构
|
2天前
|
弹性计算 监控 数据挖掘
事件驱动架构的优势与应用:深度解析与实战应用
【8月更文挑战第17天】事件驱动架构以其松耦合、可扩展性、异步处理、实时性和高可靠性等优势,在实时数据处理、复杂业务流程、弹性伸缩和实时通信等多个领域展现出巨大的应用潜力。通过合理应用事件驱动架构,可以构建灵活、可扩展和可维护的系统架构,满足不断变化的业务需求和技术挑战。对于开发者而言,深入理解事件驱动架构的核心概念和优势,将有助于更好地设计和实现高质量的软件系统。
|
17天前
|
XML 存储 Android开发
Android实战经验之Kotlin中快速实现MVI架构
本文介绍MVI(Model-View-Intent)架构模式,强调单向数据流与不可变状态管理,提升Android应用的可维护性和可测试性。MVI分为Model(存储数据)、View(展示UI)、Intent(用户动作)、State(UI状态)与ViewModel(处理逻辑)。通过Kotlin示例展示了MVI的实现过程,包括定义Model、State、Intent及创建ViewModel,并在View中观察状态更新UI。
56 12
|
1月前
|
存储 数据采集 数据处理
数据处理神器Elasticsearch_Pipeline:原理、配置与实战指南
数据处理神器Elasticsearch_Pipeline:原理、配置与实战指南
68 12
|
1月前
|
Kubernetes Cloud Native 微服务
企业级容器部署实战:基于ACK与ALB灵活构建云原生应用架构
这篇内容概述了云原生架构的优势,特别是通过阿里云容器服务Kubernetes版(ACK)和应用负载均衡器(ALB)实现的解决方案。它强调了ACK相对于自建Kubernetes的便利性,包括优化的云服务集成、自动化管理和更强的生态系统支持。文章提供了部署云原生应用的步骤,包括一键部署和手动部署的流程,并指出手动部署更适合有技术背景的用户。作者建议在预算允许的情况下使用ACK,因为它能提供高效、便捷的管理体验。同时,文章也提出了对文档改进的建议,如添加更多技术细节和解释,以帮助用户更好地理解和实施解决方案。最后,展望了ACK未来在智能化、安全性与边缘计算等方面的潜在发展。水文一篇,太忙了,见谅!
|
2月前
|
存储 缓存 监控
深入解析Elasticsearch的内存架构与管理
深入解析Elasticsearch的内存架构与管理
深入解析Elasticsearch的内存架构与管理
|
2月前
|
监控 API 数据库
构建高效后端:微服务架构的实战指南
【6月更文挑战第14天】在数字化浪潮下,后端开发面临着前所未有的挑战和机遇。本文将深入探讨微服务架构的设计理念、实现方式及其在现代软件开发中的重要性,为读者提供一份全面而实用的微服务实战手册。
38 2
|
2月前
|
缓存 数据处理 数据安全/隐私保护
Elasticsearch索引状态管理实战指南
Elasticsearch索引状态管理实战指南
44 0
|
2月前
|
存储 索引
Elasticsearch索引之嵌套类型:深度剖析与实战应用
Elasticsearch索引之嵌套类型:深度剖析与实战应用
|
2月前
|
调度
【灵动之链】打造高效处理架构的双轨组合模式实战
【灵动之链】打造高效处理架构的双轨组合模式实战