RPC源码阅读---源码分析下篇

Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》,后文简称技术内幕
RPC中用到的通道和选择器另见通道和选择器,缓冲区另见缓冲区,以及序列化(主要是ObjectWritable)另见序列化
下文所说的[概述]为上篇RPC源码分析—概述
本文不涉及SIMPLE,DIGEST,KERBEROS三种鉴权方式的鉴权过程分析


本文所属RPC源码分析,按照调用请求的步骤对源码进行分析,限于篇幅分为上下两篇。

上篇分析了”客户端代理创建”,”服务器创建启动”,”客户端服务器连接建立”,”客户端头数据发送”,”服务器对头数据验证,鉴权”,至此连接已经建立验证,可以发送正常的调用请求了。上篇另见RPC源码分析上篇

而下篇分析了”客户端调用请求”,”服务器对调用请求的处理响应”,”客户端接收响应”,”客户端服务器连接的关闭”,”客户端关闭”,”服务器关闭”等过程,本文即为下篇。


1. 方法调用请求和响应

上篇<建立连接>部分分析了建立连接过程中,客户端的连接请求和头数据发送,服务器端的连接接收以及头数据处理鉴权等过程。

对于客户端来说getConnection返回时表明连接已经创建,且头数据已经发送,而关于服务器对头数据的异常响应在客户端Connection线程中。
对getConnection要注意的是,复用Connection时,如果连接已经建立且头数据已经发送setupIOstreams直接返回。
而对于服务器端来说,监听器的Reader线程已经处理了头数据,接下来等待实际的调用请求。

下面分析客户端的实际调用请求和服务器对请求的响应

1.1 客户端调用请求

由上,getConnection之后,通过连接对象的sendParam方法发起调用请求,具体见前面分析

1
connection.sendParam(call);

这里call为封装了Invocation的客户端调用对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void sendParam(Call call) {
if (shouldCloseConnection.get()) {
return;
}//对应的服务器端鉴权失败,客户端连接关闭,这里便取消发送调用请求

DataOutputBuffer d=null;
try {
synchronized (this.out) {//同步,使用该连接的输出流out,保证一次能够完整的输出一帧数据,而不会乱序
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);

//for serializing the
//data to be written
d = new DataOutputBuffer();
d.writeInt(call.id);//id暂存到缓冲区
call.param.write(d);//方法,参数信息的Invocation对象序列化暂存到缓冲区
byte[] data = d.getData();
int dataLength = d.getLength();
out.writeInt(dataLength); //first put the data length,先写缓冲区数据长度,即一帧数据大小
out.write(data, 0, dataLength);//write the data,然后写实际一帧的数据
out.flush();
}
} catch(IOException e) {
markClosed(e);//写操作异常,服务器连接可能关闭,唤醒等待读取响应的该连接线程,并关闭
} finally {
//the buffer is just an in-memory buffer, but it is still polite to
// close early
IOUtils.closeStream(d);
}
}

如上,sendParam比较简单,一帧实际调用请求数据的输出格式为:
数据长度+实际数据(id+Invocation对象)
即数据长度,然后是客户端Call对象的id,服务器创建Call对象是id值便为此值,再就是包含请求的方法,方法参数信息的Invocation对象,服务器端只需相应的创建Invocation对象,然后反序列化即可。
需要注意的是,使用连接对象的输出流out进行的同步操作,这样能够保证多个线程使用sendParam发送调用请求时,一次能够完整的发送一整帧数据,而不会乱序。
同时,写操作异常时,需要关闭连接。

调用请求发送完后,当前调用线程便阻塞等待,直到对应的Connection对象通知响应已经到达,具体见前面分析

1
call.wait();

而响应到达时的通知,唤醒该睡眠线程见后面分析。

1.2 服务器端调用请求的响应

1.2.1 监听器处理

继续承接前文监听器listener的Reader线程对到来数据的处理,前面已经分析了到来数据rpcHeaderConnectionHeader和ping数据的处理过程。
具体到实际调用请求的数据处理,由前文可知由processData方法负责处理,调用链为:
doRead(key)->readAndProcess()->processOneRpc(data)->processData(buf)
此时,headerRead为true。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void processData(byte[] buf) throws  IOException, InterruptedException {
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
int id = dis.readInt(); // try to read an id,读客户端Call对象id

if (LOG.isDebugEnabled())
LOG.debug(" got #" + id);

Writable param = ReflectionUtils.newInstance(paramClass, conf);//read param,创建Invocation对象
param.readFields(dis);//反序列化发送过来的Invocation

Call call = new Call(id, param, this);//创建服务器端Call对象
callQueue.put(call); // queue the call; maybe blocked here,放入阻塞队列callQueue,等待处理器handler处理
incRpcCount(); // Increment the rpc count,增加处理的rpc计数值
}

根据客户端实际调用请求发送数据格式,会先发送一帧数据的长度,这个长度在processData之前读取在dataLengthBuffer中,具体见前面分析。根据这个长度构建的data缓冲区,然后读取data缓冲区大小的数据,这里的buf参数便是data缓冲区中的数据。
客户端在发送长度之后,便是Call对象的id,因此先读取4个字节的id,然后是包含调用的方法,方法参数信息的Invocation对象,服务器同样的创建一个Invocation对象,进行反序列化。
之后构建服务器端的Call对象,与客户端的Call对象相对应,不过服务器端多了该Call对象所属的Connection这一成员。
创建的Call对象放入阻塞队列callQueue中等待handler进行处理,很明显的”生产者-消费者”模型。

1.2.2 处理器处理

1.2.2.1 主程序

