码迷,mamicode.com
首页 > 其他好文 > 详细

以太坊源码机制:挖矿

时间:2018-03-06 21:42:35      阅读:585      评论:0      收藏:0      [点我收藏+]

标签:开启   func   adc   for   知识   spi   特定   int   abort   

狗年吉祥,开工利是,我们继续研究以太坊源码。从本篇文章开始,我们会深入到以太坊核心源码中去,进而分析与研究以太坊的核心技术。

关键字:拜占庭,挖矿,矿工,分叉,源码分析,uncle叔块,agent,worker,事件监听

本文基于go-ethereum 1.7.3-stable源码版本。源码范围主要在miner pkg。

miner.start()

miner即矿工的意思,矿工要做的工作就是“挖矿”,挖矿就是将一系列最新未封装到块中的交易封装到一个新的区块的过程。学习以太坊挖矿之前,我们要先搞清楚几个概念:

拜占庭将军问题

分布式系统的状态同步问题。

拜占庭帝国繁荣富饶,周边的几个小国家的将军对其垂涎已久但又各自心怀鬼胎。他们必须有超过一半以上的将军同意进攻拜占庭并且不能在战场上做出背叛的动作(达成共识),否则就会进攻失败,引火烧身。而将军的领土有可能被其他几个将军瓜分。基于这种情况,将军们之间的沟通很成问题,有的人口是心非,有的人是忠诚为了组织的利益。如何能最终达成共识,这是个问题。

分布式系统中的每个结点就是将军,这些结点要同步状态时,就会面临拜占庭将军问题。如何有效避免结点发出的错误信息对结果造成的影响?

POW(Proof Of Work)

工作量证明,顾名思义,就是证明你做了多少工作。POW是目前最流行的解决上面拜占庭将军问题的共识算法,比特币、以太坊等主流区块链数字货币都是基POW。

要想解决拜占庭将军问题,需要先确定一个方法:就是在这些平等的将军中间选拔出来一个忠诚的“大将军”,其他将军听他的决定即可。

这似乎违反了去中心化的思想,但我们仔细分析可得,这些将军在做这次决定之前都是去中心化的平等的结点,而选拔出来的大将军只是针对这一次决定,下一次决定还会再次选拔。而不是中心化的固定了一位大将军,以后几十年的朝堂内外都要听他一人的命令。

搞清楚了这个问题以后,下面要解决的就是如何选拔大将军的问题。决定之前,这些将军都是平等结点,所以要通过他们在本次决定时的发言来判断。将军们通过已知战场情报,各自估量当前战场形势,进行计算,有了结论(出块)以后广播给其他将军。同时该将军还要始终监听来自其他将军的广播内容,一旦接收到来自其他将军的结论广播,将军们会立即停止手中的计算,来验证广播的内容,如果所有将军都验证通过,那么第一个发出这个可验证通过结果广播的将军就被选拔为大将军,本次决定听他的结论(已经包含在广播中的结果内容)。

所以这个过程中比较重要的因素有两点:

  1. 首先是速度,第一个通过验证的才能被选拔为大将军,慢一步的第二个没有任何机会。
  2. 然后是正确性,即将军发出的结论是否正确,可被其他将军验证成功。

速度问题是计算能力的问题,例如我的电脑是8核32G配置,计算能力肯定比你单核1G内存的快,这个与POW算法关系不大。而POW算法是用来解决正确性的方案,POW提供了一种难于计算易于验证的方式,这是基于哈希函数的特性实现的,前面文章也有介绍过,哈希函数是

  • 免碰撞的
  • 逆向困难的
  • 如果想求得特定范围的加密值,只能通过穷举法

通过这些特性,工作量证明可以发布给各结点一个哈希加密函数,各结点将待封印区块信息通过该函数再加上一个nonce值计算得到一个加密哈希,这个加密哈希需要满足一定的规则(例如前四位必须是1111),各结点只能通过穷举法来不断尝试,直到满足条件以后,即得出结论,也即出块成功,这时候再将区块哈希广播出去,其他结点一验证,的确满足预定规则(前四位的确是1111),则完成共识,该块由刚才广播的结点出块。这个工作量指的就是结点在不断尝试计算的工作量,得到符合条件的区块哈希以后,经过广播,其他结点手头正在进行的以及已经完成的工作量作废(其实这也是一种算力浪费),该块被证明为由你出块。

