Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》  
如HDFS节点的VersionedProtocol实现中对NamenodeProtocol的分析,SecondaryNameNode周期性通过getEditLogSize获取NameNode编辑日志大小,如果超出某个值,则通过rollEditLog通知NameNode开始一次检查点过程,NameNode关闭当前编辑日志,使用新的编辑日志edits.new继续记录日志。然后SecondaryNameNode通过HTTP服务获取NameNode上的命名空间镜像和编辑日志,进行合并操作。合并后通过HTTP服务通知NameNode下载新的命名空间镜像,然后通过rollFsImage通知NameNode使用新的检查点。  
本文将按照以上流程对NameNode和SecondaryNameNode之间的交互进行分析。
1. getEditLogSize
NameNode的getEditLogSize实现如下1
2
3
4
5
6
7public long getEditLogSize() throws IOException {
    return namesystem.getEditLogSize();
}
long getEditLogSize() throws IOException {
    return getEditLog().getEditLogSize();
}
如上,还是由FSNamesystem提供服务,通过FSNamesystem获得FSDirectory对象,由FSDirectory获得FSImage对象,FSImage对象最终获得FSEditLog对象,由FSEditLog对象的getEditLogSize返回当前编辑日志的大小1
2
3
4
5
6
7
8
9
10synchronized long getEditLogSize() throws IOException {
    assert(getNumStorageDirs() == editStreams.size());
    long size = 0;
    for (int idx = 0; idx < editStreams.size(); idx++) {
      long curSize = editStreams.get(idx).length();
      assert (size == 0 || size == curSize) : "All streams must be the same";
      size = curSize;
    }
    return size;
}
如上,返回对应的编辑日志文件大小。所有的日志输出流EditLogFileOutputStream对应文件长度应该是一样的,因为每次都是往所有的输出流写数据(日志记录见NameNode命名空间镜像和编辑日志)。
2. rollEditLog
SecondaryNameNode获取NameNode编辑日志大小后,判断是否大于某一值,如果大于再通过rollEditLog开始一个新的检查点。SecondaryNameNode的中间过程后面分析,这里假设编辑日志超过了阈值,通过NamenodeProtocol代理调用rollEditLog,NameNode上rollEditLog实现如下1
2
3
4
5
6
7
8
9
10
11
12
13
14public CheckpointSignature rollEditLog() throws IOException {
    return namesystem.rollEditLog();
}
synchronized CheckpointSignature rollEditLog() throws IOException {
    checkSuperuserPrivilege();//超级用户权限
    synchronized (this) {
      if (isInSafeMode()) {
        throw new SafeModeException("Log not rolled", safeMode);
      }
      LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
      return getFSImage().rollEditLog();
    }
}
如上,还是通过FSNamesystem提供服务,rollEditLog要有超级用户权限,否则会抛出异常。在安全模式下时也不能开始检查点过程。
由FSNamesystem获取到FSImage对象后,FSImage对象调用rollEditLog完成1
2
3
4
5CheckpointSignature rollEditLog() throws IOException {
    getEditLog().rollEditLog();
    ckptState = CheckpointStates.ROLLED_EDITS;
    return new CheckpointSignature(this);
}
最终通过FSImage管理的FSEditLog对象的rollEditLog方法完成相应的操作,FSImage对象的成员ckptState状态改为ROLLED_EDITS,ckptState为CheckpointStates枚举1
enum CheckpointStates{START, ROLLED_EDITS, UPLOAD_START, UPLOAD_DONE; }
最开始为START。
更改状态后创建CheckpointSignature对象返回给SecondaryNameNode,作为一次检查点的标识1
2
3
4
5CheckpointSignature(FSImage fsImage) {
    super(fsImage);
    editsTime = fsImage.getEditLog().getFsEditTime();
    checkpointTime = fsImage.checkpointTime;
}
CheckpointSignature继承自StorageInfo,父类构造保存当前FSImage的存储信息(layoutVersion,namespaceID,cTime),成员editsTime为当前编辑日志的修改时间,checkpointTime为FSImage的检查点时间。这两个值标识一个检查点。  
实际的操作由FSEditLog的rollEditLog完成,经前面分析,应该是关闭当前编辑日志,然后使用新的编辑日志文件(edits.new)记录。FSEditLog相关见NameNode命名空间镜像和编辑日志1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46synchronized void rollEditLog() throws IOException {
    //如果edits.new已经在某些存储目录中存在了,看是否在所有的存储目录中都存在
    if (existsNew()) {
      Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);//遍历EDITS类型存储目录
      StringBuilder b = new StringBuilder();
      while (it.hasNext()) {
        File editsNew = getEditNewFile(it.next());
        b.append("\n  ").append(editsNew);
        if (!editsNew.exists()) {//并不是所有的EDITS存储目录都存在edits.new文件,异常
          throw new IOException(
              "Inconsistent existence of edits.new " + editsNew);
        }
      }
      LOG.warn("Cannot roll edit log," + " edits.new files already exists in all healthy directories:" + b);
      return;
    }
    close(); //关闭当前日志文件
    // After edit streams are closed, healthy edits files should be identical,
    // and same to fsimage files
    fsimage.restoreStorageDirs();
    
    
    Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);//所有的EDITS存储目录
    LinkedList<StorageDirectory> toRemove = new LinkedList<StorageDirectory>();
    while (it.hasNext()) {
      StorageDirectory sd = it.next();
      try {
        //每个EDITS存储目录下创建新的日志文件edits.new,写文件头版本号
        EditLogFileOutputStream eStream = new EditLogFileOutputStream(getEditNewFile(sd));
        eStream.create();
        editStreams.add(eStream);//添加到editStreams中
      } catch (IOException ioe) {
        LOG.error("error retrying to reopen storage directory '" +
            sd.getRoot().getAbsolutePath() + "'", ioe);
        toRemove.add(sd);
        it.remove();
      }
    }
    for (StorageDirectory sd : toRemove) {//移除错误的EDITS存储目录
      removeEditsForStorageDir(sd);
      fsimage.updateRemovedDirs(sd);
    }
    exitIfNoStreams();
}
如上,如果在EDITS存储目录中已经有了edits.new文件,检查是否所有的EDITS存储目录存在edits.new文件,如果都存在说明新的日至文件已经创建在使用了,返回。否则只有部分存在,存储目录不一致抛出异常。
edits.new文件需要创建时,首先通过close关闭当前使用的日至文件edits,close不再贴代码分析。
close方法中如果当前正在进行同步,等待同步完成。然后对管理的所有日志文件输出流EditLogFileOutputStream对象editStreams逐一刷出到底层文件中,然后关闭输出流的两个缓冲区,关闭底层文件通道。最终将editStreams中管理的输出流清空。  
关闭原来的日志文件后,在每个EDITS存储目录下创建新的日志文件,如NameNode命名空间镜像和编辑日志中对日志文件输出流的分析,创建相应日志文件的输出流后,调用create方法往底层文件中写文件头(版本号),完成日志文件的初始化过程,最终将所有的日志文件输出流添加到editStreams中进行管理。
至此,旧的日志文件已经关闭,新的日志文件已经打开,等待SecondaryNameNode下载命名空间镜像和旧的日志文件。
3. SecondaryNameNode
SecondaryNameNode由main方法启动1
2
3
4
5
6
7
8
9
10
11
12public static void main(String[] argv) throws Exception {
    StringUtils.startupShutdownMessage(SecondaryNameNode.class, argv, LOG);
    Configuration tconf = new Configuration();
    if (argv.length >= 1) {
      SecondaryNameNode secondary = new SecondaryNameNode(tconf);
      int ret = secondary.processArgs(argv);
      System.exit(ret);
    }
    Daemon checkpointThread = new Daemon(new SecondaryNameNode(tconf)); 
    checkpointThread.start();
}
如上,可以带参数启动,如-checkpoint,-geteditsize,执行相应功能后,SecondaryNameNode就会退出。
checkpoint会尝试进行一次检查点过程,即先通过getEditLogSize获取日志大小,如果超过阈值或者没有超过阈值但同时指定了force(-checkpoint force),则通过doCheckpoint执行一次检查点。而如果没有超过阈值且没有指定force,则记录日志。
geteditsize简单的在控制台打印当前NameNode日志大小。  
如果不带参数启动则会开启一个线程,定期检查是否进行检查点。
3.1 构造
SecondaryNameNode的构造函数中,通过initialize(conf)初始化,读取配置对基本成员进行设置,开启HTTP服务,创建NamenodeProtocol代理,创建检查点工作需要的CheckpointStorage对象,重要的初始化如下
- HTTPServer - 1 
 2- final InetSocketAddress infoSocAddr = getHttpAddress(conf);//HTTP服务地址 
 infoBindAddress = infoSocAddr.getHostName();- 如上,HTTP地址新的使用配置项 - dfs.secondary.http.address,包含地址和端口,使用- :分隔。旧的使用- dfs.secondary.info.bindAddress配置地址,- dfs.secondary.info.port配置端口。
 后面初始化HTTP服务- 1 
 2
 3
 4
 5- if (SecurityUtil.useKsslAuth()) { 
 initializeKsslWebServer(infoSocAddr);
 } else {
 initializeHttpWebServer(infoSocAddr);
 }- 会初始化成员 - infoServer构造HTTPServer对象,名字为secondary。然后设置属性- secondary.name.node为当前SecondaryNameNode对象,- name.system.image为创建的CheckpointStorage对象,- current.conf为当前使用的配置文件对象。然后创建名为getimage的Servlet,路径为- /getimage,对应的类为- GetImageServlet,启动HTTPServer。GetImageServlet的分析见后面。
