Go程序可以使用channel
进行多个goroutine
间的数据交换,但是这仅仅是数据同步中的一种方法。Go语言与其他语言如C、Java一样,也提供了同步机制,在某些轻量级的场合,原子访问(sync/atomic
包),互斥锁(sync.Mutex
)以及等待组(sync.WaitGroup
)能最大程度满足需求。
利用通道优雅的实现了并发通信,但是其内部的实现依然使用了各种锁,因此优雅代码的代价是性能的损失。
Go语言包中的sync
包提供了两种锁类型:sync.Mutex
和sync.RWMutex
,前者是互斥锁,后者是读写锁。
🌙 1.互斥锁
互斥锁是传统并发程序进行共享资源访问控制的主要方式。
Go中使用结构体sync.Mutex
表示互斥锁,保证同时只有一个goroutine
可以访问共享资源。
建议:同一个互斥锁的成对锁定和解锁操作放在同一层次的代码块中。 使用锁的经典模式:
var lck sync.Mutex
func foo() {
// 加锁
lck.Lock()
// 解锁
defer lck.Unlock()
// 处理任务...
}
2
3
4
5
6
7
8
lck.Lock()
会阻塞直到获取锁,然后利用defer
语句在函数返回时自动释放锁。
🌙 1.1示例一:普通数据加锁
import (
"fmt"
"sync"
"time"
)
func testDemo1() {
var mutex sync.Mutex
var wg sync.WaitGroup
num := 0
for i:=0; i<10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 1.加锁
mutex.Lock()
// 2.并发任务
num += 1
// 3.解锁
mutex.Unlock()
}()
}
wg.Wait()
fmt.Println("num=", num)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func testDemo2() {
wg := sync.WaitGroup{}
var mutex sync.Mutex
fmt.Println("Locking G0")
mutex.Lock()
fmt.Println("Locked G0")
for i:=1; i<4;i++ {
wg.Add(1)
go func(i int) {
fmt.Printf("Locking G%d\n", i)
mutex.Lock()
fmt.Printf("Locked G%d\n", i)
time.Sleep(time.Second * 2)
mutex.Unlock()
fmt.Printf("Unlocked G%d\n", i)
wg.Done()
}(i)
}
time.Sleep(time.Second * 5)
fmt.Println("Ready unlock G0")
mutex.Unlock()
fmt.Println("Unlocked G0")
wg.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
结果:
Locking G0
Locked G0
Locking G3
Locking G1
Locking G2
Ready unlock G0
Unlocked G0
Locked G3
Unlocked G3
Locked G1
Unlocked G1
Locked G2
Unlocked G2
2
3
4
5
6
7
8
9
10
11
12
13
通过程序执行结果我们可以看到,当有锁释放时,才能进行lock动作,G0锁释放时,才有后续锁释放的可能,这里是G3抢到释放机会。
🌙 1.2 示例二:对象加锁
Mutex
也可以作为struct
的一部分,这样这个struct
就会防止被多线程更改数据。
type Book struct {
Name string
LK *sync.Mutex
}
func(b *Book) SetName(wg *sync.WaitGroup, name string) {
defer func() {
fmt.Println("Unlock set name:", name)
b.LK.Unlock()
wg.Done()
}()
b.LK.Lock()
fmt.Println("Lock set name:", name)
b.Name = name
}
func testDemo3() {
bk := Book{
LK: new(sync.Mutex),
}
wg := &sync.WaitGroup{}
books := []string{"Java", "Golang", "C++"}
for _, book:=range books {
wg.Add(1)
bk.SetName(wg, book)
}
wg.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
结果:
Lock set name: Java
Unlock set name: Java
Lock set name: Golang
Unlock set name: Golang
Lock set name: C++
Unlock set name: C++
2
3
4
5
6
type Account struct {
Money int
Lk *sync.Mutex
}
// 注意:指针接收器
func(a *Account) Query() {
fmt.Println("Money balance: ", a.Money)
}
// 注意:指针接收器
func(a *Account) Add(wg *sync.WaitGroup, num int) {
defer func() {
fmt.Println("Unlock Add: ", num)
a.Lk.Unlock()
wg.Done()
}()
a.Lk.Lock()
fmt.Println("Lock Add: ", num)
a.Money += num
}
func testDemo4() {
a := &Account{
0,
new(sync.Mutex),
//&sync.Mutex{}, // 同上一行
}
wg := &sync.WaitGroup{}
for i:=1; i<=10; i++ {
wg.Add(1)
go func(n int) {
a.Add(wg, n)
}(i)
}
// 等待执行
wg.Wait()
// 查询结果
a.Query()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
结果:
Lock Add: 1
Unlock Add: 1
Lock Add: 10
Unlock Add: 10
Lock Add: 4
Unlock Add: 4
Lock Add: 5
Unlock Add: 5
Lock Add: 6
Unlock Add: 6
Lock Add: 2
Unlock Add: 2
Lock Add: 7
Unlock Add: 7
Lock Add: 8
Unlock Add: 8
Lock Add: 9
Unlock Add: 9
Lock Add: 3
Unlock Add: 3
Money balance: 55
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
🌙 2.读写锁
读写锁是分别针对读操作和写操作进行锁定和解锁操作的互斥锁。在Go语言中,读写锁由结构体类型sync.RWMutex
代表。
// 设定为写模式:与互斥锁使用方式一致,一路只写
func (*RWMutex) Lock() // 锁定写
func (*RWMutex) Unlock() // 解锁写
// 设定为读模式:对读执行加锁解锁,即多路只读
func (*RWMutex) RLock()
func (*RWMutex) RUnlock()
2
3
4
5
6
7
- 写操作与读操作之间也是互斥的
- 读写锁控制下的多个写操作之间是互斥的,即一路写
- 多个读操作之间不存在互斥关系,即多路读
func testDemo5() {
var rwm sync.RWMutex
for i := 0; i < 3; i++ {
go func(i int) {
fmt.Println("Try Lock 【reading】 i:", i)
rwm.RLock() // 一个读加锁
fmt.Println("Ready Lock 【reading】 i:", i)
time.Sleep(time.Second * 2)
fmt.Println("Try Unlock 【reading】 i:", i)
rwm.RUnlock() // 一个读解锁
fmt.Println("Ready Unlock 【reading】 i:", i)
}(i)
}
time.Sleep(time.Microsecond * 100)
fmt.Println("Try Lock 【writing】")
rwm.Lock() // 必须等读解锁才可以写加锁
fmt.Println("Ready Locked 【writing】")
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Mutex
和RWMutex
都不关联goroutine
,但RWMutex
显然更适用于读多写少的场景。仅针对读的性能来说,RWMutex
要高于Mutex
,因为RWMutex
的多个读可以并存。- 所有被读锁定的
goroutine
会在写解锁时唤醒 - 读解锁只会在没有任何读锁定时,唤醒一个要进行写锁定而被阻塞的
goroutine
- 对未被锁定的读写锁进行写解锁或读解锁,都会引发运行时崩溃
panic
- 对同一个读写锁来说,读锁定可以有多个,所以需要进行等量的读解锁,才能让某一个写锁获得机会,否则该
goroutine
一直处于阻塞,但是sync.RWMutext
没有提供获取读锁数量方法,这里需要使用defer
避免,如下案例所示。
🌙 3.死锁
常见会出现死锁的场景:
- 两个协程互相要求对方先操作,如:AB相互要求对方先发红包,然后自己再发
- 读写双方相互要求对方先执行,自己后执行
模拟死锁:
func testDemo6() {
var rwm sync.RWMutex
ch := make(chan int)
go func() {
rwm.RLock() // 加读锁
x := <- ch // 如果不写入,则无法读取
fmt.Println("读取到的x:", x)
rwm.RUnlock()
}()
go func() {
rwm.Lock() // 加入写锁
ch <- 10 // 管道无缓存,没有读取,则无法写入
fmt.Println("写入:", 10)
rwm.Unlock()
}()
time.Sleep(time.Second * 5)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
将上述死锁案例中的锁部分代码去除,则两个协程正常执行。
🌙 4.等待组
sync.WaitGroup
类型是并发安全的,该类型结构体中有一个计数器,计数器的值可以通过方法调用实现计数器的增加或减少。
func(wg *WaitGroup) Add(delta int)
: 等待组计数器+delta
, 也可以传入负数使计数器减少,但是计数器值不能为负值,否则引发panic
func(wg *WaitGroup) Done()
: 等待组计数器-1, 等同于Add(-1)
func(wg *WaitGroup) Wait()
: 等待组计数器!=0时阻塞,直到为0
:有了等待组,我们就不需要再在函数中使用
time.Sleep()
方法来模拟等待协程运行结束了。
等待组与互斥锁配合解决钱数问题:
func testDemo7() {
var mt sync.Mutex
var wg sync.WaitGroup
money := 10000
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
mt.Lock()
fmt.Printf("goroutine %d lock\n", i)
for j := 0; j < 100; j++ {
money += 10
}
fmt.Printf("groutine %d unlock\n", i)
mt.Unlock()
wg.Done()
}(i)
}
wg.Wait()
fmt.Println("money =", money) // 20000
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
可能的结果:
goroutine 0 lock
groutine 0 unlock
goroutine 9 lock
groutine 9 unlock
goroutine 4 lock
groutine 4 unlock
goroutine 5 lock
groutine 5 unlock
goroutine 6 lock
groutine 6 unlock
goroutine 7 lock
groutine 7 unlock
goroutine 8 lock
groutine 8 unlock
goroutine 1 lock
groutine 1 unlock
goroutine 2 lock
groutine 2 unlock
goroutine 3 lock
groutine 3 unlock
money = 20000
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
🌙 5.条件变量
sync.Cond
类型即是Go中的条件变量,该类型内部包含一个锁接口。条件变量通常与锁配合使用:
创建条件变量的函数:
func NewCond(l locker) *Cond // 条件变量必须传入一个锁,二者需要配合使用
*sync.Cond
类型有三个方法:
+Wait
: 该方法会阻塞等待条件变量满足条件。也会对锁进行解锁,一旦收到通知则唤醒,并立即锁定该锁
Signal
: 发送通知(单发),给一个正在等待在该条件变量上的协程发送通知Broadcast
: 发送通知(广播),给正在等待该条件变量的所有协程发送通知
使用条件变量优化生产消费模型(支持多个生产者、多个消费者):
func producer(ch chan<- int, cond *sync.Cond, BUFLEN int) {
for {
// 给条件变量对应的互斥锁加锁
cond.L.Lock()
// 缓冲区满,则等待消费者消费,这里不能是if
for len(ch) == BUFLEN {
cond.Wait()
}
// 写入缓冲区一个随机数
n := rand.Intn(1000)
ch <- n
fmt.Println("Produce:", n)
// 生产结束,解锁互斥锁
cond.L.Unlock()
// 一旦生产后,就唤醒其他被阻塞的消费者
cond.Signal()
time.Sleep(time.Second * 2)
}
}
func consumer(ch <-chan int, cond *sync.Cond) {
for {
// 全局条件变量加锁
cond.L.Lock()
// 缓冲区为空,则等待生产者生产,这里不能是if
for len(ch) == 0 {
cond.Wait()
}
fmt.Println("Consume:", <-ch)
cond.L.Unlock()
// 一旦消费后,就唤醒其他被阻塞的生产者
cond.Signal()
time.Sleep(time.Second * 1)
}
}
func testDemo8() {
// 定义缓冲区大小
const BUFLEN = 5
// 定义cond
var cond = sync.NewCond(&sync.Mutex{})
// 设置随机数种子
rand.Seed(time.Now().UnixNano())
//生产消费模型中的通道
ch := make(chan int, BUFLEN)
// 启动10个生产者
for i := 0; i < 10; i++ {
go producer(ch, cond, BUFLEN)
}
// 启动10个消费者
for i := 0; i < 10; i++ {
go consumer(ch, cond)
}
// 阻塞主程序退出
for {}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
可能的结果:
Produce: 836
Produce: 582
Produce: 280
Produce: 815
Produce: 408
Consume: 836
Produce: 284
Consume: 582
Produce: 877
Consume: 280
Produce: 125
Consume: 815
Produce: 763
Consume: 408
Consume: 284
Consume: 877
Consume: 125
Consume: 763
Produce: 808
Consume: 808
Produce: 652
Consume: 652
Produce: 937
Consume: 937
... ...
... ...
... ...
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
如果产量满了,只有消费了,才会再次生产。如果消费没了。只有再生产才能再次消费。从而满足生产与消费的供需平衡。