apisix源码分析- 插件 prometheus 支持

apisix源码分析- 插件 prometheus 支持

用到的库 https://github.com/knyar/nginx-lua-prometheus

apisix/prometheus/exporter.lua

变量 metrics 用来定义指标,

prmetheus_keys

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
KeyIndex.new(shared_dict, prefix)
self.dict = shared_dict
self.key_prefix = prefix .. "key_"
self.delete_count = prefix .. "delete_count"
self.key_count = prefix .. "key_count"
self.last = 0
self.deleted = 0
self.keys = {}
self.index = {}

KeyIndex:sync()
获取 dict delete_count 和 dict key数量 N
如果本地的 delete 和 delete_count 不同,说明有删除
执行 sync_range(0,N)
如果当前的 last 和 N 不同说明有更新,那么执行 sync_reange(N,last)
并返回 N

KeyIndex:sync_range(first, last)
从 dict 同步 first 到 last 区间的内容到当前对象
从 dict 获取 keyi
记录映射关系 keys[i] = key ,index[key]=i
如果 keyi 是不存在的,从映射关系删除内容

KeyIndex:list()
执行同步 sync()
并返回 keys 的 copy

KeyIndex:add(key_or_keys, err_msg_lru_eviction)
对每一个 key 执行:
同步并获取N
如果 key 存在,就跳过
如果 key 不存在就设置 dict N+1 为 key
key_count ++
设置 keys 和 index

KeyIndex:remove(key, err_msg_lru_eviction)


总结:
利用 dict 存储数据,利用 sync 将数据同步到 worker 进程的函数

prometheus

ngx.sleep(0)

OpenResty 中的 ngx.sleep(0) 调用会主动放弃当前的 CPU 执行权,而把执行权交还给 nginx 事件循环和其他并发请求。当前
yield 了的 Lua 协程会在下一个 nginx 事件处理周期里接着继续运行。

一个典型应用:

shared-dict怎样实现不阻塞nginx进程?每分钟刷新一次缓存,刷新的时候,几十秒,nginx无反应,单个进程100%,其他进程没事,但是也无反应,好像访问的时候也到这个100%的进程了,导致无反应。

解决方法是:可以在操作中穿插一些 ngx.sleep(0) 用来让其他协程获得运行机会。

prometheus 里有大量的调用 dict ,执行 reload 后大量的同步会导致阻塞。

init:

dict : 原始全局字典


_counter : 用来记录 worker 计数器,并同步到dict


key_index: 用来将dict key 同步到 worker 中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
register(self, name, help, label_names, buckets, TYPE_HISTOGRAM)
构建指标需要的数据,对传入的name 去掉 _bucket _sum _count 的后缀
当为 historgram 时 name 或 name_maybe_historgram 存在
当为其他时, name 或者加入了后缀的存在都认为是指标存在,返回错误。
label_names :label 名列表
label_count: label 数量
lookup = {} :用table 来存储指标树
例如 ['me.com']['200'][LEAF_KEY] = 'http_count{host="me.com",status="200"}'

yield :执行读写 dict 次数 200 次就释放一次 cpu 时间片占用

lookup_or_create(self, label_values)
返回完整的指标名 counter 和 gauge 直接返回字符串
histogram 返回 list
如果 lookup_size > lookup_max_size ,清空 lookup
inc_gauge: dict incr


用特权进程来提供服务

想办法让指标就是排好序

https://mp.weixin.qq.com/s/guR77q6kXxpGfKzT46GYsg

  1. 指标名存储到对应的 table 树里面,定时同步
  2. 输出只有循环和取 dict
apisix源码分析-减少对对象的访问 ctx 缓存(9)

apisix源码分析-减少对对象的访问 ctx 缓存(9)

ctx

apisix/core/ctx

用来缓存 ngx.var 的信息。

  1. set_vars_meta(ctx)

    创建了一个名为 ctx_var 的 tablepool

    包含了两个 table 对象 
    
    • _cache
    • _request 这里 request 缓存了 resty.ngxvar.request 的对象

    ctx.var = var

  2. release_var(ctx)

    释放 ctx_var 对象

