SqlAlchemy 2.0 中文文档(四十三)(2)https://developer.aliyun.com/article/1563060
连接池
当调用 connect()
或 execute()
方法时,Engine
将向连接池请求连接。默认的连接池 QueuePool
将根据需要打开到数据库的连接。随着并发语句的执行,QueuePool
将增加其连接池的大小,默认为五个,并允许默认的 “溢出” 十个。由于 Engine
本质上是连接池的“主基地”,因此在应用程序中应该为每个数据库保留一个单独的 Engine
,而不是为每个连接创建一个新的。
注意
默认情况下,SQLite 引擎不使用 QueuePool
。有关 SQLite 连接池使用的详细信息,请参阅 SQLite。
有关连接池的更多信息,请参阅 连接池。
自定义 DBAPI connect() 参数 / 连接时例程
对于需要特殊连接方法的情况,在绝大多数情况下,最合适的方法是在 create_engine()
级别使用多个钩子来自定义此过程。这些在以下子部分中描述。
通过 dbapi.connect() 传递的特殊关键字参数
所有的 Python DBAPI 都接受除了基本连接之外的额外参数。常见参数包括用于指定字符集编码和超时值的参数;更复杂的数据包括特殊的 DBAPI 常量和对象以及 SSL 子参数。有两种简单的方式可以传递这些参数而不复杂化。
将参数添加到 URL 查询字符串中
简单的字符串值,以及一些数字值和布尔标志,通常可以直接在 URL 的查询字符串中指定。一个常见的例子是接受字符编码参数 encoding
的 DBAPI,例如大多数 MySQL DBAPI:
engine = create_engine("mysql+pymysql://user:pass@host/test?charset=utf8mb4")
使用查询字符串的优点在于可以在配置文件中指定其他的 DBAPI 选项,这样做的方式在 URL 中指定的 DBAPI 方式是可移植的。在此级别传递的具体参数因 SQLAlchemy 方言而异。某些方言将所有参数都作为字符串传递,而其他方言将解析特定的数据类型并将参数移动到不同的位置,例如到驱动程序级别的 DSN 和连接字符串中。由于此领域中方言的行为目前存在差异,因此应该查阅特定方言的文档以查看是否支持在此级别上支持特定参数。
提示
对于给定 URL 显示传递给 DBAPI 的确切参数的一般技术可以直接使用Dialect.create_connect_args()
方法进行如下操作:
>>> from sqlalchemy import create_engine >>> engine = create_engine( ... "mysql+pymysql://some_user:some_pass@some_host/test?charset=utf8mb4" ... ) >>> args, kwargs = engine.dialect.create_connect_args(engine.url) >>> args, kwargs ([], {'host': 'some_host', 'database': 'test', 'user': 'some_user', 'password': 'some_pass', 'charset': 'utf8mb4', 'client_flag': 2})
上述的args, kwargs
对通常作为dbapi.connect(*args, **kwargs)
传递给 DBAPI。
使用 connect_args 字典参数
将任何参数传递给保证在任何时候传递所有参数的dbapi.connect()
函数的更通用的系统是create_engine.connect_args
字典参数。这可用于否则不被方言处理的参数添加到查询字符串时,以及当需要将特殊子结构或对象传递给 DBAPI 时。有时只是需要将特定标志发送为True
符号,而 SQLAlchemy 方言并不知道如何将其从 URL 中呈现的字符串形式强制为此关键字参数。下面说明了使用取代连接的基础实现的 psycopg2“连接工厂”的用法:
engine = create_engine( "postgresql+psycopg2://user:pass@hostname/dbname", connect_args={"connection_factory": MyConnectionFactory}, )
另一个示例是 pyodbc 的“timeout”参数:
engine = create_engine( "mssql+pyodbc://user:pass@sqlsrvr?driver=ODBC+Driver+13+for+SQL+Server", connect_args={"timeout": 30}, )
上述示例还说明了 URL“查询字符串”参数以及create_engine.connect_args
都可以同时使用;在 pyodbc 的情况下,“driver”关键字在 URL 中具有特殊含义。
控制参数传递给 DBAPI connect()函数的方式
除了操作传递给connect()
的参数之外,我们还可以使用DialectEvents.do_connect()
事件挂钩进一步定制如何调用 DBAPI connect()
函数本身。此挂钩将传递完整的*args, **kwargs
,方言将发送到connect()
。然后,可以在原地修改这些集合以更改它们的使用方式:
from sqlalchemy import event engine = create_engine("postgresql+psycopg2://user:pass@hostname/dbname") @event.listens_for(engine, "do_connect") def receive_do_connect(dialect, conn_rec, cargs, cparams): cparams["connection_factory"] = MyConnectionFactory
生成动态认证令牌
DialectEvents.do_connect()
也是一个理想的方法,可以动态插入可能在Engine
生命周期内更改的认证令牌。例如,如果令牌由get_authentication_token()
生成并作为token
参数传递给 DBAPI,则可以实现如下:
from sqlalchemy import event engine = create_engine("postgresql+psycopg2://user@hostname/dbname") @event.listens_for(engine, "do_connect") def provide_token(dialect, conn_rec, cargs, cparams): cparams["token"] = get_authentication_token()
另请参见
使用访问令牌连接到数据库 - 一个涉及 SQL Server 的更具体的示例
修改连接后的 DBAPI 连接或在连接后运行命令
对于 SQLAlchemy 创建的 DBAPI 连接,没有问题,但我们希望在实际使用之前修改完成的连接,例如设置特殊标志或运行某些命令,PoolEvents.connect()
事件钩子是最合适的钩子。这个钩子在每次创建新连接时都会被调用,在 SQLAlchemy 使用之前:
from sqlalchemy import event engine = create_engine("postgresql+psycopg2://user:pass@hostname/dbname") @event.listens_for(engine, "connect") def connect(dbapi_connection, connection_record): cursor_obj = dbapi_connection.cursor() cursor_obj.execute("SET some session variables") cursor_obj.close()
完全替换 DBAPI 的 connect()
函数
最后,DialectEvents.do_connect()
事件钩子也可以允许我们完全接管连接过程,建立连接并返回它:
from sqlalchemy import event engine = create_engine("postgresql+psycopg2://user:pass@hostname/dbname") @event.listens_for(engine, "do_connect") def receive_do_connect(dialect, conn_rec, cargs, cparams): # return the new DBAPI connection with whatever we'd like to # do return psycopg2.connect(*cargs, **cparams)
DialectEvents.do_connect()
钩子取代了以前的 create_engine.creator
钩子,但前者仍然可用。DialectEvents.do_connect()
具有一个明显的优势,就是解析自 URL 的完整参数也会传递给用户定义的函数,而这在 create_engine.creator
中不是这样的。 ## 配置日志记录
Python 的标准 logging 模块用于实现 SQLAlchemy 的信息和调试日志输出。这使得 SQLAlchemy 的日志记录可以以标准方式与其他应用程序和库集成。create_engine()
还有两个参数 create_engine.echo
和 create_engine.echo_pool
,允许立即将日志记录到 sys.stdout
以便进行本地开发;这些参数最终会与下面描述的常规 Python 记录器交互。
本节假设您熟悉上面链接的日志记录模块。SQLAlchemy 所执行的所有日志记录都存在于 sqlalchemy
命名空间下,就像 logging.getLogger('sqlalchemy')
一样。当配置了日志记录(例如通过 logging.basicConfig()
),可以打开的 SA 日志记录器的通用命名空间如下:
sqlalchemy.engine
- 控制 SQL 回显。设置为logging.INFO
以输出 SQL 查询,设置为logging.DEBUG
以输出查询 + 结果集。这些设置等同于create_engine.echo
上的echo=True
和echo="debug"
。sqlalchemy.pool
- 控制连接池日志记录。设置为logging.INFO
以记录连接失效和重用事件;设置为logging.DEBUG
以另外记录所有池的签入和签出。这些设置等同于在create_engine.echo_pool
上分别设置pool_echo=True
和pool_echo="debug"
。sqlalchemy.dialects
- 控制用于 SQL 方言的自定义日志记录,日志记录程度在特定方言中使用的情况下通常很少。sqlalchemy.orm
- 控制在 ORM 中使用日志记录的各种 ORM 函数的日志记录程度,通常很少。设置为logging.INFO
以记录一些关于映射器配置的顶级信息。
例如,要使用 Python 日志记录而不是 echo=True
标志来记录 SQL 查询:
import logging logging.basicConfig() logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
默认情况下,整个 sqlalchemy
命名空间中的日志级别设置为 logging.WARN
,以便即使在已启用日志记录的应用程序中,也不会发生任何日志操作。
注意
SQLAlchemy Engine
通过仅在检测到当前日志级别为 logging.INFO
或 logging.DEBUG
时发出日志语句来节省 Python 函数调用开销。它仅在从连接池获取新连接时检查此级别。因此,在已经运行的应用程序中更改日志配置时,任何当前活动的 Connection
(通常更常见的是活动事务中的 Session
对象)将根据新配置不会记录任何 SQL,直到获取新的 Connection
(对于 Session
,这是在当前事务结束并开始新事务之后)。
关于回显标志的更多信息
如前所述,create_engine.echo
和 create_engine.echo_pool
参数是立即记录到 sys.stdout
的快捷方式:
>>> from sqlalchemy import create_engine, text >>> e = create_engine("sqlite://", echo=True, echo_pool="debug") >>> with e.connect() as conn: ... print(conn.scalar(text("select 'hi'"))) 2020-10-24 12:54:57,701 DEBUG sqlalchemy.pool.impl.SingletonThreadPool Created new connection <sqlite3.Connection object at 0x7f287819ac60> 2020-10-24 12:54:57,701 DEBUG sqlalchemy.pool.impl.SingletonThreadPool Connection <sqlite3.Connection object at 0x7f287819ac60> checked out from pool 2020-10-24 12:54:57,702 INFO sqlalchemy.engine.Engine select 'hi' 2020-10-24 12:54:57,702 INFO sqlalchemy.engine.Engine () hi 2020-10-24 12:54:57,703 DEBUG sqlalchemy.pool.impl.SingletonThreadPool Connection <sqlite3.Connection object at 0x7f287819ac60> being returned to pool 2020-10-24 12:54:57,704 DEBUG sqlalchemy.pool.impl.SingletonThreadPool Connection <sqlite3.Connection object at 0x7f287819ac60> rollback-on-return
使用这些标志大致相当于:
import logging logging.basicConfig() logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO) logging.getLogger("sqlalchemy.pool").setLevel(logging.DEBUG)
需要注意的是,这两个标志独立于任何现有的日志配置,并且将无条件使用 logging.basicConfig()
。这会在任何现有的记录器配置之外额外进行配置。因此,在明确配置日志记录时,请始终确保所有回显标志始终设置为 False,以避免获得重复的日志行。
设置日志名称
实例的记录器名称,例如 Engine
或 Pool
的默认值为使用截断的十六进制标识符字符串。要将其设置为特定名称,请使用 create_engine.logging_name
和 create_engine.pool_logging_name
与 sqlalchemy.create_engine()
;名称将附加到记录名称 sqlalchemy.engine.Engine
:
>>> import logging >>> from sqlalchemy import create_engine >>> from sqlalchemy import text >>> logging.basicConfig() >>> logging.getLogger("sqlalchemy.engine.Engine.myengine").setLevel(logging.INFO) >>> e = create_engine("sqlite://", logging_name="myengine") >>> with e.connect() as conn: ... conn.execute(text("select 'hi'")) 2020-10-24 12:47:04,291 INFO sqlalchemy.engine.Engine.myengine select 'hi' 2020-10-24 12:47:04,292 INFO sqlalchemy.engine.Engine.myengine ()
提示
create_engine.logging_name
和 create_engine.pool_logging_name
参数也可以与 create_engine.echo
和 create_engine.echo_pool
一起使用。但是,如果其他引擎的回声标志设置为 True,而没有记录名称,则将发生不可避免的双重记录条件。这是因为将自动为 sqlalchemy.engine.Engine
添加一个处理程序,该处理程序将同时记录无名称引擎和具有记录名称的引擎的消息。例如:
from sqlalchemy import create_engine, text e1 = create_engine("sqlite://", echo=True, logging_name="myname") with e1.begin() as conn: conn.execute(text("SELECT 1")) e2 = create_engine("sqlite://", echo=True) with e2.begin() as conn: conn.execute(text("SELECT 2")) with e1.begin() as conn: conn.execute(text("SELECT 3"))
上述场景将双重记录 SELECT 3
。要解决此问题,请确保所有引擎都设置了 logging_name
,或者使用显式记录器/处理程序设置,而不使用 create_engine.echo
和 create_engine.echo_pool
。
设置每个连接/子引擎令牌
1.4.0b2 版本中的新功能。
当记录名称适合于在长时间存在的 Engine
对象上建立时,它并不灵活到足以容纳任意大的名称列表,用于跟踪日志消息中的单个连接和/或事务的情况。
对于这种用例,由 Connection
和 Result
对象生成的日志消息本身可以使用其他令牌进行增强,例如事务或请求标识符。 Connection.execution_options.logging_token
参数接受一个字符串参数,该参数可用于建立每个连接的跟踪令牌:
>>> from sqlalchemy import create_engine >>> e = create_engine("sqlite://", echo="debug") >>> with e.connect().execution_options(logging_token="track1") as conn: ... conn.execute(text("select 1")).all() 2021-02-03 11:48:45,754 INFO sqlalchemy.engine.Engine [track1] select 1 2021-02-03 11:48:45,754 INFO sqlalchemy.engine.Engine [track1] [raw sql] () 2021-02-03 11:48:45,754 DEBUG sqlalchemy.engine.Engine [track1] Col ('1',) 2021-02-03 11:48:45,755 DEBUG sqlalchemy.engine.Engine [track1] Row (1,)
Connection.execution_options.logging_token
参数也可以通过create_engine.execution_options
或Engine.execution_options()
在引擎或子引擎上建立。这可能对应用程序的不同组件应用不同的日志令牌而无需创建新引擎很有用:
>>> from sqlalchemy import create_engine >>> e = create_engine("sqlite://", echo="debug") >>> e1 = e.execution_options(logging_token="track1") >>> e2 = e.execution_options(logging_token="track2") >>> with e1.connect() as conn: ... conn.execute(text("select 1")).all() 2021-02-03 11:51:08,960 INFO sqlalchemy.engine.Engine [track1] select 1 2021-02-03 11:51:08,960 INFO sqlalchemy.engine.Engine [track1] [raw sql] () 2021-02-03 11:51:08,960 DEBUG sqlalchemy.engine.Engine [track1] Col ('1',) 2021-02-03 11:51:08,961 DEBUG sqlalchemy.engine.Engine [track1] Row (1,) >>> with e2.connect() as conn: ... conn.execute(text("select 2")).all() 2021-02-03 11:52:05,518 INFO sqlalchemy.engine.Engine [track2] Select 1 2021-02-03 11:52:05,519 INFO sqlalchemy.engine.Engine [track2] [raw sql] () 2021-02-03 11:52:05,520 DEBUG sqlalchemy.engine.Engine [track2] Col ('1',) 2021-02-03 11:52:05,520 DEBUG sqlalchemy.engine.Engine [track2] Row (1,)
SqlAlchemy 2.0 中文文档(四十三)(4)https://developer.aliyun.com/article/1563062