Test::Nginx 使用

https://github.com/openresty/test-nginx

https://openresty.gitbooks.io/programming-openresty/content/testing/test-nginx.html

Test::Nginx 使用 Perl 便携的 nginx 测试组件

安装

1
cpan Test::Nginx

测试布局

按照惯例,会在项目的根目录创建一个 t/ 的目录来作为测试文件的目录. 当有很多测试文件时,可以在 t/ 中进一步的分组 ,比如 t/001-test/

本质上每个 .t 文件都是一个 Perl 脚本文件 ,可以使用 Prove (perl 的通用测试工具)来运行

比如 prove t/001.t

测试文件布局

这里有一个简单的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use Test::Nginx::Socket 'no_plan';

run_tests;

__DATA__

=== TEST 1: max pool size is 200
--- http_config eval: $::HttpConfig
--- config
location /t {
content_by_lua_block {
ngx.print("hello world")
}
}

--- request
GET /t

--- response_body chomp
hello world

测试文件通常以 .t 作为扩展名 ,每个测试文件本身就是一个 Perl 脚本.

每个测试文件分为两个部分,两个部分使用 __DATA__ 来分隔

  • 第一部分是简短的 Perl 代码
1
2
3
4
# 调用对应的测试模块 一般使用 'no_plan' 即可
use Test::Nginx::Socket 'no_plan';
# 运行测试用例
run_tests;
  • 第二部分是测试用例

测试用例格式 https://openresty.gitbooks.io/programming-openresty/content/testing/test-file-layout.html

数据块通过三个横线来设置某种数据块,比如如下的四种类型:

1
2
3
4
5
6
7
8
9
10
11
12
--- config
location = /t {
echo "hello, world!";
}

--- request
GET /t

--- response_body
hello, world!

--- error_code: 200

数据块的过滤器

在类型后可以跟一个和多个过滤器

1
2
3
4
5
6
7
# chomp : 去掉换行符
--- error_code chomp
200

# eval: 执行一个 Perl 代码 :以下是生产 4096 个 a 字符串的body .
--- response_body eval
"a" x 4096

运行

Test::Nginx 依赖 Perl 的 prove 命令来运行测试用例.

运行时 Test::Nginx 会调用 nginx 服务器和 socket 客户端来运行测试,它会自动在系统环境的 Path 中找到 nginx . 所有需要在环境中指定 nginx 的程序路径

1
export PATH=/usr/local/openresty/nginx/sbin:$PATH

可以在程序的根目录添加一个 go.sh 的文件

1
2
3
4
5
6
7
8
#!/usr/bin/env bash

export PATH=/usr/local/Cellar/openresty/1.21.4.2_1/nginx/sbin:$PATH

# 部分系统perl环境会提示(Can't locate t/.pm in @INC)
export PERL5LIB=$(pwd):$PERL5LIB

exec prove "$@"

然后使用这个脚本来做测试 ,比如测试 t/aa.t 这个文件.

1
2
3
4
5
$ ./go.sh t/aa.t 
t/aa.t .. ok
All tests successful.
Files=1, Tests=2, 0 wallclock secs ( 0.02 usr 0.01 sys + 0.19 cusr 0.10 csys = 0.32 CPU)
Result: PASS

可以通过 -v 的选项来生成详细的测试报告

prove -v t/aa.t

运行多个文件

prove -v t/foo.t t/bar.t t/baz.t

通配符运行多个文件

