标签:hdfs 文件系统 hadoop namespace 存储
建议读者在阅读本文的过程中结合代码一起阅读(Hadoop 的源码可自行下载),这样效果会更好。本文涉及的类主要包括 FSImage、StorageInfo、FSEditLog、EditLogOutputStream、EditLogFileOutputStream 等,比上一篇的分析略多一些,个别主类的代码量已达上千行。
/** * FSImage handles checkpointing and logging of the namespace edits. * fsImage镜像类 */ public class FSImage extends Storage { //标准时间格式 private static final SimpleDateFormat DATE_FORM = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // // The filenames used for storing the images // 在命名空间镜像中可能用的几种名称 // enum NameNodeFile { IMAGE ("fsimage"), TIME ("fstime"), EDITS ("edits"), IMAGE_NEW ("fsimage.ckpt"), EDITS_NEW ("edits.new"); private String fileName = null; private NameNodeFile(String name) {this.fileName = name;} String getName() {return fileName;} } // checkpoint states // 检查点的几种状态 enum CheckpointStates{START, ROLLED_EDITS, UPLOAD_START, UPLOAD_DONE; } /** * Implementation of StorageDirType specific to namenode storage * A Storage directory could be of type IMAGE which stores only fsimage, * or of type EDITS which stores edits or of type IMAGE_AND_EDITS which * stores both fsimage and edits. * 名字节点目录存储类型 */ static enum NameNodeDirType implements StorageDirType { //名字节点存储类型定义主要有以下4种定义 UNDEFINED, IMAGE, EDITS, IMAGE_AND_EDITS; public StorageDirType getStorageDirType() { return this; } //做存储类型的验证 public boolean isOfType(StorageDirType type) { if ((this == IMAGE_AND_EDITS) && (type == IMAGE || type == EDITS)) return true; return this == type; } } protected long checkpointTime = -1L; //内部维护了编辑日志类,与镜像类配合操作 protected FSEditLog editLog = null; private boolean isUpgradeFinalized = false;首先看到的是几种文件的名称,如 fsimage、edits、edits.new、fsimage.ckpt 等,后面讲解元数据(检查点)机制时会进行细致讲解。这里可以先把它们理解为镜像和编辑日志在存储目录中使用的文件名,其中 fsimage.ckpt 和 edits.new 是检查点过程中临时使用的文件。而对这些存储目录的遍历、查询操作,都是由下面这个迭代器类实现的:
//目录迭代器 private class DirIterator implements Iterator<StorageDirectory> { //目录存储类型 StorageDirType dirType; //向前的指标,用于移除操作 int prevIndex; // for remove() //向后指标 int nextIndex; // for next() DirIterator(StorageDirType dirType) { this.dirType = dirType; this.nextIndex = 0; this.prevIndex = 0; } public boolean hasNext() { .... } public StorageDirectory next() { StorageDirectory sd = getStorageDir(nextIndex); prevIndex = nextIndex; nextIndex++; if (dirType != null) { while (nextIndex < storageDirs.size()) { if (getStorageDir(nextIndex).getStorageDirType().isOfType(dirType)) break; nextIndex++; } } return sd; } public void remove() { ... } }
/** * Common class for storage information. * 存储信息公告类 * TODO namespaceID should be long and computed as hash(address + port) * 命名空间ID必须足够长,ip地址+端口号做哈希计算而得 */ public class StorageInfo { //存储信息版本号 public int layoutVersion; // Version read from the stored file. //命名空间ID public int namespaceID; // namespace id of the storage //存储信息创建时间 public long cTime; // creation timestamp public StorageInfo () { //默认构造函数,全为0 this(0, 0, 0L); }
/** * Save the contents of the FS image to the file. * 保存镜像文件 */ void saveFSImage(File newFile) throws IOException { FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem(); FSDirectory fsDir = fsNamesys.dir; long startTime = FSNamesystem.now(); // // Write out data // DataOutputStream out = new DataOutputStream( new BufferedOutputStream( new FileOutputStream(newFile))); try { //写入版本号 out.writeInt(FSConstants.LAYOUT_VERSION); //写入命名空间ID out.writeInt(namespaceID); //写入目录树中的节点总数 out.writeLong(fsDir.rootDir.numItemsInTree()); //写入生成时间戳(generation stamp) out.writeLong(fsNamesys.getGenerationStamp()); byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH]; ByteBuffer strbuf = ByteBuffer.wrap(byteStore); // save the root saveINode2Image(strbuf, fsDir.rootDir, out); // save the rest of the nodes saveImage(strbuf, 0, fsDir.rootDir, out); fsNamesys.saveFilesUnderConstruction(out); fsNamesys.saveSecretManagerState(out); strbuf = null; } finally { out.close(); } LOG.info("Image file of size " + newFile.length() + " saved in " + (FSNamesystem.now() - startTime)/1000 + " seconds."); }从上面几行可以看到,镜像文件的首部依次包括:布局版本号、命名空间 ID、目录树中的节点总数,以及生成时间戳(generation stamp,而不是普通的时间)。之后才是具体的文件(inode)信息。此外,这里还保存了正在构建(under construction)的文件信息以及安全相关的状态。保存目录树时,先调用 saveINode2Image() 写入根目录的信息,再调用 saveImage() 递归写入其余子节点的信息,因为 saveImage() 内部会对每个子节点调用 saveINode2Image() 方法。
/*
 * Save one inode's attributes to the image.
 *
 * Record layout appended to out:
 * - the path length as a short;
 * - the path bytes, i.e. the first name.position() bytes of name's backing
 *   array, starting at its array offset;
 * - replication (short);
 * - modification time, access time and preferred block size (longs);
 * - the block count (int);
 * - then either the serialized blocks (files) or the namespace and
 *   diskspace quotas (directories);
 * - finally the permission status.
 *
 * @param name buffer holding the node's path bytes up to its current position
 * @param node the file or directory inode to save
 * @param out  stream the record is appended to
 * @throws IOException if writing to out fails
 */
private static void saveINode2Image(ByteBuffer name,
                                    INode node,
                                    DataOutputStream out) throws IOException {
  // Path of the node: length prefix, then the raw bytes.
  int nameLen = name.position();
  out.writeShort(nameLen);
  out.write(name.array(), name.arrayOffset(), nameLen);
  if (!node.isDirectory()) {  // write file inode
    INodeFile fileINode = (INodeFile)node;
    // File attributes: replication, modification time, access time and
    // preferred block size.
    out.writeShort(fileINode.getReplication());
    out.writeLong(fileINode.getModificationTime());
    out.writeLong(fileINode.getAccessTime());
    out.writeLong(fileINode.getPreferredBlockSize());
    // Block count, followed by each block's own serialized form.
    Block[] blocks = fileINode.getBlocks();
    out.writeInt(blocks.length);
    for (Block blk : blocks)
      blk.write(out);
    FILE_PERM.fromShort(fileINode.getFsPermissionShort());
    PermissionStatus.write(out, fileINode.getUserName(),
                           fileINode.getGroupName(),
                           FILE_PERM);
  } else {   // write directory inode
    // A directory uses the same field layout with placeholder values
    // (block count -1 instead of a block list), and additionally stores
    // its namespace and diskspace quotas.
    out.writeShort(0);  // replication
    out.writeLong(node.getModificationTime());
    out.writeLong(0);   // access time
    out.writeLong(0);   // preferred block size
    out.writeInt(-1);   // # of blocks
    out.writeLong(node.getNsQuota());
    out.writeLong(node.getDsQuota());
    FILE_PERM.fromShort(node.getFsPermissionShort());
    PermissionStatus.write(out, node.getUserName(),
                           node.getGroupName(),
                           FILE_PERM);
  }
}
/** * Save file tree image starting from the given root. * This is a recursive procedure, which first saves all children of * a current directory and then moves inside the sub-directories. * 按照给定节点进行镜像的保存,每个节点目录会采取递归的方式进行遍历 */ private static void saveImage(ByteBuffer parentPrefix, int prefixLength, INodeDirectory current, DataOutputStream out) throws IOException { int newPrefixLength = prefixLength; if (current.getChildrenRaw() == null) return; for(INode child : current.getChildren()) { // print all children first parentPrefix.position(prefixLength); parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes()); saveINode2Image(parentPrefix, child, out); .... }写入正在构建(under construction)的文件信息的方法同样是一个 static 静态方法,供外部的类调用:
// Helper function that writes an INodeUnderConstruction // into the input stream // 写入正在操作的文件的信息 // static void writeINodeUnderConstruction(DataOutputStream out, INodeFileUnderConstruction cons, String path) throws IOException { writeString(path, out); out.writeShort(cons.getReplication()); out.writeLong(cons.getModificationTime()); out.writeLong(cons.getPreferredBlockSize()); int nrBlocks = cons.getBlocks().length; out.writeInt(nrBlocks); for (int i = 0; i < nrBlocks; i++) { cons.getBlocks()[i].write(out); } cons.getPermissionStatus().write(out); writeString(cons.getClientName(), out); writeString(cons.getClientMachine(), out); out.writeInt(0); // do not store locations of last block }这里顺便看一下格式化相关的方法。格式化操作是在首次使用 HDFS 之前进行的(它会清空已有的所有文件,因此不能每次启动时都执行),这个过程会生成新的版本号和命名空间 ID。代码中是如何实现的呢?
public void format() throws IOException { this.layoutVersion = FSConstants.LAYOUT_VERSION; //对每个目录进行格式化操作 format(sd); } } /** Create new dfs name directory. Caution: this destroys all files * in this filesystem. * 格式化操作,会创建新的 dfs name 目录(注意:会删除该文件系统中的所有文件) */ void format(StorageDirectory sd) throws IOException { sd.clearDirectory(); // create currrent dir sd.lock(); try { saveCurrent(sd); } finally { sd.unlock(); } LOG.info("Storage directory " + sd.getRoot() + " has been successfully formatted."); }操作很简单:先清空原有目录(并重新创建 current 目录),然后在加锁的状态下调用 saveCurrent(sd) 保存当前状态,最后释放锁。(注:上面 format() 的摘录省略了遍历各个存储目录的循环部分。)
/** * FSEditLog maintains a log of the namespace modifications. * 编辑日志类包含了命名空间各种修改操作的日志记录 */ public class FSEditLog { //操作参数种类 private static final byte OP_INVALID = -1; // 文件操作相关 private static final byte OP_ADD = 0; private static final byte OP_RENAME = 1; // rename private static final byte OP_DELETE = 2; // delete private static final byte OP_MKDIR = 3; // create directory private static final byte OP_SET_REPLICATION = 4; // set replication //the following two are used only for backward compatibility : @Deprecated private static final byte OP_DATANODE_ADD = 5; @Deprecated private static final byte OP_DATANODE_REMOVE = 6; //下面2个权限设置相关 private static final byte OP_SET_PERMISSIONS = 7; private static final byte OP_SET_OWNER = 8; private static final byte OP_CLOSE = 9; // close after write private static final byte OP_SET_GENSTAMP = 10; // store genstamp /* The following two are not used any more. Should be removed once * LAST_UPGRADABLE_LAYOUT_VERSION is -17 or newer. */ //配额设置相关 private static final byte OP_SET_NS_QUOTA = 11; // set namespace quota private static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota private static final byte OP_TIMES = 13; // sets mod & access time on a file private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas. //Token认证相关 private static final byte OP_GET_DELEGATION_TOKEN = 18; //new delegation token private static final byte OP_RENEW_DELEGATION_TOKEN = 19; //renew delegation token private static final byte OP_CANCEL_DELEGATION_TOKEN = 20; //cancel delegation token private static final byte OP_UPDATE_MASTER_KEY = 21; //update master key可见操作之多啊,然后才是基本变量的定义
//日志刷入的缓冲大小值512k private static int sizeFlushBuffer = 512*1024; //编辑日志同时有多个输出流对象 private ArrayList<EditLogOutputStream> editStreams = null; //内部维护了1个镜像类,与镜像进行交互 private FSImage fsimage = null; // a monotonically increasing counter that represents transactionIds. //单调递增的事务ID计数器 private long txid = 0; // stores the last synced transactionId. //最近一次已经同步的事务ID private long synctxid = 0; // the time of printing the statistics to the log file. private long lastPrintTime; // is a sync currently running? //是否有日志同步操作正在进行 private boolean isSyncRunning; // these are statistics counters. //事务相关的统计变量 //事务的总数 private long numTransactions; // number of transactions //已被其他线程批量同步、无需自己再同步的事务次数 private long numTransactionsBatchedInSync; //事务的总耗时 private long totalTimeTransactions; // total time for all transactions private NameNodeInstrumentation metrics;这里的 txid、synctxid 等变量会在后面的同步操作中频繁出现。为了避免多线程之间事务 ID 的相互干扰,作者采用 ThreadLocal 的方式,让每个线程维护自己的事务 ID:
//事务ID对象类,内部包含long类型txid值 private static class TransactionId { //操作事务ID public long txid; TransactionId(long value) { this.txid = value; } } // stores the most current transactionId of this thread. //通过ThreadLocal类保存线程私有的状态信息 private static final ThreadLocal<TransactionId> myTransactionId = new ThreadLocal<TransactionId>() { protected synchronized TransactionId initialValue() { return new TransactionId(Long.MAX_VALUE); } };在编辑日志中,所有的文件读写操作都是通过专门的 EditLog 输入/输出流实现的,它们都是抽象父类,这里以 EditLogOutputStream 为例:
/** * A generic abstract class to support journaling of edits logs into * a persistent storage. */ abstract class EditLogOutputStream extends OutputStream { // these are statistics counters //下面是2个统计量 //文件同步的次数,即 flush() 被调用的次数 private long numSync; // number of sync(s) to disk //同步写入的总时间计数 private long totalTimeSync; // total time to sync EditLogOutputStream() throws IOException { numSync = totalTimeSync = 0; } abstract String getName(); abstract public void write(int b) throws IOException; abstract void write(byte op, Writable ... writables) throws IOException; abstract void create() throws IOException; abstract public void close() throws IOException; abstract void setReadyToFlush() throws IOException; abstract protected void flushAndSync() throws IOException; /** * Flush data to persistent store. * Collect sync metrics. * 刷出数据并统计同步耗时的方法 */ public void flush() throws IOException { //同步次数加1 numSync++; long start = FSNamesystem.now(); //刷出同步方法为抽象方法,由继承的子类具体实现 flushAndSync(); long end = FSNamesystem.now(); //同时进行耗时的累加 totalTimeSync += (end - start); } abstract long length() throws IOException; long getTotalSyncTime() { return totalTimeSync; } long getNumSync() { return numSync; } }这里对同步相关的操作做了一些设计,包括同步次数和同步耗时的统计。输入流与此类似,就不展开讨论了。EditLog 并没有直接使用这个抽象类,而是在内部定义了一个继承它、功能更丰富的子类 EditLogFileOutputStream:
/** * An implementation of the abstract class {@link EditLogOutputStream}, * which stores edits in a local file. * 所有的写日志文件的操作,都会通过这个输出流对象实现 */ static private class EditLogFileOutputStream extends EditLogOutputStream { private File file; //内部维护了一个文件输出流对象 private FileOutputStream fp; // file stream for storing edit logs private FileChannel fc; // channel of the file stream for sync //这里采用了双缓冲区设计,大大提高了并发度:bufCurrent负责接收新写入的数据 private DataOutputBuffer bufCurrent; // current buffer for writing //bufReady负责将数据刷入文件中 private DataOutputBuffer bufReady; // buffer ready for flushing static ByteBuffer fill = ByteBuffer.allocateDirect(512); // preallocation注意这里采用了双缓冲的设计,这种设计在许多其他优秀的系统中也有用到。下面从编辑日志打开输出流开始看起:
/** * Create empty edit log files. * Initialize the output stream for logging. * * @throws IOException */ public synchronized void open() throws IOException { //在文件打开的时候,计数值都初始化0 numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0; if (editStreams == null) { editStreams = new ArrayList<EditLogOutputStream>(); } //传入目录类型获取迭代器 Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS); while (it.hasNext()) { StorageDirectory sd = it.next(); File eFile = getEditFile(sd); try { //打开存储目录下的文件获取输出流 EditLogOutputStream eStream = new EditLogFileOutputStream(eFile); editStreams.add(eStream); } catch (IOException ioe) { fsimage.updateRemovedDirs(sd, ioe); it.remove(); } } exitIfNoStreams(); }这里会为每个 EDITS 类型(包括 IMAGE_AND_EDITS)的存储目录打开一个输出流,并加入到 editStreams 成员变量中;打开失败的目录会被移除。那么一次标准的写入过程是怎样的呢?我们以关闭方法 close() 为例,因为关闭时会触发一次对剩余数据的写入操作:
/** * Shutdown the file store. * 关闭操作 */ public synchronized void close() throws IOException { while (isSyncRunning) { //如果同步正在进行,则等待1s后再检查 try { wait(1000); } catch (InterruptedException ie) { } } if (editStreams == null) { return; } printStatistics(true); //当文件关闭的时候重置计数 numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0; for (int idx = 0; idx < editStreams.size(); idx++) { EditLogOutputStream eStream = editStreams.get(idx); try { //关闭将最后的数据刷出缓冲 eStream.setReadyToFlush(); eStream.flush(); eStream.close(); } catch (IOException ioe) { removeEditsAndStorageDir(idx); idx--; } } editStreams.clear(); }其中最主要的是 try 块中的 setReadyToFlush() 和 flush() 这 2 行代码。setReadyToFlush() 负责交换缓冲区:
/** * All data that has been written to the stream so far will be flushed. * New data can be still written to the stream while flushing is performed. */ @Override void setReadyToFlush() throws IOException { assert bufReady.size() == 0 : "previous data is not flushed yet"; write(OP_INVALID); // insert end-of-file marker //交换2个缓冲区 DataOutputBuffer tmp = bufReady; bufReady = bufCurrent; bufCurrent = tmp; }bufCurrent 用于缓冲外部新写入的数据,而 bufReady 则缓冲即将写入文件的数据;交换之前还会先写入一个文件结束标记 OP_INVALID。真正执行写入的是 flush() 方法,它是父类中的方法:
/** * Flush data to persistent store. * Collect sync metrics. * 刷出数据并统计同步耗时的方法 */ public void flush() throws IOException { //同步次数加1 numSync++; long start = FSNamesystem.now(); //刷出同步方法为抽象方法,由继承的子类具体实现 flushAndSync(); long end = FSNamesystem.now(); //同时进行耗时的累加 totalTimeSync += (end - start); }它会调用子类实现的同步方法 flushAndSync():
/** * Flush ready buffer to persistent store. * currentBuffer is not flushed as it accumulates new log records * while readyBuffer will be flushed and synced. */ @Override protected void flushAndSync() throws IOException { preallocate(); // preallocate file if necessary //将ready缓冲区中的数据写入文件中 bufReady.writeTo(fp); // write data to file bufReady.reset(); // erase all data in the buffer fc.force(false); // metadata updates not needed because of preallocation //回退1个字节,跳过文件结束标记(OP_INVALID),因为每次交换缓冲区前都会写入该标记 fc.position(fc.position()-1); // skip back the end-of-file marker }一个看似简单的文件写入过程,的确设计得相当精巧。再回顾 FSEditLog 开头定义的二十来种操作码,它们代表了各式各样的操作,这些操作码是如何被使用的呢?第一反应当然是由外部传入操作类型和参数,再写入对应的操作记录,EditLog 沿用的也正是这个思路:
/** * Add set replication record to edit log */ void logSetReplication(String src, short replication) { logEdit(OP_SET_REPLICATION, new UTF8(src), FSEditLog.toLogReplication(replication)); } /** Add set namespace quota record to edit log * * @param src the string representation of the path to a directory * @param quota the directory size limit */ void logSetQuota(String src, long nsQuota, long dsQuota) { logEdit(OP_SET_QUOTA, new UTF8(src), new LongWritable(nsQuota), new LongWritable(dsQuota)); } /** Add set permissions record to edit log */ void logSetPermissions(String src, FsPermission permissions) { logEdit(OP_SET_PERMISSIONS, new UTF8(src), permissions); }其实还有很多的logSet*系列的方法,形式都是传入操作码,操作对象以及附加参数,就会调用到更加基层的logEdit方法,这个方法才是最终写入操作记录的方法。
/** * Write an operation to the edit log. Do not sync to persistent * store yet. * 写入一个操作到编辑日志中 */ synchronized void logEdit(byte op, Writable ... writables) { if (getNumEditStreams() < 1) { throw new AssertionError("No edit streams to log to"); } long start = FSNamesystem.now(); for (int idx = 0; idx < editStreams.size(); idx++) { EditLogOutputStream eStream = editStreams.get(idx); try { // 写入操作到每个输出流中 eStream.write(op, writables); } catch (IOException ioe) { removeEditsAndStorageDir(idx); idx--; } } exitIfNoStreams(); // get a new transactionId //获取一个新的事务ID txid++; // // record the transactionId when new data was written to the edits log // TransactionId id = myTransactionId.get(); id.txid = txid; // update statistics long end = FSNamesystem.now(); //在每次进行logEdit写入记录操作的时候,都会累加事务次数和耗时 numTransactions++; totalTimeTransactions += (end-start); if (metrics != null) // Metrics is non-null only when used inside name node metrics.addTransaction(end-start); }每次新的操作都会在这里生成一个新的事务 ID,并统计写入缓冲区的耗时等信息。但此时数据只是写入了输出流的缓冲区,还没有真正同步到文件。之所以这样设计,是要考虑多线程操作的情况:同步工作交给下面的 logSync() 方法完成,一次同步可以把多个线程写入的事务批量刷入磁盘。
// // Sync all modifications done by this thread. // public void logSync() throws IOException { ArrayList<EditLogOutputStream> errorStreams = null; long syncStart = 0; // Fetch the transactionId of this thread. long mytxid = myTransactionId.get().txid; ArrayList<EditLogOutputStream> streams = new ArrayList<EditLogOutputStream>(); boolean sync = false; try { synchronized (this) { printStatistics(false); // if somebody is already syncing, then wait while (mytxid > synctxid && isSyncRunning) { try { wait(1000); } catch (InterruptedException ie) { } } // // If this transaction was already flushed, then nothing to do // if (mytxid <= synctxid) { //当前线程的事务ID不大于已同步的ID,说明已被其他线程批量同步,只需累加计数 numTransactionsBatchedInSync++; if (metrics != null) // Metrics is non-null only when used inside name node metrics.incrTransactionsBatchedInSync(); return; } // now, this thread will do the sync syncStart = txid; isSyncRunning = true; sync = true; // swap buffers exitIfNoStreams(); for(EditLogOutputStream eStream : editStreams) { try { //交换缓冲 eStream.setReadyToFlush(); streams.add(eStream); } catch (IOException ie) { FSNamesystem.LOG.error("Unable to get ready to flush.", ie); // // remember the streams that encountered an error. // if (errorStreams == null) { errorStreams = new ArrayList<EditLogOutputStream>(1); } errorStreams.add(eStream); } } } // do the sync long start = FSNamesystem.now(); for (EditLogOutputStream eStream : streams) { try { //缓冲交换完成后(已在同步块之外),将数据刷入磁盘,此时其他线程仍可继续写入 eStream.flush(); .... }至此,整个写入过程总算理清了。那么编辑日志类又是如何读入编辑日志文件,并完成内存中元数据的恢复的呢?整个过程其实就是一个解码的过程:
/** * Load an edit log, and apply the changes to the in-memory structure * This is where we apply edits that we‘ve been writing to disk all * along. * 导入编辑日志文件,并在内存中构建此时状态 */ static int loadFSEdits(EditLogInputStream edits) throws IOException { FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem(); //FSDirectory是一个门面模式的体现,所有的操作都是在这个类中分给里面的子系数实现 FSDirectory fsDir = fsNamesys.dir; int numEdits = 0; int logVersion = 0; String clientName = null; String clientMachine = null; String path = null; int numOpAdd = 0, numOpClose = 0, numOpDelete = 0, numOpRename = 0, numOpSetRepl = 0, numOpMkDir = 0, numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0, numOpTimes = 0, numOpGetDelegationToken = 0, numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0, numOpUpdateMasterKey = 0, numOpOther = 0; long startTime = FSNamesystem.now(); DataInputStream in = new DataInputStream(new BufferedInputStream(edits)); try { // Read log file version. Could be missing. in.mark(4); // If edits log is greater than 2G, available method will return negative // numbers, so we avoid having to call available boolean available = true; try { // 首先读入日志版本号 logVersion = in.readByte(); } catch (EOFException e) { available = false; } if (available) { in.reset(); logVersion = in.readInt(); if (logVersion < FSConstants.LAYOUT_VERSION) // future version throw new IOException( "Unexpected version of the file system log file: " + logVersion + ". Current version = " + FSConstants.LAYOUT_VERSION + "."); } assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION : "Unsupported version " + logVersion; while (true) { .... //下面根据操作类型进行值的设置 switch (opcode) { case OP_ADD: case OP_CLOSE: { ... 
break; } case OP_SET_REPLICATION: { numOpSetRepl++; path = FSImage.readString(in); short replication = adjustReplication(readShort(in)); fsDir.unprotectedSetReplication(path, replication, null); break; } case OP_RENAME: { numOpRename++; int length = in.readInt(); if (length != 3) { throw new IOException("Incorrect data format. " + "Mkdir operation."); } String s = FSImage.readString(in); String d = FSImage.readString(in); timestamp = readLong(in); HdfsFileStatus dinfo = fsDir.getFileInfo(d); fsDir.unprotectedRenameTo(s, d, timestamp); fsNamesys.changeLease(s, d, dinfo); break; } ...
编辑日志会不断增长,因此需要定期把它与镜像文件合并,这就是检查点(checkpoint)机制。下面是检查点的主循环 doWork():// // The main work loop // public void doWork() { long period = 5 * 60; // 5 minutes long lastCheckpointTime = 0; if (checkpointPeriod < period) { period = checkpointPeriod; } //主循环程序 while (shouldRun) { try { Thread.sleep(1000 * period); } catch (InterruptedException ie) { // do nothing } if (!shouldRun) { break; } try { // We may have lost our ticket since last checkpoint, log in again, just in case if(UserGroupInformation.isSecurityEnabled()) UserGroupInformation.getCurrentUser().reloginFromKeytab(); long now = System.currentTimeMillis(); long size = namenode.getEditLogSize(); if (size >= checkpointSize || now >= lastCheckpointTime + 1000 * checkpointPeriod) { //周期性调用检查点方法 doCheckpoint(); ... } }可以看到,主循环会定期醒来,当编辑日志大小达到 checkpointSize,或者距离上次检查点已超过 checkpointPeriod 时,就会触发一次检查点。接下来看 doCheckpoint() 检查点方法:
/** * Create a new checkpoint */ void doCheckpoint() throws IOException { // Do the required initialization of the merge work area. //初始化合并操作所需的工作区 startCheckpoint(); // Tell the namenode to start logging transactions in a new edit file // Retuns a token that would be used to upload the merged image. CheckpointSignature sig = (CheckpointSignature)namenode.rollEditLog(); // error simulation code for junit test if (ErrorSimulator.getErrorSimulation(0)) { throw new IOException("Simulating error0 " + "after creating edits.new"); } //从名字节点下载当前的镜像和编辑日志 downloadCheckpointFiles(sig); // Fetch fsimage and edits //进行镜像合并操作 doMerge(sig); // Do the merge // // Upload the new image into the NameNode. Then tell the Namenode // to make this new uploaded image as the most current image. //把合并好后的镜像重新上传到名字节点 putFSImage(sig); // error simulation code for junit test if (ErrorSimulator.getErrorSimulation(1)) { throw new IOException("Simulating error1 " + "after uploading new image to NameNode"); } //通知名字节点进行镜像的替换操作,包括将edits.new重命名为edits,将fsimage.ckpt重命名为fsimage namenode.rollFsImage(); checkpointImage.endCheckpoint(); LOG.info("Checkpoint done. New Image Size: " + checkpointImage.getFsImageName().length()); }这个方法非常清晰地描述了检查点机制的完整流程,依次为:
1. 通知名字节点滚动编辑日志,生成 edits.new;
2. 下载镜像和编辑日志;
3. 在本地合并;
4. 上传新镜像;
5. 通知名字节点替换文件。

下面重点看文件的替换方法,也就是 namenode.rollFsImage(),它最终会调用到 FSImage 中对应的 rollFSImage() 方法。
/** * Moves fsimage.ckpt to fsImage and edits.new to edits * Reopens the new edits file. * 完成2个文件的名称替换 */ void rollFSImage() throws IOException { if (ckptState != CheckpointStates.UPLOAD_DONE) { throw new IOException("Cannot roll fsImage before rolling edits log."); } // // First, verify that edits.new and fsimage.ckpt exists in all // checkpoint directories. // if (!editLog.existsNew()) { throw new IOException("New Edits file does not exist"); } Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.IMAGE); while (it.hasNext()) { StorageDirectory sd = it.next(); File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW); if (!ckpt.exists()) { throw new IOException("Checkpoint file " + ckpt + " does not exist"); } } editLog.purgeEditLog(); // renamed edits.new to edits方法的前半部分逻辑很明确:先确认检查点状态为 UPLOAD_DONE,并校验 edits.new 以及所有镜像目录下的 fsimage.ckpt 都存在,然后通过 purgeEditLog() 将 edits.new 重命名为 edits。
// // Renames new image // 重命名新镜像名称 // it = dirIterator(NameNodeDirType.IMAGE); while (it.hasNext()) { StorageDirectory sd = it.next(); File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW); File curFile = getImageFile(sd, NameNodeFile.IMAGE); // renameTo fails on Windows if the destination file // already exists. if (!ckpt.renameTo(curFile)) { curFile.delete(); if (!ckpt.renameTo(curFile)) { editLog.removeEditsForStorageDir(sd); updateRemovedDirs(sd); it.remove(); } } } editLog.exitIfNoStreams();中间这部分代码把新镜像 fsimage.ckpt 重命名为 fsimage。由于在 Windows 上目标文件已存在时 renameTo 会失败,所以第一次失败后会先删除旧的 fsimage 再重试;若仍然失败,则移除该存储目录。最后,更新各目录中的 fstime 和 VERSION 文件,并删除不再需要的旧文件:
// // Updates the fstime file on all directories (fsimage and edits) // and write version file // this.layoutVersion = FSConstants.LAYOUT_VERSION; this.checkpointTime = FSNamesystem.now(); it = dirIterator(); while (it.hasNext()) { StorageDirectory sd = it.next(); // delete old edits if sd is the image only the directory if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) { File editsFile = getImageFile(sd, NameNodeFile.EDITS); editsFile.delete(); } // delete old fsimage if sd is the edits only the directory if (!sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) { File imageFile = getImageFile(sd, NameNodeFile.IMAGE); imageFile.delete(); }
标签:hdfs 文件系统 hadoop namespace 存储