Hadoop版本:hadoop-20-master
LocalRaidNode编码操作通过raidFiles完成,函数原型如下:1
2
3
4
5
6@Override
void raidFiles(PolicyInfo info, List<FileStatus> paths) throws IOException {
//分割paths,创建EncodingCandidate列表
List<EncodingCandidate> lec = RaidNode.splitPaths(conf, Codec.getCodec(info.getCodecId()), paths);
doRaid(conf, info, lec);
}
传入参数中info
为policy文件中的一个policy,paths为该policy下可以Raid的路径列表(这里可能为文件或叶子目录),具体见上篇Raid文件的选取.
其中,splitPaths根据配置参数,将待Raid的文件列表转换成EncodingCandidate列表:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25for (FileStatus s : paths) {
if (codec.isDirRaid != s.isDir()) {
continue;
}
long numBlocks = 0L;
if (codec.isDirRaid) {
List<FileStatus> lfs = RaidNode.listDirectoryRaidFileStatus(
conf, srcFs, s.getPath());
if (lfs == null) {
continue;
}
for (FileStatus stat : lfs) {
numBlocks += RaidNode.numBlocks(stat);
}
} else {
numBlocks = RaidNode.numBlocks(s);
}
long numStripes = RaidNode.numStripes(numBlocks, codec.stripeLength);
String encodingId = System.currentTimeMillis() + "." + rand.nextLong();
for (long startStripe = 0; startStripe < numStripes;
startStripe += encodingUnit) {
lec.add(new EncodingCandidate(s, startStripe, encodingId, encodingUnit,
s.getModificationTime()));
}
}
对于路径和codec不匹配的,不予处理.然后统计每个path的区块数(path可能为文件或目录).
对于目录path,通过listDirectoryRaidFileStatus统计目录下所有的Blocks,简单的对目录下所有文件(目录下的目录不处理)对应的区块相加.
不过对于小于hdfs.raid.min.filesize
(默认10KB)的文件不进行统计.
而对于文件path,则直接是该文件对应的Block数目.
有了path的Block数目,根据codec的stripeLength,便能得到该path对应的stripe数目numStripes
.
然后,创建numStripes/encodingUnit
个EncodingCandidate
对象.这里encodingUnit
为配置项hdfs.raid.stripe.encoding
默认为1.即一个EncodingCandidate对应一个stripe,例如对于(10,4)rs码来说,默认情况下,一个EncodingCandidate负责编码10个区块.
从上面代码可以看出,对于待Riad列表来说,一个path对应一个encodingId,ID号为当前时间加上一个随机数,不管path下能够对应几个stripe.
如果path(文件或目录)对应Block数目过多,需多个EncodingCandidate进行处理,则这几个EncodingCandidate共享这一个encodingId,以进行辨识.
另外,从上代码可以看出,不管path下面的Block数目怎么少(最小2个Block),它至少对应一个EncodingCandidate.如对于(10,4)rs码,如果一个path的Block只有4个,它也单独需要一个EncodingCandidate进行处理,剩下的6个Block在后面会看到将用0Block进行填充.
这主要说明了文件Riad和目录Raid的含义,文件Raid编码独立单元即为文件,即使它再小也不能和其他小文件进行拼接一起编码.若文件对应Block很多,则分别由不同的EncodingCandidate进行处理,一个EncodingCandidate可以处理的Block数依赖于stripeLength和encodingUnit,一个文件的
所有的EncodingCandidate共享一个encodingId.
目录Raid编码独立单元为目录,目录下面Block数目再小也不能和其他目录进行拼接一起编码,一个目录的所有EncodingCandidate共享一个encodingId.
接着,由doRaid(conf, info, lec);
对EncodingCandidate列表进行Raid操作.1
2
3
4
5
6
7
8
9for (EncodingCandidate ec : paths) {//对每一个EncodingCandidate依次编码
doRaid(conf, ec, destPref, codec, statistics,
RaidUtils.NULL_PROGRESSABLE, doSimulate,
targetRepl, metaRepl);
if (count % 1000 == 0) {
LOG.info("RAID statistics " + statistics.toString());
}
count++;
}
对每一个EncodingCandidate一次执行doRaid(conf, ec, destPref, codec, statistics,RaidUtils.NULL_PROGRESSABLE, doSimulate,targetRepl, metaRepl);
.
这里destPref
为配置的codec的校验文件存放路径parity_dir
,dosimulate
为policy中的属性simulate
,默认为false.targetRepl
为policy属性targetReplication
,metaRepl
为policy属性metaReplication
.
然后,在函数中,分别对目录Raid和文件Raid两种情况调用不同子函数:1
2
3
4
5
6
7if (codec.isDirRaid) {//不同分支
result = doDirRaid(conf, ec, destPath, codec, statistics, reporter,
doSimulate, targetRepl, metaRepl);
} else {
result = doFileRaid(conf, ec, destPath, codec, statistics, reporter,
doSimulate, targetRepl, metaRepl);
}
两个函数将ec管理的文件或目录下的所有文件编码至destPath中,输出路径为destPath/srcPath,并根据doSimulate更新源区块的备份数.
1. doFileRaid
在doFileRaid中,首先检查path的区块是否大于2,这个之前其实已经检查过了:1
2
3
4
5BlockLocation[] locations = srcFs.getFileBlockLocations(stat, 0, stat.getLen());
// if the file has fewer than 2 blocks, then nothing to do
if (locations.length <= 2) {
return LOGRESULTS.NOACTION;
}
如果区块数满足要求,则生成校验文件,并根据doSimulate
决定是否更正源文件的备份数,其中还有统计信息的更新,这里不分析:1
2
3
4
5
6
7
8
9
10
11
12
13
14//子函数
boolean parityGenerated = generateParityFile(conf, ec, targetRepl, reporter,
srcFs, destPath, codec, locations.length, stat.getReplication(),
metaRepl, stat.getBlockSize(), null);
if (!parityGenerated) {
return LOGRESULTS.NOACTION;
}
if (!doSimulate) {
if (srcFs.setReplication(p, (short)targetRepl) == false) {
LOG.info("Error in reducing replication of " + p + " to " + targetRepl);
statistics.remainingSize += diskSpace;
return LOGRESULTS.FAILURE;
};
}
2. doDirRaid
doDirRaid同样的首先计算目录path下的区块数,判断是否大于2,不过因为是目录,需要统计目录下所有文件的区块总和:1
2
3
4
5
6
7
8
9
10List<FileStatus> lfs = RaidNode.listDirectoryRaidFileStatus(conf, srcFs, p);
if (lfs == null) {
return LOGRESULTS.NOACTION;
}
// add up the total number of blocks in the directory
long blockNum = DirectoryStripeReader.getBlockNum(lfs);
// if the directory has fewer than 2 blocks, then nothing to do
if (blockNum <= 2) {
return LOGRESULTS.NOACTION;
}
同样的区块数满足要求,生成校验文件,并根据doSimulate
是否更新目录下所有源文件的备份数,更新统计信息:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18try {
parityGenerated = generateParityFile(conf, ec, targetRepl, reporter, srcFs,
destPath, codec, blockNum, srcRepl, metaRepl, parityBlockSize, lfs);
} catch (InterruptedException e) {
throw new IOException (e);
}
if (!parityGenerated)
return LOGRESULTS.NOACTION;
if (!doSimulate) {
for (FileStatus fsStat: lfs) {
if (srcFs.setReplication(fsStat.getPath(), (short)targetRepl) == false) {
LOG.info("Error in reducing replication of " +
fsStat.getPath() + " to " + targetRepl);
statistics.remainingSize += diskSpace;
return LOGRESULTS.FAILURE;
}
};
}
由上可见,generateParityFile
文件Raid和目录Raid共用,区别在于最后一个参数,文件Raid为null,而目录Raid为目录下的文件列表.
其中locations
为原path(文件或目录)所有的Blocks,而不是参数ec对应的Blocks.
3. generateParityFile
首先,获取编码输出路径,generateParityFile
的destPath输入参数为codec配置的parity_dir
参数,而对于一个EncodingCandidate
来说,它的输出路径通过getOriginalParityFile
获得:1
2
3FileStatus stat = ec.srcStat;
Path inpath = stat.getPath();
Path outpath = getOriginalParityFile(destPathPrefix, inpath);//配置的校验路径+源路径为最终输出路径
inpath
为null或其父目录为null,则输出路径直接为parity_dir
,否则输出路径为parity_dir/inpath
,这个从编码测试结果可以验证.
然后,要判断是否已经编码过了,即输出路径是否存在校验文件,且源文件备份数已做出相应修改:1
2
3
4
5
6
7
8
9//是否已编码过
FileStatus stmp = outFs.getFileStatus(outpath);
if (stmp.getModificationTime() == stat.getModificationTime() &&
srcRepl == targetRepl) {
LOG.info("Parity file for " + inpath + "(" + blockNum +
") is " + outpath + " already upto-date and " +
"file is at target replication . Nothing more to do.");
return false;
}
没有编码过才能继续进行,创建Encoder对象,并验证ChecksumStore和StripeStore是否符合要求:1
2Encoder encoder = new Encoder(conf, codec);
encoder.verifyStore();//创建相应的ChecksumStore和StripeStore
Encoder构造见后面.编码器构造完成后,根据目录Raid或文件Raid执行不同流程:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24if (codec.isDirRaid) {
long numStripes = (blockNum % codec.stripeLength == 0) ?
(blockNum / codec.stripeLength) :
((blockNum / codec.stripeLength) + 1);
//目录Raid创建DirectoryStripeReader,执行编码
sReader = new DirectoryStripeReader(conf, codec, inFs,
ec.startStripe, ec.encodingUnit, inpath, lfs);
parityGenerated = encoder.encodeFile(conf, inFs, outFs, outpath,
(short)metaRepl, numStripes, blockSize, reporter, sReader, ec);
} else {
FileStatus srcStat = inFs.getFileStatus(inpath);
long srcSize = srcStat.getLen();
long numBlocks = (srcSize % blockSize == 0) ?
(srcSize / blockSize) :
((srcSize / blockSize) + 1);
long numStripes = (numBlocks % codec.stripeLength == 0) ?
(numBlocks / codec.stripeLength) :
((numBlocks / codec.stripeLength) + 1);
//文件Raid创建FileStripeReader执行编码
sReader = new FileStripeReader(conf, blockSize,
codec, inFs, ec.startStripe, ec.encodingUnit, inpath, srcSize);
parityGenerated = encoder.encodeFile(conf, inFs, outFs, outpath,
(short)metaRepl, numStripes, blockSize, reporter, sReader, ec);
}
两者都先计算该EncodingCandidate对应原path的Blocks数目,用于在子函数中判断是否完成该path的所有Raid操作.
然后对于目录Raid创建DirectoryStripeReader,文件Raid创建FileStripeReader,一个EncodingCandidate对应一个StripeReader.
最后通过刚创建的Encoder进行编码.
4. 文件Raid创建FileStripeReader
1 | super(conf, codec, fs, stripeStartIdx); |
调用父类构造函数主要初始化开始stripeID stripeStartIdx
,和当前stripeID currentStripeIdx
.1
2
3
4
5
6this.codec = codec;
this.conf = conf;
this.fs = fs;
this.stripeStartIdx = stripeStartIdx;
this.currentStripeIdx = stripeStartIdx;
this.bufferSize = conf.getInt("io.file.buffer.size", 64 * 1024);
计算了文件的区块数目,根据开始stripeID,编码单元encodingUnit和区块数目计算了该StripeReader对应的结束stripeID stripeEndIdx
.
5. 目录Raid创建DirectoryStripeReader
1 | super(conf, codec, fs, stripeStartIdx); |
同样的先跟据传入的参数调用父类构造函数初始化stripeStartIdx
和currentStripeIdx
.
目录Raid下可能有多个文件,与文件Raid不同,需要遍历目录下所有文件,取最大的Block大小作为校验文件的Block大小.
相比较FileStripeReader,维护了目录下所有文件状态列表lfs
,并记录了目录下所有的文件区块和文件索引对应的关系列表stripeBlocks
.stripeBlocks
中元素数量为目录下所有区块数量,用于在之后构建stripe的输入流时寻找对应的区块.每个元素为BlockInfo类型的对象,维护了一个区块和其所属文件的对应关系.例如,目录下有两个文件,每个文件5个Block,则stripeBlocks中维护了如下的映射关系:(0,0),(0,1),(0,2),(0,3),(0,4),(1,0),(1,1),(1,2),(1,3),(1,4)
最后当然也计算stripeEndIdx
.
6. encodeFile
Encoder的encodeFile
也是由目录Raid和文件Raid共用,完成一个StripeReader(EncodingCandidate)的编码过程.
以下为该方法的注释:1
2
3
4
5/**
* The interface to use to generate a parity file.
* This method can be called multiple times with the same Encoder object,
* thus allowing reuse of the buffers allocated by the Encoder object.
*
意思是说,对于一个独立的编码单元(文件Raid的文件或目录Raid的一个目录,这里记为一个路径path)来说,如果它对应多个EncodingCandidate,需多次执行encodeFile,该path就存在三个Raid状态:1
2
3
4
5
6
7
8
9/**
* To support retriable encoding, we use three checkpoints to represent
* the last success state.
* 1. isEncoded: Set to true when partial partiy is generated
* 2. isRenamed: Set to true when all partial parity are generated and
* tmpPartialParityDir is moved to finalPartialParityDir
* 3. isConcated: Set to true when partial parities are concatenated into
* a final parity.
*/
记录Raid过程中的状态,这样允许在某一个EncodingCandidate编码失败时进行重试.
函数中存在以下局部变量:1
2
3long expectedParityFileSize = numStripes * blockSize * codec.parityLength;//该路径所期望的校验文件大小
long expectedPartialParityBlocks = (sReader.stripeEndIdx - sReader.stripeStartIdx) * codec.parityLength;
long expectedPartialParityFileSize = expectedPartialParityBlocks * blockSize;//该EncodingCandidate局部校验文件大小
其中expectedParityFileSize
记录path所期望的校验文件大小,expectedPartialParityBlocks
为当前EncodingCandidate所期望的区块数目,称为局部校验区块数,expectedPartialParityFileSize
为当前EncodingCandidate期望的校验文件大小,为局部校验文件.
说明:
对于文件或目录来说,其所属的Block可能需要多个EncodingCandidate(StripeReader)进行处理,因此才有上面的变量.第一个变量用于判断是否完成了所有的编码,而后面的两个分别对应该次EncodingCandidate编码所应该产生的区块大小和文件大小.
另外,之后所说的path都对应一个文件或叶子目录,即需要编码的单元.
编码过程有以下临时目录:1
2
3
4
5
6
7
8String jobID = RaidNode.getJobID(jobConf);
Path tmpDir = new Path(codec.tmpParityDirectory, jobID);
String partialParityName = "partial_" + MD5Hash.digest(srcFile.toUri().getPath()) +
"_" + ec.srcStat.getModificationTime() + "_" + ec.encodingUnit + "_" +
ec.encodingId;
Path partialParityDir = new Path(tmpDir, partialParityName);
Path tmpPartialParityDir = new Path(partialParityDir, "tmp");
Path finalPartialParityDir = new Path(partialParityDir, "final");
jobID
通过mapred.job.id
设置,若不存在,则为localRaid
+date
(当前时间,ms).
因此,对于一个path来说,其临时编码路径为:/tmp/${parity_dir}/jobID
.
临时路径下存在一个局部校验路径:partialParityDir
,而该目录下又存在tmp
和final
两个目录.
当前EncodingCandidate还没编码完成时,在tmp
目录下创建属于该EncodingCandidate的临时文件,进行局部编码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18if (!ec.isConcated) {
if (!ec.isEncoded) {
if (!parityFs.mkdirs(tmpPartialParityDir)) {
throw new IOException("Could not create " + tmpPartialParityDir);
}
Path partialTmpParity = new Path(tmpPartialParityDir,
Long.toString(sReader.getCurrentStripeIdx()));
LOG.info("Encoding partial parity " + partialTmpParity);
//临时校验文件编码
if (!encodeTmpParityFile(jobConf, sReader, dfs,
partialTmpParity, parityFile, tmpRepl, blockSize,
expectedPartialParityBlocks, expectedPartialParityFileSize,
reporter)) {
return false;
}
LOG.info("Encoded partial parity " + partialTmpParity);
}
...
一个EncodingCandidate对应的临时校验文件名与stripeID对应.
通过encodeTmpParityFile进行局部校验文件的编码操作,完成一个EncodingCandidate的编码过程.
输入参数partialTmpParity为该EncodingCandidate对应的临时校验文件,也即该函数的编码结果输出文件.
expectedPartialParityBlocks为parityLength*encodingUnit个Blocks.
局部校验文件编码成功,则当前EncodingCandidate状态isEncoded
为true,然后等待当前EncodingCandidate所属文件或目录的其他EncodingCandidate局部编码完成,都编码完成后将tmp目录重命名为final目录,置isRenamed
为true.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20ec.isEncoded = true;
long expectedNum = (long) Math.ceil(numStripes * 1.0 / ec.encodingUnit);
if (!ec.isRenamed) {
//判断并等待其他局部编码完成
if (!finishAllPartialEncoding(parityFs, tmpPartialParityDir, expectedNum)) {
return false;
}
InjectionHandler.processEventIO(
InjectionEvent.RAID_ENCODING_FAILURE_RENAME_FILE);
// Move the directory to final
//所有局部编码完成,重命名,进入到Renamed状态
if (!dfs.rename(tmpPartialParityDir, finalPartialParityDir)) {
LOG.info("Fail to rename " + tmpPartialParityDir + " to " +
finalPartialParityDir);
return false;
}
LOG.info("Renamed " + tmpPartialParityDir + " to " +
finalPartialParityDir);
ec.isRenamed = true;
}
状态还不是isRenamed,即表示还没有完成所有的局部编码,通过finishAllPartialEncoding
判断并等待其他局部编码完成.
该函数中最多判断raid.encoder.retry.count.partial.encoding
(默认3)次,看是否其他的局部编码完成,最后一次休眠等待.
重命名后,说明所有的局部编码完成,此时tmp目录重命名为final,final目录下含有所需的所有局部校验文件.
然后需要判断final目录下的文件是否正确(文件名和encodingUnit,文件大小和期望大小),正确的话列出所有文件,然后将除第一个文件外的其他所有文件依次追加到第一个文件末尾(第一个文件startstripe最小),完成concate的过程,并置isConcated
为true:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24FileStatus[] stats = parityFs.listStatus(finalPartialParityDir);
// Verify partial parities are correct
Vector<Path> partialPaths = getPartialPaths((int)ec.encodingUnit,
(int)expectedNum, stats, codec, numStripes);
finalTmpParity = partialPaths.get(0);
InjectionHandler.processEventIO(
InjectionEvent.RAID_ENCODING_FAILURE_CONCAT_FILE);
if (partialPaths.size() > 1) {
Path[] restPaths = partialPaths.subList(1,
partialPaths.size()).toArray(new Path[partialPaths.size() - 1]);
try {
// Concat requires source and target files are in the same directory
//连接其他局部校验文件到第一个局部校验文件尾,进入isConcated状态
dfs.concat(finalTmpParity, restPaths, true);
LOG.info("Concated " + partialPaths.size() + " files into " + finalTmpParity);
} catch (IOException ioe) {
// Maybe other tasks already finish concating.
LOG.info("Fail to concat " + partialPaths.size() +
" files into " + finalTmpParity, ioe);
throw ioe;
}
}
ec.isConcated = true;
至此,原path(一个文件或目录)编码全部完成,编码结果存在一个文件中,文件位于临时目录下.
接着便对最终的编码文件作合理性判断(区块数和大小),若符合要求,则重命名为最终的编码文件.
相应的,若支持StripeStore,则还需存储stripes1
2
3
4
5
6
7
8
9
10
11
12
13if (dfs.exists(parityFile)){
dfs.delete(parityFile, false);
}
dfs.mkdirs(parityFile.getParent());
//final目录下的已经连接的文件重命名为最终输出路径文件,完成编码
if (!dfs.rename(finalTmpParity, parityFile)) {
String msg = "Unable to rename file " + finalTmpParity + " to " + parityFile;
throw new IOException (msg);
}
if (stripeStore != null) {
//更新StripeStore并创建相应文件
this.writeToStripeStore(ec.srcStripes, dfs, fs, srcFile, parityFs, parityFile, expectedParityFileSize, reporter,finalTmpParity);
}
writeToStripeStore
中通过CheckSumStore的putStripe
方法,将Stripe信息存放至Encoder的ChecksumStore中并在指定的目录创建相应的文件以及内容.
7. encodeTmpParityFile
encodeTmpParityFile编码局部校验文件,并不直接使用传进来的局部校验文件作为编码输出文件,而是另外创建了一个临时文件:1
2
3
4
5
6
7String jobID = RaidNode.getJobID(jobConf);
Path tmpDir = new Path(codec.tmpParityDirectory, jobID);
if (!parityFs.mkdirs(tmpDir)) {
throw new IOException("Could not create tmp dir " + tmpDir);
}
Path parityTmp = new Path(tmpDir, parityFile.getName() + rand.nextLong());
FSDataOutputStream out = parityFs.create(parityTmp,true,conf.getInt("io.file.buffer.size", 64 * 1024),tmpRepl,blockSize);
可见临时文件的目录为:/tmp/${parity_dir}/jobID
,文件名为局部校验文件名加一个随机数,以这个临时文件作为编码结果输出.
如果支持ChecksumStore,则需创建expectedPartialParityBlocks个CRC32对象,每一个Block对应一个,存储区块的校验和.1
2
3
4CRC32[] crcOuts = null;
if (checksumStore != null) {
crcOuts = new CRC32[(int)expectedPartialParityBlocks];
}
然后通过encodeFileToStream(sReader, blockSize, out, crcOuts, reporter);
进行编码,编码结果写入out即临时文件中,支持校验和的话,同时计算parity数据校验和存入crcOuts中.
如果一切正常,支持校验和的话,将得到的校验和存入ChecksumStore中,并创建配置的目录下的相应文件:1
2
3
4if (checksumStore != null) {
//更新校验文件区块校验和,并在本地目录创建相应文件
this.writeToChecksumStore((DistributedFileSystem)parityFs, crcOuts, parityTmp, expectedPartialParityFileSize, reporter);
}
writeToChecksumStore
中对每一个区块和对应校验和写到checksumstore中,putIfAbsentChecksum
还会通过putIfAbsent
建相应的文件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17LocatedBlocks lbks = dfs.getLocatedBlocks(parityTmp, 0L,
expectedParityFileSize);
for (int i = 0; i < crcOuts.length; i++) {
this.checksumStore.putIfAbsentChecksum(lbks.get(i).getBlock(),
crcOuts[i].getValue());
reporter.progress();
}
@Override
public Long putIfAbsent(Block blk, Long newChecksum) throws IOException {
Long oldChecksum = store.putIfAbsent(blk.toString(), newChecksum);
if (oldChecksum == null) {
LOG.info("Put " + blk + " -> " + newChecksum);
File newFile = getFile(blk, newChecksum);
newFile.createNewFile();
}
return oldChecksum;
}
最后将临时文件重命名为局部校验文件,完成局部校验的编码过程.1
2
3
4if (!parityFs.rename(parityTmp, partialTmpParity)) {
LOG.warn("Fail to rename file " + parityTmp + " to " + partialTmpParity);
return false;
}
8. encodeFileToStream
encodeFileToStream
完成一个StripeReader的编码,编码结果放在临时文件中(非上述tmp目录),根据需要计算校验文件的校验和,函数如下:1
private void encodeFileToStream(StripeReader sReader,long blockSize, FSDataOutputStream out, CRC32[] crcOuts,Progressable reporter)
在encodeFileToStream
中,首先创建parityLength个输出流,需要将stripeLength个输入流编码成parityLength个输出流.parityLength个输出流中,只有一个是输入参数out,也就是上面的临时文件流,其他的都是临时文件:1
2
3
4
5
6
7
8
9OutputStream[] tmpOuts = new OutputStream[codec.parityLength];
// One parity block can be written directly to out, rest to local files.
tmpOuts[0] = out;//第一个输出流为最终输出流,其他为临时文件输出流
File[] tmpFiles = new File[codec.parityLength - 1];
for (int i = 0; i < codec.parityLength - 1; i++) {
tmpFiles[i] = File.createTempFile("parity", "_" + i);
LOG.info("Created tmp file " + tmpFiles[i]);
tmpFiles[i].deleteOnExit();
}
接着便是对该StripReader所管理的stripes(stripe数取决于encodingUnit,见上)进行编码,如果当前StripeReader还有stripe未处理,则通过StripeReader取出下一个未处理的stripe:1
2
3
4
5
6
7while (sReader.hasNext()) {//依次编码StripeReader的encodingUnit个stripe
reporter.progress();
StripeInputInfo stripeInputInfo = null;
InputStream[] blocks = null;
// Create input streams for blocks in the stripe.
long currentStripeIdx = sReader.getCurrentStripeIdx();
stripeInputInfo = sReader.getNextStripeInputs();//获取下一个stripe输入信息
一个StripeInputInfo包含了一个stripe的信息,成员如下:1
2
3private final InputStream[] inputs;
private final Path[] srcPaths;
private final long[] offSets;
即包括了stripeLength个输入流(文件Raid中为文件对应的流和ZeroInputStream
,目录Raid中可能多个文件输入流和若干ZeroInputStream
),每个输入流对应的源文件和在源文件中的偏移量.
getNextStripeInputs
即根据StripeReader(文件或目录)返回下一个未Raid的StripeInputInfo:1
2
3
4
5public StripeInputInfo getNextStripeInputs() throws IOException {
StripeInputInfo stripeInputInfo = getStripeInputs(currentStripeIdx);
currentStripeIdx ++;
return stripeInputInfo;
}
getStripeInputs
为抽象方法,对应FileStripeReader和DirectoryStripeReader有相应实现.
8.1 FileStripeReader获取下一个StripeInputInfo
对应于上面创建的FileStripeReader,即文件Raid.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21//一个stripe的输入流,偏移量以及文件路径
long stripeStartOffset = stripeIdx * codec.stripeLength * blockSize;
for (int i = 0; i < codec.stripeLength; i++) {
long seekOffset = stripeStartOffset + i * blockSize;
if (seekOffset < srcSize) {
FSDataInputStream in = fs.open(srcFile, bufferSize);
in.seek(seekOffset);
LOG.info("Opening stream at " + srcFile + ":" + seekOffset);
blocks[i] = in;
srcPaths[i] = srcFile;
offsets[i] = seekOffset;
} else {
LOG.info("Using zeros at offset " + seekOffset);
// We have no src data at this offset.
blocks[i] = new RaidUtils.ZeroInputStream(
seekOffset + blockSize);
srcPaths[i] = null;
offsets[i] = -1;
}
}
return new StripeInputInfo(blocks, srcPaths, offsets);
stripeIdx为传入的currentStripeIdx,文件Raid中stripeLength个输入流对应的文件只有一个,可见根据currentStripeIdx,stripeLength和区块大小便能定位当前stripe文件中的开始偏移量,而一个stripe有stripeLength个Blocks,每个Block都能计算出一个开始偏移量,得到相应的流.
若文件剩余大小满足stripe需求,不需要填充0Block,则所有的输入流都为文件对应的流,srcPath也为文件,偏移量为相应的偏移量.
而如果需要填充0Block,则达到文件尾端后,剩余的输入流为ZeroInputStream,对应的srcPath为null,偏移量为-1.
8.2 DirectoryStripeReader获取下一个StripeInputInfo
1 | //通过stripeBlocks获取区块所在文件 |
这里与FileStripeReader不同的是,目录Raid的一个stripe可能跨越多个该目录下的文件,因此需要定位每一个输入流所属的文件.
构造过程中已经看到,成员stripeBlocks
记录了目录下所有的区块和其对应的文件信息.stripeIdx
为传入的currentStripeIdx
,根据currentStripeIdx和stripeLength便可得到当前stripe在目录下的开始区块索引号startOffset
.
有了stripe的开始区块索引号startOffset
,便能得到stripe中第i个区块在目录下的区块索引号,若索引号没有超出stripeBlocks的大小(即存在于目录下),我们便能通过stripeBlocks和lfs成员得知该区块所属的文件,进而得知在文件中的偏移量,并构建输入流对象.
而若当前stripe的第i个区块超出了stripeBlocks的大小,即不存在于目录下,需要填充0 Block,对应的输入流为ZeroInputStream
,偏移量为-1,文件路径为null,这与FileStreamReader一样.
输入流构造完成,接着便是局部编码的过程,在while循环中可以重试3次:1
2
3
4
5
6
7
8
9
10int retry = 3;
do {
redo = false;
retry --;
...
if (redo) {
// rebuild the inputs.
stripeInputInfo = sReader.getStripeInputs(currentStripeIdx);
}
} while (redo);
其中redo
在编码过程出错时,catch中如果retry还大于0,则重置为true,否则retry小于等于0,抛出异常.1
2
3
4if (retry <= 0) {
throw e;
}
redo = true;
另外,在catch中发现区块丢失等异常情况,要构建CorruptBlockReconstructor
和Decoder进行修复,具体修复在后面BlockIntegrityMonitor
中分析.
那么,do while中间过程的具体的编码过程如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17try {
blocks = stripeInputInfo.getInputs();
CRC32[] curCRCOuts = new CRC32[codec.parityLength];
if (crcOuts != null) {
for (int i = 0; i < codec.parityLength; i++) {
crcOuts[finishedParityBlockIdx + i] = curCRCOuts[i]
= new CRC32();
}
}
// Create output streams to the temp files.
for (int i = 0; i < codec.parityLength - 1; i++) {
tmpOuts[i + 1] = new FileOutputStream(tmpFiles[i]);
}
// Call the implementation of encoding.
encodeStripe(blocks, blockSize, tmpOuts, curCRCOuts, reporter,true, errorLocations);
}
即首先获取输入流数组blocks,如果支持ChecksumStore的话,crcOuts为expectedPartialParityBlocks
个CRC32数组.
然后对之前创建的临时输出文件构建输出流对象,最终通过encodeStripe
进行编码.
9. encodeStripe
函数原型为:1
2void encodeStripe(InputStream[] blocks,long blockSize,OutputStream[] outs,CRC32[] crcOuts,Progressable reporter,
boolean computeSrcChecksum,List<Integer> errorLocations)
blocks
为stripeLength个输入流数组,blockSize
为输入流中一个Block的大小,outs
对应parityLength个输出流;computeSrccChecksum
用于决定是否计算源文件区块的校验和.编码过程为true(通过encodeFileToStream调用),即对每一个源文件区块计算校验和更新至Encoder的ChecksumStore和校验和本地目录中;而对校验文件的修复为false(通过recoverParityBlockToStream),不重新计算源文件区块校验和.errorLocations
对应出错的位置,对应完成一个stripe的编码过程,将stripeLength个原Blocks编码成parityLength个校验Blocks.
首先配置Encoder的writeBufs(输出缓冲),每个缓冲大小为bufSize
,通过raid.encoder.bufsize
配置.
如果bufSize大于Block大小,则为Block大小.小于Block大小时,如果不能被Block大小整除,则配置为Block大小的1/256,最小1KB,因为在后面可以看到,每次取出bufSize,如果配置能被区块大小整除,则可以刚好取完,每次取出的长度一样:1
configureBuffers(blockSize);
然后,对stripeLength个输入流构建一个ParallelStreamReader对象,并启动读取缓冲的线程(默认4个).1
2
3
4
5
6int boundedBufferCapacity = 1;
ParallelStreamReader parallelReader = new ParallelStreamReader(
reporter, blocks, bufSize,
parallelism, boundedBufferCapacity, blockSize, computeSrcChecksum,
outs);
parallelReader.start();
ParallelStreamReader
启动多个线程(默认4)并行从stripeLength个输入流中读取数据,每次读取bufSize个字节的数据存入一个ReadResult对象中,将每次读取结果(一个ReadResult)放入阻塞队列中,等待从中读取进行编码.因此最小的编码单元为bufSize,详细ParallelStreamReader分析见下.
ParallelStreamReader构建完,并启动主线程后,开始编码过程:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33for (long encoded = 0; encoded < blockSize; encoded += bufSize) {
ParallelStreamReader.ReadResult readResult = null;
try {
readResult = parallelReader.getReadResult();//从阻塞队列中读取一个ReadResult局部缓冲
} catch (InterruptedException e) {
throw new IOException("Interrupted while waiting for read result");
}
// Cannot tolerate any IO errors.
IOException readEx = readResult.getException();
if (readEx != null) {
if (errorLocations != null) {
errorLocations.clear();
for (int idx : readResult.getErrorIdx()) {
errorLocations.add(idx);
}
}
throw readEx;
}
code.encodeBulk(readResult.readBufs, writeBufs);
reporter.progress();
// Assume each byte is independently encoded
int toWrite = (int)Math.min(blockSize - encoded, bufSize);
// Now that we have some data to write, send it to the temp files.
for (int i = 0; i < codec.parityLength; i++) {
outs[i].write(writeBufs[i], 0, toWrite);//编码结果写到输出流
if (crcOuts != null && crcOuts[i] != null) {//更新校验文件区块对应的校验和数据
crcOuts[i].update(writeBufs[i], 0, toWrite);
}
reporter.progress();
}
}
以上循环对应的一个Stripe的实际编码过程,从循环看出每次编码bufSize个字节,这是因为从输入流中读取的数据保存在ReadResult的成员属性readBufs中,它对应stripeLength个数组,每个数组存储bufSize个输入流的字节.1
readResult = parallelReader.getReadResult();
从阻塞队列中取出一个ReadResult,可能阻塞等待ReadOperation
线程读取完成.
获取一个ReadResult后,通过Encoder的code成员(擦除码实现,配置)的encodeBulk完成对bufSize个字节的编码.1
code.encodeBulk(readResult.readBufs, writeBufs);
将stripeLength个长度为bufSize的源数据编码成parityLength个长度为bufSize的校验数据,写到Encoder的writeBufs成员中.
最后将writeBufs成员中数据分别写到输入流outs中,且如果支持ChecksumStore,将编码结果更新至parityLength个CRC32对象中计算相应校验和.1
2
3
4
5
6
7for (int i = 0; i < codec.parityLength; i++) {
outs[i].write(writeBufs[i], 0, toWrite);
if (crcOuts != null && crcOuts[i] != null) {
crcOuts[i].update(writeBufs[i], 0, toWrite);
}
reporter.progress();
}
这样,循环结束便完成了一个stripe的编码.
10. 函数调用链(从上到下为先后调用顺序):
raidFiles
(info, filteredPaths);
处理当前PolicyInfo info的所有可Raid路径filteredPaths.doRaid
(conf, info, lec);
info的可Raid路径转换为EncodingCandidate列表lec,处理所有的EncodingCandidate.doRaid
(conf, ec, destPref, codec, statistics, RaidUtils.NULL_PROGRESSABLE, doSimulate, targetRepl, metaRepl);
处理一个EncodingCandidate—ec.destPref
为codec配置parity_dir
.doSimulate
为policy的配置属性simulate
,默认false,false时编码完成将更改源文件备份数.targetRepl
为编码后源文件备份.metaRepl
为编码文件备份.doFileRaid
(conf, ec, destPath, codec, statistics, reporter, doSimulate, targetRepl, metaRepl);
上面doRaid的文件Raid分支,对应目录Raid为doDirRaid,参数见上一个方法.generateParityFile
(conf, ec, targetRepl, reporter, srcFs, destPath, codec, locations.length, stat.getReplication(),metaRepl, stat.getBlockSize(), null);
增加了更多的该EncodingCandidate对应的信息,用于判断是否完成path所有的编码.对应的还是一个EncodingCandidate的编码.encoder.encodeFile
(conf, inFs, outFs, outpath, (short)metaRepl, numStripes, blockSize, reporter, sReader, ec);
这里输出路径outpath
不再是配置项parity_dir
,而是最终的输出文件路径:${parity_dir}/srcpath
.sReader
可能对应FileStreamReader
或DirectoryStreamReader
.
该方法还是对应一个EncodingCandidate的编码,只不过基于原来EncodingCandidate的信息构建了StripeReader对象,用于获取源文件的输入流.encodeTmpParityFile
(jobConf, sReader, dfs, partialTmpParity, parityFile, tmpRepl, blockSize, expectedPartialParityBlocks, expectedPartialParityFileSize, reporter)
对应一个StripeReader(即一个EncodingCandidate)的编码.partialTmpParity
为该sReader输出路径,即一个局部校验文件,路径为:/tmp/${parity_dir}/${jobID}/${partialParityName}/tmp/${currentStripeID}
.
其中jobID
可通过mapred.job.id
配置,默认为localRaid
+当前时间(date).partialParityName
为partial_ + MD5Hash.digest(srcFile.toUri().getPath()) +"_" + ec.srcStat.getModificationTime() + "_" + ec.encodingUnit + "_" +ec.encodingId
即默认情况下,一个path对应一个局部临时校验目录,该目录下的tmp
目录存储该path的所有局部校验文件,文件名为对应StripeReader的currentStripeID,校验文件数目为path对应的EncodingCandidate数目.
当一个path的所有EncodingCandidate编码完成后,全部的局部校验文件位于tmp目录下,然后重命名为tmp目录的兄弟目录final
,在final目录中将所有局部校验文件追加到第一个局部校验文件尾部,形成完整的校验文件,然后重命名为encodeFile的输入参数outpath,即校验文件输出路径,完成一个Path的Raid过程.举例:
一个文件Raid的文件,采用(10,4)rs编码,若该文件有40个Blocks,且encodingUnit采用默认配置1.
则一个EncodingCandidate以及一个StripeReader都只对应一个stripe即管理10个Block,因此该文件对应4个EncodingCandidate和StripeReader,
则tmp
录下应该有4个局部校验文件,文件名分别为0~3,通过encodeTmpParityFile生成(编码时先输出到另一个临时文件,局部编码完成后重命名).
当所有4个局部校验文件都产生后(encodeTmpParityFile一次调用产生一个文件,finishAllPartialEncoding方法中等待期望的校验文件都产生,
见上分析),tmp目录重命名为final(状态为isRenamed).
然后在final中将1~3局部校验文件依次追加到0中,追加后的文件则为最终的校验(状态变为isConcated),最终将final的唯一一个文件重命名为校验文件
(${parity-dir}/srcpath),完成path的编码过程.
而对应设置encodingUnit为2时,tmp目录下应该只有2个局部校验文件(此时一个EncodingCandidate和StripeReader管理两个stripe(20个Block),
encodeTmpParityFile编码两个stripe的区块),文件名分别为0,2(不是1),按上述步骤经过重命名—追加—重命名完成编码过程.encodeFileToStream
(sReader, blockSize, out, crcOuts, reporter);
通过从StripeReader中依次读取encodingUnit个stripe完成编码过程.
out为另一个临时文件对应的输出流,位置/tmp/${parity_dir}/${jobID}/currentStripeID+random
先将编码结果存在这一临时输出流中,sReader所有数据编码完成后,重命名为tmp目录下对应的currentStripeID文件,完成局部校验文件的生成.encodeStripe
(blocks, blockSize, tmpOuts, curCRCOuts, reporter, true, errorLocations);
对应一个stripe的编码过程,即StripeReader将执行encodingUnit次.blocks
stripe个区块对应的输入流.tmpOuts
第一个输出流为encodeFileToStream构建的临时文件流,剩余的parityLength-1个输出流为临时文件(parity_i)输出流.
函数通过构建ParallelStreamReader,创建多个线程并行读取数据到阻塞队列中,然后从阻塞队列取数据进行编码.code.encodeBulk
(readResult.readBufs, writeBufs);
最终的最小编码单元,编码bufSize个字节的数据.
对应codec配置的擦除码实现,上一级函数中通过ParallelStreamReader并行读取,每次读取bufSize个字节的数据存放在一个ReadResult中,该函数便是处理一个ReadResult的数据.编码结果存在Encoder的缓冲writeBufs中,每次bufSize个字节的数据编码完成,写到输出流tmpOuts中.
11. 一段话总结(以下说的路径可能为文件或目录)
TriggerMonitor执行完所有文件Raid操作后,默认每隔10s读取一次policy文件.如果policy文件自从上次重新加载后修改过,且当前时间从修改后过去了5s,则重新加载policy文件.
加载后,对每个policy,如果配置的路径需要重新读取进行Raid(条件见具体分析),则对与存在fileListPath元素的policy通过readFileList读取可Raid的路径列表,否则对于存在srcPath的policy,通过selectFiles读取可Raid的路径列表(是否可Raid见具体分析).
读取完可Raid的路径后,通过raidFiles对当前policy所有可Raid路径进行Raid.
Raid过程,对于每一个路径,首先根据路径的Blocks数,这里假设为nums,和codec的配置以及encodingUnit的配置创建n个EncodingCandidate.n=ceil(nums/stripeLength/encodingUnit)
即一个路径最少对应一个EncodingCandidate,只要其Blocks数大于2,一个路径拥有一个encodingID,所有其拥有的EncodingCandidate共享.
接着对于路径的每一个EncodingCandidate,执行编码操作.
path的编码输出路径为”${parity_dir}/path”.不过因为一个path可能对应多个EncodingCandidate,先在临时目录下进行局部编码,临时目录路径为:/tmp/${parity_dir}/${jobID}/${partialParityName}/tmp
每个EncodingCandidate在该目录下编码生成一个局部校验文件
,文件名为EncodingCandidate相关的StripeID,所有n
个局部校验文件都生成后,将tmp目录重命名为兄弟目录final,在final目录中将所有的局部校验文件连接成一个文件,然后重命名为最终的输出路径,即完成编码过程.
而其实每一个局部校验文件都是先在另外一个临时目录下编码生成,然后在重命名为tmp目录下的文件.
而对于局部校验文件的生成,一个EncodingCandidate会创建相应的文件或目录StripeReader,即FileStripeReader或DirectoryStripeReader,用于创建相应的输入流读取数据.一个EncodingCandidate或StripeReader对应encodingUnit个stripe,一个stripe的编码操作在函数encodeStripe
中完成.
每一个Stripe创建一个ParallelStripeReader,在其中创建多个线程从stripeLength个输入流中并行读取数据,每次读取bufSize个字节的数据存放在一个ReadResult对象中,stripeLength个输入流数据对应的所有ReadResult存放在阻塞队列中,依次从阻塞队列中取出ReadResult进行编码,最终最小的编码函数对应于:1
code.encodeBulk(readResult.readBufs, writeBufs);
为具体的擦除码实现,可继承ErasureCode完成相应的编码函数实现其他不同于rs xor的编码方式.