深入理解Go并发编程
GMP调度模型
同事问我会不会GMP调度模型,我说不咋会。他说协程那块会用到GMP,那我就看一看。
有很多GMP场景实战,随着G的变化,M和P会有什么变化:https://zhuanlan.zhihu.com/p/323271088
https://juejin.cn/post/7213654163317260349
互斥锁Mutex
Mutex限定临界区同一时间内只能有一个goroutine进入。这些被锁包围的转账的代码就是临界区。
func transfer3(amount int64, accountFrom, accountTo *Account) bool {
txMutex.Lock()
defer txMutex.Unlock()
bal := atomic.LoadInt64(&accountFrom.Balance)
if bal < amount {
return false
}
atomic.AddInt64(&accountTo.Balance, amount)
atomic.AddInt64(&accountFrom.Balance, -amount)
return true
}
调用Mutex.Lock()方法时,如果被其他go持有,会被阻塞。而调用TryLock(),会返回true和false;
直接用零值就行。 var mu sync.Mutex
mu.Lock()
Go内置了数据竞争检测器,但是不要在生产环境开启。
map并不是线程安全的,要使用sync.Map
读写锁RWMutex
多读少写
func BenchmarkCounter_RWMutex(b *testing.B) {
var counter int64
var mu sync.RWMutex
for i := 0; i < b.N; i++ {
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
i++
if i%10000 == 0 {
// Lock the mutex, increment the counter, and unlock the mutex
mu.Lock()
counter++
mu.Unlock()
} else {
// Lock the mutex for reading, retrieve the counter value, and unlock the mutex
mu.RLock()
_ = counter
mu.RUnlock()
}
}
})
}
}
不要递归使用读写锁。也就是不支持重入。重入也会导致死锁。
普通的是导入sync
检测各种死锁,可以尝试导入 sync “github.com/sasha-s/go-deadlock”
go func() {
l.RLock() // 第一次读锁
defer l.RUnlock()
l.RLock() // 重入读锁 → 阻塞!
defer l.RUnlock()
}()
go func() {
l.Lock() // 写锁等待第一次读锁释放
defer l.Unlock()
}()
- 协程A持有读锁未释放,尝试重入读锁
- 协程B请求写锁,触发写优先机制
- 协程A的重入读锁被写优先机制阻塞
- 协程B等待协程A释放第一次读锁
- 循环等待形成死锁(28)
任务编排WaitGroup
跟Java里的CyclicBarrier和CountDownLatch差不多。或者和CompletableFuture.allOf(task1, task2, task3)一样。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExample {
public static void main(String[] args) {
// 创建三个 CompletableFuture,分别代表三个异步任务
CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
System.out.println("任务1 开始执行,线程: " + Thread.currentThread().getName());
simulateWork(2000); // 模拟耗时操作
System.out.println("任务1 完成");
});
CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
System.out.println("任务2 开始执行,线程: " + Thread.currentThread().getName());
simulateWork(3000); // 模拟耗时操作
System.out.println("任务2 完成");
});
CompletableFuture<Void> task3 = CompletableFuture.runAsync(() -> {
System.out.println("任务3 开始执行,线程: " + Thread.currentThread().getName());
simulateWork(1000); // 模拟耗时操作
System.out.println("任务3 完成");
});
// 使用 allOf 等待所有任务完成
CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);
// 阻塞等待所有任务完成
allTasks.join();
System.out.println("所有任务已完成,主程序结束。");
}
// 模拟耗时操作
private static void simulateWork(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
然后要注意一些可能写错的场景:
要把wg.Add(1)写在协程外面,不然可能第一个协程执行完add和done后,wait就直接通过了。add要写在for循环和go协程中间,或者写在更前面。
多调用Done方法可能会让计数器的值为负数,这是不允许的。
不能进行waitgroup的重用,即一个协程不停的Add和Done;另外一个协程去wait,这样会panic。wait之后发现是0然后不是0.傻掉了。
Add就是+1,Done就是-1,wait就是检测是不是0
WaitGroup的扩展:
使用conc.WaitGroup;可以简化代码。
条件变量Cond
Cond同步原语,为等待/通知场景下的并发操作提供支持。通常用于等待某个条件的一组goroutine,当条件变为true时,其中一个或所有的goroutine会被唤醒执行。
类似于Java中的java.util.concruuent,locks.Condition
一旦有使用Cond的场景,我们更习惯使用channel去实现。
Wait方法:会把调用者放入Cond的等待队列中并阻塞,直到被Signal或者Broadcast方法从等待队列中移除并唤醒。在调用Wait方法时,调用者必须要持有c.L的锁。
Signal方法:允许调用者唤醒一个等待此Cond的goroutine。在Java中又叫notify方法或者notify_one方法。
Broadcast方法:允许调用者唤醒所有等待此Cond的goroutine。在java中又叫notifyAll方法。
单例化利器Once
Once常用于单个对象的初始化场景
不要递归调用,会死锁。不可重入
然后如果Once里面的执行失败了,没有准备好相对应的资源(比如网络问题导致连接失败),也不能再执行Once里面的。
解决方案:可以自己实现一个类似Once的同步原语。使用双重检查锁。必须要用atomic,起到的作用和Java里的volatile一样,可以看到最新值。
// RetryableOnce 是一个并发安全的实现,允许在初始化失败后重试
// 与标准sync.Once不同,它会在初始化函数返回错误后允许后续调用重新尝试
type RetryableOnce struct {
mu sync.Mutex
done uint32
}
// Do 执行初始化函数(如果尚未成功初始化)
// 如果函数返回错误,下次调用会自动重试
// 此实现是并发安全的
func (ro *RetryableOnce) Do(f func() error) error {
// 快速路径:检查是否已初始化(尽可能避免加锁)
if atomic.LoadUint32(&ro.done) ==1 {
return nil
}
ro.mu.Lock()
defer ro.mu.Unlock()
// 双重检查锁
if atomic.LoadUint32(&ro.done) ==1 {
return nil
}
// 执行初始化函数
err := f()
if err == nil {
atomic.StoreUint32(&ro.done,1)
}
return err
}
当然也可以直接对sync.Once进行扩展。
比如
type AnimalStore struct {
once sync.Once
inited uint32
}
或者直接
type Once struct {
once sync.Once
}
然后加一个Done()函数。直接去取&o.Once是不是1.
也可以给Once扩展返回值,或者直接使用”github.com/carlmjohnson/syncx”
func main() {
var getMoL = syncx.Once(func() int {
fmt.Println("calculating meaning of life...")
return 42
})
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
fmt.Println(getMoL())
wg.Done()
}()
}
wg.Wait()
}
并发map
map[k]v;其中k要是可比较的。
map对象必须在使用前被初始化。尤其是其作为一个struct字段的时候。
map是不能并发的读写的。比如一个协程去读,另外一个协程去写。不然会报错。
然后对于并发场景,可以自己写一套RWMap,去对每一个读写操作加锁。也可以用sync.Map.
import (
"sync"
)
type RWMap[K comparable, V any] struct { // 一个读写锁保护的线程安全的map
sync.RWMutex // 读写锁保护下面的map字段
m map[K]V
}
// 新建一个RWMap
func NewRWMap[K comparable, V any](n int) *RWMap[K, V] {
return &RWMap[K, V]{
m: make(map[K]V, n),
}
}
func (m *RWMap[K, V]) Get(k K) (V, bool) { //从map中读取一个值
m.RLock()
defer m.RUnlock()
v, existed := m.m[k] // 在锁的保护下从map中读取
return v, existed
}
func (m *RWMap[K, V]) Set(k K, v V) { // 设置一个键值对
m.Lock() // 锁保护
defer m.Unlock()
m.m[k] = v
}
func (m *RWMap[K, V]) Delete(k K) { //删除一个键
m.Lock() // 锁保护
defer m.Unlock()
delete(m.m, k)
}
func (m *RWMap[K, V]) Len() int { // map的长度
m.RLock() // 锁保护
defer m.RUnlock()
return len(m.m)
}
func (m *RWMap[K, V]) Each(f func(k K, v V) bool) { // 遍历map
m.RLock() //遍历期间一直持有读锁
defer m.RUnlock()
for k, v := range m.m {
if !f(k, v) {
return
}
}
}
在以下两个场景中,使用sync.Map会比使用map+RWMutex的方式性能好得多:
- 在只会增长的缓存系统中,一个key只被写入一次而被读很多次
- 多个goroutine为不相交的键集读、写和重写键值对
经过测试,sync.Map的读比较快,写不如map+读写锁快。因为不是很常用,所以临时再去查API也可以。
sync.Map原理。里面有两个map。read map是读map,充当缓存加速。dirty map是修改的map,新值追加是在这层实现的。不过对于旧值,直接更新read map,因为两者共享一个指针。
这样可以让read map的速度近乎无锁。如果发生了击穿,即访问到dirty map,则会把dirty map升级为read map.如果要写新值,则从read map里面构建dirty map,在拷贝过程中会忽略那些已经被标记删除的k-v数据。删除数据是先标记后删除,延迟删除。
然后分片加锁,还可以用orcaman/concurrent-map。不过这个key只能是字符串。
减少锁的粒度,去分片,将一个锁分成多个锁,每个锁都控制一个分片。类似于Java的ConcurrentMap.将 map 分为 32 个 shard,每个分片使用RWMutex。
github.com/alphadose/haxmap也提供了,lock-free线程安全的
池Pool
如果在堆上大量地创建对象,会影响STW(stop-the-world)的时间,可以把不用的对象回收起来,避免被垃圾回收,这样就不必在堆上重新创建对象了。包括数据库的连接,TCP的长连接。
Pool有两个重要的字段,local和victim。每次垃圾回收的时候,Pool会把victim中的对象移除,然后把Local中的数据给victim.
使用sync.Pool的时候,如果set特别大量的byte数据,可能会造成其无法回收,从而造成内存浪费。
sync.Pool会无通知的在某个时候把连接移除,被垃圾回收了。所以标准库里的HTTP,数据库,TCP,memcache,都自己实现了相关的连接池(http.Client,sql.DB,faith/pool,gomemcache)
上下文Context
在使用WithCancel、WIthCancelCause、WithTimeout和WithDeadline函数时,一定要调用cancel函数。
子goroutine一定要设置正确的检查点,及时检查Context是否已被撤销或者超时。
原子操作atomic
CAS和其他函数
func TestCAS(t *testing.T) {
var x uint64 = 0
ok := atomic.CompareAndSwapUint64(&x, 0, 100) // ok == true
assert.Equal(t, true, ok)
ok = atomic.CompareAndSwapUint64(&x, 0, 100) // ok == false, x的原有的值不是0
assert.Equal(t, false, ok)
}
newXValue := atomic.AddUint64(&x, 100) // newXValue == 100
old := atomic.SwapUint64(&x, 100)
v := atomic.LoadUint64(&x) // v == 0
atomic.StoreUint64(&x, 100) // x == 100
还提供了Value,可以对对象进行操作。而非单纯的int
func loadNewConfig() config{
return Config{
NodeName: "北京",
Addr :"10.77.95.27"
}
}
var config atomic.Value
config.Store(loadNewConfig())
atomic包的方法提供了内存屏障的功能,保证数据的可见性,读到最新值。
lock-free队列的实现,其中入队和出队都用了CAS。
package main
import (
"sync/atomic"
"unsafe"
)
// 创建lockfree包别名
var lockfree = struct {
NewQueue func() *Queue
}{
NewQueue: NewQueue,
}
// Node 队列节点
type Node struct {
Value interface{}
Next unsafe.Pointer
}
// Queue lock-free队列结构
type Queue struct {
head unsafe.Pointer
tail unsafe.Pointer
}
// NewQueue 创建新的lock-free队列
func NewQueue() *Queue {
dummy := &Node{}
q := &Queue{
head: unsafe.Pointer(dummy),
tail: unsafe.Pointer(dummy),
}
return q
}
// Enqueue 入队操作
func (q *Queue) Enqueue(value interface{}) {
newNode := &Node{Value: value}
for {
tail := atomic.LoadPointer(&q.tail)
next := (*Node)(tail).Next
// 检查tail是否是最新的
if tail != atomic.LoadPointer(&q.tail) {
continue
}
// 如果tail的next不为空,说明有其他线程正在操作,帮助更新tail
if next != nil {
atomic.CompareAndSwapPointer(&q.tail, tail, next)
continue
}
// 尝试将新节点链接到tail后面
if atomic.CompareAndSwapPointer(&(*Node)(tail).Next, next, unsafe.Pointer(newNode)) {
// 尝试更新tail指针
atomic.CompareAndSwapPointer(&q.tail, tail, unsafe.Pointer(newNode))
return
}
}
}
// Dequeue 出队操作
func (q *Queue) Dequeue() (interface{}, bool) {
for {
head := atomic.LoadPointer(&q.head)
tail := atomic.LoadPointer(&q.tail)
next := (*Node)(head).Next
// 检查队列是否为空
if head == tail {
if next == nil {
return nil, false // 队列为空
}
// 帮助更新tail指针
atomic.CompareAndSwapPointer(&q.tail, tail, next)
continue
}
// 获取队首元素
value := (*Node)(next).Value
// 尝试移动head指针
if atomic.CompareAndSwapPointer(&q.head, head, next) {
return value, true
}
}
}
package main
import (
"fmt"
"sync"
)
func main() {
q := lockfree.NewQueue()
// 并发入队
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(val int) {
defer wg.Done()
q.Enqueue(val)
}(i)
}
wg.Wait()
// 并发出队
for {
val, ok := q.Dequeue()
if !ok {
break
}
fmt.Println("Dequeued:", val)
}
}
CAS将”检查”和”执行”合并为一个原子操作,消除了这个时间窗口.
channel
Go语言的哲学就是“不要通过共享内存来进行通信,而是要通过通信来共享内存。”
channel的行为矩阵
| nil | 非空 (not empty) |
空 (empty) |
满 (full) |
不满 (not full) |
关闭 (closed) |
|
|---|---|---|---|---|---|---|
| receive | 阻塞 | 读到值 | 阻塞 | 读到值 | 读到值 | 既有的值读完后,返回零值 |
| send | 阻塞 | 写入值 | 写入值 | 阻塞 | 写入值 | panic |
| close | panic | 正常关闭 | 正常关闭 | 正常关闭 | 正常关闭 | panic |
有三种情况会发生panic:关闭为nil的chan,关闭已经关闭的chan,向已经关闭的chan里写入。
注意子goroutine泄露的问题,比如子goroutine里有ch <- data;
主线程里有 result := <-ch; 结果主线程超时结束了,会导致unbuffered的chan没有被读取。建议改为容量为1的chan
也要注意一下那些因为channel阻塞导致子goroutine无法退出的情况。
可以用反射reflect去操作select和channel
chan可以用做信息交流(线程安全的队列和buffer), 数据传递(4个goroutine之间,4个chan互相传数据,每个goroutine盯着自己的chan),信号通知(wait/notify),互斥锁,任务编排(or-Done模式,扇入模式,扇出模式,Stream模式,管道模式,map-reduce模式)
如果有多个任务,那么只要有任意一个任务执行完成,就可以返回任务完成的信号。这就是Or-Done模式。
在软件工程中,模块的扇入是指有多少个上级模块调用它,多个源channel输入,一个目的channel输出的情况。比如递归排序。
扇出只有一个源channel输入,但有多个目的channel输出。比如观察者模式,所有观察者都会收到变动信号。
Stream模式,把channel当做流式管道使用的方式。提供只取几个数据的方法。
把流串起来,就形成了管道。第一个流取数据,第二个流做平方。两个流组成管道。
map-reduce模式。第一步是映射(map),处理队列中的数据。第二步是规约(reduce),把列表中的每一个元素按照一定的方式处理成结果,放入结果队列。比如下面的代码map把每个整数*10,reduce把map函数处理的结果累加起来。
func mapChan[T, K any](in <-chan T, fn func(T) K) <-chan K {
out := make(chan K)
if in == nil {
close(out)
return out
}
go func() {
defer close(out)
for v := range in {
out <- fn(v)
}
}()
return out
}
func reduce[T, K any](in <-chan T, fn func(r K, v T) K) K {
var out K
if in == nil {
return out
}
for v := range in {
out = fn(out, v)
}
return out
}
func asStream(done <-chan struct{}) <-chan int {
s := make(chan int)
values := []int{1, 2, 3, 4, 5}
go func() {
defer close(s)
for _, v := range values {
select {
case <-done:
return
case s <- v:
}
}
}()
return s
}
func main() {
in := asStream(nil)
// map op: time 10
mapFn := func(v int) int {
return v * 10
}
// reduce op: sum
reduceFn := func(r, v int) int {
return r + v
}
sum := reduce(mapChan(in, mapFn), reduceFn)
fmt.Println(sum)
}
Go内存模型
Go的内存模型描述的是并发环境中多个goroutine读取相同变量时,对变量可见性的保证。
然后介绍了goroutine,channel, Mutex,Once, WaitGroup, atomic 的一些正常的执行顺序,可以 A synchronized before B.
信号量Semaphore
信号量是用来控制多个goroutine同时访问多个资源的同步原语。
PV操作:P操作用来减少信号量的计数值,V操作用来增大信号量的计数值。
可以通过chan来实现一个信号量,chan中的资源槽位。
Lock就是 chan <- struct{}{}; Unlock就是 <-chan
在Go官方的扩展库里也有一个golang.org/x/sync/semaphore。叫Weighted,可以一次请求、释放n个资源。
缓解压力利器 SingleFlight
缓存击穿和缓存雪崩是缓存过期;缓存穿透是访问的key不存在。
SingleFlight用于解决缓存击穿问题,是Go团队提供的一个扩展同步原语。他的作用是在处理多个goroutine同时调用一个函数时,只让一个goroutine调用这个函数,当这个goroutine返回结果时,再把结果返回给这几个同时调用的goroutine,这样就可以减少并发调用的数量。
SingleFlight使用Mutex和Map来实现,其中MUtex提供并发时的读、写保护,Map用来保存正在处理(in flight)的对同一个Key的请求。
循环屏障CyclicBarrier
CyclicBarrier更适合用在“数量固定的goroutine等待到达同一个检查点”的场景中,而且放行后,屏障可以重复使用。
func main() {
cnt := 0
b := cyclicbarrier.NewWithAction(10, func() error {
cnt++
return nil
})
wg := sync.WaitGroup{}
wg.Add(10)
for i := 0; i < 10; i++ {
i := i
go func() {
defer wg.Done()
for j := 0; j < 5; j++ {
time.Sleep(time.Duration(rand.Intn(10)) * time.Second)
log.Printf("goroutine %d 来到第%d轮屏障", i, j)
err := b.Await(context.TODO())
log.Printf("goroutine %d 冲破第%d轮屏障", i, j)
if err != nil {
panic(err)
}
}
}()
}
wg.Wait()
fmt.Println(cnt)
}
分组操作
ErrGroup,SizedGroup,ErrSizedGroup,gollback,Hunch,schedgroup.
ErrGroup底层是基于WaitGroup.他的功能更加丰富,可与Context集成,error可以向上传播,把子任务的错误传递给Wait的调用者。
SizedGroup内部做了控制,虽然任务可以有成千上万个,但是内部只使用有限的goroutine来执行。
gollback可以把子任务的执行结果和error都返回,不需要再额外定义变量数组在每个goroutine里记录了。
Hunch和gollback差不多。
schedgroup可以指定任务在某个时间或者某个时间之后执行。
限流
令牌桶和漏桶。golang.org/x/time/rate是Go官方提供的一个基于令牌桶实现的限流库,uber-go/ratelimit就是一个被广泛使用的基于漏桶技术实现的单机的限流库。go-redis/redis_rate提供了一种成熟的基于Redis的分布式限流方案,采用漏洞技术,还支持突发请求。其底层使用redis/go-redis库来访问Redis,并且使用Lua脚本的方式在Redis服务端计算是否有充足的令牌。
限流属于服务上层,不属于业务考虑范围。
Go并发编程和调度器
主从:Campaign方法,其作用是把一个节点选择为主节点;Leader方法,查询当前主节点的方法;Observe方法:来监控主节点的变化。
etcd提供了选主逻辑,而我们要做的就是利用这些方法,让它们为我们的业务服务。etcd还提供了简单的Locker同步原语。
Locker是基于Mutex实现的。请求锁调用Lock方法,释放锁调用Unlock方法,通过Key方法获取Mutex的值。持有锁的节点崩溃后,锁在未来也会释放,可设置超时过期时间。etcd也提供了RWMutex。
基于etcd实现的分布式队列,分布式优先队列也是有的。还提供了分布式屏障Barrier。计数型屏障DoubleBarrier,集齐数量后才能一进一出。
软件事务内存(software transactional memory,STM)是一种并发编程模型,用于解决多线程访问共享内存时可能产生的数据竞争和并发性问题。当利用etcd做存储时,是可以利用STM实现事务操作的,一个事务可以包含多个账号的数据更改操作,事务能保证这些更改要不全部成功,要不全部失败。
并发模式
略
经典并发问题解析
哲学家就餐问题:
形成死锁的四个条件:
1.禁止抢占(No Preemption):不能强制抢夺别人的资源,抢筷子
2.持有和等待(Hold and Wait):一个线程在等待时持有并发资源。拿着筷子不放手,等别的筷子
3.互斥(Mutual Exclusion):资源在同一时刻只能被分配给一个线程。一个筷子只能在一个人手里
4.循环等待(Circular Waiting):一系列线程相互持有其他线程所需要的资源
破坏循环等待条件-减少就餐人数。奇偶处理办法。资源分级,必须拿到低级的筷子才能拿高级的筷子。引入服务生,将拿两个筷子视为一个原子操作,打破持有且等待条件。
理发师问题:
用Cond或者用channel实现信号量。