原文鏈接:https://gopl-zh.github.io/ch8/ch8-04.html
如果說goroutine是Go語言程序的并發(fā)體的話,那么channels則是它們之間的通信機制。一個channel是一個通信機制,它可以讓一個goroutine通過它給另一個goroutine發(fā)送值信息。每個channel都有一個特殊的類型,也就是channels可發(fā)送數(shù)據(jù)的類型。一個可以發(fā)送int類型數(shù)據(jù)的channel一般寫為chan int。
使用內(nèi)置的make函數(shù),我們可以創(chuàng)建一個channel:
ch := make(chan int) // ch has type 'chan int'
和map類似,channel也對應一個make創(chuàng)建的底層數(shù)據(jù)結構的引用。當我們復制一個channel或用于函數(shù)參數(shù)傳遞時,我們只是拷貝了一個channel引用,因此調(diào)用者和被調(diào)用者將引用同一個channel對象。和其它的引用類型一樣,channel的零值也是nil。
兩個相同類型的channel可以使用==運算符比較。如果兩個channel引用的是相同的對象,那么比較的結果為真。一個channel也可以和nil進行比較。
一個channel有發(fā)送和接受兩個主要操作,都是通信行為。一個發(fā)送語句將一個值從一個goroutine通過channel發(fā)送到另一個執(zhí)行接收操作的goroutine。發(fā)送和接收兩個操作都使用<-
運算符。在發(fā)送語句中,<-
運算符分割channel和要發(fā)送的值。在接收語句中,<-
運算符寫在channel對象之前。一個不使用接收結果的接收操作也是合法的。
ch <- x // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded
Channel還支持close操作,用于關閉channel,隨后對基于該channel的任何發(fā)送操作都將導致panic異常。對一個已經(jīng)被close過的channel進行接收操作依然可以接受到之前已經(jīng)成功發(fā)送的數(shù)據(jù);如果channel中已經(jīng)沒有數(shù)據(jù)的話將產(chǎn)生一個零值的數(shù)據(jù)。
使用內(nèi)置的close函數(shù)就可以關閉一個channel:
close(ch)
以最簡單方式調(diào)用make函數(shù)創(chuàng)建的是一個無緩存的channel,但是我們也可以指定第二個整型參數(shù),對應channel的容量。如果channel的容量大于零,那么該channel就是帶緩存的channel。
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3
我們將先討論無緩存的channel,然后在8.4.4節(jié)討論帶緩存的channel。
一個基于無緩存Channels的發(fā)送操作將導致發(fā)送者goroutine阻塞,直到另一個goroutine在相同的Channels上執(zhí)行接收操作,當發(fā)送的值通過Channels成功傳輸之后,兩個goroutine可以繼續(xù)執(zhí)行后面的語句。反之,如果接收操作先發(fā)生,那么接收者goroutine也將阻塞,直到有另一個goroutine在相同的Channels上執(zhí)行發(fā)送操作。
基于無緩存Channels的發(fā)送和接收操作將導致兩個goroutine做一次同步操作。因為這個原因,無緩存Channels有時候也被稱為同步Channels。當通過一個無緩存Channels發(fā)送數(shù)據(jù)時,接收者收到數(shù)據(jù)發(fā)生在再次喚醒發(fā)送者goroutine之前(譯注:happens before,這是Go語言并發(fā)內(nèi)存模型的一個關鍵術語!)。
在討論并發(fā)編程時,當我們說x事件在y事件之前發(fā)生(happens before),我們并不是說x事件在時間上比y時間更早;我們要表達的意思是要保證在此之前的事件都已經(jīng)完成了,例如在此之前的更新某些變量的操作已經(jīng)完成,你可以放心依賴這些已完成的事件了。
當我們說x事件既不是在y事件之前發(fā)生也不是在y事件之后發(fā)生,我們就說x事件和y事件是并發(fā)的。這并不是意味著x事件和y事件就一定是同時發(fā)生的,我們只是不能確定這兩個事件發(fā)生的先后順序。在下一章中我們將看到,當兩個goroutine并發(fā)訪問了相同的變量時,我們有必要保證某些事件的執(zhí)行順序,以避免出現(xiàn)某些并發(fā)問題。
在8.3節(jié)的客戶端程序,它在主goroutine中(譯注:就是執(zhí)行main函數(shù)的goroutine)將標準輸入復制到server,因此當客戶端程序關閉標準輸入時,后臺goroutine可能依然在工作。我們需要讓主goroutine等待后臺goroutine完成工作后再退出,我們使用了一個channel來同步兩個goroutine:
gopl.io/ch8/netcat3
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // NOTE: ignoring errors
log.Println("done")
done <- struct{}{} // signal the main goroutine
}()
mustCopy(conn, os.Stdin)
conn.Close()
<-done // wait for background goroutine to finish
}
當用戶關閉了標準輸入,主goroutine中的mustCopy函數(shù)調(diào)用將返回,然后調(diào)用conn.Close()關閉讀和寫方向的網(wǎng)絡連接。關閉網(wǎng)絡連接中的寫方向的連接將導致server程序收到一個文件(end-of-file)結束的信號。關閉網(wǎng)絡連接中讀方向的連接將導致后臺goroutine的io.Copy函數(shù)調(diào)用返回一個“read from closed connection”(“從關閉的連接讀”)類似的錯誤,因此我們臨時移除了錯誤日志語句;在練習8.3將會提供一個更好的解決方案。(需要注意的是go語句調(diào)用了一個函數(shù)字面量,這是Go語言中啟動goroutine常用的形式。)
在后臺goroutine返回之前,它先打印一個日志信息,然后向done對應的channel發(fā)送一個值。主goroutine在退出前先等待從done對應的channel接收一個值。因此,總是可以在程序退出前正確輸出“done”消息。
基于channels發(fā)送消息有兩個重要方面。首先每個消息都有一個值,但是有時候通訊的事實和發(fā)生的時刻也同樣重要。當我們更希望強調(diào)通訊發(fā)生的時刻時,我們將它稱為消息事件。有些消息事件并不攜帶額外的信息,它僅僅是用作兩個goroutine之間的同步,這時候我們可以用struct{}
空結構體作為channels元素的類型,雖然也可以使用bool或int類型實現(xiàn)同樣的功能,done <- 1
語句也比done <- struct{}{}
更短。
練習 8.3: 在netcat3例子中,conn雖然是一個interface類型的值,但是其底層真實類型是*net.TCPConn
,代表一個TCP連接。一個TCP連接有讀和寫兩個部分,可以使用CloseRead和CloseWrite方法分別關閉它們。修改netcat3的主goroutine代碼,只關閉網(wǎng)絡連接中寫的部分,這樣的話后臺goroutine可以在標準輸入被關閉后繼續(xù)打印從reverb1服務器傳回的數(shù)據(jù)。(要在reverb2服務器也完成同樣的功能是比較困難的;參考練習 8.4。)
Channels也可以用于將多個goroutine連接在一起,一個Channel的輸出作為下一個Channel的輸入。這種串聯(lián)的Channels就是所謂的管道(pipeline)。下面的程序用兩個channels將三個goroutine串聯(lián)起來,如圖8.1所示。
第一個goroutine是一個計數(shù)器,用于生成0、1、2、……形式的整數(shù)序列,然后通過channel將該整數(shù)序列發(fā)送給第二個goroutine;第二個goroutine是一個求平方的程序,對收到的每個整數(shù)求平方,然后將平方后的結果通過第二個channel發(fā)送給第三個goroutine;第三個goroutine是一個打印程序,打印收到的每個整數(shù)。為了保持例子清晰,我們有意選擇了非常簡單的函數(shù),當然三個goroutine的計算很簡單,在現(xiàn)實中確實沒有必要為如此簡單的運算構建三個goroutine。
gopl.io/ch8/pipeline1
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; ; x++ {
naturals <- x
}
}()
// Squarer
go func() {
for {
x := <-naturals
squares <- x * x
}
}()
// Printer (in main goroutine)
for {
fmt.Println(<-squares)
}
}
如您所料,上面的程序?qū)⑸?、1、4、9、……形式的無窮數(shù)列。像這樣的串聯(lián)Channels的管道(Pipelines)可以用在需要長時間運行的服務中,每個長時間運行的goroutine可能會包含一個死循環(huán),在不同goroutine的死循環(huán)內(nèi)部使用串聯(lián)的Channels來通信。但是,如果我們希望通過Channels只發(fā)送有限的數(shù)列該如何處理呢?
如果發(fā)送者知道,沒有更多的值需要發(fā)送到channel的話,那么讓接收者也能及時知道沒有多余的值可接收將是有用的,因為接收者可以停止不必要的接收等待。這可以通過內(nèi)置的close函數(shù)來關閉channel實現(xiàn):
close(naturals)
當一個channel被關閉后,再向該channel發(fā)送數(shù)據(jù)將導致panic異常。當一個被關閉的channel中已經(jīng)發(fā)送的數(shù)據(jù)都被成功接收后,后續(xù)的接收操作將不再阻塞,它們會立即返回一個零值。關閉上面例子中的naturals變量對應的channel并不能終止循環(huán),它依然會收到一個永無休止的零值序列,然后將它們發(fā)送給打印者goroutine。
沒有辦法直接測試一個channel是否被關閉,但是接收操作有一個變體形式:它多接收一個結果,多接收的第二個結果是一個布爾值ok,ture表示成功從channels接收到值,false表示channels已經(jīng)被關閉并且里面沒有值可接收。使用這個特性,我們可以修改squarer函數(shù)中的循環(huán)代碼,當naturals對應的channel被關閉并沒有值可接收時跳出循環(huán),并且也關閉squares對應的channel.
// Squarer
go func() {
for {
x, ok := <-naturals
if !ok {
break // channel was closed and drained
}
squares <- x * x
}
close(squares)
}()
因為上面的語法是笨拙的,而且這種處理模式很常見,因此Go語言的range循環(huán)可直接在channels上面迭代。使用range循環(huán)是上面處理模式的簡潔語法,它依次從channel接收數(shù)據(jù),當channel被關閉并且沒有值可接收時跳出循環(huán)。
在下面的改進中,我們的計數(shù)器goroutine只生成100個含數(shù)字的序列,然后關閉naturals對應的channel,這將導致計算平方數(shù)的squarer對應的goroutine可以正常終止循環(huán)并關閉squares對應的channel。(在一個更復雜的程序中,可以通過defer語句關閉對應的channel。)最后,主goroutine也可以正常終止循環(huán)并退出程序。
gopl.io/ch8/pipeline2
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()
// Squarer
go func() {
for x := range naturals {
squares <- x * x
}
close(squares)
}()
// Printer (in main goroutine)
for x := range squares {
fmt.Println(x)
}
}
其實你并不需要關閉每一個channel。只有當需要告訴接收者goroutine,所有的數(shù)據(jù)已經(jīng)全部發(fā)送時才需要關閉channel。不管一個channel是否被關閉,當它沒有被引用時將會被Go語言的垃圾自動回收器回收。(不要將關閉一個打開文件的操作和關閉一個channel操作混淆。對于每個打開的文件,都需要在不使用的時候調(diào)用對應的Close方法來關閉文件。)
試圖重復關閉一個channel將導致panic異常,試圖關閉一個nil值的channel也將導致panic異常。關閉一個channels還會觸發(fā)一個廣播機制,我們將在8.9節(jié)討論。
隨著程序的增長,人們習慣于將大的函數(shù)拆分為小的函數(shù)。我們前面的例子中使用了三個goroutine,然后用兩個channels來連接它們,它們都是main函數(shù)的局部變量。將三個goroutine拆分為以下三個函數(shù)是自然的想法:
func counter(out chan int)
func squarer(out, in chan int)
func printer(in chan int)
其中計算平方的squarer函數(shù)在兩個串聯(lián)Channels的中間,因此擁有兩個channel類型的參數(shù),一個用于輸入一個用于輸出。兩個channel都擁有相同的類型,但是它們的使用方式相反:一個只用于接收,另一個只用于發(fā)送。參數(shù)的名字in和out已經(jīng)明確表示了這個意圖,但是并無法保證squarer函數(shù)向一個in參數(shù)對應的channel發(fā)送數(shù)據(jù)或者從一個out參數(shù)對應的channel接收數(shù)據(jù)。
這種場景是典型的。當一個channel作為一個函數(shù)參數(shù)時,它一般總是被專門用于只發(fā)送或者只接收。
為了表明這種意圖并防止被濫用,Go語言的類型系統(tǒng)提供了單方向的channel類型,分別用于只發(fā)送或只接收的channel。類型chan<- int
表示一個只發(fā)送int的channel,只能發(fā)送不能接收。相反,類型<-chan int
表示一個只接收int的channel,只能接收不能發(fā)送。(箭頭<-
和關鍵字chan的相對位置表明了channel的方向。)這種限制將在編譯期檢測。
因為關閉操作只用于斷言不再向channel發(fā)送新的數(shù)據(jù),所以只有在發(fā)送者所在的goroutine才會調(diào)用close函數(shù),因此對一個只接收的channel調(diào)用close將是一個編譯錯誤。
這是改進的版本,這一次參數(shù)使用了單方向channel類型:
gopl.io/ch8/pipeline3
func counter(out chan<- int) {
for x := 0; x < 100; x++ {
out <- x
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}
調(diào)用counter(naturals)時,naturals的類型將隱式地從chan int轉(zhuǎn)換成chan<- int。調(diào)用printer(squares)也會導致相似的隱式轉(zhuǎn)換,這一次是轉(zhuǎn)換為<-chan int
類型只接收型的channel。任何雙向channel向單向channel變量的賦值操作都將導致該隱式轉(zhuǎn)換。這里并沒有反向轉(zhuǎn)換的語法:也就是不能將一個類似chan<- int
類型的單向型的channel轉(zhuǎn)換為chan int
類型的雙向型的channel。
帶緩存的Channel內(nèi)部持有一個元素隊列。隊列的最大容量是在調(diào)用make函數(shù)創(chuàng)建channel時通過第二個參數(shù)指定的。下面的語句創(chuàng)建了一個可以持有三個字符串元素的帶緩存Channel。圖8.2是ch變量對應的channel的圖形表示形式。
ch = make(chan string, 3)
向緩存Channel的發(fā)送操作就是向內(nèi)部緩存隊列的尾部插入元素,接收操作則是從隊列的頭部刪除元素。如果內(nèi)部緩存隊列是滿的,那么發(fā)送操作將阻塞直到因另一個goroutine執(zhí)行接收操作而釋放了新的隊列空間。相反,如果channel是空的,接收操作將阻塞直到有另一個goroutine執(zhí)行發(fā)送操作而向隊列插入元素。
我們可以在無阻塞的情況下連續(xù)向新創(chuàng)建的channel發(fā)送三個值:
ch <- "A"
ch <- "B"
ch <- "C"
此刻,channel的內(nèi)部緩存隊列將是滿的(圖8.3),如果有第四個發(fā)送操作將發(fā)生阻塞。
如果我們接收一個值,
fmt.Println(<-ch) // "A"
那么channel的緩存隊列將不是滿的也不是空的(圖8.4),因此對該channel執(zhí)行的發(fā)送或接收操作都不會發(fā)生阻塞。通過這種方式,channel的緩存隊列解耦了接收和發(fā)送的goroutine。
在某些特殊情況下,程序可能需要知道channel內(nèi)部緩存的容量,可以用內(nèi)置的cap函數(shù)獲?。?
fmt.Println(cap(ch)) // "3"
同樣,對于內(nèi)置的len函數(shù),如果傳入的是channel,那么將返回channel內(nèi)部緩存隊列中有效元素的個數(shù)。因為在并發(fā)程序中該信息會隨著接收操作而失效,但是它對某些故障診斷和性能優(yōu)化會有幫助。
fmt.Println(len(ch)) // "2"
在繼續(xù)執(zhí)行兩次接收操作后channel內(nèi)部的緩存隊列將又成為空的,如果有第四個接收操作將發(fā)生阻塞:
fmt.Println(<-ch) // "B"
fmt.Println(<-ch) // "C"
在這個例子中,發(fā)送和接收操作都發(fā)生在同一個goroutine中,但是在真實的程序中它們一般由不同的goroutine執(zhí)行。Go語言新手有時候會將一個帶緩存的channel當作同一個goroutine中的隊列使用,雖然語法看似簡單,但實際上這是一個錯誤。Channel和goroutine的調(diào)度器機制是緊密相連的,如果沒有其他goroutine從channel接收,發(fā)送者——或許是整個程序——將會面臨永遠阻塞的風險。如果你只是需要一個簡單的隊列,使用slice就可以了。
下面的例子展示了一個使用了帶緩存channel的應用。它并發(fā)地向三個鏡像站點發(fā)出請求,三個鏡像站點分散在不同的地理位置。它們分別將收到的響應發(fā)送到帶緩存channel,最后接收者只接收第一個收到的響應,也就是最快的那個響應。因此mirroredQuery函數(shù)可能在另外兩個響應慢的鏡像站點響應之前就返回了結果。(順便說一下,多個goroutines并發(fā)地向同一個channel發(fā)送數(shù)據(jù),或從同一個channel接收數(shù)據(jù)都是常見的用法。)
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
func request(hostname string) (response string) { /* ... */ }
如果我們使用了無緩存的channel,那么兩個慢的goroutines將會因為沒有人接收而被永遠卡住。這種情況,稱為goroutines泄漏,這將是一個BUG。和垃圾變量不同,泄漏的goroutines并不會被自動回收,因此確保每個不再需要的goroutine能正常退出是重要的。
關于無緩存或帶緩存channels之間的選擇,或者是帶緩存channels的容量大小的選擇,都可能影響程序的正確性。無緩存channel更強地保證了每個發(fā)送操作與相應的同步接收操作;但是對于帶緩存channel,這些操作是解耦的。同樣,即使我們知道將要發(fā)送到一個channel的信息的數(shù)量上限,創(chuàng)建一個對應容量大小的帶緩存channel也是不現(xiàn)實的,因為這要求在執(zhí)行任何接收操作之前緩存所有已經(jīng)發(fā)送的值。如果未能分配足夠的緩存將導致程序死鎖。
Channel的緩存也可能影響程序的性能。想象一家蛋糕店有三個廚師,一個烘焙,一個上糖衣,還有一個將每個蛋糕傳遞到它下一個廚師的生產(chǎn)線。在狹小的廚房空間環(huán)境,每個廚師在完成蛋糕后必須等待下一個廚師已經(jīng)準備好接受它;這類似于在一個無緩存的channel上進行溝通。
如果在每個廚師之間有一個放置一個蛋糕的額外空間,那么每個廚師就可以將一個完成的蛋糕臨時放在那里而馬上進入下一個蛋糕的制作中;這類似于將channel的緩存隊列的容量設置為1。只要每個廚師的平均工作效率相近,那么其中大部分的傳輸工作將是迅速的,個體之間細小的效率差異將在交接過程中彌補。如果廚師之間有更大的額外空間——也是就更大容量的緩存隊列——將可以在不停止生產(chǎn)線的前提下消除更大的效率波動,例如一個廚師可以短暫地休息,然后再加快趕上進度而不影響其他人。
另一方面,如果生產(chǎn)線的前期階段一直快于后續(xù)階段,那么它們之間的緩存在大部分時間都將是滿的。相反,如果后續(xù)階段比前期階段更快,那么它們之間的緩存在大部分時間都將是空的。對于這類場景,額外的緩存并沒有帶來任何好處。
生產(chǎn)線的隱喻對于理解channels和goroutines的工作機制是很有幫助的。例如,如果第二階段是需要精心制作的復雜操作,一個廚師可能無法跟上第一個廚師的進度,或者是無法滿足第三階段廚師的需求。要解決這個問題,我們可以再雇傭另一個廚師來幫助完成第二階段的工作,他執(zhí)行相同的任務但是獨立工作。這類似于基于相同的channels創(chuàng)建另一個獨立的goroutine。
我們沒有太多的空間展示全部細節(jié),但是gopl.io/ch8/cake包模擬了這個蛋糕店,可以通過不同的參數(shù)調(diào)整。它還對上面提到的幾種場景提供對應的基準測試(§11.4) 。
![]() | ![]() |
更多建議: