开源API网关APISIX源码分析(一)

本文涉及的产品
云原生 API 网关,700元额度,多规格可选
简介: 开源API网关APISIX源码分析

APISIX主框架代码分析

apisix.core

core.schema

配置文件与配置模板进行对比,看是否满足条件

core.table

对lua自带table的扩展,增加了一些功能

core.log

使用了nginx 的errlog模块,估计是将结果输出到nginx的errlog中

core.json

对json处理,主要用到了cjson和dkjson。

core.request

对ngx.req的封装

core.response

对ngx.resp的封装

core.utils

封装的工具

core.lrucache

对resty.lrucache的封装

插件编写分析

现有插件分析

身份验证之ldap-auth

代码实现

  • 定义基本信息
local schema = {
    type = "object",
    title = "work with route or service object",
    properties = {
        base_dn = { type = "string" },
        ldap_uri = { type = "string" },
        use_tls = { type = "boolean" },
        uid = { type = "string" }
    },
    required = {"base_dn","ldap_uri"},
}

local consumer_schema = {
    type = "object",
    title = "work with consumer object",
    properties = {
        user_dn = { type = "string" },
    },
    required = {"user_dn"},
}

local plugin_name = "ldap-auth"

local _M = {
    version = 0.1,
    priority = 2540,
    type = 'auth',
    name = plugin_name,
    schema = schema,
    consumer_schema = consumer_schema
}

插件的配置信息,包含版本,作用的优先级(插件类似于中间件是堆叠在上面的,优先级指定了谁先执行谁后执行),类型为auth,schema和consumer_schema定义了自己的配置和后续的配置。

  • 检查配置的方法
function _M.check_schema(conf, schema_type)
    local ok, err
    if schema_type == core.schema.TYPE_CONSUMER then
        ok, err = core.schema.check(consumer_schema, conf)
    else
        ok, err = core.schema.check(schema, conf)
    end

    return ok, err
end

这里用到了core里面的方法,这里的core是

local core = require("apisix.core")

即apisix的主框架代码,这部分内容,在上面已经提到了,搞忘了实现原理的朋友可以回看一下。

  • 构造消费者缓存
local create_consumer_cache
do
    local consumer_names = {}

    function create_consumer_cache(consumers)
        core.table.clear(consumer_names)

        for _, consumer in ipairs(consumers.nodes) do
            core.log.info("consumer node: ", core.json.delay_encode(consumer))
            consumer_names[consumer.auth_conf.user_dn] = consumer
        end

        return consumer_names
    end

end
  • 匹配并处理授权的header
local function extract_auth_header(authorization)
    local obj = { username = "", password = "" }

    local m, err = ngx.re.match(authorization, "Basic\\s(.+)", "jo")
    if err then
        -- error authorization
        return nil, err
    end

    if not m then
        return nil, "Invalid authorization header format"
    end

    local decoded = ngx.decode_base64(m[1])

    if not decoded then
        return nil, "Failed to decode authentication header: " .. m[1]
    end

    local res
    res, err = ngx_re.split(decoded, ":")
    if err then
        return nil, "Split authorization err:" .. err
    end
    if #res < 2 then
        return nil, "Split authorization err: invalid decoded data: " .. decoded
    end

    obj.username = ngx.re.gsub(res[1], "\\s+", "", "jo")
    obj.password = ngx.re.gsub(res[2], "\\s+", "", "jo")

    return obj, nil
end
校验并重写响应
function _M.rewrite(conf, ctx)
    core.log.info("plugin rewrite phase, conf: ", core.json.delay_encode(conf))

    -- 1. extract authorization from header
    local auth_header = core.request.header(ctx, "Authorization")
    if not auth_header then
        core.response.set_header("WWW-Authenticate", "Basic realm='.'")
        return 401, { message = "Missing authorization in request" }
    end

    local user, err = extract_auth_header(auth_header)
    if err then
        return 401, { message = err }
    end

    -- 2. try authenticate the user against the ldap server
    local uid = conf.uid or "cn"

    local userdn =  uid .. "=" .. user.username .. "," .. conf.base_dn
    local ld = lualdap.open_simple (conf.ldap_uri, userdn, user.password, conf.use_tls)
    if not ld then
        return 401, { message = "Invalid user authorization" }
    end

    -- 3. Retrieve consumer for authorization plugin
    local consumer_conf = consumer_mod.plugin(plugin_name)
    if not consumer_conf then
        return 401, {message = "Missing related consumer"}
    end
    local consumers = lrucache("consumers_key", consumer_conf.conf_version,
        create_consumer_cache, consumer_conf)
    local consumer = consumers[userdn]
    if not consumer then
        return 401, {message = "Invalid API key in request"}
    end
    consumer_mod.attach_consumer(ctx, consumer, consumer_conf)

    core.log.info("hit basic-auth access")
end

return _M

使用与代码对照分析

安全防护之api-breaker

代码实现

  • 初始化
