预备知识

unsafe.Pointer

unsafe.Pointer 是一种特殊意义的指针,它可以包含任意类型的地址,有点类似于 C 语言里的 void* 指针,全能型的。

对unsafe.Pointer 又爱又恨,你会有效使用它吗?

unsafe 是关注 Go 程序操作类型安全的包。

unsafe.Pointer 可以让你无视 Go 的类型系统,完成任何类型与内建的 uintptr 类型之间的转化。根据文档,unsafe.Pointer 可以实现四种其他类型不能的操作:

  • 任何类型的指针都可以转化为一个 unsafe.Pointer
  • 一个 unsafe.Pointer 可以转化成任何类型的指针
  • 一个 uintptr 可以转化成一个 unsafe.Pointer
  • 一个 unsafe.Pointer 可以转化成一个 uintptr

两种只能借助 unsafe 包才能完成的操作:

  • 使用 unsafe.Pointer 实现两种类型间转换
  • 使用 unsafe.Pointer 处理系统调用。

CAS比较并交换—-Compare And Swap

Go 的一个 CAS 操作使用场景

  • 在并发执行的多个 routine R1,R2…Rn 的中,同一时间只允许唯一一个 routine 执行某一个操作,并且其他 routine 需要非阻塞的知道自己无权操作并返回的时候,可以使用 CAS 操作。

大方向:任务编排用 Channel,共享资源保护用传统并发原语

互斥锁实现机制

使用互斥锁,限定临界区只能同时由一个线程持有。

  • 临界区
    • 在并发编程中,如果程序中的一部分会被并发访问或修改,那么,为了避免并发访问导致的意想不到的结果,这部分程序需要被保护起来,这部分被保护起来的程序,就叫做临界区。

image-20210828135504575

在 Go 标准库中,它提供了 Mutex 来实现互斥锁这个功能。

  • 共享资源。并发地读写共享资源,会出现数据竞争(data race)的问题,所以需要 Mutex、RWMutex 这样的并发原语来保护。
  • 任务编排。需要 goroutine 按照一定的规律执行,而 goroutine 之间有相互等待或者依赖的顺序关系,我们常常使用 WaitGroup 或者 Channel 来实现。
  • 消息传递。信息交流以及不同的 goroutine 之间的线程安全的数据交流,常常使用 Channel 来实现。

简单的计数器例子 Counter

方法一:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import(
"fmt"
   "sync"
)

func main() {
var count = 0
   // 互斥锁保护计数器
   var mu sync.Mutex
   var wg sync.WaitGroup

   wg.Add(10)

for i := 0; i < 10; i++{
go func() {
defer wg.Done()
// 对变量 count 进行加法操作
         // count++ 不是一个原子操作,它至少包含几个步骤,
         // 比如读取变量 count 的当前值,
         // 对这个值加 1,把结果再保存到 count 中。
         // 因为不是原子操作,就可能有并发的问题。
         for i := 0; i < 10000; i++{
mu.Lock()
count++
            mu.Unlock()
}
}()
}
// 等待 10 个 goroutine 完成
   wg.Wait()
fmt.Println("结果为", count)

}

方法二(推荐):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
type Counter struct{
	 id    int
   name  string
   mu    sync.Mutex
   count uint
}

func(c *Counter)Inc() {
c.mu.Lock()
c.count++
   c.mu.Unlock()
}

func(c *Counter)Count()uint{
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}

func main2() {
var counter Counter

   var wg sync.WaitGroup
   wg.Add(10)

for i := 0; i < 10; i++{
go func() {
defer wg.Done()
for i := 0; i < 10000; i++{
counter.Inc()// 受到 mutex 保护的方法
}
}()
}
wg.Wait()
fmt.Println("结果为", counter.count)
}

等待的goroutine们是以FIFO排队的

  • 1)当Mutex处于正常模式时,若此时没有新goroutine与队头goroutine竞争,则队头goroutine获得。若有新goroutine竞争大概率新goroutine获得。

  • 2)当队头goroutine竞争锁失败1ms后,它会将Mutex调整为饥饿模式。进入饥饿模式后,锁的所有权会直接从解锁goroutine移交给队头goroutine,此时新来的goroutine直接放入队尾。

  • 3)当一个goroutine获取锁后,如果发现自己满足下列条件中的任何一个

    • 它是队列中最后一个
    • 它等待锁的时间少于1ms

    将锁切换回正常模式

mutex 注意点

  • Unlock 方法可以被任意的 goroutine 调用释放锁,即使是没持有这个互斥锁的 goroutine,也可以进行这个操作。这是因为,Mutex 本身并没有包含持有这把锁的 goroutine 的信息,所以,Unlock 也不会对此进行检查。Mutex 的这个设计一直保持至今。

