NameNode实现源码分析---启动和停止

Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》


NameNode.main为NameNode启动的入口,如下

1
2
3
4
5
6
7
8
9
10
11
public static void main(String argv[]) throws Exception {
try {
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
NameNode namenode = createNameNode(argv, null);
if (namenode != null)
namenode.join();
} catch (Throwable e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(-1);
}
}

如上,与DataNode启动类似,首先通过startupShutdownMessage记录启动日志STARTUP_MSG:...,并注册关闭时的关闭日志SHUTDOWN_MSG:...,然后通过createNameNode方法创建NameNode对象,通过NameNode的join方法等待RPC服务结束,join方法如下

1
2
3
4
5
6
public void join() {
try {
this.server.join();
} catch (InterruptedException ie) {
}
}

createNameNode方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static NameNode createNameNode(String argv[], Configuration conf) throws IOException {
if (conf == null)
conf = new Configuration();
StartupOption startOpt = parseArguments(argv);//解析启动参数
if (startOpt == null) {
printUsage();
System.exit(-2);
}
setStartupOption(conf, startOpt);//启动参数写到配置中

switch (startOpt) {
case FORMAT://格式化
boolean aborted = format(conf, startOpt.getConfirmationNeeded(), startOpt.getInteractive());
System.exit(aborted ? 1 : 0);
case FINALIZE://提交升级
aborted = finalize(conf, true);
System.exit(aborted ? 1 : 0);
case RECOVER://修复
NameNode.doRecovery(startOpt, conf);
return null;
default:
}
//其他启动方式,创建NameNode对象
DefaultMetricsSystem.initialize("NameNode");
NameNode namenode = new NameNode(conf);
return namenode;
}

1. 启动参数

createNameNode中,首先通过parseArguments解析启动参数,启动时可附带的参数在StartupOption枚举中,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static public enum StartupOption{
FORMAT ("-format"),//格式化
REGULAR ("-regular"),//正常启动
UPGRADE ("-upgrade"),//更新启动
RECOVER ("-recover"),//恢复启动
FORCE ("-force"),//格式化或恢复方式启动时可以指定
ROLLBACK("-rollback"),//回滚启动
FINALIZE("-finalize"),//提交升级启动
IMPORT ("-importCheckpoint"),//导入检查点
NONINTERACTIVE ("-nonInteractive");//格式化时可以指定,非交互方式

//只在恢复方式启动时使用
private int force = MetaRecoveryContext.FORCE_NONE;

//用于格式化方式启动
private boolean isConfirmationNeeded = true;
private boolean isInteractive = true;
...

如上,可以通过format(格式化)方式启动,regular(正常)方式启动,upgrade(启动更新)方式启动,recover(恢复到正常状态)方式启动,rollback(升级回滚)方式启动,finalize(提交升级)方式启动,importCheckpoint(导入检查点)方式启动。

parseArguments具体代码不贴出来了,默认情况下(不带附加参数)为regular正常方式启动,NameNode在启动前需要格式化,格式化时附带参数”-regular”,同时可以指定”-force”表示直接格式化所有存储目录而不提醒,影响的是StartupOption中的成员isConfirmationNeeded,默认为true即会提示。而指定”-nonInteractive”时,会在ifConfirmationNeeded为true时打印提示信息,影响的是成员isInteractive,默认为true即交互模式,会打印提示信息。

其他升级提交回滚等相关操作和DataNode类似,对应的current和previous相关目录操作,这里不分析,importCheckpoint也不分析了。主要分析格式化和正常方式启动。

createNameNode中解析完参数后,对于格式化方式,提交升级方式,修复方式在相应的操作后直接返回不会创建NameNode对象,而其他方式创建NameNode对象然后返回。

2. 格式化启动

格式化启动时,由FSNamesystem的format方法负责进行格式化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static boolean format(Configuration conf,
boolean isConfirmationNeeded, boolean isInteractive) throws IOException {
Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);//fsimage目录
Collection<File> editDirsToFormat = FSNamesystem.getNamespaceEditsDirs(conf);//编辑日志目录
for(Iterator<File> it = dirsToFormat.iterator(); it.hasNext();) {
File curDir = it.next();
if (!curDir.exists())
continue;
if (isConfirmationNeeded) {//需要用户验证,没有指定"-force"参数
if (!isInteractive) {//不是处于交互模式,指定了"-nonInteractive"
System.err.println("Format aborted: " + curDir + " exists.");
return true;
}
//需要用户验证,且没有指定"-nonInteractive",输出提示信息
System.err.print("Re-format filesystem in " + curDir +" ? (Y or N) ");
if (!(System.in.read() == 'Y')) {//如果用户输入的不是Y,则格式化终止,返回
System.err.println("Format aborted in "+ curDir);
return true;
}
while(System.in.read() != '\n'); // discard the enter-key
}
}

FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat, editDirsToFormat), conf);
nsys.dir.fsImage.format();
return false;
}

