开源API网关APISIX源码分析(一)

简介: 开源API网关APISIX源码分析

APISIX主框架代码分析

apisix.core

core.schema

配置文件与配置模板进行对比,看是否满足条件

core.table

对lua自带table的扩展,增加了一些功能

core.log

使用了nginx 的errlog模块,估计是将结果输出到nginx的errlog中

core.json

对json处理,主要用到了cjson和dkjson。

core.request

对ngx.req的封装

core.response

对ngx.resp的封装

core.utils

封装的工具

core.lrucache

对resty.lrucache的封装

插件编写分析

现有插件分析

身份验证之ldap-auth

代码实现

  • 定义基本信息
local schema = {
    type = "object",
    title = "work with route or service object",
    properties = {
        base_dn = { type = "string" },
        ldap_uri = { type = "string" },
        use_tls = { type = "boolean" },
        uid = { type = "string" }
    },
    required = {"base_dn","ldap_uri"},
}

local consumer_schema = {
    type = "object",
    title = "work with consumer object",
    properties = {
        user_dn = { type = "string" },
    },
    required = {"user_dn"},
}

local plugin_name = "ldap-auth"

local _M = {
    version = 0.1,
    priority = 2540,
    type = 'auth',
    name = plugin_name,
    schema = schema,
    consumer_schema = consumer_schema
}

插件的配置信息,包含版本,作用的优先级(插件类似于中间件是堆叠在上面的,优先级指定了谁先执行谁后执行),类型为auth,schema和consumer_schema定义了自己的配置和后续的配置。

  • 检查配置的方法
function _M.check_schema(conf, schema_type)
    local ok, err
    if schema_type == core.schema.TYPE_CONSUMER then
        ok, err = core.schema.check(consumer_schema, conf)
    else
        ok, err = core.schema.check(schema, conf)
    end

    return ok, err
end

这里用到了core里面的方法,这里的core是

local core = require("apisix.core")

即apisix的主框架代码,这部分内容,在上面已经提到了,搞忘了实现原理的朋友可以回看一下。

  • 构造消费者缓存
local create_consumer_cache
do
    local consumer_names = {}

    function create_consumer_cache(consumers)
        core.table.clear(consumer_names)

        for _, consumer in ipairs(consumers.nodes) do
            core.log.info("consumer node: ", core.json.delay_encode(consumer))
            consumer_names[consumer.auth_conf.user_dn] = consumer
        end

        return consumer_names
    end

end
  • 匹配并处理授权的header
local function extract_auth_header(authorization)
    local obj = { username = "", password = "" }

    local m, err = ngx.re.match(authorization, "Basic\\s(.+)", "jo")
    if err then
        -- error authorization
        return nil, err
    end

    if not m then
        return nil, "Invalid authorization header format"
    end

    local decoded = ngx.decode_base64(m[1])

    if not decoded then
        return nil, "Failed to decode authentication header: " .. m[1]
    end

    local res
    res, err = ngx_re.split(decoded, ":")
    if err then
        return nil, "Split authorization err:" .. err
    end
    if #res < 2 then
        return nil, "Split authorization err: invalid decoded data: " .. decoded
    end

    obj.username = ngx.re.gsub(res[1], "\\s+", "", "jo")
    obj.password = ngx.re.gsub(res[2], "\\s+", "", "jo")

    return obj, nil
end
校验并重写响应
function _M.rewrite(conf, ctx)
    core.log.info("plugin rewrite phase, conf: ", core.json.delay_encode(conf))

    -- 1. extract authorization from header
    local auth_header = core.request.header(ctx, "Authorization")
    if not auth_header then
        core.response.set_header("WWW-Authenticate", "Basic realm='.'")
        return 401, { message = "Missing authorization in request" }
    end

    local user, err = extract_auth_header(auth_header)
    if err then
        return 401, { message = err }
    end

    -- 2. try authenticate the user against the ldap server
    local uid = conf.uid or "cn"

    local userdn =  uid .. "=" .. user.username .. "," .. conf.base_dn
    local ld = lualdap.open_simple (conf.ldap_uri, userdn, user.password, conf.use_tls)
    if not ld then
        return 401, { message = "Invalid user authorization" }
    end

    -- 3. Retrieve consumer for authorization plugin
    local consumer_conf = consumer_mod.plugin(plugin_name)
    if not consumer_conf then
        return 401, {message = "Missing related consumer"}
    end
    local consumers = lrucache("consumers_key", consumer_conf.conf_version,
        create_consumer_cache, consumer_conf)
    local consumer = consumers[userdn]
    if not consumer then
        return 401, {message = "Invalid API key in request"}
    end
    consumer_mod.attach_consumer(ctx, consumer, consumer_conf)

    core.log.info("hit basic-auth access")
