openresty lua_resty_counter 源码

openresty lua_resty_counter 源码

无锁的累加库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
local ngx_shared = ngx.shared
local pairs = pairs
local ngx = ngx
local error = error
local setmetatable = setmetatable
local tonumber = tonumber
-- table 清理函数
local clear_tab
do
local ok
ok, clear_tab = pcall(require, "table.clear")
if not ok then
clear_tab = function(tab)
for k in pairs(tab) do
tab[k] = nil
end
end
end
end

local _M = {
_VERSION = '0.2.1'
}
local mt = { __index = _M }

-- local cache of counters increments
-- worker 全局存储,key 为 shdict_name ,value 为对应的 table
-- <shdict_name>={}
local increments = {}
-- boolean flags of per worker sync timers
-- 用来记录是否开启的自动同步 timer_started[shdict_name]=true
local timer_started = {}

local id

-- 同步函数: 将 worker 的计数器同步到 ngx.shared 中
local function sync(_, self)
local err, _, forcible
local ok = true
-- 循环 worker 全局字典,将 increments 的计数同步到 dict 中
for k, v in pairs(self.increments) do
_, err, forcible = self.dict:incr(k, v, 0)
if forcible then
ngx.log(ngx.ERR, "increasing counter in shdict: lru eviction: key=", k)
ok = false
end
if err then
ngx.log(ngx.ERR, "error increasing counter in shdict key: ", k, ", err: ", err)
ok = false
end
end
-- 同步完成后清理字典,并设置 error_metric_name 为 1
clear_tab(self.increments)
if ok == false then
self.dict:incr(self.error_metric_name, 1, 0)
end

return ok
end


function _M.new(shdict_name, sync_interval, error_metric_name)
id = ngx.worker.id()

if not ngx_shared[shdict_name] then
error("shared dict \"" .. (shdict_name or "nil") .. "\" not defined", 2)
end

if not increments[shdict_name] then
increments[shdict_name] = {}
end
-- 构建当前对象的计数器
local self = setmetatable({
dict = ngx_shared[shdict_name],
increments = increments[shdict_name],
error_metric_name = error_metric_name,
}, mt)

-- 更具设置的同步时间,设置定时同步 ngx.timer.every
if sync_interval then
sync_interval = tonumber(sync_interval)
if not sync_interval or sync_interval < 0 then
error("expect sync_interval to be a positive number", 2)
end
if not timer_started[shdict_name] then
ngx.log(ngx.DEBUG, "start timer for shdict ", shdict_name, " on worker ", id)
ngx.timer.every(sync_interval, sync, self)
timer_started[shdict_name] = true
end
end

return self
end
-- 主动同步内容
function _M:sync()
return sync(false, self)
end
-- incr 只执行 increments 的累加
function _M:incr(key, step)
step = step or 1
local v = self.increments[key]
if v then
step = step + v
end

self.increments[key] = step
return true
end
-- reset 清理 dict
function _M:reset(key, number)
if not number then
return nil, "expect a number at #2"
end
return self.dict:incr(key, -number, number)
end
-- get 获取 dict 的内容
function _M:get(key)
return self.dict:get(key)
end

function _M:get_keys(max_count)
return self.dict:get_keys(max_count)
end

return _M

openresty lua_resty_counter 源码

https://beixiu.net/lua-resty-counter-source/

作者

张巍

发布于

2023-08-11

更新于

2023-08-11

许可协议

评论