DataNode实现源码分析

DataNode源码分析系列文章有DataNode启动DataNode本地存储管理DataNode流接口实现上篇DataNode流接口实现下篇,本文也属于其中一篇,是对上述的补充。


1. 数据块恢复

HDFS节点的VersionedProtocol实现中已经分析过了,与DataNode有关的RPC协议有:DatanodeProtocol(数据节点和名字节点),InterDatanodeProtocol(数据节点之间),ClientDatanodeProtocol(Client和数据节点之间)。
DataProtocol为数据节点和名字节点间通信使用,NameNode使用。因此在DataNode中实现InterDatanodeProtocol和ClientDatanodeProtocol两个协议,分别为其他数据节点和Client提供服务。
HDFS节点的VersionedProtocol实现可知,InterDatanodeProtocol和ClientDatanodeProtocol提供了简单的几个方法,都与数据块恢复有关。
因为在DataXceiver和BlockReceiver的实现中,没有对写数据过程中出现的异常情况进行过多的处理,一般来说,关闭到上游节点的Socket连接,由上游节点检测错误并发送携带错误信息的确认包。这样简化了数据节点写请求处理的实现,但把故障恢复工作转移给了客户端或名字节点,当它们发现某次写操作出现错误以后,需要进行数据块恢复。(参考技术内幕P309)

当客户端发现写操作异常,进行错误恢复时,选择一个主数据节点,通过ClientDatanodeProtocol中的recoverBlock方法开始错误恢复,recoverBlock在DataNode中的实现如下

1
2
3
4
5
public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets) throws IOException {
logRecoverBlock("Client", block, targets);//记录日志
checkBlockToken(block, BlockTokenSecretManager.AccessMode.WRITE);//检查是否有权执行该操作
return recoverBlock(block, keepLength, targets, false);
}

如上,记录日志并检查访问权限后,通过4参数的recoverBlock进行恢复
首先检查该区块是否正在修复过程中,因为客户端和NameNode都可能对区块进行修复

1
2
3
4
5
6
7
8
synchronized (ongoingRecovery) {
if (ongoingRecovery.get(block.getWithWildcardGS()) != null) {
String msg = block + " is already being recovered, " + " ignoring this request to recover it.";
LOG.info(msg);
throw new IOException(msg);
}
ongoingRecovery.put(block, block);
}

如上,正在恢复的区块保存在DataNode成员ongoingRecovery中,如果没有则添加到该成员中。
然后读取targets指定的各DataNode上对应的区块信息,targets包含本节点,对于非本节点的DataNode创建InterDatanodeProtocol代理,通过代理调用startBlockRecovery方法获取其他DataNode上恢复区块的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int errorCount = 0;
int rbwCount = 0;
int rwrCount = 0;

List<BlockRecord> blockRecords = new ArrayList<BlockRecord>();
for (DatanodeInfo id : targets) {

try {
//其他节点创建InterDatanodeProtocol代理
InterDatanodeProtocol datanode = dnRegistration.equals(id) ? this
: DataNode.createInterDataNodeProtocolProxy(id, getConf(), socketTimeout, connectToDnViaHostname);
//通过startBlockRecovery获得所有参与恢复的DataNode上的区块信息
BlockRecoveryInfo info = datanode.startBlockRecovery(block);
if (info == null) {//不存在,该节点将不参与后续的恢复过程

LOG.info("No block metadata found for " + block + " on datanode "+ id);
continue;
}
if (info.getBlock().getGenerationStamp() < block.getGenerationStamp()) {
LOG.info("Only old generation stamp " + info.getBlock().getGenerationStamp()
+ " found on datanode " + id + " (needed block=" + block + ")");
continue;
}
blockRecords.add(new BlockRecord(id, datanode, info));
if (info.wasRecoveredOnStartup()) {
rwrCount++;
} else {
rbwCount++;
}
} catch (IOException e) {
++errorCount;
InterDatanodeProtocol.LOG.warn("Failed to getBlockMetaDataInfo for block (=" + block
+ ") from datanode (=" + id + ")", e);
}
}

如上,通过startBlockRecovery方法获取各节点待修复区块的信息,如果不是本节点需要通过RPC获取其他节点上的区块信息,对于不存在或者不符合要求的数据节点,讲不会参与到后续的区块修复过程中。符合要求的数据节点和区块信息创建BlockRecord对象添加到列表中。BlockRecord对象存储了数据节点以及对应的区块信息,比较简单。