监听器,处理器和响应器都实在服务器创建后调用start启动的,处理器主程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Override
public void run() {
LOG.info(getName() + ": starting");
SERVER.set(Server.this);
ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
while (running) {
try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here,从阻塞队列中取出一个Call对象

if (LOG.isDebugEnabled())
LOG.debug(getName() + ": has #" + call.id + " from " + call.connection);

String errorClass = null;
String error = null;
Writable value = null;

CurCall.set(call);//取出的Call对象保存在当前处理器线程的线程局部变量CurCall中
//本地实现方法调用
try {
if (call.connection.user == null) {
value = call(call.connection.protocol, call.param, call.timestamp);
} else {
value = call.connection.user.doAs(new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
return call(call.connection.protocol, call.param, call.timestamp);
}
});
}
}catch (Throwable e) {
...//日志记录
errorClass = e.getClass().getName();//调用异常类
error = StringUtils.stringifyException(e);//异常信息
}
CurCall.set(null);//调用完成,清空CurCall
synchronized (call.connection.responseQueue) {//写到responseQueue中,同步
//将调用结果(包括状态,正确返回值或异常)设置为call的response成员
setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error);
if (buf.size() > maxRespSize) {//上次响应太大,重新分配小的临时缓冲,以释放大块内存,因为一般来说响应都是比较小的
LOG.warn("Large response size " + buf.size() + " for call " + call.toString());
buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);//重新分配10KB
}
responder.doRespond(call);//将包含响应的Call对象添加到关联连接的响应队列responseQueue中,并做相应的处理
}
}
...//异常记录
}
LOG.info(getName() + ": exiting");
}

如上,处理器每次从callQueue阻塞队列中拿出一个待处理的Call对象,然后执行本地实现的调用处理。不管是鉴权方式还是普通的调用,最终都通过call方法执行方法的调用,调用结果可能为正常的返回值,此时封装在ObjectWritable中,可能发生异常,此时异常类记录在errorClass,异常信息记录在error中,然后将返回值或异常设置为call的响应,添加到连接的响应队列中等待处理。

1.2.2.2 call

对应为实际本地实现的方法调用过程,org.apache.hadoop.ipc.Server中方法如下

1
public abstract Writable call(Class<?> protocol, Writable param, long receiveTime) throws IOException;

为抽象方法,我们创建服务器时是创建RPC.Server即该类的实现类,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public Writable call(Class<?> protocol, Writable param, long receivedTime) throws IOException {
try {
Invocation call = (Invocation)param;//Invocation对象
if (verbose) log("Call: " + call);

Method method = protocol.getMethod(call.getMethodName(),call.getParameterClasses());
method.setAccessible(true);//设置方法可访问

long startTime = System.currentTimeMillis();
Object value = method.invoke(instance, call.getParameters());//反射调用实例instance上的方法method,参数在Invocation
int processingTime = (int) (System.currentTimeMillis() - startTime);
int qTime = (int) (startTime-receivedTime);
if (LOG.isDebugEnabled()) {//记录该Call从接收到开始处理即在队列中的时间,以及处理时间
LOG.debug("Served: " + call.getMethodName() + " queueTime= " + qTime + " procesingTime= " + processingTime);
}
...//度量记录
//将返回值封装成ObjectWritable返回
return new ObjectWritable(method.getReturnType(), value);

}
...//调用方法异常,进行日志记录并抛出,对应在handler线程中也会进行相应的日志记录,同时会记录异常类errorClass和异常信息error,将
//errorClass和error作为响应发送,状态为ERROR
}

如上,从Invocation对象中取出调用的方法和参数信息,在我们创建服务器getServer时指定的实现协议的实例instance调用相应的方法即可,不过调用是通过反射执行。如果成功执行得到返回值,将返回值封装成ObjectWritable返回,而执行时出现异常则进行日志记录,并抛到上层,在handler线程中,记录日志并记录异常类和异常信息,最终会将异常类和异常信息作为响应发送到客户端,就像本地调用方法出现异常一样。

call返回后,可能得到正常返回值,可能得到异常。接下来通过setupResponse把正常返回值或异常作为响应添加到Call对象中,有了响应的Call对象便可以发送给客户端了,添加至连接的响应队列中进行处理。

1.2.2.3 setupResponse

调用结果添加至Call对象的响应成员中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void setupResponse(ByteArrayOutputStream response, Call call, Status status, Writable rv, String errorClass, String error) 
throws IOException {
response.reset();//重置ByteArrayOutputStream中字节数组,等待写入
DataOutputStream out = new DataOutputStream(response);
out.writeInt(call.id); // write call id,先写Call的id
out.writeInt(status.state); // write status,再写调用的状态,成功返回为SUCCESS(值为0),异常为ERROR(值为1)

if (status == Status.SUCCESS) {//成功返回,写返回值ObjectWritable的序列化数据
rv.write(out);
} else {//异常情况,先写异常类,再写异常信息
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
if (call.connection.useWrap) {
wrapWithSasl(response, call);
}
call.setResponse(ByteBuffer.wrap(response.toByteArray()));//响应数据设置为call的成员
}

如上,响应格式为:

1
2
3
4
//正常返回:
Call对象id+0(SUCCESS)+ObjectWritable(返回值)序列化
//发生异常
Call对象id+1(ERROR)+异常类(字符串序列化)+异常信息(字符串序列化)

还有另外一种响应FATAL,前面已经介绍过了,发生在客户端发送过来头部数据,服务器鉴权失败时。这时,对应的Call对象是固定的,其id为-1,响应格式:

1
-1(id)+-1(FATAL)+异常类(字符串序列化)+异常信息(字符串序列化)

1.2.2.4 doRespond

将包含响应的Call对象添加到连接的响应队列中

1
2
3
4
5
6
7
8
void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);//添加到队列尾

if (call.connection.responseQueue.size() == 1) {//如果队列中只有一个待发送响应的Call对象,直接在本线程中发送
processResponse(call.connection.responseQueue, true);//发送call响应
}
}
}

如上,将包含响应的Call对象添加到关联连接的响应队列responseQueue队列尾,如果队列中只有一个元素,则直接在本线程(handler)中发送响应,而不是等待响应器responder发送。