Mutex常见错误

Mutex 常见的错误场景有 4 类,分别是 Lock/Unlock 不是成对出现、Copy 已使用的 Mutex、重入和死锁。

可重入的概念

  • 当一个线程获取锁时,如果没有其它线程拥有这个锁,那么,这个线程就成功获取到这个锁。之后,如果其它线程再请求这个锁,就会处于阻塞等待的状态。但是,如果拥有这把锁的线程再请求这把锁的话,不会阻塞,而是成功返回,所以叫可重入锁(有时候也叫做递归锁)。只要你拥有这把锁,你可以可着劲儿地调用,比如通过递归实现一些算法,调用者不会阻塞或者死锁。

死锁

  • 两个或两个以上的进程(或线程,goroutine)在执行过程中,因争夺共享资源而处于一种互相等待的状态,如果没有外部干涉,它们都将无法推进下去,此时,我们称系统处于死锁状态或系统产生了死锁。

避免死锁,只要破坏这四个条件中的一个或者几个,就可以了。

  • 互斥: 至少一个资源是被排他性独享的,其他线程必须处于等待状态,直到资源被释放。
  • 持有和等待:goroutine 持有一个资源,并且还在请求其它 goroutine 持有的资源,也就是咱们常说的“吃着碗里,看着锅里”的意思。
  • 不可剥夺:资源只能由持有它的 goroutine 来释放。
  • 环路等待:一般来说,存在一组等待进程,P={P1,P2,…,PN},P1 等待 P2 持有的资源,P2 等待 P3 持有的资源,依此类推,最后是 PN 等待 P1 持有的资源,这就形成了一个环路等待的死结。

image-20210828135540673

Mutex小结

RWMutex — 读写锁

标准库中的 RWMutex 是一个 reader/writer 互斥锁。RWMutex在某一时刻只能由任意数量的 reader 持有,或者是只被单个的 writer 持有。RWMutex 的方法也很少,总共有 5 个。

  • Lock/Unlock:写操作时调用的方法。如果锁已经被 reader 或者 writer 持有,那么,Lock 方法会一直阻塞,直到能获取到锁;Unlock 则是配对的释放锁的方法。

  • RLock/RUnlock:读操作时调用的方法。如果锁已经被 writer 持有的话,RLock 方法会一直阻塞,直到能获取到锁,否则就直接返回;而 RUnlock 是 reader 释放锁的方法。

  • RLocker:这个方法的作用是为读操作返回一个 Locker 接口的对象。它的 Lock 方法会调用 RWMutex 的 RLock 方法,它的 Unlock 方法会调用 RWMutex 的 RUnlock 方法。

    RWMutex 的零值是未加锁的状态,所以,当你使用 RWMutex 的时候,无论是声明变量,还是嵌入到其它 struct 中,都不必显式地初始化。

如果你遇到可以明确区分 reader 和 writer goroutine 的场景,且有大量的并发读、少量的并发写,并且有强烈的性能需求,你就可以考虑使用读写锁 RWMutex 替换 Mutex。

RWMutex 的实现原理

RWMutex 是很常见的并发原语,很多编程语言的库都提供了类似的并发类型。RWMutex 一般都是基于互斥锁、条件变量(condition variables)或者信号量(semaphores)等并发原语来实现。Go 标准库中的 RWMutex 是基于 Mutex 实现的。

  • readers-writers 问题一般有三类,基于对读和写操作的优先级,读写锁的设计和实现也分成三类。Read-preferring:读优先的设计可以提供很高的并发性,但是,在竞争激烈的情况下可能会导致写饥饿。这是因为,如果有大量的读,这种设计会导致只有所有的读都释放了锁之后,写才可能获取到锁。
  • Write-preferring:写优先的设计意味着,如果已经有一个 writer 在等待请求锁的话,它会阻止新来的请求锁的 reader 获取到锁,所以优先保障 writer。当然,如果有一些 reader 已经请求了锁的话,新请求的 writer 也会等待已经存在的 reader 都释放锁之后才能获取。所以,写优先级设计中的优先权是针对新来的请求而言的。这种设计主要避免了 writer 的饥饿问题。
  • 不指定优先级:这种设计比较简单,不区分 reader 和 writer 优先级,某些场景下这种不指定优先级的设计反而更有效,因为第一类优先级会导致写饥饿,第二类优先级可能会导致读饥饿,这种不指定优先级的访问不再区分读写,大家都是同一个优先级,解决了饥饿的问题。

Go 标准库中的 RWMutex 设计是 Write-preferring 方案。一个正在阻塞的 Lock 调用会排除新的 reader 请求到锁。

