HDFS相关实体对象

Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》


本文介绍HDFS中用到的常见实体类。

1. Block

Block即HDFS中的一个区块,区块保存在本地文件中的文件名为”blk_${blockId}”,这里blockId为Block对象的成员属性blockId
Block实现了Writable和Comparable接口,静态代码段中注册到WritableFactories中

1
2
3
4
5
6
static {                                      // register a ctor
WritableFactories.setFactory
(Block.class, new WritableFactory() {
public Writable newInstance() { return new Block(); }
});
}

Block的成员属性如下:
Block成员属性

  • GRANDFATHER_GENERATION_STAMP,静态成员,为下一个数据块的时间戳
  • blockId,数据块唯一标识,数据块的ID,本地文件名为blk_blockId;
  • numBytes,数据块包含数据的大小;
  • generationStamp,数据块时间戳,每次对数据块改动后,都会修改;

一个区块在本地文件中对应一个文件,文件名为blk_blockId。Block中方法比较简单,主要看Writable和Comparable接口的实现

1
2
3
4
5
public void write(DataOutput out) throws IOException {
out.writeLong(blockId);
out.writeLong(numBytes);
out.writeLong(generationStamp);
}

如上,序列化时,写blockId,numBytes,generationStamp。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public int compareTo(Block b) {
//Wildcard generationStamp is NOT ALLOWED here
validateGenerationStamp(this.generationStamp);
validateGenerationStamp(b.generationStamp);

if (blockId < b.blockId) {
return -1;
} else if (blockId == b.blockId) {
return GenerationStamp.compare(generationStamp, b.generationStamp);//long型之间的比较
} else {
return 1;
}
}
static void validateGenerationStamp(long generationstamp) {
if (generationstamp == GenerationStamp.WILDCARD_STAMP) {//WILDCARD_STAMP为1
throw new IllegalStateException("generationStamp (=" + generationstamp + ") == GenerationStamp.WILDCARD_STAMP");
}
}
public static int compare(long x, long y) { return x < y? -1: x == y? 0: 1; }

如上,两个区块进行比较时,先看blockId,相等时比较generationStamp。


2. BlockMetaDataInfo

BlockMetaDataInfo继承Block,在原来Block的基础上增加了lastScanTime

1
public class BlockMetaDataInfo extends Block

成员

1
private long lastScanTime;

静态代码段WritableFactories注册

1
2
3
4
5
6
7
static final WritableFactory FACTORY = new WritableFactory() {
public Writable newInstance() { return new BlockMetaDataInfo(); }
};
static { // register a ctor
WritableFactories.setFactory(BlockMetaDataInfo.class, FACTORY);
}
public BlockMetaDataInfo() {}

序列化

1
2
3
4
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeLong(lastScanTime);
}


3. BlockRecoveryInfo

BlockRecoveryInfo管理了一个区块以及标识该区块是否是在启动时修复的,实现了Writable接口

1
public class BlockRecoveryInfo implements Writable

成员

1
2
private Block block;
private boolean wasRecoveredOnStartup;

序列化

1
2
3
4
public void write(DataOutput out) throws IOException {
block.write(out);
out.writeBoolean(wasRecoveredOnStartup);
}


4. LocatedBlock

LocatedBlock是确定了存储位置的区块,成员变量除了Block之外,还包括数据块在对应文件中的偏移量和所属的数据节点信息等。
LocatedBlock实现了Writable接口,没有像Block一样实现Comparable接口,同样的在静态代码段中注册WritableFactories中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static {                                      // register a ctor
WritableFactories.setFactory
(LocatedBlock.class, new WritableFactory() {
public Writable newInstance() { return new LocatedBlock(); }
});
}
public LocatedBlock() { this(new Block(), new DatanodeInfo[0], 0L, false); }
public LocatedBlock(Block b, DatanodeInfo[] locs, long startOffset, boolean corrupt) {
this.b = b;
this.offset = startOffset;
this.corrupt = corrupt;
if (locs==null) {
this.locs = new DatanodeInfo[0];
} else {
this.locs = locs;
}
}