如上,首先通过FSNamesystem.getNamespaceDirs从配置中读取fsimage的存储目录,比较简单这里不贴代码,为配置项dfs.name.dir,可配置多个目录,如果没有配置,缺省为/tmp/hadoop/dfs/name。同样的通过FSNamesystem.getNamespaceEditsDirs读取编辑日志的存储目录,为配置项dfs.name.edits.dir,缺省/tmp/hadoop/dfs/name

然后,如果格式化时,没有指定附加参数”-force”但指定了”-nonInteractive”,则由于需要用户确认格式化但又不是处于交互模式,因此不能执行格式化,返回。而如果没有指定”-force”且没有指定”-nonInteractive”时(即没有附加参数),则默认需要用户确认格式化且处于交互模式,这时对每一个存储目录输出格式化的提示信息,如果用户禁止了任何一个存储目录的格式化(输入的不是Y),则整个格式化操作会终止返回。而如果指定了”-force”时,则不需要用户确认,所有的存储目录会进行格式化操作。

格式化时,创建FSNamesystem对象,其实主要是FSNamesystem中的FSImage对象,包括需要格式化的fsimage目录和编辑日志目录,然后通过FSImage.format进行格式化。

2.1 FSImage.format

1
2
3
4
5
6
7
8
9
10
11
public void format() throws IOException {
this.layoutVersion = FSConstants.LAYOUT_VERSION;
this.namespaceID = newNamespaceID();//分配新的存储ID
this.cTime = 0L;
this.checkpointTime = FSNamesystem.now();
//对每一个存储目录格式化
for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
format(sd);
}
}

如上,会初始化NameNode的存储信息,包括layoutVersion,namespaceID和cTime。其中namespaceID由newNamespaceID分配,如下

1
2
3
4
5
6
7
8
private int newNamespaceID() {
Random r = new Random();
r.setSeed(FSNamesystem.now());
int newID = 0;
while(newID == 0)
newID = r.nextInt(0x7FFFFFFF); // use 31 bits only
return newID;
}

以当前时间作为随机数的种子生成一个随机数,只使用31位。namespaceID在NameNode启动时格式化,并在整个NameNode生命周期内维持不变,数据节点注册时获得该namespaceID作为注册ID,然后在每次和NameNode通信时都进行验证

初始化存储信息后,对每一个存储目录进行格式化,dirIterator为存储目录迭代器,迭代FSImage的成员storageDirs,继承自父类Storage。storageDirs由前面FSNamesystem中的format方法中,构造FSImage时传入的fsimage目录和编辑日志目录进行初始化,如下

2.1.1 setStorageDirectories

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void setStorageDirectories(Collection<File> fsNameDirs, Collection<File> fsEditsDirs) throws IOException {
storageDirs = new ArrayList<StorageDirectory>();
removedStorageDirs = new ArrayList<StorageDirectory>();
for (File dirName : fsNameDirs) {
boolean isAlsoEdits = false;
for (File editsDirName : fsEditsDirs) {
if (editsDirName.compareTo(dirName) == 0) {//既是fsimage存储目录,又是编辑日志存储目录
isAlsoEdits = true;
fsEditsDirs.remove(editsDirName);//从编辑日志中移除该目录
break;
}
}
//既是fsimage存储目录,又是编辑日志存储目录,目录类型为IMAGE_AND_EDITS,否则为IMAGE
NameNodeDirType dirType = (isAlsoEdits) ?
NameNodeDirType.IMAGE_AND_EDITS :
NameNodeDirType.IMAGE;
addStorageDir(new StorageDirectory(dirName, dirType));
}

//编辑日志目录中剩余的目录为EDITS目录类型
for (File dirName : fsEditsDirs) {
addStorageDir(new StorageDirectory(dirName, NameNodeDirType.EDITS));
}
}

