码迷,mamicode.com
首页 > 其他好文 > 详细

message.go

时间:2017-08-31 13:00:05      阅读:209      评论:0      收藏:0      [点我收藏+]

标签:iter   byte   buffer   turn   and   ota   eset   body   big   

package nsqd

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "io"
    "time"
)

const (
    MsgIDLength       = 16
    minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts
)

type MessageID [MsgIDLength]byte

type Message struct {
    ID        MessageID
    Body      []byte
    Timestamp int64
    Attempts  uint16

    // for in-flight handling
    deliveryTS time.Time
    clientID   int64
    pri        int64
    index      int
    deferred   time.Duration
}

func NewMessage(id MessageID, body []byte) *Message {
    return &Message{
        ID:        id,
        Body:      body,
        Timestamp: time.Now().UnixNano(),
    }
}

func (m *Message) WriteTo(w io.Writer) (int64, error) {
    var buf [10]byte
    var total int64

    binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp))
    binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))

    n, err := w.Write(buf[:])
    total += int64(n)
    if err != nil {
        return total, err
    }

    n, err = w.Write(m.ID[:])
    total += int64(n)
    if err != nil {
        return total, err
    }

    n, err = w.Write(m.Body)
    total += int64(n)
    if err != nil {
        return total, err
    }

    return total, nil
}

// decodeMessage deserializes data (as []byte) and creates a new Message
// message format:
// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
// |       (int64)        ||    ||      (hex string encoded in ASCII)           || (binary)
// |       8-byte         ||    ||                 16-byte                      || N-byte
// ------------------------------------------------------------------------------------------...
//   nanosecond timestamp    ^^                   message ID                       message body
//                        (uint16)
//                         2-byte
//                        attempts
func decodeMessage(b []byte) (*Message, error) {
    var msg Message

    if len(b) < minValidMsgLength {
        return nil, fmt.Errorf("invalid message buffer size (%d)", len(b))
    }

    msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
    msg.Attempts = binary.BigEndian.Uint16(b[8:10])
    copy(msg.ID[:], b[10:10+MsgIDLength])
    msg.Body = b[10+MsgIDLength:]

    return &msg, nil
}

func writeMessageToBackend(buf *bytes.Buffer, msg *Message, bq BackendQueue) error {
    buf.Reset()
    _, err := msg.WriteTo(buf)
    if err != nil {
        return err
    }
    return bq.Put(buf.Bytes())
}

message.go

标签:iter   byte   buffer   turn   and   ota   eset   body   big   

原文地址:http://www.cnblogs.com/zhangboyu/p/7457343.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!