Go 语言并发之协程的通信

协程通信的概念

在上一章节的例子中,协程是独立执行的,他们之间没有通信

在实际开发中,协程之间必须通信才会变得更加强大:彼此之间发送接收信息并且协调/同步他们的工作。协程可以使用共享变量来通信,但是不提倡这样做,因为这种方式给所有的共享内存的多线程都带来了困难。

Go 有一种特殊的类型,通道(channel),就像一个可以用于发送类型化数据的管道,由其负责协程之间的通信,从而避开所有由共享内存导致的陷阱,这种通过通道进行通信的方式保证了同步性。数据在通道中进行传递:在任何给定时间,一个数据被设计为只有一个协程可以对其访问,所以不会发生数据竞争,保证了数据的一致性。

工厂的传送带是个很好的例子。一个机器(生产者协程)在传送带上放置物品,另外一个机器(消费者协程)拿到物品并打包。

通道服务于通信的两个目的:值的交换、同步,保证了两个计算(协程)任何时候都是可知状态。

通常使用这样的格式来声明通道:

  1. var identifier chan datatype

未初始化的通道的值是nil

所以通道只能传输一种类型的数据,比如 chan int 或者 chan string,所有的类型都可以用于通道,空接口 interface{} 也可以,甚至可以(有时非常有用)创建通道的通道。

通道实际上是类型化的消息队列:使数据得以传输。它是先进先出(FIFO)的结构所以可以保证发送给他们的元素的顺序。通道也是引用类型,所以我们使用 make() 函数来给它分配内存。这里先声明了一个字符串通道 ch1,然后创建了它(实例化):

  1. var ch1 chan string
  2. ch1 = make(chan string)

当然还可以更简洁: ch1 := make(chan string)

这里我们还可以构建一个int通道的通道: chanOfChans := make(chan int)。或者函数通道:funcChan := make(chan func())

所以通道是第一类对象:可以存储在变量中,作为函数的参数传递,从函数返回以及通过通道发送它们自身。另外它们是类型化的,允许类型检查,比如尝试使用整数通道发送一个指针。

通信操作符 <-

这个操作符直观的标示了数据的传输: 信息按照箭头的方向流动

流向通道(发送)

ch <- int1 表示:用通道 ch 发送变量 int1(双目运算符,中缀 = 发送)

从通道流出(接收)

int2 = <- ch 表示:变量 int2 从通道 ch(一元运算的前缀操作符,前缀 = 接收)接收数据(获取新值),假设 int2 已经声明过,如果没有声明可以写成:int2 := <- ch

<- ch 可以单独调用获取通道的(下一个)值,当前值会被丢弃,但是可以用来验证,所以以下代码是合法的:

  1. if <- ch != 1000{
  2. ...
  3. }

同一个操作符 <- 既用于发送也用于接收,但聪明的Go会根据操作对象弄明白到底是发送还是接收。虽非强制要求,但为了可读性通道的命名通常以 ch 开头或者包含 chan。通道的发送和接收都是原子操作:它们总是互不干扰的完成的。

下面的示例展示了通信操作符的使用,示例 goroutine2.go:

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. ch := make(chan string)
  8. go sendData(ch)
  9. go getData(ch)
  10. time.Sleep(1e9)
  11. }
  12. func sendData(ch chan string) {
  13. ch <- "Washington"
  14. ch <- "Tripoli"
  15. ch <- "London"
  16. ch <- "Beijing"
  17. ch <- "Tokyo"
  18. }
  19. func getData(ch chan string) {
  20. var input string
  21. // time.Sleep(2e9)
  22. for {
  23. input = <-ch
  24. fmt.Printf("%s ", input)
  25. }
  26. }

输出:

  1. Washington Tripoli London Beijing tokyo

main()函数中启动了两个协程:sendData() 通过通道 ch 发送了 5 个字符串,getData() 按顺序接收它们并打印出来。

如果 2个协程需要通信,你必须给他们同一个通道作为参数才行。

大家可以尝试一下如果注释掉 time.Sleep(1e9) 会如何。