如上,fsNameDirs为fsimage存储目录,fsEditDirs为编辑日志存储目录,如果一个目录既存储fsimage又存储编辑日志(既在fsNameDirs中又在fsEditsDir中),则存储目录类型为IMAGE_AND_EDITS,否则fsNameDirs中的目录类型为IMAGE,fsEditsDirs中目录类型为EDITS。创建对应的StorageDirectory对象添加到成员storageDirs中。StorageDirectory和其他与存储目录相关的另见DataNode实现源码分析—本地存储管理

因此,回到FSImage的format方法,遍历storageDirs中的所有存储目录,对每个存储目录使用format方法进行格式化,如下

1
2
3
4
5
6
7
8
9
10
void format(StorageDirectory sd) throws IOException {
sd.clearDirectory(); //如果current目录存在,则删除。创建新的current目录
sd.lock();//占有存储目录锁
try {
saveCurrent(sd);
} finally {
sd.unlock();//释放存储目录锁
}
LOG.info("Storage directory " + sd.getRoot() + " has been successfully formatted.");
}

如上,格式化时,首先通过clearDirectory将可能存在的current目录删除,然后重新创建新的current目录。
然后通过saveCurrent将当前镜像和空的编辑日志写到current目录中,并写VERSION文件

1
2
3
4
5
6
7
8
9
10
11
12
protected void saveCurrent(StorageDirectory sd) throws IOException {
File curDir = sd.getCurrentDir();
NameNodeDirType dirType = (NameNodeDirType)sd.getStorageDirType();
if (!curDir.exists() && !curDir.mkdir())//current目录不存在则创建,创建失败抛出异常
throw new IOException("Cannot create directory " + curDir);

if (dirType.isOfType(NameNodeDirType.IMAGE))
saveFSImage(getImageFile(sd, NameNodeFile.IMAGE));
if (dirType.isOfType(NameNodeDirType.EDITS))
editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
sd.write();
}

如上,如果存储目录是IMAGE类型,则通过saveFSImage将当前命名空间镜像保存到current目录的fsimage中,而如果是EDITS类型,则需要通过createEditLogFile创建新的日志文件(文件内容目前只包含文件头版本号),createEditLogFile为标准的创建新的日志流程,创建EditLogFileOutputStream文件输出流,然后使用EditLogFileOutputStream的create方法写版本到流中并刷新到底层文件中。saveFSImage和EditLogFileOutputStream的create方法另见NameNode实现源码分析—命名空间镜像和编辑日志
注意的是,对于IMAGE_AND_EDITS类型的存储目录,既存储fsimage,又存储编辑日志,上面isOfType判断是否为IMAGE和EDITS都返回true。

最后通过StorageDirectory的write方法写版本VERSION文件到current目录中。

总结下,名字节点格式化时,会将所有存储目录下的current目录删除,然后创建新的current目录。对于IMAGE类型的存储目录,会将当前镜像写到fsimage中(还没启动当然为空),而对于EDITS类型的存储目录,会写新的编辑日志文件(包含版本信息),IMAGE_AND_EDITS类型的当然既有fsimage又有编辑日志文件,然后在每个存储目录中写新的VERSION文件(包含layoutVersion,storageType,namespaceID,cTime)。


3. 正常启动

在NameNode的mian方法中,如果是正常启动,则会通过构造函数创建NameNode对象,然后等待NameNode的RPC服务完成
NameNode构造如下

1
2
3
4
5
6
7
8
public NameNode(Configuration conf) throws IOException {
try {
initialize(conf);
} catch (IOException e) {
this.stop();
throw e;
}
}

如上,由initialize执行初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private void initialize(Configuration conf) throws IOException {
InetSocketAddress socAddr = NameNode.getAddress(conf);//RPC地址
...//安全相关
int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);//RPC服务的处理器个数

...//安全相关

myMetrics = NameNodeInstrumentation.create(conf);//创建度量对象NameNodeInstrumentation
this.namesystem = new FSNamesystem(this, conf);//创建FSNamesystem对象

...//安全相关

//RPC服务
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
if (dnSocketAddr != null) {
//serviceRpcServer的处理器个数
int serviceHandlerCount =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
//创建serviceRpcServer服务对象
this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
dnSocketAddr.getPort(), serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
//更新serviceRpcServer地址到配置中
setRpcServiceServerAddress(conf);
}
//创建server对象
this.server = RPC.getServer(this, socAddr.getHostName(),
socAddr.getPort(), handlerCount, false, conf, namesystem
.getDelegationTokenSecretManager());
// Set terse exception whose stack trace won't be logged
this.server.addTerseExceptions(SafeModeException.class);