问题1:筛选块

就是当你广播时,其他某个结点也算出来符合条件的哈希值了,即该结点也出了编号相同的一个区块,此时就要对比两个块的时间戳,时间较早的会被确认,保留到链上,而另一个较晚的则会被抛弃。

问题2:分叉链

当某个结点发布了新的共识规则,其他结点并未同步该共识规则,一般来讲新的共识规则是向前兼容的,即链上前面的数据仍然是有效的被承认的,但未同步新规则的结点会继续挖矿,他们挖出的块不会被更新新规则的结点共识或者承认。这个时候,链就分叉了,分为1.0(旧共识规则)和2.0(新共识规则)两条链。此时有更多群众(矿工)基础的链会留下来。

例如这个公链是我们公司发布的,我们会去拉动更多客户进来,但其实作为发布者,我和这些客户的结点都是平等的,此时某个陌生结点只要加入进来,它也可以发布新规则,而作为发布者的我们也要更新我们的软件,所以社区就非常重要了,通过社区,我们可以维护着我们客户的支持与信任,当我们发布新规则时会得到他们的支持。由于区块链本身的开源、去中心化的属性,我们的公链一旦发布,就不属于我们了,而是属于每一个参与进来的结点,而我们只能通过确切地办实事,解决问题,才会得到客户矿工们的认可才能保障这条链的优秀竞争力(当然了,作为发布者的我们拥有了大量的预购币,所以为了链的发展,影响力的扩大,币的升值,我们的动力更加强劲)。

不过也有一种情况,就是原来1.0的链还有一部分人再使用,但我要说的是,他们那条链的生命力肯定不行了,因为没有利益相关者,大家不会免费付出。区块链技术是平等公平的,没有得到大家也就不会付出。

miner源码分析

下面采用代码调试的顺序,来分析以太坊miner.go文件源码内容。整个以太坊挖矿相关的操作都是通过Miner结构体暴露出来的方法:

type Miner struct {
    mux *event.TypeMux // 事件锁,已被feed.mu.lock替代
    worker *worker // 干活的人
    coinbase common.Address // 结点地址
    mining   int32 // 代表挖矿进行中的状态
    eth      Backend // Backend对象,Backend是一个自定义接口封装了所有挖矿所需方法。
    engine   consensus.Engine // 共识引擎
    canStart    int32 // 是否能够开始挖矿操作
    shouldStart int32 // 同步以后是否应该开始挖矿
}
  • 事件锁,事件发生时,会有一个TypeMux将时间分派给注册的接收者。接收者可以注册以处理特定类型的事件。在mux结束后调用的任何操作将返回ErrMuxClosed。
  • 共识引擎,获得共识算法的工具对象,以提供后续共识相关操作使用。我会在这篇文章之后单独写两篇文章仔细分析以太坊的两种共识算法ethash和clique。

worker

Miner结构体中其他的都介绍完毕,唯独worker对象需要深入研究,因为外部有一个单独的worker.go文件,Miner包含了这个worker对象。上面注释给出的是“干活的人”,每个miner都会有一个worker成员对象,可以理解为工人,他负责全部具体的挖矿工作流程。

type worker struct {
    config *params.ChainConfig
    engine consensus.Engine

    mu sync.Mutex

    // update loop
    mux          *event.TypeMux
    txCh         chan core.TxPreEvent
    txSub        event.Subscription
    chainHeadCh  chan core.ChainHeadEvent
    chainHeadSub event.Subscription
    chainSideCh  chan core.ChainSideEvent
    chainSideSub event.Subscription
    wg           sync.WaitGroup

    agents map[Agent]struct{} // worker拥有一个Agent的map集合
    recv   chan *Result

    eth     Backend
    chain   *core.BlockChain
    proc    core.Validator
    chainDb ethdb.Database

    coinbase common.Address
    extra    []byte

    currentMu sync.Mutex
    current   *Work

    uncleMu        sync.Mutex
    possibleUncles map[common.Hash]*types.Block

    unconfirmed *unconfirmedBlocks // 本地挖出的待确认的块

    mining int32
    atWork int32
}

