Hadoop FileSystem实现---RawLocalFileSystem

Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》


Hadoop抽象文件系统FileSystem具体见Hadoop抽象文件系统,包括主要的实现类。
本文分析RawLocalFileSystem。
RawLocalFileSystem用于访问本地文件系统,Java的本地文件系统操作通过java.io.File类实现,具体为File的私有静态成员fs完成

1
private static final FileSystem fs = DefaultFileSystem.getFileSystem();

因此,RawLocalFileSystem使用适配器模式实现了Hadoop的本地文件系统。


1. 成员,构造,初始化

成员有:

1
2
static final URI NAME = URI.create("file:///");
private Path workingDir;

  • NAME,该文件系统对应的URI,模式为file
  • workingDir,保存当前工作目录

构造:

1
public RawLocalFileSystem() { workingDir = new Path(System.getProperty("user.dir")).makeQualified(this); }

当前工作目录为系统属性user.dir
初始化:

1
2
3
4
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
setConf(conf);
}

调用抽象文件系统初始化,创建对应的统计信息对象,然后设置配置conf。


2. 输入流

输入流为内部类LocalFSFileInputStream,实现了FSInputStreamHasFileDescriptor接口

2.1 构造

成员:

1
2
FileInputStream fis;//文件输入流
private long position;//当前位置

底层肯定为文件输入流,position记录了当前位置。
构造:

1
2
3
public LocalFSFileInputStream(Path f) throws IOException {
this.fis = new TrackingFileInputStream(pathToFile(f));
}

成员fis创建为指定文件的TrackingFileInputStream对象,TrackingFileInputStream继承了FileInputStream,在文件输入流读取数据的基础上,更新文件系统的统计信息。
如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class TrackingFileInputStream extends FileInputStream {
public TrackingFileInputStream(File f) throws IOException {
super(f);
}//文件创建FileInputStream

public int read() throws IOException {
int result = super.read();
if (result != -1) {//读取数据成功,更新文件系统的统计信息
statistics.incrementBytesRead(1);
}
return result;
}

public int read(byte[] data) throws IOException {
int result = super.read(data);
if (result != -1) {//读取数据成功,更新文件系统的统计信息
statistics.incrementBytesRead(result);
}
return result;
}

public int read(byte[] data, int offset, int length) throws IOException {
int result = super.read(data, offset, length);
if (result != -1) {
statistics.incrementBytesRead(result);
}
return result;
}
}

2.2 Seekable接口实现

Seekable接口包括seek,getPos,seekToNewSource三个方法

1
2
3
4
public void seek(long pos) throws IOException {
fis.getChannel().position(pos);
this.position = pos;
}

seek方法直接通过文件输入流获取通道,然后通过通道的position设置当前位置,通道位置改变后,流的当前位置也相应改变。通过position记录当前流的位置。

1
2
3
public long getPos() throws IOException {
return this.position;
}

getPos方法返回position即可。

1
2
3
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}

因为是本地文件系统,所以不支持切换到其他备份的源,seekToNewSource直接返回false。

2.3 PositionedReadable实现

1
2
3
4
5
6
7
8
public int read(long position, byte[] b, int off, int len) throws IOException {
ByteBuffer bb = ByteBuffer.wrap(b, off, len);//包裹成ByteBuffer,ByteBuffer中数据改变,原字节数组b内容也改变
try {
return fis.getChannel().read(bb, position);//通过通道直接读取指定位置的数据
} catch (IOException e) {
throw new FSError(e);
}
}

如上,直接通过通道读取指定位置上的数据
而至于readFully方法,直接使用FSInputStream中的实现,循环调用上面的read方法。

2.4 HasFileDescriptor实现

1
2
3
public FileDescriptor getFileDescriptor() throws IOException {
return fis.getFD();
}

由文件输入流返回对应的文件描述符对象。

2.5 其他方法重载

1
2
3
public int available() throws IOException { return fis.available(); }
public void close() throws IOException { fis.close(); }
public boolean markSupport() { return false; }