end

return _M

使用与代码对照分析

安全防护之api-breaker

代码实现

  • 初始化
local schema = {
    type = "object",
    properties = {
        break_response_code = {
            type = "integer",
            minimum = 200,
            maximum = 599,
        },
        max_breaker_sec = {
            type = "integer",
            minimum = 3,
            default = 300,
        },
        unhealthy = {
            type = "object",
            properties = {
                http_statuses = {
                    type = "array",
                    minItems = 1,
                    items = {
                        type = "integer",
                        minimum = 500,
                        maximum = 599,
                    },
                    uniqueItems = true,
                    default = {500}
                },
                failures = {
                    type = "integer",
                    minimum = 1,
                    default = 3,
                }
            },
            default = {http_statuses = {500}, failures = 3}
        },
        healthy = {
            type = "object",
            properties = {
                http_statuses = {
                    type = "array",
                    minItems = 1,
                    items = {
                        type = "integer",
                        minimum = 200,
                        maximum = 499,
                    },
                    uniqueItems = true,
                    default = {200}
                },
                successes = {
                    type = "integer",
                    minimum = 1,
                    default = 3,
                }
            },
            default = {http_statuses = {200}, successes = 3}
        }
    },
    required = {"break_response_code"},
}

local _M = {
    version = 0.1,
    name = plugin_name,
    priority = 1005,
    schema = schema,
}
  • 检测配置是否合规
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end
  • 获取请求中的一些内容
local function gen_healthy_key(ctx)
    return "healthy-" .. core.request.get_host(ctx) .. ctx.var.uri
end


local function gen_unhealthy_key(ctx)
    return "unhealthy-" .. core.request.get_host(ctx) .. ctx.var.uri
end


local function gen_lasttime_key(ctx)
    return "unhealthy-lasttime" .. core.request.get_host(ctx) .. ctx.var.uri
end

这里面主要使用core.request.get_host来获取host和ctx.var.uri获取请求的uri信息。这三个方法除了前缀不一样,其余内容都是一样的。

  • 处理逻辑


function _M.access(conf, ctx)
    local unhealthy_key = gen_unhealthy_key(ctx)
    -- unhealthy counts
    local unhealthy_count, err = shared_buffer:get(unhealthy_key)
    if err then
        core.log.warn("failed to get unhealthy_key: ",
                      unhealthy_key, " err: ", err)
        return
    end

    if not unhealthy_count then
        return
    end

    -- timestamp of the last time a unhealthy state was triggered
    local lasttime_key = gen_lasttime_key(ctx)
    local lasttime, err = shared_buffer:get(lasttime_key)
    if err then
        core.log.warn("failed to get lasttime_key: ",
                      lasttime_key, " err: ", err)
        return
    end

    if not lasttime then
        return
    end

    local failure_times = math.ceil(unhealthy_count / conf.unhealthy.failures)
    if failure_times < 1 then
        failure_times = 1
    end

    -- cannot exceed the maximum value of the user configuration
    local breaker_time = 2 ^ failure_times
    if breaker_time > conf.max_breaker_sec then
        breaker_time = conf.max_breaker_sec
    end
    core.log.info("breaker_time: ", breaker_time)

    -- breaker
    if lasttime + breaker_time >= ngx.time() then
        return conf.break_response_code
    end

    return
end

主要是检查的次数,核心代码为

local failure_times = math.ceil(unhealthy_count / conf.unhealthy.failures)
if failure_times < 1 then
  failure_times = 1
end

-- cannot exceed the maximum value of the user configuration
local breaker_time = 2 ^ failure_times
if breaker_time > conf.max_breaker_sec then
  breaker_time = conf.max_breaker_sec
end
core.log.info("breaker_time: ", breaker_time)

如果达到的不健康次数超过了配置的最大次数,则就被break掉。

  • 打印出日志方法
