https://weibo.com/p/1001603839863400100733
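// node is the canonical implementation of the Node interface
type node struct {
	// propc carries proposals: data the application wants replicated through raft consensus.
	propc chan pb.Message
	// recvc carries messages the application received from other peers; the application passes
	// them in through the wrapper func (n *node) Step(ctx context.Context, m pb.Message) error.
	recvc chan pb.Message
	/*
	The application takes CommittedEntries out of a Ready. If they contain a membership-change
	entry, it must call the wrapper func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState.
	That function sends the ConfChange into confc, which is likewise an unbuffered channel. Inside
	node.run(), the ConfChange is taken off confc and peers are actually added or removed; the
	resulting membership is then sent into confstatec, from which ApplyConfChange() reads it and
	returns it to the application.
	*/
	confc      chan pb.ConfChange
	confstatec chan pb.ConfState
	/*
	node.run() packages pending state updates into a Ready struct and sends it into readyc. The
	application receives the Ready from readyc and handles each part of it. When it is done, the
	application calls rc.node.Advance(), which sends an empty struct into advancec to tell raft that
	this batch of Ready has been handled. When node.run() gets that notification from advancec, it
	updates some internal state, for example dropping entries that have now been persisted to
	storage from the in-memory log (type unstable struct).
	*/
	readyc   chan Ready
	advancec chan struct{}
	// tickc: the application periodically sends an empty struct into it (via Node.Tick()) to drive
	// raft forward; this is raft's timer mechanism.
	tickc  chan struct{}
	done   chan struct{}
	stop   chan struct{}
	status chan chan Status

	logger Logger
}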
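To make the readyc/advancec handshake described in the comments above concrete, here is a toy model built only on the standard library. It is not etcd's node.run(): Ready is reduced to a slice of strings, and toyNode, unstable and run are hypothetical names chosen for illustration. What it shows is the lock-step pattern: the node publishes a Ready on an unbuffered channel, blocks until Advance() is called, and only then drops the entries it had been holding in memory.

```go
package main

import "fmt"

// Ready is a hypothetical, heavily reduced stand-in for raft.Ready:
// only the entries that still need to be persisted.
type Ready struct {
	Entries []string
}

// toyNode models only the readyc/advancec handshake. unstable plays the
// role of raft's in-memory "unstable" log. This is not etcd's implementation.
type toyNode struct {
	unstable []string
	readyc   chan Ready
	advancec chan struct{}
	done     chan struct{}
}

// run publishes the unstable entries as a Ready, waits for Advance, and then
// drops those entries because the application has persisted them.
func (n *toyNode) run() {
	defer close(n.done)
	for len(n.unstable) > 0 {
		rd := Ready{Entries: append([]string(nil), n.unstable...)}
		n.readyc <- rd // unbuffered: blocks until the application receives it
		<-n.advancec   // blocks until the application reports rd as handled
		n.unstable = n.unstable[len(rd.Entries):]
	}
}

// Advance tells run() that the last Ready has been processed.
func (n *toyNode) Advance() { n.advancec <- struct{}{} }

func main() {
	n := &toyNode{
		unstable: []string{"e1", "e2"},
		readyc:   make(chan Ready),
		advancec: make(chan struct{}),
		done:     make(chan struct{}),
	}
	go n.run()
	rd := <-n.readyc
	persisted := rd.Entries // stands in for writing to stable storage
	n.Advance()
	<-n.done
	fmt.Println(persisted, len(n.unstable)) // [e1 e2] 0
}
```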
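raft first needs a Node, which can be created in one of two ways:
StartNode(): starts a brand-new node from scratch
RestartNode(): restarts a node from existing state (state already recovered into its Storage)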
StartNode:
storage := raft.NewMemoryStorage()
c := &raft.Config{
	ID:              0x01,
	ElectionTick:    10,
	HeartbeatTick:   1,
	Storage:         storage,
	MaxSizePerMsg:   4096,
	MaxInflightMsgs: 256,
}
n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})
RestartNode:
storage := raft.NewMemoryStorage()

// recover the in-memory storage from persistent
// snapshot, state and entries.
storage.ApplySnapshot(snapshot)
storage.SetHardState(state)
storage.Append(entries)

c := &raft.Config{
	ID:              0x01,
	ElectionTick:    10,
	HeartbeatTick:   1,
	Storage:         storage,
	MaxSizePerMsg:   4096,
	MaxInflightMsgs: 256,
}

// restart raft without peer information.
// peer information is already included in the storage.
n := raft.RestartNode(c)
func (rn *RawNode) Ready() Ready:
type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState

	// ReadStates can be used for node to serve linearizable read requests locally
	// when its applied index is greater than the index in ReadState.
	// Note that the readState will be returned when raft receives msgReadIndex.
	// The returned is only valid for the request that requested to read.
	ReadStates []ReadState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
}
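MustSync follows a simple rule. The sketch below mirrors the check etcd raft uses when filling this field (its MustSync helper compares the new and previous HardState): an fsync is required when there are new entries or when Term or Vote changed, since the current term, the vote and the log are the state Raft must not lose; a change to Commit alone may be written lazily. HardState here is a local stand-in for raftpb.HardState so the example compiles without etcd.

```go
package main

import "fmt"

// HardState is a local stand-in for raftpb.HardState with the same three fields.
type HardState struct {
	Term   uint64
	Vote   uint64
	Commit uint64
}

// mustSync mirrors the rule behind Ready.MustSync: new log entries, a new term,
// or a new vote must reach disk synchronously. Commit is not part of Raft's
// persistent state, so a Commit-only change does not force a sync.
func mustSync(st, prev HardState, numEntries int) bool {
	return numEntries != 0 || st.Vote != prev.Vote || st.Term != prev.Term
}

func main() {
	prev := HardState{Term: 2, Vote: 1, Commit: 5}
	fmt.Println(mustSync(HardState{Term: 2, Vote: 1, Commit: 7}, prev, 0)) // false: only Commit moved
	fmt.Println(mustSync(prev, prev, 3))                                  // true: new entries
	fmt.Println(mustSync(HardState{Term: 3, Vote: 0, Commit: 5}, prev, 0)) // true: new term
}
```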
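A. In its run loop, the application repeatedly receives from Ready() and acts on each result:
1) Write HardState, Entries and Snapshot to persistent storage (when they are non-empty). When writing an entry with index i, every previously persisted entry with index >= i must be discarded.
2) Send the Messages to the peers they are addressed to. If any message has type MsgSnap, call ReportSnapshot() after it has been sent.
3) Apply the Snapshot (if any) and the CommittedEntries to the application's state machine. If a committed entry has type EntryConfChange, call Node.ApplyConfChange() to apply it to the node.
4) Call Node.Advance() to signal that the node is ready for the next batch of updates.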
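B. Next, all persisted log entries must be made available via an implementation of the Storage interface. The provided MemoryStorage type can be used for this (if its state is repopulated on restart, as in the RestartNode example), or you can supply your own disk-backed implementation.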
C. When a message arrives from another node, pass it to Node.Step().
D. Call Node.Tick() at regular intervals.
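The truncation rule in step 1 and the Storage requirement in B can be illustrated together with a toy in-memory log. This is a hypothetical simplification, not raft.MemoryStorage: the real Storage interface also has InitialState, Term, FirstIndex and Snapshot, and its Entries method takes a maxSize argument. Entry is a local stand-in for raftpb.Entry.

```go
package main

import (
	"errors"
	"fmt"
)

// Entry is a local stand-in for raftpb.Entry (Index and Term only).
type Entry struct {
	Index uint64
	Term  uint64
}

// toyStorage is a hypothetical in-memory log; it is not raft.MemoryStorage.
// Invariant: ents is contiguous, ents[k].Index == ents[0].Index + k.
type toyStorage struct {
	ents []Entry
}

var (
	errGap         = errors.New("append would leave a gap in the log")
	errUnavailable = errors.New("requested entries are unavailable")
)

// LastIndex returns the index of the last stored entry, or 0 if the log is empty.
func (s *toyStorage) LastIndex() uint64 {
	if len(s.ents) == 0 {
		return 0
	}
	return s.ents[len(s.ents)-1].Index
}

// Append stores ents (assumed contiguous). Every stored entry whose index is
// >= the first new index is discarded before the new entries are added.
func (s *toyStorage) Append(ents []Entry) error {
	if len(ents) == 0 {
		return nil
	}
	first := ents[0].Index
	if len(s.ents) > 0 && first > s.LastIndex()+1 {
		return errGap
	}
	keep := 0
	for keep < len(s.ents) && s.ents[keep].Index < first {
		keep++
	}
	// The full slice expression makes append copy instead of overwriting a
	// backing array that earlier Entries results may still reference.
	s.ents = append(s.ents[:keep:keep], ents...)
	return nil
}

// Entries returns the entries in the half-open range [lo, hi).
func (s *toyStorage) Entries(lo, hi uint64) ([]Entry, error) {
	if len(s.ents) == 0 || lo > hi || lo < s.ents[0].Index || hi > s.LastIndex()+1 {
		return nil, errUnavailable
	}
	off := s.ents[0].Index
	return s.ents[lo-off : hi-off], nil
}

func main() {
	var s toyStorage
	_ = s.Append([]Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}, {Index: 3, Term: 1}})
	// A new leader overwrites from index 2 onward: old entries 2 and 3 are dropped.
	_ = s.Append([]Entry{{Index: 2, Term: 2}})
	fmt.Println(s.ents) // [{1 1} {2 2}]
}
```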
The overall loop, simplified:
for {
	select {
	case <-s.Ticker:
		s.Node.Tick()
	case rd := <-s.Node.Ready():
		// persist first: HardState is embedded in Ready, so it is rd.HardState
		saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
		send(rd.Messages)
		if !raft.IsEmptySnap(rd.Snapshot) {
			processSnapshot(rd.Snapshot)
		}
		for _, entry := range rd.CommittedEntries {
			process(entry)
			if entry.Type == raftpb.EntryConfChange {
				var cc raftpb.ConfChange
				cc.Unmarshal(entry.Data)
				s.Node.ApplyConfChange(cc)
			}
		}
		s.Node.Advance()
	case <-s.done:
		return
	}
}
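The loop above depends on etcd and on placeholder helpers (saveToStorage, send, process), so it does not run on its own. The following standard-library toy keeps the same select structure but records each side effect in a trace, which makes the required ordering visible: persist, send, apply, then advance. Ready, runLoop and the string entries are hypothetical; passing a nil tick channel (which never fires) keeps the run deterministic.

```go
package main

import "fmt"

// Ready is a hypothetical reduced Ready: entries to persist, messages to send,
// and committed entries to apply.
type Ready struct {
	Entries          []string
	Messages         []string
	CommittedEntries []string
}

// runLoop is a toy version of the loop above. Each side effect is appended to
// trace so the order is visible. A nil tickc is never selected.
func runLoop(tickc <-chan struct{}, readyc <-chan Ready, done <-chan struct{}, advance func(), trace *[]string) {
	for {
		select {
		case <-tickc:
			*trace = append(*trace, "tick")
		case rd := <-readyc:
			for _, e := range rd.Entries { // 1) persist
				*trace = append(*trace, "persist "+e)
			}
			for _, m := range rd.Messages { // 2) send, only after persisting
				*trace = append(*trace, "send "+m)
			}
			for _, e := range rd.CommittedEntries { // 3) apply
				*trace = append(*trace, "apply "+e)
			}
			advance() // 4) ask for the next batch
		case <-done:
			return
		}
	}
}

func main() {
	readyc := make(chan Ready)
	done := make(chan struct{})
	var trace []string
	go func() {
		readyc <- Ready{Entries: []string{"e2"}, Messages: []string{"m1"}, CommittedEntries: []string{"e1"}}
		close(done) // only reached after the loop has received the Ready
	}()
	runLoop(nil, readyc, done, func() { trace = append(trace, "advance") }, &trace)
	fmt.Println(trace) // [persist e2 send m1 apply e1 advance]
}
```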
To add or remove a node, build a ConfChange struct, say cc, and call n.ProposeConfChange(ctx, cc). Once the config change is committed, a committed entry of type raftpb.EntryConfChange is returned in Ready, and it must be applied to the local node:
var cc raftpb.ConfChange
cc.Unmarshal(data)
n.ApplyConfChange(cc)
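The confc/confstatec round trip that ApplyConfChange relies on (described in the node struct comments) can be modelled with two unbuffered channels. In this toy, ConfChange, ConfState and the ConfChangeAddNode/ConfChangeRemoveNode constants are local stand-ins that borrow raftpb's names but not its definitions, and membership is a plain map; the real node updates raft's internal peer tracking instead.

```go
package main

import (
	"fmt"
	"sort"
)

// Local stand-ins for the raftpb types (hypothetical, simplified).
type ConfChangeType int

const (
	ConfChangeAddNode ConfChangeType = iota
	ConfChangeRemoveNode
)

type ConfChange struct {
	Type   ConfChangeType
	NodeID uint64
}

type ConfState struct {
	Nodes []uint64
}

// toyNode models only the confc/confstatec round trip.
type toyNode struct {
	confc      chan ConfChange
	confstatec chan ConfState
	stop       chan struct{}
}

// run owns the membership set: it applies each ConfChange from confc and
// replies with the resulting membership on confstatec.
func (n *toyNode) run(initial []uint64) {
	peers := make(map[uint64]bool)
	for _, id := range initial {
		peers[id] = true
	}
	for {
		select {
		case cc := <-n.confc:
			switch cc.Type {
			case ConfChangeAddNode:
				peers[cc.NodeID] = true
			case ConfChangeRemoveNode:
				delete(peers, cc.NodeID)
			}
			var cs ConfState
			for id := range peers {
				cs.Nodes = append(cs.Nodes, id)
			}
			sort.Slice(cs.Nodes, func(i, j int) bool { return cs.Nodes[i] < cs.Nodes[j] })
			n.confstatec <- cs
		case <-n.stop:
			return
		}
	}
}

// ApplyConfChange blocks until run() has applied cc and returns the new membership.
func (n *toyNode) ApplyConfChange(cc ConfChange) *ConfState {
	n.confc <- cc
	cs := <-n.confstatec
	return &cs
}

func main() {
	n := &toyNode{confc: make(chan ConfChange), confstatec: make(chan ConfState), stop: make(chan struct{})}
	go n.run([]uint64{1, 2, 3})
	defer close(n.stop)
	fmt.Println(n.ApplyConfChange(ConfChange{Type: ConfChangeAddNode, NodeID: 4}).Nodes)    // [1 2 3 4]
	fmt.Println(n.ApplyConfChange(ConfChange{Type: ConfChangeRemoveNode, NodeID: 2}).Nodes) // [1 3 4]
}
```

Because both channels are unbuffered, the caller cannot proceed until run() has actually applied the change, so the returned membership always reflects it.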
Snapshot
type ConfState struct {
	Nodes []uint64
}

type SnapshotMetadata struct {
	ConfState ConfState
	Index     uint64
	Term      uint64
}

type Snapshot struct {
	Data     []byte
	Metadata SnapshotMetadata
}
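A snapshot whose Metadata.Index is 0 is treated as empty; etcd's raft.IsEmptySnap performs exactly that check. The sketch below reuses the simplified types above as local copies and shows what an application might do when Ready carries a non-empty snapshot: replace its state with Data, record Index and Term as the new applied position, and take membership from ConfState. toyState and restore are hypothetical, and rejecting snapshots at or below the applied index is an assumed safety check.

```go
package main

import "fmt"

// Local copies of the simplified snapshot types.
type ConfState struct {
	Nodes []uint64
}

type SnapshotMetadata struct {
	ConfState ConfState
	Index     uint64
	Term      uint64
}

type Snapshot struct {
	Data     []byte
	Metadata SnapshotMetadata
}

// isEmptySnap mirrors raft.IsEmptySnap: a snapshot with Index 0 carries nothing.
func isEmptySnap(s Snapshot) bool { return s.Metadata.Index == 0 }

// toyState is a hypothetical application state machine plus bookkeeping.
type toyState struct {
	data     []byte
	applied  uint64   // index of the last entry reflected in data
	lastTerm uint64   // term of that entry
	members  []uint64 // current membership
}

// restore replaces the state with the snapshot's contents unless the snapshot
// is empty or not newer than what has already been applied.
func (st *toyState) restore(s Snapshot) bool {
	if isEmptySnap(s) || s.Metadata.Index <= st.applied {
		return false
	}
	st.data = append([]byte(nil), s.Data...)
	st.applied = s.Metadata.Index
	st.lastTerm = s.Metadata.Term
	st.members = append([]uint64(nil), s.Metadata.ConfState.Nodes...)
	return true
}

func main() {
	var st toyState
	fmt.Println(st.restore(Snapshot{})) // false
	snap := Snapshot{
		Data:     []byte("kv"),
		Metadata: SnapshotMetadata{ConfState: ConfState{Nodes: []uint64{1, 2, 3}}, Index: 10, Term: 2},
	}
	fmt.Println(st.restore(snap), st.applied, st.members) // true 10 [1 2 3]
	fmt.Println(st.restore(snap))                         // false: already at index 10
}
```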