一次算法读图超时引起的urllib3源码分析

简介:
号主从事深度学习算法服务开发多年,2022年二月的最后一天,出炉一个刚接触算法服务时困扰许久的“头号难题”。 介于篇幅源码较多,各位人才看官调整好心情给个好评:点赞、评论、转发

故事上下文

算法服务处理处理流程:

输入image_url -> 读取图片image -> 图片预处理(解压缩/RGB_BGR互转/缩放等) -> 算法推理 -> 输出标签结果

问题:发现某算法A,单独测试推理<50ms,但是整个流程花费200ms~3s,明显不正常,头大!!!

首先进行读图性能测试

读图方式 读小图性能(约10k) 读大图性能(约700k) 备注
外网读图 0.0208 0.0713
内网读图 0.0119 0.0592
本地读图 0.0006 0.0283

对比分析,发现可能造成超时的原因包含两种,“网络速度”或者“网络抖动”

直接上解决方案

  • 问题1:网络环境引起的超时问题?

    • 读图组件代码固定,切换图片链接至内网读图,快速解决问题
  • 问题2:网络抖动引起的等待超时问题?

    • 快速断开,快速重试解决问题

由于此处读图选用的urllib3,后续的章节我们着重分析下urllib3的超时部分源码

urllib架构

urllib3是一个功能强大,条理清晰,用于HTTP客户端的Python库,许多Python的原生系统已经开始使用urllib3

image-20211105141714587.png

urllib源码分析

分析__init__.py就可以得出对外提供的功能

__all__ = (
    "HTTPConnectionPool",  # http模式连接池
    "HTTPHeaderDict",      # 请求头词典
    "HTTPSConnectionPool", # https模式连接池
    "PoolManager",      # 池管理类,self.poos映射类型,保存连接信息
    "ProxyManager",     # 代理池管理类,行为同PoolManager
    "HTTPResponse",     # 返回对象
    "Retry",       # 精细化控制重试和重定向 retries=Retry(3, redirect=2)
    "Timeout",     # 精细化控制超时 timeout=Timeout(connect=1.0, read=2.0)
    "add_stderr_logger",    # 修改默认日志的级别
    "connection_from_url",  # 返回HTTPConnectionPool或HTTPSConnectionPool实例
    "disable_warnings",     # 禁用warnings
    "encode_multipart_formdata", # dict 转换成 form-data
    "make_headers",              # 生成request headers 快捷函数
    "proxy_from_url",            # 返回ProxyManager对象
    "request",                   # 请求方法
)
  • RetryTimeout整数封装成附加其他功效的简单功能类
  • HTTPResponse为对返回数据的Model封装

以下是urllib3 主干类层次结构

image-20211105141913884.png

分析源码的方式有很多中,其中问题导向最可靠,以下我们从Timeout进行分析urllib3源码

urllib3.exceptions.ConnectTimeoutError: (<urllib3.connection.HTTPConnection object at 0x7fc862ecb518>, 'Connection to xxx.xxx.com timed out. (connect timeout=0.0001)')

urllib3.exceptions.ReadTimeoutError: HTTPConnectionPool(host='xxx.xxx.com', port=80): Read timed out. (read timeout=0.0001)

ConnectTimeoutError:连接超时;ReadTimeoutError:读取超时,我们先看一个请求验证的Demo:

# -*- coding: utf-8 -*-
def image_url_demo1(image_url, timeout=3):
    import socket
    import urllib2
    try:
        data = urllib2.urlopen(image_url, timeout=timeout).read()
        return data
    except urllib2.URLError as e:
        raise e
    except socket.timeout as e:
        raise e
    except Exception as e:
        raise e

import urllib3
def image_url_demo2(image_url, timeout=urllib3.Timeout(connect=5, read=5)):
    try:
        api_http = urllib3.PoolManager()
        r = api_http.request('GET', image_url, timeout=timeout)
        return r.data
    except urllib3.exceptions.MaxRetryError as e:
        raise e
    except urllib3.exceptions.ConnectTimeoutError as e:
        raise e
    except urllib3.exceptions.ReadTimeoutError as e:
        raise e
    except Exception as e:
        raise e

