RaidNode源码阅读---编码过程之上篇Raid文件选取(LocalRaidNode)

Hadoop版本:hadoop-20-master


TriggerMonitor

负责检测policy文件,并相应的执行相应文件的Raid操作.

主要成员:

1
2
private Map<String, PolicyState> policyStateMap = new HashMap<String, PolicyState>();
private volatile long lastTriggerTime = 0;

policyStateMap管理所有加载的policy,键为policy名,值为PolicyState对象.

TriggerMonitor线程主函数doProcess

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private void doProcess() throws IOException, InterruptedException {
ArrayList<PolicyInfo> allPolicies = new ArrayList<PolicyInfo>();
ArrayList<PolicyInfo> allPoliciesWithSrcPath = new ArrayList<PolicyInfo>();
for (PolicyInfo info : configMgr.getAllPolicies()) {//读取ConfigManager中的policy信息
allPolicies.add(info);
if (info.getSrcPath() != null) {
allPoliciesWithSrcPath.add(info);
}
}
while (running) {
Thread.sleep(RaidNode.triggerMonitorSleepTime);//执行前休眠10s
boolean reloaded = configMgr.reloadConfigsIfNecessary();
if (reloaded) {//重新加载了policy配置,更新
allPolicies.clear();
allPoliciesWithSrcPath.clear();
for (PolicyInfo info : configMgr.getAllPolicies()) {
allPolicies.add(info);
if (info.getSrcPath() != null) {
allPoliciesWithSrcPath.add(info);
}
}
}
LOG.info("TriggerMonitor.doProcess " + allPolicies.size());

for (PolicyInfo info: allPolicies) {//对每个policy执行编码操作
this.putPolicyInfo(info);

List<FileStatus> filteredPaths = null;
if (shouldReadFileList(info)) {//指定fileListPath的,判断是否重新读取
filteredPaths = readFileList(info);
} else if (shouldSelectFiles(info)) {//指定srcPath的,判断是否重新选择
LOG.info("Triggering Policy Filter " + info.getName() +
" " + info.getSrcPath());
try {
filteredPaths = selectFiles(info, allPoliciesWithSrcPath);
} catch (Exception e) {
LOG.info("Exception while invoking filter on policy " + info.getName() +
" srcPath " + info.getSrcPath() +
" exception " + StringUtils.stringifyException(e));
continue;
}
} else {
continue;
}

if (filteredPaths == null || filteredPaths.size() == 0) {
LOG.info("No filtered paths for policy " + info.getName());
continue;
}

// Apply the action on accepted paths
LOG.info("Triggering Policy Action " + info.getName() +
" " + filteredPaths.size() + " files");
try {
//对一个policy进行编码操作
raidFiles(info, filteredPaths);
} catch (Throwable e) {
LOG.info("Exception while invoking action on policy " + info.getName() +
" srcPath " + info.getSrcPath() +
" exception " + StringUtils.stringifyException(e), e);
continue;
}
}
lastTriggerTime = System.currentTimeMillis();
}
}

这里,running为RaidNode的运行状态.两次Raid操作的间隔10s.

首先判断是否重新加载raid配置文件(每隔10s读取配置文件,看文件是否已经修改且过了5s).
对allPolicies和allPoliciesWithSrcPath更新.
更新policy之后,对每个policy读取可Raid的文件,进行Raid操作,Raid操作通过raidFiles(info,filteredPaths)完成.

根据policy配置,有两种读取可raid文件的方式:

  1. 给定fileListPath,则需读取该文件中的文件列表,一行对应一个文件或目录
  2. 直接给定文件或目录路径srcPath,这里的srcPath支持模式匹配.
    ?匹配任意一个字符;
    *匹配0个或以上的字符;
    [abc]匹配abc中一个字符;
    [a-c]同上;
    [^abc]匹配除abc以外的字符;
    {ab,bc}匹配ab或bc;
    具体见hadoop FileSytem中的globStatus方法;

通过fileListPath读取待Raid文件

是否读取policy文件列表

该方式由shouldReadFileList方法判断是否应该读文件列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private boolean shouldReadFileList(PolicyInfo info) {
if (info.getFileListPath() == null || !info.getShouldRaid()) {
return false;
}
String policyName = info.getName();
PolicyState scanState = policyStateMap.get(policyName);
if (scanState.isFileListReadInProgress()) {//若该policy已经有job在编码了,没有超出一个policy可运行job数才能继续raid
int maxJobsPerPolicy = configMgr.getMaxJobsPerPolicy();
int runningJobsCount = getRunningJobsForPolicy(policyName);
// If there is a scan in progress for this policy, we can have
// upto maxJobsPerPolicy running jobs.
return (runningJobsCount < maxJobsPerPolicy);
} else {//否则根据周期来判断是否应该raid
long lastReadStart = scanState.startTime;
return (now() > lastReadStart + configMgr.getPeriodicity());
}
}

如果当前policy有读取Raid文件的job在运行,则只有满足当前policy运行的jobs数目小于raid.distraid.max.jobs(成员maxJobsPerPolicy,缺省10)才能继续读取待Raid的文件,这对应一个policy中文件过多,需要多个job读取文件进行Raid操作.

为什么不需要判断Raid间隔这一条件呢?因为已经有Job在运行了,说明间隔这一条件满足(该policy已经读过一些文件了,只是还没读完).
若没有job运行,则policy当前时间满足加载同一policy配置的raid文件周期raid.policy.rescan.interval时便可.

读取可Raid的文件

该方式下读取待Raid文件,返回fileListPath中文件或目录可以执行Raid操作的部分.

1
public List<FileStatus> readFileList(PolicyInfo info) throws IOException

一个policy一次Raid区块/文件限制:

1
int selectLimit = codec.isDirRaid? configMgr.getMaxBlocksPerDirRaidJob():configMgr.getMaxFilesPerJob();

即对于目录Raid来说限制为maxBlocksPerDirRaidJob(为Block数目),由配置hdfs.raid.dir.raid.block.limit设置,缺省5000.
而对于文件Raid来说,限制为maxFilesPerJob(文件数目),由配置raid.distraid.max.files设置,缺省5000.

readFileList主程序循环读取文件列表中的每一行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
while ((l = scanState.fileListReader.readLine()) != null) {//一行一个raid路径
Path p = new Path(l);
FileSystem fs = p.getFileSystem(conf);
p = fs.makeQualified(p);
FileStatus stat = null;
try {
stat = ParityFilePair.FileStatusCache.get(fs, p);
} catch (FileNotFoundException e) {
LOG.warn("Path " + p + " does not exist", e);
}
if (stat == null) {
continue;
}
short repl = 0;
List<FileStatus> lfs = null;
if (codec.isDirRaid) {
if (!stat.isDir()) {
continue;//目录Raid下,非目录项跳过
}
lfs = RaidNode.listDirectoryRaidFileStatus(conf, fs, p);
if (lfs == null) {
continue;
}
repl = DirectoryStripeReader.getReplication(lfs);
} else {
repl = stat.getReplication();
}

从上可以看出,一个policy对应一个codecID,若配置codecID为目录Raid,则fileListPath中的项不是目录则不会加入到待Raid列表中,跳过

读取文件列表中一项后,判断是否可以raid:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
if (!RaidNode.shouldRaid(conf, fs, stat, codec, lfs)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Should not raid file: " + l);
}
continue;
}
public static boolean shouldRaid(Configuration conf, FileSystem srcFs,
FileStatus stat, Codec codec, List<FileStatus> lfs) throws IOException
{

Path p = stat.getPath();
long blockNum = 0L;
if (stat.isDir() != codec.isDirRaid) {
return false;
}//路径状态(文件或目录)与codec不一致不进行Raid操作

if (tooNewForRaid(stat)) {
return false;
}//路径太新(修改时间到现在没有超过RaidNode.modTimePeriod,配置项raid.mod.time.period,默认1天),不进行Raid操作

blockNum = codec.isDirRaid ? DirectoryStripeReader.getBlockNum(lfs) : numBlocks(stat);

// if the file/directory has fewer than 2 blocks, then nothing to do
if (blockNum <= RaidState.TOO_SMALL_NOT_RAID_NUM_BLOCKS) {
return false;
}//路径Blocks数目小于TOO_SMALL_NOT_RAID_NUM_BLOCKS(2),不进行Raid操作.

return !raidedByOtherHighPriCodec(conf, stat, codec);
//已经被其他高优先级的Codec编码过的,不进行Raid操作.
//代码就不贴了,就是遍历所有codec,取出优先级高的codec,看是否存在该目录的校验文件ParityFilePair.parityExists(stat, tcodec, conf).
}

是否可以Raid见上注释,需要注意的TOO_SMALL_NOT_RAID_NUM_BLOCKS,为RaidState的静态成员,值为2.

另外,从最后判断高优先级codec来看,想象这样一种情景:
若一个目录中原来只有4个文件,已经进行了编码操作,然后添加了若干文件(假如4),我们配置的(10,4)rs码,则同样的优先级下,该目录下所有的文件参与
新编码过程(是否包括原来编码过的文件?不包括,已经编码的文件状态为RAIDED,不参与后续任何编码).而若已经被高优先级的codec编码,且高优先级
codec没到再次编码时间,低优先级的codec到了编码时间,则不予处理,仍然交给高优先级codec进行编码.

如果应该Raid则修改备份数,添加至待Raid列表中,若超出一个policy的可以Raid的区块/文件限制,则退出循环,添加完成(剩下的下一次编码?):

1
2
3
4
5
6
7
8
9
10
11
if ((repl > targetReplication) || (repl == targetReplication && !ParityFilePair.parityExists(stat, codec, conf))) {
list.add(stat);
if (codec.isDirRaid) {
dirRaidNumBlocks += DirectoryStripeReader.getBlockNum(lfs);;
}
}
// for dir-raid, we judge from number of blocks rather than
// that of directories
if (codec.isDirRaid && dirRaidNumBlocks >= selectLimit || !codec.isDirRaid && list.size() >= selectLimit) {
break;
}

