RaidNode Source Code Reading --- Helper Class: ParallelStreamReader

Hadoop version: hadoop-20-master


ParallelStreamReader

One ParallelStreamReader serves one stripe, managing byte buffers for n input streams.

For encoding, n equals stripeLength, and these n input streams are encoded into parityLength output streams.
For repairing a corrupt block, n equals stripeLength + parityLength, but only stripeLength of the input streams carry actual file data; the rest are ZeroInputStreams.
Bytes read from the input streams are stored in the blocking queue boundedBuffer. Each queue element is a ReadResult holding bufSize bytes from every input stream, and the ReadResults preserve the byte order of the streams: if an input stream corresponds to a 64 MB block and bufSize takes the default value of 1 MB, the ReadResults placed in boundedBuffer cover, in order:
0~1MB, 1MB~2MB, ...

1. Members:

public ParallelStreamReader(Progressable reporter, InputStream[] streams,
    int bufSize, int numThreads, int boundedBufferCapacity,
    long maxBytesPerStream, boolean computeChecksum,
    OutputStream[] outs) throws IOException {
  this.reporter = reporter;
  // Whether to compute checksums of the source blocks. If enabled, a checksum
  // is computed after each source block is fully read, stored in the
  // ChecksumStore, and a corresponding file is created in a local directory.
  this.computeChecksum = computeChecksum;
  // Input streams: stripeLength of them when encoding,
  // stripeLength + parityLength when decoding.
  this.streams = new InputStream[streams.length];
  this.endOffsets = new long[streams.length];
  if (computeChecksum) {
    this.checksums = new CRC32[streams.length];
  }
  this.outs = outs;
  for (int i = 0; i < streams.length; i++) {
    this.streams[i] = streams[i];
    if (this.streams[i] instanceof DFSDataInputStream) {
      DFSDataInputStream stream = (DFSDataInputStream) this.streams[i];
      // in directory raiding, the block size for each input stream
      // might be different, so we need to determine the endOffset of
      // each stream by their own block size.
      List<LocatedBlock> blocks = stream.getAllBlocks();
      if (blocks.size() == 0) {
        this.endOffsets[i] = Long.MAX_VALUE;
        if (computeChecksum) {
          this.checksums[i] = null;
        }
      } else {
        // A real data input stream: compute its end offset.
        long blockSize = blocks.get(0).getBlockSize();
        this.endOffsets[i] = stream.getPos() + blockSize;
        if (computeChecksum) {
          this.checksums[i] = new CRC32();
        }
      }
    } else {
      // ZeroInputStream: end offset is MAX_VALUE, checksum is null.
      this.endOffsets[i] = Long.MAX_VALUE;
      if (computeChecksum) {
        this.checksums[i] = null;
      }
    }
    streams[i] = null; // Take over ownership of streams.
  }
  this.bufSize = bufSize;
  // Blocking queue of ReadResults: capacity 1 when encoding, 2 when decoding.
  this.boundedBuffer = new ArrayBlockingQueue<ReadResult>(boundedBufferCapacity);
  // Cap the thread count at the number of streams.
  if (numThreads > streams.length) {
    this.numThreads = streams.length;
  } else {
    this.numThreads = numThreads;
  }
  this.remainingBytesPerStream = maxBytesPerStream;
  this.slots = new Semaphore(this.numThreads);
  ThreadFactory ParallelStreamReaderFactory = new ThreadFactoryBuilder()
    .setNameFormat("ParallelStreamReader-read-pool-%d")
    .build();
  // Fixed-size thread pool.
  this.readPool = Executors.newFixedThreadPool(this.numThreads, ParallelStreamReaderFactory);
  this.mainThread = new MainThread(); // the main reading thread
  mainThread.setName("ParallelStreamReader-main");
}

For encoding, the streams member holds the stripeLength input streams to be encoded.
For repairing corrupt blocks, streams holds stripeLength + parityLength input streams, of which only stripeLength carry valid data, i.e. the streams for the undamaged blocks of the stripe; these may belong to either source blocks or parity blocks.

endOffsets holds the end offset of each input stream.

computeChecksum decides whether to compute checksums of the source blocks. When enabled, checksums stores the checksums of the stripeLength source blocks; otherwise checksums is null.

When a stream is a normal file stream whose block actually exists, its endOffsets entry is the stream's actual end offset within the file, and a checksums object is created for it if source-block checksums are required.
Otherwise, when a stream is a ZeroInputStream (a zero block, used as padding during encoding when a stripe has fewer than stripeLength blocks, or, during repair, for blocks that do not participate in the reconstruction, since only stripeLength blocks take part), or is a real file stream whose block does not exist, its endOffsets entry is Long.MAX_VALUE and its checksums entry is null.

boundedBuffer is the blocking queue of read results; its capacity is 1 for encoding and 2 for corrupt-block repair, and its elements are ReadResult objects.

A ReadResult holds one slice of data from every input stream, stored in the member readBufs, a two-dimensional array written here as readBufs[rows][columns]. rows is the number of input streams: stripeLength for encoding, stripeLength + parityLength for repair; columns is always bufSize.

The data of the input streams is read sequentially by multiple threads in parallel into successive ReadResults, which are stored in boundedBuffer and consumed from it in order.

As the constructor shows, when the configured thread count exceeds the number of input streams, it is reduced to the number of streams, since any extra threads would be wasted.

remainingBytesPerStream is the number of bytes left to read from each input stream, initialized to the size of one block; each round reads bufSize bytes and fills one ReadResult.
slots is a semaphore whose permit count equals the (adjusted) thread count: the number of threads this ParallelStreamReader allows to read input streams concurrently.
readPool is a fixed-size thread pool of that same size.
mainThread is the ParallelStreamReader's main thread.
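The per-read buffer layout described above can be sketched as follows. This is a simplified, hypothetical model for illustration only; the actual ReadResult class in the Hadoop source has additional members and logic:

```java
import java.io.IOException;

// Hypothetical sketch of the ReadResult layout: one byte row per input
// stream, each row bufSize bytes wide, plus per-stream bookkeeping.
public class ReadResultSketch {
    final byte[][] readBufs;        // readBufs[stream][bufSize]
    final int[] numRead;            // bytes actually read per stream
    final IOException[] exceptions; // per-stream read failure, if any

    ReadResultSketch(int numStreams, int bufSize) {
        readBufs = new byte[numStreams][bufSize];
        numRead = new int[numStreams];
        exceptions = new IOException[numStreams];
    }

    public static void main(String[] args) {
        // Encoding with stripeLength = 10 and the default 1 MB buffer:
        ReadResultSketch r = new ReadResultSketch(10, 1 << 20);
        System.out.println(r.readBufs.length + " rows x "
            + r.readBufs[0].length + " bytes");
    }
}
```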

2. Data reading

The main thread runs as follows:

public void run() {
  while (running) {
    ReadResult readResult = new ReadResult(streams.length, bufSize);
    try {
      // Do not try to read more data if the desired amount of data has
      // been read.
      if (remainingBytesPerStream == 0) {
        return;
      }
      // Each round fills one ReadResult.
      performReads(readResult);
      // Enqueue to bounded buffer.
      boundedBuffer.put(readResult);
      remainingBytesPerStream -= bufSize;
    } catch (InterruptedException e) {
      running = false;
    }
  }
}

In the main thread, while the input streams are not exhausted, a ReadResult object is constructed and performReads(readResult) reads bufSize bytes from each input stream into it.
Once every input stream has contributed its bufSize bytes, the ReadResult is put into the blocking queue created earlier, where it waits to be consumed by the encoding path.
Since the blocking queue is array-backed:

this.boundedBuffer = new ArrayBlockingQueue<ReadResult>(boundedBufferCapacity);

elements are taken out in the order they were put in. The queue therefore delivers the BlockSize/bufSize ReadResult objects of the source streams in sequence, which guarantees the encoding order during reading.
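The FIFO behavior can be seen with a minimal, self-contained sketch (hypothetical demo code, not from the RaidNode source). A producer puts numbered elements into an ArrayBlockingQueue of capacity 1, as in the encoding path, so each put() blocks until the consumer has taken the previous element, and elements come out in exactly insertion order:

```java
import java.util.concurrent.ArrayBlockingQueue;

public class FifoDemo {
    // Produce n integers through a capacity-1 bounded buffer and return
    // the order in which the consumer sees them.
    static String drain(int n) throws InterruptedException {
        ArrayBlockingQueue<Integer> boundedBuffer = new ArrayBlockingQueue<>(1);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < n; i++) {
                try {
                    boundedBuffer.put(i); // blocks until the slot is free
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        producer.start();
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < n; i++) {
            order.append(boundedBuffer.take()); // consume in queue order
        }
        producer.join();
        return order.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drain(4)); // prints 0123
    }
}
```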

performReads itself is as follows:

private void performReads(ReadResult readResult) throws InterruptedException {
  long start = System.currentTimeMillis();
  for (int i = 0; i < streams.length; ) {
    // Try to acquire a permit; if no thread is free, wait up to 10s.
    boolean acquired = slots.tryAcquire(1, 10, TimeUnit.SECONDS);
    reporter.progress();
    if (acquired) {
      readPool.execute(new ReadOperation(readResult, i));
      i++;
    }
  }
  // All read operations have been submitted to the readPool.
  // Now wait for the operations to finish and release the semaphore.
  while (true) {
    // Acquiring all permits only succeeds once every worker is done.
    boolean acquired = slots.tryAcquire(numThreads, 10, TimeUnit.SECONDS);
    reporter.progress();
    if (acquired) {
      // Release all permits for the next round of reads.
      slots.release(numThreads);
      break;
    }
  }

  readTime += (System.currentTimeMillis() - start);
}

As shown, for each input stream performReads first acquires a permit; on success it submits a ReadOperation(readResult, i) to the thread pool created earlier, which reads bufSize bytes from the i-th input stream into the i-th row of readResult.readBufs. The ReadOperation thread reads the data as follows:

public void run() {
  readResult.setException(idx, null);
  try {
    // A failed input stream; normally not null.
    if (streams[idx] == null) {
      // We encountered an error in this stream earlier, use zeros.
      Arrays.fill(readResult.readBufs[idx], (byte) 0);
      return;
    }
    boolean eofOK = true;
    byte[] buffer = readResult.readBufs[idx];
    // Read the data.
    int numRead = RaidUtils.readTillEnd(streams[idx], buffer, eofOK,
        endOffsets[idx], (int) Math.min(remainingBytesPerStream,
        buffer.length));
    if (outs != null && outs.length > 0) {
      InjectionHandler.processEventIO(
          InjectionEvent.RAID_ENCODING_FAILURE_BLOCK_MISSING, outs[0]);
    }
    if (computeChecksum && numRead > 0) {
      if (checksums[idx] != null) {
        // Update the checksum.
        checksums[idx].update(buffer, 0, numRead);
      }
    }
    readResult.numRead[idx] = numRead;
  } catch (Exception e) {
    LOG.warn("Encountered exception in stream " + idx, e);
    // On failure, record the exception in the ReadResult.
    readResult.setException(idx, e);
    if (streams[idx] != null) {
      try {
        streams[idx].close();
      } catch (IOException ioe) {}
    }
    streams[idx] = null;
  } finally {
    // Always release the permit.
    ParallelStreamReader.this.slots.release();
  }
}

Once every input stream has delivered its bufSize bytes, the permits are released for the next round of reads.
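The two-phase semaphore idiom in performReads, handing out one permit per submitted task and then acquiring all numThreads permits to wait for every worker to finish, can be illustrated with a minimal standalone sketch (hypothetical demo code, not the RaidNode source):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreBarrierDemo {
    // Run numTasks dummy "reads" on numThreads workers, then wait for all of
    // them using the same permit dance as performReads.
    static int runTasks(int numThreads, int numTasks) throws InterruptedException {
        Semaphore slots = new Semaphore(numThreads);
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < numTasks; ) {
            // One permit per in-flight task; retries while all workers are busy.
            if (slots.tryAcquire(1, 10, TimeUnit.SECONDS)) {
                pool.execute(() -> {
                    try {
                        done.incrementAndGet(); // stands in for one stream read
                    } finally {
                        slots.release();        // worker hands its permit back
                    }
                });
                i++;
            }
        }
        // Acquiring all permits succeeds only after every worker has released
        // its own, i.e. after all submitted tasks have finished.
        while (!slots.tryAcquire(numThreads, 10, TimeUnit.SECONDS)) { }
        slots.release(numThreads); // ready for the next round
        pool.shutdown();
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(4, 10)); // prints 10
    }
}
```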

3. A data reading example

Consider the encoding process of a (10,4) RS code: a stripe has 10 source blocks, corresponding here to 10 input streams. By default 4 threads are created, so at most 4 threads read these 10 input streams concurrently; each round reads bufSize bytes from every stream to fill one ReadResult, and once all 10 streams have been read the completed ReadResult is put into the blocking queue.

With a 64 MB block and the default bufSize of 1 MB, each ReadResult's readBufs array is readBufs[10][1MB].
The ReadResults therefore enter and leave boundedBuffer covering, in order:
readBufs[10][0-1MB], readBufs[10][1MB-2MB], readBufs[10][2MB-3MB], ..., readBufs[10][63MB-64MB]
That is, the first 1 MB of all input streams is encoded first, then the second 1 MB, and so on, until all 64 MB have been encoded.
The number of ReadResults that can sit in boundedBuffer at the same time is bounded by the queue's capacity: 1 by default for encoding, meaning the producer must wait for the consumer to take one ReadResult before putting the next, and 2 by default for corrupt-block repair.
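The arithmetic in this example can be checked with a small sketch (a hypothetical helper, not part of the source):

```java
public class ChunkMath {
    // Number of ReadResults needed to cover one block of a stripe.
    static long chunksPerBlock(long blockSize, int bufSize) {
        return blockSize / bufSize;
    }
    // Bytes held by one ReadResult across all of its rows.
    static long bytesPerReadResult(int stripeLength, int bufSize) {
        return (long) stripeLength * bufSize;
    }

    public static void main(String[] args) {
        long blockSize = 64L << 20; // 64 MB source block
        int bufSize = 1 << 20;      // default 1 MB read buffer
        int stripeLength = 10;      // (10,4) RS code: 10 source blocks
        System.out.println(chunksPerBlock(blockSize, bufSize));              // 64
        System.out.println(bytesPerReadResult(stripeLength, bufSize) >> 20); // 10 (MB)
    }
}
```

So one stripe is consumed as 64 successive ReadResults, each buffering 10 MB across its 10 rows.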