Hadoop FileSystem实现---InMemoryFileSystem

Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》


InMemoryFileSystem是内存文件系统的实现,继承自ChecksumFileSystem,拥有校验和的功能,InMemoryFileSystem目前被标记为Deprecated,这里主要分析其底层文件系统RawInMemoryFileSyStem,是不带校验和的内存文件系统实现。

RawInMemoryFileSystem继承自FileSystem,该文件系统的uri以”ramfs://“开始

1
private static class RawInMemoryFileSystem extends FileSystem


1 成员,构造

1.1 成员

RawInMemoryFileSystem的成员如下:

1
2
3
4
5
6
7
8
private URI uri;
private long fsSize;
private volatile long totalUsed;
private Path staticWorkingDir;

private Map<String, FileAttributes> pathToFileAttribs = new HashMap<String, FileAttributes>();

private Map<String, FileAttributes> tempFileAttribs = new HashMap<String, FileAttributes>();

  • uri,工作目录uri
  • fsSize,文件系统最大大小,即所占内存大小
  • totalUsed,目前文件系统使用量
  • staticWorkingDir,静态工作目录,即当前工作目录
  • pathToFileAttribs,成功创建的文件,值为FileAttributes保存文件的内容
  • tempFileAttribs,保存已经分配了空间,但还没有写入数据或正在写入数据的文件,文件关闭时会被移到pathToFileAttribs中

1.2 FileAttributes

1
2
3
4
5
6
7
8
9
private static class FileAttributes {
private byte[] data;
private int size;

public FileAttributes(int size) {
this.size = size;
this.data = new byte[size];
}
}

如上,data用于保存该文件的数据,size为文件的大小。

1.3 构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public RawInMemoryFileSystem() { setConf(new Configuration()); }
public RawInMemoryFileSystem(URI uri, Configuration conf) { initialize(uri, conf); }
public void initialize(URI uri, Configuration conf) {
setConf(conf);
int size = Integer.parseInt(conf.get("fs.inmemory.size.mb", "100"));
this.fsSize = size * 1024L * 1024L;//内存文件系统大小为fs.inmemory.size.mb,默认100MB
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());//初始化uri
String path = this.uri.getPath();
if (path.length() == 0) {
path = Path.CUR_DIR;
}
this.staticWorkingDir = new Path(path);
LOG.info("Initialized InMemoryFileSystem: " + uri.toString() + " of size (in bytes): " + fsSize);
}

如上,根据传入的uri初始化成员uri和staticWorkingDir,从配置fs.inmemory.size.mb读取内存文件系统的大小为100MB。


2 输入流

RawInMemoryFileSystem的输入流为InMemoryInputStream,继承自FSInputStream。

1
private class InMemoryInputStream extends FSInputStream {

2.1 成员,构造

1
2
private DataInputBuffer din = new DataInputBuffer();
private FileAttributes fAttr;

使用DataInputBuffer来进行数据的读取,对应的文件为fAttr。
构造:

1
2
3
4
5
6
7
8
9
public InMemoryInputStream(Path f) throws IOException {
synchronized (RawInMemoryFileSystem.this) {
fAttr = pathToFileAttribs.get(getPath(f));
if (fAttr == null) {
throw new FileNotFoundException("File " + f + " does not exist");
}
din.reset(fAttr.data, 0, fAttr.size);
}
}

如上,从pathToFileAttribs中获取指定路径的文件对象FileAttributes,不存在时抛出异常,存在时使用FileAttributes中的数据重置DataInputBuffer,即文件数据目前缓存在din中了。

2.2 Seekable接口实现

1
2
3
public long getPos() throws IOException {
return din.getPosition();
}

因为文件的数据都缓存在din中,因此当前位置通过DataInputBuffer的getPosition获取即可。

1
2
3
4
public void seek(long pos) throws IOException {
if ((int)pos > fAttr.size) throw new IOException("Cannot seek after EOF");
din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos);
}

seek方法,直接重新缓存文件从pos开始到文件尾数据,通过DataInputBuffer的reset方法

1
2
3
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}

内存文件系统不支持定位到其他源,没有备份

2.3 PositionedReadable接口实现

