golang Mutex

我们先来看怎么实现一个最简单的互斥锁,在开始之前可以先想一想,如果是你,你会怎么设计呢?

你可能会想到,可以通过一个 flag 变量,标记当前的锁是否被某个 goroutine 持有。如果这个 flag 的值是 1,就代表锁已经被持有,那么其它竞争的 goroutine 只能等待;如果这个 flag 的值是 0,就可以通过 CAS(compare-and-swap,或者 compare-and-set)将这个 flag 设置为 1,标识锁被当前的这个 goroutine 持有了。

整个逻辑很简单,但事实上早期的 Mutex 就是这么设计的,不过在看源码之前我们需要先了解一下什么是 CAS,它非常重要。

CAS 是什么

假设有一个块内存,里面存储的值是 a,但是现在想将其变成 a + b,这个时候需要经历哪几步呢?

  • 将存储的值读取出来,得到 a
  • 将 a 和 b 进行加法运算,得到 a + b
  • 再将计算后的新值 a + b 写回到原来的内存中,也就是将原来的值 a 给更新掉

单线程的话是没有任何问题的,但如果是多个线程同时操作这块内存呢?显然可能会出问题。因此需要通过 CAS 解决这一点,首先它会将内存中原本的值备份一份,运算之后会比较此时内存的值和备份的值,如果一致才进行更新,如果不一致则什么也不做。比如一开始内存的值是 a,备份一份,然后计算完毕之后发现内存的值变成了 a1,前后不一致,说明其它线程已经将这块内存的值给修改了,那么此时就不会再更新了;如果一致,说明没有别的线程修改这个内存的数据,那么此时才会更新。

所以 CAS 的名字很直观,就是先比较、然后再决定是否更新(设置),并且整体是原子性的。

CAS 是实现互斥锁和并发原语的基础,我们有必要掌握它。

sync.Mutex

这是sync包的互斥锁,加了之后不需要其他goroutine读取,只允许这个读取

加锁是原语操作

当goutine超过1ms因为锁请求不到资源的时候,会变成饥饿模式,下次允许优先获得锁。

下面是最简单的mutex实现方式

// CAS 操作,当时还没有抽象出 atomic 包
func cas(val *int32, old, new int32) bool
func semacquire(*int32)
func semrelease(*int32)

// 互斥锁的结构,包含两个字段
type Mutex struct {
    key int32  // 锁是否被持有的标识
    sema int32 // 信号量专用,用以阻塞和唤醒 goroutine
}

// 保证成功在 val 上增加 delta 的值
func xadd(val *int32, delta int32) (new int32) {
    for {
        v := *val
        if cas(val, v, v+delta) {
            return v + delta
        }
    }
    panic("unreached")
}

// 请求锁
func (m *Mutex) Lock() {
    if xadd(&m.key, 1) == 1 { // 标识加 1,如果等于 1,成功获取到锁
        return
    }
    semacquire(&m.sema) // 否则阻塞等待
}

// 释放锁
func (m *Mutex) Unlock() {
    if xadd(&m.key, -1) == 0 { // 将标识减去 1,如果等于 0,则没有其它等待者
        return
    }
    semrelease(&m.sema) // 唤醒其它阻塞的 goroutine
}

在调用 Lock 请求锁的时候,通过 xadd 方法进行 CAS 操作,并且不成功时会不断循环,直到 CAS 操作执行成功,保证对 key 的加 1 操作完成。如果幸运,锁没有被别的 goroutine 持有(key 为 0),那么 key 再加 1 之后就变成了 1,这个 goroutine 就成功获取了锁;如果锁已经被别的 goroutine 持有了,那么将 key 增加 1 之后显然不等于 1,因此会调用 semacquire 方法,通过请求信号量来将自己休眠。等到持有锁的 goroutine 释放锁的时候,再由持有锁的 goroutine 释放信号量来将自己唤醒。

所以当持有锁的 goroutine 调用 Unlock 释放锁时,它会将 key 减 1,如果结果为 0,那么表示此时没有因等待锁而阻塞 goroutine,于是直接返回;如果不为 0,那么说明还有其它的 goroutine 在等待锁,于是会调用 semrelease 方法,使用信号量唤醒等待的 goroutine 中的一个。

所以到这里我们算是明白了,初版的 Mutex 利用 CAS 原子操作,对 key 这个标志量进行设置,并且 key 不仅仅标识了锁是否被 goroutine 所持有,还记录了当前持有和等待获取锁的 goroutine 的数量。整体没什么难度,但是这里面有一个问题:

Mutex 常有错误

1、lock和unlock没有成对出现

package main

import (
    "sync"
    "time"
)

func main() {
    var m sync.Mutex

    go func() {
        m.Lock()
    }()
    // 确保子协程内部的代码先执行,简单 sleep 一下
    time.Sleep(100)
    m.Lock()
}