def image_url_demo3(image_url, timeout=urllib3.Timeout(connect=5, read=5)):
    try:
        api_http = urllib3.PoolManager()
        # r = api_http.request('GET', image_url, timeout=timeout)
        r = api_http.request('GET', image_url, timeout=timeout, retries=False)
        return r.data
    except urllib3.exceptions.MaxRetryError as e:
        raise e
    except urllib3.exceptions.ConnectTimeoutError as e:
        raise e
    except urllib3.exceptions.ReadTimeoutError as e:
        raise e
    except Exception as e:
        raise e

if __name__ == '__main__':
    image_url = 'http://xxx.xxx.com/dxGnGYXcNDbNpERoLBxSoekayqw9E.jpg'
    # image = image_url_demo1(image_url, 1)
    # image = image_url_demo1(image_url, 0.000001)
    # print(type(image))

    # image = image_url_demo2(image_url)
    # image = image_url_demo2(image_url, urllib3.Timeout(connect=0.0001, read=1))
    # image = image_url_demo2(image_url, urllib3.Timeout(connect=1, read=0.0001))
    # print(type(image))

    image = image_url_demo3(image_url)
    # image = image_url_demo3(image_url, urllib3.Timeout(connect=0.0001, read=1))
    image = image_url_demo3(image_url, urllib3.Timeout(connect=1, read=0.0001))
    print(type(image))

由上可知urllib3的超时设置,其实就是封装了socket的超时设置,以下是socket的超时设置逻辑,包含请求超时接收超时

请求建立超时设置

  import socket
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  sock.settimeout(5)
  sock.connect((host, port))
  # 恢复默认超时设置,设置某些情况下socket进入阻塞模式(如makefile)
  sock.settimeout(None)
  sock.connect((host, port))
  sock.sendall('xxx')
  sock.recv(1024)
  sock.close()

数据接收超时设置

  import socket
  socket.setdefaulttimeout(5)
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  sock.connect((host, port))
  sock.sendall('xxx')
  # 连接和接收的时候都设置一次超时
  sock.settimeout(5)
  sock.recv(1024)
  sock.close()

由上urllib3主干类层次结构可知,请求的建立的源码封装在HTTPConnection中,所以,我们想查看的ConnectTimeoutError源码也在该类中,具体如下:

 // https://github.com/urllib3/urllib3/blob/main/src/urllib3/connection.py#L191
 def _new_conn(self) -> socket.socket:
        """Establish a socket connection and set nodelay settings on it.
        :return: New socket connection.
        """
        try:
            conn = connection.create_connection(
                (self._dns_host, self.port),
                self.timeout,
                source_address=self.source_address,
                socket_options=self.socket_options,
            )
        except socket.gaierror as e:
            raise NameResolutionError(self.host, self, e) from e
        except SocketTimeout as e:
            raise ConnectTimeoutError(
                self,
                f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
            ) from e
        except OSError as e:
            raise NewConnectionError(
                self, f"Failed to establish a new connection: {e}"
            ) from e

        return conn

connection.create_connection具体实现了连接的创建,具体如下:

// https://github.com/urllib3/urllib3/blob/main/src/urllib3/util/connection.py#L29
def create_connection(
    address: Tuple[str, int],
    timeout: Optional[float] = SOCKET_GLOBAL_DEFAULT_TIMEOUT,
    source_address: Optional[Tuple[str, int]] = None,
    socket_options: Optional[_TYPE_SOCKET_OPTIONS] = None,
) -> socket.socket:
   
    host, port = address
    if host.startswith("["):
        host = host.strip("[]")
    err = None

    # Using the value from allowed_gai_family() in the context of getaddrinfo lets
    # us select whether to work with IPv4 DNS records, IPv6 records, or both.
    # The original create_connection function always returns all records.
    family = allowed_gai_family()

    try:
        host.encode("idna")
    except UnicodeError:
        raise LocationParseError(f"'{host}', label empty or too long") from None

    for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
        af, socktype, proto, canonname, sa = res
        sock = None
        try:
         // 和直接使用 socket 设置方式一致
            sock = socket.socket(af, socktype, proto)

            # If provided, set socket level options before connecting.
            _set_socket_options(sock, socket_options)

            if timeout is not SOCKET_GLOBAL_DEFAULT_TIMEOUT:
                sock.settimeout(timeout)
            if source_address:
                sock.bind(source_address)
            sock.connect(sa)
            # Break explicitly a reference cycle
            err = None
            return sock

        except OSError as _:
            err = _
            if sock is not None:
                sock.close()

    if err is not None:
        try:
            raise err
        finally:
            # Break explicitly a reference cycle
            err = None
    else:
        raise OSError("getaddrinfo returns an empty list")