// The rpc-server port can be ephemeral... ensure we have the correct info
this.serverAddress = this.server.getListenerAddress();
FileSystem.setDefaultUri(conf, getUri(serverAddress));
LOG.info("Namenode up at: " + this.serverAddress);

//启动HTTP服务器
startHttpServer(conf);

//启动RPC服务
this.server.start(); //start RPC server
if (serviceRpcServer != null) {
serviceRpcServer.start();
}

//启动垃圾清理器
startTrashEmptier(conf);

//启动可能的插件
plugins = conf.getInstances("dfs.namenode.plugins", ServicePlugin.class);
for (ServicePlugin p: plugins) {
try {
p.start(this);
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be started", t);
}
}
}

如上,初始化时,主要包括创建NameNode度量对象NameNodeInstrumentation,创建FSNamesystem对象(提供元数据相关操作服务),启动RPC服务,启动HTTP服务,创建垃圾清理器Trash.Emptier线程,还有与DataNode类似的插件。
NameNodeInstrumentation不分析,创建为配置项dfs.namenode.plugins,需要实现ServicePlugin接口,包括启动start方法和停止stop方法,也不分析。接下来分析其他的初始化

3.1 RPC服务初始化

NameNode中有两个RPC服务器,分别为成员serverserviceRpcServer
server的地址为配置项dfs.namenode.rpc-address,如果没有配置或者为空,则使用fs.default.name作为地址,端口为默认端口8020。server的处理器数量由配置项dfs.namenode.handler.count决定,默认10个。
serviceRpcServer的地址为配置项dfs.namenode.servicerpc-address,没有配置或为空时不开启该RPC服务,处理器数量为配置项dfs.namenode.service.handler.count,默认10个。

当serviceRpcServer服务存在时,接口DatanodeProtocol和NamenodeProtocol使用serviceRpcServer(可见DataNode启动中DataNode的初始化建立NameNode代理过程),而ClientProtocol使用server。这是因为Client的RPC调用执行时间短,但往往需要等待日志的持久化,而数据节点和第二名字节点的RPC调用,执行的逻辑比较复杂,执行的时间也比较长,在一个繁忙的系统中,对客户端和HDFS其他实体(包括数据节点和第二名字节点)的RPC请求,使用不同的服务器,可以保证客户端的请求得到及时的响应。(该解释来自技术内幕P451)。
而如果serviceRpcServer服务不存在(没有配置),则NameNode只有一个RPC服务器,所有的RPC通信都由server负责。

RPC服务器的创建和启动另见RPC的相关源码分析。

3.2 HTTP服务

HTTP服务主要用于和SecondaryNameNode之间的交互,即fsimage和编辑日志的传输,参见NameNode实现源码分析—和SecondaryNameNode的交互

3.3 垃圾清理器Trash.Emptier线程

如前面initialize中,由startTrashEmptier方法创建并启动

1
2
3
4
5
private void startTrashEmptier(Configuration conf) throws IOException {
this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier");
this.emptier.setDaemon(true);
this.emptier.start();
}

3.3.1 Trash

Trash为文件系统提供了回收站的功能,Trash主要成员如下如下

1
2
3
4
5
6
7
8
9
10
11
12
13
private static final Path CURRENT = new Path("Current");
private static final Path TRASH = new Path(".Trash/");
private static final Path HOMES = new Path("/user/");

private static final FsPermission PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);

private static final DateFormat CHECKPOINT = new SimpleDateFormat("yyMMddHHmm");
private static final int MSECS_PER_MINUTE = 60*1000;

private final FileSystem fs;
private final Path trash;
private final Path current;
private final long interval;

如上,回收站位于用户主目录的.Trash目录下,即对于用户”xiaoyun”来说回收站路径为”/user/xiaoyun/.Trash”,成员trash即为该目录路径。文件删除时默认被移到回收站的Current子目录下,成员current即为Current目录路径,在current子目录下保留文件原来的路径,如用户xiaoyun删除文件”/tmp/a.txt”,在回收站的路径为”/user/xiaoyun/.Trash/Current/tmp/a.txt”。

PERMISSION为权限,回收站中只有原来的用户有权访问。
CHECKPOINT为垃圾检查点文件名格式。

Trash的构造

1
2
3
4
5
6
7
public Trash(FileSystem fs, Configuration conf) throws IOException {
super(conf);
this.fs = fs;
this.trash = new Path(fs.getHomeDirectory(), TRASH);
this.current = new Path(trash, CURRENT);
this.interval = conf.getLong("fs.trash.interval", 60) * MSECS_PER_MINUTE;
}

