DataNode实现源码分析---DataNode启动

Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》


当通过start-dfs.sh或者start-all.sh等方式启动DataNode时,最终会调用DataNode的main方法,这中间过程不再分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void main(String args[]) {
secureMain(args, null);
}
public static void secureMain(String [] args, SecureResources resources) {
try {
StringUtils.startupShutdownMessage(DataNode.class, args, LOG);//启动日志,设置关闭日志
DataNode datanode = createDataNode(args, null, resources);//创建DataNode启动
if (datanode != null)//主线程中等待DataNode运行完
datanode.join();
} catch (Throwable e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(-1);
} finally {
// We need to add System.exit here because either shutdown was called or
// some disk related conditions like volumes tolerated or volumes required
// condition was not met. Also, In secure mode, control will go to Jsvc and
// the process hangs without System.exit.
LOG.info("Exiting Datanode");
System.exit(0);
}
}
void join() {
if (dataNodeThread != null) {
try {
dataNodeThread.join();
} catch (InterruptedException e) {}
}
}

如上,首先会记录”STARTUP_MSG。。。”这样的启动日志信息,在DataNode的日志文件中可以找到,然后注册关闭日志”SHUTDOWN_MSG。。。”。通过createDataNode创建DataNode并启动,在main线程中等待datanode执行完。

1
2
3
4
5
public static DataNode createDataNode(String args[], Configuration conf, SecureResources resources) throws IOException {
DataNode dn = instantiateDataNode(args, conf, resources);
runDatanodeDaemon(dn);
return dn;
}

如上,通过instantiateDataNode实例化DataNode对象,然后通过runDatanodeDaemon向NameNode注册,并创建dataNodeThread线程启动。

1. 创建实例对象

1
2
3
4
5
6
7
8
public static DataNode instantiateDataNode(String args[], Configuration conf, SecureResources resources) throws IOException {
if (conf == null) conf = new Configuration();
...//参数检查
String[] dataDirs = conf.getStrings(DATA_DIR_KEY);//dfs.data.dir数据节点存储目录
dnThreadName = "DataNode: [" + StringUtils.arrayToString(dataDirs) + "]";//DataNode线程名
DefaultMetricsSystem.initialize("DataNode");
return makeInstance(dataDirs, conf, resources);
}

如上,读取dfs.data.dir配置项作为DataNode的本地存储目录,该目录可以有多个,以分隔,然后通过makeInstance创建DataNode对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static DataNode makeInstance(String[] dataDirs, Configuration conf, SecureResources resources) throws IOException {
UserGroupInformation.setConfiguration(conf);
LocalFileSystem localFS = FileSystem.getLocal(conf);//本地文件系统(含校验和)
ArrayList<File> dirs = new ArrayList<File>();
//dfs.datanode.data.dir.perm配置项,配置dfs.data.dir权限
FsPermission dataDirPermission = new FsPermission(conf.get(DATA_DIR_PERMISSION_KEY, DEFAULT_DATA_DIR_PERMISSION));
for (String dir : dataDirs) {
try {
DiskChecker.checkDir(localFS, new Path(dir), dataDirPermission);//检查目录是否有读/写权限,没有抛出异常,不会添加到dirs中
dirs.add(new File(dir));
} catch(IOException e) {
LOG.warn("Invalid directory in " + DATA_DIR_KEY + ": " +
e.getMessage());
}
}
if (dirs.size() > 0) //存在满足条件的本地存储目录,创建DataNode对象
return new DataNode(conf, dirs, resources);
LOG.error("All directories in " + DATA_DIR_KEY + " are invalid.");
return null;//没有满足条件的目录,不能创建DataNode,返回null
}

如上,dfs.data.dir目录对应的权限为配置项dfs.datanode.data.dir.perm,默认为755。要保证配置的项为目录且有读写权限,否则不能作为DataNode的本地存储目录,当没有满足条件的本地存储目录时,不能创建DataNode对象,返回null,否则通过DataNode的构造函数创建DataNode对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
DataNode(final Configuration conf, final AbstractList<File> dataDirs, SecureResources resources) throws IOException {
super(conf);
SecurityUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY);
datanodeObject = this;
//这里为true,启动时若发现blocksBeingWritten目录下有文件,将这些区块文件添加到ongoingCreates和volumeMap中,否则删除
durableSync = conf.getBoolean("dfs.durable.sync", true);
this.userWithLocalPathAccess = conf .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
try {
startDataNode(conf, dataDirs, resources);
} catch (IOException ie) {
shutdown();
throw ie;
}
}

如上,通过startDataNode初始化其他主要成员,这里关于安全登陆的不做分析。
在startDataNode中构造初始化了DataNode的主要成员,这里分析主要成员的初始化

1.1 配置相关成员

1
2
3
this.blockReportInterval = conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
this.initialBlockReportDelay = conf.getLong("dfs.blockreport.initialDelay", BLOCKREPORT_INITIAL_DELAY)* 1000L;
this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;

DataNode需要周期性的向NameNode报告区块信息,dfs.blockreport.intervalMsec定义了报告的周期,默认1个小时,dfs.blockreport.initialDelay用于初始化DataNode创建时的lastBlockReport时间,一般来说启动后立即向NameNode报告。
dfs.heart.interval定义了DataNode向NameNode发送心跳的周期,默认3s。

1.2 服务地址

1.2.1 机器名machineName

1
2
3
4
5
6
7
if (conf.get("slave.host.name") != null) {
machineName = conf.get("slave.host.name");
}
if (machineName == null) {
machineName = DNS.getDefaultHost( conf.get("dfs.datanode.dns.interface","default"),
conf.get("dfs.datanode.dns.nameserver","default"));
}

DataNode在集群中的机器名通过slave.host.name配置,如果没有通过DNS查询

1.2.2 NameNode地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
InetSocketAddress nameNodeAddr = NameNode.getServiceAddress(conf, true);
public static InetSocketAddress getServiceAddress(Configuration conf, boolean fallback) {
//通过dfs.namenode.servicerpc-address获取NameNode地址
String addr = conf.get(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY);
if (addr == null || addr.isEmpty()) {//没有配置dfs.namenode.servicerpc-address或为空,使用dfs.namenode.rpc-address
return fallback ? getAddress(conf) : null;
}
return getAddress(addr);
}
public static InetSocketAddress getAddress(Configuration conf) {
String addr = conf.get(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);//旧的配置项为dfs.namenode.rpc-address
if (addr == null || addr.isEmpty()) {//如果旧的配置项也没配置或为空,使用缺省的即`fs.default.name`配置
return getAddress(FileSystem.getDefaultUri(conf).toString());
}
return getAddress(addr);
}
public static InetSocketAddress getAddress(String address) {//NameNode RPC端口缺省为8020
return NetUtils.createSocketAddr(address, DEFAULT_PORT);
}

如上,如果配置项dfs.namenode.servicerpc-address没有配置或为空,使用dfs.namenode.rpc-address作为NameNod地址,如果dfs.namenode.rpc-address配置项也没有配置或为空时,使用缺省的fs.default.name地址。
这里涉及到NameNode的RPC服务,NameNode根据配置可以提供两个RPC服务器。如果配置了dfs.namenode.servicerpc-address,则DataNode和SecondaryNameNode与NameNode的RPC通信使用该地址的RPC服务,而Client与NameNode的RPC通信使用dfs.namenode.rpc-address地址的RPC服务。如果没有配置dfs.namenode.servicerpc-address,则只有一个RPC服务器,Client,DataNode和SecondaryNameNode都使用dfs.namenode.rpc-address地址的RPC服务(没有配置的话,RPC服务地址为fs.default.name,端口为缺省值8020)。

1.2.3 本机流服务地址

1
2
3
4
5
6
7
8
InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
int tmpPort = socAddr.getPort();

public static InetSocketAddress getStreamingAddr(Configuration conf) {
String address = NetUtils.getServerAddress(
conf, "dfs.datanode.bindAddress", "dfs.datanode.port", "dfs.datanode.address");
return NetUtils.createSocketAddr(address);
}

流服务地址现在使用dfs.datanode.address配置,包括端口,老的流服务器使用dfs.datanode.bindAddress配置地址,dfs.datanode.port配置端口。
NetUtils.getServerAddress不再分析,逻辑为当新的地址和端口没有配置,不管有没有配置旧的地址和端口都会抛出异常,而同时配置新的和旧的配置项时,会使用新的配置项,在日志中会提示旧的配置项已经不建议使用。
dfs.datanode.addresshdfs-default.xml中配置为0.0.0.0:50010

1.3 流服务相关

1.3.1 配置选项

1
2
3
4
this.socketTimeout =  conf.getInt("dfs.socket.timeout", HdfsConstants.READ_TIMEOUT);//默认1min
this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout", HdfsConstants.WRITE_TIMEOUT);//默认6min
this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed", true);//不同平台可能设置不同的值
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);//默认64KB
  • socketTimeout,流服务中建立Socket连接时的连接超时和Socket读超时的超时时间,默认1min
  • socketWriteTimeout,流服务中Socket写超时时间,默认6min
  • transferToAllowed,是否支持”零拷贝传输”,在不同平台上应该设置不同的值,关于”零拷贝传输”在后面数据传输中具体分析
  • writePacketSize,为数据传输过程中写数据的包大小,参考HDFS流接口中的写数据包响应

1.3.2 Socket

1
2
3
4
5
6
7
8
9
ServerSocket ss;
if(secureResources == null) {
ss = (socketWriteTimeout > 0) ?
ServerSocketChannel.open().socket() : new ServerSocket();
Server.bind(ss, socAddr, 0);
} else {
ss = resources.getStreamingSocket();
}
ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);

创建Socket,绑定流服务地址和端口

1.3.3 服务线程DataXceiverServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
this.threadGroup = new ThreadGroup("dataXceiverServer");
this.dataXceiverServer = new Daemon(threadGroup, new DataXceiverServer(ss, conf, this));
this.threadGroup.setDaemon(true); // auto destroy when empty

DataXceiverServer(ServerSocket ss, Configuration conf, DataNode datanode) {
this.ss = ss;
this.datanode = datanode;
//最大服务线程数为256
this.maxXceiverCount = conf.getInt("dfs.datanode.max.xcievers", MAX_XCEIVER_COUNT);
//预估区块大小为64MB
this.estimateBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
//创建节流器
this.balanceThrottler = new BlockBalanceThrottler(conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024));
}

DataNode的流服务由DataXceiverServer提供,其管理的Socket ss连接到来的请求,accept后创建DataXceiver线程进行相应的读写操作,创建的线程不能超过maxXceiverCount