1.2.2.5 processResponse

处理响应队列中待发送响应的Call对象,该方法在handler和responder中都可能被调用,当响应队列中只有一个元素时,直接在handler线程中发送该响应,节省了切换到responder线程的开销。同时,当在handler中响应没有发送完,才会将连接对应的通道注册到responder的选择器中,注册选操作。

在handler线程中的响应发送,对应的inHandler参数为true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler) throws IOException {
boolean error = true;
boolean done = false; // there is more data for this channel.
int numElements = 0;
Call call = null;
try {
synchronized (responseQueue) {
numElements = responseQueue.size();
if (numElements == 0) {
error = false;
return true; //连接上没有待发送的响应
}
call = responseQueue.removeFirst();//获取响应队列的首元素
SocketChannel channel = call.connection.channel;
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection);
}
//尽可能将call.response响应数据全部写到通道,但不保证
int numBytes = channelWrite(channel, call.response);
if (numBytes < 0) {
return true;
}
if (!call.response.hasRemaining()) {//该响应已经全部发送玩
call.connection.decRpcCount();//减少该连接的rpc计数
if (numElements == 1) { // last call fully processes.
done = true; // no more data for this channel.该连接所有响应已经发送完
} else {
done = false; // more calls pending to be sent.该连接还有未发送的响应
}
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection + " Wrote " + numBytes + " bytes.");
}
} else {
//响应数据这次没有发送完,添加到响应队列的队列首,等待下次发送。如果是在handler线程中发送的响应,则还需将连接对应通道注册到
//responder选择器中,而如果是在responder中发送,则已经注册过了,无需再注册

call.connection.responseQueue.addFirst(call);//添加到队列首

if (inHandler) {
// set the serve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();

//如果responder线程阻塞在select操作,唤醒,然后休眠等待通道注册

incPending();//增加排队计数,这会造成responder在下次循环中等待通道注册
try {
writeSelector.wakeup();//如果responder阻塞在select操作上,唤醒
//注册通道在responder选择器上,写操作,call对象作为附件附加在SelectionKey上
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
} catch (ClosedChannelException e) {
//Its ok. channel might be closed else where.
done = true;
} finally {
decPending();//注册完成,减少排队计数,唤醒等待注册完成的responder线程
}
}
}
error = false; // everything went off well
}
} finally {//发送响应过程中出错,关闭连接,可能情况为channelWrite过程中服务器关闭,中断该线程
if (error && call != null) {
LOG.warn(getName()+", call " + call + ": output error");
done = true; // error. no more data for this channel.
closeConnection(call.connection);
}
}
return done;
}

如上,该函数返回true时,表明连接的响应队列中没有待发送的响应(已经全部发送完)。
从响应队列首中取出待发送响应的Call对象,通过channelWrite尽可能的将响应发送到客户端,如果发送完了,减少连接上的rpc计数,连接上没有待发送响应时,直接返回true。
channelWrite和之前介绍过得channelRead类似,一次最多往通道写NIO_BUFFER_LIMIT即8KB大小的数据

1
2
3
4
5
6
7
8
private int channelWrite(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.write(buffer) : channelIO(null, channel, buffer);
if (count > 0) {
rpcMetrics.incrSentBytes(count);
}
return count;
}

channelIO在前面channelRead中已经分析过了,这里readCh为null。

而如果没发送完,则需等待下次发送。如果现在是在handler线程中响应没有发送完,则将Call添加回响应队列的队列首,同时将通道注册到responder的选择器中,注册写操作,之后新的包含响应的Call对象添加到响应队列时,队列中元素不为1,不会在handler线程中处理,因此注册后的响应写由responder负责。
不过考虑这种情况,注册后由responder负责响应写,而所有响应写完之后客户端暂时没有新的请求,导致响应队列元素为空。过了一段时间,客户端再次请求新的包含响应的Call对象添加到响应队列,添加后队列元素为1,应该由handler线程处理,那这时responder还是注册了通道的写操作就会有问题了,因此在responder中,如果连接上所有的响应发送完,要取消该连接通道上的写操作注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void doAsyncWrite(SelectionKey key) throws IOException {
Call call = (Call)key.attachment();//通过SelectionKey的附件获取Call对象
if (call == null) {
return;
}
if (key.channel() != call.connection.channel) {
throw new IOException("doAsyncWrite: bad channel");
}

synchronized(call.connection.responseQueue) {//call对象关联的
if (processResponse(call.connection.responseQueue, false)) {//返回true时,表示连接上所有响应已经发送完
try {
key.interestOps(0);//取消写操作的注册
}
...//异常记录
}
}
}

该方法在responder线程的主程序中被调用,因此如果已经通道已经注册了responder上选择器写操作,由responder负责连接上响应的发送时,当所有响应发送完后,需要取消写操作的注册,以便下一个Call对象添加到响应队列中时,能够在handler线程中处理并做相应的注册。

而通道的注册,通过incPendingwriteSelector.wakeup的配合暂停responder的工作,注册时将Call对象作为附件附加在SelectionKey上,注册完后通过decPending唤醒responder线程,具体的注册过程见后面responder的主程序分析。

如果现在是在responder线程中响应没有发送完,则将Call添加回响应队列首即可,因为已经注册了。

1.2.3 响应器处理

如上分析,对于包含响应的Call对象,添加到连接的相应队列中后,可能在handler线程中直接处理,而如果handler线程处理不过来,则需要将通道注册到responder选择器中,然后交由responder处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Override
public void run() {
LOG.info(getName() + ": starting");
SERVER.set(Server.this);
long lastPurgeTime = 0; // last check for old calls.

while (running) {
try {
waitPending(); //如果现在有通道正在注册,休眠等待
writeSelector.select(PURGE_INTERVAL);//最多阻塞15min
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isWritable()) {//通道可写
doAsyncWrite(key);//写相应通道上的响应
}
} catch (IOException e) {
LOG.info(getName() + ": doAsyncWrite threw exception " + e);
}
}
long now = System.currentTimeMillis();
if (now < lastPurgeTime + PURGE_INTERVAL) {
continue;//还没到清理时间,继续
}
lastPurgeTime = now;//应该清理了,更新lastPurgeTime

