NameNode实现源码分析---租约管理

Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》


1. NameNode租约管理线程

NameNode中租约管理由LeaseManager负责,LeaseManager和相关线程初始化如下

1
2
3
4
public LeaseManager leaseManager = new LeaseManager(this); 

this.lmthread = new Daemon(leaseManager.new Monitor());
lmthread.start();

2. 租约管理器LeaseManager

租约管理器主要成员如下

1
2
3
4
5
6
private final FSNamesystem fsnamesystem;//所属FSNamesystem
private long softLimit = FSConstants.LEASE_SOFTLIMIT_PERIOD;//租约超期软周期
private long hardLimit = FSConstants.LEASE_HARDLIMIT_PERIOD;//租约超期硬周期
private SortedMap<String, Lease> leases = new TreeMap<String, Lease>();//客户端-租约映射
private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();//所有租约
private SortedMap<String, Lease> sortedLeasesByPath = new TreeMap<String, Lease>();//路径名-租约映射

一个客户端在NameNode中维护一个租约,客户端可能持有多个正在写的文件,因此客户端持有的租约可能对应着多个文件系统路径。
leases维护了客户端到客户端租约的映射,而sortedLeasesByPath维护了每个正在写的文件到该文件对应所属客户端租约的映射。
sortedLeases是租约管理器管理的所有租约集合,为TreeSet,元素为Lease,通过Lease的compareTo进行排序。
LEASE_SOFTLIMIT_PERIOD为租约超期的软周期,客户端尝试打开已经被其他客户端打开的文件时,如果不指定强制恢复租约,则占用文件的客户端超过了软周期没有更新租约,尝试打开文件的客户端便会进行租约恢复。软周期为60*1000即1min。
LEASE_HARDLIMIT_PERIOD为租约超期的硬周期,在租约管理线程LeaseManager.Monitor中超过该时间的租约没更新,NameNode便会发起租约恢复。硬周期为60*60*1000,即1h。

3. 租约Lease

租约是LeaseManager的内部类,实现了Comparable接口,主要成员如下

1
2
3
private final String holder;
private long lastUpdate;
private final Collection<String> paths = new TreeSet<String>();

如上,holder为持有该租约的客户端,lastUpdate为客户端上次租约更新时间,paths为改租约管理的打开文件路径,也就是客户端打开的文件。
一个租约管理多个打开文件,即一个客户端所有打开文件由一个租约管理,当客户端更新租约时,则重新获取了所有打开文件的持有权限。
如2.1租约管理器中的sortedLeases中,元素为Lease,通过Lease的compareTo方法排序

1
2
3
4
5
6
7
8
9
10
11
12
13
public int compareTo(Lease o) {
Lease l1 = this;
Lease l2 = o;
long lu1 = l1.lastUpdate;
long lu2 = l2.lastUpdate;
if (lu1 < lu2) {
return -1;
} else if (lu1 > lu2) {
return 1;
} else {
return l1.holder.compareTo(l2.holder);
}
}

如上,根据最近更新时间lastUpdate排序,lastUpdate小的即最长时间没有更新的排在前面,更新时间一样的通过客户端排序。

4. LeaseManager.Monitor线程

线程主程序如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void run() {
for(; fsnamesystem.isRunning(); ) {
synchronized(fsnamesystem) {
checkLeases();
}

try {
Thread.sleep(2000);
} catch(InterruptedException ie) {
if (LOG.isDebugEnabled()) {
LOG.debug(name + " is interrupted", ie);
}
}
}
}

如上,每过2s检查一次。通过checkLeases进行检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
synchronized void checkLeases() {
for(; sortedLeases.size() > 0; ) {
final Lease oldest = sortedLeases.first();
if (!oldest.expiredHardLimit()) {//如果最长时间没有更新的租约都没有超期,返回
return;
}

final List<String> removing = new ArrayList<String>();
// need to create a copy of the oldest lease paths, becuase
// internalReleaseLease() removes paths corresponding to empty files,
// i.e. it needs to modify the collection being iterated over
// causing ConcurrentModificationException
String[] leasePaths = new String[oldest.getPaths().size()];//租约管理的所有打开路径
oldest.getPaths().toArray(leasePaths);
for(String p : leasePaths) {//恢复最长时间没有更新的租约
try {
fsnamesystem.internalReleaseLeaseOne(oldest, p);
} catch (IOException e) {
LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
removing.add(p);
}
}

for(String p : removing) {租约恢复失败,直接删除
removeLease(oldest, p);
}
}
}

