RPC源码阅读---源码分析上篇

Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》,后文简称技术内幕
RPC中用到的通道和选择器另见通道和选择器,缓冲区另见缓冲区,以及序列化(主要是ObjectWritable)另见序列化
下文所说的[概述]为上篇RPC源码分析—概述
本文不涉及SIMPLE,DIGEST,KERBEROS三种鉴权方式的鉴权过程分析


本文所属RPC源码分析,按照调用请求的步骤对源码进行分析,限于篇幅分为上下两篇。

上篇分析了”客户端代理创建”,”服务器创建启动”,”客户端服务器连接建立”,”客户端头数据发送”,”服务器对头数据验证,鉴权”,至此连接已经建立验证,可以发送正常的调用请求了。本文即为上篇。

而下篇分析了”客户端调用请求”,”服务器对调用请求的处理响应”,”客户端接收响应”,”客户端服务器连接的关闭”,”客户端关闭”,”服务器关闭”等过程,下篇另见RPC源码分析下篇


1. 客户端代理创建

RPC.getProxy获取客户端代理,由[概述]分析知一个代理处理用户到具体服务器的具体协议对应的连接,同时连接属于一个Client,而Client一般由SocketFactory决定,不同SocketFactory对应不同Client。因此getProxy需指定包括SocketFactory用户服务器地址协议这4个信息,另外还需指定连接读操作超时时间。
对应getProxy重要的5个信息,最少需给定服务器地址和使用的协议,其他的都可以默认。SocketFactory默认为默认SocketFactory,用户默认为当前用户,读操作超时时间默认为0,此时会设置为pingInterval(见[概述])。

getProxy的重载:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//指定协议protocol,地址addr。
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException
{

return getProxy(protocol, clientVersion, addr, conf, NetUtils.getDefaultSocketFactory(conf), 0);
}

//指定协议protocol,地址addr,SockeFactory factory
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf,SocketFactory factory) throws IOException
{

UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
return getProxy(protocol, clientVersion, addr, ugi, conf, factory, 0);
}

//指定协议protocol,地址addr,SocketFactory factory,超时rpcTimeout
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException
{

UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
return getProxy(protocol, clientVersion, addr, ugi, conf, factory, rpcTimeout);
}

//指定协议protocol,地址addr,用户ticket,SocketFactory factory,超时rpcTimeout
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException
{

return getProxy(protocol, clientVersion, addr, ticket, conf, factory,rpcTimeout, null, true);
}

可见,最终会使用最多参数的getProxy创建代理对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory, int rpcTimeout,
RetryPolicy connectionRetryPolicy, boolean checkVersion) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
//由protocol,addr,ticket,factory,rpcTimeout等信息创建调用处理器对象Invoker,其中connectionRetryPolicy缺省为重试10次,每次间隔1s
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);
//创建动态代理,代理的调用处理器为上面创建的invoker
VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance( protocol.getClassLoader(), new Class[]{protocol}, invoker);

if (checkVersion) {//默认情况下需要检查客户端和服务器协议的版本
checkVersion(protocol, clientVersion, proxy);
}
return proxy;
}

因此,创建代理主要就是通过传入的参数创建对应的调用处理器对象Invoker,然后通过Java动态代理创建动态代理,创建代理后,默认情况下需要检查客户端和服务器协议的版本号,不匹配的话抛出VersionMismatch异常。因此,创建的代理核心信息保存在调用处理器invoker中。
看一下Invoker的构造:

1
2
3
4
5
6
private Invoker(Class<? extends VersionedProtocol> protocol,
InetSocketAddress address, UserGroupInformation ticket, Configuration conf,
SocketFactory factory,int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException
{

this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf);
this.client = CLIENTS.getClient(conf, factory);
}

因此,就是通过传入的用户,服务器地址,协议,超时时间等信息构建Invoker负责处理连接的标识ConnectionId,即确定了该Invoker负责处理的连接。
而用SocketFactory在缓存中查找所属客户端,若存在现有使用该factory的客户端直接使用,只是增加其引用计数refCount,否则构建一个Client对象,valueClass初始化为ObjectWritable
这些成员具体含义另见[概述]。

1
2
3
4
5
6
7
8
9
10
11
private synchronized Client getClient(Configuration conf,
SocketFactory factory) {
Client client = clients.get(factory);
if (client == null) {//不存在,构建Client对象,valueClass为ObjectClass,并放入缓存
client = new Client(ObjectWritable.class, conf, factory);
clients.put(factory, client);
} else {
client.incCount();//存在的话,client新增了一个Connection,增加引用计数
}
return client;
}

这样,客户端的代理构建完成了,其实主要是构建了代理关联的调用处理器中的connectionId(对应一个Connection),以及client(将代理处理的连接注册到相应客户端)。


服务器创建

创建服务器对象,通过RPC.getServer完成,至少指定服务器绑定的地址和端口,以及实现协议的实例对象,看方法重载:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//给定实现协议的实例对象instance,要绑定的地址bindAddress和端口port
public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) throws IOException {
return getServer(instance, bindAddress, port, 1, false, conf);
}

//给定instance,bindAddress,port的基础上,指定处理器线程数目(缺省1)
public static Server getServer(final Object instance, final String bindAddress, final int port,
final int numHandlers, final boolean verbose, Configuration conf) throws IOException
{

return getServer(instance, bindAddress, port, numHandlers, verbose, conf, null);
}

//上有基础上,再指定安全管理器secretManager(缺省为null)
public static Server getServer(final Object instance, final String bindAddress, final int port,
final int numHandlers, final boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager) throws IOException
{

return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
}

可见,最终通过RPC.Server的构造函数构建服务器对象。

1
2
3
4
5
6
public Server(Object instance, Configuration conf, String bindAddress,  int port, int numHandlers,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) throws IOException
{

super(bindAddress, port, Invocation.class, numHandlers, conf,classNameBase(instance.getClass().getName()), secretManager);
this.instance = instance;
this.verbose = verbose;
}

调用父类org.apache.hadoop.ipc.Server的构造函数,然后初始化实现协议的实例对象instance。父类构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
protected Server(String bindAddress, int port, Class<? extends Writable> paramClass, int handlerCount, 
Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager)

throws IOException {

this.bindAddress = bindAddress;//绑定地址
this.conf = conf;
this.port = port;//绑定端口
this.paramClass = paramClass;//客户端传过来的包含调用方法参数等信息的参数类,为Invocation
this.handlerCount = handlerCount;//处理器线程数目,默认1
this.socketSendBufferSize = 0;
//待处理Call队列callQueue中最大的待处理Call数,默认100
this.maxQueueSize = handlerCount * conf.getInt(IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
//响应缓冲最大大小,1MB
this.maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY, IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
//监听器listener中Reader线程数目,默认1
this.readThreads = conf.getInt(IPC_SERVER_RPC_READ_THREADS_KEY,IPC_SERVER_RPC_READ_THREADS_DEFAULT);
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);//构造callQueue
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);//清理空闲连接时一次最多清理的个数
//服务器端连接超过此值应该执行清理(连接数过多,一个连接为一个线程,资源消耗大)
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);//是否鉴权
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();

// Start the listener here and let it bind to the port
listener = new Listener();//构建监听器
this.port = listener.getAddress().getPort();
this.rpcMetrics = RpcInstrumentation.create(serverName, this.port);
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);//是否开启Nagel算法,默认false开启

// Create the responder here
responder = new Responder();//响应器

if (isSecurityEnabled) {
SaslRpcServer.init(conf);
}
}

初始化相应成员后,构建监听器listener和响应器responder,而对应的多个处理器在start方法中创建。

构建监听器时,会打开一个选择器,然后将ServerSocketChannel注册到选择器上,注册Accept操作,由该选择器负责处理到来的连接请求。
同时创建readThreads个Reader线程并启动,每一个Reader线程中都有一个Selector。
由listener中的Selector处理Accept的连接请求,会创建SocketChannel对象,然后从所有的Reader线程中选择下一个Reader线程,将accept的SocketChannel注册在选择的Reader线程的Selector中,注册读操作,这样便由该Reader线程负责该连接到来的数据请求。

监听器构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();//打开服务器端唯一的ServerSocketChannel
acceptChannel.configureBlocking(false);//配置成非阻塞模式

// Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength);//绑定到本地地址端口
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();//打开选择器,该选择器用于注册accept操作
readers = new Reader[readThreads];//Reader线程
readPool = Executors.newFixedThreadPool(readThreads);//固定大小线程池,用于创建Reader线程
for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open();//每个Reader线程对应一个Selector,用于注册accept返回的SocketChannel上的read操作
Reader reader = new Reader(readSelector);
readers[i] = reader;
readPool.execute(reader);//启动线程
}

// Register accepts on the server socket with the selector.
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);//ServerSocketChannel注册Accept操作
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}

响应器构建

响应器的创建比较简单,打开用于为写操作注册的Selector

1
2
3
4
5
6
Responder() throws IOException {
this.setName("IPC Server Responder");
this.setDaemon(true);
writeSelector = Selector.open(); // create a selector
pending = 0;
}


服务器启动

服务器启动在Serverstart方法中,一般通过RPC.getServer创建服务器后,调用服务器的start方法启动服务器

1
2
3
4
5
6
7
8
9
10
public synchronized void start() {
responder.start();//启动响应器
listener.start();//启动监听器
handlers = new Handler[handlerCount];

for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);//创建处理器
handlers[i].start();//启动处理器
}
}

分别启动响应器,监听器,创建对应数量的处理器并启动。处理器没有相应的成员属性,只是简单的设置了处理器线程名字以及将其设为后台线程

1
2
3
4
public Handler(int instanceNumber) {
this.setDaemon(true);
this.setName("IPC Server handler "+ instanceNumber + " on " + port);
}


代理方法调用

通过getProxy创建代理对象后,如果在代理对象上调用协议的方法,则像Java动态代理一样,会将该调用转发给其调用处理器Invoker的invoke方法:

1
2
3
4
5
6
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
...//日志相关
ObjectWritable value = (ObjectWritable)client.call(new Invocation(method, args), remoteId);
...//日志相关
return value.get();
}

