RaidNode源码阅读---损坏区块修复CorruptMonitor(LocalBlockIntegerityMonitor)

Hadoop版本:hadoop-20-master


LocalBlockIntegrityMonitor没有提供丢失区块修复功能,损坏区块的修复类就是LocalBlockIngerityMonitor,参见辅助类.
LocalBlockIntegrityMonitor主线程:

1
2
3
4
while (running) {
try {
LOG.info("LocalBlockFixer continuing to run...");
doFix();

执行修复的入口为doFix函数.


1. doFix

同样的,每次执行操作之前先休眠一段时间,时间为配置项raid.blockfix.interval,缺省5s.
然后,通过文件系统获取损坏的文件列表,列表中不包含回收站(.Trash目录).得到列表后过滤掉不能重构的源文件(不是校验文件且对应校验文件不存在,不能重构)

1
2
3
List<String> corruptFiles = getCorruptFiles();
FileSystem parityFs = new Path("/").getFileSystem(getConf());
filterUnreconstructableSourceFiles(parityFs, corruptFiles.iterator());

过滤后,如果没有损坏文件,继续循环,否则排序损坏文件列表:

1
2
3
4
5
if (corruptFiles.isEmpty()) {
// If there are no corrupt files, retry after some time.
continue;
}
helper.sortLostFiles(corruptFiles);

helperBlockReconstructor.CorruptBlockReconstructor类型成员,负责损坏区块的修复,sortLostFiles依据的比较器如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Comparator<String> comp = new Comparator<String>() {
public int compare(String p1, String p2) {
Codec c1 = null;
Codec c2 = null;
for (Codec codec : Codec.getCodecs()) {
if (isParityFile(p1, codec)) {
c1 = codec;
} else if (isParityFile(p2, codec)) {
c2 = codec;
}
}
if (c1 == null && c2 == null) {
return 0; // both are source files
}
if (c1 == null && c2 != null) {
return -1; // only p1 is a source file
}
if (c2 == null && c1 != null) {
return 1; // only p2 is a source file
}
return c2.priority - c1.priority; // descending order
}
};

即源文件排在前面先编码且源文件之间保证原来的插入顺序,校验文件在源文件后面,且校验文件之间对应的Codec优先级高的在前面.
排序后,对每一个损坏文件通过reconstructFile进行修复:

1
2
3
4
5
6
7
8
9
10
11
12
13
for (String srcPathStr: corruptFiles) {
if (!running) break;
long recoveryTime = -1;
Path srcPath = new Path(srcPathStr);
try {
boolean fixed = helper.reconstructFile(srcPath, null);
if (fixed) {
incrFilesFixed();
recoveryTime = System.currentTimeMillis() - detectionTime;
lastSuccessfulFixTime = System.currentTimeMillis();
}
}
...


2. reconstructFile

方法原型如下:

1
boolean reconstructFile(Path srcPath, Context context)

针对srcPath为不同文件类型以及是否支持StripeStore有不同的方式进行修复:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
srcStat = fs.getFileStatus(srcPath);
if (RaidNode.isParityHarPartFile(srcPath)) {
return processParityHarPartFile(srcPath, progress);
}//损坏文件为校验文件,且已经打包成har文件

// Reconstruct parity file
for (Codec codec : Codec.getCodecs()) {
if (isParityFile(srcPath, codec)) {
Decoder decoder = new Decoder(getConf(), codec);
decoder.connectToStore(srcPath);
return processParityFile(srcPath, decoder, context);
}
}//损坏文件为校验文件,没有打包成har文件

// Reconstruct source file without connecting to stripe store
for (Codec codec : Codec.getCodecs()) {
ParityFilePair ppair = ParityFilePair.getParityFile(
codec, srcStat, getConf());
if (ppair != null) {
Decoder decoder = new Decoder(getConf(), codec);
decoder.connectToStore(srcPath);
return processFile(srcPath, ppair, decoder, false, context);
}
}//损坏文件为源文件,首先不通过StripeStore进行修复

// Reconstruct source file through stripe store
for (Codec codec : Codec.getCodecs()) {
if (!codec.isDirRaid) {
continue;
}
try {
Decoder decoder = new Decoder(getConf(), codec);
decoder.connectToStore(srcPath);
if (processFile(srcPath, null, decoder, true, context)) {
return true;
}
} catch (Exception ex) {
LogUtils.logRaidReconstructionMetrics(LOGRESULTS.FAILURE, 0,
codec, srcPath, -1, LOGTYPES.OFFLINE_RECONSTRUCTION_USE_STRIPE,
fs, ex, context);
}
}//损坏文件为源文件,对应目录Raid,尝试使用StripeStore进行修复

2.1 Decoder

源文件和非har文件中的校验文件的修复都通过Decoder,构造:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public Decoder(Configuration conf, Codec codec) {
this.conf = conf;
this.parallelism = conf.getInt("raid.encoder.parallelism", DEFAULT_PARALLELISM);
this.codec = codec;
this.code = codec.createErasureCode(conf);
this.rand = new Random();
this.maxBufSize = conf.getInt(DECODING_MAX_BUFFER_SIZE_KEY,
DEFAULT_MAX_BUFFER_SIZE);
this.bufSize = conf.getInt("raid.decoder.bufsize", maxBufSize);
this.writeBufs = new byte[codec.parityLength][];
this.readBufs = new byte[codec.parityLength + codec.stripeLength][];

parallelCode = new ErasureCode[parallelism];
for (int i = 0; i < parallelism; i++) {
parallelCode[i] = codec.createErasureCode(conf);
}
allocateBuffers();
}

与编码过程的Encoder类似,parallelism定义了读取输入流线程的个数,不过还定义了参与编码的ErasureCode的个数.
maxBufSizebufSize与Encoder作用类似,不过配置项对应raid.decoder.max.buffer.sizeraid.decoder.bufSize.
除此之外,相比较Encoder多了readBufs,对应parityLength+stripeLength个输入流,而Encoder中输入流缓冲数据保存在ParallelStripReader的阻塞队列中.还多了parallelCode成员,用于并行解码,作用在后面解码过程中可以看到.

构造了Decoder对象后,调用connectToStore相应ChecksumStore,StripeStore创建初始化并验证合理性.
读取本地校验存储目录(LocalChecksumStore)和本地Stripe存储目录(LocalStripeStore)下的文件初始化ChecksumStore和StripeStore,体现在编码过程后将计算的校验和以及Stripe信息更新至Encoder的ChecksumStore和StripeStore的同时,会写到相应文件,这样构造Decoder时便同步了以前的校验和和Stripe信息.

对于源文件和不在har文件中的校验文件,构造了Decoder对象以及相应存储器后,通过不同的方法解码.


3. 校验文件processParityFile解码

方法原型如下:

1
boolean processParityFile(Path parityPath, Decoder decoder, Context context)

parityPath为要修复的校验文件,decoder为解码器,context在LocalBlockIntegrityMonitor中为null,用于MapReduce任务.
校验文件修复需要依靠源文件,因此首先获得对应的源文件路径,并检查源文件是否存在,不存在时无法修复,直接返回false:

1
2
3
4
5
6
7
DistributedFileSystem parityFs = getDFS(parityPath);
Path srcPath = RaidUtils.sourcePathFromParityPath(parityPath, parityFs);
if (srcPath == null) {
LOG.warn("Could not get regular file corresponding to parity file " +
parityPath + ", ignoring...");
return false;
}

然后检查源文件和校验文件的修改时间是否一致,不一致的不进行修复,返回false:

1
2
3
4
5
if (srcStat.getModificationTime() != parityStat.getModificationTime()) {
LOG.warn("Mismatching timestamp for " + srcPath + " and " + parityPath +
", ignoring...");
return false;
}

接着获取校验文件内的损坏区块,若损坏区块数为0,不需要修复,直接返回false

1
2
3
4
5
6
List<LocatedBlockWithMetaInfo> lostBlocks = lostBlocksInFile(parityFs, uriPath, parityStat);//读取损坏文件中损坏的区块
if (lostBlocks.size() == 0) {
LOG.warn("Couldn't find any lost blocks in parity file " + parityPath +
", ignoring...");
return false;
}

lostBlocksInFile根据不同的BlockReconstructor有不同的实现,LocalBlockIntegrityMonitor对应的为helper成员,通过文件系统Client获取文件对应所有Blocks信息,读取每一个Block对应的corrupt属性来判断是否损坏.
若校验文件中存在损坏的区块,则循环对每一个损坏的区块进行修复:

1
2
3
4
5
6
7
8
9
List<Block> blocksLostChecksum = new ArrayList<Block>();
List<Block> blocksLostStripe = new ArrayList<Block>();
for (LocatedBlockWithMetaInfo lb: lostBlocks) { //对每一个丢失的区块进行修复

Block lostBlock = lb.getBlock();
long lostBlockOffset = lb.getStartOffset();
LOG.info("Found lost block " + lostBlock +
", offset " + lostBlockOffset);
...
}

checkLostBlocks(blocksLostChecksum, blocksLostStripe, parityPath, decoder.codec);
对每一个损坏的区块,如果存在ChecksumStore判断是否丢失了校验和,通过abortReconstruction判断,目前对文件Raid的rs码和xor码允许丢失校验和,即此时仍然允许进行修复.函数返回true表示区块对应校验和丢失,不能修复,加入到blocksLostChecksum列表中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//从Decoder的ChecksumStore中读取区块丢失区块对应校验和,ChecksumStore在Decoder创建后构造,读取本地目录下的文件初始化
Long oldCRC = decoder.retrieveChecksum(lostBlock, parityPath, lostBlockOffset, parityFs, context);
if (abortReconstruction(oldCRC, decoder)) {
blocksLostChecksum.add(lostBlock);
continue;
}
private boolean abortReconstruction(Long oldCRC, Decoder decoder) {
// If current codec is simulated file-level raid,
// We assume we only have XOR and RS
// it's allowed to lose checksums
return oldCRC == null && decoder.checksumStore != null &&
(decoder.codec.isDirRaid ||
!decoder.codec.simulateBlockFix ||
decoder.requiredChecksumVerification);
}

而若存在StripeStore,则如果对应的StripeInfo不存在,加入到blocksLostStripe中,不予修复.

1
2
3
4
5
6
//从Decoder的StripeStore中读取StripeInfo,StripeStore在Decoder创建后构造,读取本地目录下的文件初始化
StripeInfo si = decoder.retrieveStripe(lostBlock,srcPath, lostBlockOffset, srcFs, context, false);
if (si == null && decoder.stripeStore != null) {
blocksLostStripe.add(lostBlock);
continue;
}

对于能修复的区块,创建以”区块名.tmp”为名字的临时文件

1
2
File localBlockFile = File.createTempFile(lostBlock.getBlockName(), ".tmp");
localBlockFile.deleteOnExit();

然后通过recoverParityBlockToFile将修复的区块数据写到该临时文件中,并验证之前该区块的校验和和修复后区块校验和是否相同

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CRC32 crc = decoder.recoverParityBlockToFile(srcFs, srcStat, parityFs,
parityPath, blockSize, lostBlockOffset, localBlockFile, si,context);

if (crc != null && oldCRC != null && crc.getValue() != oldCRC) {
// checksum doesn't match, it's dangerous to send it
IOException ioe = new IOException("Block " + lostBlock.toString()
+ " new checksum " + crc.getValue()
+ " doesn't match the old one " + oldCRC);
LogUtils.logRaidReconstructionMetrics(LOGRESULTS.FAILURE, 0,
decoder.codec, parityPath, lostBlockOffset,
LOGTYPES.OFFLINE_RECONSTRUCTION_CHECKSUM_VERIFICATION,
parityFs, ioe, context);
throw ioe;
}

最终将临时文件重新发送到文件系统中

1
2
3
computeMetadataAndSendReconstructedBlock(
localBlockFile, lostBlock, blockSize, lb.getLocations(),
lb.getDataProtocolVersion(), lb.getNamespaceID(),progress);

recoverParityBlockToFile中,只是创建临时文件的输出流,然后通过fixErasedBlock进行修复


4. 源文件processFile解码

源文件修复流程和校验文件类似,区别在于源文件可能所属文件Raid或目录Raid,因此一个Stripe可能跨越多个文件,需要通过构建相应StripeReader或通过StripeInfo进行修复.
processFile中前面处理方式一样,读取文件的损坏区块后,对每一个区块进行校验和和Stripe的检查,检查通过后,源文件修复有两种方式,使用StripeStore或构建StripeReader

1
2
3
4
5
6
7
8
9
if (fromStripeStore) {
crc = decoder.recoverBlockToFileFromStripeInfo(srcFs, srcPath,
lostBlock, localBlockFile, blockSize,
lostBlockOffset, blockContentsSize, si, context);

} else {
crc = decoder.recoverBlockToFile(srcFs, srcStat,parityPair.getFileSystem(),
parityPair.getPath(), blockSize, lostBlockOffset, localBlockFile,
blockContentsSize, si, context);
}

而由上面reconstructFile修复流程来看,只有最后一种对目录Raid的修复才使用StripeStore修复.

recoverBlockToFilerecoverParityBlockToFile一样,简单的构造临时文件的流,调用fixErasedBlock进行修复,只是参数有点不同,其中的fixSource在这里为true,而recoverParityBlockToFile为false.

recoverBlockToFileFromStripeInfo简单的构造临时文件流,调用fixErasedBlockImpl,其中传入该方法的recoverFromStripeStore为true,且lostBlock参数为损坏区块.

修复完后,同样的将临时文件发送给文件系统.


5. fixErasedBlock

方法原型:

1
2
3
4
5
CRC32 fixErasedBlock(
FileSystem srcFs, FileStatus srcStat, FileSystem parityFs, Path parityFile,
boolean fixSource, long blockSize, long errorOffset, long limit, boolean partial,
OutputStream out, StripeInfo si, Context context, boolean skipVerify)
throws IOException, InterruptedException

由上分析,修复校验区块fixSource为false,修复源文件区块时fixSource为true.

  • errorOffset,损坏区块在文件中的偏移量
  • limit,要修复的大小
  • partial,是否进行局部修复,即从一个Block的非开始部分开始修复,只修复Block的一部分数据,一般来讲不支持
  • out,修复数据输出流
  • si,损坏区块所在的Stripe信息
  • skipVerify,codec的simulateBlockFix为true时,使用原来的编码方式修复,用于确定是否跳过验证老的编码方式修复所得的校验和和新的编码方式所得校验和一致,一般为false,即需验证

fixErasedBlock中如果codec的simulateBlockFix为true,则使用老的编码方式进行修复

1
2
3
4
5
6
7
8
9
if (this.codec.simulateBlockFix) {
String oldId = getOldCodeId(srcStat);
...
Decoder decoder = (oldId.equals("xor"))? new XORDecoder(conf): new ReedSolomonDecoder(conf);
...
CRC32 oldCRC = (skipVerify && checksumStore == null)? null: new CRC32();
long oldLen = decoder.fixErasedBlockImpl(srcFs, srcFile, parityFs,
parityFile, fixSource, blockSize, errorOffset, limit, partial, out,
context, oldCRC, si, false, null);

并且在skipVerify为false时,使用上面创建的Decoder对象进行修复

1
2
3
4
5
6
if (!skipVerify) {
newCRC = new CRC32();
newLen = this.fixErasedBlockImpl(srcFs, srcFile, parityFs,
parityFile, fixSource, blockSize, errorOffset, limit, partial, null,
context, newCRC, null, false, null);
}

并且将老的修复方式得到的校验和和新的修复方式得到的校验和进行比较验证:

1
2
3
4
if (!skipVerify) {
if (newCRC.getValue() != oldCRC.getValue() ||
newLen != oldLen) {
...

从上可以看出,simulateBlockFix时,老的方式进行修复时,传入输出流out,同时需要验证时,新方式进行修复传入的输出流为null,即修复的区块数据不写到实际输出流中,只是获得其校验和和长度,进行验证而已,真正工作的时老的编码方式.

而simulateBlockFix为false时,直接使用创建的Decoder对象进行修复

1
2
3
4
5
6
7
8
9
else {
CRC32 crc = null;
if (checksumStore != null) {
crc = new CRC32();
}
fixErasedBlockImpl(srcFs, srcFile, parityFs, parityFile, fixSource, blockSize,
errorOffset, limit, partial, out, context, crc, si, false, null);
return crc;
}

这里不知道simulateBlockFix使用新旧两种方式进行修复的意义,不过最终都使用fixErasedBlockImpl进行区块修复.


6. fixErasedBlockImpl

先看以上三种不同的调用方式:

1
2
3
4
5
6
7
8
9
//recoverBlockToFileFromStripeInfo,通过StripeStore源文件的修复
fixErasedBlockImpl(srcFs, srcPath, srcFs, null, true, blockSize,
lostBlockOffset, limit, false, out, context, crc, si, true, lostBlock);
//recoverBlockToFile,即不通过StripeStore的源文件修复,其中fixSource为true
fixErasedBlockImpl(srcFs, srcFile, parityFs, parityFile, fixSource, blockSize,
errorOffset, limit, partial, out, context, crc, si, false, null);
//校验文件的修复,fixSource为false
fixErasedBlockImpl(srcFs, srcFile, parityFs, parityFile, fixSource, blockSize,
errorOffset, limit, partial, out, context, crc, si, false, null);

fixErasedBlockImpl方法原型

1
2
3
4
5
long fixErasedBlockImpl(FileSystem srcFs, Path srcFile, FileSystem parityFs,
Path parityFile, boolean fixSource, long blockSize, long errorOffset,
long limit, boolean partial, OutputStream out, Context context,
CRC32 crc, StripeInfo si, boolean recoverFromStripeStore, Block lostBlock)
throws IOException

即对于源文件区块的修复和校验文件区块的修复来说,主要不同在于fixSource参数,而通过StripeStore进行源文件区块的修复时,recoverFromStripeStore为true,以及最后的lostBlock存在.而这三种方式主要的不同点在于获取损坏区块所属Stripe,以及用于修复的输入流的方式不同.

首先,获取区块在Stripe中对应的位置,一个Stripe中校验区块的索引号在前,源区块的索引号在后.

1
2
3
4
5
6
7
8
9
10
if (recoverFromStripeStore) {
erasedLocationToFix = si.getBlockIdxInStripe(lostBlock);
} else if (fixSource) {
lp = StripeReader.getBlockLocation(codec, srcFs,
srcFile, blockIdx, conf);
erasedLocationToFix = codec.parityLength + lp.getBlockIdxInStripe();
} else {
lp = StripeReader.getParityBlockLocation(codec, blockIdx);
erasedLocationToFix = lp.getBlockIdxInStripe();
}

如果从StripeStore中修复,则直接根据传入的lostBlock以及对应所在的StripeInfo获取该区块在Stripe的索引.这个只需简单的在si中查找与lostBlock相匹配的区块即可.
否则如果不通过StripeStore对源区块进行修复,则通过StripeReader的静态方法getBlockLocation获取LocationPair对象,一个LocationPair管理了区块所在的Stripe索引以及在该Stripe中的区块索引.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if (codec.isDirRaid) {//若为目录Raid,遍历目录下的文件列表,直到srcFile中损坏区块,统计损坏区块相对于目录的区块索引
Path parentPath = srcFile.getParent();
if (lfs == null) {
lfs = RaidNode.listDirectoryRaidFileStatus(conf, srcFs, parentPath);
}
if (lfs == null) {
throw new IOException("Couldn't list files under " + parentPath);
}
int blockNum = 0;
Path qSrcFile = srcFs.makeQualified(srcFile);
for (FileStatus fsStat: lfs) {
if (!fsStat.getPath().equals(qSrcFile)) {
blockNum += RaidNode.getNumBlocks(fsStat);
} else {
blockNum += blockIdxInFile;
break;
}

}
blockIdx = blockNum;//损坏区块相对于目录的区块索引
}

//文件Raid相对于文件的区块索引则直接为blockIdxInFile
stripeIdx = blockIdx / codec.stripeLength;
blockIdxInStripe = blockIdx % codec.stripeLength;

return new LocationPair(stripeIdx, blockIdxInStripe, lfs);

如果codec为目录Raid,则需遍历srcFile所在父目录下的文件列表,直到遍历到srcFile,计算区块数为总区块索引,便能得到对应的Stripe索引和在Stripe中的区块索引
而如果codec为文件Raid的话,则偏移量所得的区块索引便是总区块索引,得到对应的Stripe索引和Stripe中区块索引.

而如果是对校验区块的修复,因为校验文件只能是一个,所以偏移量所得的区块索引为总区块索引,直接得到对应的Stripe索引和在Stripe中的区块索引.

得到了在损坏区块对应的Stripe以及在Stripe中的位置后,添加到需要修复的位置集合erasedLocations

1
2
3
4
List<Integer> erasedLocations = new ArrayList<Integer>();
// Start off with one erased location.
erasedLocations.add(erasedLocationToFix);
Set<Integer> locationsToNotRead = new HashSet<Integer>();

locationsToNotRead为修复区块时不需要读取的数据块位置.
如果支持区块的局部修复,计算区块的开始偏移量,一般来讲不支持局部修复,只能修复整个区块

1
2
3
if (partial) {
startOffsetInBlock = errorOffset % blockSize;
}

然后,便是解码修复过程,在实际解码之前要创建相应的输入流以及ParallelStreamReader对象,并行从输入流中读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
if (parallelReader == null) {
long offsetInBlock = written + startOffsetInBlock;
if (recoverFromStripeStore) {//通过StripStore修复区块的,直接使用StripeInfo获取对应输入流
inputs = StripeReader.buildInputsFromStripeInfo((DistributedFileSystem)srcFs,
srcStat, codec, si, offsetInBlock,
limit, erasedLocations, locationsToNotRead, code);
} else {//不使用StripeStore修复,构建相应StripeReader读取输入流
StripeReader sReader = StripeReader.getStripeReader(codec,
conf, blockSize, srcFs, lp.getStripeIdx(), srcStat);
inputs = sReader.buildInputs(srcFs, srcFile,
srcStat, parityFs, parityFile, parityStat,
lp.getStripeIdx(), offsetInBlock, erasedLocations,
locationsToNotRead, code);
}
int i = 0;
for (int location : locationsToNotRead) {
locationsToFix[i] = location;
i++;
}

assert(parallelReader == null);
//获取输入流后,构建ParallelStreamReader并行读取输入流中数据
parallelReader = new ParallelStreamReader(reporter, inputs,
(int)Math.min(bufSize, limit),
parallelism, boundedBufferCapacity,
Math.min(limit, blockSize));
parallelReader.start();
}

与编码过程一样,创建ParallelStreamReader需要获取指定的输入流。而对不同的修复方式,获取输入流的方式不同。

6.1 获取输入流以及构建ParallelStreamReader对象

6.1.1 使用StripeStore修复,输入流

使用StripeStore修复源文件区块,通过StripeReader的静态方法buildInputsFromStripeInfo获取一个Stripe的输入流。
buildInputsFromStripeInfo如下:

1
2
3
4
public static InputStream[] buildInputsFromStripeInfo(
DistributedFileSystem srcFs, FileStatus srcStat, Codec codec,StripeInfo si, long offsetInBlock,
long limit, List<Integer> erasedLocations, Set<Integer> locationsToNotRead, ErasureCode code
) throws IOException

返回的输入流个数为stripe中区块的数目,即stripeLength+parityLength.

1
InputStream[] inputs = new InputStream[codec.stripeLength + codec.parityLength];

首先,根据损坏区块的位置erasedLocations获取修复损坏区块应该读取的区块索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
locationsToNotRead.clear();
List<Integer> locationsToRead = code.locationsToReadForDecode(erasedLocations);

public List<Integer> locationsToReadForDecode(List<Integer> erasedLocations) throws TooManyErasedLocations {
List<Integer> locationsToRead = new ArrayList<Integer>(stripeSize());
int limit = stripeSize() + paritySize();
// Loop through all possible locations in the stripe.
for (int loc = limit - 1; loc >= 0; loc--) {
// Is the location good.
if (erasedLocations.indexOf(loc) == -1) {
locationsToRead.add(loc);
if (stripeSize() == locationsToRead.size()) {
break;
}
}
}//优先读取源区块进行修复
// If we are are not able to fill up the locationsToRead list,
// we did not find enough good locations. Throw TooManyErasedLocations.
if (locationsToRead.size() != stripeSize()) {
String locationsStr = "";
for (Integer erasedLocation : erasedLocations) {
locationsStr += " " + erasedLocation;
}
throw new TooManyErasedLocations("Locations " + locationsStr);
}//能同时容忍parityLength个区块损坏,超出时不能进行修复,抛出TooManyErasedLocations异常
return locationsToRead;
}

参与解码的区块数为stripeLength,即为需要读取区块数目。
StripeStore中校验区块索引号在前,源区块索引在后,即0~parityLength-1对应校验区块号,parityLength~stripeLength+parityLength-1对应源区块号。
因此,上述的查找过程,优先使用源区块对损坏区块进行修复。

获取了应该读取区块索引后,对StripeInfo中每一个位置,如果其不应该参与到修复过程中,则对应的流为ZeroInputStream。

1
2
3
4
5
6
7
8
9
10
11
12
13
boolean isErased = (erasedLocations.indexOf(i) != -1);
boolean shouldRead = (locationsToRead.indexOf(i) != -1);

InputStream stm = null;
if (isErased || !shouldRead) {
if (isErased) {
LOG.info("Location " + i + " is erased, using zeros");
} else {
LOG.info("Location " + i + " need not be read, using zeros");
}
locationsToNotRead.add(i);
stm = new RaidUtils.ZeroInputStream(limit);//不需要参与解码的为零输入流
}

否则,获取需要读取区块索引在Stripe中对应区块的输入流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
else {
long blockId;
//获取Stripe中第i个区块对应的区块ID
if (i < codec.parityLength) {
blockId = si.parityBlocks.get(i).getBlockId();
} else if ((i - codec.parityLength) < si.srcBlocks.size()) {
blockId = si.srcBlocks.get(i - codec.parityLength).getBlockId();
} else {
LOG.info("Using zeros for location " + i);
inputs[i] = new RaidUtils.ZeroInputStream(limit);
continue;
}
//通过Client获取指定区块ID的区块信息
LocatedBlockWithFileName lb =
srcFs.getClient().getBlockInfo(blockId);
//区块不存在,抛出异常,在catch中将该区块添加至erasedLocations中,并重新读取Stripe输入流
if (lb == null) {
throw new BlockMissingException(String.valueOf(blockId),
"Location " + i + " can not be found. Block id: " + blockId, 0);
} else {
Path filePath = new Path(lb.getFileName());
FileStatus stat = srcFs.getFileStatus(filePath);
long blockSize = stat.getBlockSize();
//一般不会出现这种情况,局部修复一般不支持,即通常来说offsetInBlock为0
if (offsetInBlock > blockSize) {
stm = new RaidUtils.ZeroInputStream(limit);//偏移量错误,使用零输入流
} else {
if (srcFs.exists(filePath)) {
long startOffset = getBlockIdInFile(srcFs, filePath, blockId) * blockSize;//获取区块在文件中开始偏移量
long offset = startOffset + offsetInBlock;//区块在文件中实际偏移量,一般等于开始偏移量
LOG.info("Opening " + lb.getFileName() + ":" + offset +
" for location " + i);
FSDataInputStream is = srcFs.open(filePath);
is.seek(offset);
stm = is;//获取偏移量对应的输入流
} else {//文件不存在,添加至locationsToNotRead,使用零输入流
LOG.info("Location " + i + ", File " + lb.getFileName() +
" does not exist, using zeros");
locationsToNotRead.add(i);
stm = new RaidUtils.ZeroInputStream(limit);
}
}
}
}

因此,对于Stripe中第i个区块,通过parityBlocks或srcBlocks成员获取其区块ID,然后通过Client获取该ID的区块信息,存在时打开区块所在文件的输入流,定位到指定偏移量后即为第i个区块的输入流。

6.1.2 不使用StripeStore修复,输入流

不使用StripeStore修复时,没有损坏区块所在的Stripe信息,需要通过路径构建StripeReader,在StripeReader中获得对应的Stripe信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//构建源文件的StripeReader,校验文件修复时也是其对应的源文件
StripeReader sReader = StripeReader.getStripeReader(codec, conf, blockSize, srcFs, lp.getStripeIdx(), srcStat);

public static StripeReader getStripeReader(Codec codec, Configuration conf,
long blockSize, FileSystem fs, long stripeIdx, FileStatus srcStat) throws IOException
{

if (codec.isDirRaid) {
Path srcDir = srcStat.isDir()? srcStat.getPath():
srcStat.getPath().getParent();
return new DirectoryStripeReader(conf, codec, fs, stripeIdx, -1L, srcDir,
RaidNode.listDirectoryRaidFileStatus(conf, fs, srcDir));
} else {
return new FileStripeReader(conf, blockSize, codec, fs, stripeIdx, -1L,
srcStat.getPath(), srcStat.getLen());
}
}

构建StripeReader后,获取输入流

1
2
inputs = sReader.buildInputs(srcFs, srcFile, srcStat, parityFs, parityFile, parityStat,
lp.getStripeIdx(), offsetInBlock, erasedLocations, locationsToNotRead, code);

与StripeStore方式构建输入流类似,输入流数目为stripeLength+parityLength,若第i个区块不需要参与解码修复过程,则直接为零输入流。
否则通过相应StripeReader的buildOneInputs构建第i个参与解码修复的输入流,buildOneInputs在StripeReader类中为抽象方法,具体FileStripeReader和DirectoryStripeReader有不同的实现

  • FileStripeReader实现

    1
    2
    3
    4
    5
    public InputStream buildOneInput(
    int locationIndex, long offsetInBlock,
    FileSystem srcFs, Path srcFile, FileStatus srcStat,
    FileSystem parityFs, Path parityFile, FileStatus parityStat
    ) throws IOException

    locationIndex即输入参数i,对应stripe中第i个区块,i小于parityLength时为校验区块,否则为源区块
    通过当前StripeReader的currentStripeIdx和parityLength便能得知当前Stripe的开始区块偏移量,再根据i便能定位在文件中的区块偏移量
    对应的i小于parityLength时,该区块为校验文件parityFile中的第stripeStartIdx*parityLength+i个区块,加上区块偏移量offsetInBlock便得到了区块所属输入流在文件的偏移量。
    而同样的,i大于stripeLength时,区块为源文件srcFile中第stripeStartIdx*StripeLength+(i-parityLength)个区块,同样的得到区块输入流在源文件中的偏移量。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    if (locationIndex < codec.parityLength) {
    // we should use srcfile's block size because parity file could be hared
    //定位区块在校验文件中的偏移量,(stripeStartIdx*parityLength+locationIndex)*blockSize+offsetInBlock,然后创建校验文件的输入流
    //定位到偏移位置
    return this.getParityFileInput(locationIndex, parityFile, parityFs, parityStat, offsetInBlock, blockSize);
    } else {
    //输入流的构建原理和校验文件类似,不过源文件中一个Stripe对应stripeLength个区块,且locationIndex-parityLength才是stripe中区块偏移量
    // Dealing with a src file here.
    int blockIdxInStripe = locationIndex - codec.parityLength;
    int blockIdx = (int)(codec.stripeLength * stripeStartIdx + blockIdxInStripe);
    //这里用的stripeStartIdx,对于encodingUnit为1的情况没问题,如果大于1呢,感觉最好用currentStripeIdx,参见DirectoryStripeReader的实现
    long offset = blockSize * blockIdx + offsetInBlock;
    if (offset >= srcStat.getLen()) {//对应编码时的零区块填充,同样的构建ZeroInputStream
    LOG.info("Using zeros for " + srcFile + ":" + offset +
    " for location " + locationIndex);
    return new RaidUtils.ZeroInputStream(blockSize * (blockIdx + 1));
    } else {//对应的文件偏移量存在,构建文件输入流
    LOG.info("Opening " + srcFile + ":" + offset +
    " for location " + locationIndex);
    FSDataInputStream s = fs.open(
    srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
    s.seek(offset);
    return s;
    }
    }

    可见,知道了源文件srcFile和校验文件parityFile以及损坏区块所在的Stripe索引后很容易根据参与编码的区块索引创建输入流

  • DirectoryStripeReader
    DirectoryStripeReader实现和FileStripeReader类似,尤其是对校验文件输入流的构建,就是基类StripeReader中的方法,完全一样,因为不管文件Raid还是目录Raid,其校验文件只有一个。
    而对于源文件区块的输入流构建,需要通过DirectoryStripeReader的stripeBlocks来获得该区块对应的文件,之后定位到文件中的偏移量处。
    先回顾下DirectoryStripeReader的stripeBlocks成员,它维护了叶子目录下所有的区块信息,是一个ArrayList列表,元素为BlockInfo。每一个BlockInfo维护了区块在文件中区块索引以及对应文件索引信息。
    例如,假设叶子目录下有两个文件,每个文件5个Blocks,那么stripeBlocks中便有10个元素,依次为:
    (0,0),(0,1),(0,2),(0,3),(0,4),(1,0),(1,1),(1,2),(1,3),(1,4)
    这里我们用(fileIndex,blockIndex)的方式来表示一个BlockInfo,即文件索引和文件中区块索引。
    那么我们如果要取得目录下第i个区块对应的文件信息,则stripeBlocks.get(i)便可直接获得该目录下第i个区块的BlockInfo,然后便能获得区块对应的文件索引,通过目录下的文件列表获得对应的文件。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    else {
    // Dealing with a src file here.
    int blockIdxInStripe = locationIndex - codec.parityLength;
    int curBlockIdx = (int)currentStripeIdx * codec.stripeLength + blockIdxInStripe;
    if (curBlockIdx >= this.stripeBlocks.size()) {
    //目录下的区块索引超出了stripeBlocks的大小,对应最后一个stripe的零区块填充
    LOG.info("Using zeros because we reach the end of the stripe");
    return new RaidUtils.ZeroInputStream(blockSize * (curBlockIdx + 1));
    }
    BlockInfo bi = this.stripeBlocks.get(curBlockIdx);
    FileStatus fstat = lfs.get(bi.fileIdx);
    long offset = fstat.getBlockSize() * bi.blockId +
    offsetInBlock;
    if (offset >= fstat.getLen()) {//应该不太会出现,偏移量超出文件长度的话区块对应的文件应该为另一个文件了
    LOG.info("Using zeros for " + fstat.getPath() + ":" + offset +
    " for location " + locationIndex);
    return new RaidUtils.ZeroInputStream(blockSize * (curBlockIdx + 1));
    } else {//文件数据,创建实际文件输入流
    LOG.info("Opening " + fstat.getPath() + ":" + offset +
    " for location " + locationIndex);
    FSDataInputStream s = fs.open(fstat.getPath(), conf.getInt("io.file.buffer.size", 64 * 1024));
    s.seek(offset);
    return s;
    }

6.1.3 创建ParalleStreamReader

输入流构建完后,便创建ParallelStreamReader,如下

1
2
3
4
assert(parallelReader == null);
parallelReader = new ParallelStreamReader(reporter, inputs, (int)Math.min(bufSize, limit),
parallelism, boundedBufferCapacity,Math.min(limit, blockSize));
parallelReader.start();

这里和编码过程类似,区别在于编码过程传入的输入流inputs长度为stripeLength,即参与编码的源区块输入流。而这里传入的inputs长度为stripeLength+parityLength,其中最多stripeLength个输入流为参与解码修复的有效数据流,其他的为ZeroInputStream。
另外,Decoder的boundedBufferCapacity为2。
具体过程见ParallelStreamReader

ParallelStreamReader开始工作后,便可从阻塞队列中读取数据进行实际的解码操作了
通过ParallelStreamReader读取数据进行修复,每次最多修复bufSize字节数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//locationsToNotRead数目至少为parityLength个,可能大于parityLength(对应区块虽然参与解码,对应编码时为零区块填充,解码时不需要读取)
for (int location : locationsToNotRead) {
locationsToFix[i] = location;
i++;
}

for (written = 0; written < limit; ) {
//这里对应上面的ParalleStreamReader的构建过程,ParallelStreamReader构建完后,启动线程读取数据至缓冲区
...
//获取一个bufSize大小的缓冲区
ParallelStreamReader.ReadResult readResult = readFromInputs(erasedLocations, limit, reporter, parallelReader);
long startDecoding = System.currentTimeMillis();
int toWrite = (int)Math.min((long)bufSize, limit - written);
//解码过程
doParallelDecoding(toWrite, readResult, parallelCode, locationsToFix);
decodingTime += (System.currentTimeMillis() - startDecoding);

for (int i = 0; i < locationsToFix.length; i++) {
if (locationsToFix[i] == erasedLocationToFix) {
if (out != null)//对应的位置为要修复的位置,输出流存在时,写到输出流中
out.write(writeBufs[i], 0, toWrite);
if (crc != null) {//相应更新校验和
crc.update(writeBufs[i], 0, toWrite);
}
written += toWrite;
break;
}
}

如上,从ParalleStreamReader中取出bufSize大小的缓冲块ReadResult,这个ReadResult和编码过程一样。然后通过doParallelDecoding进行解码,将readResult中的数据解码至Decoder的writeBufs中,其中locationsToFix为Stripe中不需要读取的区块位置,locationsToFix的长度不小于parityLength。

一个ReadResult对应的数据解码完成后,将对应的erasedLocationToFix对应的位置解码数据写到输出流中,并在需要计算校验和时更新至相应CRC32对象中。

6.2 并行解码doParalleDecoding

方法原型如下

1
2
3
4
public void doParallelDecoding(int toWrite, 
ParallelStreamReader.ReadResult readResult,
ErasureCode[] parallelCode,int[] locationsToFix)
throws IOException

toWrite为该方法一次解码的字节数,一般来说为bufSize(blockSize为bufSize整数倍)
parallelCode数目为raid.encoder.parallelism个(默认4),并行解码
locationsToFix为那些不参与解码的区块索引
方法中,创建parallelism个解码线程,每个线程最多负责”toWrite/parallelism”个字节数据的解码操作,启动线程后等待线程解码完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int partSize = (int) Math.ceil(toWrite * 1.0 / parallelism);//每个线程最多分到的字节数
// parallel decoding
try {
Thread[] workers = new Thread[parallelism];//parallelism个解码线程
int start = 0;
for (int i = 0; i < parallelism; i++) {
int count = Math.min(toWrite - start, partSize);
workers[i] = new Thread(new DecodeOp(
readResult.readBufs, writeBufs, start, count,
locationsToFix, parallelCode[i]));
workers[i].start();
start += count;
}

// wait for all threads
for (int i = 0; i < parallelism; i++) {
workers[i].join();
workers[i] = null;
}

解码线程为DecodeOp类

1
2
3
4
5
6
7
8
9
10
DecodeOp(byte[][] readBufs, byte[][] writeBufs,
int startIdx, int count, int[] erasedLocations,
ErasureCode code) {
this.readBufs = readBufs;
this.writeBufs = writeBufs;
this.startIdx = startIdx;
this.count = count;
this.erasedLocations = erasedLocations;
this.code = code;
}

如上

  • readBufs为ReadResult中的readBufs即bufSize大小的输入缓冲
  • writeBufs为Decoder的writeBufs,parallelism个线程共享,每个线程写入的位置不一样,不需要同步
  • startIdx是相对于ReadResult的偏移量而不是区块的偏移量

run函数如下

1
2
3
4
5
6
7
public void run() {
try {
code.decodeBulk(readBufs, writeBufs, erasedLocations, startIdx, count);
} catch (IOException e) {
LOG.error("Encountered Exception in DecodeBulk: ", e);
}
}

decodeBulk即为ErasureCode的实现

  • readBufs是大小为bufSize的区块局部缓冲,二维数组,第一维大小为stripeLength+parityLength
  • writeBufs解码输出缓冲,二维数组,第一维大小为parityLength
  • erasedLocations为不参与解码操作的stripe中区块索引号
  • startIdx为局部缓冲中的偏移量
  • count为该函数从偏移量开始需处理的大小

相应实现ErasureCode如ReedSolomonCodeXORCode,完成具体解码功能,也可以继承ErasureCode实现该函数,完成其他编码方式的解码操作。


7. 小结

总结一下,对不同区块的修复以及不同的修复方式,本文分析了对校验文件区块的修复,对源文件区块的修复,以及通过StripeStore对目录Raid的源文件区块修复过程(没分析har文件中的校验文件区块修复,因为还涉及到HarMonitor)。

从三种不同的方式的解码过程来看,主要在于通过指定损坏区块获取其所属的stripe以及在stripe中的区块偏移方式不同,以及构建输入流的方式不同。

通过StripeStore进行修复的,因为直接使用StripeInfo,包含了所属stripe的所有区块信息,便能很容易的定位损坏区块在stripe中的区块偏移。同时在构建输入流时,也能通过Client很方便的获取Block所属的文件,再根据偏移量就能创建相应输入流。

而不使用StripeStore的修复,要构建相应的StripeReader(校验文件通过路径得到源文件路径)。通过StripeReader来计算所属的stripe索引以及在stripe中的区块索引。
构建输入流时,对于文件Raid区块的修复,不管是校验区块输入流还是源文件区块输入流,因为只有一个文件,输入流构建不成问题,通过文件中偏移量将流定位到区块在文件中的位置。

而对于目录Raid区块的修复,校验文件区块输入流因为只有一个文件,很容易构建。而源文件区块的输入流需要定位其所属的文件,这个便通过DirectoryStripeReader完成,因为其维护了目录下所有的BlockInfo,即区块和文件的对应关系,这样便能找到区块对应的文件,通过区块在文件中偏移量便能构建所需输入流。

输入流构建完,通过ParallelStreamReader并行读取输入流数据到阻塞队列中,才能够阻塞队列中读取局部缓冲进行解码。
解码过程与编码过程还略有不同,使用并行解码,默认使用4个线程对一个ReadResult的局部缓冲解码,每个线程处理bufSize/4个字节的数据。

最终解码方法为ErasureCode的decodeBulk方法,可继承ErasureCode重写该方法实现其他的解码方式。