NameNode实现源码分析---和SecondaryNameNode的交互

Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》


HDFS节点的VersionedProtocol实现中对NamenodeProtocol的分析,SecondaryNameNode周期性通过getEditLogSize获取NameNode编辑日志大小,如果超出某个值,则通过rollEditLog通知NameNode开始一次检查点过程,NameNode关闭当前编辑日志,使用新的编辑日志edits.new继续记录日志。然后SecondaryNameNode通过HTTP服务获取NameNode上的命名空间镜像和编辑日志,进行合并操作。合并后通过HTTP服务通知NameNode下载新的命名空间镜像,然后通过rollFsImage通知NameNode使用新的检查点。

本文将按照以上流程对NameNode和SecondaryNameNode之间的交互进行分析。


1. getEditLogSize

NameNode的getEditLogSize实现如下

1
2
3
4
5
6
7
public long getEditLogSize() throws IOException {
return namesystem.getEditLogSize();
}

long getEditLogSize() throws IOException {
return getEditLog().getEditLogSize();
}

如上,还是由FSNamesystem提供服务,通过FSNamesystem获得FSDirectory对象,由FSDirectory获得FSImage对象,FSImage对象最终获得FSEditLog对象,由FSEditLog对象的getEditLogSize返回当前编辑日志的大小

1
2
3
4
5
6
7
8
9
10
synchronized long getEditLogSize() throws IOException {
assert(getNumStorageDirs() == editStreams.size());
long size = 0;
for (int idx = 0; idx < editStreams.size(); idx++) {
long curSize = editStreams.get(idx).length();
assert (size == 0 || size == curSize) : "All streams must be the same";
size = curSize;
}
return size;
}

如上,返回对应的编辑日志文件大小。所有的日志输出流EditLogFileOutputStream对应文件长度应该是一样的,因为每次都是往所有的输出流写数据(日志记录见NameNode命名空间镜像和编辑日志)。


2. rollEditLog

SecondaryNameNode获取NameNode编辑日志大小后,判断是否大于某一值,如果大于再通过rollEditLog开始一个新的检查点。SecondaryNameNode的中间过程后面分析,这里假设编辑日志超过了阈值,通过NamenodeProtocol代理调用rollEditLog,NameNode上rollEditLog实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public CheckpointSignature rollEditLog() throws IOException {
return namesystem.rollEditLog();
}

synchronized CheckpointSignature rollEditLog() throws IOException {
checkSuperuserPrivilege();//超级用户权限
synchronized (this) {
if (isInSafeMode()) {
throw new SafeModeException("Log not rolled", safeMode);
}
LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
return getFSImage().rollEditLog();
}
}

如上,还是通过FSNamesystem提供服务,rollEditLog要有超级用户权限,否则会抛出异常。在安全模式下时也不能开始检查点过程。
由FSNamesystem获取到FSImage对象后,FSImage对象调用rollEditLog完成

1
2
3
4
5
CheckpointSignature rollEditLog() throws IOException {
getEditLog().rollEditLog();
ckptState = CheckpointStates.ROLLED_EDITS;
return new CheckpointSignature(this);
}

最终通过FSImage管理的FSEditLog对象的rollEditLog方法完成相应的操作,FSImage对象的成员ckptState状态改为ROLLED_EDITS,ckptState为CheckpointStates枚举

1
enum CheckpointStates{START, ROLLED_EDITS, UPLOAD_START, UPLOAD_DONE; }

最开始为START。
更改状态后创建CheckpointSignature对象返回给SecondaryNameNode,作为一次检查点的标识

1
2
3
4
5
CheckpointSignature(FSImage fsImage) {
super(fsImage);
editsTime = fsImage.getEditLog().getFsEditTime();
checkpointTime = fsImage.checkpointTime;
}

CheckpointSignature继承自StorageInfo,父类构造保存当前FSImage的存储信息(layoutVersion,namespaceID,cTime),成员editsTime为当前编辑日志的修改时间,checkpointTime为FSImage的检查点时间。这两个值标识一个检查点。

