开源API网关APISIX源码分析(二)

简介: 开源API网关APISIX源码分析(二)

使用与代码对照分析

流量控制之limit-conn

代码实现

  • 初始化
local plugin_name = "limit-conn"
local schema = {
    type = "object",
    properties = {
        conn = {type = "integer", exclusiveMinimum = 0},
        burst = {type = "integer",  minimum = 0},
        default_conn_delay = {type = "number", exclusiveMinimum = 0},
        only_use_default_delay = {type = "boolean", default = false},
        key = {type = "string"},
        key_type = {type = "string",
            enum = {"var", "var_combination"},
            default = "var",
        },
        rejected_code = {
            type = "integer", minimum = 200, maximum = 599, default = 503
        },
        rejected_msg = {
            type = "string", minLength = 1
        },
        allow_degradation = {type = "boolean", default = false}
    },
    required = {"conn", "burst", "default_conn_delay", "key"}
}

local _M = {
    version = 0.1,
    priority = 1003,
    name = plugin_name,
    schema = schema,
}

优先级是1003,包含连接数,爆破数,默认连接延迟,是否只使用默认延迟,key,关键词类型,拒绝响应码,拒绝信息。必要的信息是连接数,爆破树,默认链接延迟,key。

  • 检查配置是否符合
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end
  • access方法
function _M.increase(conf, ctx)
    core.log.info("ver: ", ctx.conf_version)
    local lim, err = lrucache(conf, nil, create_limit_obj, conf)
    if not lim then
        core.log.error("failed to instantiate a resty.limit.conn object: ", err)
        if conf.allow_degradation then
            return
        end
        return 500
    end

    local conf_key = conf.key
    local key
    if conf.key_type == "var_combination" then
        local err, n_resolved
        key, err, n_resolved = core.utils.resolve_var(conf_key, ctx.var);
        if err then
            core.log.error("could not resolve vars in ", conf_key, " error: ", err)
        end

        if n_resolved == 0 then
            key = nil
        end
    else
        key = ctx.var[conf_key]
    end

    if key == nil then
        core.log.info("The value of the configured key is empty, use client IP instead")
        -- When the value of key is empty, use client IP instead
        key = ctx.var["remote_addr"]
    end

    key = key .. ctx.conf_type .. ctx.conf_version
    core.log.info("limit key: ", key)

    local delay, err = lim:incoming(key, true)
    if not delay then
        if err == "rejected" then
            if conf.rejected_msg then
                return conf.rejected_code, { error_msg = conf.rejected_msg }
            end
            return conf.rejected_code or 503
        end

        core.log.error("failed to limit conn: ", err)
        if conf.allow_degradation then
            return
        end
        return 500
    end

    if lim:is_committed() then
        if not ctx.limit_conn then
            ctx.limit_conn = core.tablepool.fetch("plugin#limit-conn", 0, 6)
        end

        core.table.insert_tail(ctx.limit_conn, lim, key, delay, conf.only_use_default_delay)
    end

    if delay >= 0.001 then
        sleep(delay)
    end
end

从缓存中获取该配置出现的次数,并从上下文中获取请求的参数,然后进行一系列的配置来确定是否超过了限制。

  • log
function _M.decrease(conf, ctx)
    local limit_conn = ctx.limit_conn
    if not limit_conn then
        return
    end

    for i = 1, #limit_conn, 4 do
        local lim = limit_conn[i]
        local key = limit_conn[i + 1]
        local delay = limit_conn[i + 2]
        local use_delay =  limit_conn[i + 3]

        local latency
        if not use_delay then
            if ctx.proxy_passed then
                latency = ctx.var.upstream_response_time
            else
                latency = ctx.var.request_time - delay
            end
        end
        core.log.debug("request latency is ", latency) -- for test

        local conn, err = lim:leaving(key, latency)
        if not conn then
            core.log.error("failed to record the connection leaving request: ",
                           err)
            break
        end
    end

    core.tablepool.release("plugin#limit-conn", limit_conn)
    ctx.limit_conn = nil
    return
end

可观测性之syslog插件

  • 初始化
local batch_processor_manager = bp_manager_mod.new("sys logger")
local schema = {
    type = "object",
    properties = {
        host = {type = "string"},
        port = {type = "integer"},
        max_retry_times = {type = "integer", minimum = 1, default = 1},
        retry_interval = {type = "integer", minimum = 0, default = 1},
        flush_limit = {type = "integer", minimum = 1, default = 4096},
        drop_limit = {type = "integer", default = 1048576},
        timeout = {type = "integer", minimum = 1, default = 3},
        sock_type = {type = "string", default = "tcp", enum = {"tcp", "udp"}},
        pool_size = {type = "integer", minimum = 5, default = 5},
        tls = {type = "boolean", default = false},
        include_req_body = {type = "boolean", default = false}
    },
    required = {"host", "port"}
}


local lrucache = core.lrucache.new({
    ttl = 300, count = 512, serial_creating = true,
})