如上,简单的使用文件输入流对应的方法

1
2
3
4
5
6
7
8
9
10
11
public int read() throws IOException {
try {
int value = fis.read();
if (value >= 0) {
this.position++;
}
return value;
} catch (IOException e) { // unexpected exception
throw new FSError(e); // assume native fs error
}
}

而read方法使用TrackingFileInputStream的read方法读取数据并更新文件系统的统计信息,然后增加流的位置position。
其他的读取指定字节的read方法类似。

1
2
3
4
5
6
7
public long skip(long n) throws IOException {
long value = fis.skip(n);
if (value > 0) {
this.position += value;
}
return value;
}

skip方法也在使用文件输入流跳过指定字节后,更新position。


3. 输出流

输出流为内部类LocalFSFileOutputStream,继承OutputStream实现了Syncable接口。

3.1 构造

1
2
3
4
5
FileOutputStream fos;

private LocalFSFileOutputStream(Path f, boolean append) throws IOException {
this.fos = new FileOutputStream(pathToFile(f), append);//通过文件构造文件输出流,append是否追加模式
}

通过给定的文件构造文件输出流

3.2 OutputStream方法实现

相应的OutputStream的方法实现依赖于文件输出流的方法。

1
2
3
4
5
6
7
8
9
public void close() throws IOException { fos.close(); }
public void flush() throws IOException { fos.flush(); }
public void write(int b) throws IOException {
try {
fos.write(b);
} catch (IOException e) { // unexpected exception
throw new FSError(e); // assume native fs error
}
}

3.3 Syncable接口实现

1
2
3
public void sync() throws IOException {
fos.getFD().sync();
}

4. 文件状态

RawLocalFileSystem的文件状态对应为内部类RawLocalFileStatus类,继承FileStatus实现。

1
static class RawLocalFileStatus extends FileStatus

4.1 构造

1
2
3
RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {
super(f.length(), f.isDirectory(), 1, defaultBlockSize, f.lastModified(), new Path(f.getPath()).makeQualified(fs));
}

如上,由传入的File对象获取长度,f为目录时长度不明确。

4.2 获取权限信息

包括getOwner获取所有者,getGroup获取所在组,getPermission获取权限对象。三者都需要先从文件系统中加载权限信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
public String getOwner() {
if (!isPermissionLoaded()) { loadPermissionInfo(); }
return super.getOwner();
}
public String getGroup() {
if (!isPermissionLoaded()) { loadPermissionInfo(); }
return super.getGroup();
}
public FsPermission getPermission() {
if (!isPermissionLoaded()) { loadPermissionInfo(); }
return super.getPermission();
}
private boolean isPermissionLoaded() { return !super.getOwner().equals(""); }

另外,序列化时也要先判断是否获取了权限信息,即该对象的成员是否正常,然后序列化

1
2
3
4
public void write(DataOutput out) throws IOException {
if (!isPermissionLoaded()) { loadPermissionInfo(); }
super.write(out);
}

如上,通过isPermissionLoaded判断是否加载了权限信息,如果加载了对应的owner成员应该不为空。
如果没有加载则通过loadPermissionInfo加载权限信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void loadPermissionInfo() {
IOException e = null;
try {
//创建新的Java虚拟机进程,在虚拟机进程中执行Shell命令"ls -al"获取权限
StringTokenizer t = new StringTokenizer(FileUtil.execCommand(new File(getPath().toUri()), Shell.getGET_PERMISSION_COMMAND()));
//-rw------- 1 username groupname ...
String permission = t.nextToken();
if (permission.length() > 10) { //files with ACLs might have a '+'
permission = permission.substring(0, 10);
}
setPermission(FsPermission.valueOf(permission));//设置权限对象
t.nextToken();
setOwner(t.nextToken());//所有者
setGroup(t.nextToken());//所在组
...

如上,loadPermissionInfo中追踪FileUtil.execCommand执行,最终在UNIXProcess中通过forkAndExec会创建新的虚拟机进程,执行Shell命令”ls -al”获取文件权限信息。

1
2
3
pid = forkAndExec(launchMechanism.ordinal() + 1,
helperpath, prog, argBlock, argc, envBlock, envc,
dir, fds, redirectErrorStream);

如上,prog为命令,这里为ls。argBlock为命令以外的参数信息,字节数组形式,这里为”-al filename”的字节数组;argc为命令以外的参数个数,这里为2个;envBlock为传入的环境变量字节数组形式,这里没有;envc为环境变量个数,这里为0;dir为当前工作目录,这里没有;fds为对应的输出文件描述符,stdin(0),stdout(1),stderr(2),pipe(-1),这里应该为stdout(1),通过该文件描述符创建输入流读取结果(这里对fds的描述可能有误)。
读取结果

1
2
3
4
5
6
7
8
9
10
BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
parseExecResult(inReader);
protected void parseExecResult(BufferedReader lines) throws IOException {
output = new StringBuffer();
char[] buf = new char[512];
int nRead;
while ( (nRead = lines.read(buf, 0, buf.length)) > 0 ) {//从stdout中读取结果至ShellCommandExecutor的output中
output.append(buf, 0, nRead);
}
}

process.getInputStream中的process为UNIXProcess,如下:

1
2
3
public InputStream getInputStream() {
return stdout;
}

如上从进程的stdout中读取结果至ShellCommandExecutor的output中,然后通过ShellCommandExecutor.getOutput获取结果。


5. 文件系统操作实现

5.1 open,create,append

1
2
3
4
5
6
7
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
if (!exists(f)) {//不存在抛出异常
throw new FileNotFoundException(f.toString());
}
//创建文件系统的输入流,封装成BufferedFSInputStream,然后封装成文件系统的统一输入流FSDataInputStream
return new FSDataInputStream(new BufferedFSInputStream(new LocalFSFileInputStream(f), bufferSize));
}

如上,创建本文件系统的输入流,然后封装成BufferedInputStream,BufferedInputStream与BufferedInputStream类似,只不过是对FSInputStream的缓冲,底层流只能是FSInputStream,具体见Hadoop抽象文件系统中对应描述。最终封装成文件系统统一输入流FSDataInputStream,拥有DataInput接口的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private FSDataOutputStream create(Path f, boolean overwrite, boolean createParent, int bufferSize,
short replication, long blockSize, Progressable progress) throws IOException {
if (exists(f) && !overwrite) {//文件存在,且不允许覆盖,抛出异常
throw new IOException("File already exists:"+f);
}
Path parent = f.getParent();
if (parent != null) {
if (!createParent && !exists(parent)) {//父目录不存在且不允许创建父目录,抛出异常
throw new FileNotFoundException("Parent directory doesn't exist: "
+ parent);
} else if (!mkdirs(parent)) {//创建父目录,如果失败抛出异常
throw new IOException("Mkdirs failed to create " + parent);
}
}
//创建本文件系统对应输出流,封装成BufferedOutputStream,然后封装成文件系统统一的输出流对象
//注意构建LocalFSFileOutputStream时append参数为false
return new FSDataOutputStream(new BufferedOutputStream(new LocalFSFileOutputStream(f, false), bufferSize), statistics);
}

如上,创建本文键系统对应的输出流LocalFSFileOutputStream,然后封装成BufferedOutputStream,最后封装成DataFSOutputStream。

1
2
3
4
5
6
7
8
9
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
if (!exists(f)) {//不存在,直接抛出异常
throw new FileNotFoundException("File " + f + " not found.");
}
if (getFileStatus(f).isDir()) {//为目录,抛出异常
throw new IOException("Cannot append to a diretory (=" + f + " ).");
}
return new FSDataOutputStream(new BufferedOutputStream( new LocalFSFileOutputStream(f, true), bufferSize), statistics);
}