此时会引发 panic:fatal error: all goroutines are asleep - deadlock,因为子协程已经获取锁了,而主协程也在获取锁,所以主协程会阻塞在 m.Lock() 这一步。然后当子协程执行完毕之后,就只剩下主协程,而主协程如果想往下走,那么必须有子协程进行 Unlock。但是现在没有子协程了,所以主协程想往下走是不可能的,因此就死锁了。

如果是缺少 Lock,那么就会对一个未加锁的 Mutex 进行 Unlock,此时会引发 panic。

Go runtime 是如何检测到死锁的

也很简单,Go 只是去检查整个程序(所有的 Goroutines)是否都已经停止了,如果是,那么就是发生了灾难,Deadlock;很显然,上面这个例子只有一个 Main Goroutine,而它将会永远被阻塞,因此,Go runtime 检测到,整个程序已经被阻塞停止了,因此抛出 Panic Deadlock 异常!

2、拷贝了一个mutex

一般出现这种情况,都是在将 Mutex 作为函数参数传递的时候没有传递指针,而是直接把值本身传递了。而我们说 Go 只有值传递,不管传值还是传指针,都是拷贝一份。如果直接传 Mutex 本身,那么会拷贝一份,两者不是同一个 Mutex 了。

sync.RWMutex

实现原理

WMutex 是很常见的并发原语,很多编程语言的库都提供了类似的并发类型。RWMutex 一般都是基于互斥锁、条件变量(condition variables)或者信号量(semaphores)等并发原语来实现,Go 的 RWMutex 就是基于 Mutex 实现的。

而基于对读和写操作的优先级,读写锁的设计和实现也分成三类。

Read-preferring:读优先,当读锁和写锁都可以获取时优先获取读锁。该设计可以提供很高的并发性,但是在竞争激烈的情况下可能会导致写饥饿。这是因为如果有大量的读,这种设计会导致只有在所有的读锁都释放了,写锁才可能被获取到。

Write-preferring:写优先,当读锁和写锁都可以获取时优先获取写锁。当然,如果有一些 goroutine 已经获取了读锁,那么获取写锁的 goroutine 也必须要等待已经存在的读锁都被释放之后才可以获取。所以写优先级设计中的优先权是针对新来的读请求而言的,这种设计主要避免了写锁的饥饿问题。

不指定优先级:这种设计比较简单,不区分读锁和写锁的优先级,某些场景下这种不指定优先级的设计反而更有效。因为第一类优先级会导致写饥饿,第二类优先级可能会导致读饥饿,这种不指定优先级的访问不再区分读写,大家都是同一个优先级,有时反而解决了饥饿的问题。

Go 标准库 sync 中的 RWMutex 设计是 Write-preferring 方案,下面来分看一下底层结构。

RWMutex 包含一个 Mutex,以及四个辅助字段 writerSem、readerSem、readerCount 和 readerWait:

type RWMutex struct {
    // 获取写锁的 goroutine 称之为 writer,获取读锁的 goroutine 称之为 reader
    w           Mutex  // 互斥锁,通过互斥锁实现写锁的互斥
    writerSem   uint32 // writer 信号量
    readerSem   uint32 // reader 信号量
    readerCount int32  // reader 的数量
    readWait    int32  // writer 想要成功获取到写锁需要释放的读锁的数量,因为写锁要等到已存在的读锁释放之后才能获取
                       // 假设 writer 在获取写锁时,发现有 3 个 reader 获取到了读锁,那么 readWait 就是 3
}

const rwmutexMaxReaders = 1 << 30  // reader 的最大数量

总结

在开发过程中,一开始考虑共享资源并发访问问题的时候,我们就会想到互斥锁 Mutex。因为刚开始的时候,我们还并不太了解并发的情况,所以就会使用最简单的同步原语来解决问题。等到系统成熟,真正到了需要性能优化的时候,我们就能静下心来分析并发场景的可能性,这个时候我们就要考虑是否可以将 Mutex 修改为 RWMutex,来压榨系统的性能。

当然,如果一开始你的场景就非常明确了,比如我就要实现一个线程安全的 map,那么一开始你就可以考虑使用读写锁。正如之前说的,如果你能意识到你要解决的问题是使用 Mutex 时的效率问题、并且读多写少,那么你直接就可以毫不犹豫地选择 RWMutex,不用考虑其它选择。但是在使用 RWMutex 时,最需要注意的一点就是尽量避免重入,重入带来的死锁非常隐蔽,而且难以诊断。

Context

cancel实现原理

以上四个方法中常用的就是 Done 了,如果 Context 取消的时候,我们就可以得到一个关闭的 chan,关闭的 chan 是可以读取的;所以只要可以读取的时候,就意味着收到 Context 取消的信号了,那么 Context 监视的所有 goroutine 都会接收到信号。以下是这个方法的经典用法:

package main
 
import (
    "context"
    "fmt"
    "time"
)
 
