NameNode实现源码分析---数据节点生存期管理

Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》


1. include文件和exclude文件

可以根据配置文件对HDFS中的数据节点进行管理,配置项dfs.hosts为指定能够连接到名字节点的数据节点列表文件(简称include文件),而配置项dfs.hosts.exclude指定不能连接到名字节点的数据节点列表文件(简称exclude文件)。
添加数据节点时,往include中增加相应记录后,可以通过dfsadmin命令hadoop dfsadmin -refreshNodes刷新名字节点信息,然后相应的启动数据节点,则在HDFS中增加了数据节点。
撤销数据节点时,往exclude中增加相应记录后,通过dfsadmin工具刷新后,名字节点便会开始撤销数据节点,被撤销数据节点上的数据会复制到其他数据节点上,复制时数据节点处于”正在撤销”状态,复制完后处于”已撤销”状态。
dfsadmin的refreshNodes命令使用ClientProtocol的refreshNodes方法,NameNode的实现如下

1
2
3
public void refreshNodes() throws IOException {
namesystem.refreshNodes(new Configuration());
}

由FSNamesystem提供服务,如下

1.1 refreshNodes

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void refreshNodes(Configuration conf) throws IOException {
checkSuperuserPrivilege();//要拥有超级用户权限
if (conf == null)
conf = new Configuration();
//更新include文件和exclude文件名
hostsReader.updateFileNames(conf.get("dfs.hosts",""), conf.get("dfs.hosts.exclude", ""));
hostsReader.refresh();//重新读取include列表和exclude列表
synchronized (this) {
for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
if (!inHostsList(node, null)) {//现存的数据节点不在include中,设置为DECOMMISSIONED状态
node.setDecommissioned(); // case 2.
} else {
if (inExcludedHostsList(node, null)) {
//既在include中,又在exclude中,如果不处于正在撤销或已撤销状态,则更新状态为DECOMMISSION_INPROGRESS,然后开始撤销过程
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
startDecommission(node); //开始撤销过程
}
} else {//在include中,但不在exclude中,如果当前处于正在撤销或已撤销状态,停止撤销

if (node.isDecommissionInProgress() || node.isDecommissioned()) {
stopDecommission(node); // case 4.
}
}
}
}
}
}

如上,include文件和exclude文件的读取由FSNamesystem的成员hostsReader负责,为HostsFileReader类型。
首先重新读取配置,更新include文件名和exclude文件名,include文件名保存在HostsFileReader的成员includesFile中,exclude文件名保存在excludesFile中

1.1.1 读取include和exclude文件

然后通过refresh重新读取include文件和exclude文件,主要过程如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
FileInputStream fis = new FileInputStream(file);
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(fis));
String line;
while ((line = reader.readLine()) != null) {
String[] nodes = line.split("[ \t\n\f\r]+");
if (nodes != null) {
for (int i = 0; i < nodes.length; i++) {
if (!nodes[i].equals("")) {
set.add(nodes[i]); // might need to add canonical name
}
}
}
}
}

因此,文件中的数据节点中的一项(具体为什么见后面)可以通过空格,制表符,回车符,换行符,换页符进行分隔(可以多个组合)。读取的include中数据节点列表保存在成员includes中,exclude中数据节点列表保存在成员excludes中,两者都为HashSet。

读取完include和exclude文件后,对于当前管理的数据节点datanodeMap进行分析,有一下集中情况

  • 数据节点不在include列表中,更新状态为DECOMMISSIONED,已撤销状态
  • 数据节点在include列表中,同时在exclude列表中,如果当前不处于正在撤销或已撤销状态,则设置为DECOMMISSION_INPROGRESS即正在撤销状态,然后开始撤销过程
  • 数据节点在include列表中,但不在exclude列表中,如果当前处于正在撤销或已撤销状态,将AdminStates成员adminState置空来停止撤销过程。

判断数据节点是否在include列表中由inHostsList完成,是否在exclude列表中由inExcludedHostsList完成

1.1.2 inHostsList

1
2
3
4
5
6
7
8
9
private boolean inHostsList(DatanodeID node, String ipAddr) {
Set<String> hostsList = hostsReader.getHosts();
return (hostsList.isEmpty() ||
(ipAddr != null && hostsList.contains(ipAddr)) ||
hostsList.contains(node.getHost()) ||
hostsList.contains(node.getName()) ||
((node instanceof DatanodeInfo) &&
hostsList.contains(((DatanodeInfo)node).getHostName())));
}

如上,hostsList即为HostsReader中的includes成员,即include列表。在include列表中有一下情况

  • include列表为空,即如果没有配置include文件或者include文件为空,则默认所有的数据节点在include列表中
  • 如果指定了数据节点的IP,则include列表中包含该IP
  • include列表中包含数据节点的主机名(name)
  • include列表中包含数据节点的主机名和端口(hostname:portNumber,端口为流服务端口)
  • 给定的节点为DatanodeInfo,如果配置了IP(hostname),则判断include列表是否包含IP,否则判断是否包含主机名

由上可知,如果不配置include文件或者include文件为空,则默认所有的数据节点在include文件中。同时可以反推,include中可以配置的项为

  • IP地址,这时需要指定数据节点的IP地址,或者在数据节点中配置了hostname成员为IP
  • 主机名(DatanodeID成员name去掉端口)
  • 主机名和端口(hostname:portNumber,DatanodeID成员name),端口为流服务端口

1.1.3 inExcludedHostsList

1
2
3
4
5
6
7
8
private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
Set<String> excludeList = hostsReader.getExcludedHosts();
return ((ipAddr != null && excludeList.contains(ipAddr)) ||
excludeList.contains(node.getHost()) ||
excludeList.contains(node.getName()) ||
((node instanceof DatanodeInfo) &&
excludeList.contains(((DatanodeInfo)node).getHostName())));
}

与include类似,不过如果没有配置exclude文件或者exclude文件为空,则默认所有的数据节点不在exclude列表中。其他的一样,可以配置数据节点项为IP地址,主机名,主机名加端口

2. 撤销数据节点

如前面分析,如果数据节点不在include列表中,直接更新状态为已撤销,则通过countNodes更新区块副本状态时,已撤销节点上的所有区块为已撤销状态,不是有效副本,有效副本数减少,某些情况下会触发区块复制。
如果数据节点在include列表中,同时在exclude列表中,则需要撤销该数据节点。撤销通过startDecommission开始

2.1 startDecommission

1
2
3
4
5
6
7
8
9
private void startDecommission (DatanodeDescriptor node) throws IOException {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
LOG.info("Start Decommissioning node " + node.getName());
node.startDecommission();//设置状态为DECOMMISSION_INPROGRESS
node.decommissioningStatus.setStartTime(now());//更新数据节点撤销开始时间
//检查所有区块,对于没有达到副本数要求的区块添加到neededReplications中
checkDecommissionStateInternal(node);
}
}

如上,设置数据节点状态为DECOMMISSION_INPROGRESS,然后更新撤销开始时间为当前时间,最后通过checkDecommissionStateInternal检查数据节点的所有区块,对于没有达到期望副本的区块添加到neededReplications中,等待复制。当所有的区块副本数达到期望值时,更新状态为已撤销。
checkDecommissionStateInternal如下

1
2
3
4
5
6
7
8
9
10
11
12
boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
if (node.isDecommissionInProgress()) {

if (!isReplicationInProgress(node)) {//所有区块副本数达到期望值时返回false
node.setDecommissioned();
LOG.info("Decommission complete for node " + node.getName());
}
}
if (node.isDecommissioned()) {
return true;
}
return false;
}

如上,添加没有达到期望副本值的区块到neededReplications中在isReplicationInProgress中完成,当所有区块达到副本数时方法返回false,从而设置数据节点的状态为DECOMMISSIONED
isReplicationInProgress如下

2.1.1 isReplicationInProgress

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
...
for(final Iterator<Block> i = srcNode.getBlockIterator(); i.hasNext(); ) {
final Block block = i.next();
INode fileINode = blocksMap.getINode(block);
if (fileINode != null) {
NumberReplicas num = countNodes(block);//更新区块副本状态
int curReplicas = num.liveReplicas();
int curExpectedReplicas = getReplication(block);
if (curExpectedReplicas > curReplicas) {//副本数没有达到期望值
...
underReplicatedBlocks++;//增加正在复制区块统计量
if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {//只在正在撤销的数据节点上有副本
decommissionOnlyReplicas++;//增加只在正在撤销的数据节点上有副本的区块统计量
}
if (fileINode.isUnderConstruction()) {//为打开的文件中的副本
underReplicatedInOpenFiles++;
}
//没有正在执行的复制操作,且neededReplications中不存在该Block,添加到neededReplications
if (!neededReplications.contains(block) &&
pendingReplications.getNumReplicas(block) == 0) {
neededReplications.add(block, curReplicas, num.decommissionedReplicas(), curExpectedReplicas);
}
}
}
}
srcNode.decommissioningStatus.set(underReplicatedBlocks, decommissionOnlyReplicas, underReplicatedInOpenFiles);
return status;
}

如上,countNodes中更新区块副本状态,当前数据节点现在处于DECOMMISSION_INPROGRESS状态,因此countNodes后,本数据节点上的区块副本全部变为decommissionedReplicas(NumberReplicas成员)副本而不是有效副本,有效副本数减少,从而添加到neededReplications中,复制到其他数据节点中。
最终更新数据节点的正在撤销状态成员decommissioningStatus,为DecommissioningStatus类,成员如下

1
2
3
4
int underReplicatedBlocks;//正在复制的区块数
int decommissionOnlyReplicas;//只在正在撤销的数据节点上有副本的区块数
int underReplicatedInOpenFiles;//属于打开文件的区块数
long startTime;//撤销的开始时间

如上,startTime在数据节点状态更新为DECOMMISSION_INPROGRESS后初始化了,在最后更新其他三个成员。

因此,撤销数据节点时,只做一次遍历,将没有达到副本期望值的区块添加到neededReplications中等待复制,那么当所有区块复制完了,怎么设置数据节点状态为已撤销状态呢。遍历工作由DecommissionManager.Monitor线程完成

3. DecommissionManager.Monitor线程撤销状态更新

FSnamesystem成员dnthread DecommissionManager.Monitor线程会周期性检查处于DECOMMISSION_INPROGRESS状态的数据节点,数据节点处于DECOMMISSION_INPROGRESS状态后要确保所有区块副本数达到期望值才能最终关闭,也因此在DataNode的关闭过程可能需要长时间等待。
dnthread成员初始化如下

1
2
3
4
this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
conf.getInt("dfs.namenode.decommission.interval", 30),
conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
dnthread.start();

DecommissionManager中维护了所属的FSNamesystem对象,看Monitor线程

1
2
3
4
Monitor(int recheckIntervalInSecond, int numNodesPerCheck) {
this.recheckInterval = recheckIntervalInSecond * 1000L;
this.numNodesPerCheck = numNodesPerCheck;
}

如上,recheckInterval为检查周期,由配置项dfs.namenode.decommission.interval确定,默认30s检查一次。
numNodesPerCheck为每个周期检查DataNode的个数,由配置项dfs.namenode.decommission.nodes.per.interval确定,默认一次检查5个DataNode。
另外,还有一个成员firstkey,缺省为空,为从datanodeMap开始检查的数据节点StorageID。
线程如下

1
2
3
4
5
6
7
8
9
10
11
12
public void run() {
for(; fsnamesystem.isRunning(); ) {
synchronized(fsnamesystem) {
check();
}
try {
Thread.sleep(recheckInterval);
} catch (InterruptedException ie) {
LOG.info("Interrupted " + this.getClass().getSimpleName(), ie);
}
}
}

由check方法执行检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void check() {
int count = 0;
//遍历datanodeMap中每一个DatanodeDescriptor对象
for(Map.Entry<String, DatanodeDescriptor> entry
: new CyclicIteration<String, DatanodeDescriptor>(fsnamesystem.datanodeMap, firstkey)) {
final DatanodeDescriptor d = entry.getValue();
firstkey = entry.getKey();

if (d.isDecommissionInProgress()) {//如果处于DECOMMISSION_INPROGRESS,检查是否所有的区块副本数满足要求
try {
//检查是否所有区块副本数满足要求,如果是则更新DataNode状态为DECOMMISSIONED
fsnamesystem.checkDecommissionStateInternal(d);
} catch(Exception e) {
LOG.warn("entry=" + entry, e);
}
if (++count == numNodesPerCheck) {
return;
}
}
}
}

如上,遍历datanodeMap中从firstKey开始的所有DatanodeDescriptor,对于处于DECOMMISSION_INPROGRESS状态的DataNode,如果所有区块副本数达到了期望值,则更新状态为DECOMMISSIONED,否则没有达到副本期望值的区块,如果不在neededReplications中则添加,等待复制,已经在neededReplications中的当然不需要继续添加了。
checkDecommissionStateInternal前面已经分析过了。

因此DecommissionManager.Monitor线程会周期性检查正在撤销的数据节点上所有的区块副本数是否达到期望值,完成后设置数据节点状态为已撤销状态。

4. 取消撤销

如前include文件和exclude文件的分析,refreshNodes时发现数据节点在include列表中且不在exclude列表中,不过此时正处于撤销状态或者处于已撤销状态,则取消撤销,取消撤销由stopDecommission完成

1
2
3
4
5
6
7
8
public void stopDecommission (DatanodeDescriptor node) throws IOException {
LOG.info("Stop Decommissioning node " + node.getName());
node.stopDecommission();
}

public void stopDecommission() {
adminState = null;
}

如上,更新数据节点的adminState为null,这样在countNodes方法被调用时,该数据节点上原来区块为已撤销副本变为了有效副本,副本数增加。
对于添加在neededReplications中还没执行复制的副本,在执行操作时,选择复制的源数据节点和目标数据节点前都会通过countNodes更新副本状态(ReplicationMonitor线程),就会发现有效副本数增加了,从而可能不会继续进行复制。
而对于在取消撤销前就已经执行了操作,选择好源数据节点目标数据节点,添加到源数据节点的待复制区块列表中的,则只能执行这个不该执行的复制操作(如果取消撤销后副本数本来达到了期望值),然后会在区块报告时发现副本数超出了期望值,做出相应的删除操作。

5. 添加数据节点

前面refreshNodes中,只处理了datanodeMap中的数据节点,即现有管理的数据节点。那么通过在include或exclude中添加数据节点,然后启动新的数据节点,数据节点向NameNode注册时会怎么处理,拒绝还是允许连接呢。
数据节点启动时会通过DatanodeProtocol代理的register方法向NameNode注册,注册时会通过verifyNodeRegistration方法判断该数据节点是否能够连接到NameNode上(FSNamesystem的registerDatanode方法中),如下

1
2
3
4
5
6
7
8
9
10
String dnAddress = Server.getRemoteAddress();//数据节点IP,当前处于数据节点的RPC调用,能获得数据节点IP
if (dnAddress == null) {
//如果不能获取数据节点IP,则通过数据节点传过来的注册信息DatanodeRegistration获取DatanodeID中的主机名(不包括端口)
//数据节点每次和NameNode的RPC通信都会发送标识该数据节点的DatanodeRegistration。
dnAddress = nodeReg.getHost();
}

if (!verifyNodeRegistration(nodeReg, dnAddress)) {//该数据节点是否能够连接到NameNode中
throw new DisallowedDatanodeException(nodeReg);
}

如上,FSNamesystem的verifyNodeRegistration在register实现中被调用(即register其实还是由FSNamesystem提供服务),因此当前处于RPC调用中,一般可以获得对端数据节点的IP地址,如果不能获得IP地址,则使用数据节点的注册信息获取数据节点主机名(DatanodeID中成员name去掉端口号),每次数据节点和名字节点的RPC通信都会发送数据节点的注册信息DatanodeRegistration以标识该数据节点,因此总能获得数据节点的主机名。

然后通过verifyNodeRegistration验证该数据节点是否能够注册,不能注册抛出异常,注册失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private synchronized boolean verifyNodeRegistration(DatanodeRegistration nodeReg, String ipAddr) throws IOException {
if (!inHostsList(nodeReg, ipAddr)) {//不在include列表中,不允许注册
return false;
}
if (inExcludedHostsList(nodeReg, ipAddr)) {//既在include列表中,又在exclude列表中,
DatanodeDescriptor node = getDatanode(nodeReg);
if (node == null) {//数据节点信息不存在,抛出异常
throw new IOException("verifyNodeRegistration: unknown datanode " + nodeReg.getName());
}
if (!checkDecommissionStateInternal(node)) {//如果数据节点正常工作,撤销数据节点
startDecommission(node);
}
}
//在include中,不在exclude中,返回true允许注册
return true;
}

如上,验证时,如果不在include列表中,则直接不能注册,返回false。这种情况包括不在include中且不在exclude中,不在include中在exclude中,注意的是,如果include没配置或为空则默认在include中,而exclude没配置或为空,则默认不在exclude中。
如果在include列表中,且不在exclude列表中,则能注册,返回true。
如果在include列表中,且在exclude列表中,则会在datanodeMap中查找该数据节点。如果当前数据节点不存在,抛出异常,注册失败。否则当前数据节点存在了且处于正常状态,则撤销数据节点,如果处于正在撤销状态则更新一次状态,而如果已经处于了已撤销状态,则不用处理。因此对于既在include列表中,又在exclude列表中的情况,如果对应数据节点不存在则抛出异常,不能注册,否则要将当前数据节点进行撤销,然后返回true可以注册。

最后,对include和exclude做下总结。

  • include和exclude文件的格式。include和exclude中每个数据节点可以通过空格,换行符,回车符,制表符,换页符进行分隔,当然最好一行一个数据节点。数据节点可以配置IP地址,数据节点的主机名,数据节点的主机名加端口,这里的主机名和端口都是DatanodeID中的name成员,同时在include或exclude文件中这三种方式可以混杂
  • include和exclude默认规则。如果没有配置include文件或者include为空,则默认所有数据节点在include中,而如果没有配置exclude文件或exclude列表为空,则默认所有数据节点不在exclude中。
  • 数据节点向NameNode注册时,不在include中的数据节点不允许注册;在include中且在exclude中的数据节点如果存在会撤销然后允许重新注册,不存在则抛出异常,不允许注册;在include中且不在exclude中的数据节点允许注册。
  • datanodeMap中数据节点,通过refreshNodes刷新时,不在include中数据节点不应该存在于集群中,直接置为已撤销状态;在include中且在exclude中的也不应该存在于集群中,如果当前处于正常状态,则撤销;在include中且不在exclude中的数据节点,为允许存在的数据节点,如果当前处于正在撤销或已经撤销状态,将adminState置空从而停止撤销。