如上,创建LocalFSFileOutputStream,其中append参数为true允许追加,然后进行相应的封装。

5.2 获取文件状态相关方法

1
2
3
4
5
6
7
8
public FileStatus getFileStatus(Path f) throws IOException {
File path = pathToFile(f);//将Path转换为File对象
if (path.exists()) {
return new RawLocalFileStatus(pathToFile(f), getDefaultBlockSize(), this);
} else {//不存在抛出异常
throw new FileNotFoundException( "File " + f + " does not exist.");
}
}

getFileStatus将Path转换为文件后,创建相应的RawLocalFileStatus对象返回。
而listStatus判断Path是否为文件,若为文件则创建相应的RawLocalFileStatus放入数组返回,否则对目录下每一个Path通过getFileStatus获取对应文件状态。

5.3 delete

1
2
3
4
5
6
7
8
9
10
public boolean delete(Path p, boolean recursive) throws IOException {
File f = pathToFile(p);
if (f.isFile()) {
return f.delete();
} else if ((!recursive) && f.isDirectory() &&
(FileUtil.listFiles(f).length != 0)) {
throw new IOException("Directory " + f.toString() + " is not empty");
}
return FileUtil.fullyDelete(f);
}

也是通过File来执行删除操作,如果是文件则直接通过File的delete方法删除该文件。若为目录,如果不为空且不允许递归删除的话,抛出异常,否则先删除目录下的所有目录项(可能为目录,此时仍然先删除目录项然后删除目录),然后删除该目(通过FileUtil.fullyDelete完成)。最终都是调用File的delete方法。

5.4 mkdirs

1
2
3
4
5
public boolean mkdirs(Path f) throws IOException {
Path parent = f.getParent();
File p2f = pathToFile(f);
return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory());
}

如上会递归创建目录,创建一个目录时,若其父目录不存在则会先创建父目录。
用到了布尔表达式短路求值。(parent==null||mkdirs(parent))成功时才会继续(p2f.mkdir)||p2f.isDirectory)。
而在前半部分中,如果父目录为null则前半部分直接返回true,不需要创建父目录,接下来创建子目录;若父目录不为null则执行mkdir(parent)创建父目录。若创建失败,则前半部分返回false,父目录创建失败,后半部分不会执行。父目录创建成功则前半部分返回true,执行后半部分创建子目录。创建父目录通过mkdirs同样的会递归创建父目录的父目录,直到根目录。
后半部分中,通过File的mkdir创建子目录。创建成功则直接返回true,否则通过isDirectory判断子目录是否已经创建了,若已经创建则返回ture,否则子目录创建失败,返回false。

5.5 rename

1
2
3
4
5
public boolean rename(Path src, Path dst) throws IOException {
if (pathToFile(src).renameTo(pathToFile(dst))) { return true; }
LOG.debug("Falling through to a copy of " + src + " to " + dst);
return FileUtil.copy(this, src, this, dst, true, getConf());
}

首先通过File的renameTo方法重命名,若失败则通过FileUtil拷贝数据到目的文件。

5.6 setOwner,setPermission

两者也是通过执行Shell命令设置相应的值

1
public void setOwner(Path p, String username, String groupname) throws IOException

username为null时设置所在组,命令对应为chgrp,否则设置所属用户,命令对应为chown
setPermission对应命令为chmod

5.7 其他方法

1
2
3
public Path getHomeDirectory() {
return new Path(System.getProperty("user.home")).makeQualified(this);
}

主目录为系统属性user.home

1
2
3
4
5
6
7
private Path makeAbsolute(Path f) {
if (f.isAbsolute()) {
return f;
} else {
return new Path(workingDir, f);
}
}

绝对目录父目录为当前工作目录

1
2
3
4
public void setWorkingDirectory(Path newDir) {
workingDir = makeAbsolute(newDir);
checkPath(workingDir);//检查workingDir是否为该文件系统中的路径,若不是会抛出运行时异常
}