local discovery_type = local_conf.discovery for discovery_name, _ in pairs(discovery_type) do discovery[discovery_name].init_worker() end
我们可以在 discovery 目录找到对应的服务发现的实现。服务发现需要实现两个接口。
1 2 3 4
local _M = {} function _M.nodes(service_name) function _M.init_worker() return _M
为了看分析比较简单,可以查看 discovery/dns.lua 的实现。
init_woker 函数初始化了 dns_client 。
nodes 函数通过 dns 服务发现来获取 service_name 对应的服务。并将返回的内容转换成 apisix 的的格式。
1 2 3 4 5
nodes[i] = {host = r.address, weight = r.weight or1, port = r.port or port} if r.priority then -- for SRV record, nodes with lower priority are chosen first nodes[i].priority = -r.priority end
...... // 获取 serial number serial = X509_get_serialNumber(ctx->cert); if (serial->length > 20) { return NGX_ERROR; } // 将证书得 serial 复制到p中 p = ngx_cpymem(p, serial->data, serial->length); ngx_memzero(p, 20 - serial->length);
local ngx_shared = ngx.shared localpairs = pairs local ngx = ngx localerror = error localsetmetatable = setmetatable localtonumber = tonumber -- table 清理函数 local clear_tab do local ok ok, clear_tab = pcall(require, "table.clear") ifnot ok then clear_tab = function(tab) for k inpairs(tab) do tab[k] = nil end end end end
local _M = { _VERSION = '0.2.1' } local mt = { __index = _M }
-- local cache of counters increments -- worker 全局存储,key 为 shdict_name ,value 为对应的 table -- <shdict_name>={} local increments = {} -- boolean flags of per worker sync timers -- 用来记录是否开启的自动同步 timer_started[shdict_name]=true local timer_started = {}
local id
-- 同步函数: 将 worker 的计数器同步到 ngx.shared 中 localfunctionsync(_, self) local err, _, forcible local ok = true -- 循环 worker 全局字典,将 increments 的计数同步到 dict 中 for k, v inpairs(self.increments) do _, err, forcible = self.dict:incr(k, v, 0) if forcible then ngx.log(ngx.ERR, "increasing counter in shdict: lru eviction: key=", k) ok = false end if err then ngx.log(ngx.ERR, "error increasing counter in shdict key: ", k, ", err: ", err) ok = false end end -- 同步完成后清理字典,并设置 error_metric_name 为 1 clear_tab(self.increments) if ok == falsethen self.dict:incr(self.error_metric_name, 1, 0) end
return ok end
function_M.new(shdict_name, sync_interval, error_metric_name) id = ngx.worker.id() ifnot ngx_shared[shdict_name] then error("shared dict \"" .. (shdict_name or"nil") .. "\" not defined", 2) end
ifnot increments[shdict_name] then increments[shdict_name] = {} end -- 构建当前对象的计数器 localself = setmetatable({ dict = ngx_shared[shdict_name], increments = increments[shdict_name], error_metric_name = error_metric_name, }, mt) -- 更具设置的同步时间,设置定时同步 ngx.timer.every if sync_interval then sync_interval = tonumber(sync_interval) ifnot sync_interval or sync_interval < 0then error("expect sync_interval to be a positive number", 2) end ifnot timer_started[shdict_name] then ngx.log(ngx.DEBUG, "start timer for shdict ", shdict_name, " on worker ", id) ngx.timer.every(sync_interval, sync, self) timer_started[shdict_name] = true end end
returnself end -- 主动同步内容 function_M:sync() return sync(false, self) end -- incr 只执行 increments 的累加 function_M:incr(key, step) step = step or1 local v = self.increments[key] if v then step = step + v end
self.increments[key] = step returntrue end -- reset 清理 dict function_M:reset(key, number) ifnot number then returnnil, "expect a number at #2" end returnself.dict:incr(key, -number, number) end -- get 获取 dict 的内容 function_M:get(key) returnself.dict:get(key) end
function_M:get_keys(max_count) returnself.dict:get_keys(max_count) end