用到了库 resty.ngxvar 利用 ffi 的方式实现了取 ngx.var 的值,相比直接去值性能提高了5倍。

导出了两个函数

1
2
local get_var      = require("resty.ngxvar").fetch   
local get_request = require("resty.ngxvar").request

request 是 local get_request = require("resty.core.base").get_request

看看 fetch 的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
function _M.fetch(name, request)
-- 从 vars 获取值,var 实现了 ffi 获取 uri host remote_addr request_time scheme upstream_response_time 等
local method = vars[name]

-- 未实现的直接从 ngx_var 取值
if not var_patched or not method then
if num_type[name] then
return tonumber(ngx_var[name])
elseif ups_num_type[name] then
return sum_upstream_num(ngx_var[name])
end

return ngx_var[name]
end

return method(request)
end
  1. local mt 的 __index 重写了去值方式。

    直接取值,如果_cache (也就是第一步里面创建的_cache)存在,直接返回。

    这里 key 做了一些预定义

    • cookie 开头的,返回 cookie
    • arg 开头的返回 request.get_uri_args 的值
    • post_arg 返回 request.get_post_args 的值
    • http 开头返回 ngx.var 的值。
    • graphql 开头返回 graphql 的值。
    • 否则 返回 ngx.ctx.api_ctx 的值 或者 ngx.var 的值

    _newinex 设置缓存。

高性能的原因

1. 使用 ffi 的方式优化去值性能  


2. 通过维护缓存,减少多次取值。 
apisix源码分析-路由高性能的秘密 redix tree(8)

apisix源码分析-路由高性能的秘密 redix tree(8)

redixtree

lua-resty-radixtree : 一个基于 antirez/rax 实现的路由匹配算法。

实现了高性能的路由匹配。

redixtree 和 trietree 非常的类似。

trie tree

trietree 的结构如下,

Untitled.png

1
2
3
4
5
type TrieNode struct {
// Value rune
Iskey bool
Next map[rune]*TrieNode
}

节点下按照单个的字符作为树大的节点。通过节点能够快速的找到查找的路径。 比如需要找 inn 。

它的树深度由最长的路径决定。 在查找过程中我们希望树的深度能够浅一些。

比如想要查找 foot 和 foob 时,发现前缀都是 foo 开头的,如果把foo存在一个节点,树的深度不就减少了两层吗。这个就是下面要介绍的 radixtree 。

radix tree

Untitled.png

1
2
3
4
5
type RedixNode struct {
Value string
Iskey bool
Next map[rune]*RedixNode
}

在 apisix 中使用了开源的 radixtree 实现 rax 来作为低层的查询树。

节点定义如下:

1
2
3
4
5
6
7
typedef struct raxNode {
uint32_t iskey:1; /* Does this node contain a key? */
uint32_t isnull:1; /* Associated value is NULL (don't store it). */
uint32_t iscompr:1; /* Node is compressed. */
uint32_t size:29; /* Number of children, or compressed string len. */
unsigned char data[];
} raxNode;
1
2
3
4
5
6
7
8
9
10
11
12
*
* (f) ""
* /
* (i o) "f"
* / \
* "firs" ("rst") (o) "fo"
* / \
* "first" [] [t b] "foo"
* / \
* "foot" ("er") ("ar") "foob"
* / \
* "footer" [] [] "foobar"

节点通过 iscompr 来判断是压缩节点还是非压缩节点。

size 的大小为 29 bit 和前面三个变量一起 32 bit 内存对齐。

节点数据结构

对 data 部分的不同的处理达到不同的效果。

压缩节点结构:

前面是 size 长度的数据,中间是指向下一个节点的指针 z-ptr,最后是节点数据指针指向数据,当 iskey ==1 时存在节点数据。

[header iscompr=1][xyz][z-ptr](value-ptr?)

Untitled.png

非压缩节点结构

前面是 size 长度的数据,接着是一个 padding,接下来是size个指针指向下一个节点。最后是value-data指针指向数据。

[header iscompr=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)

Untitled.png

padding