通过srcPath读取待Raid文件

是否读取待Raid文件列表

该方式下由shouldSelectFiles判断是否应该继续读取待Raid文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private boolean shouldSelectFiles(PolicyInfo info) {
if (!info.getShouldRaid()) {
return false;
}
String policyName = info.getName();
int runningJobsCount = getRunningJobsForPolicy(policyName);
PolicyState scanState = policyStateMap.get(policyName);
if (scanState.isScanInProgress()) {
int maxJobsPerPolicy = configMgr.getMaxJobsPerPolicy();
// If there is a scan in progress for this policy, we can have
// upto maxJobsPerPolicy running jobs.
return (runningJobsCount < maxJobsPerPolicy);
} else {
// Check the time of the last full traversal before starting a fresh
// traversal.
long lastScan = scanState.startTime;
return (now() > lastScan + configMgr.getPeriodicity());
}
}

可见,判断条件和fileListPath的基本一致.
若有job在运行,则根据配置判断是否可以继续运行job,否则判断是否达到可重新加载周期.

读取可Raid文件

该方式下加载待Raid文件,通过selectFiles完成.

1
filteredPaths = selectFiles(info, allPoliciesWithSrcPath);

其中,allPoliciesWithSrcPath为所有policy中有srcPath属性的.

这种方式一次处理也有限制:

1
2
int selectLimit = configMgr.getMaxFilesPerJob();
List<FileStatus> returnSet = new ArrayList<FileStatus>(selectLimit);

一次Raid操作限制为maxFilesPerJob成员,通过raid.distraid.max.files设置,缺省5000.这里的returnSet中的元素可能是文件(文件Raid),也可能是目录(目录Raid)

TriggerMonitor中policyStateMap成员保存每一个policy的状态信息,PolicyState对象含有以下成员:

1
2
3
4
5
long startTime = 0;
// A policy may specify either a path for directory traversal
// or a file with the list of files to raid.
DirectoryTraversal pendingTraversal = null;
BufferedReader fileListReader = null;

通过srcPath指定待Raid文件路径的,需要通过DirectoryTraversal进行遍历,以过滤能够Raid的路径.srcPath支持模式匹配,上面已有说明.

selectFiles中,若当前policy已有DirectoryTraversal在遍历,则使用现有的即pendingTraversal(policy目录下新增文件,只使用原来的DirectoryTraversal,新增文件怎么添加至原来DirectoryTraversal的directories属性的?)否则,对应目录Raid和文件Raid分别创建DirectoryTraversal对象,其中目录Raid通过raidLeafDirectoryRetriever创建,而文件Raid通过raidFileRetriever创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if (scanState.isScanInProgress()) {
LOG.info("Resuming traversal for policy " + policyName);
traversal = scanState.pendingTraversal;
} else {
LOG.info("Start new traversal for policy " + policyName);
scanState.startTime = now();
if (!Codec.getCodec(info.getCodecId()).isDirRaid) {
traversal = DirectoryTraversal.raidFileRetriever(
info, info.getSrcPathExpanded(), allPolicies, conf,
directoryTraversalThreads, directoryTraversalShuffle,
true);//文件Raid过滤
} else {
traversal = DirectoryTraversal.raidLeafDirectoryRetriever(
info, info.getSrcPathExpanded(), allPolicies, conf,
directoryTraversalThreads, directoryTraversalShuffle,
true);//目录Raid过滤
}
scanState.setTraversal(traversal);
}

这里传入的参数中,info.getSrcPathExpanded即srcPath的模式匹配,返回匹配到的所有路径.
directoryTraversalThreads为遍历线程的数目,配置raid.directorytraversal.threads,缺省4个.
directoryTraversalShuffle为是否打乱srcPath解析路径,配置raid.directorytraversal.shuffle,缺省为true.

在两个函数中,创建对应的DirectoryTraversal:

1
2
3
4
//文件Raid
return new DirectoryTraversal("Raid File Retriever ", roots, fs, filter,numThreads, doShuffle, allowStandby, false);
//目录Raid
return new DirectoryTraversal("Raid File Retriever ", roots, fs, filter,numThreads, doShuffle, allowStandby, true);

最后一个参数为checkLeafDir,目录Raid中对叶子目录才可能加到待Raid列表中.

DirectoryTraversal

DirectoryTraversal主要成员:

1
2
3
4
5
6
7
8
9
10
11
static final public FileStatus FINISH_TOKEN = new FileStatus();
static final int OUTPUT_QUEUE_SIZE = 10000;
final private BlockingQueue<FileStatus> output;//过滤结果阻塞队列
final private BlockingDeque<Path> directories;//待过滤路径
final private Filter filter;//过滤器
final private Processor[] processors;//过滤线程
final private AtomicInteger totalDirectories;
final private AtomicInteger activeThreads;
final private boolean doShuffle;//是否将加入的带过滤路径打乱
final private boolean allowStandby;
private volatile boolean finished = false;