1.1 startBlockRecovery

startBlockRecovery是InterDatanodeProtocol中的方法,如上创建代理获取其他节点上关于待修复区块的信息,实现如下

1
2
3
public BlockRecoveryInfo startBlockRecovery(Block block) throws IOException { 
return data.startBlockRecovery(block.getBlockId());
}

如上,通过FSDataset对象的startBlockRecovery方法获取区块恢复信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public BlockRecoveryInfo startBlockRecovery(long blockId) throws IOException {
Block stored = getStoredBlock(blockId);//获取Block
if (stored == null) { return null; }//如果不存在直接返回null

//如果目前该区块上有活动线程,中断并等待返回
while (true) {
DataNode.LOG.debug("Interrupting active writer threads for block " + stored);
List<Thread> activeThreads = getActiveThreads(stored);
if (activeThreads == null) break;
if (interruptAndJoinThreads(activeThreads))
break;
}

synchronized (this) {
ActiveFile activeFile = ongoingCreates.get(stored);
boolean isRecovery = (activeFile != null) && activeFile.wasRecoveredOnStartup;

BlockRecoveryInfo info = new BlockRecoveryInfo(stored, isRecovery);
...//日志记录
//对该区块进行验证
validateBlockMetadata(stored);
return info;
}
}

如上,首先获取Block对象,不存在该区块直接返回null。
存在该区块的话,如果当前有活动的线程在对该区块进行操作,中断并等待线程返回。这些活动的线程应该是该数据块的写数据线程,BlockReceiver对应的线程,区块修复时必须中断这些线程并等待它们退出,保证后续的修复操作不受写操作影响。
然后便能根据区块信息创建BlockRecoveryInfo对象,其中的wasRecoveredOnStartup字段和Linux Ext3文件系统日志相关,Linux系统重启,文件系统恢复过程中,非正常关闭的处于写入过程中的文件可能会被截断,wasRecoverOnStartup用于标记这种情况。(技术内幕P311)
处于恢复状态的数据块文件和校验文件可能处于不一致的状态,因此需要通过validateBlockMetadata进行验证,validateBlockMetadata具体代码不再分析,不过它不会重新计算区块文件每个数据块的校验和与校验文件比较,而是简便的看文件是否存在,时间戳是否匹配,文件长度是否匹配来判断两个文件是否一致。

获取了参与修复过程中符合条件数据节点上区块信息后,要根据recoverBlock传入的第二个参数keepLength重新计算恢复后区块的长度。
如果keepLength为true,则只有区块长度和待修复区块长度一致的数据节点才会继续参与修复,否则修复后区块长度为所有符合条件数据节点区块长度最短值,即截断到最小区块长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
boolean shouldRecoverRwrs = (rbwCount == 0);
List<BlockRecord> syncList = new ArrayList<BlockRecord>();
long minlength = Long.MAX_VALUE;


for (BlockRecord record : blockRecords) {
BlockRecoveryInfo info = record.info;
assert (info != null && info.getBlock().getGenerationStamp() >= block.getGenerationStamp());
if (!shouldRecoverRwrs && info.wasRecoveredOnStartup()) {

LOG.info("Not recovering replica " + record + " since it was recovered on "
+ "startup and we have better replicas");
continue;
}
if (keepLength) {//如果keepLength,则只有区块长度一致的DataNode才参与修复
if (info.getBlock().getNumBytes() == block.getNumBytes()) {
syncList.add(record);
}

} else {//keepLength为false,截断到最短长度
syncList.add(record);
if (info.getBlock().getNumBytes() < minlength) {

minlength = info.getBlock().getNumBytes();
}
}
}

如果满足条件的数据节点数不存在,且在通过startBlockRecovery获取区块信息的过程中抛出了异常(最后的验证阶段,即区块已损坏),则抛出异常,修复失败

1
2
3
4
if (syncList.isEmpty() && errorCount > 0) {
throw new IOException("All datanodes failed: block=" + block
+ ", datanodeids=" + Arrays.asList(targets));
}

否则,通过重新计算的区块长度更新待修复区块长度,然后syncBlock方法同步满足条件的数据节点上该区块,最后从ongoingRecovery中移除该区块记录

1
2
3
4
5
6
7
8
9
10
if (!keepLength) {
block.setNumBytes(minlength);
}
return syncBlock(block, syncList, targets, closeFile);
...
finally {
synchronized (ongoingRecovery) {
ongoingRecovery.remove(block);
}
}

