RaidNode源码阅读---RaidNode编码过程下篇编码操作(LocalRaidNode)

Hadoop版本:hadoop-20-master


LocalRaidNode编码操作通过raidFiles完成,函数原型如下:

1
2
3
4
5
6
@Override
void raidFiles(PolicyInfo info, List<FileStatus> paths) throws IOException {
//分割paths,创建EncodingCandidate列表
List<EncodingCandidate> lec = RaidNode.splitPaths(conf, Codec.getCodec(info.getCodecId()), paths);
doRaid(conf, info, lec);
}

传入参数中info为policy文件中的一个policy,paths为该policy下可以Raid的路径列表(这里可能为文件或叶子目录),具体见上篇Raid文件的选取.
其中,splitPaths根据配置参数,将待Raid的文件列表转换成EncodingCandidate列表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
for (FileStatus s : paths) {
if (codec.isDirRaid != s.isDir()) {
continue;
}
long numBlocks = 0L;
if (codec.isDirRaid) {
List<FileStatus> lfs = RaidNode.listDirectoryRaidFileStatus(
conf, srcFs, s.getPath());
if (lfs == null) {
continue;
}
for (FileStatus stat : lfs) {
numBlocks += RaidNode.numBlocks(stat);
}
} else {
numBlocks = RaidNode.numBlocks(s);
}
long numStripes = RaidNode.numStripes(numBlocks, codec.stripeLength);
String encodingId = System.currentTimeMillis() + "." + rand.nextLong();
for (long startStripe = 0; startStripe < numStripes;
startStripe += encodingUnit) {
lec.add(new EncodingCandidate(s, startStripe, encodingId, encodingUnit,
s.getModificationTime()));
}
}

对于路径和codec不匹配的,不予处理.然后统计每个path的区块数(path可能为文件或目录).
对于目录path,通过listDirectoryRaidFileStatus统计目录下所有的Blocks,简单的对目录下所有文件(目录下的目录不处理)对应的区块相加.
不过对于小于hdfs.raid.min.filesize(默认10KB)的文件不进行统计.
而对于文件path,则直接是该文件对应的Block数目.

有了path的Block数目,根据codec的stripeLength,便能得到该path对应的stripe数目numStripes.
然后,创建numStripes/encodingUnitEncodingCandidate对象.这里encodingUnit为配置项hdfs.raid.stripe.encoding默认为1.即一个EncodingCandidate对应一个stripe,例如对于(10,4)rs码来说,默认情况下,一个EncodingCandidate负责编码10个区块.

从上面代码可以看出,对于待Riad列表来说,一个path对应一个encodingId,ID号为当前时间加上一个随机数,不管path下能够对应几个stripe.
如果path(文件或目录)对应Block数目过多,需多个EncodingCandidate进行处理,则这几个EncodingCandidate共享这一个encodingId,以进行辨识.
另外,从上代码可以看出,不管path下面的Block数目怎么少(最小2个Block),它至少对应一个EncodingCandidate.如对于(10,4)rs码,如果一个path的Block只有4个,它也单独需要一个EncodingCandidate进行处理,剩下的6个Block在后面会看到将用0Block进行填充.

这主要说明了文件Riad和目录Raid的含义,文件Raid编码独立单元即为文件,即使它再小也不能和其他小文件进行拼接一起编码.若文件对应Block很多,则分别由不同的EncodingCandidate进行处理,一个EncodingCandidate可以处理的Block数依赖于stripeLength和encodingUnit,一个文件的
所有的EncodingCandidate共享一个encodingId.
目录Raid编码独立单元为目录,目录下面Block数目再小也不能和其他目录进行拼接一起编码,一个目录的所有EncodingCandidate共享一个encodingId.

接着,由doRaid(conf, info, lec);对EncodingCandidate列表进行Raid操作.

1
2
3
4
5
6
7
8
9
for (EncodingCandidate ec : paths) {//对每一个EncodingCandidate依次编码
doRaid(conf, ec, destPref, codec, statistics,
RaidUtils.NULL_PROGRESSABLE, doSimulate,
targetRepl, metaRepl);
if (count % 1000 == 0) {
LOG.info("RAID statistics " + statistics.toString());
}
count++;
}

对每一个EncodingCandidate一次执行doRaid(conf, ec, destPref, codec, statistics,RaidUtils.NULL_PROGRESSABLE, doSimulate,targetRepl, metaRepl);.
这里destPref为配置的codec的校验文件存放路径parity_dir,dosimulate为policy中的属性simulate,默认为false.
targetRepl为policy属性targetReplication,metaRepl为policy属性metaReplication.

然后,在函数中,分别对目录Raid和文件Raid两种情况调用不同子函数:

1
2
3
4
5
6
7
if (codec.isDirRaid) {//不同分支
result = doDirRaid(conf, ec, destPath, codec, statistics, reporter,
doSimulate, targetRepl, metaRepl);
} else {
result = doFileRaid(conf, ec, destPath, codec, statistics, reporter,
doSimulate, targetRepl, metaRepl);
}

两个函数将ec管理的文件或目录下的所有文件编码至destPath中,输出路径为destPath/srcPath,并根据doSimulate更新源区块的备份数.

1. doFileRaid

在doFileRaid中,首先检查path的区块是否大于2,这个之前其实已经检查过了:

1
2
3
4
5
BlockLocation[] locations = srcFs.getFileBlockLocations(stat, 0, stat.getLen());
// if the file has fewer than 2 blocks, then nothing to do
if (locations.length <= 2) {
return LOGRESULTS.NOACTION;
}

