标签:处理 存储 写入 信息处理 port ring imp net disable
etcd是coreOS使用golang开发的分布式,一致性的kv存储系统,因其易用性和高可靠性被广泛运用于服务发现、消息发布和订阅、分布式锁和共享配置等方面,也被认为是zookeeper的强有力的竞争者。作为分布式kv,其底层使用raft算法实现多副本数据的强一致性。etcd作为raft开源实现的标杆,在设计上,将 raft 算法逻辑和持久化、网络、线程等完全抽离出来单独实现,充分解耦,在工程上,实现了诸多性能优化,是 raft 开源实践中较早的工业级的实现,很多后来的 raft 实践者都直接或者间接的参考了 ectd-raft 的设计和实现,例如kubernetes,tiDb等。其广泛的影响力和优雅的golang代码实践也使得ectd成为golang的明星项目。在我们实际的分布式存储系统的项目开发中,raft也被应用于元信息管理和数据存储等多个模块,因此熟悉和理解etcd-raft的实现具有重大意义,本文从raft的基本原理出发,深入浅出地分析了raft在ectd中的具体实现。
image
每个节点都包含状态机,日志模块和一致性模块。功能分别是:
实现一致性的过程可分为Leader选举(Leader election),日志同步(Log replication),安全性(safty),日志压缩(Log compaction),成员变更(membership change)
image
Leader选出后接受客户端请求,Leader把请求日志作为日志条目加入到日志中,然后向其他Follower节点复制日志,但超过半数的日志复制成功,则Leader将日志应用到状态机并向客户端返回执行结果,同时Follower也将结果提交。如果存在Follower没有成功复制日志,Leader会无限重试。
日志同步的关键点:
安全性的原则是一个term只有一个leader,被提交至状态机的数据不能发生更改。保证安全性主要通过限制leader的选举来保证:
raft的golang的开源实现主要包含两个:coreOS的raft实现 , 使用的项目如tidb和cockroachdb这两个经典的newsql。另外一个是hashicrop的raft实现,使用的项目如服务发现解决方案consul和时序数据库influxdb。对比二者的实现主要有如下特点:
分析raft的实现流程,我们可以从raft的几个核心问题入手:
其中leader的选举、log复制和线性一致读是raft协议的最基本要求,而leadership的转移在工程实践中有重大意义。
// node is the canonical implementation of the Node interface
type node struct {
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChange
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
logger Logger
}
type Node interface {
Tick() //时钟的实现,选举超时和心跳超时基于此实现
Campaign(ctx context.Context) error //参与leader竞争
Propose(ctx context.Context, data []byte) error //在日志中追加数据,需要实现方保证数据追加的成功
ProposeConfChange(ctx context.Context, cc pb.ConfChange) error // 集群配置变更
Step(ctx context.Context, msg pb.Message) error //根据消息变更状态机的状态
//标志某一状态的完成,收到状态变化的节点必须提交变更
Ready() <-chan Ready
//进行状态的提交,收到完成标志后,必须提交过后节点才会实际进行状态机的更新。在包含快照的场景,为了避免快照落地带来的长时间阻塞,允许继续接受和提交其他状态,即使之前的快照状态变更并没有完成。
Advance()
//进行集群配置变更
ApplyConfChange(cc pb.ConfChange) *pb.ConfState
//变更leader
TransferLeadership(ctx context.Context, lead, transferee uint64)
//保证线性一致性读,
ReadIndex(ctx context.Context, rctx []byte) error
//状态机当前的配置
Status() Status
// ReportUnreachable reports the given node is not reachable for the last send.
//上报节点的不可达
ReportUnreachable(id uint64)
//上报快照状态
ReportSnapshot(id uint64, status SnapshotStatus)
//停止节点
Stop()
}
节点初始化raft,读取配置启动各个各个节点,初始化logindex.启动后 以for-loop方式循环运行,用select 机制监听不同的channel 实现对状态变化的监听,并执行相应动作。
//启动
func StartNode(c *Config, peers []Peer) Node {
r := newRaft(c) //初始化raft算法实例
r.becomeFollower(1, None)
//将配置中的节点加入集群
for _, peer := range peers {
...
}
//初始化logindex
r.raftLog.committed = r.raftLog.lastIndex()
for _, peer := range peers {
//初始化节点状态机(progress)
r.addNode(peer.ID)
}
n := newNode()
n.logger = c.Logger
go n.run(r)
return &n
}
//运行
func (n *node) run(r *raft) {
...
select {
//接收到写消息
case pm := <-propc:
...
//接收到readindex 请求
case m := <-n.recvc:
...
//配置变更
case cc := <-n.confc:
...
//超时时间到,包括心跳超时和选举超时等
case <-n.tickc:
...
//数据ready
case readyc <- rd:
...
//可以进行状态变更和日志提交
case <-advancec:
...
//节点状态信号
case c := <-n.status:
...
//收到停止信号
case <-n.stop:
...
}
}
}
初始化node为follower,设置任期为1,并初始化tickElection函数,这是实际参与选举的函数,同时也初始化step为stepFollower,这是作为follower的核心信息处理函数,后续选举,日志复制和快照等功能都基于此函数进行:
r := newRaft(c)
r.becomeFollower(1, None)
当节点接收leader的heartbeat超时时(每个节点都有随机的超时时间),会触发run函数中的tickc这个channel。发送MsgHup消息,并调用campaign参选, 将自身设置为candidate,并递增currentTerm,向其他节点发送竞选消息。其他节点通过监听propc channel获取其他节点发送的投票消息,并调用Step对消息进行判断,选择是否投票。
其中投票的判断逻辑主要分两步:1.如果投票信息中的任期id 是否 小于自身的id,则直接返回nil。2.通过isUpToDate判断能否投票,通过和本地已存在的最新log比较,首先要有最大任期id,如果任期id相同则要求有最大的logindex。
candidate节点收到其他节点的回复后,判断获取的票数是否超过半数,如果是则设置自身为leader,否则为follower。
func (n *node) run(r *raft) {
...
for {
select {
...
//触发heartbeat 超时
case <-n.tickc:
r.tick()
...
}
}
}
//超时触发选举
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
//随机超时时间
func (r *raft) pastElectionTimeout() bool {
return r.electionElapsed >= r.randomizedElectionTimeout
}
func (r *raft) resetRandomizedElectionTimeout() {
r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}
//参与选举
func (r *raft) campaign(t CampaignType) {
var term uint64
var voteMsg pb.MessageType
//成为candicate,将任期id加1
if t == campaignPreElection {
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
term = r.Term + 1
} else {
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
//判断获取的票数是否超过半数,如果是当选为leader
if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
//向其他节点发送竞选消息
for id := range r.prs {
if id == r.id {
continue
}
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
//节点投票过程
func (r *raft) Step(m pb.Message) error {
...
//比较任期id
case m.Term > r.Term:
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
force := bytes.Equal(m.Context, []byte(campaignTransfer))
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
if !force && inLease {
return nil
}
}
switch m.Type {
case pb.MsgVote, pb.MsgPreVote:
...
//与本地最新的持久化日志比较
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
//发送投票信息
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
r.Vote