1.2 syncBlock

首先,判断参与同步的节点数是否为0,如果没有参与同步的节点,则表明该区块应该被删除了,通知NameNode删除该区块

1
2
3
4
5
6
7
8
9
10
if (syncList.isEmpty()) {
namenode.commitBlockSynchronization(block, 0, 0, closeFile, true, DatanodeID.EMPTY_ARRAY);
//always return a new access token even if everything else stays the same
LocatedBlock b = new LocatedBlock(block, targets);
if (isBlockTokenEnabled) {
b.setBlockToken(blockTokenSecretManager.generateToken(null, b.getBlock(),
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
}

return b;
}

如上,通过NameNode代理调用commitBlockSynchronization通知名字节点删除该节点,参数依次为:block(要删除的节点),0(newgenerationstamp),0(newlength),closeFile,true(deleteblock),DataNodeID.EMPTY_ARRAY(newtargets),因此对应的操作为通知NameNode删除该区块。

如果有参与同步的数据节点,则首先向名字节点申请一个新的generationStamp,作为恢复后数据块的版本号,新的版本号可以防止出现故障的数据节点重新启动后,上报过时的数据块。
根据之前更新的数据块长度和申请到的generationStamp构建新的Block,然后通过InterDatanodeProtocol代理调用updateBLock方法在所有参与同步的数据节点上更新该区块。
如下

1
2
3
4
5
6
7
8
9
10
11
12
List<DatanodeID> successList = new ArrayList<DatanodeID>();
long generationstamp = namenode.nextGenerationStamp(block, closeFile);//申请新的generationStamp
Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp);//构建新的Block

for(BlockRecord r : syncList) {
try {
r.datanode.updateBlock(r.info.getBlock(), newblock, closeFile);//在所有参与同步的DataNode中更新区块
successList.add(r.id);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock=" + newblock + ", datanode=" + r.id + ")", e);
}
}

如果有任何节点更新成功,则通知NameNode该区块已经更新,并创建该区块的LocatedBlock对象返回,包含目前最新的该区块所在的数据节点信息,而如果所有参与同步的节点都更新失败,则抛出异常,修复失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if (!successList.isEmpty()) {
DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
//通知名字节点节点更新,包含新的区块generationStamp,新的长度,新的数据节点
namenode.commitBlockSynchronization(block,
newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false, nlist);
DatanodeInfo[] info = new DatanodeInfo[nlist.length];
for (int i = 0; i < nlist.length; i++) {//创建新的DatanodeInfo
info[i] = new DatanodeInfo(nlist[i]);
}
LocatedBlock b = new LocatedBlock(newblock, info); //创建新的LocatedBlock对象,包含新的数据节点位置
if (isBlockTokenEnabled) {
b.setBlockToken(blockTokenSecretManager.generateToken(null, b.getBlock(),
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
}
return b;
}

//失败,抛出异常,修复失败
StringBuilder b = new StringBuilder();
for(BlockRecord r : syncList) {
b.append("\n " + r.id);
}
throw new IOException("Cannot recover " + block + ", none of these "
+ syncList.size() + " datanodes success {" + b + "\n}");

如上,这里的commitBlockSynchronization可与上面通知NameNode删除进行对比,发送给了NameNode新的generationStamp,新的长度,新的数据节点位置。
更新成功后,创建新的LocatedBlock对象,保存该区块所在DataNode的信息。

1.3 updateBlock

1
2
3
4
5
6
7
8
9
10
11
public void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException {
.../日志记录
data.updateBlock(oldblock, newblock);
if (finalize) {
data.finalizeBlockIfNeeded(newblock);
myMetrics.incrBlocksWritten();
notifyNamenodeReceivedBlock(newblock, EMPTY_DEL_HINT);
LOG.info("Received " + newblock + " of size " + newblock.getNumBytes() +
" as part of lease recovery");
}
}

如上,通过FSDataset更新区块,根据传入的参数closeFile来决定是否提交到current目录中,提交后将区块加入到DataNode的receivedBlockList中,在offerService中通知NameNode接收到新的区块
FSDataset中的updateBlock如下

1
2
3
4
5
6
7
8
9
10
11
public void updateBlock(Block oldblock, Block newblock) throws IOException {
...//检查合法性,要求Id一致,且新的generationStamp不能小于旧的generationStamp
//如果两者generationStamp一样则大小也要一样,不合法抛出异常,不能更新
for(;;) {
final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
if (threads == null) {
return;
}
interruptAndJoinThreads(threads);
}
}

如上,首先检查合法性,然后通过tryUpdateBlock执行更新操作,如果旧的区块有活动的线程正在处理,会直接返回活动的线程不进行更新操作,因此返回活动线程后等待线程结束。而没有活动线程在处理旧区块时,会进行更新操作,更新完成返回null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private synchronized List<Thread> tryUpdateBlock(Block oldblock, Block newblock) throws IOException {
//如果有活动线程在处理旧区块,返回活动线程,不进行更新操作
ArrayList<Thread> activeThreads = getActiveThreads(oldblock);
if (activeThreads != null) {
return activeThreads;
}

//No ongoing create threads is alive. Update block.
//查找对应的区块文件,先通过ongoingCreates查找活动文件(即在临时目录中),没有通过volumeMap查找提交的文件
File blockFile = findBlockFile(oldblock.getBlockId());
if (blockFile == null) {
throw new IOException("Block " + oldblock + " does not exist.");
}
File oldMetaFile = findMetaFile(blockFile);//对应的校验文件
long oldgs = parseGenerationStamp(blockFile, oldMetaFile);

...//更新操作的合法性检验,要求旧的generationStamp不能大于新的generationStamp,新的区块长度不能大于旧的区块

//将校验文件重命名为一个临时文件
File tmpMetaFile = new File(oldMetaFile.getParent(),
oldMetaFile.getName()+"_tmp" + newblock.getGenerationStamp());
if (!oldMetaFile.renameTo(tmpMetaFile)){
throw new IOException("Cannot rename block meta file to " + tmpMetaFile);
}
//如果新区块长度小于旧区块,截断区块文件和校验文件,使得符合新区块长度
if (newblock.getNumBytes() < oldblock.getNumBytes()) {
truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes());
}
//重命名临时文件回新的校验文件
File newMetaFile = getMetaFile(blockFile, newblock);
if (!tmpMetaFile.renameTo(newMetaFile)) {
throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
}
//更新ongoingCreates和volumeMap中记录
updateBlockMap(ongoingCreates, oldblock, newblock);
updateBlockMap(volumeMap, oldblock, newblock);
//验证新的区块文件和校验文件是否一致(主要检查长度,不会重新计算校验和)
validateBlockMetadata(newblock);
return null;
}

如上,更新的过程,其实最多就是截断,因为新的区块文件不会小于旧的区块文件,长度一致时不需要操作,否则进行截断操作。更新后要相应的更新ongoingCreates和volumeMap中的记录。


2. 数据节点和名字节点交互

数据节点通过DatanodeProtocol和名字节点交互,在DataNode中有DatanodeProtocol接口的代理成员namenode,通过该代理调用NameNode方法执行交互。
在目前DataNode的实现分析过程中,已经涉及到了很多交互过程

  • blockReceived,Client写数据块或者数据块拷贝,数据块修复等过程中接收到新的数据块后会添加到DataNode的成员receivedBlockList中,在DataNode线程主服务程序offerService中上报给NameNode,见DataNode启动
  • reportedBadBlocks,在流接口数据操作等过程或者区块扫描器发现区块损坏时,报告给NameNode
  • commitBlockSynchronization,如本文中前面数据块修复过程中,通知NameNode删除区块或通知NameNode区块更新
  • nextGenerationStamp,如本问前面数据块修复过程中,需要申请新的generationStamp作为修复后区块的generationStamp信息,还有在Client写区块时创建新的区块也要申请generationStamp
  • versionRequest,在DataNode启动开始时,向NameNode握手,handshake方法中获取NameNode版本信息(startDataNode)
  • register,还是在DataNode启动中,创建完DataNode对象后,启动服务线程之前,向NameNode注册,具体在runDatanodeDaemon方法中
  • blocksBeingWrittenReportDataNode启动时,在向NameNode的注册过程中,如果允许同步blocksBeingWritten目录下的区块(配置项dfs.durable.sync,默认true),则通过该方法向NameNode报告该目录下的区块
  • sendHeartbeat,在DataNode的主服务offerService中定期向NameNode发送心跳信息,包括DataNode注册信息(标识报告者身份),总容量,已使用容量,剩余容量,xceiver线程数目等信息
  • blockReport,在offerService中定期向NameNode报告当前区块信息,报告周期为成员blockReportInterval(配置项dfs.blockreport.intervalMsec,默认1h),而本地的区块通过FSDataset中的AsyncBlockReport线程负责扫描,关于AsyncBlockReport见DataNode本地存储管理
  • errorReport,handshake时,从NameNode获取到的版本信息与DataNode不匹配,或者读本地区块发现不存在或磁盘错误时,都需要向NameNode报告错误

2.1 processCommand

此外,对于像sendHeartbeatblockReport向NameNode报告信息后,DataNode需要根据NameNode返回的DatanodeCommand执行相应的命令,在offerService中通过processCommand方法处理到来的命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private boolean processCommand(DatanodeCommand[] cmds) {
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
try {
if (processCommand(cmd) == false) {
return false;
}
} catch (IOException ioe) {
LOG.warn("Error processing datanode Command", ioe);
}
}
}
return true;
}

