Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》,后文简称技术内幕
RPC中用到的通道和选择器另见通道和选择器,缓冲区另见缓冲区,以及序列化(主要是ObjectWritable)另见序列化
下文所说的[概述]为上篇RPC源码分析—概述
本文不涉及SIMPLE,DIGEST,KERBEROS三种鉴权方式的鉴权过程分析
本文所属RPC源码分析
,按照调用请求的步骤对源码进行分析,限于篇幅分为上下两篇。
上篇分析了”客户端代理创建”,”服务器创建启动”,”客户端服务器连接建立”,”客户端头数据发送”,”服务器对头数据验证,鉴权”,至此连接已经建立验证,可以发送正常的调用请求了。本文即为上篇。
而下篇分析了”客户端调用请求”,”服务器对调用请求的处理响应”,”客户端接收响应”,”客户端服务器连接的关闭”,”客户端关闭”,”服务器关闭”等过程,下篇另见RPC源码分析下篇。
1. 客户端代理创建
由RPC.getProxy
获取客户端代理,由[概述]分析知一个代理处理用户到具体服务器的具体协议对应的连接,同时连接属于一个Client,而Client一般由SocketFactory决定,不同SocketFactory对应不同Client。因此getProxy需指定包括SocketFactory
,用户
,服务器地址
,协议
这4个信息,另外还需指定连接读操作超时时间。
对应getProxy重要的5个信息,最少需给定服务器地址
和使用的协议
,其他的都可以默认。SocketFactory
默认为默认SocketFactory,用户默认为当前用户,读操作超时时间默认为0,此时会设置为pingInterval(见[概述])。
getProxy的重载:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27//指定协议protocol,地址addr。
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
return getProxy(protocol, clientVersion, addr, conf, NetUtils.getDefaultSocketFactory(conf), 0);
}
//指定协议protocol,地址addr,SockeFactory factory
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf,SocketFactory factory) throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
return getProxy(protocol, clientVersion, addr, ugi, conf, factory, 0);
}
//指定协议protocol,地址addr,SocketFactory factory,超时rpcTimeout
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
return getProxy(protocol, clientVersion, addr, ugi, conf, factory, rpcTimeout);
}
//指定协议protocol,地址addr,用户ticket,SocketFactory factory,超时rpcTimeout
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory,rpcTimeout, null, true);
}
可见,最终会使用最多参数的getProxy
创建代理对象:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory, int rpcTimeout,
RetryPolicy connectionRetryPolicy, boolean checkVersion) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
//由protocol,addr,ticket,factory,rpcTimeout等信息创建调用处理器对象Invoker,其中connectionRetryPolicy缺省为重试10次,每次间隔1s
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);
//创建动态代理,代理的调用处理器为上面创建的invoker
VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance( protocol.getClassLoader(), new Class[]{protocol}, invoker);
if (checkVersion) {//默认情况下需要检查客户端和服务器协议的版本
checkVersion(protocol, clientVersion, proxy);
}
return proxy;
}
因此,创建代理主要就是通过传入的参数创建对应的调用处理器对象Invoker,然后通过Java动态代理创建动态代理,创建代理后,默认情况下需要检查客户端和服务器协议的版本号,不匹配的话抛出VersionMismatch
异常。因此,创建的代理核心信息保存在调用处理器invoker中。
看一下Invoker的构造:1
2
3
4
5
6private Invoker(Class<? extends VersionedProtocol> protocol,
InetSocketAddress address, UserGroupInformation ticket, Configuration conf,
SocketFactory factory,int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf);
this.client = CLIENTS.getClient(conf, factory);
}
因此,就是通过传入的用户,服务器地址,协议,超时时间等信息构建Invoker负责处理连接的标识ConnectionId
,即确定了该Invoker负责处理的连接。
而用SocketFactory在缓存中查找所属客户端,若存在现有使用该factory的客户端直接使用,只是增加其引用计数refCount
,否则构建一个Client对象,valueClass
初始化为ObjectWritable
。
这些成员具体含义另见[概述]。1
2
3
4
5
6
7
8
9
10
11private synchronized Client getClient(Configuration conf,
SocketFactory factory) {
Client client = clients.get(factory);
if (client == null) {//不存在,构建Client对象,valueClass为ObjectClass,并放入缓存
client = new Client(ObjectWritable.class, conf, factory);
clients.put(factory, client);
} else {
client.incCount();//存在的话,client新增了一个Connection,增加引用计数
}
return client;
}
这样,客户端的代理构建完成了,其实主要是构建了代理关联的调用处理器中的connectionId
(对应一个Connection),以及client
(将代理处理的连接注册到相应客户端)。
服务器创建
创建服务器对象,通过RPC.getServer
完成,至少指定服务器绑定的地址和端口,以及实现协议的实例对象,看方法重载:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17//给定实现协议的实例对象instance,要绑定的地址bindAddress和端口port
public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) throws IOException {
return getServer(instance, bindAddress, port, 1, false, conf);
}
//给定instance,bindAddress,port的基础上,指定处理器线程数目(缺省1)
public static Server getServer(final Object instance, final String bindAddress, final int port,
final int numHandlers, final boolean verbose, Configuration conf) throws IOException {
return getServer(instance, bindAddress, port, numHandlers, verbose, conf, null);
}
//上有基础上,再指定安全管理器secretManager(缺省为null)
public static Server getServer(final Object instance, final String bindAddress, final int port,
final int numHandlers, final boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
}
可见,最终通过RPC.Server
的构造函数构建服务器对象。1
2
3
4
5
6public Server(Object instance, Configuration conf, String bindAddress, int port, int numHandlers,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
super(bindAddress, port, Invocation.class, numHandlers, conf,classNameBase(instance.getClass().getName()), secretManager);
this.instance = instance;
this.verbose = verbose;
}
调用父类org.apache.hadoop.ipc.Server
的构造函数,然后初始化实现协议的实例对象instance。父类构造函数如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37protected Server(String bindAddress, int port, Class<? extends Writable> paramClass, int handlerCount,
Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
this.bindAddress = bindAddress;//绑定地址
this.conf = conf;
this.port = port;//绑定端口
this.paramClass = paramClass;//客户端传过来的包含调用方法参数等信息的参数类,为Invocation
this.handlerCount = handlerCount;//处理器线程数目,默认1
this.socketSendBufferSize = 0;
//待处理Call队列callQueue中最大的待处理Call数,默认100
this.maxQueueSize = handlerCount * conf.getInt(IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
//响应缓冲最大大小,1MB
this.maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY, IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
//监听器listener中Reader线程数目,默认1
this.readThreads = conf.getInt(IPC_SERVER_RPC_READ_THREADS_KEY,IPC_SERVER_RPC_READ_THREADS_DEFAULT);
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);//构造callQueue
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);//清理空闲连接时一次最多清理的个数
//服务器端连接超过此值应该执行清理(连接数过多,一个连接为一个线程,资源消耗大)
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);//是否鉴权
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
// Start the listener here and let it bind to the port
listener = new Listener();//构建监听器
this.port = listener.getAddress().getPort();
this.rpcMetrics = RpcInstrumentation.create(serverName, this.port);
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);//是否开启Nagel算法,默认false开启
// Create the responder here
responder = new Responder();//响应器
if (isSecurityEnabled) {
SaslRpcServer.init(conf);
}
}
初始化相应成员后,构建监听器listener
和响应器responder
,而对应的多个处理器在start
方法中创建。
构建监听器时,会打开一个选择器,然后将ServerSocketChannel注册到选择器上,注册Accept操作,由该选择器负责处理到来的连接请求。
同时创建readThreads
个Reader线程并启动,每一个Reader线程中都有一个Selector。
由listener中的Selector处理Accept的连接请求,会创建SocketChannel对象,然后从所有的Reader线程中选择下一个Reader线程,将accept的SocketChannel注册在选择的Reader线程的Selector中,注册读操作,这样便由该Reader线程负责该连接到来的数据请求。
监听器构造
1 | public Listener() throws IOException { |
响应器构建
响应器的创建比较简单,打开用于为写操作注册的Selector1
2
3
4
5
6Responder() throws IOException {
this.setName("IPC Server Responder");
this.setDaemon(true);
writeSelector = Selector.open(); // create a selector
pending = 0;
}
服务器启动
服务器启动在Server
的start
方法中,一般通过RPC.getServer
创建服务器后,调用服务器的start方法启动服务器1
2
3
4
5
6
7
8
9
10public synchronized void start() {
responder.start();//启动响应器
listener.start();//启动监听器
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);//创建处理器
handlers[i].start();//启动处理器
}
}
分别启动响应器,监听器,创建对应数量的处理器并启动。处理器没有相应的成员属性,只是简单的设置了处理器线程名字以及将其设为后台线程1
2
3
4public Handler(int instanceNumber) {
this.setDaemon(true);
this.setName("IPC Server handler "+ instanceNumber + " on " + port);
}
代理方法调用
通过getProxy创建代理对象后,如果在代理对象上调用协议的方法,则像Java动态代理一样,会将该调用转发给其调用处理器Invoker的invoke方法:1
2
3
4
5
6public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
...//日志相关
ObjectWritable value = (ObjectWritable)client.call(new Invocation(method, args), remoteId);
...//日志相关
return value.get();
}
如上,将调用的方法和参数封装成Invocation对象,该对象正是服务器的paramClass
即接受的参数类。对应的在客户端会将包含方法和参数信息的Invocation序列化发送到服务器,而服务器会创建Invocation对象,反序列化到来的数据,便接收到调用请求。
封装成Invocation对象后,通过Invoker所属的Client,调用call
方法执行远程过程调用,该调用对应的连接标识为Invoker所属的remoteId
。
调用的返回值类型为ObjectWritable
,在Client构建时可见初始化valueClass
为ObjectWritable
。因为对应在服务器端发送给客户端的相应是通过ObjectWritable序列化的,因此客户端这边有响应可接收时,通过构造ObjectWritable
对象反序列化即为返回值数据。
Client的call方法中,继续将方法和参数信息对象Invocation封装成一个Call对象,对应客户端的一次远程调用(另见[概述])。然后获取到服务器端的连接,若连接还未建立,则需建立到服务器端的连接,发送rpcHeader
和ConnectionHeader
,否则直接从缓冲区中获取存在的连接。获取连接后将Invocation对象信息反序列化到服务器端,休眠等待。等待的过程中由所属的Connection线程读取服务器到来的响应,读取到响应(成功或失败)时,唤醒该等待线程,完成远程过程调用。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException {
Call call = new Call(param);//封装成Call对象,对应为客户端的一个远程过程调用
Connection connection = getConnection(remoteId, call);//获取连接,若缓冲区中没有则需创建并发送rpcHeader和ConnectionHeader到服务器
connection.sendParam(call); // send the parameter,发送调用的方法和参数信息
boolean interrupted = false;
synchronized (call) {
while (!call.done) {
try {
call.wait(); // wait for the result,休眠等待调用完成,由Connection线程读取响应并唤醒该等待的线程
} catch (InterruptedException ie) {
// save the fact that we were interrupted
interrupted = true;
}
}
if (interrupted) {
// set the interrupt flag now that we are done waiting
Thread.currentThread().interrupt();
}
if (call.error != null) {//远程调用中出现异常,直接抛出给上层,对应为协议方法捕获,就像本地调用一样
if (call.error instanceof RemoteException) {
call.error.fillInStackTrace();
throw call.error;
} else { // local exception
// use the connection because it will reflect an ip change, unlike
// the remoteId
throw wrapException(connection.getRemoteAddress(), call.error);
}
} else {//正常返回,获取值
return call.value;
}
}
}
建立连接
客户端
如上,客户端建立连接对应在getConnection
方法中获取一个连接,若连接不在客户端维护的缓冲区connections
中,则需创建连接1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22private Connection getConnection(ConnectionId remoteId, Call call) throws IOException, InterruptedException {
if (!running.get()) {
throw new IOException("The client is stopped");
}
Connection connection;
do {
synchronized (connections) {
connection = connections.get(remoteId);
if (connection == null) {//connections中不存在
connection = new Connection(remoteId);//创建新的连接对象
connections.put(remoteId, connection);//放入缓存中
}
}
} while (!connection.addCall(call));//获取的连接中添加Call对象
//we don't invoke the method below inside "synchronized (connections)"
//block above. The reason for that is if the server happens to be slow,
//it will take longer to establish a connection and that will slow the
//entire system down.
connection.setupIOstreams();//如果还没建立到服务器端的连接,需连接到服务器,并发送rpcHeader和ConnectionHeader
return connection;
}
如上,不存在连接,通过Connection构造方法构造新的连接对象,然后addCall
添加到有效的连接中,最终通过setupIOstreams
建立连接并发送头部数据,开启连接线程,接收响应。
Connection构造
1 | public Connection(ConnectionId remoteId) throws IOException { |
addCall
获取connection后添加Call对象至connection中:1
2
3
4
5
6
7private synchronized boolean addCall(Call call) {
if (shouldCloseConnection.get())//连接应该关闭,不能添加至该连接
return false;
calls.put(call.id, call);
notify();//唤醒在run方法中因为calls为空而在休眠等待的connection线程
return true;
}
如上,addCall可能因为对应的connection要关闭了,而不能添加成功,添加失败时另外查找或创建新的连接,因此需要在do..while循环中,保证循环结束时Call已经添加到一个可用的正常的连接中了。
setupIOstreams
由上,获取正常连接后,通过setupIOstreams
连接到服务器,并发送rpcHeader和ConnectionHeader,当然,如果已经连接了,便不需要再连接了。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37private synchronized void setupIOstreams() throws InterruptedException {
if (socket != null || shouldCloseConnection.get()) {
return;
}//socket已经存在,表示已经建立了到服务器端的连接,并发送了rpcHeader和ConnectionHeader,直接返回
try {
...
Random rand = null;
while (true) {
setupConnection();//连接到远程服务器,每次connect最多20s,最多尝试45次,每次重试间隔1s,即如果连接不上,要等待(20+1)s*45=15min 45s才放弃
InputStream inStream = NetUtils.getInputStream(socket);//socket输入流,读取响应
OutputStream outStream = NetUtils.getOutputStream(socket);//socket输出流,发送数据
writeRpcHeader(outStream);//建立了连接,向服务器写rpcHeader数据,包括4个字节魔数(hrpc),1个字节版本号(4),1个字节鉴权方法码
...//鉴权相关
//socket输入流包裹了PingInputStream,在读操作超时时,如果没有设置rpcTimeout,则周期性的(默认1min)发送ping数据到服务器,保持连接,
//而如果设置了rpcTimeout,则会抛出异常,关闭连接,具体见[概述]PingInputStream
this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream)));
this.out = new DataOutputStream(new BufferedOutputStream(outStream));
writeHeader();//向服务器写连接头,包括协议,用户信息,鉴权方法
// update last activity time
touch();//更新本连接上次活动时间lastActivity
// start the receiver thread after the socket connection has been set up
start();//开启本线程
return;
}
} catch (Throwable t) {
if (t instanceof IOException) {//连接服务器异常,或者写rpcHeader和连接头异常
markClosed((IOException)t);//设置shouldCloseConnection为true,然后唤醒阻塞在该连接上的线程(此时应该还没有)
} else {
markClosed(new IOException("Couldn't set up IO streams", t));
}
close();//关闭连接,从connections中注销,清理资源
}
}
如上,由setupConnection
连接到服务器,然后writeRpcHeader
写rpc头,writeHeader
写连接头,创建输入输出流之后开启本线程,接收响应。
连接的输入流包裹了PingInputStream,其read方法如下:1
2
3
4
5
6
7
8
9public int read() throws IOException {
do {
try {
return super.read();
} catch (SocketTimeoutException e) {
handleTimeout(e);
}
} while (true);
}
read超时时,由handleTimeout
处理超时,而超时时间如果在getProxy中设置rpcTimeout了大于0,则为rpcTimeout,此时pingInterval也为rpcTimeout,否则超时时间为pingInterval(ipc.ping.interval,默认1min),超时时间设置见后面setupConnection
分析。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17private void handleTimeout(SocketTimeoutException e) throws IOException {
if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {//如果rpcTimeout大于0,直接抛出异常
throw e;
} else {//不应该关闭当前连接且rpcTimeout为0,则周期性发送ping到服务器端
sendPing();
}
}
private synchronized void sendPing() throws IOException {
long curTime = System.currentTimeMillis();
if ( curTime - lastActivity.get() >= pingInterval) {//应该发送ping
lastActivity.set(curTime);
synchronized (out) {
out.writeInt(PING_CALL_ID);
out.flush();
}
}
}
如上连接超时时,本端连接不应该关闭,如果rpcTimeout大于0,则直接抛出异常,否则发送ping包到服务器,而如果服务器端连接已经关闭,则发送ping时也会抛出异常,异常传递到read方法。因此超时时如果rpcTimeout大于0,或者对端关闭了连接,异常都会传递到read方法。
后面分析客户端Connection线程接收响应的时,在read*操作上如果抛出异常,则会关闭客户端连接。
setupConnection
1 | private synchronized void setupConnection() throws IOException { |
如上通过SocketFactory(所属Client对应的)创建Socket,然后连接到服务器,每次连接最多阻塞20s,如果连接超时,则最多重试45次,每次重试间隔1s,因此如果经过(20+1)s*45=15分钟45s仍然连接超时,则抛出异常。而如果不是连接超时造成的异常,则通过Connection的connectionRetryPolicy
进行重试管理,默认重试10次,每次间隔1s。
writeRpcHeader
1 | private void writeRpcHeader(OutputStream outStream) throws IOException { |
可见,rpcHeader
先写4个字节的魔数hrpc
,然后是1个字节的版本号(4),再接着是1个字节鉴权方法码(SIMPLE 80,KERBEROS 81,DIGEST 82),共计6个字节
writeHeader
1 | private void writeHeader() throws IOException { |
可见,连接头ConnectionHeader,会先写序列化后所占长度4个字节,然后是header的实际数据,序列化中至少会写出协议,然后根据用户和鉴权方法写相应的值,具体键ConnectionHeader的序列化方法。
服务器
由上,建立连接时,客户端最终通过socket connect服务器,连接后,发送rpcHeader和ConnectionHeader,来看看对应服务器怎么处理。
服务器由RPC.getServer
创建服务器后,调用start
方法启动listener,handler,responder多个线程。
处理客户端到来连接请求以及数据请求由listener负责。
由上服务器的创建getServer
过程,创建listener
对象时,相应的创建了readThreads
个Reader线程,每个Reader有一个Selector。
同时,listener的ServerSocketChannel成员acceptChannel
注册在Selector成员selector
上,且注册为Accept操作。
因此成员selector
负责到来的连接请求,检测到有连接请求到来时,由acceptChannel执行accept获取SocketChannel得到连接。然后选择一个Reader,将获得的SocketChannel注册到Reader的选择器上,注册Read操作。
因此,listener的成员selector
负责到来的连接请求,而readers
负责连接上的数据读取。
接受连接
客户端通过socket的connect请求连接,服务器端创建listener时acceptChannel
注册在selector
上,注册Accept操作,然后在listener线程主程序中监听到来的连接请求。
listener线程主程序如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40public void run() {
LOG.info(getName() + ": starting");
SERVER.set(Server.this);
while (running) {
SelectionKey key = null;
try {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key);//有连接请求,doAccept处理
}
} catch (IOException e) {
}
key = null;
}
}
...//连接数过多导致内存不足或者其他异常,关闭当前连接或者清理所有连接,然后休眠1min
}
LOG.info("Stopping " + this.getName());
//退出时资源清理,包括关闭通道,选择器,关闭所有的连接
synchronized (this) {
try {
acceptChannel.close();//关闭通道
selector.close();//关闭选择器
} catch (IOException e) { }
selector= null;
acceptChannel= null;
// clean up all connections
while (!connectionList.isEmpty()) {//关闭所有连接
closeConnection(connectionList.remove(0));
}
}
}
因此,客户端有到来的连接时,最终由doAccept
处理,传入的key为acceptChannel
注册在selector
上获取的SelectionKey
。
doAccept
1 | void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { |
从acceptChannel
中accept,获得SocketChannel,然后选择一个Reader,注册在Reader的选择器上,注册读操作,并创建该SocketChannel对应的Connection对象,将Connection对象添加到注册所得SelectionKey的附件中。
其中的reader.startAdd
,如果该Reader线程阻塞在select操作上,唤醒它,然后将adding
至为true,导致线程睡眠1
2
3
4public void startAdd() {
adding = true;
readSelector.wakeup();
}
对应的Reader线程主程序:1
2
3
4
5
6
7
8
9
10public void run() {
synchronized (this) {
while (running) {
SelectionKey key = null;
try {
readSelector.select();//如果当前线程阻塞在此,readSelector.wakeup将唤醒,直接返回
while (adding) {//adding置true,等待1s,等待添加完成
this.wait(1000);
}
...
添加完成后的finally中reader.finishAdd
,通知睡眠的Reader线程,并将adding置false1
2
3
4public synchronized void finishAdd() {
adding = false;
this.notify();//唤醒在wait中的线程
}
这样,服务器端便处理了客户端的connect请求,创建SocketChannel对象,注册到Reader线程的选择器中,创建Connection对象,附加到SelectionKey中,然后由Reader操作负责读取带来的数据请求。
处理头数据
由上,客户端连接上服务器后,会先发送rpcHeader
和ConnectionHeader
给服务器,接下来看看服务器怎么处理。
如上,服务器对连接上的数据请求由Reader线程处理,Reader线程的主程序:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public void run() {
LOG.info("Starting SocketReader");
synchronized (this) {
while (running) {
SelectionKey key = null;
try {
readSelector.select();//select操作
while (adding) {//正在添加SocketChannel,休眠,添加完后会被唤醒
this.wait(1000);
}
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isValid()) {
if (key.isReadable()) {
doRead(key);//处理key对应的数据请求
}
}
key = null;
}
}
...//异常处理,进行日志记录
}
}
}
如上,开始部分对应建立连接过程中添加SocketChannel到Reader中的逻辑,上面已经分析过。之后对于到来的数据请求,通过doRead
处理,key为SocketChannel注册到选择器上的key,且包含附件Connection。
doRead
其实doRead操作不仅处理头数据,还处理实际的调用请求数据,这里我们先分析头数据的处理过程。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29void doRead(SelectionKey key) throws InterruptedException {
int count = 0;
Connection c = (Connection)key.attachment();//获取附件,连接对象
if (c == null) {//没有附件,异常,直接返回
return;
}
c.setLastContact(System.currentTimeMillis());//更新客户端和服务器通信时间lastContact
try {
count = c.readAndProcess();//读数据并处理
} catch (InterruptedException ieo) {//中断异常,在Reader的主程序中会进行日志记录
LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
throw ieo;
} catch (Exception e) {//鉴权失败异常导致关闭连接
LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": disconnecting client " + c + ". Number of active connections: "+ numConnections);
closeConnection(c);
c = null;
}
//正常情况,更新通信时间。这里注意,count大于0时只表明读取到了数据,可能为部分数据还未处理,需要等待下一次读取操作,也可能
//读取了一个请求的所有数据,并进行了处理。
else {
c.setLastContact(System.currentTimeMillis());
}
}
readAndProcess
readAndProcess根据标识rpcHeaderRead
和headerRead
两个标识,分别进行不同的处理。
若rpcHeaderRead
为false,则应该先读取并处理rpcHeader
数据。当rpcHeaderRead
为true时,若headerRead
为false,则还需读取并处理ConnectionHeader数据。当rpcHeader
和headerRead
都为true时,则进入到实际调用请求数据的处理过程。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111public int readAndProcess() throws IOException, InterruptedException {
while (true) {
/* Read at most one RPC. If the header is not read completely yet
* then iterate until we read first RPC or until there is no data left.
*/
int count = -1;
if (dataLengthBuffer.remaining() > 0) {//dataLengthBuffer为4个字节
//读取数据到缓冲区dataLengthBuffer中,尽可能填充完缓冲区剩余部分,不过不保证一定能填充完,未填充完时直接返回,等待下一次读取操作
count = channelRead(channel, dataLengthBuffer);
if (count < 0 || dataLengthBuffer.remaining() > 0) //读取错误或者没有填充完缓冲区,返回
return count;
}
if (!rpcHeaderRead) {//还未处理rpcHeader
//Every connection is expected to send the header.
//2字节缓冲,rpcHeader总共6字节,魔数4字节现在保存在dataLengthBuffer中,rpcHeaderBuffer保存版本号和鉴权方法码
if (rpcHeaderBuffer == null) {
rpcHeaderBuffer = ByteBuffer.allocate(2);
}
count = channelRead(channel, rpcHeaderBuffer);
if (count < 0 || rpcHeaderBuffer.remaining() > 0) {//同上,缓冲区读完才会继续处理,否则直接返回读取到的大小
return count;
}
int version = rpcHeaderBuffer.get(0);//第一字节为版本号(4)
byte[] method = new byte[] {rpcHeaderBuffer.get(1)};//第二个字节为鉴权方法码
authMethod = AuthMethod.read(new DataInputStream( new ByteArrayInputStream(method)));
dataLengthBuffer.flip();//
if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {//验证魔数hrpc和版本号,正确的话才会继续处理,否则关闭连接
//Warning is ok since this is not supposed to happen.
LOG.warn("Incorrect header or version mismatch from " +
hostAddress + ":" + remotePort +
" got version " + version +
" expected version " + CURRENT_VERSION);
return -1;//返回-1时,关闭连接
}
dataLengthBuffer.clear();
if (authMethod == null) {
throw new IOException("Unable to read authentication method");
}
if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
AccessControlException ae = new AccessControlException("Authorization ("
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION
+ ") is enabled but authentication ("
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
+ ") is configured as simple. Please configure another method "
+ "like kerberos or digest.");
//鉴权失败,发送响应,对应的Call id为-1,状态为致命错误,发送异常类和异常信息。客户端读取到该响应时关闭连接
setupResponse(authFailedResponse, authFailedCall, Status.FATAL,null, ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);//添加到响应队列中,并相应处理
throw ae;
}
if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
doSaslReply(SaslStatus.SUCCESS, new IntWritable(
SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
authMethod = AuthMethod.SIMPLE;
// client has already sent the initial Sasl message and we
// should ignore it. Both client and server should fall back
// to simple auth from now on.
skipInitialSaslHandshake = true;
}
if (authMethod != AuthMethod.SIMPLE) {
useSasl = true;
}
rpcHeaderBuffer = null;
rpcHeaderRead = true;//正常情况读完rpcHeaderRead并验证
continue;
}
if (data == null) {
dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt();//处理完了rpcHeaderRead,则dataLengthBuffer中保存的为接下来的数据长度或ping数据(-1)
if (dataLength == Client.PING_CALL_ID) {//客户端发送的ping数据-1
if(!useWrap) { //covers the !useSasl too
dataLengthBuffer.clear();
return 0; //ping message,返回0时,会更新连接时间lastContact
}
}
//这之后的便是数据长度了
if (dataLength < 0) {
LOG.warn("Unexpected data length " + dataLength + "!! from " +
getHostAddress());
}
data = ByteBuffer.allocate(dataLength);//根据数据长度分配data缓冲区
}
count = channelRead(channel, data);//从通道尽可能读取足够填充缓冲区剩余空间的数据
if (data.remaining() == 0) {//缓冲区没有剩余空间了,即发送过来的一个帧的数据读取玩,开始处理
dataLengthBuffer.clear();
data.flip();
if (skipInitialSaslHandshake) {
data = null;
skipInitialSaslHandshake = false;
continue;
}
boolean isHeaderRead = headerRead;
if (useSasl) {//简单鉴权,这里不分析
saslReadAndProcess(data.array());
} else {//处理一个帧数据
processOneRpc(data.array());
}
data = null;
if (!isHeaderRead) {
continue;
}
}
//这里隐含缓冲区还有剩余空间,即一个帧数据未读取玩,直接返回读取到的字节数,更新lastContact,等待下一次读取
return count;
}
}
由上,如果rpcHeaderRead为false时,需要处理rpcHeader,这时根据发送端发送过来的rpcHeader格式:4字节魔数hrpc+1字节版本号+1字节鉴权方法码
这是dataLengthBuffer里面保存的便是魔数hrpc,然后分配两个字节大小的rpcHeaderBuffer保存剩下的版本号和鉴权方法码,进行相应的验证。
而如果rpcHeader已经处理了,则一般情况下dataLengthBuffer里面保存的为一个帧数据的长度,根据这个长度分配data缓冲区的大小,然后进行相应长度数据的读取。不过在建立连接后,客户端read服务器响应超时时,客户端会发送ping数据,以保持连接,该ping数据作为一帧数据来看的话为int值-1,所以如果dataLengthBuffer值为-1时,则是客户端的ping数据,这时简单的返回0,然后更新连接时间即可。
所有上面提到的缓冲区都要注意的是,channelRead
会尽量从通道读取数据填满缓冲区,但也不能保证。当缓冲区还有剩余空间时,我们还需等待下一数据读取操作的到来,这时简单的返回读取到的数据大小,更新连接时间即可。相应缓冲区填满后,才能进行相应的处理。
channelRead
方法会尽量从通道中读取足够多的数据以填满缓冲区,但不保证。当缓冲区剩余空间大于NIO_BUFFER_LIMIT
(8KB)时,会循环从通道读取数据到缓冲区,每次读取NIO_BUFFER_LIMIT大小,按照注释是说能够避免JDK在缓冲区增长时分配太多的直接内存缓冲,但是这里应该不会增长,只是读取最大为剩余空间的数据到缓冲区。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30private int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ? channel.read(buffer) : channelIO(channel, null, buffer);
if (count > 0) {
rpcMetrics.incrReceivedBytes(count);
}
return count;
}
//channelRead和channelWrite同时使用,读时writeCh为null,写时readCh为null。
private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh, ByteBuffer buf) throws IOException {
int originalLimit = buf.limit();
int initialRemaining = buf.remaining();
int ret = 0;
while (buf.remaining() > 0) {
try {
int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);//一次读取到缓冲区的数据大小,最多8KB
buf.limit(buf.position() + ioSize);//设置新的limit值为position+要读取的大小
ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
if (ret < ioSize) {
break;
}
} finally {
buf.limit(originalLimit);//设为原来的limit值
}
}
int nBytes = initialRemaining - buf.remaining();
return (nBytes > 0) ? nBytes : ret;
}
处理完rpcHeader和ping包后,由processOneRpc
处理到来的数据帧,一帧数据已经保存在data缓冲区中了。1
2
3
4
5
6
7
8
9
10
11
12
13private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
if (headerRead) {
processData(buf);
} else {
processHeader(buf);
headerRead = true;
if (!authorizeConnection()) {
throw new AccessControlException("Connection from " + this
+ " for protocol " + header.getProtocol()
+ " is unauthorized for user " + user);
}
}
}
可见,如果headerRead为false,即还未读取连接头,则通过processHeader
处理连接头,并通过authorizeConnection
根据鉴权方法以及相关信息进行鉴权。否则若连接头已经处理,则通过processData
处理实际调用请求数据。这里在连接过程,先分析连接头的处理,实际的调用请求数据处理见后。
processHeader
1 | private void processHeader(byte[] buf) throws IOException { |
连接的鉴权方法authMethod
在处理rpcHeader时已经初始化了,这里反序列化连接头后,从连接头中获知客户端请求连接的协议,然后初始化该连接的protocol
,以及根据连接头的用户信息,初始化该连接的user
成员。
初始化相关成员后,使用authorizedConnection
对客户端进行鉴权1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24private boolean authorizeConnection() throws IOException {
try {
// If auth method is DIGEST, the token was obtained by the
// real user for the effective user, therefore not required to
// authorize real user. doAs is allowed only for simple or kerberos
// authentication
if (user != null && user.getRealUser() != null
&& (authMethod != AuthMethod.DIGEST)) {
ProxyUsers.authorize(user, this.getHostAddress(), conf);
}
authorize(user, header, getHostInetAddress());
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully authorized " + header);
}
rpcMetrics.incrAuthorizationSuccesses();
} catch (AuthorizationException ae) {
rpcMetrics.incrAuthorizationFailures();
//鉴权异常失败,发送鉴权异常响应,对应Call为-1,状态FATAL,发送异常类和异常信息
setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null, ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);
return false;
}
return true;
}
如果鉴权失败,则发送鉴权异常响应,对应状态为FATAL,客户端会关闭连接。而在服务器端,在processOneRpc
中抛出AccessControlException
这个IOException
最终传递到doRead
中的readAndProcess
中,由catch捕获,将count
即读取到的数据大小置为-1,后续处理中关闭服务器端连接。doRead中IOException异常处理见上。
因此,我们这里分析了服务器端连接建立的过程,由监听器listener监听到来的连接,并注册到Reader线程的选择器中,由Reader线程处理到来的数据请求。
客户端数据到来时,与客户端发送数据流程对应的,服务器端先处理rpcHeader,判断魔数和版本号是否相符,不相符的话给客户端发送鉴权失败异常响应,该响应对应的Call对象固定,其id为-1,状态为FATAL,客户端将会关闭连接,而服务器置读取到的数据count为-1,也关闭连接。正常的话初始化连接的authMethod成员。这里注意魔数hrpc长度为4个字节,刚好保存在后续用来保存一个帧数据长度的缓冲区dataLengthBuffer中。
处理完rpcHeader之后,便是对一帧数据的处理。一帧数据可能为ping数据,为4个字节值为-1,这时简单的置读取到数据count为0,更新连接时间lastContact。
否则dataLengthBuffer便是一帧数据的长度,根据该长度分配data缓冲区。
此时一帧数据可能为连接头,可能为实际的调用请求数据。没有读取连接头的话,则接下来的一帧数据为连接头。
对连接头进行处理,读取发送过来的连接头初始化该连接的protocol
和user
,然后根据不同的鉴权方式进行鉴权,鉴权失败,发送Call id为-1的响应,相应状态为FATAL,客户端关闭连接,而这里服务器抛出IOException,在doRead的异常处理中置count
为-1,关闭服务器端连接。
处理完这些信息后,连接建立了,鉴权完成了,可以在该连接上进行正常的调用请求了。具体过程见下篇。