如上,将调用的方法和参数封装成Invocation对象,该对象正是服务器的paramClass即接受的参数类。对应的在客户端会将包含方法和参数信息的Invocation序列化发送到服务器,而服务器会创建Invocation对象,反序列化到来的数据,便接收到调用请求。
封装成Invocation对象后,通过Invoker所属的Client,调用call方法执行远程过程调用,该调用对应的连接标识为Invoker所属的remoteId
调用的返回值类型为ObjectWritable,在Client构建时可见初始化valueClassObjectWritable。因为对应在服务器端发送给客户端的相应是通过ObjectWritable序列化的,因此客户端这边有响应可接收时,通过构造ObjectWritable对象反序列化即为返回值数据。

Client的call方法中,继续将方法和参数信息对象Invocation封装成一个Call对象,对应客户端的一次远程调用(另见[概述])。然后获取到服务器端的连接,若连接还未建立,则需建立到服务器端的连接,发送rpcHeaderConnectionHeader,否则直接从缓冲区中获取存在的连接。获取连接后将Invocation对象信息反序列化到服务器端,休眠等待。等待的过程中由所属的Connection线程读取服务器到来的响应,读取到响应(成功或失败)时,唤醒该等待线程,完成远程过程调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException {
Call call = new Call(param);//封装成Call对象,对应为客户端的一个远程过程调用
Connection connection = getConnection(remoteId, call);//获取连接,若缓冲区中没有则需创建并发送rpcHeader和ConnectionHeader到服务器
connection.sendParam(call); // send the parameter,发送调用的方法和参数信息
boolean interrupted = false;
synchronized (call) {
while (!call.done) {
try {
call.wait(); // wait for the result,休眠等待调用完成,由Connection线程读取响应并唤醒该等待的线程
} catch (InterruptedException ie) {
// save the fact that we were interrupted
interrupted = true;
}
}

if (interrupted) {
// set the interrupt flag now that we are done waiting
Thread.currentThread().interrupt();
}

if (call.error != null) {//远程调用中出现异常,直接抛出给上层,对应为协议方法捕获,就像本地调用一样
if (call.error instanceof RemoteException) {
call.error.fillInStackTrace();
throw call.error;
} else { // local exception
// use the connection because it will reflect an ip change, unlike
// the remoteId
throw wrapException(connection.getRemoteAddress(), call.error);
}
} else {//正常返回,获取值
return call.value;
}
}
}


建立连接

客户端

如上,客户端建立连接对应在getConnection方法中获取一个连接,若连接不在客户端维护的缓冲区connections中,则需创建连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private Connection getConnection(ConnectionId remoteId, Call call) throws IOException, InterruptedException {
if (!running.get()) {
throw new IOException("The client is stopped");
}
Connection connection;
do {
synchronized (connections) {
connection = connections.get(remoteId);
if (connection == null) {//connections中不存在
connection = new Connection(remoteId);//创建新的连接对象
connections.put(remoteId, connection);//放入缓存中
}
}
} while (!connection.addCall(call));//获取的连接中添加Call对象

//we don't invoke the method below inside "synchronized (connections)"
//block above. The reason for that is if the server happens to be slow,
//it will take longer to establish a connection and that will slow the
//entire system down.
connection.setupIOstreams();//如果还没建立到服务器端的连接,需连接到服务器,并发送rpcHeader和ConnectionHeader
return connection;
}

如上,不存在连接,通过Connection构造方法构造新的连接对象,然后addCall添加到有效的连接中,最终通过setupIOstreams建立连接并发送头部数据,开启连接线程,接收响应。

Connection构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public Connection(ConnectionId remoteId) throws IOException {
this.remoteId = remoteId;
this.server = remoteId.getAddress();//服务器地址
if (server.isUnresolved()) {
throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
}
this.maxIdleTime = remoteId.getMaxIdleTime();//连接的最大空闲时间ipc.client.connection.maxidletime,默认10s
this.connectionRetryPolicy = remoteId.connectionRetryPolicy;//连接重试策略,默认重试10次,每次间隔1s
this.tcpNoDelay = remoteId.getTcpNoDelay();//Nagle算法,ipc.client.tcpnodelay,默认为false即默认开启Nagel算法
this.pingInterval = remoteId.getPingInterval();//ping周期,ipc.ping.interval,默认1min
if (LOG.isDebugEnabled()) {
LOG.debug("The ping interval is" + this.pingInterval + "ms.");
}
this.rpcTimeout = remoteId.getRpcTimeout();//read操作超时,默认0,0时设为pingInterval
UserGroupInformation ticket = remoteId.getTicket();
Class<?> protocol = remoteId.getProtocol();
this.useSasl = UserGroupInformation.isSecurityEnabled();
...//鉴权相关

if (!useSasl) {
authMethod = AuthMethod.SIMPLE;
} else if (token != null) {
authMethod = AuthMethod.DIGEST;
} else {
authMethod = AuthMethod.KERBEROS;
}

//创建连接头信息,包括使用的协议,用户和鉴权方法
header = new ConnectionHeader(protocol == null ? null : protocol.getName(), ticket, authMethod);

if (LOG.isDebugEnabled())
LOG.debug("Use " + authMethod + " authentication for protocol " + protocol.getSimpleName());

this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " + remoteId.getAddress().toString() +
" from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
this.setDaemon(true);
}

addCall

获取connection后添加Call对象至connection中:

1
2
3
4
5
6
7
private synchronized boolean addCall(Call call) {
if (shouldCloseConnection.get())//连接应该关闭,不能添加至该连接
return false;
calls.put(call.id, call);
notify();//唤醒在run方法中因为calls为空而在休眠等待的connection线程
return true;
}

如上,addCall可能因为对应的connection要关闭了,而不能添加成功,添加失败时另外查找或创建新的连接,因此需要在do..while循环中,保证循环结束时Call已经添加到一个可用的正常的连接中了。

setupIOstreams

由上,获取正常连接后,通过setupIOstreams连接到服务器,并发送rpcHeader和ConnectionHeader,当然,如果已经连接了,便不需要再连接了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private synchronized void setupIOstreams() throws InterruptedException {
if (socket != null || shouldCloseConnection.get()) {
return;
}//socket已经存在,表示已经建立了到服务器端的连接,并发送了rpcHeader和ConnectionHeader,直接返回

try {
...
Random rand = null;
while (true) {
setupConnection();//连接到远程服务器,每次connect最多20s,最多尝试45次,每次重试间隔1s,即如果连接不上,要等待(20+1)s*45=15min 45s才放弃
InputStream inStream = NetUtils.getInputStream(socket);//socket输入流,读取响应
OutputStream outStream = NetUtils.getOutputStream(socket);//socket输出流,发送数据
writeRpcHeader(outStream);//建立了连接,向服务器写rpcHeader数据,包括4个字节魔数(hrpc),1个字节版本号(4),1个字节鉴权方法码
...//鉴权相关

//socket输入流包裹了PingInputStream,在读操作超时时,如果没有设置rpcTimeout,则周期性的(默认1min)发送ping数据到服务器,保持连接,
//而如果设置了rpcTimeout,则会抛出异常,关闭连接,具体见[概述]PingInputStream
this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream)));
this.out = new DataOutputStream(new BufferedOutputStream(outStream));
writeHeader();//向服务器写连接头,包括协议,用户信息,鉴权方法

// update last activity time
touch();//更新本连接上次活动时间lastActivity

// start the receiver thread after the socket connection has been set up
start();//开启本线程
return;
}
} catch (Throwable t) {
if (t instanceof IOException) {//连接服务器异常,或者写rpcHeader和连接头异常
markClosed((IOException)t);//设置shouldCloseConnection为true,然后唤醒阻塞在该连接上的线程(此时应该还没有)
} else {
markClosed(new IOException("Couldn't set up IO streams", t));
}
close();//关闭连接,从connections中注销,清理资源
}
}

如上,由setupConnection连接到服务器,然后writeRpcHeader写rpc头,writeHeader写连接头,创建输入输出流之后开启本线程,接收响应。

连接的输入流包裹了PingInputStream,其read方法如下:

1
2
3
4
5
6
7
8
9
public int read() throws IOException {
do {
try {
return super.read();
} catch (SocketTimeoutException e) {
handleTimeout(e);
}
} while (true);
}

read超时时,由handleTimeout处理超时,而超时时间如果在getProxy中设置rpcTimeout了大于0,则为rpcTimeout,此时pingInterval也为rpcTimeout,否则超时时间为pingInterval(ipc.ping.interval,默认1min),超时时间设置见后面setupConnection分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void handleTimeout(SocketTimeoutException e) throws IOException {
if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {//如果rpcTimeout大于0,直接抛出异常
throw e;
} else {//不应该关闭当前连接且rpcTimeout为0,则周期性发送ping到服务器端
sendPing();
}
}
private synchronized void sendPing() throws IOException {
long curTime = System.currentTimeMillis();
if ( curTime - lastActivity.get() >= pingInterval) {//应该发送ping
lastActivity.set(curTime);
synchronized (out) {
out.writeInt(PING_CALL_ID);
out.flush();
}
}
}

如上连接超时时,本端连接不应该关闭,如果rpcTimeout大于0,则直接抛出异常,否则发送ping包到服务器,而如果服务器端连接已经关闭,则发送ping时也会抛出异常,异常传递到read方法。因此超时时如果rpcTimeout大于0,或者对端关闭了连接,异常都会传递到read方法。

后面分析客户端Connection线程接收响应的时,在read*操作上如果抛出异常,则会关闭客户端连接。

setupConnection
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private synchronized void setupConnection() throws IOException {
short ioFailures = 0;
short timeoutFailures = 0;//当前连接超时次数
while (true) {
try {
this.socket = socketFactory.createSocket();//通过SocketFactory创建Socket
this.socket.setTcpNoDelay(tcpNoDelay);//开启或禁用Nagel算法,默认开启
...//根据鉴权信息,可能绑定本地地址

// connection time out is 20s
NetUtils.connect(this.socket, server, 20000);//连接到服务器,最多阻塞20s
if (rpcTimeout > 0) {//如果getProxy设置的rpcTimeout大于0,则用rpcTimeout覆盖pingInterval
pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
}

//设置socket的read超时时间,默认与pingInterval一样,1min,如果rpcTimeout大于0,则为rpcTimeout,此时pingInterval也为rpcTimeout
this.socket.setSoTimeout(pingInterval);
return;//连接建立成功返回
} catch (SocketTimeoutException toe) {//连接超时
if (updateAddress()) {//超时后,先尝试更新服务器地址
timeoutFailures = ioFailures = 0;
}
//若timeoutFailures小于45,则关闭连接(关闭socket),休眠1s中等待下次继续连接,即最多尝试45次,耗时(20+1)s*45=15min 45s
handleConnectionFailure(timeoutFailures++, 45, toe);
} catch (IOException ie) {
//非连接超时造成的IO异常,使用Connection的connectionRetryPolicy判断是否应该重试连接,默认情况下会重试10次,每次间隔也是1s
if (updateAddress()) {
timeoutFailures = ioFailures = 0;
}
handleConnectionFailure(ioFailures++, ie);//connectionRetryPolicy处理重试
}
}
}

