Go語言 如何優(yōu)雅地關閉通道

2023-02-16 17:39 更新

在本文發(fā)表數(shù)日前,我曾寫了一篇文章來解釋通道的規(guī)則。 那篇文章在redditHN上獲得了很多點贊,但也有很多人對Go通道的細節(jié)設計提出了一些批評意見。

這些批評主要針對于通道設計中的下列細節(jié):

  1. 沒有一個簡單和通用的方法用來在不改變一個通道的狀態(tài)的情況下檢查這個通道是否已經關閉。
  2. 關閉一個已經關閉的通道將產生一個恐慌,所以在不知道一個通道是否已經關閉的時候關閉此通道是很危險的。
  3. 向一個已關閉的通道發(fā)送數(shù)據(jù)將產生一個恐慌,所以在不知道一個通道是否已經關閉的時候向此通道發(fā)送數(shù)據(jù)是很危險的。

這些批評看上去有幾分道理(實際上屬于對通道的不正確使用導致的偏見)。 是的,Go語言中并沒有提供一個內置函數(shù)來檢查一個通道是否已經關閉。

在Go中,如果我們能夠保證從不會向一個通道發(fā)送數(shù)據(jù),那么有一個簡單的方法來判斷此通道是否已經關閉。 此方法已經在上一篇文章通道用例大全中展示過了。 這里為了本文的連貫性,在下面的例子中重新列出了此方法。

package main

import "fmt"

type T int

func IsClosed(ch <-chan T) bool {
	select {
	case <-ch:
		return true
	default:
	}

	return false
}

func main() {
	c := make(chan T)
	fmt.Println(IsClosed(c)) // false
	close(c)
	fmt.Println(IsClosed(c)) // true
}

如前所述,此方法并不是一個通用的檢查通道是否已經關閉的方法。

事實上,即使有一個內置closed函數(shù)用來檢查一個通道是否已經關閉,它的有用性也是十分有限的。 原因是當此函數(shù)的一個調用的結果返回時,被查詢的通道的狀態(tài)可能已經又改變了,導致此調用結果并不能反映出被查詢的通道的最新狀態(tài)。 雖然我們可以根據(jù)一個調用closed(ch)的返回結果為true而得出我們不應該再向通道ch發(fā)送數(shù)據(jù)的結論, 但是我們不能根據(jù)一個調用closed(ch)的返回結果為false而得出我們可以繼續(xù)向通道ch發(fā)送數(shù)據(jù)的結論。

通道關閉原則

一個常用的使用Go通道的原則是不要在數(shù)據(jù)接收方或者在有多個發(fā)送者的情況下關閉通道。 換句話說,我們只應該讓一個通道唯一的發(fā)送者關閉此通道。

下面我們將稱此原則為通道關閉原則

當然,這并不是一個通用的關閉通道的原則。通用的原則是不要關閉已關閉的通道。 如果我們能夠保證從某個時刻之后,再沒有協(xié)程將向一個未關閉的非nil通道發(fā)送數(shù)據(jù),則一個協(xié)程可以安全地關閉此通道。 然而,做出這樣的保證常常需要很大的努力,從而導致代碼過度復雜。 另一方面,遵循通道關閉原則是一件相對簡單的事兒。

粗魯?shù)仃P閉通道的方法

如果由于某種原因,你一定非要從數(shù)據(jù)接收方或者讓眾多發(fā)送者中的一個關閉一個通道,你可以使用恢復機制來防止可能產生的恐慌而導致程序崩潰。 下面就是這樣的一個實現(xiàn)(假設通道的元素類型為T)。

func SafeClose(ch chan T) (justClosed bool) {
	defer func() {
		if recover() != nil {
			// 一個函數(shù)的返回結果可以在defer調用中修改。
			justClosed = false
		}
	}()

	// 假設ch != nil。
	close(ch)   // 如果ch已關閉,則產生一個恐慌。
	return true // <=> justClosed = true; return
}

此方法違反了通道關閉原則。

同樣的方法可以用來粗魯?shù)叵蛞粋€關閉狀態(tài)未知的通道發(fā)送數(shù)據(jù)。

func SafeSend(ch chan T, value T) (closed bool) {
	defer func() {
		if recover() != nil {
			closed = true
		}
	}()

	ch <- value  // 如果ch已關閉,則產生一個恐慌。
	return false // <=> closed = false; return
}

這樣的粗魯方法不僅違反了通道關閉原則,而且Go白皮書和標準編譯器不保證它的實現(xiàn)中不存在數(shù)據(jù)競爭。

禮貌地關閉通道的方法

很多Go程序員喜歡使用sync.Once來關閉通道。