directories由roots初始化,为policy的srcPath匹配的路径;
output为经过过滤后最终能够参与Raid的文件或目录;
两者都为阻塞队列,因为遍历由processors默认4个线程执行遍历.

OUTPUT_QUEUE_SIZE指定了output输出队列的大小.
FINISH_TOKEN为output结束标记.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
this.output = new ArrayBlockingQueue<FileStatus>(OUTPUT_QUEUE_SIZE);
this.directories = new LinkedBlockingDeque<Path>();
this.filter = filter;
this.totalDirectories = new AtomicInteger(roots.size());
this.processors = new Processor[numThreads];
this.activeThreads = new AtomicInteger(numThreads);
this.doShuffle = doShuffle;
this.allowStandby = allowUseStandby;
if (doShuffle) {//打乱待过滤路径
List<Path> toShuffleAndAdd = new ArrayList<Path>();
toShuffleAndAdd.addAll(roots);
Collections.shuffle(toShuffleAndAdd);
this.directories.addAll(toShuffleAndAdd);
} else {
this.directories.addAll(roots);
}
if (roots.isEmpty()) {
try {
output.put(FINISH_TOKEN);
} catch (InterruptedException e) {
throw new IOException(e);
}
return;
}
for (int i = 0; i < processors.length; ++i) {
if (checkLeafDir) {//目录Raid使用LeafDirectoryProcessor线程过滤
processors[i] = new LeafDirectoryProcessor();
} else {//文件Raid使用Processor线程过滤
processors[i] = new Processor();
}
processors[i].setName(friendlyName + i);
}
for (int i = 0; i < processors.length; ++i) {
processors[i].start();
}

根据传入的参数初始化DirectoryTraversal对象,主要包括:

  • 根据传入的匹配路径roots,初始化direcotries,并根据doShuffle决定是否将所有路径打乱顺序.
  • 根据传入的Filter对象filter,初始化成员filter,filter用于检查一个路径是否能够Raid.
  • 根据传入的线程数numThreads创建相应的线程,且根据传入的checkLeafDir决定是创建文件遍历对应的线程Processor还是目录对应的线程LeafDirectoryProcessor.

最后将创建的线程启动,进行遍历操作.

多线程遍历匹配路径

线程中,循环读取directories中路径,进行过滤,过滤结果通过submitOutputs提交,该函数中也将新发现的需继续遍历的目录添加至directories中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
while (!finished && totalDirectories.get() > 0) {
Path dir = null;
try {
dir = directories.poll(1000L, TimeUnit.MILLISECONDS);//取出一个待过滤的路径
} catch (InterruptedException e) {
continue;
}
if (dir == null) {
continue;
}
try {
filterDirectory(dir, subDirs, filtered);//执行过滤
} catch (Throwable ex) {
LOG.error(getName() + " throws Throwable. Skip " + dir, ex);
totalDirectories.decrementAndGet();
continue;
}
int numOfDirectoriesChanged = -1 + subDirs.size();
if (totalDirectories.addAndGet(numOfDirectoriesChanged) == 0) {
interruptProcessors();
}
submitOutputs(filtered, subDirs);//提交结果,filtered添加到output中,subDirs添加到directories中继续过滤
}

每次从direcotires中拿出一个路径dir,通过filterDirectory进行过滤,能够进行Raid的路径存放在filtered中,而需要添加到directories中继续过滤的路径存放在subDirs中.
subDirsfiltered通过submitOutputs提交,subDirs放入directories中,而filtered添加到该DirectoryTraversal的最终输出结果output成员中,为过滤结果.

对于文件Raid和目录Raid,有不同的filterDirectory实现.

文件Raid过滤

对于文件Raid,filterDirectory方法如下:

1
2
3
4
5
6
7
8
9
10
11
elements = fs.listStatus(dir);
if (elements != null) {
for (FileStatus element : elements) {
if (filter.check(element)) {//如果element是目录则会直接返回false
filtered.add(element);
}
if (element.isDir()) {//element为目录添加到subDirs中,继续添加至direcotries中
subDirs.add(element.getPath());
}
}
}

通过filter成员的check函数对一个路径过滤,文件Raid进行过滤的路径为目录时添加到subDirs中.
因此文件Raid对于给定的srcPath会递归的遍历子目录直至遍历到文件,检查文件是否可Raid.

文件Raid的filter如下:

1
2
3
4
5
6
7
8
9
10
11
12
Filter filter = new Filter() {
@Override
public boolean check(FileStatus f) throws IOException {
long now = RaidNode.now();
if (f.isDir()) {//目录直接返回false
return false;
}
RaidState state = checker.check(info, f, now, false);//使用Checker检查f的Raid状态
LOG.debug(f.getPath().toUri().getPath() + " : " + state);
return state == RaidState.NOT_RAIDED_BUT_SHOULD;//只有Raid状态为没有Raid但是应该由本Policy Raid时才为true
}
};

