Go语言入门22:并发
Table of Contents
Go 语言入门基础学习笔记之 Go 语言的并发
并发
Go 语言支持并发,通过 goroutines 和 channels 提供了一种简洁且高效的方式来实现并发。
Go 语言的并发性能非常优异,它使用协程而不是线程来并发执行任务,协程比线程占用的内存空间更小,协程的创建、销毁、切换的成本也更低。
进程线程协程
在早期的单进程时代有两个问题:
- 单一执行流程、计算机只能一个任务一个任务处理。
- 进程阻塞所带来的 CPU 浪费时间。
那么能不能宏观的执行多个任务?这就出现了多进程/多线程操作系统。使用轮询策略,执行 A 后又执行 B,这时需要判断进程是否执行完成,因此也会有时间片的概念。这事实上就是并发执行的效果,同一个时刻还是只有一个进程在执行,并不是并行!
多进程/多线程解决了阻塞的问题,但是又面临了一个新的问题:不停的切换进程所导致的上下文切换的成本增加。进程/线程的数量越多,切换成本就越大,也就越浪费。
与此同时,多线程随着同步竞争(如锁、竞争资源冲突等)开发设计变得越来越复杂。同时高内存占用也是多线程/多进程开发的壁垒。
所以,工程师又会想:能不能将一个线程一分为二,即用户线程和内核线程,CPU 只能看到内核线程,而用户线程又和内核线程进行绑定,这样就巧妙地解决了 CPU 不断切换进程所带来地消耗。后来,用户线程又叫做协程 co-routine,内核进程叫做线程 thread。
在实际操作中,通过协程调度器来调度每个程序的切换调度,在线程层面则没有切换,整个对于 CPU 来说就是透明的。
这种 1:N 的方式也存在一定的弊端,想象一下当一个协程阻塞时,会耽误下一个程序的执行。如果是 1:1,则又回到了之前的局面,协程的创建、删除和切换的代价都由 CPUI 完成,代价有些昂贵。如果是 N:M 的形式,其决定开销的核心就在于协程调度器了,因为此时有多个线程和多个协程,怎么切换和调度它们对于程序执行的效率有非常大的影响。
Golang 对协程的处理和改进。它把协程 co-routine 改为 Goroutine,将内存改小为几 KB 并增强灵活调度,这样就使得 Goroutine 可以大量使用并切换协程。
在 Go 语言早期的调度器处理中,G 为协程 goroutine、M 为线程 thread。Go 将协程创建成一个队列,要执行一个协程时,先获取一个锁给队列,然后取出一个协程给线程执行,执行完成后再放回队列的队尾。
这种调度器十分简单,但是也有很多弊端:
- 创建、销毁、调度 G 都需要每个 M 获取锁,这就形成了激烈的锁竞争。
- M 转移 G 会造成延迟和额外的系统负载。
- 系统调用( CPU 在 M 之间的切换)导致频繁的线程阻塞和取消阻塞操作增加了系统开
Go 语言优化后的调度器 GMP,其中 G 为协程 goroutine、P 为处理器 processor、M 为线程 thread。
对于 Go 语言调度器的设计策略以下几个方面:
- 复用线程:work stealing 机制、hand off 机制
- 利用并行:GOMAXPROCS 限定 P 的个数(比如=CPU/2)
- 抢占:以前的协程不允许抢占,现在允许抢占
- 全局 G 队列:word stealing 的升级
所谓 work stealing 机制,就是当一个 G 队列为空时,为了不浪费资源,此调度器会从旁边的队列中“偷取”一个程序进行执行。
所有 hand off 机制,就是当一个协程阻塞时,会唤醒一个新的线程,将整个连同 P 处理器一同迁移到新的线程上执行,并且 CPU 也相应分配,而原先阻塞的协程依旧独占原来的那个线程等待完成。
所谓全局 G 队列,就是 work stealing 机制的升级,当一个线程所对应的队列没有协程时,它会去旁边偷取一个协程,但是如果旁边的队列也没有,那么就是去全局队列中取一个协程执行。但是全局队列需要锁机制,因此优先级和效率较低。
goroutine
goroutine 是轻量级线程,goroutine 的调度是由 Golang 运行时进行管理的。
goroutine 语法格式:
go 函数名( 参数列表 )
比如:
go f(x, y, z)
// 或者是匿名函数
go func(arg string){
// do something
}("abc")
Go 允许使用 go
语句开启一个新的运行期线程,即 goroutine,以一个不同的、新创建的 goroutine 来执行一个函数。
同一个程序中的所有 goroutine 共享同一个地址空间。当多个 goroutine 访问共享资源时,确保使用互斥锁(sync.Mutex
)或其他同步机制来防止数据竞争。
注意,goroutine 的执行不会阻塞主进程的执行,如果主进程先结束,则整个程序退出,此时 goroutine 也许还没执行完。因此需要有操作来等待所有 goroutine 完成,这可以使用 WaitGroup 或 Channel。
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
go say("world")
say("hello")
}
// 可能的输出结果
world
hello
world
hello
world
hello
world
hello
world
hello
say("hello")
函数并没有使用 go
关键字,因此它应该是以正常的顺序执行,而不是并行执行。然而,实际上程序的输出是并发的,这是因为 goroutine 的调度机制和主线程的执行顺序。
执行以上代码,你会看到输出的 hello 和 world 是没有固定先后顺序。因为它们是两个 goroutine 在执行。由于 time.Sleep(100 * time.Millisecond)
的存在,say("world")
和 say("hello")
的输出会交错。
执行流程:
- 主函数启动:当
main()
函数被调用时,首先执行go say("world")
。这行代码启动了一个新的 goroutine,在这个 goroutine 中,say("world")
开始执行。 - 主线程执行:紧接着,主线程执行
say("hello")
。由于say
函数包含一个循环和time.Sleep(100 * time.Millisecond)
,它会依次打印"hello"
,每次打印之间有 100 毫秒的延迟。 - 并发执行:因为
go say("world")
是在一个单独的 goroutine 中执行的,所以"world"
和"hello"
的输出会交错出现。具体输出的顺序取决于 Go 运行时如何调度这两个 goroutine。
Go 语言等等一个协程正常结束会比较麻烦,主进程提前结束会直接中断协程的执行,因此需要用到 Channel 通道阻塞等待协程完成。
package main
import (
"fmt"
"time"
)
func main (){
var result int
ch := make(chan bool)
go func() { // 启动子协程
time.Sleep(2 * time.Second)
// Go语言不能获得子协程的返回值,需要利用公共变量来传递结果
result = 5
ch <- true
}()
<-ch // 阻塞等待协程发生true完成的信号,才会解除阻塞
fmt.Println(result)
}
如果我们想直接中断某个 goroutine,可以使用
runtime.Goexit()
直接退出当前的 goroutine。
通道 Channel
通道定义
通道(channel)是用来传递数据的一个数据结构,也是一个引用数据类型。它引用的是一个环形队列,队列意为着先进先出。
通道可用于两个 goroutine 之间通过传递一个指定类型的值来同步运行和通讯。
使用 make
函数创建一个 channel,使用 <-
操作符发送和接收数据。如果未指定方向,则为双向通道。
ch <- v // 把 v 发送到通道 ch
<- ch // 接收并丢弃
v := <-ch // 从 ch 接收数据 并把值赋给 v
v, ok := <- ch // 同时检查通道是否已经关闭或者是否为空
ch <- v
如果这个队列满了,会一直阻塞直到元素被取走。 v := <-ch
如果这个队列里的元素被取空了,则会一直阻塞,直到有元素进来。
声明一个通道很简单,我们使用 chan
关键字即可,通道在使用前必须先创建:
ch := make(chan int)
通道使用
package main
import "fmt"
func main() {
// 定义一个channel
c := make(chan int)
go func() {
defer fmt.Println("goroutine 结束...")
fmt.Println("goroutine 正在运行...")
c <- 666 // 将666发送给c
}()
num := <-c // 从c中接收数据并赋值给num
fmt.Println("num = ", num)
fmt.Println("main goroutine 结束...")
}
// 输出结果
goroutine 正在运行...
goroutine 结束...
num = 666
main goroutine 结束...
按理说 goroutine 结束时间与 main 结束的时间是随机的,但是我们经过多次测试发现 main goroutine 结束...
这段话的打印总是在最后,这是为什么?这就归功于 channel 的阻塞功能,导致它能够同步两个进程之间的顺序。
当 main 函数已经到了接收 channel 信息的步骤,而信息还未被子进程传递时,它会阻塞等待,直到接受到传递的参数。相似的,如果子进程先到了传递信息的步骤,而主进程还没有到,因为这个管道是无缓冲的,因此它也会被阻塞,直到 main 来接收信息。因此 main 函数的结束一定是在最后的。这也就是为什么 channel 可以解决主进程先结束导致协程未完成的问题。
goroutine 的函数常常要配合 defer 关键字来标识协程的退出。
注意:默认情况下,通道是不带缓冲区的。发送端发送数据,同时必须有接收端相应的接收数据。
以下实例通过两个 goroutine 来计算数字之和,在 goroutine 完成计算后,它会计算两个结果的和:
package main
import "fmt"
// sum 函数计算切片 s 的总和,并将结果发送到通道 c
func sum(s []int, c chan int) {
sum := 0 // 初始化 sum 变量
for _, v := range s { // 遍历切片 s
sum += v // 累加切片中的每个元素
}
c <- sum // 把计算得到的 sum 发送到通道 c
}
func main() {
// 定义一个整数切片 s
s := []int{7, 2, 8, -9, 4, 0}
// 创建一个整型通道 c
c := make(chan int)
// 启动两个 goroutine,分别计算切片 s 的前半部分和后半部分的总和
go sum(s[:len(s)/2], c) // 计算切片前半部分 [7, 2, 8]
go sum(s[len(s)/2:], c) // 计算切片后半部分 [-9, 4, 0]
// 从通道 c 中接收两个结果
x, y := <-c, <-c // 从通道 c 中接收两个 sum 结果
fmt.Println(x, y, x+y) // 打印两个部分的总和和它们的总和
}
// 输出结果
-5 17 12
通道缓存区
无缓冲的通道:
- 在第 1 步,两个 goroutine 都到达通道,但哪个都没有开始执⾏发送或者接收。
- 在第 2 步,左侧的 goroutine 将它的手伸进了了通道,这模拟了向通道发送数据的⾏为。这时,这个 goroutine 会在通道中被锁住,直到交换完成。
- 在第 3 步,右侧的 goroutine 将它的手放⼊=入通道,这模拟了了从通道⾥接收数据。这个 goroutine 一样也会在通道中被锁住,直到交换完成。
- 在第 4 步和第 5 步,进⾏交换,并最终,在第 6 步,两个 goroutine 都将它们的手从通道里拿出来,这模拟了了被锁住的 goroutine 得到释放。两个 goroutine 现在都可以去做其他事情了了。
无缓冲的通道的特点:
- 接收者,未接收到数据会被阻塞
- 发送者,没有人接收数据也会被阻塞
有缓冲的通道:
- 在第 1 步,右侧的 goroutine 正在从通道接收一个值。
- 在第 2 步,右侧的这个 goroutine 独⽴完成了接收值的动作,而左侧的 goroutine 正在发送一个新值到通道⾥。
- 在第 3 步,左侧的 goroutine 还在向通道发送新值,⽽而右侧的 goroutine 正在从通道接收另外一个值。这个步骤⾥的两个操作既不是同步的,也不会互相阻塞。
- 最后,在第 4 步,所有的发送和接收都完成,而通道⾥还有几个值,也有一些空间可以存更多的值。
有缓冲的通道的特点:
- 当 channel 已经满,再向⾥⾯写数据,就会阻塞
- 当 channel 为空,从⾥面取数据也会阻塞
通道可以设置缓冲区(队列容量),通过 make 的第二个参数指定缓冲区大小:
ch := make(chan int, 100)
带缓冲区的通道允许发送端的数据发送和接收端的数据获取处于异步状态,就是说发送端发送的数据可以放在缓冲区里面,可以等待接收端去获取数据,而不是立刻需要接收端去获取数据。
不过由于缓冲区的大小是有限的,所以还是必须有接收端来接收数据的,否则缓冲区一满,数据发送端就无法再发送数据了。
注意:如果通道不带缓冲,发送方会阻塞直到接收方从通道中接收了值。如果通道带缓冲,发送方则会阻塞直到发送的值被拷贝到缓冲区内;如果缓冲区已满,则意味着需要等待直到某个接收方获取到一个值。接收方在有值可以接收之前会一直阻塞。
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan int, 3) // 带有缓冲的channel
fmt.Println("len(c) = ", len(c), ", cap(c)", cap(c))
go func() {
defer fmt.Println("子go程序结束")
for i := 0; i < 4; i++ {
c <- i
fmt.Println("'子go程序正在运行', 发送的元素=", i, " len(c)=", len(c), "cap(c)=", cap(c))
}
}()
time.Sleep(2 * time.Second)
for i := 0; i < 4; i++ {
num := <-c // 从channel中接收数据并赋值
fmt.Println("num = ", num)
}
fmt.Println("main 结束")
}
// 可能输出结果
len(c) = 0 , cap(c) 3
'子go程序在运行', 发送的元素= 0 len(c)= 1 cap(c)= 3
'子go程序在运行', 发送的元素= 1 len(c)= 2 cap(c)= 3
'子go程序在运行', 发送的元素= 2 len(c)= 3 cap(c)= 3
num = 0
num = 1
'子go程序在运行', 发送的元素= 3 len(c)= 3 cap(c)= 3
子go程序结束
num = 2
num = 3
main 结束
关闭通道
v, ok := <-ch
如果通道接收不到数据后 ok
就为 false
,这时通道就可以使用 close()
函数来关闭。
func main() {
c := make(chan int)
go func() {
for i := 0; i < 5; i++ {
c <- i
}
//close可以关闭一个channel
close(c)
}()
for {
//ok如果为true表示channel没有关闭,如果为false表示channel已经关闭
if data, ok := <-c; ok {
fmt.Println(data)
} else {
break
}
}
fmt.Println("Main Finished..")
}
// 输出结果
0
1
2
3
4
如果将以上代码的 close(c)
去除,则程序在打印 5 次数据之后,就会报错,提示 goroutine 没有关闭造成阻塞死锁。
注意:
- channel 不像文件一样需要经常去关闭,只有当你确实没有任何发送数据了,或者你想显式的结束 range 循环之类的,才去关闭 channel;
- 关闭 channel 后,无法向 channel 再发送数据 (引发 panic 错误后导致接收立即返回零值);
- 关闭 channel 后,可以继续从 channel 接收数据,比如遍历通道操作;
- 对于 nil channel,⽆无论收发都会被阻塞
为什么要关闭通道?
- 防止阻塞:如果不关闭通道,接收方在尝试接收数据时将会阻塞,导致程序无法正常退出。
- 清晰的信号:关闭通道提供了一种明确的信号,告诉接收方所有数据已经发送完毕。
遍历通道
Go 通过 range 关键字来实现遍历读取到的数据,类似于与数组或切片。
close(ch)
for ele := range ch {
fmt.Println(ele)
}
在遍历通道前,一定要先关闭通道,表示不允许再向队列里写入元素,否则可能会发生循环无法退出甚至是死锁情况。因为在循环遍历通道时会阻塞取出通道内的元素,当元素取空时,自动跳出循环。
Select 语句
单流程下一个 goroutine 只能监控一个 channel 的状态,而 select 可以完成监控多个 channel 的状态。select 具备多路 channel 的监控状态功能。
select {
case <- chan1:
// 如果chan1成功读到数据, 则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据, 则进行该case处理语句
default:
// 如果上面都没有成功, 则进入default处理流程
}
select
语句使得一个 goroutine 可以等待多个通信操作。select
会阻塞,直到其中的某个 case 可以继续执行:
package main
import "fmt"
// fibonacci 函数计算 Fibonacci 数列,并通过通道 c 发送结果
// 当从 quit 通道接收到信号时,停止计算
func fibonacci(c, quit chan int) {
x, y := 0, 1 // 初始化 Fibonacci 数列的前两个数
for {
select { // 同时监控两个channel
case c <- x: // 将当前 Fibonacci 数值发送到通道 c
x, y = y, x+y // 更新 x 和 y 为下一个 Fibonacci 数值
case <-quit: // 等待 quit 通道的信号
fmt.Println("quit") // 打印退出信息
return // 退出函数,结束 goroutine
}
}
}
func main() {
c := make(chan int) // 创建一个用于发送 Fibonacci 数值的通道
quit := make(chan int) // 创建一个用于接收退出信号的通道
// 启动一个新的 goroutine 用于接收 Fibonacci 数值
go func() {
for i := 0; i < 10; i++ { // 循环接收 10 个 Fibonacci 数值
fmt.Println(<-c) // 从通道 c 接收并打印 Fibonacci 数值
}
quit <- 0 // 发送退出信号到 quit 通道,通知主 goroutine 停止计算
}()
// 调用 fibonacci 函数,开始计算 Fibonacci 数列
fibonacci(c, quit) // 传入通道 c 和 quit
}
// 输出结果
0
1
1
2
3
5
8
13
21
34
quit
参考课程: