深入理解Go并发编程


深入理解Go并发编程

GMP调度模型

同事问我会不会GMP调度模型,我说不咋会。他说协程那块会用到GMP,那我就看一看。

有很多GMP场景实战,随着G的变化,M和P会有什么变化:https://zhuanlan.zhihu.com/p/323271088

https://juejin.cn/post/7213654163317260349

互斥锁Mutex

Mutex限定临界区同一时间内只能有一个goroutine进入。这些被锁包围的转账的代码就是临界区。

func transfer3(amount int64, accountFrom, accountTo *Account) bool {
    txMutex.Lock()
    defer txMutex.Unlock()

    bal := atomic.LoadInt64(&accountFrom.Balance)
    if bal < amount {
        return false
    }

    atomic.AddInt64(&accountTo.Balance, amount)
    atomic.AddInt64(&accountFrom.Balance, -amount)

    return true
}

调用Mutex.Lock()方法时,如果被其他go持有,会被阻塞。而调用TryLock(),会返回true和false;

直接用零值就行。 var mu sync.Mutex

mu.Lock()

Go内置了数据竞争检测器,但是不要在生产环境开启。

map并不是线程安全的,要使用sync.Map

读写锁RWMutex

多读少写

func BenchmarkCounter_RWMutex(b *testing.B) {
    var counter int64
    var mu sync.RWMutex

    for i := 0; i < b.N; i++ {
        b.RunParallel(func(pb *testing.PB) {
            i := 0
            for pb.Next() {
                i++

                if i%10000 == 0 {
                    // Lock the mutex, increment the counter, and unlock the mutex
                    mu.Lock()
                    counter++
                    mu.Unlock()
                } else {
                    // Lock the mutex for reading, retrieve the counter value, and unlock the mutex
                    mu.RLock()
                    _ = counter
                    mu.RUnlock()
                }

            }
        })
    }

}

不要递归使用读写锁。也就是不支持重入。重入也会导致死锁。

普通的是导入sync

检测各种死锁,可以尝试导入 sync “github.com/sasha-s/go-deadlock”

go func() {
    l.RLock()         // 第一次读锁
    defer l.RUnlock()
  
    l.RLock()         // 重入读锁 → 阻塞!
    defer l.RUnlock()
}()
go func() {
    l.Lock()          // 写锁等待第一次读锁释放
    defer l.Unlock()
}()
  1. 协程A持有读锁未释放,尝试重入读锁
  2. 协程B请求写锁,触发写优先机制
  3. 协程A的重入读锁被写优先机制阻塞
  4. 协程B等待协程A释放第一次读锁
  5. 循环等待形成死锁(28)

任务编排WaitGroup

