开源API网关APISIX源码分析(一)

本文涉及的产品
云原生 API 网关,700元额度,多规格可选
简介: 开源API网关APISIX源码分析

APISIX主框架代码分析

apisix.core

core.schema

配置文件与配置模板进行对比,看是否满足条件

core.table

对lua自带table的扩展,增加了一些功能

core.log

使用了nginx 的errlog模块,估计是将结果输出到nginx的errlog中

core.json

对json处理,主要用到了cjson和dkjson。

core.request

对ngx.req的封装

core.response

对ngx.resp的封装

core.utils

封装的工具

core.lrucache

对resty.lrucache的封装

插件编写分析

现有插件分析

身份验证之ldap-auth

代码实现

  • 定义基本信息
local schema = {
    type = "object",
    title = "work with route or service object",
    properties = {
        base_dn = { type = "string" },
        ldap_uri = { type = "string" },
        use_tls = { type = "boolean" },
        uid = { type = "string" }
    },
    required = {"base_dn","ldap_uri"},
}

local consumer_schema = {
    type = "object",
    title = "work with consumer object",
    properties = {
        user_dn = { type = "string" },
    },
    required = {"user_dn"},
}

local plugin_name = "ldap-auth"

local _M = {
    version = 0.1,
    priority = 2540,
    type = 'auth',
    name = plugin_name,
    schema = schema,
    consumer_schema = consumer_schema
}

插件的配置信息,包含版本,作用的优先级(插件类似于中间件是堆叠在上面的,优先级指定了谁先执行谁后执行),类型为auth,schema和consumer_schema定义了自己的配置和后续的配置。

  • 检查配置的方法
function _M.check_schema(conf, schema_type)
    local ok, err
    if schema_type == core.schema.TYPE_CONSUMER then
        ok, err = core.schema.check(consumer_schema, conf)
    else
        ok, err = core.schema.check(schema, conf)
    end

    return ok, err
end

这里用到了core里面的方法,这里的core是

local core = require("apisix.core")

即apisix的主框架代码,这部分内容,在上面已经提到了,搞忘了实现原理的朋友可以回看一下。

  • 构造消费者缓存
local create_consumer_cache
do
    local consumer_names = {}

    function create_consumer_cache(consumers)
        core.table.clear(consumer_names)

        for _, consumer in ipairs(consumers.nodes) do
            core.log.info("consumer node: ", core.json.delay_encode(consumer))
            consumer_names[consumer.auth_conf.user_dn] = consumer
        end

        return consumer_names
    end

end
  • 匹配并处理授权的header
local function extract_auth_header(authorization)
    local obj = { username = "", password = "" }

    local m, err = ngx.re.match(authorization, "Basic\\s(.+)", "jo")
    if err then
        -- error authorization
        return nil, err
    end

    if not m then
        return nil, "Invalid authorization header format"
    end

    local decoded = ngx.decode_base64(m[1])

    if not decoded then
        return nil, "Failed to decode authentication header: " .. m[1]
    end

    local res
    res, err = ngx_re.split(decoded, ":")
    if err then
        return nil, "Split authorization err:" .. err
    end
    if #res < 2 then
        return nil, "Split authorization err: invalid decoded data: " .. decoded
    end

    obj.username = ngx.re.gsub(res[1], "\\s+", "", "jo")
    obj.password = ngx.re.gsub(res[2], "\\s+", "", "jo")

    return obj, nil
end
校验并重写响应
function _M.rewrite(conf, ctx)
    core.log.info("plugin rewrite phase, conf: ", core.json.delay_encode(conf))

    -- 1. extract authorization from header
    local auth_header = core.request.header(ctx, "Authorization")
    if not auth_header then
        core.response.set_header("WWW-Authenticate", "Basic realm='.'")
        return 401, { message = "Missing authorization in request" }
    end

    local user, err = extract_auth_header(auth_header)
    if err then
        return 401, { message = err }
    end

    -- 2. try authenticate the user against the ldap server
    local uid = conf.uid or "cn"

    local userdn =  uid .. "=" .. user.username .. "," .. conf.base_dn
    local ld = lualdap.open_simple (conf.ldap_uri, userdn, user.password, conf.use_tls)
    if not ld then
        return 401, { message = "Invalid user authorization" }
    end

    -- 3. Retrieve consumer for authorization plugin
    local consumer_conf = consumer_mod.plugin(plugin_name)
    if not consumer_conf then
        return 401, {message = "Missing related consumer"}
    end
    local consumers = lrucache("consumers_key", consumer_conf.conf_version,
        create_consumer_cache, consumer_conf)
    local consumer = consumers[userdn]
    if not consumer then
        return 401, {message = "Invalid API key in request"}
    end
    consumer_mod.attach_consumer(ctx, consumer, consumer_conf)

    core.log.info("hit basic-auth access")