1.4 NameNode RPC代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
this.namenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class, DatanodeProtocol.versionID, nameNodeAddr, conf);
public static VersionedProtocol waitForProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException
{

return waitForProxy(protocol, clientVersion, addr, conf, 0, Long.MAX_VALUE);
}
static VersionedProtocol waitForProxy(Class<? extends VersionedProtocol> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf, int rpcTimeout, long connTimeout) throws IOException
{

long startTime = System.currentTimeMillis();
IOException ioe;
while (true) {
try {
return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);//连接到NameNode
} catch(ConnectException se) { //NameNode还没启动
LOG.info("Server at " + addr + " not available yet, Zzzzz...");
ioe = se;
} catch(SocketTimeoutException te) {//NameNode现在忙
LOG.info("Problem connecting to server: " + addr);
ioe = te;
}
//还没到超时时间,等待1s,继续尝试连接
if (System.currentTimeMillis()-connTimeout >= startTime) {
throw ioe;
}
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}

如上,最终通过RPC.getProxy获取代理对象,默认情况下创建动态代理后,会首次通过checkVersion进行首次通信,验证版本号,因此会连接NameNode。NameNode没有启动或者正在忙的时候会超时,没有超过设置的超时时间(默认值Long.MAX_VALUE)等待1s继续尝试获取代理。
RPC.getProxy另见RPC源码分析上篇

1.5 对外提供的RPC服务

1
2
3
4
InetSocketAddress ipcAddr = NetUtils.createSocketAddr(conf.get("dfs.datanode.ipc.address"));
ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(),
conf.getInt("dfs.datanode.handler.count", 3), false, conf, blockTokenSecretManager);
dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());

如上,通过配置项dfs.datanode.ipc.address配置DataNode对外(DataNode对应InterDatanodeProtocol,Client对应ClientDatanodeProtocol)提供的RPC服务地址,然后通过RPC.getServer创建RPC服务器,服务器的处理器线程数为3个,关于RPC.getServer另见RPC源码分析上篇
此外,更新dnRegistration中的ipc地址和端口信息(dnRegistration包含DataNode向NameNode通信时所有需要的注册信息,另见HDFS相关实体对象),创建过程见下面,这里没有按照DataNode构造方法中的顺序来。

1.6 本地存储相关

DataNode本地存储管理中分析了DataNode的本地存储由DataStorage和FSDataset负责
创建DataStorage

1
storage = new DataStorage();

通过机器名和流服务地址端口创建DatanodeRegistration对象,其他成员如layoutVersion等还为空

1
this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);

然后从NameNode中获取集群的信息

1
2
NamespaceInfo nsInfo = handshake();
StartupOption startOpt = getStartupOption(conf);

如上,handshake通过NameNode代理通过versionRequest询问NameNode版本信息

1
2
3
4
5
6
7
8
9
boolean simulatedFSDataset = conf.getBoolean("dfs.datanode.simulateddatastorage", false);
if (simulatedFSDataset) {
...//伪分布式处理
}
else { // real storage
storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
this.dnRegistration.setStorageInfo(storage);
this.data = new FSDataset(storage, conf);//创建FSDataset对象,读取所有存储目录下的文件初始化成员volumeMap
}

如上,创建DataStorage对象,从NameNode获取命名空间信息后,分析本地目录,根据启动方式和集群版本做相应的处理,dfs.datanode.simulateddatastorage为true时这里不分析,主要用于调试,性能评估。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
void recoverTransitionRead(NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt) throws IOException {
assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() : "Data-node and name-node layout versions must be the same.";

// 1. For each data directory calculate its state and
// check whether all is consistent before transitioning.
// Format and recover.
this.storageID = "";
this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {//遍历所有配置的目录
File dataDir = it.next();
StorageDirectory sd = new StorageDirectory(dataDir);//创建StorageDirectory
StorageState curState;
try {
curState = sd.analyzeStorage(startOpt);//分析存储目录的状态
// sd is locked but not opened
switch(curState) {
case NORMAL:
break;
case NON_EXISTENT://NON_EXISTENT,跳过不添加到DataStorage的storageDirs中
LOG.info("Storage directory " + dataDir + " does not exist");
it.remove();
continue;
case NOT_FORMATTED: //NOT_FORMATTED,格式化,删除原来current目录,创建新的current目录,根据nsInfo写VERSION文件
LOG.info("Storage directory " + dataDir + " is not formatted");
LOG.info("Formatting ...");
format(sd, nsInfo);
break;
default: //中间状态,恢复到NORMAL
sd.doRecover(curState);
}
} catch (IOException ioe) {
sd.unlock();
throw ioe;
}
// add to the storage list
addStorageDir(sd);
dataDirStates.add(curState);
}

if (dataDirs.size() == 0) // none of the data dirs exist
throw new IOException( "All specified directories are not accessible or do not exist.");

// 2. Do transitions
// Each storage directory is treated individually.
// During sturtup some of them can upgrade or rollback
// while others could be uptodate for the regular startup.
for(int idx = 0; idx < getNumStorageDirs(); idx++) {
doTransition(getStorageDir(idx), nsInfo, startOpt);
assert this.getLayoutVersion() == nsInfo.getLayoutVersion() :
"Data-node and name-node layout versions must be the same.";
assert this.getCTime() == nsInfo.getCTime() :
"Data-node and name-node CTimes must be the same.";
}

// 3. Update all storages. Some of them might have just been formatted.
this.writeAll();
}

