Part5. Concurrency
1. Channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
|
如果目标 Channel 没有被关闭并且已经有处于读等待的 Goroutine,那么 runtime.chansend
会从接收队列 recvq
中取出最先陷入等待的 Goroutine 并直接向它发送数据:
1
2
3
4
| if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
|
- 调用
runtime.sendDirect
将发送的数据直接拷贝到 x = <-c
表达式中变量 x
所在的内存地址上; - 调用
runtime.goready
将等待接收数据的 Goroutine 标记成可运行状态 Grunnable
并把该 Goroutine 放到发送方所在的处理器的 runnext
上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;
1
2
3
4
5
6
7
8
9
10
| func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}
|
如果创建的 Channel 包含缓冲区并且 Channel 中的数据没有装满,会执行下面这段代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}
|
在这里我们首先会使用 runtime.chanbuf
计算出下一个可以存储数据的位置,然后通过 runtime.typedmemmove
将发送的数据拷贝到缓冲区中并增加 sendx
索引和 qcount
计数器。
当 Channel 没有接收者能够处理数据时,向 Channel 发送数据会被下游阻塞,当然使用 select
关键字可以向 Channel 非阻塞地发送消息。向 Channel 阻塞地发送数据会执行下面的代码,我们可以简单梳理一下这段代码的逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if !block {
unlock(&c.lock)
return false
}
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
mysg.c = c
gp.waiting = mysg
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
gp.waiting = nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true
}
|
当我们从一个空 Channel 接收数据时会直接调用 runtime.gopark
让出处理器的使用权。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
|
如果当前 Channel 已经被关闭并且缓冲区中不存在任何数据,那么会清除 ep
指针中的数据并立刻返回。
除了上述两种特殊情况,使用 runtime.chanrecv
从 Channel 接收数据时还包含以下三种不同情况:
- 当存在等待的发送者时,通过
runtime.recv
从阻塞的发送者或者缓冲区中获取数据; - 当缓冲区存在数据时,从 Channel 的缓冲区中接收数据;
- 当缓冲区中不存在数据时,等待其他 Goroutine 向 Channel 发送数据;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}
|
- 如果 Channel 不存在缓冲区;
- 调用
runtime.recvDirect
将 Channel 发送队列中 Goroutine 存储的 elem
数据拷贝到目标内存地址中;
- 如果 Channel 存在缓冲区;
- 将队列中的数据拷贝到接收方的内存地址;
- 将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方;
当 Channel 的发送队列中不存在等待的 Goroutine 并且缓冲区中也不存在任何数据时,从管道中接收数据的操作会变成阻塞的,然而不是所有的接收操作都是阻塞的,与 select
语句结合使用时就可能会使用到非阻塞的接收操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block {
unlock(&c.lock)
return false, false
}
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
gp.waiting = mysg
mysg.g = gp
mysg.c = c
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
gp.waiting = nil
closed := gp.param == nil
gp.param = nil
releaseSudog(mysg)
return true, !closed
}
|
在正常的接收场景中,我们会使用 runtime.sudog
将当前 Goroutine 包装成一个处于等待状态的 Goroutine 并将其加入到接收队列中。
完成入队之后,上述代码还会调用 runtime.goparkunlock
立刻触发 Goroutine 的调度,让出处理器的使用权并等待调度器的调度。
ch <- i
向 Channel 发送数据时遇到的几种情况:
- 如果当前 Channel 的
recvq
上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine; - 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区
sendx
所在的位置上; - 如果不满足上面的两种情况,会创建一个
runtime.sudog
结构并将其加入 Channel 的 sendq
队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;
发送数据的过程中包含几个会触发 Goroutine 调度的时机:
- 发送数据时发现 Channel 上存在等待接收数据的 Goroutine,立刻设置处理器的
runnext
属性,但是并不会立刻触发调度; - 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入 Channel 的
sendq
队列并调用 runtime.goparkunlock
触发 Goroutine 的调度让出处理器的使用权;
从 Channel 中接收数据时可能会发生的五种情况:
- 如果 Channel 为空,那么会直接调用
runtime.gopark
挂起当前 Goroutine; - 如果 Channel 已经关闭并且缓冲区没有任何数据,
runtime.chanrecv
会直接返回; - 如果 Channel 的
sendq
队列中存在挂起的 Goroutine,会将 recvx
索引所在的数据拷贝到接收变量所在的内存空间上并将 sendq
队列中 Goroutine 的数据拷贝到缓冲区; - 如果 Channel 的缓冲区中包含数据,那么直接读取
recvx
索引对应的数据; - 在默认情况下会挂起当前的 Goroutine,将
runtime.sudog
结构加入 recvq
队列并陷入休眠等待调度器的唤醒;
我们总结一下从 Channel 接收数据时,会触发 Goroutine 调度的两个时机:
- 当 Channel 为空时;
- 当缓冲区中不存在数据并且也不存在数据的发送者时;
2. Select
Linux的select
是操作系统中的系统调用,我们经常会使用 select
、poll
和 epoll
等函数构建 I/O 多路复用模型提升程序的性能(synchronous I/O multiplexing)。Go 语言的 select
与操作系统中的 select
比较相似,C 语言的 select
系统调用可以同时监听多个文件描述符的可读或者可写的状态,Go 语言中的 select
也能够让 Goroutine 同时等待多个 Channel 可读或者可写,在多个文件或者 Channel状态改变之前,select
会一直阻塞当前线程或者 Goroutine。
如果是带有chan的语句,则会直接执行如下操作:
- 当存在可以收发的 Channel 时,直接处理该 Channel 对应的
case
; - 当不存在可以收发的 Channel 时,执行
default
中的语句;
空的select会直接转换成调用 runtime.block
函数,阻塞这个Goroutine并且永远无法唤醒。
如果当前的 select
条件只包含一个 case
,那么编译器会将 select
改写成 if
条件语句。下面对比了改写前后的代码:
1
2
3
4
5
6
7
8
9
10
11
12
| // 改写前
select {
case v, ok <-ch: // case ch <- v
...
}
// 改写后
if ch == nil {
block()
}
v, ok := <-ch // case ch <- v
...
|
当 select
中仅包含两个 case
,并且其中一个是 default
时,Go 语言的编译器就会认为这是一次非阻塞的收发操作。cmd/compile/internal/gc.walkselectcases
会对这种情况单独处理。不过在正式优化之前,该函数会将 case
中的所有 Channel 都转换成指向 Channel 的地址,我们会分别介绍非阻塞发送和非阻塞接收时,编译器进行的不同优化。
编译器会使用如下的流程处理 select
语句
将所有的 case
转换成包含 Channel 以及类型等信息的 runtime.scase
结构体;
调用运行时函数 runtime.selectgo
从多个准备就绪的 Channel 中选择一个可执行的 runtime.scase
结构体;
通过 for
循环生成一组 if
语句,在语句中判断自己是不是被选中的 case
;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]
norder := 0
for i := range scases {
cas := &scases[i]
}
for i := 1; i < ncases; i++ {
j := fastrandn(uint32(i + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]
// 根据 Channel 的地址排序确定加锁顺序
...
sellock(scases, lockorder)
...
}
|
Go 语言会对 select 语句进行优化,它会根据 select 中 case 的不同选择不同的优化路径:
- 空的 select 语句会被转换成调用 runtime.block 直接挂起当前 Goroutine;
- 如果 select 语句中只包含一个 case,编译器会将其转换成 if ch == nil { block }; n; 表达式;
- 首先判断操作的 Channel 是不是空的;然后执行 case 结构中的内容;如果 select 语句中只包含两个 case 并且其中一个是 default,那么会使用 runtime.selectnbrecv 和 runtime.selectnbsend 非阻塞地执行收发操作;
在默认情况下会通过 runtime.selectgo 获取执行 case 的索引,并通过多个 if 语句执行对应 case 中的代码;
在编译器已经对 select 语句进行优化之后,Go 语言会在运行时执行编译期间展开的 runtime.selectgo 函数,该函数会按照以下的流程执行:
- 随机生成一个遍历的轮询顺序 pollOrder 并根据 Channel 地址生成锁定顺序 lockOrder;
- 根据 pollOrder 遍历所有的 case 查看是否有可以立刻处理的 Channel;
1.如果存在,直接获取 case 对应的索引并返回;
2.如果不存在,创建 runtime.sudog 结构体,将当前 Goroutine 加入到所有相关 Channel 的收发队列,并调用 runtime.gopark 挂起当前 Goroutine 等待调度器的唤醒;
- 当调度器唤醒当前 Goroutine 时,会再次按照 lockOrder 遍历所有的 case,从中查找需要被处理的 runtime.sudog 对应的索引;
3. Locker
1
2
3
4
5
| // A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
|
4. Mutex
1
2
3
4
5
6
7
8
9
| type Mutex struct {
state int32
sema uint32
}
//在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:
//waitersCount — 当前互斥锁上等待的 Goroutine 个数;
//mutexStarving — 当前的互斥锁进入饥饿状态;
//mutexWoken — 表示从正常模式被从唤醒;
//mutexLocked — 表示互斥锁的锁定状态;
|
在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,
在cas状态不成功时,会进入如下的一个循环,判断当前 Goroutine 能否进入自旋:
- 互斥锁只有在普通模式才能进入自旋;
- runtime.sync_runtime_canSpin 需要返回 true:
- 运行在多 CPU 的机器上,并且GOMAXPROCS > 1;
- 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
- 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
|
在处理自旋逻辑以后,互斥锁会再次根据上下文来计算互斥锁最新的状态
1
2
3
4
5
6
7
8
9
10
11
12
13
| new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
new &^= mutexWoken
}
|
在互斥锁状态更新结束以后,会让CAS再次更新
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // 通过 CAS 函数获取了锁
}
...
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
|
在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;
在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出;
- 通过自旋等待互斥锁的释放;
- 计算互斥锁的最新状态;
- 更新互斥锁的状态并获取锁;
如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁;
如果该函数返回的新状态不等于 0,这段代码会调用 sync.Mutex.unlockSlow 开始慢速解锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 { // 正常模式
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else { // 饥饿模式
runtime_Semrelease(&m.sema, true, 1)
}
}
|
5. RWMutex
1
2
3
4
5
6
7
| type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
|
writerSem和readerSem分别用于写等待读和读等待写
当资源的使用者想要获取写锁时,需要调用 sync.RWMutex.Lock 方法:
1
2
3
4
5
6
7
| func (rw *RWMutex) Lock() {
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
|
调用结构体持有的 sync.Mutex 结构体的 sync.Mutex.Lock 阻塞后续的写操作;
因为互斥锁已经被获取,其他 Goroutine 在获取写锁时会进入自旋或者休眠;
调用 sync/atomic.AddInt32 函数阻塞后续的读操作:
如果仍然有其他 Goroutine 持有互斥锁的读锁,该 Goroutine 会调用 runtime.sync_runtime_SemacquireMutex 进入休眠状态等待所有读锁所有者执行结束后释放 writerSem 信号量将当前协程唤醒;
1
2
3
4
5
6
7
8
9
10
| func (rw *RWMutex) Unlock() {
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}
|
- 调用
sync/atomic.AddInt32
函数将 readerCount
变回正数,释放读锁; - 通过 for 循环释放所有因为获取读锁而陷入等待的 Goroutine:
- 调用
sync.Mutex.Unlock
释放写锁;
读锁的加锁方法 sync.RWMutex.RLock 很简单,该方法会通过 sync/atomic.AddInt32 将 readerCount 加一:
1
2
3
4
5
| func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
|
如果该方法返回负数 — 其他 Goroutine 获得了写锁,当前 Goroutine 就会调用 runtime.sync_runtime_SemacquireMutex 陷入休眠等待锁的释放;
如果该方法的结果为非负数 — 没有 Goroutine 获得写锁,当前方法会成功返回;
当 Goroutine 想要释放读锁时,会调用如下所示的 sync.RWMutex.RUnlock 方法:
1
2
3
4
5
| func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
rw.rUnlockSlow(r)
}
}
|
该方法会先减少正在读资源的 readerCount 整数,根据 sync/atomic.AddInt32 的返回值不同会分别进行处理:
如果返回值大于等于零 — 读锁直接解锁成功;
如果返回值小于零 — 有一个正在执行的写操作,在这时会调用sync.RWMutex.rUnlockSlow 方法;
1
2
3
4
5
6
7
8
| func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
throw("sync: RUnlock of unlocked RWMutex")
}
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
|
sync.RWMutex.rUnlockSlow 会减少获取锁的写操作等待的读操作数 readerWait 并在所有读操作都被释放之后触发写操作的信号量 writerSem,该信号量被触发时,调度器就会唤醒尝试获取写锁的 Goroutine。
6. WaitGroup
1
2
3
4
5
6
7
8
9
10
| type WaitGroup struct {
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
state1 [3]uint32
}
|
其中noCopy确保不会被赋值的方式拷贝,state1记录状态量,其提供了三个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if v > 0 || w == 0 {
return
}
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
|
通过对 sync.WaitGroup
的分析和研究,我们能够得出以下结论:
7. Once
1
2
3
4
| type Once struct {
done uint32
m Mutex
}
|
为当前 Goroutine 获取互斥锁;
执行传入的无入参函数;
运行延迟函数调用,将成员变量 done 更新成 1;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
|
8. Context
1
2
3
4
5
6
| type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
|
在 Goroutine 构成的树形结构中对信号进行同步以减少计算资源的浪费是 context.Context
的最大作用。Go 服务的每一个请求都是通过单独的 Goroutine 处理的,HTTP/RPC 请求的处理器会启动新的 Goroutine 访问数据库和其他服务。
都会从最顶层的 Goroutine 一层一层传递到最下层。context.Context
可以在上层 Goroutine 执行出现错误时,将信号及时同步给下层。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
|
9. Map
1
2
3
4
5
6
7
8
9
10
| type Map struct {
// 加锁作用,保护 dirty 字段
mu Mutex
// 只读的数据,实际数据类型为 readOnly
read atomic.Value
// 最新写入的数据
dirty map[interface{}]*entry
// 计数器,每次需要读 dirty 则 +1
misses int
}
|
使用read,dirty空间换时间。通过引入两个map将读写分离到不同的map,其中read map提供并发读和已存元素原子写,而dirty map则负责读写。
这样read map就可以在不加锁的情况下进行并发读取,当read map中没有读取到值时,再加锁进行后续读取,并累加未命中数。
当未命中数大于等于dirty map长度,将dirty map上升为read map。
从结构体的定义可以发现,虽然引入了两个map,但是底层数据存储的是指针,指向的是同一份值。
其中设置ReadOnly的字段为:
1
2
3
4
5
6
| type readOnly struct {
// 内建 map
m map[interface{}]*entry
// 表示 dirty 里存在 read 里没有的 key,通过该字段决定是否加锁读 dirty
amended bool
}
|
通过 read 和 dirty 两个字段将读写分离,读的数据存在只读字段 read 上,将最新写入的数据则存在 dirty 字段上
读取时会先查询 read,不存在再查询 dirty,写入时则只写入 dirty
读取 read 并不需要加锁,而读或写 dirty 都需要加锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// Avoid reporting a spurious miss if m.dirty got promoted while we were
// blocked on m.mu. (If further loads of the same key will not miss, it's
// not worth copying the dirty map for this key.)
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
|
另外有 misses 字段来统计 read 被穿透的次数(被穿透指需要读 dirty 的情况),超过一定次数则将 dirty 数据同步到 read 上
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| // Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// The entry was previously expunged, which implies that there is a
// non-nil dirty map and this entry is not in it.
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
// tryStore stores a value if the entry has not been expunged.
//
// If the entry is expunged, tryStore returns false and leaves the entry
// unchanged.
func (e *entry) tryStore(i *interface{}) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}
|
对于删除数据则直接通过标记来延迟删除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| // LoadAndDelete deletes the value for a key, returning the previous value if any.
// The loaded result reports whether the key was present.
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}
// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
m.LoadAndDelete(key)
}
|