跟Java里的CyclicBarrier和CountDownLatch差不多。或者和CompletableFuture.allOf(task1, task2, task3)一样。

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {

    public static void main(String[] args) {
        // 创建三个 CompletableFuture,分别代表三个异步任务
        CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
            System.out.println("任务1 开始执行,线程: " + Thread.currentThread().getName());
            simulateWork(2000);  // 模拟耗时操作
            System.out.println("任务1 完成");
        });

        CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
            System.out.println("任务2 开始执行,线程: " + Thread.currentThread().getName());
            simulateWork(3000);  // 模拟耗时操作
            System.out.println("任务2 完成");
        });

        CompletableFuture<Void> task3 = CompletableFuture.runAsync(() -> {
            System.out.println("任务3 开始执行,线程: " + Thread.currentThread().getName());
            simulateWork(1000);  // 模拟耗时操作
            System.out.println("任务3 完成");
        });

        // 使用 allOf 等待所有任务完成
        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);

        // 阻塞等待所有任务完成
        allTasks.join();

        System.out.println("所有任务已完成,主程序结束。");
    }

    // 模拟耗时操作
    private static void simulateWork(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

然后要注意一些可能写错的场景:

要把wg.Add(1)写在协程外面,不然可能第一个协程执行完add和done后,wait就直接通过了。add要写在for循环和go协程中间,或者写在更前面。

多调用Done方法可能会让计数器的值为负数,这是不允许的。

不能进行waitgroup的重用,即一个协程不停的Add和Done;另外一个协程去wait,这样会panic。wait之后发现是0然后不是0.傻掉了。

Add就是+1,Done就是-1,wait就是检测是不是0

WaitGroup的扩展:

使用conc.WaitGroup;可以简化代码。

条件变量Cond

Cond同步原语,为等待/通知场景下的并发操作提供支持。通常用于等待某个条件的一组goroutine,当条件变为true时,其中一个或所有的goroutine会被唤醒执行。

类似于Java中的java.util.concruuent,locks.Condition

一旦有使用Cond的场景,我们更习惯使用channel去实现。

Wait方法:会把调用者放入Cond的等待队列中并阻塞,直到被Signal或者Broadcast方法从等待队列中移除并唤醒。在调用Wait方法时,调用者必须要持有c.L的锁。

Signal方法:允许调用者唤醒一个等待此Cond的goroutine。在Java中又叫notify方法或者notify_one方法。

Broadcast方法:允许调用者唤醒所有等待此Cond的goroutine。在java中又叫notifyAll方法。

单例化利器Once

Once常用于单个对象的初始化场景

不要递归调用,会死锁。不可重入

然后如果Once里面的执行失败了,没有准备好相对应的资源(比如网络问题导致连接失败),也不能再执行Once里面的。

解决方案:可以自己实现一个类似Once的同步原语。使用双重检查锁。必须要用atomic,起到的作用和Java里的volatile一样,可以看到最新值。

// RetryableOnce 是一个并发安全的实现,允许在初始化失败后重试
// 与标准sync.Once不同,它会在初始化函数返回错误后允许后续调用重新尝试
type RetryableOnce struct {
    mu          sync.Mutex
    done        uint32
}

// Do 执行初始化函数(如果尚未成功初始化)
// 如果函数返回错误,下次调用会自动重试
// 此实现是并发安全的
func (ro *RetryableOnce) Do(f func() error) error {
    // 快速路径:检查是否已初始化(尽可能避免加锁)
    
    if atomic.LoadUint32(&ro.done) ==1 {
        return nil
    }

    ro.mu.Lock()
    defer ro.mu.Unlock()
    // 双重检查锁
    if atomic.LoadUint32(&ro.done) ==1 {
        return nil
    }

    // 执行初始化函数
    err := f()
    if err == nil {
        atomic.StoreUint32(&ro.done,1)
    }
    return err
}

当然也可以直接对sync.Once进行扩展。

比如

type AnimalStore struct {
    once sync.Once
    inited uint32
}

或者直接

type Once struct {
    once   sync.Once
}

然后加一个Done()函数。直接去取&o.Once是不是1.

也可以给Once扩展返回值,或者直接使用”github.com/carlmjohnson/syncx”

func main() {
    var getMoL = syncx.Once(func() int {
       fmt.Println("calculating meaning of life...")
       return 42
    })
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
       wg.Add(1)
       go func() {
          fmt.Println(getMoL())
          wg.Done()
       }()
    }
    wg.Wait()
}

并发map

map[k]v;其中k要是可比较的。

map对象必须在使用前被初始化。尤其是其作为一个struct字段的时候。

map是不能并发的读写的。比如一个协程去读,另外一个协程去写。不然会报错。

然后对于并发场景,可以自己写一套RWMap,去对每一个读写操作加锁。也可以用sync.Map.

import (
    "sync"
)

type RWMap[K comparable, V any] struct { // 一个读写锁保护的线程安全的map
    sync.RWMutex // 读写锁保护下面的map字段
    m            map[K]V
}

// 新建一个RWMap
func NewRWMap[K comparable, V any](n int) *RWMap[K, V] {
    return &RWMap[K, V]{
        m: make(map[K]V, n),
    }
}

