MySQL · 引擎介绍 · Sphinx源码剖析(三)

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介:

在本节中我会介绍Sphinx在构建索引之前做的一些事情,主要是从mysql拉取数据保存,然后分词排序保存到内存等等一系列的操作。下面是几个相关指令

 sql_query = \
 SELECT id, group_id, UNIX_TIMESTAMP(date_added) AS date_added, \
 title, content \
 FROM documents
 sql_query_range = SELECT MIN(id),MAX(id) FROM documents
 sql_range_step = 1000 

其中sql_query是sphinx每次从mysql拉取数据的sql,而sql_query_range则是取得需要从mysql拉取的数据条目,而sql_rang_step则是表示每次从mysql拉取多少数据。sql_rang_range执行分两种情况,第一种是第一次拉取数据的时候,第二种是当当前的range数据读取完毕之后。

首先来看CSphSource_SQL::NextDocument函数,这个函数的主要作用是从mysql读取数据然后切分保存,首先我们来看读取数据这一部分,这里步骤很简单,就是执行对应的sql,然后判断当前range的数据是否读取完毕,如果读取完毕则继续执行sql_query_rang(RunQueryStep)。这里要注意的是,sphinx读取数据是一条一条的读取然后执行的.

 do
	{
		// try to get next row bool bGotRow = SqlFetchRow ();

		// when the party's over... while ( !bGotRow )
		{
			// is that an error? if ( SqlIsError() )
			{
				sError.SetSprintf ( "sql_fetch_row: %s", SqlError() );
				m_tDocInfo.m_uDocID = 1; // 0 means legal eof return NULL;
			}

			// maybe we can do next step yet? if ( !RunQueryStep ( m_tParams.m_sQuery.cstr(), sError ) )
			{
				// if there's a message, there's an error // otherwise, we're just over if ( !sError.IsEmpty() )
				{
					m_tDocInfo.m_uDocID = 1; // 0 means legal eof return NULL;
				}

			} else
			{
				// step went fine; try to fetch
				bGotRow = SqlFetchRow ();
				continue;
			}

			SqlDismissResult ();

			// ok, we're over
			ARRAY_FOREACH ( i, m_tParams.m_dQueryPost )
			{
				if ( !SqlQuery ( m_tParams.m_dQueryPost[i].cstr() ) )
				{
					sphWarn ( "sql_query_post[%d]: error=%s, query=%s",
						i, SqlError(), m_tParams.m_dQueryPost[i].cstr() );
					break;
				}
				SqlDismissResult ();
			}

			m_tDocInfo.m_uDocID = 0; // 0 means legal eof return NULL;
		}

		// get him!
		m_tDocInfo.m_uDocID = VerifyID ( sphToDocid ( SqlColumn(0) ) );
		m_uMaxFetchedID = Max ( m_uMaxFetchedID, m_tDocInfo.m_uDocID );
	} while ( !m_tDocInfo.m_uDocID );

上面的代码我们可以看到一个很关键的字段m_uDocID,这个字段表示当前doc的id(因此数据库的表设计必须有这个id字段).

读取完毕数据之后,开始处理读取的数据,这里会按照字段来切分,主要是将对应的数据库字段保存到索引fielld

 // split columns into fields and attrs for ( int i=0; i<m_iPlainFieldsLength; i++ )
	{
		// get that field #if USE_ZLIB if ( m_dUnpack[i]!=SPH_UNPACK_NONE )
		{
			DWORD uUnpackedLen = 0;
			m_dFields[i] = (BYTE*) SqlUnpackColumn ( i, uUnpackedLen, m_dUnpack[i] );
			m_dFieldLengths[i] = (int)uUnpackedLen;
			continue;
		}
		#endif
		m_dFields[i] = (BYTE*) SqlColumn ( m_tSchema.m_dFields[i].m_iIndex );
		m_dFieldLengths[i] = SqlColumnLength ( m_tSchema.m_dFields[i].m_iIndex );
	}

紧接着就是处理attribute,后续我们会详细介绍attribute,现在我们只需要知道它是一个类似二级索引的东西(不进入全文索引).

 switch ( tAttr.m_eAttrType )
		{
			case SPH_ATTR_STRING:
			case SPH_ATTR_JSON:
				// memorize string, fixup NULLs
				m_dStrAttrs[i] = SqlColumn ( tAttr.m_iIndex );
				if ( !m_dStrAttrs[i].cstr() )
					m_dStrAttrs[i] = "";

				m_tDocInfo.SetAttr ( tAttr.m_tLocator, 0 );
				break;
..................................
			default:
				// just store as uint by default
				m_tDocInfo.SetAttr ( tAttr.m_tLocator, sphToDword ( SqlColumn ( tAttr.m_iIndex ) ) ); // FIXME? report conversion errors maybe? break;
		}