private void doTransition( StorageDirectory sd, NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
if (startOpt == StartupOption.ROLLBACK)
doRollback(sd, nsInfo); //回滚方式启动,回滚
sd.read();//重新读取版本文件
checkVersionUpgradable(this.layoutVersion);//检查版本,不能为LAST_UPGRADABLE_LAYOUT_VERSION(-7)之前的版本
assert this.layoutVersion >= FSConstants.LAYOUT_VERSION : "Future version is not allowed";
if (getNamespaceID() != nsInfo.getNamespaceID())
throw new IOException( "Incompatible namespaceIDs in " + sd.getRoot().getCanonicalPath()
+ ": namenode namespaceID = " + nsInfo.getNamespaceID()
+ "; datanode namespaceID = " + getNamespaceID());
if (this.layoutVersion == FSConstants.LAYOUT_VERSION && this.cTime == nsInfo.getCTime())
return; //版本与集群版本一致
verifyDistributedUpgradeProgress(nsInfo);
if (this.layoutVersion > FSConstants.LAYOUT_VERSION || this.cTime < nsInfo.getCTime()) {
doUpgrade(sd, nsInfo); //当前版本小于集群版本,更新
return;
}
// layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
// must shutdown
throw new IOException("Datanode state: LV = " + this.getLayoutVersion()
+ " CTime = " + this.getCTime() + " is newer than the namespace state: LV = "
+ nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
}

如上,对每一个配置的目录,首先检查当前状态,如果为NON_EXISTENT不会添加到DataStorage的管理目录内。否则如果为NOT_FORMATTED,则格式化后添加到DataStorage的storageDirs中,而如果为NORMAL,则直接添加到storageDirs中,处于其他中间状态的,通过doRecover恢复到NORMAL状态,添加到storageDirs中。
通过analyzeStorage获取当前目录状态,通过format格式化,通过doRecover恢复这些方法见DataNode本地存储管理

recoverTransitionRead之后,DataNode的本地存储目录处于正常状态,构建FSDataset对象,FSDataset构造见DataNode本地存储管理,会对每一个存储目录构造相应的FSVolume对象,然后读取本地存储目录下的区块文件初始化volumeMap成员。
至此,本地存储相关的DataStorage和FSDataset对象构建和初始化完成了,DataStorage管理的存储目录在storageDirs中,而FSDataset的volumes包含了所有的FSVolume对象,volumeMap包含了所有现存的区块信息。

1.7 HTTP服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
String infoHost = infoSocAddr.getHostName();
int tmpInfoPort = infoSocAddr.getPort();
this.infoServer = (secureResources == null)
? new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
conf, SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN))
: new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
conf, SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN),
secureResources.getListener());
...//https相关
this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
this.infoServer.addInternalServlet(null, "/getFileChecksum/*", FileChecksumServlets.GetServlet.class);

this.infoServer.setAttribute("datanode", this);
this.infoServer.setAttribute("datanode.blockScanner", blockScanner);
this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
this.infoServer.addServlet(null, "/blockScannerReport", DataBlockScanner.Servlet.class);
this.infoServer.start();
this.dnRegistration.setInfoPort(this.infoServer.getPort());

如上,getInfoAddr读取配置dfs.datanode.http.address作为HTTP服务器的地址和端口,创建HttpServer,addInternalServlet已经声明为Deprecated,设置属性datanodedatanode.blockScannercurrent.conf可以直接在jsp文件中或Servlet类中使用,添加一个Servlet,路径为/blockScannerReport,对应的类为DataBlockScanner.Servlet
HTTP服务器在创建DataNode时启动,其他服务都在DataNode线程中启动,此外更新dnRegistration中关于info地址和端口信息。

1.8 其他

1.8.1 区块扫描器

1
2
3
4
5
6
7
8
9
10
11
blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
DataBlockScanner(DataNode datanode, FSDataset dataset, Configuration conf) {
this.datanode = datanode;
this.dataset = dataset;
scanPeriod = conf.getInt("dfs.datanode.scan.period.hours", 0);
if ( scanPeriod <= 0 ) {
scanPeriod = DEFAULT_SCAN_PERIOD_HOURS;
}
scanPeriod *= 3600 * 1000;
// initialized when the scanner thread is started.
}

区块扫描器所属DataNode为当前DataNode,管理的数据集为DataNode的FSDataset,扫描周期为dfs.datanode.scan.period.hours,默认情况下3个小时。

1.8.2 度量统计

1
myMetrics = DataNodeInstrumentation.create(conf, dnRegistration.getStorageID());

1.8.3 区块口令安全管理器

1
2
3
// BlockTokenSecretManager is created here, but it shouldn't be
// used until it is initialized in register().
this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0);

1.8.4 插件

1
2
3
4
5
6
7
8
9
plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class);
for (ServicePlugin p: plugins) {
try {
p.start(this);
LOG.info("Started plug-in " + p);
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be started", t);
}
}

总结DataNode中的相关资源实例

  • NameNode RPC代理
  • 对外提供RPC服务的RPC服务器(ClientDatanodeProtocol,InterDatanodeProtocol)
  • 流服务请求处理线程DataXceiverServer和若干处理请求的DataXceiver线程
  • HTTP服务器HttpServer
  • 区块扫描器BlockScanner,默认3周扫描一次
  • 管理本地存储的DataStorageFSDataset,其中FSDataset中有用于异步区块删除的线程FSDatasetAsyncDiskService和用于异步区块扫描(扫描本地区块用于区块报告)的线程AsyncBlockReport
  • 度量统计DataNodeInstrumentation
  • 安全相关的BlockTokenSecretManager
  • 可能的若干插件ServicePlugin线程

