作品搜索实现细节
实体定义
SearchKey是原有SQL Server的数据,现需要同步到Elasticsearch,仍是继承抽象类ElasticsearchEntity实体定义,同时这里有三个细节点:
1. public string KeyName,我定义的是Text类型,在Elasticsearch使用Text类型才会分词。
2.在实体定义我没有给KeyName指定分词器,因为我会使用两个分词器:拼音和默认分词,而我会在批量写入数据创建Mapping时定义。
3.实体里的 public List<int> SysTagId 与SearchKey在SQL Server是两张不同的物理表,是一对多的关系,在代码表示如下,但是在关系型数据库是无法与之对应和体现的,这就是咱们所说的“阻抗失配”,但是能在以文档型存储系统(MongoDB、Elasticsearch)里很好的解决这个问题,可以以一个聚合的方式写入,避免多次查询关联。
[ElasticsearchType(RelationName = "search_key")] public class SearchKey : ElasticsearchEntity { [Number(NumberType.Integer, Name = "key_id")] public int KeyId { get; set; } [Number(NumberType.Integer, Name = "entity_id")] public int EntityId { get; set; } [Number(NumberType.Integer, Name = "entity_type")] public int EntityType { get; set; } [Text(Name = "key_name")] public string KeyName { get; set; } [Number(NumberType.Integer, Name = "weight")] public int Weight { get; set; } [Boolean(Name = "is_subsidiary")] public bool IsSubsidiary { get; set; } [Date(Name = "active_date")] public DateTimeOffset? ActiveDate { get; set; } [Number(NumberType.Integer, Name = "sys_tag_id")] public List<int> SysTagId { get; set; } }
数据同步
数据同步我采用了Quartz.Net定时调度任务框架,因此时效不高,所以每4小时同步一次即可,有42W多的数据,分批进行同步,每次查询1000条数据同时进行一次批量写入。全量同步一次的时间大概2分钟。因此使用RPC调用[ES业务API服务]。
因为具体业务逻辑已经封装在[ES业务API服务],因此同步逻辑也相对简单,查询出SQL Server数据源、聚合整理、调用[ES业务API服务]的批量写入接口、重新绑定别名到新的Index。
[DisallowConcurrentExecution] public class SearchKeySynchronousJob : BaseJob { public override void Execute() { var rm = SFNovelReadManager.Instance(); var maxId = 0; var size = 1000; string indexName = ""; while (true) { //避免一次性全部查询出来,每1000条一次写入。 var searchKey = sm.searchKey.GetList(size, maxId); if (!searchKey.Any()) break; var entityIds = searchKey.Select(a => a.EntityID).Distinct().ToList(); var sysTagRecord = rm.Novel.GetSysTagRecord(entityIds); var items = searchKey.Select(a => new SearchKeyPostItem { Weight = a.Weight, EntityType = a.EntityType, EntityId = a.EntityID, IsSubsidiary = a.IsSubsidiary ?? false, KeyName = a.KeyName, ActiveDate = a.ActiveDate, SysTagId = sysTagRecord.Where(c => c.EntityID == a.EntityID).Select(c => c.SysTagID).ToList(), KeyID = a.KeyID }).ToList(); //以一个聚合写入到ES var postResult = new SearchKeyPostRequest { IndexName = indexName, Items = items }.Excute(); if (postResult.Success) { indexName = (string)postResult.Data; maxId = searchKey.Max(a => a.KeyID); } } //别名从旧Index指向新的Index,最后删除旧Index var renameResult = new SearchKeyRenameRequest { IndexName = indexName }.Excute(); } } }
业务API接口
批量新增接口这里有2个细节点:
1.在第一次有数据进来的时候需要创建Mapping,因为得对KeyName字段定义分词器,其余字段都可以使用AutoMap即可。
2.新创建的Index名称是精确到秒的 SearchKey-202112261121
/// <summary> /// 批量新增作品搜索列表(返回创建的indexName) /// </summary> /// <param name="request"></param> /// <returns></returns> [HttpPost] public ApiResult Post(SearchKeyPostRequest request) { if (!request.Items.Any()) return ApiResult.IsFailed("无传入数据"); var date = DateTime.Now; var relationName = typeof(SearchKey).GetRelationName(); var indexName = request.IndexName.IsNullOrWhiteSpace() ? (relationName + "-" + date.ToString("yyyyMMddHHmmss")) : request.IndexName; if (request.IndexName.IsNullOrWhiteSpace()) { var createResult = _elasticClient.Indices.Create(indexName, a => a.Map<SearchKey>(m => m.AutoMap().Properties(p => p.Custom(new TextProperty { Name = "key_name", Analyzer = "standard", Fields = new Properties(new Dictionary<PropertyName, IProperty> { { new PropertyName("pinyin"),new TextProperty{ Analyzer = "pinyin"} }, { new PropertyName("standard"),new TextProperty{ Analyzer = "standard"} } }) })))); if (!createResult.IsValid && request.IndexName.IsNullOrWhiteSpace()) return ApiResult.IsFailed("创建索引失败"); } var document = request.Items.MapTo<List<SearchKey>>(); var result = _elasticClient.BulkAll(indexName, document); return result ? ApiResult.IsSuccess(data: indexName) : ApiResult.IsFailed(); }
重新绑定别名接口这里有4个细节点:
1.别名使用searchkey,只会有一个Index[searchkey-yyyyMMddHHmmss]会跟searchkey绑定.
2.优先把已绑定的Index查询出来,方便解绑与删除。
3.别名绑定在Elasticsearch虽然是原子性的,但是不是数据一致性的,因此得先Add后Remove。
4.删除旧得Index免得占用过多资源。
/// <summary> /// 重新绑定别名 /// </summary> /// <returns></returns> [HttpPut] public ApiResult Rename(SearchKeyRanameRequest request) { var aliasName = typeof(SearchKey).GetRelationName(); var getAliasResult = _elasticClient.Indices.GetAlias(aliasName); //给新index指定别名 var bulkAliasRequest = new BulkAliasRequest { Actions = new List<IAliasAction> { new AliasAddDescriptor().Index(request.IndexName).Alias(aliasName) } }; //移除别名里旧的索引 if (getAliasResult.IsValid) { var indeNameList = getAliasResult.Indices.Keys; foreach (var indexName in indeNameList) { bulkAliasRequest.Actions.Add(new AliasRemoveDescriptor().Index(indexName.Name).Alias(aliasName)); } } var result = _elasticClient.Indices.BulkAlias(bulkAliasRequest); //删除旧的index if (getAliasResult.IsValid) { var indeNameList = getAliasResult.Indices.Keys; foreach (var indexName in indeNameList) { _elasticClient.Indices.Delete(indexName); } } return result != null && result.ApiCall.Success ? ApiResult.IsSuccess() : ApiResult.IsFailed(); }
查询接口这里跟前面细节得差不多:
但是这里有一个得特别注意的点,可以看到这个查询接口同时使用了should和must,这里得设置minimumShouldMatch才能正常像SQL过滤。
should可以理解成SQL的Or,Must可以理解成SQL的And。
默认情况下minimumShouldMatch是等于0的,等于0的意思是,should不命中任何的数据仍然会返回must命中的数据,也就是你们可能想搜索(keyname.pinyin=’chengong‘ or keyname.standard=’chengong‘) and id > 0,但是es里没有存keyname='chengong'的数据,会把id> 0 而且 keyname != 'chengong' 数据给查询出来。
因此我们得对minimumShouldMatch=1,就是should条件必须得任意命中一个才能返回结果。
在should和must混用的情况下必须得注意minimumShouldMatch的设置!!!
/// <summary> /// 作品搜索列表 /// </summary> /// <param name="request"></param> /// <returns></returns> [HttpPost] [Route("search")] public ApiResult<List<SearchKeyGetResponse>> Get(SearchKeyGetRequest request) { var shouldQuerys = new List<Func<QueryContainerDescriptor<SearchKey>, QueryContainer>>(); int minimumShouldMatch = 0; if (!request.KeyName.IsNullOrWhiteSpace()) { shouldQuerys.Add(a => a.MatchPhrase(m => m.Field("key_name.pinyin").Query(request.KeyName))); shouldQuerys.Add(a => a.MatchPhrase(m => m.Field("key_name.standard").Query(request.KeyName))); minimumShouldMatch = 1; } var mustQuerys = new List<Func<QueryContainerDescriptor<SearchKey>, QueryContainer>> { a => a.Range(t => t.Field(f => f.Weight).GreaterThanOrEquals(0)) }; if (request.IsSubsidiary.HasValue) mustQuerys.Add(a => a.Term(t => t.Field(f => f.IsSubsidiary).Value(request.IsSubsidiary.Value))); if (request.SysTagIds != null && request.SysTagIds.Any()) mustQuerys.Add(a => a.Terms(t => t.Field(f => f.SysTagId).Terms(request.SysTagIds))); if (request.EntityType.HasValue) { if (request.EntityType.Value == ESearchKey.EntityType.AllNovel) { mustQuerys.Add(a => a.Terms(t => t.Field(f => f.EntityType).Terms(ESearchKey.EntityType.Novel, ESearchKey.EntityType.ChatNovel, ESearchKey.EntityType.FanNovel))); } else mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityType).Value((int)request.EntityType.Value))); } var sortDescriptor = new SortDescriptor<SearchKey>(); sortDescriptor = request.Sort == ESearchKey.Sort.Weight ? sortDescriptor.Field(f => f.Weight, SortOrder.Descending) : sortDescriptor.Field(f => f.ActiveDate, SortOrder.Descending); var searchResult = _elasticClient.Search<SearchKey>(a => a.Index(typeof(SearchKey).GetRelationName()) .From(request.Size * request.Page) .Size(request.Size) .Query(q => q.Bool(b => b.Should(shouldQuerys).Must(mustQuerys).MinimumShouldMatch(minimumShouldMatch))) .Sort(s => sortDescriptor)); var apiResult = searchResult.GetApiResult<SearchKey, List<SearchKeyGetResponse>>(); if (apiResult.Success) return apiResult; return ApiResult<List<SearchKeyGetResponse>>.IsSuccess("空集合数据"); }
APM监控
虽然在上面我做了足够的实现准备,但是对于上生产后的实际使用效果我还是希望有一个直观的体现。我之前写了一篇文章《.Net微服务实战之可观测性》很好叙述了该种情况,有兴趣的可以移步去看看。
在之前公司做微服务的时候的APM选型我们使用了Skywalking,但是现在这家公司的运维没有接触过,但是对于Elastic Stack他相对比较熟悉,如同上文所说架构设计的输入核心为两点:满足需求与组织架构,秉着我的技术选型原则是基于团队架构,我们采用了Elastic APM + Kibana(7.4版本),如下图所示: