gunicorn ("Green Unicorn"), born out of the Ruby community's Unicorn, is a WSGI HTTP server. Once we understand gunicorn, we can properly deploy the Bottle application from the earlier posts. As usual, this article is divided into the following parts:
- A quick tour of the gunicorn project structure
- Using gunicorn
- The gunicorn Application implementation
- The Arbiter implementation
- The sync worker implementation
- Summary
- Tips
A quick tour of the gunicorn project structure
The gunicorn source version used here is 20.0.0. The main files and packages are:
| File / package | Description |
| --- | --- |
| app package | gunicorn's Application classes (not the application defined by WSGI) |
| http package | gunicorn's handling of the HTTP protocol |
| workers package | gunicorn's worker implementations: the synchronous sync worker, the thread-pool gthread worker, and the asynchronous geventlet and gevent workers |
| arbiter.py | gunicorn's master implementation |
gunicorn's design, in the words of its documentation:
Gunicorn is based on the pre-fork worker model. This means that there is a central master process that manages a set of worker processes. The master never knows anything about individual clients. All requests and responses are handled completely by worker processes.
In other words, gunicorn uses the pre-fork worker model: the master forks a predefined number of workers up front and manages that set of workers, and all requests and responses are handled entirely by the worker processes.
Our focus will be on how gunicorn implements its server and on how the master and workers are implemented and cooperate.
Using gunicorn
Write a test app; as you can see, it is an application that conforms to the WSGI specification:
```python
# myapp.py
def app(environ, start_response):
    # environ, plus a callback for setting the HTTP status and headers
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data)))
    ])
    return iter([data])  # return the response body
```
Start the service with 4 workers and the log level set to debug, loading `myapp:app`:
```shell
# gunicorn -w 4 --log-level debug myapp:app
[2021-02-23 17:58:57 +0800] [50258] [DEBUG] Current configuration:    # prepare the configuration
...
[2021-02-23 18:01:12 +0800] [50462] [INFO] Starting gunicorn 20.0.0   # start gunicorn
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] Arbiter booted            # boot the master
[2021-02-23 18:01:12 +0800] [50462] [INFO] Listening at: http://127.0.0.1:8000 (50462)   # listen on the port
[2021-02-23 18:01:12 +0800] [50462] [INFO] Using worker: sync
[2021-02-23 18:01:12 +0800] [50464] [INFO] Booting worker with pid: 50464   # boot the workers
[2021-02-23 18:01:12 +0800] [50465] [INFO] Booting worker with pid: 50465
[2021-02-23 18:01:12 +0800] [50466] [INFO] Booting worker with pid: 50466
[2021-02-23 18:01:12 +0800] [50467] [INFO] Booting worker with pid: 50467
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] 4 workers
```
Test the service with `curl`:
```shell
# curl http://127.0.0.1:8000
Hello, World!
```
At the same time, the gunicorn log shows that worker 50465 handled this HTTP request:
```shell
[2021-02-24 16:09:39 +0800] [50465] [DEBUG] GET /
```
While the service is running, you can also scale up the number of workers manually by sending a signal:
```shell
# kill -TTIN 50462
```
Watching the service log, you can see that the master process (pid 50462) handled the `ttin` signal and scaled the worker count up to 5:
```shell
...
[2021-02-24 18:02:56 +0800] [50462] [INFO] Handling signal: ttin
[2021-02-24 18:02:56 +0800] [75918] [INFO] Booting worker with pid: 75918
[2021-02-24 18:02:56 +0800] [50462] [DEBUG] 5 workers
```
Shut down the service with `Ctrl+C`; again the master process (50462) handles the `int` signal, shutting down the workers and then itself:
```shell
^C[2021-02-25 15:06:54 +0800] [50462] [INFO] Handling signal: int
[2021-02-25 15:06:54 +0800] [50464] [INFO] Worker exiting (pid: 50464)
[2021-02-25 15:06:54 +0800] [50465] [INFO] Worker exiting (pid: 50465)
[2021-02-25 15:06:54 +0800] [50466] [INFO] Worker exiting (pid: 50466)
[2021-02-25 15:06:54 +0800] [50467] [INFO] Worker exiting (pid: 50467)
[2021-02-25 15:06:54 +0800] [75918] [INFO] Worker exiting (pid: 75918)
[2021-02-25 15:06:54 +0800] [50462] [INFO] Shutting down: Master
```
If you are not familiar with gunicorn's options, the following command shows the help:
```shell
# gunicorn -h
usage: gunicorn [OPTIONS] [APP_MODULE]

optional arguments:
  -h, --help            show this help message and exit
  ...
  -w INT, --workers INT
                        The number of worker processes for handling requests. [1]
```
The help is implemented with the familiar argparse module:
```python
class Setting(object):
    def add_option(self, parser):
        args = tuple(self.cli)

        help_txt = "%s [%s]" % (self.short, self.default)
        help_txt = help_txt.replace("%", "%%")

        kwargs = {
            "dest": self.name,
            "action": self.action or "store",
            "type": self.type or str,
            "default": None,
            "help": help_txt
        }
        ...
        parser.add_argument(*args, **kwargs)  # add the option


class Workers(Setting):  # the setting class for --workers
    name = "workers"
    section = "Worker Processes"
    cli = ["-w", "--workers"]
    meta = "INT"
    validator = validate_pos_int
    type = int
    default = int(os.environ.get("WEB_CONCURRENCY", 1))
    desc = """\
        The number of worker processes for handling requests.

        A positive integer generally in the ``2-4 x $(NUM_CORES)`` range.
        You'll want to vary this a bit to find the best for your particular
        application's work load.

        By default, the value of the ``WEB_CONCURRENCY`` environment variable.
        If it is not defined, the default is ``1``.
        """


def parser(self):
    kwargs = {
        "usage": self.usage,
        "prog": self.prog
    }
    parser = argparse.ArgumentParser(**kwargs)
    parser.add_argument("-v", "--version",
                        action="version", default=argparse.SUPPRESS,
                        version="%(prog)s (version " + __version__ + ")\n",
                        help="show program's version number and exit")
    parser.add_argument("args", nargs="*", help=argparse.SUPPRESS)

    keys = sorted(self.settings, key=self.settings.__getitem__)
    for k in keys:
        self.settings[k].add_option(parser)  # dynamically add each option

    return parser
```
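To make the pattern easier to see in isolation, here is a minimal, self-contained sketch of the same idea, assuming nothing from gunicorn itself: each option is a small `Setting` subclass, the subclasses are collected into a registry, and the parser is built by iterating over that registry. The sketch registers subclasses with `__init_subclass__`; gunicorn's own `config.py` builds its registry differently, and names like `KNOWN_SETTINGS` and `build_parser` are ours.

```python
# settings_sketch.py -- a hypothetical, minimal version of the Setting/parser pattern.
import argparse

KNOWN_SETTINGS = []


class Setting:
    name = None
    cli = None
    type = str
    default = None
    desc = ""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        KNOWN_SETTINGS.append(cls)          # register every concrete setting class

    @classmethod
    def add_option(cls, parser):
        parser.add_argument(*cls.cli, dest=cls.name, type=cls.type,
                            default=cls.default, help=cls.desc)


class Workers(Setting):
    name = "workers"
    cli = ["-w", "--workers"]
    type = int
    default = 1
    desc = "The number of worker processes for handling requests."


def build_parser():
    parser = argparse.ArgumentParser(prog="mini-gunicorn")
    for setting in KNOWN_SETTINGS:          # dynamically add every registered option
        setting.add_option(parser)
    return parser


if __name__ == "__main__":
    print(build_parser().parse_args(["-w", "4"]))   # Namespace(workers=4)
```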
The gunicorn Application implementation
gunicorn's application is implemented mainly by the three classes below. Note that the application here should be understood as the web server's application, while bottle/flask/django implement the web framework's application. The former dynamically loads the latter: the former runs the HTTP service, the latter handles individual HTTP requests.
- BaseApplication
- Application
- WSGIApplication
After sorting through the three Application classes, the code template looks roughly like this:
```python
class WSGIApplication(Application):
    def __init__(self, usage=None, prog=None):
        self.do_load_config()  # load the configuration

    def do_load_config(self):
        ...
        cfg = self.init(parser, args, args.args)  # initialize the configuration
        ...

    def init(...):
        ...
        self.app_uri = args[0]  # take the wsgi-application argument

    def load(...):
        util.import_app(self.app_uri)  # dynamically load the wsgi-application
        ...

    def run(...):
        self.load()
        Arbiter(self).run()  # start the master, i.e. the Arbiter


def run():  # run the service
    """\
    The ``gunicorn`` command line runner for launching Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()


if __name__ == '__main__':
    run()
```
The Application part of the implementation is relatively simple, so we won't dwell on it.
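One practical consequence of this design is that gunicorn can be embedded in your own script instead of being driven by the `gunicorn` command. The sketch below follows the custom-application pattern from gunicorn's documentation; the class name and option dict are our own, and `myapp` is the test module from earlier.

```python
# standalone.py -- embedding gunicorn by subclassing BaseApplication,
# a sketch based on the custom-application pattern in gunicorn's docs.
from gunicorn.app.base import BaseApplication


class StandaloneApplication(BaseApplication):
    def __init__(self, wsgi_app, options=None):
        self.options = options or {}
        self.wsgi_app = wsgi_app
        super().__init__()

    def load_config(self):
        # copy our option dict into gunicorn's settings (self.cfg)
        for key, value in self.options.items():
            if key in self.cfg.settings and value is not None:
                self.cfg.set(key.lower(), value)

    def load(self):
        # return the WSGI callable directly instead of importing "module:app"
        return self.wsgi_app


if __name__ == "__main__":
    from myapp import app
    StandaloneApplication(app, {"bind": "127.0.0.1:8000", "workers": 2}).run()
```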
The Arbiter implementation
The Arbiter (literally "the referee") is in effect the core of the master process. The tidied-up code template looks like this:
```python
class Arbiter(object):
    def __init__(self, app):
        self.worker_class = self.cfg.worker_class  # the worker class
        self.num_workers = self.cfg.workers        # the number of workers
        ...

    def start(self):
        self.init_signals()         # install the signal handlers
        ...
        sock.create_sockets(...)    # create the listening sockets

    def run(self):
        self.start()
        try:
            self.manage_workers()   # boot the workers
            while True:             # the main loop
                ...
                sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
                if sig is None:
                    self.sleep()            # sleep until woken
                    self.murder_workers()
                    self.manage_workers()
                    continue

                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue

                # handle the signal
                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                ...
                handler()
                self.wakeup()   # wake up
        except (StopIteration, KeyboardInterrupt):
            ...
```
Before looking at how the Arbiter works, let's review signals. On a Linux system you can list them with the following command:
```shell
# kill -l
 1) SIGHUP       2) SIGINT       3) SIGQUIT      4) SIGILL       5) SIGTRAP
 6) SIGABRT      7) SIGBUS       8) SIGFPE       9) SIGKILL     10) SIGUSR1
11) SIGSEGV     12) SIGUSR2     13) SIGPIPE     14) SIGALRM     15) SIGTERM
...
```
- 1 (SIGHUP): terminate a connection, or reload the configuration for daemons
- 2 (SIGINT): interrupt the session from the dialogue station
- 3 (SIGQUIT): terminate the session from the dialogue station
- 4 (SIGILL): illegal instruction was executed
- 5 (SIGTRAP): do a single instruction (trap)
- 6 (SIGABRT): abnormal termination
- 7 (SIGBUS): error on the system bus
- 8 (SIGFPE): floating point error
- 9 (SIGKILL): immediately terminate the process
- 10 (SIGUSR1): user-defined signal
- 11 (SIGSEGV): segmentation fault due to illegal access of a memory segment
- 12 (SIGUSR2): user-defined signal
- 13 (SIGPIPE): writing into a pipe, and nobody is reading from it
- 14 (SIGALRM): the timer terminated (alarm)
- 15 (SIGTERM): terminate the process in a soft way
Signals are events provided by the operating system and can be used for communication across processes. `Arbiter.init_signals` does the following:
```python
SIGNALS = [getattr(signal, "SIG%s" % x)
           for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]

def init_signals(self):
    ...
    # initialize all signals
    for s in self.SIGNALS:
        signal.signal(s, self.signal)                # install the signal listeners
    signal.signal(signal.SIGCHLD, self.handle_chld)

def signal(self, sig, frame):
    if len(self.SIG_QUEUE) < 5:
        self.SIG_QUEUE.append(sig)
        self.wakeup()
```
The `TTIN` scale-up signal demonstrated earlier is handled like this:
```python
def handle_ttin(self):
    """\
    SIGTTIN handling.
    Increases the number of workers by one.
    """
    self.num_workers += 1    # scale up
    self.manage_workers()    # reconcile the workers
```
The Arbiter's sleep and wakeup are implemented like this:
```python
self.PIPE = pair = os.pipe()  # create the pipe

def sleep(self):
    """\
    Sleep until PIPE is readable or we timeout.
    A readable PIPE means a signal occurred.
    """
    try:
        ready = select.select([self.PIPE[0]], [], [], 1.0)  # use select to watch the pipe
        if not ready[0]:
            return
        while os.read(self.PIPE[0], 1):  # drain the pipe
            pass
    except (select.error, OSError) as e:
        ...

def wakeup(self):
    """\
    Wake up the arbiter by writing to the PIPE
    """
    try:
        os.write(self.PIPE[1], b'.')  # write to the pipe
    except IOError as e:
        ...
```
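This is the classic "self-pipe" trick: a signal handler can run at any time, so it only writes one byte to a pipe, and the main loop sleeps in `select()` until that byte (or a timeout) arrives. Below is a minimal standalone sketch of the pattern under that assumption; it is separate from gunicorn's code and all names are ours.

```python
# self_pipe_demo.py -- a minimal sketch of the self-pipe pattern (not gunicorn code):
# the signal handler writes a byte, the main loop sleeps in select().
import os
import select
import signal

PIPE_R, PIPE_W = os.pipe()


def on_signal(signum, frame):
    os.write(PIPE_W, b'.')            # wake up the select() below


signal.signal(signal.SIGUSR1, on_signal)


def sleep_until_signal(timeout=5.0):
    ready, _, _ = select.select([PIPE_R], [], [], timeout)
    if not ready:
        return False                  # plain timeout, no signal
    os.read(PIPE_R, 1)                # drain the wake-up byte
    return True


if __name__ == "__main__":
    print("pid:", os.getpid(), "- send SIGUSR1 to wake me up")
    while True:
        print("woken by signal" if sleep_until_signal() else "timed out")
```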
Note that the Arbiter creates the listening sockets with `sock.create_sockets`, binding and listening on the ports, and then hands the sockets to the child process when it forks a worker:
```python
worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                           self.app, self.timeout / 2.0,
                           self.cfg, self.log)
self.cfg.pre_fork(self, worker)
pid = os.fork()
if pid != 0:
    worker.pid = pid            # record the worker's pid
    self.WORKERS[pid] = worker  # add it to the worker set
    return pid
```
Workers are destroyed by sending them signals:
```python
def kill_workers(self, sig):
    """\
    Kill all workers with the signal `sig`
    :attr sig: `signal.SIG*` value
    """
    worker_pids = list(self.WORKERS.keys())
    for pid in worker_pids:
        os.kill(pid, sig)
```
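Putting the fork and kill halves together, the pre-fork model can be reduced to a very small standalone sketch (ours, not gunicorn's): the master forks N children, remembers their pids, and terminates them with a signal on shutdown. It only runs on Unix-like systems, since it relies on `os.fork`.

```python
# prefork_demo.py -- a hypothetical, minimal pre-fork master/worker sketch.
import os
import signal
import sys
import time

WORKERS = {}                              # pid -> worker id


def spawn_worker(worker_id):
    pid = os.fork()
    if pid == 0:                          # child: reset inherited handlers, pretend to serve
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        print("worker %d booted, pid=%d" % (worker_id, os.getpid()))
        while True:
            time.sleep(1)
    WORKERS[pid] = worker_id              # parent: record the worker pid
    return pid


def kill_workers(sig):
    for pid in list(WORKERS):
        try:
            os.kill(pid, sig)
        except ProcessLookupError:        # worker already gone
            WORKERS.pop(pid, None)


def handle_term(signum, frame):
    kill_workers(signal.SIGTERM)
    for pid in list(WORKERS):
        try:
            os.waitpid(pid, 0)            # reap the children
        except ChildProcessError:
            pass
    sys.exit(0)


if __name__ == "__main__":
    signal.signal(signal.SIGTERM, handle_term)
    signal.signal(signal.SIGINT, handle_term)
    for i in range(2):
        spawn_worker(i)
    while True:                           # the master just waits for signals
        time.sleep(1)
```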
The sync worker implementation
Next, let's look at the workers, mainly the sync worker implementation. The worker classes are related as follows:
- Worker: handles signals
- SyncWorker: handles HTTP requests synchronously
- ThreadWorker: handles HTTP requests with a thread pool
Continuing from the Arbiter's fork-worker code above, a freshly created worker enters `init_process`:
```python
# Process Child
worker.pid = os.getpid()
try:
    util._setproctitle("worker [%s]" % self.proc_name)
    self.log.info("Booting worker with pid: %s", worker.pid)
    self.cfg.post_fork(self, worker)
    worker.init_process()
    sys.exit(0)
```
The worker's `init_process` template is as follows:
```python
def init_process(self):
    """\
    If you override this method in a subclass, the last statement
    in the function should be to call this method with
    super().init_process() so that the ``run()`` loop is initiated.
    """
    # For waking ourselves up
    self.PIPE = os.pipe()  # create the pipe
    ...
    self.wait_fds = self.sockets + [self.PIPE[0]]  # watch the sockets and the pipe
    ...
    self.init_signals()  # install the signal handlers
    ...
    self.load_wsgi()     # load the WSGI application
    ...
    # Enter main run loop
    self.booted = True
    self.run()           # the work loop
```
The worker installs signal handlers in the same way:
```python
SIGNALS = [getattr(signal, "SIG%s" % x)
           for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]

def init_signals(self):
    # reset signaling
    for s in self.SIGNALS:
        signal.signal(s, signal.SIG_DFL)
    # init new signaling
    signal.signal(signal.SIGQUIT, self.handle_quit)
    signal.signal(signal.SIGTERM, self.handle_exit)
    signal.signal(signal.SIGINT, self.handle_quit)
    ...
    if hasattr(signal, 'set_wakeup_fd'):
        signal.set_wakeup_fd(self.PIPE[1])  # let signals wake up the select() wait below
```
The most important part of the worker is its `run` loop:
```python
def run(self, timeout):
    listener = self.sockets[0]
    while self.alive:
        ...
        # Accept a connection. If we get an error telling us
        # that no connection is waiting we fall down to the
        # select which is where we'll wait for a bit for new
        # workers to come give us some love.
        try:
            self.accept(listener)  # accept a client connection
            # Keep processing clients until no one is waiting. This
            # prevents the need to select() for every client that we
            # process.
            continue
        except EnvironmentError as e:
            ...

        try:
            self.wait(timeout)  # sleep and wait
        except StopWaiting:
            return
```
Handling a client connection is similar to what was covered in the earlier HTTP article, so we won't repeat it:
```python
def accept(self, listener):
    client, addr = listener.accept()
    client.setblocking(1)
    util.close_on_exec(client)
    self.handle(listener, client, addr)
```
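For completeness, here is a heavily simplified sketch of what `handle()` ultimately has to do; it is not gunicorn's actual implementation. It reads the request, builds a very trimmed-down WSGI environ, calls the application, and writes the response back to the client socket. The `__main__` block assumes the `myapp` module from earlier.

```python
# handle_sketch.py -- a simplified stand-in for a sync worker's handle();
# real gunicorn parses HTTP properly and builds a complete environ.
import socket


def handle_sketch(app, client):
    raw = client.recv(65536)                      # naive: assume one recv suffices
    request_line = raw.split(b"\r\n", 1)[0].decode()
    method, path, _ = request_line.split(" ", 2)

    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = headers

    environ = {"REQUEST_METHOD": method, "PATH_INFO": path}   # heavily trimmed environ
    body = b"".join(app(environ, start_response))

    head = "HTTP/1.1 %s\r\n" % captured["status"]
    head += "".join("%s: %s\r\n" % kv for kv in captured["headers"])
    client.sendall(head.encode() + b"\r\n" + body)
    client.close()


if __name__ == "__main__":
    from myapp import app                         # the test app from earlier
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind(("127.0.0.1", 8001))
    srv.listen(5)
    while True:
        conn, _ = srv.accept()
        handle_sketch(app, conn)
```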
After finishing a request, the worker goes back to waiting:
```python
def wait(self, timeout):
    try:
        ret = select.select(self.wait_fds, [], [], timeout)
        if ret[0]:
            if self.PIPE[0] in ret[0]:
                os.read(self.PIPE[0], 1)
            return ret[0]
    except select.error as e:
        if e.args[0] == errno.EINTR:
            return self.sockets
        if e.args[0] == errno.EBADF:
            if self.nr < 0:
                return self.sockets
            else:
                raise StopWaiting
        raise
```
Summary
To sum up gunicorn's workflow: the master process (the Arbiter) creates the listening sockets and forks the workers, then supervises them through signals and its self-pipe sleep/wakeup loop, while each worker accepts connections on the shared sockets and serves the HTTP requests with the dynamically loaded WSGI application.
Tips
You can use a thread to implement a timer-style poller; gunicorn's reloader does exactly this:
```python
# reloader.py
class Reloader(threading.Thread):
    def __init__(self, extra_files=None, interval=1, callback=None):
        super().__init__()
        self.setDaemon(True)
        self._interval = interval
        self._callback = callback

    def run(self):
        mtimes = {}
        while True:
            for filename in self.get_files():
                try:
                    mtime = os.stat(filename).st_mtime
                except OSError:
                    continue
                old_time = mtimes.get(filename)
                if old_time is None:
                    mtimes[filename] = mtime
                    continue
                elif mtime > old_time:
                    if self._callback:
                        self._callback(filename)
            time.sleep(self._interval)
```
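The excerpt above relies on a `get_files()` method that is defined elsewhere in gunicorn's reloader. A hypothetical way to exercise it is to subclass it and supply a simple `get_files()` that watches every imported module's source file; this sketch assumes the `Reloader` class above is in scope.

```python
# reloader_usage.py -- a hypothetical usage sketch of the Reloader above.
import sys
import time


class ModuleReloader(Reloader):
    def get_files(self):
        # watch the source file of every currently imported module
        return [m.__file__ for m in list(sys.modules.values())
                if getattr(m, "__file__", None)]


if __name__ == "__main__":
    watcher = ModuleReloader(callback=lambda f: print("changed:", f))
    watcher.start()
    time.sleep(60)   # keep the main thread alive while the daemon thread polls
```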
When you run the `gunicorn myapp:app` command, `myapp:app` is not imported statically; it is loaded dynamically like this:
```python
# util.py
klass = components.pop(-1)
mod = importlib.import_module('.'.join(components))
return getattr(mod, klass)
```
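A simplified sketch of what loading `myapp:app` boils down to is shown below; gunicorn's real `util.import_app` handles many more cases (application factories, missing names, and so on), and `import_app_sketch` is our own name.

```python
# import_sketch.py -- a minimal stand-in for util.import_app.
import importlib


def import_app_sketch(uri):
    module_name, _, obj_name = uri.partition(":")
    mod = importlib.import_module(module_name)
    return getattr(mod, obj_name or "application")   # assume "application" as the fallback name


if __name__ == "__main__":
    app = import_app_sketch("myapp:app")   # the same callable that `gunicorn myapp:app` serves
    print(app)
```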
References
- Gunicorn Design docs.gunicorn.org/en/stable/d…
- 阅读gunicorn 代码文档 gunicorn.readthedocs.io/en/latest/i…
- Handling Unix Signals in Python stackabuse.com/handling-un…
- How To Use Signal Driven Programming In Applications medium.com/fintechexpl…
- Django with Nginx, Gunicorn medium.com/analytics-v…