func (m *RWMap[K, V]) Get(k K) (V, bool) { //从map中读取一个值
    m.RLock()
    defer m.RUnlock()
    v, existed := m.m[k] // 在锁的保护下从map中读取
    return v, existed
}

func (m *RWMap[K, V]) Set(k K, v V) { // 设置一个键值对
    m.Lock() // 锁保护
    defer m.Unlock()
    m.m[k] = v
}

func (m *RWMap[K, V]) Delete(k K) { //删除一个键
    m.Lock() // 锁保护
    defer m.Unlock()
    delete(m.m, k)
}

func (m *RWMap[K, V]) Len() int { // map的长度
    m.RLock() // 锁保护
    defer m.RUnlock()
    return len(m.m)
}

func (m *RWMap[K, V]) Each(f func(k K, v V) bool) { // 遍历map
    m.RLock() //遍历期间一直持有读锁
    defer m.RUnlock()

    for k, v := range m.m {
        if !f(k, v) {
            return
        }
    }
}

在以下两个场景中,使用sync.Map会比使用map+RWMutex的方式性能好得多:

  1. 在只会增长的缓存系统中,一个key只被写入一次而被读很多次
  2. 多个goroutine为不相交的键集读、写和重写键值对

经过测试,sync.Map的读比较快,写不如map+读写锁快。因为不是很常用,所以临时再去查API也可以。

https://www.bilibili.com/video/BV15NBsYcEoK/?spm_id_from=333.337.search-card.all.click&vd_source=6ed7f19911310aba47123ac6cc1674a0

sync.Map原理。里面有两个map。read map是读map,充当缓存加速。dirty map是修改的map,新值追加是在这层实现的。不过对于旧值,直接更新read map,因为两者共享一个指针。

这样可以让read map的速度近乎无锁。如果发生了击穿,即访问到dirty map,则会把dirty map升级为read map.如果要写新值,则从read map里面构建dirty map,在拷贝过程中会忽略那些已经被标记删除的k-v数据。删除数据是先标记后删除,延迟删除。

然后分片加锁,还可以用orcaman/concurrent-map。不过这个key只能是字符串。

减少锁的粒度,去分片,将一个锁分成多个锁,每个锁都控制一个分片。类似于Java的ConcurrentMap.将 map 分为 32 个 shard,每个分片使用RWMutex。

github.com/alphadose/haxmap也提供了,lock-free线程安全的

池Pool

如果在堆上大量地创建对象,会影响STW(stop-the-world)的时间,可以把不用的对象回收起来,避免被垃圾回收,这样就不必在堆上重新创建对象了。包括数据库的连接,TCP的长连接。

Pool有两个重要的字段,local和victim。每次垃圾回收的时候,Pool会把victim中的对象移除,然后把Local中的数据给victim.

使用sync.Pool的时候,如果set特别大量的byte数据,可能会造成其无法回收,从而造成内存浪费。

sync.Pool会无通知的在某个时候把连接移除,被垃圾回收了。所以标准库里的HTTP,数据库,TCP,memcache,都自己实现了相关的连接池(http.Client,sql.DB,faith/pool,gomemcache)

上下文Context

在使用WithCancel、WIthCancelCause、WithTimeout和WithDeadline函数时,一定要调用cancel函数。

子goroutine一定要设置正确的检查点,及时检查Context是否已被撤销或者超时。

原子操作atomic

CAS和其他函数

func TestCAS(t *testing.T) {
    var x uint64 = 0
    ok := atomic.CompareAndSwapUint64(&x, 0, 100) // ok == true
    assert.Equal(t, true, ok)

    ok = atomic.CompareAndSwapUint64(&x, 0, 100) // ok == false, x的原有的值不是0
    assert.Equal(t, false, ok)
}
newXValue := atomic.AddUint64(&x, 100) // newXValue == 100
old := atomic.SwapUint64(&x, 100)
v := atomic.LoadUint64(&x) // v == 0
atomic.StoreUint64(&x, 100) // x == 100