可见对于文件Raid,如果srcPath匹配的路径下还存在目录,check返回flase,不会添加到filtered中,而是加入subDirs中,最终加入到DirectoryTraversal的directories中,继续过滤.

而如果最终得到的element即传入filter的check函数参数f为文件时,通过Checker类对象checkercheck方法返回PolicyInfo info下文件fRaidState,RaidState表示文件的Raid状态,为枚举类型,包含以下值:

1
2
3
4
5
6
RAIDED,已经Raid过了,存在校验文件;
NOT_RAIDED_TOO_NEW,文件或目录修改后没达到可Raid的周期,太新了,不能Raid;
NOT_RAIDED_TOO_SMALL,文件或目录对应的Block数目小于2,不能Raid;
NOT_RAIDED_BUT_SHOULD,文件或目录满足Raid条件,可以加入到待Raid列表中,可以Raid;
NOT_RAIDED_OTHER_POLICY,文件或目录应该由其他Policy进行优先Raid,不能由当前policy Raid;
NOT_RAIDED_NO_POLICY,文件或目录没有对应的Policy(更具体的为ExpandedPolicy),不能Raid;

只有检查到文件(在这里,f此时只能是文件)的Raid状态为NOT_RAIDED_BUT_SHOULD时,即应该由本policy进行Raid,才返回true,将会添加到待Raid列表中.
关于Checker对象的创建以及check方法的详细分析,因为和目录Raid十分一致,放在后面.

目录Raid过滤

对于目录Raid,对应的处理线程为LeafDirectoryProcessor,该线程主程序流程和Processor一样,只是重写了filterDirectory方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
elements = fs.listStatus(dir);
if (elements != null) {
boolean isLeafDir = true;
for (FileStatus element : elements) {
if (element.isDir()) {//子目录天价到subDirs中,存在子目录则不是叶子目录
subDirs.add(element.getPath());
isLeafDir = false;
}
}//目录下还有目录,将子目录添加至subDirs中,返回时添加至directories中,isLeafDir置false,后面的判断不通过,不进行过滤
if (isLeafDir && elements.length > 0) {//叶子目录才通过filter检查
FileStatus dirStat = avatarFs != null?
avatarFs.getFileStatus(dir):
fs.getFileStatus(dir);
if (filter.check(dirStat)) {
filtered.add(dirStat);
}
}
}

路径为文件时直接跳过,非叶子目录继续添加到directories中过滤,对叶子目录通过filter的check方法判断是否能够Raid,能的话添加至filtered中,之后添加至output成员中.

目录Raid对应的filter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Filter filter = new Filter() {
@Override
public boolean check(FileStatus f) throws IOException {
long now = RaidNode.now();
if (!f.isDir()) {//文件直接返回false
return false;
}
List<FileStatus> lfs = RaidNode.listDirectoryRaidFileStatus(conf, fs, f.getPath());//叶子目录下路径列表
RaidState state = checker.check(info, f, now, false, lfs);//Checker检查叶子目录f的Raid状态
if (LOG.isDebugEnabled()) {
LOG.debug(f.getPath() + " : " + state);
}
return state == RaidState.NOT_RAIDED_BUT_SHOULD;//只有还未Raid但是应该由本Policy Raid的才返回true
}
};

其他基本和文件Raid一致,Checker的check函数中多了个lfs,即叶子目录下所有文件状态列表(注意这里filter传入的参数f只可能为叶子目录).
而关于目录Raid的Checker创建和check函数的分析见下.

Checker

一个policy对应一个Checker,包括了policy下所有匹配到路径的信息.

构建

不管文件Raid还是目录Raid都使用这个类来判断给定路径的Raid状态,文件Raid和目录Raid创建Checker对象:

1
final RaidState.Checker checker = new RaidState.Checker(allInfos, conf);

allInfos是所有包含srcPath的policy,Checker主要成员属性如下:

1
2
3
4
5
final private List<ExpandedPolicy> sortedExpendedPolicy;//policy下没一个具体匹配到的路径生成一个ExpandedPolicy
private boolean inferMTimeFromName;
public static final ThreadLocalDateFormat dateFormat = new ThreadLocalDateFormat("yyyy-MM-dd");
private List<String> excludePatterns = new ArrayList<String>();//例外路径列表
private List<FileStatus> lfs = null;

创建Checker时,对每一个含有srcPath的policy,解析其srcPath,解析出来的每一个路径创建一个ExpandedPolicy对象,包含具体路径和原policy的修改时间备份等信息,原policy作为ExpandedPolicy的父policy.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
List<ExpandedPolicy> sortedExpendedPolicy =  new ArrayList<ExpandedPolicy>();
for (PolicyInfo policy : allInfos) {
sortedExpendedPolicy.addAll(ExpandedPolicy.expandPolicy(policy));
}