local schema = {
    type = "object",
    properties = {
        break_response_code = {
            type = "integer",
            minimum = 200,
            maximum = 599,
        },
        max_breaker_sec = {
            type = "integer",
            minimum = 3,
            default = 300,
        },
        unhealthy = {
            type = "object",
            properties = {
                http_statuses = {
                    type = "array",
                    minItems = 1,
                    items = {
                        type = "integer",
                        minimum = 500,
                        maximum = 599,
                    },
                    uniqueItems = true,
                    default = {500}
                },
                failures = {
                    type = "integer",
                    minimum = 1,
                    default = 3,
                }
            },
            default = {http_statuses = {500}, failures = 3}
        },
        healthy = {
            type = "object",
            properties = {
                http_statuses = {
                    type = "array",
                    minItems = 1,
                    items = {
                        type = "integer",
                        minimum = 200,
                        maximum = 499,
                    },
                    uniqueItems = true,
                    default = {200}
                },
                successes = {
                    type = "integer",
                    minimum = 1,
                    default = 3,
                }
            },
            default = {http_statuses = {200}, successes = 3}
        }
    },
    required = {"break_response_code"},
}

local _M = {
    version = 0.1,
    name = plugin_name,
    priority = 1005,
    schema = schema,
}
  • 检测配置是否合规
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end
  • 获取请求中的一些内容
local function gen_healthy_key(ctx)
    return "healthy-" .. core.request.get_host(ctx) .. ctx.var.uri
end


local function gen_unhealthy_key(ctx)
    return "unhealthy-" .. core.request.get_host(ctx) .. ctx.var.uri
end


local function gen_lasttime_key(ctx)
    return "unhealthy-lasttime" .. core.request.get_host(ctx) .. ctx.var.uri
end

这里面主要使用core.request.get_host来获取host和ctx.var.uri获取请求的uri信息。这三个方法除了前缀不一样,其余内容都是一样的。

  • 处理逻辑


function _M.access(conf, ctx)
    local unhealthy_key = gen_unhealthy_key(ctx)
    -- unhealthy counts
    local unhealthy_count, err = shared_buffer:get(unhealthy_key)
    if err then
        core.log.warn("failed to get unhealthy_key: ",
                      unhealthy_key, " err: ", err)
        return
    end

    if not unhealthy_count then
        return
    end

    -- timestamp of the last time a unhealthy state was triggered
    local lasttime_key = gen_lasttime_key(ctx)
    local lasttime, err = shared_buffer:get(lasttime_key)
    if err then
        core.log.warn("failed to get lasttime_key: ",
                      lasttime_key, " err: ", err)
        return
    end

    if not lasttime then
        return
    end

    local failure_times = math.ceil(unhealthy_count / conf.unhealthy.failures)
    if failure_times < 1 then
        failure_times = 1
    end

    -- cannot exceed the maximum value of the user configuration
    local breaker_time = 2 ^ failure_times
    if breaker_time > conf.max_breaker_sec then
        breaker_time = conf.max_breaker_sec
    end
    core.log.info("breaker_time: ", breaker_time)

    -- breaker
    if lasttime + breaker_time >= ngx.time() then
        return conf.break_response_code
    end

    return
end

主要是检查的次数,核心代码为

local failure_times = math.ceil(unhealthy_count / conf.unhealthy.failures)
if failure_times < 1 then
  failure_times = 1
end

-- cannot exceed the maximum value of the user configuration
local breaker_time = 2 ^ failure_times
if breaker_time > conf.max_breaker_sec then
  breaker_time = conf.max_breaker_sec
end
core.log.info("breaker_time: ", breaker_time)

如果达到的不健康次数超过了配置的最大次数,则就被break掉。

  • 打印出日志方法
function _M.log(conf, ctx)
    local unhealthy_key = gen_unhealthy_key(ctx)
    local healthy_key = gen_healthy_key(ctx)
    local upstream_status = core.response.get_upstream_status(ctx)

    if not upstream_status then
        return
    end

    -- unhealthy process
    if core.table.array_find(conf.unhealthy.http_statuses,
                             upstream_status)
    then
        local unhealthy_count, err = shared_buffer:incr(unhealthy_key, 1, 0)
        if err then
            core.log.warn("failed to incr unhealthy_key: ", unhealthy_key,
                          " err: ", err)
        end
        core.log.info("unhealthy_key: ", unhealthy_key, " count: ",
                      unhealthy_count)

        shared_buffer:delete(healthy_key)

        -- whether the user-configured number of failures has been reached,
        -- and if so, the timestamp for entering the unhealthy state.
        if unhealthy_count % conf.unhealthy.failures == 0 then
            shared_buffer:set(gen_lasttime_key(ctx), ngx.time(),
                              conf.max_breaker_sec)
            core.log.info("update unhealthy_key: ", unhealthy_key, " to ",
                          unhealthy_count)
        end

        return
    end

    -- health process
    if not core.table.array_find(conf.healthy.http_statuses, upstream_status) then
        return
    end

    local unhealthy_count, err = shared_buffer:get(unhealthy_key)
    if err then
        core.log.warn("failed to `get` unhealthy_key: ", unhealthy_key,
                      " err: ", err)
    end

    if not unhealthy_count then
        return
    end

    local healthy_count, err = shared_buffer:incr(healthy_key, 1, 0)
    if err then
        core.log.warn("failed to `incr` healthy_key: ", healthy_key,
                      " err: ", err)
    end

    -- clear related status
    if healthy_count >= conf.healthy.successes then
        -- stat change to normal
        core.log.info("change to normal, ", healthy_key, " ", healthy_count)
        shared_buffer:delete(gen_lasttime_key(ctx))
        shared_buffer:delete(unhealthy_key)
        shared_buffer:delete(healthy_key)
    end

    return
