码迷,mamicode.com
首页 > 其他好文 > 详细

ZooKeeper源码分析:Log和Snapshot持久化(SyncRequestProcessor类)

时间:2015-03-04 09:55:32      阅读:222      评论:0      收藏:0      [点我收藏+]

标签:分布式   云计算   zookeeper   

事务日志的持久化是在SyncRequestProcessor类中实现,并会按照一定的规则滚动日志(关闭当前文件,创建一个新文件),以及生成新的Snapshot。在持久化过程中,使用组提交(Group Commits)来优化磁盘io 操作。组提交是指将多个Request对象的事务作为一次写附加到磁盘上。使用这种方式可以在持久化多个事务的时候,只使用一次磁盘寻道(Disk Seek)的开销。Request对象只有在其中事务同步到磁盘后,才会传递到下一个处理器。

SyncRequestProcessor被用于下面三种不同的情景中:

  • Leader - 同步请求到磁盘,并且转发这个请求到AckRequestProcessor。该处理器发送Ack消息给Leader自己。
  • Follower - 同步请求到磁盘,并转发请求到SendAckRequestProcessor。该处理器发送确认数据包给Leader。SendAckRequestProcessor是flushable, 允许我们强制将数据包推送到Leader。
  • Observer - 同步已经提交的请求到磁盘(从INFORM数据包中接收)。它不会发送确认数据包给Leader。所以nextProcessor是null。在observer中, 和一般的txnlog语义不同,因为它仅包含已经提交的txn。

在SyncRequestProcessor中有两个关键队列:

  • queuedRequest队列:存放从传入该处理器的Request对象。当调用该处理器的processRequest方法,会将Request对象放入到queuedRequest队列;
  • toFlush队列:存放已经附加到日志文件,但还没有Flush的Request对象。

SyncRequestProcessor的run方法循环读取queuedRequests队列中的Request对象并进行持久化。

流程图如下:

技术分享

如果toFlush队列为空,则调用queuedRequest队列的阻塞方法take();如果toFlush队列不为空,则调用queuedRequest队列的非阻塞方法poll()。如果poll()方法返回null,则会立即将toFlush队列中所有Request对象中事务Flush到磁盘,并将Request对象传入到下一个处理器。这样可以避免增加请求处理的延时。如果queuedRequest.poll()方法返回不为Null或者queuedRequest.take()方法返回, 则将返回的Request对象si中的事务附加到事务日志文件中,并放入toFlush队列中。如果toFlush队列大小大于1000,则将队列中所有Request对象中事务Flush到磁盘,并将Request对象传入下一个处理器。这是可以避免在有大量请求的时候增加请求处理的延时。

Request对象附加到事务日志之后,会检查日志记录数logCount是否大于(snapCount / 2 + randRoll)。如果大于则滚动日志,并启动生成新Snapshot的线程。其中randRoll是一个随机数。这个随机数的使用可以避免Zookeeper集群里的所有机器同时构建Snapshot。

SyncRequestProcessor.run方法如下:

public void run() {
    try {
        int logCount = 0;

       //这个随机数randRoll的使用可以避免Zookeeper集群里的所有机器同时构建Snapshot
        setRandRoll(r.nextInt( snapCount/2));
        while (true ) {
            Request si = null;
            //如果toFlush为空,则调用队列queuedRequests的阻塞方法take()
            if (toFlush .isEmpty()) {
                si = queuedRequests.take();
            }
                            //如果toFlush不为空,则调用队列queuedRequests的非阻塞方法poll() 
            else {
                si = queuedRequests.poll();
                //如果si为null, 说明queuedRequests为空,则调用flush()方法
                if (si == null) {
                    flush( toFlush);
                    continue;
                }
            }
            //如果si是一个poison pill, 则退出循环
            if (si == requestOfDeath ) {
                break;
            }
            if (si != null) {
                // track the number of records written to the log
                //将record的操作记到日志中
                if (zks .getZKDatabase().append(si)) {
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                        randRoll = r .nextInt(snapCount/2);
                        //滚动事务日志
                        zks.getZKDatabase().rollLog();
                        //构建snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            LOG.warn("Too busy to snap, skipping" );
                        } else {
                            //生成snapshot线程
                            snapInProcess = new Thread("Snapshot Thread") {
                                    public void run() {
                                        try {
                                            zks.takeSnapshot();
                                        } catch(Exception e) {
                                            LOG.warn("Unexpected exception", e);
                                        }
                                    }
                                };
                            //启动snapInProcess
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                }
                                      
                else if (toFlush .isEmpty()) {
                    // optimization for read heavy workloads
                    //如果这是一个read, 并且没有pending的flushes(writes), 那么直接传递到下一个处理器
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable) nextProcessor).flush();
                        }
                    }
                    continue;
                }
                toFlush.add(si);
                //如果toFlush的大小大于1000, 则flush
                if (toFlush .size() > 1000) {
                    flush( toFlush);
                }
            }
        }
    } catch (Throwable t) {
        LOG.error("Severe unrecoverable error, exiting" , t);
        running = false ;
        System. exit(11);
    }
    LOG.info("SyncRequestProcessor exited!" );
}

 

转载请附上原博客地址:http://blog.csdn.net/jeff_fangji/article/details/44046997

ZooKeeper源码分析:Log和Snapshot持久化(SyncRequestProcessor类)

标签:分布式   云计算   zookeeper   

原文地址:http://blog.csdn.net/jeff_fangji/article/details/44046997

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!