worker的属性非常多而具体了,都是挖矿具体操作相关的,其中包括链本身的属性以及区块数据结构的属性。首先来看ChainConfig:

type ChainConfig struct {
    ChainId *big.Int `json:"chainId"` // 链id标识了当前链,主键唯一id,也用于replay protection重发保护(用来防止replay attack重发攻击:恶意重复或拖延正确数据传输的一种网络攻击手段)

    HomesteadBlock *big.Int `json:"homesteadBlock,omitempty"` // 当前链Homestead,置为0

    DAOForkBlock   *big.Int `json:"daoForkBlock,omitempty"`   // TheDAO硬分叉切换。
    DAOForkSupport bool     `json:"daoForkSupport,omitempty"` // 结点是否支持或者反对DAO硬分叉。

    // EIP150 implements the Gas price changes (https://github.com/ethereum/EIPs/issues/150)
    EIP150Block *big.Int    `json:"eip150Block,omitempty"` // EIP150 HF block (nil = no fork)
    EIP150Hash  common.Hash `json:"eip150Hash,omitempty"`  // EIP150 HF hash (needed for header only clients as only gas pricing changed)

    EIP155Block *big.Int `json:"eip155Block,omitempty"` // EIP155 HF block,没有硬分叉置为0
    EIP158Block *big.Int `json:"eip158Block,omitempty"` // EIP158 HF block,没有硬分叉置为0

    ByzantiumBlock *big.Int `json:"byzantiumBlock,omitempty"` // Byzantium switch block (nil = no fork, 0 = already on byzantium)

    // Various consensus engines
    Ethash *EthashConfig `json:"ethash,omitempty"`
    Clique *CliqueConfig `json:"clique,omitempty"`
}

ChainConfig顾名思义就是链的配置属性。

go语法补充:结构体中的标签。我想对于上面ChainId属性后面的``内容,我们都有疑惑,这是结构体中的标签。它是可选的,是对变量的附加内容,通过reflect包可以读取到这些内容,通过对ChainConfig结构体中的属性标签的观察,我们能看出来这些标签是用来声明变量在结构体转化为json结构以后的id值,这个值是可以与当前变量名字不同的。

书归正传,ChainConfig中包含了ChainID等属性,其中有很多都是针对以太坊历史发生的问题进行的专门配置。

  • ChainId可以预防replay攻击。
  • Homestead是以太坊发展蓝图中的一个阶段。第一阶段是以太坊区块链面世,代号为frontier,第二个阶段即为当前阶段,代号为Homestead(家园),第三阶段为大都会Metropolis,大都会又细分为两个小阶段,第一个是拜占庭Byzantium硬分叉(引入新型零知识证明算法以及pos权益证明共识算法),第二个是君士坦丁堡Constantinople硬分叉(以太坊正式应用pow和pos混合链,解决拜占庭引发的问题)。最后一个阶段代号Serenity,最终版本的以太坊稳定运行。
  • 2017年6月18日,以太坊上DAO(去中心自治组织)的一次大危机做出的相应调整。感兴趣的可以自行谷百。
  • 2017年10月16日,以太坊的一次Byzantium拜占庭硬分叉。
  • EIPs(Ethereum Improvement Proposals),是以太坊更新改善的一些方案,对应后面的数字就是以太坊github源码issue的编号。

agent

一个miner拥有一个worker,一个worker拥有多个agent。Agent接口是定义在Worker.go文件中:

// Agent 可以注册到worker
type Agent interface {
    Work() chan<- *Work
    SetReturnCh(chan<- *Result)
    Stop()
    Start()
    GetHashRate() int64
}

