Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》
当通过start-dfs.sh
或者start-all.sh
等方式启动DataNode时,最终会调用DataNode的main
方法,这中间过程不再分析。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public static void main(String args[]) {
secureMain(args, null);
}
public static void secureMain(String [] args, SecureResources resources) {
try {
StringUtils.startupShutdownMessage(DataNode.class, args, LOG);//启动日志,设置关闭日志
DataNode datanode = createDataNode(args, null, resources);//创建DataNode启动
if (datanode != null)//主线程中等待DataNode运行完
datanode.join();
} catch (Throwable e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(-1);
} finally {
// We need to add System.exit here because either shutdown was called or
// some disk related conditions like volumes tolerated or volumes required
// condition was not met. Also, In secure mode, control will go to Jsvc and
// the process hangs without System.exit.
LOG.info("Exiting Datanode");
System.exit(0);
}
}
void join() {
if (dataNodeThread != null) {
try {
dataNodeThread.join();
} catch (InterruptedException e) {}
}
}
如上,首先会记录”STARTUP_MSG。。。”这样的启动日志信息,在DataNode的日志文件中可以找到,然后注册关闭日志”SHUTDOWN_MSG。。。”。通过createDataNode
创建DataNode并启动,在main线程中等待datanode执行完。1
2
3
4
5public static DataNode createDataNode(String args[], Configuration conf, SecureResources resources) throws IOException {
DataNode dn = instantiateDataNode(args, conf, resources);
runDatanodeDaemon(dn);
return dn;
}
如上,通过instantiateDataNode
实例化DataNode对象,然后通过runDatanodeDaemon
向NameNode注册,并创建dataNodeThread线程启动。
1. 创建实例对象
1 | public static DataNode instantiateDataNode(String args[], Configuration conf, SecureResources resources) throws IOException { |
如上,读取dfs.data.dir
配置项作为DataNode的本地存储目录,该目录可以有多个,以,
分隔,然后通过makeInstance
创建DataNode对象1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public static DataNode makeInstance(String[] dataDirs, Configuration conf, SecureResources resources) throws IOException {
UserGroupInformation.setConfiguration(conf);
LocalFileSystem localFS = FileSystem.getLocal(conf);//本地文件系统(含校验和)
ArrayList<File> dirs = new ArrayList<File>();
//dfs.datanode.data.dir.perm配置项,配置dfs.data.dir权限
FsPermission dataDirPermission = new FsPermission(conf.get(DATA_DIR_PERMISSION_KEY, DEFAULT_DATA_DIR_PERMISSION));
for (String dir : dataDirs) {
try {
DiskChecker.checkDir(localFS, new Path(dir), dataDirPermission);//检查目录是否有读/写权限,没有抛出异常,不会添加到dirs中
dirs.add(new File(dir));
} catch(IOException e) {
LOG.warn("Invalid directory in " + DATA_DIR_KEY + ": " +
e.getMessage());
}
}
if (dirs.size() > 0) //存在满足条件的本地存储目录,创建DataNode对象
return new DataNode(conf, dirs, resources);
LOG.error("All directories in " + DATA_DIR_KEY + " are invalid.");
return null;//没有满足条件的目录,不能创建DataNode,返回null
}
如上,dfs.data.dir
目录对应的权限为配置项dfs.datanode.data.dir.perm
,默认为755
。要保证配置的项为目录且有读写权限,否则不能作为DataNode的本地存储目录,当没有满足条件的本地存储目录时,不能创建DataNode对象,返回null,否则通过DataNode的构造函数创建DataNode对象。1
2
3
4
5
6
7
8
9
10
11
12
13
14DataNode(final Configuration conf, final AbstractList<File> dataDirs, SecureResources resources) throws IOException {
super(conf);
SecurityUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY);
datanodeObject = this;
//这里为true,启动时若发现blocksBeingWritten目录下有文件,将这些区块文件添加到ongoingCreates和volumeMap中,否则删除
durableSync = conf.getBoolean("dfs.durable.sync", true);
this.userWithLocalPathAccess = conf .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
try {
startDataNode(conf, dataDirs, resources);
} catch (IOException ie) {
shutdown();
throw ie;
}
}
如上,通过startDataNode
初始化其他主要成员,这里关于安全登陆的不做分析。
在startDataNode中构造初始化了DataNode的主要成员,这里分析主要成员的初始化
1.1 配置相关成员
1 | this.blockReportInterval = conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL); |
DataNode需要周期性的向NameNode报告区块信息,dfs.blockreport.intervalMsec
定义了报告的周期,默认1个小时,dfs.blockreport.initialDelay
用于初始化DataNode创建时的lastBlockReport
时间,一般来说启动后立即向NameNode报告。dfs.heart.interval
定义了DataNode向NameNode发送心跳的周期,默认3s。
1.2 服务地址
1.2.1 机器名machineName
1 | if (conf.get("slave.host.name") != null) { |
DataNode在集群中的机器名通过slave.host.name
配置,如果没有通过DNS查询
1.2.2 NameNode地址
1 | InetSocketAddress nameNodeAddr = NameNode.getServiceAddress(conf, true); |
如上,如果配置项dfs.namenode.servicerpc-address
没有配置或为空,使用dfs.namenode.rpc-address
作为NameNod地址,如果dfs.namenode.rpc-address配置项也没有配置或为空时,使用缺省的fs.default.name
地址。
这里涉及到NameNode的RPC服务,NameNode根据配置可以提供两个RPC服务器。如果配置了dfs.namenode.servicerpc-address,则DataNode和SecondaryNameNode与NameNode的RPC通信使用该地址的RPC服务,而Client与NameNode的RPC通信使用dfs.namenode.rpc-address
地址的RPC服务。如果没有配置dfs.namenode.servicerpc-address,则只有一个RPC服务器,Client,DataNode和SecondaryNameNode都使用dfs.namenode.rpc-address地址的RPC服务(没有配置的话,RPC服务地址为fs.default.name,端口为缺省值8020)。
1.2.3 本机流服务地址
1 | InetSocketAddress socAddr = DataNode.getStreamingAddr(conf); |
流服务地址现在使用dfs.datanode.address
配置,包括端口,老的流服务器使用dfs.datanode.bindAddress
配置地址,dfs.datanode.port
配置端口。
NetUtils.getServerAddress不再分析,逻辑为当新的地址和端口没有配置,不管有没有配置旧的地址和端口都会抛出异常,而同时配置新的和旧的配置项时,会使用新的配置项,在日志中会提示旧的配置项已经不建议使用。dfs.datanode.address
在hdfs-default.xml
中配置为0.0.0.0:50010
。
1.3 流服务相关
1.3.1 配置选项
1 | this.socketTimeout = conf.getInt("dfs.socket.timeout", HdfsConstants.READ_TIMEOUT);//默认1min |
socketTimeout
,流服务中建立Socket连接时的连接超时和Socket读超时的超时时间,默认1minsocketWriteTimeout
,流服务中Socket写超时时间,默认6mintransferToAllowed
,是否支持”零拷贝传输”,在不同平台上应该设置不同的值,关于”零拷贝传输”在后面数据传输中具体分析writePacketSize
,为数据传输过程中写数据的包大小,参考HDFS流接口中的写数据包响应
1.3.2 Socket
1 | ServerSocket ss; |
创建Socket,绑定流服务地址和端口
1.3.3 服务线程DataXceiverServer
1 | this.threadGroup = new ThreadGroup("dataXceiverServer"); |
DataNode的流服务由DataXceiverServer提供,其管理的Socket ss连接到来的请求,accept后创建DataXceiver线程进行相应的读写操作,创建的线程不能超过maxXceiverCount
。
1.4 NameNode RPC代理
1 | this.namenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class, DatanodeProtocol.versionID, nameNodeAddr, conf); |
如上,最终通过RPC.getProxy
获取代理对象,默认情况下创建动态代理后,会首次通过checkVersion
进行首次通信,验证版本号,因此会连接NameNode。NameNode没有启动或者正在忙的时候会超时,没有超过设置的超时时间(默认值Long.MAX_VALUE)等待1s继续尝试获取代理。
RPC.getProxy另见RPC源码分析上篇
1.5 对外提供的RPC服务
1 | InetSocketAddress ipcAddr = NetUtils.createSocketAddr(conf.get("dfs.datanode.ipc.address")); |
如上,通过配置项dfs.datanode.ipc.address
配置DataNode对外(DataNode对应InterDatanodeProtocol,Client对应ClientDatanodeProtocol)提供的RPC服务地址,然后通过RPC.getServer
创建RPC服务器,服务器的处理器线程数为3个,关于RPC.getServer另见RPC源码分析上篇
此外,更新dnRegistration中的ipc地址和端口信息(dnRegistration包含DataNode向NameNode通信时所有需要的注册信息,另见HDFS相关实体对象),创建过程见下面,这里没有按照DataNode构造方法中的顺序来。
1.6 本地存储相关
在DataNode本地存储管理中分析了DataNode的本地存储由DataStorage和FSDataset负责
创建DataStorage1
storage = new DataStorage();
通过机器名和流服务地址端口创建DatanodeRegistration对象,其他成员如layoutVersion等还为空1
this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
然后从NameNode中获取集群的信息1
2NamespaceInfo nsInfo = handshake();
StartupOption startOpt = getStartupOption(conf);
如上,handshake通过NameNode代理通过versionRequest
询问NameNode版本信息1
2
3
4
5
6
7
8
9boolean simulatedFSDataset = conf.getBoolean("dfs.datanode.simulateddatastorage", false);
if (simulatedFSDataset) {
...//伪分布式处理
}
else { // real storage
storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
this.dnRegistration.setStorageInfo(storage);
this.data = new FSDataset(storage, conf);//创建FSDataset对象,读取所有存储目录下的文件初始化成员volumeMap
}
如上,创建DataStorage对象,从NameNode获取命名空间信息后,分析本地目录,根据启动方式和集群版本做相应的处理,dfs.datanode.simulateddatastorage
为true时这里不分析,主要用于调试,性能评估。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82void recoverTransitionRead(NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt) throws IOException {
assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() : "Data-node and name-node layout versions must be the same.";
// 1. For each data directory calculate its state and
// check whether all is consistent before transitioning.
// Format and recover.
this.storageID = "";
this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {//遍历所有配置的目录
File dataDir = it.next();
StorageDirectory sd = new StorageDirectory(dataDir);//创建StorageDirectory
StorageState curState;
try {
curState = sd.analyzeStorage(startOpt);//分析存储目录的状态
// sd is locked but not opened
switch(curState) {
case NORMAL:
break;
case NON_EXISTENT://NON_EXISTENT,跳过不添加到DataStorage的storageDirs中
LOG.info("Storage directory " + dataDir + " does not exist");
it.remove();
continue;
case NOT_FORMATTED: //NOT_FORMATTED,格式化,删除原来current目录,创建新的current目录,根据nsInfo写VERSION文件
LOG.info("Storage directory " + dataDir + " is not formatted");
LOG.info("Formatting ...");
format(sd, nsInfo);
break;
default: //中间状态,恢复到NORMAL
sd.doRecover(curState);
}
} catch (IOException ioe) {
sd.unlock();
throw ioe;
}
// add to the storage list
addStorageDir(sd);
dataDirStates.add(curState);
}
if (dataDirs.size() == 0) // none of the data dirs exist
throw new IOException( "All specified directories are not accessible or do not exist.");
// 2. Do transitions
// Each storage directory is treated individually.
// During sturtup some of them can upgrade or rollback
// while others could be uptodate for the regular startup.
for(int idx = 0; idx < getNumStorageDirs(); idx++) {
doTransition(getStorageDir(idx), nsInfo, startOpt);
assert this.getLayoutVersion() == nsInfo.getLayoutVersion() :
"Data-node and name-node layout versions must be the same.";
assert this.getCTime() == nsInfo.getCTime() :
"Data-node and name-node CTimes must be the same.";
}
// 3. Update all storages. Some of them might have just been formatted.
this.writeAll();
}
private void doTransition( StorageDirectory sd, NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
if (startOpt == StartupOption.ROLLBACK)
doRollback(sd, nsInfo); //回滚方式启动,回滚
sd.read();//重新读取版本文件
checkVersionUpgradable(this.layoutVersion);//检查版本,不能为LAST_UPGRADABLE_LAYOUT_VERSION(-7)之前的版本
assert this.layoutVersion >= FSConstants.LAYOUT_VERSION : "Future version is not allowed";
if (getNamespaceID() != nsInfo.getNamespaceID())
throw new IOException( "Incompatible namespaceIDs in " + sd.getRoot().getCanonicalPath()
+ ": namenode namespaceID = " + nsInfo.getNamespaceID()
+ "; datanode namespaceID = " + getNamespaceID());
if (this.layoutVersion == FSConstants.LAYOUT_VERSION && this.cTime == nsInfo.getCTime())
return; //版本与集群版本一致
verifyDistributedUpgradeProgress(nsInfo);
if (this.layoutVersion > FSConstants.LAYOUT_VERSION || this.cTime < nsInfo.getCTime()) {
doUpgrade(sd, nsInfo); //当前版本小于集群版本,更新
return;
}
// layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
// must shutdown
throw new IOException("Datanode state: LV = " + this.getLayoutVersion()
+ " CTime = " + this.getCTime() + " is newer than the namespace state: LV = "
+ nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
}
如上,对每一个配置的目录,首先检查当前状态,如果为NON_EXISTENT
不会添加到DataStorage的管理目录内。否则如果为NOT_FORMATTED
,则格式化后添加到DataStorage的storageDirs中,而如果为NORMAL
,则直接添加到storageDirs中,处于其他中间状态的,通过doRecover恢复到NORMAL状态,添加到storageDirs中。
通过analyzeStorage
获取当前目录状态,通过format
格式化,通过doRecover
恢复这些方法见DataNode本地存储管理
recoverTransitionRead之后,DataNode的本地存储目录处于正常状态,构建FSDataset对象,FSDataset构造见DataNode本地存储管理,会对每一个存储目录构造相应的FSVolume对象,然后读取本地存储目录下的区块文件初始化volumeMap成员。
至此,本地存储相关的DataStorage和FSDataset对象构建和初始化完成了,DataStorage管理的存储目录在storageDirs中,而FSDataset的volumes包含了所有的FSVolume对象,volumeMap包含了所有现存的区块信息。
1.7 HTTP服务器
1 | InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf); |
如上,getInfoAddr读取配置dfs.datanode.http.address
作为HTTP服务器的地址和端口,创建HttpServer,addInternalServlet
已经声明为Deprecated,设置属性datanode
,datanode.blockScanner
,current.conf
可以直接在jsp文件中或Servlet类中使用,添加一个Servlet,路径为/blockScannerReport,对应的类为DataBlockScanner.Servlet
。
HTTP服务器在创建DataNode时启动,其他服务都在DataNode线程中启动,此外更新dnRegistration中关于info地址和端口信息。
1.8 其他
1.8.1 区块扫描器
1 | blockScanner = new DataBlockScanner(this, (FSDataset)data, conf); |
区块扫描器所属DataNode为当前DataNode,管理的数据集为DataNode的FSDataset,扫描周期为dfs.datanode.scan.period.hours
,默认情况下3个小时。
1.8.2 度量统计
1 | myMetrics = DataNodeInstrumentation.create(conf, dnRegistration.getStorageID()); |
1.8.3 区块口令安全管理器
1 | // BlockTokenSecretManager is created here, but it shouldn't be |
1.8.4 插件
1 | plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class); |
总结DataNode中的相关资源实例
- NameNode
RPC代理
- 对外提供RPC服务的
RPC服务器
(ClientDatanodeProtocol,InterDatanodeProtocol) - 流服务请求处理线程
DataXceiverServer
和若干处理请求的DataXceiver
线程 - HTTP服务器
HttpServer
- 区块扫描器
BlockScanner
,默认3周扫描一次 - 管理本地存储的
DataStorage
和FSDataset
,其中FSDataset中有用于异步区块删除的线程FSDatasetAsyncDiskService
和用于异步区块扫描(扫描本地区块用于区块报告)的线程AsyncBlockReport
- 度量统计
DataNodeInstrumentation
- 安全相关的
BlockTokenSecretManager
- 可能的若干插件
ServicePlugin
线程
2. DataNode启动
1 | public static void runDatanodeDaemon(DataNode dn) throws IOException { |
2.1 注册
如上,启动DataNode线程前先向NameNode注册,从NameNode中获取存储信息,报告当前区块信息等1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35private void register() throws IOException {
if (dnRegistration.getStorageID().equals("")) {
setNewStorageID(dnRegistration);
}
while(shouldRun) {//注册超时时,等待1s中继续注册
try {
dnRegistration.name = machineName + ":" + dnRegistration.getPort();
dnRegistration = namenode.register(dnRegistration);
break;
} catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + getNameNodeAddr());
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {}
}
}
if (storage.getStorageID().equals("")) {//当前storageID没设置,通过从NameNode中获取的信息设置
storage.setStorageID(dnRegistration.getStorageID());
storage.writeAll();//写到VERSION文件
}
if(!storage.getStorageID().equals(dnRegistration.getStorageID())) {
throw new IOException("Inconsistent storage IDs. Name-node returned "
+ dnRegistration.getStorageID() + ". Expecting " + storage.getStorageID());
}
...//安全相关
if (durableSync) {//不删除blocksBeingWritten目录中的区块,则需要向NameNode报告这些区块信息
Block[] bbwReport = data.getBlocksBeingWrittenReport();
long[] blocksBeingWritten = BlockListAsLongs.convertToArrayLongs(bbwReport);
namenode.blocksBeingWrittenReport(dnRegistration, blocksBeingWritten);
}
data.requestAsyncBlockReport();
scheduleBlockReport(initialBlockReportDelay);
}
如上,通过NameNode代理register向NameNode注册并返回集群的版本信息,更新DataNode中的版本信息。如果blocksBeingWritten目录下存在区块,且允许不删除这些区块(这些区块会在创建FSDataset对象时恢复到ongoingCreates和volumeMap中),需要向NameNode报告这些区块信息。然后进行区块报告相关设置。
2.2 启动DataNode线程
注册后,创建DataNode线程启动,线程主流程如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public void run() {
LOG.info(dnRegistration + "In DataNode.run, data = " + data);
dataXceiverServer.start();//启动流服务
ipcServer.start();//启动RPC服务器
while (shouldRun) {
try {
startDistributedUpgradeIfNeeded();
offerService();//主服务
} catch (Exception ex) {
LOG.error("Exception: " + StringUtils.stringifyException(ex));
if (shouldRun) {
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
}
}
}
}
//通过shutdown退出
LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
shutdown();
}
如上,启动流服务,RPC服务(HTTP服务,插件等在DataNode构造中已经启动,区块扫描器线程在DataNode线程中启动),DataNode运行时通过offerService提供主服务,退出时通过shutdown清理相关资源1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122public void offerService() throws Exception {
while (shouldRun) {
try {
long startTime = now();
if (startTime - lastHeartbeat > heartBeatInterval) {//到达心跳间隔,发送心跳信息
//心跳信息包括,DataNode注册信息,数据传输端口,当前总容量,剩余容量
lastHeartbeat = startTime;
DatanodeCommand[] cmds = namenode.sendHeartbeat(
dnRegistration, data.getCapacity(), data.getDfsUsed(),
data.getRemaining(), xmitsInProgress.get(), getXceiverCount());
myMetrics.addHeartBeat(now() - startTime);
if (!processCommand(cmds))//处理NameNode发送过来的命令
continue;
}
// check if there are newly received blocks
Block [] blockArray=null;
String [] delHintArray=null;
synchronized(receivedBlockList) {
synchronized(delHints) {
int numBlocks = receivedBlockList.size();
if (numBlocks > 0) {
if(numBlocks!=delHints.size()) {
LOG.warn("Panic: receiveBlockList and delHints are not of the same length" );
}
blockArray = receivedBlockList.toArray(new Block[numBlocks]);//新接收到的区块
delHintArray = delHints.toArray(new String[numBlocks]);//这些节点上的对应区块将被删除
}
}
}
if (blockArray != null) {
if(delHintArray == null || delHintArray.length != blockArray.length ) {
LOG.warn("Panic: block array & delHintArray are not the same" );
}
//向NameNode报告新接收到的区块信息,delHintArray如果不为空则节点上对应的区块将被删除
namenode.blockReceived(dnRegistration, blockArray, delHintArray);
synchronized (receivedBlockList) {
synchronized (delHints) {
for(int i=0; i<blockArray.length; i++) {
receivedBlockList.remove(blockArray[i]);
delHints.remove(delHintArray[i]);
}
}
}
}
// Send latest blockinfo report if timer has expired.
if (startTime - lastBlockReport > blockReportInterval) {
if (data.isAsyncBlockReportReady()) {//异步区块扫描器已经扫描完,scan中保存了扫描结果,发送扫描结果区块报告
long brCreateStartTime = now();
Block[] bReport = data.retrieveAsyncBlockReport();//获取扫描结果
long brSendStartTime = now();
//发送扫描结果,区块报告
DatanodeCommand cmd = namenode.blockReport(dnRegistration, BlockListAsLongs.convertToArrayLongs(bReport));
long brSendCost = now() - brSendStartTime;
long brCreateCost = brSendStartTime - brCreateStartTime;
myMetrics.addBlockReport(brSendCost);
//
// If we have sent the first block report, then wait a random
// time before we start the periodic block reports.
//
if (resetBlockReportTime) {
lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
resetBlockReportTime = false;
} else {
/* say the last block report was at 8:20:14. The current report
* should have started around 9:20:14 (default 1 hour interval).
* If current time is :
* 1) normal like 9:20:18, next report should be at 10:20:14
* 2) unexpected like 11:35:43, next report should be at
* 12:20:14
*/
lastBlockReport += (now() - lastBlockReport) / blockReportInterval * blockReportInterval;
}
processCommand(cmd);//处理区块报告时,NameNode发送过来的命令
} else {
data.requestAsyncBlockReport();//如果区块扫描器没有扫描结果,则请求区块扫描器开始扫描
if (lastBlockReport > 0) { // this isn't the first report
...//记录日志
}
}
}
//启动区块扫描器
if (blockScanner != null && blockScannerThread == null &&
upgradeManager.isUpgradeCompleted()) {
LOG.info("Starting Periodic block scanner");
blockScannerThread = new Daemon(blockScanner);
blockScannerThread.start();
}
//
// There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
//
long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat);
synchronized(receivedBlockList) {//没有新的可发送的区块,且还没到达心跳周期,等待
if (waitTime > 0 && receivedBlockList.size() == 0) {
try {
receivedBlockList.wait(waitTime);
} catch (InterruptedException ie) {
}
delayBeforeBlockReceived();
}
} // synchronized
} catch(RemoteException re) {
String reClass = re.getClassName();
if (UnregisteredDatanodeException.class.getName().equals(reClass) ||
DisallowedDatanodeException.class.getName().equals(reClass) ||
IncorrectVersionException.class.getName().equals(reClass)) {
LOG.warn("DataNode is shutting down: " +
StringUtils.stringifyException(re));
shutdown();
return;
}
LOG.warn(StringUtils.stringifyException(re));
} catch (IOException e) {
LOG.warn(StringUtils.stringifyException(e));
}
} // while (shouldRun)
} // offerService
如上,在主服务线程中,如果到达心跳周期,通过NameNode代理sendHeartbeat
向NameNode发送心跳,包括DataNode的注册信息,数据传输端口,总容量,剩余容量,然后使用processCommand
处理返回值中NameNode的命令。
如果接收到了新的区块,通过NameNode代理blockReceived
向NameNode报告,其中的delHintArray
参数表示对应的区块在这些节点上应该删除。
如果到达区块报告周期,查找异步区块扫描线程AsyncBlockReport
扫描结果,如果扫描完成,通过NameNode代理blockReport
进行区块报告,并通过processCommand
处理NameNode下达的命令。而如果没有扫描结果,则通知AsyncBlockReport开始扫描本地区块。
然后如果区块扫描器没有启动,且更新管理器当前没有在进行更新操作,则启动区块扫描器
以上事情处理完后,如果没有到达下一次心跳周期处理过程中没有接收到新的区块,则等待直到下一次心跳时间。
processCommand对NameNode命令的处理涉及到数据的读写等操作,见后面具体分析。
2.3 区块扫描器线程
区块扫描器线程主程序如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public void run() {
try {
init();//初始化,获取所有的区块,记录用的日志文件
//读取存储在本地的日志文件,更新区块扫描记录
if (!assignInitialVerificationTimes()) {
return;
}
adjustThrottler();//根据当期需要扫描的数据量设置扫描节流器,控制扫描频率
while (datanode.shouldRun && !Thread.interrupted()) {//主循环
long now = System.currentTimeMillis();
synchronized (this) {
if ( now >= (currentPeriodStart + scanPeriod)) {
startNewPeriod();
}
}
if ( (now - getEarliestScanTime()) >= scanPeriod ) {//如果最长时间没有扫描的区块应该扫描了,则扫描该区块
verifyFirstBlock();
} else {
try {
Thread.sleep(1000);//最长时间没有扫描的区块都不需要扫描,休眠1s
} catch (InterruptedException ignored) {}
}
}
...//异常处理和最终关闭区块扫描器
}
如上,首先通过init初始化,会读取当前的区块,记录在成员blockInfoSet
和blockMap
中,并读取日志文件
2.3.1 初始化
1 | private void init() { |
如上,初始化时通过FSDataset读取所有的区块,每一个区块创建BlockScanInfo对象,添加到blockMap和blockInfoSet中。然后根据本地的日志文件创建LogFileHandler对象。
2.3.1.1 BlockScanInfo
BlockScanInfo记录了区块的扫描信息,成员如下1
2
3
4
5Block block;//区块
long lastScanTime = 0;//上次扫描时间
long lastLogTime = 0;//上次日志记录时间
ScanType lastScanType = ScanType.NONE;//扫描类型
boolean lastScanOk = true;//扫描结果,true表示区块没有错误
其中ScanType为枚举类型1
2
3
4
5private static enum ScanType {
REMOTE_READ,
VERIFICATION_SCAN,
NONE,
}
REMOTE_READ
,Client读取数据时会进行验证,当读取整个区块时,Client会发送验证结果给DataNode,如果验证无误,则标识为REMOTE_READ,即由Client验证过VERIFICATION_SCAN
,区块扫描器周期性扫描NONE
,还未扫描过
回到init方法,每个区块创建BlockScanInfo对象,lastScanTime为负数,依次递增,此时扫描状态为NONE。通过addBlockInfo添加到成员blockInfoSet和blockMap中
添加到blockInfoSet中时,因为blockInfoSet是TreeSet,通过BlockScanInfo的compareTo方法比较1
2
3
4
5public int compareTo(BlockScanInfo other) {
long t1 = lastScanTime;
long t2 = other.lastScanTime;
return ( t1 < t2 ) ? -1 : (( t1 > t2 ) ? 1 : block.compareTo(other.block));
}
因此,lastScanTime小的在前面,即最长时间没有扫描的排在前面,在后面扫描时会先扫描。
初始化所有的Block后,会在所有存储目录下找是否存在dncp_block_verification.log.curr
或dncp_block_verification.log.prev
文件,找到了则使用该存储目录存储日志,否则使用第一个存储目录创建日志文件存储日志。根据找到的存储目录和日志文件创建LogFileHandler对象
2.3.1.2 LogFileHandler
LogFileHandler负责日志文件相关处理,成员如下1
2
3
4
5
6
7private File curFile;//当前日志文件
private File prevFile;//上一次使用的日志文件
private int maxNumLines = -1;
private int curNumLines = -1;//当前日志文件最大行
long lastWarningTime = 0;
private PrintStream out;//当前日志文件输出流
int numReaders = 0;
如上,日志记录包含两个文件,.curr
和.prev
,如果curNumLines
大于maxNumLines
且到了可以回滚的时间,则将curr文件重命名为prev文件,创建新的curr日志文件,逻辑在rollIfRequired中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private void rollIfRequired() throws IOException {
if (curNumLines < maxNumLines || numReaders > 0) {
return;
}
long now = System.currentTimeMillis();
if (now < minRollingPeriod) {
return;
}
if (!prevFile.delete() && prevFile.exists()) {//删除prev文件
throw new IOException("Could not delete " + prevFile);
}
close();//关闭当前文件输出流out
if (!curFile.renameTo(prevFile)) {cur文件重命名为prev文件
openCurFile();//重命名失败,重新打开curr文件
throw new IOException("Could not rename " + curFile + " to " + prevFile);
}
openCurFile();//打开新的curr文件
updateCurNumLines();//重新读取curr文件,更新curNumLines
}
如上,minRollingPeriod为6 * 3600 * 1000L
即6个小时。上面的close方法会关闭当前curr文件的输出流out,而openCurFile会重新打开curr文件的输出流构造out,在将curr文件重命名为prev文件后,重新打开curr文件为空文件。然后更新curr文件的行数curNumLines。
成员numReaders
表示当前读取日志文件的Reader数,Reader为LogFileHandler的内部类,后面分析。
因此,回到init方法中构造LogFileHandler对象1
2
3
4
5
6
7LogFileHandler(File dir, String filePrefix, int maxNumLines) throws IOException {
curFile = new File(dir, filePrefix + curFileSuffix);
prevFile = new File(dir, filePrefix + prevFileSuffix);
openCurFile();
curNumLines = -1;
setMaxNumLines(maxNumLines);
}
如上,初始化curNumLines为-1,最大行数为100。
在init方法的最后构造区块扫描器使用的DataTransferThrottler,周期为200ms,带宽为8MB/s,用于在控制扫描区块频率,因为扫描区块时通过BlockSender,控制BlockSender的带宽即控制扫描频率。
2.3.2 读取之前扫描结果
DataBlockScanner线程中,初始化后通过assignInitialVerificationTimes读取本地日志文件信息,即之前扫描结果1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27private boolean assignInitialVerificationTimes() {
...
LogFileHandler.Reader logReader = null;
try {
if (verificationLog != null) {
logReader = verificationLog.new Reader(false);//创建Reader对日志文件进行读取
}
}...//异常处理
if (verificationLog != null) {//更新curr日志文件行数,创建Reader读取,参数为true
verificationLog.updateCurNumLines();
}
try {
while (logReader != null && logReader.hasNext()) {//读取每一行,更新区块扫描信息
if (!datanode.shouldRun || Thread.interrupted()) {
return false;
}
LogEntry entry = LogEntry.parseEntry(logReader.next());
if (entry != null) {
updateBlockInfo(entry);
}
}
} finally {
IOUtils.closeStream(logReader);
}
...//初始化之前没有扫描的区块lastScanTime
return true;
}
如上,创建Reader进行日志记录的读取
2.3.2.1 LogFileHandler.Reader
成员如下1
2
3
4BufferedReader reader;//FileReader输入流
File file;//读取的文件
String line;//下一行数据
boolean closed = false;
构造1
2
3
4
5
6
7
8private Reader(boolean skipPrevFile) throws IOException {
synchronized (LogFileHandler.this) {
numReaders++;
}
reader = null;
file = (skipPrevFile) ? curFile : prevFile;
readNext();
}
如上,skipPrevFile为false时,表示不跳过prev日志文件,即先读取prev文件,读完后读取curr文件。当skipPrevFile为false时,file成员为LogFileHandler中的preFile。
Reader通过readNext读取下一行到成员line中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private void readNext() throws IOException {
line = null;
try {
//创建了文件输入流,且能够读取下一行,读取下一行到line中返回
if (reader != null && (line = reader.readLine()) != null) {
return;
}
if (line == null) {//当前文件没有下一行或者文件还没创建输入流
//如果还没创建输入流,则创建文件输入流,否则当前文件没有剩余行,如果当前文件为prev,切换到curr文件
if (openFile()) {
readNext();//打开了文件流,读取下一行数据
}
}
} finally {
if (!hasNext()) {
close();
}
}
}
如上,如果打开了文件输入流,且能够读取下一行数据,则读取下一行数据到line中返回。否则如果没有读取到数据,则可能没有打开输入流或者当前文件输入流读取完,如果为prev文件,则可以打开curr文件继续读取,通过openFile完成1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private boolean openFile() throws IOException {
for(int i=0; i<2; i++) {
if (reader != null || i > 0) {
file = (file == prevFile) ? curFile : null;
}
if (file == null) {
return false;
}
if (file.exists()) {
break;
}
}
if (reader != null ) {
reader.close();
reader = null;
}
reader = new BufferedReader(new FileReader(file));
return true;
}
读取到的行数据通过next获得,并读取下一行数据到line中1
2
3
4
5
6
7
8
9
10public String next() {
String curLine = line;
try {
readNext();
} catch (IOException e) {
LOG.info("Could not reade next line in LogHandler : " +
StringUtils.stringifyException(e));
}
return curLine;
}
回到assignInitialVerificationTimes中,创建Reader对象传入的参数为false,因此会先读取prev日志文件的内容,然后在读取curr日志文件的内容。
读取的每一行内容通过LogEntry的parseEntry进行解析,更新到区块扫描信息对象中
2.3.2.2 BlockScanInfo.LogEntry
成员如下1
2
3long blockId = -1;
long verificationTime = -1;
long genStamp = Block.GRANDFATHER_GENERATION_STAMP;
因为日志文件中每一行记录形式通过newEntry创建1
2
3
4
5
6static String newEnry(Block block, long time) {
return "date=\"" + dateFormat.format(new Date(time)) + "\"\t " +
"time=\"" + time + "\"\t " +
"genstamp=\"" + block.getGenerationStamp() + "\"\t " +
"id=\"" + block.getBlockId() +"\"";
}
如上四个key=value类型的键值对,其中time为该项创建时间即扫描时间。parseEntry解析一行,由正则表达式匹配键和值,并判断为id,time,genstamp然后更新到区块对应的BlockScanInfo中。限于篇幅这里不贴代码。
回到assignInitialVerificationTimes中,解析完prev和curr中以前扫描记录后通过updateBlockInfo更新区块扫描信息,重新插入到blockInfoSet和blockMap中。
最后,对于prev和curr没有记录的区块,即该区块以前没有扫描过,初始化其lastScanTime,重新插入到blockInfoSet和blockMap中。这样便读取本地日志记录,恢复了之前的扫描状态。
回到DataBlockScanner主线程中,如果blockInfoSet中的第一个区块即最长时间没有扫描的区块还没到达扫描时间,休眠1s,否则通过verifyFirstBlock扫描第一个区块,verifyFirstBlock从blockInfoSet中取出第一个BlockScanInfo对象,然后通过verifyBlock
扫描该区块。
verifyBlock通过构造BlockSender,并设置verifyChecksum为true,发送区块数据到NullOutputStream中,因为verifyChecksum为true,因此会进行校验和验证,失败时会抛出异常。尝试两次,防止暂时的错误。如果两次都失败,则该区块对应的BlockScanInfo的lastScanOk更新为false,表示扫描失败。扫描成功时更新BlockScanInfo的lastScanOk为true。verifyBlock完成(不管成功还是失败)都会更新BlockScanInfo的lastScanTime,以及lastScanType为VERIFICATION_SCAN。
BlockSender见DataNode流接口实现下篇中读数据块的分析。
最终将新的BlockScanInfo重新插入到blockInfoSet和blockMap中,并通过LogEntry.newEntry创建一个日志实例,使用DataBlockScanner的LogFileHandler对象追加到当前日志文件curr中,追加完后通过rollIfRequired判断当前文件超过行限制且达到最短roll时间时,重命名curr为prev,并使用新的curr文件记录日志。