我们发现协程之间的同步非常重要:

main()等待了 1 秒让两个协程完成,如果不等待,sendData() 就没有机会输出。getData() 使用了无限循环:它随着 sendData()的发送完成和 ch 变空才会结束。

如果我们移除一个或所有 go 关键字,程序无法运行,Go 运行时会抛出 panic

  1. ---- Error run E:/Go/Goboek/code examples/chapter 14/goroutine2.exe with code Crashed ---- Program exited with code -2147483645: panic: all goroutines are asleep-deadlock!

运行时(runtime)会检查所有的协程(像本例中只有一个)是否在等待着什么东西(可从某个通道读取或者写入某个通道),这意味着程序将无法继续执行。这是死锁(deadlock)的一种形式,而运行时(runtime)可以为我们检测到这种情况。

注意:不要使用打印状态来表明通道的发送和接收顺序:由于打印状态和通道实际发生读写的时间延迟会导致和真实发生的顺序不同。

通道阻塞

默认情况下,通信是同步且无缓冲的,在有接受者接收数据之前,发送不会结束。可以想象一个无缓冲的通道在没有空间来保存数据的时候,必须要一个接收者准备好接收通道的数据然后发送者可以直接把数据发送给接收者。所以通道的发送/接收操作在对方准备好之前是阻塞的:

  1. 对于同一个通道,发送操作(协程或者函数中的),在接收者准备好之前是阻塞的:如果ch中的数据无人接收,就无法再给通道传入其他数据:新的输入无法在通道非空的情况下传入。所以发送操作会等待 ch 再次变为可用状态:就是通道值被接收时(可以传入变量)。

  2. 对于同一个通道,接收操作是阻塞的(协程或函数中的),直到发送者可用:如果通道中没有数据,接收者就阻塞了。

下面的示例验证了以上理论,一个协程在无限循环中给通道发送整数数据。因为没有接收者,所以只输出了一个数字 0。

示例 channel_block.go:

  1. package main
  2. import "fmt"
  3. func main() {
  4. ch1 := make(chan int)
  5. go pump(ch1) // pump hangs
  6. fmt.Println(<-ch1) // prints only 0
  7. }
  8. func pump(ch chan int) {
  9. for i := 0; ; i++ {
  10. ch <- i
  11. }
  12. }

输出:

  1. 0

pump() 函数为通道提供数值,也被叫做生产者

为通道解除阻塞定义了 suck 函数来在无限循环中读取通道:

  1. func suck(ch chan int) {
  2. for {
  3. fmt.Println(<-ch)
  4. }
  5. }

main() 中使用协程开始它:

  1. go pump(ch1)
  2. go suck(ch1)
  3. time.Sleep(1e9)

给程序 1 秒的时间来运行:输出了上万个整数。

14.2.4 通过一个(或多个)通道交换数据进行协程同步。通信是一种同步形式:通过通道,两个协程在通信(协程会和)中某刻同步交换数据。无缓冲通道成为了多个协程同步的完美工具。

甚至可以在通道两端互相阻塞对方,形成了叫做死锁的状态。Go 运行时会检查并 panic,停止程序。死锁几乎完全是由糟糕的设计导致的。

无缓冲通道会被阻塞。设计无阻塞的程序可以避免这种情况,或者使用带缓冲的通道。

练习:解释为什么下边这个程序会导致 panic:所有的协程都休眠了 - 死锁!

  1. package main
  2. import (
  3. "fmt"
  4. )
  5. func f1(in chan int) {
  6. fmt.Println(<-in)
  7. }
  8. func main() {
  9. out := make(chan int)
  10. out <- 2
  11. go f1(out)
  12. }

同步通道-使用带缓冲的通道

一个无缓冲通道只能包含 1 个元素,有时显得很局限。我们给通道提供了一个缓存,可以在扩展的 make 命令中设置它的容量,如下:

  1. buf := 100
  2. ch1 := make(chan string, buf)

buf 是通道可以同时容纳的元素(这里是 string)个数