prove -v t/*.t

递归运行目录 -r

prove -r t/

测试单个用例,只需要在用例中为单个用例添加 --- ONLY

测试文件运行的顺序

测试文件通常按照字母顺序运行, 可以通过在测试文件名前面添加数字序列来控制运行的顺序. 例如

1
2
3
4
5
6
t/000-sanity.t
t/001-set.t
t/002-content.t
t/003-errors.t
...
t/139-ssl-cert-by.t

虽然 Prove 可以通过 -jN 来支持作业并行运行,但是 Test::Nginx 并真正不支持这种模式.

测试块运行的顺序

测试块会打乱每个文件中的测试块 ,可以通过加入 no_shuffle() 来禁用这种行为

1
2
3
4
5
6
7
use Test::Nginx::Socket 'no_plan';

no_shuffle();
run_tests();

__DATA__
...

测试组件

Test::Nginx::Socket::Lua

用于 ngx_lua 相关的测试框架

https://metacpan.org/pod/Test%3A%3ANginx%3A%3ASocket%3A%3ALua

通过环境变了 TEST_NGINX_INIT_BY_LUA 导入 ngx_lua module

1
export TEST_NGINX_INIT_BY_LUA="package.path = '$PWD/../lua-resty-core/lib/?.lua;' .. (package.path or '') require 'resty.core'"

example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
use Test::Nginx::Socket::Lua;

repeat_each(2);
plan tests => repeat_each() * 3 * blocks();

no_shuffle();
run_tests();

__DATA__

=== TEST 1: sanity
--- config
location = /t {
content_by_lua '
ngx.say("hello world")
';
}
--- request
GET /t
--- response_body
hello world
--- error_code: 200
--- no_error_log
[error]

Test::Nginx::Socket::Lua::Stream

测试 stream

example

1
2
3
4
5
6
stream {
server {
listen 1985;
echo "Hello, stream echo!";
}
}

Test::Nginx::Socket::Lua::Dgram

测试 upd stream

example

1
2
3
4
5
6
stream {
server {
listen 1985 udp;
echo "Hello, stream echo!";
}
}
微服务还是单体服务?

微服务还是单体服务?

前两天看到了腾讯云开发者的账号发布以一篇 QQ 浏览器服务的优化.

bookmark

在做架构设计时将微服务变成单体服务. 我在下面做了评价.

很多做设计的没有意识到 rpc 也是需要时间的,无限度的搞微服务,复杂性和网络调用链导致问题排查不下去. 一个单体应用加个缓存的事情,最后搞成一堆服务相互依赖调用,浪费机器、浪费人力.

在做架构拆分,很容易忘记初衷是什么,而是为了架构而架构.

在做微服务的时候,基本都是目前架构冗余,业务杂糅在一起,扩容困难. 或者是数据库表太多,大量的表关联查询缓慢,所以要做微服务架构的改造,跟随一起的还有,服务要跟着一起上容器.

单体应用与微服务并不冲突

做微服务拆分,但不要拆得那么细,比如有些过分的拆分恨不得 tab 就成为一个微服务.

本来一个很简单的业务,一个微服务/单体应用就够了, 非得拆出好几个来,比如一个业务有几个任务,业务把每个任务都拆成一个服务,可每个服务的80%的代码都是一样的,甚至数据都是一个数据库来源. 没有这个必要嘛. 而且过多的拆分会导致业务代码的割裂.

grpc 很快,但是也需要时间

grpc 使用 http2 协议,相比 http 协议对比,长链接和流要快很多,但是还是需要花时间的,每多一次的 grpc 的交互,网络请求会多出 1-2ms 的时间.

有些给前端的页面渲染拆成了多个模块,当想要把所有数据一次渲染出来.内部可能需要做几次甚至是十几次的 rpc 调用. 导致在网络上的时间就超过了 20ms. 还是不考虑高负载和网络抖动的情况下.

当 rpc 出现错误重试的成本要远远高于单体应用出错重试.

让缓存前置,让请求后置

当对服务进行拆分,有时候会出现这样的情况,每个微服务配套一套缓存和 DB.通常我们在内网微服务通信,然后通过 http 来提供外网服务.如果可能的话,可以尝试让对外的服务做前置的缓存.

不要过早的优化,容器并不是银弹

容器也是有损耗的, 使用容器会带来物理机大概 5% 的性能损耗,如果业务本身很稳定,也不是高并发的系统,没有什么扩容的需求,要不还是用物理机吧.

容器带来的无状态服务,扩容等让运维变得简单,但是同时也带来了查问题的成本变高,全链路的监控和运维的成本,对于小团队来说几乎是个黑洞.

做架构升级不要为了兼容旧的东西做妥协

在做架构升级会碰到很多的技术债,为了兼容可能是错误的技术债,不得不为此妥协,当妥协的内容越来越多的时候,会发现架构升级做了个寂寞. (这个和微服务没关系)

架构的目的,不是为了炫耀新技术,而是为了稳定,作为一个技术人员,要最求新技术,善用老技术.

apisix源码分析- 插件 prometheus 支持

apisix源码分析- 插件 prometheus 支持

用到的库 https://github.com/knyar/nginx-lua-prometheus

apisix/prometheus/exporter.lua

变量 metrics 用来定义指标,

prmetheus_keys

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
KeyIndex.new(shared_dict, prefix)
self.dict = shared_dict
self.key_prefix = prefix .. "key_"
self.delete_count = prefix .. "delete_count"
self.key_count = prefix .. "key_count"
self.last = 0
self.deleted = 0
self.keys = {}
self.index = {}

KeyIndex:sync()
获取 dict delete_count 和 dict key数量 N
如果本地的 delete 和 delete_count 不同,说明有删除
执行 sync_range(0,N)
如果当前的 last 和 N 不同说明有更新,那么执行 sync_reange(N,last)
并返回 N

KeyIndex:sync_range(first, last)
从 dict 同步 first 到 last 区间的内容到当前对象
从 dict 获取 keyi
记录映射关系 keys[i] = key ,index[key]=i
如果 keyi 是不存在的,从映射关系删除内容

KeyIndex:list()
执行同步 sync()
并返回 keys 的 copy

KeyIndex:add(key_or_keys, err_msg_lru_eviction)
对每一个 key 执行:
同步并获取N
如果 key 存在,就跳过
如果 key 不存在就设置 dict N+1 为 key
key_count ++
设置 keys 和 index

KeyIndex:remove(key, err_msg_lru_eviction)


总结:
利用 dict 存储数据,利用 sync 将数据同步到 worker 进程的函数

prometheus

ngx.sleep(0)

OpenResty 中的 ngx.sleep(0) 调用会主动放弃当前的 CPU 执行权,而把执行权交还给 nginx 事件循环和其他并发请求。当前
yield 了的 Lua 协程会在下一个 nginx 事件处理周期里接着继续运行。

一个典型应用:

shared-dict怎样实现不阻塞nginx进程?每分钟刷新一次缓存,刷新的时候,几十秒,nginx无反应,单个进程100%,其他进程没事,但是也无反应,好像访问的时候也到这个100%的进程了,导致无反应。

解决方法是:可以在操作中穿插一些 ngx.sleep(0) 用来让其他协程获得运行机会。

prometheus 里有大量的调用 dict ,执行 reload 后大量的同步会导致阻塞。

init:

dict : 原始全局字典


_counter : 用来记录 worker 计数器,并同步到dict


key_index: 用来将dict key 同步到 worker 中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
register(self, name, help, label_names, buckets, TYPE_HISTOGRAM)
构建指标需要的数据,对传入的name 去掉 _bucket _sum _count 的后缀
当为 historgram 时 name 或 name_maybe_historgram 存在
当为其他时, name 或者加入了后缀的存在都认为是指标存在,返回错误。
label_names :label 名列表
label_count: label 数量
lookup = {} :用table 来存储指标树
例如 ['me.com']['200'][LEAF_KEY] = 'http_count{host="me.com",status="200"}'

yield :执行读写 dict 次数 200 次就释放一次 cpu 时间片占用

lookup_or_create(self, label_values)
返回完整的指标名 counter 和 gauge 直接返回字符串
histogram 返回 list
如果 lookup_size > lookup_max_size ,清空 lookup
inc_gauge: dict incr


用特权进程来提供服务

想办法让指标就是排好序

https://mp.weixin.qq.com/s/guR77q6kXxpGfKzT46GYsg

  1. 指标名存储到对应的 table 树里面,定时同步
  2. 输出只有循环和取 dict
apisix源码分析-减少对对象的访问 ctx 缓存(9)

apisix源码分析-减少对对象的访问 ctx 缓存(9)

ctx

apisix/core/ctx

用来缓存 ngx.var 的信息。

  1. set_vars_meta(ctx)

    创建了一个名为 ctx_var 的 tablepool

    包含了两个 table 对象 
    
    • _cache
    • _request 这里 request 缓存了 resty.ngxvar.request 的对象

    ctx.var = var

  2. release_var(ctx)

    释放 ctx_var 对象

用到了库 resty.ngxvar 利用 ffi 的方式实现了取 ngx.var 的值,相比直接去值性能提高了5倍。

导出了两个函数

1
2
local get_var      = require("resty.ngxvar").fetch   
local get_request = require("resty.ngxvar").request

request 是 local get_request = require("resty.core.base").get_request

看看 fetch 的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
function _M.fetch(name, request)
-- 从 vars 获取值,var 实现了 ffi 获取 uri host remote_addr request_time scheme upstream_response_time 等
local method = vars[name]

-- 未实现的直接从 ngx_var 取值
if not var_patched or not method then
if num_type[name] then
return tonumber(ngx_var[name])
elseif ups_num_type[name] then
return sum_upstream_num(ngx_var[name])
end

return ngx_var[name]
end

return method(request)
end
  1. local mt 的 __index 重写了去值方式。

    直接取值,如果_cache (也就是第一步里面创建的_cache)存在,直接返回。

    这里 key 做了一些预定义

    • cookie 开头的,返回 cookie
    • arg 开头的返回 request.get_uri_args 的值
    • post_arg 返回 request.get_post_args 的值
    • http 开头返回 ngx.var 的值。
    • graphql 开头返回 graphql 的值。
    • 否则 返回 ngx.ctx.api_ctx 的值 或者 ngx.var 的值

    _newinex 设置缓存。

高性能的原因

1. 使用 ffi 的方式优化去值性能  


2. 通过维护缓存,减少多次取值。 
apisix源码分析-路由高性能的秘密 redix tree(8)

apisix源码分析-路由高性能的秘密 redix tree(8)

redixtree

lua-resty-radixtree : 一个基于 antirez/rax 实现的路由匹配算法。

实现了高性能的路由匹配。

redixtree 和 trietree 非常的类似。

trie tree

trietree 的结构如下,

Untitled.png

1
2
3
4
5
type TrieNode struct {
// Value rune
Iskey bool
Next map[rune]*TrieNode
}

节点下按照单个的字符作为树大的节点。通过节点能够快速的找到查找的路径。 比如需要找 inn 。

它的树深度由最长的路径决定。 在查找过程中我们希望树的深度能够浅一些。

比如想要查找 foot 和 foob 时,发现前缀都是 foo 开头的,如果把foo存在一个节点,树的深度不就减少了两层吗。这个就是下面要介绍的 radixtree 。

radix tree

Untitled.png

1
2
3
4
5
type RedixNode struct {
Value string
Iskey bool
Next map[rune]*RedixNode
}

在 apisix 中使用了开源的 radixtree 实现 rax 来作为低层的查询树。

节点定义如下:

1
2
3
4
5
6
7
typedef struct raxNode {
uint32_t iskey:1; /* Does this node contain a key? */
uint32_t isnull:1; /* Associated value is NULL (don't store it). */
uint32_t iscompr:1; /* Node is compressed. */
uint32_t size:29; /* Number of children, or compressed string len. */
unsigned char data[];
} raxNode;
1
2
3
4
5
6
7
8
9
10
11
12
*
* (f) ""
* /
* (i o) "f"
* / \
* "firs" ("rst") (o) "fo"
* / \
* "first" [] [t b] "foo"
* / \
* "foot" ("er") ("ar") "foob"
* / \
* "footer" [] [] "foobar"

节点通过 iscompr 来判断是压缩节点还是非压缩节点。

size 的大小为 29 bit 和前面三个变量一起 32 bit 内存对齐。

节点数据结构

对 data 部分的不同的处理达到不同的效果。

压缩节点结构:

前面是 size 长度的数据,中间是指向下一个节点的指针 z-ptr,最后是节点数据指针指向数据,当 iskey ==1 时存在节点数据。

[header iscompr=1][xyz][z-ptr](value-ptr?)

Untitled.png

非压缩节点结构

前面是 size 长度的数据,接着是一个 padding,接下来是size个指针指向下一个节点。最后是value-data指针指向数据。

[header iscompr=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)

Untitled.png

padding

1
2
3
4
5
6
7
/* 对齐函数 : 在申请内存时对齐到整数倍的字节 
sizeof(void*) 一个指针所占用的内存大小,4个字节或8个字节
nodesize +4 字节数+ 头部占用的32bit(4个字节)
(nodesize+4) % sizeof(void*) 对整体占用求余
(sizeof(void*)-1) :前面的已经是padding数了。这里做一个并集。??
*/
#define raxPadding(nodesize) ((sizeof(void*)-((nodesize+4) % sizeof(void*))) & (sizeof(void*)-1))

节点操作

添加和删除会出现压缩节点和非压缩节点的转换。