还提供了Value,可以对对象进行操作。而非单纯的int

func loadNewConfig() config{
  return Config{
      NodeName: "北京",
      Addr :"10.77.95.27"
  }
}

var config atomic.Value
config.Store(loadNewConfig())

atomic包的方法提供了内存屏障的功能,保证数据的可见性,读到最新值。

lock-free队列的实现,其中入队和出队都用了CAS。

package main

import (
    "sync/atomic"
    "unsafe"
)

// 创建lockfree包别名
var lockfree = struct {
    NewQueue func() *Queue
}{
    NewQueue: NewQueue,
}

// Node 队列节点
type Node struct {
    Value interface{}
    Next  unsafe.Pointer
}

// Queue lock-free队列结构
type Queue struct {
    head unsafe.Pointer
    tail unsafe.Pointer
}

// NewQueue 创建新的lock-free队列
func NewQueue() *Queue {
    dummy := &Node{}
    q := &Queue{
        head: unsafe.Pointer(dummy),
        tail: unsafe.Pointer(dummy),
    }
    return q
}

// Enqueue 入队操作
func (q *Queue) Enqueue(value interface{}) {
    newNode := &Node{Value: value}

    for {
        tail := atomic.LoadPointer(&q.tail)
        next := (*Node)(tail).Next

        // 检查tail是否是最新的
        if tail != atomic.LoadPointer(&q.tail) {
            continue
        }

        // 如果tail的next不为空,说明有其他线程正在操作,帮助更新tail
        if next != nil {
            atomic.CompareAndSwapPointer(&q.tail, tail, next)
            continue
        }

        // 尝试将新节点链接到tail后面
        if atomic.CompareAndSwapPointer(&(*Node)(tail).Next, next, unsafe.Pointer(newNode)) {
            // 尝试更新tail指针
            atomic.CompareAndSwapPointer(&q.tail, tail, unsafe.Pointer(newNode))
            return
        }
    }
}

// Dequeue 出队操作
func (q *Queue) Dequeue() (interface{}, bool) {
    for {
        head := atomic.LoadPointer(&q.head)
        tail := atomic.LoadPointer(&q.tail)
        next := (*Node)(head).Next

        // 检查队列是否为空
        if head == tail {
            if next == nil {
                return nil, false // 队列为空
            }
            // 帮助更新tail指针
            atomic.CompareAndSwapPointer(&q.tail, tail, next)
            continue
        }

        // 获取队首元素
        value := (*Node)(next).Value

        // 尝试移动head指针
        if atomic.CompareAndSwapPointer(&q.head, head, next) {
            return value, true
        }
    }
}
package main

import (
    "fmt"
    "sync"
)

func main() {
    q := lockfree.NewQueue()

    // 并发入队
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
       wg.Add(1)
       go func(val int) {
          defer wg.Done()
          q.Enqueue(val)
       }(i)
    }

    wg.Wait()

    // 并发出队
    for {
       val, ok := q.Dequeue()
       if !ok {
          break
       }
       fmt.Println("Dequeued:", val)
    }
}

CAS将”检查”和”执行”合并为一个原子操作,消除了这个时间窗口.

channel

Go语言的哲学就是“不要通过共享内存来进行通信,而是要通过通信来共享内存。”

channel的行为矩阵

nil 非空
(not empty)

(empty)

(full)
不满
(not full)
关闭
(closed)
receive 阻塞 读到值 阻塞 读到值 读到值 既有的值读完后,返回零值
send 阻塞 写入值 写入值 阻塞 写入值 panic
close panic 正常关闭 正常关闭 正常关闭 正常关闭 panic

有三种情况会发生panic:关闭为nil的chan,关闭已经关闭的chan,向已经关闭的chan里写入。

注意子goroutine泄露的问题,比如子goroutine里有ch <- data;