static List<ExpandedPolicy> expandPolicy(PolicyInfo info)
throws IOException {
List<ExpandedPolicy> result = new ArrayList<ExpandedPolicy>();
for (Path srcPath : info.getSrcPathExpanded()) {//srcPath匹配到的所有路径
String srcPrefix = normalizePath(srcPath);
long modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));//原policy中的修改时间周期
int targetReplication = Integer.parseInt(info.getProperty("targetReplication"));//原policy中的编码后源文件备份
Codec codec = Codec.getCodec(info.getCodecId());
ExpandedPolicy ePolicy = new ExpandedPolicy(srcPrefix, modTimePeriod, codec, targetReplication, info);
/*
每一个匹配到的路径创建一个ExpandedPolicy对象,所有匹配到的路径共享源PolicyInfo的配置属性(即policy文件配置的属性)
例如,srcPath为:/usr/*,存在文件/usr/file1,/usr/sub/file2,/usr/sub/file3,则会创建2ExpandedPolicy,对应的路径为:
/usr/file1和/usr/sub/(/usr/*/*才能匹配到/usr/sub/file2和/usr/sub/file3).
*/
result.add(ePolicy);
}
return result;
}

这就是说原来policy的srcPath对应的不是具体的路径,而是一个路径匹配描述路径,将路径匹配出来,对每一个具体路径创建一个ExpandedPolicy,同时通过ExpandedPolicy的parentPolicy字段保存srcPath的policy和匹配路径ExpandedPolicy的关系.
这里注意的是ExpandedPolicy并不是PolicyInfo的子类.

构建完成之后,排序,因此最终成员sortedExpendedPolicy是有序的:

1
2
Collections.sort(sortedExpendedPolicy, expandedPolicyComparator);
this.sortedExpendedPolicy = Collections.unmodifiableList(sortedExpendedPolicy);

expandedPolicyComparatorExpandedPolicyComparator,其compare方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public int compare(ExpandedPolicy p1, ExpandedPolicy p2) {
if (p1.srcPrefix.length() > p2.srcPrefix.length()) {
// Prefers longer prefix
return -1;
}//路径长的排在前面
if (p1.srcPrefix.length() < p2.srcPrefix.length()) {
return 1;
}
if (p1.codec.priority > p2.codec.priority) {
// Prefers higher priority
return -1;
}//路径长度一样的,优先级高的排在前面
if (p1.codec.priority < p2.codec.priority) {
return 1;
}
// Prefers lower target replication factor
return p1.targetReplication < p2.targetReplication ? -1 : 1;
}

即路径长的在前面,路径长度相同的优先级高的在前面,之后才是编码后备份数少的在前面,这主要是在Checker的check方法里要按序遍历所有的ExpandedPolicy.

同等条件下,高优先级的排在前面很好理解,先判断高优先级的是否应该Raid(matched函数).不过路径长的排在前面不知道什么作用.

构建完有序policy列表后,还需根据raid.exclude.patterns创建不进行编码的文件列表excludePatterns:

1
2
3
4
5
6
7
excludePatterns.add(TRASH_PATTERN);
String excluded = conf.get("raid.exclude.patterns");
if (excluded != null) {
for (String p: excluded.split(",")) {
excludePatterns.add(p);
}
}

至此,Checker的构建完成.

check函数判断Raid状态

由上知,对文件Raid最终遍历到的文件,以及对目录Raid最终遍历到的叶子目录,这里两者都记为f,都会通过filter的check函数判断是否能够Raid,而该判断是通过Checker的check函数返回Raid状态是否为NOT_RAIDED_BUT_SHOULD来确定的.

check函数原型如下:

1
2
3
4
5
public RaidState check(PolicyInfo info, FileStatus file, long now, boolean skipParityCheck) throws IOException {
return check(info, file, now, skipParityCheck, null);
}

public RaidState check(PolicyInfo info, FileStatus file, long now, boolean skipParityCheck, List<FileStatus> lfs) throws IOException

再看文件Raid和目录Raid中的调用:

1
2
3
4
文件Raid:
RaidState state = checker.check(info, f, now, false);
目录Raid:
RaidState state = checker.check(info, f, now, false, lfs);

因此,两者的skipParityCheck都为false,目录Raid还需传入叶子目录f下所有的文件状态列表lfs.

