【后端开发】Golang协程与Channel

Last updated on February 22, 2024 am

Golang进阶

这一部分主要介绍golang中的并发,并发是 golang 的优势之一,使用关键字 go 可以很方便的开启一个协程. go 语言中,常常用 gochanselectsync 库完成并发操作,处理同步异步阻塞非阻塞任务

go 语言的并发编程,以下是需要了解的基础知识点

  • 阻塞: 阻塞是进程(也可以是线程、协程)的状态之一(新建、就绪、运行、阻塞、终止). 指的是当数据未准备就绪,这个进程(线程、协程)一直等待,这就是阻塞.
  • 非阻塞: 当数据为准备就绪,该进程(线程、协程)不等待可以继续执行,这就是非阻塞.
  • 同步: 在发起一个调用时,在没有得到结果之前,这个调用就不返回,这个调用过程一直在等待. 这是同步.
  • 异步: 在发起调用后,就立刻返回了,这次调用过程就结束了. 等到有结果了被调用方主动通知调用者结果. 这是异步
  • goroutine: 通过关键字 go 即可创建一个协程.
  • chan : golang 中用于并发的通道,用于协程的通信.

Goroutine协程

协程并发,coroutine,也叫轻量级线程。

与传统的系统级线程和进程相比,协程最大的优势在于“轻量级”。可以轻松创建上万个而不会导致系统资源衰竭。

一个线程中可以有任意多个协程,但某一时刻只能有一个协程在运行,多个协程分享该线程分配到的计算机资源

传统的线程调度方式

存在切换成本,存储当前的线程的状态

CPU切换造成的时间消耗

在协程中,调用一个任务就像调用一个函数一样,消耗的系统资源最少!但能达到进程、线程并发相同的效果。

在一次并发任务中,进程、线程、协程均可以实现。从系统资源消耗的角度出发来看,进程相当多,线程次之,协程最少

Goroutine基本模型

线程数量越多会导致切换成本越大越容易造成浪费

CPU管理协程的具体细节

早期调度器的方式

老式调度器的缺点:

  • 创建、销毁、调度G都需要每个M获取锁,这就形成了激烈的锁竞争
  • M转移G会造成延迟和额外的系统负载
  • 系统调用(CPU在M之间的切换) 导致频繁的线程阻塞和取消阻塞操作增加了系统开销

调度器设计的准则

Goroutine的定义

goroutine是Go语言并行设计的核心,有人称之为go程。 Goroutine从量级上看很像协程,它比线程更小,十几个goroutine可能体现在底层就是五六个线程,Go语言内部帮你实现了这些goroutine之间的内存共享。执行goroutine只需极少的栈内存(大概是4~5KB),当然会根据相应的数据伸缩。也正因为如此,可同时运行成千上万个并发任务。goroutine比thread更易用、更高效、更轻便。

一般情况下,一个普通计算机跑几十个线程就有点负载过大了,但是同样的机器却可以轻松地让成百上千个goroutine进行资源竞争。

创建Goroutine

只需在函数调⽤语句前添加 go 关键字,就可创建并发执⾏单元。在并发编程中,我们通常想将一个过程切分成几块,然后让每个goroutine各自负责一块工作,当一个程序启动时,主函数在一个单独的goroutine中运行,我们叫它main goroutine。

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"time"
)

func newTask() {
i := 0
for {
i++
fmt.Printf("new goroutine: i = %d\n", i)
time.Sleep(1*time.Second) //延时1s
}
}

func main() {
//创建一个 goroutine,启动另外一个任务
go newTask()
i := 0
//main goroutine 循环打印
for {
i++
fmt.Printf("main goroutine: i = %d\n", i)
time.Sleep(1 * time.Second) //延时1s
}
}

运行结果

1
2
3
4
5
6
7
main Goroutine: i=1
new Goroutine: i =2
new Goroutine: i =3
main Goroutine: i=2
main Goroutine: i=3
new Goroutine: i =4
new Goroutine: i =5

Goroutine特性

主goroutine退出后,其它的工作goroutine也会自动退出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"fmt"
"time"
)

func newTask() {
i := 0
for {
i++
fmt.Printf("new goroutine: i = %d\n", i)
time.Sleep(1 * time.Second) //延时1s
}
}

func main() {
//创建一个 goroutine,启动另外一个任务
go newTask()

fmt.Println("main goroutine exit")
}

主进程结束那么其余的goroutine结束

Goexit函数

调用 runtime.Goexit() 将立即终止当前 goroutine 执⾏,调度器确保所有已注册 defer 延迟调用被执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
"runtime"
)

func main() {
go func() {
defer fmt.Println("A.defer")

func() {
defer fmt.Println("B.defer")
runtime.Goexit() // 终止当前 goroutine, import "runtime"
fmt.Println("B") // 不会执行
}()

fmt.Println("A") // 不会执行
}() //不要忘记()

//死循环,目的不让主goroutine结束
for {
}
}

image-20231219214022511

对于goroutine的形式没办法拿到协程返回的值 ,从而阻塞了两个协程之间的通信,因此有必要研究如何让两个协程进行通信的机制因此这个地方引入了

Channel管道

channel是Go语言中的一个核心类型,可以把它看成管道。并发核心单元通过它就可以发送或者接收数据进行通讯

go语言之间的goroutine通信机制

channel是一个数据类型,主要用来解决go程的同步问题以及go程之间数据共享(数据传递)的问题。引⽤类型 channel可用于多个 goroutine 通讯。其内部实现了同步,确保并发安全

基于channel的同步并发安全

这个channel在这个位置就已经隐含表示了先后执行的关系

  • 如果main.go在运行的时候如果找不到 channel返回的值,那么就会channel让main.go产生阻塞,
  • 同理对于另一边的sub.go也是一样,如果没有另一个goroutine来接受发出的channel数据,那么就会一直处于等待接受的状态产生阻塞,
  • 相当于首先得握上手才能继续执行进程

主goroutine发生阻塞

子进程发生阻塞

定义channel基本语法

定义一个channel时,也需要定义发送到channel的值的类型。channel可以使用内置的make()函数来创建:

chan是创建channel所需使用的关键字。Type 代表指定channel收发数据的类型。

1
2
make(chan Type)  //等价于make(chan Type, 0)
make(chan Type, capacity)

当 参数capacity= 0 时,channel 是无缓冲阻塞读写的;当capacity > 0 时,channel 有缓冲非阻塞的,直到写满 capacity个元素才阻塞写入。

channel非常像生活中的管道,一边可以存放东西,另一边可以取出东西。channel通过操作符 <- 来接收和发送数据,发送和接收数据语法:

1
2
3
4
channel <- value      //发送value到channel
<-channel //接收并将其丢弃
x := <-channel //从channel中接收数据,并赋值给x
x, ok := <-channel //功能同上,同时检查通道是否已关闭或者是否为空

默认情况下,channel接收和发送数据都是阻塞的,除非另一端已经准备好,这样就使得goroutine同步变的更加的简单,而不需要显式的lock。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"fmt"
)

func main() {
c := make(chan int)

go func() {
defer fmt.Println("子go程结束")

fmt.Println("子go程正在运行……")

c <- 666 //666发送到c
}()

num := <-c //从c中接收数据,并赋值给num

fmt.Println("num = ", num)
fmt.Println("main go程结束")
}

运行结果:

1
2
3
4
goroutine 正在运行...
goroutine结束
num = 666
main goroutine 结束...

分析:

  • 首先第一步会需要子进程将666放进channel中才不会发生阻塞,同时主进程会在 num := <-c的位置进行等待channel中传入数据
  • 当channel中出现了数据之后,那么下一步就是从中选择数据并进行赋值
  • 最后结束

channel的无缓冲阻塞

无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何数据值的通道。

这种类型的通道要求发送goroutine和接收goroutine同时准备好,才能完成发送和接收操作。否则,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。

阻塞:由于某种原因数据没有到达,当前go程(线程)持续处于等待状态,直到条件满足,才解除阻塞。

同步:在两个或多个go程(线程)间,保持数据内容一致性的机制。

无缓冲的channel同步

如果两个goroutine存在利用channel通信的情况,两个goroutine之间存在相互的消息传递,那么在无缓冲的情况下,存在隐含的先后同步关系,必须接收方到达了channel之后消息发送方才会持续发送消息并往下运行

  • 在第 1 步,两个 goroutine 都到达通道,但哪个都没有开始执行发送或者接收。
  • 在第 2 步,左侧的 goroutine 将它的手伸进了通道,这模拟了向通道发送数据的行为。这时,这个 goroutine 会在通道中被锁住,直到交换完成。
  • 在第 3 步,右侧的 goroutine 将它的手放入通道,这模拟了从通道里接收数据。这个 goroutine 一样也会在通道中被锁住,直到交换完成。
  • 在第 4 步和第 5 步,进行交换,并最终,在第 6 步,两个 goroutine 都将它们的手从通道里拿出来,这模拟了被锁住的 goroutine 得到释放。两个 goroutine 现在都可以去做其他事情了。

无缓冲的channel创建格式:

1
make(chan Type)   //等价于make(chan Type, 0)

如果没有指定缓冲区容量,那么该通道就是同步的,因此会阻塞到发送者准备好发送和接收者准备好接收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"time"
)

func main() {
c := make(chan int, 0) //创建无缓冲的通道 c

//内置函数 len 返回未被读取的缓冲元素数量,cap 返回缓冲区大小
fmt.Printf("len(c)=%d, cap(c)=%d\n", len(c), cap(c))

go func() {
defer fmt.Println("子go程结束")

for i := 0; i < 3; i++ {
c <- i
fmt.Printf("子go程正在运行[%d]: len(c)=%d, cap(c)=%d\n", i, len(c), cap(c))
}
}()

time.Sleep(2 * time.Second) //延时2s

for i := 0; i < 3; i++ {
num := <-c //从c中接收数据,并赋值给num
fmt.Println("num = ", num)
}

fmt.Println("main进程结束")
}

那么运行的结果是:

无缓冲channel的运行结果

分析:

  • 本质上会出现阻塞,如果没有及时把数据拿出
  • 因此必须是其中一个放进去数据,另一个拿出数据才能能让进程执行下去,本质上保证了同步的进行

channel的有缓冲阻塞

本质上是生产者消费者模型 消息队列模型

这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也不同。

只有通道中没有要接收的值时,接收动作才会阻塞。只有通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。

有缓冲的通道在channe之间同步数据
  • 在第 1 步,右侧的 goroutine 正在从通道接收一个值。
  • 在第 2 步,右侧的这个 goroutine独立完成了接收值的动作,而左侧的 goroutine 正在发送一个新值到通道里。
  • 在第 3 步,左侧的goroutine 还在向通道发送新值,而右侧的 goroutine 正在从通道接收另外一个值。这个步骤里的两个操作既不是同步的,也不会互相阻塞
  • 最后,在第 4 步,所有的发送和接收都完成,而通道里还有几个值,也有一些空间可以存更多的值。

有缓冲的channel创建格式:

1
make(chan Type, capacity)

如果给定了一个缓冲区容量,通道就是异步的。只要缓冲区有未使用空间用于发送数据,或还包含可以接收的数据,那么其通信就会无阻塞地进行。

函数 len(ch)求取缓冲区中剩余元素个数, cap(ch) 求取缓冲区元素容量大小

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"fmt"
"time"
)

func main(){
c := make(chan int ,3) //带有缓冲的channel

//查看当前的channel中的缓冲的容量大小
//len表示的是元素数量 cap表示的是缓冲的容量大小
fmt.Println("len(c) = ", len(c), ", cap(c)", cap(c))

//定义一个子goroutine
go func(){
defer fmt.Println("sub goroutine结束")

// for i := 0; i<3; i++{
for i := 0; i<4; i++{// 这种情况就会出现阻塞,因为超过了缓冲的容量
c <- i
fmt.Println("sub goroutine 正在运行: len(c) = ", len(c), ", cap(c) = ", cap(c)," 发送的元素是= ",i)
}
}()

time.Sleep(1*time.Second)

//有缓存的话就不会出现阻塞的情况

for i := 0;i < 3;i++{
num := <-c //从c中接受数据并赋值给num
fmt.Println("num = ", num)
}
fmt.Println("main 结束")
}

运行结果:

有缓冲通道的运行结果

关闭Channel

没有更多的值需要发送到channel的话,那么让接收者也能及时知道没有多余的值可接收将是有用的,因为接收者可以停止不必要的接收等待。这可以通过内置的close函数来关闭channel实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
)

func main(){
c := make(chan int) //channel要通过make来触发,如果是nil channel 会发生阻塞

go func(){
defer fmt.Println("sub finished ...")
for i := 0;i<5;i++{
c <- i
// close(c)
//如果向已经关闭的channel发送数据就会出现panic的错误
}
// close 关闭一个channel
close(c)
//如果去掉关闭channel这个开关,那么会出现死锁的情况,就是main的进程数据都在等待塞数据
}()

for{
//如果ok为ture 那么channel没有关闭, 如果为false 那么已经被关闭
if data, ok := <-c; ok{
fmt.Println(data)
}else{
break
}
}

fmt.Println("Main finished ...")
}

