go专题-通道channel

2022/4/25 go

🌙 1.为什么需要channel

go程序中如果开启了多个协程(goroutine),那么协程直接如何进行通信呢?

go提供了通道channel数据类型,用来解决协程之间的通信问题。channel的本质是一个队列,遵循先进先出(FIFO),内部实现了同步,确保了并发安全

🌙 2.channel的创建

channel在创建时,可以设置一个可选参数:缓冲区容量(int)

  • 有缓冲区
var c1 = make(chan int, 10)
1
  • 无缓冲区
var c2 = make(chan int)

// 存储多种数据类型
ci := make(chan int)
cs := make(chan string)
cf := make(chan interface{})
1
2
3
4
5
6

上面都是双向通道,既可以存入,也可以读取。

从管道中读取,或者向管道写入数据,使用运算符:<-,他在channel的左边则是读取,右边则代表写入:

  • 只写通道
// <- 在chan右边: 写入
var r = make(chan<- int)
1
2
  • 只读通道
// <- 在chan左边:读取
var w = make(<-chan int)
1
2

🌙 3.无缓冲和有缓冲通道

🌙 3.1 无缓冲通道

无缓冲通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。

这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。

package main

import (
    "fmt"
    "time"
)
// 写入
func write(ch chan int) {
    ch <- 100
    fmt.Printf("ch addr %v\n", ch) // ch addr 0xc0000120c0
    ch <- 200
    fmt.Printf("ch addr %v\n", ch) // ch addr 0xc0000120c0
    ch <- 300 // 只读取了前两个数据,后续操作直接阻塞
    fmt.Printf("ch addr %v\n", ch) // 不会执行
}
// 读取
func read(ch chan int) {
    fmt.Printf("取出数据1: %v\n", <-ch) // 取出数据1:100
    fmt.Printf("取出数据2: %v\n", <-ch) // 取出数据2:200
}

