python redis链接建立实现分析

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介:

  今天在写zabbix storm job监控脚本的时候用到了python的redis模块,之前也有用过,但是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。
在ConnectionPool之前,如果需要连接redis,我都是用StrictRedis这个类,在源码中可以看到这个类的具体解释:

1
2
redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an 
implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server

使用的方法:

1
2
  r = redis.StrictRedis(host = xxxx, port = xxxx, db = xxxx)
  r.xxxx()

有了ConnectionPool这个类之后,可以使用如下方法

1
2
pool  =  redis.ConnectionPool(host = xxx, port = xxx, db = xxxx)
=  redis.Redis(connection_pool = pool)

这里Redis是StrictRedis的子类
简单分析如下:
在StrictRedis类的__init__方法中,可以初始化connection_pool这个参数,其对应的是一个ConnectionPool的对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class  StrictRedis( object ):
........
     def  __init__( self , host = 'localhost' , port = 6379 ,
                  db = 0 , password = None , socket_timeout = None ,
                  socket_connect_timeout = None ,
                  socket_keepalive = None , socket_keepalive_options = None ,
                  connection_pool = None , unix_socket_path = None ,
                  encoding = 'utf-8' , encoding_errors = 'strict' ,
                  charset = None , errors = None ,
                  decode_responses = False , retry_on_timeout = False ,
                  ssl = False , ssl_keyfile = None , ssl_certfile = None ,
                  ssl_cert_reqs = None , ssl_ca_certs = None ):
          if  not  connection_pool:
              ..........
               connection_pool  =  ConnectionPool( * * kwargs)
          self .connection_pool  =  connection_pool

在StrictRedis的实例执行具体的命令时会调用execute_command方法,这里可以看到具体实现是从连接池中获取一个具体的连接,然后执行命令,完成后释放连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    # COMMAND EXECUTION AND PROTOCOL PARSING
     def  execute_command( self * args,  * * options):
         "Execute a command and return a parsed response"
         pool  =  self .connection_pool
         command_name  =  args[ 0 ]
         connection  =  pool.get_connection(command_name,  * * options)   #调用ConnectionPool.get_connection方法获取一个连接
         try :
             connection.send_command( * args)   #命令执行,这里为Connection.send_command
             return  self .parse_response(connection, command_name,  * * options)
         except  (ConnectionError, TimeoutError) as e:
             connection.disconnect()
             if  not  connection.retry_on_timeout  and  isinstance (e, TimeoutError):
                 raise
             connection.send_command( * args)  
             return  self .parse_response(connection, command_name,  * * options)
         finally :
             pool.release(connection)   #调用ConnectionPool.release释放连接

在来看看ConnectionPool类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
      class  ConnectionPool( object ):  
        ...........
     def  __init__( self , connection_class = Connection, max_connections = None ,
                  * * connection_kwargs):    #类初始化时调用构造函数
         max_connections  =  max_connections  or  2  * *  31
         if  not  isinstance (max_connections, ( int long ))  or  max_connections <  0 :   #判断输入的max_connections是否合法
             raise  ValueError( '"max_connections" must be a positive integer' )
         self .connection_class  =  connection_class   #设置对应的参数
         self .connection_kwargs  =  connection_kwargs
         self .max_connections  =  max_connections
         self .reset()   #初始化ConnectionPool 时的reset操作
     def  reset( self ):
         self .pid  =  os.getpid()
         self ._created_connections  =  0   #已经创建的连接的计数器
         self ._available_connections  =  []    #声明一个空的数组,用来存放可用的连接
         self ._in_use_connections  =  set ()   #声明一个空的集合,用来存放已经在用的连接
         self ._check_lock  =  threading.Lock()
.......
     def  get_connection( self , command_name,  * keys,  * * options):   #在连接池中获取连接的方法
         "Get a connection from the pool"
         self ._checkpid()
         try :
             connection  =  self ._available_connections.pop()   #获取并删除代表连接的元素,在第一次获取connectiong时,因为_available_connections是一个空的数组,
             会直接调用make_connection方法
         except  IndexError:
             connection  =  self .make_connection()
         self ._in_use_connections.add(connection)    #向代表正在使用的连接的集合中添加元素
         return  connection   
     def  make_connection( self ):  #在_available_connections数组为空时获取连接调用的方法
         "Create a new connection"
         if  self ._created_connections > =  self .max_connections:    #判断创建的连接是否已经达到最大限制,max_connections可以通过参数初始化
             raise  ConnectionError( "Too many connections" )
         self ._created_connections  + =  1    #把代表已经创建的连接的数值+1
         return  self .connection_class( * * self .connection_kwargs)      #返回有效的连接,默认为Connection(**self.connection_kwargs)
     def  release( self , connection):   #释放连接,链接并没有断开,只是存在链接池中
         "Releases the connection back to the pool"
         self ._checkpid()
         if  connection.pid ! =  self .pid:
             return
         self ._in_use_connections.remove(connection)    #从集合中删除元素
         self ._available_connections.append(connection)  #并添加到_available_connections 的数组中
     def  disconnect( self ):  #断开所有连接池中的链接
         "Disconnects all connections in the pool"
         all_conns  =  chain( self ._available_connections,
                           self ._in_use_connections)
         for  connection  in  all_conns:
             connection.disconnect()

execute_command最终调用的是Connection.send_command方法,关闭链接为 Connection.disconnect方法,而Connection类的实现:

1
2
3
4
5
6
7
class  Connection( object ):
     "Manages TCP communication to and from a Redis server"
     def  __del__( self ):    #对象删除时的操作,调用disconnect释放连接
         try :
             self .disconnect()
         except  Exception:
             pass

核心的链接建立方法是通过socket模块实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
     def  _connect( self ):
         err  =  None
         for  res  in  socket.getaddrinfo( self .host,  self .port,  0 ,
                                       socket.SOCK_STREAM):
             family, socktype, proto, canonname, socket_address  =  res
             sock  =  None
             try :
                 sock  =  socket.socket(family, socktype, proto)
                 # TCP_NODELAY
                 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY,  1 )
                 # TCP_KEEPALIVE
                 if  self .socket_keepalive:    #构造函数中默认 socket_keepalive=False,因此这里默认为短连接
                     sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE,  1 )
                     for  k, v  in  iteritems( self .socket_keepalive_options):
                         sock.setsockopt(socket.SOL_TCP, k, v)
                 # set the socket_connect_timeout before we connect
                 sock.settimeout( self .socket_connect_timeout)   #构造函数中默认socket_connect_timeout=None,即连接为blocking的模式
                 # connect
                 sock.connect(socket_address)
                 # set the socket_timeout now that we're connected
                 sock.settimeout( self .socket_timeout)   #构造函数中默认socket_timeout=None
                 return  sock
             except  socket.error as _:
                 err  =  _
                 if  sock  is  not  None :
                     sock.close()
.....

关闭链接的方法:

1
2
3
4
5
6
7
8
9
10
11
     def  disconnect( self ):
         "Disconnects from the Redis server"
         self ._parser.on_disconnect()
         if  self ._sock  is  None :
             return
         try :
             self ._sock.shutdown(socket.SHUT_RDWR)   #先shutdown再close
             self ._sock.close()
         except  socket.error:
             pass
         self ._sock  =  None

        
可以小结如下
1)默认情况下每创建一个Redis实例都会构造出一个ConnectionPool实例,每一次访问redis都会从这个连接池得到一个连接,操作完成后会把该连接放回连接池(连接并没有释放),可以构造一个统一的ConnectionPool,在创建Redis实例时,可以将该ConnectionPool传入,那么后续的操作会从给定的ConnectionPool获得连接,不会再重复创建ConnectionPool。
2)默认情况下没有设置keepalive和timeout,建立的连接是blocking模式的短连接。
3)不考虑底层tcp的情况下,连接池中的连接会在ConnectionPool.disconnect中统一销毁。



本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1583541,如需转载请自行联系原作者

相关文章
|
2月前
|
存储 分布式计算 大数据
基于Python大数据的的电商用户行为分析系统
本系统基于Django、Scrapy与Hadoop技术,构建电商用户行为分析平台。通过爬取与处理海量用户数据,实现行为追踪、偏好分析与个性化推荐,助力企业提升营销精准度与用户体验,推动电商智能化发展。
|
3月前
|
缓存 供应链 监控
1688item_search_factory - 按关键字搜索工厂数据接口深度分析及 Python 实现
item_search_factory接口专为B2B电商供应链优化设计,支持通过关键词精准检索工厂信息,涵盖资质、产能、地理位置等核心数据,助力企业高效开发货源、分析产业集群与评估供应商。
|
3月前
|
缓存 监控 算法
item_get - Lazada 商品详情详情接口深度分析及 Python 实现
Lazada商品详情接口item_get可获取商品全维度数据,包括价格、库存、SKU、促销及卖家信息,支持东南亚六国站点,适用于竞品监控、定价策略与市场分析,助力跨境卖家精准决策。
|
3月前
|
JSON 监控 数据格式
1688 item_search_app 关键字搜索商品接口深度分析及 Python 实现
1688开放平台item_search_app接口专为移动端优化,支持关键词搜索、多维度筛选与排序,可获取商品详情及供应商信息,适用于货源采集、价格监控与竞品分析,助力采购决策。
|
3月前
|
缓存 供应链 监控
VVIC seller_search 排行榜搜索接口深度分析及 Python 实现
VVIC搜款网seller_search接口提供服装批发市场的商品及商家排行榜数据,涵盖热销榜、销量排名、类目趋势等,支持多维度筛选与数据分析,助力选品决策、竞品分析与市场预测,为服装供应链提供有力数据支撑。
|
3月前
|
缓存 监控 算法
唯品会item_search - 按关键字搜索 VIP 商品接口深度分析及 Python 实现
唯品会item_search接口支持通过关键词、分类、价格等条件检索商品,广泛应用于电商数据分析、竞品监控与市场调研。结合Python可实现搜索、分析、可视化及数据导出,助力精准决策。
|
2月前
|
机器学习/深度学习 大数据 关系型数据库
基于python大数据的台风灾害分析及预测系统
针对台风灾害预警滞后、精度不足等问题,本研究基于Python与大数据技术,构建多源数据融合的台风预测系统。利用机器学习提升路径与强度预测准确率,结合Django框架实现动态可视化与实时预警,为防灾决策提供科学支持,显著提高应急响应效率,具有重要社会经济价值。
|
2月前
|
机器学习/深度学习 大数据 关系型数据库
基于python大数据的青少年网络使用情况分析及预测系统
本研究基于Python大数据技术,构建青少年网络行为分析系统,旨在破解现有防沉迷模式下用户画像模糊、预警滞后等难题。通过整合多平台亿级数据,运用机器学习实现精准行为预测与实时干预,推动数字治理向“数据驱动”转型,为家庭、学校及政府提供科学决策支持,助力青少年健康上网。

推荐镜像

更多