【Tomcat】四.Tomcat处理请求(上)

作者: Weidan 分类: Tomcat 发布时间: 2020-04-19

一.处理流程

1.1 系统IO多路复用

OK,要知道 Tomcat 请求的流程,首先我们得先知道 I/O复用模型,同等级的模型还有 阻塞式I/O非阻塞式I/OI/O复用(select/poll/epoll)信号驱动式I/O异步I/O,目前的 Tomcat8.5 版本,使用的就是 I/O复用模型,也叫 多路复用 模型。

I/O复用模型

从左往右走,首先用户程序会被 select函数 阻塞,当系统有数据可读的时候,就会直接返回表示 我有数据啦,然后程序再手动调用 recvfrom 来获取缓冲区的数据,内核 则会将数据拷贝到 缓冲区 以供 用户态 的程序处理,拷贝整个过程需要等待完成。每个链接进来都会现在内核转换成一个 channel通道,这里的 多路 指的就是 多个channel,而 复用 则指的是系统只需要几个线程来处理这些连接就可以了。

相对于一个连接就是创建一个线程来处理,这个模型的好处就是只需要很少量的线程即可完成 channel 的处理。

1.2 Tomcat再多路复用

这就有点意思了,接收请求是异步的,处理请求也是异步的。

前面我们说了,NioEndpointacceptors pollers 以及 workers 这些线程:

  1. acceptor 负责监听 select函数,将 socket 均衡注册到某个 Poller 线程中;

  2. Poller 负责捞出 socket,并且交给线程池进行 ProtocolHandler进行处理

  3. NioSocketWrapper 负责写出去响应消息。

1.3 NioEndpoint#Acceptor接收请求

OK先来看看 Acceptor 怎么接收请求:

public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

  protected class Acceptor extends AbstractEndpoint.Acceptor {

    @Override
    public void run() {
      int errorDelay = 0;
      // 无限循环!接收一条请求通道后进行注册
      while (running) {
        // 判断NioEndpoint的状态 也就是暂停状态了
        while (paused && running) {
          state = AcceptorState.PAUSED;
          try {
            Thread.sleep(50);
          } catch (InterruptedException e) {
          }
        }

        if (!running) {
          break;
        }
        state = AcceptorState.RUNNING;

        try {
          // 先自增1个通道,限流,超出限量10000个链接的时候,被阻塞在这里
          countUpOrAwaitConnection();

          SocketChannel socket = null;
          try {
            // 常年阻塞在这里
            socket = serverSock.accept();
          } catch (IOException ioe) {
            // 发生错误的时候,要把刚刚新增的限流数减下来
            countDownConnection();
            if (running) {
              errorDelay = handleExceptionWithDelay(errorDelay);
              throw ioe;
            } else {
              break;
            }
          }
          // 错误置零
          errorDelay = 0;

          if (running && !paused) {
            // 这句话是重点,将Socket注册到事件通道中,然后由Poller取出来处理
            // 来到这里的时候,整个接受请求流程也就结束了
            if (!setSocketOptions(socket)) {
              closeSocket(socket);
            }
          } else {
            closeSocket(socket);
          }
        } catch (Throwable t) {
          ExceptionUtils.handleThrowable(t);
          log.error(sm.getString("endpoint.accept.fail"), t);
        }
      }
      state = AcceptorState.ENDED;
    }
  }

  protected boolean setSocketOptions(SocketChannel socket) {
    try {
      // 关闭同步呀,设置一些属性
      socket.configureBlocking(false);
      Socket sock = socket.socket();
      socketProperties.setProperties(sock);

      NioChannel channel = nioChannels.pop();
      if (channel == null) {
        // 将请求置入缓存
        SocketBufferHandler bufhandler = new SocketBufferHandler(
          socketProperties.getAppReadBufSize(),
          socketProperties.getAppWriteBufSize(),
          socketProperties.getDirectBuffer());
        if (isSSLEnabled()) {
          channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
        } else {
          channel = new NioChannel(socket, bufhandler);
        }
      } else {
        channel.setIOChannel(socket);
        channel.reset();
      }
      // 均衡的取出Poller线程(之前说过是跟CPU一样的长度,最小是2),然后把这个通道注入进去!
      // 由对应的 Poller 线程处理请求。
      getPoller0().register(channel);
    } catch (Throwable t) {
      ExceptionUtils.handleThrowable(t);
      try {
        log.error("",t);
      } catch (Throwable tt) {
        ExceptionUtils.handleThrowable(tt);
      }
      // Tell to close the socket
      return false;
    }
    return true;
  }

}