输出的结果:

1
2
3
4
5
6
7
8
/* 输出的值go run channel4.go
0
1
2
3
4
Main finished ...
sub finished ... */

注意:

  • channel不像文件一样需要经常去关闭,只有当你确实没有任何发送数据了,或者你想显式的结束range循环之类的,才去关闭channel;
  • 如果向已经关闭的channel发送数据就会出现panic的错误
  • 关闭channel后,可以继续从channel接收数据;
  • 如果ok为ture 那么channel没有关闭, 如果为false 那么已经被关闭
  • 对于nil channel,无论收发都会被阻塞。

Range操作channel

可以使用 range 来迭代不断操作channel:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"
)

func main() {
c := make(chan int)

go func() {
for i := 0; i < 5; i++ {
c <- i
}
close(c)
}()

for data := range c {
fmt.Println(data)
}
fmt.Println("Finished")
}

注意:

  • 利用range进行简写

  • 利用range不断迭代从channel中操作数据

单向Channel

默认情况下,通道channel是双向的,也就是,既可以往里面发送数据也可以同里面接收数据。

但是,我们经常见一个通道作为参数进行传递而只希望对方是单向使用的,要么只让它发送数据,要么只让它接收数据,这时候我们可以指定通道的方向。

单向channel的定义:

1
2
3
var ch1 chan int       // ch1是一个正常的channel,是双向的
var ch2 chan<- float64 // ch2是单向channel,只用于写float64数据
var ch3 <-chan int // ch3是单向channel,只用于读int数据

可以将 channel 隐式转换为单向队列,只收或只发,不能将单向 channel 转换为普通 channel:

1
2
3
4
5
6
7
8
9
10
11
c := make(chan int, 3)
var send chan<- int = c // send-only
var recv <-chan int = c // receive-only
send <- 1
//<-send //invalid operation: <-send (receive from send-only type chan<- int)
<-recv
//recv <- 2 //invalid operation: recv <- 2 (send to receive-only type <-chan int)

//不能将单向 channel 转换为普通 channel
d1 := (chan int)(send) //cannot convert send (type chan<- int) to type chan int
d2 := (chan int)(recv) //cannot convert recv (type <-chan int) to type chan int

单向channel的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//   chan<- //只写
func counter(out chan<- int) {
defer close(out)
for i := 0; i < 5; i++ {
out <- i //如果对方不读 会阻塞
}
}

// <-chan //只读
func printer(in <-chan int) {
for num := range in {
fmt.Println(num)
}
}

func main() {
c := make(chan int) // chan //读写

go counter(c) //生产者
printer(c) //消费者

fmt.Println("done")
}

Channel的Select

Select基本定义

Go里面提供了一个关键字select,通过select可以监听channel上的数据流动。每个case表示监控不同的条件. 通过select来进行控制。

select有比较多的限制,其中最大的一条限制就是每个case语句里必须是一个IO操作,大致的结构如下:

1
2
3
4
5
6
7
8
select {
case <- chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}

在一个select语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语句。如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。

如果没有任意一条语句可以执行(即所有的通道都被阻塞),那么有两种可能的情况:

  1. 如果给出了default语句,那么就会执行default语句,同时程序的执行会从select语句后的语句中恢复
  2. 如果没有default语句,那么select语句将被阻塞,直到至少有一个通信可以进行下去

下面是示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
)

func fibonacci(c, quit chan int) {
x, y := 1, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}

func main() {
c := make(chan int)
quit := make(chan int)

go func() {
for i := 0; i < 6; i++ {
fmt.Println(<-c)
}
quit <- 0
}()

fibonacci(c, quit)
}

运行的结果:

执行的结果

交替打印数字字母

需求:反复打印数字1字母a->1a2b3c4d5e...依次类推

思路:选择两个goroutine来实现这部分的内容,产生阻塞的条件是一个进程需要取数,一个goroutine需要存数来进行阻塞

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {
c := make(chan int)

go func() {
for i := 1; i <= 26; i++ {
<-c
fmt.Println(string(96 + i))
c <- 0
}
}()

for i := 1; i <= 26; i++ {
fmt.Println(i)
c <- 0
<-c
}
}

打印的结果是:

数字字母交替打印


【后端开发】Golang协程与Channel
https://lihaibineric.github.io/2023/12/19/develop_goroutine/
Author
Haibin Li
Posted on
December 19, 2023
Updated on
February 22, 2024
Licensed under