type MyChannel struct {
	C    chan T
	once sync.Once
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.once.Do(func() {
		close(mc.C)
	})
}

當然,我們也可以使用sync.Mutex來防止多次關閉一個通道。

type MyChannel struct {
	C      chan T
	closed bool
	mutex  sync.Mutex
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()
	if !mc.closed {
		close(mc.C)
		mc.closed = true
	}
}

func (mc *MyChannel) IsClosed() bool {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()
	return mc.closed
}

這些實現(xiàn)確實比上一節(jié)中的方法禮貌一些,但是它們不能完全有效地避免數(shù)據(jù)競爭。 目前的Go白皮書并不保證發(fā)生在一個通道上的并發(fā)關閉操作和發(fā)送操作不會產生數(shù)據(jù)競爭。 如果一個SafeClose函數(shù)和同一個通道上的發(fā)送操作同時運行,則數(shù)據(jù)競爭可能發(fā)生(雖然這樣的數(shù)據(jù)競爭一般并不會帶來什么危害)。

優(yōu)雅地關閉通道的方法

上一節(jié)中介紹的SafeSend函數(shù)有一個弊端,它的調用不能做為case操作而被使用在select代碼塊中。 另外,很多Go程序員(包括我)認為上面兩節(jié)展示的關閉通道的方法不是很優(yōu)雅。 本節(jié)下面將介紹一些在各種情形下使用純通道操作來關閉通道的方法。

(為了演示程序的完整性,下面這些例子中使用到了sync.WaitGroup。在實踐中,sync.WaitGroup并不是必需的。)

情形一:M個接收者和一個發(fā)送者。發(fā)送者通過關閉用來傳輸數(shù)據(jù)的通道來傳遞發(fā)送結束信號

這是最簡單的一種情形。當發(fā)送者欲結束發(fā)送,讓它關閉用來傳輸數(shù)據(jù)的通道即可。

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 100

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int)

	// 發(fā)送者
	go func() {
		for {
			if value := rand.Intn(Max); value == 0 {
				// 此唯一的發(fā)送者可以安全地關閉此數(shù)據(jù)通道。
				close(dataCh)
				return
			} else {
				dataCh <- value
			}
		}
	}()

	// 接收者
	for i := 0; i < NumReceivers; i++ {
		go func() {
			defer wgReceivers.Done()

			// 接收數(shù)據(jù)直到通道dataCh已關閉
			// 并且dataCh的緩沖隊列已空。
			for value := range dataCh {
				log.Println(value)
			}
		}()
	}

	wgReceivers.Wait()
}

情形二:一個接收者和N個發(fā)送者,此唯一接收者通過關閉一個額外的信號通道來通知發(fā)送者不要再發(fā)送數(shù)據(jù)了

此情形比上一種情形復雜一些。我們不能讓接收者關閉用來傳輸數(shù)據(jù)的通道來停止數(shù)據(jù)傳輸,因為這樣做違反了通道關閉原則。 但是我們可以讓接收者關閉一個額外的信號通道來通知發(fā)送者不要再發(fā)送數(shù)據(jù)了。

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(1)

	// ...
	dataCh := make(chan int)
	stopCh := make(chan struct{})
		// stopCh是一個額外的信號通道。它的
		// 發(fā)送者為dataCh數(shù)據(jù)通道的接收者。
		// 它的接收者為dataCh數(shù)據(jù)通道的發(fā)送者。

	// 發(fā)送者
	for i := 0; i < NumSenders; i++ {
		go func() {
			for {
				// 這里的第一個嘗試接收用來讓此發(fā)送者
				// 協(xié)程盡早地退出。對于這個特定的例子,
				// 此select代碼塊并非必需。
				select {
				case <- stopCh:
					return
				default:
				}

				// 即使stopCh已經關閉,此第二個select
				// 代碼塊中的第一個分支仍很有可能在若干個
				// 循環(huán)步內依然不會被選中。如果這是不可接受
				// 的,則上面的第一個select代碼塊是必需的。
				select {
				case <- stopCh:
					return
				case dataCh <- rand.Intn(Max):
				}
			}
		}()
	}

	// 接收者
	go func() {
		defer wgReceivers.Done()

		for value := range dataCh {
			if value == Max-1 {
				// 此唯一的接收者同時也是stopCh通道的
				// 唯一發(fā)送者。盡管它不能安全地關閉dataCh數(shù)
				// 據(jù)通道,但它可以安全地關閉stopCh通道。
				close(stopCh)
				return
			}

			log.Println(value)
		}
	}()

	// ...
	wgReceivers.Wait()
}

如此例中的注釋所述,對于此額外的信號通道stopCh,它只有一個發(fā)送者,即dataCh數(shù)據(jù)通道的唯一接收者。 dataCh數(shù)據(jù)通道的接收者關閉了信號通道stopCh,這是不違反通道關閉原則的。

