背景

一般情况下我们在写一写对外的服务的时候都会有一层 cache 作为缓存,用来减少底层数据库的压力,但是在遇到例如 redis 抖动或者其他情况可能会导致大量的 cache miss 出现。

假如我们要访问一个文件,但是由于各种问题,这个请求发送了 1000 次,如果不做处理,让他直接访问缓存或者直接访问数据库,那么就会显著增大后端压力,降低并发性能。

这个时候我们就可以使用 singleflight 库文件,直译过来叫做单飞,主要作用就是把一组相同的请求合并为一个请求,实际上只会请求一次,这样会显著减少数据库和缓存的压力。

原理

singleflight 一共提供了三种接口,具体的说明在这里 singleflight

  • func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool):执行并返回给定函数的结果,确保给定键一次只执行一次。如果有重复的进来,重复的调用者等待原始函数完成并收到相同的结果。共享的返回值指示是否将v提供给多个调用者。
  • func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result:DoChan与Do类似,但返回一个通道,该通道将在结果准备就绪时接收结果。(与 Do 的返回结果不同
  • func (g *Group) Forget(key string):告诉 singleflight 删除一个键。未来对该键的Do调用将调用该函数,而不是等待较早的调用完成。

基础数据结构

一共有两组元素,有一个 map 来保存映射关系,来合并重复请求,还有一个 mutex 锁来保护 m,因为在 golang 中 map 不是并发安全的。

map 内部没有锁机制,如果当扩容的时候,把数据复制到 oldbucket 中,然后突然一个 goroutine 写入一个数据,有可能 new bucket 还没有初始化,或者打断了迁移的状态等,那么会造成数据混乱

1
2
3
4
5
6
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}

map 是一个关于 key -> call 的映射,call 是用来表示一次”正在进行或已经完成”的共享请求操作的结构体。这个机制用于避免缓存击穿时多个 goroutine 重复请求同一个资源。多个 goroutine 请求相同的 key,只会触发一个 Do 调用,其他的都会等待这个 call 的结果。call 的数据结构定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// call is an in-flight or completed singleflight.Do call
type call struct {
wg sync.WaitGroup

// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
// 只会写入一次值
val any
err error

// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
// 统计调用次数以及需要返回的 channel
dups int
chans []chan<- Result
}

Do 和 DoChan

Do 和 DoChan 的功能都是向上提供一个 doCall 的接口,只是返回值不同罢了。在底层代码中只有在存在多次请求的时候才有部分区别。

相同:

  • 懒加载,只会在被使用的时候才会加载。
  • 都是共享相同结果,来直接进行返回。
  • 内部更新相似,除了 chans 不同

不同:

  • 主要是 Do 使用的是 wg 来进行阻塞,DoChan 直接用 channel 的阻塞特性来进行阻塞。
  • 返回方式不同,一个返回 (val, err, shared),另一个返回 channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()

if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}

doCall

这部分代码是 fn 函数的实际执行逻辑。当 fn 执行完成后,会调用 wg.Done() 来唤醒因调用 Do 而阻塞等待的其他 goroutine。随后,会从 m.calls 中删除当前 key,以避免该条目污染后续的请求。最后,它会向所有等待在 DoChan 的 channel 中的 goroutine 发送结果,以确保并发调用者能够获得相同的返回值。

这一版感觉非常的复杂,和我之前在 github 上看到的一版不同。链接在这 singleflight,可以看到实现非常简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (any, error)) {
c.val, c.err = fn()

g.mu.Lock()
c.wg.Done()
if g.m[key] == c {
delete(g.m, key)
}
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}

但是,下面一版加了很多关于 panic 的处理,有点头皮发麻。但是大致的意思是很明确的——显式触发 panic,让程序在 fn 中 panic 后能继续崩溃退出,而不是因为 recover 导致主 goroutine 继续执行,从而让等待的 goroutine 永远收不到结果,最终造成死锁

处理方法是:宁愿整个程序 crash,也不能因为 panic 被吞而导致多个请求 goroutine 被永远挂起

注释里面有一份文档,我先把他贴出来:

errgroup: rethink concurrency patterns

