一次算法读图超时引起的urllib3源码分析

简介:
号主从事深度学习算法服务开发多年,2022年二月的最后一天,出炉一个刚接触算法服务时困扰许久的“头号难题”。 介于篇幅源码较多,各位人才看官调整好心情给个好评:点赞、评论、转发

故事上下文

算法服务处理处理流程:

输入image_url -> 读取图片image -> 图片预处理(解压缩/RGB_BGR互转/缩放等) -> 算法推理 -> 输出标签结果

问题:发现某算法A,单独测试推理<50ms,但是整个流程花费200ms~3s,明显不正常,头大!!!

首先进行读图性能测试

读图方式 读小图性能(约10k) 读大图性能(约700k) 备注
外网读图 0.0208 0.0713
内网读图 0.0119 0.0592
本地读图 0.0006 0.0283

对比分析,发现可能造成超时的原因包含两种,“网络速度”或者“网络抖动”

直接上解决方案

  • 问题1:网络环境引起的超时问题?

    • 读图组件代码固定,切换图片链接至内网读图,快速解决问题
  • 问题2:网络抖动引起的等待超时问题?

    • 快速断开,快速重试解决问题

由于此处读图选用的urllib3,后续的章节我们着重分析下urllib3的超时部分源码

urllib架构

urllib3是一个功能强大,条理清晰,用于HTTP客户端的Python库,许多Python的原生系统已经开始使用urllib3

image-20211105141714587.png

urllib源码分析

分析__init__.py就可以得出对外提供的功能

__all__ = (
    "HTTPConnectionPool",  # http模式连接池
    "HTTPHeaderDict",      # 请求头词典
    "HTTPSConnectionPool", # https模式连接池
    "PoolManager",      # 池管理类,self.poos映射类型,保存连接信息
    "ProxyManager",     # 代理池管理类,行为同PoolManager
    "HTTPResponse",     # 返回对象
    "Retry",       # 精细化控制重试和重定向 retries=Retry(3, redirect=2)
    "Timeout",     # 精细化控制超时 timeout=Timeout(connect=1.0, read=2.0)
    "add_stderr_logger",    # 修改默认日志的级别
    "connection_from_url",  # 返回HTTPConnectionPool或HTTPSConnectionPool实例
    "disable_warnings",     # 禁用warnings
    "encode_multipart_formdata", # dict 转换成 form-data
    "make_headers",              # 生成request headers 快捷函数
    "proxy_from_url",            # 返回ProxyManager对象
    "request",                   # 请求方法
)
  • RetryTimeout整数封装成附加其他功效的简单功能类
  • HTTPResponse为对返回数据的Model封装

以下是urllib3 主干类层次结构

image-20211105141913884.png

分析源码的方式有很多中,其中问题导向最可靠,以下我们从Timeout进行分析urllib3源码

urllib3.exceptions.ConnectTimeoutError: (<urllib3.connection.HTTPConnection object at 0x7fc862ecb518>, 'Connection to xxx.xxx.com timed out. (connect timeout=0.0001)')

urllib3.exceptions.ReadTimeoutError: HTTPConnectionPool(host='xxx.xxx.com', port=80): Read timed out. (read timeout=0.0001)

ConnectTimeoutError:连接超时;ReadTimeoutError:读取超时,我们先看一个请求验证的Demo:

# -*- coding: utf-8 -*-
def image_url_demo1(image_url, timeout=3):
    import socket
    import urllib2
    try:
        data = urllib2.urlopen(image_url, timeout=timeout).read()
        return data
    except urllib2.URLError as e:
        raise e
    except socket.timeout as e:
        raise e
    except Exception as e:
        raise e

import urllib3
def image_url_demo2(image_url, timeout=urllib3.Timeout(connect=5, read=5)):
    try:
        api_http = urllib3.PoolManager()
        r = api_http.request('GET', image_url, timeout=timeout)
        return r.data
    except urllib3.exceptions.MaxRetryError as e:
        raise e
    except urllib3.exceptions.ConnectTimeoutError as e:
        raise e
    except urllib3.exceptions.ReadTimeoutError as e:
        raise e
    except Exception as e:
        raise e