在缓冲满载(缓冲被全部使用)之前,给一个带缓冲的通道发送数据是不会阻塞的,而从通道读取数据也不会阻塞,直到缓冲空了。

缓冲容量和类型无关,所以可以(尽管可能导致危险)给一些通道设置不同的容量,只要他们拥有同样的元素类型。内置的 cap 函数可以返回缓冲区的容量。

如果容量大于 0,通道会变成异步的:缓冲满载(发送)变空(接收)之前通信不会阻塞,元素会按照发送的顺序被接收。如果容量是0或者未设置,通信仅在收发双方准备好的情况下才可以成功。

同步:ch :=make(chan type, value)

  1. value == 0 -> synchronous, unbuffered (阻塞)
  2. value > 0 -> asynchronous, buffered(非阻塞)取决于value元素

若使用通道的缓冲,你的程序会在“请求”激增的时候表现更好,更具有弹性,专业术语叫:更具有伸缩性(scalable)。在设计算法时首先考虑使用无缓冲通道,只在不确定的情况下使用缓冲。

协程中用通道输出结果

为了知道计算何时完成,可以通过信道回报:

  1. ch := make(chan int)
  2. go sum(bigArray, ch) // bigArray puts the calculated sum on ch
  3. // .. do something else for a while
  4. sum := <- ch // wait for, and retrieve the sum

也可以使用通道来达到同步的目的,这个很有效的用法在传统计算机中称为信号量(semaphore)。或者换个方式:通过通道发送信号告知处理已经完成(在协程中)。

在其他协程运行时让 main 程序无限阻塞的通常做法是在 main 函数的最后放置一个 select {}

也可以使用通道让 main 程序等待协程完成,就是所谓的信号量模式,我们会在接下来的部分讨论。

信号量模式

下边的示例展示了协程通过在通道 ch 中放置一个值来处理结束的信号。main 协程等待 <-ch 直到从中获取到值。

我们期望从这个通道中获取返回的结果,像这样:

  1. func compute(ch chan int){
  2. ch <- someComputation() // when it completes, signal on the channel.
  3. }
  4. func main(){
  5. ch := make(chan int) // allocate a channel.
  6. go compute(ch) // start something in a goroutines
  7. doSomethingElseForAWhile()
  8. result := <- ch
  9. }

这个信号也可以是其他的,不返回结果,比如下面这个协程中的匿名函数(lambda)协程:

  1. ch := make(chan int)
  2. go func(){
  3. // doSomething
  4. ch <- 1 // Send a signal; value does not matter
  5. }()
  6. doSomethingElseForAWhile()
  7. <- ch // Wait for goroutine to finish; discard sent value.

或者等待两个协程完成,每一个都会对切片s的一部分进行排序,片段如下:

  1. done := make(chan bool)
  2. // doSort is a lambda function, so a closure which knows the channel done:
  3. doSort := func(s []int){
  4. sort(s)
  5. done <- true
  6. }
  7. i := pivot(s)
  8. go doSort(s[:i])
  9. go doSort(s[i:])
  10. <-done
  11. <-done

下面的示例,用完整的信号量模式对长度为Nfloat64 切片进行了 NdoSomething() 计算并同时完成,通道 sem 分配了相同的长度(且包含空接口类型的元素),待所有的计算都完成后,发送信号(通过放入值)。在循环中从通道 sem 不停的接收数据来等待所有的协程完成。

  1. type Empty interface {}
  2. var empty Empty
  3. ...
  4. data := make([]float64, N)
  5. res := make([]float64, N)
  6. sem := make(chan Empty, N)
  7. ...
  8. for i, xi := range data {
  9. go func (i int, xi float64) {
  10. res[i] = doSomething(i, xi)
  11. sem <- empty
  12. } (i, xi)
  13. }
  14. // wait for goroutines to finish
  15. for i := 0; i < N; i++ { <-sem }

实现并行的 for 循环

  1. for i, v := range data {
  2. go func (i int, v float64) {
  3. doSomething(i, v)
  4. ...
  5. } (i, v)
  6. }