end
目录
相关文章
|
6天前
|
人工智能 前端开发 API
Gemini Coder:基于 Google Gemini API 的开源 Web 应用生成工具,支持实时编辑和预览
Gemini Coder 是一款基于 Google Gemini API 的 AI 应用生成工具,支持通过文本描述快速生成代码,并提供实时代码编辑和预览功能,简化开发流程。
75 38
Gemini Coder:基于 Google Gemini API 的开源 Web 应用生成工具,支持实时编辑和预览
|
12天前
|
人工智能 JSON 安全
DeepSeek Engineer:集成 DeepSeek API 的开源 AI 编程助手,支持文件读取、编辑并生成结构化响应
DeepSeek Engineer 是一款开源AI编程助手,通过命令行界面处理用户对话并生成结构化JSON,支持文件操作和代码生成。
162 5
DeepSeek Engineer:集成 DeepSeek API 的开源 AI 编程助手,支持文件读取、编辑并生成结构化响应
|
21天前
|
存储 人工智能 API
AgentScope:阿里开源多智能体低代码开发平台,支持一键导出源码、多种模型API和本地模型部署
AgentScope是阿里巴巴集团开源的多智能体开发平台,旨在帮助开发者轻松构建和部署多智能体应用。该平台提供分布式支持,内置多种模型API和本地模型部署选项,支持多模态数据处理。
166 4
AgentScope:阿里开源多智能体低代码开发平台,支持一键导出源码、多种模型API和本地模型部署
|
2月前
|
监控 负载均衡 API
Apache Apisix轻松打造亿级流量Api网关
Apache APISIX 是一个动态、实时、高性能的 API 网关,提供负载均衡、动态上行、灰度发布、熔断、鉴权、可观测等丰富的流量管理功能。适用于处理传统南北向流量、服务间东西向流量及 k8s 入口控制。Airflow 是一个可编程、调度和监控的工作流平台,基于有向无环图 (DAG) 定义和执行任务,提供丰富的命令行工具和 Web 管理界面,方便系统运维和管理。
Apache Apisix轻松打造亿级流量Api网关
|
2月前
|
运维 Cloud Native Java
热联集团:从 APISIX 迁移到云原生网关
我们将核心业务系统从 IDC 全栈迁移到阿里云后,并采用了云原生 API 网关,通过其独有的软硬一体的加速方案,相比普通 HTTPS 请求 TLS 握手时延降低一倍,极限 QPS 提升 80% 以上,运维效率也提升了 50%,此外,我们把 Nacos 迁移到 MSE Nacos,稳定性、性能和运维成本等方面都具备了明显的优势。
|
2月前
|
监控 安全 API
拥抱开源:下一代API管理工具Kong的崛起
【10月更文挑战第27天】在微服务架构和API经济的推动下,API管理成为软件开发的关键环节。Kong作为开源的API管理平台,凭借其灵活性和强大功能,受到开发者的青睐。本文探讨了Kong的核心特性、使用技巧及其在企业中的应用,帮助读者更好地理解和利用这一工具。
|
2月前
|
Web App开发 人工智能 自然语言处理
WebChat:开源的网页内容增强问答 AI 助手,基于 Chrome 扩展的最佳实践开发,支持自定义 API 和本地大模型
WebChat 是一个基于 Chrome 扩展开发的 AI 助手,能够帮助用户理解和分析当前网页的内容,支持自定义 API 和本地大模型。
151 0
|
4月前
|
人工智能 Serverless API
一键服务化:从魔搭开源模型到OpenAI API服务
在多样化大模型的背后,OpenAI得益于在领域的先发优势,其API接口今天也成为了业界的一个事实标准。
一键服务化:从魔搭开源模型到OpenAI API服务
|
4月前
|
编解码 自然语言处理 机器人
通义千问Qwen2-VL开源,API可直接调用!
通义千问宣布开源第二代视觉语言模型Qwen2-VL,并推出2B、7B两个尺寸及其量化版本模型。同时,旗舰模型Qwen2-VL-72B的API已上线阿里云百炼平台,用户可直接调用。
2155 9
|
3月前
|
监控 API 数据安全/隐私保护
2024年开源API工具盘点,覆盖API全生命周期
2024年经济持续低迷,本文整理一些免费的开源工具,旨在帮助企业组织降低工具的支出成本,能用免费的何必用付费的呢(狗头)?
109 0

热门文章

最新文章