成员属性如下:
LocatedBlock成员属性

  • b,对应的Block;
  • offset,对应Block在文件中的偏移量;
  • locs,区块所属DataNode的信息,为数组,因为含有多个备份,可能存在多个DataNode上,为DatanodeInfo对象;
  • corrupt,该Block是否损坏;
  • blockToken,区块的口令信息,安全相关;

对应的序列化方法

1
2
3
4
5
6
7
8
9
10
public void write(DataOutput out) throws IOException {
blockToken.write(out);
out.writeBoolean(corrupt);
out.writeLong(offset);
b.write(out);
out.writeInt(locs.length);
for (int i = 0; i < locs.length; i++) {
locs[i].write(out);
}
}


5. LocatedBlocks

LocatedBlocks包含一个文件内的局部或全部区块信息,管理了一系列的LocatedBlock对象。
LocatedBlocks实现了Writable接口,静态代码段中注册WritableFactories

1
2
3
4
5
6
7
8
9
10
11
static {                                      // register a ctor
WritableFactories.setFactory
(LocatedBlocks.class, new WritableFactory() {
public Writable newInstance() { return new LocatedBlocks(); }
});
}
LocatedBlocks() {
fileLength = 0;
blocks = null;
underConstruction = false;
}

成员属性如下
LocatedBlocks成员属性

  • fileLength,为这些区块所属文件的文件长度;
  • blocks,管理的LocatedBlock集合;
  • underConstruction,是否处于构建标识,当文件处于构建状态,此时客户端正在写文件,fileLength可能在后续过程中会发生变化;

对应的序列化方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void write(DataOutput out) throws IOException {
out.writeLong(this.fileLength);
out.writeBoolean(underConstruction);
// write located blocks
int nrBlocks = locatedBlockCount();
out.writeInt(nrBlocks);
if (nrBlocks == 0) {
return;
}
for (LocatedBlock blk : this.blocks) {
blk.write(out);
}
}
public int locatedBlockCount() { return blocks == null ? 0 : blocks.size(); }


6. BlockLocalPathInfo

BlockLocalPathInfo应用于ClientDatanodeProtocol接口中,用于HDFS读文件的数据节点本地读优化。当客户端发现它和它要读取的数据块正好位于同一台主机上时,它可以不通过数据节点读数据块,而是直接读取本地文件,以获取数据块的内容,大大降低了对应数据节点的负载。(参考技术内幕)
当客户端执行本地读优化时,需要通过ClientDatanodeProtocol获取数据块对应的本地文件,数据节点通过BlockLocalPathInfo返回本地文件的路径,保存在成员变量localBlockPath中,成员变量localMetaPath是数据块校验信息文件的路径,该文件保存了数据块的校验信息。客户端通过这两个路径,直接读取本地文件。(参考技术内幕)

BlockLocalPathInfo实现了Writable接口,在静态代码段中注册WritableFactories

1
2
3
4
5
6
7
static final WritableFactory FACTORY = new WritableFactory() {
public Writable newInstance() { return new BlockLocalPathInfo(); }
};
static { // register a ctor
WritableFactories.setFactory(BlockLocalPathInfo.class, FACTORY);
}
public BlockLocalPathInfo() {}

成员属性
BlockLocalPathInfo成员属性

  • block,对应的区块
  • localBlockPath,区块的本地路径
  • localMetaPath,区块校验信息的本地路径

序列化

1
2
3
4
5
public void write(DataOutput out) throws IOException {
block.write(out);
Text.writeString(out, localBlockPath);
Text.writeString(out, localMetaPath);
}


7. DatanodeID

DatanodeID标识了一个数据节点,实现了WritableComparable接口

1
public class DatanodeID implements WritableComparable<DatanodeID>