-- syslog uses max_retry_times/retry_interval/timeout
-- instead of max_retry_count/retry_delay/inactive_timeout
local schema = batch_processor_manager:wrap_schema(schema)
schema.max_retry_count = nil
schema.retry_delay = nil
schema.inactive_timeout = nil

local _M = {
    version = 0.1,
    priority = 401,
    name = plugin_name,
    schema = schema,
}

优先级是401,配置中包含host,port,max_retry_times,retry_interval等等,都是将日志发送到一个syslog服务器必备的选项。

  • 检查配置
function _M.check_schema(conf)
    local ok, err = core.schema.check(schema, conf)
    if not ok then
        return false, err
    end

    -- syslog uses max_retry_times/retry_interval/timeout
    -- instead of max_retry_count/retry_delay/inactive_timeout
    conf.max_retry_count = conf.max_retry_times
    conf.retry_delay = conf.retry_interval
    conf.inactive_timeout = conf.timeout
    return true
end

检查配置,并赋值。

  • 发送日志
local function send_syslog_data(conf, log_message, api_ctx)
    local err_msg
    local res = true

    core.log.info("sending a batch logs to ", conf.host, ":", conf.port)

    -- fetch it from lrucache
    local logger, err = core.lrucache.plugin_ctx(
        lrucache, api_ctx, nil, logger_socket.new, logger_socket, {
            host = conf.host,
            port = conf.port,
            flush_limit = conf.flush_limit,
            drop_limit = conf.drop_limit,
            timeout = conf.timeout,
            sock_type = conf.sock_type,
            max_retry_times = conf.max_retry_times,
            retry_interval = conf.retry_interval,
            pool_size = conf.pool_size,
            tls = conf.tls,
        }
    )

    if not logger then
        res = false
        err_msg = "failed when initiating the sys logger processor".. err
    end

    -- reuse the logger object
    local ok, err = logger:log(core.json.encode(log_message))
    if not ok then
        res = false
        err_msg = "failed to log message" .. err
    end

    return res, err_msg
end

这里的logger:log就是发送的日志。

  • log
unction _M.log(conf, ctx)
    local entry = log_util.get_full_log(ngx, conf)

    if batch_processor_manager:add_entry(conf, entry) then
        return
    end

    -- Generate a function to be executed by the batch processor
    local cp_ctx = core.table.clone(ctx)
    local func = function(entries, batch_max_size)
        local data, err
        if batch_max_size == 1 then
            data, err = core.json.encode(entries[1]) -- encode as single {}
        else
            data, err = core.json.encode(entries) -- encode as array [{}]
        end

        if not data then
            return false, 'error occurred while encoding the data: ' .. err
        end

        return send_syslog_data(conf, data, cp_ctx)
    end

    batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
end

apisix中插件是如何工作的

上节分析了四种不同类型的插件,那么究竟这些插件是怎么工作的呢?apisix里面的调度流程究竟是怎么样的? 带着上面的问题,我们来分析一下源码。

apisix.plugin

load方法

里面定义了对所有插件的加载,同时,其在init_worker中被调用

run_plugin方法

function _M.run_plugin(phase, plugins, api_ctx)
  local plugin_run = false
  api_ctx = api_ctx or ngx.ctx.api_ctx
  if not api_ctx then
    return
  end

  plugins = plugins or api_ctx.plugins
  if not plugins or #plugins == 0 then
    return api_ctx
  end

  if phase ~= "log"
    and phase ~= "header_filter"
    and phase ~= "body_filter"
    then
    for i = 1, #plugins, 2 do
      local phase_func = plugins[i][phase]
      if phase_func then
        plugin_run = true
        local code, body = phase_func(plugins[i + 1], api_ctx)
        if code or body then
          if is_http then
            if code >= 400 then
              core.log.warn(plugins[i].name, " exits with http status code ", code)
            end

            core.response.exit(code, body)
          else
            if code >= 400 then
              core.log.warn(plugins[i].name, " exits with status code ", code)
            end

            ngx_exit(1)
          end
        end
      end
    end
    return api_ctx, plugin_run
  end

  for i = 1, #plugins, 2 do
    local phase_func = plugins[i][phase]
    if phase_func then
      plugin_run = true
      phase_func(plugins[i + 1], api_ctx)
    end
  end

  return api_ctx, plugin_run
end

可以看见,这里提到了如果是Log/header_filter/body_filter,就执行方法并根据结果打印日志,如果是别的则只执行方法。

apisix.init

http_init_worker

