大家好,我是肖恩,源码阅读我们周四见。
Werkzeug是一个全面的WSGI Web应用程序库。它最初是WSGI实用程序各种工具的简单集合,现已成为最高级的WSGI实用程序库之一,是Flask背后的项目。Werkzeug
是一个德语单词,工具的意思。这个单词发音对我来说,有点困难(可能也是它知名度不高的重要因素之一),刚好官方logo是个锤子,我就简称“德国锤子”。文章已经完成上下两篇,上篇介绍:
- serving && wsgi
- request && response
- local的
下篇介绍:
- middleware
- routing && urls
- datastructures
“德国锤子” 还有3个比较重要的功能,不要放过,我们继续学习:
- reloader
- debug
- 配合SQLAlchemy操作数据库
reloader
reloader演示
reloader是调试程序时非常实用的功能,开发的时候不用手动重启服务,修改代码后会自动重启服务,提高研发效率。运行示例中的Shorty服务:
# python3 shortly.py * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit) * Restarting with stat * Debugger is active! * Debugger PIN: 722-230-382 复制代码
程序启动后实际上有2个进程, 10527 的主进程和 10529 的子进程:
501 10527 7144 0 8:36上午 ttys008 0:00.23 .../Python.app/Contents/MacOS/Python shortly.py 501 10529 10527 0 8:36上午 ttys008 0:00.34 .../Python.app/Contents/MacOS/Python /Users/yoo/work/yuanmahui/python/ch20-werkzeug/shortly.py 复制代码
随便修改一下 shortly.py
代码,比如增加一个日志输出。可以发现启动日志有下面reload的信息:
... * Detected change in '/Users/yoo/work/yuanmahui/python/ch20-werkzeug/shortly.py', reloading * Restarting with stat ... 复制代码
再观查进程信息可以发现 10529 子进程已经退出,新增了 10634 子进程:
501 10527 7144 0 8:36上午 ttys008 0:00.24 .../Python.app/Contents/MacOS/Python shortly.py 501 10634 10527 0 8:38上午 ttys008 0:00.77 .../Python.app/Contents/MacOS/Python /Users/yoo/work/yuanmahui/python/ch20-werkzeug/shortly.py 复制代码
可以推测,主/子进程检测代码变动,然后子进程关闭/退出,再由主进程重新创建一个子进程。那么到底子进程是自动退出还是被主进程关闭?是主进程监听代码变化还是子进程监听的呢?我们带着这2个问题,一起看看代码实现。
reloader的实现原理
启动服务的时候,需要使用 use_reloader=True 参数启动reloader
run_simple("127.0.0.1", 5000, app, use_debugger=True, use_reloader=True) 复制代码
服务启动时候判断是独立启动,还是由reload启动:
# serving.py def run_simple(...): if not is_running_from_reloader(): ... from ._reloader import run_with_reloader as _rwr _rwr( inner, extra_files=extra_files, exclude_patterns=exclude_patterns, interval=reloader_interval, reloader_type=reloader_type, ) 复制代码
主进程和子进程的判断是通过 WERKZEUG_RUN_MAIN 的环境变量进行判断,默认情况下是没有这个环境变量:
def is_running_from_reloader() -> bool: return os.environ.get("WERKZEUG_RUN_MAIN") == "true" 复制代码
使用reloader启动的代码如下:
# _reloader.py def run_with_reloader( main_func: t.Callable[[], None], extra_files: t.Optional[t.Iterable[str]] = None, exclude_patterns: t.Optional[t.Iterable[str]] = None, interval: t.Union[int, float] = 1, reloader_type: str = "auto", ) -> None: """Run the given function in an independent Python interpreter.""" import signal signal.signal(signal.SIGTERM, lambda *args: sys.exit(0)) reloader = reloader_loops[reloader_type]( extra_files=extra_files, exclude_patterns=exclude_patterns, interval=interval ) try: if os.environ.get("WERKZEUG_RUN_MAIN") == "true": ensure_echo_on() t = threading.Thread(target=main_func, args=()) t.daemon = True # Enter the reloader to set up initial state, then start # the app thread and reloader update loop. with reloader: t.start() reloader.run() else: sys.exit(reloader.restart_with_reloader()) except KeyboardInterrupt: pass 复制代码
代码主要功能:
- 注册系统信号处理,支持使用 CTRL+C 退出
- 选择reloader的实现,默认是
stat
的实现,还有一种是watchdog
的实现。后者需要额外安装,但是效率会高一些 - 判断是否主进程,主进程则只是启动reloader ; 子进程则使用守护线程方式启动服务程序
关于stat和watchdog的区别,请看下面的官方文档:
默认stat后端只是mtime定期检查所有文件的 。这对于大多数情况来说已经足够了,但是众所周知,它会耗尽笔记本电脑的电池。
在watchdog后端使用文件系统事件,而且比stat快, 但是它需要 安装看门狗模块。实现此目的的推荐方法是添加 Werkzeug[watchdog]到您的需求文件中。
ReloaderLoop是reloader实现的基类:
class ReloaderLoop: name = "" def __init__( self, extra_files: t.Optional[t.Iterable[str]] = None, exclude_patterns: t.Optional[t.Iterable[str]] = None, # 默认1s的间隔 interval: t.Union[int, float] = 1, ) -> None: self.extra_files: t.Set[str] = {os.path.abspath(x) for x in extra_files or ()} self.exclude_patterns: t.Set[str] = set(exclude_patterns or ()) self.interval = interval def __enter__(self) -> "ReloaderLoop": """Do any setup, then run one step of the watch to populate the initial filesystem state. """ self.run_step() return self def __exit__(self, exc_type, exc_val, exc_tb): # type: ignore """Clean up any resources associated with the reloader.""" pass def run(self) -> None: """Continually run the watch step, sleeping for the configured interval after each step. """ while True: self.run_step() time.sleep(self.interval) def run_step(self) -> None: """Run one step for watching the filesystem. Called once to set up initial state, then repeatedly to update it. """ pass def restart_with_reloader(self) -> int: """Spawn a new Python interpreter with the same arguments as the current one, but running the reloader thread. """ while True: _log("info", f" * Restarting with {self.name}") args = _get_args_for_reloading() new_environ = os.environ.copy() new_environ["WERKZEUG_RUN_MAIN"] = "true" exit_code = subprocess.call(args, env=new_environ, close_fds=False) if exit_code != 3: return exit_code def trigger_reload(self, filename: str) -> None: self.log_reload(filename) sys.exit(3) def log_reload(self, filename: str) -> None: filename = os.path.abspath(filename) _log("info", f" * Detected change in {filename!r}, reloading") 复制代码
- ReloaderLoop是一个上下文装饰器,进入的时候自动调用子类的run_step方法。
- 在主进程中使用restart_with_reloader函数进行工作。这是一个无限循环,循环中使用
subprocess.call
创建一个子进程,并监听子进程的退出状态。如果退出状态为3则可以无限循环;如果不为3则会退出循环,结束主进程。 - 创建子进程时候,设置关键的 WERKZEUG_RUN_MAIN 环境变量标识。
- 子进程使用run方法持续监听代码变化。
- 如果触发reload则当前子进程退出。
StatReloaderLoop的实现比较简单,代码如下:
class StatReloaderLoop(ReloaderLoop): name = "stat" def __enter__(self) -> ReloaderLoop: self.mtimes: t.Dict[str, float] = {} return super().__enter__() def run_step(self) -> None: for name in chain(_find_stat_paths(self.extra_files, self.exclude_patterns)): try: mtime = os.stat(name).st_mtime except OSError: continue old_time = self.mtimes.get(name) if old_time is None: self.mtimes[name] = mtime continue if mtime > old_time: self.trigger_reload(name) 复制代码
- run_step中记录每个代码的时间戳,如果发现有文件的时间戳变化,则调用父类的trigger_reload
所以主进程只是负责持续创建子进程,子进程自己检测代码变化和自动退出。
debug
debug的展示
在Shortly中增加一个echo的view:
Rule("/echo", endpoint="echo"), def on_echo(self, request): raise 复制代码
访问这个view可以看到下面的异常信息, 带有完整的业务堆栈:
点击右侧的console图标,会提示输入PIN:
在console中可以进行调试:
2.0.1
版本的dubug可能有bug,需要使用os.environ["WERKZEUG_DEBUG_PIN"] = "off"
关闭pin-auth的认证,才能够正常工作
debug的实现原理
python3自带REPL的模块 code
, 看起来和python的命令行差不多,仔细对比会发现多了 (InteractiveConsole) 的输出。自己的应用程序中嵌入code,就可以实现交互式的debug功能。
# python3 -m code Python 3.8.5 (v3.8.5:580fbb018f, Jul 20 2020, 12:11:27) [Clang 6.0 (clang-600.0.57)] on darwin Type "help", "copyright", "credits" or "license" for more information. (InteractiveConsole) # <- 提示信息 >>> # python3 Python 3.8.5 (v3.8.5:580fbb018f, Jul 20 2020, 12:11:27) [Clang 6.0 (clang-600.0.57)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>> 复制代码
启动服务的入口,如果有debug参数则使用DebuggedApplication来包裹业务app:
# serving.py def run_simple(...): if use_debugger: from .debug import DebuggedApplication application = DebuggedApplication(application, use_evalex) 复制代码
DebuggedApplication主要的call函数:
# debug def __call__( self, environ: "WSGIEnvironment", start_response: "StartResponse" ) -> t.Iterable[bytes]: """Dispatch the requests.""" request = Request(environ) response = self.debug_application if request.args.get("__debugger__") == "yes": # 处理debug请求 cmd = request.args.get("cmd") arg = request.args.get("f") secret = request.args.get("s") frame = self.frames.get(request.args.get("frm", type=int)) if cmd == "resource" and arg: response = self.get_resource(request, arg) # type: ignore elif cmd == "pinauth" and secret == self.secret: response = self.pin_auth(request) # type: ignore elif cmd == "printpin" and secret == self.secret: response = self.log_pin_request() # type: ignore elif ( self.evalex and cmd is not None and frame is not None and self.secret == secret and self.check_pin_trust(environ) ): response = self.execute_command(request, cmd, frame) # type: ignore elif ( self.evalex and self.console_path is not None and request.path == self.console_path ): response = self.display_console(request) # type: ignore return response(environ, start_response) 复制代码
- call接收wsgi的输入environ和start_response
- response使用debug_application处理
- 如果request上有__debugger__信息,则进行debug处理,比如显示异常堆栈,显示PIN认证以及调试指令等等
debug的重点在debug_application函数:
def debug_application( self, environ: "WSGIEnvironment", start_response: "StartResponse" ) -> t.Iterator[bytes]: """Run the application and conserve the traceback frames.""" app_iter = None try: # 正常业务 app_iter = self.app(environ, start_response) yield from app_iter if hasattr(app_iter, "close"): app_iter.close() # type: ignore except Exception: # 异常调试 if hasattr(app_iter, "close"): app_iter.close() # type: ignore traceback = get_current_traceback( skip=1, show_hidden_frames=self.show_hidden_frames, ignore_system_exceptions=True, ) for frame in traceback.frames: self.frames[frame.id] = frame self.tracebacks[traceback.id] = traceback try: start_response( "500 INTERNAL SERVER ERROR", [ ("Content-Type", "text/html; charset=utf-8"), ("X-XSS-Protection", "0"), ], ) except Exception: environ["wsgi.errors"].write( "Debugging middleware caught exception in streamed " "response at a point where response headers were already " "sent.\n" ) else: is_trusted = bool(self.check_pin_trust(environ)) yield traceback.render_full( evalex=self.evalex, evalex_trusted=is_trusted, secret=self.secret ).encode("utf-8", "replace") traceback.log(environ["wsgi.errors"]) 复制代码
- 使用
app_iter = self.app(environ, start_response)
执行业务功能 - 使用try-except捕获业务异常,对异常信息进行获取traceback
- 返回500的http状态,并且使用traceback.render_full渲染html显示异常堆栈
pin
debug可以使用web界面调试程序,这会产生安全问题。所以werkzeug的debug中引入了PIN机制,需要输入PIN验证码才可以进行调试,PIN在服务端的命令行中输出。
前端页面输入PIN后会提交PIN码:
# debugger.js function initPinBox() { document.querySelector(".pin-prompt form").addEventListener( "submit", function (event) { .... fetch( `${document.location.pathname}?__debugger__=yes&cmd=pinauth&pin=${pin}&s=${encodedSecret}` ) .then((res) => res.json()) .then(({auth, exhausted}) => { if (auth) { EVALEX_TRUSTED = true; fadeOut(document.getElementsByClassName("pin-prompt")[0]); } else { .... } }) ... }, false ); } 复制代码
这段JS代码重点是 __debugger__=yes&cmd=pinauth&pin=${pin}&s=${encodedSecret}
的URL,这个请求会直接被DebuggedApplication处理:
if request.args.get("__debugger__") == "yes": cmd = request.args.get("cmd") arg = request.args.get("f") secret = request.args.get("s") frame = self.frames.get(request.args.get("frm", type=int)) # type: ignore ... elif cmd == "pinauth" and secret == self.secret: response = self.pin_auth(request) # type: ignore ... elif ( self.evalex and cmd is not None and frame is not None and self.secret == secret and self.check_pin_trust(environ) ): response = self.execute_command(request, cmd, frame) # type: ignore 复制代码
- 如果cmd=pinauth则进行pin的验证
- 如果cmd是其它,比如dump或者对象之类,则执行对应的command, 调试信息的时候会使用
auth认证函数:
def pin_auth(self, request: Request) -> Response: """Authenticates with the pin.""" exhausted = False auth = False trust = self.check_pin_trust(request.environ) ... # Otherwise go through pin based authentication else: entered_pin = request.args["pin"] # 对比PIN if entered_pin.strip().replace("-", "") == pin.replace("-", ""): self._failed_pin_auth = 0 auth = True else: self._fail_pin_auth() rv = Response( json.dumps({"auth": auth, "exhausted": exhausted}), mimetype="application/json", ) if auth: # 设置cookie rv.set_cookie( self.pin_cookie_name, f"{int(time.time())}|{hash_pin(pin)}", httponly=True, samesite="None", ) elif bad_cookie: rv.delete_cookie(self.pin_cookie_name) return rv 复制代码
查看http请求详情,会发现认证成功后会设置一个Response-Cookie:__wzd10d9760bb71ac5d1b21e ,这样后续的debug调试都使用这个cookie。
开启PIN验证后,无法调试就是因为这个cookie在调试的时候没有附带上
Interactive
debug调试主要使用_InteractiveConsole的runsource实现:
class _InteractiveConsole(code.InteractiveInterpreter): locals: t.Dict[str, t.Any] ... def runsource(self, source: str, **kwargs: t.Any) -> str: # type: ignore source = f"{source.rstrip()}\n" ThreadedStream.push() prompt = "... " if self.more else ">>> " try: source_to_eval = "".join(self.buffer + [source]) if super().runsource(source_to_eval, "<debugger>", "single"): self.more = True self.buffer.append(source) else: self.more = False del self.buffer[:] finally: output = ThreadedStream.fetch() return prompt + escape(source) + output ... 复制代码
runsource函数比较复杂,但是核心就是对前端网页提交的字符串信息进行编译执行,并把执行的输出捕获后反馈给前端,和之前的介绍CGI实现类似。
开发的时候,可以利用debug功能协助进行调试,提高研发效率。需要注意的是,debug功能不可以用于线上环境。
配合SQLAlchemy操作数据库
数据库操作是Web程序非常重要的一环,werkzeug中操作数据可以使用sqlalchemy。
SQLAlchemy回顾
在正式介绍Werkzeug配合SQLAlchemy操作数据库操作数据库之前,我们先简单回顾一下SQLAlchemy的ORM使用:
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String from sqlalchemy.orm import sessionmaker engine = create_engine('sqlite:///:memory:', echo=True) Model = declarative_base() class User(Model): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String) fullname = Column(String) nickname = Column(String) def __repr__(self): return "<User(name='%s', fullname='%s', nickname='%s')>" % ( self.name, self.fullname, self.nickname) Model.metadata.create_all(engine) print("=" * 10) Session = sessionmaker(bind=engine) session = Session() ed_user = User(name='ed', fullname='Ed Jones', nickname='edsnickname') session.add(ed_user) session.commit() print(ed_user.id) result = engine.execute("select * from users") for row in result: print(row) 复制代码
- 创建engine,用于数据库连接
- 创建Model
- 创建User模型
- 将metadata提交到engine(创建表)
- 创建session
- 使用users插入数据:创建对象,add到session,然后提交session
- ...
SQLAlchemy配合使用
示例shorty中演示了如何使用SQLAlchemy操作SQLite数据库。使用方法是先使用 initdb
初始化sqlite数据库,然后再使用 runserver
启动服务:
python3 manage-shorty.py initdb python3 manage-shorty.py runserver 复制代码
先看看View中如何通过ORM操作数据,这是插入数据:
uid = URL(url, "private" not in request.form, alias).uid session.commit() 复制代码
这是查询数据:
url = URL.query.get(uid) 复制代码
- 创建数据后使用session.commit后就提交
- 查询数据使用模型的query
URL的数据模型是这样定义的:
url_table = Table( "urls", metadata, Column("uid", String(140), primary_key=True), Column("target", String(500)), Column("added", DateTime), Column("public", Boolean), ) class URL: query = session.query_property() def __init__(self, target, public=True, uid=None, added=None): self.target = target self.public = public self.added = added or datetime.utcnow() if not uid: while 1: uid = get_random_uid() if not URL.query.get(uid): break self.uid = uid session.add(self) @property def short_url(self): return url_for("link", uid=self.uid, _external=True) def __repr__(self): return f"<URL {self.uid!r}>" mapper(URL, url_table) 复制代码
- 注意URL模型的init方法,创建对象时候自动生成uid,并且添加到session,然后view使用commit就提交数据。这种使用方式和回顾里一致。
- URL中定义了一个query,可以用来检索数据,这和django中Model的object类似。
database_engine在创建创建App时候创建:
class Shorty: def __init__(self, db_uri): local.application = self self.database_engine = create_engine(db_uri, convert_unicode=True) self.dispatch = SharedDataMiddleware(self.dispatch, {"/static": STATIC_PATH}) def init_database(self): # 等同 Model.metadata.create_all(engine) metadata.create_all(self.database_engine) 复制代码
最关键的地方是session:
local = Local() local_manager = LocalManager([local]) # local.application = self application = local("application") ... session = scoped_session( lambda: create_session( application.database_engine, autocommit=False, autoflush=False ) ) 复制代码
简单的说这里scoped_session是绑定到线程的,跟随请求的生命周期。这样在请求中可以使用session访问数据。
小结
本章我们学习了web框架如何使用reload和debug协助研发,提高研发效率。reload主要是使用了subprocess开启多个python进程,debug则是使用code的REPL功能。
werkzeug也提供了使用sqlalchemy操作数据库的示例Shorty,使用ORM功能可以快速编写适配多种数据存储引擎的程序。
参考链接
- Contextual/Thread-local Sessions docs.sqlalchemy.org/en/13/orm/c…
- stackoverflow.com/questions/6…