pip install psycopg2 psycopg2-binary
import psycopg2 import datetime import base64 import sys import psycopg2.extras from dbutils.pooled_db import PooledDB import threading import traceback u = '' p = '' """ 全局单例 连接池 """ class PsycopgConn: # 多线程的锁 针对单例初始化的过程 加锁 _instance_lock = threading.Lock() """ init在new之后被调用 """ def __init__(self): self.init_pool() """ 重载构造函数 实现全局单例 """ def __new__(cls, *args, **kwargs): if not hasattr(cls, '_instance'): with PsycopgConn._instance_lock: if not hasattr(cls, '_instance'): PsycopgConn._instance = object.__new__(cls) return PsycopgConn._instance def get_pool_conn(self): """ 获取连接池连接 :return: """ if not self._pool: self.init_pool() return self._pool.connection() def init_pool(self): """ 初始化连接池 :return: """ try: pool = PooledDB( creator=psycopg2, # 使用连接数据库的模块 psycopg2 maxconnections=20, # 连接池允许的最大连接数,0 和 None 表示不限制连接数 mincached=1, # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建 maxcached=5, # 链接池中最多闲置的链接,0 和 None 不限制 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None 表示无限制 setsession=[], # 开始会话前执行的命令列表 host='imp-sothebys-cn-internal-zhangbei.hologres.aliyuncs.com', port=8099, user=u.decode("utf-8"), password=p.decode("utf-8"), database='sothebys_data') self._pool = pool except: logger.error("trace back: %s", traceback.format_exc()) logger.info('connect postgresql error') self.close_pool() def close_pool(self): """ 关闭连接池连接 :return: """ if self._pool != None: self._pool.close() def SelectSql(self, sql): """ 查询 :param sql: :return: """ try: conn = self.get_pool_conn() cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) # 设置返回格式为字典 cursor.execute(sql) result = cursor.fetchall() except Exception as e: logger.error("trace back: %s", traceback.format_exc()) logger.info('execute sql {0} is error'.format(sql)) finally: cursor.close() conn.close() return result def InsertSql(self, sql): """ 插入数据 :param sql: :return: """ try: conn = self.get_pool_conn() cursor = conn.cursor() cursor.execute(sql) result = True except Exception as e: logger.error("trace back: %s", traceback.format_exc()) logger.info('ERROR: execute {0} causes error'.format(sql)) finally: cursor.close() conn.commit() conn.close() return result def UpdateSql(self, sql): """ 更新数据 :param sql: :return: """ try: conn = self.get_pool_conn() cursor = conn.cursor() cursor.execute(sql) result = True except Exception as e: logger.error("trace back: %s", traceback.format_exc()) logger.info('ERROR: execute {0} causes error'.format(sql)) finally: cursor.close() conn.commit() conn.close() return result
调用者如何使用
pgsql = PsycopgConn()
result = pgsql.SelectSql(sql)
常见报错
1、 undefined symbol: PQconninfo
ImportError: /home/admin/.pyenv/versions/3.6.2/ lib/python3.6/site-packages/psycopg2/ _psycopg.cpython-36m-x86_64-linux-gnu.so: undefined symbol: PQconninfo
装一下这个包即可
pip install psycopg2-binary