如上,因为sortedLeases中的Lease根据最近更新时间lastUpdate来排序的,lastUpdate小的排在前面。checkLeases取出sortedLeases中的第一个Lease即为最长时间没有更新的租约,如果没有超期,则说明剩下的所有的都没超期。
在循环中每次都取出第一个Lease,因为如果第一个租约超期了则会进行恢复,恢复后该租约lastUpdate时间更新或者不存在该租约了,因此在sortedLeases中位置改变了,重新读取第一个元素则为下一个租约,可见,要么是sortedLeases中的第一个元素没有超期,要么是所有超期元素都处理完了,才会退出循环。

判断租约是否超期通过expiredHardLimit完成

1
2
3
public boolean expiredHardLimit() {
return FSNamesystem.now() - lastUpdate > hardLimit;
}

如上,从上次租约更新时间lastUpdate到现在已经过去了hardLimit,hardLimit为LeaseManager成员,为60*60*1000ms(1h),即1个小时没有更新租约,则租约管理器判断该租约失效,需要恢复租约。

在checkLeases循环中,如果第一个Lease超期,则对改租约管理的所有打开路径进行租约恢复,每一个路径的恢复通过FSNamesystem的internalReleaseLeaseOne方法完成,恢复失败则直接通过removeLease从租约中删除该路径,removeLease如下

1
2
3
4
5
6
7
8
9
10
11
12
13
synchronized void removeLease(Lease lease, String src) {
sortedLeasesByPath.remove(src);
if (!lease.removePath(src)) {
LOG.error(src + " not found in lease.paths (=" + lease.paths + ")");
}

if (!lease.hasPath()) {
leases.remove(lease.holder);
if (!sortedLeases.remove(lease)) {
LOG.error(lease + " not found in sortedLeases");
}
}
}

如上,从sortedLeasesByPath,lease中移除路径记录,如果路径所属租约的所有路径都释放了,则从leases,sortedLeases中移除该租约。

4.1 internalReleaseLeaseOne

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void internalReleaseLeaseOne(Lease lease, String src) throws IOException {
assert Thread.holdsLock(this);//保证调用该方法时持有了FSNamesystem对象锁

INodeFile iFile = dir.getFileINode(src);
if (iFile == null) {//找不到对应INode文件异常
...
throw new IOException(message);
}
if (!iFile.isUnderConstruction()) {//租约恢复的文件不是处于写状态,异常
...
throw new IOException(message);
}

INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;

//通过追加文件初始化租约恢复。如果该文件没有区块,则立刻收回该租约。否则新建一个租约然后触发租约恢复。
if (pendingFile.getTargets() == null || pendingFile.getTargets().length == 0) {
if (pendingFile.getBlocks().length == 0) {//文件没有数据块,直接关闭文件
finalizeINodeFileUnderConstruction(src, pendingFile);
return;
}
// 数据流管道成员为空,选择文件的最后一个区块对应的数据节点重新设置数据流管道成员
Block[] blocks = pendingFile.getBlocks();
Block last = blocks[blocks.length-1];//最后一个区块
DatanodeDescriptor[] targets = new DatanodeDescriptor[blocksMap.numNodes(last)];
Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
for (int i = 0; it != null && it.hasNext(); i++) {
targets[i] = it.next();
}
pendingFile.setTargets(targets);//重新设置数据流管道成员
}
//从数据流管道成员中,选择第一个存活的数据节点作为恢复的主节点,将最后一个区块恢复信息添加到该数据节点的recoverBlocks中
pendingFile.assignPrimaryDatanode();
//为该文件恢复一个新的临时租约,回收原来的租约
Lease reassignedLease = reassignLease(lease, src, HdfsConstants.NN_RECOVERY_LEASEHOLDER, pendingFile);
leaseManager.renewLease(reassignedLease);
}

如上,恢复文件的租约,其实就是说要保证文件区块副本保持一致状态,然后关闭文件,从租约移除该文件的管理记录,文件在租约管理器中没有记录,则刻意被其他客户端持有(打开进行相应操作等)。如果租约没有剩余管理文件,则从租约管理器中移除租约对象,对应到前面removeLease方法,会对客户端打开的每个文件调用internalReleaseLeaseOne,则当最后一个打开文件执行完后,客户端持有的租约会从租约管理器中移除。