如上通过SocketFactory(所属Client对应的)创建Socket,然后连接到服务器,每次连接最多阻塞20s,如果连接超时,则最多重试45次,每次重试间隔1s,因此如果经过(20+1)s*45=15分钟45s仍然连接超时,则抛出异常。而如果不是连接超时造成的异常,则通过Connection的connectionRetryPolicy进行重试管理,默认重试10次,每次间隔1s。

writeRpcHeader
1
2
3
4
5
6
7
8
private void writeRpcHeader(OutputStream outStream) throws IOException {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
// Write out the header, version and authentication method
out.write(Server.HEADER.array());//hrpc,4个字节
out.write(Server.CURRENT_VERSION);//版本4,1个字节
authMethod.write(out);//鉴权方法码,1个字节
out.flush();
}

可见,rpcHeader先写4个字节的魔数hrpc,然后是1个字节的版本号(4),再接着是1个字节鉴权方法码(SIMPLE 80,KERBEROS 81,DIGEST 82),共计6个字节

writeHeader
1
2
3
4
5
6
7
8
9
10
private void writeHeader() throws IOException {
// Write out the ConnectionHeader
DataOutputBuffer buf = new DataOutputBuffer();
header.write(buf);//先将ConnectionHeader写到数据缓冲区中

// Write out the payload length
int bufLen = buf.getLength();
out.writeInt(bufLen);//先写header所占长度
out.write(buf.getData(), 0, bufLen);//写header序列化实际数据
}

可见,连接头ConnectionHeader,会先写序列化后所占长度4个字节,然后是header的实际数据,序列化中至少会写出协议,然后根据用户和鉴权方法写相应的值,具体键ConnectionHeader的序列化方法。

服务器

由上,建立连接时,客户端最终通过socket connect服务器,连接后,发送rpcHeader和ConnectionHeader,来看看对应服务器怎么处理。
服务器由RPC.getServer创建服务器后,调用start方法启动listener,handler,responder多个线程。

处理客户端到来连接请求以及数据请求由listener负责。

由上服务器的创建getServer过程,创建listener对象时,相应的创建了readThreads个Reader线程,每个Reader有一个Selector。
同时,listener的ServerSocketChannel成员acceptChannel注册在Selector成员selector上,且注册为Accept操作。
因此成员selector负责到来的连接请求,检测到有连接请求到来时,由acceptChannel执行accept获取SocketChannel得到连接。然后选择一个Reader,将获得的SocketChannel注册到Reader的选择器上,注册Read操作。

因此,listener的成员selector负责到来的连接请求,而readers负责连接上的数据读取。

接受连接

客户端通过socket的connect请求连接,服务器端创建listener时acceptChannel注册在selector上,注册Accept操作,然后在listener线程主程序中监听到来的连接请求。
listener线程主程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public void run() {
LOG.info(getName() + ": starting");
SERVER.set(Server.this);
while (running) {
SelectionKey key = null;
try {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key);//有连接请求,doAccept处理
}
} catch (IOException e) {
}
key = null;
}
}
...//连接数过多导致内存不足或者其他异常,关闭当前连接或者清理所有连接,然后休眠1min
}
LOG.info("Stopping " + this.getName());
//退出时资源清理,包括关闭通道,选择器,关闭所有的连接
synchronized (this) {
try {
acceptChannel.close();//关闭通道
selector.close();//关闭选择器
} catch (IOException e) { }

selector= null;
acceptChannel= null;

// clean up all connections
while (!connectionList.isEmpty()) {//关闭所有连接
closeConnection(connectionList.remove(0));
}
}
}

因此,客户端有到来的连接时,最终由doAccept处理,传入的key为acceptChannel注册在selector上获取的SelectionKey

doAccept
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
Connection c = null;
ServerSocketChannel server = (ServerSocketChannel) key.channel();//获取key绑定的通道,为acceptChannel
SocketChannel channel;
while ((channel = server.accept()) != null) {//调用accept得到连接对应的SocketChannel
channel.configureBlocking(false);//新获得的通道配置成非阻塞模式
channel.socket().setTcpNoDelay(tcpNoDelay);//设置Nagle算法
Reader reader = getReader();//获取下一个应该使用的Reader,每次从readers数组中读取下一个
try {
reader.startAdd();//将Reader线程从select操作上唤醒,并休眠Reader线程,添加完后再唤醒
SelectionKey readKey = reader.registerChannel(channel);//在Reader的选择器上注册SocketChannel,注册Read操作
c = new Connection(readKey, channel, System.currentTimeMillis());//创建Connection对象
readKey.attach(c);//将readKey对应的连接对象Connection作为附件附加在注册的SelectionKey上
synchronized (connectionList) {
connectionList.add(numConnections, c);//添加到connectionList中
numConnections++;//增加连接数目
}
if (LOG.isDebugEnabled())
LOG.debug("Server connection from " + c.toString() +
"; # active connections: " + numConnections +
"; # queued calls: " + callQueue.size());
} finally {
reader.finishAdd();//唤醒Reader线程
}
}
}