- NameNode代理 - 1 
 2- shouldRun = true; 
 nameNodeAddr = NameNode.getServiceAddress(conf, true);//NameNode代理地址- NameNode代理服务地址,优先查找 - dfs.namenode.servicerpc-address配置,如果没有查找- dfs.namenode.rpc-address,还是没有使用- fs.default.name和缺省端口8020作为地址。
- CheckpointStorage - 1 
 2
 3
 4- checkpointDirs = FSImage.getCheckpointDirs(conf, "/tmp/hadoop/dfs/namesecondary");//校验点命名空间镜像目录 
 checkpointEditsDirs = FSImage.getCheckpointEditsDirs(conf, "/tmp/hadoop/dfs/namesecondary");//校验点日志目录
 checkpointImage = new CheckpointStorage();//创建CheckpointStorage对象
 checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);//从本地文件中恢复- 如上,校验点命名空间镜像目录为配置项 - fs.checkpoint.dir,默认- /tmp/hadoop/dfs/namesecondary,校验点日志目录为配置项- fs.checkpoint.edits.dir,默认- /tmp/hadoop/dfs/namesecondary。
 CheckpointStorage继承自FSImage,构造函数中简单调用父类构造函数如下- 1 - CheckpointStorage() throws IOException { super(); } - recoverCreate中,设置CheckpointStorage的存储目录为checkpointDirs和checkpointEditsDirs,创建相应目录然后对每一个存储目录通过Storage的 - analyzeStorage方法分析存储目录当前状态,analyzeStorage见DataNode本地存储管理。如果目录不存在或不可访问抛出异常,如果处于中间状态通过- doRecover恢复到NORMAL状态,doRecover同样见DataNode本地存储管理。recoverCreate具体代码不再贴出来。
