go-singlefilght

singleflight

1
2
golang.org/x/sync/singleflight

singleflight 是 go 提供的一个扩展并发原语,主要是用来合并请求来降低服务压力。

code

1
https://cs.opensource.google/go/x/sync/+/036812b2:singleflight/singleflight.go

原理

实现了一个 Group 的 struct

1
2
3
4
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}

当一个请求过来的时候,使用 mu 加锁,然后判断 m 这个 map 里面是否有相关的键,如果有说明这个 key 相关的请求已经发生了。等待函数返回。如果没有找到 key ,将 key 加入到 m 中,并使用 doCall 来做请求。

开放调用的函数有三个,分别是

1
2
3
4
5
6
// 用户执行函数,返回结果
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
// 执行函数,通过返回一个 chan
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
// 用来过期一个 key
func (g *Group) Forget(key string)

Do

关键点是使用c.wg sync.WaitGroup来做执行等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
// 初始化 map
if g.m == nil {
g.m = make(map[string]*call)
}
// 判断是否存在 key
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait() // 存在的话就直接等待返回

if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

g.doCall(c, key, fn) // 同步执行真正的调用
return c.val, c.err, c.dups > 0
}

DoChan

基本代码和 DoCall 一致,只是实例化了一个 1 个空间的 chan,添加到 c.chans 来异步的等待结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch) // 除了多了一个 chans 用来做 chan 返回,其他的部分和 Do 基本一致
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

go g.doCall(c, key, fn) //因为是使用了 chan 来做数据交换,这里异步执行

return ch
}

Forget

Forget 只做了一件事情,就是将指定 key 的 forgotten 修改为 true ,然后将 key 删除。
这里为什么要修改为 true 了,在删除呢?是因为有可能其他的协程可能还在使用这个 call。

doCall

doCall 是真正执行的函数, 大部分都是对错误返回的处理,主要调用就是 c.val, c.err = fn() 和循环给c.chans 广播返回结果,使 DoChan 得到返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {

normalReturn := false
recovered := false


defer func() {
if !normalReturn && !recovered {
c.err = errGoexit
}

c.wg.Done()
g.mu.Lock()
defer g.mu.Unlock()
if !c.forgotten { // 执行完成后通过 forgotten 来删除key
delete(g.m, key)
}

if e, ok := c.err.(*panicError); ok {
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
for _, ch := range c.chans {
// chans 不为空时,将返回数据回写。
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn { // 请求发生错误的兜底 recover
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()

c.val, c.err = fn() // 真正执行函数的位置
normalReturn = true
}()

if !normalReturn {
recovered = true
}
}
作者

张巍

发布于

2021-06-16

更新于

2022-04-22

许可协议

评论