python使用DBUtil连接池封装psycopg2/hologres

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
实时计算 Flink 版,5000CU*H 3个月
简介: 这段代码是一个Python类`PsycopgConn`,用于管理PostgreSQL数据库连接池。类使用了`dbutils.pooled_db.PooledDB`来创建连接池,支持多线程,并提供了获取连接、关闭连接池、执行查询(`SelectSql`)、插入(`InsertSql`)和更新(`UpdateSql`)SQL的方法。类实现单例模式以确保只有一个实例存在。连接配置包括主机、端口等



pip install psycopg2 psycopg2-binary



import psycopg2
import datetime
import base64

import sys
import psycopg2.extras
from dbutils.pooled_db import PooledDB
import threading
import traceback
 

u = ''
p = ''

 
"""
全局单例 连接池
"""
class PsycopgConn:
    # 多线程的锁 针对单例初始化的过程 加锁
    _instance_lock = threading.Lock()
 
    """
    init在new之后被调用
    """
    def __init__(self):
        self.init_pool()
 
    """
    重载构造函数 实现全局单例
    """
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, '_instance'):
            with PsycopgConn._instance_lock:
                if not hasattr(cls, '_instance'):
                    PsycopgConn._instance = object.__new__(cls)
                return PsycopgConn._instance
 
    def get_pool_conn(self):
        """
        获取连接池连接
        :return:
        """
        if not self._pool:
            self.init_pool()
        return self._pool.connection()
 
    def init_pool(self):
        """
        初始化连接池
        :return:
        """
        try:
            pool = PooledDB(
                creator=psycopg2,  # 使用连接数据库的模块 psycopg2
                maxconnections=20,  # 连接池允许的最大连接数,0 和 None 表示不限制连接数
                mincached=1,  # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
                maxcached=5,  # 链接池中最多闲置的链接,0 和 None 不限制
                blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
                maxusage=None,  # 一个链接最多被重复使用的次数,None 表示无限制
                setsession=[],  # 开始会话前执行的命令列表
                host='imp-sothebys-cn-internal-zhangbei.hologres.aliyuncs.com',
                port=8099,
                user=u.decode("utf-8"),
                password=p.decode("utf-8"),
                database='sothebys_data')
            self._pool = pool
        except:
            logger.error("trace back: %s", traceback.format_exc())
            logger.info('connect postgresql error')
            self.close_pool()
 
    def close_pool(self):
        """
        关闭连接池连接
        :return:
        """
        if self._pool != None:
            self._pool.close()
 
    def SelectSql(self, sql):
        """
        查询
        :param sql:
        :return:
        """
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)  # 设置返回格式为字典
            cursor.execute(sql)
            result = cursor.fetchall()
        except Exception as e:
            logger.error("trace back: %s", traceback.format_exc())
            logger.info('execute sql {0} is error'.format(sql))
        finally:
            cursor.close()
            conn.close()
        return result
 
    def InsertSql(self, sql):
        """
        插入数据
        :param sql:
        :return:
        """
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor()
            cursor.execute(sql)
            result = True
        except Exception as e:
            logger.error("trace back: %s", traceback.format_exc())
            logger.info('ERROR: execute  {0} causes error'.format(sql))
        finally:
            cursor.close()
            conn.commit()
            conn.close()
        return result
 
    def UpdateSql(self, sql):
        """
        更新数据
        :param sql:
        :return:
        """
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor()
            cursor.execute(sql)
            result = True
        except Exception as e:
            logger.error("trace back: %s", traceback.format_exc())
            logger.info('ERROR: execute  {0} causes error'.format(sql))
        finally:
            cursor.close()
            conn.commit()
            conn.close()
        return result



调用者如何使用


pgsql = PsycopgConn()

result = pgsql.SelectSql(sql)



常见报错


1、 undefined symbol: PQconninfo

ImportError: /home/admin/.pyenv/versions/3.6.2/
lib/python3.6/site-packages/psycopg2/
_psycopg.cpython-36m-x86_64-linux-gnu.so: undefined symbol: PQconninfo


装一下这个包即可

pip install psycopg2-binary



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
相关文章
|
2天前
|
Python
用python转移小文件到指定目录并压缩,脚本封装
这篇文章介绍了如何使用Python脚本将大量小文件转移到指定目录,并在达到大约250MB时进行压缩。
14 2
|
1天前
|
关系型数据库 MySQL Python
mysql之python客户端封装类
mysql之python客户端封装类
|
29天前
|
数据安全/隐私保护 开发者 Python
在 Python 中定义封装?
【8月更文挑战第29天】
24 8
|
1月前
|
Docker Python 容器
[docker]封装python的docker镜像
[docker]封装python的docker镜像
|
1月前
|
关系型数据库 Linux PostgreSQL
【Azure 应用服务】Azure Function App Linux环境下的Python Function,安装 psycopg2 模块错误
【Azure 应用服务】Azure Function App Linux环境下的Python Function,安装 psycopg2 模块错误
|
1月前
|
网络协议 Python
python requests库如何使用http连接池降低延迟 keepalive复用连接
Python的`requests`库通过内置的连接池机制支持HTTP Keep-Alive特性,允许复用TCP连接以发送多个请求,减少连接开销。默认情况下,`requests`不显式禁用Keep-Alive,其行为取决于底层HTTP库(如urllib3)及服务器的支持。通过创建`Session`对象并自定义`HTTPAdapter`,可以调整连接池大小和重试策略,进一步优化连接复用。测试显示,使用`Session`和定制的`HTTPAdapter`比普通请求方法能显著减少连续请求间的时间消耗,体现了Keep-Alive的优势。
|
1月前
|
消息中间件 SQL Java
实时数仓 Hologres产品使用合集之如何用python将kafka数据写入
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
3月前
|
Python
Python面向对象进阶:深入解析面向对象三要素——封装、继承与多态
Python面向对象进阶:深入解析面向对象三要素——封装、继承与多态
|
2月前
|
数据挖掘 Python
封装和解构是 Python 中常用的技术
封装和解构是 Python 中常用的技术
|
3月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之对于Hologres的Python查询,该如何操作
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
39 0

相关产品

  • 实时数仓 Hologres