1.4 NioEndpoint#Poller接收请求

public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

  public class Poller implements Runnable {
    @Override
    public void run() {
      // Poller循环获取事件
      while (true) {

        boolean hasEvents = false;

        try {
          if (!close) {
            hasEvents = events();
            if (wakeupCounter.getAndSet(-1) > 0) {
              // 如果队列中存在任务,则立马返回已经准备完成的channel
              keyCount = selector.selectNow();
            } else {
              // 有个超时时间来返回准备完成的channel
              keyCount = selector.select(selectorTimeout);
            }
            wakeupCounter.set(0);
          }
          if (close) {
            events();
            timeout(0, false);
            try {
              selector.close();
            } catch (IOException ioe) {
              log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
            }
            break;
          }
        } catch (Throwable x) {
          ExceptionUtils.handleThrowable(x);
          log.error("",x);
          continue;
        }
        //either we timed out or we woke up, process events first
        if ( keyCount == 0 ) hasEvents = (hasEvents | events());

        // 拿到已经准备就绪的channel的Key值
        Iterator<SelectionKey> iterator =
          keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // 遍历所有已经准备就绪的Channel,对请求数据进行处理
        while (iterator != null && iterator.hasNext()) {
          SelectionKey sk = iterator.next();
          NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
          // Attachment may be null if another thread has called
          // cancelledKey()
          if (attachment == null) {
            iterator.remove();
          } else {
            iterator.remove();
            // 在这里进入,交给ProtocolHandler来处理请求
            processKey(sk, attachment);
          }
        }//while

        // 清理超时的key
        timeout(keyCount,hasEvents);
      }//while

      getStopLatch().countDown();
    }

    protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
      try {
        if ( close ) {
          cancelledKey(sk);
        } else if ( sk.isValid() && attachment != null ) {
          // 可读或者可写,即进入处理
          if (sk.isReadable() || sk.isWritable() ) {
            if ( attachment.getSendfileData() != null ) {
              // 处理上传文件的请求
              processSendfile(sk,attachment, false);
            } else {
              unreg(sk, attachment, sk.readyOps());
              boolean closeSocket = false;
              // 先处理可读
              if (sk.isReadable()) {
                // 调用外部类的方法来读取请求
                if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                  closeSocket = true;
                }
              }
              // 再处理可写
              if (!closeSocket && sk.isWritable()) {
                if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                  closeSocket = true;
                }
              }
              if (closeSocket) {
                cancelledKey(sk);
              }
            }
          }
        } else {
          //invalid key
          cancelledKey(sk);
        }
      } catch ( CancelledKeyException ckx ) {
        cancelledKey(sk);
      } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("",t);
      }
    }

  }

  // 这个方法放置于AbstractEndpoint中,为了方便阅读就丢这个类里面了
  public boolean processSocket(SocketWrapperBase<S> socketWrapper,
                               SocketEvent event, boolean dispatch) {
    try {
      if (socketWrapper == null) {
        return false;
      }
      // 从缓存队列中取出NioEndpoint$NioSocketWrapper缓存,复用这些Socket处理器
      SocketProcessorBase<S> sc = processorCache.pop();
      if (sc == null) {
        sc = createSocketProcessor(socketWrapper, event);
      } else {
        sc.reset(socketWrapper, event);
      }
      Executor executor = getExecutor();
      // 这时候请求就交给线程池异步去解析+调用我们的servlet,并且将数据写出去
      if (dispatch && executor != null) {
        executor.execute(sc);
      } else {
        sc.run();
      }
    } catch (RejectedExecutionException ree) {
      getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
      return false;
    } catch (Throwable t) {
      ExceptionUtils.handleThrowable(t);
      // This means we got an OOM or similar creating a thread, or that
      // the pool and its queue are full
      getLog().error(sm.getString("endpoint.process.fail"), t);
      return false;
    }
    return true;
  }

}

然后我们来完善一下那张图:

二. 处理Request

上面接收的请求的看的我难受,接下来就是处理请求了。涉及到怎么解析 Socket 数据,以及写出去,先看看调用之前 Tomcat 做了什么,才能让我们的 servlet 顺利完成请求。

由于我们上面已经将请求转移给 executor 执行了,而这个线程类则是 NioEndpoint$SocketProcessor,他的 run 方法在 SocketProcessorBase 里面,其实这个是一个模板模式,因为 run 又调用了子类需要实现的 doRun 方法。

public abstract class SocketProcessorBase<S> implements Runnable {
    // 一个Socket包装类.
  protected SocketWrapperBase<S> socketWrapper;

  @Override
    public final void run() {
        // 读写可能都进来这里所以需要包装只有一个线程可以进来
        // 什么时候发生的后面看看能不能看到
        synchronized (socketWrapper) {
            // 需要关心Socket是否已经被关闭了,被关闭了就不做任何事情
            if (socketWrapper.isClosed()) {
                return;
            }
            doRun();
        }
    }

}

继续跟进子类的 doRun() 方法:

public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

  protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

    public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
      super(socketWrapper, event);
    }

    @Override
    protected void doRun() {
      NioChannel socket = socketWrapper.getSocket();
      SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

      try {
        // HTTPS握手需要用到的变量
        int handshake = -1;

        try {
          if (key != null) {
            if (socket.isHandshakeComplete()) {
              // 没有握手
              handshake = 0;
            } else ...
              // 删除部分代码
          }
        } catch (IOException x) {

        } catch (CancelledKeyException ckx) {
        }
        if (handshake == 0) {
          SocketState state = SocketState.OPEN;
          // 开始处理request的请求byte数组
          if (event == null) {
            state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
          } else {
            // 目前状态是读取数据:SocketEvent.OPEN_READ,先获取ConnectionHandler然后处理,返回SocketState.OPEN状态
            // 返回了ConnectionHandler,接下来进去处理方法
            state = getHandler().process(socketWrapper, event);
          }
          if (state == SocketState.CLOSED) {
            close(socket, key);
          }
        } else if (handshake == -1 ) {
                    // ....
        } else if (handshake == SelectionKey.OP_READ){
                    // ....
        } else if (handshake == SelectionKey.OP_WRITE){
                    // ....
        }
      } catch (CancelledKeyException cx) {
        socket.getPoller().cancelledKey(key);
      } catch (VirtualMachineError vme) {
        ExceptionUtils.handleThrowable(vme);
      } catch (Throwable t) {
        log.error("", t);
        socket.getPoller().cancelledKey(key);
      } finally {
        socketWrapper = null;
        event = null;
        // 缓存Processor,以便下一个请求可以直接使用
        if (running && !paused) {
          processorCache.push(this);
        }
      }
    }
  }

}

ConnectionHandlerprocess 方法会根据当前 Socket 的状态进行不同的处理:

public abstract class AbstractProtocol<S> implements ProtocolHandler, MBeanRegistration {

  protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S> {

    @Override
    public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
      if (getLog().isDebugEnabled()) {
        getLog().debug(sm.getString("abstractConnectionHandler.process",
                                    wrapper.getSocket(), status));
      }
      if (wrapper == null) {
        return SocketState.CLOSED;
      }

      S socket = wrapper.getSocket();

      Processor processor = connections.get(socket);
      if (getLog().isDebugEnabled()) {
        getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
                                    processor, socket));
      }
      // 确认TIMEOUT时间
      if (SocketEvent.TIMEOUT == status &&
          (processor == null ||
           !processor.isAsync() && !processor.isUpgrade() ||
           processor.isAsync() && !processor.checkAsyncTimeoutGeneration())) {
        // This is effectively a NO-OP
        return SocketState.OPEN;
      }

      if (processor != null) {
        getProtocol().removeWaitingProcessor(processor);
      } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
        // Nothing to do. Endpoint requested a close and there is no
        // longer a processor associated with this socket.
        return SocketState.CLOSED;
      }

      ContainerThreadMarker.set();

      try {
        if (processor == null) {
          String negotiatedProtocol = wrapper.getNegotiatedProtocol();
          // 处理SSL
          if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {
            //...
          }
        }
        if (processor == null) {
          // 从processors队列尝试取出一个processor来进行处理
          processor = recycledProcessors.pop();
          if (getLog().isDebugEnabled()) {
            getLog().debug(sm.getString("abstractConnectionHandler.processorPop", processor));
          }
        }
        if (processor == null) {
          // 创建并丢进去缓存中以便使用
          processor = getProtocol().createProcessor();
          register(processor);
          if (getLog().isDebugEnabled()) {
            getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
          }
        }

        // SSL,先不看
        processor.setSslSupport(
          wrapper.getSslSupport(getProtocol().getClientCertProvider()));

        // 将连接的Socket和Processor关联起来
        connections.put(socket, processor);

        SocketState state = SocketState.CLOSED;
        do {
          // 关键代码!!!!!!!!!!!!!!:交给Processor进行处理
          state = processor.process(wrapper, status);

          // 处理连接升级的业务
          if (state == SocketState.UPGRADING) {
            // ....
          }
        } while ( state == SocketState.UPGRADING);

        // 处理连接状态
        if (state == SocketState.LONG) {
          longPoll(wrapper, processor);
          if (processor.isAsync()) {
            getProtocol().addWaitingProcessor(processor);
          }
        } else if (state == SocketState.OPEN) {
          connections.remove(socket);
          release(processor);
          wrapper.registerReadInterest();
        } else if (state == SocketState.SENDFILE) {
          // ...
        } else if (state == SocketState.UPGRADED) {
          if (status != SocketEvent.OPEN_WRITE) {
            longPoll(wrapper, processor);
            getProtocol().addWaitingProcessor(processor);
          }
        } else if (state == SocketState.SUSPENDED) {
        } else {
          // 开始回收Processor数据,以便后续使用
          connections.remove(socket);
          if (processor.isUpgrade()) {
            // ....
          }
          release(processor);
        }
        return state;
      } catch(java.net.SocketException e) {
        // 删除大部分异常的处理流程代码
      } finally {
        ContainerThreadMarker.clear();
      }

      // 释放 socket/processor 的连接
      connections.remove(socket);
      release(processor);
      return SocketState.CLOSED;
    }

  }      
}

