Hadoop version: hadoop-20-master
ConfigManager
Main members
```java
public static final long RELOAD_WAIT = 5 * 1000;

private long lastReloadAttempt;     // Last time we tried to reload the config file
private long lastSuccessfulReload;  // Last time we successfully reloaded config
private boolean lastReloadAttemptFailed = false;
private long reloadInterval = RELOAD_INTERVAL;

private long periodicity;           // time between runs of all policies
private long harPartfileSize;
private int maxJobsPerPolicy;       // Max no. of jobs running simultaneously per policy.
private int maxFilesPerJob;         // Max no. of files raided by a job.
private int maxBlocksPerDirRaidJob; // Max no. of blocks raided by a dir-raid job

// Reload the configuration
private boolean doReload;
private Thread reloadThread;
```
The raid policy file

Loading of the raid policy file is done mainly by reloadConfigs(), which re-reads the policy file. The file name is held in the member field configFileName, and the parsed result is stored in the member field allPolicies:

```java
Collection<PolicyInfo> allPolicies = new ArrayList<PolicyInfo>();
setAllPolicies(all);
```
Configuration

In the policy file, each policy starts with a policy element and maps to the PolicyInfo class. Its child elements can be:
Element | Description |
---|---|
srcPath | the source path to be encoded; the path itself is given by the prefix attribute |
fileList | a file list |
codecId | the ID of the codec to use |
shouldRaid | whether the path should be raided |
description | descriptive text |
parentPolicy | parent policy; its attributes are copied into the current policy |
property | name-value pairs, stored in the properties field of PolicyInfo |
In addition, policyName is an attribute of the policy element.
For property elements, the values that can be set include the following (a sketch of a complete policy file follows this list):

- targetReplication: the replication factor of the source file after encoding;
- metaReplication: the replication factor of the parity file after encoding;
- modTimePeriod: how long after its last modification a source file becomes eligible for encoding, in ms;
- simulate: similar to the simulate_block_fix setting in the codec JSON; defaults to false.
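Putting these elements together, a policy file might look like the sketch below; the policy name, path, and values are made up for illustration, and only the element and property names come from the description above:

```xml
<configuration>
  <policy name="example-policy">
    <srcPath prefix="/user/foo/data"/>
    <codecId>rs</codecId>
    <shouldRaid>true</shouldRaid>
    <description>illustrative policy</description>
    <property>
      <name>targetReplication</name>
      <value>1</value>
    </property>
    <property>
      <name>metaReplication</name>
      <value>1</value>
    </property>
    <property>
      <name>modTimePeriod</name>
      <value>86400000</value>
    </property>
  </policy>
</configuration>
```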
Reload timing
Whether the policy file should be reloaded is decided in reloadConfigsIfNecessary. Roughly, two conditions must hold:

- time > lastReloadAttempt + reloadInterval, i.e. the reload interval has elapsed; reloadInterval comes from raid.config.reload.interval and defaults to 10s;
- the policy file has been modified since the last successful reload, and more than RELOAD_WAIT (5s) has passed since that modification.
```java
if (time > lastReloadAttempt + reloadInterval) { // reloadInterval defaults to 10s
  lastReloadAttempt = time;
  try {
    File file = new File(configFileName);
    long lastModified = file.lastModified();
    if (lastModified > lastSuccessfulReload &&
        time > lastModified + RELOAD_WAIT) {
      reloadConfigs(); // re-read the policy file
      lastSuccessfulReload = time;
      lastReloadAttemptFailed = false;
      return true;
    }
  }
  ...
```
Codec
Construction:
A Codec is constructed from a JSON object taken from the configuration file:
```java
private Codec(JSONObject json) throws JSONException {
  this.jsonStr = json.toString();
  this.id = json.getString("id");
  this.parityLength = json.getInt("parity_length");
  this.stripeLength = json.getInt("stripe_length");
  this.erasureCodeClass = json.getString("erasure_code");
  this.parityDirectory = json.getString("parity_dir");
  this.priority = json.getInt("priority");
  this.description = getJSONString(json, "description", "");
  this.isDirRaid = Boolean.parseBoolean(getJSONString(json, "dir_raid", "false"));
  this.tmpParityDirectory = getJSONString(
      json, "tmp_parity_dir", "/tmp" + this.parityDirectory);
  this.tmpHarDirectory = getJSONString(
      json, "tmp_har_dir", "/tmp" + this.parityDirectory + "_har");
  this.simulateBlockFix = json.getBoolean("simulate_block_fix");
  checkDirectory(parityDirectory);
  checkDirectory(tmpParityDirectory);
  checkDirectory(tmpHarDirectory);
  setPathSeparatorDirectories();
}
```
As the constructor shows, dir_raid defaults to false, i.e. file-level raid; tmp_parity_dir defaults to "/tmp" + parity_dir, and tmp_har_dir defaults to "/tmp" + parity_dir + "_har".
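For reference, here is a sketch of one entry in the codec JSON list (supplied through the raid.codecs.json configuration key); the field names come from the constructor above, while the id, class name, paths, and numeric values are only illustrative:

```json
{
  "id": "rs",
  "parity_length": 4,
  "stripe_length": 10,
  "erasure_code": "org.apache.hadoop.raid.ReedSolomonCode",
  "parity_dir": "/raidrs",
  "priority": 300,
  "description": "illustrative Reed-Solomon codec entry",
  "simulate_block_fix": false
}
```

Note that simulate_block_fix has no default in the constructor (json.getBoolean), so each entry must define it.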
Constructing the ErasureCode object:
The configuration is consulted first:

```java
Class<?> erasureCode = conf.getClass(ERASURE_CODE_KEY_PREFIX + this.id,
    conf.getClassByName(this.erasureCodeClass));
ErasureCode code = (ErasureCode) ReflectionUtils.newInstance(erasureCode,
    conf);
```
As shown, a concrete ErasureCode implementation class can be bound to a codec id via the configuration key ERASURE_CODE_KEY_PREFIX + id; if no such key is set, the class named by the erasure_code JSON field is used (illustrated below).
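As a brief illustration of this override (the codec id "rs" and the class MyReedSolomonCode are made up; ERASURE_CODE_KEY_PREFIX is the constant used in the snippet above):

```java
// Bind a custom ErasureCode implementation to codec id "rs"; without this
// key the codec falls back to the class named by its erasure_code JSON field.
Configuration conf = new Configuration();
conf.setClass(ERASURE_CODE_KEY_PREFIX + "rs",
    MyReedSolomonCode.class, ErasureCode.class);
```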
Encoder
During encoding, each EncodingCandidate is handled by one Encoder, which performs the actual encoding work. Its main members are best explained through the constructor:
```java
Encoder(Configuration conf, Codec codec) {
  this.conf = conf;
  // Encoder and Decoder share the parallelism setting raid.encoder.parallelism
  this.parallelism = conf.getInt("raid.encoder.parallelism", DEFAULT_PARALLELISM);
  this.codec = codec;
  this.code = codec.createErasureCode(conf);
  this.rand = new Random();
  this.maxBufSize = conf.getInt(ENCODING_MAX_BUFFER_SIZE_KEY,
      DEFAULT_MAX_BUFFER_SIZE);
  this.bufSize = conf.getInt("raid.encoder.bufsize", maxBufSize);
  this.writeBufs = new byte[codec.parityLength][];
  this.checksumStore = RaidNode.createChecksumStore(conf, false);
  this.requiredChecksumStore = conf.getBoolean(
      RaidNode.RAID_CHECKSUM_STORE_REQUIRED_KEY,
      false);
  if (codec.isDirRaid) {
    // only needed by directory raid
    this.stripeStore = RaidNode.createStripeStore(conf, false, null);
  }
  this.retryCountPartialEncoding = conf.getInt(RETRY_COUNT_PARTIAL_ENCODING_KEY,
      DEFAULT_RETRY_COUNT_PARTIAL_ENCODING);
  allocateBuffers();
}
```
- parallelism is the degree of parallelism with which the encoder reads source data; as shown later, it is the number of threads started to read the stripe's source-file input streams. For example, with (10,4) RS encoding, one encoding pass reads 10 source blocks; by default 4 threads read these 10 blocks into a single input buffer, each read being of a fixed length, and a thread that has filled its fixed-length chunk for one block moves on to the next unread block. See ParallelStreamReader for the detailed analysis.
- code is the erasure code, configured on the codec; see the Codec section.
- bufSize is the fixed per-read length mentioned above, 1MB by default.
- writeBufs holds the output buffers for the encoding result: parityLength buffers of bufSize bytes each, matching the per-thread read length, i.e. each pass encodes bufSize bytes and writes the result into these buffers (see the sketch below).
- checksumStore is optional; it becomes mandatory when hdfs.raid.checksum.store.required is true.
- stripeStore is optional; it must be configured when any policy uses directory raid.
- retryCountPartialEncoding is the number of retries after a partial-encoding failure; config key raid.encoder.retry.count.partial.encoding.
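As a minimal sketch (not the actual source) of what allocateBuffers() presumably does given the description above:

```java
// One output buffer per parity stream: each encoding pass produces
// bufSize bytes for each of the codec.parityLength parity blocks.
private void allocateBuffers() {
  for (int i = 0; i < codec.parityLength; i++) {
    writeBufs[i] = new byte[bufSize];
  }
}
```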
PolicyInfo
PolicyInfo represents one parsed policy (see the policy file elements above). Among its fields:

```java
private Path srcPath; // the specified src path
```
ChecksumStore
Stores block checksums. When hdfs.raid.checksum.store.required is true, a concrete store class must be configured; by default none is required.
LocalChecksumStore
Storage format

The store directory holds one empty marker file per entry, named "blockid:checksum".

Main members

storeDirName is the local storage directory name, backing the File field storeDir and set by the config key hdfs.raid.local.checksum.dir. The in-memory storage structure:
```java
public ConcurrentHashMap<String, Long> store = new ConcurrentHashMap<String, Long>();
```
It maps each block ID to its checksum.
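A hedged sketch of how one (block, checksum) entry maps to its marker file, inferred from the "blockid:checksum" naming described above; the actual helper in the source is getFile (used by putIfAbsent below), whose body here is an assumption:

```java
// Marker file for one entry: an empty file named "<blockid>:<checksum>"
// inside the local store directory.
private File getFile(Block blk, Long checksum) {
  return new File(storeDirName, blk.toString() + ":" + checksum);
}
```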
Creation

The store is initialized when created:

```java
public void initialize(Configuration conf, boolean createStore) throws IOException
```

Initialization reads the configuration, creates the local directory, and loads the information from the files found there into the store member. Generally, when the stores are enabled, each Encoder and Decoder holds one ChecksumStore and one StripeStore, initialized in its constructor.
Saving

During encoding, writeToChecksumStore pushes the newly computed checksums into the ChecksumStore:
```java
private void writeToChecksumStore(DistributedFileSystem dfs,
    CRC32[] crcOuts, Path parityTmp, long expectedParityFileSize,
    Progressable reporter) throws IOException {
  LocatedBlocks lbks = dfs.getLocatedBlocks(parityTmp, 0L, expectedParityFileSize);
  for (int i = 0; i < crcOuts.length; i++) {
    this.checksumStore.putIfAbsentChecksum(lbks.get(i).getBlock(),
        crcOuts[i].getValue());
    reporter.progress();
  }
  LOG.info("Wrote checksums of parity file into checksum store");
}
```
The above stores the checksums of the parity file's blocks into the ChecksumStore; when enabled, ParallelStreamReader can likewise push the source blocks' checksums into the ChecksumStore while reading the input streams. putIfAbsentChecksum records a block's checksum:
```java
public Long putIfAbsentChecksum(Block blk, Long newChecksum)
    throws IOException {
  Long oldChecksum = putIfAbsent(blk, newChecksum);
  if (oldChecksum != null && !oldChecksum.equals(newChecksum)) {
    throw new IOException("Block " + blk.toString()
        + " has different checksums " + oldChecksum + "(old) and " +
        newChecksum + "(new)");
  }
  return oldChecksum;
}
```
putIfAbsent records a block's checksum; it is an abstract method. The LocalChecksumStore implementation:
```java
@Override
public Long putIfAbsent(Block blk, Long newChecksum) throws IOException {
  Long oldChecksum = store.putIfAbsent(blk.toString(), newChecksum);
  if (oldChecksum == null) {
    LOG.info("Put " + blk + " -> " + newChecksum);
    File newFile = getFile(blk, newChecksum);
    newFile.createNewFile();
  }
  return oldChecksum;
}
```
After an update, a corresponding file is created in the local directory, following the naming scheme described under Storage format. Since every checksum update creates such a file, a newly created ChecksumStore can rebuild its block-to-checksum mapping by reading the files in the local directory.
StripeStore
Must be configured for directory raid.
LocalStripeStore
LocalStripeStore keeps its information under the directory given by hdfs.raid.local.stripe.dir.
Storage format

One local file per stripe: the file name combines the codec and a block, and the file content lists the stripe's blocks, of which the first parityLength are the parity blocks and the rest the source blocks (see Creation below).

Main members

storeDirName is the local storage directory name, backing the File field storeDir. The main storage structures:
```java
public ConcurrentHashMap<String, LocalStripeInfo> blockToStripeStore = new ConcurrentHashMap<String, LocalStripeInfo>();
public ConcurrentHashMap<List<Block>, Long> stripeSet = new ConcurrentHashMap<List<Block>, Long>();
```

blockToStripeStore maps a block to its StripeInfo; LocalStripeInfo is the StripeInfo subclass used by LocalStripeStore.
StripeInfo holds the basic information of one stripe, including the codec, the source blocks, and the parity blocks:

```java
public Codec codec;
public Block block;
public List<Block> parityBlocks;
public List<Block> srcBlocks;
```

If a StripeStore exists, the stripe information of a block can be obtained through blockToStripeStore and used to drive the corresponding decoding; the repair path for corrupt blocks shows this StripeStore-based recovery in action (a lookup sketch follows).
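A hedged sketch of such a lookup; the key layout is an assumption based on si.getKey() being the local file name, which combines the codec and the block (see Creation below):

```java
// Assumed illustration: find the stripe a corrupt block belongs to, then
// collect the blocks a decoder would need for reconstruction.
String key = codec.id + corruptBlock.toString(); // separator, if any, is assumed
LocalStripeInfo si = localStripeStore.blockToStripeStore.get(key);
List<Block> parityBlocks = si.parityBlocks;
List<Block> srcBlocks = si.srcBlocks;
```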
stripeSet stores the parity-block lists of all stripes: the key is a stripe's list of parity blocks, and the value is fixed at 1.

Creation
The store is initialized when created:

```java
public void initialize(Configuration conf, boolean createStore, FileSystem fs) throws IOException
```
Initialization reads the configuration, creates the local storage directory, and reads the files under it. Each local file yields one LocalStripeInfo; the file-name format and content layout are described under Storage format above:

```java
return new LocalStripeInfo(codec, blk, parityBlocks, srcBlocks);
```

Here codec is the leading part of the file name and blk the trailing part; parityBlocks are the blocks matching the first parityLength entries of the file content, and srcBlocks are the remaining ones.
The result is then stored into the corresponding structures:
```java
LocalStripeInfo oldSi = blockToStripeStore.put(si.getKey(), si);
if (oldSi != null) {
  throw new IOException("There are two stripes for the same key: " +
      si.getKey() + "\n old: " + oldSi.toString() + "\n new: " +
      si.toString());
}
stripeSet.put(si.parityBlocks, 1L);
```
si.getKey() is exactly the file name.
Saving

During encoding, writeToStripeStore saves the encoded stripe's information into the StripeStore, creating the corresponding file in the local directory and writing its content in the on-disk format described earlier; see the source for the details.
BlockIntegrityMonitor
Responsible for repairing corrupt and missing blocks.

Main threads

```java
Daemon blockFixerThread = null;
Daemon blockCopierThread = null;
Daemon corruptFileCounterThread = null;
```
blockFixerThread repairs corrupt blocks, blockCopierThread repairs missing blocks, and corruptFileCounterThread counts corrupt files. During RaidNode initialization, the threads are created according to the configuration:
```java
boolean useBlockFixer = !conf.getBoolean(RAID_DISABLE_CORRUPT_BLOCK_FIXER_KEY, false);
boolean useBlockCopier = !conf.getBoolean(RAID_DISABLE_DECOMMISSIONING_BLOCK_COPIER_KEY, true);
boolean useCorruptFileCounter = !conf.getBoolean(RAID_DISABLE_CORRUPTFILE_COUNTER_KEY, false);

Runnable fixer = blockIntegrityMonitor.getCorruptionMonitor();
if (useBlockFixer && (fixer != null)) {
  this.blockFixerThread = new Daemon(fixer);
  this.blockFixerThread.setName("Block Fixer");
  this.blockFixerThread.start();
}

Runnable copier = blockIntegrityMonitor.getDecommissioningMonitor();
if (useBlockCopier && (copier != null)) {
  this.blockCopierThread = new Daemon(copier);
  this.blockCopierThread.setName("Block Copier");
  this.blockCopierThread.start();
}

Runnable counter = blockIntegrityMonitor.getCorruptFileCounter();
if (useCorruptFileCounter && counter != null) {
  this.corruptFileCounterThread = new Daemon(counter);
  this.corruptFileCounterThread.setName("Corrupt File Counter");
  this.corruptFileCounterThread.start();
}
```
- raid.blockreconstruction.corrupt.disable defaults to false, so blockFixerThread exists by default.
- raid.blockreconstruction.decommissioning.disable defaults to true, so blockCopierThread does not exist by default.
- raid.corruptfile.counter.disable defaults to false, so corruptFileCounterThread exists by default.
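For example, to enable the block copier that is off by default, the corresponding key from the list above would be set as follows (a hypothetical configuration snippet; only the key name comes from the source):

```xml
<property>
  <name>raid.blockreconstruction.decommissioning.disable</name>
  <value>false</value>
</property>
```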
LocalBlockIntegrityMonitor
In LocalBlockIntegrityMonitor, getDecommissioningMonitor and getCorruptFileCounter are not implemented, and getCorruptionMonitor returns the object itself:
```java
@Override
public Runnable getDecommissioningMonitor() {
  // This class does not monitor decommissioning files.
  return null;
}

@Override
public Runnable getCorruptFileCounter() {
  return null;
}

@Override
public Runnable getCorruptionMonitor() {
  return this;
}
```
Consequently, with LocalBlockIntegrityMonitor, even if the DecommissioningMonitor and the CorruptFileCounter are enabled in the configuration, they have no effect.
Main members:

monitorDirs, the directories monitored for corrupt files:

```java
public String[] monitorDirs;
```

It is initialized when the LocalBlockIntegrityMonitor is created, to the file-system directories in which corrupt files are tracked:

```java
monitorDirs = getCorruptMonitorDirs(getConf());
```

The value is read from raid.corruptfile.counter.dirs and defaults to "/", i.e. corrupt files are tracked across the whole file system.

monitorSeconds, read from raid.monitor.seconds, defaults to "18000,86400,604800" (5h, 1d, 1w):

```java
private String[] monitorSeconds;
```
recoveryTimes: each monitored corrupt-file directory has its own histogram; a RaidHistogram maintains one window per raid.monitor.seconds value:

```java
protected HashMap<String, RaidHistogram> recoveryTimes;
```

histoLen, the histogram length:

```java
private int histoLen;
```

percents, the histogram percentiles:

```java
private ArrayList<Float> percents;
```
Other

Statistics
```java
private long numFilesFixed = 0;
private long numFileFixFailures = 0;
private long numFilesCopied = 0;
private long numFileCopyFailures = 0;
private long numBlockFixSimulationFailures = 0;
private long numBlockFixSimulationSuccess = 0;
private long numFilesToFixDropped = 0;
private long numfileFixBytesReadRemoteRack = 0;
```

Intervals
```java
// interval between checks for lost files
protected long blockCheckInterval;
protected long corruptFileCountInterval;
protected short notRaidedReplication;
```

```java
public volatile long lastSuccessfulFixTime = System.currentTimeMillis();
public volatile long approximateNumRecoverableFiles = 0L;
```