如果文件没有区块(此时客户端刚创建文件,还没来得及写就出现了故障,技术内幕),则通过finalizeINodeFileUnderConstruction直接关闭文件,会回收租约。
如果文件有区块,不过最后一个区块数据流管道成员不存在(客户端通过abandonBlock,放弃数据块后故障,技术内幕),则使用最后一个区块对应的数据节点信息设置数据流管道成员。然后从数据流管道成员中选择一个存活的数据节点作为主节点进行区块修复,将最后一个区块的修复信息(区块,数据流管道成员)添加到选择的主数据节点的recoverBlocks中,然后为这个文件分配一个新的临时租约,对应持有者名字为NN_Recovery,从原来的租约移除路径记录,如果原来租约没有剩余管理路径,则移除原来租约对象。在主数据节点下次心跳到来时发送区块修复命令,修复完后文件副本区块状态一致,由数据节点commitBlockSynchronization报告区块恢复结果,NameNode将对应文件记录在NN_Recovery中移除。

相关方法具体分析见下

4.1.1 finalizeINodeFileUnderConstruction

finalizeINodeFileUnderConstruction用于关闭一个正在写的文件,同时回收租约

1
2
3
4
5
6
7
8
9
10
11
12
private void finalizeINodeFileUnderConstruction(String src, INodeFileUnderConstruction pendingFile) throws IOException {
leaseManager.removeLease(pendingFile.clientName, src);//从客户端clientName租约中移除路径

//INodeFileUnderConstruction转化为INodeFile,然后将转换后的INodeFile替换原来的INodeFileUnderConstruction
INodeFile newFile = pendingFile.convertToInodeFile();
dir.replaceNode(src, pendingFile, newFile);

//关闭文件并且持久化文件的区块
dir.closeFile(src, newFile);
//检查所有的区块副本数是否达到期望值,如果没有则添加相应信息到neededReplications中
checkReplicationFactor(newFile);
}

如上首先通过leaseManager的removeLease方法从客户端租约中移除路径记录,通过clientName获取到租约Lease对象,然后由前面分析过的removeLease方法移除。
然后将INodeFileUnderConstruction转换为INodeFile,根据INodeFileUnderConstruction记录的当前文件信息创建新的INodeFile对象

  • convertToInodeFile

    1
    2
    3
    4
    5
    INodeFile convertToInodeFile() {
    INodeFile obj = new INodeFile(getPermissionStatus(), getBlocks(), getReplication(),
    getModificationTime(), getModificationTime(), getPreferredBlockSize());
    return obj;
    }
  • replaceNode
    由FSDirectory的replaceNode方法用新建的INodeFile对象替换原来的INodeFileUnderConstruction对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    void replaceNode(String path, INodeFile oldnode, INodeFile newnode) throws IOException {
    synchronized (rootDir) {
    if (!oldnode.removeNode()) {//从目录树中移除旧的INodeFile
    ...//移除失败,抛出异常
    }

    /* Currently oldnode and newnode are assumed to contain the same
    * blocks. Otherwise, blocks need to be removed from the blocksMap.
    */


    rootDir.addNode(path, newnode); //新的INodeFile添加到目录树中

    int index = 0;
    //更新blocksMap中区块信息BlockInfo对应的INodeFile对象为新的INodeFile,设置新的INodeFile和区块的对应关系
    for (Block b : newnode.getBlocks()) {
    BlockInfo info = namesystem.blocksMap.addINode(b, newnode);
    newnode.setBlock(index, info); // inode refers to the block in BlocksMap
    index++;
    }
    }
    }

    如上,首先从目树中移除旧的INodeFile(我们这里便是INodeFileUnderConstruction),然后添加新的INodeFile到目录树中。然后通过blocksMap的addINode方法更新blocksMap中原来区块信息BlockMap的INodeFile为新的INodeFile,并设置新INodeFile的区块信息。该方法假定旧的INodeFile和新的INodeFile拥有相同的区块信息,因此不需要移除原来INodeFile的区块信息,而只是在原来区块信息的基础上更新INodeFile对象。

  • closeFile
    替代完后通过FSDirectory的closeFile方法关闭文件

    1
    2
    3
    4
    5
    6
    7
    8
    void closeFile(String path, INodeFile file) throws IOException {
    waitForReady();
    synchronized (rootDir) {
    fsImage.getEditLog().logCloseFile(path, file);
    ...
    }
    }
    }

    如上,关闭操作已经完成了,这里只是在FSEditLog记录关闭日志

  • checkReplicationFactor
    最后的checkReplicationFactor方法会检查文件的所有区块有效副本数是否达到期望副本值,对于没有达到的区块添加到neededReplications中等待复制
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    private void checkReplicationFactor(INodeFile file) {
    int numExpectedReplicas = file.getReplication();
    Block[] pendingBlocks = file.getBlocks();
    int nrBlocks = pendingBlocks.length;
    for (int i = 0; i < nrBlocks; i++) {
    NumberReplicas number = countNodes(pendingBlocks[i]);//统计副本状态
    if (number.liveReplicas() < numExpectedReplicas) {//有效副本数小于期望副本值
    //添加到neededReplications中等待复制
    neededReplications.add(pendingBlocks[i], number.liveReplicas(),
    number.decommissionedReplicas, numExpectedReplicas);
    }
    }
    }