1
2
3
4
5
6
7
/* 对齐函数 : 在申请内存时对齐到整数倍的字节 
sizeof(void*) 一个指针所占用的内存大小,4个字节或8个字节
nodesize +4 字节数+ 头部占用的32bit(4个字节)
(nodesize+4) % sizeof(void*) 对整体占用求余
(sizeof(void*)-1) :前面的已经是padding数了。这里做一个并集。??
*/
#define raxPadding(nodesize) ((sizeof(void*)-((nodesize+4) % sizeof(void*))) & (sizeof(void*)-1))

节点操作

添加和删除会出现压缩节点和非压缩节点的转换。

当出现节点分裂时,压缩节点在分裂点创建非压缩的子节点,

当删除时非压缩节点只有一个节点时会和上一级节点合并

读取data 和设置data ,中 raxNodeCurrentLength 获取了当前节点的长度 减去一个指针,就是data 的指针地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void raxSetData(raxNode *n, void *data) {
n->iskey = 1;
if (data != NULL) {
n->isnull = 0;
void **ndata = (void**)
((char*)n+raxNodeCurrentLength(n)-sizeof(void*));
memcpy(ndata,&data,sizeof(data));
} else {
n->isnull = 1;
}
}
/* Get the node auxiliary data. */
void *raxGetData(raxNode *n) {
if (n->isnull) return NULL;
void **ndata =(void**)((char*)n+raxNodeCurrentLength(n)-sizeof(void*));
void *data;
memcpy(&data,ndata,sizeof(data));
return data;
}

需要处理压缩节点和非压缩节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 用来查询字符串在redixtree 中能够匹配到哪个位置
static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts) {
raxNode *h = rax->head;
raxNode **parentlink = &rax->head;
size_t i = 0; /* Position in the string. */
size_t j = 0; /* Position in the node children (or bytes if compressed).*/
while(h->size && i < len) {
debugnode("Lookup current node",h);
unsigned char *v = h->data;
if (h->iscompr) {
for (j = 0; j < h->size && i < len; j++, i++) {
if (v[j] != s[i]) break;
}
if (j != h->size) break;
} else {
/* Even when h->size is large, linear scan provides good
* performances compared to other approaches that are in theory
* more sounding, like performing a binary search. */
for (j = 0; j < h->size; j++) {
if (v[j] == s[i]) break;
}
if (j == h->size) break;
i++;
}
if (ts) raxStackPush(ts,h); /* Save stack of parent nodes. */
raxNode **children = raxNodeFirstChildPtr(h);
if (h->iscompr) j = 0; /* Compressed node only child is at index 0. */
memcpy(&h,children+j,sizeof(h));
parentlink = children+j;
j = 0; /* If the new node is compressed and we do not
iterate again (since i == l) set the split
position to 0 to signal this node represents
the searched key. */
}
debugnode("Lookup stop node is",h);
if (stopnode) *stopnode = h;
if (plink) *plink = parentlink;
if (splitpos && h->iscompr) *splitpos = j;
return i;
}

辅助结构

  1. raxStack : 用来返回访问节点的栈,当我们需要返回查询路径时使用。

  2. Iterator :

    raxSeek 跳转到匹配的位置

    raxNext 后向查找,能够循环查找到所有匹配位置的子节点。

    raxPrev 前向查找,能够向上查询访问路径。

lua-resty-radixtree

对 rdx 的封装

1
2
3
4
5
6
7
8
9
10
void *radix_tree_new();
int radix_tree_destroy(void *t);
int radix_tree_insert(void *t, const unsigned char *buf, size_t len,
int idx);
void *radix_tree_find(void *t, const unsigned char *buf, size_t len);
void *radix_tree_search(void *t, void *it, const unsigned char *buf, size_t len);
int radix_tree_prev(void *it, const unsigned char *buf, size_t len);
int radix_tree_next(void *it, const unsigned char *buf, size_t len);
int radix_tree_stop(void *it);
void *radix_tree_new_it(void *t);

实现了对 paths、hosts、methods、remote_adds、vars 、filter_fun 的匹配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
paths = {"/aa", "/bb*", "/name/:name/*other"},
hosts = {"*.bar.com", "foo.com"},
methods = {"GET", "POST", "PUT"},
remote_addrs = {"127.0.0.1","192.168.0.0/16",
"::1", "fe80::/32"},
vars = {
{"arg_name", "==", "json"},
{"arg_weight", ">", 10},
},
filter_fun = function(vars, opts)
return vars["arg_name"] == "json"
end,

