go专题-同步sync

2022/5/16 go

Go程序可以使用channel进行多个goroutine间的数据交换,但是这仅仅是数据同步中的一种方法。Go语言与其他语言如C、Java一样,也提供了同步机制,在某些轻量级的场合,原子访问(sync/atomic包),互斥锁(sync.Mutex)以及等待组(sync.WaitGroup)能最大程度满足需求。

利用通道优雅的实现了并发通信,但是其内部的实现依然使用了各种锁,因此优雅代码的代价是性能的损失。

Go语言包中的sync包提供了两种锁类型:sync.Mutexsync.RWMutex,前者是互斥锁,后者是读写锁。

🌙 1.互斥锁

互斥锁是传统并发程序进行共享资源访问控制的主要方式。

Go中使用结构体sync.Mutex表示互斥锁,保证同时只有一个goroutine可以访问共享资源。

建议:同一个互斥锁的成对锁定和解锁操作放在同一层次的代码块中。 使用锁的经典模式:

var lck sync.Mutex
func foo() {
    // 加锁
    lck.Lock()
    // 解锁
    defer lck.Unlock()
    // 处理任务...
}
1
2
3
4
5
6
7
8

lck.Lock()会阻塞直到获取锁,然后利用defer语句在函数返回时自动释放锁。

🌙 1.1示例一:普通数据加锁

import (
    "fmt"
    "sync"
    "time"
)