LOG.debug("Checking for old call responses.");
ArrayList<Call> calls;

// get the list of channels from list of keys.
synchronized (writeSelector.keys()) {//获取注册时SelectionKey的合适附件Call对象
calls = new ArrayList<Call>(writeSelector.keys().size());
iter = writeSelector.keys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
Call call = (Call)key.attachment();
if (call != null && key.channel() == call.connection.channel) {
calls.add(call);
}
}
}

for(Call call : calls) {
try {
doPurge(call, now);//清理该Call所属连接上的相应长时间未发送的Call对象
} catch (IOException e) {
LOG.warn("Error in purging old calls " + e);
}
}
} catch (OutOfMemoryError e) {
//
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
//
LOG.warn("Out of Memory in server select", e);
//内存不足,休眠1min等待资源释放
try { Thread.sleep(60000); } catch (Exception ie) {}
} catch (Exception e) {
LOG.warn("Exception in Responder " + StringUtils.stringifyException(e));
}
}
LOG.info("Stopping " + this.getName());
}

如上,首先会进行可能的休眠状态,以等待通道的注册。注册完成后,使用doAsyncWrite对每一个可写通道,写出一个响应。本次相应处理完后,根据清理周期,对存在长时间没有发送的响应的连接进行清理,关闭该连接,通过doPurge完成。

1.2.3.1 等待通道注册
1
2
3
4
5
private synchronized void waitPending() throws InterruptedException {
while (pending > 0) {//循环判断,一个通道注册完后wait返回,如果还有其他通道再注册,醒来之后继续休眠等待,直到所有注册完成
wait();
}
}

pending为目前正在注册的通道数目,大于0时,休眠等待通道注册,对应为前面的handler中对未发送完响应的注册过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
...//见前面processResponse方法
incPending();
try {
writeSelector.wakeup();
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
} catch (ClosedChannelException e) {
done = true;
} finally {
decPending();
}

private synchronized void incPending() { // call waiting to be enqueued.
pending++;
}

如上,incPending增加pending的值,然后wakeup唤醒可能阻塞在select操作上的responder线程(最多可能阻塞15min呢,所以需要唤醒),下一次循环中便会在waitPending中休眠等待了。
注册完后decPending唤醒等待的responder

1
2
3
4
private synchronized void decPending() { // call done enqueueing.
pending--;
notify();
}

即减少pending值,并使得responder线程从wait中返回,而如果还有其他通道再注册,继续休眠。

1.2.3.2 doAsyncWrite

等待通道注册完成后,便通过doAsyncWrite将可写的响应写到通道中,doAsyncWrite在前面已经分析过了,这里对应的inHandler为false,响应一次没发送完时,无需再次注册。注意的是,连接的响应队列中没有未发送的响应时,取消注册该通道的写操作,以便接下来的包含响应的Cal对象首先在handler中处理然后注册。

由上doAsyncWrite的分析可知,一次doAsyncWrite只处理key所属连接响应队列中的一个待发送响应,所以responder的一次处理中,会对注册在其选择器上的每个可写通道发送一个未发送的响应。

1.2.3.3 doPurge

如果达到清理周期,对注册时每一个合法Call所属的连接,进行可能的清理工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void doPurge(Call call, long now) throws IOException {
LinkedList<Call> responseQueue = call.connection.responseQueue;//call所属连接的响应队列
synchronized (responseQueue) {
Iterator<Call> iter = responseQueue.listIterator(0);
while (iter.hasNext()) {
call = iter.next();
//从接收到调用请求起到现在时间超过了PURGE_INTERVAL(15min),该请求的响应还未发送给客户端,关闭所在的连接
if (now > call.timestamp + PURGE_INTERVAL) {
closeConnection(call.connection);
break;
}
}
}
}

如上,如果call所在连接的响应队列中存在超过清理周期的Call对象,即从接收到该调用请求到现在,已经过去了PURGE_INTERVAL(15min),响应还没发送,则关闭所在的连接。
responder中每进行一次发送响应的操作后,如果到了清理时间,会对每一个连接进行判断,如果连接上存在长时间(15min)未发送的响应,则关闭所在的连接。
关闭连接后,客户端在读取响应的过程中,要么读超时异常,要么发送ping异常(通道关闭了),导致客户端连接关闭,具体已经在前面分析过了。

上面分析了建立连接后,客户端调用请求以及服务器端对请求的响应过程,下面分析客户端的响应接收过程。


2. 客户端接收响应

由前面分析知,客户端建立连接,发送头部数据后,开始发送调用请求数据,调用请求数据发送后,阻塞等待连接线程通知响应已经接收,然后返回。
Connection线程在setupIOstreams,建立连接,发送rpcHeader,发送ConnectionHeader后启动。看Connection线程的主程序

1
2
3
4
5
6
7
8
9
10
11
12
13
public void run() {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": starting, having connections " + connections.size());

while (waitForWork()) {//wait here for work - read or close connection
receiveResponse();
}

close();

if (LOG.isDebugEnabled())
LOG.debug(getName() + ": stopped, remaining connections " + connections.size());
}

如上,waitForWork返回true时,可以读取响应,然后通过receiveResponse读取响应。而waitForWork为false时,退出循环,关闭连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private synchronized boolean waitForWork() {
//连接正常,不过没有远程调用等待接收响应,此时会等待一定时间,若该连接空闲时间超过maxIdleTime,则关闭连接
if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
long timeout = maxIdleTime- (System.currentTimeMillis()-lastActivity.get());
if (timeout>0) {//还未达到最大空闲时间,等待
try {
wait(timeout);
} catch (InterruptedException e) {}
}
}

