管好超时才能做好异步

简介:

    前段时间伤自尊了,被人三言两语给问蒙圈了,蔫头耷脑好半天愣没缓不过来,俗话说得好呀,刀不磨要生锈人不学要落后,人过三十不学艺人过四十天过午,老老实实做人踏踏实实做学问,证明自己实在不是干技术的料也好早点转行去卖鸡蛋灌饼。

    开始今天话题前先澄清一个事,小蓝单车发布麒麟计划,共享单车即将进入精准广告、实时导航、运动数据和快乐骑行时代,联系二个月前《细思极恐的共享单车》的博文,朋友圈及各路粉丝一再追问我和蓝去去的关系,今天正式澄清:本人不是小蓝单车的产品经理,也没有从事相关方面工作,文章内容如有雷同纯属巧合。

    回到正题,今天说说超时,做服务一定绕不开大并发、高效率、可靠稳定、快速响应这些关键词,从技术的角度上讲,每个环节每个步骤都有值得大书特书一番的地方,不过个人觉得最最重要的地方还是超时管理,这方面做好就成功一大半了。首先介绍一下应用场景,作为一个服务,对外受理接入请求,请求报文到达以后先解析,理解用户想要干什么,对于当下大多数的互联网业务,往往需要一系列逻辑处理,这边取点标签那边取点参数,汇总起来算一算得到最终结果再给用户返回回去。随着互联网的发展数据量越来越大,数据本地化早已不能满足业务需要,在逻辑处理过程中数次调用远程通信已是再平常不过的事情了,这就产生了上下游多个节点间的数据通信与传输的问题,那么多连接有tcp的有udp的,有长连接的有短连接的,有先发后回的,有后发先回的,有发了不回的,有不要还回的,各种情况都要考虑到,一个字忒乱。

    怎样把超时管理起来呢,epoll非常棒但终究是事件触发的,把超时转化为事件是想要达到的目的。成千上万个连接的超时设置的有长有短,最讨厌的是这些连接还是剧烈频繁变动的,对于有序容器管理这些连接成本真的有点高,要是再加把锁实现进(线)程同步,效率有点让人焦虑,既然不能全局有序可不可以局部有序呢,只关心最值而不是顺序本身,根堆的偏序特性是个相当不错的选择,二叉树每次插值调姿开销确实有点大。根堆分大小,取决于比较函数,不是全局有序因此迭代器意义不大,元素追加弹出基本够用了,在用例中比较函数计算(入队事件 + 超时事件 - 当前事件)最小值当做堆顶。

001	while (true) {
002		if (sg_CPQueue.GtPQueueTop(stNode) > 0) {
003			/* 小根堆最短超时 */
004			iTotal = epoll_wait(sg_CEpoll.GtEpollGetEpoll(), stEvent, GT_EVENT, GtAdpDeadline(stNode.m_stTimer));
005		}
006		else {
007			/* 监听套接字事件 */
008			iTotal = epoll_wait(sg_CEpoll.GtEpollGetEpoll(), stEvent, GT_EVENT, -1);
009		}
010
011		if (iTotal > 0) {
012			/* 套接字事件处理 */
013			for (iCount = 0; iCount < iTotal; iCount ++) {
014				if (stEvent[iCount].data.fd == (sg_CEpoll.GtEpollGetNet())->GtNetGetSocket()) {
015					/* 新接入客户事件 */
016					while ((iSocket = accept((sg_CEpoll.GtEpollGetNet())->GtNetGetSocket(), (struct sockaddr*)&stClient, &iClient)) > 0) {
017						GtAdpEpollAdd(iSocket);
018					}
019				}
020				else {
021					/* 读写事件派发中 */
022					sg_CPool.GtPoolDistribute((void*)stEvent[iCount].data.fd);
023				}
024			}
025		}
026		else {
027			/* 小根堆超时清理 */
028			if (stNode.m_iLower > 0) {
029				GtAdpErrors(stNode.m_iUpper);
030				GtAdpEpollDel(stNode.m_iLower);
031				GtAdpPQueueDel(stNode.m_iLower);
032			}
033		}
034	}

     第2行从小根堆中获取一个元素,获取成功就用套接字超时作为epoll_wait超时,否则阻塞直到有事件发生为止。

    第11行判断返回值,如果大于零表明有待处理事件发生,等于零表明有超时发生。

     第13行至24行对事件处理,是新接入子套接字则加入epoll监听,否则派发任务到工作线程。

     第28行至32行清除epoll事件并从根堆中剔除元素,同时返回上游错误信息。