end

return _M

使用与代码对照分析

安全防护之api-breaker

代码实现

  • 初始化
local schema = {
    type = "object",
    properties = {
        break_response_code = {
            type = "integer",
            minimum = 200,
            maximum = 599,
        },
        max_breaker_sec = {
            type = "integer",
            minimum = 3,
            default = 300,
        },
        unhealthy = {
            type = "object",
            properties = {
                http_statuses = {
                    type = "array",
                    minItems = 1,
                    items = {
                        type = "integer",
                        minimum = 500,
                        maximum = 599,
                    },
                    uniqueItems = true,
                    default = {500}
                },
                failures = {
                    type = "integer",
                    minimum = 1,
                    default = 3,
                }
            },
            default = {http_statuses = {500}, failures = 3}
        },
        healthy = {
            type = "object",
            properties = {
                http_statuses = {
                    type = "array",
                    minItems = 1,
                    items = {
                        type = "integer",
                        minimum = 200,
                        maximum = 499,
                    },
                    uniqueItems = true,
                    default = {200}
                },
                successes = {
                    type = "integer",
                    minimum = 1,
                    default = 3,
                }
            },
            default = {http_statuses = {200}, successes = 3}
        }
    },
    required = {"break_response_code"},
}

local _M = {
    version = 0.1,
    name = plugin_name,
    priority = 1005,
    schema = schema,
}
  • 检测配置是否合规
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end
  • 获取请求中的一些内容
local function gen_healthy_key(ctx)
    return "healthy-" .. core.request.get_host(ctx) .. ctx.var.uri
end


local function gen_unhealthy_key(ctx)
    return "unhealthy-" .. core.request.get_host(ctx) .. ctx.var.uri
end


local function gen_lasttime_key(ctx)
    return "unhealthy-lasttime" .. core.request.get_host(ctx) .. ctx.var.uri
end

这里面主要使用core.request.get_host来获取host和ctx.var.uri获取请求的uri信息。这三个方法除了前缀不一样,其余内容都是一样的。

  • 处理逻辑


function _M.access(conf, ctx)
    local unhealthy_key = gen_unhealthy_key(ctx)
    -- unhealthy counts
    local unhealthy_count, err = shared_buffer:get(unhealthy_key)
    if err then
        core.log.warn("failed to get unhealthy_key: ",
                      unhealthy_key, " err: ", err)
        return
    end

    if not unhealthy_count then
        return
    end

    -- timestamp of the last time a unhealthy state was triggered
    local lasttime_key = gen_lasttime_key(ctx)
    local lasttime, err = shared_buffer:get(lasttime_key)
    if err then
        core.log.warn("failed to get lasttime_key: ",
                      lasttime_key, " err: ", err)
        return
    end

    if not lasttime then
        return
    end

    local failure_times = math.ceil(unhealthy_count / conf.unhealthy.failures)
    if failure_times < 1 then
        failure_times = 1
    end

    -- cannot exceed the maximum value of the user configuration
    local breaker_time = 2 ^ failure_times
    if breaker_time > conf.max_breaker_sec then
        breaker_time = conf.max_breaker_sec
    end
    core.log.info("breaker_time: ", breaker_time)

    -- breaker
    if lasttime + breaker_time >= ngx.time() then
        return conf.break_response_code
    end

    return
end

主要是检查的次数,核心代码为

local failure_times = math.ceil(unhealthy_count / conf.unhealthy.failures)
if failure_times < 1 then
  failure_times = 1
end

-- cannot exceed the maximum value of the user configuration
local breaker_time = 2 ^ failure_times
if breaker_time > conf.max_breaker_sec then
  breaker_time = conf.max_breaker_sec
end
core.log.info("breaker_time: ", breaker_time)

如果达到的不健康次数超过了配置的最大次数,则就被break掉。

  • 打印出日志方法