主线程里有 result := <-ch; 结果主线程超时结束了,会导致unbuffered的chan没有被读取。建议改为容量为1的chan

也要注意一下那些因为channel阻塞导致子goroutine无法退出的情况。

可以用反射reflect去操作select和channel

chan可以用做信息交流(线程安全的队列和buffer), 数据传递(4个goroutine之间,4个chan互相传数据,每个goroutine盯着自己的chan),信号通知(wait/notify),互斥锁,任务编排(or-Done模式,扇入模式,扇出模式,Stream模式,管道模式,map-reduce模式)

如果有多个任务,那么只要有任意一个任务执行完成,就可以返回任务完成的信号。这就是Or-Done模式。

在软件工程中,模块的扇入是指有多少个上级模块调用它,多个源channel输入,一个目的channel输出的情况。比如递归排序。

扇出只有一个源channel输入,但有多个目的channel输出。比如观察者模式,所有观察者都会收到变动信号。

Stream模式,把channel当做流式管道使用的方式。提供只取几个数据的方法。

把流串起来,就形成了管道。第一个流取数据,第二个流做平方。两个流组成管道。

map-reduce模式。第一步是映射(map),处理队列中的数据。第二步是规约(reduce),把列表中的每一个元素按照一定的方式处理成结果,放入结果队列。比如下面的代码map把每个整数*10,reduce把map函数处理的结果累加起来。

func mapChan[T, K any](in <-chan T, fn func(T) K) <-chan K {
    out := make(chan K)
    if in == nil {
        close(out)
        return out
    }

    go func() {
        defer close(out)

        for v := range in {
            out <- fn(v)
        }
    }()

    return out
}

func reduce[T, K any](in <-chan T, fn func(r K, v T) K) K {
    var out K

    if in == nil {
        return out
    }

    for v := range in {
        out = fn(out, v)
    }

    return out
}

func asStream(done <-chan struct{}) <-chan int {
    s := make(chan int)
    values := []int{1, 2, 3, 4, 5}
    go func() {
        defer close(s)

        for _, v := range values {
            select {
            case <-done:
                return
            case s <- v:
            }
        }

    }()
    return s
}

func main() {
    in := asStream(nil)

    // map op: time 10
    mapFn := func(v int) int {
        return v * 10
    }

    // reduce op: sum
    reduceFn := func(r, v int) int {
        return r + v
    }

    sum := reduce(mapChan(in, mapFn), reduceFn)
    fmt.Println(sum)
}

Go内存模型

Go的内存模型描述的是并发环境中多个goroutine读取相同变量时,对变量可见性的保证。

然后介绍了goroutine,channel, Mutex,Once, WaitGroup, atomic 的一些正常的执行顺序,可以 A synchronized before B.

信号量Semaphore

信号量是用来控制多个goroutine同时访问多个资源的同步原语。

PV操作:P操作用来减少信号量的计数值,V操作用来增大信号量的计数值。

可以通过chan来实现一个信号量,chan中的资源槽位。

Lock就是 chan <- struct{}{}; Unlock就是 <-chan

在Go官方的扩展库里也有一个golang.org/x/sync/semaphore。叫Weighted,可以一次请求、释放n个资源。

缓解压力利器 SingleFlight

缓存击穿和缓存雪崩是缓存过期;缓存穿透是访问的key不存在。

SingleFlight用于解决缓存击穿问题,是Go团队提供的一个扩展同步原语。他的作用是在处理多个goroutine同时调用一个函数时,只让一个goroutine调用这个函数,当这个goroutine返回结果时,再把结果返回给这几个同时调用的goroutine,这样就可以减少并发调用的数量。

SingleFlight使用Mutex和Map来实现,其中MUtex提供并发时的读、写保护,Map用来保存正在处理(in flight)的对同一个Key的请求。

循环屏障CyclicBarrier

CyclicBarrier更适合用在“数量固定的goroutine等待到达同一个检查点”的场景中,而且放行后,屏障可以重复使用。