成员属性如下:
DatanodeID成员属性

  • EMPTY_ARRAY,一个空的DatanodeID数组

    1
    public static final DatanodeID[] EMPTY_ARRAY = {};
  • name,DataNode的名字,可以为”IP:Port”或”HostName:Port”,这里的Port为数据传输端口

  • storageID,数据节点的存储标识,当数据节点用不同的存储标识在名字节点上注册时,名字节点通过这个标识,了解到这是一个被重新使用的数据节点(参考技术内幕)
  • infoPort,数据节点www服务器的监听端口,通过该端口可以使用http/https协议访问数据节点
  • ipcPort,数据节点IPC服务器监听端口,即ClientDatanodeProtocol,InterDatanodeProtocol协议提供的服务

序列化

1
2
3
4
5
public void write(DataOutput out) throws IOException {
UTF8.writeString(out, name);
UTF8.writeString(out, storageID);
out.writeShort(infoPort);
}


8. DatanodeRegistration

DatanodeRegistration包含了NameNode标识和验证DataNode所需的所有DataNode信息,每次DataNode和NameNode通信时都会发送该对象给NameNode。
DatanodeRegistration继承DatanodeID实现了Writable接口。
成员如下:

1
2
public StorageInfo storageInfo;
public ExportedBlockKeys exportedKeys;

  • storageInfo,包含存储系统信息
  • exportedKeys,安全相关

    8.1 StorageInfo

    storageInfo包含了数据节点的存储系统信息,成员属性如下:
    1
    2
    3
    public int   layoutVersion;  //HDFS存储系统信息结构的版本号
    public int namespaceID; //存储系统的唯一标识符
    public long cTime; //存储系统信息的创建时间

当数据节点注册时,名字节点根据StorageInfo包含的信息进行检查,保证当前注册的节点是HDFS的一个合法数据节点,而不是一个属于其他集群的将诶点,或者曾经属于集群,但未进行必要升级的节点,以保证存储系统的一致性。

DatanodeRegistration实现了Writable接口,在静态代码段中注册WritableFactories

1
2
3
4
5
6
7
8
9
10
11
12
static {                                      // register a ctor
WritableFactories.setFactory
(DatanodeRegistration.class, new WritableFactory() {
public Writable newInstance() { return new DatanodeRegistration(); }
});
}
public DatanodeRegistration() { this(""); }
public DatanodeRegistration(String nodeName) {
super(nodeName);
this.storageInfo = new StorageInfo();
this.exportedKeys = new ExportedBlockKeys();
}

序列化

1
2
3
4
5
6
7
8
9
10
11
public void write(DataOutput out) throws IOException {
super.write(out);

//TODO: move it to DatanodeID once HADOOP-2797 has been committed
out.writeShort(ipcPort);

out.writeInt(storageInfo.getLayoutVersion());
out.writeInt(storageInfo.getNamespaceID());
out.writeLong(storageInfo.getCTime());
exportedKeys.write(out);
}


9. NamespaceInfo

NamespaceInfo继承StorageInfo实现了Writable接口,用于在数据节点向名字节点握手时versionRequest返回

1
public class NamespaceInfo extends StorageInfo implements Writable

除了StorageInfo中的成员外,还包含了

1
2
3
String revision;
String version;

int distributedUpgradeVersion;

静态代码段中注册WritableFactories

1
2
3
4
5
6
7
8
9
static {                                      // register a ctor
WritableFactories.setFactory
(NamespaceInfo.class, new WritableFactory() {
public Writable newInstance() { return new NamespaceInfo(); }
});
}
public NamespaceInfo() {
super();
}

序列化

1
2
3
4
5
6
7
8
public void write(DataOutput out) throws IOException {
UTF8.writeString(out, getVersion());
UTF8.writeString(out, getRevision());
out.writeInt(getLayoutVersion());
out.writeInt(getNamespaceID());
out.writeLong(getCTime());
out.writeInt(getDistributedUpgradeVersion());
}

以上为所属成员和父类成员


10. DatanodeInfo