这个接口有两个实现:CpuAgent和RemoteAgent。这里使用的是CpuAgent,该Agent会完成一个块的出块工作,同级的多个Agent是竞争关系,最终通过共识算法完成出一个块的工作。

type CpuAgent struct {
    mu sync.Mutex // 锁

    workCh        chan *Work // Work通道对象
    stop          chan struct{} // 结构体通道对象
    quitCurrentOp chan struct{} // 结构体通道对象
    returnCh      chan<- *Result // Result指针通道

    chain  consensus.ChainReader
    engine consensus.Engine

    isMining int32 // agent是否正在挖矿的标志位
}

挖矿start()整个生命周期

开始挖矿,首先要初始化一个miner实例,

func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine) *Miner {
    miner := &Miner{
        eth:      eth,
        mux:      mux,
        engine:   engine,
        worker:   newWorker(config, engine, common.Address{}, eth, mux),
        canStart: 1,
    }
    miner.Register(NewCpuAgent(eth.BlockChain(), engine))
    go miner.update()

    return miner
}

在创建miner实例时,会根据Miner结构体成员属性依次赋值,其咋红worker对象需要调用newWorker构造函数,

func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker {
    worker := &worker{
        config:         config,
        engine:         engine,
        eth:            eth,
        mux:            mux,
        txCh:           make(chan core.TxPreEvent, txChanSize),// TxPreEvent事件是TxPool发出的事件,代表一个新交易tx加入到了交易池中,这时候如果work空闲会将该笔交易收进work.txs,准备下一次打包进块。
        chainHeadCh:    make(chan core.ChainHeadEvent, chainHeadChanSize),// ChainHeadEvent事件,代表已经有一个块作为链头,此时work.update函数会监听到这个事件,则会继续挖新的区块。
        chainSideCh:    make(chan core.ChainSideEvent, chainSideChanSize),// ChainSideEvent事件,代表有一个新块作为链的旁支,会被放到possibleUncles数组中,可能称为叔块。
        chainDb:        eth.ChainDb(),// 区块链数据库
        recv:           make(chan *Result, resultQueueSize),
        chain:          eth.BlockChain(), // 链
        proc:           eth.BlockChain().Validator(),
        possibleUncles: make(map[common.Hash]*types.Block),// 存放可能称为下一个块的叔块数组
        coinbase:       coinbase,
        agents:         make(map[Agent]struct{}),
        unconfirmed:    newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),// 返回一个数据结构,包括追踪当前未被确认的区块。
    }
    // 注册TxPreEvent事件到tx pool交易池
    worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh)
    // 注册事件到blockchain
    worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
    worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
    go worker.update()

    go worker.wait()
    worker.commitNewWork()

    return worker
}

在创建work实例的时候,会有几个比较重要的事件,包括TxPreEvent、ChainHeadEvent、ChainSideEvent,我在上面代码注释中都有了标识。下面看一下开启新线程执行的worker.update(),

        case <-self.chainHeadCh:
            self.commitNewWork()

        // Handle ChainSideEvent
        case ev := <-self.chainSideCh:
            self.uncleMu.Lock()
            self.possibleUncles[ev.Block.Hash()] = ev.Block
            self.uncleMu.Unlock()

        // Handle TxPreEvent
        case ev := <-self.txCh:

由于源码较长,我只展示了一部分,我们知道update方法是用来监听处理上面谈到的三个事件即可。下面再看一下worker.wait()方法,

