开源API网关APISIX源码分析(二)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
云原生 API 网关,700元额度,多规格可选
简介: 开源API网关APISIX源码分析(二)

使用与代码对照分析

流量控制之limit-conn

代码实现

  • 初始化
local plugin_name = "limit-conn"
local schema = {
    type = "object",
    properties = {
        conn = {type = "integer", exclusiveMinimum = 0},
        burst = {type = "integer",  minimum = 0},
        default_conn_delay = {type = "number", exclusiveMinimum = 0},
        only_use_default_delay = {type = "boolean", default = false},
        key = {type = "string"},
        key_type = {type = "string",
            enum = {"var", "var_combination"},
            default = "var",
        },
        rejected_code = {
            type = "integer", minimum = 200, maximum = 599, default = 503
        },
        rejected_msg = {
            type = "string", minLength = 1
        },
        allow_degradation = {type = "boolean", default = false}
    },
    required = {"conn", "burst", "default_conn_delay", "key"}
}

local _M = {
    version = 0.1,
    priority = 1003,
    name = plugin_name,
    schema = schema,
}

优先级是1003,包含连接数,爆破数,默认连接延迟,是否只使用默认延迟,key,关键词类型,拒绝响应码,拒绝信息。必要的信息是连接数,爆破树,默认链接延迟,key。

  • 检查配置是否符合
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end
  • access方法
function _M.increase(conf, ctx)
    core.log.info("ver: ", ctx.conf_version)
    local lim, err = lrucache(conf, nil, create_limit_obj, conf)
    if not lim then
        core.log.error("failed to instantiate a resty.limit.conn object: ", err)
        if conf.allow_degradation then
            return
        end
        return 500
    end

    local conf_key = conf.key
    local key
    if conf.key_type == "var_combination" then
        local err, n_resolved
        key, err, n_resolved = core.utils.resolve_var(conf_key, ctx.var);
        if err then
            core.log.error("could not resolve vars in ", conf_key, " error: ", err)
        end

        if n_resolved == 0 then
            key = nil
        end
    else
        key = ctx.var[conf_key]
    end

    if key == nil then
        core.log.info("The value of the configured key is empty, use client IP instead")
        -- When the value of key is empty, use client IP instead
        key = ctx.var["remote_addr"]
    end

    key = key .. ctx.conf_type .. ctx.conf_version
    core.log.info("limit key: ", key)

    local delay, err = lim:incoming(key, true)
    if not delay then
        if err == "rejected" then
            if conf.rejected_msg then
                return conf.rejected_code, { error_msg = conf.rejected_msg }
            end
            return conf.rejected_code or 503
        end

        core.log.error("failed to limit conn: ", err)
        if conf.allow_degradation then
            return
        end
        return 500
    end

    if lim:is_committed() then
        if not ctx.limit_conn then
            ctx.limit_conn = core.tablepool.fetch("plugin#limit-conn", 0, 6)
        end

        core.table.insert_tail(ctx.limit_conn, lim, key, delay, conf.only_use_default_delay)
    end

    if delay >= 0.001 then
        sleep(delay)
    end
end

从缓存中获取该配置出现的次数,并从上下文中获取请求的参数,然后进行一系列的配置来确定是否超过了限制。

  • log
function _M.decrease(conf, ctx)
    local limit_conn = ctx.limit_conn
    if not limit_conn then
        return
    end

    for i = 1, #limit_conn, 4 do
        local lim = limit_conn[i]
        local key = limit_conn[i + 1]
        local delay = limit_conn[i + 2]
        local use_delay =  limit_conn[i + 3]

        local latency
        if not use_delay then
            if ctx.proxy_passed then
                latency = ctx.var.upstream_response_time
            else
                latency = ctx.var.request_time - delay
            end
        end
        core.log.debug("request latency is ", latency) -- for test

        local conn, err = lim:leaving(key, latency)
        if not conn then
            core.log.error("failed to record the connection leaving request: ",
                           err)
            break
        end
    end

    core.tablepool.release("plugin#limit-conn", limit_conn)
    ctx.limit_conn = nil
    return