//连接正常且存在远程调用等待接收响应,返回true,开始接收响应
if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
return true;
} else if (shouldCloseConnection.get()) {//连接被标记为关闭,返回false,关闭
return false;
} else if (calls.isEmpty()) { // idle connection closed or stopped,长时间空闲,关闭连接
markClosed(null);
return false;
} else { // get stopped but there are still pending requests
markClosed((IOException)new IOException().initCause(new InterruptedException()));
return false;
}
}

如上,其实客户端运行且连接没有被标记未关闭的话,如果calls有元素即存在等待读取响应的调用,则返回true,开始接收响应。
而如果该连接长时间(ipc.client.connection.maxidletime,默认10s)空闲,则会通过markClosed关闭连接。
也可以通过其他方式设置shouldCloseConnection为true关闭连接。

当存在等待接收响应的Call对象且连接正常时,通过receiveResponse接收一个响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void receiveResponse() {
if (shouldCloseConnection.get()) {//如果通知关闭连接,直接返回
return;
}
touch();//更新连接的活动时间lastActivity

try {
int id = in.readInt(); // try to read an id,读Call id

if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + id);

Call call = calls.get(id);//获取等待读取相应的Call对象

int state = in.readInt(); // read call status,读取调用状态
if (state == Status.SUCCESS.state) {//成功,创建ObjectWritable对象,反序列化
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value,反序列化
call.setValue(value);//设置call的响应,并通知之前阻塞等待结果的线程
calls.remove(id);//调用完成,从calls中移除
} else if (state == Status.ERROR.state) {//错误,分别读取异常类和异常信息
//设置call的异常,通知之前阻塞等待结果的线程
call.setException(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in)));
calls.remove(id);//调用完成,从calls中移除
} else if (state == Status.FATAL.state) {//致命错误,鉴权失败时发生,关闭连接
// Close the connection,关闭连接
markClosed(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in)));
}
} catch (IOException e) {//读取过程中发生异常,或者是read超时(rpcTimeout大于0),或者发送ping时异常,服务器连接关闭,因此关闭连接
markClosed(e);
}
}

如上,读取响应的过程,要结合服务器发送相应的格式来看,在前面setupResponse的分析中。
首先读取4个字节Call的id,从当前连接的calls中找到对应的Call对象,然后读取一个字节的状态。成功时,创建ObjectWritable反序列化即为调用返回值,错误时,分别反序列化两个字符串为异常类和异常信息,即为调用过程中抛出的异常,而致命错误时,直接关闭连接,关闭连接的异常也通过序列化异常类和异常信息得到。

在获取到返回值后,通过setValue设置该Call的返回值,而获取到抛出异常后,通过setException设置该Call的异常。两者都会唤醒之前阻塞等待响应的线程(sendParam之后wait)

1
2
3
4
5
6
7
8
9
10
11
12
public synchronized void setValue(Writable value) {
this.value = value;//设置返回值
callComplete();//调用完成,通知阻塞线程
}
public synchronized void setException(IOException error) {
this.error = error;//设置异常
callComplete();//调用完成,通知阻塞线程
}
protected synchronized void callComplete() {
this.done = true;
notify(); // notify caller
}

callComplete要结合之前的调用请求看,在前面<代理方法调用>部分。即通过代理调用方法时,转发到Invoker对象的invoke方法中,最终在Client的call方法中调用,在发送完调用请求后,阻塞等待Connection线程通知响应已经接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException {
Call call = new Call(param);
Connection connection = getConnection(remoteId, call);
connection.sendParam(call); // send the parameter,发送调用请求
boolean interrupted = false;
synchronized (call) {
while (!call.done) {
try {
call.wait(); // wait for the result,阻塞等待
} catch (InterruptedException ie) {
// save the fact that we were interrupted
interrupted = true;
}
}
...//处理异常和返回值
}
}

因此,callComplete中的notify便通知在对应call对象上调用wait方法的线程,使得从wait方法中返回,而此时call.done为true,从循环中退出。
之后便是对异常和返回值的处理,如果call.error不为null抛出异常,就像本地方法调用抛出异常一样。否则将call.value返回,就像本地方法调用返回值一样。
该call函数为Client在Invoker的invoke函数调用的,是代理转发,这样返回值或者异常便会转发给代理调用方法处,完成远程调用的过程。


3. 连接的关闭

前面分析了整个远程调用过程中客户端和服务器的行为,那么客户端和服务器端的连接什么情况下会关闭呢。

3.1 客户端连接关闭

客户端连接的关闭通过Connection.close()方法完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private synchronized void close() {
if (!shouldCloseConnection.get()) {//应该已经设置了
LOG.error("The connection is not in the closed state");
return;
}

// release the resources
// first thing to do;take the connection out of the connection list
synchronized (connections) {//从客户端维护的connections中移除
if (connections.get(remoteId) == this) {
connections.remove(remoteId);
}
}

// close the streams and therefore the socket
IOUtils.closeStream(out);//关闭输出流
IOUtils.closeStream(in);//关闭输入流
disposeSasl();

// clean up all calls
if (closeException == null) {//这里只有在连接长时间空闲时可能为null,对应在waitForWork中的calls.isEmpty
if (!calls.isEmpty()) {//一般来说calls为空,如果不为空,创建新的closeException
LOG.warn(
"A connection is closed for no cause and calls are not empty");

// clean up calls anyway
closeException = new IOException("Unexpected closed connection");
cleanupCalls();
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("closing ipc connection to " + server + ": " + closeException.getMessage(),closeException);
}
cleanupCalls();//清理所有的call
}
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": closed");
}

如上连接关闭时,首先从客户端的connections中移除,然后关闭输入输出流,关闭socket,最后清理连接管理的所有Call