001	void* GtAdpPoolCore(void* pPara)
002	{
003		GtNet::CGtNet CNet;
004		UCHAR uszBuf[GT_PACKET] = {0};
005	
006		CNet.GtNetSetSocket((int)pPara);
007		// UCHAR(0x04) | int(socket) | data ...
008		if (read(CNet.GtNetGetSocket(), uszBuf, GT_PACKET) > 0) {
009			if (GT_ADP_PKTTAG != uszBuf[0]) {
010				/* 上游请求受理 */
011				GtAdpPoolUpper(CNet, uszBuf);
012			}
013			else {
014				/* 下游业务操作 */
015				GtAdpPoolLower(CNet, uszBuf);
016			}
017		}
018		else {
019			/* 上游连接关闭 */
020			GtAdpEpollDel(CNet.GtNetGetSocket());
021		}
022	
023		return NULL;
024	}

    第8行读取消息(既可能是上游消息也可能是下游消息),大于零启动业务处理,否则清除epoll事件并关闭连接。

    第9行识别报文格式,用例采用0x04标识示意,具体业务侧应采用更加严格的定义。

    第11行调用上半场处理函数。

    第15行调用下半场处理函数。

001	void GtAdpPoolUpper(GtNet::CGtNet& rCUpper, UCHAR* puszPacket)
002	{
003		int iSocket = 0;
004		UCHAR uszBuf[GT_PACKET] = {0};
005		char szUrl[GT_BYTE64] = "192.168.0.221:12345";
006	
007	#ifdef _GT_TCP_
008		GtNet::CGtTcp CLower;
009	#else
010		GtNet::CGtUdp CLower;
011	#endif
012	
013		// 业务处理开始
014		uszBuf[0] = GT_ADP_PKTTAG;
015		iSocket = rCUpper.GtNetGetSocket();
016		memcpy(uszBuf + sizeof(UCHAR), &iSocket, sizeof(int));
017		sprintf((char*)uszBuf + sizeof(UCHAR) + sizeof(int), "GtAdapter: %s", (char*)puszPacket);
018		// 业务处理结束
019	
020	#ifdef _GT_TCP_
021		if (GT_SUCCESS == CLower.GtTcpConnect(szUrl)) {
022	#else
023		if (GT_SUCCESS == CLower.GtUdpConnect(szUrl)) {
024	#endif
025			CLower.GtNetSend(uszBuf, sizeof(UCHAR) + sizeof(int) + strlen((char*)uszBuf + sizeof(UCHAR) + sizeof(int)));
026	
027			/* 下游启动关注 */
028			GtAdpEpollAdd(CLower.GtNetGetSocket());
029			GtAdpPQueueAdd(iSocket, CLower.GtNetGetSocket());
030		}
031		else {
032			GtAdpErrors(rCUpper.GtNetGetSocket());
033		}
034	
035		return;
036	}

    第13行至18行业务实现过程。

    第21行或23行为初始化下游连接操作(tcp和udp是个人封装过的),需要注意的是,udp需要采用connect方式初始化,即将对端地址关联到套接字上,以方便read读取。

    第28行和29行在发送完毕数据以后将套接字再次注册事件和追加到超时根堆中去。

    第32行若tcp连接失败则向上游返回失败信息,udp只能通过超时判断。

001	void GtAdpPoolLower(GtNet::CGtNet& rCLower, UCHAR* puszPacket)
002	{
003		int iResult = GT_SUCCESS;
004		UCHAR uszBuf[GT_PACKET] = {0};
005		int iSocket = *(int*)(puszPacket + sizeof(UCHAR));
006	
007		/* 下游解除关注 */
008		GtAdpEpollDel(rCLower.GtNetGetSocket());
009		GtAdpPQueueDel(rCLower.GtNetGetSocket());
010	
011		// 业务处理开始
012		iResult = (NULL != strcpy((char*)uszBuf, (char*)puszPacket + sizeof(UCHAR) + sizeof(int))) ? GT_SUCCESS : GT_FAILURE;
013		// 业务处理结束
014	
015		if (GT_SUCCESS == iResult) {
016			GtAdpResponse(iSocket, uszBuf, strlen((char*)uszBuf));
017		}
018		else {
019			GtAdpErrors(iSocket);
020		}
021	
022		return;
023	}

    第8行至9行清除epoll事件并从根堆中删除元素。

    第11行至13行业务实现过程。

    第15行至20行根据业务实现结果向上游返回不同的消息。

001	long GtAdpDeadline(struct timeval& rstTimer)
002	{
003		long lTimer = 0;
004		struct timeval stNow, stTimer;
005	
006		gettimeofday(&stNow, NULL);
007		(rstTimer.tv_usec += GT_ADP_TIMOUT) > GT_ADP_ONESEC ? rstTimer.tv_sec ++, rstTimer.tv_usec -= GT_ADP_ONESEC : 0;
008	
009		stTimer.tv_sec = rstTimer.tv_sec - stNow.tv_sec;
010		stTimer.tv_usec = rstTimer.tv_usec - stNow.tv_usec;
011		stTimer.tv_usec < 0 ? stTimer.tv_sec -= 1, stTimer.tv_usec += GT_ADP_ONESEC : 0;
012	
013		lTimer = stTimer.tv_sec * GT_ADP_TMBASE + stTimer.tv_usec / GT_ADP_TMBASE;
014	
015		return lTimer > 0 ? lTimer : 0;
016	}

    第6行获取当前的系统时间。

    第7行入队时间加上超时时间。

    第9行至11行计算当前时间与超时时间的差值。

    第13行将差值转换成epoll超时的时间单位。

    完整的代码片段http://git.oschina.net/gonglibin/codes/rc87dt0iagk4exqm3psuw66,作为论证和用例代码仅供参考,调试测试通过,根堆封装成对象的时候已经加入锁,epoll对象早年封装的没有考虑同步的问题,代码中通过注释已经提请留意了。

    外面下雨了,前些日子的暑热一扫而光,Google I/O 2017开发者大会518开幕了,谷歌宣布kotlin作为一级编程语言,消息一出有小伙伴慌了,其实木必要,kotlin好比加了语法糖的java,跟java原本就有着血浓于水的感情,况且谷歌与oracle豪门之间的恩怨不至于革小码农们的命,编程思想比实现语言更重要,人工智能与VR是好东西,未来有前途,抽空研究研究。

相关文章
|
26天前
|
存储 数据可视化 Linux
语雀停机事件后,你也在找替代方案吗?
2023年10月23日,语雀遭遇长达8小时的服务中断,严重影响了用户的日常工作和生活。事后官方提供了6个月免费会员作为补偿。此次事件引发用户对云笔记产品的可靠性思考,Obsidian和思源笔记因注重本地存留而受到关注。Obsidian支持双向链接、Markdown、本地存储及插件系统,适合个人知识管理;思源笔记则强调关系图谱和快速引用功能。此外,也有用户选择印象笔记、腾讯文档等云产品或使用编辑器+网盘的方式。如何选择合适的工具取决于个人需求和偏好。
58 2
|
6月前
|
编译器 调度 C++
协程问题之机制保障中提到的早值班机制和稳定性周会机制分别是什么
协程问题之机制保障中提到的早值班机制和稳定性周会机制分别是什么
|
2月前
|
消息中间件 运维 UED
消息队列运维实战:攻克消息丢失、重复与积压难题
消息队列(MQ)作为分布式系统中的核心组件,承担着解耦、异步处理和流量削峰等功能。然而,在实际应用中,消息丢失、重复和积压等问题时有发生,严重影响系统的稳定性和数据的一致性。本文将深入探讨这些问题的成因及其解决方案,帮助您在运维过程中有效应对这些挑战。
45 1
|
5月前
|
Java 数据库 开发者
"揭秘!SpringBoot+事务钩子,如何携手打造零差错、秒级响应的高效支付系统,让你的业务飞起来!"
【8月更文挑战第11天】构建高效稳定的支付系统时,Spring Boot凭借其快速开发与丰富生态成为优选框架。通过集成Spring事务管理抽象,@Transactional注解简化了数据库事务处理。针对复杂业务,可利用`TransactionSynchronizationManager`和`TransactionSynchronization`接口自定义事务钩子函数,在事务不同阶段执行特定逻辑,如支付成功或失败时的通知,确保数据一致性与业务完整性。
118 4
|
8月前
|
消息中间件 监控 Java
接口请求重试策略:保障稳定性的必杀技
接口请求重试策略:保障稳定性的必杀技
404 0
有几百万消息持续积压几小时,怎么解决
有几百万消息持续积压几小时,怎么解决
100 0
|
前端开发 Cloud Native 大数据
坑爹,线上同步近 3w 个用户导致链路阻塞引入发的线上问题,你经历过吗?
坑爹,线上同步近 3w 个用户导致链路阻塞引入发的线上问题,你经历过吗?
|
消息中间件 开发框架 NoSQL
【工作中问题解决实践 二】分布式消息并发同步处理方案
【工作中问题解决实践 二】分布式消息并发同步处理方案
142 0
|
Kubernetes 负载均衡 算法
异步任务处理系统,如何解决业务长耗时、高并发难题?
阿里云函数计算 FC 为用户提供了开箱即用的,接近于Level ß3能力的异步任务处理服务。用户只需要创建任务处理函数,通过控制台,命令行工具,API/SDK,事件触发等多种方式提交任务,就可以弹性、可靠、可观测完备的方式处理任务。