在此例中,數(shù)據(jù)通道dataCh并沒有被關閉。是的,我們不必關閉它。 當一個通道不再被任何協(xié)程所使用后,它將逐漸被垃圾回收掉,無論它是否已經被關閉。 所以這里的優(yōu)雅性體現(xiàn)在通過不關閉一個通道來停止使用此通道。

情形三:M個接收者和N個發(fā)送者。它們中的任何協(xié)程都可以讓一個中間調解協(xié)程幫忙發(fā)出停止數(shù)據(jù)傳送的信號

這是最復雜的一種情形。我們不能讓接收者和發(fā)送者中的任何一個關閉用來傳輸數(shù)據(jù)的通道,我們也不能讓多個接收者之一關閉一個額外的信號通道。 這兩種做法都違反了通道關閉原則。 然而,我們可以引入一個中間調解者角色并讓其關閉額外的信號通道來通知所有的接收者和發(fā)送者結束工作。 具體實現(xiàn)見下例。注意其中使用了一個嘗試發(fā)送操作來向中間調解者發(fā)送信號。

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
	"strconv"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 10
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int)
	stopCh := make(chan struct{})
		// stopCh是一個額外的信號通道。它的發(fā)送
		// 者為中間調解者。它的接收者為dataCh
		// 數(shù)據(jù)通道的所有的發(fā)送者和接收者。
	toStop := make(chan string, 1)
		// toStop是一個用來通知中間調解者讓其
		// 關閉信號通道stopCh的第二個信號通道。
		// 此第二個信號通道的發(fā)送者為dataCh數(shù)據(jù)
		// 通道的所有的發(fā)送者和接收者,它的接收者
		// 為中間調解者。它必須為一個緩沖通道。

	var stoppedBy string

	// 中間調解者
	go func() {
		stoppedBy = <-toStop
		close(stopCh)
	}()

	// 發(fā)送者
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				if value == 0 {
					// 為了防止阻塞,這里使用了一個嘗試
					// 發(fā)送操作來向中間調解者發(fā)送信號。
					select {
					case toStop <- "發(fā)送者#" + id:
					default:
					}
					return
				}

				// 此處的嘗試接收操作是為了讓此發(fā)送協(xié)程盡早
				// 退出。標準編譯器對嘗試接收和嘗試發(fā)送做了
				// 特殊的優(yōu)化,因而它們的速度很快。
				select {
				case <- stopCh:
					return
				default:
				}

				// 即使stopCh已關閉,如果這個select代碼塊
				// 中第二個分支的發(fā)送操作是非阻塞的,則第一個
				// 分支仍很有可能在若干個循環(huán)步內依然不會被選
				// 中。如果這是不可接受的,則上面的第一個嘗試
				// 接收操作代碼塊是必需的。
				select {
				case <- stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// 接收者
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()

			for {
				// 和發(fā)送者協(xié)程一樣,此處的嘗試接收操作是為了
				// 讓此接收協(xié)程盡早退出。
				select {
				case <- stopCh:
					return
				default:
				}

				// 即使stopCh已關閉,如果這個select代碼塊
				// 中第二個分支的接收操作是非阻塞的,則第一個
				// 分支仍很有可能在若干個循環(huán)步內依然不會被選
				// 中。如果這是不可接受的,則上面嘗試接收操作
				// 代碼塊是必需的。
				select {
				case <- stopCh:
					return
				case value := <-dataCh:
					if value == Max-1 {
						// 為了防止阻塞,這里使用了一個嘗試
						// 發(fā)送操作來向中間調解者發(fā)送信號。
						select {
						case toStop <- "接收者#" + id:
						default:
						}
						return
					}

					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}

	// ...
	wgReceivers.Wait()
	log.Println("被" + stoppedBy + "終止了")
}

在此例中,通道關閉原則依舊得到了遵守。

請注意,信號通道toStop的容量必須至少為1。 如果它的容量為0,則在中間調解者還未準備好的情況下就已經有某個協(xié)程向toStop發(fā)送信號時,此信號將被拋棄。

我們也可以不使用嘗試發(fā)送操作向中間調解者發(fā)送信號,但信號通道toStop的容量必須至少為數(shù)據(jù)發(fā)送者和數(shù)據(jù)接收者的數(shù)量之和,以防止向其發(fā)送數(shù)據(jù)時(有一個極其微小的可能)導致某些發(fā)送者和接收者協(xié)程永久阻塞。

...
toStop := make(chan string, NumReceivers + NumSenders)
...
			value := rand.Intn(Max)
			if value == 0 {
				toStop <- "sender#" + id
				return
			}
...
				if value == Max-1 {
					toStop <- "receiver#" + id
					return
				}
...

情形四:“M個接收者和一個發(fā)送者”情形的一個變種:用來傳輸數(shù)據(jù)的通道的關閉請求由第三方發(fā)出

有時,數(shù)據(jù)通道(dataCh)的關閉請求需要由某個第三方協(xié)程發(fā)出。對于這種情形,我們可以使用一個額外的信號通道來通知唯一的發(fā)送者關閉數(shù)據(jù)通道(dataCh)。

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 100
	const NumThirdParties = 15

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int)
	closing := make(chan struct{}) // 信號通道
	closed := make(chan struct{})
	
	// 此stop函數(shù)可以被安全地多次調用。
	stop := func() {
		select {
		case closing<-struct{}{}:
			<-closed
		case <-closed:
		}
	}
	
	// 一些第三方協(xié)程
	for i := 0; i < NumThirdParties; i++ {
		go func() {
			r := 1 + rand.Intn(3)
			time.Sleep(time.Duration(r) * time.Second)
			stop()
		}()
	}

	// 發(fā)送者
	go func() {
		defer func() {
			close(closed)
			close(dataCh)
		}()

		for {
			select{
			case <-closing: return
			default:
			}

			select{
			case <-closing: return
			case dataCh <- rand.Intn(Max):
			}
		}
	}()

	// 接收者
	for i := 0; i < NumReceivers; i++ {
		go func() {
			defer wgReceivers.Done()

			for value := range dataCh {
				log.Println(value)
			}
		}()
	}

	wgReceivers.Wait()
}