- 扫描线程配置 - 1 
 2- checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);//时间阈值 
 checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);//日志文件阈值- 如上,线程中默认每过5min检查一次日志文件,日志文件超过4MB做一次检查点。而如果日志文件在1h内都没有超过4MB,则也会做一次检查点,即最长1h一次检查点。 
如main函数中,如果不是带参数的启动,则会开启线程
3.2 SecondaryNameNode线程
线程中,通过doWork方法完成工作1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35public void doWork() {
    long period = 5 * 60;              //默认5m检查一次日志文件
    if (checkpointPeriod < period) {
      period = checkpointPeriod;
    }
    while (shouldRun) {
      try {
        Thread.sleep(1000 * period);
      } catch (InterruptedException ie) {
      }
      if (!shouldRun) {
        break;
      }
      try {
        // We may have lost our ticket since last checkpoint, log in again, just in case
        if(UserGroupInformation.isSecurityEnabled())
          UserGroupInformation.getCurrentUser().reloginFromKeytab();
        long now = System.currentTimeMillis();
        long size = namenode.getEditLogSize();
        if (size >= checkpointSize || now >= lastCheckpointTime + 1000 * checkpointPeriod) {
          doCheckpoint();//执行一次检查点
          lastCheckpointTime = now;//更新检查点时间
        }
      } catch (IOException e) {
        LOG.error("Exception in doCheckpoint: ");
        LOG.error(StringUtils.stringifyException(e));
        e.printStackTrace();
      } catch (Throwable e) {
        LOG.error("Throwable Exception in doCheckpoint: ");
        LOG.error(StringUtils.stringifyException(e));
        e.printStackTrace();
        Runtime.getRuntime().exit(-1);
      }
    }
}
如上,默认5min执行一次检查,如果日志文件大小超过阈值checkpointSize(4MB)则执行一次检查点过程,或者没有超过阈值但是从上一次执行检查点过程到现在已经过去了checkpointPeriod s(1h),也要强行执行一次检查点过程。
检查点过程通过doCheckpoint完成,如带参数的启动一样。  
3.3 doCheckpoint
| 1 | void doCheckpoint() throws IOException { | 
如上,分为几个步骤
3.3.1 startCheckpoint
startCheckpoint是检查点的初始化过程1
2
3
4
5
6
7
8private void startCheckpoint() throws IOException {
    checkpointImage.unlockAll();//释放所有存储目录锁
    checkpointImage.getEditLog().close();//关闭当前使用的编辑日志
    //分析所有的存储目录,如果不存在重新创建,如果处于中间状态恢复到NORMAL
    checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);
    //所有存储目录的current重命名为lastcheckpoint.tmp,然后创建新的current目录
    checkpointImage.startCheckpoint();
}
如上,初始化过程由成员CheckpointStorage完成,CheckpointStorage继承自FSImage,因此都是FSImage中的方法。
unlockAll为父类Storage中方法,释放锁对象,关闭in_use.lock文件。
getEditLog.close获取FSEditLog,关闭日志文件输出流,关闭底层缓冲和文件通道,即关闭当前日志文件相关资源。
recoverCreate会分析所有存储目录,如果不存在重新创建,对于处于中间状态的存储目录使用doRecover恢复到NORMAL,另见DataNode本地存储管理。
startCheckpoint对所有存储目录,将current目录重命名为lastcheckpoint.tmp,然后创建新的current目录。  
3.3.2 namenode.rollEditLog
通过NameNode代理调用NameNode的rollEditLog,如前面分析,NameNode关闭当前正在使用的日志文件,使用新的日志文件edits.new记录日志。旧的日志文件和命名空间镜像等待SecondaryNameNode读取。
3.3.3 downloadCheckpointFiles
通过获取到的校验点签名对象CheckpointSignature下载命名空间镜像和日志文件。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32private void downloadCheckpointFiles(final CheckpointSignature sig) throws IOException {
    try {
      UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws Exception {
          checkpointImage.cTime = sig.cTime;//更新CheckpointStorage的cTime
          checkpointImage.checkpointTime = sig.checkpointTime;//更新CheckpointStorage的checkpointTime
          String fileid = "getimage=1";
          File[] srcNames = checkpointImage.getImageFiles();
          assert srcNames.length > 0 : "No checkpoint targets.";
          //下载fsimage到存储目录
          TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
          LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
                   srcNames[0].length() + " bytes.");
          fileid = "getedit=1";
          srcNames = checkpointImage.getEditsFiles();
          assert srcNames.length > 0 : "No checkpoint targets.";
          //下载edits到存储目录
          TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
          LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
              srcNames[0].length() + " bytes.");
          //更新状态为UPLOAD_DONE
          checkpointImage.checkpointUploadDone();
          return null;
        }
      });
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
}
如上,命名空间镜像和编辑日志通过TransferFsImage的静态方法getFileClient获取。fsName为NameNode地址。获取fsimage时传入的fileid为”getimage=1”,获取edits时传入的fileid为”getedit=1”。srcNames为对应存储目录下的相应文件,fsimage为IMAGE存储目录下的fsimage文件,edits为EDITS存储目录下的edits文件,即为输出文件。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43static MD5Hash getFileClient(String fsName, String id, File[] localPath, boolean getChecksum) throws IOException {
    byte[] buf = new byte[BUFFER_SIZE];
    String str = NameNode.getHttpUriScheme() + "://" + fsName + "/getimage?" + id;
    LOG.info("Opening connection to " + str);
    URL url = new URL(str);
    //打开URL连接
    URLConnection connection = SecurityUtil.openSecureHttpConnection(url);
    InputStream stream = connection.getInputStream();//URL连接输入流
    MessageDigest digester = null;
    if (getChecksum) {//false,不获取校验信息
      digester = MD5Hash.getDigester();
      stream = new DigestInputStream(stream, digester);
    }
    FileOutputStream[] output = null;
    try {
      if (localPath != null) {//输出文件创建输出流
        output = new FileOutputStream[localPath.length];
        for (int i = 0; i < output.length; i++) {
          output[i] = new FileOutputStream(localPath[i]);
        }
      }
      int num = 1;
      while (num > 0) {
        num = stream.read(buf);//URL连接输入流中读取数据
        if (num > 0 && localPath != null) {//写到每一个输出流中
          for (int i = 0; i < output.length; i++) {
            output[i].write(buf, 0, num);
          }
        }
      }
    } finally {//关闭打开的流
      stream.close();
      if (output != null) {
        for (int i = 0; i < output.length; i++) {
          if (output[i] != null) {
            output[i].close();
          }
        }
      }
    }
    return digester == null ? null : new MD5Hash(digester.digest());
}
如上,通过创建对应URL的URLConnection来获取数据,根据传入参数,对于fsimage其URL为1
http://<名字节点地址>:<名字节点HTTP端口>/getimage?getimage=1
而edits的URL为1
http://<名字节点地址>:<名字节点HTTP端口>/getimage?getedit=1
打开对应URLConnection的输入流读取数据写到每一个输出文件中,而在NameNode端由HTTPServer提供服务,传输对应的数据,具体见后分析
3.3.4 doMerge
下载了fsimage和edits后,进行合并操作1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private void doMerge(CheckpointSignature sig) throws IOException {
    getEditLog().open();//重新打开最新下载的edits文件
    StorageDirectory sdName = null;
    StorageDirectory sdEdits = null;
    Iterator<StorageDirectory> it = null;
    it = dirIterator(NameNodeDirType.IMAGE);
    if (it.hasNext())
        sdName = it.next();//获取其中一个IMAGE存储目录
    it = dirIterator(NameNodeDirType.EDITS);
    if (it.hasNext())//获取其中一个EDITS存储目录
        sdEdits = it.next();
    if ((sdName == null) || (sdEdits == null))
        throw new IOException("Could not locate checkpoint directories");
    loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));//加载fsimage
    loadFSEdits(sdEdits, null);//加载edits和可能的edits.new,并更新整个目录树的配额
    sig.validateStorageInfo(this);//验证校验点签名是否和当前CheckpointStorage相关信息匹配
    //保存内存中元数据到存储目录
    saveNamespace(false);
}
如上,合并时从IMAGE存储目录中取出一个获得fsimage文件,从EDITS存储目录中取出一个获得edits文件,因为所有的存储目录都是一样的(从NameNode获取时写到所有存储目录中)。
然后通过FSImage的loadFSImage读取fsimage文件,这个在NameNode命名空间镜像和编辑日志已经分析过了。通过FSImage的loadFSEdits读取edits文件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15int loadFSEdits(StorageDirectory sd, MetaRecoveryContext recovery) throws IOException {
    int numEdits = 0;
    EditLogFileInputStream edits = new EditLogFileInputStream(getImageFile(sd, NameNodeFile.EDITS));
    numEdits = FSEditLog.loadFSEdits(edits, editsTolerationLength, recovery);//加载edits文件
    edits.close();
    File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
    if (editsNew.exists() && editsNew.length() > 0) {
      edits = new EditLogFileInputStream(editsNew);
      numEdits += FSEditLog.loadFSEdits(edits, editsTolerationLength, recovery);//如果存在edits.new,加载edits.new文件
      edits.close();
    }
    // update the counts.
    FSNamesystem.getFSNamesystem().dir.updateCountForINodeWithQuota();//更新整个目录树的配额
    return numEdits;
}
如上,在NameNode命名空间镜像和编辑日志中已经分析过部分FSEditLog的loadFSEdits方法读取编辑日志,更新内存元数据。在FSImage的loadFSEdits方法中,通过FSEditLog的loadFSEdits读取最新下载到的edits文件,然后如果还存在edits.new文件(SecondaryNameNode一般不会存在)继续加载edits.new中的日志,加载完后内存中元数据已经为最新的了,然后遍历整个目录树,更新整个目录树的目录配额。
然后回到doMerge方法,加载完fsimage和日志文件后,通过validateStorageInfo验证SecondaryNameNode和校验点签名对象CheckpointSignature的layoutVersion,namespaceID,cTime是否一致,即获取的是否为指定的校验点,不一致抛出异常。  
最终通过saveNamespace将内存元数据保存到存储目录中。saveNamespace会关闭当前日志文件,然后根据传入的参数更新checkpointTime(CheckpointStorage),这里传入的为false,不更新检查点时间。
然后将所有存储目录的current重命名为lastcheckpoint.tmp,即保存当前信息。创建新的current目录,通过saveFSImage将内存元数据保存到新的current目录中,即创建新的检查点,saveFSImage在NameNode命名空间镜像和编辑日志分析过,并创建新的edits文件(只有文件头版本信息)。最终将lastcheckpoint.tmp重命名为previous.checkpoint完成一次检查点的更新过程。
最终重新打开新的edits文件,更新CheckpointStates为UPLOAD_DONE。完成合并过程。  
3.3.5 putFSImage
合并后得通知NameNode获取最新的fsimage,通过putFSImage完成1
2
3
4
5
6
7
8private void putFSImage(CheckpointSignature sig) throws IOException {
    String fileid = "putimage=1&port=" + imagePort +
      "&machine=" + infoBindAddress +
      "&token=" + sig.toString() +
      "&newChecksum=" + getNewChecksum();
    LOG.info("Posted URL " + fsName + fileid);
    TransferFsImage.getFileClient(fsName, fileid, (File[])null, false);
}
如上,建立URLConnection,URL为1
http://<名字节点地址>:<名字节点HTTP端口>/getimage?putimage=1&port=<第二名字节点HTTP端口>&machine=<第二名字节点地址>&token=<CheckpointSignature信息>&newChecksum=<最新fsimage的MD5哈希值>
其实还是通过向NameNode的HTTPServer发送get请求,不过如上getFileClient第三个参数为null,即实际不读取数据到SecondaryNameNode本地目录中。起到一个通知的作用,NameNode在收到该get请求后会从SecondaryNameNode的HTTPServer中下载最新的fsimage文件。见后分析。
3.3.6 namenode.rollFsImage
通知NameNode下载最新fsimage后,通过NameNode代理通知开始NameNode使用新的fsimage代替原来的命名空间镜像,NameNode的rollFsImage方法实现见后分析。
3.3.7 endCheckpoint
最终的endCheckpoint做最后的清理工作1
2
3
4
5void endCheckpoint() throws IOException {
    for(StorageDirectory sd : storageDirs) {
        moveLastCheckpoint(sd);
    }
}
如上,所有的存储目录通过moveLastCheckpoint完成清理工作。存储目录中如果存在previous.checkpoint即上一个检查点目录则删除,然后将lastcheckpoint.tmp重命名为previous.checkpoint(其实在前面saveNamespace时已经重命名了),完成清理工作
4. NameNode的HTTP服务
如前面分析,SecondaryNameNode下载fsimage和edits文件时,通过建立URLConnection获取数据,对应为HTTP的get方法,在NameNode中对应由HTTPServer提供服务。
因为还没分析NameNode的启动过程,这里单独分析HTTPServer的启动。在NameNode的初始化过程中,由startHttpServer启动HTTP服务器。
与SecondaryNameNode的HTTP服务类似,这里主要有以下几点
- 地址 - 1 - final String infoAddr = NetUtils.getServerAddress(conf, "dfs.info.bindAddress", "dfs.info.port", "dfs.http.address"); - 如上,旧的HTTP地址使用 - dfs.info.bindAddress作为地址,- dfs.info.port作为端口。新的HTTP地址由- dfs.http.address配置,包括端口。创建的HTTPServer对象为NameNode的成员- httpServer
- 属性
| 属性名称 | 对应值 | 
|---|---|
| name.node | 对应的NameNode对象 | 
| name.node.address | NameNode地址为InetSocketAddress | 
| name.system.image | NameNode中的FSImage对象 | 
| current.conf | NameNode和HTTPServer的配置 | 
- Servlet
| 名字 | 路径 | 对应类 | 是否要鉴权 | 
|---|---|---|---|
| getDelegationToken | /getDelegationToken | GetDelegationTokenServlet | true | 
| renewDelegationToken | /renewDelegationToken | RenewDelegationTokenServlet | true | 
| cancelDelegationToken | /cancelDelegationToken | CancelDelegationTokenServlet | true | 
| fsck | /fsck | FsckServlet | true | 
| getimage | /getimage | GetImageServlet | true | 
| listPaths | /listPaths/* | ListPathsServlet | false | 
| data | /data/* | FileDataServlet | false | 
| checksum | /fileChecksum/* | FileChecksumServlets.RedirectServlet | false | 
| contentSummary | /contentSummary/* | ContentSummaryServlet | false | 
如上,SecondaryNameNode通过getimage获取fsimage和edits,对应的类GetImageServlet,其doGet方法如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44public void doGet(final HttpServletRequest request, final HttpServletResponse response
                    ) throws ServletException, IOException {
    Map<String,String[]> pmap = request.getParameterMap();//获取get请求的参数
    try {
      ServletContext context = getServletContext();//ServletContext
      final FSImage nnImage = (FSImage)context.getAttribute("name.system.image");//属性,NameNode中的FSImage
      //创建TransferFsImage对象,进行相应的数据读取或发送操作
      final TransferFsImage ff = new TransferFsImage(pmap, request, response);
      final Configuration conf = (Configuration)getServletContext().getAttribute(JspHelper.CURRENT_CONF);//读取属性配置
      ...//鉴权
            
      UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws Exception {
          if (ff.getImage()) {//发送fsimage
            TransferFsImage.getFileServer(response.getOutputStream(), nnImage.getFsImageName(), getThrottler(conf)); 
          } else if (ff.getEdit()) {//发送edits
            TransferFsImage.getFileServer(response.getOutputStream(), nnImage.getFsEditName(), getThrottler(conf));
          } else if (ff.putImage()) {//读取新的fsimage
            synchronized (fsImageTransferLock) {
              final MD5Hash expectedChecksum = ff.getNewChecksum();
              nnImage.validateCheckpointUpload(ff.getToken());
              reloginIfNecessary().doAs(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                  MD5Hash actualChecksum = TransferFsImage.getFileClient(ff.getInfoServer(),
                      "getimage=1", nnImage.getFsImageNameCheckpoint(), true);
                  LOG.info("Downloaded new fsimage with checksum: " + actualChecksum);
                  if (!actualChecksum.equals(expectedChecksum)) {
                    throw new IOException("Actual checksum of transferred fsimage: "
                        + actualChecksum + " does not match expected checksum: "
                        + expectedChecksum);
                  }
                  return null;
                }
              });
              nnImage.checkpointUploadDone();
            }
          }
          return null;
        }
        ...
}
如上,通过请求的参数创建TransferFsImage对象,由参数判断执行的操作1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public TransferFsImage(Map<String,String[]> pmap, HttpServletRequest request, HttpServletResponse response
                         ) throws IOException {
    isGetImage = isGetEdit = isPutImage = false;
    remoteport = 0;
    machineName = null;
    token = null;
    newChecksum = null;
    for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
      String key = it.next();
      if (key.equals("getimage")) { //请求读取fsimage
        isGetImage = true;
      } else if (key.equals("getedit")) { //请求读取edits
        isGetEdit = true;
      } else if (key.equals("putimage")) { //表示fsimage合并完,通知获取新的fsimage
        isPutImage = true;
      } else if (key.equals("port")) { //SecondaryNameNode端口
        remoteport = new Integer(pmap.get("port")[0]).intValue();
      } else if (key.equals("machine")) { //SecondaryNameNode地址
        machineName = pmap.get("machine")[0];
      } else if (key.equals("token")) { //访问SecondaryNameNode口令
        token = new CheckpointSignature(pmap.get("token")[0]);
      } else if (key.equals("newChecksum")) { //新fsimage的MD5哈希值
        newChecksum = new MD5Hash(pmap.get("newChecksum")[0]);
      }
    }
    int numGets = (isGetImage?1:0) + (isGetEdit?1:0);
    if ((numGets > 1) || (numGets == 0) && !isPutImage) {
      throw new IOException("Illegal parameters to TransferFsImage");
    }
}
如上,解析SecondaryNameNode的请求,由前面分析,SecondaryNameNode可能有获取fsimage,获取edits请求,也可能在fsimage合并完后发送putimage请求来表示合并完成,通知NameNode下载新的fsimage,因此分别进行标识。对于putimage还要读取SecondaryNameNode的相应信息。
然后回到GetImageServlet的doGet方法,对不同的请求进行不同操作
4.1 SecondaryNameNode获取fsimage和edits
在doGet中,如果构造的TransferFsImage解析到请求为getimage或者getedit,则通过TransferFsImage的静态方法getFileServer发送请求,对getimage发送fsimage文件,getedit发送edits文件。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24static void getFileServer(OutputStream outstream, File localfile, DataTransferThrottler throttler) 
    throws IOException {
    byte buf[] = new byte[BUFFER_SIZE];
    FileInputStream infile = null;
    try {
      infile = new FileInputStream(localfile);
      ...
      int num = 1;
      while (num > 0) {//读取文件,循环发送到输出流
        num = infile.read(buf);
        if (num <= 0) {
          break;
        }
        outstream.write(buf, 0, num);
        if (throttler != null) {
          throttler.throttle(num);
        }
      }
    } finally {
      if (infile != null) {
        infile.close();
      }
    }
}
如上,循环读取相应的文件发送到response的输出流中。
4.2 SecondaryNameNode通知获取新的fsimage
如doGet方法中,如果是putimage请求,则应该从SecondaryNameNode中获取新的fsimage,SecondaryNameNode相关信息从请求的参数中获取,通过TransferFsImage的静态方法getFileClient向SecondaryNameNode发送get请求1
MD5Hash actualChecksum = TransferFsImage.getFileClient(ff.getInfoServer(), "getimage=1", nnImage.getFsImageNameCheckpoint(), true);
getFileClient前面分析过,URL为1
http://<第二名字节点地址>:<第二名字节点HTTP端口>/getimage?getimage=1
由前面SecondaryNameNode的HTTPServer可知,/getimage对应的Servlet也是GetImageServlet类,当NameNode发送getimage=1的请求时,如上分析SecondaryNameNode会发送fsimage文件。
NameNode获取新的fsimage文件写到fsimage.ckpt中(getFsImageNameCheckpoint),最终通过checkpointUploadDone将CheckpointStates更新为UPLOAD_DONE。  
之后接收到SecondaryNameNode的rollFsImage调用,实现如下
5. rollFsImage
| 1 | public void rollFsImage() throws IOException { | 
如上,由FSNamesystem提供服务,最终由FSNamesystem的FSImage的rollFSImage方法完成1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60void rollFSImage() throws IOException {
    if (ckptState != CheckpointStates.UPLOAD_DONE) {
      throw new IOException("Cannot roll fsImage before rolling edits log.");
    }
    //首先检查在所有存储目录下有edits.new和fsimage.ckpt
    if (!editLog.existsNew()) {//edits.new不存在
      throw new IOException("New Edits file does not exist");
    }
    Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.IMAGE);
    while (it.hasNext()) {//保证所有存储目录下存在fsimage.ckpt
      StorageDirectory sd = it.next();
      File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
      if (!ckpt.exists()) {
        throw new IOException("Checkpoint file " + ckpt + " does not exist");
      }
    }
    editLog.purgeEditLog(); //重命名edits.new为edits
    //重命名fsimage.ckpt为fsimage
    it = dirIterator(NameNodeDirType.IMAGE);
    while (it.hasNext()) {
      StorageDirectory sd = it.next();
      File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
      File curFile = getImageFile(sd, NameNodeFile.IMAGE);
      if (!ckpt.renameTo(curFile)) {//fsimage存在可能在Windows下重命名失败
        curFile.delete();//先删除fsimage
        if (!ckpt.renameTo(curFile)) {//重命名
          editLog.removeEditsForStorageDir(sd);
          updateRemovedDirs(sd);
          it.remove();
        }
      }
    }
    editLog.exitIfNoStreams();
    //在所有存储目录下更新fstime文件,写版本文件
    this.layoutVersion = FSConstants.LAYOUT_VERSION;
    this.checkpointTime = FSNamesystem.now();//更新检查点时间
    it = dirIterator();
    while (it.hasNext()) {
      StorageDirectory sd = it.next();
      //如果存储目录只有镜像文件(不存储日志文件,类型为IMAGE),删除旧的日志文件
      if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
        File editsFile = getImageFile(sd, NameNodeFile.EDITS);
        editsFile.delete();
      }
      //如果存储目录只存储日志文件(不存储镜像文件,类型为EDITS),删除旧的镜像文件
      if (!sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
        File imageFile = getImageFile(sd, NameNodeFile.IMAGE);
        imageFile.delete();
      }
      try {
        sd.write();//写新的版本文件
      } catch (IOException ioe) {
        editLog.removeEditsForStorageDir(sd);
        updateRemovedDirs(sd, ioe);
        it.remove();
      }
    }
    ckptState = FSImage.CheckpointStates.START;//检查点完成,重置CheckpointStates状态为初始状态STRAT
}
如上,首先得确保已经下载了最新的fsimage文件,即处于UPLOAD_DONE状态。且保证所有存储目录存在edits.new和fsimage.ckpt。
然后将edits.new重命名为edits,fsimage.ckpt重命名为fsimage,更新命名空间镜像和日志文件。
之后遍历存储目录,对于EDITS类型的存储目录,删除不应该存在的旧的fsimage文件,对于IMAGE类型的存储目录,删除不应该存在的edits文件,而IMAGE_AND_EDITS类型的存储目录可以同时存在fsimage和edits,并对每一个存储目录根据当前内存中版本信息重新写版本文件。
最终将FSImage的CheckpointStates成员ckptState重置为初始状态START。
到这里完成了一个检查点过程,fsimage为合并过后的最新的命名空间镜像,edits也从原来的edits.new中恢复回来了,继续记录日志。