一.处理流程 1.1 系统IO多路复用 OK,要知道 Tomcat
请求的流程,首先我们得先知道 I/O复用模型
,同等级的模型还有 阻塞式I/O
、非阻塞式I/O
、I/O复用(select/poll/epoll)
、信号驱动式I/O
和 异步I/O
,目前的 Tomcat
是 8.5
版本,使用的就是 I/O复用模型
,也叫 多路复用
模型。 I/O复用模型
:
从左往右走,首先用户程序会被 select函数
阻塞,当系统有数据可读的时候,就会直接返回表示 我有数据啦
,然后程序再手动调用 recvfrom
来获取缓冲区的数据,内核
则会将数据拷贝到 缓冲区
以供 用户态
的程序处理,拷贝整个过程需要等待完成。每个链接进来都会现在内核转换成一个 channel通道
,这里的 多路
指的就是 多个channel
,而 复用
则指的是系统只需要几个线程来处理这些连接就可以了。 相对于一个连接就是创建一个线程来处理,这个模型的好处就是只需要很少量的线程即可完成 channel
的处理。
1.2 Tomcat再多路复用 这就有点意思了,接收请求是异步的,处理请求也是异步的。 前面我们说了,NioEndpoint
有 acceptors
pollers
以及 workers
这些线程:
acceptor
负责监听 select函数
,将 socket
均衡注册到某个 Poller
线程中;
Poller
负责捞出 socket
,并且交给线程池进行 ProtocolHandler进行处理
;
NioSocketWrapper
负责写出去响应消息。
1.3 NioEndpoint#Acceptor接收请求 OK先来看看 Acceptor
怎么接收请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 public class NioEndpoint extends AbstractJsseEndpoint <NioChannel> { protected class Acceptor extends AbstractEndpoint .Acceptor { @Override public void run () { int errorDelay = 0 ; while (running) { while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50 ); } catch (InterruptedException e) { } } if (!running) { break ; } state = AcceptorState.RUNNING; try { countUpOrAwaitConnection(); SocketChannel socket = null ; try { socket = serverSock.accept(); } catch (IOException ioe) { countDownConnection(); if (running) { errorDelay = handleExceptionWithDelay(errorDelay); throw ioe; } else { break ; } } errorDelay = 0 ; if (running && !paused) { if (!setSocketOptions(socket)) { closeSocket(socket); } } else { closeSocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail" ), t); } } state = AcceptorState.ENDED; } } protected boolean setSocketOptions (SocketChannel socket) { try { socket.configureBlocking(false ); Socket sock = socket.socket(); socketProperties.setProperties(sock); NioChannel channel = nioChannels.pop(); if (channel == null ) { SocketBufferHandler bufhandler = new SocketBufferHandler ( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel (socket, bufhandler, selectorPool, this ); } else { channel = new NioChannel (socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); } getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("" ,t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } return false ; } return true ; } }
1.4 NioEndpoint#Poller接收请求 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 public class NioEndpoint extends AbstractJsseEndpoint <NioChannel> { public class Poller implements Runnable { @Override public void run () { while (true ) { boolean hasEvents = false ; try { if (!close) { hasEvents = events(); if (wakeupCounter.getAndSet(-1 ) > 0 ) { keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0 ); } if (close) { events(); timeout(0 , false ); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail" ), ioe); } break ; } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("" ,x); continue ; } if ( keyCount == 0 ) hasEvents = (hasEvents events () ); Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null ; while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); if (attachment == null ) { iterator.remove(); } else { iterator.remove(); processKey(sk, attachment); } } timeout(keyCount,hasEvents); } getStopLatch().countDown(); } protected void processKey (SelectionKey sk, NioSocketWrapper attachment) { try { if ( close ) { cancelledKey(sk); } else if ( sk.isValid() && attachment != null ) { if (sk.isReadable() sk.isWritable() ) { if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false ); } else { unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false ; if (sk.isReadable()) { if (!processSocket(attachment, SocketEvent.OPEN_READ, true )) { closeSocket = true ; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true )) { closeSocket = true ; } } if (closeSocket) { cancelledKey(sk); } } } } else { cancelledKey(sk); } } catch ( CancelledKeyException ckx ) { cancelledKey(sk); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error("" ,t); } } } public boolean processSocket (SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null ) { return false ; } SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null ) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } Executor executor = getExecutor(); if (dispatch && executor != null ) { executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail" , socketWrapper) , ree); return false ; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); getLog().error(sm.getString("endpoint.process.fail" ), t); return false ; } return true ; } }
然后我们来完善一下那张图:
二. 处理Request 上面接收的请求的看的我难受,接下来就是处理请求了。涉及到怎么解析 Socket
数据,以及写出去,先看看调用之前 Tomcat
做了什么,才能让我们的 servlet
顺利完成请求。 由于我们上面已经将请求转移给 executor
执行了,而这个线程类则是 NioEndpoint$SocketProcessor
,他的 run
方法在 SocketProcessorBase
里面,其实这个是一个模板模式,因为 run
又调用了子类需要实现的 doRun
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public abstract class SocketProcessorBase <S> implements Runnable { protected SocketWrapperBase<S> socketWrapper; @Override public final void run () { synchronized (socketWrapper) { if (socketWrapper.isClosed()) { return ; } doRun(); } } }
继续跟进子类的 doRun()
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public class NioEndpoint extends AbstractJsseEndpoint <NioChannel> { protected class SocketProcessor extends SocketProcessorBase <NioChannel> { public SocketProcessor (SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) { super (socketWrapper, event); } @Override protected void doRun () { NioChannel socket = socketWrapper.getSocket(); SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { int handshake = -1 ; try { if (key != null ) { if (socket.isHandshakeComplete()) { handshake = 0 ; } else ... } } catch (IOException x) { } catch (CancelledKeyException ckx) { } if (handshake == 0 ) { SocketState state = SocketState.OPEN; if (event == null ) { state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ); } else { state = getHandler().process(socketWrapper, event); } if (state == SocketState.CLOSED) { close(socket, key); } } else if (handshake == -1 ) { } else if (handshake == SelectionKey.OP_READ){ } else if (handshake == SelectionKey.OP_WRITE){ } } catch (CancelledKeyException cx) { socket.getPoller().cancelledKey(key); } catch (VirtualMachineError vme) { ExceptionUtils.handleThrowable(vme); } catch (Throwable t) { log.error("" , t); socket.getPoller().cancelledKey(key); } finally { socketWrapper = null ; event = null ; if (running && !paused) { processorCache.push(this ); } } } } }
ConnectionHandler
的 process
方法会根据当前 Socket
的状态进行不同的处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 public abstract class AbstractProtocol <S> implements ProtocolHandler , MBeanRegistration { protected static class ConnectionHandler <S> implements AbstractEndpoint .Handler<S> { @Override public SocketState process (SocketWrapperBase<S> wrapper, SocketEvent status) { if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.process" , wrapper.getSocket(), status)); } if (wrapper == null ) { return SocketState.CLOSED; } S socket = wrapper.getSocket(); Processor processor = connections.get(socket); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet" , processor, socket)); } if (SocketEvent.TIMEOUT == status && (processor == null !processor.isAsync() && !processor.isUpgrade() processor.isAsync() && !processor.checkAsyncTimeoutGeneration())) { return SocketState.OPEN; } if (processor != null ) { getProtocol().removeWaitingProcessor(processor); } else if (status == SocketEvent.DISCONNECT status == SocketEvent.ERROR) { return SocketState.CLOSED; } ContainerThreadMarker.set(); try { if (processor == null ) { String negotiatedProtocol = wrapper.getNegotiatedProtocol(); if (negotiatedProtocol != null && negotiatedProtocol.length() > 0 ) { } } if (processor == null ) { processor = recycledProcessors.pop(); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.processorPop" , processor)); } } if (processor == null ) { processor = getProtocol().createProcessor(); register(processor); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.processorCreate" , processor)); } } processor.setSslSupport( wrapper.getSslSupport(getProtocol().getClientCertProvider())); connections.put(socket, processor); SocketState state = SocketState.CLOSED; do { state = processor.process(wrapper, status); if (state == SocketState.UPGRADING) { } } while ( state == SocketState.UPGRADING); if (state == SocketState.LONG) { longPoll(wrapper, processor); if (processor.isAsync()) { getProtocol().addWaitingProcessor(processor); } } else if (state == SocketState.OPEN) { connections.remove(socket); release(processor); wrapper.registerReadInterest(); } else if (state == SocketState.SENDFILE) { } else if (state == SocketState.UPGRADED) { if (status != SocketEvent.OPEN_WRITE) { longPoll(wrapper, processor); getProtocol().addWaitingProcessor(processor); } } else if (state == SocketState.SUSPENDED) { } else { connections.remove(socket); if (processor.isUpgrade()) { } release(processor); } return state; } catch (java.net.SocketException e) { } finally { ContainerThreadMarker.clear(); } connections.remove(socket); release(processor); return SocketState.CLOSED; } } }
交给 Processor
的 process
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public abstract class AbstractProcessorLight implements Processor { @Override public SocketState process (SocketWrapperBase<?> socketWrapper, SocketEvent status) throws IOException { SocketState state = SocketState.CLOSED; Iterator<DispatchType> dispatches = null ; do { if (dispatches != null ) { DispatchType nextDispatch = dispatches.next(); if (getLog().isDebugEnabled()) { getLog().debug("Processing dispatch type: [" + nextDispatch + "]" ); } state = dispatch(nextDispatch.getSocketStatus()); if (!dispatches.hasNext()) { state = checkForPipelinedData(state, socketWrapper); } } else if (status == SocketEvent.DISCONNECT) { } else if (isAsync() isUpgrade() state == SocketState.ASYNC_END) { state = dispatch(status); state = checkForPipelinedData(state, socketWrapper); } else if (status == SocketEvent.OPEN_WRITE) { state = SocketState.LONG; } else if (status == SocketEvent.OPEN_READ) { state = service(socketWrapper); } else if (status == SocketEvent.CONNECT_FAIL) { logAccess(socketWrapper); } else { state = SocketState.CLOSED; } if (getLog().isDebugEnabled()) { } if (state != SocketState.CLOSED && isAsync()) { state = asyncPostProcess(); if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + socketWrapper + "], State after async post processing: [" + state + "]" ); } } if (dispatches == null !dispatches.hasNext()) { dispatches = getIteratorAndClearDispatches(); } } while (state == SocketState.ASYNC_END dispatches != null && state != SocketState.CLOSED); return state; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 public class Http11Processor extends AbstractProcessor { @Override public SocketState service (SocketWrapperBase<?> socketWrapper) throws IOException { RequestInfo rp = request.getRequestProcessor(); rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); setSocketWrapper(socketWrapper); keepAlive = true ; openSocket = false ; readComplete = true ; boolean keptAlive = false ; SendfileState sendfileState = SendfileState.DONE; while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null && sendfileState == SendfileState.DONE && !endpoint.isPaused()) { try { if (!inputBuffer.parseRequestLine(keptAlive)) { if (inputBuffer.getParsingRequestLinePhase() == -1 ) { return SocketState.UPGRADING; } else if (handleIncompleteRequestLineRead()) { break ; } } if (endpoint.isPaused()) { response.setStatus(503 ); setErrorState(ErrorState.CLOSE_CLEAN, null ); } else { keptAlive = true ; request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount()); if (!inputBuffer.parseHeaders()) { openSocket = true ; readComplete = false ; break ; } if (!disableUploadTimeout) { socketWrapper.setReadTimeout(connectionUploadTimeout); } } } catch (IOException e) { if (log.isDebugEnabled()) { log.debug(sm.getString("http11processor.header.parse" ), e); } setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e); break ; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); UserDataHelper.Mode logMode = userDataHelper.getNextMode(); if (logMode != null ) { String message = sm.getString("http11processor.header.parse" ); switch (logMode) { case INFO_THEN_DEBUG: message += sm.getString("http11processor.fallToDebug" ); case INFO: log.info(message, t); break ; case DEBUG: log.debug(message, t); } } response.setStatus(400 ); setErrorState(ErrorState.CLOSE_CLEAN, t); } if (isConnectionToken(request.getMimeHeaders(), "upgrade" )) { String requestedProtocol = request.getHeader("Upgrade" ); UpgradeProtocol upgradeProtocol = protocol.getUpgradeProtocol(requestedProtocol); if (upgradeProtocol != null ) { } } if (getErrorState().isIoAllowed()) { rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE); try { prepareRequest(); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); if (log.isDebugEnabled()) { log.debug(sm.getString("http11processor.request.prepare" ), t); } response.setStatus(500 ); setErrorState(ErrorState.CLOSE_CLEAN, t); } } if (maxKeepAliveRequests == 1 ) { keepAlive = false ; } else if (maxKeepAliveRequests > 0 && socketWrapper.decrementKeepAlive() <= 0 ) { keepAlive = false ; } if (getErrorState().isIoAllowed()) { try { rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); getAdapter().service(request, response); if (keepAlive && !getErrorState().isError() && !isAsync() && statusDropsConnection(response.getStatus())) { setErrorState(ErrorState.CLOSE_CLEAN, null ); } } catch (InterruptedIOException e) { setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e); } catch (HeadersTooLargeException e) { } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("http11processor.request.process" ), t); response.setStatus(500 ); setErrorState(ErrorState.CLOSE_CLEAN, t); getAdapter().log(request, response, 0 ); } } rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT); if (!isAsync()) { endRequest(); } rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT); if (getErrorState().isError()) { response.setStatus(500 ); } if (!isAsync() getErrorState().isError()) { request.updateCounters(); if (getErrorState().isIoAllowed()) { inputBuffer.nextRequest(); outputBuffer.nextRequest(); } } if (!disableUploadTimeout) { int soTimeout = endpoint.getConnectionTimeout(); if (soTimeout > 0 ) { socketWrapper.setReadTimeout(soTimeout); } else { socketWrapper.setReadTimeout(0 ); } } rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE); sendfileState = processSendfile(socketWrapper); } rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); if (getErrorState().isError() endpoint.isPaused()) { return SocketState.CLOSED; } else if (isAsync()) { return SocketState.LONG; } else if (isUpgrade()) { return SocketState.UPGRADING; } else { if (sendfileState == SendfileState.PENDING) { return SocketState.SENDFILE; } else { if (openSocket) { if (readComplete) { return SocketState.OPEN; } else { return SocketState.LONG; } } else { return SocketState.CLOSED; } } } } }
未完待续