DatanodeCommand有多种实现,具体见HDFS相关实体对象
processCommand重载,处理一个命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private boolean processCommand(DatanodeCommand cmd) throws IOException {
if (cmd == null)
return true;
final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null;

switch(cmd.getAction()) {
case DatanodeProtocol.DNA_TRANSFER://拷贝区块到其他节点
transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
myMetrics.incrBlocksReplicated(bcmd.getBlocks().length);
break;
case DatanodeProtocol.DNA_INVALIDATE://删除区块
Block toDelete[] = bcmd.getBlocks();
try {
if (blockScanner != null) {
blockScanner.deleteBlocks(toDelete);
}
data.invalidate(toDelete);
} catch(IOException e) {
// Exceptions caught here are not expected to be disk-related.
throw e;
}
myMetrics.incrBlocksRemoved(toDelete.length);
break;
case DatanodeProtocol.DNA_SHUTDOWN://关闭数据节点
this.shutdown();
return false;
case DatanodeProtocol.DNA_REGISTER://重新注册
LOG.info("DatanodeCommand action: DNA_REGISTER");
if (shouldRun) {
register();
}
break;
case DatanodeProtocol.DNA_FINALIZE://提交更新
storage.finalizeUpgrade();
break;
case UpgradeCommand.UC_ACTION_START_UPGRADE:
processDistributedUpgradeCommand((UpgradeCommand)cmd);
break;
case DatanodeProtocol.DNA_RECOVERBLOCK://开始恢复区块
recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
break;
case DatanodeProtocol.DNA_ACCESSKEYUPDATE://更新访问令牌
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
if (isBlockTokenEnabled) {
blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
}
break;
case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE://更新宽带均衡器带宽
LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
int vsn = ((BalancerBandwidthCommand) cmd).getBalancerBandwidthVersion();
if (vsn >= 1) {
long bandwidth = ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
if (bandwidth > 0) {
DataXceiverServer dxcs = (DataXceiverServer) this.dataXceiverServer.getRunnable();
dxcs.balanceThrottler.setBandwidth(bandwidth);
}
}
break;
default:
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
}
return true;
}

如上,针对不同的指令,有不同的操作。包括区块相关的拷贝(DNA_TRANSFER),删除(DNA_INVALIDATE),恢复(DNA_RECOVERBLOCK);关闭数据节点(DNA_SHUTDOWN),重新注册(DNA_REGISTER),提交更新(DNA_FINALIZE),开始更新(UC_ACTION_START_UPGRADE)指令,更新访问令牌(DNA_ACCESSKEYUPDATE),更新宽带均衡器带宽(DNA_BALANCERBANDWIDTHUPDATE)

2.1.1 拷贝DNA_TRANSFER

拷贝操作由transferBlocks负责

1
2
3
4
5
6
7
8
9
private void transferBlocks( Block blocks[], DatanodeInfo xferTargets[][]) {
for (int i = 0; i < blocks.length; i++) {
try {
transferBlock(blocks[i], xferTargets[i]);
} catch (IOException ie) {
LOG.warn("Failed to transfer " + blocks[i], ie);
}
}
}