function _M.http_init_worker()
    local seed, err = core.utils.get_seed_from_urandom()
    if not seed then
        core.log.warn('failed to get seed from urandom: ', err)
        seed = ngx_now() * 1000 + ngx.worker.pid()
    end
    math.randomseed(seed)
    -- for testing only
    core.log.info("random test in [1, 10000]: ", math.random(1, 10000))

    local we = require("resty.worker.events")
    local ok, err = we.configure({shm = "worker-events", interval = 0.1})
    if not ok then
        error("failed to init worker event: " .. err)
    end
    local discovery = require("apisix.discovery.init").discovery
    if discovery and discovery.init_worker then
        discovery.init_worker()
    end
    require("apisix.balancer").init_worker()
    load_balancer = require("apisix.balancer")
    require("apisix.admin.init").init_worker()

    require("apisix.timers").init_worker()

    require("apisix.debug").init_worker()

    plugin.init_worker()
    router.http_init_worker()
    require("apisix.http.service").init_worker()
    plugin_config.init_worker()
    require("apisix.consumer").init_worker()

    if core.config == require("apisix.core.config_yaml") then
        core.config.init_worker()
    end

    apisix_upstream.init_worker()
    require("apisix.plugins.ext-plugin.init").init_worker()

    local_conf = core.config.local_conf()

    if local_conf.apisix and local_conf.apisix.enable_server_tokens == false then
        ver_header = "APISIX"
    end
end

这一个方法是核心,在apisix的nginx.conf的配置文件中,有如下配置:

init_worker_by_lua_block {
  apisix.http_init_worker()
}

可见,该方法是在nginx启动的时候被执行的。

http_access_phase

在nginx.conf中有如下配置

access_by_lua_block {
  apisix.http_access_phase()
}

跟插件有关的主要调用了

plugin.run_global_rules(api_ctx, router.global_rules, nil)
local plugins = plugin.filter(api_ctx, route)
plugin.run_plugin("rewrite", plugins, api_ctx)

而这里的plugin是plugin目录下的 apisix.plugin

http_header_filter_phase

header_filter_by_lua_block {
  apisix.http_header_filter_phase()
}

处理header,没有调用Plugin

http_body_filter_phase

body_filter_by_lua_block {
  apisix.http_body_filter_phase()
}

处理body,没有调用plugin

http_log_phase

log_by_lua_block {
  apisix.http_log_phase()
}

处理log,没有调用plugin

标准模式分析

插件类型分类与作用

根据昨天的分析,插件的类型主要有身份验证、安全防护、流量控制、无服务架构和可观测性。

插件的分类介绍

官方中文插件介绍

auth

当一个插件设置 type = 'auth',说明它是个认证插件,认证插件需要在执行后选择对应的 consumer。举个例子,在 key-auth 插件中,它通过 apikey 请求头获取对应的 consumer,然后通过 consumer.attach_consumer 设置它。

其余插件代码中都不需要刻意去指定

编写自定义插件的步骤与方法

官方中文扩展编写介绍

目录
相关文章
|
1月前
|
存储 SQL API
milvus insert api流程源码分析
milvus insert api流程源码分析
46 3
|
2月前
|
JSON 缓存 应用服务中间件
开源API网关APISIX源码分析(一)
开源API网关APISIX源码分析
98 0
|
1月前
|
人工智能 JavaScript API
互联网人的福利!『昆仑天工』4款AI产品开源!提供API对接!
互联网人的福利!『昆仑天工』4款AI产品开源!提供API对接!
181 0
|
1月前
|
Web App开发 监控 中间件
【开源视频联动物联网平台】视频接入网关的用法
【开源视频联动物联网平台】视频接入网关的用法
40 1
|
1月前
|
传感器 边缘计算 物联网
【开源视频联动物联网平台】为什么需要物联网网关?
【开源视频联动物联网平台】为什么需要物联网网关?
30 2
|
2月前
|
存储 API
milvus insert api的数据结构源码分析
milvus insert api的数据结构源码分析
830 6
milvus insert api的数据结构源码分析
|
18天前
|
缓存 前端开发 API
API接口封装系列
API(Application Programming Interface)接口封装是将系统内部的功能封装成可复用的程序接口并向外部提供,以便其他系统调用和使用这些功能,通过这种方式实现系统之间的通信和协作。下面将介绍API接口封装的一些关键步骤和注意事项。
|
25天前
|
监控 前端开发 JavaScript
实战篇:商品API接口在跨平台销售中的有效运用与案例解析
随着电子商务的蓬勃发展,企业为了扩大市场覆盖面,经常需要在多个在线平台上展示和销售产品。然而,手工管理多个平台的库存、价格、商品描述等信息既耗时又容易出错。商品API接口在这一背景下显得尤为重要,它能够帮助企业在不同的销售平台之间实现商品信息的高效同步和管理。本文将通过具体的淘宝API接口使用案例,展示如何在跨平台销售中有效利用商品API接口,以及如何通过代码实现数据的统一管理。
|
1月前
|
安全 算法 API
产品经理必备知识——API接口
前言 在古代,我们的传输信息的方式有很多,比如写信、飞鸽传书,以及在战争中使用的烽烟,才有了著名的烽火戏诸侯,但这些方式传输信息的效率终究还是无法满足高速发展的社会需要。如今万物互联的时代,我通过一部手机就可以实现衣食住行的方方面面,比如:在家购物、远程控制家电、自动驾驶等等,背后都离不开我们今天要聊的API接口。
|
1天前
|
前端开发 Java 测试技术
IDEA 版 API 接口神器来了,一键生成文档,贼香!
IDEA 版 API 接口神器来了,一键生成文档,贼香!
7 0