实际的操作由FSEditLog的rollEditLog完成,经前面分析,应该是关闭当前编辑日志,然后使用新的编辑日志文件(edits.new)记录。FSEditLog相关见NameNode命名空间镜像和编辑日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
synchronized void rollEditLog() throws IOException {
//如果edits.new已经在某些存储目录中存在了,看是否在所有的存储目录中都存在
if (existsNew()) {
Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);//遍历EDITS类型存储目录
StringBuilder b = new StringBuilder();
while (it.hasNext()) {
File editsNew = getEditNewFile(it.next());
b.append("\n ").append(editsNew);
if (!editsNew.exists()) {//并不是所有的EDITS存储目录都存在edits.new文件,异常
throw new IOException(
"Inconsistent existence of edits.new " + editsNew);
}
}
LOG.warn("Cannot roll edit log," + " edits.new files already exists in all healthy directories:" + b);
return;
}
close(); //关闭当前日志文件

// After edit streams are closed, healthy edits files should be identical,
// and same to fsimage files
fsimage.restoreStorageDirs();


Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);//所有的EDITS存储目录
LinkedList<StorageDirectory> toRemove = new LinkedList<StorageDirectory>();
while (it.hasNext()) {
StorageDirectory sd = it.next();
try {
//每个EDITS存储目录下创建新的日志文件edits.new,写文件头版本号
EditLogFileOutputStream eStream = new EditLogFileOutputStream(getEditNewFile(sd));
eStream.create();
editStreams.add(eStream);//添加到editStreams中
} catch (IOException ioe) {
LOG.error("error retrying to reopen storage directory '" +
sd.getRoot().getAbsolutePath() + "'", ioe);
toRemove.add(sd);
it.remove();
}
}

for (StorageDirectory sd : toRemove) {//移除错误的EDITS存储目录
removeEditsForStorageDir(sd);
fsimage.updateRemovedDirs(sd);
}
exitIfNoStreams();
}

如上,如果在EDITS存储目录中已经有了edits.new文件,检查是否所有的EDITS存储目录存在edits.new文件,如果都存在说明新的日至文件已经创建在使用了,返回。否则只有部分存在,存储目录不一致抛出异常。
edits.new文件需要创建时,首先通过close关闭当前使用的日至文件edits,close不再贴代码分析。
close方法中如果当前正在进行同步,等待同步完成。然后对管理的所有日志文件输出流EditLogFileOutputStream对象editStreams逐一刷出到底层文件中,然后关闭输出流的两个缓冲区,关闭底层文件通道。最终将editStreams中管理的输出流清空。

关闭原来的日志文件后,在每个EDITS存储目录下创建新的日志文件,如NameNode命名空间镜像和编辑日志中对日志文件输出流的分析,创建相应日志文件的输出流后,调用create方法往底层文件中写文件头(版本号),完成日志文件的初始化过程,最终将所有的日志文件输出流添加到editStreams中进行管理。

至此,旧的日志文件已经关闭,新的日志文件已经打开,等待SecondaryNameNode下载命名空间镜像和旧的日志文件。


3. SecondaryNameNode

SecondaryNameNode由main方法启动

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] argv) throws Exception {
StringUtils.startupShutdownMessage(SecondaryNameNode.class, argv, LOG);
Configuration tconf = new Configuration();
if (argv.length >= 1) {
SecondaryNameNode secondary = new SecondaryNameNode(tconf);
int ret = secondary.processArgs(argv);
System.exit(ret);
}

Daemon checkpointThread = new Daemon(new SecondaryNameNode(tconf));
checkpointThread.start();
}

如上,可以带参数启动,如-checkpoint-geteditsize,执行相应功能后,SecondaryNameNode就会退出。
checkpoint会尝试进行一次检查点过程,即先通过getEditLogSize获取日志大小,如果超过阈值或者没有超过阈值但同时指定了force(-checkpoint force),则通过doCheckpoint执行一次检查点。而如果没有超过阈值且没有指定force,则记录日志。
geteditsize简单的在控制台打印当前NameNode日志大小。

如果不带参数启动则会开启一个线程,定期检查是否进行检查点。

3.1 构造

SecondaryNameNode的构造函数中,通过initialize(conf)初始化,读取配置对基本成员进行设置,开启HTTP服务,创建NamenodeProtocol代理,创建检查点工作需要的CheckpointStorage对象,重要的初始化如下

  • HTTPServer

    1
    2
    final InetSocketAddress infoSocAddr = getHttpAddress(conf);//HTTP服务地址
    infoBindAddress = infoSocAddr.getHostName();

    如上,HTTP地址新的使用配置项dfs.secondary.http.address,包含地址和端口,使用:分隔。旧的使用dfs.secondary.info.bindAddress配置地址,dfs.secondary.info.port配置端口。
    后面初始化HTTP服务

    1
    2
    3
    4
    5
    if (SecurityUtil.useKsslAuth()) {
    initializeKsslWebServer(infoSocAddr);
    } else {
    initializeHttpWebServer(infoSocAddr);
    }

    会初始化成员infoServer构造HTTPServer对象,名字为secondary。然后设置属性secondary.name.node为当前SecondaryNameNode对象,name.system.image为创建的CheckpointStorage对象,current.conf为当前使用的配置文件对象。然后创建名为getimage的Servlet,路径为/getimage,对应的类为GetImageServlet,启动HTTPServer。GetImageServlet的分析见后面。

  • NameNode代理

    1
    2
    shouldRun = true;
    nameNodeAddr = NameNode.getServiceAddress(conf, true);//NameNode代理地址

    NameNode代理服务地址,优先查找dfs.namenode.servicerpc-address配置,如果没有查找dfs.namenode.rpc-address,还是没有使用fs.default.name和缺省端口8020作为地址。

  • CheckpointStorage

    1
    2
    3
    4
    checkpointDirs = FSImage.getCheckpointDirs(conf, "/tmp/hadoop/dfs/namesecondary");//校验点命名空间镜像目录
    checkpointEditsDirs = FSImage.getCheckpointEditsDirs(conf, "/tmp/hadoop/dfs/namesecondary");//校验点日志目录
    checkpointImage = new CheckpointStorage();//创建CheckpointStorage对象
    checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);//从本地文件中恢复

    如上,校验点命名空间镜像目录为配置项fs.checkpoint.dir,默认/tmp/hadoop/dfs/namesecondary,校验点日志目录为配置项fs.checkpoint.edits.dir,默认/tmp/hadoop/dfs/namesecondary
    CheckpointStorage继承自FSImage,构造函数中简单调用父类构造函数如下

    1
    CheckpointStorage() throws IOException { super(); }

    recoverCreate中,设置CheckpointStorage的存储目录为checkpointDirs和checkpointEditsDirs,创建相应目录然后对每一个存储目录通过Storage的analyzeStorage方法分析存储目录当前状态,analyzeStorage见DataNode本地存储管理。如果目录不存在或不可访问抛出异常,如果处于中间状态通过doRecover恢复到NORMAL状态,doRecover同样见DataNode本地存储管理。recoverCreate具体代码不再贴出来。

  • 扫描线程配置

    1
    2
    checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);//时间阈值
    checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);//日志文件阈值

    如上,线程中默认每过5min检查一次日志文件,日志文件超过4MB做一次检查点。而如果日志文件在1h内都没有超过4MB,则也会做一次检查点,即最长1h一次检查点。

如main函数中,如果不是带参数的启动,则会开启线程

3.2 SecondaryNameNode线程

线程中,通过doWork方法完成工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void doWork() {
long period = 5 * 60; //默认5m检查一次日志文件
if (checkpointPeriod < period) {
period = checkpointPeriod;
}
while (shouldRun) {
try {
Thread.sleep(1000 * period);
} catch (InterruptedException ie) {
}
if (!shouldRun) {
break;
}
try {
// We may have lost our ticket since last checkpoint, log in again, just in case
if(UserGroupInformation.isSecurityEnabled())
UserGroupInformation.getCurrentUser().reloginFromKeytab();
long now = System.currentTimeMillis();
long size = namenode.getEditLogSize();
if (size >= checkpointSize || now >= lastCheckpointTime + 1000 * checkpointPeriod) {
doCheckpoint();//执行一次检查点
lastCheckpointTime = now;//更新检查点时间
}
} catch (IOException e) {
LOG.error("Exception in doCheckpoint: ");
LOG.error(StringUtils.stringifyException(e));
e.printStackTrace();
} catch (Throwable e) {
LOG.error("Throwable Exception in doCheckpoint: ");
LOG.error(StringUtils.stringifyException(e));
e.printStackTrace();
Runtime.getRuntime().exit(-1);
}
}
}

如上,默认5min执行一次检查,如果日志文件大小超过阈值checkpointSize(4MB)则执行一次检查点过程,或者没有超过阈值但是从上一次执行检查点过程到现在已经过去了checkpointPeriod s(1h),也要强行执行一次检查点过程。
检查点过程通过doCheckpoint完成,如带参数的启动一样。

3.3 doCheckpoint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void doCheckpoint() throws IOException {

//初始化
startCheckpoint();

//通知NameNode开始校验点过程,关闭当前日志文件,使用新的日志文件
//返回的校验点签名能够验证下载的命名空间镜像文件
CheckpointSignature sig = (CheckpointSignature)namenode.rollEditLog();

if (ErrorSimulator.getErrorSimulation(0)) {//单元测试
throw new IOException("Simulating error0 " + "after creating edits.new");
}

downloadCheckpointFiles(sig); //下载命名空间镜像和日志文件
doMerge(sig); //合并

//通知NameNode下载新的命名空间镜像
putFSImage(sig);

if (ErrorSimulator.getErrorSimulation(1)) {
throw new IOException("Simulating error1 " +
"after uploading new image to NameNode");
}

//通知NameNode使用新的命名空间镜像代替原来的
namenode.rollFsImage();
checkpointImage.endCheckpoint();//结束工作

LOG.info("Checkpoint done. New Image Size: " + checkpointImage.getFsImageName().length());
}

如上,分为几个步骤

3.3.1 startCheckpoint

startCheckpoint是检查点的初始化过程

1
2
3
4
5
6
7
8
private void startCheckpoint() throws IOException {
checkpointImage.unlockAll();//释放所有存储目录锁
checkpointImage.getEditLog().close();//关闭当前使用的编辑日志
//分析所有的存储目录,如果不存在重新创建,如果处于中间状态恢复到NORMAL
checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);
//所有存储目录的current重命名为lastcheckpoint.tmp,然后创建新的current目录
checkpointImage.startCheckpoint();
}

如上,初始化过程由成员CheckpointStorage完成,CheckpointStorage继承自FSImage,因此都是FSImage中的方法。
unlockAll为父类Storage中方法,释放锁对象,关闭in_use.lock文件。
getEditLog.close获取FSEditLog,关闭日志文件输出流,关闭底层缓冲和文件通道,即关闭当前日志文件相关资源。
recoverCreate会分析所有存储目录,如果不存在重新创建,对于处于中间状态的存储目录使用doRecover恢复到NORMAL,另见DataNode本地存储管理
startCheckpoint对所有存储目录,将current目录重命名为lastcheckpoint.tmp,然后创建新的current目录。

3.3.2 namenode.rollEditLog

通过NameNode代理调用NameNode的rollEditLog,如前面分析,NameNode关闭当前正在使用的日志文件,使用新的日志文件edits.new记录日志。旧的日志文件和命名空间镜像等待SecondaryNameNode读取。

3.3.3 downloadCheckpointFiles

通过获取到的校验点签名对象CheckpointSignature下载命名空间镜像和日志文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void downloadCheckpointFiles(final CheckpointSignature sig) throws IOException {
try {
UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
checkpointImage.cTime = sig.cTime;//更新CheckpointStorage的cTime
checkpointImage.checkpointTime = sig.checkpointTime;//更新CheckpointStorage的checkpointTime

String fileid = "getimage=1";
File[] srcNames = checkpointImage.getImageFiles();
assert srcNames.length > 0 : "No checkpoint targets.";
//下载fsimage到存储目录
TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
srcNames[0].length() + " bytes.");

fileid = "getedit=1";
srcNames = checkpointImage.getEditsFiles();
assert srcNames.length > 0 : "No checkpoint targets.";
//下载edits到存储目录
TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
srcNames[0].length() + " bytes.");
//更新状态为UPLOAD_DONE
checkpointImage.checkpointUploadDone();
return null;
}
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

如上,命名空间镜像和编辑日志通过TransferFsImage的静态方法getFileClient获取。fsName为NameNode地址。获取fsimage时传入的fileid为”getimage=1”,获取edits时传入的fileid为”getedit=1”。srcNames为对应存储目录下的相应文件,fsimage为IMAGE存储目录下的fsimage文件,edits为EDITS存储目录下的edits文件,即为输出文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static MD5Hash getFileClient(String fsName, String id, File[] localPath, boolean getChecksum) throws IOException {
byte[] buf = new byte[BUFFER_SIZE];
String str = NameNode.getHttpUriScheme() + "://" + fsName + "/getimage?" + id;
LOG.info("Opening connection to " + str);
URL url = new URL(str);
//打开URL连接
URLConnection connection = SecurityUtil.openSecureHttpConnection(url);
InputStream stream = connection.getInputStream();//URL连接输入流
MessageDigest digester = null;
if (getChecksum) {//false,不获取校验信息
digester = MD5Hash.getDigester();
stream = new DigestInputStream(stream, digester);
}
FileOutputStream[] output = null;

try {
if (localPath != null) {//输出文件创建输出流
output = new FileOutputStream[localPath.length];
for (int i = 0; i < output.length; i++) {
output[i] = new FileOutputStream(localPath[i]);
}
}
int num = 1;
while (num > 0) {
num = stream.read(buf);//URL连接输入流中读取数据
if (num > 0 && localPath != null) {//写到每一个输出流中
for (int i = 0; i < output.length; i++) {
output[i].write(buf, 0, num);
}
}
}
} finally {//关闭打开的流
stream.close();
if (output != null) {
for (int i = 0; i < output.length; i++) {
if (output[i] != null) {
output[i].close();
}
}
}
}
return digester == null ? null : new MD5Hash(digester.digest());
}