func main() {
    // 无缓冲通道
    var ch chan int
    ch = make(chan int)
    
    // 向协程中写入数据
    go write(ch)
    // 向协程中读取数据
    go read(ch)
    
    // 等待协程执行完成,防止主go程序提前退出导致协程未完成
    time.Sleep(time.Second * 3)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

注意无缓冲通道的收发操作必须在两个不同的goroutine间进行,因为通道的数据在没有接收方处理时,数据发送方会持续阻塞,所以通道的接收必定在另外一个goroutine中进行。

如果不按照上述规则使用,会引发fatal error: all goroutines are asleep - deadlock!错误, 比如:

func main() {
	ch := make(chan int)
	ch <- 10
	<-ch
}
1
2
3
4
5

🌙 3.2有缓冲通道

有缓冲通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。

这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。

package main

import {
    "fmt"
    "time"
}

func write(ch chan int) {
    ch <- 100
    fmt.Printf("ch addr %v\n", ch) // ch addr 0xc000016150
    ch <- 200
     fmt.Printf("ch addr %v\n", ch) // ch addr 0xc000016150
    ch <- 300 // 写入第三个,造成阻塞
     fmt.Printf("ch addr %v\n", ch) // 没有输出
}

func main() {
    // 带缓冲:声明可以写入2个数据的通道
    ch := make(chan int, 2)
    
    // 向协程中写入数据
    go write(ch)
    
    time.Sleep(time.Second * 3)
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

注意:有缓冲的通道,在写入数据后可以读取,不用在两个goroutine中进行。但是,当数据全部读取完毕后,再次读取也会造成阻塞并报错fatal error: all goroutines are asleep - deadlock!,如:

func main() {
    // 定义可以写入一个数据的缓冲通道
    ch := make(chan int, 1)
    // 写入数据
    ch <- 10
     // 取出数据
    <-ch
    // 再次取出,没有数据了会阻塞
    // 报错:fatal error: all goroutines are asleep - deadlock!
    // <-ch
}
1
2
3
4
5
6
7
8
9
10
11

🌙 3.3两者对比

区别 无缓冲通道 有缓冲通道
容量 cap(ch) 0 >0
通道个数len(ch) 0 >=0 (写入的数据个数)
同异步 读、写两端具备并发同步的能力 缓冲区具备数据存储的能力,到达存储上限后才会阻塞,相当于具备了异步的能力
阻塞时机 当通道中的有数据未取出再次写入时会阻塞 1.当缓冲通道被填满时,尝试再次发送数据会发生阻塞 2.当缓冲通道为空时,尝试接收数据会发生阻塞
func main() {
    	ch1 := make(chan int)
	fmt.Println("no buffer", len(ch1), cap(ch1)) // 0 0
	ch2 := make(chan int, 2)
	fmt.Println("buffer", len(ch2), cap(ch2)) // 0 2
	ch2 <- 1
	fmt.Println("buffer ch<-1", len(ch2), cap(ch2)) // 1 2
	ch2 <- 2
	fmt.Println("buffer ch<-2", len(ch2), cap(ch2)) // 2 2
}
1
2
3
4
5
6
7
8
9
10

🌙 4.操作channe

🌙 4.1遍历通道

channel只支持 for-range的方式遍历

func testRangeChannel() {
	ch := make(chan int)

	go func() {
		for i:=0; i <=3 ;i++ {
			ch <-i
			time.Sleep(time.Second)
		}
	}()

	for item := range ch {
		fmt.Println("item:", item)
                 // 遍历完,break防止 fatal error: all goroutines are asleep - deadlock!
		if item == 3 {
			break
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

🌙 4.2关闭通道

channel是一个引用对象,支持GC回收,但是也可以主动被关闭。

func closeChannel() {
    ch := make(chan int)
    close(ch) // 关闭通道
    ch <- 1 // 报错:panic: send on closed channel
}
1
2
3
4
5

判断channel是否已经关闭:

func testClose1() {

	ch := make(chan int, 10)

	go func(ch chan int) {
		for i := 0; i < 10; i++ {
			ch <- i
		}
		close(ch)
	}(ch)

	for {
		if num, ok := <-ch; ok == true {
			fmt.Println("读到数据:", num)
		} else {
			fmt.Println("通道关闭")
			break
		}
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  • _, ok := <-ch 的第二个返回值 ok 表示 channel 是否已经关闭。如果已关闭,则返回 false
  • for range 语法会自动判断 channel 是否结束,如果结束则自动退出 for 循环:
func testClose2() {

	ch := make(chan int, 10)

	go func(ch chan int) {
		for i := 0; i < 10; i++ {
			ch <- i
		}
		close(ch)
	}(ch)

	for n := range ch {
		fmt.Println("读到数据:", n)
	}
        // 通道关闭,自动退出循环
	fmt.Println("通道关闭")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

🌙 4.3读写通道

一般,通道默认是双向的,但是可以定义channel为只读或只写:

var ch1 chan int // 双向通道
var ch2 chan<- int // 只写通道
var ch3 <-chan int  // 只读通道
1
2
3

单向通道不可以转为双向channel, 但是双向channel可以隐式转为任意类型的单向通道:

var ch = make(chan int) // 双向
var ch1 = <-chan int(ch) // 转为只读
var ch2 = chan<- int(ch) // 转为只写
1
2
3

🌙 5.channel的应用

🌙 5.1限制并发

func testDemo() {
	ch := make(chan string, 5)
	// 开启100个协程
	for i:=0; i<100; i++ {
		go func(j int) {
			s := fmt.Sprintf("task %d", j)
			ch <- s
			fmt.Println("test Time-consuming task", j)
                        // 模拟耗时操作
			time.Sleep(time.Second)
			<-ch
		}(i)
	}

	for {
		time.Sleep(time.Second * 5)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

🌙 5.2生产者消费者模型

func produce(ch chan<- int) {
	i := 1
	for {
		ch <- i
		fmt.Println("Send:", i)
		i++
		time.Sleep(time.Second * time.Duration(i))
	}
}

func consume(ch <-chan int) {
	for {
		v := <-ch
		fmt.Println("Received:", v)
		time.Sleep(time.Second * 2)
	}
}

func test() {
	ch := make(chan int, 5)
	go produce(ch)
	go consume(ch)

        // 等待协程执行1min
	time.Sleep(time.Minute)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

当然,该模型也可以使用无缓冲模型,区别如下:

  • 无缓冲生产消费模型:同步通信
  • 有缓冲生产消费模型:异步通信