继承自FSInputStream的相应read和readFully实现,即依赖于seek和read方法循环读取。

2.4 流的关闭close

没有重写,继承自FSInputStream的close方法,即最终继承自InputStream的close方法,没有实现。其实不需要关闭输入流,因为文件数据就保存在内存中,在文件系统关闭时将相应的pathToFileAttribs清理就行。

2.5 其他方法

1
2
3
public int available() throws IOException {
return din.available();
}

可用数据即为缓冲区中可用数据

1
public boolean markSupport() { return false; }

不支持mark和reset

1
2
3
4
5
6
7
8
9
10
public int read() throws IOException {
return din.read();
}

public int read(byte[] b, int off, int len) throws IOException {
return din.read(b, off, len);
}

public long skip(long n) throws IOException { return din.skip(n); }
}

read和skip方法都依赖于DataInputBuffer,因为文件数据直接缓存在DataInputBuffer中


3 输出流

RawInMemoryFileSystem的输出流为InMemoryOutputStream,继承自OutputStream

1
private class InMemoryOutputStream extends OutputStream

3.1 成员,构造

1
2
3
private int count;
private FileAttributes fAttr;
private Path f;
  • count,输出流位置
  • fAttr,输出流对应的文件
  • f,输出流对应文件的路径

构造

1
2
3
4
public InMemoryOutputStream(Path f, FileAttributes fAttr) throws IOException {
this.fAttr = fAttr;
this.f = f;
}

3.2 数据写

写一个字节

1
2
3
4
5
6
7
8
public void write(int b) throws IOException {
int newcount = count + 1;
if (newcount > fAttr.size) {
throw new IOException("Insufficient space");
}
fAttr.data[count] = (byte)b;
count = newcount;
}

如上,写的数据保存在FileAttributes的datas中,即为文件数据,增加count值
写指定字节

1
2
3
4
5
6
7
8
9
public void write(byte[] b, int off, int len) throws IOException {
...//参数检查
int newcount = count + len;//文件新的长度
if (newcount > fAttr.size) {//文件新的长度超出文件大小,抛出异常
throw new IOException("Insufficient space");
}
System.arraycopy(b, off, fAttr.data, count, len);//从b中拷贝数据到文件的datas成员中
count = newcount;
}

如上,不允许超过文件指定的长度,超出时没有内存来保存超出的数据,因此此时抛出异常,正常情况下将数据拷贝到文件FileAttributes的datas成员中。

3.3 流的关闭close

1
2
3
4
5
public void close() throws IOException {
synchronized (RawInMemoryFileSystem.this) {
pathToFileAttribs.put(getPath(f), fAttr);
}
}

如上,关闭时需要将文件名及对应的FileAttributes映射放到pathToFileAttribs中。

3.4 其他方法

1
2
3
public long getPos() throws IOException {
return count;
}

4 文件状态

RawInMemoryFileSystem的文件状态对应为InMemoryFileStatus

1
2
3
4
5
private class InMemoryFileStatus extends FileStatus {
InMemoryFileStatus(Path f, FileAttributes attr) throws IOException {
super(attr.size, false, 1, getDefaultBlockSize(), 0, f);
}
}

如上,FileAttributes的大小即为文件大小,内存文件系统不支持目录,因此所有的文件状态的第二个参数isDir为false,内存文件系统没有备份,因此所有文件状态的第三个参数block_replication为1,内存文件系统没有记录修改时间等元信息,因此所有文件状态的第五个参数modification_time为0。
所以,内存文件系统的文件状态主要有用信息为文件大小和文件路径。


5 RawInMemoryFileSystem操作实现

5.1 open,create,append

1
2
3
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return new FSDataInputStream(new InMemoryInputStream(f));
}

创建内存文件系统对应的输入流对象,封装成文件系统统一的输入流对象FSDataInputStream,因为InMemoryInputStream没有实现HasFileDescriptor接口,也不是FileInputStream的子类,在最后封装的FSDataInputStream的getFileDescriptor方法将返回null,即没有文件描述符。

1
2
3
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
throw new IOException("Not supported");
}

