Hadoop版本:hadoop-20-master
ParallelStreamReader
一个Stripe
对应一个ParallelStreamReader
,管理n
个输入流的字节缓冲.
编码过程对应的n为stripeLength
,将这n个输入流编码成parityLength
个输出流.
修复损坏区块的过程对应的n为stripeLength+parityLength
,不过只有其中的stripeLength
个输入流为文件对应可用数据流,其他为ZeroInputStream
.
从输入流中读取的数据存储在阻塞队列boundedBuffer
中,队列中的元素为ReadResult
,每个ReadResult存储了输入流的bufSize
个字节,且队列中ReadResult字节顺序与输入流中字节顺序一致,即若输入流对应一个Block,大小为64MB,bufSize取默认值1M,则boundedBuffer中存储的ReadResult对应的字节分别为:0~1M,1M~2M....
1. 成员:
1 | public ParallelStreamReader(Progressable reporter,InputStream[] streams,int bufSize,int numThreads,int boundedBufferCapacity, |
对于编码过程,streams
成员对应stripeLength
个要编码的输入流.
对于损坏区块的修复过程,streams
对应stripeLength+parityLength
个输入流,其中只有stripeLength个输入流为有效数据对应的流,即一个stripe中没有损坏的区块对应的输入流,可能为源区块或校验区块对应的流.
endOffsets
分别对应每个输入流的结束偏移量.
computeChecksum
决定是否计算源Blocks的校验和,若需要计算时,checksums
分别存储stripeLength个源Blocks的校验和,否则,checksums
为null.
当streams
某一个流为正常文件流且对应Block实际存在时,对应的endOffsets
为该输入流在文件中的实际偏移量,并在需要计算源Block校验和时创建对应的checksums
对象.
否则,当streams
中某一个流为ZeroInputStream(即0 Block,这在编码过程中对应不足Block的填充,而在损坏区块的修复过程中对应不参与修复的区块,参与修复的区块数只为stripeLength),或者是实际文件流但Block不存在,则对应的endOffsets
为Long.MAX_VALUE,对应的checksums
为null.
boundedBuffer
为输入流读取结果的阻塞队列,编码过程对应队列初始容量为1,损坏区块的修复过程对应队列初始容量为2,阻塞队列中的元素为ReadResult
.
一个ReadResult保存了输入流的一段数据,数据保存在成员readBufs
中,readBufs为二维数组,这里记为readBufs[rows][columns].rows值为输入流的个数,对于编码过程,rows值为stripeLength,而对于损坏区块的修复过程,rows值为stripeLength+parityLength
,columns都为bufSize
.
输入流中的数据,通过多个线程并行的依次读取存入多个ReadResult中,而这些ReadResult存储在boundedBuffer中,并依次从boundedBuffer中读取进行处理.
从构造函数也可以看出,当配置的线程数大于输入流的个数时,将线程数调整为stripeLength,因为多出的线程也是浪费.
remainingBytesPerStream
为输入流中剩余的字节数,初始化为一个Block的大小,每次读取bufSize个字节,构造一个ReadResult对象.slots
为信号量,大小为配置的线程数,为该ParallelStreamReader同时允许读取输入流的线程数.readPool
固定大小的线程池,大小为配置的线程数.mainThread
为ParallelStreamReader的主线程.
2. 数据读取
主线程执行如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public void run() {
while (running) {
ReadResult readResult = new ReadResult(streams.length, bufSize);
try {
// Do not try to read more data if the desired amount of data has
// been read.
if (remainingBytesPerStream == 0) {
return;
}
performReads(readResult);//每次读取的数据为ReadResult对象
// Enqueue to bounder buffer.
boundedBuffer.put(readResult);
remainingBytesPerStream -= bufSize;
} catch (InterruptedException e) {
running = false;
}
}
}
主线程中,若输入流数据还未读完,则构建一个ReadResult对象,然后执行performReads(readResult)
从输入流中读取bufSize个字节的数据至readResult中.
所有输入流都读完bufSize个字节后,将该ReadResult放入之前创建的阻塞队列中,等待其他执行流程读取数据进行编码.
由于阻塞队列为数组实现:1
this.boundedBuffer = new ArrayBlockingQueue<ReadResult>(boundedBufferCapacity);
因此元素的放入次序就是读取次序,也就是说该队列会依次存储源输入流的BlockSize/bufSize
个ReadResult对象,这就保证了读取过程中编码的顺序.
而performReads如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24private void performReads(ReadResult readResult) throws InterruptedException {
long start = System.currentTimeMillis();
for (int i = 0; i < streams.length; ) {
//尝试获取信号量,若当前线程池中没有线程可用,则最多等待10s
boolean acquired = slots.tryAcquire(1, 10, TimeUnit.SECONDS);
reporter.progress();
if (acquired) {
readPool.execute(new ReadOperation(readResult, i));
i++;
}
}
// All read operations have been submitted to the readPool.
// Now wait for the operations to finish and release the semaphore.
while (true) {
boolean acquired = slots.tryAcquire(numThreads, 10, TimeUnit.SECONDS);//尝试获取所有信号量,即等待线程执行完成
reporter.progress();
if (acquired) {
slots.release(numThreads);//释放所有信号量,用于下次读取
break;
}
}
readTime += (System.currentTimeMillis() - start);
}
可见,在performReads
中,对每个输入流,先获取一个信号量,获取成功则从之前创建的线程池中取得一个ReadOperation线程执行ReadOperation(readResult,i)
,从第i个输入流读取bufSize个字节放入readResult的readBufs的第i个数组中,ReadOperation线程中读取数据:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public void run() {
readResult.setException(idx, null);
try {
if (streams[idx] == null) {//错误输入流,一般不为null
// We encountered an error in this stream earlier, use zeros.
Arrays.fill(readResult.readBufs[idx], (byte) 0);
return;
}
boolean eofOK = true;
byte[] buffer = readResult.readBufs[idx];
int numRead = RaidUtils.readTillEnd(streams[idx], buffer, eofOK,
endOffsets[idx], (int) Math.min(remainingBytesPerStream,
buffer.length));//数据读取
if (outs != null && outs.length > 0) {
InjectionHandler.processEventIO(
InjectionEvent.RAID_ENCODING_FAILURE_BLOCK_MISSING, outs[0]);
}
if (computeChecksum && numRead > 0) {
if (checksums[idx] != null) {
checksums[idx].update(buffer, 0, numRead);//更新校验数据
}
}
readResult.numRead[idx] = numRead;
} catch (Exception e) {
LOG.warn("Encountered exception in stream " + idx, e);
readResult.setException(idx, e);//异常时保存异常值
if (streams[idx] != null) {
try {
streams[idx].close();
} catch (IOException ioe) {}
}
streams[idx] = null;
} finally {
ParallelStreamReader.this.slots.release();//最终释放信号量
}
}
当所有的输入流读完bufSize个字节数据后,释放占有的信号量给下次读取使用.
3. 数据读取举例
这里假设对于(10,4)rs码的编码过程,一个Stripe有10个源Blocks,在这里对应10个输入流,默认来讲,会创建4个线程,对应到该函数,便最多会同时有4个线程依次读取这10个输入流,每次读取bufSize个字节构造一个ReadResult对象,读完10个输入流后将构造好的ReadResult放入阻塞队列.
一个Block大小为64MB,bufSize取默认值1MB,则每个ReadResult的readBufs数组为readBufs[10][1MB]
.
因此boundedBuffer中ReadResult的放入和取出的readBufs顺序为:readBufs[10][0-1MB],readBufs[10][1MB-2MB],readBufs[10][2MB-3MB],...,readBuf[10][63MB-64MB]
即先取所有输入流的前1MB数据编码,编码完成后在去第2MB的数据进行编码,以此类推,编码完所有64MB的数据.
而boundedBuffer中同时存在的ReadResult的个数取决于阻塞队列的大小,默认情况下,编码时大小为1,即放入一个ReadResult后,必须等待编码时从中取出一个才能继续放入下一个读取缓冲.而损坏区块的修复默认大小为2.