如上将区块blocks[i]传输到xferTargets[i]指定的系列数据节点中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void transferBlock( Block block, DatanodeInfo xferTargets[]) throws IOException {
if (!data.isValidBlock(block)) {
//区块不存在或正在构造
String errStr = "Can't send invalid " + block;
LOG.info(errStr);
notifyNamenode(DatanodeProtocol.INVALID_BLOCK, errStr);//通过errorReport报告错误
return;
}
long onDiskLength = data.getLength(block);
if (block.getNumBytes() > onDiskLength) {
//磁盘中区块文件小于希望的大小,区块损坏,通过reportBadBlocks报告
namenode.reportBadBlocks(new LocatedBlock[]{new LocatedBlock(block, new DatanodeInfo[] {
new DatanodeInfo(dnRegistration)})});
LOG.info("Can't replicate " + block + " because on-disk length " + onDiskLength
+ " is shorter than NameNode recorded length " + block.getNumBytes());
return;
}
int numTargets = xferTargets.length;
if (numTargets > 0) {
...//日志记录
new Daemon(new DataTransfer(xferTargets, block, this)).start();
}
}

如上,首先检查区块是否正常,如果区块不存在或者正在构造,则通过DataProtocol代理调用errorReport向名字节点报告错误。如果区块大小小于期望的大小,则区块损坏,通过reportBadBlocks向名字节点报告。
正常情况创建DataTransfer后台线程传输区块。该线程中执行写操作一样的流程,构建BlockSender,然后发送写请求到下游节点xferTargets[0](上述一维数组而不是二维),建立数据流管道,建立数据流管道时client信息为空。建立数据流管道后,通过BlockSender的sendBlock方法发送数据。具体代码这里不再分析。

2.1.2 删除DNA_INVALIDATE

如processCommand中,先在区块管理器中删除记录,然后通过FSDataset对象data的invalidate方法删除区块文件和校验文件。
invalidate如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void invalidate(Block invalidBlks[]) throws IOException {
boolean error = false;
for (int i = 0; i < invalidBlks.length; i++) {
File f = null;
FSVolume v;
synchronized (this) {
f = getFile(invalidBlks[i]);
DatanodeBlockInfo dinfo = volumeMap.get(invalidBlks[i]);
if (dinfo == null) {
...//日志记录
error = true;
continue;
}
v = dinfo.getVolume();
if (f == null) {
...//日志记录
error = true;
continue;
}
if (v == null) {
...//日志记录
error = true;
continue;
}
File parent = f.getParentFile();
if (parent == null) {
...//日志记录
error = true;
continue;
}
v.clearPath(parent);
volumeMap.remove(invalidBlks[i]);//从volume中移除记录
}
File metaFile = getMetaFile( f, invalidBlks[i] );
long dfsBytes = f.length() + metaFile.length();//区块文件和校验文件所占空间
//通过FSDatasetAsyncDiskService创建线程异步删除
asyncDiskService.deleteAsync(v, f, metaFile, dfsBytes, invalidBlks[i].toString());
}
if (error) {
throw new IOException("Error in deleting blocks.");
}
}

如上,区块文件和校验文件通过FSDatasetAsyncDiskService删除,会创建ReplicaFileDeleteTask线程异步删除区块文件和校验文件,删除后减少所属FSVolume的使用量,FSDatasetAsyncDiskService另见DataNode本地存储管理

1
2
3
4
5
void deleteAsync(FSDataset.FSVolume volume, File blockFile, File metaFile, long dfsBytes, String blockName) {
DataNode.LOG.info("Scheduling " + blockName + " file " + blockFile + " for deletion");
ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(volume, blockFile, metaFile, dfsBytes, blockName);
execute(volume.getCurrentDir(), deletionTask);

}

2.1.3 恢复DNA_RECOVERBLOCK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
Daemon d = new Daemon(threadGroup, new Runnable() {
public void run() {
for(int i = 0; i < blocks.length; i++) {
try {
logRecoverBlock("NameNode", blocks[i], targets[i]);
recoverBlock(blocks[i], false, targets[i], true);
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e);
}
}
}
});
d.start();
return d;
}

如上创建后台线程对需要恢复的区块循环调用recoverBlock进行恢复,recoverBlock在前面已经分析过了,这里注意的是第二个参数keepLength为false即会将各节点上区块截断到所有节点最短长度,第四个参数closeFile为true,即写完数据到本地文件后会提交到current目录并通知NameNode。