check函数中,首先获取文件或叶子目录f的修改时间,如果raid.infermtimefromname为true,则从文件名推断时间,否则通过文件系统得到修改时间.
接着遍历之前创建的sortedExpendedPolicy,看是否有其他policy已经编码或应该编码的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
for (ExpandedPolicy policy : sortedExpendedPolicy) {
//sortedExpandedPolicy中匹配到和PolicyInfo一致的ExpandedPolicy
//也就是说,如果当前PolicyInfo info在其他PolicyInfo前面就匹配到的话,则不会判断其他PolicyInfo的Raid状态,
//反之,先判断其他PolicyInfo的Raid状态。这个取决于PolicyInfo对应的ExpandedPolicy在sortedExpandedPolicy中的顺序。
if (policy.parentPolicy == info) {
matched = policy;
break;
}
//检查其他PolicyInfo的ExpandedPolicy对应的Raid状态,即其他policy的Raid状态
RaidState rs = policy.match(file, mtime, now, conf, lfs);
if (rs == RaidState.RAIDED) {
return NOT_RAIDED_OTHER_POLICY;//其他policy已经Riad过了
} else if (rs == RaidState.NOT_RAIDED_BUT_SHOULD) {
hasNotRaidedButShouldPolicy = true;//应该由其他policy Raid的(还未Raid)
}
}

sortedExpendedPolicy路径长度最长的在前面,长度一致的优先级高的在前面,否则编码后备份数少的在前面.具体见上构造过程的排序.

如果能够直接找到与policy相匹配的ExpandedPolicy,使用最先找到的ExpandedPolicy进行过滤(主要使用修改时间,从PolicyInfo的子ExpandedPolicy找最先插入sortedExpendedPolicy的那个).

如果有其他policy已经对其Raid过了,则直接返回RAIDED,RAIDED不加到列表中.
而如果其他的policy需要Raid则标志位置位,返回NOT_RAIDED_OTHER_POLICY

当返回NOT_RAIDED_NO_POLICY时,表示文件没有对应的policy,有如下情况:

1
2
3
4
5
6
7
8
9
if (matched == null) {
return NOT_RAIDED_NO_POLICY;
}//遍历所有的sortedExpendedPolicy,没有匹配的ExpandedPolicy
if (shouldExclude(uriPath)) {
return NOT_RAIDED_NO_POLICY;
}//在excludePatterns内,通过raid.exclude.patterns设置
if (file.isDir() != matched.codec.isDirRaid) {
return NOT_RAIDED_NO_POLICY;
}//属性(文件/目录)与codec不匹配,这里对文件Raid来说将子目录添加至directories中继续遍历,而对目录Raid来说匹配到具体文件,忽略.

返回NOT_RAIDED_TOO_SMALL,表示policy设置的路径总区块数目过小(小于2):

1
2
3
4
5
long blockNum = matched.codec.isDirRaid? DirectoryStripeReader.getBlockNum(lfs): computeNumBlocks(file);

if (blockNum <= TOO_SMALL_NOT_RAID_NUM_BLOCKS) {
return NOT_RAIDED_TOO_SMALL;
}

如果存在的ExpandedPolicy和路径匹配,且区块大小合适,即以上情况都没出现,可能Raid,则最终由匹配到的ExpandedPolicy获取状态信息:

1
2
3
4
5
6
7
8
9
//本policy下的ExpandedPolicy判断Raid状态
RaidState finalState = matched.getBasicState(file, mtime, now,skipParityCheck, conf, lfs);
if (finalState == RaidState.RAIDED) {
return finalState;
} else if (hasNotRaidedButShouldPolicy) {
return RaidState.NOT_RAIDED_OTHER_POLICY;
} else {
return finalState;
}

getBasicState如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
RaidState getBasicState(FileStatus f, long mtime, long now,
boolean skipParityCheck, Configuration conf, List<FileStatus> lfs)

throws IOException {

if (f.isDir() != codec.isDirRaid) {
return RaidState.NOT_RAIDED_NO_POLICY;
}//路径状态和codec不匹配
if (now - mtime < modTimePeriod) {
return RaidState.NOT_RAIDED_TOO_NEW;
}//修改后没达到可以Raid的周期
long repl = f.isDir()? DirectoryStripeReader.getReplication(lfs): f.getReplication();
if (repl == targetReplication) {//skipParityCheck时,只要源文件备份和设置的编码后源文件备份数一样则直接判断已经Raid过了
if (skipParityCheck || ParityFilePair.parityExists(f, codec, conf)) {
return RaidState.RAIDED;
}
}
return RaidState.NOT_RAIDED_BUT_SHOULD;
}

可见,传入参数skipParityCheck表示判断是否存在校验文件(即是否已经编码过了)时,只通过比较当前源路径的备份数是否等于编码后源路径备份数,而不需要看实际校验文件是否存在.
当该参数为true时,如果我们设置源文件备份数和编码后源文件备份数一致,可能误判.
从上文件Raid和目录Raid来看,传入的都为false.

可见只有存在相应的ExpandedPolicy,不在excluded列表中,包含的Block大于2,没有其他的ExpandedPolicy需要Raid,路径状态和codec匹配,修改时间大于配置的modTimePeriod(即policy文件中的modTimePeriod),且没有Raid过时才返回NOT_RAIDED_BUT_SHOULD,即添加至待Raid列表中.

过滤实例分析总结

文件Raid policy过滤