4.1.2 assignPrimaryDatanode

如前面internalReleaseLeaseOne方法中,assignPrimaryDatanode方法用于从文件的数据流管道成员中找出一个存活的数据节点,然后执行区块恢复相关操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void assignPrimaryDatanode() {
if (targets.length == 0) {
...
}

int previous = primaryNodeIndex;
//从primaryNodeIndex开始查找一个存活的数据节点
for(int i = 1; i <= targets.length; i++) {
int j = (previous + i)%targets.length;
if (targets[j].isAlive) {
//第一个找到的存活的数据节点作为区块修复主节点
DatanodeDescriptor primary = targets[primaryNodeIndex = j];
//添加区块修复信息(修复的区块为最后一个区块,参与修复的数据节点为targets)添加到主节点的recoverBlocks中
primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets);
...
return;
}
}
}

如上,在targets中从primaryNodeIndex索引开始查找一个存活的数据节点,第一个存活的数据节点作为区块修复的主数据节点,然后将区块修复信息添加到主节点的recoverBlocks中。我们知道recoverBlocks为BlockQueue类型,包含操作的区块和对应的数据节点,因此这里修复的区块为最后一个节点,而参与修复的数据节点为targets。

4.1.3 reassignLease

internalReleaseLeaseOne中,reassignLease为进行区块修复的文件分配一个新的临时租约,然后从原来租约中移除路径记录,调用如下

1
Lease reassignedLease = reassignLease(lease, src, HdfsConstants.NN_RECOVERY_LEASEHOLDER, pendingFile);

lease为原来文件的租约,src为文件路径,NN_RECOVERY_LEASEHOLDER为常亮NN_Recovery是所有租约恢复的临时租约持有者,pendingFile为文件对象INodeFile。

1
2
3
4
5
6
private Lease reassignLease(Lease lease, String src, String newHolder, INodeFileUnderConstruction pendingFile) {
if(newHolder == null)
return lease;
pendingFile.setClientName(newHolder);//设置新的客户端
return leaseManager.reassignLease(lease, src, newHolder);//分配新的租约,回收旧的租约
}

如上,将文件打开的客户端设置为临时持有者NN_Recovery,然后通过reassignLease从旧的租约中移除该路径,并为该路径分配新的租约

1
2
3
4
5
6
7
synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
assert newHolder != null : "new lease holder is null";
if (lease != null) {
removeLease(lease, src);//从lease中移除src
}
return addLease(newHolder, src);//添加新的租约
}

如上,removeLease前面分析过了,将src从lease中相关结构中移除,如果lease没有剩余的管理路径,则从租约管理器中相关数据结构中移除lease。
addLease如下

1
2
3
4
5
6
7
8
9
10
11
12
13
synchronized Lease addLease(String holder, String src) {
Lease lease = getLease(holder);
if (lease == null) {//如果不存在,创建新的租约,添加到leases,sortedLeases中
lease = new Lease(holder);
leases.put(holder, lease);
sortedLeases.add(lease);
} else {
renewLease(lease);//更新租约lastUpdate
}
sortedLeasesByPath.put(src, lease);//添加到sortedLeasesByPath中
lease.paths.add(src);//新租约中添加src
return lease;
}

如上,不存在就创建(此时lastUpdate当然为创建时间),然后添加到租约管理器中相关结构中,存在的话通过renewLease更新lastUpdate以及在sortedLeases中位置