2. DataNode启动

1
2
3
4
5
6
7
8
9
public static void runDatanodeDaemon(DataNode dn) throws IOException {
if (dn != null) {
//register datanode
dn.register();
dn.dataNodeThread = new Thread(dn, dnThreadName);
dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
dn.dataNodeThread.start();
}
}

2.1 注册

如上,启动DataNode线程前先向NameNode注册,从NameNode中获取存储信息,报告当前区块信息等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void register() throws IOException {
if (dnRegistration.getStorageID().equals("")) {
setNewStorageID(dnRegistration);
}
while(shouldRun) {//注册超时时,等待1s中继续注册
try {
dnRegistration.name = machineName + ":" + dnRegistration.getPort();
dnRegistration = namenode.register(dnRegistration);
break;
} catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + getNameNodeAddr());
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {}
}
}
if (storage.getStorageID().equals("")) {//当前storageID没设置,通过从NameNode中获取的信息设置
storage.setStorageID(dnRegistration.getStorageID());
storage.writeAll();//写到VERSION文件
}
if(!storage.getStorageID().equals(dnRegistration.getStorageID())) {
throw new IOException("Inconsistent storage IDs. Name-node returned "
+ dnRegistration.getStorageID() + ". Expecting " + storage.getStorageID());
}
...//安全相关

if (durableSync) {//不删除blocksBeingWritten目录中的区块,则需要向NameNode报告这些区块信息
Block[] bbwReport = data.getBlocksBeingWrittenReport();
long[] blocksBeingWritten = BlockListAsLongs.convertToArrayLongs(bbwReport);
namenode.blocksBeingWrittenReport(dnRegistration, blocksBeingWritten);
}

data.requestAsyncBlockReport();
scheduleBlockReport(initialBlockReportDelay);
}

如上,通过NameNode代理register向NameNode注册并返回集群的版本信息,更新DataNode中的版本信息。如果blocksBeingWritten目录下存在区块,且允许不删除这些区块(这些区块会在创建FSDataset对象时恢复到ongoingCreates和volumeMap中),需要向NameNode报告这些区块信息。然后进行区块报告相关设置。

2.2 启动DataNode线程

注册后,创建DataNode线程启动,线程主流程如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void run() {
LOG.info(dnRegistration + "In DataNode.run, data = " + data);
dataXceiverServer.start();//启动流服务
ipcServer.start();//启动RPC服务器

while (shouldRun) {
try {
startDistributedUpgradeIfNeeded();
offerService();//主服务
} catch (Exception ex) {
LOG.error("Exception: " + StringUtils.stringifyException(ex));
if (shouldRun) {
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
}
}
}
}
//通过shutdown退出
LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
shutdown();
}

如上,启动流服务,RPC服务(HTTP服务,插件等在DataNode构造中已经启动,区块扫描器线程在DataNode线程中启动),DataNode运行时通过offerService提供主服务,退出时通过shutdown清理相关资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
public void offerService() throws Exception {
while (shouldRun) {
try {
long startTime = now();
if (startTime - lastHeartbeat > heartBeatInterval) {//到达心跳间隔,发送心跳信息
//心跳信息包括,DataNode注册信息,数据传输端口,当前总容量,剩余容量
lastHeartbeat = startTime;
DatanodeCommand[] cmds = namenode.sendHeartbeat(
dnRegistration, data.getCapacity(), data.getDfsUsed(),
data.getRemaining(), xmitsInProgress.get(), getXceiverCount());
myMetrics.addHeartBeat(now() - startTime);
if (!processCommand(cmds))//处理NameNode发送过来的命令
continue;
}

// check if there are newly received blocks
Block [] blockArray=null;
String [] delHintArray=null;
synchronized(receivedBlockList) {
synchronized(delHints) {
int numBlocks = receivedBlockList.size();
if (numBlocks > 0) {
if(numBlocks!=delHints.size()) {
LOG.warn("Panic: receiveBlockList and delHints are not of the same length" );
}
blockArray = receivedBlockList.toArray(new Block[numBlocks]);//新接收到的区块
delHintArray = delHints.toArray(new String[numBlocks]);//这些节点上的对应区块将被删除
}
}
}
if (blockArray != null) {
if(delHintArray == null || delHintArray.length != blockArray.length ) {
LOG.warn("Panic: block array & delHintArray are not the same" );
}
//向NameNode报告新接收到的区块信息,delHintArray如果不为空则节点上对应的区块将被删除
namenode.blockReceived(dnRegistration, blockArray, delHintArray);
synchronized (receivedBlockList) {
synchronized (delHints) {
for(int i=0; i<blockArray.length; i++) {
receivedBlockList.remove(blockArray[i]);
delHints.remove(delHintArray[i]);
}
}
}
}

// Send latest blockinfo report if timer has expired.
if (startTime - lastBlockReport > blockReportInterval) {
if (data.isAsyncBlockReportReady()) {//异步区块扫描器已经扫描完,scan中保存了扫描结果,发送扫描结果区块报告
long brCreateStartTime = now();
Block[] bReport = data.retrieveAsyncBlockReport();//获取扫描结果

long brSendStartTime = now();
//发送扫描结果,区块报告
DatanodeCommand cmd = namenode.blockReport(dnRegistration, BlockListAsLongs.convertToArrayLongs(bReport));
long brSendCost = now() - brSendStartTime;
long brCreateCost = brSendStartTime - brCreateStartTime;
myMetrics.addBlockReport(brSendCost);
//
// If we have sent the first block report, then wait a random
// time before we start the periodic block reports.
//
if (resetBlockReportTime) {
lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
resetBlockReportTime = false;
} else {
/* say the last block report was at 8:20:14. The current report
* should have started around 9:20:14 (default 1 hour interval).
* If current time is :
* 1) normal like 9:20:18, next report should be at 10:20:14
* 2) unexpected like 11:35:43, next report should be at
* 12:20:14
*/

lastBlockReport += (now() - lastBlockReport) / blockReportInterval * blockReportInterval;
}
processCommand(cmd);//处理区块报告时,NameNode发送过来的命令
} else {
data.requestAsyncBlockReport();//如果区块扫描器没有扫描结果,则请求区块扫描器开始扫描
if (lastBlockReport > 0) { // this isn't the first report
...//记录日志
}
}
}

//启动区块扫描器
if (blockScanner != null && blockScannerThread == null &&
upgradeManager.isUpgradeCompleted()) {
LOG.info("Starting Periodic block scanner");
blockScannerThread = new Daemon(blockScanner);
blockScannerThread.start();
}

//
// There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
//
long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat);
synchronized(receivedBlockList) {//没有新的可发送的区块,且还没到达心跳周期,等待
if (waitTime > 0 && receivedBlockList.size() == 0) {
try {
receivedBlockList.wait(waitTime);
} catch (InterruptedException ie) {
}
delayBeforeBlockReceived();
}
} // synchronized
} catch(RemoteException re) {
String reClass = re.getClassName();
if (UnregisteredDatanodeException.class.getName().equals(reClass) ||
DisallowedDatanodeException.class.getName().equals(reClass) ||
IncorrectVersionException.class.getName().equals(reClass)) {
LOG.warn("DataNode is shutting down: " +
StringUtils.stringifyException(re));
shutdown();
return;
}
LOG.warn(StringUtils.stringifyException(re));
} catch (IOException e) {
LOG.warn(StringUtils.stringifyException(e));
}
} // while (shouldRun)
} // offerService

