标签:
有些数据推送需要用观察者模式(也称作订阅者模式),看看docker是如何用golang实现这个的
//过一遍数据结构
type Events struct {
mu sync.Mutex //锁
events []*jsonmessage.JSONMessage //数据
pub *pubsub.Publisher //发布者
}
type Publisher struct {
m sync.RWMutex
buffer int //缓冲
timeout time.Duration //超时
subscribers map[subscriber]struct{} //订阅者map结构,方便以后遍历map
}
作为一个订阅者,订阅一次
func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}) {
e.mu.Lock()
current := make([]*jsonmessage.JSONMessage, len(e.events)) //e.events发布时候都得通知订阅者
copy(current, e.events)
l := e.pub.Subscribe() //订阅,记录到Publisher的subscribers里面去
e.mu.Unlock()
return current, l
}
func (p *Publisher) Subscribe() chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
p.subscribers[ch] = struct{}{} //记录
p.m.Unlock()
return ch //返回一个管道
}
产生一次publisher,当然得广播给所有的订阅人
func (e *Events) Log(action, id, from string) {
go func() {
e.mu.Lock()
jm := &jsonmessage.JSONMessage{Status: action, ID: id, From: from, Time: time.Now().UTC().Unix()}
if len(e.events) == cap(e.events) {
// 抛弃最老的一条记录,因为大小超过分配的咯
copy(e.events, e.events[1:])
e.events[len(e.events)-1] = jm
} else {
e.events = append(e.events, jm)
}
e.mu.Unlock()
e.pub.Publish(jm) //发布广播
}()
}
最后再看看广播的代码
func (p *Publisher) Publish(v interface{}) {
p.m.RLock()
for sub := range p.subscribers {
// send under a select as to not block if the receiver is unavailable
//如果设置超时,默认设置是100*time.Millisecond,则发送数据,如果发送不了则阻塞直到收到超时信息
if p.timeout > 0 {
select {
case sub <- v:
case <-time.After(p.timeout):
}
continue
}
//不设置超时,没有发送到管道数据成功,则默认继续执行
select {
case sub <- v:
default:
}
}
p.m.RUnlock()
}
看看我们哪里取了这些数据:
//............省略
for {
select {
case ev := <-l: //取到订阅者的发往管道的数据
jev, ok := ev.(*jsonmessage.JSONMessage)
if !ok {
continue
}
if err := sendEvent(jev); err != nil {
return err
}
case <-timer.C://取到超时信息
return nil
case <-closeNotify://取到关闭通知信息
logrus.Debug("Client disconnected, stop sending events")
return nil
}
}
docker的观察者模式Subscribe/Publisher实现
标签:
原文地址:http://my.oschina.net/yang1992/blog/507774