func testDemo1() {
    var mutex sync.Mutex
    var wg sync.WaitGroup
    
    num := 0
    
    for i:=0; i<10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // 1.加锁
            mutex.Lock()
            // 2.并发任务
            num += 1
            // 3.解锁
            mutex.Unlock()
        }()
    }
    
    wg.Wait()
    fmt.Println("num=", num)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func testDemo2() {
	wg := sync.WaitGroup{}

	var mutex sync.Mutex
	fmt.Println("Locking G0")
	mutex.Lock()
	fmt.Println("Locked G0")

	for i:=1; i<4;i++ {
		wg.Add(1)
		go func(i int) {
			fmt.Printf("Locking G%d\n", i)
			mutex.Lock()
			fmt.Printf("Locked G%d\n", i)

			time.Sleep(time.Second * 2)
			mutex.Unlock()
			fmt.Printf("Unlocked G%d\n", i)
			wg.Done()
		}(i)
	}

	time.Sleep(time.Second * 5)
	fmt.Println("Ready unlock G0")
	mutex.Unlock()
	fmt.Println("Unlocked G0")

	wg.Wait()
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

结果:

Locking G0
Locked G0
Locking G3
Locking G1
Locking G2
Ready unlock G0
Unlocked G0
Locked G3
Unlocked G3
Locked G1
Unlocked G1
Locked G2
Unlocked G2
1
2
3
4
5
6
7
8
9
10
11
12
13

通过程序执行结果我们可以看到,当有锁释放时,才能进行lock动作,G0锁释放时,才有后续锁释放的可能,这里是G3抢到释放机会。

🌙 1.2 示例二:对象加锁

Mutex也可以作为struct的一部分,这样这个struct就会防止被多线程更改数据。

type Book struct {
	Name string
	LK   *sync.Mutex
}

func(b *Book) SetName(wg *sync.WaitGroup, name string) {
	defer func() {
		fmt.Println("Unlock set name:", name)
		b.LK.Unlock()
		wg.Done()
	}()

	b.LK.Lock()
	fmt.Println("Lock set name:", name)
	b.Name = name
}

func testDemo3() {
	bk := Book{
		LK: new(sync.Mutex),
	}
	wg := &sync.WaitGroup{}
	books := []string{"Java", "Golang", "C++"}

	for _, book:=range books {
		wg.Add(1)
		bk.SetName(wg, book)
	}

	wg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

结果:

Lock set name: Java
Unlock set name: Java
Lock set name: Golang
Unlock set name: Golang
Lock set name: C++
Unlock set name: C++
1
2
3
4
5
6
type Account struct {
	Money int
	Lk *sync.Mutex
}

// 注意:指针接收器
func(a *Account) Query() {
	fmt.Println("Money balance: ", a.Money)
}

// 注意:指针接收器
func(a *Account) Add(wg *sync.WaitGroup, num int) {
	defer func() {
		fmt.Println("Unlock Add: ", num)
		a.Lk.Unlock()
		wg.Done()
	}()
	a.Lk.Lock()
	fmt.Println("Lock Add: ", num)
	a.Money += num
}


func testDemo4() {
	a := &Account{
		0,
		new(sync.Mutex),
		//&sync.Mutex{}, // 同上一行
	}

	wg := &sync.WaitGroup{}

	for i:=1; i<=10; i++ {
		wg.Add(1)
		go func(n int) {
			a.Add(wg, n)
		}(i)
	}
        // 等待执行
	wg.Wait()
        // 查询结果    
	a.Query()
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

结果:

Lock Add:  1
Unlock Add:  1
Lock Add:  10
Unlock Add:  10
Lock Add:  4
Unlock Add:  4
Lock Add:  5
Unlock Add:  5
Lock Add:  6
Unlock Add:  6
Lock Add:  2
Unlock Add:  2
Lock Add:  7
Unlock Add:  7
Lock Add:  8
Unlock Add:  8
Lock Add:  9
Unlock Add:  9
Lock Add:  3
Unlock Add:  3
Money balance:  55
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

🌙 2.读写锁

读写锁是分别针对读操作写操作进行锁定和解锁操作的互斥锁。在Go语言中,读写锁由结构体类型sync.RWMutex代表。

// 设定为写模式:与互斥锁使用方式一致,一路只写
func (*RWMutex) Lock()				// 锁定写
func (*RWMutex) Unlock()			// 解锁写

// 设定为读模式:对读执行加锁解锁,即多路只读
func (*RWMutex) RLock()
func (*RWMutex) RUnlock()
1
2
3
4
5
6
7
  • 写操作与读操作之间也是互斥的
  • 读写锁控制下的多个写操作之间是互斥的,即一路写
  • 多个读操作之间不存在互斥关系,即多路读
func testDemo5() {
	var rwm sync.RWMutex

	for i := 0; i < 3; i++ {
		go func(i int) {
			fmt.Println("Try Lock 【reading】 i:", i)
			rwm.RLock() // 一个读加锁
			fmt.Println("Ready Lock 【reading】 i:", i)
			time.Sleep(time.Second * 2)
			fmt.Println("Try Unlock 【reading】 i:", i)
			rwm.RUnlock() // 一个读解锁
			fmt.Println("Ready Unlock 【reading】 i:", i)
		}(i)
	}

	time.Sleep(time.Microsecond * 100)
	fmt.Println("Try Lock 【writing】")
	rwm.Lock() // 必须等读解锁才可以写加锁
	fmt.Println("Ready Locked 【writing】")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  • MutexRWMutex都不关联goroutine,但RWMutex显然更适用于读多写少的场景。仅针对读的性能来说,RWMutex要高于Mutex,因为RWMutex的多个读可以并存。
  • 所有被读锁定的goroutine会在写解锁时唤醒
  • 读解锁只会在没有任何读锁定时,唤醒一个要进行写锁定而被阻塞的goroutine
  • 对未被锁定的读写锁进行写解锁或读解锁,都会引发运行时崩溃panic
  • 对同一个读写锁来说,读锁定可以有多个,所以需要进行等量的读解锁,才能让某一个写锁获得机会,否则该goroutine一直处于阻塞,但是sync.RWMutext没有提供获取读锁数量方法,这里需要使用defer避免,如下案例所示。

🌙 3.死锁

常见会出现死锁的场景:

  • 两个协程互相要求对方先操作,如:AB相互要求对方先发红包,然后自己再发
  • 读写双方相互要求对方先执行,自己后执行

模拟死锁:

func testDemo6() {

	var rwm sync.RWMutex
	ch := make(chan int)
	
	go func() {
		rwm.RLock()				// 加读锁
		 x := <- ch				// 如果不写入,则无法读取
		 fmt.Println("读取到的x:", x)
		rwm.RUnlock()
	}()

	go func() {
		rwm.Lock()			// 加入写锁
		ch <- 10			// 管道无缓存,没有读取,则无法写入
		fmt.Println("写入:", 10)
		rwm.Unlock()
	}()

	time.Sleep(time.Second * 5)

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

将上述死锁案例中的锁部分代码去除,则两个协程正常执行。

🌙 4.等待组

sync.WaitGroup类型是并发安全的,该类型结构体中有一个计数器,计数器的值可以通过方法调用实现计数器的增加或减少。

  • func(wg *WaitGroup) Add(delta int): 等待组计数器+delta, 也可以传入负数使计数器减少,但是计数器值不能为负值,否则引发panic
  • func(wg *WaitGroup) Done(): 等待组计数器-1, 等同于Add(-1)
  • func(wg *WaitGroup) Wait(): 等待组计数器!=0时阻塞,直到为0

:有了等待组,我们就不需要再在函数中使用time.Sleep()方法来模拟等待协程运行结束了。

等待组与互斥锁配合解决钱数问题:

func testDemo7() {
	var mt sync.Mutex
	var wg sync.WaitGroup
	money := 10000

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			mt.Lock()
			fmt.Printf("goroutine %d lock\n", i)

			for j := 0; j < 100; j++ {
				money += 10
			}

			fmt.Printf("groutine %d unlock\n", i)
			mt.Unlock()
			wg.Done()
		}(i)
	}

	wg.Wait()
	fmt.Println("money =", money) // 20000
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

可能的结果:

goroutine 0 lock
groutine 0 unlock
goroutine 9 lock
groutine 9 unlock
goroutine 4 lock
groutine 4 unlock
goroutine 5 lock
groutine 5 unlock
goroutine 6 lock
groutine 6 unlock
goroutine 7 lock
groutine 7 unlock
goroutine 8 lock
groutine 8 unlock
goroutine 1 lock
groutine 1 unlock
goroutine 2 lock
groutine 2 unlock
goroutine 3 lock
groutine 3 unlock
money = 20000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

🌙 5.条件变量

sync.Cond类型即是Go中的条件变量,该类型内部包含一个锁接口。条件变量通常与锁配合使用:

创建条件变量的函数:

func NewCond(l locker) *Cond        // 条件变量必须传入一个锁,二者需要配合使用
1

*sync.Cond类型有三个方法:

+Wait: 该方法会阻塞等待条件变量满足条件。也会对锁进行解锁,一旦收到通知则唤醒,并立即锁定该锁

  • Signal: 发送通知(单发),给一个正在等待在该条件变量上的协程发送通知
  • Broadcast: 发送通知(广播),给正在等待该条件变量的所有协程发送通知

使用条件变量优化生产消费模型(支持多个生产者、多个消费者):

func producer(ch chan<- int, cond *sync.Cond, BUFLEN int) {
	for {
		// 给条件变量对应的互斥锁加锁
		cond.L.Lock()
		// 缓冲区满,则等待消费者消费,这里不能是if
		for len(ch) == BUFLEN {
			cond.Wait()
		}
		// 写入缓冲区一个随机数
		n := rand.Intn(1000)
		ch <- n
		fmt.Println("Produce:", n)
		// 生产结束,解锁互斥锁
		cond.L.Unlock()
		// 一旦生产后,就唤醒其他被阻塞的消费者
		cond.Signal()
		time.Sleep(time.Second * 2)
	}
}

func consumer(ch <-chan int, cond *sync.Cond) {
	for {
		// 全局条件变量加锁
		cond.L.Lock()
		// 缓冲区为空,则等待生产者生产,这里不能是if
		for len(ch) == 0 {
			cond.Wait()
		}
		fmt.Println("Consume:", <-ch)
		cond.L.Unlock()
		// 一旦消费后,就唤醒其他被阻塞的生产者
		cond.Signal()
		time.Sleep(time.Second * 1)
	}
}

func testDemo8() {
	// 定义缓冲区大小
	const BUFLEN = 5
	// 定义cond
	var cond = sync.NewCond(&sync.Mutex{})
	// 设置随机数种子
	rand.Seed(time.Now().UnixNano())
	//生产消费模型中的通道
	ch := make(chan int, BUFLEN)
	// 启动10个生产者
	for i := 0; i < 10; i++ {
		go producer(ch, cond, BUFLEN)
	}
	// 启动10个消费者
	for i := 0; i < 10; i++ {
		go consumer(ch, cond)
	}
	// 阻塞主程序退出
	for {}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

可能的结果:

Produce: 836
Produce: 582
Produce: 280
Produce: 815
Produce: 408
Consume: 836
Produce: 284
Consume: 582
Produce: 877
Consume: 280
Produce: 125
Consume: 815
Produce: 763
Consume: 408
Consume: 284
Consume: 877
Consume: 125
Consume: 763
Produce: 808
Consume: 808
Produce: 652
Consume: 652
Produce: 937
Consume: 937
... ...
... ...
... ...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

如果产量满了,只有消费了,才会再次生产。如果消费没了。只有再生产才能再次消费。从而满足生产与消费的供需平衡。