如上,在主服务线程中,如果到达心跳周期,通过NameNode代理sendHeartbeat向NameNode发送心跳,包括DataNode的注册信息,数据传输端口,总容量,剩余容量,然后使用processCommand处理返回值中NameNode的命令。

如果接收到了新的区块,通过NameNode代理blockReceived向NameNode报告,其中的delHintArray参数表示对应的区块在这些节点上应该删除。

如果到达区块报告周期,查找异步区块扫描线程AsyncBlockReport扫描结果,如果扫描完成,通过NameNode代理blockReport进行区块报告,并通过processCommand处理NameNode下达的命令。而如果没有扫描结果,则通知AsyncBlockReport开始扫描本地区块。

然后如果区块扫描器没有启动,且更新管理器当前没有在进行更新操作,则启动区块扫描器

以上事情处理完后,如果没有到达下一次心跳周期处理过程中没有接收到新的区块,则等待直到下一次心跳时间。
processCommand对NameNode命令的处理涉及到数据的读写等操作,见后面具体分析。

2.3 区块扫描器线程

区块扫描器线程主程序如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void run() {
try {
init();//初始化,获取所有的区块,记录用的日志文件
//读取存储在本地的日志文件,更新区块扫描记录
if (!assignInitialVerificationTimes()) {
return;
}
adjustThrottler();//根据当期需要扫描的数据量设置扫描节流器,控制扫描频率
while (datanode.shouldRun && !Thread.interrupted()) {//主循环
long now = System.currentTimeMillis();
synchronized (this) {
if ( now >= (currentPeriodStart + scanPeriod)) {
startNewPeriod();
}
}
if ( (now - getEarliestScanTime()) >= scanPeriod ) {//如果最长时间没有扫描的区块应该扫描了,则扫描该区块
verifyFirstBlock();
} else {
try {
Thread.sleep(1000);//最长时间没有扫描的区块都不需要扫描,休眠1s
} catch (InterruptedException ignored) {}
}
}
...//异常处理和最终关闭区块扫描器
}

如上,首先通过init初始化,会读取当前的区块,记录在成员blockInfoSetblockMap中,并读取日志文件

2.3.1 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void init() {
Block arr[] = dataset.getBlockReport();//通过FSDataset获取所有的区块
Collections.shuffle(Arrays.asList(arr));//打乱顺序
blockInfoSet = new TreeSet<BlockScanInfo>();//创建blockInfoSet对象,存储的为BlockScanInfo
blockMap = new HashMap<Block, BlockScanInfo>();//创建blockMap对象

long scanTime = -1;
for (Block block : arr) {//每个Block创建BlockScanInfo对象,扫描时间从-1开始递减,负数没有扫描
BlockScanInfo info = new BlockScanInfo(block);
info.lastScanTime = scanTime--;
addBlockInfo(info);//添加到blockInfoSet和blockMap中,blockInfoSet中lastScanTime小的排在前面
}
//找到第一个包含扫描日志文件的存储目录,如果都没有则在第一个存储目录中创建扫描日志文件
//日志文件为`dncp_block_verification.log.curr`或`dncp_block_verification.log.prev`
File dir = null;
FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
for(FSDataset.FSVolume vol : volumes) {
if (LogFileHandler.isFilePresent(vol.getDir(), verificationLogFile)) {
dir = vol.getDir();
break;
}
}
if (dir == null) {
dir = volumes[0].getDir();
}
try {
//创建LogFileHandler对象,包含cur日志文件,prev日志文件,并打开cur文件的输出流
verificationLog = new LogFileHandler(dir, verificationLogFile, 100);
} catch (IOException e) {
LOG.warn("Could not open verfication log. " + "Verification times are not stored.");
}
synchronized (this) {
throttler = new DataTransferThrottler(200, MAX_SCAN_RATE);//创建扫描区块用的节流器,周期为200ms,带宽为8MB/s
}
}

如上,初始化时通过FSDataset读取所有的区块,每一个区块创建BlockScanInfo对象,添加到blockMap和blockInfoSet中。然后根据本地的日志文件创建LogFileHandler对象。

2.3.1.1 BlockScanInfo

BlockScanInfo记录了区块的扫描信息,成员如下

1
2
3
4
5
Block block;//区块
long lastScanTime = 0;//上次扫描时间
long lastLogTime = 0;//上次日志记录时间
ScanType lastScanType = ScanType.NONE;//扫描类型
boolean lastScanOk = true;//扫描结果,true表示区块没有错误

其中ScanType为枚举类型

1
2
3
4
5
private static enum ScanType {
REMOTE_READ,
VERIFICATION_SCAN,
NONE,
}

  • REMOTE_READ,Client读取数据时会进行验证,当读取整个区块时,Client会发送验证结果给DataNode,如果验证无误,则标识为REMOTE_READ,即由Client验证过
  • VERIFICATION_SCAN,区块扫描器周期性扫描
  • NONE,还未扫描过

回到init方法,每个区块创建BlockScanInfo对象,lastScanTime为负数,依次递增,此时扫描状态为NONE。通过addBlockInfo添加到成员blockInfoSet和blockMap中
添加到blockInfoSet中时,因为blockInfoSet是TreeSet,通过BlockScanInfo的compareTo方法比较

1
2
3
4
5
public int compareTo(BlockScanInfo other) {
long t1 = lastScanTime;
long t2 = other.lastScanTime;
return ( t1 < t2 ) ? -1 : (( t1 > t2 ) ? 1 : block.compareTo(other.block));
}

因此,lastScanTime小的在前面,即最长时间没有扫描的排在前面,在后面扫描时会先扫描。

初始化所有的Block后,会在所有存储目录下找是否存在dncp_block_verification.log.currdncp_block_verification.log.prev文件,找到了则使用该存储目录存储日志,否则使用第一个存储目录创建日志文件存储日志。根据找到的存储目录和日志文件创建LogFileHandler对象

2.3.1.2 LogFileHandler

LogFileHandler负责日志文件相关处理,成员如下

1
2
3
4
5
6
7
private File curFile;//当前日志文件
private File prevFile;//上一次使用的日志文件
private int maxNumLines = -1;
private int curNumLines = -1;//当前日志文件最大行
long lastWarningTime = 0;
private PrintStream out;//当前日志文件输出流
int numReaders = 0;

如上,日志记录包含两个文件,.curr.prev,如果curNumLines大于maxNumLines且到了可以回滚的时间,则将curr文件重命名为prev文件,创建新的curr日志文件,逻辑在rollIfRequired中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void rollIfRequired() throws IOException {
if (curNumLines < maxNumLines || numReaders > 0) {
return;
}
long now = System.currentTimeMillis();
if (now < minRollingPeriod) {
return;
}
if (!prevFile.delete() && prevFile.exists()) {//删除prev文件
throw new IOException("Could not delete " + prevFile);
}
close();//关闭当前文件输出流out
if (!curFile.renameTo(prevFile)) {cur文件重命名为prev文件
openCurFile();//重命名失败,重新打开curr文件
throw new IOException("Could not rename " + curFile + " to " + prevFile);
}
openCurFile();//打开新的curr文件
updateCurNumLines();//重新读取curr文件,更新curNumLines
}

如上,minRollingPeriod为6 * 3600 * 1000L即6个小时。上面的close方法会关闭当前curr文件的输出流out,而openCurFile会重新打开curr文件的输出流构造out,在将curr文件重命名为prev文件后,重新打开curr文件为空文件。然后更新curr文件的行数curNumLines。

成员numReaders表示当前读取日志文件的Reader数,Reader为LogFileHandler的内部类,后面分析。

因此,回到init方法中构造LogFileHandler对象

1
2
3
4
5
6
7
LogFileHandler(File dir, String filePrefix, int maxNumLines) throws IOException {
curFile = new File(dir, filePrefix + curFileSuffix);
prevFile = new File(dir, filePrefix + prevFileSuffix);
openCurFile();
curNumLines = -1;
setMaxNumLines(maxNumLines);
}

如上,初始化curNumLines为-1,最大行数为100。

在init方法的最后构造区块扫描器使用的DataTransferThrottler,周期为200ms,带宽为8MB/s,用于在控制扫描区块频率,因为扫描区块时通过BlockSender,控制BlockSender的带宽即控制扫描频率。

2.3.2 读取之前扫描结果

DataBlockScanner线程中,初始化后通过assignInitialVerificationTimes读取本地日志文件信息,即之前扫描结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private boolean assignInitialVerificationTimes() {
...
LogFileHandler.Reader logReader = null;
try {
if (verificationLog != null) {
logReader = verificationLog.new Reader(false);//创建Reader对日志文件进行读取
}
}...//异常处理
if (verificationLog != null) {//更新curr日志文件行数,创建Reader读取,参数为true
verificationLog.updateCurNumLines();
}
try {
while (logReader != null && logReader.hasNext()) {//读取每一行,更新区块扫描信息
if (!datanode.shouldRun || Thread.interrupted()) {
return false;
}
LogEntry entry = LogEntry.parseEntry(logReader.next());
if (entry != null) {
updateBlockInfo(entry);
}
}
} finally {
IOUtils.closeStream(logReader);
}
...//初始化之前没有扫描的区块lastScanTime
return true;
}

如上,创建Reader进行日志记录的读取

2.3.2.1 LogFileHandler.Reader

成员如下

1
2
3
4
BufferedReader reader;//FileReader输入流
File file;//读取的文件
String line;//下一行数据
boolean closed = false;

构造

1
2
3
4
5
6
7
8
private Reader(boolean skipPrevFile) throws IOException {
synchronized (LogFileHandler.this) {
numReaders++;
}
reader = null;
file = (skipPrevFile) ? curFile : prevFile;
readNext();
}

如上,skipPrevFile为false时,表示不跳过prev日志文件,即先读取prev文件,读完后读取curr文件。当skipPrevFile为false时,file成员为LogFileHandler中的preFile。
Reader通过readNext读取下一行到成员line中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void readNext() throws IOException {
line = null;
try {
//创建了文件输入流,且能够读取下一行,读取下一行到line中返回
if (reader != null && (line = reader.readLine()) != null) {
return;
}
if (line == null) {//当前文件没有下一行或者文件还没创建输入流
//如果还没创建输入流,则创建文件输入流,否则当前文件没有剩余行,如果当前文件为prev,切换到curr文件
if (openFile()) {
readNext();//打开了文件流,读取下一行数据
}
}
} finally {
if (!hasNext()) {
close();
}
}
}

如上,如果打开了文件输入流,且能够读取下一行数据,则读取下一行数据到line中返回。否则如果没有读取到数据,则可能没有打开输入流或者当前文件输入流读取完,如果为prev文件,则可以打开curr文件继续读取,通过openFile完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private boolean openFile() throws IOException {
for(int i=0; i<2; i++) {
if (reader != null || i > 0) {
file = (file == prevFile) ? curFile : null;
}
if (file == null) {
return false;
}
if (file.exists()) {
break;
}
}
if (reader != null ) {
reader.close();
reader = null;
}
reader = new BufferedReader(new FileReader(file));
return true;
}

读取到的行数据通过next获得,并读取下一行数据到line中

1
2
3
4
5
6
7
8
9
10
public String next() {
String curLine = line;
try {
readNext();
} catch (IOException e) {
LOG.info("Could not reade next line in LogHandler : " +
StringUtils.stringifyException(e));
}
return curLine;
}

回到assignInitialVerificationTimes中,创建Reader对象传入的参数为false,因此会先读取prev日志文件的内容,然后在读取curr日志文件的内容。
读取的每一行内容通过LogEntry的parseEntry进行解析,更新到区块扫描信息对象中

2.3.2.2 BlockScanInfo.LogEntry

成员如下

1
2
3
long blockId = -1;
long verificationTime = -1;
long genStamp = Block.GRANDFATHER_GENERATION_STAMP;

因为日志文件中每一行记录形式通过newEntry创建

1
2
3
4
5
6
static String newEnry(Block block, long time) {
return "date=\"" + dateFormat.format(new Date(time)) + "\"\t " +
"time=\"" + time + "\"\t " +
"genstamp=\"" + block.getGenerationStamp() + "\"\t " +
"id=\"" + block.getBlockId() +"\"";
}

如上四个key=value类型的键值对,其中time为该项创建时间即扫描时间。parseEntry解析一行,由正则表达式匹配键和值,并判断为id,time,genstamp然后更新到区块对应的BlockScanInfo中。限于篇幅这里不贴代码。

回到assignInitialVerificationTimes中,解析完prev和curr中以前扫描记录后通过updateBlockInfo更新区块扫描信息,重新插入到blockInfoSet和blockMap中。
最后,对于prev和curr没有记录的区块,即该区块以前没有扫描过,初始化其lastScanTime,重新插入到blockInfoSet和blockMap中。这样便读取本地日志记录,恢复了之前的扫描状态。

回到DataBlockScanner主线程中,如果blockInfoSet中的第一个区块即最长时间没有扫描的区块还没到达扫描时间,休眠1s,否则通过verifyFirstBlock扫描第一个区块,verifyFirstBlock从blockInfoSet中取出第一个BlockScanInfo对象,然后通过verifyBlock扫描该区块。

verifyBlock通过构造BlockSender,并设置verifyChecksum为true,发送区块数据到NullOutputStream中,因为verifyChecksum为true,因此会进行校验和验证,失败时会抛出异常。尝试两次,防止暂时的错误。如果两次都失败,则该区块对应的BlockScanInfo的lastScanOk更新为false,表示扫描失败。扫描成功时更新BlockScanInfo的lastScanOk为true。verifyBlock完成(不管成功还是失败)都会更新BlockScanInfo的lastScanTime,以及lastScanType为VERIFICATION_SCAN。
BlockSender见DataNode流接口实现下篇中读数据块的分析。

最终将新的BlockScanInfo重新插入到blockInfoSet和blockMap中,并通过LogEntry.newEntry创建一个日志实例,使用DataBlockScanner的LogFileHandler对象追加到当前日志文件curr中,追加完后通过rollIfRequired判断当前文件超过行限制且达到最短roll时间时,重命名curr为prev,并使用新的curr文件记录日志。