1
2
3
4
5
6
7
8
private void cleanupCalls() {
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
while (itor.hasNext()) {
Call c = itor.next().getValue();
c.setException(closeException); // local exception,通过setException设置异常,唤醒等待响应的线程,完成调用
itor.remove();
}
}

如上,通过setException设置异常,唤醒等待响应的线程,因此该调用会抛出对应的关闭异常作为调用结果。

在close方法中,shouldCloseConnection为true,其实关闭连接前,都会先调用markClosed方法设置该值为true。看看markClosed在哪里被调用:
markClosed

  • receiveResponse
    • 第一处,读取到远端响应状态为FATAL,致命错误,此时验证或鉴权失败,应该关闭连接,服务器端对应也关闭连接;
    • 第二处,是在读取响应过程中抛出IOException,包含三种可能情况:
      • rpcTimeout大于0时,读操作超时,直接抛出I/O异常,关闭连接;
      • rpcTimeout不大于0时,读操作超时,会发送ping数据维持连接,若发送ping数据异常(对端已关闭连接,可能服务器连接中有长时间15min没发送响应的Call,清理过程中关闭了该连接,或者服务器死了等),抛出I/O异常,关闭连接;
      • 客户端关闭,通过中断连接线程,则读操作中会抛出InterruptedException,关闭连接; 这里客户端关闭导致该线程中断时,不会抛出InterruptedException,因为底层的输入流要么是socket.getInputStream,要么是创建对应的SocketInputStream,读操作会一直阻塞直到上面两种情况发生。所以这里会不会有问题。
  • sendParam
    在写操作中I/O异常,服务器端连接已关闭,可能为清理过程关闭了该连接,或者服务器死了,关闭连接;
  • setupIOstreams
    • 第一处,IOException,可能主要有以下情况:
      • 连接服务器超时次数达到限制;
      • 连接服务器其他异常;
      • 获取输入输出流异常;
      • 发送rpcHeader和ConnectionHeader过程中的写操作异常;
      • 鉴权相关异常;
    • 第二处,其他异常,主要包括鉴权相关的异常以及中断异常;
  • waitForWork
    • 第一处,连接长时间(默认10s)空闲,关闭连接;
    • 第二处,客户端关闭,关闭连接;与上面receiveResponse中处于读操作时,客户端关闭不一样,此时连接线程处于waitForWork的判断逻辑中。

3.2 服务器连接的关闭

服务器连接关闭由closeConnection完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void closeConnection(Connection connection) {
synchronized (connectionList) {//从服务器的连接列表中移除
if (connectionList.remove(connection))
numConnections--;
}
try {
connection.close();//关闭该连接
} catch (IOException e) {
}
}
private synchronized void close() throws IOException {
disposeSasl();
data = null;//释放data缓冲区
dataLengthBuffer = null;//释放dataLengthBuffer缓冲区
if (!channel.isOpen())
return;
try {socket.shutdownOutput();} catch(Exception e) {}
if (channel.isOpen()) {//关闭通道
try {channel.close();} catch(Exception e) {}
}
try {socket.close();} catch(Exception e) {}//关闭socket
}

如上,首先从服务器的连接列表中移除,然后释放使用的缓冲区,关闭通道,关闭socket,即完成连接的关闭。

closeConnection在以下地方被调用:
closeConnection

  • cleanupConnection和closeCurrentConnection
    在listener线程的主程序异常处理程序中使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    while (running) {
    ...//选择器上的select操作等
    catch (OutOfMemoryError e) {
    // we can run out of memory if we have too many threads
    // log the event and sleep for a minute and give
    // some thread(s) a chance to finish
    LOG.warn("Out of Memory in server select", e);
    closeCurrentConnection(key, e);
    cleanupConnections(true);
    try { Thread.sleep(60000); } catch (Exception ie) {}
    } catch (Exception e) {
    closeCurrentConnection(key, e);
    }
    cleanupConnections(false);
    }

    因此,服务器负载太高,内存不足时会关闭当前连接,同时对所有空闲时间超出maxIdleTime的连接进行清理,而对select操作的其他异常,关闭当前连接。此外,每次listener执行完一次连接请求后,会随机对部分连接进行清理,清理空闲时间超出maxIdleTime的连接。
    cleanupConnections详细分析见后面<资源清理>,传入参数为true时,强制清理,且所有连接参与清理,而传入参数为false时,当当前连接数超过thresholdIdleConnections(ipc.client.idlethreshold,默认4000)时进行清理,选取随机个连接参与清理。清理时,对于每一个连接,如果其空闲(rpcCount为0)时间大于maxIdleTime则关闭连接。大于maxIdleTime表明长时间没有收到客户端的心跳,应该关闭连接。

  • doRead
    读取到的数据大小为-1时,关闭连接,可能情况有:
    • 读操作异常,返回-1,此时客户端连接主动关闭了,因此关闭服务器端连接;
    • 读取rpcHeader时,魔数或版本号不匹配,连接关闭;
    • 通过连接头和rpc头信息鉴权失败,连接关闭;
  • listener的run方法
    这里对应监听器退出时的连接清理,即对应服务器的关闭

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @Override
    public void run() {
    ...//主循环
    LOG.info("Stopping " + this.getName());

    synchronized (this) {
    try {
    acceptChannel.close();
    selector.close();
    } catch (IOException e) { }

    selector= null;
    acceptChannel= null;

    // clean up all connections
    while (!connectionList.isEmpty()) {
    closeConnection(connectionList.remove(0));
    }
    }
    }
  • doPurge
    对应响应队列中存在长时间(15min)未发送出去的响应,此时应该关闭连接,这个在前面分析过了,不再分析;

  • processResponse
    对应处理响应的过程中出现错误,如写响应过程中服务器关闭被中断,关闭连接

4. 服务器资源清理