如上,通过创建对应URL的URLConnection来获取数据,根据传入参数,对于fsimage其URL为

1
http://<名字节点地址>:<名字节点HTTP端口>/getimage?getimage=1

而edits的URL为

1
http://<名字节点地址>:<名字节点HTTP端口>/getimage?getedit=1

打开对应URLConnection的输入流读取数据写到每一个输出文件中,而在NameNode端由HTTPServer提供服务,传输对应的数据,具体见后分析

3.3.4 doMerge

下载了fsimage和edits后,进行合并操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void doMerge(CheckpointSignature sig) throws IOException {
getEditLog().open();//重新打开最新下载的edits文件
StorageDirectory sdName = null;
StorageDirectory sdEdits = null;
Iterator<StorageDirectory> it = null;
it = dirIterator(NameNodeDirType.IMAGE);
if (it.hasNext())
sdName = it.next();//获取其中一个IMAGE存储目录
it = dirIterator(NameNodeDirType.EDITS);
if (it.hasNext())//获取其中一个EDITS存储目录
sdEdits = it.next();
if ((sdName == null) || (sdEdits == null))
throw new IOException("Could not locate checkpoint directories");
loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));//加载fsimage
loadFSEdits(sdEdits, null);//加载edits和可能的edits.new,并更新整个目录树的配额
sig.validateStorageInfo(this);//验证校验点签名是否和当前CheckpointStorage相关信息匹配
//保存内存中元数据到存储目录
saveNamespace(false);
}

如上,合并时从IMAGE存储目录中取出一个获得fsimage文件,从EDITS存储目录中取出一个获得edits文件,因为所有的存储目录都是一样的(从NameNode获取时写到所有存储目录中)。
然后通过FSImage的loadFSImage读取fsimage文件,这个在NameNode命名空间镜像和编辑日志已经分析过了。通过FSImage的loadFSEdits读取edits文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int loadFSEdits(StorageDirectory sd, MetaRecoveryContext recovery) throws IOException {
int numEdits = 0;
EditLogFileInputStream edits = new EditLogFileInputStream(getImageFile(sd, NameNodeFile.EDITS));
numEdits = FSEditLog.loadFSEdits(edits, editsTolerationLength, recovery);//加载edits文件
edits.close();
File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
if (editsNew.exists() && editsNew.length() > 0) {
edits = new EditLogFileInputStream(editsNew);
numEdits += FSEditLog.loadFSEdits(edits, editsTolerationLength, recovery);//如果存在edits.new,加载edits.new文件
edits.close();
}
// update the counts.
FSNamesystem.getFSNamesystem().dir.updateCountForINodeWithQuota();//更新整个目录树的配额
return numEdits;
}

如上,在NameNode命名空间镜像和编辑日志中已经分析过部分FSEditLog的loadFSEdits方法读取编辑日志,更新内存元数据。在FSImage的loadFSEdits方法中,通过FSEditLog的loadFSEdits读取最新下载到的edits文件,然后如果还存在edits.new文件(SecondaryNameNode一般不会存在)继续加载edits.new中的日志,加载完后内存中元数据已经为最新的了,然后遍历整个目录树,更新整个目录树的目录配额。

然后回到doMerge方法,加载完fsimage和日志文件后,通过validateStorageInfo验证SecondaryNameNode和校验点签名对象CheckpointSignature的layoutVersionnamespaceIDcTime是否一致,即获取的是否为指定的校验点,不一致抛出异常。

