// Tags: features flat return otto loop war isp prot rac
package nsqdimport ("bufio""compress/flate""crypto/tls""fmt""net""sync""sync/atomic""time""github.com/mreiferson/go-snappystream""github.com/nsqio/nsq/internal/auth")
// defaultBufferSize is the size, in bytes, used for a new client's buffered
// reader and writer (and as the initial OutputBufferSize).
const defaultBufferSize = 16 * 1024

// Client connection states, stored in clientV2.State.
const (
	stateInit = iota
	stateDisconnected
	stateConnected
	stateSubscribed
	stateClosing
)
// identifyDataV2 is the JSON payload a client supplies to Identify. The json
// tags give the field names used on the wire.
type identifyDataV2 struct {
	ShortID string `json:"short_id"` // TODO: deprecated, remove in 1.0
	LongID  string `json:"long_id"`  // TODO: deprecated, remove in 1.0

	ClientID string `json:"client_id"`
	Hostname string `json:"hostname"`

	// Integer settings below are validated by the matching clientV2.Set*
	// method; 0 keeps the current (default) value.
	HeartbeatInterval   int `json:"heartbeat_interval"`
	OutputBufferSize    int `json:"output_buffer_size"`
	OutputBufferTimeout int `json:"output_buffer_timeout"`

	FeatureNegotiation bool `json:"feature_negotiation"`
	TLSv1              bool `json:"tls_v1"`
	Deflate            bool `json:"deflate"`
	DeflateLevel       int  `json:"deflate_level"`
	Snappy             bool `json:"snappy"`

	SampleRate int32  `json:"sample_rate"`
	UserAgent  string `json:"user_agent"`
	MsgTimeout int    `json:"msg_timeout"`
}
// identifyEvent carries the settings that Identify publishes on
// clientV2.IdentifyEventChan once they have been applied to the client.
type identifyEvent struct {
	OutputBufferTimeout time.Duration
	HeartbeatInterval   time.Duration
	SampleRate          int32
	MsgTimeout          time.Duration
}
type clientV2 struct {// 64bit atomic vars need to be first for proper alignment on 32bit platformsReadyCount int64InFlightCount int64MessageCount uint64FinishCount uint64RequeueCount uint64writeLock sync.RWMutexmetaLock sync.RWMutexID int64ctx *contextUserAgent string// original connectionnet.Conn// connections based on negotiated featurestlsConn *tls.ConnflateWriter *flate.Writer// reading/writing interfacesReader *bufio.ReaderWriter *bufio.WriterOutputBufferSize intOutputBufferTimeout time.DurationHeartbeatInterval time.DurationMsgTimeout time.DurationState int32ConnectTime time.TimeChannel *ChannelReadyStateChan chan intExitChan chan intClientID stringHostname stringSampleRate int32IdentifyEventChan chan identifyEventSubEventChan chan *ChannelTLS int32Snappy int32Deflate int32// re-usable buffer for reading the 4-byte lengths off the wirelenBuf [4]bytelenSlice []byteAuthSecret stringAuthState *auth.State}
func newClientV2(id int64, conn net.Conn, ctx *context) *clientV2 {var identifier stringif conn != nil {identifier, _, _ = net.SplitHostPort(conn.RemoteAddr().String())}c := &clientV2{ID: id,ctx: ctx,Conn: conn,Reader: bufio.NewReaderSize(conn, defaultBufferSize),Writer: bufio.NewWriterSize(conn, defaultBufferSize),OutputBufferSize: defaultBufferSize,OutputBufferTimeout: 250 * time.Millisecond,MsgTimeout: ctx.nsqd.getOpts().MsgTimeout,// ReadyStateChan has a buffer of 1 to guarantee that in the event// there is a race the state update is not lostReadyStateChan: make(chan int, 1),ExitChan: make(chan int),ConnectTime: time.Now(),State: stateInit,ClientID: identifier,Hostname: identifier,SubEventChan: make(chan *Channel, 1),IdentifyEventChan: make(chan identifyEvent, 1),// heartbeats are client configurable but default to 30sHeartbeatInterval: ctx.nsqd.getOpts().ClientTimeout / 2,}c.lenSlice = c.lenBuf[:]return c}
// String returns the client's remote address, which makes a clientV2 usable
// directly as a %s argument in log lines.
func (c *clientV2) String() string {
	addr := c.RemoteAddr()
	return addr.String()
}
func (c *clientV2) Identify(data identifyDataV2) error {c.ctx.nsqd.logf("[%s] IDENTIFY: %+v", c, data)// TODO: for backwards compatibility, remove in 1.0hostname := data.Hostnameif hostname == "" {hostname = data.LongID}// TODO: for backwards compatibility, remove in 1.0clientID := data.ClientIDif clientID == "" {clientID = data.ShortID}c.metaLock.Lock()c.ClientID = clientIDc.Hostname = hostnamec.UserAgent = data.UserAgentc.metaLock.Unlock()err := c.SetHeartbeatInterval(data.HeartbeatInterval)if err != nil {return err}err = c.SetOutputBufferSize(data.OutputBufferSize)if err != nil {return err}err = c.SetOutputBufferTimeout(data.OutputBufferTimeout)if err != nil {return err}err = c.SetSampleRate(data.SampleRate)if err != nil {return err}err = c.SetMsgTimeout(data.MsgTimeout)if err != nil {return err}ie := identifyEvent{OutputBufferTimeout: c.OutputBufferTimeout,HeartbeatInterval: c.HeartbeatInterval,SampleRate: c.SampleRate,MsgTimeout: c.MsgTimeout,}// update the client‘s message pumpselect {case c.IdentifyEventChan <- ie:default:}return nil}
func (c *clientV2) Stats() ClientStats {c.metaLock.RLock()// TODO: deprecated, remove in 1.0name := c.ClientIDclientID := c.ClientIDhostname := c.HostnameuserAgent := c.UserAgentvar identity stringvar identityURL stringif c.AuthState != nil {identity = c.AuthState.IdentityidentityURL = c.AuthState.IdentityURL}c.metaLock.RUnlock()stats := ClientStats{// TODO: deprecated, remove in 1.0Name: name,Version: "V2",RemoteAddress: c.RemoteAddr().String(),ClientID: clientID,Hostname: hostname,UserAgent: userAgent,State: atomic.LoadInt32(&c.State),ReadyCount: atomic.LoadInt64(&c.ReadyCount),InFlightCount: atomic.LoadInt64(&c.InFlightCount),MessageCount: atomic.LoadUint64(&c.MessageCount),FinishCount: atomic.LoadUint64(&c.FinishCount),RequeueCount: atomic.LoadUint64(&c.RequeueCount),ConnectTime: c.ConnectTime.Unix(),SampleRate: atomic.LoadInt32(&c.SampleRate),TLS: atomic.LoadInt32(&c.TLS) == 1,Deflate: atomic.LoadInt32(&c.Deflate) == 1,Snappy: atomic.LoadInt32(&c.Snappy) == 1,Authed: c.HasAuthorizations(),AuthIdentity: identity,AuthIdentityURL: identityURL,}if stats.TLS {p := prettyConnectionState{c.tlsConn.ConnectionState()}stats.CipherSuite = p.GetCipherSuite()stats.TLSVersion = p.GetVersion()stats.TLSNegotiatedProtocol = p.NegotiatedProtocolstats.TLSNegotiatedProtocolIsMutual = p.NegotiatedProtocolIsMutual}return stats}
// prettyConnectionState wraps tls.ConnectionState so that the numeric cipher
// suite and protocol version identifiers can be rendered as readable strings.
type prettyConnectionState struct {
	tls.ConnectionState
}

// GetCipherSuite returns the name of the negotiated cipher suite, or
// "Unknown <id>" when the identifier is not one of the suites listed below.
func (p *prettyConnectionState) GetCipherSuite() string {
	switch p.CipherSuite {
	case tls.TLS_RSA_WITH_RC4_128_SHA:
		return "TLS_RSA_WITH_RC4_128_SHA"
	case tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA:
		return "TLS_RSA_WITH_3DES_EDE_CBC_SHA"
	case tls.TLS_RSA_WITH_AES_128_CBC_SHA:
		return "TLS_RSA_WITH_AES_128_CBC_SHA"
	case tls.TLS_RSA_WITH_AES_256_CBC_SHA:
		return "TLS_RSA_WITH_AES_256_CBC_SHA"
	case tls.TLS_ECDHE_ECDSA_WITH_RC4_128_SHA:
		return "TLS_ECDHE_ECDSA_WITH_RC4_128_SHA"
	case tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA:
		return "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA"
	case tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA:
		return "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA"
	case tls.TLS_ECDHE_RSA_WITH_RC4_128_SHA:
		return "TLS_ECDHE_RSA_WITH_RC4_128_SHA"
	case tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA:
		return "TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA"
	case tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA:
		return "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA"
	case tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA:
		return "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA"
	case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256:
		return "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"
	case tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
		return "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256"
	}
	return fmt.Sprintf("Unknown %d", p.CipherSuite)
}

// GetVersion returns a short label for the negotiated protocol version
// ("SSL30", "TLS1.0" ... "TLS1.3"), or "Unknown <id>" for any other value.
func (p *prettyConnectionState) GetVersion() string {
	switch p.Version {
	case tls.VersionSSL30:
		return "SSL30"
	case tls.VersionTLS10:
		return "TLS1.0"
	case tls.VersionTLS11:
		return "TLS1.1"
	case tls.VersionTLS12:
		return "TLS1.2"
	case 0x0304:
		// TLS 1.3. The wire value is spelled out instead of using
		// tls.VersionTLS13 so the file still builds with toolchains that
		// predate that constant.
		return "TLS1.3"
	default:
		return fmt.Sprintf("Unknown %d", p.Version)
	}
}
func (c *clientV2) IsReadyForMessages() bool {if c.Channel.IsPaused() {return false}readyCount := atomic.LoadInt64(&c.ReadyCount)inFlightCount := atomic.LoadInt64(&c.InFlightCount)if c.ctx.nsqd.getOpts().Verbose {c.ctx.nsqd.logf("[%s] state rdy: %4d inflt: %4d",c, readyCount, inFlightCount)}if inFlightCount >= readyCount || readyCount <= 0 {return false}return true}
func (c *clientV2) SetReadyCount(count int64) {atomic.StoreInt64(&c.ReadyCount, count)c.tryUpdateReadyState()}
func (c *clientV2) tryUpdateReadyState() {// you can always *try* to write to ReadyStateChan because in the cases// where you cannot the message pump loop would have iterated anyway.// the atomic integer operations guarantee correctness of the value.select {case c.ReadyStateChan <- 1:default:}}
func (c *clientV2) FinishedMessage() {atomic.AddUint64(&c.FinishCount, 1)atomic.AddInt64(&c.InFlightCount, -1)c.tryUpdateReadyState()}
func (c *clientV2) Empty() {atomic.StoreInt64(&c.InFlightCount, 0)c.tryUpdateReadyState()}
func (c *clientV2) SendingMessage() {atomic.AddInt64(&c.InFlightCount, 1)atomic.AddUint64(&c.MessageCount, 1)}
func (c *clientV2) TimedOutMessage() {atomic.AddInt64(&c.InFlightCount, -1)c.tryUpdateReadyState()}
func (c *clientV2) RequeuedMessage() {atomic.AddUint64(&c.RequeueCount, 1)atomic.AddInt64(&c.InFlightCount, -1)c.tryUpdateReadyState()}
func (c *clientV2) StartClose() {// Force the client into ready 0c.SetReadyCount(0)// mark this client as closingatomic.StoreInt32(&c.State, stateClosing)}
// Pause signals the ready state. It changes no client state itself.
func (c *clientV2) Pause() {
	c.tryUpdateReadyState()
}
// UnPause signals the ready state. It changes no client state itself.
func (c *clientV2) UnPause() {
	c.tryUpdateReadyState()
}
func (c *clientV2) SetHeartbeatInterval(desiredInterval int) error {c.writeLock.Lock()defer c.writeLock.Unlock()switch {case desiredInterval == -1:c.HeartbeatInterval = 0case desiredInterval == 0:// do nothing (use default)case desiredInterval >= 1000 &&desiredInterval <= int(c.ctx.nsqd.getOpts().MaxHeartbeatInterval/time.Millisecond):c.HeartbeatInterval = time.Duration(desiredInterval) * time.Milliseconddefault:return fmt.Errorf("heartbeat interval (%d) is invalid", desiredInterval)}return nil}
func (c *clientV2) SetOutputBufferSize(desiredSize int) error {var size intswitch {case desiredSize == -1:// effectively no buffer (every write will go directly to the wrapped net.Conn)size = 1case desiredSize == 0:// do nothing (use default)case desiredSize >= 64 && desiredSize <= int(c.ctx.nsqd.getOpts().MaxOutputBufferSize):size = desiredSizedefault:return fmt.Errorf("output buffer size (%d) is invalid", desiredSize)}if size > 0 {c.writeLock.Lock()defer c.writeLock.Unlock()c.OutputBufferSize = sizeerr := c.Writer.Flush()if err != nil {return err}c.Writer = bufio.NewWriterSize(c.Conn, size)}return nil}
func (c *clientV2) SetOutputBufferTimeout(desiredTimeout int) error {c.writeLock.Lock()defer c.writeLock.Unlock()switch {case desiredTimeout == -1:c.OutputBufferTimeout = 0case desiredTimeout == 0:// do nothing (use default)case desiredTimeout >= 1 &&desiredTimeout <= int(c.ctx.nsqd.getOpts().MaxOutputBufferTimeout/time.Millisecond):c.OutputBufferTimeout = time.Duration(desiredTimeout) * time.Milliseconddefault:return fmt.Errorf("output buffer timeout (%d) is invalid", desiredTimeout)}return nil}
func (c *clientV2) SetSampleRate(sampleRate int32) error {if sampleRate < 0 || sampleRate > 99 {return fmt.Errorf("sample rate (%d) is invalid", sampleRate)}atomic.StoreInt32(&c.SampleRate, sampleRate)return nil}
func (c *clientV2) SetMsgTimeout(msgTimeout int) error {c.writeLock.Lock()defer c.writeLock.Unlock()switch {case msgTimeout == 0:// do nothing (use default)case msgTimeout >= 1000 &&msgTimeout <= int(c.ctx.nsqd.getOpts().MaxMsgTimeout/time.Millisecond):c.MsgTimeout = time.Duration(msgTimeout) * time.Milliseconddefault:return fmt.Errorf("msg timeout (%d) is invalid", msgTimeout)}return nil}
func (c *clientV2) UpgradeTLS() error {c.writeLock.Lock()defer c.writeLock.Unlock()tlsConn := tls.Server(c.Conn, c.ctx.nsqd.tlsConfig)tlsConn.SetDeadline(time.Now().Add(5 * time.Second))err := tlsConn.Handshake()if err != nil {return err}c.tlsConn = tlsConnc.Reader = bufio.NewReaderSize(c.tlsConn, defaultBufferSize)c.Writer = bufio.NewWriterSize(c.tlsConn, c.OutputBufferSize)atomic.StoreInt32(&c.TLS, 1)return nil}
func (c *clientV2) UpgradeDeflate(level int) error {c.writeLock.Lock()defer c.writeLock.Unlock()conn := c.Connif c.tlsConn != nil {conn = c.tlsConn}c.Reader = bufio.NewReaderSize(flate.NewReader(conn), defaultBufferSize)fw, _ := flate.NewWriter(conn, level)c.flateWriter = fwc.Writer = bufio.NewWriterSize(fw, c.OutputBufferSize)atomic.StoreInt32(&c.Deflate, 1)return nil}
func (c *clientV2) UpgradeSnappy() error {c.writeLock.Lock()defer c.writeLock.Unlock()conn := c.Connif c.tlsConn != nil {conn = c.tlsConn}c.Reader = bufio.NewReaderSize(snappystream.NewReader(conn, snappystream.SkipVerifyChecksum), defaultBufferSize)c.Writer = bufio.NewWriterSize(snappystream.NewWriter(conn), c.OutputBufferSize)atomic.StoreInt32(&c.Snappy, 1)return nil}
func (c *clientV2) Flush() error {var zeroTime time.Timeif c.HeartbeatInterval > 0 {c.SetWriteDeadline(time.Now().Add(c.HeartbeatInterval))} else {c.SetWriteDeadline(zeroTime)}err := c.Writer.Flush()if err != nil {return err}if c.flateWriter != nil {return c.flateWriter.Flush()}return nil}
func (c *clientV2) QueryAuthd() error {remoteIP, _, err := net.SplitHostPort(c.String())if err != nil {return err}tls := atomic.LoadInt32(&c.TLS) == 1tlsEnabled := "false"if tls {tlsEnabled = "true"}authState, err := auth.QueryAnyAuthd(c.ctx.nsqd.getOpts().AuthHTTPAddresses,remoteIP, tlsEnabled, c.AuthSecret, c.ctx.nsqd.getOpts().HTTPClientConnectTimeout,c.ctx.nsqd.getOpts().HTTPClientRequestTimeout)if err != nil {return err}c.AuthState = authStatereturn nil}
func (c *clientV2) Auth(secret string) error {c.AuthSecret = secretreturn c.QueryAuthd()}
func (c *clientV2) IsAuthorized(topic, channel string) (bool, error) {if c.AuthState == nil {return false, nil}if c.AuthState.IsExpired() {err := c.QueryAuthd()if err != nil {return false, err}}if c.AuthState.IsAllowed(topic, channel) {return true, nil}return false, nil}
func (c *clientV2) HasAuthorizations() bool {if c.AuthState != nil {return len(c.AuthState.Authorizations) != 0}return false}
// Tags: features flat return otto loop war isp prot rac
// Source: http://www.cnblogs.com/zhangboyu/p/7457261.html