Reading the Gunicorn Source Code

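Gunicorn ("Green Unicorn"), a descendant of the Ruby community's Unicorn, is a WSGI HTTP server. Once we understand gunicorn, we can properly deploy the Bottle program from the earlier articles. As usual, this article is divided into the following parts: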


  • Overview of the gunicorn project layout
  • Using gunicorn
  • The gunicorn Application implementation
  • The Arbiter implementation
  • The sync worker implementation
  • Summary
  • Tips


Overview of the gunicorn project layout



The gunicorn source version used here is 20.0.0. The main files and packages are:


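File          Description
app/          gunicorn's Application (not the application defined by WSGI)
http/         gunicorn's handling of the HTTP protocol
workers/      gunicorn's worker classes: the synchronous sync worker, the thread-pool gthread worker, and asynchronous workers such as eventlet and gevent
arbiter.py    gunicorn's master implementation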


Design characteristics of gunicorn:


Gunicorn is based on the pre-fork worker model. This means that there is a central master process that manages a set of worker processes. The master never knows anything about individual clients. All requests and responses are handled completely by worker processes.


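In other words, gunicorn uses the pre-fork worker model: the master forks a configured number of workers in advance and manages that set of workers. All requests and responses are handled by the worker processes.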


We will focus on how gunicorn implements its service, and on how the master and the workers are implemented and cooperate.


Using gunicorn



Write a test app. As you can see, it is a WSGI-compliant application:


# myapp.py
def app(environ, start_response):  # the WSGI environ, and the callback that sets the HTTP status and headers
    data = b"Hello, World!\n"
    start_response("200 OK", [
            ("Content-Type", "text/plain"),
            ("Content-Length", str(len(data)))
    ])
    return iter([data])  # return the response body
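
To make the server side of this contract concrete, the sketch below calls a WSGI callable by hand, roughly the way a server would for a single request. It is only an illustration: `call_wsgi` and its fake `start_response` are hypothetical helpers, not gunicorn code, and the app is repeated so that the block runs on its own.

```python
from wsgiref.util import setup_testing_defaults


def app(environ, start_response):
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data))),
    ])
    return iter([data])


def call_wsgi(application, path="/"):
    """Invoke a WSGI app once and collect (status, headers, body)."""
    environ = {"PATH_INFO": path}
    setup_testing_defaults(environ)  # fill in the rest of a minimal WSGI environ
    captured = {}

    def start_response(status, headers, exc_info=None):
        # A real server would start writing the HTTP response here.
        captured["status"] = status
        captured["headers"] = headers

    body = b"".join(application(environ, start_response))
    return captured["status"], dict(captured["headers"]), body


status, headers, body = call_wsgi(app)
print(status, headers["Content-Length"], body)
```

The point of the design is that the application never touches a socket: it only receives a dict and a callback, which is what lets gunicorn swap worker types underneath it.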


Start the service with 4 workers and the debug log level, loading myapp:app:


# gunicorn -w 4 --log-level debug  myapp:app
[2021-02-23 17:58:57 +0800] [50258] [DEBUG] Current configuration:  # prepare the configuration
...
[2021-02-23 18:01:12 +0800] [50462] [INFO] Starting gunicorn 20.0.0  # start gunicorn
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] Arbiter booted  # start the master
[2021-02-23 18:01:12 +0800] [50462] [INFO] Listening at: http://127.0.0.1:8000 (50462)  # listen on the port
[2021-02-23 18:01:12 +0800] [50462] [INFO] Using worker: sync
[2021-02-23 18:01:12 +0800] [50464] [INFO] Booting worker with pid: 50464  # start a worker
[2021-02-23 18:01:12 +0800] [50465] [INFO] Booting worker with pid: 50465
[2021-02-23 18:01:12 +0800] [50466] [INFO] Booting worker with pid: 50466
[2021-02-23 18:01:12 +0800] [50467] [INFO] Booting worker with pid: 50467
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] 4 workers


Test the service with curl:


# curl http://127.0.0.1:8000
Hello, World!


Meanwhile, the gunicorn log shows that the worker with pid 50465 handled this HTTP request:


[2021-02-24 16:09:39 +0800] [50465] [DEBUG] GET /


At runtime, you can also increase the number of workers manually by sending a signal:


# kill -TTIN 50462


In the service log, the master process (pid 50462) handles the ttin signal and grows the number of workers to 5:


...
[2021-02-24 18:02:56 +0800] [50462] [INFO] Handling signal: ttin
[2021-02-24 18:02:56 +0800] [75918] [INFO] Booting worker with pid: 75918
[2021-02-24 18:02:56 +0800] [50462] [DEBUG] 5 workers


Stop the service with Ctrl+C. Again it is the master process (pid 50462) that handles the int signal, and it shuts itself down after stopping the workers:


^C[2021-02-25 15:06:54 +0800] [50462] [INFO] Handling signal: int
[2021-02-25 15:06:54 +0800] [50464] [INFO] Worker exiting (pid: 50464)
[2021-02-25 15:06:54 +0800] [50465] [INFO] Worker exiting (pid: 50465)
[2021-02-25 15:06:54 +0800] [50466] [INFO] Worker exiting (pid: 50466)
[2021-02-25 15:06:54 +0800] [50467] [INFO] Worker exiting (pid: 50467)
[2021-02-25 15:06:54 +0800] [75918] [INFO] Worker exiting (pid: 75918)
[2021-02-25 15:06:54 +0800] [50462] [INFO] Shutting down: Master


If you are not familiar with gunicorn's options, use the following command to see the help:


# gunicorn -h
usage: gunicorn [OPTIONS] [APP_MODULE]
optional arguments:
  -h, --help            show this help message and exit
  ...
  -w INT, --workers INT
                        The number of worker processes for handling requests. [1]


The help is implemented with the familiar argparse module.


class Setting(object):
    def add_option(self, parser):
        args = tuple(self.cli)
        help_txt = "%s [%s]" % (self.short, self.default)
        help_txt = help_txt.replace("%", "%%")
        kwargs = {
            "dest": self.name,
            "action": self.action or "store",
            "type": self.type or str,
            "default": None,
            "help": help_txt
        }
        ...
        parser.add_argument(*args, **kwargs)  # register the option with argparse
class Workers(Setting):  # the setting class behind the --workers option
    name = "workers"
    section = "Worker Processes"
    cli = ["-w", "--workers"]
    meta = "INT"
    validator = validate_pos_int
    type = int
    default = int(os.environ.get("WEB_CONCURRENCY", 1))
    desc = """\
        The number of worker processes for handling requests.
        A positive integer generally in the ``2-4 x $(NUM_CORES)`` range.
        You'll want to vary this a bit to find the best for your particular
        application's work load.
        By default, the value of the ``WEB_CONCURRENCY`` environment variable.
        If it is not defined, the default is ``1``.
        """
def parser(self):
    kwargs = {
        "usage": self.usage,
        "prog": self.prog
    }
    parser = argparse.ArgumentParser(**kwargs)
    parser.add_argument("-v", "--version",
            action="version", default=argparse.SUPPRESS,
            version="%(prog)s (version " + __version__ + ")\n",
            help="show program's version number and exit")
    parser.add_argument("args", nargs="*", help=argparse.SUPPRESS)
    keys = sorted(self.settings, key=self.settings.__getitem__)  # add every setting's option dynamically
    for k in keys:
        self.settings[k].add_option(parser)
    return parser
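
The pattern above, where each setting object registers its own command-line option, can be reduced to a few lines. The sketch below is a simplified stand-in rather than gunicorn's code: the trimmed `Setting` base class, the `Bind` example setting, and the `build_parser` and `resolve` helpers are all hypothetical names. It also shows one plausible reason for passing `default=None` to argparse: an option the user did not supply can be told apart from one that was, so the setting's own default applies.

```python
import argparse


class Setting:
    """Hypothetical, trimmed-down stand-in for a setting class."""
    name = None
    cli = None
    type = str
    default = None
    short = ""

    def add_option(self, parser):
        help_txt = "%s [%s]" % (self.short, self.default)
        parser.add_argument(*self.cli, dest=self.name, type=self.type,
                            default=None, help=help_txt.replace("%", "%%"))


class Workers(Setting):
    name = "workers"
    cli = ["-w", "--workers"]
    type = int
    default = 1
    short = "The number of worker processes for handling requests."


class Bind(Setting):  # illustrative second setting
    name = "bind"
    cli = ["-b", "--bind"]
    default = "127.0.0.1:8000"
    short = "The socket to bind."


def build_parser(settings):
    parser = argparse.ArgumentParser(prog="demo")
    for setting in settings:  # each setting registers its own option
        setting.add_option(parser)
    return parser


def resolve(settings, argv):
    """Parse argv; options that were not given fall back to the setting default."""
    args = build_parser(settings).parse_args(argv)
    resolved = {}
    for setting in settings:
        value = getattr(args, setting.name)
        resolved[setting.name] = setting.default if value is None else value
    return resolved


print(resolve([Workers(), Bind()], ["-w", "4"]))
```

Adding a new option then means adding one class; the parser-building code does not change.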


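The gunicorn Application implementation



gunicorn's application is implemented mainly by the three classes below. Note that "application" here means the web server's application, whereas bottle, flask, django and the like implement the web framework's application. The former dynamically loads the latter; the former runs the HTTP service, the latter handles a single HTTP request.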


  • BaseApplication
  • Application
  • WSGIApplication


After tidying up the three Application classes, the rough code template looks like this:


class WSGIApplication(Application):
    def __init__(self, usage=None, prog=None):
        self.do_load_config()  # load the configuration
    def do_load_config(self):
        ...
        cfg = self.init(parser, args, args.args)  # initialize the configuration
        ...
    def init(...):
        ...
        self.app_uri = args[0]  # take the wsgi-application argument
    def load(...):
        util.import_app(self.app_uri)  # dynamically load the wsgi-application
        ...
    def run(...):
        self.load()
        Arbiter(self).run()  # start the master, i.e. the Arbiter
def run():  # run the service
    """\
    The ``gunicorn`` command line runner for launching Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()
if __name__ == '__main__':
    run()


The application part is relatively simple, so we will not go into more detail.


The Arbiter implementation



The Arbiter is, in effect, the core of the master process. After tidying up, its code template looks like this:


class Arbiter(object):
    def __init__(self, app):
        self.worker_class = self.cfg.worker_class  # worker class
        self.num_workers = self.cfg.workers  # number of workers
        ...
    def start(self):
        self.init_signals()  # set up signal handling
        ...
        sock.create_sockets(...)  # create the listening sockets
    def run(self):
        self.start()
        try:
            self.manage_workers()  # start the workers
            while True:  # main loop
                ...
                sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
                if sig is None:
                    self.sleep()  # sleep until woken up or timed out
                    self.murder_workers()
                    self.manage_workers()
                    continue
                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue
                # handle the signal
                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                ...
                handler()
                self.wakeup()  # wake up
        except (StopIteration, KeyboardInterrupt):
            ...


Before looking at how the Arbiter works, a word about signals. On Linux, the following command lists the available signals:


# kill -l
 1) SIGHUP   2) SIGINT   3) SIGQUIT  4) SIGILL   5) SIGTRAP
 6) SIGABRT  7) SIGBUS   8) SIGFPE   9) SIGKILL 10) SIGUSR1
 11) SIGSEGV  12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM
...


  • 1 (SIGHUP): terminate a connection, or reload the configuration for daemons
  • 2 (SIGINT): interrupt the session from the dialogue station
  • 3 (SIGQUIT): terminate the session from the dialogue station
  • 4 (SIGILL): illegal instruction was executed
  • 5 (SIGTRAP): do a single instruction (trap)
  • 6 (SIGABRT): abnormal termination
  • 7 (SIGBUS): error on the system bus
  • 8 (SIGFPE): floating point error
  • 9 (SIGKILL): immediately terminate the process
  • 10 (SIGUSR1): user-defined signal
  • 11 (SIGSEGV): segmentation fault due to illegal access of a memory segment
  • 12 (SIGUSR2): user-defined signal
  • 13 (SIGPIPE): writing into a pipe, and nobody is reading from it
  • 14 (SIGALRM): the timer terminated (alarm)
  • 15 (SIGTERM): terminate the process in a soft way


Signals are events provided by the operating system and can be used for inter-process communication. Arbiter.init_signals does the following:


SIGNALS = [getattr(signal, "SIG%s" % x)
               for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]
def init_signals(self):
    ...
    # initialize all signals
    for s in self.SIGNALS:
        signal.signal(s, self.signal)
    signal.signal(signal.SIGCHLD, self.handle_chld)  # register the signal handler
def signal(self, sig, frame):
    if len(self.SIG_QUEUE) < 5:
        self.SIG_QUEUE.append(sig)
        self.wakeup()


The TTIN signal used earlier to scale up is handled like this:


def handle_ttin(self):
    """\
    SIGTTIN handling.
    Increases the number of workers by one.
    """
    self.num_workers += 1  # scale up
    self.manage_workers()  # manage the workers
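
Putting the pieces together: the signal handler only queues the signal number, and the main loop later maps it to a `handle_<name>` method. The sketch below imitates that flow without sending real signals (the handler is called directly). `MiniArbiter`, `on_signal` and `drain` are hypothetical names, and the lower bound in `handle_ttou` is an assumption made for this illustration.

```python
import signal


class MiniArbiter:
    """Hypothetical sketch: queue signals, then dispatch them by name."""

    SIG_NAMES = {
        getattr(signal, name): name[3:].lower()
        for name in ("SIGTTIN", "SIGTTOU", "SIGHUP")
    }

    def __init__(self, num_workers=4):
        self.num_workers = num_workers
        self.sig_queue = []

    def on_signal(self, sig, frame=None):
        # The handler only records the signal; the real work happens in the loop.
        if len(self.sig_queue) < 5:
            self.sig_queue.append(sig)

    def handle_ttin(self):
        self.num_workers += 1

    def handle_ttou(self):
        if self.num_workers > 1:  # assumed floor of one worker
            self.num_workers -= 1

    def drain(self):
        """Process queued signals in arrival order, as the main loop would."""
        handled = []
        while self.sig_queue:
            sig = self.sig_queue.pop(0)
            signame = self.SIG_NAMES.get(sig)
            handler = getattr(self, "handle_%s" % signame, None)
            if handler is None:
                continue  # no handle_<name> method in this sketch: skip it
            handler()
            handled.append(signame)
        return handled


arbiter = MiniArbiter(num_workers=4)
for sig in (signal.SIGTTIN, signal.SIGTTIN, signal.SIGTTOU, signal.SIGHUP):
    arbiter.on_signal(sig)  # simulate delivery instead of calling os.kill
print(arbiter.drain(), arbiter.num_workers)
```

Keeping the handler this small matters because a signal can arrive at almost any point in the program; deferring the real work to the loop avoids running complex logic in that context.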


The Arbiter's sleep and wakeup are implemented like this:


self.PIPE = pair = os.pipe()  # create the pipe
def sleep(self):
    """\
    Sleep until PIPE is readable or we timeout.
    A readable PIPE means a signal occurred.
    """
    try:
        ready = select.select([self.PIPE[0]], [], [], 1.0)  # use select to watch the pipe for data
        if not ready[0]:
            return
        while os.read(self.PIPE[0], 1):  # read the data out of the pipe
            pass
    except (select.error, OSError) as e:
        ...
def wakeup(self):
    """\
    Wake up the arbiter by writing to the PIPE
    """
    try:
        os.write(self.PIPE[1], b'.')  # write to the pipe
    except IOError as e:
        ...
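
This sleep/wakeup pair is the classic self-pipe pattern: a process blocks in select() on the read end of a pipe, and anything that wants to interrupt the wait writes one byte to the write end. Below is a minimal standalone sketch. The `Waker` class is a hypothetical helper, and it assumes both ends of the pipe are set to non-blocking mode, so that draining an empty pipe raises BlockingIOError instead of hanging.

```python
import os
import select


class Waker:
    """Hypothetical helper illustrating the self-pipe pattern."""

    def __init__(self):
        self.rfd, self.wfd = os.pipe()
        os.set_blocking(self.rfd, False)  # draining must never block
        os.set_blocking(self.wfd, False)  # a full pipe must not block wakeup()

    def wakeup(self):
        try:
            os.write(self.wfd, b".")
        except BlockingIOError:
            pass  # pipe is full, so a wakeup is already pending

    def sleep(self, timeout=1.0):
        """Return True if woken by wakeup(), False on timeout."""
        ready, _, _ = select.select([self.rfd], [], [], timeout)
        if not ready:
            return False
        try:
            while os.read(self.rfd, 64):  # drain everything that was queued
                pass
        except BlockingIOError:
            pass  # pipe is empty again
        return True

    def close(self):
        os.close(self.rfd)
        os.close(self.wfd)


waker = Waker()
print(waker.sleep(timeout=0.05))  # nothing written yet, so this times out
waker.wakeup()
waker.wakeup()
print(waker.sleep(timeout=0.05))  # returns at once and drains both bytes
print(waker.sleep(timeout=0.05))  # the pipe is empty again
waker.close()
```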


Note that the Arbiter creates the sockets through sock.create_sockets, binds the ports and starts listening; then, when it forks a worker, the sockets are passed on to the child process.


worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                                   self.app, self.timeout / 2.0,
                                   self.cfg, self.log)
self.cfg.pre_fork(self, worker)
pid = os.fork()
if pid != 0:
    worker.pid = pid  # record the worker's pid
    self.WORKERS[pid] = worker  # add it to the worker set
    return pid
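
The key property here is that a listening socket opened before fork() remains usable in the child. The sketch below demonstrates just that on a Unix system: the parent binds and listens, forks one child, and the child accepts a single connection and replies with its own pid. `prefork_echo_pid` is a hypothetical function written for this illustration, and the parent connecting to its own socket is only a way to exercise the child.

```python
import os
import socket


def prefork_echo_pid():
    """Bind in the parent, fork one child, let the child serve one client."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(8)
    port = listener.getsockname()[1]

    pid = os.fork()
    if pid == 0:
        # Child: the listening socket was inherited across fork().
        status = 1
        try:
            listener.settimeout(5.0)
            client, _ = listener.accept()
            with client:
                client.sendall(str(os.getpid()).encode())
            status = 0
        finally:
            os._exit(status)  # never fall through into the parent's code

    # Parent: it never calls accept(); it connects only to exercise the child.
    try:
        with socket.create_connection(("127.0.0.1", port), timeout=5.0) as conn:
            reply = conn.recv(64).decode()
    finally:
        listener.close()
        os.waitpid(pid, 0)  # reap the child
    return pid, int(reply)


child_pid, served_by = prefork_echo_pid()
print(child_pid == served_by)
```

With several children forked this way, all of them would wait on the same accept queue and the kernel would hand each connection to one of them, which is why the master never needs to see a request.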


Workers are destroyed by sending them signals:


def kill_workers(self, sig):
    """\
    Kill all workers with the signal `sig`
    :attr sig: `signal.SIG*` value
    """
    worker_pids = list(self.WORKERS.keys())
    for pid in worker_pids:
        os.kill(pid, sig)


The sync worker implementation



Next, let's look at the workers, mainly the sync worker. The main worker classes and their roles are:


  • Worker: handles signals
  • SyncWorker: handles HTTP requests synchronously
  • ThreadWorker: handles HTTP requests with threads


Continuing from the fork-worker code in the Arbiter above, the newly created worker enters init_process:


# Process Child
worker.pid = os.getpid()
try:
    util._setproctitle("worker [%s]" % self.proc_name)
    self.log.info("Booting worker with pid: %s", worker.pid)
    self.cfg.post_fork(self, worker)
    worker.init_process()
    sys.exit(0)


The worker's init_process template looks like this:


def init_process(self):
    """\
    If you override this method in a subclass, the last statement
    in the function should be to call this method with
    super().init_process() so that the ``run()`` loop is initiated.
    """
    # For waking ourselves up
    self.PIPE = os.pipe()  # create the pipe
    ...
    self.wait_fds = self.sockets + [self.PIPE[0]]  # watch the pipe and the sockets
    ...
    self.init_signals()  # set up signal handling
    ...
    self.load_wsgi()  # load the wsgi application
    ...
    # Enter main run loop
    self.booted = True
    self.run()  # the work loop


The worker sets up signal handling in the same way:


SIGNALS = [getattr(signal, "SIG%s" % x)
            for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]
def init_signals(self):
    # reset signaling
    for s in self.SIGNALS:
        signal.signal(s, signal.SIG_DFL)
    # init new signaling
    signal.signal(signal.SIGQUIT, self.handle_quit)
    signal.signal(signal.SIGTERM, self.handle_exit)
    signal.signal(signal.SIGINT, self.handle_quit)
    ...
    if hasattr(signal, 'set_wakeup_fd'):
        signal.set_wakeup_fd(self.PIPE[1])  # lets an incoming signal wake up select()
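
`signal.set_wakeup_fd` is the standard-library counterpart of writing to the pipe by hand: when a signal arrives, Python itself writes the signal number as one byte to the given file descriptor, so a select() on the read end returns. The sketch below shows this in isolation. `wait_for_signal_via_fd` is a hypothetical function, the process sends SIGUSR1 to itself as a stand-in for a signal from the master, and the code assumes it runs in the main thread on a Unix system.

```python
import os
import select
import signal


def wait_for_signal_via_fd(timeout=1.0):
    """Return the signal number read from the wakeup pipe, or None on timeout."""
    rfd, wfd = os.pipe()
    os.set_blocking(rfd, False)
    os.set_blocking(wfd, False)  # the wakeup fd must be non-blocking

    # A Python-level handler must be installed, otherwise SIGUSR1 would
    # terminate the process by default.
    old_handler = signal.signal(signal.SIGUSR1, lambda sig, frame: None)
    old_fd = signal.set_wakeup_fd(wfd)
    try:
        os.kill(os.getpid(), signal.SIGUSR1)  # stand-in for a signal from the master
        ready, _, _ = select.select([rfd], [], [], timeout)
        if not ready:
            return None
        return os.read(rfd, 1)[0]  # the byte written is the signal number
    finally:
        signal.set_wakeup_fd(old_fd)
        if old_handler is not None:
            signal.signal(signal.SIGUSR1, old_handler)
        os.close(rfd)
        os.close(wfd)


print(wait_for_signal_via_fd() == signal.SIGUSR1)
```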


The most important part of the worker, the run loop:


def run(self, timeout):
    listener = self.sockets[0]
    while self.alive:
        ...
        # Accept a connection. If we get an error telling us
        # that no connection is waiting we fall down to the
        # select which is where we'll wait for a bit for new
        # workers to come give us some love.
        try:
            self.accept(listener)  # accept a client connection
            # Keep processing clients until no one is waiting. This
            # prevents the need to select() for every client that we
            # process.
            continue
        except EnvironmentError as e:
            ...
        try:
            self.wait(timeout)  # sleep and wait
        except StopWaiting:
            return


Handling the client connection is similar to the HTTP handling covered in earlier articles, so we will not repeat it here.


def accept(self, listener):
    client, addr = listener.accept()
    client.setblocking(1)
    util.close_on_exec(client)
    self.handle(listener, client, addr)


After the worker finishes handling requests, it goes into waiting:


def wait(self, timeout):
    try:
        ret = select.select(self.wait_fds, [], [], timeout)
        if ret[0]:
            if self.PIPE[0] in ret[0]:
                os.read(self.PIPE[0], 1)
            return ret[0]
    except select.error as e:
        if e.args[0] == errno.EINTR:
            return self.sockets
        if e.args[0] == errno.EBADF:
            if self.nr < 0:
                return self.sockets
            else:
                raise StopWaiting
        raise
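
The run/accept/wait trio boils down to "accept eagerly, and only fall back to select() when nobody is waiting". The sketch below reproduces that loop shape in a single process. It is not the worker's actual code: `serve` and `hello` are hypothetical names, the loop stops after `max_requests` or one idle timeout so that the example terminates, and the clients connect from the same process because the kernel queues them in the listen backlog.

```python
import errno
import select
import socket


def serve(listener, handle, max_requests, timeout=0.5):
    """Accept eagerly; fall back to select() only when no client is waiting."""
    listener.setblocking(False)
    served = 0
    while served < max_requests:
        try:
            client, addr = listener.accept()
        except OSError as e:
            if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNABORTED):
                raise
            # Nothing queued: wait until the listener is readable or we time out.
            ready, _, _ = select.select([listener], [], [], timeout)
            if not ready:
                break
            continue
        with client:
            client.setblocking(True)  # each request is handled synchronously
            handle(client, addr)
        served += 1
    return served


def hello(client, addr):
    client.sendall(b"hello\n")


listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(8)
clients = [socket.create_connection(listener.getsockname()) for _ in range(3)]
print(serve(listener, hello, max_requests=3))
print([c.recv(16) for c in clients])
for c in clients:
    c.close()
listener.close()
```

Under load this shape avoids one select() call per request, since the loop keeps accepting as long as connections are queued.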


Summary



The diagram below shows gunicorn's workflow and serves as our summary:

[Figure: gunicorn workflow diagram]


Tips



A thread can be used to implement a timer:


# reloader.py
class Reloader(threading.Thread):
    def __init__(self, extra_files=None, interval=1, callback=None):
        super().__init__()
        self.setDaemon(True)
        self._interval = interval
        self._callback = callback
    def run(self):
        mtimes = {}
        while True:
            for filename in self.get_files():
                try:
                    mtime = os.stat(filename).st_mtime
                except OSError:
                    continue
                old_time = mtimes.get(filename)
                if old_time is None:
                    mtimes[filename] = mtime
                    continue
                elif mtime > old_time:
                    if self._callback:
                        self._callback(filename)
            time.sleep(self._interval)
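
Stripped of the file-watching logic, the timer idea is a daemon thread that does some work, sleeps for an interval, and repeats. A minimal variant is sketched below. `Ticker` is a hypothetical class, not part of gunicorn, and it swaps `time.sleep` for `threading.Event.wait` so that the thread can be stopped promptly.

```python
import threading


class Ticker(threading.Thread):
    """Hypothetical daemon thread that calls `callback` every `interval` seconds."""

    def __init__(self, interval, callback):
        super().__init__(daemon=True)  # do not keep the process alive
        self._interval = interval
        self._callback = callback
        self._stopped = threading.Event()

    def run(self):
        # Event.wait() doubles as an interruptible sleep: it returns True as
        # soon as stop() is called, and False after a full interval elapsed.
        while not self._stopped.wait(self._interval):
            self._callback()

    def stop(self):
        self._stopped.set()


ticks = []
three_ticks = threading.Event()


def on_tick():
    ticks.append(1)
    if len(ticks) >= 3:
        three_ticks.set()


ticker = Ticker(0.01, on_tick)
ticker.start()
three_ticks.wait(timeout=2.0)  # block until three ticks have been seen
ticker.stop()
ticker.join(timeout=2.0)
print(len(ticks) >= 3)
```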


When running gunicorn myapp:app, myapp:app is not imported statically; it is loaded dynamically, along these lines:


# util.py
klass = components.pop(-1)
mod = importlib.import_module('.'.join(components))
return getattr(mod, klass)
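
The same idea applied to a "module:attribute" string fits in a few lines of importlib. The sketch below is a generic illustration rather than gunicorn's own loader: `load_object` is a hypothetical helper, and falling back to an attribute named "application" when no colon is present is an assumption of this example.

```python
import importlib


def load_object(uri, default_attr="application"):
    """Resolve 'package.module:attr' (attr may be dotted) to the object it names."""
    module_name, _, attr = uri.partition(":")
    module = importlib.import_module(module_name)
    obj = module
    for part in (attr or default_attr).split("."):
        obj = getattr(obj, part)  # walk dotted attribute paths step by step
    return obj


join = load_object("os.path:join")
print(join("a", "b"))
```

Because the target is just a string, the server can be pointed at any application at start-up without importing it in its own source.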