然后我们来看Sphinx如何处理得到的数据,核心代码在 RtIndex_t::AddDocument中,这个函数主要是用来分词(IterateHits中)然后保存数据到对应的数据结构,而核心的数据结构是RtAccum_t,也就是最终sphinx在写索引到文件之前,会将数据保存到这个数据结构,这里要注意一般来说sphinx会保存很多数据,然后最后一次性提交给索引引擎来处理.而索引引擎中处理的就是这个数据结构.因此最终会调用RtAccum_t::AddDocument.

这里需要注意两个地方,第一个是m_dAccum这个域,这个域是一个vector,而这个vector里面保存了CSphWordHit这个结构,我们来看这个结构的定义

 struct CSphWordHit
 {
 SphDocID_t		m_uDocID;		///< document ID
 SphWordID_t		m_uWordID;		///< word ID in current dictionary
 Hitpos_t		m_uWordPos;		///< word position in current document
 };

可以看到其实这个结构也就是保存了对应分词的信息.

然后我们来看核心代码,这里主要是便利刚才从mysql得到的数据,去重然后保存数据.

	int iHits = 0;
	if ( pHits && pHits->Length() )
	{
		CSphWordHit tLastHit;
		tLastHit.m_uDocID = 0;
		tLastHit.m_uWordID = 0;
		tLastHit.m_uWordPos = 0;

		iHits = pHits->Length();
		m_dAccum.Reserve ( m_dAccum.GetLength()+iHits );
		for ( const CSphWordHit * pHit = pHits->First(); pHit<=pHits->Last(); pHit++ )
		{
			// ignore duplicate hits if ( pHit->m_uDocID==tLastHit.m_uDocID && pHit->m_uWordID==tLastHit.m_uWordID && pHit->m_uWordPos==tLastHit.m_uWordPos )
				continue;

			// update field lengths if ( pFieldLens && HITMAN::GetField ( pHit->m_uWordPos )!=HITMAN::GetField ( tLastHit.m_uWordPos ) )
				pFieldLens [ HITMAN::GetField ( tLastHit.m_uWordPos ) ] = HITMAN::GetPos ( tLastHit.m_uWordPos );

			// accumulate
			m_dAccum.Add ( *pHit );
			tLastHit = *pHit;
		}
		if ( pFieldLens )
			pFieldLens [ HITMAN::GetField ( tLastHit.m_uWordPos ) ] = HITMAN::GetPos ( tLastHit.m_uWordPos );
	}

做完上面这些事情之后,就需要提交数据给索引处理引擎了,这里核心的代码都是在RtIndex_t::Commit中.

这个函数主要做两个事情,第一个提取出前面我们构造好的RtAccum_t,然后对于所有的doc进行排序,创建segment,也就是对应的索引块(ram chunk),最后调用CommitReplayable来提交ram chunk到磁盘.

其实可以这么理解,保存在内存中的索引也就是segment,然后当内存的大小到达限制后就会刷新内存中的索引到磁盘.

 void RtIndex_t::Commit ( int * pDeleted, ISphRtAccum * pAccExt )
 {
 assert ( g_bRTChangesAllowed );
 MEMORY ( MEM_INDEX_RT );

 RtAccum_t * pAcc = AcquireAccum ( NULL, pAccExt, true );
 if ( !pAcc )
 return;

 ...................................
 pAcc->Sort();

 RtSegment_t * pNewSeg = pAcc->CreateSegment ( m_tSchema.GetRowSize(), m_iWordsCheckpoint );
 .............................................

 // now on to the stuff that needs locking and recovery
 CommitReplayable ( pNewSeg, pAcc->m_dAccumKlist, pDeleted );
 ......................................
 }

然后我们来看RtAccum_t::CreateSegment函数,这个函数用来将分词好的数据保存到ram chunk,这里需要注意两个数据结构分别是RtDoc_t和RtWord_t,这两个数据结构分别表示doc信息和分词信息.

结构很简单,后面的注释都很详细

 template < typename DOCID = SphDocID_t >
 struct RtDoc_T
 {
 DOCID						m_uDocID;	///< my document id
 DWORD						m_uDocFields;	///< fields mask
 DWORD						m_uHits;	///< hit count
 DWORD						m_uHit;		///< either index into segment hits, or the only hit itself (if hit count is 1)
 };

 template < typename WORDID=SphWordID_t >
 struct RtWord_T
 {
 union
 {
 WORDID					m_uWordID;	///< my keyword id const BYTE *			m_sWord;
 };
 DWORD						m_uDocs;	///< document count (for stats and/or BM25)
 DWORD						m_uHits;	///< hit count (for stats and/or BM25)
 DWORD						m_uDoc;		///< index into segment docs
 };

然后来看代码,首先是初始化对应的写结构,可以看到都是会写到我们创建好的segment中.

 RtDocWriter_t tOutDoc ( pSeg );
	RtWordWriter_t tOutWord ( pSeg, m_bKeywordDict, iWordsCheckpoint );
	RtHitWriter_t tOutHit ( pSeg );