func goroutine1(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            //等待退出的信号
            fmt.Println("收到取消通知,我要退出啦,使命已经结束")
            return
        default:
            //什么也不做,为了select不阻塞
        }
        
        // todo: 努力工作
        func() {
            time.Sleep(time.Second * 3)
            fmt.Println("bob,别傻愣着")
        }()
    }
}
 
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go goroutine1(ctx)
    time.Sleep(time.Second * 9)
    fmt.Println("结束啦~~~~,通知goroutine退出")
    cancel()
    // 可能要忙其他的事情
    for {}
    /*
    	bob,别傻愣着
    	bob,别傻愣着
    	结束啦~~~~,通知goroutine退出
    	bob,别傻愣着
    	收到取消通知,我要退出啦,使命已经结束
    */
}

tryLock

在 Go 1.18 中,为 sync.Mutex 新增了一个新的方法 TryLock(),它是一种非阻塞模式的取锁操作。当调用 TryLock() 时,该函数仅简单地返回 true 或者 false,代表是否加锁成功。

在并发编程中,为了避免多线程同时读写共享资源,我们需要互斥。Go 标准库提供了互斥锁 sync.Mutex ,通过加锁 Lock() 方法和解锁 Unlock() 方法达到对共享资源的并发控制。

在之前的设计中,当锁被占有,其他 goroutine 尝试获取锁时会被阻塞。这种方式当然是合理的,但是在某些情况下,或许我们希望在获取锁失败时,并不想停止执行,而是可以进入其他的逻辑。

在 Go 1.18 中,为 sync.Mutex 新增了一个新的方法 TryLock(),它是一种非阻塞模式的取锁操作。当调用 TryLock() 时,该函数仅简单地返回 true 或者 false,代表是否加锁成功。

有了 TryLock 的存在,我们就可以由这样的代码:

复制

 m.Lock()
 // 阻塞等待加锁成功后的逻辑
1.2.

转变成这样的逻辑

复制

 if m.TryLock(){
 // 加锁成功的逻辑
 }else {
 // 加锁失败的逻辑
 }
1.2.3.4.5.

TryLock 实现

在Go精妙的互斥锁设计一文中,我们详细分析过互斥锁的设计,其代码轻量简洁,通过巧妙的位运算,仅仅采用 state 一个字段就实现了四个字段的效果,非常之精彩,建议感兴趣的读者一读。

而 TryLock() 的实现更加简单。

复制

func (m *Mutex) TryLock() bool {
 old := m.state
 if old&(mutexLocked|mutexStarving) != 0 {
  return false
 }

 // There may be a goroutine waiting for the mutex, but we are
 // running now and can try to grab the mutex before that
 // goroutine wakes up.
 if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
  return false
 }

 if race.Enabled {
  race.Acquire(unsafe.Pointer(m))
 }
 return true
}
1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.

当锁被其他 goroutine 占有,或者当前锁正处于饥饿模式,它将立即返回 false。

复制

func (m *Mutex) Lock() {
 // Fast path: grab unlocked mutex.
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  if race.Enabled {
   race.Acquire(unsafe.Pointer(m))
  }
  return
 }
 // Slow path (outlined so that the fast path can be inlined)
 m.lockSlow()
}
1.2.3.4.5.6.7.8.9.10.11.

而当锁可用时,TryLock() 会采用与 Lock() 方法一样的方式去尝试获取锁。但在获取失败时,与 Lock() 将不一样,它不会自旋或者阻塞。这是一个完全的非阻塞获取方式。

应用场景

正如 TryLock() 方法的注释一样,它的应用场景并不常见,并且也不被鼓励使用。

复制

// Note that while correct uses of TryLock do exist, they are rare,
// and use of TryLock is often a sign of a deeper problem
// in a particular use of mutexes.
1.2.3.

在当前 Go1.18 标准库源码中,与 Lock() 方法被大量内部使用而截然不同的是,并没有找到一处使用 TryLock() 的地方,仅仅在测试文件 mutex_test.go 中,有找到该方法的新增测试用例。

这里贴一个 TryLock 的使用场景讨论:https://stackoverflow.com/questions/41788074/use-case-for-lock-trylock

另外,在开源社区已经有不少 Go 的 TryLock 实现库。它们基于 sync.Mutex 通过 CAS 操作和 unsafe 指针实现 ;或者利用 channel 实现。

img

但是这些库都不能竞态检测。因此,官方支持实现 TryLock 是必要的,避免 TryLock 被滥用。且由于可以集成竞态检测,相较于三方库实现,有利于开发者发现问题。

总结

从 2012 年开始,实际上很早就有关于 Go 增加 TryLock 的 issue 讨论,但是直到 Go 1.18 才被增加。这其中很大一部分原因是,并没有合理的案例值得添加 TryLock。

Go Team 的负责人 rsc 之前提出的反对意见:TryLock 会鼓励开发者对锁进行不精确的思考,并最终导致竞态问题。

另外,Go 1.18 除了为互斥锁 sync.Mutex 新增了 TryLoc() 方法外,也为读写锁 sync.RWMutex 新增了相应的 TryRLock() 和 TryLock() 方法。

正如新增的这三个方法的注释,虽然使用它们的情况存在,但很少见,使用需谨慎。

参考文档:https://www.cnblogs.com/traditional/p/11894550.html