DatanodeInfo继承DatanodeID实现了Node接口,添加了关于DataNode的很多状态信息。
Node接口实例为网络拓扑结构的一个节点表示,如果数据节点位于数据中心dog的机架orange中,则其网络位置的字符串表示形式为”/dog/orange”,而该数据节点的名字为”hostname:port”,由网络位置和数据节点名字确定该数据节点。
Node声明的方法有

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface Node {
//返回该节点网络位置的字符串表示形式
public String getNetworkLocation();
//设置该节点的网络位置
public void setNetworkLocation(String location);
//返回该节点的名字,如数据节点"hostname:port"
public String getName();
//返回父节点
public Node getParent();
//设置父节点
public void setParent(Node parent);
//返回节点在树中的层级,根节点为0,以此类推
public int getLevel();
//设置节点在树的层级
public void setLevel(int i);
}

DatanodeInfo的成员属性如下:
DatanodeInfo成员属性

  • capacity,容量
  • dfsUsed,已经使用容量
  • remaining,剩余容量
  • lastUpdate,状态更新时间
  • xceiverCount,流接口服务线程数
  • location,网络位置信息,缺省为NetworkTopology.DEFAULT_RACK(/default-rack)
  • hostName,主机名(IP:port或hostname:port)
  • adminState,管理员状态,可能为正常正在退役已经退役三种

    1
    public enum AdminStates {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; }
  • level,在集群树中的层级

  • parent,集群中父节点

静态代码段中注册WritableFactories

1
2
3
4
5
6
7
8
9
10
static {                                      // register a ctor
WritableFactories.setFactory
(DatanodeInfo.class, new WritableFactory() {
public Writable newInstance() { return new DatanodeInfo(); }
});
}
public DatanodeInfo() {
super();
adminState = null;
}

序列化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void write(DataOutput out) throws IOException {
super.write(out);

//TODO: move it to DatanodeID once DatanodeID is not stored in FSImage
out.writeShort(ipcPort);
out.writeLong(capacity);
out.writeLong(dfsUsed);
out.writeLong(remaining);
out.writeLong(lastUpdate);
out.writeInt(xceiverCount);
Text.writeString(out, location);
Text.writeString(out, hostName == null? "": hostName);
WritableUtils.writeEnum(out, getAdminState());
}


11. HdfsFileStatus

HdfsFileStatus是HDFS中文件状态的实现,实现了Writable接口,成员属性如下:
HdfsFileStatus成员属性

  • path,文件路径的UTF8编码形式
  • EMPTY_NAME,静态成员,空名字
    其他和FileStatus一样

静态代码段中注册WritableFactories

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static {                                      // register a ctor
WritableFactories.setFactory
(HdfsFileStatus.class, new WritableFactory() {
public Writable newInstance() { return new HdfsFileStatus(); }
});
}
public HdfsFileStatus() { this(0, false, 0, 0, 0, 0, null, null, null, null); }
public HdfsFileStatus(long length, boolean isdir, int block_replication,
long blocksize, long modification_time, long access_time,
FsPermission permission, String owner, String group, byte[] path)
{

this.length = length;
this.isdir = isdir;
this.block_replication = (short)block_replication;
this.blocksize = blocksize;
this.modification_time = modification_time;
this.access_time = access_time;
this.permission = (permission == null) ?
FsPermission.getDefault() : permission;
this.owner = (owner == null) ? "" : owner;
this.group = (group == null) ? "" : group;
this.path = path;
}

序列化

1
2
3
4
5
6
7
8
9
10
11
12
13
public void write(DataOutput out) throws IOException {
out.writeInt(path.length);
out.write(path);
out.writeLong(length);
out.writeBoolean(isdir);
out.writeShort(block_replication);
out.writeLong(blocksize);
out.writeLong(modification_time);
out.writeLong(access_time);
permission.write(out);
Text.writeString(out, owner);
Text.writeString(out, group);
}


12. DirectoryListing

DirectoryListing维护了目录下的局部文件状态信息,看它的成员属性

1
2
private HdfsFileStatus[] partialListing;//文件状态集合
private int remainingEntries;//目录下还剩多少项

如上,简单的维护了一组文件状态信息。
DirectoryListing实现Writable接口,在静态代码段中注册WritableFactories

