管好超时才能做好异步

简介:

    前段时间伤自尊了,被人三言两语给问蒙圈了,蔫头耷脑好半天愣没缓不过来,俗话说得好呀,刀不磨要生锈人不学要落后,人过三十不学艺人过四十天过午,老老实实做人踏踏实实做学问,证明自己实在不是干技术的料也好早点转行去卖鸡蛋灌饼。

    开始今天话题前先澄清一个事,小蓝单车发布麒麟计划,共享单车即将进入精准广告、实时导航、运动数据和快乐骑行时代,联系二个月前《细思极恐的共享单车》的博文,朋友圈及各路粉丝一再追问我和蓝去去的关系,今天正式澄清:本人不是小蓝单车的产品经理,也没有从事相关方面工作,文章内容如有雷同纯属巧合。

    回到正题,今天说说超时,做服务一定绕不开大并发、高效率、可靠稳定、快速响应这些关键词,从技术的角度上讲,每个环节每个步骤都有值得大书特书一番的地方,不过个人觉得最最重要的地方还是超时管理,这方面做好就成功一大半了。首先介绍一下应用场景,作为一个服务,对外受理接入请求,请求报文到达以后先解析,理解用户想要干什么,对于当下大多数的互联网业务,往往需要一系列逻辑处理,这边取点标签那边取点参数,汇总起来算一算得到最终结果再给用户返回回去。随着互联网的发展数据量越来越大,数据本地化早已不能满足业务需要,在逻辑处理过程中数次调用远程通信已是再平常不过的事情了,这就产生了上下游多个节点间的数据通信与传输的问题,那么多连接有tcp的有udp的,有长连接的有短连接的,有先发后回的,有后发先回的,有发了不回的,有不要还回的,各种情况都要考虑到,一个字忒乱。

    怎样把超时管理起来呢,epoll非常棒但终究是事件触发的,把超时转化为事件是想要达到的目的。成千上万个连接的超时设置的有长有短,最讨厌的是这些连接还是剧烈频繁变动的,对于有序容器管理这些连接成本真的有点高,要是再加把锁实现进(线)程同步,效率有点让人焦虑,既然不能全局有序可不可以局部有序呢,只关心最值而不是顺序本身,根堆的偏序特性是个相当不错的选择,二叉树每次插值调姿开销确实有点大。根堆分大小,取决于比较函数,不是全局有序因此迭代器意义不大,元素追加弹出基本够用了,在用例中比较函数计算(入队事件 + 超时事件 - 当前事件)最小值当做堆顶。

001	while (true) {
002		if (sg_CPQueue.GtPQueueTop(stNode) > 0) {
003			/* 小根堆最短超时 */
004			iTotal = epoll_wait(sg_CEpoll.GtEpollGetEpoll(), stEvent, GT_EVENT, GtAdpDeadline(stNode.m_stTimer));
005		}
006		else {
007			/* 监听套接字事件 */
008			iTotal = epoll_wait(sg_CEpoll.GtEpollGetEpoll(), stEvent, GT_EVENT, -1);
009		}
010
011		if (iTotal > 0) {
012			/* 套接字事件处理 */
013			for (iCount = 0; iCount < iTotal; iCount ++) {
014				if (stEvent[iCount].data.fd == (sg_CEpoll.GtEpollGetNet())->GtNetGetSocket()) {
015					/* 新接入客户事件 */
016					while ((iSocket = accept((sg_CEpoll.GtEpollGetNet())->GtNetGetSocket(), (struct sockaddr*)&stClient, &iClient)) > 0) {
017						GtAdpEpollAdd(iSocket);
018					}
019				}
020				else {
021					/* 读写事件派发中 */
022					sg_CPool.GtPoolDistribute((void*)stEvent[iCount].data.fd);
023				}
024			}
025		}
026		else {
027			/* 小根堆超时清理 */
028			if (stNode.m_iLower > 0) {
029				GtAdpErrors(stNode.m_iUpper);
030				GtAdpEpollDel(stNode.m_iLower);
031				GtAdpPQueueDel(stNode.m_iLower);
032			}
033		}
034	}
wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==

     第2行从小根堆中获取一个元素,获取成功就用套接字超时作为epoll_wait超时,否则阻塞直到有事件发生为止。

    第11行判断返回值,如果大于零表明有待处理事件发生,等于零表明有超时发生。

     第13行至24行对事件处理,是新接入子套接字则加入epoll监听,否则派发任务到工作线程。

     第28行至32行清除epoll事件并从根堆中剔除元素,同时返回上游错误信息。

