NameNode实现源码分析---心跳检测

Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》


数据节点会周期性通过DatanodeProtocol代理向NameNode发送心跳,在NameNode中更新数据节点的活动时间,如果数据节点长时间没有心跳到来,则NameNode会判断该数据节点死亡,由HeartbeatMonitor执行该扫描操作

1. HeartbeatMonitor

HeartbeatMonitor线程用于周期性检查注册的数据节点心跳和更新访问令牌信息。HeartbeatMonitor线程初始化如下

1
2
this.hbthread = new Daemon(new HeartbeatMonitor());
hbthread.start();

HeartbeatMonitor包含两个成员

1
2
private long lastHeartbeatCheck;//上次心跳检查时间
private long lastAccessKeyUpdate;//上次访问令牌更新时间

线程主程序如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void run() {
while (fsRunning) {
try {
long now = now();
if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {//达到心跳检查时间,心跳检查
heartbeatCheck();
lastHeartbeatCheck = now;
}
//使能了口令检查且达到了检查时间,口令更新
if (isAccessTokenEnabled && (lastAccessKeyUpdate + accessKeyUpdateInterval < now)) {
updateAccessKey();
lastAccessKeyUpdate = now;
}
} catch (Exception e) {
FSNamesystem.LOG.error(StringUtils.stringifyException(e));
}
try {
Thread.sleep(5000); // 5 seconds
} catch (InterruptedException ie) {
}
}
}

如上,每5s查看是否达到了心跳检查时间,心跳检查周期为FSNamesystem成员heartbeatRecheckInterval,配置项heartbeat.recheck.interval,默认5min。
如果配置了执行口令更新操作,即配置项dfs.block.access.token.enable(默认为false),则每过dfs.block.access.key.update.interval(默认10h)更新一次数据节点访问口令。这里不分析口令的更新。

心跳检查由heartbeatCheck完成,检查完后更新lastHeartbeatCheck上次检查时间

2. heartbeatCheck

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void heartbeatCheck() {
if (isInSafeMode()) {//安全模式不进行心跳检查
return;
}
boolean allAlive = false;
while (!allAlive) {//循环检查直到没有过期节点
boolean foundDead = false;
DatanodeID dead = null;
int numOfStaleNodes = 0;

synchronized (heartbeats) {
for (Iterator<DatanodeDescriptor> it = heartbeats.iterator(); it .hasNext();) {
DatanodeDescriptor nodeInfo = it.next();
if (dead == null && isDatanodeDead(nodeInfo)) {//一次处理一个过期节点
foundDead = true;
dead = nodeInfo;
}
if (nodeInfo.isStale(this.staleInterval)) {//统计所有陈旧节点数目
numOfStaleNodes++;
}
}
setNumStaleNodes(numOfStaleNodes);//更新成员numStaleNodes
}

if (foundDead) {
synchronized (this) {
synchronized(heartbeats) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeInfo = null;
try {
nodeInfo = getDatanode(dead);
} catch (IOException e) {
nodeInfo = null;
}
if (nodeInfo != null && isDatanodeDead(nodeInfo)) {
NameNode.stateChangeLog.info("BLOCK* heartbeatCheck: " + "lost heartbeat from " + nodeInfo.getName());
removeDatanode(nodeInfo);//移除过期节点,一次循环中移除一个过期节点
}
}
}
}
}
allAlive = !foundDead;
}
}

如上,会循环判断并移除过期(expired)或者说死亡(dead,以下说的过期节点都等同死亡节点)节点,直到没有过期节点,每次移除一个过期节点。
在移除过期节点的过程中会持有heartbeats对象锁,如果在持有该锁过程中处理多个过期节点,则因为更新数据节点的心跳时间也需要持有heartbeats锁(handleHeartbeat中),这样多个数据节点的移除时间过长,数据节点心跳到来时长时间获取不到heartbeats锁,长时间更新不了自己的心跳时间,可能会导致越来越多的数据节点误判为过期节点。因此每次只移除一个过期节点,多次判断处理,直到所有的过期节点移除完,如上。

判断数据节点是否过期由isDatanodeDead完成

1
2
3
private boolean isDatanodeDead(DatanodeDescriptor node) {
return (node.getLastUpdate() < (now() - heartbeatExpireInterval));
}

即当前时间超出了上次心跳时间加上数据节点过期周期,过期周期由FSNamesystem成员heartbeatExpireInterval,初始化如下

1
2
long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval + 10 * heartbeatInterval;

如上,过期时间为2倍的心跳检查周期加上10倍的心跳周期heartbeatInterval,即默认情况下数据节点过期时间为2*5min+10*3s=10分钟30s。超过这个10min30s还没收到数据节点的心跳,则NameNode判断该数据节点死亡。

另外,每次循环中还会统计当前陈旧(stale)的数据节点,更新FSNamesystem的成员numStaleNodes,是否成旧由DatanodeDescriptor的isStale方法判断,成旧时间为FSNamesystem成员staleInterval,为配置项dfs.namenode.stale.datanode.interval,默认为30s,最小为dfs.namenode.stale.datanode.minimum.interval(默认3)倍的心跳周期,即最小为3*3s=9s,默认情况下,NameNode至少要经过三个心跳周期,如果还没收到数据节点心跳,才会判定该数据节点成旧。

对于过期节点,通过removeDatanode移除,移除过期节点的过程中,需要获取heartbeats锁

3. removeDatanode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void removeDatanode(DatanodeDescriptor nodeInfo) {
synchronized (heartbeats) {
if (nodeInfo.isAlive) {//节点当前还活着
updateStats(nodeInfo, false);//更新集群数据统计信息
heartbeats.remove(nodeInfo);//从heartbeats中移除该数据节点
nodeInfo.isAlive = false;//置isAlive为false
}
}
//移除所有的区块
for (Iterator<Block> it = nodeInfo.getBlockIterator(); it.hasNext();) {
removeStoredBlock(it.next(), nodeInfo);
}
unprotectedRemoveDatanode(nodeInfo);//重置数据节点的成员,从recentInvalidateSets中移除记录
clusterMap.remove(nodeInfo);//从网络拓扑中移除

if (safeMode != null) {//检查并根据条件触发安全模式
safeMode.checkMode();
}
}

如上,如果当前节点还是存活状态,则更新集群的数据统计信息(如使用量,剩余量等),然后从heartbeats中移除该数据节点,设置数据节点的isAlive为false。
然后通过removeStoredBlock移除该数据节点上的所有区块,将区块信息从相应的数据结构中移除,通过unprotectedRemoveDatanode重置数据节点的成员,并从recentInvalidateSets中移除数据节点记录,从网络拓扑clusterMap中移除数据节点信息。总的来说就是从与数据节点和区块相关的数据节点中移除相关记录。
最后,因为移除了一个数据节点,要检查安全模式,如果满足条件要进入安全模式进行相关操作。安全模式后文会详细分析。

3.1 updateStats

updateStats根据传入的数据节点信息,更新集群中资源统计信息,数据节点可能加入到集群中,也可能从集群中移除,加入时第二个参数为true,移除时为false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void updateStats(DatanodeDescriptor node, boolean isAdded) {
//统计信息被heartbeats锁保护,该方法调用前确保已经获得了heartbeats锁
assert(Thread.holdsLock(heartbeats));
if (isAdded) {//添加数据节点
capacityTotal += node.getCapacity();
capacityUsed += node.getDfsUsed();
capacityRemaining += node.getRemaining();
totalLoad += node.getXceiverCount();
} else {//移除数据节点
capacityTotal -= node.getCapacity();
capacityUsed -= node.getDfsUsed();
capacityRemaining -= node.getRemaining();
totalLoad -= node.getXceiverCount();
}
}

如上,添加数据节点时,根据添加的数据节点统计信息,相应的增加集群中总容量capacityTotal,已使用容量capacityUsed,剩余容量capacityRemaining,总负载(流操作相关线程个数),而移除数据节点时减少对应的值。

3.2 removeStoredBlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: " +block + " from "+node.getName());
if (!blocksMap.removeNode(block, node)) {//从blockMap中移除记录
NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: " +block+" has already been removed from node "+node);
return;
}

//如果对应文件还存在,更新在neededReplications中优先级
//文件还存在,只是说因为数据节点关闭该文件区块副本少了一个,因此更新neededReplications
//即如果之前副本数满足期望值,减少了一个少于期望值会增加到neededReplications中,进行复制操作
//而如果之前就在neededReplications中,减少了副本数会相应增加优先级,从一个队列中移到更高优先级队列中,更优先复制
INode fileINode = blocksMap.getINode(block);
if (fileINode != null) {
decrementSafeBlockCount(block);
updateNeededReplications(block, -1, 0);//更新neededReplications
}

//区块在数据节点上已经移除了,所以在excessReplicateMap中对应的记录删除
Collection<Block> excessBlocks = excessReplicateMap.get(node.getStorageID());
if (excessBlocks != null) {
if (excessBlocks.remove(block)) {
excessBlocksCount--;
NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: " + block + " is removed from excessBlocks");
if (excessBlocks.size() == 0) {//数据节点上所有的超出期望值的副本区块都处理完了,移除该数据节点对应记录
excessReplicateMap.remove(node.getStorageID());
}
}
}
//区块在数据节点上已经移除了,所以在corruptReplicas中移除记录
corruptReplicas.removeFromCorruptReplicasMap(block, node);
}

如上,removeStoredBlock会更新数据节点和数据节点相关区块的大部分元数据信息。

首先从BlocksMap中更新记录,包括移除区块和数据节点的双向关系,如果BlocksMap中区块对应所有副本都不存在了且文件不存在了则会从BlocksMap移除区块记录。

然后通过updateNeededReplications更新在neededReplications中的元数据,即如果之前副本数满足要求,在neededReplications中应该没有记录,这时删除了一个副本,副本数应该会小于期望值,则会添加到neededReplications中的相应队列中。而如果之前副本数本来不满足要求,存在于neededReplications中,这时副本数减少,则在neededReplications的相应优先级会增加,从低优先级队列中移到高优先级队列中。

然后从excessReplicateMap和corruptReplicas中移除相关元数据。

3.2.1 BlocksMap.removeNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
boolean removeNode(Block b, DatanodeDescriptor node) {
BlockInfo info = blocks.get(b);
if (info == null)//不存在该区块元数据信息

return false;

//从数据节点区块列表中移除区块,从区块信息BlockInfo中移除数据节点,即解除区块和数据节点的双向关系
boolean removed = node.removeBlock(info);


//区块不存在任何数据节点上,且不属于一个文件,从blocks中移除
if (info.getDatanode(0) == null && info.inode == null) {

blocks.remove(b); // remove block from the map
}
return removed;
}

如上,通过DatanodeDescriptor的removeBlock方法从数据节点区块列表中移除区块,并从对应区块信息BlockInfo中移除数据节点,即解除区块和数据节点的双向关系。解除双向关系后,区块不存在任何数据节点上,且不属于一个文件,从blocks中移除,即所有区块副本和区块对应文件已经删除了,从blocks中移除。

1
2
3
4
boolean removeBlock(BlockInfo b) {
blockList = b.listRemove(blockList, this);//从数据节点区块列表中移除区块
return b.removeNode(this);//BlockInfo中移除数据节点
}

如上,blockList是数据节点DatanodeDescriptor的区块列表头,为BlockInfo类型。首先,将区块从数据节点区块列表中移除,如果区块是区块列表头,设置数据节点的区块列表头为该区块的下一个区块,此时通过数据节点的区块链表已经找不到该区块了。接着从区块BlockInfo的triplets中移除数据节点相关信息,这样通过BlockInfo也找不到数据节点的信息了,双向解除了区块和数据节点的关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
BlockInfo listRemove(BlockInfo head, DatanodeDescriptor dn) {
if(head == null)
return null;
int dnIndex = this.findDatanode(dn);//查找数据节点所在索引
if(dnIndex < 0) //区块不再数据节点列表中
return head;

BlockInfo next = this.getNext(dnIndex);//获取该区块在数据节点区块列表的下一个区块
BlockInfo prev = this.getPrevious(dnIndex);//该区块在数据节点列表中前一个区块

//当前区块下一个区块和前一个区块(triplets中)置空,即从数据节点区块列表中移除
this.setNext(dnIndex, null);
this.setPrevious(dnIndex, null);

//重新连接数据节点区块列表
if(prev != null)
prev.setNext(prev.findDatanode(dn), next);
if(next != null)
next.setPrevious(next.findDatanode(dn), prev);

//如果当前要移除的区块为区块列表头,则区块列表头置为当前区块移除前的下一个区块
if(this == head) // removing the head
head = next;
return head;
}

如上,主要是BlockInfo和DatanodeDescriptor的相关操作,相关见NameNode实现源码分析—数据块和数据节点相关数据结构和线程
获取区块在数据节点区块列表的下一个和前一个区块后,将区块从列表中移除,然后重新连接链表。如果当前区块为数据节点区块链表头即blockList,则重新设置blockList为区块移除前在链表中的下一个区块。

从数据节点区块列表中移除后,只是解除了数据节点区块链表到区块的单向关系,即通过数据节点区块链表找不到该区块了,不过在区块的的triplets中还记录者数据节点的信息,因此需要移除,如前面DatanodeDescriptor的removeBlock方法中,通过BlockInfo的removeNode方法完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
boolean removeNode(DatanodeDescriptor node) {
int dnIndex = findDatanode(node);//要移除的数据节点在triplets中索引
if(dnIndex < 0) //不存在该数据节点信息
return false;
//此时区块应该从数据节点的区块链表中移除了
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
"Block is still in the list and must be removed first.";

int lastNode = numNodes()-1;
//使用最后一个数据节点代替当前节点
setDatanode(dnIndex, getDatanode(lastNode));
setNext(dnIndex, getNext(lastNode));
setPrevious(dnIndex, getPrevious(lastNode));

//替代后,最后索引位置的相关信息置null
setDatanode(lastNode, null);
setNext(lastNode, null);
setPrevious(lastNode, null);
return true;
}

如上,从BlockInfo中移除数据节点信息时,保证该区块已经从数据节点的区块链表中移除了。然后使用最后索引的triplets三个引用(数据节点,下一个区块,前一个区块)代替当前索引位置的相关引用,然后设置最后索引位置的引用为null,这样来移除当前索引位置的数据节点信息。

到这里BlocksMap.removeNode分析完了,回到1.2.2 removeStoredBlock的下一步骤,更新区块在neededReplications中状态

3.2.2 updateNeededReplications

区块对应的文件还存在,即只是移除了一个区块副本,需要更新neededReplications中的状态。如果原来副本数满足要求,则因为删除了一个副本,需要添加区块到neededReplications中,或者原来neededReplications中存在区块记录(副本数没有达到期望值),则更新区块在neededReplications中所在的优先级队列。

1
2
3
4
5
6
synchronized void updateNeededReplications(Block block, int curReplicasDelta, int expectedReplicasDelta) {
NumberReplicas repl = countNodes(block);
int curExpectedReplicas = getReplication(block);
neededReplications.update(block, repl.liveReplicas(), repl.decommissionedReplicas(),
curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
}

如上,参数curReplicasDelta为当前副本的改变值,小于0表示副本减少,大于0副本数增加,等于0当前副本数不变。参数expectedReplicasDelta表示期望的副本数改变值,数值意义和curReplicasDelta类似。removeStoredBlock中传入的第二个参数为-1,对应删除区块,副本数减少一个。

首先通过countNodes统计区块的副本数情况,包括损坏的(在corruptReplicas中),处于撤销的(对应副本所在数据节点处于DECOMMISSION_INPROGRESS或DECOMMISSIONED状态),超过期望副本值的(在excessReplicateMap中),这些都不是有效副本数,除这些外的副本数即为有效的副本数。
然后获取区块的期望副本数,使用neededReplications的update方法更新在neededReplications的状态,如上,传入有效副本数,撤销副本数等信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
synchronized void update(Block block, int curReplicas, int decommissionedReplicas,
int curExpectedReplicas, int curReplicasDelta, int expectedReplicasDelta) {
int oldReplicas = curReplicas-curReplicasDelta;//操作之前的副本数
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;//操作前期望副本数
int curPri = getPriority(block, curReplicas, decommissionedReplicas, curExpectedReplicas);//当前优先级
int oldPri = getPriority(block, oldReplicas, decommissionedReplicas, oldExpectedReplicas);//之前优先级
...//日志
if(oldPri != LEVEL && oldPri != curPri) {//从之前优先级队列中移除区块记录
remove(block, oldPri);
}
if(curPri != LEVEL && priorityQueues.get(curPri).add(block)) {
...//日志
}
}

如上,首先根据之前的副本数和期望副本数计算在队列中优先级,根据当前副本数和期望副本数计算现在在队列中应该所属的优先级,当前副本数和之前副本数差值即为curReplicasDelta,而当前期望副本数和之前期望副本数差值即为expectedReplicasDelta,优先级计算见NameNode实现源码分析—数据块和数据节点相关数据结构和线程的1.8小节。
计算了优先级之后,首先从原来优先级队列中移除区块记录,即如果原来区块副本数满足要求,肯定不存在neededReplications中移除操作无效,而如果原来区块在neededReplications中,则先移除。
然后如果新的优先级在[0,2]范围内,需要添加到优先级队列中。则添加。
这样便完成了更新操作。

到这里removeStoredBlock方法分析完成,回到removeDatanode中,通过removeStoredBlock移除所有数据节点区块后,调用unprotectedRemoveDatanode移除数据节点

3.3 unprotectedRemoveDatanode

1
2
3
4
5
6
void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
nodeDescr.resetBlocks();
removeFromInvalidates(nodeDescr.getStorageID());
NameNode.stateChangeLog.debug("BLOCK* unprotectedRemoveDatanode: "
+ nodeDescr.getName() + " is out of service now");
}

如上,首先通过unprotectedRemoveDatanode重置数据节点的成员变量

1
2
3
4
5
6
7
8
void resetBlocks() {
this.capacity = 0;//容量
this.remaining = 0;//剩余容量
this.dfsUsed = 0;//使用量
this.xceiverCount = 0;//流操作活动线程数
this.blockList = null;//区块链表
this.invalidateBlocks.clear();//待删除区块列表
}

如上,resetBlocks中重置基本成员和invalidateBlocks,清除invalidateBlocks,即当前需要删除的区块失效,因为我们已经将所有的区块都移除了。

然后通过removeFromInvalidates从recentInvalidateSets中移除该数据节点的待删除区块元数据

1
2
3
4
void removeFromInvalidates(String storageID) {
Collection<Block> blocks = recentInvalidateSets.remove(storageID);
if (blocks != null) { pendingDeletionBlocksCount -= blocks.size(); }
}

到这里,removeDatanode方法已经分析完了,总结下NameNode中与数据块相关的元数据的相关清理工作。

  • 首先从heartbeats中移除了数据节点记录
  • removeStoredBlock中,更新了blocksMapneededReplicationsexcessReplicateMapcorruptReplicas中与数据节点中数据块相关元数据
  • unprotectedRemoveDatanode中,更新了recentInvalidateSets中与数据节点相关元数据(移除)

因此,NameNode中相关元数据除了pendingReplications和datanodeMap都已经处理了。
那么在pendingReplications中的区块记录怎么办,数据节点移除了,正在执行的复制操作失败,则在pendingReplications相关的ReplicationMonitor线程中会判断复制超时,从而重新添加到neededReplications中,下次处理时重新选择复制的源数据节点和目标数据节点执行复制。
datanodeMap通过refreshNodes读取配置文件,进行相应的操作。


疑问

DatanodeDescriptor中有replicateBlocksrecoverBlocksinvalidateBlocks三个对应的数据结构,分别管理需要复制的区块,需要恢复的区块和需要删除的区块,在下次心跳到来时会发送相应的命令。
HeartbeatMonitor线程中,发现数据节点长时间没有发送心跳信息,判断为死亡移除该数据节点。从removeDatanode的整个流程来看,在resetBlocks中已经清理了invalidateBlocks,那么replicateBlocks和recoverBlocks为什么不清理呢?
数据节点已经移除了,当然不会继续添加区块到replicatedBlocks和recoverBlocks中了,但是现存的为什么不进行清理呢?
如果说要等待复制或者恢复操作完成,那么该数据节点已经死了,不可能完成这些操作(对应的也没有设置该数据节点为DECOMMISSION_INPROGRESS)。还是说数据节点死了,这些不会用到了,也就没必要清理了,感觉有点乱。
可能解答:
对于replicateBlocks中区块,数据节点被移除后,如果通过refreshNodes将数据节点状态设置为DECOMMISSION_INPROGRESS,这样便会继续执行未完成的复制操作。而如果没有继续执行未完成的复制操作,则只是这些由NameNode下达的复制操作命令执行失败,在NameNode的pendingReplications相关线程ReplicationMonitor中会判断复制失败从而重新添加到neededReplications中(见NameNode实现源码分析—数据块和数据节点相关数据结构和线程),从而重新选择源数据节点和目标数据节点进行重新复制。
同样的恢复操作相应的如果为执行成功,则NameNode端判断失败时也应该做相应处理(待分析)