然后就是写数据了,这里主要是做一个聚合,也就是将相同的keyword对应的属性聚合起来.

	ARRAY_FOREACH ( i, m_dAccum )
	{
 .......................................
		// new keyword; flush current keyword if ( tHit.m_uWordID!=tWord.m_uWordID )
		{
			tOutDoc.ZipRestart ();
			if ( tWord.m_uWordID )
			{
				if ( m_bKeywordDict )
				{
					const BYTE * pPackedWord = pPacketBase + tWord.m_uWordID;
					assert ( pPackedWord[0] && pPackedWord[0]+1<m_pDictRt->GetPackedLen() );
					tWord.m_sWord = pPackedWord;
				}
				tOutWord.ZipWord ( tWord );
			}

			tWord.m_uWordID = tHit.m_uWordID;
			tWord.m_uDocs = 0;
			tWord.m_uHits = 0;
			tWord.m_uDoc = tOutDoc.ZipDocPtr();
			uPrevHit = EMPTY_HIT;
		}
 ..................
 }

这次就分析到这里,下次我们将会分析最核心的部分就是Sphinx如何刷新数据到磁盘.

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
3月前
|
监控 数据可视化 关系型数据库
微服务架构+Java+Spring Cloud +UniApp +MySql智慧工地系统源码
项目管理:项目名称、施工单位名称、项目地址、项目地址、总造价、总面积、施工准可证、开工日期、计划竣工日期、项目状态等。
308 6
|
4月前
|
JavaScript 关系型数据库 MySQL
基于JavaWeb和mysql实现校园订餐前后台管理系统(源码+数据库)
基于JavaWeb和mysql实现校园订餐前后台管理系统(源码+数据库)
|
4月前
|
NoSQL 关系型数据库 MySQL
基于Python和mysql开发的智慧校园答题考试系统(源码+数据库+程序配置说明书+程序使用说明书)
基于Python和mysql开发的智慧校园答题考试系统(源码+数据库+程序配置说明书+程序使用说明书)
|
7天前
|
运维 监控 安全
云HIS医疗管理系统源码——技术栈【SpringBoot+Angular+MySQL+MyBatis】
云HIS系统采用主流成熟技术,软件结构简洁、代码规范易阅读,SaaS应用,全浏览器访问前后端分离,多服务协同,服务可拆分,功能易扩展;支持多样化灵活配置,提取大量公共参数,无需修改代码即可满足不同客户需求;服务组织合理,功能高内聚,服务间通信简练。
26 4
|
4月前
|
NoSQL 关系型数据库 MySQL
基于Python和mysql开发的BBS问答社区管理系统(源码+数据库+程序配置说明书+程序使用说明书)
基于Python和mysql开发的BBS问答社区管理系统(源码+数据库+程序配置说明书+程序使用说明书)
|
19天前
|
存储 关系型数据库 MySQL
MySQL引擎对决:深入解析MyISAM和InnoDB的区别
MySQL引擎对决:深入解析MyISAM和InnoDB的区别
34 0
|
4月前
|
前端开发 IDE Java
基于Springboot+MYSQL+Maven实现的宠物医院管理系统(源码+数据库+运行指导文档+项目运行指导视频)
基于Springboot+MYSQL+Maven实现的宠物医院管理系统(源码+数据库+运行指导文档+项目运行指导视频)
167 0
|
10天前
|
存储 缓存 关系型数据库
MySQL的InnoDB引擎:深度解析与应用
【4月更文挑战第20天】本文深入探讨MySQL的InnoDB引擎,它采用MVCC和行级锁定实现高并发、高性能数据操作。InnoDB通过缓冲池减少I/O,支持ACID事务、外键约束和行级锁定,提供数据一致性。此外,还支持全文索引和灵活的索引策略。其高并发性能、数据一致性和可扩展性使其成为首选存储引擎。
29 12
|
12天前
|
Java 关系型数据库 MySQL
一套java+ spring boot与vue+ mysql技术开发的UWB高精度工厂人员定位全套系统源码有应用案例
UWB (ULTRA WIDE BAND, UWB) 技术是一种无线载波通讯技术,它不采用正弦载波,而是利用纳秒级的非正弦波窄脉冲传输数据,因此其所占的频谱范围很宽。一套UWB精确定位系统,最高定位精度可达10cm,具有高精度,高动态,高容量,低功耗的应用。
一套java+ spring boot与vue+ mysql技术开发的UWB高精度工厂人员定位全套系统源码有应用案例
|
19天前
|
消息中间件 Java 关系型数据库
JAVA云HIS医院管理系统源码、基于Angular+Nginx+ Java+Spring,SpringBoot+ MySQL + MyCat
JAVA云HIS医院管理系统 常规模版包括门诊管理、住院管理、药房管理、药库管理、院长查询、电子处方、物资管理、媒体管理等,为医院管理提供更有力的保障。 HIS系统以财务信息、病人信息和物资信息为主线,通过对信息的收集、存储、传递、统计、分析、综合查询、报表输出和信息共享,及时为医院领导及各部门管理人员提供全面、准确的各种数据。