acceptChannel中accept,获得SocketChannel,然后选择一个Reader,注册在Reader的选择器上,注册读操作,并创建该SocketChannel对应的Connection对象,将Connection对象添加到注册所得SelectionKey的附件中。
其中的reader.startAdd,如果该Reader线程阻塞在select操作上,唤醒它,然后将adding至为true,导致线程睡眠

1
2
3
4
public void startAdd() {
adding = true;
readSelector.wakeup();
}

对应的Reader线程主程序:

1
2
3
4
5
6
7
8
9
10
public void run() {
synchronized (this) {
while (running) {
SelectionKey key = null;
try {
readSelector.select();//如果当前线程阻塞在此,readSelector.wakeup将唤醒,直接返回
while (adding) {//adding置true,等待1s,等待添加完成
this.wait(1000);
}
...

添加完成后的finally中reader.finishAdd,通知睡眠的Reader线程,并将adding置false

1
2
3
4
public synchronized void finishAdd() {
adding = false;
this.notify();//唤醒在wait中的线程
}

这样,服务器端便处理了客户端的connect请求,创建SocketChannel对象,注册到Reader线程的选择器中,创建Connection对象,附加到SelectionKey中,然后由Reader操作负责读取带来的数据请求。

处理头数据

由上,客户端连接上服务器后,会先发送rpcHeaderConnectionHeader给服务器,接下来看看服务器怎么处理。

如上,服务器对连接上的数据请求由Reader线程处理,Reader线程的主程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void run() {
LOG.info("Starting SocketReader");
synchronized (this) {
while (running) {
SelectionKey key = null;
try {
readSelector.select();//select操作
while (adding) {//正在添加SocketChannel,休眠,添加完后会被唤醒
this.wait(1000);
}
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isValid()) {
if (key.isReadable()) {
doRead(key);//处理key对应的数据请求
}
}
key = null;
}
}
...//异常处理,进行日志记录
}
}
}

如上,开始部分对应建立连接过程中添加SocketChannel到Reader中的逻辑,上面已经分析过。之后对于到来的数据请求,通过doRead处理,key为SocketChannel注册到选择器上的key,且包含附件Connection。

doRead

其实doRead操作不仅处理头数据,还处理实际的调用请求数据,这里我们先分析头数据的处理过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void doRead(SelectionKey key) throws InterruptedException {
int count = 0;
Connection c = (Connection)key.attachment();//获取附件,连接对象
if (c == null) {//没有附件,异常,直接返回
return;
}
c.setLastContact(System.currentTimeMillis());//更新客户端和服务器通信时间lastContact

try {
count = c.readAndProcess();//读数据并处理
} catch (InterruptedException ieo) {//中断异常,在Reader的主程序中会进行日志记录
LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
throw ieo;
} catch (Exception e) {//鉴权失败异常导致关闭连接
LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": disconnecting client " + c + ". Number of active connections: "+ numConnections);
closeConnection(c);
c = null;
}
//正常情况,更新通信时间。这里注意,count大于0时只表明读取到了数据,可能为部分数据还未处理,需要等待下一次读取操作,也可能
//读取了一个请求的所有数据,并进行了处理。
else {
c.setLastContact(System.currentTimeMillis());
}
}

readAndProcess

readAndProcess根据标识rpcHeaderReadheaderRead两个标识,分别进行不同的处理。
rpcHeaderRead为false,则应该先读取并处理rpcHeader数据。当rpcHeaderRead为true时,若headerRead为false,则还需读取并处理ConnectionHeader数据。当rpcHeaderheaderRead都为true时,则进入到实际调用请求数据的处理过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
public int readAndProcess() throws IOException, InterruptedException {
while (true) {
/* Read at most one RPC. If the header is not read completely yet
* then iterate until we read first RPC or until there is no data left.
*/

int count = -1;
if (dataLengthBuffer.remaining() > 0) {//dataLengthBuffer为4个字节
//读取数据到缓冲区dataLengthBuffer中,尽可能填充完缓冲区剩余部分,不过不保证一定能填充完,未填充完时直接返回,等待下一次读取操作
count = channelRead(channel, dataLengthBuffer);
if (count < 0 || dataLengthBuffer.remaining() > 0) //读取错误或者没有填充完缓冲区,返回
return count;
}

if (!rpcHeaderRead) {//还未处理rpcHeader
//Every connection is expected to send the header.
//2字节缓冲,rpcHeader总共6字节,魔数4字节现在保存在dataLengthBuffer中,rpcHeaderBuffer保存版本号和鉴权方法码
if (rpcHeaderBuffer == null) {
rpcHeaderBuffer = ByteBuffer.allocate(2);
}
count = channelRead(channel, rpcHeaderBuffer);
if (count < 0 || rpcHeaderBuffer.remaining() > 0) {//同上,缓冲区读完才会继续处理,否则直接返回读取到的大小
return count;
}
int version = rpcHeaderBuffer.get(0);//第一字节为版本号(4)
byte[] method = new byte[] {rpcHeaderBuffer.get(1)};//第二个字节为鉴权方法码
authMethod = AuthMethod.read(new DataInputStream( new ByteArrayInputStream(method)));
dataLengthBuffer.flip();//
if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {//验证魔数hrpc和版本号,正确的话才会继续处理,否则关闭连接
//Warning is ok since this is not supposed to happen.
LOG.warn("Incorrect header or version mismatch from " +
hostAddress + ":" + remotePort +
" got version " + version +
" expected version " + CURRENT_VERSION);
return -1;//返回-1时,关闭连接
}
dataLengthBuffer.clear();
if (authMethod == null) {
throw new IOException("Unable to read authentication method");
}
if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
AccessControlException ae = new AccessControlException("Authorization ("
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION
+ ") is enabled but authentication ("
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
+ ") is configured as simple. Please configure another method "
+ "like kerberos or digest.");
//鉴权失败,发送响应,对应的Call id为-1,状态为致命错误,发送异常类和异常信息。客户端读取到该响应时关闭连接
setupResponse(authFailedResponse, authFailedCall, Status.FATAL,null, ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);//添加到响应队列中,并相应处理
throw ae;
}
if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
doSaslReply(SaslStatus.SUCCESS, new IntWritable(
SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
authMethod = AuthMethod.SIMPLE;
// client has already sent the initial Sasl message and we
// should ignore it. Both client and server should fall back
// to simple auth from now on.
skipInitialSaslHandshake = true;
}
if (authMethod != AuthMethod.SIMPLE) {
useSasl = true;
}
rpcHeaderBuffer = null;
rpcHeaderRead = true;//正常情况读完rpcHeaderRead并验证
continue;
}

if (data == null) {
dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt();//处理完了rpcHeaderRead,则dataLengthBuffer中保存的为接下来的数据长度或ping数据(-1)

if (dataLength == Client.PING_CALL_ID) {//客户端发送的ping数据-1
if(!useWrap) { //covers the !useSasl too
dataLengthBuffer.clear();
return 0; //ping message,返回0时,会更新连接时间lastContact
}
}
//这之后的便是数据长度了
if (dataLength < 0) {
LOG.warn("Unexpected data length " + dataLength + "!! from " +
getHostAddress());
}
data = ByteBuffer.allocate(dataLength);//根据数据长度分配data缓冲区
}

count = channelRead(channel, data);//从通道尽可能读取足够填充缓冲区剩余空间的数据

if (data.remaining() == 0) {//缓冲区没有剩余空间了,即发送过来的一个帧的数据读取玩,开始处理
dataLengthBuffer.clear();
data.flip();
if (skipInitialSaslHandshake) {
data = null;
skipInitialSaslHandshake = false;
continue;
}
boolean isHeaderRead = headerRead;
if (useSasl) {//简单鉴权,这里不分析
saslReadAndProcess(data.array());
} else {//处理一个帧数据
processOneRpc(data.array());
}
data = null;
if (!isHeaderRead) {
continue;
}
}
//这里隐含缓冲区还有剩余空间,即一个帧数据未读取玩,直接返回读取到的字节数,更新lastContact,等待下一次读取
return count;
}
}

由上,如果rpcHeaderRead为false时,需要处理rpcHeader,这时根据发送端发送过来的rpcHeader格式:
4字节魔数hrpc+1字节版本号+1字节鉴权方法码
这是dataLengthBuffer里面保存的便是魔数hrpc,然后分配两个字节大小的rpcHeaderBuffer保存剩下的版本号和鉴权方法码,进行相应的验证。

而如果rpcHeader已经处理了,则一般情况下dataLengthBuffer里面保存的为一个帧数据的长度,根据这个长度分配data缓冲区的大小,然后进行相应长度数据的读取。不过在建立连接后,客户端read服务器响应超时时,客户端会发送ping数据,以保持连接,该ping数据作为一帧数据来看的话为int值-1,所以如果dataLengthBuffer值为-1时,则是客户端的ping数据,这时简单的返回0,然后更新连接时间即可。
所有上面提到的缓冲区都要注意的是,channelRead会尽量从通道读取数据填满缓冲区,但也不能保证。当缓冲区还有剩余空间时,我们还需等待下一数据读取操作的到来,这时简单的返回读取到的数据大小,更新连接时间即可。相应缓冲区填满后,才能进行相应的处理。

channelRead方法会尽量从通道中读取足够多的数据以填满缓冲区,但不保证。当缓冲区剩余空间大于NIO_BUFFER_LIMIT(8KB)时,会循环从通道读取数据到缓冲区,每次读取NIO_BUFFER_LIMIT大小,按照注释是说能够避免JDK在缓冲区增长时分配太多的直接内存缓冲,但是这里应该不会增长,只是读取最大为剩余空间的数据到缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ? channel.read(buffer) : channelIO(channel, null, buffer);
if (count > 0) {
rpcMetrics.incrReceivedBytes(count);
}
return count;
}
//channelRead和channelWrite同时使用,读时writeCh为null,写时readCh为null。
private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh, ByteBuffer buf) throws IOException {
int originalLimit = buf.limit();
int initialRemaining = buf.remaining();
int ret = 0;

while (buf.remaining() > 0) {
try {
int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);//一次读取到缓冲区的数据大小,最多8KB
buf.limit(buf.position() + ioSize);//设置新的limit值为position+要读取的大小

ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);

if (ret < ioSize) {
break;
}
} finally {
buf.limit(originalLimit);//设为原来的limit值
}
}
int nBytes = initialRemaining - buf.remaining();
return (nBytes > 0) ? nBytes : ret;
}

