APISIX主框架代码分析
apisix.core
core.schema
配置文件与配置模板进行对比,看是否满足条件
core.table
对lua自带table的扩展,增加了一些功能
core.log
使用了nginx 的errlog模块,估计是将结果输出到nginx的errlog中
core.json
对json处理,主要用到了cjson和dkjson。
core.request
对ngx.req的封装
core.response
对ngx.resp的封装
core.utils
封装的工具
core.lrucache
对resty.lrucache的封装
插件编写分析
现有插件分析
身份验证之ldap-auth
代码实现
- 定义基本信息
local schema = { type = "object", title = "work with route or service object", properties = { base_dn = { type = "string" }, ldap_uri = { type = "string" }, use_tls = { type = "boolean" }, uid = { type = "string" } }, required = {"base_dn","ldap_uri"}, } local consumer_schema = { type = "object", title = "work with consumer object", properties = { user_dn = { type = "string" }, }, required = {"user_dn"}, } local plugin_name = "ldap-auth" local _M = { version = 0.1, priority = 2540, type = 'auth', name = plugin_name, schema = schema, consumer_schema = consumer_schema }
插件的配置信息,包含版本,作用的优先级(插件类似于中间件是堆叠在上面的,优先级指定了谁先执行谁后执行),类型为auth,schema和consumer_schema定义了自己的配置和后续的配置。
- 检查配置的方法
function _M.check_schema(conf, schema_type) local ok, err if schema_type == core.schema.TYPE_CONSUMER then ok, err = core.schema.check(consumer_schema, conf) else ok, err = core.schema.check(schema, conf) end return ok, err end
这里用到了core里面的方法,这里的core是
local core = require("apisix.core")
即apisix的主框架代码,这部分内容,在上面已经提到了,搞忘了实现原理的朋友可以回看一下。
- 构造消费者缓存
local create_consumer_cache do local consumer_names = {} function create_consumer_cache(consumers) core.table.clear(consumer_names) for _, consumer in ipairs(consumers.nodes) do core.log.info("consumer node: ", core.json.delay_encode(consumer)) consumer_names[consumer.auth_conf.user_dn] = consumer end return consumer_names end end
- 匹配并处理授权的header
local function extract_auth_header(authorization) local obj = { username = "", password = "" } local m, err = ngx.re.match(authorization, "Basic\\s(.+)", "jo") if err then -- error authorization return nil, err end if not m then return nil, "Invalid authorization header format" end local decoded = ngx.decode_base64(m[1]) if not decoded then return nil, "Failed to decode authentication header: " .. m[1] end local res res, err = ngx_re.split(decoded, ":") if err then return nil, "Split authorization err:" .. err end if #res < 2 then return nil, "Split authorization err: invalid decoded data: " .. decoded end obj.username = ngx.re.gsub(res[1], "\\s+", "", "jo") obj.password = ngx.re.gsub(res[2], "\\s+", "", "jo") return obj, nil end 校验并重写响应 function _M.rewrite(conf, ctx) core.log.info("plugin rewrite phase, conf: ", core.json.delay_encode(conf)) -- 1. extract authorization from header local auth_header = core.request.header(ctx, "Authorization") if not auth_header then core.response.set_header("WWW-Authenticate", "Basic realm='.'") return 401, { message = "Missing authorization in request" } end local user, err = extract_auth_header(auth_header) if err then return 401, { message = err } end -- 2. try authenticate the user against the ldap server local uid = conf.uid or "cn" local userdn = uid .. "=" .. user.username .. "," .. conf.base_dn local ld = lualdap.open_simple (conf.ldap_uri, userdn, user.password, conf.use_tls) if not ld then return 401, { message = "Invalid user authorization" } end -- 3. Retrieve consumer for authorization plugin local consumer_conf = consumer_mod.plugin(plugin_name) if not consumer_conf then return 401, {message = "Missing related consumer"} end local consumers = lrucache("consumers_key", consumer_conf.conf_version, create_consumer_cache, consumer_conf) local consumer = consumers[userdn] if not consumer then return 401, {message = "Invalid API key in request"} end consumer_mod.attach_consumer(ctx, consumer, consumer_conf) core.log.info("hit basic-auth access") end return _M
使用与代码对照分析
安全防护之api-breaker
代码实现
- 初始化
local schema = { type = "object", properties = { break_response_code = { type = "integer", minimum = 200, maximum = 599, }, max_breaker_sec = { type = "integer", minimum = 3, default = 300, }, unhealthy = { type = "object", properties = { http_statuses = { type = "array", minItems = 1, items = { type = "integer", minimum = 500, maximum = 599, }, uniqueItems = true, default = {500} }, failures = { type = "integer", minimum = 1, default = 3, } }, default = {http_statuses = {500}, failures = 3} }, healthy = { type = "object", properties = { http_statuses = { type = "array", minItems = 1, items = { type = "integer", minimum = 200, maximum = 499, }, uniqueItems = true, default = {200} }, successes = { type = "integer", minimum = 1, default = 3, } }, default = {http_statuses = {200}, successes = 3} } }, required = {"break_response_code"}, } local _M = { version = 0.1, name = plugin_name, priority = 1005, schema = schema, }
- 检测配置是否合规
function _M.check_schema(conf) return core.schema.check(schema, conf) end
- 获取请求中的一些内容
local function gen_healthy_key(ctx) return "healthy-" .. core.request.get_host(ctx) .. ctx.var.uri end local function gen_unhealthy_key(ctx) return "unhealthy-" .. core.request.get_host(ctx) .. ctx.var.uri end local function gen_lasttime_key(ctx) return "unhealthy-lasttime" .. core.request.get_host(ctx) .. ctx.var.uri end
这里面主要使用core.request.get_host来获取host和ctx.var.uri获取请求的uri信息。这三个方法除了前缀不一样,其余内容都是一样的。
- 处理逻辑
function _M.access(conf, ctx) local unhealthy_key = gen_unhealthy_key(ctx) -- unhealthy counts local unhealthy_count, err = shared_buffer:get(unhealthy_key) if err then core.log.warn("failed to get unhealthy_key: ", unhealthy_key, " err: ", err) return end if not unhealthy_count then return end -- timestamp of the last time a unhealthy state was triggered local lasttime_key = gen_lasttime_key(ctx) local lasttime, err = shared_buffer:get(lasttime_key) if err then core.log.warn("failed to get lasttime_key: ", lasttime_key, " err: ", err) return end if not lasttime then return end local failure_times = math.ceil(unhealthy_count / conf.unhealthy.failures) if failure_times < 1 then failure_times = 1 end -- cannot exceed the maximum value of the user configuration local breaker_time = 2 ^ failure_times if breaker_time > conf.max_breaker_sec then breaker_time = conf.max_breaker_sec end core.log.info("breaker_time: ", breaker_time) -- breaker if lasttime + breaker_time >= ngx.time() then return conf.break_response_code end return end
主要是检查的次数,核心代码为
local failure_times = math.ceil(unhealthy_count / conf.unhealthy.failures) if failure_times < 1 then failure_times = 1 end -- cannot exceed the maximum value of the user configuration local breaker_time = 2 ^ failure_times if breaker_time > conf.max_breaker_sec then breaker_time = conf.max_breaker_sec end core.log.info("breaker_time: ", breaker_time)
如果达到的不健康次数超过了配置的最大次数,则就被break掉。
- 打印出日志方法
function _M.log(conf, ctx) local unhealthy_key = gen_unhealthy_key(ctx) local healthy_key = gen_healthy_key(ctx) local upstream_status = core.response.get_upstream_status(ctx) if not upstream_status then return end -- unhealthy process if core.table.array_find(conf.unhealthy.http_statuses, upstream_status) then local unhealthy_count, err = shared_buffer:incr(unhealthy_key, 1, 0) if err then core.log.warn("failed to incr unhealthy_key: ", unhealthy_key, " err: ", err) end core.log.info("unhealthy_key: ", unhealthy_key, " count: ", unhealthy_count) shared_buffer:delete(healthy_key) -- whether the user-configured number of failures has been reached, -- and if so, the timestamp for entering the unhealthy state. if unhealthy_count % conf.unhealthy.failures == 0 then shared_buffer:set(gen_lasttime_key(ctx), ngx.time(), conf.max_breaker_sec) core.log.info("update unhealthy_key: ", unhealthy_key, " to ", unhealthy_count) end return end -- health process if not core.table.array_find(conf.healthy.http_statuses, upstream_status) then return end local unhealthy_count, err = shared_buffer:get(unhealthy_key) if err then core.log.warn("failed to `get` unhealthy_key: ", unhealthy_key, " err: ", err) end if not unhealthy_count then return end local healthy_count, err = shared_buffer:incr(healthy_key, 1, 0) if err then core.log.warn("failed to `incr` healthy_key: ", healthy_key, " err: ", err) end -- clear related status if healthy_count >= conf.healthy.successes then -- stat change to normal core.log.info("change to normal, ", healthy_key, " ", healthy_count) shared_buffer:delete(gen_lasttime_key(ctx)) shared_buffer:delete(unhealthy_key) shared_buffer:delete(healthy_key) end return end