RaidNode Source Code Reading: Helper Classes

Hadoop version: hadoop-20-master


ConfigManager

Main fields

public static final long RELOAD_WAIT = 5 * 1000;
private long lastReloadAttempt; // Last time we tried to reload the config file
private long lastSuccessfulReload; // Last time we successfully reloaded config
private boolean lastReloadAttemptFailed = false;
private long reloadInterval = RELOAD_INTERVAL;
private long periodicity; // time between runs of all policies
private long harPartfileSize;
private int maxJobsPerPolicy; // Max no. of jobs running simultaneously for a policy.
private int maxFilesPerJob; // Max no. of files raided by a job.
private int maxBlocksPerDirRaidJob; // Max no. of blocks raided by a dir-raid job
// Reload the configuration
private boolean doReload;
private Thread reloadThread;

The raid policy file

The raid policy file is loaded mainly by reloadConfigs(), which (re)reads the policy file.
The policy file path is held in the configFileName field.

The result of loading is stored in the allPolicies field:

Collection<PolicyInfo> allPolicies = new ArrayList<PolicyInfo>();
setAllPolicies(all);

Configuration

In the policy file, each policy begins with a policy element, which maps to the PolicyInfo class. The child elements of policy can be:

Element        Description
srcPath        source path to be encoded; the path itself is given by the prefix attribute
fileList       a file containing the list of files to raid
codecId        the ID of the codec to use
shouldRaid     whether the files should be raided
description    verbose description of the policy
parentPolicy   parent policy whose attributes are copied into the current policy
property       name-value pair, stored in the properties field of PolicyInfo

In addition, policyName is an attribute of the policy element.

Among the property entries, the values that can be set include:

  • targetReplication, the replication factor of the source file after encoding;
  • metaReplication, the replication factor of the parity file after encoding;
  • modTimePeriod, how long (in ms) after its last modification a source file becomes eligible for encoding;
  • simulate, similar to the simulate_block_fix setting in the codec JSON; defaults to false.
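Putting the elements and properties above together, a single policy entry in the policy file could look like the following sketch. The layout follows the element list above; the concrete paths and values are invented for illustration and should be checked against the raid.xml shipped with the version in use:

```xml
<configuration>
  <policy name="testPolicy">
    <srcPath prefix="/user/test/raidtest"/>
    <codecId>rs</codecId>
    <shouldRaid>true</shouldRaid>
    <property>
      <name>targetReplication</name>
      <value>1</value>
    </property>
    <property>
      <name>metaReplication</name>
      <value>1</value>
    </property>
    <property>
      <name>modTimePeriod</name>
      <value>3600000</value>
    </property>
    <description>raid the test directory with Reed-Solomon</description>
  </policy>
</configuration>
```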

When to reload

Whether to reload the policy file is decided in reloadConfigsIfNecessary; roughly, both conditions must hold:
time > lastReloadAttempt + reloadInterval;
i.e. the reload interval has elapsed (reloadInterval is raid.config.reload.interval, default 10s);
the policy file has been modified since the last successful reload, and more than RELOAD_WAIT (5s) has passed since that modification.

if (time > lastReloadAttempt + reloadInterval) { // reloadInterval defaults to 10s
  lastReloadAttempt = time;
  try {
    File file = new File(configFileName);
    long lastModified = file.lastModified();
    if (lastModified > lastSuccessfulReload &&
        time > lastModified + RELOAD_WAIT) {
      reloadConfigs(); // reload the config file
      lastSuccessfulReload = time;
      lastReloadAttemptFailed = false;
      return true;
    }
  }
  ...


Codec

Construction:

A Codec is constructed from a JSON object taken from the configuration file:

private Codec(JSONObject json) throws JSONException {
  this.jsonStr = json.toString();
  this.id = json.getString("id");
  this.parityLength = json.getInt("parity_length");
  this.stripeLength = json.getInt("stripe_length");
  this.erasureCodeClass = json.getString("erasure_code");
  this.parityDirectory = json.getString("parity_dir");
  this.priority = json.getInt("priority");
  this.description = getJSONString(json, "description", "");
  this.isDirRaid = Boolean.parseBoolean(getJSONString(json, "dir_raid", "false"));
  this.tmpParityDirectory = getJSONString(
      json, "tmp_parity_dir", "/tmp" + this.parityDirectory);
  this.tmpHarDirectory = getJSONString(
      json, "tmp_har_dir", "/tmp" + this.parityDirectory + "_har");
  this.simulateBlockFix = json.getBoolean("simulate_block_fix");
  checkDirectory(parityDirectory);
  checkDirectory(tmpParityDirectory);
  checkDirectory(tmpHarDirectory);
  setPathSeparatorDirectories();
}

As the code shows, dir_raid defaults to false, i.e. file-level raid.
tmp_parity_dir defaults to "/tmp" + parity_dir.
tmp_har_dir defaults to "/tmp" + parity_dir + "_har".
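For reference, a codec definition matching the constructor above might look like the sketch below (typically supplied through the raid.codecs.json configuration property; the concrete values here are illustrative only):

```json
[
  {
    "id": "rs",
    "parity_length": 4,
    "stripe_length": 10,
    "erasure_code": "org.apache.hadoop.raid.ReedSolomonCode",
    "parity_dir": "/raidrs",
    "priority": 300,
    "description": "ReedSolomonCode code",
    "dir_raid": false,
    "simulate_block_fix": true
  }
]
```

Note that simulate_block_fix is read with json.getBoolean, i.e. it has no default value and must be present.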

ErasureCode construction:

The configuration is read:

Class<?> erasureCode = conf.getClass(ERASURE_CODE_KEY_PREFIX + this.id,
    conf.getClassByName(this.erasureCodeClass));
ErasureCode code = (ErasureCode) ReflectionUtils.newInstance(erasureCode, conf);

As shown, the codec's id can be mapped to a concrete ErasureCode implementation class via the ERASURE_CODE_KEY_PREFIX + id configuration key; if no such key is set, the class named by erasure_code is used.


Encoder

During encoding, each EncodingCandidate gets its own Encoder, which carries out the actual encoding.
The main fields are best explained through the constructor:

Encoder(Configuration conf, Codec codec) {
  this.conf = conf;
  // Encoder and Decoder share the raid.encoder.parallelism setting
  this.parallelism = conf.getInt("raid.encoder.parallelism", DEFAULT_PARALLELISM);
  this.codec = codec;
  this.code = codec.createErasureCode(conf);
  this.rand = new Random();
  this.maxBufSize = conf.getInt(ENCODING_MAX_BUFFER_SIZE_KEY,
      DEFAULT_MAX_BUFFER_SIZE);
  this.bufSize = conf.getInt("raid.encoder.bufsize", maxBufSize);
  this.writeBufs = new byte[codec.parityLength][];
  this.checksumStore = RaidNode.createChecksumStore(conf, false);
  this.requiredChecksumStore = conf.getBoolean(
      RaidNode.RAID_CHECKSUM_STORE_REQUIRED_KEY,
      false);
  if (codec.isDirRaid) {
    // only needed by directory raid
    this.stripeStore = RaidNode.createStripeStore(conf, false, null);
  }
  this.retryCountPartialEncoding = conf.getInt(RETRY_COUNT_PARTIAL_ENCODING_KEY,
      DEFAULT_RETRY_COUNT_PARTIAL_ENCODING);
  allocateBuffers();
}

parallelism is the number of threads the encoder uses to read source data; as seen later, these threads are started to read the stripeLength source file input streams.
For example, with (10,4) RS encoding, one encoding pass has 10 source blocks; by default 4 threads read these 10 blocks into one input buffer, each read having a fixed length. When a thread finishes reading one block's fixed-length chunk, it picks the next unread block; see ParallelStreamReader for the detailed analysis.
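The claim-next-unread-block scheme described above can be sketched as follows. This is a simplified, hypothetical illustration (class and method names are invented, not the actual ParallelStreamReader API), using in-memory streams:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of the ParallelStreamReader idea: K worker threads
 * cooperatively fill one bufSize chunk per source stream, each thread
 * claiming the next unread stream until all streams are read.
 */
public class ParallelReadSketch {
  public static byte[][] readSlice(InputStream[] streams, int bufSize, int parallelism) {
    byte[][] bufs = new byte[streams.length][bufSize];
    AtomicInteger next = new AtomicInteger(0); // next stream index to claim
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    for (int t = 0; t < parallelism; t++) {
      pool.submit(() -> {
        int i;
        while ((i = next.getAndIncrement()) < streams.length) {
          try {
            // read up to bufSize bytes of the claimed stream into its buffer
            int off = 0, n;
            while (off < bufSize
                && (n = streams[i].read(bufs[i], off, bufSize - off)) > 0) {
              off += n;
            }
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      });
    }
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return bufs;
  }
}
```

With 10 streams and parallelism 4, the four threads grab streams 0..3 first, then whichever thread finishes first claims stream 4, and so on, mirroring the behaviour described above.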

code is the erasure code configured for the codec; see the Codec section.
bufSize is the per-read chunk length mentioned above; the default is 1MB.
writeBufs is the output buffer for encoding results: parityLength buffers of bufSize each, matching the per-read length, i.e. each pass encodes bufSize bytes into these buffers.
checksumStore is optional; it is required when hdfs.raid.checksum.store.required is set to true.
stripeStore is optional, but must be configured when any policy uses directory raid.
retryCountPartialEncoding is the retry count after a partial-encoding failure, configured via raid.encoder.retry.count.partial.encoding.


PolicyInfo

private Path srcPath;      // the specified src path
private Path fileListPath; // the path to the file containing the list of files
private String policyName; // name of policy
private String codecId; // the codec used
private boolean shouldRaid; // Should we raid files or not.
private String description; // A verbose description of this policy
private Configuration conf; // Hadoop configuration
private Properties properties; // Policy-dependent properties

CheckSumStore

Stores block checksums.
When hdfs.raid.checksum.store.required is true, a store class must be configured; by default none is required.

LocalCheckSumStore

Storage scheme

/**
* Instead of storing mappings of block and checksums into a file,
* We store each mapping as an empty file like "blockid:checksum";
*/

Main fields

  • storeDirName, the local storage directory (the File field storeDir), set by hdfs.raid.local.checksum.dir; the directory contains empty files named "blockid:checksum"

  • storage structure:

    public ConcurrentHashMap<String, Long> store = new ConcurrentHashMap<String, Long>();

It stores the mapping from block ID to checksum.

Creation

Initialized at creation:

public void initialize(Configuration conf, boolean createStore) throws IOException

Initialization reads the configuration, creates the local directory, and loads the file names in the directory into the store field.
Generally, when enabled, each Encoder/Decoder holds one ChecksumStore and one StripeStore, initialized in its constructor.

Saving

During encoding, writeToChecksumStore writes the newly computed checksums into the ChecksumStore:

private void writeToChecksumStore(DistributedFileSystem dfs,
    CRC32[] crcOuts, Path parityTmp, long expectedParityFileSize,
    Progressable reporter) throws IOException {
  LocatedBlocks lbks = dfs.getLocatedBlocks(parityTmp, 0L, expectedParityFileSize);
  for (int i = 0; i < crcOuts.length; i++) {
    this.checksumStore.putIfAbsentChecksum(lbks.get(i).getBlock(),
        crcOuts[i].getValue());
    reporter.progress();
  }
  LOG.info("Wrote checksums of parity file into checksum store");
}

The code above stores the checksums of the parity file's blocks into the ChecksumStore; when enabled, ParallelStreamReader can likewise store the source blocks' checksums while reading the input streams.
putIfAbsentChecksum stores the checksum for a block:

public Long putIfAbsentChecksum(Block blk, Long newChecksum)
    throws IOException {
  Long oldChecksum = putIfAbsent(blk, newChecksum);
  if (oldChecksum != null && !oldChecksum.equals(newChecksum)) {
    throw new IOException("Block " + blk.toString()
        + " has different checksums " + oldChecksum + "(old) and "
        + newChecksum + "(new)");
  }
  return oldChecksum;
}

putIfAbsent stores a block's checksum; it is an abstract method, implemented in LocalChecksumStore as:

@Override
public Long putIfAbsent(Block blk, Long newChecksum) throws IOException {
  Long oldChecksum = store.putIfAbsent(blk.toString(), newChecksum);
  if (oldChecksum == null) {
    LOG.info("Put " + blk + " -> " + newChecksum);
    File newFile = getFile(blk, newChecksum);
    newFile.createNewFile();
  }
  return oldChecksum;
}

After updating the in-memory map, it creates the corresponding empty file in the local directory, as described in the storage scheme above.

Thus every newly stored checksum also creates a marker file locally, so a newly created ChecksumStore can rebuild the block-to-checksum mapping by reading the files in that directory.
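This rebuild-from-directory behaviour can be illustrated with a small self-contained sketch. It is hypothetical (the class name and details are invented, not the actual LocalChecksumStore code), but follows the same scheme: each mapping persists as an empty marker file named "blockId:checksum", and a new store instance reconstructs its map by listing the directory:

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch of the LocalChecksumStore persistence idea:
 * each block-to-checksum mapping is an empty marker file named
 * "blockId:checksum", so a new store rebuilds its in-memory map
 * simply by listing the storage directory.
 */
public class LocalStoreSketch {
  final ConcurrentHashMap<String, Long> store = new ConcurrentHashMap<>();
  private final File dir;

  LocalStoreSketch(File dir) {
    this.dir = dir;
    dir.mkdirs();
    File[] files = dir.listFiles();
    if (files != null) {
      // rebuild the in-memory map from the marker file names
      for (File f : files) {
        String name = f.getName();
        int sep = name.lastIndexOf(':');
        store.put(name.substring(0, sep), Long.valueOf(name.substring(sep + 1)));
      }
    }
  }

  Long putIfAbsent(String blockId, long checksum) {
    Long old = store.putIfAbsent(blockId, checksum);
    if (old == null) {
      try {
        // persist the new mapping as an empty marker file
        new File(dir, blockId + ":" + checksum).createNewFile();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    return old;
  }
}
```

(Colons in file names are fine on HDFS-style Linux deployments, which is where this store runs.)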


StripeStore

Required for directory raid.

LocalStripeStore

LocalStripeStore keeps its data under the directory given by hdfs.raid.local.stripe.dir.

Storage scheme

/**
 * for each stripe blk1,blk2,...,blkn under one codec
 * we generate n files:
 * codecId:blk1
 * codecId:blk2
 * ...
 * codecId:blkn
 * they are all hardlinks of the same file whose content is
 * "blk1:blk2:...:blkn"
 */

Main fields

  • storeDirName, the local storage directory (the File field storeDir)
  • main storage structures:

    public ConcurrentHashMap<String, LocalStripeInfo> blockToStripeStore = new ConcurrentHashMap<String, LocalStripeInfo>();
    public ConcurrentHashMap<List<Block>, Long> stripeSet = new ConcurrentHashMap<List<Block>, Long>();

    blockToStripeStore maps each block to its StripeInfo; LocalStripeInfo is the LocalStripeStore subclass of StripeInfo.
    StripeInfo holds the basic information of one stripe, including the codec, the source blocks, and the parity blocks:

    public Codec codec;
    public Block block;
    public List<Block> parityBlocks;
    public List<Block> srcBlocks;

    If a StripeStore exists, the stripe a block belongs to can be looked up via blockToStripeStore, which enables the corresponding decoding; the StripeStore-based path shows up in the repair of corrupt blocks.

    stripeSet stores the parity blocks of all stripes: the key is a stripe's parity block list, and the value is fixed to 1.

Creation

Initialized at creation:

public void initialize(Configuration conf, boolean createStore, FileSystem fs) throws IOException

Initialization reads the configuration, creates the local storage directory, and reads the files in it.
Each local file yields one LocalStripeInfo; the file name format and content are described in the storage scheme above:
return new LocalStripeInfo(codec, blk, parityBlocks, srcBlocks);
codec is the first part of the file name and blk the second; parityBlocks are the blocks named by the first parityLength entries of the file content, and srcBlocks are the remaining ones.

Each entry is then put into the corresponding data structures:

LocalStripeInfo oldSi = blockToStripeStore.put(si.getKey(), si);
if (oldSi != null) {
  throw new IOException("There are two stripes for the same key: " +
      si.getKey() + "\n old: " + oldSi.toString() + "\n new: " +
      si.toString());
}
stripeSet.put(si.parityBlocks, 1L);

si.getKey() is exactly the file name.
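As a hypothetical sketch of the content parsing described above (names invented; the actual logic lives in LocalStripeStore), splitting the shared file content "blk1:blk2:...:blkn" into parity and source blocks could look like:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of decoding one LocalStripeStore entry:
 * the shared file content lists all blocks of the stripe,
 * parity blocks first, separated by ':'.
 */
public class StripeEntrySketch {
  /** Returns [parityBlocks, srcBlocks]: the first parityLength
   *  entries are parity, the rest are source blocks. */
  public static List<List<String>> splitStripe(String content, int parityLength) {
    String[] blocks = content.split(":");
    List<String> parity = new ArrayList<>();
    List<String> src = new ArrayList<>();
    for (int i = 0; i < blocks.length; i++) {
      (i < parityLength ? parity : src).add(blocks[i]);
    }
    List<List<String>> out = new ArrayList<>();
    out.add(parity);
    out.add(src);
    return out;
  }
}
```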

Saving

During encoding, writeToStripeStore saves each encoded stripe's information into the StripeStore, creating the corresponding file in the local directory and writing its content as in the storage scheme described above; see the source code for details.


BlockIntegrityMonitor

Responsible for repairing corrupt blocks and lost blocks.

Main threads

Daemon blockFixerThread = null;
Daemon blockCopierThread = null;
Daemon corruptFileCounterThread = null;

blockFixerThread repairs corrupt blocks, blockCopierThread repairs lost blocks, and corruptFileCounterThread counts corrupt files.
During RaidNode initialization, the threads are created according to the configuration:

boolean useBlockFixer = !conf.getBoolean(RAID_DISABLE_CORRUPT_BLOCK_FIXER_KEY, false);
boolean useBlockCopier = !conf.getBoolean(RAID_DISABLE_DECOMMISSIONING_BLOCK_COPIER_KEY, true);
boolean useCorruptFileCounter = !conf.getBoolean(RAID_DISABLE_CORRUPTFILE_COUNTER_KEY, false);

Runnable fixer = blockIntegrityMonitor.getCorruptionMonitor();
if (useBlockFixer && (fixer != null)) {
  this.blockFixerThread = new Daemon(fixer);
  this.blockFixerThread.setName("Block Fixer");
  this.blockFixerThread.start();
}

Runnable copier = blockIntegrityMonitor.getDecommissioningMonitor();
if (useBlockCopier && (copier != null)) {
  this.blockCopierThread = new Daemon(copier);
  this.blockCopierThread.setName("Block Copier");
  this.blockCopierThread.start();
}

Runnable counter = blockIntegrityMonitor.getCorruptFileCounter();
if (useCorruptFileCounter && counter != null) {
  this.corruptFileCounterThread = new Daemon(counter);
  this.corruptFileCounterThread.setName("Corrupt File Counter");
  this.corruptFileCounterThread.start();
}

raid.blockreconstruction.corrupt.disable defaults to false, so blockFixerThread exists by default.
raid.blockreconstruction.decommissioning.disable defaults to true, so blockCopierThread does not exist by default.
raid.corruptfile.counter.disable defaults to false, so corruptFileCounterThread exists by default.

LocalBlockIntegrityMonitor

In LocalBlockIntegrityMonitor, getDecommissioningMonitor and getCorruptFileCounter are not implemented (they return null), and getCorruptionMonitor returns the object itself:

@Override
public Runnable getDecommissioningMonitor() {
  // This class does not monitor decommissioning files.
  return null;
}

@Override
public Runnable getCorruptFileCounter() {
  return null;
}

@Override
public Runnable getCorruptionMonitor() {
  return this;
}

Therefore, for LocalBlockIntegrityMonitor, even if the decommissioning monitor and the corrupt file counter are enabled in the configuration, they have no effect.

Main fields:

  • monitorDirs, the directories monitored for corrupt files

    public String[] monitorDirs;

    Initialized when the LocalBlockIntegrityMonitor is created, as the directories in the file system to check for corrupt files:

    monitorDirs = getCorruptMonitorDirs(getConf());

    Read from raid.corruptfile.counter.dirs, defaulting to "/", i.e. corrupt files anywhere in the file system.

  • monitorSeconds, read from raid.monitor.seconds, defaulting to "18000,86400,604800" (5h, 1d, 1w)

    private String[] monitorSeconds;

  • recoveryTimes, one histogram per monitored corrupt-file directory; each RaidHistogram has windows corresponding to raid.monitor.seconds

    protected HashMap<String, RaidHistogram> recoveryTimes;

  • histoLen, the histogram length

    private int histoLen;

  • percents, the histogram percentiles

    private ArrayList<Float> percents;
  • others

    • statistics

      private long numFilesFixed = 0;
      private long numFileFixFailures = 0;
      private long numFilesCopied = 0;
      private long numFileCopyFailures = 0;
      private long numBlockFixSimulationFailures = 0;
      private long numBlockFixSimulationSuccess = 0;
      private long numFilesToFixDropped = 0;
      private long numfileFixBytesReadRemoteRack = 0;

    • intervals

      // interval between checks for lost files
      protected long blockCheckInterval;
      protected long corruptFileCountInterval;
      protected short notRaidedReplication;

      public volatile long lastSuccessfulFixTime = System.currentTimeMillis();
      public volatile long approximateNumRecoverableFiles = 0L;