如果区块数满足要求,则生成校验文件,并根据doSimulate决定是否更正源文件的备份数,其中还有统计信息的更新,这里不分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//子函数
boolean parityGenerated = generateParityFile(conf, ec, targetRepl, reporter,
srcFs, destPath, codec, locations.length, stat.getReplication(),
metaRepl, stat.getBlockSize(), null);
if (!parityGenerated) {
return LOGRESULTS.NOACTION;
}
if (!doSimulate) {
if (srcFs.setReplication(p, (short)targetRepl) == false) {
LOG.info("Error in reducing replication of " + p + " to " + targetRepl);
statistics.remainingSize += diskSpace;
return LOGRESULTS.FAILURE;
};
}

2. doDirRaid

doDirRaid同样的首先计算目录path下的区块数,判断是否大于2,不过因为是目录,需要统计目录下所有文件的区块总和:

1
2
3
4
5
6
7
8
9
10
List<FileStatus> lfs = RaidNode.listDirectoryRaidFileStatus(conf, srcFs, p);
if (lfs == null) {
return LOGRESULTS.NOACTION;
}
// add up the total number of blocks in the directory
long blockNum = DirectoryStripeReader.getBlockNum(lfs);
// if the directory has fewer than 2 blocks, then nothing to do
if (blockNum <= 2) {
return LOGRESULTS.NOACTION;
}

同样的区块数满足要求,生成校验文件,并根据doSimulate是否更新目录下所有源文件的备份数,更新统计信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
try {
parityGenerated = generateParityFile(conf, ec, targetRepl, reporter, srcFs,
destPath, codec, blockNum, srcRepl, metaRepl, parityBlockSize, lfs);
} catch (InterruptedException e) {
throw new IOException (e);
}
if (!parityGenerated)
return LOGRESULTS.NOACTION;
if (!doSimulate) {
for (FileStatus fsStat: lfs) {
if (srcFs.setReplication(fsStat.getPath(), (short)targetRepl) == false) {
LOG.info("Error in reducing replication of " +
fsStat.getPath() + " to " + targetRepl);
statistics.remainingSize += diskSpace;
return LOGRESULTS.FAILURE;
}
};
}

由上可见,generateParityFile文件Raid和目录Raid共用,区别在于最后一个参数,文件Raid为null,而目录Raid为目录下的文件列表.
其中locations为原path(文件或目录)所有的Blocks,而不是参数ec对应的Blocks.

3. generateParityFile

首先,获取编码输出路径,generateParityFile的destPath输入参数为codec配置的parity_dir参数,而对于一个EncodingCandidate来说,它的输出路径通过getOriginalParityFile获得:

1
2
3
FileStatus stat = ec.srcStat;
Path inpath = stat.getPath();
Path outpath = getOriginalParityFile(destPathPrefix, inpath);//配置的校验路径+源路径为最终输出路径

inpath为null或其父目录为null,则输出路径直接为parity_dir,否则输出路径为parity_dir/inpath,这个从编码测试结果可以验证.

然后,要判断是否已经编码过了,即输出路径是否存在校验文件,且源文件备份数已做出相应修改:

1
2
3
4
5
6
7
8
9
//是否已编码过
FileStatus stmp = outFs.getFileStatus(outpath);
if (stmp.getModificationTime() == stat.getModificationTime() &&
srcRepl == targetRepl) {
LOG.info("Parity file for " + inpath + "(" + blockNum +
") is " + outpath + " already upto-date and " +
"file is at target replication . Nothing more to do.");
return false;
}

没有编码过才能继续进行,创建Encoder对象,并验证ChecksumStore和StripeStore是否符合要求:

1
2
Encoder encoder = new Encoder(conf, codec);
encoder.verifyStore();//创建相应的ChecksumStore和StripeStore

Encoder构造见后面.编码器构造完成后,根据目录Raid或文件Raid执行不同流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if (codec.isDirRaid) {
long numStripes = (blockNum % codec.stripeLength == 0) ?
(blockNum / codec.stripeLength) :
((blockNum / codec.stripeLength) + 1);
//目录Raid创建DirectoryStripeReader,执行编码
sReader = new DirectoryStripeReader(conf, codec, inFs,
ec.startStripe, ec.encodingUnit, inpath, lfs);
parityGenerated = encoder.encodeFile(conf, inFs, outFs, outpath,
(short)metaRepl, numStripes, blockSize, reporter, sReader, ec);
} else {
FileStatus srcStat = inFs.getFileStatus(inpath);
long srcSize = srcStat.getLen();
long numBlocks = (srcSize % blockSize == 0) ?
(srcSize / blockSize) :
((srcSize / blockSize) + 1);
long numStripes = (numBlocks % codec.stripeLength == 0) ?
(numBlocks / codec.stripeLength) :
((numBlocks / codec.stripeLength) + 1);
//文件Raid创建FileStripeReader执行编码
sReader = new FileStripeReader(conf, blockSize,
codec, inFs, ec.startStripe, ec.encodingUnit, inpath, srcSize);
parityGenerated = encoder.encodeFile(conf, inFs, outFs, outpath,
(short)metaRepl, numStripes, blockSize, reporter, sReader, ec);
}

两者都先计算该EncodingCandidate对应原path的Blocks数目,用于在子函数中判断是否完成该path的所有Raid操作.
然后对于目录Raid创建DirectoryStripeReader,文件Raid创建FileStripeReader,一个EncodingCandidate对应一个StripeReader.
最后通过刚创建的Encoder进行编码.

4. 文件Raid创建FileStripeReader

1
2
3
4
5
6
7
8
9
10
11
super(conf, codec, fs, stripeStartIdx);
this.blockSize = blockSize;
this.fs = fs;
this.srcFile = srcFile;
this.srcSize = srcSize;
long numBlocks = (long) Math.ceil(srcSize * 1.0 / blockSize);
long totalStripe = RaidNode.numStripes(numBlocks, codec.stripeLength);
if (encodingUnit < 0) {
encodingUnit = totalStripe - stripeStartIdx;
}
stripeEndIdx = Math.min(totalStripe, stripeStartIdx + encodingUnit);

调用父类构造函数主要初始化开始stripeID stripeStartIdx,和当前stripeID currentStripeIdx.

1
2
3
4
5
6
this.codec = codec;
this.conf = conf;
this.fs = fs;
this.stripeStartIdx = stripeStartIdx;
this.currentStripeIdx = stripeStartIdx;
this.bufferSize = conf.getInt("io.file.buffer.size", 64 * 1024);

计算了文件的区块数目,根据开始stripeID,编码单元encodingUnit和区块数目计算了该StripeReader对应的结束stripeID stripeEndIdx.

5. 目录Raid创建DirectoryStripeReader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
super(conf, codec, fs, stripeStartIdx);
if (lfs == null) {
throw new IOException("Couldn't get files under directory " + srcDir);
}
this.parityBlockSize = getParityBlockSize(conf, lfs);
this.srcDir = srcDir;
this.lfs = lfs;
this.stripeBlocks = new ArrayList<BlockInfo>();
long blockNum = 0L;
for (int fid = 0; fid < lfs.size(); fid++) {
FileStatus fsStat = lfs.get(fid);
long numBlock = RaidNode.getNumBlocks(fsStat);
blockNum += numBlock;
for (int bid = 0; bid < numBlock; bid++) {
stripeBlocks.add(new BlockInfo(fid, bid));
}
}
this.numBlocks = blockNum;
long totalStripe = RaidNode.numStripes(blockNum, codec.stripeLength);
if (stripeStartIdx >= totalStripe) {
throw new IOException("stripe start idx " + stripeStartIdx +
" is equal or larger than total stripe number " + totalStripe);
}
if (encodingUnit < 0) {
encodingUnit = totalStripe;
}
stripeEndIdx = Math.min(totalStripe, stripeStartIdx + encodingUnit);

同样的先跟据传入的参数调用父类构造函数初始化stripeStartIdxcurrentStripeIdx.
目录Raid下可能有多个文件,与文件Raid不同,需要遍历目录下所有文件,取最大的Block大小作为校验文件的Block大小.
相比较FileStripeReader,维护了目录下所有文件状态列表lfs,并记录了目录下所有的文件区块和文件索引对应的关系列表stripeBlocks.
stripeBlocks中元素数量为目录下所有区块数量,用于在之后构建stripe的输入流时寻找对应的区块.每个元素为BlockInfo类型的对象,维护了一个区块和其所属文件的对应关系.例如,目录下有两个文件,每个文件5个Block,则stripeBlocks中维护了如下的映射关系:
(0,0),(0,1),(0,2),(0,3),(0,4),(1,0),(1,1),(1,2),(1,3),(1,4)
最后当然也计算stripeEndIdx.

6. encodeFile

Encoder的encodeFile也是由目录Raid和文件Raid共用,完成一个StripeReader(EncodingCandidate)的编码过程.
以下为该方法的注释:

1
2
3
4
5
/**
* The interface to use to generate a parity file.
* This method can be called multiple times with the same Encoder object,
* thus allowing reuse of the buffers allocated by the Encoder object.
*

意思是说,对于一个独立的编码单元(文件Raid的文件或目录Raid的一个目录,这里记为一个路径path)来说,如果它对应多个EncodingCandidate,需多次执行encodeFile,该path就存在三个Raid状态:

1
2
3
4
5
6
7
8
9
/**
* To support retriable encoding, we use three checkpoints to represent
* the last success state.
* 1. isEncoded: Set to true when partial partiy is generated
* 2. isRenamed: Set to true when all partial parity are generated and
* tmpPartialParityDir is moved to finalPartialParityDir
* 3. isConcated: Set to true when partial parities are concatenated into
* a final parity.
*/

记录Raid过程中的状态,这样允许在某一个EncodingCandidate编码失败时进行重试.

函数中存在以下局部变量:

1
2
3
long expectedParityFileSize = numStripes * blockSize * codec.parityLength;//该路径所期望的校验文件大小
long expectedPartialParityBlocks = (sReader.stripeEndIdx - sReader.stripeStartIdx) * codec.parityLength;
long expectedPartialParityFileSize = expectedPartialParityBlocks * blockSize;//该EncodingCandidate局部校验文件大小

其中expectedParityFileSize记录path所期望的校验文件大小,expectedPartialParityBlocks为当前EncodingCandidate所期望的区块数目,称为局部校验区块数,expectedPartialParityFileSize为当前EncodingCandidate期望的校验文件大小,为局部校验文件.
说明:
对于文件或目录来说,其所属的Block可能需要多个EncodingCandidate(StripeReader)进行处理,因此才有上面的变量.第一个变量用于判断是否完成了所有的编码,而后面的两个分别对应该次EncodingCandidate编码所应该产生的区块大小和文件大小.
另外,之后所说的path都对应一个文件或叶子目录,即需要编码的单元.

编码过程有以下临时目录:

1
2
3
4
5
6
7
8
String jobID = RaidNode.getJobID(jobConf);
Path tmpDir = new Path(codec.tmpParityDirectory, jobID);
String partialParityName = "partial_" + MD5Hash.digest(srcFile.toUri().getPath()) +
"_" + ec.srcStat.getModificationTime() + "_" + ec.encodingUnit + "_" +
ec.encodingId;
Path partialParityDir = new Path(tmpDir, partialParityName);
Path tmpPartialParityDir = new Path(partialParityDir, "tmp");
Path finalPartialParityDir = new Path(partialParityDir, "final");

jobID通过mapred.job.id设置,若不存在,则为localRaid+date(当前时间,ms).
因此,对于一个path来说,其临时编码路径为:/tmp/${parity_dir}/jobID.
临时路径下存在一个局部校验路径:partialParityDir,而该目录下又存在tmpfinal两个目录.

当前EncodingCandidate还没编码完成时,在tmp目录下创建属于该EncodingCandidate的临时文件,进行局部编码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if (!ec.isConcated) {
if (!ec.isEncoded) {
if (!parityFs.mkdirs(tmpPartialParityDir)) {
throw new IOException("Could not create " + tmpPartialParityDir);
}
Path partialTmpParity = new Path(tmpPartialParityDir,
Long.toString(sReader.getCurrentStripeIdx()));
LOG.info("Encoding partial parity " + partialTmpParity);
//临时校验文件编码
if (!encodeTmpParityFile(jobConf, sReader, dfs,
partialTmpParity, parityFile, tmpRepl, blockSize,
expectedPartialParityBlocks, expectedPartialParityFileSize,
reporter)) {
return false;
}
LOG.info("Encoded partial parity " + partialTmpParity);
}
...

一个EncodingCandidate对应的临时校验文件名与stripeID对应.
通过encodeTmpParityFile进行局部校验文件的编码操作,完成一个EncodingCandidate的编码过程.
输入参数partialTmpParity为该EncodingCandidate对应的临时校验文件,也即该函数的编码结果输出文件.
expectedPartialParityBlocks为parityLength*encodingUnit个Blocks.

局部校验文件编码成功,则当前EncodingCandidate状态isEncoded为true,然后等待当前EncodingCandidate所属文件或目录的其他EncodingCandidate局部编码完成,都编码完成后将tmp目录重命名为final目录,置isRenamed为true.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ec.isEncoded = true;
long expectedNum = (long) Math.ceil(numStripes * 1.0 / ec.encodingUnit);
if (!ec.isRenamed) {
//判断并等待其他局部编码完成
if (!finishAllPartialEncoding(parityFs, tmpPartialParityDir, expectedNum)) {
return false;
}
InjectionHandler.processEventIO(
InjectionEvent.RAID_ENCODING_FAILURE_RENAME_FILE);
// Move the directory to final
//所有局部编码完成,重命名,进入到Renamed状态
if (!dfs.rename(tmpPartialParityDir, finalPartialParityDir)) {
LOG.info("Fail to rename " + tmpPartialParityDir + " to " +
finalPartialParityDir);
return false;
}
LOG.info("Renamed " + tmpPartialParityDir + " to " +
finalPartialParityDir);
ec.isRenamed = true;
}

状态还不是isRenamed,即表示还没有完成所有的局部编码,通过finishAllPartialEncoding判断并等待其他局部编码完成.
该函数中最多判断raid.encoder.retry.count.partial.encoding(默认3)次,看是否其他的局部编码完成,最后一次休眠等待.

重命名后,说明所有的局部编码完成,此时tmp目录重命名为final,final目录下含有所需的所有局部校验文件.
然后需要判断final目录下的文件是否正确(文件名和encodingUnit,文件大小和期望大小),正确的话列出所有文件,然后将除第一个文件外的其他所有文件依次追加到第一个文件末尾(第一个文件startstripe最小),完成concate的过程,并置isConcated为true:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
FileStatus[] stats = parityFs.listStatus(finalPartialParityDir);
// Verify partial parities are correct
Vector<Path> partialPaths = getPartialPaths((int)ec.encodingUnit,
(int)expectedNum, stats, codec, numStripes);
finalTmpParity = partialPaths.get(0);
InjectionHandler.processEventIO(
InjectionEvent.RAID_ENCODING_FAILURE_CONCAT_FILE);
if (partialPaths.size() > 1) {
Path[] restPaths = partialPaths.subList(1,
partialPaths.size()).toArray(new Path[partialPaths.size() - 1]);
try {
// Concat requires source and target files are in the same directory
//连接其他局部校验文件到第一个局部校验文件尾,进入isConcated状态
dfs.concat(finalTmpParity, restPaths, true);
LOG.info("Concated " + partialPaths.size() + " files into " + finalTmpParity);

} catch (IOException ioe) {
// Maybe other tasks already finish concating.
LOG.info("Fail to concat " + partialPaths.size() +
" files into " + finalTmpParity, ioe);
throw ioe;
}
}
ec.isConcated = true;

至此,原path(一个文件或目录)编码全部完成,编码结果存在一个文件中,文件位于临时目录下.
接着便对最终的编码文件作合理性判断(区块数和大小),若符合要求,则重命名为最终的编码文件.
相应的,若支持StripeStore,则还需存储stripes

1
2
3
4
5
6
7
8
9
10
11
12
13
if (dfs.exists(parityFile)){
dfs.delete(parityFile, false);
}
dfs.mkdirs(parityFile.getParent());
//final目录下的已经连接的文件重命名为最终输出路径文件,完成编码
if (!dfs.rename(finalTmpParity, parityFile)) {
String msg = "Unable to rename file " + finalTmpParity + " to " + parityFile;
throw new IOException (msg);
}
if (stripeStore != null) {
//更新StripeStore并创建相应文件
this.writeToStripeStore(ec.srcStripes, dfs, fs, srcFile, parityFs, parityFile, expectedParityFileSize, reporter,finalTmpParity);
}

writeToStripeStore中通过CheckSumStore的putStripe方法,将Stripe信息存放至Encoder的ChecksumStore中并在指定的目录创建相应的文件以及内容.

7. encodeTmpParityFile

encodeTmpParityFile编码局部校验文件,并不直接使用传进来的局部校验文件作为编码输出文件,而是另外创建了一个临时文件:

1
2
3
4
5
6
7
String jobID = RaidNode.getJobID(jobConf);
Path tmpDir = new Path(codec.tmpParityDirectory, jobID);
if (!parityFs.mkdirs(tmpDir)) {
throw new IOException("Could not create tmp dir " + tmpDir);
}
Path parityTmp = new Path(tmpDir, parityFile.getName() + rand.nextLong());
FSDataOutputStream out = parityFs.create(parityTmp,true,conf.getInt("io.file.buffer.size", 64 * 1024),tmpRepl,blockSize);

可见临时文件的目录为:/tmp/${parity_dir}/jobID,文件名为局部校验文件名加一个随机数,以这个临时文件作为编码结果输出.

如果支持ChecksumStore,则需创建expectedPartialParityBlocks个CRC32对象,每一个Block对应一个,存储区块的校验和.

1
2
3
4
CRC32[] crcOuts = null;
if (checksumStore != null) {
crcOuts = new CRC32[(int)expectedPartialParityBlocks];
}

然后通过encodeFileToStream(sReader, blockSize, out, crcOuts, reporter);进行编码,编码结果写入out即临时文件中,支持校验和的话,同时计算parity数据校验和存入crcOuts中.

如果一切正常,支持校验和的话,将得到的校验和存入ChecksumStore中,并创建配置的目录下的相应文件:

1
2
3
4
if (checksumStore != null) {
//更新校验文件区块校验和,并在本地目录创建相应文件
this.writeToChecksumStore((DistributedFileSystem)parityFs, crcOuts, parityTmp, expectedPartialParityFileSize, reporter);
}

writeToChecksumStore中对每一个区块和对应校验和写到checksumstore中,putIfAbsentChecksum还会通过putIfAbsent建相应的文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
LocatedBlocks lbks = dfs.getLocatedBlocks(parityTmp, 0L,
expectedParityFileSize);
for (int i = 0; i < crcOuts.length; i++) {
this.checksumStore.putIfAbsentChecksum(lbks.get(i).getBlock(),
crcOuts[i].getValue());
reporter.progress();
}
@Override
public Long putIfAbsent(Block blk, Long newChecksum) throws IOException {
Long oldChecksum = store.putIfAbsent(blk.toString(), newChecksum);
if (oldChecksum == null) {
LOG.info("Put " + blk + " -> " + newChecksum);
File newFile = getFile(blk, newChecksum);
newFile.createNewFile();
}
return oldChecksum;
}

最后将临时文件重命名为局部校验文件,完成局部校验的编码过程.

1
2
3
4
if (!parityFs.rename(parityTmp, partialTmpParity)) {
LOG.warn("Fail to rename file " + parityTmp + " to " + partialTmpParity);
return false;
}

8. encodeFileToStream

encodeFileToStream完成一个StripeReader的编码,编码结果放在临时文件中(非上述tmp目录),根据需要计算校验文件的校验和,函数如下:

1
private void encodeFileToStream(StripeReader sReader,long blockSize, FSDataOutputStream out, CRC32[] crcOuts,Progressable reporter)

encodeFileToStream中,首先创建parityLength个输出流,需要将stripeLength个输入流编码成parityLength个输出流.parityLength个输出流中,只有一个是输入参数out,也就是上面的临时文件流,其他的都是临时文件:

1
2
3
4
5
6
7
8
9
OutputStream[] tmpOuts = new OutputStream[codec.parityLength];
// One parity block can be written directly to out, rest to local files.
tmpOuts[0] = out;//第一个输出流为最终输出流,其他为临时文件输出流
File[] tmpFiles = new File[codec.parityLength - 1];
for (int i = 0; i < codec.parityLength - 1; i++) {
tmpFiles[i] = File.createTempFile("parity", "_" + i);
LOG.info("Created tmp file " + tmpFiles[i]);
tmpFiles[i].deleteOnExit();
}

接着便是对该StripReader所管理的stripes(stripe数取决于encodingUnit,见上)进行编码,如果当前StripeReader还有stripe未处理,则通过StripeReader取出下一个未处理的stripe:

1
2
3
4
5
6
7
while (sReader.hasNext()) {//依次编码StripeReader的encodingUnit个stripe
reporter.progress();

StripeInputInfo stripeInputInfo = null;
InputStream[] blocks = null;
// Create input streams for blocks in the stripe.
long currentStripeIdx = sReader.getCurrentStripeIdx();

stripeInputInfo = sReader.getNextStripeInputs();//获取下一个stripe输入信息

一个StripeInputInfo包含了一个stripe的信息,成员如下:

1
2
3
private final InputStream[] inputs;
private final Path[] srcPaths;
private final long[] offSets;

即包括了stripeLength个输入流(文件Raid中为文件对应的流和ZeroInputStream,目录Raid中可能多个文件输入流和若干ZeroInputStream),每个输入流对应的源文件和在源文件中的偏移量.

getNextStripeInputs即根据StripeReader(文件或目录)返回下一个未Raid的StripeInputInfo:

1
2
3
4
5
public StripeInputInfo getNextStripeInputs() throws IOException {
StripeInputInfo stripeInputInfo = getStripeInputs(currentStripeIdx);
currentStripeIdx ++;
return stripeInputInfo;
}

getStripeInputs为抽象方法,对应FileStripeReader和DirectoryStripeReader有相应实现.

8.1 FileStripeReader获取下一个StripeInputInfo

对应于上面创建的FileStripeReader,即文件Raid.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//一个stripe的输入流,偏移量以及文件路径
long stripeStartOffset = stripeIdx * codec.stripeLength * blockSize;
for (int i = 0; i < codec.stripeLength; i++) {

long seekOffset = stripeStartOffset + i * blockSize;
if (seekOffset < srcSize) {

FSDataInputStream in = fs.open(srcFile, bufferSize);
in.seek(seekOffset);

LOG.info("Opening stream at " + srcFile + ":" + seekOffset);
blocks[i] = in;
srcPaths[i] = srcFile;
offsets[i] = seekOffset;
} else {
LOG.info("Using zeros at offset " + seekOffset);
// We have no src data at this offset.
blocks[i] = new RaidUtils.ZeroInputStream(
seekOffset + blockSize);
srcPaths[i] = null;

offsets[i] = -1;
}
}
return new StripeInputInfo(blocks, srcPaths, offsets);

stripeIdx为传入的currentStripeIdx,文件Raid中stripeLength个输入流对应的文件只有一个,可见根据currentStripeIdx,stripeLength和区块大小便能定位当前stripe文件中的开始偏移量,而一个stripe有stripeLength个Blocks,每个Block都能计算出一个开始偏移量,得到相应的流.
若文件剩余大小满足stripe需求,不需要填充0Block,则所有的输入流都为文件对应的流,srcPath也为文件,偏移量为相应的偏移量.
而如果需要填充0Block,则达到文件尾端后,剩余的输入流为ZeroInputStream,对应的srcPath为null,偏移量为-1.

8.2 DirectoryStripeReader获取下一个StripeInputInfo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//通过stripeBlocks获取区块所在文件
int startOffset = (int)stripeIdx * codec.stripeLength;
for (int i = 0; i < codec.stripeLength; i++) {
if (startOffset + i < this.stripeBlocks.size()) {
BlockInfo bi = this.stripeBlocks.get(startOffset + i);
FileStatus curFile = lfs.get(bi.fileIdx);
long seekOffset = bi.blockId * curFile.getBlockSize();
Path srcFile = curFile.getPath();
FSDataInputStream in = fs.open(srcFile, bufferSize);
in.seek(seekOffset);
LOG.info("Opening stream at " + srcFile + ":" + seekOffset);
blocks[i] = in;
srcPaths[i] = srcFile;
offsets[i] = seekOffset;
} else {
LOG.info("Using zeros at block " + i);
// We have no src data at this offset.
blocks[i] = new RaidUtils.ZeroInputStream(parityBlockSize);
srcPaths[i] = null;
offsets[i] = -1;
}
}
return new StripeInputInfo(blocks, srcPaths, offsets);

这里与FileStripeReader不同的是,目录Raid的一个stripe可能跨越多个该目录下的文件,因此需要定位每一个输入流所属的文件.
构造过程中已经看到,成员stripeBlocks记录了目录下所有的区块和其对应的文件信息.
stripeIdx为传入的currentStripeIdx,根据currentStripeIdx和stripeLength便可得到当前stripe在目录下的开始区块索引号startOffset.

有了stripe的开始区块索引号startOffset,便能得到stripe中第i个区块在目录下的区块索引号,若索引号没有超出stripeBlocks的大小(即存在于目录下),我们便能通过stripeBlocks和lfs成员得知该区块所属的文件,进而得知在文件中的偏移量,并构建输入流对象.
而若当前stripe的第i个区块超出了stripeBlocks的大小,即不存在于目录下,需要填充0 Block,对应的输入流为ZeroInputStream,偏移量为-1,文件路径为null,这与FileStreamReader一样.

输入流构造完成,接着便是局部编码的过程,在while循环中可以重试3次:

1
2
3
4
5
6
7
8
9
10
int retry = 3;
do {
redo = false;
retry --;
...
if (redo) {
// rebuild the inputs.
stripeInputInfo = sReader.getStripeInputs(currentStripeIdx);
}
} while (redo);

其中redo在编码过程出错时,catch中如果retry还大于0,则重置为true,否则retry小于等于0,抛出异常.

1
2
3
4
if (retry <= 0) {
throw e;
}
redo = true;

另外,在catch中发现区块丢失等异常情况,要构建CorruptBlockReconstructor和Decoder进行修复,具体修复在后面BlockIntegrityMonitor中分析.

那么,do while中间过程的具体的编码过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
try {
blocks = stripeInputInfo.getInputs();
CRC32[] curCRCOuts = new CRC32[codec.parityLength];

if (crcOuts != null) {
for (int i = 0; i < codec.parityLength; i++) {
crcOuts[finishedParityBlockIdx + i] = curCRCOuts[i]
= new CRC32();
}
}
// Create output streams to the temp files.
for (int i = 0; i < codec.parityLength - 1; i++) {
tmpOuts[i + 1] = new FileOutputStream(tmpFiles[i]);
}
// Call the implementation of encoding.
encodeStripe(blocks, blockSize, tmpOuts, curCRCOuts, reporter,true, errorLocations);
}

即首先获取输入流数组blocks,如果支持ChecksumStore的话,crcOuts为expectedPartialParityBlocks个CRC32数组.
然后对之前创建的临时输出文件构建输出流对象,最终通过encodeStripe进行编码.

9. encodeStripe

函数原型为:

1
2
void encodeStripe(InputStream[] blocks,long blockSize,OutputStream[] outs,CRC32[] crcOuts,Progressable reporter,
boolean computeSrcChecksum,List<Integer> errorLocations)

blocks为stripeLength个输入流数组,blockSize为输入流中一个Block的大小,outs对应parityLength个输出流;
computeSrccChecksum用于决定是否计算源文件区块的校验和.编码过程为true(通过encodeFileToStream调用),即对每一个源文件区块计算校验和更新至Encoder的ChecksumStore和校验和本地目录中;而对校验文件的修复为false(通过recoverParityBlockToStream),不重新计算源文件区块校验和.
errorLocations对应出错的位置,对应完成一个stripe的编码过程,将stripeLength个原Blocks编码成parityLength个校验Blocks.

首先配置Encoder的writeBufs(输出缓冲),每个缓冲大小为bufSize,通过raid.encoder.bufsize配置.
如果bufSize大于Block大小,则为Block大小.小于Block大小时,如果不能被Block大小整除,则配置为Block大小的1/256,最小1KB,因为在后面可以看到,每次取出bufSize,如果配置能被区块大小整除,则可以刚好取完,每次取出的长度一样:

1
configureBuffers(blockSize);

然后,对stripeLength个输入流构建一个ParallelStreamReader对象,并启动读取缓冲的线程(默认4个).

1
2
3
4
5
6
int boundedBufferCapacity = 1;
ParallelStreamReader parallelReader = new ParallelStreamReader(
reporter, blocks, bufSize,
parallelism, boundedBufferCapacity, blockSize, computeSrcChecksum,

outs);
parallelReader.start();

ParallelStreamReader启动多个线程(默认4)并行从stripeLength个输入流中读取数据,每次读取bufSize个字节的数据存入一个ReadResult对象中,将每次读取结果(一个ReadResult)放入阻塞队列中,等待从中读取进行编码.因此最小的编码单元为bufSize,详细ParallelStreamReader分析见下.

ParallelStreamReader构建完,并启动主线程后,开始编码过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
for (long encoded = 0; encoded < blockSize; encoded += bufSize) {
ParallelStreamReader.ReadResult readResult = null;
try {
readResult = parallelReader.getReadResult();//从阻塞队列中读取一个ReadResult局部缓冲
} catch (InterruptedException e) {
throw new IOException("Interrupted while waiting for read result");
}
// Cannot tolerate any IO errors.
IOException readEx = readResult.getException();
if (readEx != null) {
if (errorLocations != null) {
errorLocations.clear();
for (int idx : readResult.getErrorIdx()) {
errorLocations.add(idx);
}
}
throw readEx;
}

code.encodeBulk(readResult.readBufs, writeBufs);
reporter.progress();
// Assume each byte is independently encoded
int toWrite = (int)Math.min(blockSize - encoded, bufSize);

// Now that we have some data to write, send it to the temp files.
for (int i = 0; i < codec.parityLength; i++) {
outs[i].write(writeBufs[i], 0, toWrite);//编码结果写到输出流
if (crcOuts != null && crcOuts[i] != null) {//更新校验文件区块对应的校验和数据
crcOuts[i].update(writeBufs[i], 0, toWrite);
}
reporter.progress();
}
}

以上循环对应的一个Stripe的实际编码过程,从循环看出每次编码bufSize个字节,这是因为从输入流中读取的数据保存在ReadResult的成员属性readBufs中,它对应stripeLength个数组,每个数组存储bufSize个输入流的字节.

1
readResult = parallelReader.getReadResult();

从阻塞队列中取出一个ReadResult,可能阻塞等待ReadOperation线程读取完成.
获取一个ReadResult后,通过Encoder的code成员(擦除码实现,配置)的encodeBulk完成对bufSize个字节的编码.

1
code.encodeBulk(readResult.readBufs, writeBufs);

将stripeLength个长度为bufSize的源数据编码成parityLength个长度为bufSize的校验数据,写到Encoder的writeBufs成员中.
最后将writeBufs成员中数据分别写到输入流outs中,且如果支持ChecksumStore,将编码结果更新至parityLength个CRC32对象中计算相应校验和.

1
2
3
4
5
6
7
for (int i = 0; i < codec.parityLength; i++) {
outs[i].write(writeBufs[i], 0, toWrite);
if (crcOuts != null && crcOuts[i] != null) {
crcOuts[i].update(writeBufs[i], 0, toWrite);
}
reporter.progress();
}

这样,循环结束便完成了一个stripe的编码.

10. 函数调用链(从上到下为先后调用顺序):

  • raidFiles(info, filteredPaths);
    处理当前PolicyInfo info的所有可Raid路径filteredPaths.
  • doRaid(conf, info, lec);
    info的可Raid路径转换为EncodingCandidate列表lec,处理所有的EncodingCandidate.
  • doRaid(conf, ec, destPref, codec, statistics, RaidUtils.NULL_PROGRESSABLE, doSimulate, targetRepl, metaRepl);
    处理一个EncodingCandidate—ec.
    destPref为codec配置parity_dir.
    doSimulate为policy的配置属性simulate,默认false,false时编码完成将更改源文件备份数.
    targetRepl为编码后源文件备份.
    metaRepl为编码文件备份.
  • doFileRaid(conf, ec, destPath, codec, statistics, reporter, doSimulate, targetRepl, metaRepl);
    上面doRaid的文件Raid分支,对应目录Raid为doDirRaid,参数见上一个方法.
  • generateParityFile(conf, ec, targetRepl, reporter, srcFs, destPath, codec, locations.length, stat.getReplication(),metaRepl, stat.getBlockSize(), null);
    增加了更多的该EncodingCandidate对应的信息,用于判断是否完成path所有的编码.对应的还是一个EncodingCandidate的编码.
  • encoder.encodeFile(conf, inFs, outFs, outpath, (short)metaRepl, numStripes, blockSize, reporter, sReader, ec);
    这里输出路径outpath不再是配置项parity_dir,而是最终的输出文件路径:${parity_dir}/srcpath.
    sReader可能对应FileStreamReaderDirectoryStreamReader.
    该方法还是对应一个EncodingCandidate的编码,只不过基于原来EncodingCandidate的信息构建了StripeReader对象,用于获取源文件的输入流.
  • encodeTmpParityFile(jobConf, sReader, dfs, partialTmpParity, parityFile, tmpRepl, blockSize, expectedPartialParityBlocks, expectedPartialParityFileSize, reporter)
    对应一个StripeReader(即一个EncodingCandidate)的编码.
    partialTmpParity为该sReader输出路径,即一个局部校验文件,路径为:
    /tmp/${parity_dir}/${jobID}/${partialParityName}/tmp/${currentStripeID}.
    其中jobID可通过mapred.job.id配置,默认为localRaid+当前时间(date).
    partialParityNamepartial_ + MD5Hash.digest(srcFile.toUri().getPath()) +"_" + ec.srcStat.getModificationTime() + "_" + ec.encodingUnit + "_" +ec.encodingId
    即默认情况下,一个path对应一个局部临时校验目录,该目录下的tmp目录存储该path的所有局部校验文件,文件名为对应StripeReader的currentStripeID,校验文件数目为path对应的EncodingCandidate数目.
    当一个path的所有EncodingCandidate编码完成后,全部的局部校验文件位于tmp目录下,然后重命名为tmp目录的兄弟目录final,在final目录中将所有局部校验文件追加到第一个局部校验文件尾部,形成完整的校验文件,然后重命名为encodeFile的输入参数outpath,即校验文件输出路径,完成一个Path的Raid过程.

    举例:
    一个文件Raid的文件,采用(10,4)rs编码,若该文件有40个Blocks,且encodingUnit采用默认配置1.
    则一个EncodingCandidate以及一个StripeReader都只对应一个stripe即管理10个Block,因此该文件对应4个EncodingCandidate和StripeReader,
    tmp录下应该有4个局部校验文件,文件名分别为0~3,通过encodeTmpParityFile生成(编码时先输出到另一个临时文件,局部编码完成后重命名).
    当所有4个局部校验文件都产生后(encodeTmpParityFile一次调用产生一个文件,finishAllPartialEncoding方法中等待期望的校验文件都产生,
    见上分析),tmp目录重命名为final(状态为isRenamed).
    然后在final中将1~3局部校验文件依次追加到0中,追加后的文件则为最终的校验(状态变为isConcated),最终将final的唯一一个文件重命名为校验文件
    (${parity-dir}/srcpath),完成path的编码过程.
    而对应设置encodingUnit为2时,tmp目录下应该只有2个局部校验文件(此时一个EncodingCandidate和StripeReader管理两个stripe(20个Block),
    encodeTmpParityFile编码两个stripe的区块),文件名分别为0,2(不是1),按上述步骤经过重命名—追加—重命名完成编码过程.

  • encodeFileToStream(sReader, blockSize, out, crcOuts, reporter);
    通过从StripeReader中依次读取encodingUnit个stripe完成编码过程.
    out为另一个临时文件对应的输出流,位置
    /tmp/${parity_dir}/${jobID}/currentStripeID+random
    先将编码结果存在这一临时输出流中,sReader所有数据编码完成后,重命名为tmp目录下对应的currentStripeID文件,完成局部校验文件的生成.

  • encodeStripe(blocks, blockSize, tmpOuts, curCRCOuts, reporter, true, errorLocations);
    对应一个stripe的编码过程,即StripeReader将执行encodingUnit次.
    blocksstripe个区块对应的输入流.
    tmpOuts第一个输出流为encodeFileToStream构建的临时文件流,剩余的parityLength-1个输出流为临时文件(parity_i)输出流.
    函数通过构建ParallelStreamReader,创建多个线程并行读取数据到阻塞队列中,然后从阻塞队列取数据进行编码.
  • code.encodeBulk(readResult.readBufs, writeBufs);
    最终的最小编码单元,编码bufSize个字节的数据.
    对应codec配置的擦除码实现,上一级函数中通过ParallelStreamReader并行读取,每次读取bufSize个字节的数据存放在一个ReadResult中,该函数便是处理一个ReadResult的数据.编码结果存在Encoder的缓冲writeBufs中,每次bufSize个字节的数据编码完成,写到输出流tmpOuts中.

11. 一段话总结(以下说的路径可能为文件或目录)

TriggerMonitor执行完所有文件Raid操作后,默认每隔10s读取一次policy文件.如果policy文件自从上次重新加载后修改过,且当前时间从修改后过去了5s,则重新加载policy文件.

加载后,对每个policy,如果配置的路径需要重新读取进行Raid(条件见具体分析),则对与存在fileListPath元素的policy通过readFileList读取可Raid的路径列表,否则对于存在srcPath的policy,通过selectFiles读取可Raid的路径列表(是否可Raid见具体分析).
读取完可Raid的路径后,通过raidFiles对当前policy所有可Raid路径进行Raid.

Raid过程,对于每一个路径,首先根据路径的Blocks数,这里假设为nums,和codec的配置以及encodingUnit的配置创建n个EncodingCandidate.
n=ceil(nums/stripeLength/encodingUnit)
即一个路径最少对应一个EncodingCandidate,只要其Blocks数大于2,一个路径拥有一个encodingID,所有其拥有的EncodingCandidate共享.

接着对于路径的每一个EncodingCandidate,执行编码操作.

path的编码输出路径为”${parity_dir}/path”.不过因为一个path可能对应多个EncodingCandidate,先在临时目录下进行局部编码,临时目录路径为:
/tmp/${parity_dir}/${jobID}/${partialParityName}/tmp

每个EncodingCandidate在该目录下编码生成一个局部校验文件,文件名为EncodingCandidate相关的StripeID,所有n个局部校验文件都生成后,将tmp目录重命名为兄弟目录final,在final目录中将所有的局部校验文件连接成一个文件,然后重命名为最终的输出路径,即完成编码过程.
而其实每一个局部校验文件都是先在另外一个临时目录下编码生成,然后在重命名为tmp目录下的文件.

而对于局部校验文件的生成,一个EncodingCandidate会创建相应的文件或目录StripeReader,即FileStripeReader或DirectoryStripeReader,用于创建相应的输入流读取数据.一个EncodingCandidate或StripeReader对应encodingUnit个stripe,一个stripe的编码操作在函数encodeStripe中完成.

每一个Stripe创建一个ParallelStripeReader,在其中创建多个线程从stripeLength个输入流中并行读取数据,每次读取bufSize个字节的数据存放在一个ReadResult对象中,stripeLength个输入流数据对应的所有ReadResult存放在阻塞队列中,依次从阻塞队列中取出ReadResult进行编码,最终最小的编码函数对应于:

1
code.encodeBulk(readResult.readBufs, writeBufs);

为具体的擦除码实现,可继承ErasureCode完成相应的编码函数实现其他不同于rs xor的编码方式.