Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》
数据节点会周期性通过DatanodeProtocol代理向NameNode发送心跳,在NameNode中更新数据节点的活动时间,如果数据节点长时间没有心跳到来,则NameNode会判断该数据节点死亡,由HeartbeatMonitor执行该扫描操作
1. HeartbeatMonitor
HeartbeatMonitor线程用于周期性检查注册的数据节点心跳和更新访问令牌信息。HeartbeatMonitor线程初始化如下1
2this.hbthread = new Daemon(new HeartbeatMonitor());
hbthread.start();
HeartbeatMonitor包含两个成员1
2private long lastHeartbeatCheck;//上次心跳检查时间
private long lastAccessKeyUpdate;//上次访问令牌更新时间
线程主程序如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public void run() {
while (fsRunning) {
try {
long now = now();
if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {//达到心跳检查时间,心跳检查
heartbeatCheck();
lastHeartbeatCheck = now;
}
//使能了口令检查且达到了检查时间,口令更新
if (isAccessTokenEnabled && (lastAccessKeyUpdate + accessKeyUpdateInterval < now)) {
updateAccessKey();
lastAccessKeyUpdate = now;
}
} catch (Exception e) {
FSNamesystem.LOG.error(StringUtils.stringifyException(e));
}
try {
Thread.sleep(5000); // 5 seconds
} catch (InterruptedException ie) {
}
}
}
如上,每5s查看是否达到了心跳检查时间,心跳检查周期为FSNamesystem成员heartbeatRecheckInterval
,配置项heartbeat.recheck.interval
,默认5min。
如果配置了执行口令更新操作,即配置项dfs.block.access.token.enable
(默认为false),则每过dfs.block.access.key.update.interval
(默认10h)更新一次数据节点访问口令。这里不分析口令的更新。
心跳检查由heartbeatCheck完成,检查完后更新lastHeartbeatCheck上次检查时间
2. heartbeatCheck
1 | void heartbeatCheck() { |
如上,会循环判断并移除过期(expired)或者说死亡(dead,以下说的过期节点都等同死亡节点)节点,直到没有过期节点,每次移除一个过期节点。
在移除过期节点的过程中会持有heartbeats对象锁,如果在持有该锁过程中处理多个过期节点,则因为更新数据节点的心跳时间也需要持有heartbeats锁(handleHeartbeat中),这样多个数据节点的移除时间过长,数据节点心跳到来时长时间获取不到heartbeats锁,长时间更新不了自己的心跳时间,可能会导致越来越多的数据节点误判为过期节点。因此每次只移除一个过期节点,多次判断处理,直到所有的过期节点移除完,如上。
判断数据节点是否过期由isDatanodeDead完成1
2
3private boolean isDatanodeDead(DatanodeDescriptor node) {
return (node.getLastUpdate() < (now() - heartbeatExpireInterval));
}
即当前时间超出了上次心跳时间加上数据节点过期周期,过期周期由FSNamesystem成员heartbeatExpireInterval
,初始化如下1
2long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval + 10 * heartbeatInterval;
如上,过期时间为2倍的心跳检查周期加上10倍的心跳周期heartbeatInterval,即默认情况下数据节点过期时间为2*5min+10*3s=10分钟30s。超过这个10min30s还没收到数据节点的心跳,则NameNode判断该数据节点死亡。
另外,每次循环中还会统计当前陈旧(stale)的数据节点,更新FSNamesystem的成员numStaleNodes
,是否成旧由DatanodeDescriptor的isStale方法判断,成旧时间为FSNamesystem成员staleInterval
,为配置项dfs.namenode.stale.datanode.interval
,默认为30s,最小为dfs.namenode.stale.datanode.minimum.interval
(默认3)倍的心跳周期,即最小为3*3s=9s,默认情况下,NameNode至少要经过三个心跳周期,如果还没收到数据节点心跳,才会判定该数据节点成旧。
对于过期节点,通过removeDatanode
移除,移除过期节点的过程中,需要获取heartbeats锁
3. removeDatanode
1 | private void removeDatanode(DatanodeDescriptor nodeInfo) { |
如上,如果当前节点还是存活状态,则更新集群的数据统计信息(如使用量,剩余量等),然后从heartbeats中移除该数据节点,设置数据节点的isAlive为false。
然后通过removeStoredBlock移除该数据节点上的所有区块,将区块信息从相应的数据结构中移除,通过unprotectedRemoveDatanode重置数据节点的成员,并从recentInvalidateSets中移除数据节点记录,从网络拓扑clusterMap中移除数据节点信息。总的来说就是从与数据节点和区块相关的数据节点中移除相关记录。
最后,因为移除了一个数据节点,要检查安全模式,如果满足条件要进入安全模式进行相关操作。安全模式后文会详细分析。
3.1 updateStats
updateStats根据传入的数据节点信息,更新集群中资源统计信息,数据节点可能加入到集群中,也可能从集群中移除,加入时第二个参数为true,移除时为false。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15private void updateStats(DatanodeDescriptor node, boolean isAdded) {
//统计信息被heartbeats锁保护,该方法调用前确保已经获得了heartbeats锁
assert(Thread.holdsLock(heartbeats));
if (isAdded) {//添加数据节点
capacityTotal += node.getCapacity();
capacityUsed += node.getDfsUsed();
capacityRemaining += node.getRemaining();
totalLoad += node.getXceiverCount();
} else {//移除数据节点
capacityTotal -= node.getCapacity();
capacityUsed -= node.getDfsUsed();
capacityRemaining -= node.getRemaining();
totalLoad -= node.getXceiverCount();
}
}
如上,添加数据节点时,根据添加的数据节点统计信息,相应的增加集群中总容量capacityTotal
,已使用容量capacityUsed
,剩余容量capacityRemaining
,总负载(流操作相关线程个数),而移除数据节点时减少对应的值。
3.2 removeStoredBlock
1 | synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) { |
如上,removeStoredBlock会更新数据节点和数据节点相关区块的大部分元数据信息。
首先从BlocksMap中更新记录,包括移除区块和数据节点的双向关系,如果BlocksMap中区块对应所有副本都不存在了且文件不存在了则会从BlocksMap移除区块记录。
然后通过updateNeededReplications
更新在neededReplications中的元数据,即如果之前副本数满足要求,在neededReplications中应该没有记录,这时删除了一个副本,副本数应该会小于期望值,则会添加到neededReplications中的相应队列中。而如果之前副本数本来不满足要求,存在于neededReplications中,这时副本数减少,则在neededReplications的相应优先级会增加,从低优先级队列中移到高优先级队列中。
然后从excessReplicateMap和corruptReplicas中移除相关元数据。
3.2.1 BlocksMap.removeNode
1 | boolean removeNode(Block b, DatanodeDescriptor node) { |
如上,通过DatanodeDescriptor的removeBlock方法从数据节点区块列表中移除区块,并从对应区块信息BlockInfo中移除数据节点,即解除区块和数据节点的双向关系。解除双向关系后,区块不存在任何数据节点上,且不属于一个文件,从blocks中移除,即所有区块副本和区块对应文件已经删除了,从blocks中移除。1
2
3
4boolean removeBlock(BlockInfo b) {
blockList = b.listRemove(blockList, this);//从数据节点区块列表中移除区块
return b.removeNode(this);//BlockInfo中移除数据节点
}
如上,blockList是数据节点DatanodeDescriptor的区块列表头,为BlockInfo类型。首先,将区块从数据节点区块列表中移除,如果区块是区块列表头,设置数据节点的区块列表头为该区块的下一个区块,此时通过数据节点的区块链表已经找不到该区块了。接着从区块BlockInfo的triplets中移除数据节点相关信息,这样通过BlockInfo也找不到数据节点的信息了,双向解除了区块和数据节点的关系。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25BlockInfo listRemove(BlockInfo head, DatanodeDescriptor dn) {
if(head == null)
return null;
int dnIndex = this.findDatanode(dn);//查找数据节点所在索引
if(dnIndex < 0) //区块不再数据节点列表中
return head;
BlockInfo next = this.getNext(dnIndex);//获取该区块在数据节点区块列表的下一个区块
BlockInfo prev = this.getPrevious(dnIndex);//该区块在数据节点列表中前一个区块
//当前区块下一个区块和前一个区块(triplets中)置空,即从数据节点区块列表中移除
this.setNext(dnIndex, null);
this.setPrevious(dnIndex, null);
//重新连接数据节点区块列表
if(prev != null)
prev.setNext(prev.findDatanode(dn), next);
if(next != null)
next.setPrevious(next.findDatanode(dn), prev);
//如果当前要移除的区块为区块列表头,则区块列表头置为当前区块移除前的下一个区块
if(this == head) // removing the head
head = next;
return head;
}
如上,主要是BlockInfo和DatanodeDescriptor的相关操作,相关见NameNode实现源码分析—数据块和数据节点相关数据结构和线程。
获取区块在数据节点区块列表的下一个和前一个区块后,将区块从列表中移除,然后重新连接链表。如果当前区块为数据节点区块链表头即blockList
,则重新设置blockList为区块移除前在链表中的下一个区块。
从数据节点区块列表中移除后,只是解除了数据节点区块链表到区块的单向关系,即通过数据节点区块链表找不到该区块了,不过在区块的的triplets
中还记录者数据节点的信息,因此需要移除,如前面DatanodeDescriptor的removeBlock方法中,通过BlockInfo的removeNode方法完成1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20boolean removeNode(DatanodeDescriptor node) {
int dnIndex = findDatanode(node);//要移除的数据节点在triplets中索引
if(dnIndex < 0) //不存在该数据节点信息
return false;
//此时区块应该从数据节点的区块链表中移除了
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
"Block is still in the list and must be removed first.";
int lastNode = numNodes()-1;
//使用最后一个数据节点代替当前节点
setDatanode(dnIndex, getDatanode(lastNode));
setNext(dnIndex, getNext(lastNode));
setPrevious(dnIndex, getPrevious(lastNode));
//替代后,最后索引位置的相关信息置null
setDatanode(lastNode, null);
setNext(lastNode, null);
setPrevious(lastNode, null);
return true;
}
如上,从BlockInfo中移除数据节点信息时,保证该区块已经从数据节点的区块链表中移除了。然后使用最后索引的triplets三个引用(数据节点,下一个区块,前一个区块)代替当前索引位置的相关引用,然后设置最后索引位置的引用为null,这样来移除当前索引位置的数据节点信息。
到这里BlocksMap.removeNode分析完了,回到1.2.2 removeStoredBlock的下一步骤,更新区块在neededReplications中状态
3.2.2 updateNeededReplications
区块对应的文件还存在,即只是移除了一个区块副本,需要更新neededReplications中的状态。如果原来副本数满足要求,则因为删除了一个副本,需要添加区块到neededReplications中,或者原来neededReplications中存在区块记录(副本数没有达到期望值),则更新区块在neededReplications中所在的优先级队列。1
2
3
4
5
6synchronized void updateNeededReplications(Block block, int curReplicasDelta, int expectedReplicasDelta) {
NumberReplicas repl = countNodes(block);
int curExpectedReplicas = getReplication(block);
neededReplications.update(block, repl.liveReplicas(), repl.decommissionedReplicas(),
curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
}
如上,参数curReplicasDelta
为当前副本的改变值,小于0表示副本减少,大于0副本数增加,等于0当前副本数不变。参数expectedReplicasDelta
表示期望的副本数改变值,数值意义和curReplicasDelta类似。removeStoredBlock中传入的第二个参数为-1,对应删除区块,副本数减少一个。
首先通过countNodes统计区块的副本数情况,包括损坏的(在corruptReplicas中),处于撤销的(对应副本所在数据节点处于DECOMMISSION_INPROGRESS或DECOMMISSIONED状态),超过期望副本值的(在excessReplicateMap中),这些都不是有效副本数,除这些外的副本数即为有效的副本数。
然后获取区块的期望副本数,使用neededReplications的update方法更新在neededReplications的状态,如上,传入有效副本数,撤销副本数等信息1
2
3
4
5
6
7
8
9
10
11
12
13
14synchronized void update(Block block, int curReplicas, int decommissionedReplicas,
int curExpectedReplicas, int curReplicasDelta, int expectedReplicasDelta) {
int oldReplicas = curReplicas-curReplicasDelta;//操作之前的副本数
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;//操作前期望副本数
int curPri = getPriority(block, curReplicas, decommissionedReplicas, curExpectedReplicas);//当前优先级
int oldPri = getPriority(block, oldReplicas, decommissionedReplicas, oldExpectedReplicas);//之前优先级
...//日志
if(oldPri != LEVEL && oldPri != curPri) {//从之前优先级队列中移除区块记录
remove(block, oldPri);
}
if(curPri != LEVEL && priorityQueues.get(curPri).add(block)) {
...//日志
}
}
如上,首先根据之前的副本数和期望副本数计算在队列中优先级,根据当前副本数和期望副本数计算现在在队列中应该所属的优先级,当前副本数和之前副本数差值即为curReplicasDelta,而当前期望副本数和之前期望副本数差值即为expectedReplicasDelta
,优先级计算见NameNode实现源码分析—数据块和数据节点相关数据结构和线程的1.8小节。
计算了优先级之后,首先从原来优先级队列中移除区块记录,即如果原来区块副本数满足要求,肯定不存在neededReplications中移除操作无效,而如果原来区块在neededReplications中,则先移除。
然后如果新的优先级在[0,2]范围内,需要添加到优先级队列中。则添加。
这样便完成了更新操作。
到这里removeStoredBlock方法分析完成,回到removeDatanode
中,通过removeStoredBlock移除所有数据节点区块后,调用unprotectedRemoveDatanode
移除数据节点
3.3 unprotectedRemoveDatanode
1 | void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) { |
如上,首先通过unprotectedRemoveDatanode重置数据节点的成员变量1
2
3
4
5
6
7
8void resetBlocks() {
this.capacity = 0;//容量
this.remaining = 0;//剩余容量
this.dfsUsed = 0;//使用量
this.xceiverCount = 0;//流操作活动线程数
this.blockList = null;//区块链表
this.invalidateBlocks.clear();//待删除区块列表
}
如上,resetBlocks中重置基本成员和invalidateBlocks,清除invalidateBlocks,即当前需要删除的区块失效,因为我们已经将所有的区块都移除了。
然后通过removeFromInvalidates从recentInvalidateSets中移除该数据节点的待删除区块元数据1
2
3
4void removeFromInvalidates(String storageID) {
Collection<Block> blocks = recentInvalidateSets.remove(storageID);
if (blocks != null) { pendingDeletionBlocksCount -= blocks.size(); }
}
到这里,removeDatanode方法已经分析完了,总结下NameNode中与数据块相关的元数据的相关清理工作。
- 首先从
heartbeats
中移除了数据节点记录 - removeStoredBlock中,更新了
blocksMap
,neededReplications
,excessReplicateMap
,corruptReplicas
中与数据节点中数据块相关元数据 - unprotectedRemoveDatanode中,更新了
recentInvalidateSets
中与数据节点相关元数据(移除)
因此,NameNode中相关元数据除了pendingReplications和datanodeMap都已经处理了。
那么在pendingReplications中的区块记录怎么办,数据节点移除了,正在执行的复制操作失败,则在pendingReplications相关的ReplicationMonitor线程中会判断复制超时,从而重新添加到neededReplications中,下次处理时重新选择复制的源数据节点和目标数据节点执行复制。
datanodeMap通过refreshNodes
读取配置文件,进行相应的操作。
疑问
DatanodeDescriptor中有replicateBlocks
,recoverBlocks
,invalidateBlocks
三个对应的数据结构,分别管理需要复制的区块,需要恢复的区块和需要删除的区块,在下次心跳到来时会发送相应的命令。
HeartbeatMonitor线程中,发现数据节点长时间没有发送心跳信息,判断为死亡移除该数据节点。从removeDatanode的整个流程来看,在resetBlocks中已经清理了invalidateBlocks,那么replicateBlocks和recoverBlocks为什么不清理呢?
数据节点已经移除了,当然不会继续添加区块到replicatedBlocks和recoverBlocks中了,但是现存的为什么不进行清理呢?
如果说要等待复制或者恢复操作完成,那么该数据节点已经死了,不可能完成这些操作(对应的也没有设置该数据节点为DECOMMISSION_INPROGRESS)。还是说数据节点死了,这些不会用到了,也就没必要清理了,感觉有点乱。
可能解答:
对于replicateBlocks中区块,数据节点被移除后,如果通过refreshNodes将数据节点状态设置为DECOMMISSION_INPROGRESS
,这样便会继续执行未完成的复制操作。而如果没有继续执行未完成的复制操作,则只是这些由NameNode下达的复制操作命令执行失败,在NameNode的pendingReplications相关线程ReplicationMonitor中会判断复制失败从而重新添加到neededReplications中(见NameNode实现源码分析—数据块和数据节点相关数据结构和线程),从而重新选择源数据节点和目标数据节点进行重新复制。
同样的恢复操作相应的如果为执行成功,则NameNode端判断失败时也应该做相应处理(待分析)