由上urllib3主干类层次结构可知,请求的读取的源码封装在HTTPConnectionPool中,所以,我们想查看的ReadTimeoutError源码也在该类中,具体如下:

 // https://github.com/urllib3/urllib3/blob/main/src/urllib3/connectionpool.py#L362
 def _raise_timeout(
        self,
        err: Union[BaseSSLError, OSError, SocketTimeout],
        url: str,
        timeout_value: _TYPE_TIMEOUT,
    ) -> None:
        """Is the error actually a timeout? Will raise a ReadTimeout or pass"""

        if isinstance(err, SocketTimeout):
            raise ReadTimeoutError(
                self, url, f"Read timed out. (read timeout={timeout_value})"
            ) from err

        # See the above comment about EAGAIN in Python 3.
        if hasattr(err, "errno") and err.errno in _blocking_errnos:
            raise ReadTimeoutError(
                self, url, f"Read timed out. (read timeout={timeout_value})"
            ) from err

    def _make_request(
        self,
        conn: HTTPConnection,
        method: str,
        url: str,
        timeout: _TYPE_TIMEOUT = _Default,
        chunked: bool = False,
        **httplib_request_kw: Any,
    ) -> _HttplibHTTPResponse:
        
        self.num_requests += 1

        timeout_obj = self._get_timeout(timeout)
        timeout_obj.start_connect()
        conn.timeout = timeout_obj.connect_timeout  # type: ignore[assignment]

        # Trigger any extra validation we need to do.
        // 请求连接验证过程中的超时,也属于 ReadTimeoutError
        try:
            self._validate_conn(conn)
        except (SocketTimeout, BaseSSLError) as e:
            self._raise_timeout(err=e, url=url, timeout_value=conn.timeout)
            raise

        # conn.request() calls http.client.*.request, not the method in
        # urllib3.request. It also calls makefile (recv) on the socket.
        try:
         //  请求体发送
            if chunked:
                conn.request_chunked(method, url, **httplib_request_kw)
            else:
                conn.request(method, url, **httplib_request_kw)

        # We are swallowing BrokenPipeError (errno.EPIPE) since the server is
        # legitimately able to close the connection after sending a valid response.
        # With this behaviour, the received response is still readable.
        except BrokenPipeError:
            pass
        except OSError as e:
            # MacOS/Linux
            # EPROTOTYPE is needed on macOS
            # https://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/
            if e.errno != errno.EPROTOTYPE:
                raise

        # Reset the timeout for the recv() on the socket
        // 重置请求socket超时时间
        read_timeout = timeout_obj.read_timeout

        if conn.sock:
            if read_timeout == 0:
                raise ReadTimeoutError(
                    self, url, f"Read timed out. (read timeout={read_timeout})"
                )
            if read_timeout is Timeout.DEFAULT_TIMEOUT:
                conn.sock.settimeout(socket.getdefaulttimeout())
            else:  # None or a value
                conn.sock.settimeout(read_timeout)

        # Receive the response from the server
        try:
            httplib_response = conn.getresponse()
        except (BaseSSLError, OSError) as e:
            self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
            raise
            
        ...
        
        return httplib_response

urllib其他常用姿势

响应方式

