Reading the RaidNode Source --- Analyzing the RaidNode Startup Process

Hadoop version: hadoop-20-master


start-raidnode.sh

RaidNode is started via start-raidnode.sh in the bin directory, which sources hadoop-env.sh from the conf directory and then exports the RaidNode-specific options (if any):

if [ -f "${HADOOP_CONF_DIR}/hadoop-env.sh" ]; then
. "${HADOOP_CONF_DIR}/hadoop-env.sh"
fi
export HADOOP_DAEMON_OPTS=$HADOOP_RAIDNODE_OPTS

It then starts the raidnode through hadoop-daemon.sh in the bin directory:

"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR start raidnode


hadoop-daemon.sh

hadoop-daemon.sh first sources hadoop-config.sh from the bin directory and hadoop-env.sh from the conf directory (if it exists):

. "$bin"/hadoop-config.sh
if [ -f "${HADOOP_CONF_DIR}/hadoop-env.sh" ]; then
. "${HADOOP_CONF_DIR}/hadoop-env.sh"
fi

It then checks that the current user matches ${HADOOP_USERNAME} (configured in hadoop-env.sh), creates the directories it needs (the logs directory, etc.), and starts the raidnode:

cd "$HADOOP_HOME"
nohup nice -n $HADOOP_NICENESS "$HADOOP_HOME"/bin/hadoop --config $HADOOP_CONF_DIR $command "$@" > "$log" 2>&1 < /dev/null &

nohup launches the process in the background, $HADOOP_NICENESS sets the process's nice value (its scheduling priority), and stdout/stderr are redirected to the log file. So the raidnode is ultimately started through the hadoop script in the bin directory, with raidnode passed as the command argument.
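What that launch line does (detach into the background, merge stderr into stdout, send output to the log, read stdin from /dev/null) can be mimicked from Java with ProcessBuilder. This is an illustrative sketch, not Hadoop code; the echo command stands in for the real daemon:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class DaemonLaunchSketch {

    // Launch a child process the way hadoop-daemon.sh does:
    //   command > "$log" 2>&1 < /dev/null
    // and return the log's contents once the child exits.
    static String launchAndReadLog() throws IOException, InterruptedException {
        File log = File.createTempFile("daemon-sketch", ".log");
        Process p = new ProcessBuilder("echo", "started")
                .redirectOutput(log)                  // > "$log"
                .redirectErrorStream(true)            // 2>&1
                .redirectInput(new File("/dev/null")) // < /dev/null
                .start();
        p.waitFor();
        return new String(Files.readAllBytes(log.toPath())).trim();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(launchAndReadLog());
    }
}
```

The shell version additionally needs nohup so the child survives the shell's exit; a child started from Java is not killed when its parent exits, so no extra step is needed there.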


hadoop

This script is the launcher for every kind of node; it behaves differently depending on the arguments passed in, and it also bundles a number of filesystem utilities.
The hadoop script lives in the bin directory. It too first sources hadoop-config.sh from bin and hadoop-env.sh from conf (if present), then checks and sets $JAVA_HOME and the maximum heap size:

. "$bin"/hadoop-config.sh
if [ -f "${HADOOP_CONF_DIR}/hadoop-env.sh" ]; then
. "${HADOOP_CONF_DIR}/hadoop-env.sh"
fi
if [ "$JAVA_HOME" != "" ]; then
JAVA_HOME=$JAVA_HOME
fi
if [ "$JAVA_HOME" = "" ]; then
echo "Error: JAVA_HOME is not set."
exit 1
fi
JAVA=$JAVA_HOME/bin/java
JAVA_HEAP_MAX=-Xmx1000m
if [ "$HADOOP_HEAPSIZE" != "" ]; then
JAVA_HEAP_MAX="-Xmx""$HADOOP_HEAPSIZE""m"
fi

So unless HADOOP_HEAPSIZE is set, the maximum heap size defaults to 1000 MB (-Xmx1000m), and every node started through the hadoop script gets that same cap (raidnode, namenode, and so on).

Classpath setup

Next the script builds the classpath used to launch the node:

CLASSPATH="${HADOOP_CONF_DIR}"                    # the conf directory
CLASSPATH=${CLASSPATH}:$HADOOP_CLASSPATH          # the HADOOP_CLASSPATH variable
CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar

# for developers, add Hadoop classes to CLASSPATH
if [ -d "$HADOOP_HOME/build/classes" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HOME/build/classes   # the build/classes directory
fi
if [ -d "$HADOOP_HOME/build/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HOME/build
fi
if [ -d "$HADOOP_HOME/build/test/classes" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HOME/build/test/classes
fi
if [ -d "$HADOOP_HOME/build/tools" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HOME/build/tools
fi
if [ -d "$HADOOP_HOME/build/contrib/highavailability/classes" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HOME/build/contrib/highavailability/classes
fi

# so that filenames w/ spaces are handled correctly in loops below
IFS=

# for releases, add core hadoop jar & webapps to CLASSPATH
if [ -d "$HADOOP_HOME/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HOME
fi
for f in $HADOOP_HOME/hadoop-*-core.jar; do
CLASSPATH=${CLASSPATH}:$f;
done

if [ -d "$HADOOP_HOME/build/ivy/lib/Hadoop/common" ]; then   # third-party libraries fetched by ivy
if [ "$COMMAND" = "hdfsnfsproxy" ] ; then
for f in $HADOOP_HOME/build/ivy/lib/Hadoop/common/slf4j*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $HADOOP_HOME/build/ivy/lib/Hadoop/common/zookeeper*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $HADOOP_HOME/build/ivy/lib/Hadoop/common/guava*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $HADOOP_HOME/build/ivy/lib/Hadoop/common/json*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $HADOOP_HOME/build/ivy/lib/Hadoop/common/commons*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $HADOOP_HOME/build/ivy/lib/Hadoop/common/log4j*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $HADOOP_HOME/build/ivy/lib/Hadoop/common/hadoop*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
else
for f in $HADOOP_HOME/build/ivy/lib/Hadoop/common/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
fi
fi

for f in $HADOOP_HOME/lib/jsp-2.1/*.jar; do   # jsp libraries
CLASSPATH=${CLASSPATH}:$f;
done
...

As the excerpt shows, the classpath for a starting node includes:

  • the conf directory
  • the ${HADOOP_CLASSPATH} variable, set in hadoop-env.sh under conf
  • lib/tools.jar under JAVA_HOME
  • the Hadoop build output, kept under $HADOOP_HOME/build, including:
    • classes, the compiled core Hadoop classes
    • webapps, the compiled web applications, including the namenode, raidnode, etc. web pages
    • test/classes, the compiled test classes
    • tools, the compiled tool classes (see the tools directory in the source tree)
    • contrib/highavailability/classes, the compiled high-availability classes
  • the webapps directory under $HADOOP_HOME, i.e. the deployed web application output
  • the hadoop-*-core.jar files under $HADOOP_HOME
  • every jar under $HADOOP_HOME/build/ivy/lib/Hadoop/common, i.e. the third-party dependencies fetched by ivy, which are eventually copied into $HADOOP_HOME/lib
  • every jar under $HADOOP_HOME/lib/jsp-2.1/

This classpath is shared by all node types at startup; roughly, it covers the paths named by ${HADOOP_CLASSPATH}, the Hadoop build output (core, tools, test, and contrib), and the third-party jars fetched by ivy along with a few related ones.
For the raidnode, one extra path is prepended as well, ${CORONA_LIB_PATH}:

elif [ "$COMMAND" = "raidnode" ] ; then
CLASS='org.apache.hadoop.raid.RaidNode'
JMX_OPTS=$HADOOP_JMX_RAIDNODE_OPTS
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_GC_LOG_OPTS"
CLASSPATH=${CORONA_LIB_PATH}:${CLASSPATH}

Naturally, the other node types each add their own classpath entries too.
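Once the JVM is running, the classpath these scripts assembled is visible from inside the process as the java.class.path system property. A small sketch (not part of Hadoop) that splits it into its individual entries:

```java
import java.io.File;

public class ClasspathSketch {

    // Split the running JVM's classpath into its individual entries,
    // using the platform separator (':' on Unix, ';' on Windows).
    static String[] classpathEntries() {
        return System.getProperty("java.class.path").split(File.pathSeparator);
    }

    public static void main(String[] args) {
        for (String entry : classpathEntries()) {
            System.out.println(entry);
        }
    }
}
```

This is a handy way to check, from a running daemon, exactly which of the directories and jars listed above actually made it onto the classpath.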

Launching via java

Finally the raidnode is launched with java:

export CLASSPATH
export JVM_PID=$$
exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $JMX_OPTS "-Dfb_hadoop_version=0.20" $CLASS $CMDLINE_OPTS "$@"

CLASSPATH is the one assembled above, $JAVA_HEAP_MAX defaults to -Xmx1000m, and $HADOOP_OPTS is built as follows:

export LD_LIBRARY_PATH="$JAVA_LIBRARY_PATH"

HADOOP_OPTS="$HADOOP_OPTS $HADOOP_DAEMON_OPTS"
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.log.dir=$HADOOP_LOG_DIR"
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.log.file=$HADOOP_LOGFILE"
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.home.dir=$HADOOP_HOME"
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.id.str=$HADOOP_IDENT_STRING"
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.root.logger=${HADOOP_ROOT_LOGGER:-INFO,console}"
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.application=${HADOOP_APPLICATION:-default}"
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.installationid=${CLUSTER_NAME:-default}"
if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$JAVA_LIBRARY_PATH"
fi
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.policy.file=$HADOOP_POLICYFILE"

These cover the basic properties; the role of $JAVA_LIBRARY_PATH remains to be explored.
$CLASS is org.apache.hadoop.raid.RaidNode. $CMDLINE_OPTS differs per node type; the raidnode has none, and the final $@ arguments are empty.
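Each -D on that exec line becomes a JVM system property that the daemon can later read back with System.getProperty. A sketch of the pattern, using the hadoop.root.logger property from the script above; the fallback mirrors the shell default ${HADOOP_ROOT_LOGGER:-INFO,console}:

```java
public class HadoopOptsSketch {

    // Read a property passed as -Dhadoop.root.logger=..., falling back to the
    // same default the shell script uses: ${HADOOP_ROOT_LOGGER:-INFO,console}.
    static String rootLogger() {
        return System.getProperty("hadoop.root.logger", "INFO,console");
    }

    public static void main(String[] args) {
        System.out.println(rootLogger()); // default when the -D flag is absent
        System.setProperty("hadoop.root.logger", "DEBUG,console"); // as if passed via -D
        System.out.println(rootLogger());
    }
}
```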


Executing the RaidNode main class

Running org.apache.hadoop.raid.RaidNode formally begins the RaidNode startup. The class's main function is:

StringUtils.startupShutdownMessage(RaidNode.class, argv, LOG);
RaidNode raid = createRaidNode(argv, null);
if (raid != null) {
raid.join();
}

Inside createRaidNode(argv, null), the RaidNode is created via RaidNode node = createRaidNode(conf);:

Class<?> raidNodeClass = conf.getClass(RAIDNODE_CLASSNAME_KEY, DistRaidNode.class);
if (!RaidNode.class.isAssignableFrom(raidNodeClass)) {
throw new ClassNotFoundException("not an implementation of RaidNode");
}

Constructor<?> constructor = raidNodeClass.getConstructor(new Class[] {Configuration.class} );
return (RaidNode) constructor.newInstance(conf);

The raid.classname configuration entry is read and the RaidNode is created via reflection, i.e. by invoking the configured class's constructor; the default is DistRaidNode.
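This reflective construction is a standard pattern: resolve a class (here, from configuration), verify it is assignable to the expected base type, then invoke its one-argument constructor. A self-contained sketch with stand-in classes (Conf, Node, and DistNode are hypothetical names, not the Hadoop ones):

```java
import java.lang.reflect.Constructor;

public class ReflectiveCreateSketch {
    // Stand-ins for Configuration / RaidNode / DistRaidNode.
    static class Conf {}
    static class Node {
        final Conf conf;
        Node(Conf conf) { this.conf = conf; }
    }
    static class DistNode extends Node {
        public DistNode(Conf conf) { super(conf); }
    }

    // Mirrors createRaidNode: check assignability, then call the
    // single-argument constructor reflectively.
    static Node create(Class<?> nodeClass, Conf conf) throws Exception {
        if (!Node.class.isAssignableFrom(nodeClass)) {
            throw new ClassNotFoundException("not an implementation of Node");
        }
        Constructor<?> c = nodeClass.getConstructor(Conf.class);
        return (Node) c.newInstance(conf);
    }

    public static void main(String[] args) throws Exception {
        Node n = create(DistNode.class, new Conf());
        System.out.println(n.getClass().getSimpleName());
    }
}
```

The isAssignableFrom check is what produces the "not an implementation of RaidNode" error when the configured class does not extend RaidNode.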

LocalRaidNode startup

If LocalRaidNode is configured instead, its constructor is simply:

public LocalRaidNode(Configuration conf) throws IOException {
super(conf);
LOG.info("created");
}

The parent RaidNode constructor completes construction through initialize(conf);, which reads the required configuration and creates the corresponding objects and threads.
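The constructor chain reduces to a small sketch: the subclass constructor only delegates to super(conf), and all real setup happens in the base class. The names below are stand-ins for RaidNode/LocalRaidNode, and initialize here just sets a flag where the real one builds servers and threads:

```java
public class InitializeSketch {
    static class Conf {}

    // Mirrors RaidNode's shape: the base constructor does the real setup.
    static class BaseNode {
        boolean initialized = false;
        BaseNode(Conf conf) {
            initialize(conf); // all state is built here, not in subclasses
        }
        void initialize(Conf conf) {
            initialized = true; // in RaidNode: config, RPC/HTTP servers, threads
        }
    }

    // Mirrors LocalRaidNode: nothing to do beyond super(conf).
    static class LocalNode extends BaseNode {
        LocalNode(Conf conf) {
            super(conf);
        }
    }

    public static void main(String[] args) {
        System.out.println(new LocalNode(new Conf()).initialized);
    }
}
```

One thing to keep in mind when reading subclasses like DistRaidNode: a base constructor calling an overridable method is fragile in general Java style, because subclass fields are not yet initialized when initialize runs.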

LocalRaidNode initialization

initialize creates the objects and threads the RaidNode manages, chiefly the following:

Main object members

  • ConfigManager, stored in the configMgr field

    configMgr = new ConfigManager(conf);

    Loads the policy file

  • Server, stored in the server field

    this.server = RPC.getServer(this, socAddr.getAddress().getHostAddress(), socAddr.getPort(), handlerCount, false, conf);

    The RPC server

  • HttpServer, stored in the infoServer field
    The Raid HTTP server:

    this.infoServer = new HttpServer("raid", this.infoBindAddress, tmpInfoPort, tmpInfoPort == 0, conf);
  • CheckSumStore

    RaidNode.createChecksumStore(conf, true);
  • StripeStore

    RaidNode.createStripeStore(conf, true, FileSystem.get(conf));

Thread-related members:

  • BlockIntegrityMonitor, stored in the blockIntegrityMonitor field
    Responsible for repairing corrupt or missing blocks, via:

    • blockFixerThread, the corrupt-block repair thread
    • blockCopierThread, the missing-block (decommissioned-node) copy thread
    • corruptFileCounterThread, the corrupt-file counter thread
      Configuration decides which of these threads are created:

      boolean useBlockFixer = 
      !conf.getBoolean(RAID_DISABLE_CORRUPT_BLOCK_FIXER_KEY, false);
      boolean useBlockCopier =
      !conf.getBoolean(RAID_DISABLE_DECOMMISSIONING_BLOCK_COPIER_KEY, true);
      boolean useCorruptFileCounter =
      !conf.getBoolean(RAID_DISABLE_CORRUPTFILE_COUNTER_KEY, false);

      Note that each flag is a double negative: the thread runs unless its RAID_DISABLE_* key is set, and the block copier's key defaults to true, so it is off by default. See the BlockIntegrityMonitor class for details.

  • TriggerMonitor, stored in the triggerMonitor field, with its thread in triggerThread

    this.triggerMonitor = new TriggerMonitor();
    this.triggerThread = new Daemon(this.triggerMonitor);
    this.triggerThread.setName("Trigger Thread");
    this.triggerThread.start();

    Periodically checks the policies and encodes (raids) the eligible paths.

  • UnderRedundantFilesProcessor, stored in the urfProcessor field, with its thread in urfThread

    this.urfProcessor = new UnderRedundantFilesProcessor(conf);
    this.urfThread = new Daemon(this.urfProcessor);
    this.urfThread.setName("UnderRedundantFilesProcessor Thread");
    this.urfThread.start();
  • PlacementMonitor, stored in the placementMonitor field
    Main member: a BlockMover, which owns the clusterUpdater thread

    this.placementMonitor = new PlacementMonitor(conf);
    this.placementMonitor.start();

    Responsible for block placement management and movement

  • PurgeMonitor, stored in the purgeMonitor field, with its thread in purgeThread

    this.purgeMonitor = new PurgeMonitor(conf, placementMonitor, this);
    this.purgeThread = new Daemon(purgeMonitor);
    this.purgeThread.setName("Purge Thread");
    this.purgeThread.start();

    Responsible for deleting orphaned parity blocks

  • HarMonitor, with its thread stored in the harThread field

    this.harThread = new Daemon(new HarMonitor());
    this.harThread.setName("HAR Thread");
    this.harThread.start();

    Responsible for creating har (Hadoop archive) files

  • StatisticsCollector, stored in the statsCollector field, with its thread in statsCollectorThread

    this.statsCollector = new StatisticsCollector(this, configMgr, conf);
    this.statsCollectorThread = new Daemon(statsCollector);
    this.statsCollectorThread.setName("Stats Collector");
    this.statsCollectorThread.start();

    Responsible for collecting statistics
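All of the monitors above are started the same way: a Runnable wrapped in Hadoop's Daemon class (essentially a Thread that calls setDaemon(true)), given a descriptive name, then started. The common pattern, sketched with a plain java.lang.Thread, where a latch stands in for the monitor's real work loop:

```java
import java.util.concurrent.CountDownLatch;

public class MonitorThreadSketch {

    // Start a named daemon thread and wait until its body has run once,
    // mirroring the triggerThread / purgeThread / harThread setup.
    static String runOnce() throws InterruptedException {
        CountDownLatch started = new CountDownLatch(1);

        // A real monitor's run() would loop periodically; this one signals once.
        Thread t = new Thread(started::countDown);
        t.setDaemon(true);            // what Hadoop's Daemon wrapper does
        t.setName("Trigger Thread");  // same naming style as in initialize()
        t.start();

        started.await();
        return t.getName();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce());
    }
}
```

Making these daemon threads means none of them keeps the JVM alive on its own; the process stays up because main blocks in raid.join().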