开源API网关APISIX源码分析(二)

简介: 开源API网关APISIX源码分析(二)

使用与代码对照分析

流量控制之limit-conn

代码实现

  • 初始化
local plugin_name = "limit-conn"
local schema = {
    type = "object",
    properties = {
        conn = {type = "integer", exclusiveMinimum = 0},
        burst = {type = "integer",  minimum = 0},
        default_conn_delay = {type = "number", exclusiveMinimum = 0},
        only_use_default_delay = {type = "boolean", default = false},
        key = {type = "string"},
        key_type = {type = "string",
            enum = {"var", "var_combination"},
            default = "var",
        },
        rejected_code = {
            type = "integer", minimum = 200, maximum = 599, default = 503
        },
        rejected_msg = {
            type = "string", minLength = 1
        },
        allow_degradation = {type = "boolean", default = false}
    },
    required = {"conn", "burst", "default_conn_delay", "key"}
}

local _M = {
    version = 0.1,
    priority = 1003,
    name = plugin_name,
    schema = schema,
}

优先级是1003,包含连接数,爆破数,默认连接延迟,是否只使用默认延迟,key,关键词类型,拒绝响应码,拒绝信息。必要的信息是连接数,爆破树,默认链接延迟,key。

  • 检查配置是否符合
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end
  • access方法
function _M.increase(conf, ctx)
    core.log.info("ver: ", ctx.conf_version)
    local lim, err = lrucache(conf, nil, create_limit_obj, conf)
    if not lim then
        core.log.error("failed to instantiate a resty.limit.conn object: ", err)
        if conf.allow_degradation then
            return
        end
        return 500
    end

    local conf_key = conf.key
    local key
    if conf.key_type == "var_combination" then
        local err, n_resolved
        key, err, n_resolved = core.utils.resolve_var(conf_key, ctx.var);
        if err then
            core.log.error("could not resolve vars in ", conf_key, " error: ", err)
        end

        if n_resolved == 0 then
            key = nil
        end
    else
        key = ctx.var[conf_key]
    end

    if key == nil then
        core.log.info("The value of the configured key is empty, use client IP instead")
        -- When the value of key is empty, use client IP instead
        key = ctx.var["remote_addr"]
    end

    key = key .. ctx.conf_type .. ctx.conf_version
    core.log.info("limit key: ", key)

    local delay, err = lim:incoming(key, true)
    if not delay then
        if err == "rejected" then
            if conf.rejected_msg then
                return conf.rejected_code, { error_msg = conf.rejected_msg }
            end
            return conf.rejected_code or 503
        end

        core.log.error("failed to limit conn: ", err)
        if conf.allow_degradation then
            return
        end
        return 500
    end

    if lim:is_committed() then
        if not ctx.limit_conn then
            ctx.limit_conn = core.tablepool.fetch("plugin#limit-conn", 0, 6)
        end

        core.table.insert_tail(ctx.limit_conn, lim, key, delay, conf.only_use_default_delay)
    end

    if delay >= 0.001 then
        sleep(delay)
    end
end

从缓存中获取该配置出现的次数,并从上下文中获取请求的参数,然后进行一系列的配置来确定是否超过了限制。

  • log
function _M.decrease(conf, ctx)
    local limit_conn = ctx.limit_conn
    if not limit_conn then
        return
    end

    for i = 1, #limit_conn, 4 do
        local lim = limit_conn[i]
        local key = limit_conn[i + 1]
        local delay = limit_conn[i + 2]
        local use_delay =  limit_conn[i + 3]

        local latency
        if not use_delay then
            if ctx.proxy_passed then
                latency = ctx.var.upstream_response_time
            else
                latency = ctx.var.request_time - delay
            end
        end
        core.log.debug("request latency is ", latency) -- for test

        local conn, err = lim:leaving(key, latency)
        if not conn then
            core.log.error("failed to record the connection leaving request: ",
                           err)
            break
        end
    end

    core.tablepool.release("plugin#limit-conn", limit_conn)
    ctx.limit_conn = nil
    return
end

可观测性之syslog插件

  • 初始化
local batch_processor_manager = bp_manager_mod.new("sys logger")
local schema = {
    type = "object",
    properties = {
        host = {type = "string"},
        port = {type = "integer"},
        max_retry_times = {type = "integer", minimum = 1, default = 1},
        retry_interval = {type = "integer", minimum = 0, default = 1},
        flush_limit = {type = "integer", minimum = 1, default = 4096},
        drop_limit = {type = "integer", default = 1048576},
        timeout = {type = "integer", minimum = 1, default = 3},
        sock_type = {type = "string", default = "tcp", enum = {"tcp", "udp"}},
        pool_size = {type = "integer", minimum = 5, default = 5},
        tls = {type = "boolean", default = false},
        include_req_body = {type = "boolean", default = false}
    },
    required = {"host", "port"}
}


local lrucache = core.lrucache.new({
    ttl = 300, count = 512, serial_creating = true,
})


-- syslog uses max_retry_times/retry_interval/timeout
-- instead of max_retry_count/retry_delay/inactive_timeout
local schema = batch_processor_manager:wrap_schema(schema)
schema.max_retry_count = nil
schema.retry_delay = nil
schema.inactive_timeout = nil

local _M = {
    version = 0.1,
    priority = 401,
    name = plugin_name,
    schema = schema,
}

优先级是401,配置中包含host,port,max_retry_times,retry_interval等等,都是将日志发送到一个syslog服务器必备的选项。

  • 检查配置
function _M.check_schema(conf)
    local ok, err = core.schema.check(schema, conf)
    if not ok then
        return false, err
    end

    -- syslog uses max_retry_times/retry_interval/timeout
    -- instead of max_retry_count/retry_delay/inactive_timeout
    conf.max_retry_count = conf.max_retry_times
    conf.retry_delay = conf.retry_interval
    conf.inactive_timeout = conf.timeout
    return true
end

检查配置,并赋值。

  • 发送日志
local function send_syslog_data(conf, log_message, api_ctx)
    local err_msg
    local res = true

    core.log.info("sending a batch logs to ", conf.host, ":", conf.port)

    -- fetch it from lrucache
    local logger, err = core.lrucache.plugin_ctx(
        lrucache, api_ctx, nil, logger_socket.new, logger_socket, {
            host = conf.host,
            port = conf.port,
            flush_limit = conf.flush_limit,
            drop_limit = conf.drop_limit,
            timeout = conf.timeout,
            sock_type = conf.sock_type,
            max_retry_times = conf.max_retry_times,
            retry_interval = conf.retry_interval,
            pool_size = conf.pool_size,
            tls = conf.tls,
        }
    )

    if not logger then
        res = false
        err_msg = "failed when initiating the sys logger processor".. err
    end

    -- reuse the logger object
    local ok, err = logger:log(core.json.encode(log_message))
    if not ok then
        res = false
        err_msg = "failed to log message" .. err
    end

    return res, err_msg
end

这里的logger:log就是发送的日志。

  • log
unction _M.log(conf, ctx)
    local entry = log_util.get_full_log(ngx, conf)

    if batch_processor_manager:add_entry(conf, entry) then
        return
    end

    -- Generate a function to be executed by the batch processor
    local cp_ctx = core.table.clone(ctx)
    local func = function(entries, batch_max_size)
        local data, err
        if batch_max_size == 1 then
            data, err = core.json.encode(entries[1]) -- encode as single {}
        else
            data, err = core.json.encode(entries) -- encode as array [{}]
        end

        if not data then
            return false, 'error occurred while encoding the data: ' .. err
        end

        return send_syslog_data(conf, data, cp_ctx)
    end

    batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
end

apisix中插件是如何工作的

上节分析了四种不同类型的插件,那么究竟这些插件是怎么工作的呢?apisix里面的调度流程究竟是怎么样的? 带着上面的问题,我们来分析一下源码。

apisix.plugin

load方法

里面定义了对所有插件的加载,同时,其在init_worker中被调用

run_plugin方法

function _M.run_plugin(phase, plugins, api_ctx)
  local plugin_run = false
  api_ctx = api_ctx or ngx.ctx.api_ctx
  if not api_ctx then
    return
  end

  plugins = plugins or api_ctx.plugins
  if not plugins or #plugins == 0 then
    return api_ctx
  end

  if phase ~= "log"
    and phase ~= "header_filter"
    and phase ~= "body_filter"
    then
    for i = 1, #plugins, 2 do
      local phase_func = plugins[i][phase]
      if phase_func then
        plugin_run = true
        local code, body = phase_func(plugins[i + 1], api_ctx)
        if code or body then
          if is_http then
            if code >= 400 then
              core.log.warn(plugins[i].name, " exits with http status code ", code)
            end

            core.response.exit(code, body)
          else
            if code >= 400 then
              core.log.warn(plugins[i].name, " exits with status code ", code)
            end

            ngx_exit(1)
          end
        end
      end
    end
    return api_ctx, plugin_run
  end

  for i = 1, #plugins, 2 do
    local phase_func = plugins[i][phase]
    if phase_func then
      plugin_run = true
      phase_func(plugins[i + 1], api_ctx)
    end
  end

  return api_ctx, plugin_run
end

可以看见,这里提到了如果是Log/header_filter/body_filter,就执行方法并根据结果打印日志,如果是别的则只执行方法。

apisix.init

http_init_worker