处理完rpcHeader和ping包后,由processOneRpc处理到来的数据帧,一帧数据已经保存在data缓冲区中了。

1
2
3
4
5
6
7
8
9
10
11
12
13
private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
if (headerRead) {
processData(buf);
} else {
processHeader(buf);
headerRead = true;
if (!authorizeConnection()) {
throw new AccessControlException("Connection from " + this
+ " for protocol " + header.getProtocol()
+ " is unauthorized for user " + user);
}
}
}

可见,如果headerRead为false,即还未读取连接头,则通过processHeader处理连接头,并通过authorizeConnection根据鉴权方法以及相关信息进行鉴权。否则若连接头已经处理,则通过processData处理实际调用请求数据。这里在连接过程,先分析连接头的处理,实际的调用请求数据处理见后。

processHeader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void processHeader(byte[] buf) throws IOException {
DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf));
header.readFields(in);//反序列化连接头对象
try {
String protocolClassName = header.getProtocol();//获取客户端请求的协议
if (protocolClassName != null) {
protocol = getProtocolClass(header.getProtocol(), conf);//初始化该连接对应的协议类
}
} catch (ClassNotFoundException cnfe) {
throw new IOException("Unknown protocol: " + header.getProtocol());
}

UserGroupInformation protocolUser = header.getUgi();//获取客户端的用户信息
...//初始化连接的user成员
}