当出现节点分裂时,压缩节点在分裂点创建非压缩的子节点,

当删除时非压缩节点只有一个节点时会和上一级节点合并

读取data 和设置data ,中 raxNodeCurrentLength 获取了当前节点的长度 减去一个指针,就是data 的指针地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void raxSetData(raxNode *n, void *data) {
n->iskey = 1;
if (data != NULL) {
n->isnull = 0;
void **ndata = (void**)
((char*)n+raxNodeCurrentLength(n)-sizeof(void*));
memcpy(ndata,&data,sizeof(data));
} else {
n->isnull = 1;
}
}
/* Get the node auxiliary data. */
void *raxGetData(raxNode *n) {
if (n->isnull) return NULL;
void **ndata =(void**)((char*)n+raxNodeCurrentLength(n)-sizeof(void*));
void *data;
memcpy(&data,ndata,sizeof(data));
return data;
}

需要处理压缩节点和非压缩节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 用来查询字符串在redixtree 中能够匹配到哪个位置
static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts) {
raxNode *h = rax->head;
raxNode **parentlink = &rax->head;
size_t i = 0; /* Position in the string. */
size_t j = 0; /* Position in the node children (or bytes if compressed).*/
while(h->size && i < len) {
debugnode("Lookup current node",h);
unsigned char *v = h->data;
if (h->iscompr) {
for (j = 0; j < h->size && i < len; j++, i++) {
if (v[j] != s[i]) break;
}
if (j != h->size) break;
} else {
/* Even when h->size is large, linear scan provides good
* performances compared to other approaches that are in theory
* more sounding, like performing a binary search. */
for (j = 0; j < h->size; j++) {
if (v[j] == s[i]) break;
}
if (j == h->size) break;
i++;
}
if (ts) raxStackPush(ts,h); /* Save stack of parent nodes. */
raxNode **children = raxNodeFirstChildPtr(h);
if (h->iscompr) j = 0; /* Compressed node only child is at index 0. */
memcpy(&h,children+j,sizeof(h));
parentlink = children+j;
j = 0; /* If the new node is compressed and we do not
iterate again (since i == l) set the split
position to 0 to signal this node represents
the searched key. */
}
debugnode("Lookup stop node is",h);
if (stopnode) *stopnode = h;
if (plink) *plink = parentlink;
if (splitpos && h->iscompr) *splitpos = j;
return i;
}