1
2
3
4
5
6
7
static {                                      // register a ctor
WritableFactories.setFactory
(DirectoryListing.class, new WritableFactory() {
public Writable newInstance() { return new DirectoryListing(); }
});
}
public DirectoryListing() {}

序列化

1
2
3
4
5
6
7
8
@Override
public void write(DataOutput out) throws IOException {

out.writeInt(partialListing.length);
for (HdfsFileStatus fileStatus : partialListing) {
fileStatus.write(out);
}
out.writeInt(remainingEntries);
}


13. DatanodeCommand(技术内幕)

NameNode发送给DataNode的命令,在DataNode对NameNode的心跳返回等方法中获得,然后执行相应命令。
成员

1
private int action;

执行的命令,支持的命令有

1
2
3
4
5
6
7
8
9
10
11
12
final static int DNA_UNKNOWN = 0;    //未定义
//数据块复制,当系统中某数据块的副本数小于设置值时,名字节点会通知某一个拥有该数据块的数据节点将数据块复制到其他数据节点
final static int DNA_TRANSFER = 1;
final static int DNA_INVALIDATE = 2; //数据块删除,删除多余的数据块
final static int DNA_SHUTDOWN = 3; //关闭数据节点
final static int DNA_REGISTER = 4; //数据节点重新注册
final static int DNA_FINALIZE = 5; //提交上一次上级
//请求数据块恢复,客户端在写数据过程中异常退出时,数据节点上同一数据块的不同副本数可能处于不一致的状态。
//名字节点选择一个主数据节点,协调其他数据节点将副本恢复到一致的状态。HDFS在数据恢复过程中采取了将各个数据块恢复到它们中的最小长度的策略。
final static int DNA_RECOVERBLOCK = 6;
final static int DNA_ACCESSKEYUPDATE = 7; //安全相关
final static int DNA_BALANCERBANDWIDTHUPDATE = 8; //更新平衡器可用带宽

对应的类如下
DatanodeCommand

  • BalancerBandwidthCommand,继承DatanodeCommand,设置数据节点均衡器可用的最大带宽,对应的命令编号为DNA_BALANCERBANDWIDTHUPDATE(8)。新的带宽值为成员bandwidth,单位为bytes/s

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public final static int BBC_VERSION = 1;
    private final static long BBC_DEFAULTBANDWIDTH = 0L;
    private long bandwidth;
    private int version = BBC_VERSION;

    public BalancerBandwidthCommand(long bandwidth) {
    super(DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE);//命令编号DNA_BALANCERBANDWIDTHUPDATE
    this.bandwidth = bandwidth;
    }

    静态代码段注册和序列化略。

  • Finalize,DatanodeCommand内部类,继承DatanodeCommand

    1
    2
    3
    4
    5
    static class Finalize extends DatanodeCommand {
    private Finalize() {super(DatanodeProtocol.DNA_FINALIZE);}//命令编号DNA_FINALIZE
    public void readFields(DataInput in) {}
    public void write(DataOutput out) {}
    }

    静态注册略。

  • UpgradeCommand,用于启动升级,命令编号为DNA_UNKNOWN,由于数据节点在处理这条命令时,不会处于一般的正常工作状态,所以通过子类而不是命令编号来区分命令。
  • KeyUpdateCommand,对应命令编号为DNA_ACCESSKEYUPDATE,安全相关。
  • BlockCommand,对应的命令编号有DNA_TRANSFER,DNA_INVALIDATEDNA_RECOVERBLOCK,与数据块操作有关,继承DatanodeCommand的同时提供以下成员

    1
    2
    Block blocks[];
    DatanodeInfo targets[][];

    如上,blocks为要操作(删除或复制等)的区块,targets为区块对应的数据节点信息。
    具体静态代码段中的注册和序列化略。

  • Register,DatanodeCommand的内部类,对应的命令编号为DNA_REGISTER
  • 命令编号DNA_SHUTDOWN已经废弃不用,如果名字节点需要让数据节点停止服务,名字节点通过抛出异常DisallowedDatanodeException来通知数据节点,而不是通过某个命令编号为DNA_SHUTDOWN的DatanodeCommand子类来下发这个命令。