function _M.http_init_worker()
    local seed, err = core.utils.get_seed_from_urandom()
    if not seed then
        core.log.warn('failed to get seed from urandom: ', err)
        seed = ngx_now() * 1000 + ngx.worker.pid()
    end
    math.randomseed(seed)
    -- for testing only
    core.log.info("random test in [1, 10000]: ", math.random(1, 10000))

    local we = require("resty.worker.events")
    local ok, err = we.configure({shm = "worker-events", interval = 0.1})
    if not ok then
        error("failed to init worker event: " .. err)
    end
    local discovery = require("apisix.discovery.init").discovery
    if discovery and discovery.init_worker then
        discovery.init_worker()
    end
    require("apisix.balancer").init_worker()
    load_balancer = require("apisix.balancer")
    require("apisix.admin.init").init_worker()

    require("apisix.timers").init_worker()

    require("apisix.debug").init_worker()

    plugin.init_worker()
    router.http_init_worker()
    require("apisix.http.service").init_worker()
    plugin_config.init_worker()
    require("apisix.consumer").init_worker()

    if core.config == require("apisix.core.config_yaml") then
        core.config.init_worker()
    end

    apisix_upstream.init_worker()
    require("apisix.plugins.ext-plugin.init").init_worker()

    local_conf = core.config.local_conf()

    if local_conf.apisix and local_conf.apisix.enable_server_tokens == false then
        ver_header = "APISIX"
    end
end

这一个方法是核心,在apisix的nginx.conf的配置文件中,有如下配置:

init_worker_by_lua_block {
  apisix.http_init_worker()
}

可见,该方法是在nginx启动的时候被执行的。

http_access_phase

在nginx.conf中有如下配置

access_by_lua_block {
  apisix.http_access_phase()
}

跟插件有关的主要调用了

plugin.run_global_rules(api_ctx, router.global_rules, nil)
local plugins = plugin.filter(api_ctx, route)
plugin.run_plugin("rewrite", plugins, api_ctx)

而这里的plugin是plugin目录下的 apisix.plugin

http_header_filter_phase

header_filter_by_lua_block {
  apisix.http_header_filter_phase()
}

处理header,没有调用Plugin

http_body_filter_phase

body_filter_by_lua_block {
  apisix.http_body_filter_phase()
}

处理body,没有调用plugin

http_log_phase

log_by_lua_block {
  apisix.http_log_phase()
}

处理log,没有调用plugin

标准模式分析

插件类型分类与作用

根据昨天的分析,插件的类型主要有身份验证、安全防护、流量控制、无服务架构和可观测性。

插件的分类介绍

官方中文插件介绍

auth

当一个插件设置 type = 'auth',说明它是个认证插件,认证插件需要在执行后选择对应的 consumer。举个例子,在 key-auth 插件中,它通过 apikey 请求头获取对应的 consumer,然后通过 consumer.attach_consumer 设置它。

其余插件代码中都不需要刻意去指定

编写自定义插件的步骤与方法

官方中文扩展编写介绍

目录
相关文章
|
1月前
|
存储 SQL API
milvus insert api流程源码分析
milvus insert api流程源码分析
43 3
|
2月前
|
JSON 缓存 应用服务中间件
开源API网关APISIX源码分析(一)
开源API网关APISIX源码分析
93 0
|
1月前
|
人工智能 JavaScript API
互联网人的福利!『昆仑天工』4款AI产品开源!提供API对接!
互联网人的福利!『昆仑天工』4款AI产品开源!提供API对接!
171 0
|
1月前
|
Web App开发 监控 中间件
【开源视频联动物联网平台】视频接入网关的用法
【开源视频联动物联网平台】视频接入网关的用法
37 1
|
1月前
|
传感器 边缘计算 物联网
【开源视频联动物联网平台】为什么需要物联网网关?
【开源视频联动物联网平台】为什么需要物联网网关?
28 2
|
2月前
|
存储 API
milvus insert api的数据结构源码分析
milvus insert api的数据结构源码分析
827 6
milvus insert api的数据结构源码分析
|
1月前
|
运维 网络协议 安全
长连接网关技术专题(十):百度基于Go的千万级统一长连接服务架构实践
本文将介绍百度基于golang实现的统一长连接服务,从统一长连接功能实现和性能优化等角度,描述了其在设计、开发和维护过程中面临的问题和挑战,并重点介绍了解决相关问题和挑战的方案和实践经验。
78 1
|
5月前
|
负载均衡 应用服务中间件 API
微服务技术系列教程(25) - SpringCloud- 接口网关服务Zuul
微服务技术系列教程(25) - SpringCloud- 接口网关服务Zuul
58 0
|
4月前
|
负载均衡 Cloud Native Java
【云原生】Spring Cloud Alibaba 之 Gateway 服务网关实战开发
【云原生】Spring Cloud Alibaba 之 Gateway 服务网关实战开发
372 0
|
2月前
|
缓存 安全 API
【亿级数据专题】「高并发架构」盘点本年度探索对外服务的百万请求量的API网关设计实现
公司对外开放的OpenAPI-Server服务,作为核心内部系统与外部系统之间的重要通讯枢纽,每天处理数百万次的API调用、亿级别的消息推送以及TB/PB级别的数据同步。经过多年流量的持续增长,该服务体系依然稳固可靠,展现出强大的负载能力。
55 9
【亿级数据专题】「高并发架构」盘点本年度探索对外服务的百万请求量的API网关设计实现