1
2
3
4
5
6
7
synchronized void renewLease(Lease lease) {
if (lease != null) {
sortedLeases.remove(lease);
lease.renew();
sortedLeases.add(lease);
}
}

从sortedLeases中移除,更新lastUpdate,再添加到sortedLeases中,从而更新在sortedLeases中位置。

这样reassignLease后,原来租约中便没有路径的记录了,如果原来租约没有剩余管理路径则会从租约管理器中移除原来租约对象,该路径文件拥有了新的租约,租约对应的持有者为NN_Recovery,所有参与租约修复的路径都被该持有者管理,在修复完成通过commitBlockSynchronization通知NameNode时,从NN_Recovery租约中移除。

5. 数据节点区块恢复完成报告

4.中分析了Monitor线程检测到客户端长时间没有更新租约,从而恢复该租约的过程。如前面分析,租约恢复时,对于每一个客户端持有的文件,会进行区块修复,以保证所有副本一致,区块修复前,会从原来的租约中移除该文件记录,然后分配一个临时租约,持有者为NN_Recovery。数据节点的区块修复过程见DataNode实现源码分析。在区块修复完成后,数据节点会通过NameNode的RPC代理调用commitBlockSynchronization报告修复结果,因此看下commitBlockSynchronization在NameNode中的实现。

1
2
3
4
public void commitBlockSynchronization(Block block, long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets ) throws IOException
{

namesystem.commitBlockSynchronization(block, newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
}

如上,还是由FSNamesystem提供服务,方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public void commitBlockSynchronization(Block lastblock, long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets) throws IOException {
...//日志记录
String src = null;
try {
synchronized (this) {
if (isInSafeMode()) {//安全模式,不处理,抛出异常
throw new SafeModeException("Cannot commitBlockSynchronization " + lastblock, safeMode);
}
final BlockInfo oldblockinfo = blocksMap.getStoredBlock(lastblock);
if (oldblockinfo == null) {//对应区块信息不存在,抛出异常
throw new IOException("Block (=" + lastblock + ") not found");
}
INodeFile iFile = oldblockinfo.getINode();
if (!iFile.isUnderConstruction()) {//文件不处于构建状态,抛出异常
...//抛出异常
}
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;

//generationStamp改变了,从blocksMap中移除旧的区块
blocksMap.removeBlock(oldblockinfo);

if (deleteblock) {//数据节点没有找到可恢复的区块,区块恢复失败,删除
pendingFile.removeBlock(lastblock);
} else {
//更新打开文件的最后一个区块,构造新的区块信息,添加到blocksMap中
lastblock.set(lastblock.getBlockId(), newlength, newgenerationstamp);
final BlockInfo newblockinfo = blocksMap.addINode(lastblock, pendingFile);

DatanodeDescriptor[] descriptors = null;
List<DatanodeDescriptor> descriptorsList = new ArrayList<DatanodeDescriptor>(newtargets.length);
for(int i = 0; i < newtargets.length; i++) {
DatanodeDescriptor node = datanodeMap.get(newtargets[i].getStorageID());
if (node != null) {
if (closeFile) {
//如果不关闭文件,则不应该将最后一个区块信息添加到数据节点的区块列表中,因为该区块还处于构建状态,不应该属于数据节点
//对应的是在所属的INodeFileUnderConstruction的targets成员记录正在构建的数据节点
//例如,在getAdditionalBlock中,获取的区块信息就不会添加到数据节点区块列表中
node.addBlock(newblockinfo);//关闭文件,添加区块到数据节点区块列表
}
descriptorsList.add(node);
} else {
...//记录日志
}
}
if (!descriptorsList.isEmpty()) {
descriptors = descriptorsList.toArray(new DatanodeDescriptor[0]);
}
//创建的新的区块信息和对应数据节点更新到INodeFileUnderConstruction中
pendingFile.setLastBlock(newblockinfo, descriptors);
}

//如果不关闭文件,且durableSync为true,则持久化打开文件的区块到编辑日志中
src = leaseManager.findPath(pendingFile);
if (!closeFile) {
if (durableSync) {
dir.persistBlocks(src, pendingFile);
}
LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
return;
}
//如果closeFile为true,关闭文件,从租约中移除路径记录
finalizeINodeFileUnderConstruction(src, pendingFile);
}
} finally {
if (closeFile || durableSync) {
getEditLog().logSync();//刷新FSEditLog到底层文件
}
}
...//日志
}

lastBlock为提交的区块,newgenerationstamp为区块修复过程中申请的新的generationStamp,newlength为修复后的长度(小于等于原来长度),closeFile数据节点提交时指定是否关闭文件,deleteblock数据节点提交时指定是否删除该区块(区块修复失败),newTargets为成功进行区块修复的数据节点,作为修复后区块对应的新的数据节点。
首先,进行安全性检查,安全模式下不处理区块修复提交,如果对应的区块信息不存在或者文件不处于构建状态也会抛出异常,正常情况下会的到对应的INodeFileUnderConstruction对象。

然后,通过blocksMap的removeBlock方法从blocksMap中移除旧的区块信息,因为区块修复时会申请新的generationStamp
blocksMap的removeBlock方法如下

1
2
3
4
5
6
7
8
9
10
void removeBlock(BlockInfo blockInfo) {
if (blockInfo == null)
return;
blockInfo.inode = null;
for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) {
DatanodeDescriptor dn = blockInfo.getDatanode(idx);
dn.removeBlock(blockInfo); // remove from the list and wipe the location

}
blocks.remove(blockInfo); // remove block from the map
}

