Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》
NameNode.main为NameNode启动的入口,如下1
2
3
4
5
6
7
8
9
10
11public static void main(String argv[]) throws Exception {
try {
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
NameNode namenode = createNameNode(argv, null);
if (namenode != null)
namenode.join();
} catch (Throwable e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(-1);
}
}
如上,与DataNode启动类似,首先通过startupShutdownMessage记录启动日志STARTUP_MSG:...
,并注册关闭时的关闭日志SHUTDOWN_MSG:...
,然后通过createNameNode方法创建NameNode对象,通过NameNode的join方法等待RPC服务结束,join方法如下1
2
3
4
5
6public void join() {
try {
this.server.join();
} catch (InterruptedException ie) {
}
}
createNameNode方法如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public static NameNode createNameNode(String argv[], Configuration conf) throws IOException {
if (conf == null)
conf = new Configuration();
StartupOption startOpt = parseArguments(argv);//解析启动参数
if (startOpt == null) {
printUsage();
System.exit(-2);
}
setStartupOption(conf, startOpt);//启动参数写到配置中
switch (startOpt) {
case FORMAT://格式化
boolean aborted = format(conf, startOpt.getConfirmationNeeded(), startOpt.getInteractive());
System.exit(aborted ? 1 : 0);
case FINALIZE://提交升级
aborted = finalize(conf, true);
System.exit(aborted ? 1 : 0);
case RECOVER://修复
NameNode.doRecovery(startOpt, conf);
return null;
default:
}
//其他启动方式,创建NameNode对象
DefaultMetricsSystem.initialize("NameNode");
NameNode namenode = new NameNode(conf);
return namenode;
}
1. 启动参数
createNameNode中,首先通过parseArguments
解析启动参数,启动时可附带的参数在StartupOption枚举中,如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18static public enum StartupOption{
FORMAT ("-format"),//格式化
REGULAR ("-regular"),//正常启动
UPGRADE ("-upgrade"),//更新启动
RECOVER ("-recover"),//恢复启动
FORCE ("-force"),//格式化或恢复方式启动时可以指定
ROLLBACK("-rollback"),//回滚启动
FINALIZE("-finalize"),//提交升级启动
IMPORT ("-importCheckpoint"),//导入检查点
NONINTERACTIVE ("-nonInteractive");//格式化时可以指定,非交互方式
//只在恢复方式启动时使用
private int force = MetaRecoveryContext.FORCE_NONE;
//用于格式化方式启动
private boolean isConfirmationNeeded = true;
private boolean isInteractive = true;
...
如上,可以通过format(格式化)方式启动,regular(正常)方式启动,upgrade(启动更新)方式启动,recover(恢复到正常状态)方式启动,rollback(升级回滚)方式启动,finalize(提交升级)方式启动,importCheckpoint(导入检查点)方式启动。
parseArguments具体代码不贴出来了,默认情况下(不带附加参数)为regular正常方式启动,NameNode在启动前需要格式化,格式化时附带参数”-regular”,同时可以指定”-force”表示直接格式化所有存储目录而不提醒,影响的是StartupOption中的成员isConfirmationNeeded
,默认为true即会提示。而指定”-nonInteractive”时,会在ifConfirmationNeeded为true时打印提示信息,影响的是成员isInteractive
,默认为true即交互模式,会打印提示信息。
其他升级提交回滚等相关操作和DataNode类似,对应的current和previous相关目录操作,这里不分析,importCheckpoint也不分析了。主要分析格式化和正常方式启动。
createNameNode中解析完参数后,对于格式化方式,提交升级方式,修复方式在相应的操作后直接返回不会创建NameNode对象,而其他方式创建NameNode对象然后返回。
2. 格式化启动
格式化启动时,由FSNamesystem的format方法负责进行格式化1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27private static boolean format(Configuration conf,
boolean isConfirmationNeeded, boolean isInteractive) throws IOException {
Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);//fsimage目录
Collection<File> editDirsToFormat = FSNamesystem.getNamespaceEditsDirs(conf);//编辑日志目录
for(Iterator<File> it = dirsToFormat.iterator(); it.hasNext();) {
File curDir = it.next();
if (!curDir.exists())
continue;
if (isConfirmationNeeded) {//需要用户验证,没有指定"-force"参数
if (!isInteractive) {//不是处于交互模式,指定了"-nonInteractive"
System.err.println("Format aborted: " + curDir + " exists.");
return true;
}
//需要用户验证,且没有指定"-nonInteractive",输出提示信息
System.err.print("Re-format filesystem in " + curDir +" ? (Y or N) ");
if (!(System.in.read() == 'Y')) {//如果用户输入的不是Y,则格式化终止,返回
System.err.println("Format aborted in "+ curDir);
return true;
}
while(System.in.read() != '\n'); // discard the enter-key
}
}
FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat, editDirsToFormat), conf);
nsys.dir.fsImage.format();
return false;
}
如上,首先通过FSNamesystem.getNamespaceDirs从配置中读取fsimage的存储目录,比较简单这里不贴代码,为配置项dfs.name.dir
,可配置多个目录,如果没有配置,缺省为/tmp/hadoop/dfs/name
。同样的通过FSNamesystem.getNamespaceEditsDirs读取编辑日志的存储目录,为配置项dfs.name.edits.dir
,缺省/tmp/hadoop/dfs/name
。
然后,如果格式化时,没有指定附加参数”-force”但指定了”-nonInteractive”,则由于需要用户确认格式化但又不是处于交互模式,因此不能执行格式化,返回。而如果没有指定”-force”且没有指定”-nonInteractive”时(即没有附加参数),则默认需要用户确认格式化且处于交互模式,这时对每一个存储目录输出格式化的提示信息,如果用户禁止了任何一个存储目录的格式化(输入的不是Y),则整个格式化操作会终止返回。而如果指定了”-force”时,则不需要用户确认,所有的存储目录会进行格式化操作。
格式化时,创建FSNamesystem对象,其实主要是FSNamesystem中的FSImage对象,包括需要格式化的fsimage目录和编辑日志目录,然后通过FSImage.format进行格式化。
2.1 FSImage.format
1 | public void format() throws IOException { |
如上,会初始化NameNode的存储信息,包括layoutVersion,namespaceID和cTime。其中namespaceID由newNamespaceID分配,如下1
2
3
4
5
6
7
8private int newNamespaceID() {
Random r = new Random();
r.setSeed(FSNamesystem.now());
int newID = 0;
while(newID == 0)
newID = r.nextInt(0x7FFFFFFF); // use 31 bits only
return newID;
}
以当前时间作为随机数的种子生成一个随机数,只使用31位。namespaceID在NameNode启动时格式化,并在整个NameNode生命周期内维持不变,数据节点注册时获得该namespaceID作为注册ID,然后在每次和NameNode通信时都进行验证
初始化存储信息后,对每一个存储目录进行格式化,dirIterator为存储目录迭代器,迭代FSImage的成员storageDirs
,继承自父类Storage。storageDirs由前面FSNamesystem中的format方法中,构造FSImage时传入的fsimage目录和编辑日志目录进行初始化,如下
2.1.1 setStorageDirectories
1 | void setStorageDirectories(Collection<File> fsNameDirs, Collection<File> fsEditsDirs) throws IOException { |
如上,fsNameDirs
为fsimage存储目录,fsEditDirs
为编辑日志存储目录,如果一个目录既存储fsimage又存储编辑日志(既在fsNameDirs中又在fsEditsDir中),则存储目录类型为IMAGE_AND_EDITS,否则fsNameDirs中的目录类型为IMAGE,fsEditsDirs中目录类型为EDITS。创建对应的StorageDirectory对象添加到成员storageDirs中。StorageDirectory和其他与存储目录相关的另见DataNode实现源码分析—本地存储管理。
因此,回到FSImage的format方法,遍历storageDirs中的所有存储目录,对每个存储目录使用format方法进行格式化,如下1
2
3
4
5
6
7
8
9
10void format(StorageDirectory sd) throws IOException {
sd.clearDirectory(); //如果current目录存在,则删除。创建新的current目录
sd.lock();//占有存储目录锁
try {
saveCurrent(sd);
} finally {
sd.unlock();//释放存储目录锁
}
LOG.info("Storage directory " + sd.getRoot() + " has been successfully formatted.");
}
如上,格式化时,首先通过clearDirectory将可能存在的current目录删除,然后重新创建新的current目录。
然后通过saveCurrent将当前镜像和空的编辑日志写到current目录中,并写VERSION文件1
2
3
4
5
6
7
8
9
10
11
12protected void saveCurrent(StorageDirectory sd) throws IOException {
File curDir = sd.getCurrentDir();
NameNodeDirType dirType = (NameNodeDirType)sd.getStorageDirType();
if (!curDir.exists() && !curDir.mkdir())//current目录不存在则创建,创建失败抛出异常
throw new IOException("Cannot create directory " + curDir);
if (dirType.isOfType(NameNodeDirType.IMAGE))
saveFSImage(getImageFile(sd, NameNodeFile.IMAGE));
if (dirType.isOfType(NameNodeDirType.EDITS))
editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
sd.write();
}
如上,如果存储目录是IMAGE类型,则通过saveFSImage将当前命名空间镜像保存到current目录的fsimage中,而如果是EDITS类型,则需要通过createEditLogFile创建新的日志文件(文件内容目前只包含文件头版本号),createEditLogFile为标准的创建新的日志流程,创建EditLogFileOutputStream文件输出流,然后使用EditLogFileOutputStream的create方法写版本到流中并刷新到底层文件中。saveFSImage和EditLogFileOutputStream的create方法另见NameNode实现源码分析—命名空间镜像和编辑日志。
注意的是,对于IMAGE_AND_EDITS类型的存储目录,既存储fsimage,又存储编辑日志,上面isOfType判断是否为IMAGE和EDITS都返回true。
最后通过StorageDirectory的write方法写版本VERSION文件到current目录中。
总结下,名字节点格式化时,会将所有存储目录下的current目录删除,然后创建新的current目录。对于IMAGE类型的存储目录,会将当前镜像写到fsimage中(还没启动当然为空),而对于EDITS类型的存储目录,会写新的编辑日志文件(包含版本信息),IMAGE_AND_EDITS类型的当然既有fsimage又有编辑日志文件,然后在每个存储目录中写新的VERSION文件(包含layoutVersion,storageType,namespaceID,cTime)。
3. 正常启动
在NameNode的mian方法中,如果是正常启动,则会通过构造函数创建NameNode对象,然后等待NameNode的RPC服务完成
NameNode构造如下1
2
3
4
5
6
7
8public NameNode(Configuration conf) throws IOException {
try {
initialize(conf);
} catch (IOException e) {
this.stop();
throw e;
}
}
如上,由initialize执行初始化1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61private void initialize(Configuration conf) throws IOException {
InetSocketAddress socAddr = NameNode.getAddress(conf);//RPC地址
...//安全相关
int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);//RPC服务的处理器个数
...//安全相关
myMetrics = NameNodeInstrumentation.create(conf);//创建度量对象NameNodeInstrumentation
this.namesystem = new FSNamesystem(this, conf);//创建FSNamesystem对象
...//安全相关
//RPC服务
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
if (dnSocketAddr != null) {
//serviceRpcServer的处理器个数
int serviceHandlerCount =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
//创建serviceRpcServer服务对象
this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
dnSocketAddr.getPort(), serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
//更新serviceRpcServer地址到配置中
setRpcServiceServerAddress(conf);
}
//创建server对象
this.server = RPC.getServer(this, socAddr.getHostName(),
socAddr.getPort(), handlerCount, false, conf, namesystem
.getDelegationTokenSecretManager());
// Set terse exception whose stack trace won't be logged
this.server.addTerseExceptions(SafeModeException.class);
// The rpc-server port can be ephemeral... ensure we have the correct info
this.serverAddress = this.server.getListenerAddress();
FileSystem.setDefaultUri(conf, getUri(serverAddress));
LOG.info("Namenode up at: " + this.serverAddress);
//启动HTTP服务器
startHttpServer(conf);
//启动RPC服务
this.server.start(); //start RPC server
if (serviceRpcServer != null) {
serviceRpcServer.start();
}
//启动垃圾清理器
startTrashEmptier(conf);
//启动可能的插件
plugins = conf.getInstances("dfs.namenode.plugins", ServicePlugin.class);
for (ServicePlugin p: plugins) {
try {
p.start(this);
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be started", t);
}
}
}
如上,初始化时,主要包括创建NameNode度量对象NameNodeInstrumentation,创建FSNamesystem对象(提供元数据相关操作服务),启动RPC服务,启动HTTP服务,创建垃圾清理器Trash.Emptier线程,还有与DataNode类似的插件。
NameNodeInstrumentation不分析,创建为配置项dfs.namenode.plugins
,需要实现ServicePlugin接口,包括启动start方法和停止stop方法,也不分析。接下来分析其他的初始化
3.1 RPC服务初始化
NameNode中有两个RPC服务器,分别为成员server
和serviceRpcServer
。
server的地址为配置项dfs.namenode.rpc-address
,如果没有配置或者为空,则使用fs.default.name
作为地址,端口为默认端口8020。server的处理器数量由配置项dfs.namenode.handler.count
决定,默认10个。
serviceRpcServer的地址为配置项dfs.namenode.servicerpc-address
,没有配置或为空时不开启该RPC服务,处理器数量为配置项dfs.namenode.service.handler.count
,默认10个。
当serviceRpcServer服务存在时,接口DatanodeProtocol和NamenodeProtocol使用serviceRpcServer(可见DataNode启动中DataNode的初始化建立NameNode代理过程),而ClientProtocol使用server。这是因为Client的RPC调用执行时间短,但往往需要等待日志的持久化,而数据节点和第二名字节点的RPC调用,执行的逻辑比较复杂,执行的时间也比较长,在一个繁忙的系统中,对客户端和HDFS其他实体(包括数据节点和第二名字节点)的RPC请求,使用不同的服务器,可以保证客户端的请求得到及时的响应。(该解释来自技术内幕P451)。
而如果serviceRpcServer服务不存在(没有配置),则NameNode只有一个RPC服务器,所有的RPC通信都由server负责。
RPC服务器的创建和启动另见RPC的相关源码分析。
3.2 HTTP服务
HTTP服务主要用于和SecondaryNameNode之间的交互,即fsimage和编辑日志的传输,参见NameNode实现源码分析—和SecondaryNameNode的交互。
3.3 垃圾清理器Trash.Emptier线程
如前面initialize中,由startTrashEmptier方法创建并启动1
2
3
4
5private void startTrashEmptier(Configuration conf) throws IOException {
this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier");
this.emptier.setDaemon(true);
this.emptier.start();
}
3.3.1 Trash
Trash为文件系统提供了回收站的功能,Trash主要成员如下如下1
2
3
4
5
6
7
8
9
10
11
12
13private static final Path CURRENT = new Path("Current");
private static final Path TRASH = new Path(".Trash/");
private static final Path HOMES = new Path("/user/");
private static final FsPermission PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
private static final DateFormat CHECKPOINT = new SimpleDateFormat("yyMMddHHmm");
private static final int MSECS_PER_MINUTE = 60*1000;
private final FileSystem fs;
private final Path trash;
private final Path current;
private final long interval;
如上,回收站位于用户主目录的.Trash目录下,即对于用户”xiaoyun”来说回收站路径为”/user/xiaoyun/.Trash”,成员trash
即为该目录路径。文件删除时默认被移到回收站的Current子目录下,成员current
即为Current目录路径,在current子目录下保留文件原来的路径,如用户xiaoyun删除文件”/tmp/a.txt”,在回收站的路径为”/user/xiaoyun/.Trash/Current/tmp/a.txt”。
PERMISSION
为权限,回收站中只有原来的用户有权访问。CHECKPOINT
为垃圾检查点文件名格式。
Trash的构造1
2
3
4
5
6
7public Trash(FileSystem fs, Configuration conf) throws IOException {
super(conf);
this.fs = fs;
this.trash = new Path(fs.getHomeDirectory(), TRASH);
this.current = new Path(trash, CURRENT);
this.interval = conf.getLong("fs.trash.interval", 60) * MSECS_PER_MINUTE;
}
如上,fs.getHomeDirectory即为/user/${user.home}
,周期默认为1h。
还可以指定home目录进行构造1
2
3
4
5
6
7private Trash(Path home, Configuration conf) throws IOException {
super(conf);
this.fs = home.getFileSystem(conf);
this.trash = new Path(home, TRASH);
this.current = new Path(trash, CURRENT);
this.interval = conf.getLong("fs.trash.interval", 60) * MSECS_PER_MINUTE;
}
3.3.2 Emptier
Emptier为Trash的内部类,可以访问Trash的成员,Emptier的成员如下1
2
3private Configuration conf;
private FileSystem fs;
private long interval;
interval为线程工作周期
构造如下1
2
3
4
5public Emptier(Configuration conf) throws IOException {
this.conf = conf;
this.interval = conf.getLong("fs.trash.interval", 60) * MSECS_PER_MINUTE;
this.fs = FileSystem.get(conf);
}
因此,周期为配置项fs.trash.interval
,默认60min即1个小时。该周期既是检查周期,又是检查点删除周期。
在startTrashEmptier中通过Trash的getEmptier即创建Emptier对象,然后启动Emptier线程,线程主程序如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40public void run() {
if (interval == 0) return;//Trash功能被关闭了,返回
long now = System.currentTimeMillis();
long end;
while (true) {
end = ceiling(now, interval);
try { //等待清理时间到达
Thread.sleep(end - now);
} catch (InterruptedException e) {
return; // exit on interrupt
}
try {
now = System.currentTimeMillis();
if (now >= end) {
FileStatus[] homes = null;
try {
homes = fs.listStatus(HOMES); //列出所有的home目录,home目录位于/user下
} catch (IOException e) {
LOG.warn("Trash can't list homes: "+e+" Sleeping.");
continue;
}
if (homes == null)//没有home目录
continue;
for (FileStatus home : homes) { // dump each trash
if (!home.isDir())
continue;
try {
Trash trash = new Trash(home.getPath(), conf);//每一个home目录创建Trash对象清理
trash.expunge();//清理过期的垃圾检查点
trash.checkpoint();//将current目录重命名为当前时间作为一个检查点
} catch (IOException e) {
LOG.warn("Trash caught: "+e+". Skipping "+home.getPath()+".");
}
}
}
} catch (Exception e) {
LOG.warn("RuntimeException during Trash.Emptier.run() " + StringUtils.stringifyException(e));
}
}
}
如上,垃圾清理的周期为配置项fs.trash.interval
,默认1个小时,没有到达清理时间则休眠等待。
清理时,/user目录下的每一个目录为一个用户的主目录,对所有的home目录进行清理。
每个home目录创建一个Trash对象,通过Trash对象的expunge
方法进行清理。清理.Trash
目录下过期的垃圾检查点,垃圾检查点为之前current目录重命名后的目录,目录名为重命名时间,格式为CHECKPOINT。垃圾检查点从创建到现在超过期限了就会被删除,期限同样为配置项fs.trash.interval
,即默认垃圾检查点经过1个小时后便会被清理。
清理垃圾检查点后,将当前垃圾存放目录current创建检查点,即将current目录重命名为一个垃圾检查点,该垃圾检查点目录名为重命名日期,格式为CHECKPOINT,检查点创建由Trash的checkpoint方法产生,如上。
dopunge和checkpoint都比较简单,这里就不贴代码了。
总的来说,用户删除的文件会放到用户主目录的.Trash/Current
目录下,垃圾清理器默认每隔1个小时检查所有用户主目录,将用户当前使用的垃圾目录(.Trash/Current)创建检查点,检查点即将Current目录重命名为当前操作时间。且会检查每个垃圾检查点,对从创建时间(检查点名字)到现在默认超过1个小时的检查点进行删除。
也就是说,删除文件存放到’/user/${user.home}/.Trash/Current`目录下,然后过了1个小时后被移到检查点中,Current继续保存接下来新删除的文件,检查点再过一个小时被删除,因此用户删除的文件默认经过两个小时候会从HDFS中移除。
3.4 FSNamesystem
如前面initialize中,由FSNamesystem的构造函数创建FSNamesystem对象1
2
3
4
5
6
7
8
9
10
11
12
13
14
15FSNamesystem(NameNode nn, Configuration conf) throws IOException {
try {
initialize(nn, conf);
} catch (IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
shutdown();
throw e;
} catch (RuntimeException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
shutdown();
throw e;
}
}
如上,通过initialize对FSNamesystem进行初始化,异常时通过close关闭相关服务线程,通过shutdown关闭目录服务。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55private void initialize(NameNode nn, Configuration conf) throws IOException {
this.systemStart = now();//启动时间
setConfigurationParameters(conf);//通过配置初始化成员
dtSecretManager = createDelegationTokenSecretManager(conf);//口令安全管理器
this.nameNodeAddress = nn.getNameNodeAddress();//名字节点RPC服务地址
this.registerMBean(conf); // register the MBean for the FSNamesystemStutus
this.dir = new FSDirectory(this, conf);//创建FSDirectory对象
StartupOption startOpt = NameNode.getStartupOption(conf);
//读取存储目录,如果处于中间状态则进行相应的恢复,加载命名空间镜像和编辑日志并进行合并,创建新的检查点
this.dir.loadFSImage(getNamespaceDirs(conf), getNamespaceEditsDirs(conf), startOpt);
long timeTakenToLoadFSImage = now() - systemStart;
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");//记录加载命名空间镜像时间
NameNode.getNameNodeMetrics().setFsImageLoadTime(timeTakenToLoadFSImage);
this.safeMode = new SafeModeInfo(conf);//创建SafeModeInfo对象
setBlockTotal();//更新总区块数
//创建pendingReplications
pendingReplications = new PendingReplicationBlocks(
conf.getInt("dfs.replication.pending.timeout.sec", -1) * 1000L);
if (isAccessTokenEnabled) {
//创建访问令牌处理器
accessTokenHandler = new BlockTokenSecretManager(true, accessKeyUpdateInterval, accessTokenLifetime);
}
//心跳处理线程
this.hbthread = new Daemon(new HeartbeatMonitor());
//租约管理器线程
this.lmthread = new Daemon(leaseManager.new Monitor());
//复制删除操作线程
this.replmon = new ReplicationMonitor();
this.replthread = new Daemon(replmon);
hbthread.start();
lmthread.start();
replthread.start();
//include和exclude文件的HostsFileReader对象
this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""), conf.get("dfs.hosts.exclude",""));
//撤销状态管理线程DecommissionManager
this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
conf.getInt("dfs.namenode.decommission.interval", 30),
conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
dnthread.start();
//DNSToSwitchMapping对象
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
DNSToSwitchMapping.class), conf);
//dns到switch映射支持缓冲的话,从include列表中解析网络位置,存储解析结果在缓冲中,这样之后的解析请求就会快一点
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
}
InetSocketAddress socAddr = NameNode.getAddress(conf);
this.nameNodeHostName = socAddr.getHostName();
registerWith(DefaultMetricsSystem.INSTANCE);
}
如上,会依次完成如下工作
- 通过
setConfigurationParameters
读取配置初始化FSNamesystem中与配置相关的成员,比较重要的成员如下
FSNamesystem中成员 | 含义 | 配置项 | 缺省值 |
---|---|---|---|
supergroup | 拥有超级权限用户所在组 | dfs.permissions.supergroup | supergroup |
isPermissionEnabled | 是否使能权限访问 | dfs.permissions | true |
defaultPermission | 缺省权限 | N/A | 当前用户,超级组,777访问权限 |
blocksInvalidateWorkPct | ReplicationMonitor线程每次删除操作的数据节点占总数据节点比重 | dfs.namenode.invalidate.work.pct.per.iteration | 0.32 |
blocksReplWorkMultiplier | ReplicationMonitor线程每次复制操作,每个需要复制的数据节点执行区块数 | dfs.namenode.replication.work.multiplier.per.iteration | 2 |
clusterMap | 网络拓扑实现类 | net.topology.impl | NetworkTopology |
replicator | 区块放置策略实现类,决定区块在集群中放置位置 | dfs.block.replicator.classname | BlockPlacementPolicyDefault |
defaultReplication | 缺省期望副本数 | dfs.replication | 3 |
maxReplication | 最大副本数 | dfs.replication.max | 512 |
minReplication | 最小副本数,达到此值才是安全的区块,可以进行复制操作 | dfs.replication.min | 1 |
maxReplicationStreams | 数据节点同时执行复制操作的数目 | dfs.max-repl-streams | 2 |
heartbeatRecheckInterval | HeartbeatMonitor线程检查周期 | heartbeat.recheck.interval | 5*60*1000(5min) |
heartbeatExpireInterval | 判断数据节点死亡的未发送心跳间隔 | N/A | 2*heartbeatRecheckInterval+10*heartbeatInterval(5min30s) |
replicationRecheckInterval | ReplicationMonitor线程检查周期 | dfs.replication.interval | 3(s) |
defaultBlockSize | 缺省区块大小 | dfs.block.size | 64*1024*1024(64MB) |
maxFsObjects | 最大INodeFile对象数目 | dfs.max.objects | 0(无限制) |
blockInvalidateLimit | ReplicationMonitor线程的一次处理中,每个数据节点删除区块限制 | dfs.block.invalidate.limit | 20*(heartbeatInterval/1000)即60 |
durableSync | N/A | dfs.durable.sync | true |
staleInterval | 数据节点成旧周期,即该时间内没有心跳到来则判断数据节点成旧 | dfs.namenode.stale.datanode.interval | 30*1000(30s) |
- 创建FSDirectory对象,与目录树,命名空间镜像,编辑日志等相关的操作由该对象负责,FSDirectory包含FSImage成员,而FSImage对象又包含FSEditLog成员,这些在其他文章已经分析过了
- FSDirectory的loadFSImage方法。可能会根据启动选项进行格式化,然后分析本地存储目录,如果处于中间状态(类似DataNode,更新提交等中间状态)进行恢复,然后读取最新的命名空间镜像和编辑日志,进行合并,这些就是从本地文件中恢复(由FSImage的recoverTransitionRead方法完成)。恢复到上次工作状态后,需要将合并后新的命名空间镜像保存到本地,首先将之前的工作目录current目录重命名为lastcheckpoint.tmp,然后将新的镜像保存到current中,打开新的编辑日志文件,最后将lastcheckpoint.tmp重命名为previous.checkpoint,即一次更新操作(由FSImage的saveNamespace方法完成)。
recoverTransitionRead和saveNamespace方法不再分析。 - 创建安全模式管理对象SafeModeInfo,因为前面已经加载了镜像并合并了编辑日志,现在内存中信息就是上次关闭前的工作状态,通过setBlockTotal更新SafeModeInfo的成员blockTotal,关于安全模式另见NameNode安全模式
- 创建PendingReplicationsBlocks对象,即初始化pendingReplications成员,会启动相关PendingReplicationMonitor线程,复制超时时间为配置项
dfs.replication.pending.timeout.sec
,默认-1表示使用缺省值5min。pendingReplications以及PendingReplicationMonitor线程另见NameNode维护的数据结构。 - 创建并启动心跳检查线程(HeartbeatMonitor),租约检查线程(LeaseManager.Monitor),区块复制/删除线程(ReplicationMonitor),HeartbeatMonitor线程另见NameNode心跳检测,LeaseManager.Monitor另见NameNode租约管理,ReplicationMonitor线程另见NameNode区块复制和删除线程。
- 创建解析include文件和exclude文件的HostsFileReader对象,另见NameNode数据节点生存期管理。
- 创建并启动用于检测并改变撤销数据节点状态的DecommissionManager线程,DecommissionManager另见NameNode数据节点生存期管理,如上,检查周期为30s,每次检查5个数据节点。
- 创建网络拓扑相关的DNSToSwitchMapping对象,DNSToSwitchMapping另见NameNode区块复制和删除线程,如上,默认为
ScriptBasedMapping
,实现了CachedDNSToSwitchMapping,因此创建后会进行解析,如果配置了脚本,则会使用脚本进行解析,脚本处理的输入为IP地址形式,每次默认处理100个输入。解析结果缓存在ScriptBasedMapping成员cache中。而如果没有配置脚本,则所有的主机会被解析成/default-rack
。
4. 停止
NameNode停止由stop方法完成1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public void stop() {
if (stopRequested)//用于测试
return;
stopRequested = true;
if (plugins != null) {//停止所有插件
for (ServicePlugin p : plugins) {
try {
p.stop();
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be stopped", t);
}
}
}
try {
if (httpServer != null) httpServer.stop();//停止HTTP服务
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
}
if(namesystem != null) namesystem.close();//关闭FSNamesystem相关线程
if(emptier != null) emptier.interrupt();//中断垃圾清理器
if(server != null) server.stop();//停止RPC服务
if(serviceRpcServer != null) serviceRpcServer.stop();//如果serviceRpcServer存在,停止
if (myMetrics != null) {//关闭度量对象
myMetrics.shutdown();
}
if (namesystem != null) {//关闭FSNamesystem
namesystem.shutdown();
}
}
如上,与启动对应,停止所有插件(插件就两个方法,start启动,stop停止),停止HTTPServer,停止FSNamesystem中相关服务,中断垃圾清理器,停止RPC服务。
其中RPC服务可能由两个因此对应的进行停止,RPC服务器的stop另见RPC相关分析。
4.1 垃圾清理器中断
如前面垃圾清理器线程的主程序中,通过interrupt会中断可能处于休眠等待状态的垃圾清理器,因为垃圾清理器默认1小时执行一次,因此很有可能处于休眠状态从而唤醒返回。而如果处于工作状态,如在expunge清理或checkpoint生成检查点过程中,则要等到这次工作完成。
4.2 FSNamesystem的关闭
如上,首先调用FSNamesystem的close方法,然后调用shutdown方法,在初始化FSNamesystem对象失败时也是这个顺序
4.2.1 FSNamesystem.close
1 | public void close() { |
如上,修改fsRunning为false,线程都会循环判断该标识。
4.2.1.1 线程中断
通过pendingReplications.stop()中断对应的PendingReplicationMonitor线程,PendingReplicationMonitor默认5min检查一次(操作一次后休眠5min),因此可能将其从休眠等待中唤醒从而返回,如果处于检查状态则要等待检查完成。
HeartbeatMonitor线程每5min检查一次(当前时间距离上次检查时间超过了5min),ReplicationMonitor线程默认每3s检查一次(每次操作后休眠3s),DecommissionManager线程默认每30s检查一次(每次操作后休眠30s),SafeModeMonitor线程默认1s检查一次(每次操作后休眠1s),LeaseManager.Monitor线程默认2s检查一次(每次操作后休眠2s),如果这些线程处于相关操作中,则要等待本次操作完成才会返回,如果处于休眠等待则会被唤醒并返回。
如上,最后会等待线程完成,最多等待3s中。
4.2.1.2 关闭FSDirectory
1 | public void close() throws IOException { |
关闭FSImage1
2
3
4void close() throws IOException {
getEditLog().close();
unlockAll();
}
如上,关闭FSEditLog,然后释放所有的存储目录锁,FSEditLog的close方法如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public synchronized void close() throws IOException {
while (isSyncRunning) {//正在同步,等待
try {
wait(1000);
} catch (InterruptedException ie) {
}
}
if (editStreams == null) {
return;
}
printStatistics(true);
numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;//清零统计量
for (int idx = 0; idx < editStreams.size(); idx++) {
EditLogOutputStream eStream = editStreams.get(idx);
try {
eStream.setReadyToFlush();
eStream.flush();
eStream.close();//刷新所有的输出流EditLogFileOutputStream到底层文件,然后关闭输出流
} catch (IOException ioe) {
removeEditsAndStorageDir(idx);
idx--;
}
}
editStreams.clear();//清除editStreams成员
}
如上,如果当前正在同步,则等待,然后清零相关统计量,刷新素有的EditLogFileOutputStream输出流当前缓存日志到底层文件,然后关闭输出流关闭文件,最后清除editStreams成员。同步和EditLogFileOutputStream相关另见NameNode命名空间镜像和编辑日志
4.2.1.3 关闭BlocksMap
由close方法完成1
2
3void close() {
blocks.clear();
}
即清除所有的区块记录
4.2.2 FSnamesystem.shutdown
1 | public void shutdown() { |
如上主要是FSDirectory的shutdown方法,如下1
2
3void shutdown() {
nameCache.reset();
}
即清除NameCache相关对象,NameCache缓存最频繁使用的文件,不具体分析。