RWMutex的锁

RWMutex 是⼀个多 writer 多 reader 的读写锁,所以同时可能有多个 writer 和 reader。那 么,为了避免 writer 之间的竞争,RWMutex 就会使用⼀个 Mutex 来保证 writer 的互斥。

  • 在 Lock 方法中,是先获取内部互斥锁,才会修改的其他字段;
  • 在 Unlock 方法中,是先修改的其他字段,才会释放内部互斥锁,这样才能保证字段的修改也受到互斥锁的保护。

使用读写锁最需要注意的⼀点就是尽量避免重入,重入带来的死锁⾮常隐蔽,⽽且难以 诊断。

WaitGroup:协同等待,任务编排利器

WaitGroup基本用法

1
2
3
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
  • Add,用来设置 WaitGroup 的计数值;
  • Done,用来将 WaitGroup 的计数值减 1,其实就是调用了 Add(-1);
  • Wait,调用这个方法的 goroutine 会⼀直阻塞,直到 WaitGroup 的计数值变为 0。

WaitGroup数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
type WaitGroup struct {
		// 避免复制使用的⼀个技巧,可以告诉vet⼯具违反了复制使用的规则
		noCopy noCopy

		// 64bit(8bytes)的值分成两段,⾼32bit是计数值,低32bit是waiter的计数
		// 另外32bit是用作信号量的
		// 因为64bit值的原子操作需要64bit对⻬,但是32bit编译器不⽀持,所以数组中的元素在不同的架构
		// 总之,会找到对⻬的那64bit作为state,其余的32bit做信号量
		state1 [3]uint32
}

// 得到state的地址和信号量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 如果地址是64bit对⻬的,数组前两个元素做state,后⼀个元素做信号量
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
// 如果地址是32bit对⻬的,数组后两个元素用来做state,它可以用来做64bit的原子操作,第
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}

WaitGroup 是可以重用的。只要 WaitGroup 的计数值恢复到零值的状态,那么它就可以被看作是新创建的 WaitGroup,被重复使用

小结

  • 不重用 WaitGroup。新建⼀个 WaitGroup 不会带来多⼤的资源开销,重用反⽽更容易出 错。
  • 保证所有的 Add 方法调用都在 Wait 之前。
  • 不传递负数给 Add 方法,只通过 Done 来给计数值减 1。
  • 不做多余的 Done 方法调用,保证 Add 的计数值和 Done 方法调用的数量是⼀样的。
  • 不遗漏 Done 方法的调用,否则会导致 Wait hang 住⽆法返回。

Cond

Go 标准库提供 Cond 原语的目的是,为等待 / 通知场景下的并发问题提供⽀持。

Cond 的基本用法

1
2
3
4
5
type Cond
func NeWCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
  • ⾸先,Cond 关联的 Locker 实例可以通过 c.L 访问,它内部维护着⼀个先入先出的等待队 列。
  • Signal 方法,允许调用者 Caller 唤醒⼀个等待此 Cond 的 goroutine。如果此时没有等待的goroutine,显然⽆需通知 waiter;如果 Cond 等待队列中有⼀个或者多个等待的goroutine,则需要从等待队列中移除第⼀个 goroutine 并把它唤醒。在其他编程语言中,⽐如 Java 语⾔中,Signal 方法也被叫做 notify 方法。
    • 调用 Signal 方法时,不强求你⼀定要持有 c.L 的锁。
  • Broadcast 方法,允许调用者 Caller 唤醒所有等待此 Cond 的 goroutine。如果此时没有等待的 goroutine,显然⽆需通知 waiter;如果 Cond 等待队列中有⼀个或者多个等待的goroutine,则清空所有等待的 goroutine,并全部唤醒。在其他编程语⾔中,⽐如 Java 语⾔中,Broadcast 方法也被叫做 notifyAll 方法。
    • 同样地,调用 Broadcast 方法时,也不强求你⼀定持有 c.L 的锁。
  • Wait 方法,会把调用者 Caller 放入 Cond 的等待队列中并阻塞,直到被 Signal 或者Broadcast 的方法从等待队列中移除并唤醒。
    • 调用 Wait 方法时必须要持有 c.L 的锁。

案例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

func main() {

	c := sync.NewCond(&sync.Mutex{})
	var ready int

	for i := 0; i <10 ; i++ {

		go func(i int) {
			time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

			// 加锁更改等待条件
			c.L.Lock()
			ready++
			c.L.Unlock()
			log.Printf("运动员#%d 准备就绪\\n", i)
			// 广播唤醒所有的等待着
			c.Broadcast()
		}(i)
	}

	c.L.Lock()
	for ready!= 10{
		c.Wait()
		log.Printf("裁判员被唤醒一次")
	}
	c.L.Unlock()

	// 所有运动员是否准备就绪
	log.Println("所有运动员准备就绪,比赛开始!")
}