连接的鉴权方法authMethod在处理rpcHeader时已经初始化了,这里反序列化连接头后,从连接头中获知客户端请求连接的协议,然后初始化该连接的protocol,以及根据连接头的用户信息,初始化该连接的user成员。

初始化相关成员后,使用authorizedConnection对客户端进行鉴权

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private boolean authorizeConnection() throws IOException {
try {
// If auth method is DIGEST, the token was obtained by the
// real user for the effective user, therefore not required to
// authorize real user. doAs is allowed only for simple or kerberos
// authentication
if (user != null && user.getRealUser() != null
&& (authMethod != AuthMethod.DIGEST)) {
ProxyUsers.authorize(user, this.getHostAddress(), conf);
}
authorize(user, header, getHostInetAddress());
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully authorized " + header);
}
rpcMetrics.incrAuthorizationSuccesses();
} catch (AuthorizationException ae) {
rpcMetrics.incrAuthorizationFailures();
//鉴权异常失败,发送鉴权异常响应,对应Call为-1,状态FATAL,发送异常类和异常信息
setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null, ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);
return false;
}
return true;
}

如果鉴权失败,则发送鉴权异常响应,对应状态为FATAL,客户端会关闭连接。而在服务器端,在processOneRpc中抛出AccessControlException这个IOException最终传递到doRead中的readAndProcess中,由catch捕获,将count即读取到的数据大小置为-1,后续处理中关闭服务器端连接。doRead中IOException异常处理见上。

因此,我们这里分析了服务器端连接建立的过程,由监听器listener监听到来的连接,并注册到Reader线程的选择器中,由Reader线程处理到来的数据请求。
客户端数据到来时,与客户端发送数据流程对应的,服务器端先处理rpcHeader,判断魔数和版本号是否相符,不相符的话给客户端发送鉴权失败异常响应,该响应对应的Call对象固定,其id为-1,状态为FATAL,客户端将会关闭连接,而服务器置读取到的数据count为-1,也关闭连接。正常的话初始化连接的authMethod成员。这里注意魔数hrpc长度为4个字节,刚好保存在后续用来保存一个帧数据长度的缓冲区dataLengthBuffer中。

处理完rpcHeader之后,便是对一帧数据的处理。一帧数据可能为ping数据,为4个字节值为-1,这时简单的置读取到数据count为0,更新连接时间lastContact。

否则dataLengthBuffer便是一帧数据的长度,根据该长度分配data缓冲区。
此时一帧数据可能为连接头,可能为实际的调用请求数据。没有读取连接头的话,则接下来的一帧数据为连接头。
对连接头进行处理,读取发送过来的连接头初始化该连接的protocoluser,然后根据不同的鉴权方式进行鉴权,鉴权失败,发送Call id为-1的响应,相应状态为FATAL,客户端关闭连接,而这里服务器抛出IOException,在doRead的异常处理中置count为-1,关闭服务器端连接。

处理完这些信息后,连接建立了,鉴权完成了,可以在该连接上进行正常的调用请求了。具体过程见下篇