for 循环中并行计算迭代可能带来很好的性能提升。不过所有的迭代都必须是独立完成的。有些语言比如 Fortress 或者其他并行框架以不同的结构实现了这种方式,在 Go 中用协程实现起来非常容易。

用带缓冲通道实现一个信号量

信号量是实现互斥锁(排外锁)常见的同步机制,限制对资源的访问,解决读写问题,比如没有实现信号量的 syncGo 包,使用带缓冲的通道可以轻松实现:

  • 带缓冲通道的容量和要同步的资源容量相同
  • 通道的长度(当前存放的元素个数)与当前资源被使用的数量相同
  • 容量减去通道的长度就是未处理的资源个数(标准信号量的整数值)

不用管通道中存放的是什么,只关注长度;因此我们创建了一个长度可变但容量为0(字节)的通道:

  1. type Empty interface {}
  2. type semaphore chan Empty

用可用资源的数量N来初始化信号量 semaphore:sem = make(semaphore, N),然后直接对信号量进行操作:

  1. // acquire n resources
  2. func (s semaphore) P(n int) {
  3. e := new(Empty)
  4. for i := 0; i < n; i++ {
  5. s <- e
  6. }
  7. }
  8. // release n resources
  9. func (s semaphore) V(n int) {
  10. for i:= 0; i < n; i++{
  11. <- s
  12. }
  13. }

可以用来实现一个互斥的例子:

  1. /* mutexes */
  2. func (s semaphore) Lock() {
  3. s.P(1)
  4. }
  5. func (s semaphore) Unlock(){
  6. s.V(1)
  7. }
  8. /* signal-wait */
  9. func (s semaphore) Wait(n int) {
  10. s.P(n)
  11. }
  12. func (s semaphore) Signal() {
  13. s.V(1)
  14. }

通道工厂模式

编程中常见的另外一种模式如下:不将通道作为参数传递给协程,而是用函数来生成一个通道并返回(工厂角色),函数内有个匿名函数被协程调用:

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. stream := pump()
  8. go suck(stream)
  9. time.Sleep(1e9)
  10. }
  11. func pump() chan int {
  12. ch := make(chan int)
  13. go func() {
  14. for i := 0; ; i++ {
  15. ch <- i
  16. }
  17. }()
  18. return ch
  19. }
  20. func suck(ch chan int) {
  21. for {
  22. fmt.Println(<-ch)
  23. }
  24. }

对通道使用 for 循环

for 循环的 range 语句可以用在通道 ch 上,便可以从通道中获取值:

  1. for v := range ch {
  2. fmt.Printf("The value is %v\n", v)
  3. }

它从指定通道中读取数据直到通道关闭,才继续执行下边的代码。很明显,另外一个协程必须写入 ch(不然代码就阻塞在 for 循环了),而且必须在写入完成后才关闭。suck 函数可以这样写,且在协程中调用这个动作,程序变成了这样:

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. suck(pump())
  8. time.Sleep(1e9)
  9. }
  10. func pump() chan int {
  11. ch := make(chan int)
  12. go func() {
  13. for i := 0; ; i++ {
  14. ch <- i
  15. }
  16. }()
  17. return ch
  18. }
  19. func suck(ch chan int) {
  20. go func() {
  21. for v := range ch {
  22. fmt.Println(v)
  23. }
  24. }()
  25. }

通道的方向

通道类型可以用注解来表示它只发送或者只接收:

  1. var send_only chan<- int // channel can only receive data
  2. var recv_only <-chan int // channel can only send data

只接收的通道(<-chan T)无法关闭,因为关闭通道是发送者用来表示不再给通道发送值了,所以对只接收通道是没有意义的。通道创建的时候都是双向的,但也可以分配有方向的通道变量,就像以下代码:

  1. var c = make(chan int) // bidirectional
  2. go source(c)
  3. go sink(c)
  4. func source(ch chan<- int){
  5. for { ch <- 1 }
  6. }
  7. func sink(ch <-chan int) {
  8. for { <-ch }
  9. }