def image_url_demo3(image_url, timeout=urllib3.Timeout(connect=5, read=5)):
    try:
        api_http = urllib3.PoolManager()
        # r = api_http.request('GET', image_url, timeout=timeout)
        r = api_http.request('GET', image_url, timeout=timeout, retries=False)
        return r.data
    except urllib3.exceptions.MaxRetryError as e:
        raise e
    except urllib3.exceptions.ConnectTimeoutError as e:
        raise e
    except urllib3.exceptions.ReadTimeoutError as e:
        raise e
    except Exception as e:
        raise e

if __name__ == '__main__':
    image_url = 'http://xxx.xxx.com/dxGnGYXcNDbNpERoLBxSoekayqw9E.jpg'
    # image = image_url_demo1(image_url, 1)
    # image = image_url_demo1(image_url, 0.000001)
    # print(type(image))

    # image = image_url_demo2(image_url)
    # image = image_url_demo2(image_url, urllib3.Timeout(connect=0.0001, read=1))
    # image = image_url_demo2(image_url, urllib3.Timeout(connect=1, read=0.0001))
    # print(type(image))

    image = image_url_demo3(image_url)
    # image = image_url_demo3(image_url, urllib3.Timeout(connect=0.0001, read=1))
    image = image_url_demo3(image_url, urllib3.Timeout(connect=1, read=0.0001))
    print(type(image))

由上可知urllib3的超时设置,其实就是封装了socket的超时设置,以下是socket的超时设置逻辑,包含请求超时接收超时

请求建立超时设置

  import socket
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  sock.settimeout(5)
  sock.connect((host, port))
  # 恢复默认超时设置,设置某些情况下socket进入阻塞模式(如makefile)
  sock.settimeout(None)
  sock.connect((host, port))
  sock.sendall('xxx')
  sock.recv(1024)
  sock.close()

数据接收超时设置

  import socket
  socket.setdefaulttimeout(5)
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  sock.connect((host, port))
  sock.sendall('xxx')
  # 连接和接收的时候都设置一次超时
  sock.settimeout(5)
  sock.recv(1024)
  sock.close()

由上urllib3主干类层次结构可知,请求的建立的源码封装在HTTPConnection中,所以,我们想查看的ConnectTimeoutError源码也在该类中,具体如下:

 // https://github.com/urllib3/urllib3/blob/main/src/urllib3/connection.py#L191
 def _new_conn(self) -> socket.socket:
        """Establish a socket connection and set nodelay settings on it.
        :return: New socket connection.
        """
        try:
            conn = connection.create_connection(
                (self._dns_host, self.port),
                self.timeout,
                source_address=self.source_address,
                socket_options=self.socket_options,
            )
        except socket.gaierror as e:
            raise NameResolutionError(self.host, self, e) from e
        except SocketTimeout as e:
            raise ConnectTimeoutError(
                self,
                f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
            ) from e
        except OSError as e:
            raise NewConnectionError(
                self, f"Failed to establish a new connection: {e}"
            ) from e

        return conn

connection.create_connection具体实现了连接的创建,具体如下:

// https://github.com/urllib3/urllib3/blob/main/src/urllib3/util/connection.py#L29
def create_connection(
    address: Tuple[str, int],
    timeout: Optional[float] = SOCKET_GLOBAL_DEFAULT_TIMEOUT,
    source_address: Optional[Tuple[str, int]] = None,
    socket_options: Optional[_TYPE_SOCKET_OPTIONS] = None,
) -> socket.socket:
   
    host, port = address
    if host.startswith("["):
        host = host.strip("[]")
    err = None

    # Using the value from allowed_gai_family() in the context of getaddrinfo lets
    # us select whether to work with IPv4 DNS records, IPv6 records, or both.
    # The original create_connection function always returns all records.
    family = allowed_gai_family()

    try:
        host.encode("idna")
    except UnicodeError:
        raise LocationParseError(f"'{host}', label empty or too long") from None

    for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
        af, socktype, proto, canonname, sa = res
        sock = None
        try:
         // 和直接使用 socket 设置方式一致
            sock = socket.socket(af, socktype, proto)

            # If provided, set socket level options before connecting.
            _set_socket_options(sock, socket_options)

            if timeout is not SOCKET_GLOBAL_DEFAULT_TIMEOUT:
                sock.settimeout(timeout)
            if source_address:
                sock.bind(source_address)
            sock.connect(sa)
            # Break explicitly a reference cycle
            err = None
            return sock

        except OSError as _:
            err = _
            if sock is not None:
                sock.close()

    if err is not None:
        try:
            raise err
        finally:
            # Break explicitly a reference cycle
            err = None
    else:
        raise OSError("getaddrinfo returns an empty list")

由上urllib3主干类层次结构可知,请求的读取的源码封装在HTTPConnectionPool中,所以,我们想查看的ReadTimeoutError源码也在该类中,具体如下:

 // https://github.com/urllib3/urllib3/blob/main/src/urllib3/connectionpool.py#L362
 def _raise_timeout(
        self,
        err: Union[BaseSSLError, OSError, SocketTimeout],
        url: str,
        timeout_value: _TYPE_TIMEOUT,
    ) -> None:
        """Is the error actually a timeout? Will raise a ReadTimeout or pass"""

        if isinstance(err, SocketTimeout):
            raise ReadTimeoutError(
                self, url, f"Read timed out. (read timeout={timeout_value})"
            ) from err

        # See the above comment about EAGAIN in Python 3.
        if hasattr(err, "errno") and err.errno in _blocking_errnos:
            raise ReadTimeoutError(
                self, url, f"Read timed out. (read timeout={timeout_value})"
            ) from err

    def _make_request(
        self,
        conn: HTTPConnection,
        method: str,
        url: str,
        timeout: _TYPE_TIMEOUT = _Default,
        chunked: bool = False,
        **httplib_request_kw: Any,
    ) -> _HttplibHTTPResponse:
        
        self.num_requests += 1

        timeout_obj = self._get_timeout(timeout)
        timeout_obj.start_connect()
        conn.timeout = timeout_obj.connect_timeout  # type: ignore[assignment]

        # Trigger any extra validation we need to do.
        // 请求连接验证过程中的超时,也属于 ReadTimeoutError
        try:
            self._validate_conn(conn)
        except (SocketTimeout, BaseSSLError) as e:
            self._raise_timeout(err=e, url=url, timeout_value=conn.timeout)
            raise

        # conn.request() calls http.client.*.request, not the method in
        # urllib3.request. It also calls makefile (recv) on the socket.
        try:
         //  请求体发送
            if chunked:
                conn.request_chunked(method, url, **httplib_request_kw)
            else:
                conn.request(method, url, **httplib_request_kw)

        # We are swallowing BrokenPipeError (errno.EPIPE) since the server is
        # legitimately able to close the connection after sending a valid response.
        # With this behaviour, the received response is still readable.
        except BrokenPipeError:
            pass
        except OSError as e:
            # MacOS/Linux
            # EPROTOTYPE is needed on macOS
            # https://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/
            if e.errno != errno.EPROTOTYPE:
                raise

        # Reset the timeout for the recv() on the socket
        // 重置请求socket超时时间
        read_timeout = timeout_obj.read_timeout

        if conn.sock:
            if read_timeout == 0:
                raise ReadTimeoutError(
                    self, url, f"Read timed out. (read timeout={read_timeout})"
                )
            if read_timeout is Timeout.DEFAULT_TIMEOUT:
                conn.sock.settimeout(socket.getdefaulttimeout())
            else:  # None or a value
                conn.sock.settimeout(read_timeout)

        # Receive the response from the server
        try:
            httplib_response = conn.getresponse()
        except (BaseSSLError, OSError) as e:
            self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
            raise
            
        ...
        
        return httplib_response

urllib其他常用姿势

响应方式