A few events in the last couple of weeks have prompted me to revise errgroup a bit.

  1. https://golang.org/cl/131815 pointed out that errgroup can cause tests to deadlock if t.Fatal or t.Skip is called in one of the goroutines in the group. While we do not recommend t.Fatal or t.Skip outside the test’s main goroutine, it’s difficult to debug such a deadlock and arguably more useful if we build in some sort of support.
  2. In my GopherCon talk, “Rethinking Classical Concurrency Patterns”, I recommended that API authors “make concurrency an internal detail.” In multiple discussions after the talk, folks asked me how to handle panics in goroutines, and I realized that making concurrency an internal detail requires that we propagate panics (and runtime.Goexit calls) back to the caller’s goroutine.
  3. Kevin Burke asked whether I would be interested in a proposal to move errgroup to the standard library. I replied that there is at least one open problem with the API: namely, that unlike sync.WaitGroup, an errgroup.Group cannot be reused after Wait is called. My reasoning was that the associated context.CancelFunc must be called in order to avoid leaking resources, and I did not want to expand the required API surface with a boilerplate cancel call.
    I looked at the examples again with that reasoning in mind, and realized that it is almost always more robust to defer two operations: “cancel the goroutines” and “wait for the goroutine to exit”. A single call suffices for both, and simplifies many functions that return early on error.

Updates golang/go#15758.
Updates golang/go#25448.
Change-Id: Ica4ce9e4569867d1485f19365af76ca010d7b6aa

为了更好的处理子协程的异常情况,建议使用 defer 延迟执行两种操作,“取消 goroutine” 和 “等待 goroutine 全部退出”。doCall 部分也是这样来实现的。

I looked at the examples again with that reasoning in mind, and realized that it is almost always more robust to defer two operations: “cancel the goroutines” and “wait for the goroutine to exit”.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false

// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
// 双重 defer 区别 panic 和 runtime.Goexit(两者在语义上都不会走完函数逻辑)
// runtime.Goexit 不会触发 recover(),这也就是为什么要用 double-defer 来判断
defer func() {
// the given function invoked runtime.Goexit
// 如果没有正常返回,也没有触发 recover,则说明是 runtime.Goexit 导致的返回。
if !normalReturn && !recovered {
c.err = errGoexit
}

// 状态清理部分
g.mu.Lock()
defer g.mu.Unlock()
// 唤醒等待的的 goroutine
c.wg.Done()
// 删除 call 对应的 key,防止内存泄露和污染下一次请求。
if g.m[key] == c {
delete(g.m, key)
}

// 我对这段代码的理解是,如果在 fn 发生了 panic,
// recover 后并不会干涉到 doCall 的执行,fn 只会返回一个 error
// 如果有多个 goroutine 正在等待结果,需要保证 doCall 能 crash 掉,对外做出反应,让等待着的 goroutine 能正常退出,防止苦等而造成的锁

// 解决方法就是在一个新的 goroutine 中触发 panic
// 并让当前 goroutine 阻塞不退出,从而保证崩溃信息完整且所有等待者都能感知异常。
if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
// 为什么会新开一个 goroutine 呢?
// 在新的 goroutine 里触发 panic,可以让这个新的 goroutine 崩溃
// 操作系统或运行时会把这个 panic 视为程序异常终止,产生完整的崩溃日志(crash dump)。
go panic(e)
// Keep this goroutine around so that it will appear in the crash dump.
select {}
} else {
// 没有等待的 channel,直接触发 panic
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()

// 用闭包做一层包裹,内层 defer 用于捕获 panic
func() {
defer func() {
if !normalReturn {
// Ideally, we would wait to take a stack trace until we've determined
// whether this is a panic or a runtime.Goexit.
//
// Unfortunately, the only way we can distinguish the two is to see
// whether the recover stopped the goroutine from terminating, and by
// the time we know that, the part of the stack trace relevant to the
// panic has been discarded.
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()

c.val, c.err = fn()
normalReturn = true
}()

if !normalReturn {
recovered = true
}
}

defer 执行顺序是先执行匿名函数中的 defer,其次才是最外面的 defer

defer 的执行是先进后出

Forget

简单来说就是删除 map 中的 key,没有什么特别的地方。

1
2
3
4
5
6
7
8
// Forget tells the singleflight to forget about a key.  Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}