小结

  • Cond 是为等待 / 通知场景下的并发问题提供⽀持的。它提供了条件变量的三个基本方法Signal、Broadcast 和 Wait,为并发的 goroutine 提供等待 / 通知机制。
  • 使用 Cond 之所以容易出错,就是 Wait 调用需要加锁,以及被唤醒后⼀定要检查条件是否真 的已经满⾜。你需要牢记这两点。
  • WaitGroup和 Cond 的区别:WaitGroup 是主 goroutine 等待确定数量的子 goroutine 完成任务;⽽ Cond 是等待某个条件满⾜,这个条件的修改可以被任意多的 goroutine 更新,⽽且 Cond的 Wait 不关⼼也不知道其他 goroutine 的数量,只关⼼等待条件。⽽且 Cond 还有单个通知的机制,也就是 Signal 方法。

Once

Once可以用来执行且仅仅执行⼀次动作,常常用于单例对象的初始化场景。

使用场景

sync.Once 只暴露了⼀个方法 Do,你可以多次调用 Do 方法,但是只有第⼀次调用 Do 方法 时 f 参数才会执行,这⾥的 f 是⼀个⽆参数⽆返回值的函数。

1
 func (o *Once) Do(f func())

Once 常常用来初始化单例资源,或者并发访问只需初始化⼀次的共享资源,或者在测试的时候初始化⼀次测试资源。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
	"fmt"
	"sync"
)

func main() {
	var once sync.Once
	
	f1:=func(){
		fmt.Println("f1 exce!")
	}
	once.Do(f1)
	
	f2 := func() {
		fmt.Println("f2 exce")
	}
	once.Do(f2)
}

小结

⼀旦你遇到只需要初始化⼀次的场景,⾸先想到的就应该是 Once 并发原语。

Pool

Go 标准库中提供了⼀个通用的 Pool 数据结构,也就是 sync.Pool,我们使用它可以创建池化的对象。但是它池化的对象可能会被垃圾回收掉。

  • sync.Pool 数据类型用来保存⼀组可独立访问的临时对象。
    • 也就是说,它池化的对象会在未来的某个时候被毫无预兆地移除掉。而且,如果没有别的对象引用这个被移除的对象的话,这个被移除的对象就会被垃圾回收掉。

注意点

    1. sync.Pool 本身就是线程安全的,多个 goroutine 可以并发地调用它的方法存取对象;
    1. sync.Pool 不可在使用之后再复制使用。

方法介绍

1.New

  • Pool struct 包含⼀个 New 字段,这个字段的类型是函数 func() interface{}。当调用 Pool 的 Get 方法从池中获取元素,没有更多的空闲元素可返回时,就会调用这个 New 方法来创建新 的元素。如果你没有设置 New 字段,没有更多的空闲元素可返回时,Get 方法将返回 nil,表 明当前没有可用的元素。 有趣的是,New 是可变的字段。这就意味着,你可以在程序运行的时候改变创建元素的方 法。当然,很少有⼈会这么做,因为⼀般我们创建元素的逻辑都是⼀致的,要创建的也是同⼀ 类的元素,所以你在使用 Pool 的时候也没必要玩⼀些“花活”,在程序运行时更改 New 的 值。

2.Get

  • 如果调用这个方法,就会从 Pool取⾛⼀个元素,这也就意味着,这个元素会从 Pool 中移除, 返回给调用者。不过,除了返回值是正常实例化的元素,Get 方法的返回值还可能会是⼀个 nil(Pool.New 字段没有设置,⼜没有空闲元素可以返回),所以你在使用的时候,可能需要 判断。

3.Put

  • 这个方法用于将⼀个元素返还给 Pool,Pool 会把这个元素保存到池中,并且可以复用。但如 果 Put ⼀个 nil 值,Pool 就会忽略这个值。

推荐的三方pool

  • gammazero/workerpool:gammazero/workerpool 可以⽆限制地提交任务,提供了更便利的 Submit 和 SubmitWait 方法提交任务,还可以提供当前的 worker 数和任务数以及关闭 Pool 的功能。
  • ivpusic/grpool:grpool 创建 Pool 的时候需要提供 Worker 的数量和等待执行的任务的 最⼤数量,任务的提交是直接往 Channel 放入任务。
  • dpaks/goworkers:dpaks/goworkers 提供了更便利的 Submit 方法提交任务以及Worker 数、任务数等查询方法、关闭 Pool 的方法。它的任务的执行结果需要在ResultChan 和 ErrChan 中去获取,没有提供阻塞的方法,但是它可以在初始化的时候设置 Worker 的数量和任务数。

pool可能造成的问题

  • 内存泄漏
    • 在使用 sync.Pool 回收 buffer 的时候,⼀定要检查回收的对象的⼤⼩。如果 buffer 太⼤,就 不要回收了,否则就太浪费了
  • 内存浪费
    • 要做到物尽其用,尽可能不浪费的话,我们可以将 buffer 池分成⼏层
    • ⼩于 512 byte的元素的 buffer 占⼀个池子;其次,⼩于 1K byte ⼤⼩的元素占⼀个池子;再次,⼩于 4Kbyte ⼤⼩的元素占⼀个池子。这样分成⼏个池子以后,就可以根据需要,到所需⼤⼩的池子中获取 buffer 了。

小结

  • Pool 是⼀个通用的概念,也是解决对象重用和预先分配的⼀个常用的优化⼿段。即使你⾃⼰ 没在项目中直接使用过,但肯定在使用其它库的时候,就享受到应用 Pool 的好处了,⽐如数 据库的访问、http API 的请求等等。
  • 我们⼀般不会在程序⼀开始的时候就开始考虑优化,⽽是等项目开发到⼀个阶段,或者快结束 的时候,才全⾯地考虑程序中的优化点,⽽ Pool 就是常用的⼀个优化⼿段。如果你发现程序 中有⼀种 GC 耗时特别⾼,有⼤量的相同类型的临时对象,不断地被创建销毁,这时,你就可 以考虑看看,是不是可以通过池化的⼿段重用这些对象。
  • 另外,在分布式系统或者微服务框架中,可能会有⼤量的并发 Client 请求,如果 Client 的耗 时占⽐很⼤,你也可以考虑池化 Client,以便重用。
  • 如果你发现系统中的 goroutine 数量⾮常多,程序的内存资源占用⽐较⼤,⽽且整体系统的耗 时和 GC 也⽐较⾼,我建议你看看,是否能够通过 Worker Pool 解决⼤量 goroutine 的问 题,从⽽降低这些指标。

Context:信息穿透上下文

在 API之间或者方法调用之间,所传递的除了业务参数之外的额外信息。

context使用场景

  • 上下⽂信息传递 (request-scoped),⽐如处理 http 请求、在请求处理链路上传递信 息;
  • 控制子 goroutine 的运行;
  • 超时控制的方法调用;
  • 可以取消的方法调用。

context 接口函数

1
2
3
4
5
6
type Context interface {
		Deadline() (deadline time.Time, ok bool)
		Done() <-chan struct{}
		Err() error
		Value(key interface{}) interface{}
}
  • Deadline 方法会返回这个 Context 被取消的截⽌⽇期。如果没有设置截⽌⽇期,ok 的值是 false。后续每次调用这个对象的 Deadline 方法时,都会返回和第⼀次调用相同的结果。
  • Done 方法返回⼀个 Channel 对象。在 Context 被取消时,此 Channel 会被 close,如果没 被取消,可能会返回 nil。后续的 Done 调用总是返回相同的结果。当 Done 被 close 的时 候,你可以通过 ctx.Err 获取错误信息。Done 这个方法名其实起得并不好,因为名字太过笼 统,不能明确反映 Done 被 close 的原因,因为 cancel、timeout、deadline 都可能导致
  • Done 被 close,不过,目前还没有⼀个更合适的方法名称。
    • 如果 Done 没有被 close,Err 方法返回 nil;如果 Done 被 close,Err 方法会返回 Done 被 close 的原因。
  • Value 返回此 ctx 中和指定的 key 相关联的 value。

Context 中实现了 2 个常用的生成顶层 Context 的方法。

  • context.Background():返回⼀个⾮ nil 的、空的 Context,没有任何值,不会被 cancel,不会超时,没有截⽌⽇期。⼀般用在主函数、初始化、测试以及创建根 Context 的时候
  • context.TODO():返回⼀个⾮ nil 的、空的 Context,没有任何值,不会被 cancel,不会超时,没有截⽌⽇期。当你不清楚是否该用 Context,或者目前还不知道要传递⼀些什么上下⽂信息的时候,就可以使用这个方法。

关于Context的一些约定规定

    1. ⼀般函数使用 Context 的时候,会把这个参数放在第⼀个参数的位置。从来不把 nil 当Context 类型的参数值,可以使用 context.Background() 创建⼀个空的上下⽂对象,也不要使用 nil。
  • 2.Context 只用来临时做函数之间的上下⽂透传,不能持久化 Context 或者把 Context ⻓久存。把 Context 持久化到数据库、本地⽂件或者全局变量、缓存中都是错误的用法
  • 3.key 的类型不应该是字符串类型或者其它内建类型,否则容易在包之间使用 Context 时候产生冲突。使用 WithValue 时,key 的类型应该是⾃⼰定义的类型。
  • 4.常常使用 struct{}作为底层类型定义 key 的类型。对于 exported key 的静态类型,常常是接⼝或者指针。这样可以尽量减少内存分配。