2.1.4 其他

  • 关闭数据节点(DNA_SHUTDOWN)见后面数据节点关闭分析。
  • 重新注册(DNA_REGISTER)通过register方法重新向NameNode注册,使用DatanodeProtocol代理register方法注册获取集群信息,更新本本节点的信息,然后根据需要向NameNode报告blocksBeingWritten目录下区块,并开始请求AsyncBlockReport扫描本地区块以便在offerService中能够向NameNode报告区块信息。
  • 提交更新(DNA_FINALIZE)通过DataStorage对象storagefinalizeUpgrade方法对每个管理的存储目录提交更新

    1
    2
    3
    4
    5
    void finalizeUpgrade() throws IOException {
    for (Iterator<StorageDirectory> it = storageDirs.iterator(); it.hasNext();) {
    doFinalize(it.next());
    }
    }

    doFinalize方法见DataNode本地存储管理

  • 开始更新(UC_ACTION_START_UPGRADE)和更新访问令牌(DNA_ACCESSKEYUPDATE)这里不分析
  • 更新宽带均衡器带宽(DNA_BALANCERBANDWIDTHUPDATE)如processCommand中的处理

    1
    2
    3
    4
    5
    6
    7
    8
    int vsn = ((BalancerBandwidthCommand) cmd).getBalancerBandwidthVersion();
    if (vsn >= 1) {
    long bandwidth = ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
    if (bandwidth > 0) {
    DataXceiverServer dxcs = (DataXceiverServer) this.dataXceiverServer.getRunnable();
    dxcs.balanceThrottler.setBandwidth(bandwidth);
    }
    }

    此时对应为BalancerBandwidthCommand,关于BalancerBandwidthCommand见HDFS相关实体对象
    获取BalancerBandwidthCommand中的带宽,然后重新设置DataXceiverServer中BlockBalanceThrottler的带宽。


3. 数据节点关闭

数据节点关闭由shutdown负责

3.1 插件,HTTP服务器,RPC服务

按照程序流程,首先关闭插件和HTTP服务器,以及提供的RPC服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if (plugins != null) {//停止插件
for (ServicePlugin p : plugins) {
try {
p.stop();
LOG.info("Stopped plug-in " + p);
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be stopped", t);
}
}
}
this.unRegisterMXBean();
if (infoServer != null) {//关闭HTTP服务器
try {
infoServer.stop();
} catch (Exception e) {
LOG.warn("Exception shutting down DataNode", e);
}
}
if (ipcServer != null) {//关闭RPC服务
ipcServer.stop();
}

插件和HTTP服务器没有分析,因此这里的关闭也不分析。为Client和DataNode提供的RPC服务关闭为RPC服务器关闭过程,见RPC源码分析下篇

3.2 流服务

