Tomcat's Connector has three run modes: bio, nio and apr. Let's first look at how they differ.
1. bio (blocking I/O): as the name suggests, blocking I/O, meaning Tomcat uses the traditional Java I/O API (the java.io package and its subpackages). By default Tomcat runs in bio mode, which is generally the lowest-performing of the three.
2. nio (New I/O): the I/O API introduced in Java SE 1.4 (the java.nio package and its subpackages). Java NIO is a buffer-oriented API that supports non-blocking I/O, which is why nio is also read as non-blocking I/O; it offers better concurrency than traditional I/O (bio). To run Tomcat in nio mode, simply set the protocol attribute of the Connector element in <Tomcat install dir>/conf/server.xml to org.apache.coyote.http11.Http11NioProtocol (an example Connector entry follows this list).
3. apr (Apache Portable Runtime): the support library of the Apache HTTP Server. Roughly speaking, Tomcat calls the Apache HTTP Server's native core library through JNI to handle file reads and network transfers, which greatly improves Tomcat's performance on static files. apr is also the preferred mode for running high-concurrency applications on Tomcat.
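For reference, a Connector switched to NIO might look like the following entry in conf/server.xml; the port and timeout values here are only illustrative, not taken from any particular setup:

<!-- conf/server.xml: an HTTP Connector running in NIO mode (example values) -->
<Connector port="8080"
           protocol="org.apache.coyote.http11.Http11NioProtocol"
           connectionTimeout="20000"
           redirectPort="8443" />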
Writing a BIO socket server is fairly easy: every accepted socket is handed to a thread that reads the request and writes the response, and the obvious refinement is to back it with a thread pool. This article instead walks through the code Tomcat uses for its NIO handling.
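To make the contrast concrete, here is a minimal sketch of such a BIO server with a thread pool; the class name, port and pool size are made up for illustration, and this is in no way Tomcat code:

import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimpleBioServer {
    public static void main(String[] args) throws IOException {
        ExecutorService workers = Executors.newFixedThreadPool(100); // the "worker" pool
        try (ServerSocket server = new ServerSocket(8080)) {
            while (true) {
                Socket socket = server.accept();        // blocks until a client connects
                workers.execute(() -> handle(socket));  // one task per accepted connection
            }
        }
    }

    private static void handle(Socket socket) {
        try (Socket s = socket; OutputStream out = s.getOutputStream()) {
            // A real server would parse the HTTP request here; we just write a canned response.
            byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
            out.write(("HTTP/1.1 200 OK\r\nContent-Length: " + body.length + "\r\n\r\n")
                    .getBytes(StandardCharsets.UTF_8));
            out.write(body);
        } catch (IOException ignored) {
            // drop the connection on error
        }
    }
}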
The key code lives in the org.apache.tomcat.util.net.NioEndpoint class, the main component inside Http11NioProtocol responsible for accepting and processing sockets. The class is long, but a careful read turns up several recurring patterns:
1. Existing JDK APIs are extended or wrapped: ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor, NioChannel builds on ByteChannel, and KeyAttachment wraps a NioChannel.
2. Many objects are designed to avoid garbage collection so they can be cached and reused; in every case the cache is a queue built on ConcurrentLinkedQueue. Examples are the processorCache, keyCache, eventCache and nioChannels queues in NioEndpoint, and the events queue in Poller (a minimal sketch of this cache-and-reuse pattern follows this list).
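Here is a minimal sketch of that poll-use-offer pattern; CachedEvent and EventCacheDemo are invented names for illustration, the real recycled classes being PollerEvent, KeyAttachment and friends:

import java.util.concurrent.ConcurrentLinkedQueue;

// A recyclable object: mutable state plus a reset() method so instances can be reused.
class CachedEvent {
    String payload;
    CachedEvent reset(String payload) { this.payload = payload; return this; }
}

class EventCacheDemo {
    private final ConcurrentLinkedQueue<CachedEvent> cache = new ConcurrentLinkedQueue<>();

    CachedEvent acquire(String payload) {
        CachedEvent e = cache.poll();   // try to reuse a cached instance first
        return (e != null) ? e.reset(payload) : new CachedEvent().reset(payload);
    }

    void release(CachedEvent e) {
        e.reset(null);                  // clear references so nothing is retained accidentally
        cache.offer(e);                 // hand the instance back instead of letting it become garbage
    }
}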
First, the overall structure of the Connector component:
As covered in the earlier article on Tomcat startup, starting a Connector calls Connector.startInternal(), which calls protocolHandler.start(); that in turn calls the abstract endpoint's start() method, which ends up in the concrete endpoint's startInternal(). So the code walk starts from NioEndpoint.startInternal():
/**
* Start the NIO endpoint, creating acceptor, poller threads.
*/
@Override
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
// Create worker collection
if ( getExecutor() == null ) {
// Build the thread pool that will later run SocketProcessor tasks; this is the Worker in the diagram above.
createExecutor();
}
initializeConnectionLatch();
// Start poller threads
// Create a number of pollers based on the processor count; these are the Pollers in the diagram above.
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
// Create the acceptor threads, i.e. the Acceptor in the diagram above.
startAcceptorThreads();
}
}
startAcceptorThreads is the implementation in the parent class org.apache.tomcat.util.net.AbstractEndpoint:
protected final void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new Acceptor[count];
for (int i = 0; i < count; i++) {
// Calls the subclass's createAcceptor method; here that is NioEndpoint.createAcceptor.
acceptors[i] = createAcceptor();
String threadName = getName() + "-Acceptor-" + i;
acceptors[i].setThreadName(threadName);
Thread t = new Thread(acceptors[i], threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
That covers the initialization of the core components: Acceptor, Poller and Worker.
Once they are initialized, the Acceptor thread starts accepting socket connections. Here is the Acceptor source:
// --------------------------------------------------- Acceptor Inner Class
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!running) {
break;
}
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
// Note: the accept() here is a blocking call.
// Accept the next incoming connection from the server
// socket
socket = serverSock.accept();
} catch (IOException ioe) {
//we didn't get a socket
countDownConnection();
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
}
// Successful accept, reset the error delay
errorDelay = 0;
// Note setSocketOptions(): this is where the accepted socket is handed to a Poller.
// setSocketOptions() will add channel to the poller
// if successful
if (running && !paused) {
if (!setSocketOptions(socket)) {
countDownConnection();
closeSocket(socket);
}
} else {
countDownConnection();
closeSocket(socket);
}
} catch (SocketTimeoutException sx) {
// Ignore: Normal condition
} catch (IOException x) {
if (running) {
log.error(sm.getString("endpoint.accept.fail"), x);
}
} catch (OutOfMemoryError oom) {
try {
oomParachuteData = null;
releaseCaches();
log.error("", oom);
}catch ( Throwable oomt ) {
try {
try {
System.err.println(oomParachuteMsg);
oomt.printStackTrace();
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED;
}
}
After accepting a connection, the Acceptor calls setSocketOptions, which sets some parameters on the SocketChannel and then registers it with a Poller. Here is setSocketOptions:
/**
* Process the specified connection.
*/
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
// Put the SocketChannel into non-blocking mode
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false);
Socket sock = socket.socket();
// Set socket options (values come from the Connector element in server.xml),
// e.g. send/receive buffer sizes, keep-alive and so on
socketProperties.setProperties(sock);
// Take a NioChannel from the cache queue.
// NioChannel is a wrapper around SocketChannel;
// it hides the difference between SSL and plain TCP connections from the layers above.
NioChannel channel = nioChannels.poll();
// If the cache is empty, create a new NioChannel
if ( channel == null ) {
// SSL setup
if (sslContext != null) {
SSLEngine engine = createSSLEngine();
int appbufsize = engine.getSession().getApplicationBufferSize();
NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()),
Math.max(appbufsize,socketProperties.getAppWriteBufSize()),
socketProperties.getDirectBuffer());
channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
} else {
// normal tcp setup
NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
channel = new NioChannel(socket, bufhandler);
}
} else {
// Attach this SocketChannel to the NioChannel taken from the cache
channel.setIOChannel(socket);
if ( channel instanceof SecureNioChannel ) {
SSLEngine engine = createSSLEngine();
((SecureNioChannel)channel).reset(engine);
} else {
channel.reset();
}
}
// Register the newly accepted SocketChannel with a Poller
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("",t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(t);
}
// Tell to close the socket
return false;
}
return true;
}
The core call is the final getPoller0().register(channel): it wraps the configured SocketChannel in a PollerEvent and adds it to the Poller's events queue.
getPoller0 round-robins over the Poller array and returns one of its entries. (The Pollers were created in step 1, the initialization of NioEndpoint's core components.)
/**
* Return an available poller in true round robin fashion
*/
public Poller getPoller0() {
// Simplest possible round robin: keep incrementing a counter and take it modulo the array length
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}
Then the Poller's register method is called:
public void register(final NioChannel socket) {
// Give the socket a reference to this Poller for later processing
socket.setPoller(this);
// Take a KeyAttachment from NioEndpoint's keyCache queue
KeyAttachment key = keyCache.poll();
// KeyAttachment is essentially a wrapper around NioChannel
final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
// Reset the KeyAttachment's references to the Poller, the NioChannel, etc.
ka.reset(this,socket,getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
// Take a PollerEvent from NioEndpoint's event cache and initialize it with this socket
PollerEvent r = eventCache.poll();
// Register interest in read operations
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
// Add the event to this Poller's event queue
addEvent(r);
}
Here is addEvent in the Poller class:
/**
* Only used in this class. Will be made private in Tomcat 8.0.x
* @deprecated
*/
@Deprecated
public void addEvent(Runnable event) {
events.offer(event);
if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}
Just two lines: the first offers the event to the queue; the second increments wakeupCounter and, if its previous value was -1 (meaning the Poller is blocked, or about to block, in select()), wakes the selector up.
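To see how that counter cooperates with the select loop, here is a stripped-down sketch of the wakeup protocol; the EventQueue and pollOnce names are invented, and the real logic is spread across addEvent and Poller.run:

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

class EventQueue {
    private final ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<>();
    private final AtomicLong wakeupCounter = new AtomicLong(0);
    private final Selector selector;

    EventQueue() throws IOException { this.selector = Selector.open(); }

    // Producer side (what addEvent does): queue the event, and wake the selector
    // only when the counter was -1, i.e. the consumer is blocked in select().
    void addEvent(Runnable event) {
        events.offer(event);
        if (wakeupCounter.incrementAndGet() == 0) selector.wakeup();
    }

    // Consumer side: one iteration of the poll loop, greatly simplified.
    void pollOnce(long timeout) throws IOException {
        if (wakeupCounter.getAndSet(-1) > 0) {
            selector.selectNow();       // events arrived in the meantime, don't block
        } else {
            selector.select(timeout);   // nothing pending, block until timeout or wakeup()
        }
        wakeupCounter.set(0);
        Runnable r;
        while ((r = events.poll()) != null) r.run();  // drain and run the queued events
    }
}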
So far we have seen the socket accepted by the Acceptor being wrapped in a PollerEvent and added to a Poller's event queue. Now for the other core component, the Poller itself:
/**
* Poller class.
*/
public class Poller implements Runnable {
// The NIO Selector; note that each Poller has its own Selector
protected Selector selector;
// The queue of pending events
protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
// Counter used to decide whether the blocked selector needs to be woken up
protected AtomicLong wakeupCounter = new AtomicLong(0l);
public Poller() throws IOException {
// Create this Poller's Selector via Selector.open()
synchronized (Selector.class) {
// Selector.open() isn't thread safe
// http://bugs.sun.com/view_bug.do?bug_id=6427854
// Affects 1.6.0_29, fixed in 1.7.0_01
this.selector = Selector.open();
}
}
// Events are added to this Poller's event queue via addEvent
/**
* Only used in this class. Will be made private in Tomcat 8.0.x
* @deprecated
*/
@Deprecated
public void addEvent(Runnable event) {
events.offer(event);
// If the counter was -1 the Poller is blocked (or about to block) in select(), so wake the selector up
if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}
/**
* Processes events in the event queue of the Poller.
*
* @return true if some events were processed,
* false if queue was empty
*/
public boolean events() {
boolean result = false;
Runnable r = null;
// Take the events off the Poller's queue one at a time and run each one
while ( (r = events.poll()) != null ) {
result = true;
try {
// Run the event's handling logic.
// Modeling events as Runnables separates the concrete handling logic from the event framework.
r.run();
if ( r instanceof PollerEvent ) {
((PollerEvent)r).reset();
// After the event has been handled, return the event object to NioEndpoint's event cache
eventCache.offer((PollerEvent)r);
}
} catch ( Throwable x ) {
log.error("",x);
}
}
return result;
}
// register() wraps the socket in a PollerEvent and adds it to the pending-event queue
public void register(final NioChannel socket) {
socket.setPoller(this);
KeyAttachment key = keyCache.poll();
final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
ka.reset(this,socket,getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
// Take an event object from NioEndpoint's event cache
PollerEvent r = eventCache.poll();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
// Add the event to the Poller's event queue
addEvent(r);
}
// The Poller is also a thread: it watches the channels registered with its selector and hands ready ones off to a processor
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
@Override
public void run() {
// Loop until destroy() is called
while (true) {
try {
// Loop if endpoint is paused
while (paused && (!close) ) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
}
boolean hasEvents = false;
// Time to terminate?
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString(
"endpoint.nio.selectorCloseFail"), ioe);
}
break;
} else {
// Run the events in the event queue
hasEvents = events();
}
try {
if ( !close ) {
if (wakeupCounter.getAndSet(-1) > 0) {
// wakeupCounter was just set to -1; this pairs with the check in addEvent,
// which calls selector.wakeup() while the counter is -1
//if we are here, means we have other stuff to do
//do a non blocking select
// Check whether the selector has ready keys, without blocking
keyCount = selector.selectNow();
} else {
// Wait for ready keys, returning once the specified timeout elapses
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
// Run the events in the event queue
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString(
"endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch ( NullPointerException x ) {
//sun bug 5076772 on windows JDK 1.5
if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
if ( wakeupCounter == null || selector == null ) throw x;
continue;
} catch ( CancelledKeyException x ) {
//sun bug 5076772 on windows JDK 1.5
if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
if ( wakeupCounter == null || selector == null ) throw x;
continue;
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("",x);
continue;
}
//either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
// attachment() returns the KeyAttachment that was attached during register();
// the KeyAttachment object is a wrapper around the socket
KeyAttachment attachment = (KeyAttachment)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
// Update the channel's last-access time
// so it isn't evicted from the selector as timed out
attachment.access();
iterator.remove();
// The actual handling of the channel
processKey(sk, attachment);
}
}//while
//process timeouts
// After each full pass of the select loop, check all channels for timeouts;
// channels that have timed out are removed from the selector
timeout(keyCount,hasEvents);
if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
} catch (OutOfMemoryError oom) {
try {
oomParachuteData = null;
releaseCaches();
log.error("", oom);
}catch ( Throwable oomt ) {
try {
System.err.println(oomParachuteMsg);
oomt.printStackTrace();
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}
}
}//while
synchronized (this) {
this.notifyAll();
}
stopLatch.countDown();
}
// Handle a channel event detected by the selector
protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
boolean result = true;
try {
if ( close ) {
cancelledKey(sk, SocketStatus.STOP, attachment.comet);
} else if ( sk.isValid() && attachment != null ) {
attachment.access();//make sure we don't time out valid sockets
sk.attach(attachment);//cant remember why this is here
NioChannel channel = attachment.getChannel();
// Handle the read/write events that occurred on the channel
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
if ( isWorkerAvailable() ) {
// Unregister interest in the operations that just fired on this channel
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
// The concrete channel-handling logic
if (!processSocket(channel, SocketStatus.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
// Cancel the key of a channel that is no longer usable
cancelledKey(sk,SocketStatus.DISCONNECT,false);
}
} else {
result = false;
}
}
}
} else {
//invalid key
cancelledKey(sk, SocketStatus.ERROR,false);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk, SocketStatus.ERROR,false);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("",t);
}
return result;
}
// unreg() is a clever touch: it stops the selector from repeatedly selecting the same event on this channel
protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) {
//this is a must, so that we don't have multiple threads messing with the socket
reg(sk,attachment,sk.interestOps()& (~readyOps));
}
// Set the operations of interest on the key and the attachment; see the PollerEvent class below
protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) {
sk.interestOps(intops);
attachment.interestOps(intops);
attachment.setCometOps(intops);
}
}
The heart of the Poller loop is to run the PollerEvents in its event queue and then iterate over the keys the selector reports ready; once an interested event fires, the key is handed to processSocket. A PollerEvent's job is to register the socket with the selector, or to update the operations it is interested in:
/**
*
* PollerEvent, cacheable object for poller events to avoid GC
*/
public static class PollerEvent implements Runnable {
// Each PollerEvent keeps a reference to a NioChannel
protected NioChannel socket;
protected int interestOps;
protected KeyAttachment key;
public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
reset(ch, k, intOps);
}
public void reset(NioChannel ch, KeyAttachment k, int intOps) {
socket = ch;
interestOps = intOps;
key = k;
}
public void reset() {
reset(null, null, 0);
}
@Override
public void run() {
// First registration of the socket with the selector: register it for read events
if ( interestOps == OP_REGISTER ) {
try {
socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
} catch (Exception x) {
log.error("", x);
}
} else {
// The socket is already registered with the selector: update the operations it is interested in
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
boolean cancel = false;
if (key != null) {
final KeyAttachment att = (KeyAttachment) key.attachment();
if ( att!=null ) {
//handle callback flag
if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
att.setCometNotify(true);
} else {
att.setCometNotify(false);
}
interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag
// Refresh the last-access time to keep the connection from timing out
att.access();//to prevent timeout
//we are registering the key to start with, reset the fairness counter.
int ops = key.interestOps() | interestOps;
att.interestOps(ops);
key.interestOps(ops);
} else {
cancel = true;
}
} else {
cancel = true;
}
if ( cancel ) socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
}catch (CancelledKeyException ckx) {
try {
socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT,true);
}catch (Exception ignore) {}
}
}//end if
}//run
@Override
public String toString() {
return super.toString()+"[intOps="+this.interestOps+"]";
}
}
In the step-5 analysis of the Poller we saw that its run method ends by calling processKey() to handle the channel events detected by the selector, and processKey in turn calls processSocket to run the concrete channel-handling logic. Here is processSocket:
public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
try {
KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
if (attachment == null) {
return false;
}
attachment.setCometNotify(false); //will get reset upon next reg
// Take a SocketProcessor from the cache queue to handle this socket
SocketProcessor sc = processorCache.poll();
if ( sc == null ) sc = new SocketProcessor(socket,status);
else sc.reset(socket,status);
// Hand the socket with the pending event to a Worker (the executor) for processing
if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
else sc.run();
} catch (RejectedExecutionException rx) {
log.warn("Socket processing request was rejected for:"+socket,rx);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
Coordinated through NioEndpoint, the Poller hands sockets with pending events to the worker threads for further processing. That is where the event framework's job ends; what follows is the Worker's part.
Tomcat 6's NIO implementation had a dedicated Worker class; Tomcat 7 removed it, but the worker role remains and is now played by the SocketProcessor class we saw being handed to the executor above.
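As a rough outline of that worker role, the sketch below shows the general shape only; WorkerTask and everything in it is invented for illustration and is not the actual SocketProcessor source, which hands the channel to the protocol processor rather than doing the work itself:

import java.nio.channels.SocketChannel;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative worker task in the spirit of SocketProcessor: one Runnable per ready socket.
class WorkerTask implements Runnable {
    private final SocketChannel channel;
    private final ConcurrentLinkedQueue<WorkerTask> cache;

    WorkerTask(SocketChannel channel, ConcurrentLinkedQueue<WorkerTask> cache) {
        this.channel = channel;
        this.cache = cache;
    }

    @Override
    public void run() {
        synchronized (channel) {       // only one worker touches a given connection at a time
            try {
                // 1. Finish the TLS handshake if this is a secure channel.
                // 2. Hand the channel to the protocol processor, which parses the HTTP request
                //    and drives it through the servlet container to produce the response.
                // 3. Depending on the outcome, keep the connection registered with the Poller
                //    (keep-alive) or close it.
            } finally {
                cache.offer(this);     // recycle the task object, mirroring processorCache
            }
        }
    }
}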