如上,fs.getHomeDirectory即为/user/${user.home},周期默认为1h。
还可以指定home目录进行构造

1
2
3
4
5
6
7
private Trash(Path home, Configuration conf) throws IOException {
super(conf);
this.fs = home.getFileSystem(conf);
this.trash = new Path(home, TRASH);
this.current = new Path(trash, CURRENT);
this.interval = conf.getLong("fs.trash.interval", 60) * MSECS_PER_MINUTE;
}

3.3.2 Emptier

Emptier为Trash的内部类,可以访问Trash的成员,Emptier的成员如下

1
2
3
private Configuration conf;
private FileSystem fs;
private long interval;

interval为线程工作周期
构造如下

1
2
3
4
5
public Emptier(Configuration conf) throws IOException {
this.conf = conf;
this.interval = conf.getLong("fs.trash.interval", 60) * MSECS_PER_MINUTE;
this.fs = FileSystem.get(conf);
}

因此,周期为配置项fs.trash.interval,默认60min即1个小时。该周期既是检查周期,又是检查点删除周期。

在startTrashEmptier中通过Trash的getEmptier即创建Emptier对象,然后启动Emptier线程,线程主程序如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public void run() {
if (interval == 0) return;//Trash功能被关闭了,返回
long now = System.currentTimeMillis();
long end;
while (true) {
end = ceiling(now, interval);
try { //等待清理时间到达
Thread.sleep(end - now);
} catch (InterruptedException e) {
return; // exit on interrupt
}
try {
now = System.currentTimeMillis();
if (now >= end) {
FileStatus[] homes = null;
try {
homes = fs.listStatus(HOMES); //列出所有的home目录,home目录位于/user下
} catch (IOException e) {
LOG.warn("Trash can't list homes: "+e+" Sleeping.");
continue;
}
if (homes == null)//没有home目录
continue;
for (FileStatus home : homes) { // dump each trash
if (!home.isDir())
continue;
try {
Trash trash = new Trash(home.getPath(), conf);//每一个home目录创建Trash对象清理
trash.expunge();//清理过期的垃圾检查点
trash.checkpoint();//将current目录重命名为当前时间作为一个检查点
} catch (IOException e) {
LOG.warn("Trash caught: "+e+". Skipping "+home.getPath()+".");
}
}
}
} catch (Exception e) {
LOG.warn("RuntimeException during Trash.Emptier.run() " + StringUtils.stringifyException(e));
}
}
}

如上,垃圾清理的周期为配置项fs.trash.interval,默认1个小时,没有到达清理时间则休眠等待。

清理时,/user目录下的每一个目录为一个用户的主目录,对所有的home目录进行清理。
每个home目录创建一个Trash对象,通过Trash对象的expunge方法进行清理。清理.Trash目录下过期的垃圾检查点,垃圾检查点为之前current目录重命名后的目录,目录名为重命名时间,格式为CHECKPOINT。垃圾检查点从创建到现在超过期限了就会被删除,期限同样为配置项fs.trash.interval,即默认垃圾检查点经过1个小时后便会被清理。
清理垃圾检查点后,将当前垃圾存放目录current创建检查点,即将current目录重命名为一个垃圾检查点,该垃圾检查点目录名为重命名日期,格式为CHECKPOINT,检查点创建由Trash的checkpoint方法产生,如上。
dopunge和checkpoint都比较简单,这里就不贴代码了。