metadata = "metadata /bb",
}

在 apisix 中通过 dispatch(path,opts) 来执行 handler 函数的回调。

1
2
3
4
5
6
7
function _M.dispatch(self, path, opts, ...)
...
local route, err = match_route(self, path, opts or empty_table, args)
...
handler(...)
return true
end
apisix源码分析-插件的运行(7)

apisix源码分析-插件的运行(7)

插件的运行

插件的 init_worker 监控了 etcd 的 pluglns 。

插件列表发生发生变化时通过 load 函数重新加载插件 。

load 函数获取启用的插件列表后,清理本地的插件,使用 load_plugin 函数通过 pcall(require, pkg_name)重新引入插件

插件在每个阶段通过 common_phase(phase_name) 来执行。执行流程如下:

  • 执行公共插件
  • 插件调用阶段 run_plugin(phase,plugins,api_ctx)
  • api_ctx.script_obj : 当使用有向无环图来做编辑时,apisix 会将图转换为脚本存储到 script_obj 变量中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
local function common_phase(phase_name)
local api_ctx = ngx.ctx.api_ctx
if not api_ctx then
return
end
-- 执行公共变量
plugin.run_global_rules(api_ctx, api_ctx.global_rules, phase_name)

if api_ctx.script_obj then -- 执行有向无环图编辑的脚本
script.run(phase_name, api_ctx)
return api_ctx, true
end

return plugin.run_plugin(phase_name, nil, api_ctx) -- 执行插件列表
end
apisix源码分析-route的匹配(6)

apisix源码分析-route的匹配(6)

route 是如何匹配的?

Route 中主要包含三部分内容:匹配规则(比如 uri、host、remote_addr 等),插件配置(限流限速等)和上游信息,通过 init_worker 阶段对 etcd 的监控,route 节点的数据同步到了user_routes的变量中,默认执行 http/router/radixtree_uri.lua 匹配规则 ,还可以使用 radixtree_host_uriradixtree_uri_with_parameter 。通过实现 match(api_ctx) 方法实现查询替换。

在 apisix 中使用了 redixtree 来实现路由匹配、域名证书、admin route 的匹配。 redixtree 相关分析可以看相关的分析。通过存储不同的path,来实现不同的需求。

  • radixtree_uri

    paths = route.value.uris or route.value.uri,

    在radixtree_uri 中直接使用了uri,也就是直接使用了请求的路径部分在 redixtree 中做匹配

  • radixtree_host_uri

    建立了两层 redixtree 来做 route 查询,第一层使用域名作为 path 创建 redixtree 和ssl的域名查询相同)。在域名对应的 key 在创建 uri 的 redixtree。形成两层查询。

  • radixtree_uri_with_parameter (未实现)

access 的执行

当获取到匹配的 route 后,就可以得到 route、plugin、service 等信息。这些信息会存储在变量api_ctx 中

1
2
3
4
5
6
7
8
9
api_ctx.matched_route = route
api_ctx.conf_type = "route&service"
api_ctx.conf_version = route.modifiedIndex .. "&" .. service.modifiedIndex
api_ctx.conf_id = route.value.id .. "&" .. service.value.id
api_ctx.service_id = service.value.id
api_ctx.service_name = service.value.name

api_ctx.route_id = route.value.id
api_ctx.route_name = route.value.name

对于插件有两种,列表或者通过编排的。

当发现 route.value.script 不为空时,会执行通过编排生成的 lua 脚本。

为空时执行插件列表。在执行插件列表时执行了会执行如下代码:

1
2
3
4
5
6
7
8
9
10
11
plugin.run_plugin("rewrite", plugins, api_ctx)
if api_ctx.consumer then
...
route, changed = plugin.merge_consumer_route(
route,
api_ctx.consumer,
api_ctx
)
...
end
plugin.run_plugin("access", plugins, api_ctx)

