Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》
Hadoop抽象文件系统FileSystem具体见Hadoop抽象文件系统,包括主要的实现类。
本文分析RawLocalFileSystem。
RawLocalFileSystem用于访问本地文件系统,Java的本地文件系统操作通过java.io.File
类实现,具体为File的私有静态成员fs完成1
private static final FileSystem fs = DefaultFileSystem.getFileSystem();
因此,RawLocalFileSystem使用适配器模式实现了Hadoop的本地文件系统。
1. 成员,构造,初始化
成员有:1
2static final URI NAME = URI.create("file:///");
private Path workingDir;
NAME
,该文件系统对应的URI,模式为fileworkingDir
,保存当前工作目录
构造:1
public RawLocalFileSystem() { workingDir = new Path(System.getProperty("user.dir")).makeQualified(this); }
当前工作目录为系统属性user.dir
。
初始化:1
2
3
4public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
setConf(conf);
}
调用抽象文件系统初始化,创建对应的统计信息对象,然后设置配置conf。
2. 输入流
输入流为内部类LocalFSFileInputStream
,实现了FSInputStream
和HasFileDescriptor
接口
2.1 构造
成员:1
2FileInputStream fis;//文件输入流
private long position;//当前位置
底层肯定为文件输入流,position记录了当前位置。
构造:1
2
3public LocalFSFileInputStream(Path f) throws IOException {
this.fis = new TrackingFileInputStream(pathToFile(f));
}
成员fis创建为指定文件的TrackingFileInputStream
对象,TrackingFileInputStream继承了FileInputStream,在文件输入流读取数据的基础上,更新文件系统的统计信息。
如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29class TrackingFileInputStream extends FileInputStream {
public TrackingFileInputStream(File f) throws IOException {
super(f);
}//文件创建FileInputStream
public int read() throws IOException {
int result = super.read();
if (result != -1) {//读取数据成功,更新文件系统的统计信息
statistics.incrementBytesRead(1);
}
return result;
}
public int read(byte[] data) throws IOException {
int result = super.read(data);
if (result != -1) {//读取数据成功,更新文件系统的统计信息
statistics.incrementBytesRead(result);
}
return result;
}
public int read(byte[] data, int offset, int length) throws IOException {
int result = super.read(data, offset, length);
if (result != -1) {
statistics.incrementBytesRead(result);
}
return result;
}
}
2.2 Seekable接口实现
Seekable接口包括seek,getPos,seekToNewSource三个方法1
2
3
4public void seek(long pos) throws IOException {
fis.getChannel().position(pos);
this.position = pos;
}
seek方法直接通过文件输入流获取通道,然后通过通道的position设置当前位置,通道位置改变后,流的当前位置也相应改变。通过position记录当前流的位置。1
2
3public long getPos() throws IOException {
return this.position;
}
getPos方法返回position即可。1
2
3public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
因为是本地文件系统,所以不支持切换到其他备份的源,seekToNewSource直接返回false。
2.3 PositionedReadable实现
1 | public int read(long position, byte[] b, int off, int len) throws IOException { |
如上,直接通过通道读取指定位置上的数据
而至于readFully方法,直接使用FSInputStream中的实现,循环调用上面的read方法。
2.4 HasFileDescriptor实现
1 | public FileDescriptor getFileDescriptor() throws IOException { |
由文件输入流返回对应的文件描述符对象。
2.5 其他方法重载
1 | public int available() throws IOException { return fis.available(); } |
如上,简单的使用文件输入流对应的方法1
2
3
4
5
6
7
8
9
10
11public int read() throws IOException {
try {
int value = fis.read();
if (value >= 0) {
this.position++;
}
return value;
} catch (IOException e) { // unexpected exception
throw new FSError(e); // assume native fs error
}
}
而read方法使用TrackingFileInputStream的read方法读取数据并更新文件系统的统计信息,然后增加流的位置position。
其他的读取指定字节的read方法类似。1
2
3
4
5
6
7public long skip(long n) throws IOException {
long value = fis.skip(n);
if (value > 0) {
this.position += value;
}
return value;
}
skip方法也在使用文件输入流跳过指定字节后,更新position。
3. 输出流
输出流为内部类LocalFSFileOutputStream
,继承OutputStream实现了Syncable接口。
3.1 构造
1 | FileOutputStream fos; |
通过给定的文件构造文件输出流
3.2 OutputStream方法实现
相应的OutputStream的方法实现依赖于文件输出流的方法。1
2
3
4
5
6
7
8
9public void close() throws IOException { fos.close(); }
public void flush() throws IOException { fos.flush(); }
public void write(int b) throws IOException {
try {
fos.write(b);
} catch (IOException e) { // unexpected exception
throw new FSError(e); // assume native fs error
}
}
3.3 Syncable接口实现
1 | public void sync() throws IOException { |
4. 文件状态
RawLocalFileSystem的文件状态对应为内部类RawLocalFileStatus
类,继承FileStatus实现。1
static class RawLocalFileStatus extends FileStatus
4.1 构造
1 | RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) { |
如上,由传入的File对象获取长度,f为目录时长度不明确。
4.2 获取权限信息
包括getOwner
获取所有者,getGroup
获取所在组,getPermission
获取权限对象。三者都需要先从文件系统中加载权限信息。1
2
3
4
5
6
7
8
9
10
11
12
13public String getOwner() {
if (!isPermissionLoaded()) { loadPermissionInfo(); }
return super.getOwner();
}
public String getGroup() {
if (!isPermissionLoaded()) { loadPermissionInfo(); }
return super.getGroup();
}
public FsPermission getPermission() {
if (!isPermissionLoaded()) { loadPermissionInfo(); }
return super.getPermission();
}
private boolean isPermissionLoaded() { return !super.getOwner().equals(""); }
另外,序列化时也要先判断是否获取了权限信息,即该对象的成员是否正常,然后序列化1
2
3
4public void write(DataOutput out) throws IOException {
if (!isPermissionLoaded()) { loadPermissionInfo(); }
super.write(out);
}
如上,通过isPermissionLoaded
判断是否加载了权限信息,如果加载了对应的owner成员应该不为空。
如果没有加载则通过loadPermissionInfo加载权限信息。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15private void loadPermissionInfo() {
IOException e = null;
try {
//创建新的Java虚拟机进程,在虚拟机进程中执行Shell命令"ls -al"获取权限
StringTokenizer t = new StringTokenizer(FileUtil.execCommand(new File(getPath().toUri()), Shell.getGET_PERMISSION_COMMAND()));
//-rw------- 1 username groupname ...
String permission = t.nextToken();
if (permission.length() > 10) { //files with ACLs might have a '+'
permission = permission.substring(0, 10);
}
setPermission(FsPermission.valueOf(permission));//设置权限对象
t.nextToken();
setOwner(t.nextToken());//所有者
setGroup(t.nextToken());//所在组
...
如上,loadPermissionInfo中追踪FileUtil.execCommand
执行,最终在UNIXProcess
中通过forkAndExec
会创建新的虚拟机进程,执行Shell命令”ls -al”获取文件权限信息。1
2
3pid = forkAndExec(launchMechanism.ordinal() + 1,
helperpath, prog, argBlock, argc, envBlock, envc,
dir, fds, redirectErrorStream);
如上,prog为命令,这里为ls
。argBlock为命令以外的参数信息,字节数组形式,这里为”-al filename”的字节数组;argc为命令以外的参数个数,这里为2个;envBlock为传入的环境变量字节数组形式,这里没有;envc为环境变量个数,这里为0;dir为当前工作目录,这里没有;fds为对应的输出文件描述符,stdin(0),stdout(1),stderr(2),pipe(-1),这里应该为stdout(1),通过该文件描述符创建输入流读取结果(这里对fds的描述可能有误)。
读取结果1
2
3
4
5
6
7
8
9
10BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
parseExecResult(inReader);
protected void parseExecResult(BufferedReader lines) throws IOException {
output = new StringBuffer();
char[] buf = new char[512];
int nRead;
while ( (nRead = lines.read(buf, 0, buf.length)) > 0 ) {//从stdout中读取结果至ShellCommandExecutor的output中
output.append(buf, 0, nRead);
}
}
process.getInputStream中的process为UNIXProcess,如下:1
2
3public InputStream getInputStream() {
return stdout;
}
如上从进程的stdout中读取结果至ShellCommandExecutor的output中,然后通过ShellCommandExecutor.getOutput
获取结果。
5. 文件系统操作实现
5.1 open,create,append
1 | public FSDataInputStream open(Path f, int bufferSize) throws IOException { |
如上,创建本文件系统的输入流,然后封装成BufferedInputStream,BufferedInputStream与BufferedInputStream类似,只不过是对FSInputStream的缓冲,底层流只能是FSInputStream,具体见Hadoop抽象文件系统中对应描述。最终封装成文件系统统一输入流FSDataInputStream,拥有DataInput接口的功能。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private FSDataOutputStream create(Path f, boolean overwrite, boolean createParent, int bufferSize,
short replication, long blockSize, Progressable progress) throws IOException {
if (exists(f) && !overwrite) {//文件存在,且不允许覆盖,抛出异常
throw new IOException("File already exists:"+f);
}
Path parent = f.getParent();
if (parent != null) {
if (!createParent && !exists(parent)) {//父目录不存在且不允许创建父目录,抛出异常
throw new FileNotFoundException("Parent directory doesn't exist: "
+ parent);
} else if (!mkdirs(parent)) {//创建父目录,如果失败抛出异常
throw new IOException("Mkdirs failed to create " + parent);
}
}
//创建本文件系统对应输出流,封装成BufferedOutputStream,然后封装成文件系统统一的输出流对象
//注意构建LocalFSFileOutputStream时append参数为false
return new FSDataOutputStream(new BufferedOutputStream(new LocalFSFileOutputStream(f, false), bufferSize), statistics);
}
如上,创建本文键系统对应的输出流LocalFSFileOutputStream,然后封装成BufferedOutputStream,最后封装成DataFSOutputStream。1
2
3
4
5
6
7
8
9public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
if (!exists(f)) {//不存在,直接抛出异常
throw new FileNotFoundException("File " + f + " not found.");
}
if (getFileStatus(f).isDir()) {//为目录,抛出异常
throw new IOException("Cannot append to a diretory (=" + f + " ).");
}
return new FSDataOutputStream(new BufferedOutputStream( new LocalFSFileOutputStream(f, true), bufferSize), statistics);
}
如上,创建LocalFSFileOutputStream,其中append参数为true允许追加,然后进行相应的封装。
5.2 获取文件状态相关方法
1 | public FileStatus getFileStatus(Path f) throws IOException { |
getFileStatus将Path转换为文件后,创建相应的RawLocalFileStatus对象返回。
而listStatus判断Path是否为文件,若为文件则创建相应的RawLocalFileStatus放入数组返回,否则对目录下每一个Path通过getFileStatus获取对应文件状态。
5.3 delete
1 | public boolean delete(Path p, boolean recursive) throws IOException { |
也是通过File来执行删除操作,如果是文件则直接通过File的delete方法删除该文件。若为目录,如果不为空且不允许递归删除的话,抛出异常,否则先删除目录下的所有目录项(可能为目录,此时仍然先删除目录项然后删除目录),然后删除该目(通过FileUtil.fullyDelete完成)。最终都是调用File的delete方法。
5.4 mkdirs
1 | public boolean mkdirs(Path f) throws IOException { |
如上会递归创建目录,创建一个目录时,若其父目录不存在则会先创建父目录。
用到了布尔表达式短路求值。(parent==null||mkdirs(parent))成功时才会继续(p2f.mkdir)||p2f.isDirectory)。
而在前半部分中,如果父目录为null则前半部分直接返回true,不需要创建父目录,接下来创建子目录;若父目录不为null则执行mkdir(parent)创建父目录。若创建失败,则前半部分返回false,父目录创建失败,后半部分不会执行。父目录创建成功则前半部分返回true,执行后半部分创建子目录。创建父目录通过mkdirs同样的会递归创建父目录的父目录,直到根目录。
后半部分中,通过File的mkdir创建子目录。创建成功则直接返回true,否则通过isDirectory判断子目录是否已经创建了,若已经创建则返回ture,否则子目录创建失败,返回false。
5.5 rename
1 | public boolean rename(Path src, Path dst) throws IOException { |
首先通过File的renameTo方法重命名,若失败则通过FileUtil拷贝数据到目的文件。
5.6 setOwner,setPermission
两者也是通过执行Shell命令设置相应的值1
public void setOwner(Path p, String username, String groupname) throws IOException
username为null时设置所在组,命令对应为chgrp
,否则设置所属用户,命令对应为chown
。
setPermission对应命令为chmod
。
5.7 其他方法
1 | public Path getHomeDirectory() { |
主目录为系统属性user.home
。1
2
3
4
5
6
7private Path makeAbsolute(Path f) {
if (f.isAbsolute()) {
return f;
} else {
return new Path(workingDir, f);
}
}
绝对目录父目录为当前工作目录1
2
3
4public void setWorkingDirectory(Path newDir) {
workingDir = makeAbsolute(newDir);
checkPath(workingDir);//检查workingDir是否为该文件系统中的路径,若不是会抛出运行时异常
}