应用场景

main函数返回时,所有的goroutine都会被直接打断,程序退出。除此之外如果想通过编程的方法让一个goroutine中断其他goroutine的执行,只能是通过在多个goroutine间通过context上下文对象同步取消信号的方式来实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func main() {
			ctx, cancel := context.WithCancel(context.Background())
			go func() {
			defer func() {
			fmt.Println("goroutine exit")
			}()
			for {
			select {
			case <-ctx.Done():
			return
			default:
			time.Sleep(time.Second)
			}
			}
			}()
			time.Sleep(time.Second)
			cancel()
			time.Sleep(2 * time.Second)
	}

atomic 原子操作

原子操作,是因为⼀个原子在执行的时候,其它线程不会看到执行⼀半的操作结果。在其它线程看来,原子操作要么执行完了,要么还没有执行,就像⼀个最⼩的粒子 - 原子⼀样,不可分割

atomic 提供的方法

  • atomic 操作的对象是⼀个地址,你需要把可寻址的变量的地址作为参数传递给方法,⽽不是把变量的值传递给方法。

Add

Add 方法就是给第⼀个参数地址中的值增加⼀个 delta 值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// AddInt32 atomically adds delta to *addr and returns the new value.
func AddInt32(addr *int32, delta int32) (new int32)

// AddUint32 atomically adds delta to *addr and returns the new value.
// To subtract a signed positive constant value c from x, do AddUint32(&x, ^uint32(c-1)).
// In particular, to decrement x, do AddUint32(&x, ^uint32(0)).
func AddUint32(addr *uint32, delta uint32) (new uint32)

// AddInt64 atomically adds delta to *addr and returns the new value.
func AddInt64(addr *int64, delta int64) (new int64)

// AddUint64 atomically adds delta to *addr and returns the new value.
// To subtract a signed positive constant value c from x, do AddUint64(&x, ^uint64(c-1)).
// In particular, to decrement x, do AddUint64(&x, ^uint64(0)).
func AddUint64(addr *uint64, delta uint64) (new uint64)

// AddUintptr atomically adds delta to *addr and returns the new value.
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr

CAS (CompareAndSwap)

这个方法会⽐较当前 addr 地址⾥的值是不是 old,如果不等于 old,就返回 false;如果等于old,就把此地址的值替换成 new 值,返回 true。这就相当于“判断相等才替换”。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value.
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

// CompareAndSwapInt64 executes the compare-and-swap operation for an int64 value.
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

// CompareAndSwapUint32 executes the compare-and-swap operation for a uint32 value.
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)

// CompareAndSwapUint64 executes the compare-and-swap operation for a uint64 value.
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)

// CompareAndSwapUintptr executes the compare-and-swap operation for a uintptr value.
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

// CompareAndSwapPointer executes the compare-and-swap operation for a unsafe.Pointer value.
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

效果如下

1
2
3
4
5
if *addr == old {
	*addr = new
return true
}
return false

Swap

如果不需要⽐较旧值,只是⽐较粗暴地替换的话,就可以使用 Swap 方法,它替换后还可以 返回旧值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// SwapInt32 atomically stores new into *addr and returns the previous *addr value.
func SwapInt32(addr *int32, new int32) (old int32)

// SwapInt64 atomically stores new into *addr and returns the previous *addr value.
func SwapInt64(addr *int64, new int64) (old int64)

// SwapUint32 atomically stores new into *addr and returns the previous *addr value.
func SwapUint32(addr *uint32, new uint32) (old uint32)

// SwapUint64 atomically stores new into *addr and returns the previous *addr value.
func SwapUint64(addr *uint64, new uint64) (old uint64)

// SwapUintptr atomically stores new into *addr and returns the previous *addr value.
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)

// SwapPointer atomically stores new into *addr and returns the previous *addr value.
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

效果如下:

1
2
3
old = *addr
*addr = new
return old

Load

Load 方法会取出 addr 地址中的值,即使在多处理器、多核、有 CPU cache 的情况下,这个操作也能保证 Load 是⼀个原子操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// LoadInt32 atomically loads *addr.
func LoadInt32(addr *int32) (val int32)

// LoadInt64 atomically loads *addr.
func LoadInt64(addr *int64) (val int64)