交给 Processorprocess 方法:

public abstract class AbstractProcessorLight implements Processor {
  // ConnectionHandler
  @Override
  public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
    throws IOException {
    // this: Http11Processor
    SocketState state = SocketState.CLOSED;
    Iterator<DispatchType> dispatches = null;
    do {
      if (dispatches != null) {
        DispatchType nextDispatch = dispatches.next();
        if (getLog().isDebugEnabled()) {
          getLog().debug("Processing dispatch type: [" + nextDispatch + "]");
        }
        state = dispatch(nextDispatch.getSocketStatus());
        if (!dispatches.hasNext()) {
          state = checkForPipelinedData(state, socketWrapper);
        }
      } else if (status == SocketEvent.DISCONNECT) {
        // Do nothing here, just wait for it to get recycled
      } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
        state = dispatch(status);
        state = checkForPipelinedData(state, socketWrapper);
      } else if (status == SocketEvent.OPEN_WRITE) {
        // Extra write event likely after async, ignore
        state = SocketState.LONG;
      } else if (status == SocketEvent.OPEN_READ) {
        // 目前是OPEN_READ状态,处理请求的关键部位,这个方法由子类去实现
        state = service(socketWrapper);
      } else if (status == SocketEvent.CONNECT_FAIL) {
        logAccess(socketWrapper);
      } else {
        // Default to closing the socket if the SocketEvent passed in
        // is not consistent with the current state of the Processor
        state = SocketState.CLOSED;
      }

      if (getLog().isDebugEnabled()) {
        // 删除日志记录代码
      }

      if (state != SocketState.CLOSED && isAsync()) {
        state = asyncPostProcess();
        if (getLog().isDebugEnabled()) {
          getLog().debug("Socket: [" + socketWrapper +
                         "], State after async post processing: [" + state + "]");
        }
      }

      if (dispatches == null || !dispatches.hasNext()) {
        // Only returns non-null iterator if there are
        // dispatches to process.
        dispatches = getIteratorAndClearDispatches();
      }
    } while (state == SocketState.ASYNC_END ||
             dispatches != null && state != SocketState.CLOSED);

    return state;
  }
}

public class Http11Processor extends AbstractProcessor {

  @Override
  public SocketState service(SocketWrapperBase<?> socketWrapper)
    throws IOException {
    // 开始处理请求
    RequestInfo rp = request.getRequestProcessor();
    rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

    // Setting up the I/O
    setSocketWrapper(socketWrapper);

    // Flags
    keepAlive = true;
    openSocket = false;
    readComplete = true;
    boolean keptAlive = false;
    SendfileState sendfileState = SendfileState.DONE;

    while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
           sendfileState == SendfileState.DONE && !endpoint.isPaused()) {

      // 解析HttpHeader
      try {
        if (!inputBuffer.parseRequestLine(keptAlive)) {
          if (inputBuffer.getParsingRequestLinePhase() == -1) {
            return SocketState.UPGRADING;
          } else if (handleIncompleteRequestLineRead()) {
            break;
          }
        }

        if (endpoint.isPaused()) {
          // 如果endpoint暂停了,返回请求错误503
          response.setStatus(503);
          setErrorState(ErrorState.CLOSE_CLEAN, null);
        } else {
          keptAlive = true;
          request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
          // 解析HTTP-Header获取我们访问的访问协议内容,包括Content-Type, Method等等
          if (!inputBuffer.parseHeaders()) {
            openSocket = true;
            readComplete = false;
            break;
          }
          if (!disableUploadTimeout) {
            socketWrapper.setReadTimeout(connectionUploadTimeout);
          }
        }
      } catch (IOException e) {
        if (log.isDebugEnabled()) {
          log.debug(sm.getString("http11processor.header.parse"), e);
        }
        setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
        break;
      } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        UserDataHelper.Mode logMode = userDataHelper.getNextMode();
        if (logMode != null) {
          String message = sm.getString("http11processor.header.parse");
          switch (logMode) {
            case INFO_THEN_DEBUG:
              message += sm.getString("http11processor.fallToDebug");
              //$FALL-THROUGH$
            case INFO:
              log.info(message, t);
              break;
            case DEBUG:
              log.debug(message, t);
          }
        }
        // 400 - Bad Request
        response.setStatus(400);
        setErrorState(ErrorState.CLOSE_CLEAN, t);
      }