end

可观测性之syslog插件

  • 初始化
local batch_processor_manager = bp_manager_mod.new("sys logger")
local schema = {
    type = "object",
    properties = {
        host = {type = "string"},
        port = {type = "integer"},
        max_retry_times = {type = "integer", minimum = 1, default = 1},
        retry_interval = {type = "integer", minimum = 0, default = 1},
        flush_limit = {type = "integer", minimum = 1, default = 4096},
        drop_limit = {type = "integer", default = 1048576},
        timeout = {type = "integer", minimum = 1, default = 3},
        sock_type = {type = "string", default = "tcp", enum = {"tcp", "udp"}},
        pool_size = {type = "integer", minimum = 5, default = 5},
        tls = {type = "boolean", default = false},
        include_req_body = {type = "boolean", default = false}
    },
    required = {"host", "port"}
}


local lrucache = core.lrucache.new({
    ttl = 300, count = 512, serial_creating = true,
})


-- syslog uses max_retry_times/retry_interval/timeout
-- instead of max_retry_count/retry_delay/inactive_timeout
local schema = batch_processor_manager:wrap_schema(schema)
schema.max_retry_count = nil
schema.retry_delay = nil
schema.inactive_timeout = nil

local _M = {
    version = 0.1,
    priority = 401,
    name = plugin_name,
    schema = schema,
}

优先级是401,配置中包含host,port,max_retry_times,retry_interval等等,都是将日志发送到一个syslog服务器必备的选项。

  • 检查配置
function _M.check_schema(conf)
    local ok, err = core.schema.check(schema, conf)
    if not ok then
        return false, err
    end

    -- syslog uses max_retry_times/retry_interval/timeout
    -- instead of max_retry_count/retry_delay/inactive_timeout
    conf.max_retry_count = conf.max_retry_times
    conf.retry_delay = conf.retry_interval
    conf.inactive_timeout = conf.timeout
    return true
end

检查配置,并赋值。

  • 发送日志
local function send_syslog_data(conf, log_message, api_ctx)
    local err_msg
    local res = true

    core.log.info("sending a batch logs to ", conf.host, ":", conf.port)

    -- fetch it from lrucache
    local logger, err = core.lrucache.plugin_ctx(
        lrucache, api_ctx, nil, logger_socket.new, logger_socket, {
            host = conf.host,
            port = conf.port,
            flush_limit = conf.flush_limit,
            drop_limit = conf.drop_limit,
            timeout = conf.timeout,
            sock_type = conf.sock_type,
            max_retry_times = conf.max_retry_times,
            retry_interval = conf.retry_interval,
            pool_size = conf.pool_size,
            tls = conf.tls,
        }
    )

    if not logger then
        res = false
        err_msg = "failed when initiating the sys logger processor".. err
    end

    -- reuse the logger object
    local ok, err = logger:log(core.json.encode(log_message))
    if not ok then
        res = false
        err_msg = "failed to log message" .. err
    end

    return res, err_msg
end

这里的logger:log就是发送的日志。

  • log
unction _M.log(conf, ctx)
    local entry = log_util.get_full_log(ngx, conf)

    if batch_processor_manager:add_entry(conf, entry) then
        return
    end

    -- Generate a function to be executed by the batch processor
    local cp_ctx = core.table.clone(ctx)
    local func = function(entries, batch_max_size)
        local data, err
        if batch_max_size == 1 then
            data, err = core.json.encode(entries[1]) -- encode as single {}
        else
            data, err = core.json.encode(entries) -- encode as array [{}]
        end

        if not data then
            return false, 'error occurred while encoding the data: ' .. err
        end

        return send_syslog_data(conf, data, cp_ctx)
    end

    batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
end

apisix中插件是如何工作的

