python使用DBUtil连接池封装psycopg2/hologres

本文涉及的产品
大数据开发治理平台 DataWorks,不限时长
实时数仓Hologres,5000CU*H 100GB 3个月
实时计算 Flink 版,5000CU*H 3个月
简介: 这段代码是一个Python类`PsycopgConn`,用于管理PostgreSQL数据库连接池。类使用了`dbutils.pooled_db.PooledDB`来创建连接池,支持多线程,并提供了获取连接、关闭连接池、执行查询(`SelectSql`)、插入(`InsertSql`)和更新(`UpdateSql`)SQL的方法。类实现单例模式以确保只有一个实例存在。连接配置包括主机、端口等



pip install psycopg2 psycopg2-binary



import psycopg2
import datetime
import base64

import sys
import psycopg2.extras
from dbutils.pooled_db import PooledDB
import threading
import traceback
 

u = ''
p = ''

 
"""
全局单例 连接池
"""
class PsycopgConn:
    # 多线程的锁 针对单例初始化的过程 加锁
    _instance_lock = threading.Lock()
 
    """
    init在new之后被调用
    """
    def __init__(self):
        self.init_pool()
 
    """
    重载构造函数 实现全局单例
    """
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, '_instance'):
            with PsycopgConn._instance_lock:
                if not hasattr(cls, '_instance'):
                    PsycopgConn._instance = object.__new__(cls)
                return PsycopgConn._instance
 
    def get_pool_conn(self):
        """
        获取连接池连接
        :return:
        """
        if not self._pool:
            self.init_pool()
        return self._pool.connection()
 
    def init_pool(self):
        """
        初始化连接池
        :return:
        """
        try:
            pool = PooledDB(
                creator=psycopg2,  # 使用连接数据库的模块 psycopg2
                maxconnections=20,  # 连接池允许的最大连接数,0 和 None 表示不限制连接数
                mincached=1,  # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
                maxcached=5,  # 链接池中最多闲置的链接,0 和 None 不限制
                blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
                maxusage=None,  # 一个链接最多被重复使用的次数,None 表示无限制
                setsession=[],  # 开始会话前执行的命令列表
                host='imp-sothebys-cn-internal-zhangbei.hologres.aliyuncs.com',
                port=8099,
                user=u.decode("utf-8"),
                password=p.decode("utf-8"),
                database='sothebys_data')
            self._pool = pool
        except:
            logger.error("trace back: %s", traceback.format_exc())
            logger.info('connect postgresql error')
            self.close_pool()
 
    def close_pool(self):
        """
        关闭连接池连接
        :return:
        """
        if self._pool != None:
            self._pool.close()
 
    def SelectSql(self, sql):
        """
        查询
        :param sql:
        :return:
        """
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)  # 设置返回格式为字典
            cursor.execute(sql)
            result = cursor.fetchall()
        except Exception as e:
            logger.error("trace back: %s", traceback.format_exc())
            logger.info('execute sql {0} is error'.format(sql))
        finally:
            cursor.close()
            conn.close()
        return result
 
    def InsertSql(self, sql):
        """
        插入数据
        :param sql:
        :return:
        """
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor()
            cursor.execute(sql)
            result = True
        except Exception as e:
            logger.error("trace back: %s", traceback.format_exc())
            logger.info('ERROR: execute  {0} causes error'.format(sql))
        finally:
            cursor.close()
            conn.commit()
            conn.close()
        return result
 
    def UpdateSql(self, sql):
        """
        更新数据
        :param sql:
        :return:
        """
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor()
            cursor.execute(sql)
            result = True
        except Exception as e:
            logger.error("trace back: %s", traceback.format_exc())
            logger.info('ERROR: execute  {0} causes error'.format(sql))
        finally:
            cursor.close()
            conn.commit()
            conn.close()
        return result



调用者如何使用


pgsql = PsycopgConn()

result = pgsql.SelectSql(sql)



常见报错


1、 undefined symbol: PQconninfo

ImportError: /home/admin/.pyenv/versions/3.6.2/
lib/python3.6/site-packages/psycopg2/
_psycopg.cpython-36m-x86_64-linux-gnu.so: undefined symbol: PQconninfo


装一下这个包即可

pip install psycopg2-binary



相关实践学习
基于Hologres+PAI+计算巢,5分钟搭建企业级AI问答知识库
本场景采用阿里云人工智能平台PAI、Hologres向量计算和计算巢,搭建企业级AI问答知识库。通过本教程的操作,5分钟即可拉起大模型(PAI)、向量计算(Hologres)与WebUI资源,可直接进行对话问答。
相关文章
|
2月前
|
Python
python封装执行cmd命令的方法
python封装执行cmd命令的方法
56 0
|
2月前
|
Android开发 Python
Python封装ADB获取Android设备wifi地址的方法
Python封装ADB获取Android设备wifi地址的方法
115 0
|
2月前
|
Python
请简述Python中的继承、封装和多态的概念。
【2月更文挑战第24天】【2月更文挑战第82篇】请简述Python中的继承、封装和多态的概念。
|
11天前
|
Python
Python面向对象进阶:深入解析面向对象三要素——封装、继承与多态
Python面向对象进阶:深入解析面向对象三要素——封装、继承与多态
|
7天前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之对于Hologres的Python查询,该如何操作
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
12 0
|
2月前
|
Java 数据安全/隐私保护 Python
Python中的封装
Python中的封装
27 5
|
2月前
|
域名解析 缓存 网络协议
Python中使用`requests`库连接池与性能优化技术
【4月更文挑战第12天】在Python的网络编程中,`requests`库因其简洁易用的API和强大的功能而备受欢迎。然而,在高并发或大量请求的场景下,直接使用`requests`发送请求可能会导致性能瓶颈。为了解决这个问题,我们可以利用`requests`库提供的连接池(Connection Pooling)机制,以及采取一些性能优化措施,来提升请求的处理效率和稳定性。
|
2月前
|
Python
Python从入门到精通:深入学习面向对象编程——2.1.2继承、封装和多态的概念
Python从入门到精通:深入学习面向对象编程——2.1.2继承、封装和多态的概念
|
2月前
|
Python
python使用tkinter库,封装操作excel为GUI程序
python使用tkinter库,封装操作excel为GUI程序
284 0
|
2月前
|
Python
07 Python面向对象的三大特点【封装、继承、多态】
07 Python面向对象的三大特点【封装、继承、多态】
14 0

相关产品

  • 实时数仓 Hologres