      // Has an upgrade been requested?
      if (isConnectionToken(request.getMimeHeaders(), "upgrade")) {
        // 协议升级的一些处理
        String requestedProtocol = request.getHeader("Upgrade");

        UpgradeProtocol upgradeProtocol = protocol.getUpgradeProtocol(requestedProtocol);
        if (upgradeProtocol != null) {
          // 省略升级请求的代码
        }
      }

      if (getErrorState().isIoAllowed()) {
        // Setting up filters, and parse some request headers
        rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
        try {
          // 在这里将字符串的byte数组进行翻译,依据反编译byte到字符串然后比对的形式进行设置
          prepareRequest();
        } catch (Throwable t) {
          ExceptionUtils.handleThrowable(t);
          if (log.isDebugEnabled()) {
            log.debug(sm.getString("http11processor.request.prepare"), t);
          }
          // 500 - Internal Server Error
          response.setStatus(500);
          setErrorState(ErrorState.CLOSE_CLEAN, t);
        }
      }

      if (maxKeepAliveRequests == 1) {
        keepAlive = false;
      } else if (maxKeepAliveRequests > 0 &&
                 socketWrapper.decrementKeepAlive() <= 0) {
        keepAlive = false;
      }

      // Process the request in the adapter
      if (getErrorState().isIoAllowed()) {
        try {
          rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
          // 关键代码!!!!!!!!!:进入下一个环节,找到对应的处理器来处理请求和响应结果
          // response在构造当前Http11Processor的时候就被创建了
          getAdapter().service(request, response);
          if(keepAlive && !getErrorState().isError() && !isAsync() &&
             statusDropsConnection(response.getStatus())) {
            setErrorState(ErrorState.CLOSE_CLEAN, null);
          }
        } catch (InterruptedIOException e) {
          setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
        } catch (HeadersTooLargeException e) {
          // 抛出异常都设置status=500;
        } catch (Throwable t) {
          ExceptionUtils.handleThrowable(t);
          log.error(sm.getString("http11processor.request.process"), t);
          // 500 - Internal Server Error
          response.setStatus(500);
          setErrorState(ErrorState.CLOSE_CLEAN, t);
          getAdapter().log(request, response, 0);
        }
      }

      // 回收Request、Response资源,以便后续可以继续使用当前Processor对象.
      rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
      if (!isAsync()) {
        endRequest();
      }
      rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

      // 如果有错误,返回500
      if (getErrorState().isError()) {
        response.setStatus(500);
      }

      if (!isAsync() || getErrorState().isError()) {
        request.updateCounters();
        if (getErrorState().isIoAllowed()) {
          inputBuffer.nextRequest();
          outputBuffer.nextRequest();
        }
      }

      if (!disableUploadTimeout) {
        int soTimeout = endpoint.getConnectionTimeout();
        if(soTimeout > 0) {
          socketWrapper.setReadTimeout(soTimeout);
        } else {
          socketWrapper.setReadTimeout(0);
        }
      }

      rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

      sendfileState = processSendfile(socketWrapper);
    }

    rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

    if (getErrorState().isError() || endpoint.isPaused()) {
      return SocketState.CLOSED;
    } else if (isAsync()) {
      return SocketState.LONG;
    } else if (isUpgrade()) {
      return SocketState.UPGRADING;
    } else {
      if (sendfileState == SendfileState.PENDING) {
        return SocketState.SENDFILE;
      } else {
        if (openSocket) {
          if (readComplete) {
            // 返回OPEN状态
            return SocketState.OPEN;
          } else {
            return SocketState.LONG;
          }
        } else {
          return SocketState.CLOSED;
        }
      }
    }
  }

}

未完待续