Tomcat Source Code, Part 2
Tomcat Gleanings: Why Does Bootstrap Use a Static Block and Invoke Catalina via Reflection?
First, we need to know an unwritten rule of Java class loading: when class A references class B, the JVM resolves B using A's own defining classloader (delegating to its parents as usual). So if B has not been loaded yet, or has been loaded only by a classloader parallel to A's (meaning B is not visible from A's classloader), it is A's classloader that is asked to load B.
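A tiny illustration of that rule (hypothetical classes A and B, written only to show which loader the JVM consults):

public class A {
    public void use() {
        // Resolving this reference to B starts from A's own defining
        // classloader (which delegates to its parents as usual). If neither
        // that loader nor its parents can see B, the JVM throws
        // NoClassDefFoundError, even if some sibling classloader elsewhere
        // in the JVM has already loaded a class named B.
        B b = new B();
        System.out.println(b);
    }
}

class B { }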
Why Catalina is invoked via reflection
In a Tomcat distribution, the Bootstrap class is packaged in bin\bootstrap.jar, while the Catalina class lives in lib\catalina.jar, and the two jars are loaded by different ClassLoaders. Bootstrap therefore cannot reference Catalina directly; it has to go through reflection. This also means that classes subsequently loaded from inside Catalina use catalinaLoader by default (except for web application classes, which are loaded by a WebappClassLoader per context). As a result, the classes Tomcat itself uses are visible only to Tomcat and cannot be used by deployed applications.
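The relevant part of Bootstrap looks roughly like the following. This is a condensed paraphrase of Bootstrap.init() rather than a verbatim quote, so treat the details as approximate:

// Inside Bootstrap.init(), after commonLoader, catalinaLoader and
// sharedLoader have been built from the definitions in catalina.properties:
Thread.currentThread().setContextClassLoader(catalinaLoader);

// Catalina is not visible to Bootstrap's own classloader, so it is loaded
// through catalinaLoader and driven purely via reflection:
Class<?> startupClass =
        catalinaLoader.loadClass("org.apache.catalina.startup.Catalina");
Object startupInstance = startupClass.getConstructor().newInstance();

// Tell Catalina which loader becomes the parent of the web application
// classloaders:
Method method =
        startupClass.getMethod("setParentClassLoader", ClassLoader.class);
method.invoke(startupInstance, sharedLoader);

catalinaDaemon = startupInstance;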
Component diagram
Multiple Connectors together with one Container form a Service. The concept of a Service is familiar: with a Service, Tomcat can serve requests to the outside world. But a Service still needs an environment to live in, something that gives it life and holds the power over its life and death, and that can only be the Server. So the lifecycle of the whole Tomcat instance is controlled by the Server.
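In code, the containment is easy to see with Tomcat's standard implementations. The following is a hedged sketch using the embedded Catalina API (the port is arbitrary and error handling is omitted):

import org.apache.catalina.connector.Connector;
import org.apache.catalina.core.StandardServer;
import org.apache.catalina.core.StandardService;

public class ServerServiceSketch {
    public static void main(String[] args) throws Exception {
        StandardServer server = new StandardServer();    // controls the whole lifecycle
        StandardService service = new StandardService(); // Connectors + one Container

        Connector connector = new Connector("HTTP/1.1");
        connector.setPort(8080);
        service.addConnector(connector);  // a Service may hold many Connectors
        // service.setContainer(engine);  // ...but exactly one Engine (Container)

        server.addService(service);       // the Server owns the Services
        server.init();
        server.start();
        server.await();                   // block until a shutdown command arrives
    }
}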
Connector
In Tomcat, the Connector accepts connections from clients and hands them to the rest of the pipeline for processing. The Connector holds a ProtocolHandler, and the ProtocolHandler holds an AbstractEndpoint. The AbstractEndpoint creates the server socket and binds it to the listening port; it also creates the Acceptor thread(s) that accept client connections and the Poller thread(s) that handle read/write events on those connections. Its structure is shown in the diagram above.
public Connector(String protocol) {
    // Choose the ProtocolHandler implementation class for this protocol
    setProtocol(protocol);
    // Instantiate protocol handler
    ProtocolHandler p = null;
    try {
        // Create the ProtocolHandler instance via reflection
        Class<?> clazz = Class.forName(protocolHandlerClassName);
        p = (ProtocolHandler) clazz.getConstructor().newInstance();
    } catch (Exception e) {
        log.error(sm.getString(
                "coyoteConnector.protocolHandlerInstantiationFailed"), e);
    } finally {
        this.protocolHandler = p;
    }

    if (Globals.STRICT_SERVLET_COMPLIANCE) {
        uriCharset = StandardCharsets.ISO_8859_1;
    } else {
        uriCharset = StandardCharsets.UTF_8;
    }
}
public void setProtocol(String protocol) {
    boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
            AprLifecycleListener.getUseAprConnector();

    if ("HTTP/1.1".equals(protocol) || protocol == null) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
        } else {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
        }
    } else if ("AJP/1.3".equals(protocol)) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
        } else {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
        }
    } else {
        setProtocolHandlerClassName(protocol);
    }
}
The Connector constructor takes a protocol argument, which comes from the protocol attribute of the Connector element in server.xml. In Tomcat 8 the default is HTTP/1.1, so the constructor creates an Http11NioProtocol object. As setProtocol() shows, Tomcat 8 ships several other protocol handlers as well. The handlers whose names contain Apr use the APR library to process requests: with APR, Tomcat reads files and performs network I/O through JNI, which can greatly improve performance when serving static files, and can also speed up SSL processing when traffic is served over HTTPS. AJP/1.3 is the protocol used for data exchange between an HTTP server and the application server, for example between Apache httpd or IIS and Tomcat.
Http11NioProtocol is the non-blocking HTTP/1.1 protocol handler; it uses the java.nio package to implement non-blocking I/O. As this shows, Tomcat 8 uses non-blocking I/O by default.
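The same selection logic applies when a Connector is created programmatically. A minimal sketch (assuming tomcat-catalina is on the classpath; the ports are arbitrary):

import org.apache.catalina.connector.Connector;

public class ProtocolSelectionDemo {
    public static void main(String[] args) {
        // "HTTP/1.1" resolves to Http11NioProtocol (or Http11AprProtocol when
        // the APR native library is available), exactly as in setProtocol().
        Connector http = new Connector("HTTP/1.1");
        http.setPort(8080);
        System.out.println(http.getProtocolHandlerClassName());

        // A fully qualified class name bypasses the lookup and is used as-is.
        Connector nio2 = new Connector("org.apache.coyote.http11.Http11Nio2Protocol");
        nio2.setPort(8443);
        System.out.println(nio2.getProtocolHandlerClassName());
    }
}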
public AbstractHttp11Protocol(AbstractEndpoint<S> endpoint) {
    super(endpoint);
    setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
    ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
    setHandler(cHandler);
    getEndpoint().setHandler(cHandler);
}
When an Http11NioProtocol instance is created, a NioEndpoint and a ConnectionHandler are created along with it.
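Concretely, the Http11NioProtocol constructor just passes a fresh NioEndpoint up to the AbstractHttp11Protocol constructor shown above (quoted from memory of the Tomcat 8.5 source, so treat it as a sketch):

public Http11NioProtocol() {
    super(new NioEndpoint());
}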
//<<AbstractEndpoint>>
public void init() throws Exception {
    if (bindOnInit) {
        bind(); // delegate to the subclass's bind() for initialization
        bindState = BindState.BOUND_ON_INIT;
    }
    if (this.domain != null) {
        // Register endpoint (as ThreadPool - historical name)
        oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
        Registry.getRegistry(null, null).registerComponent(this, oname, null);

        for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
            registerJmx(sslHostConfig);
        }
    }
}
//<<NioEndpoint>>
/**
 * Initialize the endpoint.
 */
@Override
public void bind() throws Exception {

    if (!getUseInheritedChannel()) {
        // Open the ServerSocketChannel
        serverSock = ServerSocketChannel.open();
        // Apply the configured socket properties
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = (getAddress() != null ?
                new InetSocketAddress(getAddress(), getPort()) :
                new InetSocketAddress(getPort()));
        // Bind to the listening port; acceptCount is the backlog
        serverSock.socket().bind(addr, getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    // Set blocking mode. Why blocking here? Mainly for simplicity of the
    // accept logic: it then works exactly as in BIO mode, except that
    // accept() returns a SocketChannel instead of a Socket.
    serverSock.configureBlocking(true); //mimic APR behavior

    // Initialize thread count defaults for acceptor, poller
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    if (pollerThreadCount <= 0) {
        // minimum one poller thread
        pollerThreadCount = 1;
    }
    setStopLatch(new CountDownLatch(pollerThreadCount));

    // Initialize SSL if needed
    initialiseSsl();

    // Open the selector pool used for blocking reads/writes
    selectorPool.open();
}
In bind(), the ServerSocketChannel is opened and bound to the listening port, and the channel is put in blocking mode. The SSL part is skipped here. The final statement, selectorPool.open(), obtains the shared selector and starts a thread that detects events on it.
//<<AbstractEndpoint>>
public final void start() throws Exception {
    if (bindState == BindState.UNBOUND) {
        bind();
        bindState = BindState.BOUND_ON_START;
    }
    startInternal(); // delegate to the subclass's startInternal() to start up
}
//<<NioEndpoint>>
/**
 * Start the NIO endpoint, creating acceptor, poller threads.
 */
@Override
public void startInternal() throws Exception {

    if (!running) {
        running = true;
        paused = false;

        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());

        // Create worker collection
        if (getExecutor() == null) {
            createExecutor();
        }

        // Initialize the connection-limiting latch
        initializeConnectionLatch();

        // Start poller threads
        pollers = new Poller[getPollerThreadCount()];
        for (int i = 0; i < pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }

        // Start acceptor threads
        startAcceptorThreads();
    }
}
In startInternal(), the most important step is creating the Poller and Acceptor threads. The Acceptor thread accepts new connections on the ServerSocketChannel; the Poller handles read/write readiness on the accepted SocketChannels. You can already guess the division of labor: the Acceptor accepts a client SocketChannel and hands it to a Poller for read/write detection; in practice, the Poller wraps the SocketChannel once more and dispatches it to yet another worker thread for the actual processing. The point of this design is to keep one blocked request from degrading the server's ability to accept and process other connections. Following the order of accepting and then processing requests, we look at the Acceptor thread first.
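Before diving into the real code, here is a heavily simplified, self-contained sketch of the same pattern: a blocking accept loop in one thread and non-blocking readiness detection in another. All names are made up for illustration; none of this is Tomcat code:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class AcceptorPollerSketch {
    private final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();
    private final Selector selector = Selector.open();
    private final ServerSocketChannel server = ServerSocketChannel.open();

    AcceptorPollerSketch(int port) throws IOException {
        server.bind(new InetSocketAddress(port), 100); // backlog, like acceptCount
        server.configureBlocking(true);                // blocking accept, as in bind()
    }

    void acceptorLoop() throws IOException {           // the "Acceptor" role
        while (true) {
            SocketChannel socket = server.accept();    // blocks, like serverSock.accept()
            socket.configureBlocking(false);           // the poller needs non-blocking mode
            pending.add(socket);                       // roughly the PollerEvent queue
            selector.wakeup();                         // wake the poller to register it
        }
    }

    void pollerLoop() throws IOException {             // the "Poller" role
        while (true) {
            SocketChannel socket;
            while ((socket = pending.poll()) != null) { // like events(): register queued sockets
                socket.register(selector, SelectionKey.OP_READ);
            }
            selector.select(1000);
            for (SelectionKey key : selector.selectedKeys()) {
                // a real server would hand the ready channel to a worker
                // thread here, as processKey()/processSocket() do
                System.out.println("ready: " + key.channel());
            }
            selector.selectedKeys().clear();
        }
    }
}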
// --------------------------------------------------- Acceptor Inner Class
/**
 * The background thread that listens for incoming TCP/IP connections and
 * hands them off to an appropriate processor.
 */
protected class Acceptor extends AbstractEndpoint.Acceptor {

    @Override
    public void run() {

        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (running) {

            // Loop if endpoint is paused
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }

            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                // Limit the number of concurrent connections with a shared
                // counter (a latch built by extending AQS); when the count
                // reaches the limit, wait here. The default maximum is
                // 10000 connections.
                //if we have reached max connections, wait
                countUpOrAwaitConnection();

                SocketChannel socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket. No selector is involved here: as seen in
                    // bind(), serverSock is in blocking mode.
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                    // We didn't get a socket
                    countDownConnection();
                    if (running) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                if (running && !paused) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful; this is where
                    // the accepted socket is passed to a Poller thread.
                    if (!setSocketOptions(socket)) {
                        closeSocket(socket);
                    }
                } else {
                    closeSocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
        state = AcceptorState.ENDED;
    }

    private void closeSocket(SocketChannel socket) {
        countDownConnection();
        try {
            socket.socket().close();
        } catch (IOException ioe) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("endpoint.err.close"), ioe);
            }
        }
        try {
            socket.close();
        } catch (IOException ioe) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("endpoint.err.close"), ioe);
            }
        }
    }
}
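countUpOrAwaitConnection() is backed by org.apache.tomcat.util.threads.LimitLatch. The idea, stripped to its essentials, looks like the following illustrative sketch built on AQS (not the real LimitLatch):

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Illustrative only: a shared-mode AQS latch that lets at most `limit`
// threads "count up" concurrently; further threads block until countDown().
public class ConnectionLimitSketch {
    private final long limit;
    private final AtomicLong count = new AtomicLong(0);
    private final Sync sync = new Sync();

    public ConnectionLimitSketch(long limit) { this.limit = limit; }

    private class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            long newCount = count.incrementAndGet();
            if (newCount > limit) {
                count.decrementAndGet(); // over the limit: undo and park
                return -1;
            }
            return 1;
        }
        @Override
        protected boolean tryReleaseShared(int ignored) {
            count.decrementAndGet();
            return true; // wake a queued acceptor, if any
        }
    }

    public void countUpOrAwait() throws InterruptedException {
        sync.acquireSharedInterruptibly(1); // blocks while at the limit
    }

    public void countDown() {
        sync.releaseShared(1);
    }
}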
protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        // Disable blocking, APR style: the Poller is going to poll it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);

        // Take a NioChannel from the cache. A NioChannel can be seen as a
        // wrapper around the SocketChannel that adds extra functionality.
        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            // SocketBufferHandler maintains the read/write buffers used
            // while the connection is processed
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                // Wrap the socket and the buffer handler in a NioChannel
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        // Register the NioChannel with a Poller thread for processing
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("", t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}
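getPoller0() picks a Poller round-robin, and register() is where the accepted channel crosses over into the Poller's world. Reproduced here from memory of the Tomcat 8.5 source, so treat it as an approximate sketch: it wraps the channel in a NioSocketWrapper, creates a PollerEvent with the special OP_REGISTER interest, and queues the event for the Poller thread:

public void register(final NioChannel socket) {
    socket.setPoller(this);
    NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
    socket.setSocketWrapper(ka);
    ka.setPoller(this);
    ka.setReadTimeout(getSocketProperties().getSoTimeout());
    ka.setWriteTimeout(getSocketProperties().getSoTimeout());
    ka.interestOps(SelectionKey.OP_READ); // OP_REGISTER later turns into OP_READ
    PollerEvent r = eventCache.pop();
    if (r == null) {
        r = new PollerEvent(socket, ka, OP_REGISTER);
    } else {
        r.reset(socket, ka, OP_REGISTER);
    }
    addEvent(r); // queue the event and wake the selector if it is blocked
}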
Poller and PollerEvent
(To be completed.)
public void run() {
    // Loop until destroy() is called
    while (true) {

        boolean hasEvents = false;

        try {
            if (!close) {
                hasEvents = events();
                // wakeupCounter > 0 means events were queued while we were
                // selecting, so do a non-blocking selectNow(); otherwise
                // block in select(selectorTimeout) waiting for events.
                if (wakeupCounter.getAndSet(-1) > 0) {
                    //if we are here, means we have other stuff to do
                    //do a non blocking select
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            // Shutting down
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error("", x);
            continue;
        }
        //either we timed out or we woke up, process events first
        // Run the queued PollerEvents (which register read/write interest);
        // hasEvents records whether any events were processed.
        if (keyCount == 0) hasEvents = (hasEvents | events());

        Iterator<SelectionKey> iterator =
                keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // Walk through the collection of ready keys and dispatch
        // any active event.
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper attachment = (NioSocketWrapper) sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (attachment == null) {
                iterator.remove();
            } else {
                iterator.remove();
                // Wrap sk and its attachment and hand them to downstream
                // threads for processing
                processKey(sk, attachment);
            }
        } // while

        //process timeouts
        timeout(keyCount, hasEvents);
    } // while

    getStopLatch().countDown();
}
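The wakeupCounter bookkeeping only makes sense together with the producer side. The Poller's addEvent() (again a from-memory sketch of the Tomcat 8.5 source) increments the counter and wakes the selector only when the Poller is currently blocked in select():

private void addEvent(PollerEvent event) {
    events.offer(event);
    // run() does wakeupCounter.getAndSet(-1) right before selecting, so a
    // transition from -1 to 0 here means the Poller is (about to be)
    // blocked in select() and must be woken to process the new event.
    if (wakeupCounter.incrementAndGet() == 0) {
        selector.wakeup();
    }
}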
// In events(), each queued socket is registered with the selector by
// calling the PollerEvent's run() method.
public boolean events() {
    boolean result = false;

    PollerEvent pe = null;
    // Drain the PollerEvents currently in the queue
    for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++) {
        result = true;
        try {
            // Run the PollerEvent to perform the registration
            pe.run();
            pe.reset();
            if (running && !paused) {
                eventCache.push(pe);
            }
        } catch (Throwable x) {
            log.error("", x);
        }
    }

    return result;
}
public void run() {
    // For a fresh registration, register the socket with the selector
    // for read interest
    if (interestOps == OP_REGISTER) {
        try {
            socket.getIOChannel().register(
                    socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
        } catch (Exception x) {
            log.error(sm.getString("endpoint.nio.registerFail"), x);
        }
    } else {
        final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
        try {
            if (key == null) {
                // The key was cancelled (e.g. due to socket closure)
                // and removed from the selector while it was being
                // processed. Count down the connections at this point
                // since it won't have been counted down when the socket
                // closed.
                socket.socketWrapper.getEndpoint().countDownConnection();
                ((NioSocketWrapper) socket.socketWrapper).closed = true;
            } else {
                final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                if (socketWrapper != null) {
                    //we are registering the key to start with, reset the fairness counter.
                    int ops = key.interestOps() | interestOps;
                    socketWrapper.interestOps(ops);
                    key.interestOps(ops);
                } else {
                    socket.getPoller().cancelledKey(key);
                }
            }
        } catch (CancelledKeyException ckx) {
            try {
                socket.getPoller().cancelledKey(key);
            } catch (Exception ignore) {}
        }
    }
}
// processKey() dispatches the keys that are ready for processing
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        // If the poller is closing, cancel the key
        if (close) {
            cancelledKey(sk);
        } else if (sk.isValid() && attachment != null) {
            if (sk.isReadable() || sk.isWritable()) {
                if (attachment.getSendfileData() != null) {
                    // A sendfile transfer is in progress: keep processing it
                    processSendfile(sk, attachment, false);
                } else {
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // Read goes before write
                    if (sk.isReadable()) { // handle readable
                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    if (!closeSocket && sk.isWritable()) { // handle writable
                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk);
                    }
                }
            }
        } else {
            // invalid key
            cancelledKey(sk);
        }
    } catch (CancelledKeyException ckx) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("", t);
    }
}
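unreg() removes the ready bits from the key's interest set so that the same readiness is not dispatched a second time while a worker thread is still busy with it (a from-memory sketch of the corresponding Poller methods):

protected void unreg(SelectionKey sk, NioSocketWrapper attachment, int readyOps) {
    // Clear the bits that just fired; they are re-armed when processing ends.
    reg(sk, attachment, sk.interestOps() & (~readyOps));
}

protected void reg(SelectionKey sk, NioSocketWrapper attachment, int intops) {
    sk.interestOps(intops);
    attachment.interestOps(intops);
}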
// Wrap the socket wrapper and the event in a SocketProcessor and dispatch it
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            // Hand off to the worker thread pool, or run inline
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper), ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}