// LoadUint32 atomically loads *addr.
func LoadUint32(addr *uint32) (val uint32)

// LoadUint64 atomically loads *addr.
func LoadUint64(addr *uint64) (val uint64)

// LoadUintptr atomically loads *addr.
func LoadUintptr(addr *uintptr) (val uintptr)

// LoadPointer atomically loads *addr.
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

Store

Store 方法会把⼀个值存入到指定的 addr 地址中,即使在多处理器、多核、有 CPU cache的情况下,这个操作也能保证 Store 是⼀个原子操作。别的 goroutine 通过 Load 读取出来,不会看到存取了⼀半的值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// StoreInt32 atomically stores val into *addr.
func StoreInt32(addr *int32, val int32)

// StoreInt64 atomically stores val into *addr.
func StoreInt64(addr *int64, val int64)

// StoreUint32 atomically stores val into *addr.
func StoreUint32(addr *uint32, val uint32)

// StoreUint64 atomically stores val into *addr.
func StoreUint64(addr *uint64, val uint64)

// StoreUintptr atomically stores val into *addr.
func StoreUintptr(addr *uintptr, val uintptr)

// StorePointer atomically stores val into *addr.
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer

Value 类型

它可以原子地存取对象类型,但也只能存取,不能 CAS 和 Swap,常常用在配置变更等场景中

1
2
3
4
5
6
7
// A Value must not be copied after first use.
type Value struct {
	v interface{}
}

func (v *Value) Load() (x interface{}) {...}
func (v *Value) Store(x interface{}) {...}

Channel:解决并发问题

CSP允许使用进程组件来描述系统,它们独⽴运行,并且只通过消息传递的方式通信。

Channel 的应用场景

执行业务处理的 goroutine 不要通过共享内存的方式通信,⽽是要通过 Channel 通信的方式分享数据

  • “communicate by sharing memory”是传统的并发编程处理方式,就是指,共享的数据需要用锁进行保护,goroutine 需要获取到锁,才能并发访问数据。
  • “share memory by communicating”则是类似于 CSP 模型的方式,通过通信的方式,⼀个goroutine 可以把数据的“所有权”交给另外⼀个 goroutine(虽然 Go 中没有“所有权”的概念,但是从逻辑上说,你可以把它理解为是所有权的转移)。

五大应用场景

  • 数据交流:当作并发的 buffer 或者 queue,解决生产者 - 消费者问题。多个 goroutine 可以并发当作生产者(Producer)和消费者(Consumer)。
  • 数据传递:⼀个 goroutine 将数据交给另⼀个 goroutine,相当于把数据的拥有权 (引用)托付出去。
  • 信号通知:⼀个 goroutine 可以将信号 (closing、closed、data ready 等) 传递给另⼀个或者另⼀组 goroutine 。
  • 任务编排:可以让⼀组 goroutine 按照⼀定的顺序并发或者串行的执行,这就是编排的功能。
  • :利用 Channel 也可以实现互斥锁的机制。

channel 基本用法

<- v有个规则,总是尽量和左边的 chan 结合(The <- operator associates with the leftmost chan possible:

  • nil 是 chan 的零值,是⼀种特殊的 chan,对值是 nil 的 chan 的发送接收调用者总是会阻塞

关于channel的选择

    1. 共享资源的并发访问使用传统并发原语;
    1. 复杂的任务编排和消息传递使用 Channel;
    1. 消息通知机制使用 Channel,除非只想 signal ⼀个 goroutine,才使用 Cond;
    1. 简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
    1. 需要和 Select 语句结合,使用 Channel;
    1. 需要和超时配合时,使用 Channel 和 Context。

chan 的编排方式

Or-Done 模式、扇入模式、扇出模式、Stream 和 map-reduce

Or-Done 模式

Or-Done 模式是信号通知模式中更宽泛的⼀种模式

我们会使用“信号通知”实现某个任务执行完成后的通知机制,在实现时,我们为这个任务定义 ⼀个类型为 chan struct{}类型的 done 变量,等任务结束后,我们就可以 close 这个变量, 然后,其它 receiver 就会收到这个通知。 这是有⼀个任务的情况,如果有多个任务,只要有任意⼀个任务执行完,我们就想获得这个信 号,这就是 Or-Done 模式。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package main

import (
	"fmt"
	"reflect"
	"time"
)

// or or-done 模式 递归实现
func or(channels ...<-chan interface{}) <-chan interface{} {
	// 特殊情况,只有零个,1个或2个 chan
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	case 2:
		select {
		case <-channels[0]:
		case <-channels[1]:
		}
	}

	orDone := make(chan interface{})
	go func() {
		defer close(orDone)
		// fmt.Println("执行")
		if len(channels) > 2 {
			m := len(channels) / 2
			select {
			case <-or(channels[:m]...):
			case <-or(channels[m:]...):
			}
		}
	}()
	return orDone
}