最终通过saveNamespace将内存元数据保存到存储目录中。saveNamespace会关闭当前日志文件,然后根据传入的参数更新checkpointTime(CheckpointStorage),这里传入的为false,不更新检查点时间。
然后将所有存储目录的current重命名为lastcheckpoint.tmp,即保存当前信息。创建新的current目录,通过saveFSImage将内存元数据保存到新的current目录中,即创建新的检查点,saveFSImage在NameNode命名空间镜像和编辑日志分析过,并创建新的edits文件(只有文件头版本信息)。最终将lastcheckpoint.tmp重命名为previous.checkpoint完成一次检查点的更新过程。
最终重新打开新的edits文件,更新CheckpointStates为UPLOAD_DONE。完成合并过程。

3.3.5 putFSImage

合并后得通知NameNode获取最新的fsimage,通过putFSImage完成

1
2
3
4
5
6
7
8
private void putFSImage(CheckpointSignature sig) throws IOException {
String fileid = "putimage=1&port=" + imagePort +
"&machine=" + infoBindAddress +
"&token=" + sig.toString() +
"&newChecksum=" + getNewChecksum();
LOG.info("Posted URL " + fsName + fileid);
TransferFsImage.getFileClient(fsName, fileid, (File[])null, false);
}

如上,建立URLConnection,URL为

1
http://<名字节点地址>:<名字节点HTTP端口>/getimage?putimage=1&port=<第二名字节点HTTP端口>&machine=<第二名字节点地址>&token=<CheckpointSignature信息>&newChecksum=<最新fsimage的MD5哈希值>

其实还是通过向NameNode的HTTPServer发送get请求,不过如上getFileClient第三个参数为null,即实际不读取数据到SecondaryNameNode本地目录中。起到一个通知的作用,NameNode在收到该get请求后会从SecondaryNameNode的HTTPServer中下载最新的fsimage文件。见后分析。

3.3.6 namenode.rollFsImage

通知NameNode下载最新fsimage后,通过NameNode代理通知开始NameNode使用新的fsimage代替原来的命名空间镜像,NameNode的rollFsImage方法实现见后分析。

3.3.7 endCheckpoint

最终的endCheckpoint做最后的清理工作

1
2
3
4
5
void endCheckpoint() throws IOException {
for(StorageDirectory sd : storageDirs) {
moveLastCheckpoint(sd);
}
}

如上,所有的存储目录通过moveLastCheckpoint完成清理工作。存储目录中如果存在previous.checkpoint即上一个检查点目录则删除,然后将lastcheckpoint.tmp重命名为previous.checkpoint(其实在前面saveNamespace时已经重命名了),完成清理工作


4. NameNode的HTTP服务

如前面分析,SecondaryNameNode下载fsimage和edits文件时,通过建立URLConnection获取数据,对应为HTTP的get方法,在NameNode中对应由HTTPServer提供服务。
因为还没分析NameNode的启动过程,这里单独分析HTTPServer的启动。在NameNode的初始化过程中,由startHttpServer启动HTTP服务器。
与SecondaryNameNode的HTTP服务类似,这里主要有以下几点

  • 地址

    1
    final String infoAddr = NetUtils.getServerAddress(conf, "dfs.info.bindAddress", "dfs.info.port", "dfs.http.address");

    如上,旧的HTTP地址使用dfs.info.bindAddress作为地址,dfs.info.port作为端口。新的HTTP地址由dfs.http.address配置,包括端口。创建的HTTPServer对象为NameNode的成员httpServer

  • 属性
属性名称 对应值
name.node 对应的NameNode对象
name.node.address NameNode地址为InetSocketAddress
name.system.image NameNode中的FSImage对象
current.conf NameNode和HTTPServer的配置
  • Servlet