function _M.log(conf, ctx)
    local unhealthy_key = gen_unhealthy_key(ctx)
    local healthy_key = gen_healthy_key(ctx)
    local upstream_status = core.response.get_upstream_status(ctx)

    if not upstream_status then
        return
    end

    -- unhealthy process
    if core.table.array_find(conf.unhealthy.http_statuses,
                             upstream_status)
    then
        local unhealthy_count, err = shared_buffer:incr(unhealthy_key, 1, 0)
        if err then
            core.log.warn("failed to incr unhealthy_key: ", unhealthy_key,
                          " err: ", err)
        end
        core.log.info("unhealthy_key: ", unhealthy_key, " count: ",
                      unhealthy_count)

        shared_buffer:delete(healthy_key)

        -- whether the user-configured number of failures has been reached,
        -- and if so, the timestamp for entering the unhealthy state.
        if unhealthy_count % conf.unhealthy.failures == 0 then
            shared_buffer:set(gen_lasttime_key(ctx), ngx.time(),
                              conf.max_breaker_sec)
            core.log.info("update unhealthy_key: ", unhealthy_key, " to ",
                          unhealthy_count)
        end

        return
    end

    -- health process
    if not core.table.array_find(conf.healthy.http_statuses, upstream_status) then
        return
    end

    local unhealthy_count, err = shared_buffer:get(unhealthy_key)
    if err then
        core.log.warn("failed to `get` unhealthy_key: ", unhealthy_key,
                      " err: ", err)
    end

    if not unhealthy_count then
        return
    end

    local healthy_count, err = shared_buffer:incr(healthy_key, 1, 0)
    if err then
        core.log.warn("failed to `incr` healthy_key: ", healthy_key,
                      " err: ", err)
    end

    -- clear related status
    if healthy_count >= conf.healthy.successes then
        -- stat change to normal
        core.log.info("change to normal, ", healthy_key, " ", healthy_count)
        shared_buffer:delete(gen_lasttime_key(ctx))
        shared_buffer:delete(unhealthy_key)
        shared_buffer:delete(healthy_key)
    end

    return
end
目录
相关文章
|
1月前
|
存储 SQL API
milvus insert api流程源码分析
milvus insert api流程源码分析
40 3
|
30天前
|
人工智能 JavaScript API
互联网人的福利!『昆仑天工』4款AI产品开源!提供API对接!
互联网人的福利!『昆仑天工』4款AI产品开源!提供API对接!
162 0
|
1月前
|
Web App开发 监控 中间件
【开源视频联动物联网平台】视频接入网关的用法
【开源视频联动物联网平台】视频接入网关的用法
35 1
|
1月前
|
传感器 边缘计算 物联网
【开源视频联动物联网平台】为什么需要物联网网关?
【开源视频联动物联网平台】为什么需要物联网网关?
27 2
|
2月前
|
存储 API
milvus insert api的数据结构源码分析
milvus insert api的数据结构源码分析
823 6
milvus insert api的数据结构源码分析
|
2月前
|
缓存 安全 应用服务中间件
开源API网关APISIX源码分析(二)
开源API网关APISIX源码分析(二)
52 0
|
1月前
|
运维 网络协议 安全
长连接网关技术专题(十):百度基于Go的千万级统一长连接服务架构实践
本文将介绍百度基于golang实现的统一长连接服务,从统一长连接功能实现和性能优化等角度,描述了其在设计、开发和维护过程中面临的问题和挑战,并重点介绍了解决相关问题和挑战的方案和实践经验。
73 1
|
5月前
|
负载均衡 应用服务中间件 API
微服务技术系列教程(25) - SpringCloud- 接口网关服务Zuul
微服务技术系列教程(25) - SpringCloud- 接口网关服务Zuul
56 0
|
4月前
|
负载均衡 Cloud Native Java
【云原生】Spring Cloud Alibaba 之 Gateway 服务网关实战开发
【云原生】Spring Cloud Alibaba 之 Gateway 服务网关实战开发
326 0
|
2月前
|
缓存 安全 API
【亿级数据专题】「高并发架构」盘点本年度探索对外服务的百万请求量的API网关设计实现
公司对外开放的OpenAPI-Server服务,作为核心内部系统与外部系统之间的重要通讯枢纽,每天处理数百万次的API调用、亿级别的消息推送以及TB/PB级别的数据同步。经过多年流量的持续增长,该服务体系依然稳固可靠,展现出强大的负载能力。
55 9
【亿级数据专题】「高并发架构」盘点本年度探索对外服务的百万请求量的API网关设计实现

热门文章

最新文章