如上,会依次从BlockInfo的triplets中获取对应的数据节点对象,然后解除和数据节点的双向关系,DatanodeDescriptor的removeBlock方法在前面1.2.2.1小节已经分析过了,会从数据节点的区块列表中移除区块,且从区块信息的triplets中移除数据节点信息。
然后从blocksMap中移除区块信息(blocks)。

接着,如果区块修复失败,数据节点通知要删除区块时,会删除区块,否则更新新的区块元数据
从INodeFileUnderConstruction中删除一个区块由INodeFileUnderConstruction的removeBlock方法完成,就是改变blocks成员,比较简单。
如果不删除区块,则需要更新元数据。
首先通过blocksMap的addINode方法查找现有区块信息或添加新的区块信息,即如果当前blocks中存在区块信息则返回否则创建新的区块信息添加到blocks中并返回。BlocksMap查找元素见NameNode实现源码分析—数据块和数据节点相关数据结构和线程中的1.2.1小节,blocksMap中数据存储通过链式哈希表,查找时首先根据Block的hashCode算出在数组中索引,获取首元素后遍历链表,当ID和generationStamp相等时元素相等。因此由于这里区块的generationStamp改变了,在blocksMap中找不到区块信息,会新建BlockInfo然后添加到blocks中。
然后如果需要关闭文件的话,将区块信息添加到数据节点的区块列表中。这里注意的是,如果不关闭文件,则不应该将最后一个区块信息添加到数据节点的区块列表中,因为该区块还处于构建状态,不属于数据节点,对应的是在所属的INodeFileUnderConstruction的targets成员记录正在构建的数据节点。例如,在getAdditionalBlock申请新的区块后,获取的区块信息就不会添加到数据节点区块列表中。
最后将新的区块信息添加到INodeFileUnderConstruction中,并设置INodeFileUnderConstruction的targets为新的targets。

最后,如果不关闭文件,且durableSync为true时,通过FSDirectory的persistBlocks持久化打开文件的所有区块到编辑日志中,返回。
persistBlocks如下,通过logOpenFile记录正在打开文件的区块

1
2
3
4
5
6
7
void persistBlocks(String path, INodeFileUnderConstruction file) throws IOException {
waitForReady();
synchronized (rootDir) {
fsImage.getEditLog().logOpenFile(path, file);//记录日志
...
}
}

否则通过finalizeINodeFileUnderConstruction关闭文件,finalizeINodeFileUnderConstruction方法在前面2.3.1.1分析过了,会从租约中移除文件记录(因此对应为从NN_Recovery租约中移除文件记录),将INodeFileUnderConstruction转换为INodeFile,并在blocksMap中进行替换,然后在日志中记录文件的关闭操作,最终会检查所有的区块副本数是否达到期望值,对于不满足条件的添加到neededReplications中等待复制。
此外,关闭文件时要通过FSEditLog的logSync方法刷新日志记录到底层文件中。

因此,NameNode租约恢复时,从原来租约中移除文件,然后分配临时持有者为NN_Recovery的租约,在数据节点区块恢复commitBlockSynchronization通知NameNode时,对于NameNode发起的租约恢复closeFile为true,因此最终会从NN_Recovery临时租约中移除文件记录。