所用的响应都通过HTTPResponse对象提供statusdataheaders属性。

  import urllib3
  http = urllib3.PoolManager()
  r = http.request('GET', 'http://httpbin.org/ip')
  r.status
  r.data
  r.headers

  # 输出
  200
  b'{\n  "origin": "137.59.103.52"\n}\n'
  HTTPHeaderDict({'Date': 'Fri, 05 Nov 2021 05:38:24 GMT', 'Content-Type': 'application/json', 'Content-Length': '32', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true'})

JSON响应

JSON内容可以通过解码和反序列化来加载data请求的属性:

  import json
  import urllib3
  http = urllib3.PoolManager()
  r = http.request('GET', 'http://httpbin.org/ip')
  json.loads(r.data.decode('utf-8'))

  # 输出
  {'origin': '137.59.103.52'}

二进制响应

data响应的属性始终设置为表示响应内容的字节字符串:

  import urllib3
  http = urllib3.PoolManager()
  r = http.request('GET', 'http://httpbin.org/bytes/8')
  r.data

  # 输出
  b'S\x04e\to\x12NN'

注:对于更大的响应,有时可以使用stream进行接收

IO响应

有时候你想用io.TextIOWrapper或类似的对象,如直接使用HTTPResponse数据。要使这两个接口很好地结合在一起,需要使用auto_close通过将其设置为False。默认情况下,读取所有字节后关闭HTTP响应,以上设置这将禁用该行为:

  import io
  import urllib3
  http = urllib3.PoolManager()
  r = http.request('GET', 'https://www.baidu.com/', preload_content=False)
  r.auto_close = False
  for line in io.TextIOWrapper(r):
    print(line)

请求方式

GETHEADDELETE请求比较常规,将请求参数作为字典传递到fields参数即可,如:fields={'arg': 'value'}。下面我们主要说说POSTPUT请求。

首先,POSTPUT通过URL传参请求,需要在URL中进行手动编码参数:

  import urllib3
  http = urllib3.PoolManager()
  from urllib.parse import urlencode
  encoded_args = urlencode({'arg': 'value'})
  url = 'http://httpbin.org/post?' + encoded_args
  r = http.request('POST', url)
  json.loads(r.data.decode('utf-8'))['args']

  # 输出
  {'arg': 'value'}

表单POST

表单方式,将参数作为字典传递到fields参数进行请求:

  import urllib3
  http = urllib3.PoolManager()
  r = http.request('POST', 'http://httpbin.org/post', fields={'field': 'value'})
  json.loads(r.data.decode('utf-8'))['form']

  # 输出
  {'field': 'value'}

注:表单方式默认以String类型进行传递

JSON POST

JSON方式,将指定编码数据作为JSON请求发送body参数和设置Content-Type参数进行请求:

  import json
  import urllib3
  http = urllib3.PoolManager()
  data = {'attribute': 'value'}
  encoded_data = json.dumps(data).encode('utf-8')
  r = http.request('POST','http://httpbin.org/post', body=encoded_data, headers={'Content-Type': 'application/json'})
  json.loads(r.data.decode('utf-8'))['json']

  # 输出
  {'attribute': 'value'}

文件和二进制 POST

使用multipart/form-data编码进行二进制文件传参请求,比如上传图片或其他文件,由于这种场景已经不再适用,这块不继续讲解

参考文档

❤️❤️❤️读者每一份热爱都是笔者前进的动力!

我是三十一,感谢各位朋友:求点赞、求收藏、求评论,大家下期见!

相关文章
|
7月前
|
NoSQL 算法 Redis
Redis系列-11.RedLock算法和底层源码分析
Redis系列-11.RedLock算法和底层源码分析
122 0
|
缓存 算法 测试技术
vue3源码分析——手写diff算法
通过上面的测试用例,可以看到是分了7种情况来的,在本篇文章,不采用编写测试用例,有兴趣的可以自己去github上面查看,这里主要使用图文加上代码,帮助大家更快的理解vue3中的diff算法
vue3源码分析——手写diff算法
|
存储 数据采集 缓存
Memcached源码分析 - LRU淘汰算法(6)
Memcached源码分析 - 网络模型(1)Memcached源码分析 - 命令解析(2)Memcached源码分析 - 数据存储(3)Memcached源码分析 - 增删改查操作(4)Memcached源码分析 - 内存存储机制Slabs(5)Memcached源码分析 - LRU淘汰算法(6)Memcached源码分析 - 消息回应(7) Memcached的LRU几种策略 惰性删除。
1642 0
|
机器学习/深度学习 算法 索引
机器学习实战:K近邻算法(源码分析)
学习机器学习的总结: 先把代码放到这儿,话说一句一句看着打真的好累,还好可以通过debug一步一步观察变量,理解顿时快了许多。
1251 0
|
算法 关系型数据库 C语言
PgSQL · 源码分析 · PG中的无锁算法和原子操作应用一则
原子操作概述 近年来随着服务器上CPU核数的不断增加,无锁算法(Lock Free)越来越广泛的被应用于高并发的系统中。PostgreSQL 做为世界上最高级开源数据库也在9.5时引入了无锁算法。本文先介绍了无锁算法和原子操作在PostgreSQL中的具体实现, 再通过一个Patch来看一下在PostgreSQL中是如何利用它来解决实际的高并发问题的。 无锁算法是利用CPU的原子操作实现的数
2375 0
|
算法 TensorFlow 算法框架/工具
TensorFlow学习笔记之五——源码分析之最近算法
import numpy as np import tensorflow as tf # Import MINST data import input_data mnist = input_data.read_data_sets("/tmp/data/", one_hot=True) #这里主要是导入数据,数据通过input_data.py已经下载到/tmp/data/目录之下了,这里下载数据的时候,需要提前用浏览器尝试是否可以打开 #http://yann.lecun.com/exdb/mnist/,如果打不开,下载数据阶段会报错。
1349 0
|
17天前
|
算法
基于WOA算法的SVDD参数寻优matlab仿真
该程序利用鲸鱼优化算法(WOA)对支持向量数据描述(SVDD)模型的参数进行优化,以提高数据分类的准确性。通过MATLAB2022A实现,展示了不同信噪比(SNR)下模型的分类误差。WOA通过模拟鲸鱼捕食行为,动态调整SVDD参数,如惩罚因子C和核函数参数γ,以寻找最优参数组合,增强模型的鲁棒性和泛化能力。
|
23天前
|
机器学习/深度学习 算法 Serverless
基于WOA-SVM的乳腺癌数据分类识别算法matlab仿真,对比BP神经网络和SVM
本项目利用鲸鱼优化算法(WOA)优化支持向量机(SVM)参数,针对乳腺癌早期诊断问题,通过MATLAB 2022a实现。核心代码包括参数初始化、目标函数计算、位置更新等步骤,并附有详细中文注释及操作视频。实验结果显示,WOA-SVM在提高分类精度和泛化能力方面表现出色,为乳腺癌的早期诊断提供了有效的技术支持。
|
3天前
|
供应链 算法 调度
排队算法的matlab仿真,带GUI界面
该程序使用MATLAB 2022A版本实现排队算法的仿真,并带有GUI界面。程序支持单队列单服务台、单队列多服务台和多队列多服务台三种排队方式。核心函数`func_mms2`通过模拟到达时间和服务时间,计算阻塞率和利用率。排队论研究系统中顾客和服务台的交互行为,广泛应用于通信网络、生产调度和服务行业等领域,旨在优化系统性能,减少等待时间,提高资源利用率。
|
10天前
|
存储 算法
基于HMM隐马尔可夫模型的金融数据预测算法matlab仿真
本项目基于HMM模型实现金融数据预测,包括模型训练与预测两部分。在MATLAB2022A上运行,通过计算状态转移和观测概率预测未来值,并绘制了预测值、真实值及预测误差的对比图。HMM模型适用于金融市场的时间序列分析,能够有效捕捉隐藏状态及其转换规律,为金融预测提供有力工具。
下一篇
DataWorks