在这里可以发现,apisix 的插件的 rewrite 和 access 都是在 openresty 的 access 阶段执行的

在配置的时候我们会有默认插件,Consumer 绑定的插件, Route 绑定的插件,Service 绑定的插件。只列出插件 merge 操作,根据插件优先级的说明,同样的插件只会有一份有效,相同的插件会做合并。Plugin 配置可直接绑定在 Route 上,也可以被绑定在 Service 或 Consumer上。而对于同一 个插件的配置,只能有一份是有效的,配置选择优先级总是 Consumer > Route > Service。在 access 执行过程中可以看到如下的合并过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if route.value.plugin_config_id then
...
route = plugin_config.merge(route, conf)
end
if route.value.service_id then
...
route = plugin.merge_service_route(service, route)
...
end
if api_ctx.consumer then
...
route, changed = plugin.merge_consumer_route(
route,
api_ctx.consumer,
api_ctx
)
...
end
apisix源码分析-处理用户请求(5)

apisix源码分析-处理用户请求(5)

apisix 如何处理用户的请求?

api_ctx

在执行开始前, apisix 会给 ngx.ctx 增加一个变量 ngx.ctx.api_ctx 的变量。

1
2
api_ctx = core.tablepool.fetch("api_ctx", 0, 32) 
ngx_ctx.api_ctx = api_ctx

变量从 “api_ctx” 的 table 池( lua-tablepool) 中取出,用完后还给池。

(lua-tablepool 中一个 table 变量重复使用 20000 次后就会被新的取代)

上面的两行代码在 ssl 阶段和 access 阶段都存在,但是存储的内容是不同的。

ssl 阶段是证书信息,access 阶段是 route 相关的信息

ssl

ssl 阶段负责设置证书。 证书的设置可以看看 ngx.ssl 的相关说明,在 ssl_certificate_by_lua_block 阶段通过 ngx.ssl 设置证书和私钥来配置域名证书。

init_work : 初始化 ssl_certificates 变量,用来和 etcd 同步数据

1
2
3
4
5
6
7
ssl_certificates, err = core.config.new("/ssl", {
automatic = true,
item_schema = core.schema.ssl,
checker = function (item, schema_type)
return apisix_ssl.check_ssl_conf(true, item)
end,
})

证书匹配

调用了 ssl/router/radixtree_sni.lua 的 match_and_set 。

  • 建立域名查询树

    radixtree_router 初始化 radixtree 来存储域名信息

    radixtree_router_ver 存储 配置版本,通过版本号来决定是否需要重建 radixtree

  • 域名会通过 字符串反向(sni:reverse()) 后在 radixtree 中查询。

    api_ctx.matched_sni 存储匹配的域名

    api_ctx.matched_ssl 存储对应的证书信息

域名查询树建立

create_router(ssl_certificates.values)

对象

1
2
3
4
5
6
7
8
9
10
route_items[idx] = {
paths = sni, -- 反向后的域名
handler = function (api_ctx) -- 回调函数
if not api_ctx then
return
end
api_ctx.matched_ssl = ssl
api_ctx.matched_sni = sni
end
}

证书有两种,全匹配证书,泛域名证书。

1
2
3
4
5
6
for _, msni in ipairs(api_ctx.matched_sni) do
-- 完全匹配 或者 相对于msni sni_srv 后面没有点(泛匹配)?
if sni_rev == msni or not str_find(sni_rev, ".", #msni) then
matched = true
end
end

按照 redixtree 最长匹配原则,证书会优先匹配到全匹配证书。

access

apisix 匹配路由是在 access 阶段完成的,功能由 router_http.match(api_ctx) 提供

apisix源码分析-admin api 的注册和路由(4)

apisix源码分析-admin api 的注册和路由(4)

admin api

admin 注册

1
2
3
4
5
6
7
8
9
10
11
12
location /apisix/admin {
set $upstream_scheme 'http';
set $upstream_host $http_host;
set $upstream_uri '';

allow 0.0.0.0/0;
deny all;

content_by_lua_block {
apisix.http_admin()
}
}

在 admin/init.lua 中,通过 init_worker 初始化 admin route ,并将本地插件配置同步到etcd中,并注册了插件重启事件,当触发"/apisix/admin/plugins/reload"触发同步本地插件列表到 etcd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
router = route.new(uri_route)
events = require("resty.worker.events")
-- 注册插件重启事件
events.register(reload_plugins, reload_event, "PUT")
if ngx_worker_id() == 0 then
local ok, err = ngx_timer_at(0, function(premature)
if premature then
return
end
-- 将本地插件配置同步到 etcd
sync_local_conf_to_etcd(true)
end)

if not ok then
error("failed to sync local configure to etcd: " .. err)
end
end

admin 路由

router = route.new(uri_route) 是一个 redixtree 对象。用来做 admin api 的路由匹配 ,可以通过uri_route 参数看到对不同接口的实现。

run 作为 admin api 的主要实现,通过 uri 和m来执行 resource 中的调用。

有如下两种类型的 api ,分别对应到系统组件,比如 routes ,servers,ssl 等。

/apisix/admin/schema//

这里的 seg_res 是对应的组件名,seg_id 是组件中对应配置的id

和插件路径

/apisix/admin/schema/plugins//

这里的seg_res 对应的是插件名,seg_id 对应的是插件配置的id

通过 seg_res 获取对应的执行对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
local resources = {
routes = require("apisix.admin.routes"),
services = require("apisix.admin.services"),
upstreams = require("apisix.admin.upstreams"),
consumers = require("apisix.admin.consumers"),
schema = require("apisix.admin.schema"),
ssl = require("apisix.admin.ssl"),
plugins = require("apisix.admin.plugins"),
proto = require("apisix.admin.proto"),
global_rules = require("apisix.admin.global_rules"),
stream_routes = require("apisix.admin.stream_routes"),
plugin_metadata = require("apisix.admin.plugin_metadata"),
plugin_configs = require("apisix.admin.plugin_config"),
}

local resource = resources[seg_res]

通过 method 来执行resource 对应的请求函数

1
2
local code, data = resource[method](seg_id, req_body, seg_sub_path,
uri_args)

resources 的定义可以看出,admin 的路由 用来实现 api 的操作, 涉及到 routes services upstreams consumers schema ssl plugins proto global_rules stream_routes plugin_metadata plugin_configs

api 的作用就是用来对 etcd 进行 crud 的操作。 对每个组件的 api ,实现大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
local _M = {
version = 0.1,
}


local function check_conf(...)
... -- 对 put 写入 etcd 的数据校验
-- 使用 schema_def.lua 里对应的校验数据进行校验
-- 返回 etcd key
end

_M.put(...) -- etcd set
_M.get(...) -- etcd get
_M.post(...) -- etcd push
_M.delete(...) -- etcd delete
return _M

在前面的 resource[method] 通过 method= put | get | post | delete 就可以执行到对应的函数.

apisix源码分析-初始化过程(3)

apisix源码分析-初始化过程(3)

init

init 对应到 openresty 的 init_by_lua_blockinit_worker_by_lua_block 的阶段

init_by_lua_block 所做的不多,执行 apisix.http_init(args),args 里面传入了 dns_resolver 配置的参数,比如使用docker-compose 启动传入的127.0.0.11 。

1
2
3
4
5
local dns_resolver = { "127.0.0.11", }
local args = {
dns_resolver = dns_resolver,
}
apisix.http_init(args)

在init 过程中:

1
2
3
4
5
6
7
8
9
10
11
12
-- dns 发现设置: args["dns_resolver"]
core.resolver.init_resolver(args)
-- 生成 apisix uuid /conf/apisix.uid
core.id.init()
--https://github.com/openresty/lua-resty-core/blob/master/lib/ngx/process.md#enable_privileged_agent
-- 开启特权进程,可以使用 master 进程权限执行一些功能
local process = require("ngx.process")
local ok, err = process.enable_privileged_agent()

-- 读初始化配置, core/config_etcd.lua 执行 init 判断 etcd 是否可访问。
local ok, err = core.config.init()

init_worker

apisix/init/http_init_worker()

init_worker 阶段执行了所有组件的初始化,也就是对应组件的 init_worker 方法。

比如 admin timers plugin plugin_config route service consumer config upstream ssl。、

这个阶段主要的作用是初始化各个组件。每个组件自身都实现了一个 init_worker 方法,除了自身相关的内容,主要是同步组件自身对应的 etcd 信息。

etcd watch 实现

1
obj, err = core.config.new("/etcd_path", cfg)

比如在 router 的 http_init_worker 阶段,watch 了 /global_rules

1
2
3
4
5
local global_rules, err = core.config.new("/global_rules", {
automatic = true,
item_schema = core.schema.global_rule,
checker = plugin_checker,
})

core.config.new(key,ops)

函数对应的执行在文件 core/config_etcd.lua 中。

> 在 apisix/core.lua 中 local config = require("apisix.core.config_" .. config_center), config_center 来自配置文件,默认为 etcd。 

key 为 etcd 监控的节点,ops 为需要的参数。

当 automatic == true 时 ,会创建 ngx.timer_at , 否则只会初始化 etcd_cli

load_full_data 函数,将 etcd 取回的数据填充到 key 对应的 obj 中(将etcd 数据转换为 lua 对象)

- self.item_scheme 检验数据
- self.checker 对象检查
- 将数据写入 self.value  , self.values_hash[key] 记录对应key的index
- 执行 self.filter

_automatic_fetch 函数,启动 ngx_timer_at 调用函数 _automatic_fetch ,通过自调用形成迭代。

在_automatic_fetch函数中真正执行同步的是 sync_data 函数

sync_data 函数,读取 etcd,并 watch etcd ,将数据写到对象中 。

   self.values :  array 用来存储 etcd 写回的数据


   self.values_hash: hash 用来存储 etcd key 对应的 values 的下标, 在 etcd 中 {/routes/aaa:<data>} 同步到 apisix 的存储 


假设 etcd 有如下 key


`etcd: /routes/aaaa : <data>` 


在self 中存储: 
`self.values = [<data>,...]
self.values_hash= {"/routes/aaa":1}` 

出现修改数据

1
2
insert_tab(self.values, res)
self.values_hash[key] = #self.values

当出现删除数据时

1
2
3
self.sync_times = self.sync_times + 1
self.values[pre_index] = false
self.values_hash[key] = nil

这里出现的 sync_times 是一个计数器,用来记录删除values 数量,当超过 100 的时候,会对 values 里的数据进行重建。

self.conf_version 记录版本变化次数

服务发现

服务发现的 init_worker 会循环调用配置的服务发现。 discovery_type 为配置文件中配置的服务发现。

1
2
3
4
local discovery_type = local_conf.discovery
for discovery_name, _ in pairs(discovery_type) do
discovery[discovery_name].init_worker()
end

我们可以在 discovery 目录找到对应的服务发现的实现。服务发现需要实现两个接口。

1
2
3
4
local _M = {}
function _M.nodes(service_name)
function _M.init_worker()
return _M

为了看分析比较简单,可以查看 discovery/dns.lua 的实现。

init_woker 函数初始化了 dns_client 。

nodes 函数通过 dns 服务发现来获取 service_name 对应的服务。并将返回的内容转换成 apisix 的的格式。

1
2
3
4
5
nodes[i] = {host = r.address, weight = r.weight or 1, port = r.port or port}
if r.priority then
-- for SRV record, nodes with lower priority are chosen first
nodes[i].priority = -r.priority
end

目前 apisix 支持了 consul_kv dns eureka nacos 四种服务发现,如果想要其他的可以自行实现这两个函数来支持业务。

在将upstream 和 route 绑定时会 set_by_route 会触发服务发现的 nodes 函数。

access

apisix 匹配路由是在 access 阶段完成的,功能由 router_http.match(api_ctx) 提供

header_filter

会设置一些 apisix 自定义的 header ,调用 common_phase

body_filter

调用 common_phase

log

调用 common_phase

被动健康检查

释放前面阶段里通过 tablepool 申请的对象 (api_ctx plugins uri_parse_param matched_route_record )

balancer

在access 阶段就计算出了要均衡到的后端,设置并执行 balancer.lua 的 run 函数

apisix源码分析-配置文件(2)

apisix源码分析-配置文件(2)

启动后会读取配置文件

apisix/core/profile.lua

配置文件放在 apisix_home/conf 下。

在执行读取配置文件会使用一个 os.getenv(“APISIX_PROFILE“) 来决定读取哪个配置文件

比如设置了 APISIX_PROFILE = dev

读取的配置文件为 apisix_home/conf/-dev

配置文件会读取 yaml 文件后,转换成 lua 里的 table 对象,通过 scheme 来校验。

scheme

core/scheme.lua

所有的传入的参数验证都会通过 scheme 来进行校验:

  • 读取配置文件校验
  • admin api 写入 etcd 的配置校验。
  • 插件配置校验。

这里用到了 jsonscheme 的 lua 库对参数进行校验,在 scheme_def.lua 中能够找到所有 apisix 的配置验证模版。插件的验证模版在每个插件中。

scheme 是一个非常棒的设计,用约定的方式定义了输入的数据,对数据数据做 scheme 的校验.保证输入输出的正确性.

主配置文件的生成

apisix 的 nginx.conf 配置是由 cli/ngx_tpl.lua 生成的,tpl 文件和上面读取到的 conf 中得到的配置文件做变量替换后得到具体的 nginx.conf 文件.

以下是和 http 有关的阶段配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
	 upstream apisix_backend {
server 0.0.0.1;

balancer_by_lua_block {
apisix.http_balancer_phase()
}

keepalive 320;
keepalive_requests 1000;
keepalive_timeout 60s;
}


init_by_lua_block {
require "resty.core"
apisix = require("apisix")

local dns_resolver = { "127.0.0.11", }
local args = {
dns_resolver = dns_resolver,
}
apisix.http_init(args)
}

init_worker_by_lua_block {
apisix.http_init_worker()
}

exit_worker_by_lua_block {
apisix.http_exit_worker()
}
server {
ssl_certificate_by_lua_block {
apisix.http_ssl_phase()
}
access_by_lua_block {
apisix.http_access_phase()
}

proxy_pass $upstream_scheme://apisix_backend$upstream_uri;

mirror /proxy_mirror;

header_filter_by_lua_block {
apisix.http_header_filter_phase()
}

body_filter_by_lua_block {
apisix.http_body_filter_phase()
}

log_by_lua_block {
apisix.http_log_phase()
}

}
}
apisix源码分析-启动(1)