6. 客户端租约相关

前面分析了NameNode的租约维护和发起租约恢复的过程,并分析了数据节点区块恢复后通知NameNode,从而从NN_Recovery临时租约中移除记录的过程。接下来分析客户端租约相关过程。

6.1 打开文件时租约检查以及租约恢复

客户端不管是创建文件还是为了追加数据而打开文件,都会调用FSNamesystem.startFileInternal(具体相关分析见其他文章),在startFileInternal中会判断文件是否已经被打开(可能被其他客户端打开,也可能被当前客户端打开),如下

1
recoverLeaseInternal(myFile, src, holder, clientMachine, false);

其中,myFile为要打开或追加文件的INodeFile对象,src为文件路径,holder为当前客户端,clientMachine为客户端主机,recoverLeaseInternal如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void recoverLeaseInternal(INode fileInode, String src, String holder, String clientMachine, boolean force)
throws IOException {
if (fileInode != null && fileInode.isUnderConstruction()) {
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
Lease lease = leaseManager.getLease(holder);//当前客户端持有的租约
if (!force && lease != null) {
Lease leaseFile = leaseManager.getLeaseByPath(src);//要打开文件在租约管理器中所属租约
//文件所属租约为当前客户端租约,即当前客户端已经打开了该文件,抛出异常
if (leaseFile != null && leaseFile.equals(lease)) {
...//AlreadyBeingCreatedException异常
}
}
lease = leaseManager.getLease(pendingFile.clientName);//文件所属租约
if (lease == null) {//文件已经被打开了,但是没有对应的租约对象,抛出AlreadyBeingCreatedException异常
...//AlreadyBeingCreatedException异常
}

//force为true,则强制恢复租约,将src所有副本恢复到一致状态,然后关闭,从现有租约lease中移除记录,以便重新打开文件
if (force) {
LOG.info("recoverLease: recover lease " + lease + ", src=" + src + " from client " + pendingFile.clientName);
internalReleaseLeaseOne(lease, src);
} else {
//force为false,如果原来的租约到了租约超期软限制,则恢复租约
if (lease.expiredSoftLimit()) {
LOG.info("startFile: recover lease " + lease + ", src=" + src +
" from client " + pendingFile.clientName);
internalReleaseLease(lease, src);
}
//force为false,不管有没有进行租约恢复,都抛出AlreadyBeingCreatedException,即其他客户端正在占用该文件
...//抛出AlreadyBeingCreatedException异常
}
}
}

如上,如果文件处于构建状态,则文件应该对应一个租约,如果该租约不存在,抛出AlreadyBeingCreatedException异常。
租约存在时,如果文件所属租约就是当前客户端租约时,表明客户端已经打开了该文件,抛出AlreadyBeingCreatedException异常。

如果文件所属租约不是当前客户端租约时,表明被其他客户端占用。
如果指定了force,则不管其他客户端是否有没有在软周期内更新租约,都强制通过internalReleaseLeaseOne进行租约恢复,internalReleaseLeaseOne在前面分析过了,会从原来租约lease中移除文件src的记录,分配临时租约NN_Recovery,然后执行区块恢复将所有文件副本恢复到一致状态,然后关闭文件,最终从临时租约中移除src记录,从而客户端能够重新打开文件进行操作。
而没有指定force,则其他占有该文件的客户端超过软周期(1min)没有更新租约,则进行租约恢复,然后不管是否进行了租约恢复,抛出AlreadyBeingCreatedException异常。internalReleaseLease如下

1
2
3
4
5
6
7
8
9
10
11
12
void internalReleaseLease(Lease lease, String src) throws IOException {
//对所有打开文件进行恢复,文件区块副本恢复到一致状态后关闭,从lease中移除文件记录,然后移除lease
if (lease.hasPath()) {
String[] leasePaths = new String[lease.getPaths().size()];
lease.getPaths().toArray(leasePaths);
for (String p: leasePaths) {
internalReleaseLeaseOne(lease, p);
}
} else {//租约没有打开的文件,恢复src,src的区块恢复到一致状态后关闭
internalReleaseLeaseOne(lease, src);
}
}