function _M.log(conf, ctx)
    local unhealthy_key = gen_unhealthy_key(ctx)
    local healthy_key = gen_healthy_key(ctx)
    local upstream_status = core.response.get_upstream_status(ctx)

    if not upstream_status then
        return
    end

    -- unhealthy process
    if core.table.array_find(conf.unhealthy.http_statuses,
                             upstream_status)
    then
        local unhealthy_count, err = shared_buffer:incr(unhealthy_key, 1, 0)
        if err then
            core.log.warn("failed to incr unhealthy_key: ", unhealthy_key,
                          " err: ", err)
        end
        core.log.info("unhealthy_key: ", unhealthy_key, " count: ",
                      unhealthy_count)

        shared_buffer:delete(healthy_key)

        -- whether the user-configured number of failures has been reached,
        -- and if so, the timestamp for entering the unhealthy state.
        if unhealthy_count % conf.unhealthy.failures == 0 then
            shared_buffer:set(gen_lasttime_key(ctx), ngx.time(),
                              conf.max_breaker_sec)
            core.log.info("update unhealthy_key: ", unhealthy_key, " to ",
                          unhealthy_count)
        end

        return
    end

    -- health process
    if not core.table.array_find(conf.healthy.http_statuses, upstream_status) then
        return
    end

    local unhealthy_count, err = shared_buffer:get(unhealthy_key)
    if err then
        core.log.warn("failed to `get` unhealthy_key: ", unhealthy_key,
                      " err: ", err)
    end

    if not unhealthy_count then
        return
    end

    local healthy_count, err = shared_buffer:incr(healthy_key, 1, 0)
    if err then
        core.log.warn("failed to `incr` healthy_key: ", healthy_key,
                      " err: ", err)
    end

    -- clear related status
    if healthy_count >= conf.healthy.successes then
        -- stat change to normal
        core.log.info("change to normal, ", healthy_key, " ", healthy_count)
        shared_buffer:delete(gen_lasttime_key(ctx))
        shared_buffer:delete(unhealthy_key)
        shared_buffer:delete(healthy_key)
    end

    return
end
目录
相关文章
|
5月前
|
负载均衡 应用服务中间件 API
Nginx、Kong、Apisix、Gateway网关比较
Nginx、Kong、Apisix、Gateway网关比较
800 1
Nginx、Kong、Apisix、Gateway网关比较
|
1月前
|
人工智能 Serverless API
一键服务化:从魔搭开源模型到OpenAI API服务
在多样化大模型的背后,OpenAI得益于在领域的先发优势,其API接口今天也成为了业界的一个事实标准。
一键服务化:从魔搭开源模型到OpenAI API服务
|
9天前
|
编解码 自然语言处理 机器人
通义千问Qwen2-VL开源,API可直接调用!
通义千问宣布开源第二代视觉语言模型Qwen2-VL,并推出2B、7B两个尺寸及其量化版本模型。同时,旗舰模型Qwen2-VL-72B的API已上线阿里云百炼平台,用户可直接调用。
102 7
|
28天前
|
前端开发 关系型数据库 MySQL
ThingsGateway:一款基于.NET8开源的跨平台高性能边缘采集网关
ThingsGateway:一款基于.NET8开源的跨平台高性能边缘采集网关
|
2月前
|
运维 监控 Cloud Native
|
2月前
|
缓存 JavaScript 前端开发
为开源项目 go-gin-api 增加 WebSocket 模块
为开源项目 go-gin-api 增加 WebSocket 模块
34 2
|
3月前
|
JSON Shell API
阿里云PAI-Stable Diffusion开源代码浅析之所有api的入参如何看
阿里云PAI-Stable Diffusion开源代码浅析之所有api的入参如何看
|
3月前
|
分布式计算 API 对象存储
Ray是一个开源的分布式计算框架,用于构建和扩展分布式应用。它提供了简单的API,使得开发者可以轻松地编写并行和分布式代码,而无需担心底层的复杂性。
Ray是一个开源的分布式计算框架,用于构建和扩展分布式应用。它提供了简单的API,使得开发者可以轻松地编写并行和分布式代码,而无需担心底层的复杂性。
354 11
|
2月前
|
消息中间件 关系型数据库 MySQL
为开源项目 go-gin-api 增加后台任务模块
为开源项目 go-gin-api 增加后台任务模块
23 0
|
3月前
|
自然语言处理 PyTorch API
`transformers`库是Hugging Face提供的一个开源库,它包含了大量的预训练模型和方便的API,用于自然语言处理(NLP)任务。在文本生成任务中,`transformers`库提供了许多预训练的生成模型,如GPT系列、T5、BART等。这些模型可以通过`pipeline()`函数方便地加载和使用,而`generate()`函数则是用于生成文本的核心函数。
`transformers`库是Hugging Face提供的一个开源库,它包含了大量的预训练模型和方便的API,用于自然语言处理(NLP)任务。在文本生成任务中,`transformers`库提供了许多预训练的生成模型,如GPT系列、T5、BART等。这些模型可以通过`pipeline()`函数方便地加载和使用,而`generate()`函数则是用于生成文本的核心函数。
下一篇
无影云桌面