所用的响应都通过HTTPResponse对象提供statusdataheaders属性。

  import urllib3
  http = urllib3.PoolManager()
  r = http.request('GET', 'http://httpbin.org/ip')
  r.status
  r.data
  r.headers

  # 输出
  200
  b'{\n  "origin": "137.59.103.52"\n}\n'
  HTTPHeaderDict({'Date': 'Fri, 05 Nov 2021 05:38:24 GMT', 'Content-Type': 'application/json', 'Content-Length': '32', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true'})

JSON响应

JSON内容可以通过解码和反序列化来加载data请求的属性:

  import json
  import urllib3
  http = urllib3.PoolManager()
  r = http.request('GET', 'http://httpbin.org/ip')
  json.loads(r.data.decode('utf-8'))

  # 输出
  {'origin': '137.59.103.52'}

二进制响应

data响应的属性始终设置为表示响应内容的字节字符串:

  import urllib3
  http = urllib3.PoolManager()
  r = http.request('GET', 'http://httpbin.org/bytes/8')
  r.data

  # 输出
  b'S\x04e\to\x12NN'

注:对于更大的响应,有时可以使用stream进行接收

IO响应

有时候你想用io.TextIOWrapper或类似的对象,如直接使用HTTPResponse数据。要使这两个接口很好地结合在一起,需要使用auto_close通过将其设置为False。默认情况下,读取所有字节后关闭HTTP响应,以上设置这将禁用该行为:

  import io
  import urllib3
  http = urllib3.PoolManager()
  r = http.request('GET', 'https://www.baidu.com/', preload_content=False)
  r.auto_close = False
  for line in io.TextIOWrapper(r):
    print(line)

请求方式

GETHEADDELETE请求比较常规,将请求参数作为字典传递到fields参数即可,如:fields={'arg': 'value'}。下面我们主要说说POSTPUT请求。

首先,POSTPUT通过URL传参请求,需要在URL中进行手动编码参数:

  import urllib3
  http = urllib3.PoolManager()
  from urllib.parse import urlencode
  encoded_args = urlencode({'arg': 'value'})
  url = 'http://httpbin.org/post?' + encoded_args
  r = http.request('POST', url)
  json.loads(r.data.decode('utf-8'))['args']

  # 输出
  {'arg': 'value'}

表单POST

表单方式,将参数作为字典传递到fields参数进行请求:

  import urllib3
  http = urllib3.PoolManager()
  r = http.request('POST', 'http://httpbin.org/post', fields={'field': 'value'})
  json.loads(r.data.decode('utf-8'))['form']

  # 输出
  {'field': 'value'}

注:表单方式默认以String类型进行传递

JSON POST

JSON方式,将指定编码数据作为JSON请求发送body参数和设置Content-Type参数进行请求:

  import json
  import urllib3
  http = urllib3.PoolManager()
  data = {'attribute': 'value'}
  encoded_data = json.dumps(data).encode('utf-8')
  r = http.request('POST','http://httpbin.org/post', body=encoded_data, headers={'Content-Type': 'application/json'})
  json.loads(r.data.decode('utf-8'))['json']

  # 输出
  {'attribute': 'value'}

文件和二进制 POST

使用multipart/form-data编码进行二进制文件传参请求,比如上传图片或其他文件,由于这种场景已经不再适用,这块不继续讲解

参考文档

❤️❤️❤️读者每一份热爱都是笔者前进的动力!

我是三十一,感谢各位朋友:求点赞、求收藏、求评论,大家下期见!

相关文章
|
3月前
|
NoSQL 算法 Redis
Redis系列-11.RedLock算法和底层源码分析
Redis系列-11.RedLock算法和底层源码分析
83 0
|
缓存 算法 测试技术
vue3源码分析——手写diff算法
通过上面的测试用例,可以看到是分了7种情况来的,在本篇文章,不采用编写测试用例,有兴趣的可以自己去github上面查看,这里主要使用图文加上代码,帮助大家更快的理解vue3中的diff算法
vue3源码分析——手写diff算法
|
存储 数据采集 缓存
Memcached源码分析 - LRU淘汰算法(6)
Memcached源码分析 - 网络模型(1)Memcached源码分析 - 命令解析(2)Memcached源码分析 - 数据存储(3)Memcached源码分析 - 增删改查操作(4)Memcached源码分析 - 内存存储机制Slabs(5)Memcached源码分析 - LRU淘汰算法(6)Memcached源码分析 - 消息回应(7) Memcached的LRU几种策略 惰性删除。
1596 0
|
机器学习/深度学习 算法 索引
机器学习实战:K近邻算法(源码分析)
学习机器学习的总结: 先把代码放到这儿,话说一句一句看着打真的好累,还好可以通过debug一步一步观察变量,理解顿时快了许多。
1234 0
|
算法 关系型数据库 C语言
PgSQL · 源码分析 · PG中的无锁算法和原子操作应用一则
原子操作概述 近年来随着服务器上CPU核数的不断增加,无锁算法(Lock Free)越来越广泛的被应用于高并发的系统中。PostgreSQL 做为世界上最高级开源数据库也在9.5时引入了无锁算法。本文先介绍了无锁算法和原子操作在PostgreSQL中的具体实现, 再通过一个Patch来看一下在PostgreSQL中是如何利用它来解决实际的高并发问题的。 无锁算法是利用CPU的原子操作实现的数
2298 0
|
算法 TensorFlow 算法框架/工具
TensorFlow学习笔记之五——源码分析之最近算法
import numpy as np import tensorflow as tf # Import MINST data import input_data mnist = input_data.read_data_sets("/tmp/data/", one_hot=True) #这里主要是导入数据,数据通过input_data.py已经下载到/tmp/data/目录之下了,这里下载数据的时候,需要提前用浏览器尝试是否可以打开 #http://yann.lecun.com/exdb/mnist/,如果打不开,下载数据阶段会报错。
1331 0
|
6天前
|
算法
基于模糊控制算法的倒立摆控制系统matlab仿真
本项目构建了一个基于模糊控制算法的倒立摆控制系统,利用MATLAB 2022a实现了从不稳定到稳定状态的转变,并输出了相应的动画和收敛过程。模糊控制器通过对小车位置与摆的角度误差及其变化量进行模糊化处理,依据预设的模糊规则库进行模糊推理并最终去模糊化为精确的控制量,成功地使倒立摆维持在直立位置。该方法无需精确数学模型,适用于处理系统的非线性和不确定性。
基于模糊控制算法的倒立摆控制系统matlab仿真
|
1天前
|
算法 数据安全/隐私保护
基于LS算法的OFDM+QPSK系统信道估计均衡matlab性能仿真
基于MATLAB 2022a的仿真展示了OFDM+QPSK系统中最小二乘(LS)算法的信道估计与均衡效果。OFDM利用多个低速率子载波提高频谱效率,通过循环前缀克服多径衰落。LS算法依据导频符号估计信道参数,进而设计均衡器以恢复数据符号。核心程序实现了OFDM信号处理流程,包括加性高斯白噪声的加入、保护间隔去除、快速傅立叶变换及信道估计与均衡等步骤,并最终计算误码率,验证了算法的有效性。
9 2
|
1天前
|
算法
基于GA-PSO遗传粒子群混合优化算法的CVRP问题求解matlab仿真
本文介绍了一种基于GA-PSO混合优化算法求解带容量限制的车辆路径问题(CVRP)的方法。在MATLAB2022a环境下运行,通过遗传算法的全局搜索与粒子群算法的局部优化能力互补,高效寻找最优解。程序采用自然数编码策略,通过选择、交叉、变异操作及粒子速度和位置更新,不断迭代直至满足终止条件,旨在最小化总行驶距离的同时满足客户需求和车辆载重限制。
|
6天前
|
机器学习/深度学习 算法 定位技术
MATLAB - 遗传算法(GA)求解旅行商问题(TSP)
MATLAB - 遗传算法(GA)求解旅行商问题(TSP)
13 3

热门文章

最新文章