Hadoop版本:Hadoop-1.2.1
参考:《Hadoop技术内幕-深入解析Hadoop Common和HDFS架构设计与实现原理》,后文简称技术内幕
RPC中用到的通道和选择器另见通道和选择器,缓冲区另见缓冲区,以及序列化(主要是ObjectWritable)另见序列化
下文所说的[概述]为上篇RPC源码分析—概述
本文不涉及SIMPLE,DIGEST,KERBEROS三种鉴权方式的鉴权过程分析
本文所属RPC源码分析,按照调用请求的步骤对源码进行分析,限于篇幅分为上下两篇。  
上篇分析了”客户端代理创建”,”服务器创建启动”,”客户端服务器连接建立”,”客户端头数据发送”,”服务器对头数据验证,鉴权”,至此连接已经建立验证,可以发送正常的调用请求了。本文即为上篇。
而下篇分析了”客户端调用请求”,”服务器对调用请求的处理响应”,”客户端接收响应”,”客户端服务器连接的关闭”,”客户端关闭”,”服务器关闭”等过程,下篇另见RPC源码分析下篇。
1. 客户端代理创建
由RPC.getProxy获取客户端代理,由[概述]分析知一个代理处理用户到具体服务器的具体协议对应的连接,同时连接属于一个Client,而Client一般由SocketFactory决定,不同SocketFactory对应不同Client。因此getProxy需指定包括SocketFactory,用户,服务器地址,协议这4个信息,另外还需指定连接读操作超时时间。
对应getProxy重要的5个信息,最少需给定服务器地址和使用的协议,其他的都可以默认。SocketFactory默认为默认SocketFactory,用户默认为当前用户,读操作超时时间默认为0,此时会设置为pingInterval(见[概述])。
getProxy的重载:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27//指定协议protocol,地址addr。
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
  long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
    return getProxy(protocol, clientVersion, addr, conf, NetUtils.getDefaultSocketFactory(conf), 0);
}
//指定协议protocol,地址addr,SockeFactory factory
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
    long clientVersion, InetSocketAddress addr, Configuration conf,SocketFactory factory) throws IOException {
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    return getProxy(protocol, clientVersion, addr, ugi, conf, factory, 0);
}
//指定协议protocol,地址addr,SocketFactory factory,超时rpcTimeout
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
    long clientVersion, InetSocketAddress addr, Configuration conf,
    SocketFactory factory, int rpcTimeout) throws IOException {
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    return getProxy(protocol, clientVersion, addr, ugi, conf, factory, rpcTimeout);
}
//指定协议protocol,地址addr,用户ticket,SocketFactory factory,超时rpcTimeout
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
    long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
    Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {
    return getProxy(protocol, clientVersion, addr, ticket, conf, factory,rpcTimeout, null, true);
}
可见,最终会使用最多参数的getProxy创建代理对象:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
      long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
      Configuration conf, SocketFactory factory, int rpcTimeout,
      RetryPolicy connectionRetryPolicy, boolean checkVersion) throws IOException {
    if (UserGroupInformation.isSecurityEnabled()) {
      SaslRpcServer.init(conf);
    }
    //由protocol,addr,ticket,factory,rpcTimeout等信息创建调用处理器对象Invoker,其中connectionRetryPolicy缺省为重试10次,每次间隔1s
    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);
    //创建动态代理,代理的调用处理器为上面创建的invoker
    VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance( protocol.getClassLoader(), new Class[]{protocol}, invoker);
    
    if (checkVersion) {//默认情况下需要检查客户端和服务器协议的版本
      checkVersion(protocol, clientVersion, proxy);
    }
    return proxy;
}
因此,创建代理主要就是通过传入的参数创建对应的调用处理器对象Invoker,然后通过Java动态代理创建动态代理,创建代理后,默认情况下需要检查客户端和服务器协议的版本号,不匹配的话抛出VersionMismatch异常。因此,创建的代理核心信息保存在调用处理器invoker中。
看一下Invoker的构造:1
2
3
4
5
6private Invoker(Class<? extends VersionedProtocol> protocol,
    InetSocketAddress address, UserGroupInformation ticket, Configuration conf,
    SocketFactory factory,int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException {
    this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf);
    this.client = CLIENTS.getClient(conf, factory);
}
因此,就是通过传入的用户,服务器地址,协议,超时时间等信息构建Invoker负责处理连接的标识ConnectionId,即确定了该Invoker负责处理的连接。
而用SocketFactory在缓存中查找所属客户端,若存在现有使用该factory的客户端直接使用,只是增加其引用计数refCount,否则构建一个Client对象,valueClass初始化为ObjectWritable。
这些成员具体含义另见[概述]。1
2
3
4
5
6
7
8
9
10
11private synchronized Client getClient(Configuration conf,
    SocketFactory factory) {
    Client client = clients.get(factory);
    if (client == null) {//不存在,构建Client对象,valueClass为ObjectClass,并放入缓存
        client = new Client(ObjectWritable.class, conf, factory);
        clients.put(factory, client);
    } else {
        client.incCount();//存在的话,client新增了一个Connection,增加引用计数
    }
    return client;
}
这样,客户端的代理构建完成了,其实主要是构建了代理关联的调用处理器中的connectionId(对应一个Connection),以及client(将代理处理的连接注册到相应客户端)。  
服务器创建
创建服务器对象,通过RPC.getServer完成,至少指定服务器绑定的地址和端口,以及实现协议的实例对象,看方法重载:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17//给定实现协议的实例对象instance,要绑定的地址bindAddress和端口port
public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) throws IOException {
    return getServer(instance, bindAddress, port, 1, false, conf);
}
//给定instance,bindAddress,port的基础上,指定处理器线程数目(缺省1)
public static Server getServer(final Object instance, final String bindAddress, final int port,
                             final int numHandlers, final boolean verbose, Configuration conf) throws IOException {
    return getServer(instance, bindAddress, port, numHandlers, verbose, conf, null);
}
//上有基础上,再指定安全管理器secretManager(缺省为null)
public static Server getServer(final Object instance, final String bindAddress, final int port,
                             final int numHandlers, final boolean verbose, Configuration conf,
                             SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
    return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
}
可见,最终通过RPC.Server的构造函数构建服务器对象。1
2
3
4
5
6public Server(Object instance, Configuration conf, String bindAddress,  int port, int numHandlers,
                boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
  super(bindAddress, port, Invocation.class, numHandlers, conf,classNameBase(instance.getClass().getName()), secretManager);
  this.instance = instance;
  this.verbose = verbose;
}
调用父类org.apache.hadoop.ipc.Server的构造函数,然后初始化实现协议的实例对象instance。父类构造函数如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37protected Server(String bindAddress, int port, Class<? extends Writable> paramClass, int handlerCount, 
                  Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) 
    throws IOException {
    this.bindAddress = bindAddress;//绑定地址
    this.conf = conf;
    this.port = port;//绑定端口
    this.paramClass = paramClass;//客户端传过来的包含调用方法参数等信息的参数类,为Invocation
    this.handlerCount = handlerCount;//处理器线程数目,默认1
    this.socketSendBufferSize = 0;
    //待处理Call队列callQueue中最大的待处理Call数,默认100
    this.maxQueueSize = handlerCount * conf.getInt(IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
    //响应缓冲最大大小,1MB
    this.maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY, IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
    //监听器listener中Reader线程数目,默认1
    this.readThreads = conf.getInt(IPC_SERVER_RPC_READ_THREADS_KEY,IPC_SERVER_RPC_READ_THREADS_DEFAULT);
    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize);//构造callQueue 
    this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
    this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);//清理空闲连接时一次最多清理的个数
    //服务器端连接超过此值应该执行清理(连接数过多,一个连接为一个线程,资源消耗大)
    this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
    this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);//是否鉴权
    this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
    
    // Start the listener here and let it bind to the port
    listener = new Listener();//构建监听器
    this.port = listener.getAddress().getPort();    
    this.rpcMetrics = RpcInstrumentation.create(serverName, this.port);
    this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);//是否开启Nagel算法,默认false开启
    // Create the responder here
    responder = new Responder();//响应器
    
    if (isSecurityEnabled) {
      SaslRpcServer.init(conf);
    }
  }
