标签:dead WaitGroup 在家 发送消息 gre 计算 等价 无线 消息
无缓存并不等价于缓存为1
func main(){
ch := make(chan int)
ch <- 1
}
这句话会报错,当向无缓存的chan放数据时,如果一直没有接收者,那么它会一直堵塞,直到有接收者。
无缓冲的 就是一个送信人去你家门口送信,你不在家他不走,你一定要接下信,他才会走。无缓冲保证信能到你手上有缓冲的 就是一个送信人去你家仍到你家的信箱转身就走 ,除非你的信箱满了 他必须等信箱空下来。有缓冲的 保证 信能进你家的邮箱
参考,这篇文章已经总结的很好
在使用Go channel的时候,一个适用的原则是不要从接收端关闭channel,也不要关闭有多个并发发送者的channel。换句话说,如果sender(发送者)只是唯一的sender或者是channel最后一个活跃的sender,那么你应该在sender的goroutine关闭channel,从而通知receiver(s)(接收者们)已经没有值可以读了。维持这条原则将保证永远不会发生向一个已经关闭的channel发送值或者关闭一个已经关闭的channel。
sender通过关闭data channel说“不再发送”,这是最简单的场景了,就只是当sender不想再发送的时候让sender关闭data 来关闭channel:
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumReceivers = 100
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int, 100)
// the sender
go func() {
for {
if value := rand.Intn(MaxRandomNumber); value == 0 {
// the only sender can close the channel safely.
close(dataCh)
return
} else {
dataCh <- value
}
}
}()
// receivers
for i := 0; i < NumReceivers; i++ {
go func() {
defer wgReceivers.Done()
// receive values until dataCh is closed and
// the value buffer queue of dataCh is empty.
for value := range dataCh {
log.Println(value)
}
}()
}
wgReceivers.Wait()
}
这个例子好理解,sender发送消息,并当符合条件时关闭dataCh。reciver中,用for range接收消息直到dataCh关闭,每个协程输出完消息后用WaitGroup确认下Done。
注意这里reciver循环时,dataCh没有关闭,是边接收后来关闭了。sender有个return因为他是无限循环,然而reciver没有return因为它可以自己退出。 <-Ch可以不关闭,里面有数据时。
receiver通过关闭一个额外的signal channel说“请停止发送”这种场景比上一个要复杂一点。我们不能让receiver关闭data channel,因为这么做将会打破channel closing principle。但是我们可以让receiver关闭一个额外的signal channel来通知sender停止发送值:
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(1)
// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the receiver of channel dataCh.
// Its reveivers are the senders of channel dataCh.
// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
value := rand.Intn(MaxRandomNumber)
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}()
}
// the receiver
go func() {
defer wgReceivers.Done()
for value := range dataCh {
if value == MaxRandomNumber-1 {
// the receiver of the dataCh channel is
// also the sender of the stopCh cahnnel.
// It is safe to close the stop channel here.
close(stopCh)
return
}
log.Println(value)
}
}()
// ...
wgReceivers.Wait()
}
注意这个例子没有关闭dataCh,sender里的无线循环时刻盯着stopCh,一旦它能返回来值就退出。reciver里,满足退出条件后,关闭stopCh,因为只有关闭了channel在(sender)无限循环里,stopCh才能返回值,正常情况下一个Channel没有关闭,无限循环会返回deadlock。
还要注意这里的reciver,关闭stopCh后直接return了,这样就还没有取到dataCh最后的值,也就不用关闭dataCh,但我觉得一般情况下需要在sender里面关闭。
还需注意,这两个例子的WaitGroup的Done都放在reciver里面,因为wg是为了保证程序运行结束,结束只有当reciver接收完才结束,而不是sender发送完结束,所以放在reciver里面判断waitgroup是否Done一个。
正如注释说的,对于额外的signal channel来说,它的sender是data channel的receiver。这个额外的signal channel被它唯一的sender关闭,遵守了channel closing principle。
判断sender的标准就是能否有权利关闭channel
它们当中任意一个通过通知一个moderator(仲裁者)关闭额外的signal channel来说“让我们结束游戏吧”,这是最复杂的场景了。
我们不能让任意的receivers和senders关闭data channel,也不能让任何一个receivers通过关闭一个额外的signal channel来通知所有的senders和receivers退出游戏。这么做的话会打破channel closing principle。但是,我们可以引入一个moderator来关闭一个额外的signal channel。这个例子的一个技巧是怎么通知moderator去关闭额外的signal channel:
package main
import (
"time"
"math/rand"
"sync"
"log"
"strconv"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumReceivers = 10
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the moderator goroutine shown below.
// Its reveivers are all senders and receivers of dataCh.
toStop := make(chan string, 1)
// the channel toStop is used to notify the moderator
// to close the additional signal channel (stopCh).
// Its senders are any senders and receivers of dataCh.
// Its reveiver is the moderator goroutine shown below.
var stoppedBy string
// moderator
go func() {
stoppedBy = <- toStop // part of the trick used to notify the moderator
// to close the additional signal channel.
close(stopCh)
}()
// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(MaxRandomNumber)
if value == 0 {
// here, a trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "sender#" + id:
default:
}
return
}
// the first select here is to try to exit the
// goroutine as early as possible.
select {
case <- stopCh:
return
default:
}
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wgReceivers.Done()
for {
// same as senders, the first select here is to
// try to exit the goroutine as early as possible.
select {
case <- stopCh:
return
default:
}
select {
case <- stopCh:
return
case value := <-dataCh:
if value == MaxRandomNumber-1 {
// the same trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "receiver#" + id:
default:
}
return
}
log.Println(value)
}
}
}(strconv.Itoa(i))
}
// ...
wgReceivers.Wait()
log.Println("stopped by", stoppedBy)
}
在这个例子中,仍然遵守着channel closing principle。
请注意channel toStop的缓冲大小是1.这是为了避免当mederator goroutine 准备好之前第一个通知就已经发送了,导致丢失。
if value == 0 {
// here, a trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "sender#" + id:
default:
}
return
}
这里用select的目的是:
toStop的buffer只有1,如果多个同时发送给toStop的话,会导致阻塞在 toStop <- id,所以使用了select,这样子当不能发送的时候就知道已经有其他goroutine发送了信号了。其实也可以将toStop的buffer大小改成接收者和发送者数量之和,这样子就可以直接发送了。
// the first select here is to try to exit the
// goroutine as early as possible.
select {
case <- stopCh:
return
default:
}
select {
case <- stopCh:
return
case dataCh <- value:
}
这里写两次的目的:
为了提前知道channel是否已经关闭了,如果省略了这个select,有可能计算关闭了channel,也会执行发送操作,因为在一个select里面,是随机选择一个能执行的case来执行的
不是所有channel都需要关闭的, 因为它完全遵循GC回收规则. 但是如果用channel来通知其他协程停止工作的话, 就需要用到关闭了. 典型的例子就是其他协程使用for xxx := range channel 这样的语句时, 如果不关闭channel的话, 这些代码会一直堵住
标签:dead WaitGroup 在家 发送消息 gre 计算 等价 无线 消息
原文地址:https://www.cnblogs.com/ivan-blog/p/12430793.html