辅助结构

  1. raxStack : 用来返回访问节点的栈,当我们需要返回查询路径时使用。

  2. Iterator :

    raxSeek 跳转到匹配的位置

    raxNext 后向查找,能够循环查找到所有匹配位置的子节点。

    raxPrev 前向查找,能够向上查询访问路径。

lua-resty-radixtree

对 rdx 的封装

1
2
3
4
5
6
7
8
9
10
void *radix_tree_new();
int radix_tree_destroy(void *t);
int radix_tree_insert(void *t, const unsigned char *buf, size_t len,
int idx);
void *radix_tree_find(void *t, const unsigned char *buf, size_t len);
void *radix_tree_search(void *t, void *it, const unsigned char *buf, size_t len);
int radix_tree_prev(void *it, const unsigned char *buf, size_t len);
int radix_tree_next(void *it, const unsigned char *buf, size_t len);
int radix_tree_stop(void *it);
void *radix_tree_new_it(void *t);

实现了对 paths、hosts、methods、remote_adds、vars 、filter_fun 的匹配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
paths = {"/aa", "/bb*", "/name/:name/*other"},
hosts = {"*.bar.com", "foo.com"},
methods = {"GET", "POST", "PUT"},
remote_addrs = {"127.0.0.1","192.168.0.0/16",
"::1", "fe80::/32"},
vars = {
{"arg_name", "==", "json"},
{"arg_weight", ">", 10},
},
filter_fun = function(vars, opts)
return vars["arg_name"] == "json"
end,

metadata = "metadata /bb",
}

在 apisix 中通过 dispatch(path,opts) 来执行 handler 函数的回调。

1
2
3
4
5
6
7
function _M.dispatch(self, path, opts, ...)
...
local route, err = match_route(self, path, opts or empty_table, args)
...
handler(...)
return true
end
apisix源码分析-插件的运行(7)

apisix源码分析-插件的运行(7)

插件的运行

插件的 init_worker 监控了 etcd 的 pluglns 。

插件列表发生发生变化时通过 load 函数重新加载插件 。

load 函数获取启用的插件列表后,清理本地的插件,使用 load_plugin 函数通过 pcall(require, pkg_name)重新引入插件

插件在每个阶段通过 common_phase(phase_name) 来执行。执行流程如下:

  • 执行公共插件
  • 插件调用阶段 run_plugin(phase,plugins,api_ctx)
  • api_ctx.script_obj : 当使用有向无环图来做编辑时,apisix 会将图转换为脚本存储到 script_obj 变量中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
local function common_phase(phase_name)
local api_ctx = ngx.ctx.api_ctx
if not api_ctx then
return
end
-- 执行公共变量
plugin.run_global_rules(api_ctx, api_ctx.global_rules, phase_name)

if api_ctx.script_obj then -- 执行有向无环图编辑的脚本
script.run(phase_name, api_ctx)
return api_ctx, true
end

return plugin.run_plugin(phase_name, nil, api_ctx) -- 执行插件列表
end
apisix源码分析-route的匹配(6)

apisix源码分析-route的匹配(6)

route 是如何匹配的?

Route 中主要包含三部分内容:匹配规则(比如 uri、host、remote_addr 等),插件配置(限流限速等)和上游信息,通过 init_worker 阶段对 etcd 的监控,route 节点的数据同步到了user_routes的变量中,默认执行 http/router/radixtree_uri.lua 匹配规则 ,还可以使用 radixtree_host_uriradixtree_uri_with_parameter 。通过实现 match(api_ctx) 方法实现查询替换。