001	void* GtAdpPoolCore(void* pPara)
002	{
003		GtNet::CGtNet CNet;
004		UCHAR uszBuf[GT_PACKET] = {0};
005	
006		CNet.GtNetSetSocket((int)pPara);
007		// UCHAR(0x04) | int(socket) | data ...
008		if (read(CNet.GtNetGetSocket(), uszBuf, GT_PACKET) > 0) {
009			if (GT_ADP_PKTTAG != uszBuf[0]) {
010				/* 上游请求受理 */
011				GtAdpPoolUpper(CNet, uszBuf);
012			}
013			else {
014				/* 下游业务操作 */
015				GtAdpPoolLower(CNet, uszBuf);
016			}
017		}
018		else {
019			/* 上游连接关闭 */
020			GtAdpEpollDel(CNet.GtNetGetSocket());
021		}
022	
023		return NULL;
024	}
wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==

    第8行读取消息(既可能是上游消息也可能是下游消息),大于零启动业务处理,否则清除epoll事件并关闭连接。

    第9行识别报文格式,用例采用0x04标识示意,具体业务侧应采用更加严格的定义。

    第11行调用上半场处理函数。

    第15行调用下半场处理函数。

001	void GtAdpPoolUpper(GtNet::CGtNet& rCUpper, UCHAR* puszPacket)
002	{
003		int iSocket = 0;
004		UCHAR uszBuf[GT_PACKET] = {0};
005		char szUrl[GT_BYTE64] = "192.168.0.221:12345";
006	
007	#ifdef _GT_TCP_
008		GtNet::CGtTcp CLower;
009	#else
010		GtNet::CGtUdp CLower;
011	#endif
012	
013		// 业务处理开始
014		uszBuf[0] = GT_ADP_PKTTAG;
015		iSocket = rCUpper.GtNetGetSocket();
016		memcpy(uszBuf + sizeof(UCHAR), &iSocket, sizeof(int));
017		sprintf((char*)uszBuf + sizeof(UCHAR) + sizeof(int), "GtAdapter: %s", (char*)puszPacket);
018		// 业务处理结束
019	
020	#ifdef _GT_TCP_
021		if (GT_SUCCESS == CLower.GtTcpConnect(szUrl)) {
022	#else
023		if (GT_SUCCESS == CLower.GtUdpConnect(szUrl)) {
024	#endif
025			CLower.GtNetSend(uszBuf, sizeof(UCHAR) + sizeof(int) + strlen((char*)uszBuf + sizeof(UCHAR) + sizeof(int)));
026	
027			/* 下游启动关注 */
028			GtAdpEpollAdd(CLower.GtNetGetSocket());
029			GtAdpPQueueAdd(iSocket, CLower.GtNetGetSocket());
030		}
031		else {
032			GtAdpErrors(rCUpper.GtNetGetSocket());
033		}
034	
035		return;
036	}
wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==

    第13行至18行业务实现过程。

    第21行或23行为初始化下游连接操作(tcp和udp是个人封装过的),需要注意的是,udp需要采用connect方式初始化,即将对端地址关联到套接字上,以方便read读取。

    第28行和29行在发送完毕数据以后将套接字再次注册事件和追加到超时根堆中去。

    第32行若tcp连接失败则向上游返回失败信息,udp只能通过超时判断。

001	void GtAdpPoolLower(GtNet::CGtNet& rCLower, UCHAR* puszPacket)
002	{
003		int iResult = GT_SUCCESS;
004		UCHAR uszBuf[GT_PACKET] = {0};
005		int iSocket = *(int*)(puszPacket + sizeof(UCHAR));
006	
007		/* 下游解除关注 */
008		GtAdpEpollDel(rCLower.GtNetGetSocket());
009		GtAdpPQueueDel(rCLower.GtNetGetSocket());
010	
011		// 业务处理开始
012		iResult = (NULL != strcpy((char*)uszBuf, (char*)puszPacket + sizeof(UCHAR) + sizeof(int))) ? GT_SUCCESS : GT_FAILURE;
013		// 业务处理结束
014	
015		if (GT_SUCCESS == iResult) {
016			GtAdpResponse(iSocket, uszBuf, strlen((char*)uszBuf));
017		}
018		else {
019			GtAdpErrors(iSocket);
020		}
021	
022		return;
023	}
wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==

    第8行至9行清除epoll事件并从根堆中删除元素。

    第11行至13行业务实现过程。

    第15行至20行根据业务实现结果向上游返回不同的消息。

001	long GtAdpDeadline(struct timeval& rstTimer)
002	{
003		long lTimer = 0;
004		struct timeval stNow, stTimer;
005	
006		gettimeofday(&stNow, NULL);
007		(rstTimer.tv_usec += GT_ADP_TIMOUT) > GT_ADP_ONESEC ? rstTimer.tv_sec ++, rstTimer.tv_usec -= GT_ADP_ONESEC : 0;
008	
009		stTimer.tv_sec = rstTimer.tv_sec - stNow.tv_sec;
010		stTimer.tv_usec = rstTimer.tv_usec - stNow.tv_usec;
011		stTimer.tv_usec < 0 ? stTimer.tv_sec -= 1, stTimer.tv_usec += GT_ADP_ONESEC : 0;
012	
013		lTimer = stTimer.tv_sec * GT_ADP_TMBASE + stTimer.tv_usec / GT_ADP_TMBASE;
014	
015		return lTimer > 0 ? lTimer : 0;
016	}
wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==

    第6行获取当前的系统时间。

    第7行入队时间加上超时时间。

    第9行至11行计算当前时间与超时时间的差值。

    第13行将差值转换成epoll超时的时间单位。

    完整的代码片段http://git.oschina.net/gonglibin/codes/rc87dt0iagk4exqm3psuw66,作为论证和用例代码仅供参考,调试测试通过,根堆封装成对象的时候已经加入锁,epoll对象早年封装的没有考虑同步的问题,代码中通过注释已经提请留意了。

    外面下雨了,前些日子的暑热一扫而光,Google I/O 2017开发者大会518开幕了,谷歌宣布kotlin作为一级编程语言,消息一出有小伙伴慌了,其实木必要,kotlin好比加了语法糖的java,跟java原本就有着血浓于水的感情,况且谷歌与oracle豪门之间的恩怨不至于革小码农们的命,编程思想比实现语言更重要,人工智能与VR是好东西,未来有前途,抽空研究研究。

相关文章
|
2月前
|
消息中间件 监控 Java
接口请求重试策略:保障稳定性的必杀技
接口请求重试策略:保障稳定性的必杀技
67 0
|
9月前
有几百万消息持续积压几小时,怎么解决
有几百万消息持续积压几小时,怎么解决
50 0
|
4月前
|
前端开发 Cloud Native 大数据
坑爹,线上同步近 3w 个用户导致链路阻塞引入发的线上问题,你经历过吗?
坑爹,线上同步近 3w 个用户导致链路阻塞引入发的线上问题,你经历过吗?
|
7月前
|
前端开发 JavaScript BI
做好这四步,服务端轻松成为全栈化人才
本文总结了作者和团队的其他同学在全栈化实践中的痛点和问题,分享了自己的解决思路和经验,希望让后人可以更轻松加入全栈化大家庭,走的更快更稳。
44360 13
做好这四步,服务端轻松成为全栈化人才
|
12月前
|
消息中间件 存储 前端开发
一次线上事故,我顿悟了异步的精髓
一次线上事故,我顿悟了异步的精髓
|
消息中间件 中间件 Java
分布式事务解决方案-最大努力通知|学习笔记
快速学习分布式事务解决方案-最大努力通知
78 0
分布式事务解决方案-最大努力通知|学习笔记
|
Kubernetes 负载均衡 算法
异步任务处理系统,如何解决业务长耗时、高并发难题?
阿里云函数计算 FC 为用户提供了开箱即用的,接近于Level ß3能力的异步任务处理服务。用户只需要创建任务处理函数,通过控制台,命令行工具,API/SDK,事件触发等多种方式提交任务,就可以弹性、可靠、可观测完备的方式处理任务。
异步任务处理系统,如何解决业务长耗时、高并发难题?
|
人工智能 关系型数据库 MySQL
稳定性三十六计-超时处理
稳定性三十六计-超时处理
稳定性三十六计-超时处理

热门文章

最新文章