然后关闭DataXceiverServer和DataXceiver线程,即流服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
this.shouldRun = false;
if (dataXceiverServer != null) {
((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
this.dataXceiverServer.interrupt();

// wait for all data receiver threads to exit
if (this.threadGroup != null) {
while (true) {
this.threadGroup.interrupt();
LOG.info("Waiting for threadgroup to exit, active threads is " +
this.threadGroup.activeCount());
if (this.threadGroup.activeCount() == 0) {
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
}
}
try {
this.dataXceiverServer.join();
} catch (InterruptedException ie) {
}
}

如上,设置shouldRun为false,然后kill DataXceiverServer,关闭绑定的Socket和所有连接上的socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void kill() {
try {
this.ss.close();//关闭绑定的socket
}...//异常

synchronized (childSockets) {//关闭所有读写连接的Socket
for (Iterator<Socket> it = childSockets.values().iterator(); it.hasNext();) {
Socket thissock = it.next();
try {
thissock.close();
} catch (IOException e) {
}
}
}
}

这样,ss关闭会导致接下来到来的connect请求超时,而DataXceiver上的socket关闭,会导致相应的读写操作超时,从读写失败最后检测到DataNode.shouldRun为false退出

如上,除了关闭socket外,还中断DataXceiverServer线程,这会将可能阻塞在accept操作上的DataXceiverServer线程唤醒,然后检测到DataNode.shouldRun为false退出。
中断所有DataXceiver线程,目前没有发现在DataXceiver线程处理过程中对Interrupt的响应(因为获得的是Socket输入输出流,读写操作不会响应Interrupt),因此估计最终还是因为读写超时而退出线程(可能没分析到)

3.3 RPC代理,更新管理器,区块扫描器

关闭了流服务后,关闭到NameNode的代理以及更新管理器,区块扫描器

1
2
3
4
5
6
7
8
9
10
RPC.stopProxy(namenode); //停止RPC代理
if(upgradeManager != null)//关闭更新管理
upgradeManager.shutdownUpgrade();
if (blockScannerThread != null) { //中断区块扫描器线程
blockScannerThread.interrupt();
try {
blockScannerThread.join(3600000L); // wait for at most 1 hour
} catch (InterruptedException ie) {
}
}

如上,RPC代理关闭见RPC源码分析下篇,更新管理器没有分析,关闭过程也不分析。
区块扫描器线程循环中,每次判断shouldRun和isInterrupted

1
while (datanode.shouldRun && !Thread.interrupted())

而在循环中要么处于扫描过程,这时无法中断要等待扫描完成(最多尝试2次),因此在上面join等待最多1个小时。要么没有需要扫描的区块,处于休眠状态,这时interrupt操作会唤醒,从而调用shutdown退出

1
2
3
4
5
6
7
8
9
10
11
12
13
synchronized void shutdown() {
LogFileHandler log = verificationLog;
verificationLog = null;
if (log != null) {
log.close();
}
}
synchronized void close() {
if (out != null) {
out.close();
out = null;
}
}

如上,关闭当前日志文件

3.4 DataStorage

然后释放DataStorage获得存储目录独占锁,关闭”in_use.lock”文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if (storage != null) {
try {
this.storage.unlockAll();
} catch (IOException ie) {
}
}
public void unlockAll() throws IOException {
for (Iterator<StorageDirectory> it = storageDirs.iterator(); it.hasNext();) {
it.next().unlock();//释放每个存储目录的锁
}
}
public void unlock() throws IOException {
if (this.lock == null)//已经释放了
return;
this.lock.release();//释放文件锁
lock.channel().close();//关闭锁文件
lock = null;
}

3.5 DataNode线程

接着中断DataNode线程,并等待其结束

1
2
3
4
5
6
7
if (dataNodeThread != null) {
dataNodeThread.interrupt();
try {
dataNodeThread.join();
} catch (InterruptedException ie) {
}
}

在DataNode线程中,可能处于sendHeartBeat,blockReceived,blockReport与NameNode交互的过程中,这时因为namenode代理已经关闭了,将抛出异常从而检测shouldRun为false。
也可能处于processCommand的过程中,这时可能需要等待处理过程完成。
也可能因为当前循环事情处理完,还没到达下一心跳周期处于休眠状态,这时会被interrupt唤醒,从而检测到shouldRun为false。最终会退出。(按照线程流程,如果循环退出或者异常中调用shutdown会一直死循环吗)

3.6 FSDataset,DataNodeInstrumentation

然后关闭FSDataset对象以及度量统计DataNodeInstrumentation对象

1
2
3
4
5
6
if (data != null) {
data.shutdown();
}
if (myMetrics != null) {
myMetrics.shutdown();
}

如上,DataNodeInstrumentation没有分析,这里也不分析关闭过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void shutdown() {
if (mbeanName != null)
MBeans.unregister(mbeanName);

if (asyncBlockReport != null) {
asyncBlockReport.shutdown();
}

if (asyncDiskService != null) {
asyncDiskService.shutdown();
}

if(volumes != null) {
for (FSVolume volume : volumes.volumes) {
if(volume != null) {
volume.dfsUsage.shutdown();
}
}
}
}

如上,关闭异步区块报告线程,关闭异步磁盘服务线程,然后关闭FSVolume每一个存储目录下的DU对象
异步区块报告线程关闭如下

1
2
3
4
synchronized void shutdown() {
shouldRun = false;
thread.interrupt();
}

设置shouldRun为false,然后中断线程。这样如果线程在进行磁盘扫描,则要等待扫描完成后才能检测到shouldRun为false,如果在等待DataNode请求扫描中,则会被interrupt唤醒,然后退出。
异步磁盘服务线程关闭如下

1
2
3
4
5
6
7
8
9
10
11
12
synchronized void shutdown() {
if (executors == null) {//已经关闭了
LOG.warn("AsyncDiskService has already shut down.");
} else {
LOG.info("Shutting down all async disk service threads...");
for (Map.Entry<File, ThreadPoolExecutor> e : executors.entrySet()) {
e.getValue().shutdown();//关闭线程池
}
executors = null;
LOG.info("All async disk service threads have been shut down");
}
}

如上,关闭每一个线程池执行器ThreadPoolExecutor,从而关闭其中的线程。