总的来说,用户删除的文件会放到用户主目录的.Trash/Current目录下,垃圾清理器默认每隔1个小时检查所有用户主目录,将用户当前使用的垃圾目录(.Trash/Current)创建检查点,检查点即将Current目录重命名为当前操作时间。且会检查每个垃圾检查点,对从创建时间(检查点名字)到现在默认超过1个小时的检查点进行删除。
也就是说,删除文件存放到’/user/${user.home}/.Trash/Current`目录下,然后过了1个小时后被移到检查点中,Current继续保存接下来新删除的文件,检查点再过一个小时被删除,因此用户删除的文件默认经过两个小时候会从HDFS中移除。

3.4 FSNamesystem

如前面initialize中,由FSNamesystem的构造函数创建FSNamesystem对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
FSNamesystem(NameNode nn, Configuration conf) throws IOException {
try {
initialize(nn, conf);
} catch (IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
shutdown();
throw e;
} catch (RuntimeException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
shutdown();
throw e;
}
}

如上,通过initialize对FSNamesystem进行初始化,异常时通过close关闭相关服务线程,通过shutdown关闭目录服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private void initialize(NameNode nn, Configuration conf) throws IOException {
this.systemStart = now();//启动时间
setConfigurationParameters(conf);//通过配置初始化成员
dtSecretManager = createDelegationTokenSecretManager(conf);//口令安全管理器

this.nameNodeAddress = nn.getNameNodeAddress();//名字节点RPC服务地址
this.registerMBean(conf); // register the MBean for the FSNamesystemStutus
this.dir = new FSDirectory(this, conf);//创建FSDirectory对象
StartupOption startOpt = NameNode.getStartupOption(conf);
//读取存储目录,如果处于中间状态则进行相应的恢复,加载命名空间镜像和编辑日志并进行合并,创建新的检查点
this.dir.loadFSImage(getNamespaceDirs(conf), getNamespaceEditsDirs(conf), startOpt);
long timeTakenToLoadFSImage = now() - systemStart;
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");//记录加载命名空间镜像时间
NameNode.getNameNodeMetrics().setFsImageLoadTime(timeTakenToLoadFSImage);
this.safeMode = new SafeModeInfo(conf);//创建SafeModeInfo对象
setBlockTotal();//更新总区块数
//创建pendingReplications
pendingReplications = new PendingReplicationBlocks(
conf.getInt("dfs.replication.pending.timeout.sec", -1) * 1000L);
if (isAccessTokenEnabled) {
//创建访问令牌处理器
accessTokenHandler = new BlockTokenSecretManager(true, accessKeyUpdateInterval, accessTokenLifetime);
}
//心跳处理线程
this.hbthread = new Daemon(new HeartbeatMonitor());
//租约管理器线程
this.lmthread = new Daemon(leaseManager.new Monitor());
//复制删除操作线程
this.replmon = new ReplicationMonitor();
this.replthread = new Daemon(replmon);
hbthread.start();
lmthread.start();
replthread.start();

//include和exclude文件的HostsFileReader对象
this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""), conf.get("dfs.hosts.exclude",""));
//撤销状态管理线程DecommissionManager
this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
conf.getInt("dfs.namenode.decommission.interval", 30),
conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
dnthread.start();
//DNSToSwitchMapping对象
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
DNSToSwitchMapping.class), conf);

//dns到switch映射支持缓冲的话,从include列表中解析网络位置,存储解析结果在缓冲中,这样之后的解析请求就会快一点
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
}

InetSocketAddress socAddr = NameNode.getAddress(conf);
this.nameNodeHostName = socAddr.getHostName();
registerWith(DefaultMetricsSystem.INSTANCE);
}

如上,会依次完成如下工作

  • 通过setConfigurationParameters读取配置初始化FSNamesystem中与配置相关的成员,比较重要的成员如下
FSNamesystem中成员 含义 配置项 缺省值
supergroup 拥有超级权限用户所在组 dfs.permissions.supergroup supergroup
isPermissionEnabled 是否使能权限访问 dfs.permissions true
defaultPermission 缺省权限 N/A 当前用户,超级组,777访问权限
blocksInvalidateWorkPct ReplicationMonitor线程每次删除操作的数据节点占总数据节点比重 dfs.namenode.invalidate.work.pct.per.iteration 0.32
blocksReplWorkMultiplier ReplicationMonitor线程每次复制操作,每个需要复制的数据节点执行区块数 dfs.namenode.replication.work.multiplier.per.iteration 2
clusterMap 网络拓扑实现类 net.topology.impl NetworkTopology
replicator 区块放置策略实现类,决定区块在集群中放置位置 dfs.block.replicator.classname BlockPlacementPolicyDefault
defaultReplication 缺省期望副本数 dfs.replication 3
maxReplication 最大副本数 dfs.replication.max 512
minReplication 最小副本数,达到此值才是安全的区块,可以进行复制操作 dfs.replication.min 1
maxReplicationStreams 数据节点同时执行复制操作的数目 dfs.max-repl-streams 2
heartbeatRecheckInterval HeartbeatMonitor线程检查周期 heartbeat.recheck.interval 5*60*1000(5min)
heartbeatExpireInterval 判断数据节点死亡的未发送心跳间隔 N/A 2*heartbeatRecheckInterval+10*heartbeatInterval(5min30s)
replicationRecheckInterval ReplicationMonitor线程检查周期 dfs.replication.interval 3(s)
defaultBlockSize 缺省区块大小 dfs.block.size 64*1024*1024(64MB)
maxFsObjects 最大INodeFile对象数目 dfs.max.objects 0(无限制)
blockInvalidateLimit ReplicationMonitor线程的一次处理中,每个数据节点删除区块限制 dfs.block.invalidate.limit 20*(heartbeatInterval/1000)即60
durableSync N/A dfs.durable.sync true
staleInterval 数据节点成旧周期,即该时间内没有心跳到来则判断数据节点成旧 dfs.namenode.stale.datanode.interval 30*1000(30s)
  • 创建FSDirectory对象,与目录树,命名空间镜像,编辑日志等相关的操作由该对象负责,FSDirectory包含FSImage成员,而FSImage对象又包含FSEditLog成员,这些在其他文章已经分析过了
  • FSDirectory的loadFSImage方法。可能会根据启动选项进行格式化,然后分析本地存储目录,如果处于中间状态(类似DataNode,更新提交等中间状态)进行恢复,然后读取最新的命名空间镜像和编辑日志,进行合并,这些就是从本地文件中恢复(由FSImage的recoverTransitionRead方法完成)。恢复到上次工作状态后,需要将合并后新的命名空间镜像保存到本地,首先将之前的工作目录current目录重命名为lastcheckpoint.tmp,然后将新的镜像保存到current中,打开新的编辑日志文件,最后将lastcheckpoint.tmp重命名为previous.checkpoint,即一次更新操作(由FSImage的saveNamespace方法完成)。
    recoverTransitionRead和saveNamespace方法不再分析。
  • 创建安全模式管理对象SafeModeInfo,因为前面已经加载了镜像并合并了编辑日志,现在内存中信息就是上次关闭前的工作状态,通过setBlockTotal更新SafeModeInfo的成员blockTotal,关于安全模式另见NameNode安全模式
  • 创建PendingReplicationsBlocks对象,即初始化pendingReplications成员,会启动相关PendingReplicationMonitor线程,复制超时时间为配置项dfs.replication.pending.timeout.sec,默认-1表示使用缺省值5min。pendingReplications以及PendingReplicationMonitor线程另见NameNode维护的数据结构
  • 创建并启动心跳检查线程(HeartbeatMonitor),租约检查线程(LeaseManager.Monitor),区块复制/删除线程(ReplicationMonitor),HeartbeatMonitor线程另见NameNode心跳检测,LeaseManager.Monitor另见NameNode租约管理,ReplicationMonitor线程另见NameNode区块复制和删除线程
  • 创建解析include文件和exclude文件的HostsFileReader对象,另见NameNode数据节点生存期管理
  • 创建并启动用于检测并改变撤销数据节点状态的DecommissionManager线程,DecommissionManager另见NameNode数据节点生存期管理,如上,检查周期为30s,每次检查5个数据节点。
  • 创建网络拓扑相关的DNSToSwitchMapping对象,DNSToSwitchMapping另见NameNode区块复制和删除线程,如上,默认为ScriptBasedMapping,实现了CachedDNSToSwitchMapping,因此创建后会进行解析,如果配置了脚本,则会使用脚本进行解析,脚本处理的输入为IP地址形式,每次默认处理100个输入。解析结果缓存在ScriptBasedMapping成员cache中。而如果没有配置脚本,则所有的主机会被解析成/default-rack

4. 停止

NameNode停止由stop方法完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void stop() {
if (stopRequested)//用于测试
return;
stopRequested = true;
if (plugins != null) {//停止所有插件
for (ServicePlugin p : plugins) {
try {
p.stop();
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be stopped", t);
}
}
}
try {
if (httpServer != null) httpServer.stop();//停止HTTP服务
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
}
if(namesystem != null) namesystem.close();//关闭FSNamesystem相关线程
if(emptier != null) emptier.interrupt();//中断垃圾清理器
if(server != null) server.stop();//停止RPC服务
if(serviceRpcServer != null) serviceRpcServer.stop();//如果serviceRpcServer存在,停止
if (myMetrics != null) {//关闭度量对象
myMetrics.shutdown();
}
if (namesystem != null) {//关闭FSNamesystem
namesystem.shutdown();
}
}

如上,与启动对应,停止所有插件(插件就两个方法,start启动,stop停止),停止HTTPServer,停止FSNamesystem中相关服务,中断垃圾清理器,停止RPC服务。
其中RPC服务可能由两个因此对应的进行停止,RPC服务器的stop另见RPC相关分析。

4.1 垃圾清理器中断

如前面垃圾清理器线程的主程序中,通过interrupt会中断可能处于休眠等待状态的垃圾清理器,因为垃圾清理器默认1小时执行一次,因此很有可能处于休眠状态从而唤醒返回。而如果处于工作状态,如在expunge清理或checkpoint生成检查点过程中,则要等到这次工作完成。

4.2 FSNamesystem的关闭

如上,首先调用FSNamesystem的close方法,然后调用shutdown方法,在初始化FSNamesystem对象失败时也是这个顺序

4.2.1 FSNamesystem.close

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void close() {
fsRunning = false;
try {
if (pendingReplications != null) pendingReplications.stop();//中断PendingReplicationMonitor线程
if (hbthread != null) hbthread.interrupt();//中断心跳检测线程
if (replthread != null) replthread.interrupt();//中断复制/删除操作线程
if (dnthread != null) dnthread.interrupt();//中断撤销状态管理线程
if (smmthread != null) smmthread.interrupt();//中断安全模式管理线程
if (dtSecretManager != null) dtSecretManager.stopThreads();//中断口令安全管理线程
} catch (Exception e) {
LOG.warn("Exception shutting down FSNamesystem", e);
} finally {
try {
if (lmthread != null) {//中断租约管理线程
lmthread.interrupt();
lmthread.join(3000);
}
dir.close();//关闭FSDirectory
blocksMap.close();//清除blocksMap
} catch (InterruptedException ie) {
} catch (IOException ie) {
LOG.error("Error closing FSDirectory", ie);
IOUtils.cleanup(LOG, dir);
}
}
}

如上,修改fsRunning为false,线程都会循环判断该标识。

4.2.1.1 线程中断

通过pendingReplications.stop()中断对应的PendingReplicationMonitor线程,PendingReplicationMonitor默认5min检查一次(操作一次后休眠5min),因此可能将其从休眠等待中唤醒从而返回,如果处于检查状态则要等待检查完成。

HeartbeatMonitor线程每5min检查一次(当前时间距离上次检查时间超过了5min),ReplicationMonitor线程默认每3s检查一次(每次操作后休眠3s),DecommissionManager线程默认每30s检查一次(每次操作后休眠30s),SafeModeMonitor线程默认1s检查一次(每次操作后休眠1s),LeaseManager.Monitor线程默认2s检查一次(每次操作后休眠2s),如果这些线程处于相关操作中,则要等待本次操作完成才会返回,如果处于休眠等待则会被唤醒并返回。
如上,最后会等待线程完成,最多等待3s中。

4.2.1.2 关闭FSDirectory
1
2
3
public void close() throws IOException {
fsImage.close();
}

关闭FSImage

1
2
3
4
void close() throws IOException {
getEditLog().close();
unlockAll();
}

如上,关闭FSEditLog,然后释放所有的存储目录锁,FSEditLog的close方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public synchronized void close() throws IOException {
while (isSyncRunning) {//正在同步,等待
try {
wait(1000);
} catch (InterruptedException ie) {
}
}
if (editStreams == null) {
return;
}
printStatistics(true);
numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;//清零统计量

for (int idx = 0; idx < editStreams.size(); idx++) {
EditLogOutputStream eStream = editStreams.get(idx);
try {
eStream.setReadyToFlush();
eStream.flush();
eStream.close();//刷新所有的输出流EditLogFileOutputStream到底层文件,然后关闭输出流
} catch (IOException ioe) {
removeEditsAndStorageDir(idx);
idx--;
}
}
editStreams.clear();//清除editStreams成员
}

如上,如果当前正在同步,则等待,然后清零相关统计量,刷新素有的EditLogFileOutputStream输出流当前缓存日志到底层文件,然后关闭输出流关闭文件,最后清除editStreams成员。同步和EditLogFileOutputStream相关另见NameNode命名空间镜像和编辑日志

4.2.1.3 关闭BlocksMap

由close方法完成

1
2
3
void close() {
blocks.clear();
}

即清除所有的区块记录

4.2.2 FSnamesystem.shutdown

1
2
3
4
5
6
7
8
9
10
11
public void shutdown() {
if (mbeanName != null) {
MBeans.unregister(mbeanName);
}
if (mxBean != null) {
MBeans.unregister(mxBean);
}
if (dir != null) {
dir.shutdown();
}
}

如上主要是FSDirectory的shutdown方法,如下

1
2
3
void shutdown() {
nameCache.reset();
}

即清除NameCache相关对象,NameCache缓存最频繁使用的文件,不具体分析。