Hadoop FileSystem实现---LocalFileSystem

Hadoop版本:Hadoop-1.2.1


LocalFileSystem作为ChecksumFileSystem的本地实现,底层的文件系统为RawLocalFileSystem。是Hadoop中默认的文件系统(fs.default.name)。
ChecksumFileSystem另见ChecksumFileSystem
RawLocalFileSystem另见RawLocalFileSystem

1
public class LocalFileSystem extends ChecksumFileSystem


1. 成员,构造

1
2
3
static final URI NAME = URI.create("file:///");
static private Random rand = new Random();
FileSystem rfs;
  • NAME,LocalFileSystem的URI,模式为file
  • rand,随机数对象
  • rfs,底层文件系统对象

构造如下:

1
2
3
4
5
public LocalFileSystem() { this(new RawLocalFileSystem()); }
public LocalFileSystem(FileSystem rawLocalFileSystem) {
super(rawLocalFileSystem);
rfs = rawLocalFileSystem;
}

如上,底层文件系统为RawLocalFileSystem,调用父类ChecksumFileSystem的构造函数。


2. 对ChecksumFileSystem中一些过程的解析

ChecksumFileSystem分析,一些操作依赖于底层文件系统的相关实现,下面对主要的一些方法进行解析

2.1 数据流

ChecksumFileSystem中输出流为ChecksumFSOutputSummer,包含成员datas和sums,从这里可以看出实际对应为RawLocalFileSystem中的输出流对象LocalFSFileOutputStream,内部为文件输出流FileOutputStream。
ChecksumFileSystem中输入流为ChecksumFSInputChecker,包含成员datas和sums,因此这两个流实际对应为RawLocalFileSystem中的输入流对象LocalFSFileInputStream,内部为TrackingFileInputStream,即在读取数据的过程中会更新统计信息。

2.2 seek操作

ChecksumFileSystem中的seek操作依赖于父类FSInputChecker的seek方法,如果不在当前缓冲buf中,设置chunkPos后在下次读取数据时会将datas以及sums定位到chunkPos处,即使用datas和sums的seek方法,这样实际是LocalFSFileInputStream的seek方法,使用文件输入流获取通道,然后通过通道的position设置流的位置。

2.3 数据读写

ChecksumFileSystem中数据读取从内部缓冲buf中读取数据,不足时从datas,sums中读取,因此最终对应的是文件输入流中读取数据。
ChecksumFileSystem中数据写操作先写到内部缓冲中,满足刷新要求时,刷新到datas,sums中,因此最终对应的是文件输出流的写操作。

2.4 流的关闭

输入流和输出流的关闭,最终会由文件输入流和文件输出流的close方法完成。

2.5 文件系统的关闭

ChecksumFileSystem文件系统关闭close方法继承自FilterFileSystem的close方法,即完全依赖于底层文件系统的close方法,这里为RawLocalFileSystem的close方法。
而RawLocalFileSystem的close方法直接为父类FileSystem的close方法,即最终删除标记为在文件系统关闭时删除的文件,然后从CACHE中移除。

定位到其他源

ChecksumFileSystem输入流的seekToNewSource方法在读数据失败时使用,通过文件系统的reportChecksumFailure方法报告校验和失败后,通过datas和sums的seekToNewSource方法分别将源文件输入流和校验文件输入流定位到新的源,而两个流为LocalFSFileInputStream对象,直接返回false,即不能定位到新的源。
本来RawLocalFileSystem的备份数为1,也就是说失败一次就抛出异常,不会尝试定位到新的源继续读取。


3. 其他方法

1
2
3
4
5
6
7
8
public boolean exists(Path f) throws IOException {
File path = pathToFile(f);
if (path.exists()) {
return true;
} else {
return false;
}
}

路径存在性通过File对象判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public boolean reportChecksumFailure(Path p, FSDataInputStream in, long inPos, FSDataInputStream sums, long sumsPos) {
try {
// canonicalize f
File f = ((RawLocalFileSystem)fs).pathToFile(p).getCanonicalFile();

// find highest writable parent dir of f on the same device
String device = new DF(f, getConf()).getMount();
File parent = f.getParentFile();
File dir = null;
while (parent!=null && parent.canWrite() && parent.toString().startsWith(device)) {
dir = parent;
parent = parent.getParentFile();
}

if (dir==null) {
throw new IOException( "not able to find the highest writable parent dir");
}

// move the file there
File badDir = new File(dir, "bad_files");
if (!badDir.mkdirs()) {//创建报告目录
if (!badDir.isDirectory()) {//创建失败且目录不存在抛出异常
throw new IOException("Mkdirs failed to create " + badDir.toString());
}
}
String suffix = "." + rand.nextInt();
File badFile = new File(badDir, f.getName()+suffix);
LOG.warn("Moving bad file " + f + " to " + badFile);
//LocalFileSystem不支持定位到其他源,因此这里直接关闭源文件输入流
in.close(); // close it first
//将校验失败的源文件重命名为badFile
f.renameTo(badFile); // rename it

// move checksum file too
File checkFile = ((RawLocalFileSystem)fs).pathToFile(getChecksumFile(p));
checkFile.renameTo(new File(badDir, checkFile.getName()+suffix));

} catch (IOException e) {
LOG.warn("Error moving bad file " + p + ": " + e);
}
return false;
}

该方法在seekToNewSource方法中定位到其他源前被调用,报告校验错误。在源文件根目录(/)下创建一个”bad_files”目录记录校验失败的源文件和校验文件。将源文件拷贝到”bad_files”目录下命名为”srcName.rand”,srcName为源文件名,rand为一个随机整数,校验文件拷贝到该目录下命名为”parName.rand”,parName为校验文件名,rand为与源文件对应的随机数。

因此,LocalFileSystem读数据校验和验证失败时,通过seekToNewSource尝试定位到其他源,而LocalFileSystem的底层文件系统RawLocalFileSystem不支持定位到其他源,只是在源文件的跟目录下创建一个”bad_files”目录记录所有的校验失败源文件和校验文件。