上节分析了四种不同类型的插件,那么究竟这些插件是怎么工作的呢?apisix里面的调度流程究竟是怎么样的? 带着上面的问题,我们来分析一下源码。

apisix.plugin

load方法

里面定义了对所有插件的加载,同时,其在init_worker中被调用

run_plugin方法

function _M.run_plugin(phase, plugins, api_ctx)
  local plugin_run = false
  api_ctx = api_ctx or ngx.ctx.api_ctx
  if not api_ctx then
    return
  end

  plugins = plugins or api_ctx.plugins
  if not plugins or #plugins == 0 then
    return api_ctx
  end

  if phase ~= "log"
    and phase ~= "header_filter"
    and phase ~= "body_filter"
    then
    for i = 1, #plugins, 2 do
      local phase_func = plugins[i][phase]
      if phase_func then
        plugin_run = true
        local code, body = phase_func(plugins[i + 1], api_ctx)
        if code or body then
          if is_http then
            if code >= 400 then
              core.log.warn(plugins[i].name, " exits with http status code ", code)
            end

            core.response.exit(code, body)
          else
            if code >= 400 then
              core.log.warn(plugins[i].name, " exits with status code ", code)
            end

            ngx_exit(1)
          end
        end
      end
    end
    return api_ctx, plugin_run
  end

  for i = 1, #plugins, 2 do
    local phase_func = plugins[i][phase]
    if phase_func then
      plugin_run = true
      phase_func(plugins[i + 1], api_ctx)
    end
  end

  return api_ctx, plugin_run
end

可以看见,这里提到了如果是Log/header_filter/body_filter,就执行方法并根据结果打印日志,如果是别的则只执行方法。

apisix.init

http_init_worker

function _M.http_init_worker()
    local seed, err = core.utils.get_seed_from_urandom()
    if not seed then
        core.log.warn('failed to get seed from urandom: ', err)
        seed = ngx_now() * 1000 + ngx.worker.pid()
    end
    math.randomseed(seed)
    -- for testing only
    core.log.info("random test in [1, 10000]: ", math.random(1, 10000))

    local we = require("resty.worker.events")
    local ok, err = we.configure({shm = "worker-events", interval = 0.1})
    if not ok then
        error("failed to init worker event: " .. err)
    end
    local discovery = require("apisix.discovery.init").discovery
    if discovery and discovery.init_worker then
        discovery.init_worker()
    end
    require("apisix.balancer").init_worker()
    load_balancer = require("apisix.balancer")
    require("apisix.admin.init").init_worker()

    require("apisix.timers").init_worker()

    require("apisix.debug").init_worker()

    plugin.init_worker()
    router.http_init_worker()
    require("apisix.http.service").init_worker()
    plugin_config.init_worker()
    require("apisix.consumer").init_worker()

    if core.config == require("apisix.core.config_yaml") then
        core.config.init_worker()
    end

    apisix_upstream.init_worker()
    require("apisix.plugins.ext-plugin.init").init_worker()

    local_conf = core.config.local_conf()

    if local_conf.apisix and local_conf.apisix.enable_server_tokens == false then
        ver_header = "APISIX"
    end
end

这一个方法是核心,在apisix的nginx.conf的配置文件中,有如下配置:

init_worker_by_lua_block {
  apisix.http_init_worker()
}

可见,该方法是在nginx启动的时候被执行的。

http_access_phase

在nginx.conf中有如下配置

access_by_lua_block {
  apisix.http_access_phase()
}

跟插件有关的主要调用了

plugin.run_global_rules(api_ctx, router.global_rules, nil)
local plugins = plugin.filter(api_ctx, route)
plugin.run_plugin("rewrite", plugins, api_ctx)

而这里的plugin是plugin目录下的 apisix.plugin

http_header_filter_phase

header_filter_by_lua_block {
  apisix.http_header_filter_phase()
}

处理header,没有调用Plugin

http_body_filter_phase

body_filter_by_lua_block {
  apisix.http_body_filter_phase()
}

处理body,没有调用plugin