func (self *worker) wait() {
    for {
        mustCommitNewWork := true
        for result := range self.recv {
            atomic.AddInt32(&self.atWork, -1)

            if result == nil {
                continue
            }
            block := result.Block
            work := result.Work

            // Update the block hash in all logs since it is now available and not when the
            // receipt/log of individual transactions were created.
            for _, r := range work.receipts {
                for _, l := range r.Logs {
                    l.BlockHash = block.Hash()
                }
            }
            for _, log := range work.state.Logs() {
                log.BlockHash = block.Hash()
            }
            stat, err := self.chain.WriteBlockAndState(block, work.receipts, work.state)
            if err != nil {
                log.Error("Failed writing block to chain", "err", err)
                continue
            }
            // 检查是否是标准块,写入交易数据。
            if stat == core.CanonStatTy {
                // 受ChainHeadEvent事件的影响。
                mustCommitNewWork = false
            }
            // 广播一个块声明插入链事件NewMinedBlockEvent
            self.mux.Post(core.NewMinedBlockEvent{Block: block})
            var (
                events []interface{}
                logs   = work.state.Logs()
            )
            events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
            if stat == core.CanonStatTy {
                events = append(events, core.ChainHeadEvent{Block: block})
            }
            self.chain.PostChainEvents(events, logs)

            // 将处理中的数据插入到区块中,等待确认
            self.unconfirmed.Insert(block.NumberU64(), block.Hash())

            if mustCommitNewWork {
                self.commitNewWork() // 多次见到,顾名思义,就是提交新的work
            }
        }
    }
}

wait方法比较长,但必须展示出来的原因是它包含了重要的具体写入块的操作,具体内容请看上面代码中的注释。

通过New方法来初始化创建一个miner实例,入参包括Backend对象,ChainConfig对象属性集合,事件锁,以及指定共识算法引擎,返回一个Miner指针。方法体中对miner对象进行了组装赋值,并且调用了方法NewCpuAgent创建agent的一个实例然后注册到该miner上来,启动一个单独线程执行miner.update(),我们先来看NewCpuAgent方法:

func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent {
    miner := &CpuAgent{
        chain:  chain,
        engine: engine,
        stop:   make(chan struct{}, 1),
        workCh: make(chan *Work, 1),
    }
    return miner
}

通过NewCpuAgent方法,先组装一个CpuAgent,赋值ChainReader,共识引擎,stop结构体,work通道,然后将这个CpuAgent实例赋值给miner并返回miner。然后让我们回到miner.update()方法:

// update方法可以保持对下载事件的监听,请了解这是一段短型的update循环。
func (self *Miner) update() {
    // 注册下载开始事件,下载结束事件,下载失败事件。
    events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
out:
    for ev := range events.Chan() {
        switch ev.Data.(type) {
        case downloader.StartEvent:
            atomic.StoreInt32(&self.canStart, 0)
            if self.Mining() {// 开始下载对应Miner操作Mining。
                self.Stop()
                atomic.StoreInt32(&self.shouldStart, 1)
                log.Info("Mining aborted due to sync")
            }
        case downloader.DoneEvent, downloader.FailedEvent: // 下载完成和失败都走相同的分支。
            shouldStart := atomic.LoadInt32(&self.shouldStart) == 1

            atomic.StoreInt32(&self.canStart, 1)
            atomic.StoreInt32(&self.shouldStart, 0)
            if shouldStart {
                self.Start(self.coinbase) // 执行Miner的start方法。
            }
            // 处理完以后要取消订阅
            events.Unsubscribe()
            // 跳出循环,不再监听
            break out
        }
    }
}

然后我们来看miner的Mining方法,

// 如果miner的mining属性大于1即返回ture,说明正在挖矿中。
func (self *Miner) Mining() bool {
    return atomic.LoadInt32(&self.mining) > 0
}

再来看Miner的start方法,它是属于Miner指针实例的方法,首字母大写代表可以被外部所访问,传入一个地址。

func (self *Miner) Start(coinbase common.Address) {
    atomic.StoreInt32(&self.shouldStart, 1)
    self.worker.setEtherbase(coinbase)
    self.coinbase = coinbase

    if atomic.LoadInt32(&self.canStart) == 0 {
        log.Info("Network syncing, will start miner afterwards")
        return
    }
    atomic.StoreInt32(&self.mining, 1)

    log.Info("Starting mining operation")
    self.worker.start()
    self.worker.commitNewWork()
}

关键代码在self.worker.start()和self.worker.commitNewWork()。先来说worker.start()方法。

func (self *worker) start() {
    self.mu.Lock()
    defer self.mu.Unlock()

    atomic.StoreInt32(&self.mining, 1)

    // spin up agents
    for agent := range self.agents {
        agent.Start()
    }
}