如上,对客户端的所有占有文件进行恢复操作,从租约中移除文件记录,将文件区块副本恢复到一致状态然后关闭。如果该租约已经没有管理的打开文件时,则internalReleaseLeaseOne对文件src进行区块恢复工作,恢复到一致状态后,进行关闭(无法从lease中移除src记录了)

6.2 打开文件时添加租约

同6.1中分析,在startFileInternal中,如果通过recoverLeaseInternal检查后,当前客户端能够正常的打开文件,则在最后会添加租约信息

1
leaseManager.addLease(newNode.clientName, src);

addLease在前面4.1.3中已经分析过了,如果客户端没有租约对象,则创建租约对象,在租约中添加src,此时租约的lastUpdate为创建时间,然后更新租约管理器中的相关成员。
而如果客户端存在租约对象,则在相关租约对象中添加src,更新租约的lastUpdate为当前时间,相应的改变了在sortedLeases中的位置,同样的更新租约管理器中相关成员。

6.3 checkLease

客户端执行添加区块(getAdditionalBlock),为追加数据打开文件(appendFile),放弃区块(abandonBlock),关闭文件(complete->completeFileInternal),持久化文件元数据(fsync)等这些与打开文件相关的请求时(以上方法都是ClientProtocol),NameNode在以上方法实现中都需要对被操作的打开文件进行租约检查,由FSNamesystem.checkLease完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void checkLease(String src, String holder, INode file) throws IOException {
if (file == null || file.isDirectory()) {//文件不存在
Lease lease = leaseManager.getLease(holder);
throw new LeaseExpiredException("No lease on " + src +...);
}
if (!file.isUnderConstruction()) {//文件不是处于构建状态
Lease lease = leaseManager.getLease(holder);
throw new LeaseExpiredException("No lease on " + src +...);
}
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
//租约持有者与当前操作客户端不符
if (holder != null && !pendingFile.getClientName().equals(holder)) {
throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
+ pendingFile.getClientName() + " but is accessed by " + holder);
}
}

如上,这些对打开文件的相关操作,都会检查文件是否存在且是否处于构建状态,处于构建状态时,文件租约持有者是否为当前请求客户端。

6.4 renewLease

客户端持有租约时,会周期性的通过ClientProtocol的renewLease更新租约,renewLease实现如下

1
2
3
public void renewLease(String clientName) throws IOException {
namesystem.renewLease(clientName);
}

还是由FSNamesystem提供服务,如下

1
2
3
4
5
synchronized void renewLease(String holder) throws IOException {
if (isInSafeMode())
throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
leaseManager.renewLease(holder);
}

安全模式下不进行租约更新,由租约管理器的renewLease进行租约更新,renewLease在前面4.1.3中分析过了,首先从sortedLeases中移除租约,然后更新租约的lastUpdate为当前时间,再添加到sortedLeases中,从而改变了在sortedLeases中位置(sortedLeases根据lastUpdate排序)

6.5 客户端主动租约恢复

客户端可以通过ClientProtocol中的recoverLease方法主动进行租约恢复,在NameNode实现如下

1
2
3
4
public boolean recoverLease(String src, String clientName) throws IOException {
String clientMachine = getClientMachine();
return namesystem.recoverLease(src, clientName, clientMachine);
}

恢复客户端clientName占有的文件src,由FSNamesystem提供服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
boolean recoverLease(String src, String holder, String clientMachine) throws IOException {
FSPermissionChecker pc = getPermissionChecker();
synchronized (this) {
if (isInSafeMode()) {//安全模式不进行租约恢复
throw new SafeModeException("Cannot recover the lease of " + src, safeMode);
}
if (!DFSUtil.isValidName(src)) {//检查是否为合法的文件名
throw new IOException("Invalid name: " + src);
}

INode inode = dir.getFileINode(src);
if (inode == null) {//对应文件不存在
throw new FileNotFoundException("File not found " + src);
}

if (!inode.isUnderConstruction()) {//文件没有打开,直接返回true
return true;
}
if (isPermissionEnabled) {//检查是否有写的权限
checkPathAccess(pc, src, FsAction.WRITE);
}
//恢复租约
recoverLeaseInternal(inode, src, holder, clientMachine, true);
}
return false;
}

如上,一系列检查后,如果文件没有打开,则客户端可以打开该文件,因此直接返回。否则通过recoverLeaseInternal进行租约恢复,传入的最后一个参数force为true,即如果文件被其他客户端打开,强制进行租约恢复。