名字 路径 对应类 是否要鉴权
getDelegationToken /getDelegationToken GetDelegationTokenServlet true
renewDelegationToken /renewDelegationToken RenewDelegationTokenServlet true
cancelDelegationToken /cancelDelegationToken CancelDelegationTokenServlet true
fsck /fsck FsckServlet true
getimage /getimage GetImageServlet true
listPaths /listPaths/* ListPathsServlet false
data /data/* FileDataServlet false
checksum /fileChecksum/* FileChecksumServlets.RedirectServlet false
contentSummary /contentSummary/* ContentSummaryServlet false

如上,SecondaryNameNode通过getimage获取fsimage和edits,对应的类GetImageServlet,其doGet方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public void doGet(final HttpServletRequest request, final HttpServletResponse response
) throws ServletException, IOException
{

Map<String,String[]> pmap = request.getParameterMap();//获取get请求的参数
try {
ServletContext context = getServletContext();//ServletContext
final FSImage nnImage = (FSImage)context.getAttribute("name.system.image");//属性,NameNode中的FSImage
//创建TransferFsImage对象,进行相应的数据读取或发送操作
final TransferFsImage ff = new TransferFsImage(pmap, request, response);
final Configuration conf = (Configuration)getServletContext().getAttribute(JspHelper.CURRENT_CONF);//读取属性配置

...//鉴权

UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
if (ff.getImage()) {//发送fsimage
TransferFsImage.getFileServer(response.getOutputStream(), nnImage.getFsImageName(), getThrottler(conf));
} else if (ff.getEdit()) {//发送edits
TransferFsImage.getFileServer(response.getOutputStream(), nnImage.getFsEditName(), getThrottler(conf));
} else if (ff.putImage()) {//读取新的fsimage
synchronized (fsImageTransferLock) {
final MD5Hash expectedChecksum = ff.getNewChecksum();
nnImage.validateCheckpointUpload(ff.getToken());
reloginIfNecessary().doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
MD5Hash actualChecksum = TransferFsImage.getFileClient(ff.getInfoServer(),
"getimage=1", nnImage.getFsImageNameCheckpoint(), true);
LOG.info("Downloaded new fsimage with checksum: " + actualChecksum);
if (!actualChecksum.equals(expectedChecksum)) {
throw new IOException("Actual checksum of transferred fsimage: "
+ actualChecksum + " does not match expected checksum: "
+ expectedChecksum);
}
return null;
}
});
nnImage.checkpointUploadDone();
}
}
return null;
}
...
}

如上,通过请求的参数创建TransferFsImage对象,由参数判断执行的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public TransferFsImage(Map<String,String[]> pmap, HttpServletRequest request, HttpServletResponse response
) throws IOException {
isGetImage = isGetEdit = isPutImage = false;
remoteport = 0;
machineName = null;
token = null;
newChecksum = null;

for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
String key = it.next();
if (key.equals("getimage")) { //请求读取fsimage
isGetImage = true;
} else if (key.equals("getedit")) { //请求读取edits
isGetEdit = true;
} else if (key.equals("putimage")) { //表示fsimage合并完,通知获取新的fsimage
isPutImage = true;
} else if (key.equals("port")) { //SecondaryNameNode端口
remoteport = new Integer(pmap.get("port")[0]).intValue();
} else if (key.equals("machine")) { //SecondaryNameNode地址
machineName = pmap.get("machine")[0];
} else if (key.equals("token")) { //访问SecondaryNameNode口令
token = new CheckpointSignature(pmap.get("token")[0]);
} else if (key.equals("newChecksum")) { //新fsimage的MD5哈希值
newChecksum = new MD5Hash(pmap.get("newChecksum")[0]);
}
}
int numGets = (isGetImage?1:0) + (isGetEdit?1:0);
if ((numGets > 1) || (numGets == 0) && !isPutImage) {
throw new IOException("Illegal parameters to TransferFsImage");
}
}

如上,解析SecondaryNameNode的请求,由前面分析,SecondaryNameNode可能有获取fsimage,获取edits请求,也可能在fsimage合并完后发送putimage请求来表示合并完成,通知NameNode下载新的fsimage,因此分别进行标识。对于putimage还要读取SecondaryNameNode的相应信息。

然后回到GetImageServlet的doGet方法,对不同的请求进行不同操作

4.1 SecondaryNameNode获取fsimage和edits

在doGet中,如果构造的TransferFsImage解析到请求为getimage或者getedit,则通过TransferFsImage的静态方法getFileServer发送请求,对getimage发送fsimage文件,getedit发送edits文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static void getFileServer(OutputStream outstream, File localfile, DataTransferThrottler throttler) 
throws IOException {
byte buf[] = new byte[BUFFER_SIZE];
FileInputStream infile = null;
try {
infile = new FileInputStream(localfile);
...
int num = 1;
while (num > 0) {//读取文件,循环发送到输出流
num = infile.read(buf);
if (num <= 0) {
break;
}
outstream.write(buf, 0, num);
if (throttler != null) {
throttler.throttle(num);
}
}
} finally {
if (infile != null) {
infile.close();
}
}
}

如上,循环读取相应的文件发送到response的输出流中。

4.2 SecondaryNameNode通知获取新的fsimage

如doGet方法中,如果是putimage请求,则应该从SecondaryNameNode中获取新的fsimage,SecondaryNameNode相关信息从请求的参数中获取,通过TransferFsImage的静态方法getFileClient向SecondaryNameNode发送get请求

1
MD5Hash actualChecksum = TransferFsImage.getFileClient(ff.getInfoServer(), "getimage=1", nnImage.getFsImageNameCheckpoint(), true);

getFileClient前面分析过,URL为

1
http://<第二名字节点地址>:<第二名字节点HTTP端口>/getimage?getimage=1

由前面SecondaryNameNode的HTTPServer可知,/getimage对应的Servlet也是GetImageServlet类,当NameNode发送getimage=1的请求时,如上分析SecondaryNameNode会发送fsimage文件。
NameNode获取新的fsimage文件写到fsimage.ckpt中(getFsImageNameCheckpoint),最终通过checkpointUploadDone将CheckpointStates更新为UPLOAD_DONE。

之后接收到SecondaryNameNode的rollFsImage调用,实现如下

5. rollFsImage

1
2
3
4
5
6
7
8
9
10
11
public void rollFsImage() throws IOException {
namesystem.rollFSImage();
}

synchronized void rollFSImage() throws IOException {
if (isInSafeMode()) {
throw new SafeModeException("Checkpoint not created", safeMode);
}
LOG.info("Roll FSImage from " + Server.getRemoteAddress());
getFSImage().rollFSImage();
}

如上,由FSNamesystem提供服务,最终由FSNamesystem的FSImage的rollFSImage方法完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
void rollFSImage() throws IOException {
if (ckptState != CheckpointStates.UPLOAD_DONE) {
throw new IOException("Cannot roll fsImage before rolling edits log.");
}
//首先检查在所有存储目录下有edits.new和fsimage.ckpt
if (!editLog.existsNew()) {//edits.new不存在
throw new IOException("New Edits file does not exist");
}
Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.IMAGE);
while (it.hasNext()) {//保证所有存储目录下存在fsimage.ckpt
StorageDirectory sd = it.next();
File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
if (!ckpt.exists()) {
throw new IOException("Checkpoint file " + ckpt + " does not exist");
}
}
editLog.purgeEditLog(); //重命名edits.new为edits
//重命名fsimage.ckpt为fsimage
it = dirIterator(NameNodeDirType.IMAGE);
while (it.hasNext()) {
StorageDirectory sd = it.next();
File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
File curFile = getImageFile(sd, NameNodeFile.IMAGE);
if (!ckpt.renameTo(curFile)) {//fsimage存在可能在Windows下重命名失败
curFile.delete();//先删除fsimage
if (!ckpt.renameTo(curFile)) {//重命名
editLog.removeEditsForStorageDir(sd);
updateRemovedDirs(sd);
it.remove();
}
}
}
editLog.exitIfNoStreams();

//在所有存储目录下更新fstime文件,写版本文件
this.layoutVersion = FSConstants.LAYOUT_VERSION;
this.checkpointTime = FSNamesystem.now();//更新检查点时间
it = dirIterator();
while (it.hasNext()) {
StorageDirectory sd = it.next();
//如果存储目录只有镜像文件(不存储日志文件,类型为IMAGE),删除旧的日志文件
if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
File editsFile = getImageFile(sd, NameNodeFile.EDITS);
editsFile.delete();
}
//如果存储目录只存储日志文件(不存储镜像文件,类型为EDITS),删除旧的镜像文件
if (!sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
File imageFile = getImageFile(sd, NameNodeFile.IMAGE);
imageFile.delete();
}
try {
sd.write();//写新的版本文件
} catch (IOException ioe) {
editLog.removeEditsForStorageDir(sd);
updateRemovedDirs(sd, ioe);
it.remove();
}
}
ckptState = FSImage.CheckpointStates.START;//检查点完成,重置CheckpointStates状态为初始状态STRAT
}

如上,首先得确保已经下载了最新的fsimage文件,即处于UPLOAD_DONE状态。且保证所有存储目录存在edits.new和fsimage.ckpt。
然后将edits.new重命名为edits,fsimage.ckpt重命名为fsimage,更新命名空间镜像和日志文件。
之后遍历存储目录,对于EDITS类型的存储目录,删除不应该存在的旧的fsimage文件,对于IMAGE类型的存储目录,删除不应该存在的edits文件,而IMAGE_AND_EDITS类型的存储目录可以同时存在fsimage和edits,并对每一个存储目录根据当前内存中版本信息重新写版本文件。
最终将FSImage的CheckpointStates成员ckptState重置为初始状态START。

到这里完成了一个检查点过程,fsimage为合并过后的最新的命名空间镜像,edits也从原来的edits.new中恢复回来了,继续记录日志。