func sig(after time.Duration) <-chan interface{} {
	c := make(chan interface{})
	go func() {
		defer close(c)
		time.Sleep(after)
	}()

	return c
}

// orSelect 反射⽅式 实现
func orSelect(channels ...<-chan interface{}) <-chan interface{} {
	// 特殊情况,只有零个,1个或2个 chan
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	}

	orDone := make(chan interface{})
	go func() {
		defer close(orDone)
		// 利用反射构建SelectCase
		var cases []reflect.SelectCase
		for _, c := range channels {
			cases = append(cases, reflect.SelectCase{
				Dir:  reflect.SelectRecv,
				Chan: reflect.ValueOf(c),
			})
		}
		// 随机选取一个可用 case
		reflect.Select(cases)
	}()

	return orDone
}

func main() {
	start := time.Now()

	<-orSelect(
		sig(10*time.Second),
		sig(20*time.Second),
		sig(30*time.Second),
		sig(40*time.Second),
		sig(50*time.Second),
		sig(01*time.Second),
	)

	fmt.Printf("done after %v", time.Since(start))

}

扇入模式

扇入借鉴了数字电路的概念,它定义了单个逻辑们能够接受的数字信号输入最⼤量的术语。⼀ 个逻辑们可以有多个输入,⼀个输出。

  • 在软件⼯程中,模块的扇入是指有多少个上级模块调用它
  • 而对于我们这里的 Channel 扇入模式来说,就是指有多个源 Channel 输入、⼀个目的 Channel 输出的情况。

扇入比就是源 Channel 数量比1。

  • 每个源 Channel 的元素都会发送给目标 Channel,相当于目标 Channel 的 receiver 只需要 监听目标 Channel,就可以接收所有发送给源 Channel 的数据。
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
package main

import (
	"fmt"
	"reflect"
	"time"
)

// 扇入模式 反射实现
func fanInReflect(channels ...<-chan interface{}) <-chan interface{} {
	out:= make(chan interface{})
	go func() {
		defer close(out)
		// 构造 SelectCases slice
		var cases []reflect.SelectCase
		for _, c := range channels{
			cases = append(cases, reflect.SelectCase{
				Dir:  reflect.SelectRecv,
				Chan: reflect.ValueOf(c),
			})
		}
	
		// 循环,从 cases 中选择一个可用的
		for len(cases)>0{
			i,v,ok:=reflect.Select(cases)
			if !ok{ // chan 关闭
				cases = append(cases[:i],cases[i+1:]...)
				continue
			}
			out <-v.Interface()
		}
	}()
	
	return out
}

func fanInRec(channels ...<-chan interface{}) <-chan interface{}{
	switch len(channels){
	case 0:
		c:= make(chan interface{})
		close(c)
		return c
	case 1:
		return channels[0]
	case 2:
		return mergeTow(channels[0], channels[1])
	default:
		m:=len(channels)/2
		return mergeTow(
			fanInRec(channels[:m]...),
			fanInRec(channels[m:]...))
	}
}

// 合并两个 chan
func mergeTow(a,b <-chan interface{}) <-chan interface{} {
	c := make(chan interface{})
	go func() {
		defer close(c)
		for a!= nil || b !=nil{
			select{
			case v,ok := <-a:
				if !ok { // a 已关闭,设置为nil
					a=nil
					continue
				}
				c <- v
			
			case v,ok := <-b:
				if !ok { // b已关闭,设置为nil
				b=nil
				continue
				}
				c <- v
			}
		}
	}()

	return c
}

func sigs(after time.Duration) <-chan interface{} {
	c := make(chan interface{})
	go func() {
		defer close(c)
		time.Sleep(after)
	}()

	return c
}

func main() {
	start := time.Now()

	<-fanInReflect(
		sigs(10*time.Second),
		sigs(02*time.Second),
		sigs(03*time.Second),
		sigs(04*time.Second),
		sigs(05*time.Second),
		sigs(01*time.Second),
	)

	fmt.Printf("done after %v", time.Since(start))
}

扇出模式

扇出模式是和扇入模式相反的。

扇出模式只有⼀个输入源 Channel,有多个目标 Channel,扇出比就是 1比目标 Channel 数 的值,经常用在设计模式中的观察者模式中(观察者设计模式定义了对象间的⼀种⼀对多的 组合关系。这样⼀来,⼀个对象的状态发⽣变化时,所有依赖于它的对象都会得到通知并⾃动 刷新)。在观察者模式中,数据变动后,多个观察者都会收到这个变更信号。