文件Raid会对srcPath匹配的路径下所有文件进行编码,包括子目录下的文件.
虽然sortedExpendedPolicy中与policy有关的ExpandedPolicy不包含匹配的子目录,但是Processor线程会将子目录放入directories中递归遍历,最后过滤用的ExpandedPolicy为PolicyInfo关联的ExpandedPolicy最先插入的那个,主要使用PolicyInfo的修改时间进行判断.

例如:
srcPath: /usr/xiaoyun/*
文件系统中存在两个文件: /usr/xiaoyun/idea /usr/xiaoyun/sub/idea1

则ExpandedPolicy会有两个,其srcPrefix为/usr/xiaoyun/idea和/usr/xiaoyun/sub/,因为*匹配0个或多个字符,/usr/xiaoyun/*/*才能匹配到/usr/xiaoyun/sub/idea1.

这样如果没有其他policy可能对这两个文件进行Raid,则Processor线程遍历/usr/xiaoyun/idea时直接因为其为文件,满足条件时加入到列表中.

而遍历/usr/xiaoyun/sub/时,因为其为目录,与codec不匹配,check直接返回false,之后因为其为目录,被Processor线程加入到directories中,下一次遍历.
下一次遍历/usr/xiaoyun/sub/idea1时,最终获得的ExpandedPolicy对应的为以上两个中的一个(根据插入顺序,实验中为/usr/xiaoyun/idea对应的那个),获取修改时间,满足条件加入到列表中.

以上分析经过实际实验验证过.

目录Raid policy过滤

对于目录Riad,若:
srcPath为:/usr/xiaoyun/*
文件系统中存在:/usr/xiaoyun/idea,/usr/xiaoyun/sub/idea1,/usr/xiaoyun/sub/idea2
将对sub子目录下的idea1,idea2进行编码.
因为sub为叶子目录,/usr/xiaoyun/idea虽然能匹配到,但是不是目录,不会加到列表中.

而如果sub目录下还有一个子目录如sub1,则sub下面的idea1,idea2不会编码.当然也要考虑目录和文件Raid的优先级.

文件Raid+目录Raid过滤

rs码,若文件rs优先级小于目录rs,srcPath都为/*,存在以下文件结构:
/idea1,/sub/idea2,/sub/sub1/idea3,/sub/sub1/idea4
则文件rs将编码idea1,idea2文件,目录rs将编码sub1目录下的idea3,idea4文件.
而如果文件rs优先级大于目录rs,则文件rs将编码所有文件,目录rs不编码任何文件.

实际测试结果与预期不符,文件Raid优先级小于目录Raid时,所有文件都由文件Raid进行编码.

查看修改后日志,发现循环执行所有policies时,在allPoliies中先执行文件Raid对应的Policy.遍历到sub1目录时,由于其为目录,filter的check函数中直接返回false,在遍历目录下文件idea3,idea4时才会继续执行checker.check.
在checker.check中从sortedExpendedPolicy中,虽然刚开始会匹配到目录Raid的ExpandedPolicy(srcPath相同,对应前缀长度相同而目录Raid优先级高,目录的在sortedExpendedPolicy前面),但是由于此时为文件,执行ExpandedPolicy的getBasicState时和codec不匹配直接返回NOT_RAIDED_NO_POLICY,对结果无影响.
因此此时最终还是会匹配到文件Raid对应的ExpandedPolicy,从而执行相应的getBasicState,添加至Raid列表中,执行文件Raid.

那么如果先执行目录Raid对应的Policy呢,idea1,idea2因为是文件直接跳过,到sub1子目录时,因为是叶子目录,最先就获取了目录Raid对应的ExpandedPolicy(在有序阻塞队列前面),idea3,idea4便会由目录Raid进行编码.

那么,在doProcess的while循环中,policies的执行先后顺序到底怎么样呢,由前知:

1
ArrayList<PolicyInfo> allPolicies = new ArrayList<PolicyInfo>();

allPolicies为ArrayList,遍历顺序应该是插入顺序,也就是policy文件中policy的先后顺序,而测试中policy文件先是目录Raid才是文件Raid.

不过从日志文件可以看出以上的测试是先执行文件Raid再执行目录Raid,导致了所有文件都是由文件Raid进行编码,不知道为什么会这样.

接着重新执行了一次测试,结果符合预期,文件Raid编码idea1,idea2,目录Raid编码idea3,idea4.
感觉这里有点混乱.多次测试发现大部分这种文件Raid和目录Raid的policy覆盖同一个目录下的文件时,目录Raid优先级高的情况下,大部分都是由目录Raid进行编码,不过有时也会出现文件Raid进行编码的情况.

从测试结果也可以看出,文件Raid以文件为单位,即如果列表中两个文件分别为4个Block,执行(10,4)rs编码,两个文件分别参与编码,每个都需要填充6个0block,最终生成两个校验文件.
而目录Block以目录为单位,如果一个叶子目录下有两个分别为4个Block的文件,执行(10,4)rs编码,两个文件共同参与一次编码,共8个block,填充2个0block,最终生成一个校验文件.