内存文件系统不支持追加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
short replication, long blockSize, Progressable progress)throws IOException
{

synchronized (this) {
if (exists(f) && !overwrite) {//存在(在pathTOFileAttribs中)且不允许覆盖,抛出异常
throw new IOException("File already exists:"+f);
}
//如果在tempFileAttribs中了(创建前预留空间),从中移除返回对应的文件对象
FileAttributes fAttr = tempFileAttribs.remove(getPath(f));
if (fAttr != null)
return create(f, fAttr);
return null;//如果没有预留该文件的空间,不能创建,创建前应该先预留空间
}
}
public FSDataOutputStream create(Path f, FileAttributes fAttr) throws IOException {
return new FSDataOutputStream(new InMemoryOutputStream(f, fAttr), statistics);
}

如上,创建文件之前需要先分配文件对应的空间,然后将FileAttributes对象添加到tempFileAttributes中,在这之后才能创建文件。即从tempFileAttributes中取出文件对应的FileAttributes对象,创建相应的InMemoryOutputStream封装成FSDataOutputStream。
预留空间通过reserveSpace完成,为路径f预留大小为size的空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean reserveSpace(Path f, long size) {
synchronized (this) {
if (!canFitInMemory(size))
return false;
FileAttributes fileAttr;
try {
fileAttr = new FileAttributes((int)size);
} catch (OutOfMemoryError o) {
return false;
}
totalUsed += size;
tempFileAttribs.put(getPath(f), fileAttr);
return true;
}
}
private boolean canFitInMemory(long size) {
if ((size <= Integer.MAX_VALUE) && ((size + totalUsed) < fsSize)) {
return true;
}
return false;
}

如上,文件系统新增大小size的文件后,不能超出文件系统的大小,否则直接返回false。创建对应的FileAttributes对象,然后放到tempFileAttributes中,等待写数据。
而与之相反的操作为unreserveSpace,从tempFileAttributes中删除对应的文件记录,然后减少文件系统相应的统计量,如下。

1
2
3
4
5
6
7
8
9
public void unreserveSpace(Path f) {
synchronized (this) {
FileAttributes fAttr = tempFileAttribs.remove(getPath(f));
if (fAttr != null) {
fAttr.data = null;
totalUsed -= fAttr.size;
}
}
}

5.2 文件删除delete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Deprecated
public boolean delete(Path f) throws IOException {
return delete(f, true);
}

public boolean delete(Path f, boolean recursive) throws IOException {
synchronized (this) {
FileAttributes fAttr = pathToFileAttribs.remove(getPath(f));
if (fAttr != null) {
fAttr.data = null;
totalUsed -= fAttr.size;
return true;
}
return false;
}
}

内存文件系统不支持递归删除,因为没有目录,参数recursive无效。从pathToFileAttribs中移除,如果对应的文件存在相应减少文件系统的相关统计量。

5.3 文件重命名rename

1
2
3
4
5
6
7
8
9
10
11
public boolean rename(Path src, Path dst) throws IOException {
synchronized (this) {
if (exists(dst)) {
throw new IOException ("Path " + dst + " already exists");
}
FileAttributes fAttr = pathToFileAttribs.remove(getPath(src));
if (fAttr == null) return false;
pathToFileAttribs.put(getPath(dst), fAttr);
return true;
}
}

重命名时不允许覆盖,如果目的路径存在直接抛出异常。
否则,从pathToFileAttribs中移除源文件,如果源文件不存在,不能重命名返回false。否则将源文件的FileAttributes对象作为新文件的值重新放入pathToFileAttribs中。

5.4 文件状态

1
2
3
4
5
6
7
8
9
public FileStatus getFileStatus(Path f) throws IOException {
synchronized (this) {
FileAttributes attr = pathToFileAttribs.get(getPath(f));
if (attr==null) {
throw new FileNotFoundException("File " + f + " does not exist.");
}
return new InMemoryFileStatus(f.makeQualified(this), attr);
}
}

如上,单个文件的文件状态获取文件对应的FileAttributes后构建文件状态对象即可。

1
2
3
public FileStatus[] listStatus(Path f) throws IOException {
return null;
}