在 apisix 中使用了 redixtree 来实现路由匹配、域名证书、admin route 的匹配。 redixtree 相关分析可以看相关的分析。通过存储不同的path,来实现不同的需求。

  • radixtree_uri

    paths = route.value.uris or route.value.uri,

    在radixtree_uri 中直接使用了uri,也就是直接使用了请求的路径部分在 redixtree 中做匹配

  • radixtree_host_uri

    建立了两层 redixtree 来做 route 查询,第一层使用域名作为 path 创建 redixtree 和ssl的域名查询相同)。在域名对应的 key 在创建 uri 的 redixtree。形成两层查询。

  • radixtree_uri_with_parameter (未实现)

access 的执行

当获取到匹配的 route 后,就可以得到 route、plugin、service 等信息。这些信息会存储在变量api_ctx 中

1
2
3
4
5
6
7
8
9
api_ctx.matched_route = route
api_ctx.conf_type = "route&service"
api_ctx.conf_version = route.modifiedIndex .. "&" .. service.modifiedIndex
api_ctx.conf_id = route.value.id .. "&" .. service.value.id
api_ctx.service_id = service.value.id
api_ctx.service_name = service.value.name

api_ctx.route_id = route.value.id
api_ctx.route_name = route.value.name

对于插件有两种,列表或者通过编排的。

当发现 route.value.script 不为空时,会执行通过编排生成的 lua 脚本。

为空时执行插件列表。在执行插件列表时执行了会执行如下代码:

1
2
3
4
5
6
7
8
9
10
11
plugin.run_plugin("rewrite", plugins, api_ctx)
if api_ctx.consumer then
...
route, changed = plugin.merge_consumer_route(
route,
api_ctx.consumer,
api_ctx
)
...
end
plugin.run_plugin("access", plugins, api_ctx)

在这里可以发现,apisix 的插件的 rewrite 和 access 都是在 openresty 的 access 阶段执行的

在配置的时候我们会有默认插件,Consumer 绑定的插件, Route 绑定的插件,Service 绑定的插件。只列出插件 merge 操作,根据插件优先级的说明,同样的插件只会有一份有效,相同的插件会做合并。Plugin 配置可直接绑定在 Route 上,也可以被绑定在 Service 或 Consumer上。而对于同一 个插件的配置,只能有一份是有效的,配置选择优先级总是 Consumer > Route > Service。在 access 执行过程中可以看到如下的合并过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if route.value.plugin_config_id then
...
route = plugin_config.merge(route, conf)
end
if route.value.service_id then
...
route = plugin.merge_service_route(service, route)
...
end
if api_ctx.consumer then
...
route, changed = plugin.merge_consumer_route(
route,
api_ctx.consumer,
api_ctx
)
...
end
apisix源码分析-处理用户请求(5)

apisix源码分析-处理用户请求(5)

apisix 如何处理用户的请求?

api_ctx

在执行开始前, apisix 会给 ngx.ctx 增加一个变量 ngx.ctx.api_ctx 的变量。

1
2
api_ctx = core.tablepool.fetch("api_ctx", 0, 32) 
ngx_ctx.api_ctx = api_ctx

变量从 “api_ctx” 的 table 池( lua-tablepool) 中取出,用完后还给池。

(lua-tablepool 中一个 table 变量重复使用 20000 次后就会被新的取代)

上面的两行代码在 ssl 阶段和 access 阶段都存在,但是存储的内容是不同的。

ssl 阶段是证书信息,access 阶段是 route 相关的信息

ssl

ssl 阶段负责设置证书。 证书的设置可以看看 ngx.ssl 的相关说明,在 ssl_certificate_by_lua_block 阶段通过 ngx.ssl 设置证书和私钥来配置域名证书。

init_work : 初始化 ssl_certificates 变量,用来和 etcd 同步数据

1
2
3
4
5
6
7
ssl_certificates, err = core.config.new("/ssl", {
automatic = true,
item_schema = core.schema.ssl,
checker = function (item, schema_type)
return apisix_ssl.check_ssl_conf(true, item)
end,
})

证书匹配

调用了 ssl/router/radixtree_sni.lua 的 match_and_set 。

  • 建立域名查询树

    radixtree_router 初始化 radixtree 来存储域名信息

    radixtree_router_ver 存储 配置版本,通过版本号来决定是否需要重建 radixtree

  • 域名会通过 字符串反向(sni:reverse()) 后在 radixtree 中查询。

    api_ctx.matched_sni 存储匹配的域名

    api_ctx.matched_ssl 存储对应的证书信息

域名查询树建立

create_router(ssl_certificates.values)

对象

1
2
3
4
5
6
7
8
9
10
route_items[idx] = {
paths = sni, -- 反向后的域名
handler = function (api_ctx) -- 回调函数
if not api_ctx then
return
end
api_ctx.matched_ssl = ssl
api_ctx.matched_sni = sni
end
}