func main() {
    cnt := 0
    b := cyclicbarrier.NewWithAction(10, func() error {
        cnt++
        return nil
    })

    wg := sync.WaitGroup{}
    wg.Add(10)

    for i := 0; i < 10; i++ {
        i := i
        go func() {
            defer wg.Done()
            for j := 0; j < 5; j++ {
                time.Sleep(time.Duration(rand.Intn(10)) * time.Second)
                log.Printf("goroutine %d 来到第%d轮屏障", i, j)
                err := b.Await(context.TODO())
                log.Printf("goroutine %d 冲破第%d轮屏障", i, j)
                if err != nil {
                    panic(err)
                }
            }
        }()
    }

    wg.Wait()
    fmt.Println(cnt)
}

分组操作

ErrGroup,SizedGroup,ErrSizedGroup,gollback,Hunch,schedgroup.

ErrGroup底层是基于WaitGroup.他的功能更加丰富,可与Context集成,error可以向上传播,把子任务的错误传递给Wait的调用者。

SizedGroup内部做了控制,虽然任务可以有成千上万个,但是内部只使用有限的goroutine来执行。

gollback可以把子任务的执行结果和error都返回,不需要再额外定义变量数组在每个goroutine里记录了。

Hunch和gollback差不多。

schedgroup可以指定任务在某个时间或者某个时间之后执行。

限流

令牌桶和漏桶。golang.org/x/time/rate是Go官方提供的一个基于令牌桶实现的限流库,uber-go/ratelimit就是一个被广泛使用的基于漏桶技术实现的单机的限流库。go-redis/redis_rate提供了一种成熟的基于Redis的分布式限流方案,采用漏洞技术,还支持突发请求。其底层使用redis/go-redis库来访问Redis,并且使用Lua脚本的方式在Redis服务端计算是否有充足的令牌。

限流属于服务上层,不属于业务考虑范围。

Go并发编程和调度器

主从:Campaign方法,其作用是把一个节点选择为主节点;Leader方法,查询当前主节点的方法;Observe方法:来监控主节点的变化。

etcd提供了选主逻辑,而我们要做的就是利用这些方法,让它们为我们的业务服务。etcd还提供了简单的Locker同步原语。

Locker是基于Mutex实现的。请求锁调用Lock方法,释放锁调用Unlock方法,通过Key方法获取Mutex的值。持有锁的节点崩溃后,锁在未来也会释放,可设置超时过期时间。etcd也提供了RWMutex。

基于etcd实现的分布式队列,分布式优先队列也是有的。还提供了分布式屏障Barrier。计数型屏障DoubleBarrier,集齐数量后才能一进一出。

软件事务内存(software transactional memory,STM)是一种并发编程模型,用于解决多线程访问共享内存时可能产生的数据竞争和并发性问题。当利用etcd做存储时,是可以利用STM实现事务操作的,一个事务可以包含多个账号的数据更改操作,事务能保证这些更改要不全部成功,要不全部失败。

并发模式

经典并发问题解析

哲学家就餐问题:

形成死锁的四个条件:

1.禁止抢占(No Preemption):不能强制抢夺别人的资源,抢筷子

2.持有和等待(Hold and Wait):一个线程在等待时持有并发资源。拿着筷子不放手,等别的筷子

3.互斥(Mutual Exclusion):资源在同一时刻只能被分配给一个线程。一个筷子只能在一个人手里

4.循环等待(Circular Waiting):一系列线程相互持有其他线程所需要的资源

破坏循环等待条件-减少就餐人数。奇偶处理办法。资源分级,必须拿到低级的筷子才能拿高级的筷子。引入服务生,将拿两个筷子视为一个原子操作,打破持有且等待条件。

理发师问题:

用Cond或者用channel实现信号量。


文章作者: 爱敲代码の鱼儿
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 爱敲代码の鱼儿 !
  目录