http_log_phase

log_by_lua_block {
  apisix.http_log_phase()
}

处理log,没有调用plugin

标准模式分析

插件类型分类与作用

根据昨天的分析,插件的类型主要有身份验证、安全防护、流量控制、无服务架构和可观测性。

插件的分类介绍

官方中文插件介绍

auth

当一个插件设置 type = 'auth',说明它是个认证插件,认证插件需要在执行后选择对应的 consumer。举个例子,在 key-auth 插件中,它通过 apikey 请求头获取对应的 consumer,然后通过 consumer.attach_consumer 设置它。

其余插件代码中都不需要刻意去指定

编写自定义插件的步骤与方法

官方中文扩展编写介绍

目录
相关文章
|
25天前
|
监控 负载均衡 API
Apache Apisix轻松打造亿级流量Api网关
Apache APISIX 是一个动态、实时、高性能的 API 网关,提供负载均衡、动态上行、灰度发布、熔断、鉴权、可观测等丰富的流量管理功能。适用于处理传统南北向流量、服务间东西向流量及 k8s 入口控制。Airflow 是一个可编程、调度和监控的工作流平台,基于有向无环图 (DAG) 定义和执行任务,提供丰富的命令行工具和 Web 管理界面,方便系统运维和管理。
Apache Apisix轻松打造亿级流量Api网关
|
18天前
|
运维 Cloud Native Java
热联集团:从 APISIX 迁移到云原生网关
我们将核心业务系统从 IDC 全栈迁移到阿里云后,并采用了云原生 API 网关,通过其独有的软硬一体的加速方案,相比普通 HTTPS 请求 TLS 握手时延降低一倍,极限 QPS 提升 80% 以上,运维效率也提升了 50%,此外,我们把 Nacos 迁移到 MSE Nacos,稳定性、性能和运维成本等方面都具备了明显的优势。
|
26天前
|
监控 安全 API
拥抱开源:下一代API管理工具Kong的崛起
【10月更文挑战第27天】在微服务架构和API经济的推动下,API管理成为软件开发的关键环节。Kong作为开源的API管理平台,凭借其灵活性和强大功能,受到开发者的青睐。本文探讨了Kong的核心特性、使用技巧及其在企业中的应用,帮助读者更好地理解和利用这一工具。
|
3月前
|
人工智能 Serverless API
一键服务化:从魔搭开源模型到OpenAI API服务
在多样化大模型的背后,OpenAI得益于在领域的先发优势,其API接口今天也成为了业界的一个事实标准。
一键服务化:从魔搭开源模型到OpenAI API服务
|
3月前
|
编解码 自然语言处理 机器人
通义千问Qwen2-VL开源,API可直接调用!
通义千问宣布开源第二代视觉语言模型Qwen2-VL,并推出2B、7B两个尺寸及其量化版本模型。同时,旗舰模型Qwen2-VL-72B的API已上线阿里云百炼平台,用户可直接调用。
1201 9
|
2月前
|
监控 API 数据安全/隐私保护
2024年开源API工具盘点,覆盖API全生命周期
2024年经济持续低迷,本文整理一些免费的开源工具,旨在帮助企业组织降低工具的支出成本,能用免费的何必用付费的呢(狗头)?
73 0
|
3月前
|
前端开发 关系型数据库 MySQL
ThingsGateway:一款基于.NET8开源的跨平台高性能边缘采集网关
ThingsGateway:一款基于.NET8开源的跨平台高性能边缘采集网关
101 2
|
4月前
|
运维 监控 Cloud Native
|
4月前
|
缓存 JavaScript 前端开发
为开源项目 go-gin-api 增加 WebSocket 模块
为开源项目 go-gin-api 增加 WebSocket 模块
47 2
|
5月前
|
JSON Shell API
阿里云PAI-Stable Diffusion开源代码浅析之(一)所有api的入参如何看
阿里云PAI-Stable Diffusion开源代码浅析之所有api的入参如何看