证书有两种,全匹配证书,泛域名证书。

1
2
3
4
5
6
for _, msni in ipairs(api_ctx.matched_sni) do
-- 完全匹配 或者 相对于msni sni_srv 后面没有点(泛匹配)?
if sni_rev == msni or not str_find(sni_rev, ".", #msni) then
matched = true
end
end

按照 redixtree 最长匹配原则,证书会优先匹配到全匹配证书。

access

apisix 匹配路由是在 access 阶段完成的,功能由 router_http.match(api_ctx) 提供

apisix源码分析-admin api 的注册和路由(4)

apisix源码分析-admin api 的注册和路由(4)

admin api

admin 注册

1
2
3
4
5
6
7
8
9
10
11
12
location /apisix/admin {
set $upstream_scheme 'http';
set $upstream_host $http_host;
set $upstream_uri '';

allow 0.0.0.0/0;
deny all;

content_by_lua_block {
apisix.http_admin()
}
}

在 admin/init.lua 中,通过 init_worker 初始化 admin route ,并将本地插件配置同步到etcd中,并注册了插件重启事件,当触发"/apisix/admin/plugins/reload"触发同步本地插件列表到 etcd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
router = route.new(uri_route)
events = require("resty.worker.events")
-- 注册插件重启事件
events.register(reload_plugins, reload_event, "PUT")
if ngx_worker_id() == 0 then
local ok, err = ngx_timer_at(0, function(premature)
if premature then
return
end
-- 将本地插件配置同步到 etcd
sync_local_conf_to_etcd(true)
end)

if not ok then
error("failed to sync local configure to etcd: " .. err)
end
end

admin 路由

router = route.new(uri_route) 是一个 redixtree 对象。用来做 admin api 的路由匹配 ,可以通过uri_route 参数看到对不同接口的实现。

run 作为 admin api 的主要实现,通过 uri 和m来执行 resource 中的调用。

有如下两种类型的 api ,分别对应到系统组件,比如 routes ,servers,ssl 等。

/apisix/admin/schema//

这里的 seg_res 是对应的组件名,seg_id 是组件中对应配置的id

和插件路径

/apisix/admin/schema/plugins//

这里的seg_res 对应的是插件名,seg_id 对应的是插件配置的id

通过 seg_res 获取对应的执行对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
local resources = {
routes = require("apisix.admin.routes"),
services = require("apisix.admin.services"),
upstreams = require("apisix.admin.upstreams"),
consumers = require("apisix.admin.consumers"),
schema = require("apisix.admin.schema"),
ssl = require("apisix.admin.ssl"),
plugins = require("apisix.admin.plugins"),
proto = require("apisix.admin.proto"),
global_rules = require("apisix.admin.global_rules"),
stream_routes = require("apisix.admin.stream_routes"),
plugin_metadata = require("apisix.admin.plugin_metadata"),
plugin_configs = require("apisix.admin.plugin_config"),
}

local resource = resources[seg_res]

通过 method 来执行resource 对应的请求函数

1
2
local code, data = resource[method](seg_id, req_body, seg_sub_path,
uri_args)

resources 的定义可以看出,admin 的路由 用来实现 api 的操作, 涉及到 routes services upstreams consumers schema ssl plugins proto global_rules stream_routes plugin_metadata plugin_configs

api 的作用就是用来对 etcd 进行 crud 的操作。 对每个组件的 api ,实现大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
local _M = {
version = 0.1,
}


local function check_conf(...)
... -- 对 put 写入 etcd 的数据校验
-- 使用 schema_def.lua 里对应的校验数据进行校验
-- 返回 etcd key
end

_M.put(...) -- etcd set
_M.get(...) -- etcd get
_M.post(...) -- etcd push
_M.delete(...) -- etcd delete
return _M

在前面的 resource[method] 通过 method= put | get | post | delete 就可以执行到对应的函数.

apisix源码分析-初始化过程(3)

apisix源码分析-初始化过程(3)

init

init 对应到 openresty 的 init_by_lua_blockinit_worker_by_lua_block 的阶段

init_by_lua_block 所做的不多,执行 apisix.http_init(args),args 里面传入了 dns_resolver 配置的参数,比如使用docker-compose 启动传入的127.0.0.11 。

1
2
3
4
5
local dns_resolver = { "127.0.0.11", }
local args = {
dns_resolver = dns_resolver,
}
apisix.http_init(args)

在init 过程中:

1
2
3
4
5
6
7
8
9
10
11
12
-- dns 发现设置: args["dns_resolver"]
core.resolver.init_resolver(args)
-- 生成 apisix uuid /conf/apisix.uid
core.id.init()
--https://github.com/openresty/lua-resty-core/blob/master/lib/ngx/process.md#enable_privileged_agent
-- 开启特权进程,可以使用 master 进程权限执行一些功能
local process = require("ngx.process")
local ok, err = process.enable_privileged_agent()

-- 读初始化配置, core/config_etcd.lua 执行 init 判断 etcd 是否可访问。
local ok, err = core.config.init()

init_worker

apisix/init/http_init_worker()

init_worker 阶段执行了所有组件的初始化,也就是对应组件的 init_worker 方法。

比如 admin timers plugin plugin_config route service consumer config upstream ssl。、

这个阶段主要的作用是初始化各个组件。每个组件自身都实现了一个 init_worker 方法,除了自身相关的内容,主要是同步组件自身对应的 etcd 信息。

etcd watch 实现

1
obj, err = core.config.new("/etcd_path", cfg)

比如在 router 的 http_init_worker 阶段,watch 了 /global_rules

1
2
3
4
5
local global_rules, err = core.config.new("/global_rules", {
automatic = true,
item_schema = core.schema.global_rule,
checker = plugin_checker,
})

core.config.new(key,ops)

函数对应的执行在文件 core/config_etcd.lua 中。

> 在 apisix/core.lua 中 local config = require("apisix.core.config_" .. config_center), config_center 来自配置文件,默认为 etcd。 

key 为 etcd 监控的节点,ops 为需要的参数。

当 automatic == true 时 ,会创建 ngx.timer_at , 否则只会初始化 etcd_cli

load_full_data 函数,将 etcd 取回的数据填充到 key 对应的 obj 中(将etcd 数据转换为 lua 对象)

- self.item_scheme 检验数据
- self.checker 对象检查
- 将数据写入 self.value  , self.values_hash[key] 记录对应key的index
- 执行 self.filter

_automatic_fetch 函数,启动 ngx_timer_at 调用函数 _automatic_fetch ,通过自调用形成迭代。

在_automatic_fetch函数中真正执行同步的是 sync_data 函数

sync_data 函数,读取 etcd,并 watch etcd ,将数据写到对象中 。

   self.values :  array 用来存储 etcd 写回的数据


   self.values_hash: hash 用来存储 etcd key 对应的 values 的下标, 在 etcd 中 {/routes/aaa:<data>} 同步到 apisix 的存储 


假设 etcd 有如下 key


`etcd: /routes/aaaa : <data>` 


在self 中存储: 
`self.values = [<data>,...]
self.values_hash= {"/routes/aaa":1}` 

出现修改数据

1
2
insert_tab(self.values, res)
self.values_hash[key] = #self.values

当出现删除数据时

1
2
3
self.sync_times = self.sync_times + 1
self.values[pre_index] = false
self.values_hash[key] = nil

这里出现的 sync_times 是一个计数器,用来记录删除values 数量,当超过 100 的时候,会对 values 里的数据进行重建。

self.conf_version 记录版本变化次数

服务发现

服务发现的 init_worker 会循环调用配置的服务发现。 discovery_type 为配置文件中配置的服务发现。

1
2
3
4
local discovery_type = local_conf.discovery
for discovery_name, _ in pairs(discovery_type) do
discovery[discovery_name].init_worker()
end

我们可以在 discovery 目录找到对应的服务发现的实现。服务发现需要实现两个接口。

1
2
3
4
local _M = {}
function _M.nodes(service_name)
function _M.init_worker()
return _M

为了看分析比较简单,可以查看 discovery/dns.lua 的实现。

init_woker 函数初始化了 dns_client 。

nodes 函数通过 dns 服务发现来获取 service_name 对应的服务。并将返回的内容转换成 apisix 的的格式。

1
2
3
4
5
nodes[i] = {host = r.address, weight = r.weight or 1, port = r.port or port}
if r.priority then
-- for SRV record, nodes with lower priority are chosen first
nodes[i].priority = -r.priority
end

目前 apisix 支持了 consul_kv dns eureka nacos 四种服务发现,如果想要其他的可以自行实现这两个函数来支持业务。

在将upstream 和 route 绑定时会 set_by_route 会触发服务发现的 nodes 函数。

access

apisix 匹配路由是在 access 阶段完成的,功能由 router_http.match(api_ctx) 提供

header_filter

会设置一些 apisix 自定义的 header ,调用 common_phase

body_filter

调用 common_phase

log

调用 common_phase

被动健康检查

释放前面阶段里通过 tablepool 申请的对象 (api_ctx plugins uri_parse_param matched_route_record )

balancer

在access 阶段就计算出了要均衡到的后端,设置并执行 balancer.lua 的 run 函数