初始化相应成员后,构建监听器listener和响应器responder,而对应的多个处理器在start方法中创建。  
构建监听器时,会打开一个选择器,然后将ServerSocketChannel注册到选择器上,注册Accept操作,由该选择器负责处理到来的连接请求。
同时创建readThreads个Reader线程并启动,每一个Reader线程中都有一个Selector。
由listener中的Selector处理Accept的连接请求,会创建SocketChannel对象,然后从所有的Reader线程中选择下一个Reader线程,将accept的SocketChannel注册在选择的Reader线程的Selector中,注册读操作,这样便由该Reader线程负责该连接到来的数据请求。  
监听器构造
| 1 | public Listener() throws IOException { | 
响应器构建
响应器的创建比较简单,打开用于为写操作注册的Selector1
2
3
4
5
6Responder() throws IOException {
    this.setName("IPC Server Responder");
    this.setDaemon(true);
    writeSelector = Selector.open(); // create a selector
    pending = 0;
}
服务器启动
服务器启动在Server的start方法中,一般通过RPC.getServer创建服务器后,调用服务器的start方法启动服务器1
2
3
4
5
6
7
8
9
10public synchronized void start() {
    responder.start();//启动响应器
    listener.start();//启动监听器
    handlers = new Handler[handlerCount];
    
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);//创建处理器
      handlers[i].start();//启动处理器
    }
}
分别启动响应器,监听器,创建对应数量的处理器并启动。处理器没有相应的成员属性,只是简单的设置了处理器线程名字以及将其设为后台线程1
2
3
4public Handler(int instanceNumber) {
    this.setDaemon(true);
    this.setName("IPC Server handler "+ instanceNumber + " on " + port);
}
代理方法调用
通过getProxy创建代理对象后,如果在代理对象上调用协议的方法,则像Java动态代理一样,会将该调用转发给其调用处理器Invoker的invoke方法:1
2
3
4
5
6public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      ...//日志相关
      ObjectWritable value = (ObjectWritable)client.call(new Invocation(method, args), remoteId);
      ...//日志相关
      return value.get();
}
如上,将调用的方法和参数封装成Invocation对象,该对象正是服务器的paramClass即接受的参数类。对应的在客户端会将包含方法和参数信息的Invocation序列化发送到服务器,而服务器会创建Invocation对象,反序列化到来的数据,便接收到调用请求。
封装成Invocation对象后,通过Invoker所属的Client,调用call方法执行远程过程调用,该调用对应的连接标识为Invoker所属的remoteId。
调用的返回值类型为ObjectWritable,在Client构建时可见初始化valueClass为ObjectWritable。因为对应在服务器端发送给客户端的相应是通过ObjectWritable序列化的,因此客户端这边有响应可接收时,通过构造ObjectWritable对象反序列化即为返回值数据。  
Client的call方法中,继续将方法和参数信息对象Invocation封装成一个Call对象,对应客户端的一次远程调用(另见[概述])。然后获取到服务器端的连接,若连接还未建立,则需建立到服务器端的连接,发送rpcHeader和ConnectionHeader,否则直接从缓冲区中获取存在的连接。获取连接后将Invocation对象信息反序列化到服务器端,休眠等待。等待的过程中由所属的Connection线程读取服务器到来的响应,读取到响应(成功或失败)时,唤醒该等待线程,完成远程过程调用。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException {
    Call call = new Call(param);//封装成Call对象,对应为客户端的一个远程过程调用
    Connection connection = getConnection(remoteId, call);//获取连接,若缓冲区中没有则需创建并发送rpcHeader和ConnectionHeader到服务器
    connection.sendParam(call);                 // send the parameter,发送调用的方法和参数信息
    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // wait for the result,休眠等待调用完成,由Connection线程读取响应并唤醒该等待的线程
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }
    
      if (interrupted) {
        // set the interrupt flag now that we are done waiting
        Thread.currentThread().interrupt();
      }
    
      if (call.error != null) {//远程调用中出现异常,直接抛出给上层,对应为协议方法捕获,就像本地调用一样
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        } else { // local exception
          // use the connection because it will reflect an ip change, unlike
          // the remoteId
          throw wrapException(connection.getRemoteAddress(), call.error);
        }
      } else {//正常返回,获取值
        return call.value;
      }
    }
}
建立连接
客户端
如上,客户端建立连接对应在getConnection方法中获取一个连接,若连接不在客户端维护的缓冲区connections中,则需创建连接1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22private Connection getConnection(ConnectionId remoteId, Call call) throws IOException, InterruptedException {
    if (!running.get()) {
      throw new IOException("The client is stopped");
    }
    Connection connection;
    do {
      synchronized (connections) {
        connection = connections.get(remoteId);
        if (connection == null) {//connections中不存在
          connection = new Connection(remoteId);//创建新的连接对象
          connections.put(remoteId, connection);//放入缓存中
        }
      }
    } while (!connection.addCall(call));//获取的连接中添加Call对象
    
    //we don't invoke the method below inside "synchronized (connections)"
    //block above. The reason for that is if the server happens to be slow,
    //it will take longer to establish a connection and that will slow the
    //entire system down.
    connection.setupIOstreams();//如果还没建立到服务器端的连接,需连接到服务器,并发送rpcHeader和ConnectionHeader
    return connection;
}
如上,不存在连接,通过Connection构造方法构造新的连接对象,然后addCall添加到有效的连接中,最终通过setupIOstreams建立连接并发送头部数据,开启连接线程,接收响应。  
Connection构造
| 1 | public Connection(ConnectionId remoteId) throws IOException { | 
addCall
获取connection后添加Call对象至connection中:1
2
3
4
5
6
7private synchronized boolean addCall(Call call) {
    if (shouldCloseConnection.get())//连接应该关闭,不能添加至该连接
        return false;
    calls.put(call.id, call);
    notify();//唤醒在run方法中因为calls为空而在休眠等待的connection线程
    return true;
}
如上,addCall可能因为对应的connection要关闭了,而不能添加成功,添加失败时另外查找或创建新的连接,因此需要在do..while循环中,保证循环结束时Call已经添加到一个可用的正常的连接中了。
setupIOstreams
由上,获取正常连接后,通过setupIOstreams连接到服务器,并发送rpcHeader和ConnectionHeader,当然,如果已经连接了,便不需要再连接了。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37private synchronized void setupIOstreams() throws InterruptedException {
  if (socket != null || shouldCloseConnection.get()) {
    return;
  }//socket已经存在,表示已经建立了到服务器端的连接,并发送了rpcHeader和ConnectionHeader,直接返回
 
  try {
    ...
    Random rand = null;
    while (true) {
      setupConnection();//连接到远程服务器,每次connect最多20s,最多尝试45次,每次重试间隔1s,即如果连接不上,要等待(20+1)s*45=15min 45s才放弃
      InputStream inStream = NetUtils.getInputStream(socket);//socket输入流,读取响应
      OutputStream outStream = NetUtils.getOutputStream(socket);//socket输出流,发送数据
      writeRpcHeader(outStream);//建立了连接,向服务器写rpcHeader数据,包括4个字节魔数(hrpc),1个字节版本号(4),1个字节鉴权方法码
      ...//鉴权相关
      
      //socket输入流包裹了PingInputStream,在读操作超时时,如果没有设置rpcTimeout,则周期性的(默认1min)发送ping数据到服务器,保持连接,
      //而如果设置了rpcTimeout,则会抛出异常,关闭连接,具体见[概述]PingInputStream
      this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream)));
      this.out = new DataOutputStream(new BufferedOutputStream(outStream));
      writeHeader();//向服务器写连接头,包括协议,用户信息,鉴权方法
      // update last activity time
      touch();//更新本连接上次活动时间lastActivity
      // start the receiver thread after the socket connection has been set up
      start();//开启本线程
      return;
    }
  } catch (Throwable t) {
    if (t instanceof IOException) {//连接服务器异常,或者写rpcHeader和连接头异常
      markClosed((IOException)t);//设置shouldCloseConnection为true,然后唤醒阻塞在该连接上的线程(此时应该还没有)
    } else {
      markClosed(new IOException("Couldn't set up IO streams", t));
    }
    close();//关闭连接,从connections中注销,清理资源
  }
}
如上,由setupConnection连接到服务器,然后writeRpcHeader写rpc头,writeHeader写连接头,创建输入输出流之后开启本线程,接收响应。  
连接的输入流包裹了PingInputStream,其read方法如下:1
2
3
4
5
6
7
8
9public int read() throws IOException {
    do {
      try {
        return super.read();
      } catch (SocketTimeoutException e) {
        handleTimeout(e);
      }
    } while (true);
}
read超时时,由handleTimeout处理超时,而超时时间如果在getProxy中设置rpcTimeout了大于0,则为rpcTimeout,此时pingInterval也为rpcTimeout,否则超时时间为pingInterval(ipc.ping.interval,默认1min),超时时间设置见后面setupConnection分析。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17private void handleTimeout(SocketTimeoutException e) throws IOException {
    if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {//如果rpcTimeout大于0,直接抛出异常
      throw e;
    } else {//不应该关闭当前连接且rpcTimeout为0,则周期性发送ping到服务器端
      sendPing();
    }
}
private synchronized void sendPing() throws IOException {
  long curTime = System.currentTimeMillis();
  if ( curTime - lastActivity.get() >= pingInterval) {//应该发送ping
    lastActivity.set(curTime);
    synchronized (out) {
      out.writeInt(PING_CALL_ID);
      out.flush();
    }
  }
}
如上连接超时时,本端连接不应该关闭,如果rpcTimeout大于0,则直接抛出异常,否则发送ping包到服务器,而如果服务器端连接已经关闭,则发送ping时也会抛出异常,异常传递到read方法。因此超时时如果rpcTimeout大于0,或者对端关闭了连接,异常都会传递到read方法。
后面分析客户端Connection线程接收响应的时,在read*操作上如果抛出异常,则会关闭客户端连接。
setupConnection
| 1 | private synchronized void setupConnection() throws IOException { | 
如上通过SocketFactory(所属Client对应的)创建Socket,然后连接到服务器,每次连接最多阻塞20s,如果连接超时,则最多重试45次,每次重试间隔1s,因此如果经过(20+1)s*45=15分钟45s仍然连接超时,则抛出异常。而如果不是连接超时造成的异常,则通过Connection的connectionRetryPolicy进行重试管理,默认重试10次,每次间隔1s。  
writeRpcHeader
| 1 | private void writeRpcHeader(OutputStream outStream) throws IOException { | 
可见,rpcHeader先写4个字节的魔数hrpc,然后是1个字节的版本号(4),再接着是1个字节鉴权方法码(SIMPLE 80,KERBEROS 81,DIGEST 82),共计6个字节  
writeHeader
| 1 | private void writeHeader() throws IOException { | 
可见,连接头ConnectionHeader,会先写序列化后所占长度4个字节,然后是header的实际数据,序列化中至少会写出协议,然后根据用户和鉴权方法写相应的值,具体键ConnectionHeader的序列化方法。
服务器
由上,建立连接时,客户端最终通过socket connect服务器,连接后,发送rpcHeader和ConnectionHeader,来看看对应服务器怎么处理。
服务器由RPC.getServer创建服务器后,调用start方法启动listener,handler,responder多个线程。  
处理客户端到来连接请求以及数据请求由listener负责。
由上服务器的创建getServer过程,创建listener对象时,相应的创建了readThreads个Reader线程,每个Reader有一个Selector。
同时,listener的ServerSocketChannel成员acceptChannel注册在Selector成员selector上,且注册为Accept操作。
因此成员selector负责到来的连接请求,检测到有连接请求到来时,由acceptChannel执行accept获取SocketChannel得到连接。然后选择一个Reader,将获得的SocketChannel注册到Reader的选择器上,注册Read操作。  
因此,listener的成员selector负责到来的连接请求,而readers负责连接上的数据读取。  
接受连接
客户端通过socket的connect请求连接,服务器端创建listener时acceptChannel注册在selector上,注册Accept操作,然后在listener线程主程序中监听到来的连接请求。
listener线程主程序如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40public void run() {
  LOG.info(getName() + ": starting");
  SERVER.set(Server.this);
  while (running) {
    SelectionKey key = null;
    try {
      selector.select();
      Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          if (key.isValid()) {
            if (key.isAcceptable())
              doAccept(key);//有连接请求,doAccept处理
          }
        } catch (IOException e) {
        }
        key = null;
      }
    }
    ...//连接数过多导致内存不足或者其他异常,关闭当前连接或者清理所有连接,然后休眠1min
  }
  LOG.info("Stopping " + this.getName());
  //退出时资源清理,包括关闭通道,选择器,关闭所有的连接
  synchronized (this) {
    try {
      acceptChannel.close();//关闭通道
      selector.close();//关闭选择器
    } catch (IOException e) { }
    selector= null;
    acceptChannel= null;
    
    // clean up all connections
    while (!connectionList.isEmpty()) {//关闭所有连接
      closeConnection(connectionList.remove(0));
    }
  }
}
因此,客户端有到来的连接时,最终由doAccept处理,传入的key为acceptChannel注册在selector上获取的SelectionKey。  
doAccept
| 1 | void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { | 
从acceptChannel中accept,获得SocketChannel,然后选择一个Reader,注册在Reader的选择器上,注册读操作,并创建该SocketChannel对应的Connection对象,将Connection对象添加到注册所得SelectionKey的附件中。
其中的reader.startAdd,如果该Reader线程阻塞在select操作上,唤醒它,然后将adding至为true,导致线程睡眠1
2
3
4public void startAdd() {
    adding = true;
    readSelector.wakeup();
}
对应的Reader线程主程序:1
2
3
4
5
6
7
8
9
10public void run() {
    synchronized (this) {
      while (running) {
        SelectionKey key = null;
        try {
          readSelector.select();//如果当前线程阻塞在此,readSelector.wakeup将唤醒,直接返回
          while (adding) {//adding置true,等待1s,等待添加完成
            this.wait(1000);
          }
        ...
添加完成后的finally中reader.finishAdd,通知睡眠的Reader线程,并将adding置false1
2
3
4public synchronized void finishAdd() {
    adding = false;
    this.notify();//唤醒在wait中的线程  
}
这样,服务器端便处理了客户端的connect请求,创建SocketChannel对象,注册到Reader线程的选择器中,创建Connection对象,附加到SelectionKey中,然后由Reader操作负责读取带来的数据请求。
处理头数据
由上,客户端连接上服务器后,会先发送rpcHeader和ConnectionHeader给服务器,接下来看看服务器怎么处理。  
如上,服务器对连接上的数据请求由Reader线程处理,Reader线程的主程序:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public void run() {
    LOG.info("Starting SocketReader");
    synchronized (this) {
      while (running) {
        SelectionKey key = null;
        try {
          readSelector.select();//select操作
          while (adding) {//正在添加SocketChannel,休眠,添加完后会被唤醒
            this.wait(1000);
          }              
          Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            if (key.isValid()) {
              if (key.isReadable()) {
                doRead(key);//处理key对应的数据请求
              }
            }
            key = null;
          }
        } 
        ...//异常处理,进行日志记录
      }
    }
}
如上,开始部分对应建立连接过程中添加SocketChannel到Reader中的逻辑,上面已经分析过。之后对于到来的数据请求,通过doRead处理,key为SocketChannel注册到选择器上的key,且包含附件Connection。  
doRead
其实doRead操作不仅处理头数据,还处理实际的调用请求数据,这里我们先分析头数据的处理过程。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29void doRead(SelectionKey key) throws InterruptedException {
    int count = 0;
    Connection c = (Connection)key.attachment();//获取附件,连接对象
    if (c == null) {//没有附件,异常,直接返回
        return;  
    }
    c.setLastContact(System.currentTimeMillis());//更新客户端和服务器通信时间lastContact
    
    try {
        count = c.readAndProcess();//读数据并处理
    } catch (InterruptedException ieo) {//中断异常,在Reader的主程序中会进行日志记录
        LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
        throw ieo;
    } catch (Exception e) {//鉴权失败异常导致关闭连接
        LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
        count = -1; //so that the (count < 0) block is executed
    }
    if (count < 0) {
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + ": disconnecting client " + c + ". Number of active connections: "+ numConnections);
        closeConnection(c);
        c = null;
    }
    //正常情况,更新通信时间。这里注意,count大于0时只表明读取到了数据,可能为部分数据还未处理,需要等待下一次读取操作,也可能
    //读取了一个请求的所有数据,并进行了处理。
    else {
        c.setLastContact(System.currentTimeMillis());
    }
}
readAndProcess
readAndProcess根据标识rpcHeaderRead和headerRead两个标识,分别进行不同的处理。
若rpcHeaderRead为false,则应该先读取并处理rpcHeader数据。当rpcHeaderRead为true时,若headerRead为false,则还需读取并处理ConnectionHeader数据。当rpcHeader和headerRead都为true时,则进入到实际调用请求数据的处理过程。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111public int readAndProcess() throws IOException, InterruptedException {
  while (true) {
    /* Read at most one RPC. If the header is not read completely yet
     * then iterate until we read first RPC or until there is no data left.
     */    
    int count = -1;
    if (dataLengthBuffer.remaining() > 0) {//dataLengthBuffer为4个字节
      //读取数据到缓冲区dataLengthBuffer中,尽可能填充完缓冲区剩余部分,不过不保证一定能填充完,未填充完时直接返回,等待下一次读取操作
      count = channelRead(channel, dataLengthBuffer);
      if (count < 0 || dataLengthBuffer.remaining() > 0) //读取错误或者没有填充完缓冲区,返回
        return count;
    }
  
    if (!rpcHeaderRead) {//还未处理rpcHeader
      //Every connection is expected to send the header.
      //2字节缓冲,rpcHeader总共6字节,魔数4字节现在保存在dataLengthBuffer中,rpcHeaderBuffer保存版本号和鉴权方法码
      if (rpcHeaderBuffer == null) {
        rpcHeaderBuffer = ByteBuffer.allocate(2);
      }
      count = channelRead(channel, rpcHeaderBuffer);
      if (count < 0 || rpcHeaderBuffer.remaining() > 0) {//同上,缓冲区读完才会继续处理,否则直接返回读取到的大小
        return count;
      }
      int version = rpcHeaderBuffer.get(0);//第一字节为版本号(4)
      byte[] method = new byte[] {rpcHeaderBuffer.get(1)};//第二个字节为鉴权方法码
      authMethod = AuthMethod.read(new DataInputStream( new ByteArrayInputStream(method)));
      dataLengthBuffer.flip();//          
      if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {//验证魔数hrpc和版本号,正确的话才会继续处理,否则关闭连接
        //Warning is ok since this is not supposed to happen.
        LOG.warn("Incorrect header or version mismatch from " + 
                 hostAddress + ":" + remotePort +
                 " got version " + version + 
                 " expected version " + CURRENT_VERSION);
        return -1;//返回-1时,关闭连接
      }
      dataLengthBuffer.clear();
      if (authMethod == null) {
        throw new IOException("Unable to read authentication method");
      }
      if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
        AccessControlException ae = new AccessControlException("Authorization ("
          + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION
          + ") is enabled but authentication ("
          + CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
          + ") is configured as simple. Please configure another method "
          + "like kerberos or digest.");
        //鉴权失败,发送响应,对应的Call id为-1,状态为致命错误,发送异常类和异常信息。客户端读取到该响应时关闭连接
        setupResponse(authFailedResponse, authFailedCall, Status.FATAL,null, ae.getClass().getName(), ae.getMessage());
        responder.doRespond(authFailedCall);//添加到响应队列中,并相应处理
        throw ae;
      }
      if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
        doSaslReply(SaslStatus.SUCCESS, new IntWritable(
            SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
        authMethod = AuthMethod.SIMPLE;
        // client has already sent the initial Sasl message and we
        // should ignore it. Both client and server should fall back
        // to simple auth from now on.
        skipInitialSaslHandshake = true;
      }
      if (authMethod != AuthMethod.SIMPLE) {
        useSasl = true;
      }
      rpcHeaderBuffer = null;
      rpcHeaderRead = true;//正常情况读完rpcHeaderRead并验证
      continue;
    }
    
    if (data == null) {
      dataLengthBuffer.flip();
      dataLength = dataLengthBuffer.getInt();//处理完了rpcHeaderRead,则dataLengthBuffer中保存的为接下来的数据长度或ping数据(-1)
   
      if (dataLength == Client.PING_CALL_ID) {//客户端发送的ping数据-1
        if(!useWrap) { //covers the !useSasl too
          dataLengthBuffer.clear();
          return 0;  //ping message,返回0时,会更新连接时间lastContact
        }
      }
      //这之后的便是数据长度了
      if (dataLength < 0) {
        LOG.warn("Unexpected data length " + dataLength + "!! from " + 
            getHostAddress());
      }
      data = ByteBuffer.allocate(dataLength);//根据数据长度分配data缓冲区
    }
    
    count = channelRead(channel, data);//从通道尽可能读取足够填充缓冲区剩余空间的数据
    
    if (data.remaining() == 0) {//缓冲区没有剩余空间了,即发送过来的一个帧的数据读取玩,开始处理
      dataLengthBuffer.clear();
      data.flip();
      if (skipInitialSaslHandshake) {
        data = null;
        skipInitialSaslHandshake = false;
        continue;
      }
      boolean isHeaderRead = headerRead;
      if (useSasl) {//简单鉴权,这里不分析
        saslReadAndProcess(data.array());
      } else {//处理一个帧数据
        processOneRpc(data.array());
      }
      data = null;
      if (!isHeaderRead) {
        continue;
      }
    } 
    //这里隐含缓冲区还有剩余空间,即一个帧数据未读取玩,直接返回读取到的字节数,更新lastContact,等待下一次读取
    return count;
  }
}
由上,如果rpcHeaderRead为false时,需要处理rpcHeader,这时根据发送端发送过来的rpcHeader格式:4字节魔数hrpc+1字节版本号+1字节鉴权方法码
这是dataLengthBuffer里面保存的便是魔数hrpc,然后分配两个字节大小的rpcHeaderBuffer保存剩下的版本号和鉴权方法码,进行相应的验证。  
而如果rpcHeader已经处理了,则一般情况下dataLengthBuffer里面保存的为一个帧数据的长度,根据这个长度分配data缓冲区的大小,然后进行相应长度数据的读取。不过在建立连接后,客户端read服务器响应超时时,客户端会发送ping数据,以保持连接,该ping数据作为一帧数据来看的话为int值-1,所以如果dataLengthBuffer值为-1时,则是客户端的ping数据,这时简单的返回0,然后更新连接时间即可。
所有上面提到的缓冲区都要注意的是,channelRead会尽量从通道读取数据填满缓冲区,但也不能保证。当缓冲区还有剩余空间时,我们还需等待下一数据读取操作的到来,这时简单的返回读取到的数据大小,更新连接时间即可。相应缓冲区填满后,才能进行相应的处理。  
channelRead方法会尽量从通道中读取足够多的数据以填满缓冲区,但不保证。当缓冲区剩余空间大于NIO_BUFFER_LIMIT(8KB)时,会循环从通道读取数据到缓冲区,每次读取NIO_BUFFER_LIMIT大小,按照注释是说能够避免JDK在缓冲区增长时分配太多的直接内存缓冲,但是这里应该不会增长,只是读取最大为剩余空间的数据到缓冲区。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30private int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ? channel.read(buffer) : channelIO(channel, null, buffer);
    if (count > 0) {
      rpcMetrics.incrReceivedBytes(count);
    }
    return count;
}
//channelRead和channelWrite同时使用,读时writeCh为null,写时readCh为null。
private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh, ByteBuffer buf) throws IOException {
    int originalLimit = buf.limit();
    int initialRemaining = buf.remaining();
    int ret = 0;
    
    while (buf.remaining() > 0) {
      try {
        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);//一次读取到缓冲区的数据大小,最多8KB
        buf.limit(buf.position() + ioSize);//设置新的limit值为position+要读取的大小
        
        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); 
        
        if (ret < ioSize) {
          break;
        }
      } finally {
        buf.limit(originalLimit);//设为原来的limit值        
      }
    }
    int nBytes = initialRemaining - buf.remaining(); 
    return (nBytes > 0) ? nBytes : ret;
}
处理完rpcHeader和ping包后,由processOneRpc处理到来的数据帧,一帧数据已经保存在data缓冲区中了。1
2
3
4
5
6
7
8
9
10
11
12
13private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
  if (headerRead) {
    processData(buf);
  } else {
    processHeader(buf);
    headerRead = true;
    if (!authorizeConnection()) {
      throw new AccessControlException("Connection from " + this
          + " for protocol " + header.getProtocol()
          + " is unauthorized for user " + user);
    }
  }
}
可见,如果headerRead为false,即还未读取连接头,则通过processHeader处理连接头,并通过authorizeConnection根据鉴权方法以及相关信息进行鉴权。否则若连接头已经处理,则通过processData处理实际调用请求数据。这里在连接过程,先分析连接头的处理,实际的调用请求数据处理见后。  
processHeader
| 1 | private void processHeader(byte[] buf) throws IOException { | 
连接的鉴权方法authMethod在处理rpcHeader时已经初始化了,这里反序列化连接头后,从连接头中获知客户端请求连接的协议,然后初始化该连接的protocol,以及根据连接头的用户信息,初始化该连接的user成员。  
初始化相关成员后,使用authorizedConnection对客户端进行鉴权1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24private boolean authorizeConnection() throws IOException {
  try {
    // If auth method is DIGEST, the token was obtained by the
    // real user for the effective user, therefore not required to
    // authorize real user. doAs is allowed only for simple or kerberos
    // authentication
    if (user != null && user.getRealUser() != null
        && (authMethod != AuthMethod.DIGEST)) {
      ProxyUsers.authorize(user, this.getHostAddress(), conf);
    }
    authorize(user, header, getHostInetAddress());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Successfully authorized " + header);
    }
    rpcMetrics.incrAuthorizationSuccesses();
  } catch (AuthorizationException ae) {
    rpcMetrics.incrAuthorizationFailures();
    //鉴权异常失败,发送鉴权异常响应,对应Call为-1,状态FATAL,发送异常类和异常信息
    setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null, ae.getClass().getName(), ae.getMessage());
    responder.doRespond(authFailedCall);
    return false;
  }
  return true;
}
如果鉴权失败,则发送鉴权异常响应,对应状态为FATAL,客户端会关闭连接。而在服务器端,在processOneRpc中抛出AccessControlException这个IOException最终传递到doRead中的readAndProcess中,由catch捕获,将count即读取到的数据大小置为-1,后续处理中关闭服务器端连接。doRead中IOException异常处理见上。  
因此,我们这里分析了服务器端连接建立的过程,由监听器listener监听到来的连接,并注册到Reader线程的选择器中,由Reader线程处理到来的数据请求。
客户端数据到来时,与客户端发送数据流程对应的,服务器端先处理rpcHeader,判断魔数和版本号是否相符,不相符的话给客户端发送鉴权失败异常响应,该响应对应的Call对象固定,其id为-1,状态为FATAL,客户端将会关闭连接,而服务器置读取到的数据count为-1,也关闭连接。正常的话初始化连接的authMethod成员。这里注意魔数hrpc长度为4个字节,刚好保存在后续用来保存一个帧数据长度的缓冲区dataLengthBuffer中。  
处理完rpcHeader之后,便是对一帧数据的处理。一帧数据可能为ping数据,为4个字节值为-1,这时简单的置读取到数据count为0,更新连接时间lastContact。
否则dataLengthBuffer便是一帧数据的长度,根据该长度分配data缓冲区。
此时一帧数据可能为连接头,可能为实际的调用请求数据。没有读取连接头的话,则接下来的一帧数据为连接头。
对连接头进行处理,读取发送过来的连接头初始化该连接的protocol和user,然后根据不同的鉴权方式进行鉴权,鉴权失败,发送Call id为-1的响应,相应状态为FATAL,客户端关闭连接,而这里服务器抛出IOException,在doRead的异常处理中置count为-1,关闭服务器端连接。  
处理完这些信息后,连接建立了,鉴权完成了,可以在该连接上进行正常的调用请求了。具体过程见下篇。