上述代碼中的stop函數(shù)中使用的技巧偷自Roger Peppe在此貼中的一個留言。

情形五:“N個發(fā)送者”的一個變種:用來傳輸數(shù)據(jù)的通道必須被關閉以通知各個接收者數(shù)據(jù)發(fā)送已經結束了

在上面的提到的“N個發(fā)送者”情形中,為了遵守通道關閉原則,我們避免了關閉數(shù)據(jù)通道(dataCh)。 但是有時候,數(shù)據(jù)通道(dataCh)必須被關閉以通知各個接收者數(shù)據(jù)發(fā)送已經結束。 對于這種“N個發(fā)送者”情形,我們可以使用一個中間通道將它們轉化為“一個發(fā)送者”情形,然后繼續(xù)使用上一節(jié)介紹的技巧來關閉此中間通道,從而避免了關閉原始的dataCh數(shù)據(jù)通道。

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
	"strconv"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 1000000
	const NumReceivers = 10
	const NumSenders = 1000
	const NumThirdParties = 15

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int)   // 將被關閉
	middleCh := make(chan int) // 不會被關閉
	closing := make(chan string)
	closed := make(chan struct{})

	var stoppedBy string

	stop := func(by string) {
		select {
		case closing <- by:
			<-closed
		case <-closed:
		}
	}
	
	// 中間層
	go func() {
		exit := func(v int, needSend bool) {
			close(closed)
			if needSend {
				dataCh <- v
			}
			close(dataCh)
		}

		for {
			select {
			case stoppedBy = <-closing:
				exit(0, false)
				return
			case v := <- middleCh:
				select {
				case stoppedBy = <-closing:
					exit(v, true)
					return
				case dataCh <- v:
				}
			}
		}
	}()
	
	// 一些第三方協(xié)程
	for i := 0; i < NumThirdParties; i++ {
		go func(id string) {
			r := 1 + rand.Intn(3)
			time.Sleep(time.Duration(r) * time.Second)
			stop("3rd-party#" + id)
		}(strconv.Itoa(i))
	}

	// 發(fā)送者
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				if value == 0 {
					stop("sender#" + id)
					return
				}

				select {
				case <- closed:
					return
				default:
				}

				select {
				case <- closed:
					return
				case middleCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// 接收者
	for range [NumReceivers]struct{}{} {
		go func() {
			defer wgReceivers.Done()

			for value := range dataCh {
				log.Println(value)
			}
		}()
	}

	// ...
	wgReceivers.Wait()
	log.Println("stopped by", stoppedBy)
}

更多情形?

在日常編程中可能會遇到更多的變種情形,但是上面介紹的情形是最常見和最基本的。 通過聰明地使用通道(和其它并發(fā)同步技術),我相信,對于各種變種,我們總會找到相應的遵守通道關閉原則的解決方法。

結論

并沒有什么情況非得逼得我們違反通道關閉原則。 如果你遇到了此情形,請考慮修改你的代碼流程和結構設計。

使用通道編程宛如在藝術創(chuàng)作一般!


以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號