由前面分析,服务器分别可能在监听器和响应器中对连接进行清理。监听器负责清理长时间空闲(即长时间没收到心跳)的连接,而响应器负责清理响应长时间没发送出去的连接。

4.1 监听器清理连接

监听器在选择操作select异常时,会关闭当前连接,而如果出现内存不足,强制对所有连接进行清理操作。此外,每次循环到达连接数到达清理阈值时,都会随机性的选择一些连接进行清理。
清理操作发生在cleanupConnections

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void cleanupConnections(boolean force) {
if (force || numConnections > thresholdIdleConnections) {//强制性清理不判断连接数,否则只有连接数超过阈值(默认4000)时才进行清理
long currentTime = System.currentTimeMillis();
if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {//非强制性,连接数没有超过阈值,不清理
return;
}
//强制性,选择全部连接清理
int start = 0;
int end = numConnections - 1;
if (!force) {//非强制性,随机选择连接进行清理
start = rand.nextInt() % numConnections;
end = rand.nextInt() % numConnections;
int temp;
if (end < start) {
temp = start;
start = end;
end = temp;
}
}
int i = start;
int numNuked = 0;
while (i <= end) {
Connection c;
synchronized (connectionList) {
try {
c = connectionList.get(i);
} catch (Exception e) {return;}
}
if (c.timedOut(currentTime)) {//如果空闲时间超过maxIdleTime,关闭连接
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
closeConnection(c);
numNuked++;
end--;
c = null;
if (!force && numNuked == maxConnectionsToNuke) break;
}
else i++;
}
lastCleanupRunTime = System.currentTimeMillis();//更新上次清理时间
}
}

如果是强制性清理,则不管当前连接数都进行清理(发生在内存不足),且清理所有的连接。而非强制性清理,只有在连接数超过阈值(默认4000)时,才进行清理,且随机选择部分连接进行清理。而对每一个待清理的连接,通过timedOut判断是否应该清理

1
2
3
4
5
private boolean timedOut(long currentTime) {
if (isIdle() && currentTime - lastContact > maxIdleTime)//空闲,且上次和客户端通信时间到目前超过了maxIdleTime
return true;
return false;
}

因此如果当前连接没有rpc调用(rpcCount为0),且从上次和客户端通信到现在超过了maxIdleTime,则应该关闭连接,表明很长时间没收到客户端的心跳。这里maxIdleTime为客户端maxIdleTime(ipc.client.connection.maxidletime)的两倍,不过客户端缺省10s,而服务器缺省1s(这样的意义何在?不知道代码是不是有问题)

1
2
3
4
5
//客户端
this.maxIdleTime = remoteId.getMaxIdleTime();
conf.getInt("ipc.client.connection.maxidletime", 10000)
//服务器
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);

4.2 响应器清理连接

响应器也会在每次处理完一次响应后判断是否达到purge间隔(15min),如果需要purge则对所有包含响应Call的连接通过doPurge进行清理,如果该连接存在从接收到调用请求到目前时间超过purge间隔(15min)还未发送出去的响应,则关闭该连接,具体的过程在前面的doPurge已经分析过了。


5. 客户端关闭

客户端关闭在Client.stop()方法中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void stop() {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping client");
}

if (!running.compareAndSet(true, false)) {//设置running为false
return;
}

// wake up all connections,中断客户端上所有连接,被中断后连接将关闭
synchronized (connections) {
for (Connection conn : connections.values()) {
conn.interrupt();
}
}

// wait until all connections are closed
while (!connections.isEmpty()) {//等待所有连接关闭
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}

如上,设置running为false后,中断连接线程,使得连接线程关闭。
对于连接线程来说,要么处于waitForWork中,要么处于receiveResponse中。
waitForWork中,如果不是处于wait等待中,则running为false直接返回false,而在wait等待中时,线程中断抛出InterruptedException同样的从等待中返回,此时running为false,返回false。
receiveResponse中,等到下一次执行读操作,将会抛出InterruptedException作为IOException的子类,然后在catch中调用markClosed设置shouldCloseConnection为true,然后在下次waitForWork中将返回false。而如果receiveResponse中所有数据已经读取完,正在进行处理,则不会调用markClosed,当前receiveResponse正常返回后,下次waitForWork判断中running为false返回,对应最后一种情况

1
2
3
4
5
6
7
8
9
10
11
if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
return true;
} else if (shouldCloseConnection.get()) {
return false;
} else if (calls.isEmpty()) { // idle connection closed or stopped
markClosed(null);
return false;
} else { // get stopped but there are still pending requests
markClosed((IOException)new IOException().initCause(new InterruptedException()));
return false;
}

这都将导致连接的关闭。

因此客户端关闭时,就是关闭所有的连接。

而Client.close主要由stopClient调用

1
2
3
4
5
6
7
8
9
10
11
private void stopClient(Client client) {
synchronized (this) {
client.decCount();//减少客户端rpc调用引用计数
if (client.isZeroReference()) {//如果Client引用计数为0,从clients中移除
clients.remove(client.getSocketFactory());
}
}
if (client.isZeroReference()) {//引用计数为0,关闭客户端
client.stop();
}
}

而因为客户端为Invoker对象所拥有,即间接的为客户端代理所拥有,关闭代理时关闭客户端。
stopClient主要在Invoker的close方法中调用

1
2
3
4
5
6
synchronized private void close() {
if (!isClosed) {
isClosed = true;
CLIENTS.stopClient(client);
}
}

而停止代理时,会调用Invoker的close方法

1
2
3
4
5
public static void stopProxy(VersionedProtocol proxy) {
if (proxy!=null) {
((Invoker)Proxy.getInvocationHandler(proxy)).close();
}
}

因此,总的来说,一个客户端可能被多个代理使用,每一个代理增加客户端的引用计数refCount,当代理关闭时,相应的减少客户端的引用计数,如果代理关闭时客户端的引用计数为0,则关闭客户端。