而对于listStatus这种可能为目录级别的文件状态不支持,直接返回null。

5.5 其他操作

5.5.1 文件系统中所有文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Path[] getFiles(PathFilter filter) {
synchronized (this) {
List<String> closedFilesList = new ArrayList<String>();
synchronized (pathToFileAttribs) {
Set paths = pathToFileAttribs.keySet();
if (paths == null || paths.isEmpty()) {
return new Path[0];
}
Iterator iter = paths.iterator();
while (iter.hasNext()) {//所有已关闭的文件
String f = (String)iter.next();
if (filter.accept(new Path(f))) {
closedFilesList.add(f);
}
}
}
String [] names = closedFilesList.toArray(new String[closedFilesList.size()]);
Path [] results = new Path[names.length];
for (int i = 0; i < names.length; i++) {
results[i] = new Path(names[i]);
}
return results;
}
}
如上,获取文件系统所有路径(所有已关闭文件,不包括创建了还在写或在等待写的文件),从pathToFileAttribs中获取每一个记录,经路径过滤器filter过滤后返回。

5.5.2 文件系统使用比例

1
2
3
4
5
public float getPercentUsed() {
if (fsSize > 0)
return (float)totalUsed/fsSize;
else return 0.1f;
}

5.5.3 当前工作目录

1
2
3
public Path getWorkingDirectory() {
return staticWorkingDir;
}

5.5.4 文件系统中的文件数目

1
2
3
public int getNumFiles(PathFilter filter) {
return getFiles(filter).length;
}

5.5.5 文件系统大小

1
2
3
public long getFSSize() {
return fsSize;
}

6 RawInMemoryFileSystem关闭close

1
2
3
4
5
6
7
8
9
10
11
12
13
public void close() throws IOException {
super.close();
synchronized (this) {
if (pathToFileAttribs != null) {
pathToFileAttribs.clear();
}
pathToFileAttribs = null;
if (tempFileAttribs != null) {
tempFileAttribs.clear();
}
tempFileAttribs = null;
}
}

如上调用父类FileSystem的close方法,删除deleteOnExit的文件,如果文件系统是通过FileSystem的get方法生成的且注册在CACHE中,还要从CACHE中移除。
之后清除pathToFileAttribs和tempFileAttributes中的文件。


7 InMemoryFileSystem相关

InMemoryFileSystem已经声明为Deprecated,继承自ChecksumFileSystem,这里简单分析

1
2
@Deprecated
public class InMemoryFileSystem extends ChecksumFileSystem {

7.1 构造

1
2
public InMemoryFileSystem() { super(new RawInMemoryFileSystem()); }
public InMemoryFileSystem(URI uri, Configuration conf) { super(new RawInMemoryFileSystem(uri, conf)); }

如上,底层文件系统为RawInMemoryFileSystem,继承自CheckSumFileSystem,使用继承的相应的读写方法在读写时完成相应的校验操作。

7.2 其他方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean reserveSpaceWithCheckSum(Path f, long size) {
RawInMemoryFileSystem mfs = (RawInMemoryFileSystem)getRawFileSystem();
synchronized(mfs) {
boolean b = mfs.reserveSpace(f, size);
if (b) {
long checksumSize = getChecksumFileLength(f, size);
b = mfs.reserveSpace(getChecksumFile(f), checksumSize);
if (!b) {
mfs.unreserveSpace(f);
}
}
return b;
}
}

因为底层文件系统的预留空间方法只在文件系统中预留了源文件的空间,因此对于InMemoryFileSystem来说同时要预留校验文件的空间。

1
2
3
4
5
6
7
public Path[] getFiles(PathFilter filter) { return ((RawInMemoryFileSystem)getRawFileSystem()).getFiles(filter); }

public int getNumFiles(PathFilter filter) { return ((RawInMemoryFileSystem)getRawFileSystem()).getNumFiles(filter); }

public long getFSSize() { return ((RawInMemoryFileSystem)getRawFileSystem()).getFSSize(); }

public float getPercentUsed() { return ((RawInMemoryFileSystem)getRawFileSystem()).getPercentUsed(); }

都使用底层文件系统的相应方法实现。