worker.start()实际上遍历启动了它所有的agent。上面提到了,这里走的是CpuAgent的实现。

func (self *CpuAgent) Start() {
    if !atomic.CompareAndSwapInt32(&self.isMining, 0, 1) {
        return // agent already started
    }
    go self.update()
}

启用一个单独线程去执行CpuAgent的update()方法。update方法与上面miner.update十分相似。

func (self *CpuAgent) update() {
out:
    for {
        select {
        case work := <-self.workCh:
            self.mu.Lock()
            if self.quitCurrentOp != nil {
                close(self.quitCurrentOp)
            }
            self.quitCurrentOp = make(chan struct{})
            go self.mine(work, self.quitCurrentOp)
            self.mu.Unlock()
        case <-self.stop:
            self.mu.Lock()
            if self.quitCurrentOp != nil {
                close(self.quitCurrentOp)
                self.quitCurrentOp = nil
            }
            self.mu.Unlock()
            break out
        }
    }
}

out: break out , 跳出for循环,for循环不断监听self信号,当监测到self停止时,则调用关闭操作代码,并直接挑出循环监听,函数退出。

通过监测CpuAgent的workCh通道,是否有work工作信号进入,如果有agent则开始挖矿,期间要上锁,启用一个单独线程去执行CpuAgent的mine方法。

func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
    if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil {
        log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash())
        self.returnCh <- &Result{work, result}
    } else {
        if err != nil {
            log.Warn("Block sealing failed", "err", err)
        }
        self.returnCh <- nil
    }
}

执行到这里,可以看到是调用的CpuAgent的共识引擎的块封装函数Seal来执行具体的挖矿操作。

** 前面提到了以太坊共识算法有ethash和clique两种,所以对应着有两种Seal方法的实现,这里卖个关子,我们在之后的博文会抽丝剥茧,仔细介绍。**

这里收起一个口子,回到上面继续分析另一个比较重要的方法self.worker.commitNewWork()。commitNewWork方法源码比较长,这里就不粘贴出来了,这个方法主要的工作是为新块准备基本数据,包括header,txs,uncles等。

uncles叔块的概念

区块链中会存在一种可能,就是由于网络的原因造成一个块并未存在于最长的链上,这个块就叫做孤块。一般来讲区块链崇尚最长即正义,会毫不犹豫地淘汰掉孤块,但叔块的挖掘也有很大的能耗,并且它是合法的,只是不在最长链上。在以太坊中,孤块被称作叔块,不会被认为是无价值的,而是也会给予奖励。

我们换一种方式来解释叔块,当要出块的时候,可能有两个结点同时出块了,这时候,区块链会保留这两个块,然后看哪个块先有后继块就采用谁,而淘汰另一个块(谁先生儿子,谁就是老大,淘汰另一个)。在以太坊中,生儿子的老大会称为正式块,但叔块的矿工也会得到1/32的奖励,同时,老大的儿子如果记录了叔块,也会得到额外的奖励,但是叔块本身封装的交易会被退回交易池,重新等待打包。这样一来,以太坊显得很有人情味,相当于对挖叔块的工作量的一种认可,是一种以公平为出发点的角度设计的。

总结

关于以太坊挖矿的源码粗略的分析就到这,粗略的意思是对于我自己的标准来讲,我并未逐一介绍每个流程控制,还有每行代码的具体意思,只是大致提供了一种看源码的路线,一条一条的进入,再收紧退回,最终完成了一个闭环,让我们对以太坊挖矿的一些具体操作有了了解。这部分源码主要工作是对交易数据池的维护,块数据组织,各种事件的监听处理,miner-worker-agent的分工关系。最后是留下了唯一的出块共识的问题,就是决定到底谁出块的算法,我们在下一篇文章中继续介绍。

参考资料

go-ethereum源码,网上资料

更多文章请转到醒者呆的博客园

以太坊源码机制:挖矿

标签:开启   func   adc   for   知识   spi   特定   int   abort   

原文地址:https://www.cnblogs.com/Evsward/p/miner.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!