# Golang Concurrency Best Practices
## What Is a Goroutine
Goroutines are functions or methods that run concurrently with other functions or methods. They can be thought of as lightweight threads. Compared to a thread, a goroutine is very cheap to create, so Go applications commonly run thousands of goroutines at once.
## Advantages of Goroutines over Threads
- Goroutines are extremely cheap compared to threads. Their stacks are only a few KB and can grow and shrink as the application requires, whereas a thread's stack size must be specified up front and stays fixed.
- Goroutines are multiplexed onto a smaller number of OS threads. A program with thousands of goroutines may be running on a single thread. If any goroutine on that thread blocks, say waiting for user input, the runtime creates another OS thread and moves the remaining goroutines onto it. The runtime handles all of this; as programmers we are abstracted away from these details and get a clean API for working with concurrency.
- Goroutines communicate over channels. Channels by design prevent race conditions when goroutines access shared memory; a channel can be thought of as a pipe through which goroutines communicate.
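As a minimal sketch of these ideas (not from the original text), the example below runs a computation in a goroutine and passes the result back over a channel rather than through shared memory:

```go
package main

import "fmt"

// square computes i*i in its own goroutine and sends the result
// back over a channel, so no memory is shared between goroutines.
func square(i int) <-chan int {
	out := make(chan int)
	go func() {
		out <- i * i
		close(out)
	}()
	return out
}

func main() {
	// The receive blocks until the goroutine has sent its result.
	fmt.Println(<-square(7)) // prints 49
}
```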
## Three Common Control Patterns
### Controlling goroutines with sync.WaitGroup: Add(), Done(), and Wait()
```go
package main

import (
	"fmt"
	"sync"
)

func A(i int) {
	fmt.Println("I am A", i)
}

func main() {
	var wg sync.WaitGroup
	fmt.Println("I am main")
	wg.Add(1)
	go func(i int) {
		defer wg.Done()
		A(i)
	}(1)
	wg.Wait()
	fmt.Println("all done")
}
```
### Controlling with a buffered channel
With an unbuffered channel, the sender blocks until the receiver has received the value; with a buffered channel, as used below, a send only blocks once the buffer is full.
```go
package main

import (
	"fmt"
)

func A(i int) {
	fmt.Println("I am A", i)
}

func main() {
	ch := make(chan bool, 1)
	fmt.Println("I am main")
	go func(i int, chp chan<- bool) {
		defer close(chp)
		A(i)
		fmt.Println("finish")
		chp <- true
	}(1, ch)
	fmt.Println("wait")
	<-ch
	fmt.Println("all done")
}
```
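Beyond signaling completion, a buffered channel's capacity can also bound concurrency. This sketch (the `runLimited` helper is hypothetical, not from the original text) uses a channel of capacity `limit` as a counting semaphore, so at most `limit` tasks run at once:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited starts n tasks but acquires a slot from a buffered
// channel before running each one, so at most `limit` run at once.
// It returns the peak number of tasks observed running together.
func runLimited(n, limit int) int {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var mu sync.Mutex
	running, peak := 0, 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire: blocks when the buffer is full
			defer func() { <-sem }() // release the slot
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()
			time.Sleep(10 * time.Millisecond) // simulate work
			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak concurrency:", runLimited(10, 2))
}
```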
### Using sync.Cond

A `sync.Cond` condition variable coordinates goroutines that want to access a shared resource: when the state of the shared resource changes, it can be used to notify goroutines blocked on the mutex.

- `cond.L.Lock()` / `cond.L.Unlock()`: you can equivalently call `lock.Lock()` / `lock.Unlock()` on the underlying mutex; they are exactly the same, because the lock is passed in by pointer.
- `cond.Wait()`: Unlock() → block waiting for a notification (i.e. a `Signal()` or `Broadcast()`) → receive the notification → Lock().
- `cond.Signal()`: wakes up one goroutine. It is not an error to call it when no goroutine is in `Wait()`. Waiters are notified first-in-first-out, in the order they entered `Wait()`.
- `cond.Broadcast()`: notifies all goroutines in `Wait()`; also safe to call when none are waiting.
```go
package main

import (
	"log"
	"sync"
	"time"
)

var done = false

func read(name string, c *sync.Cond) {
	c.L.Lock()
	for !done {
		c.Wait()
	}
	log.Println(name, "starts reading")
	c.L.Unlock()
}

func write(name string, c *sync.Cond) {
	log.Println(name, "starts writing")
	time.Sleep(time.Second)
	c.L.Lock()
	done = true
	c.L.Unlock()
	log.Println(name, "wakes all")
	c.Broadcast()
}

func main() {
	cond := sync.NewCond(&sync.Mutex{})
	go read("reader1", cond)
	go read("reader2", cond)
	go read("reader3", cond)
	write("writer", cond)
	time.Sleep(time.Second * 3)
}
```
`done` is the condition protected by the mutex. `read()` calls `Wait()` and blocks until `done` becomes true. `write()` simulates receiving data; once finished, it sets `done` to true and calls `Broadcast()` to notify all waiting goroutines. `write()` sleeps for 1 second, both to simulate work and to ensure that the three read goroutines have all reached `Wait()` and are blocked. `main` sleeps for 3 seconds at the end so that all operations have time to finish.
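For contrast with `Broadcast()`, the sketch below (the `consumeAll` helper is hypothetical, not from the original) uses `Signal()` to wake one consumer at a time as items are pushed onto a shared queue:

```go
package main

import (
	"fmt"
	"sync"
)

// consumeAll pushes three items onto a shared queue, waking one
// consumer per item with Signal(). Each consumer pops one item and
// reports it; consumeAll returns the sum of all consumed items.
func consumeAll() int {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	queue := []int{}
	consumed := make(chan int, 3)

	// Three consumers: each waits until the queue is non-empty,
	// then pops exactly one item.
	for i := 0; i < 3; i++ {
		go func() {
			cond.L.Lock()
			for len(queue) == 0 {
				cond.Wait() // releases the lock while blocked
			}
			item := queue[0]
			queue = queue[1:]
			cond.L.Unlock()
			consumed <- item
		}()
	}

	// Producer: push one item at a time and wake one consumer.
	for i := 1; i <= 3; i++ {
		cond.L.Lock()
		queue = append(queue, i)
		cond.L.Unlock()
		cond.Signal()
	}

	sum := 0
	for i := 0; i < 3; i++ {
		sum += <-consumed
	}
	return sum
}

func main() {
	fmt.Println("sum of consumed items:", consumeAll())
}
```

Note the `for len(queue) == 0` loop around `Wait()`: a woken consumer must re-check the condition, since another goroutine may have taken the item first.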
## Use Cases
### Sending and Receiving Values over a Channel
We create an `int` channel `c` and return it from the generator function. The `for` loop running inside an anonymous goroutine writes values to channel `c`.
```go
package main

import (
	"fmt"
)

func main() {
	c := generator()
	receiver(c)
}

func receiver(c <-chan int) {
	for v := range c {
		fmt.Println(v)
	}
}

func generator() <-chan int {
	c := make(chan int)
	go func() {
		for i := 0; i < 10; i++ {
			c <- i
		}
		close(c)
	}()
	return c
}
```
### Reading and Writing a Fibonacci Sequence over Channels
The main function has two unbuffered channels, `ch` and `quit`. Inside the `fibonacci` function, the `select` statement blocks until one of its cases is ready.
```go
package main

import (
	"fmt"
)

func fibonacci(ch chan int, quit chan bool) {
	x, y := 0, 1
	for {
		select {
		case ch <- x: // write to channel ch
			x, y = y, x+y
		case <-quit:
			fmt.Println("quit")
			return
		}
	}
}

func main() {
	ch := make(chan int)
	quit := make(chan bool)
	n := 10
	go func(n int) {
		for i := 0; i < n; i++ {
			fmt.Println(<-ch) // read from channel ch
		}
		quit <- false
	}(n)
	fibonacci(ch, quit)
}
```
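`select` only blocks when no case is ready and there is no `default`; adding a `default` case makes the operation non-blocking. A small sketch (the `tryReceive` helper is hypothetical, not from the original):

```go
package main

import "fmt"

// tryReceive attempts a non-blocking read: the default case runs
// immediately when no value is ready on ch.
func tryReceive(ch <-chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)
	if _, ok := tryReceive(ch); !ok {
		fmt.Println("nothing ready yet")
	}
	ch <- 42
	if v, ok := tryReceive(ch); ok {
		fmt.Println("got", v)
	}
}
```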
### Producer-Consumer Model
```go
package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"runtime"
	"runtime/pprof"
)

type Consumer struct {
	msgs *chan int
}

// NewConsumer creates a Consumer
func NewConsumer(msgs *chan int) *Consumer {
	return &Consumer{msgs: msgs}
}

// consume reads the msgs channel
func (c *Consumer) consume() {
	fmt.Println("consume: Started")
	for {
		msg := <-*c.msgs
		fmt.Println("consume: Received:", msg)
	}
}

// Producer definition
type Producer struct {
	msgs *chan int
	done *chan bool
}

// NewProducer creates a Producer
func NewProducer(msgs *chan int, done *chan bool) *Producer {
	return &Producer{msgs: msgs, done: done}
}

// produce creates and sends the messages through the msgs channel
func (p *Producer) produce(max int) {
	fmt.Println("produce: Started")
	for i := 0; i < max; i++ {
		fmt.Println("produce: Sending ", i)
		*p.msgs <- i
	}
	*p.done <- true // signal when done
	fmt.Println("produce: Done")
}

func main() {
	// profile flags
	cpuprofile := flag.String("cpuprofile", "", "write cpu profile to `file`")
	memprofile := flag.String("memprofile", "", "write memory profile to `file`")
	// get the maximum number of messages from flags
	max := flag.Int("n", 5, "defines the number of messages")
	flag.Parse()
	// utilize the max num of cores available
	runtime.GOMAXPROCS(runtime.NumCPU())
	// CPU Profile
	if *cpuprofile != "" {
		f, err := os.Create(*cpuprofile)
		if err != nil {
			log.Fatal("could not create CPU profile: ", err)
		}
		if err := pprof.StartCPUProfile(f); err != nil {
			log.Fatal("could not start CPU profile: ", err)
		}
		defer pprof.StopCPUProfile()
	}
	var msgs = make(chan int)  // channel to send messages
	var done = make(chan bool) // channel to control when production is done
	// Start a goroutine for Producer.produce
	go NewProducer(&msgs, &done).produce(*max)
	// Start a goroutine for Consumer.consume
	go NewConsumer(&msgs).consume()
	// Finish the program when the production is done
	<-done
	// Memory Profile
	if *memprofile != "" {
		f, err := os.Create(*memprofile)
		if err != nil {
			log.Fatal("could not create memory profile: ", err)
		}
		runtime.GC() // get up-to-date statistics
		if err := pprof.WriteHeapProfile(f); err != nil {
			log.Fatal("could not write memory profile: ", err)
		}
		f.Close()
	}
}
```
### Generator Pattern
```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// boring returns a channel to communicate with it.
// <-chan string means a receive-only channel of string.
func boring(msg string) <-chan string {
	c := make(chan string)
	// launch a goroutine inside the function
	// that sends data to the channel
	go func() {
		// the for loop simulates a sender (finite here: 10 messages)
		for i := 0; i < 10; i++ {
			c <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
		// the sender should close the channel
		close(c)
	}()
	return c // return the channel to the caller
}

func main() {
	joe := boring("Joe")
	ahn := boring("Ahn")
	// This loop reads from the two channels in lockstep.
	for i := 0; i < 10; i++ {
		fmt.Println(<-joe)
		fmt.Println(<-ahn)
	}
	// or we can simply use for range:
	// for msg := range joe {
	//     fmt.Println(msg)
	// }
	fmt.Println("You're both boring. I'm leaving")
}
```
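Reading `joe` and `ahn` in lockstep means a slow Ahn also delays Joe's messages. The usual next step is a fan-in that merges both channels, so whichever value is ready first is delivered first. A sketch (`fanIn` and `gen` are hypothetical helpers, not from the original):

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn merges several input channels into one output channel.
// The output is closed once every input has been drained.
func fanIn(inputs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan string) {
			defer wg.Done()
			for msg := range in {
				out <- msg
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out) // safe: all forwarding goroutines have exited
	}()
	return out
}

// gen is a tiny finite generator used to demonstrate fanIn.
func gen(msgs ...string) <-chan string {
	c := make(chan string)
	go func() {
		for _, m := range msgs {
			c <- m
		}
		close(c)
	}()
	return c
}

func main() {
	merged := fanIn(gen("Joe 0", "Joe 1"), gen("Ahn 0", "Ahn 1"))
	for msg := range merged {
		fmt.Println(msg)
	}
}
```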
### Google Search 3.0
```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Result string
type Search func(query string) Result

var (
	Web1   = fakeSearch("web1")
	Web2   = fakeSearch("web2")
	Image1 = fakeSearch("image1")
	Image2 = fakeSearch("image2")
	Video1 = fakeSearch("video1")
	Video2 = fakeSearch("video2")
)

func fakeSearch(kind string) Search {
	return func(query string) Result {
		time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
		return Result(fmt.Sprintf("%s result for %q\n", kind, query))
	}
}

// How do we avoid discarding results because one server is slow?
// Replicate each backend and query all replicas in parallel.
func First(query string, replicas ...Search) Result {
	// Buffered, so the losing replicas can still send and exit
	// instead of blocking forever after we return.
	c := make(chan Result, len(replicas))
	for i := range replicas {
		go func(idx int) {
			c <- replicas[idx](query)
		}(i)
	}
	// The magic is here: return as soon as the fastest replica responds.
	return <-c
}

// We don't want to wait for slow servers.
func Google(query string) []Result {
	// Buffered for the same reason: goroutines that finish after
	// the timeout must not leak.
	c := make(chan Result, 3)
	// each search runs in its own goroutine
	go func() {
		c <- First(query, Web1, Web2)
	}()
	go func() {
		c <- First(query, Image1, Image2)
	}()
	go func() {
		c <- First(query, Video1, Video2)
	}()
	var results []Result
	// A global timeout for all 3 queries: any server that takes
	// longer than 50ms to respond is ignored.
	timeout := time.After(50 * time.Millisecond)
	for i := 0; i < 3; i++ {
		select {
		case r := <-c:
			results = append(results, r)
		case <-timeout:
			// this case drops the slow servers
			fmt.Println("timeout")
			return results
		}
	}
	return results
}

func main() {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	results := Google("golang")
	elapsed := time.Since(start)
	fmt.Println(results)
	fmt.Println(elapsed)
}
```
### Worker Pool
```go
// Credit:
// https://gobyexample.com/worker-pools
//
// Worker pool benefits:
// - Efficiency: it distributes the work across threads.
// - Flow control: it limits the amount of work in flight.
//
// Disadvantage of workers:
// - Lifetime complexity: cleaning up and handling idle workers.
//
// Principles:
// Start goroutines whenever you have concurrent work to do.
// A goroutine should exit as soon as possible once its work is done;
// this helps clean up resources and manage lifetimes correctly.
package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Println("worker", id, "started job", j)
		time.Sleep(time.Second)
		fmt.Println("worker", id, "finished job", j)
		results <- j * 2
	}
}

func workerEfficient(id int, jobs <-chan int, results chan<- int) {
	// sync.WaitGroup helps us manage the jobs
	var wg sync.WaitGroup
	for j := range jobs {
		wg.Add(1)
		// start a goroutine to run each job
		go func(job int) {
			fmt.Println("worker", id, "started job", job)
			time.Sleep(time.Second)
			fmt.Println("worker", id, "finished job", job)
			results <- job * 2
			wg.Done()
		}(j)
	}
	// With the WaitGroup managing the goroutines' lifetimes,
	// we can add more handling once all of them have finished.
	wg.Wait()
}

func main() {
	const numbJobs = 8
	jobs := make(chan int, numbJobs)
	results := make(chan int, numbJobs)
	// 1. Start the workers.
	// A fixed pool of goroutines receives and performs tasks from a channel.
	// In this example, we define a fixed pool of 3 workers,
	// receiving their jobs from the channel `jobs`,
	// with the worker's name held in the `w` variable.
	for w := 1; w <= 3; w++ {
		go workerEfficient(w, jobs, results)
	}
	// 2. Send the work.
	// Some other goroutine sends the work to the channel;
	// here, the `main` goroutine sends the work to `jobs`.
	for j := 1; j <= numbJobs; j++ {
		jobs <- j
	}
	close(jobs)
	fmt.Println("Closed job")
	for a := 1; a <= numbJobs; a++ {
		<-results
	}
	close(results)
}
```
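One way to tame the lifetime complexity mentioned in the comments above is to let a dedicated goroutine wait for all workers and then close `results`, so the consumer can simply range over it instead of counting receives. A sketch (`runPool` is a hypothetical helper, not from the original):

```go
package main

import (
	"fmt"
	"sync"
)

// runPool runs a fixed pool of `workers` goroutines over the jobs
// channel and closes the results channel once every worker has
// exited, so the caller can range over results without counting.
func runPool(workers int, jobs <-chan int) <-chan int {
	results := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- j * 2
			}
		}()
	}
	go func() {
		wg.Wait()
		close(results) // safe: all senders are done
	}()
	return results
}

func main() {
	jobs := make(chan int, 5)
	for j := 1; j <= 5; j++ {
		jobs <- j
	}
	close(jobs)
	sum := 0
	for r := range runPool(3, jobs) {
		sum += r
	}
	fmt.Println("sum of results:", sum) // 2+4+6+8+10 = 30
}
```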
### Crash Recovery (recover)
In Go, if one goroutine crashes, every goroutine exits with it: an out-of-bounds array access, for example, triggers a panic (roughly the equivalent of throwing an exception), which terminates the whole program. For a long-running application this is clearly not what we want, so we need to recover from the crash.

Go provides `defer` together with `recover` to implement crash recovery, playing a role similar to try/catch in other languages. For `recover` to actually catch a panic, keep two points in mind:

- `recover` only takes effect inside a deferred function registered *before* the panic occurs, because once a panic fires, the statements after it never run.
- `recover` returns a non-nil value only when a panic actually occurred inside the function; with no panic, it returns nil.
```go
package main

import (
	"fmt"
	"sync"
)

func A(i int) {
	fmt.Println("I am A", i)
	panic("crash")
}

func main() {
	var wg sync.WaitGroup
	fmt.Println("I am main")
	wg.Add(1)
	go func(i int) {
		defer func() {
			// The deferred recover is registered before calling A,
			// so it can catch the panic raised inside A.
			if err := recover(); err != nil {
				fmt.Println("recovered:", err)
			}
			wg.Done()
		}()
		A(i)
	}(1)
	wg.Wait()
	fmt.Println("all done")
}
```
When a goroutine calls into other functions, the safe way to keep the program from crashing is to register a defer/recover handler up front.
## References