客户端关闭时,会中断所有的连接,等待所有连接关闭。


6. 服务器关闭

服务器关闭通过服务器的stop方法完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public synchronized void stop() {
LOG.info("Stopping server on " + port);
running = false;//设置running为false
if (handlers != null) {//中断所有处理器
for (int i = 0; i < handlerCount; i++) {
if (handlers[i] != null) {
handlers[i].interrupt();
}
}
}
listener.interrupt();//中断监听器
listener.doStop();//停止监听器
responder.interrupt();//中断响应器
notifyAll();
if (this.rpcMetrics != null) {//关闭统计度量
this.rpcMetrics.shutdown();
}
}

因此将running置为false,分别中断处理器,监听器和响应器,监听器还要调用doStop方法停止,三种线程每次循环都会判断服务器是否运行,没有运行则退出主循环。

  • 处理器对中断的响应
    • 等待从callQueue阻塞队列中获取一个元素时,抛出InterruptedException,退出处理器主程序;
    • 处理器线程执行processResponse过程中,阻塞在channelWrite写响应时,抛出IOException,退出处理器主程序;
  • 监听器中对中断的响应
    在select上阻塞时,中断唤醒,然后running为false,退出主循环,执行接下来的connectionList的清理工作,清理完后退出监听器主程序
  • 响应器对中断的响应
    • waitPending中等待通道注册,抛出InterruptedException,退出响应器主程序;
    • 在select操作上阻塞,中断唤醒,执行完接下来的操作,退出响应器主程序;
    • 响应器执行doAsyncWrite过程中,阻塞在channelWrite写响应时,抛出异常,停止写响应。因为在响应器中会对每一个可写的通道执行doAsyncWrite,则此时会对后续每一个要写响应的通道终止写操作,执行后面的purge操作后,退出响应器主程序;

监听器执行doStop关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
synchronized void doStop() {
if (selector != null) {//选择器不为null
selector.wakeup();//唤醒可能阻塞在select上的操作
Thread.yield();//等待监听器线程执行完
}
if (acceptChannel != null) {
try {
acceptChannel.socket().close();//关闭socket
} catch (IOException e) {
LOG.info(getName() + ":Exception in closing listener socket. " + e);
}
}
readPool.shutdown();//关闭线程池
}

因此服务器关闭时,会关闭处理器,监听器,响应器。而在关闭监听器时,还会关闭服务器下的所有连接connectionList(关闭对应的通道socket),关闭服务器端监听的Socket,关闭线程池。


7. 整帧数据的读写

我们这里将每个调用的数据以及响应视为一帧数据。
那么客户端如何保证是一帧一帧数据的发送到服务器,而不会产生一帧数据中夹带另外一帧数据的,服务器怎么保证接收到完整一帧数据,发送响应时如何保证在连接上一整帧数据的发送,客户端接收响应时如何保证接收一整帧数据的呢。

客户端通过获取连接上的输出流out对象来同步的

1
2
3
4
5
6
7
8
9
10
synchronized (this.out) {
d = new DataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
byte[] data = d.getData();
int dataLength = d.getLength();
out.writeInt(dataLength); //first put the data length
out.write(data, 0, dataLength);//write the data
out.flush();
}

如上为sendParam中发送一帧数据的代码段,可见每次都只能发送完一帧数据后,释放out对象然后下一帧获取,继续发送,因此即使在多线程中,一个连接上也只能一整帧一整帧的数据发送。

TCP Socket保证了服务器端的按序接收,这样在服务器端便是一整帧的数据接收。
帧长度保存在4个字节的dataLengthBuffer缓冲区中,然后根据此值分配data缓冲区,读取一整帧数据到data缓冲区中。
缓冲区没填满时直接返回接收到的字节数,更新通信时间lastActivity,当data缓冲区填满后,接收了完整的一帧数据,进行处理。
具体过程前面已经分析过了,见readAndProcess
因为服务器接收请求是在Reader线程中,一个Reader线程处理多个连接,因此不存在在多个线程上从一个连接通道上接收数据,导致一帧数据请求被分别被多个线程接收到部分数据的情况,无需同步就能在Reader线程上接收到完整的一帧请求数据。

然后,服务器调用完成后,发送响应。先发4个字节的id,然后是1个字节的状态(0,1,或-1)。
正常返回时通过ObjectWritable封装返回值序列化发送到客户端,ObjectWritable的序列化见序列化
存在异常时,序列化两个字符串(分别为异常类和异常信息),而字符串的序列化其实是先发送字符串字节长度,然后是字符串编码字节结果。
服务器发送响应时,同步连接上的响应队列responseQueue,然后发送一帧响应数据。发送响应时,可能多个handler线程对一个连接上的Call处理完毕,需要添加到responseQueue中,而这时如果有一个handler线程正在发送响应的话,其他handler要等待响应发送完成才能添加到响应队列。同样的,当handler需要添加Call到响应队列中,而这时responder在发送该连接上的响应时,handler线程需要等待响应发送完,获得responseQueue对象管理权。
这样保证了发送响应过程中,响应队列安全写操作以及一帧数据的完整发送。

在客户端接收响应时,因为连接是一个线程,就算多个线程在该连接上远程调用,这些线程调用完后阻塞,是通过这一个连接线程通知响应到达的,只能一帧一帧数据接收,接受完后通知调用线程,因此无需同步便能得到完整的一帧数据。


8. 疑问

  • 客户端的输入流通过socket.getInputStream或创建SocketInputStream创建底层输入流,然后封装PingInputStream以及相应的流。
    输出流通过socket.getOutputStream或创建SocketOutputStream创建底层输出流,然后封装相应的流。这样读写操作便不会对客户端关闭时连接的中断进行响应,也就是说读操作要等待读超时(rpcTimeout大于0)或ping异常(服务器连接关闭),而写操作要等待写超时才会关闭该连接,关闭客户端是否会有很大延迟?