apisix源码分析-启动(1)

启动

1
2
3
4
5
6
7
8
9
/bin/apisix
/cli/
apisix.lua
env.lua
etcd.lua
file.lua
ngx_tpl.lua
ops.lua
util.lua

执行 /bin/apisix $* 实际执行 lua apisix/cli/apisix.lua $*

cli

`apisix.lua` :
  1. env.lua 环境变量
1
2
3
4
5
6
7
apisix_home : apisixe 安装目录
is_root_path : 是否为 root 目录
openresty_args : openresty 启动
openresty -p apisix_home -c apisix_home/conf/nginx.conf
pkg_cpath_org:
pkg_path_org :
min_etcd_version : etcd 版本
1. `ops.lua` :执行启动关闭等 ops.execute(env,arg) 在执行 `apisix start | stop | quit | restart | reload` 会调用 openresty 的执行命令,比如 `apisix reload` 会执行 `openresty -c nginx.conf -s reload`

apisix init

cli/ops/init()

负责读取检查准备apisix配置文件,

- 检查yaml配置文件
- 检查admin key
- 检查 openresty 版本
- 检查插件列表
- 检查promtheus
- 检查多端口
- 检查 ssl
- ...

所有检查的内容会存储在 sys_conf 的变量里,通过模版变量替换。

cli/ngx_tpl.lua 生成 conf/nginx.conf 文件,

1
2
3
4
local conf_render = template.compile(ngx_tpl)
local ngxconf = conf_render(sys_conf)
local ok, err = util.write_file(env.apisix_home .. "/conf/nginx.conf",
ngxconf)

apisix init_etcd : etcd.lua

检查 etcd 的版本,验证授权信息

以上就是 cli 启动的过程,总结就是检查运行环境,通过配置文件生成 nginx.conf 。