服务端典型的代表: NioSocketAcceptor
new NioSocketAcceptor()对象创建的时候会调用一系列的super构造器
public NioSocketAcceptor(int processorCount) {
super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}
/**
 * Delegates to the four-argument constructor, passing {@code null} as the
 * second argument, a newly created {@link SimpleIoProcessorPool} built from
 * {@code processorClass} and {@code processorCount}, and {@code true} as the
 * last argument.
 *
 * @param sessionConfig  session configuration forwarded unchanged
 * @param processorClass processor class handed to the new pool
 * @param processorCount count handed to the new pool
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
Class<? extends IoProcessor<T>> processorClass, int processorCount) {
this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
}
创建了一个SimpleIoProcessorPool : 可以理解为processor的池子 ;
默认会创建Runtime.getRuntime().availableProcessors() + 1 个处理器
private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
Executor executor, IoProcessor<T> processor,
boolean createdProcessor)
调用init() --> selector = Selector.open();
acceptor.bind 的时候 ---->AbstractPollingIoAcceptor.bindInternal()
创建一个AcceptorOperationFuture的request对象放入到registerQueue
startupAcceptor();--->开启一个Acceptor的线程 用于监听和处理客户端的连接
NioSocketConnector
new NioSocketConnector的时候
public NioSocketConnector() {
super(new DefaultSocketSessionConfig(), NioProcessor.class);
((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}
/**
 * Delegates to the four-argument constructor, passing {@code null} as the
 * second argument, a newly created {@link SimpleIoProcessorPool} built from
 * {@code processorClass} alone (no explicit size), and {@code true} as the
 * last argument.
 *
 * @param sessionConfig  session configuration forwarded unchanged
 * @param processorClass processor class handed to the new pool
 */
protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
}
依然会创建一个processor的池子
connector.connect() -->AbstractPollingIoConnector.connect0()
/**
 * Starts a connection to {@code remoteAddress}.
 *
 * <p>When {@code connect(handle, remoteAddress)} returns {@code true}, a session
 * is created and initialized right away and handed to its processor. Otherwise
 * a {@code ConnectionRequest} is queued, the worker is started and woken up,
 * and that request is returned as the future.
 *
 * @param remoteAddress      address to connect to
 * @param localAddress       address passed to {@code newHandle}
 * @param sessionInitializer initializer forwarded to {@code initSession} or
 *                           to the queued {@code ConnectionRequest}
 * @return the future of the connection attempt; a failed future when an
 *         exception was raised while setting the connection up
 */
protected final ConnectFuture connect0(
        SocketAddress remoteAddress, SocketAddress localAddress,
        IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
    H handle = null;
    // Stays false only when an exception escapes the try body; the finally
    // clause then closes the handle.
    boolean handleKept = false;
    try {
        // Create the handle.
        handle = newHandle(localAddress);
        final boolean connected = connect(handle, remoteAddress);
        if (!connected) {
            // Fall through to the queued-request path below.
            handleKept = true;
        } else {
            ConnectFuture future = new DefaultConnectFuture();
            // Create a new session for the handle.
            T session = newSession(processor, handle);
            // Initialize the session.
            initSession(session, future, sessionInitializer);
            // Forward the remaining process to the IoProcessor:
            // look up the session's processor and register the session with it.
            session.getProcessor().add(session);
            handleKept = true;
            return future;
        }
    } catch (Exception e) {
        return DefaultConnectFuture.newFailedFuture(e);
    } finally {
        if (!handleKept && handle != null) {
            try {
                close(handle);
            } catch (Exception closeFailure) {
                ExceptionMonitor.getInstance().exceptionCaught(closeFailure);
            }
        }
    }

    // Build a connection request for the handle ...
    ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
    // ... put it on the queue ...
    connectQueue.add(request);
    // ... and start the worker (the Connector thread that watches and
    // maintains connections, per the original note), then wake it up.
    startupWorker();
    wakeup();

    return request;
}
前面说过, SimpleIoProcessorPool中维护了一个processor的池子 , 那么这么多处理器, 怎么知道该获取哪个处理器呢?
/**
 * Finds the processor associated with the given session, choosing and
 * recording one on first use.
 *
 * <p>The processor is read from the session attribute {@code PROCESSOR}. When
 * the attribute is absent, a pool slot is chosen from the session id modulo
 * the pool size and stored with {@code setAttributeIfAbsent}.
 *
 * @param session the session whose processor is wanted
 * @return the processor stored on the session, or the one just selected
 * @throws IllegalStateException if no processor is recorded yet and the pool
 *         is disposed or being disposed
 */
private IoProcessor<S> getProcessor(S session) {
    IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);

    if (processor == null) {
        if (disposed || disposing) {
            throw new IllegalStateException("A disposed processor cannot be accessed.");
        }

        // Take the remainder BEFORE the absolute value. Math.abs(Integer.MIN_VALUE)
        // is still negative, so abs-then-modulo could yield a negative index when
        // the id narrows to Integer.MIN_VALUE. The remainder is always strictly
        // between -pool.length and pool.length, so its absolute value is a valid
        // index; for every other id the selected slot is unchanged.
        int slot = Math.abs(((int) session.getId()) % pool.length);
        processor = pool[slot];
        session.setAttributeIfAbsent(PROCESSOR, processor);
    }

    return processor;
}
在添加session到处理器的时候还有一个很重要的操作
/**
 * Queues the session as a new session and starts the processor.
 *
 * @param session the session to hand over to this processor
 * @throws IllegalStateException if this processor is disposed or being disposed
 */
public final void add(S session) {
    if (!disposed && !disposing) {
        // Put the session on the new-session queue, then start the processor
        // so that the queue gets handled.
        newSessions.add(session);
        startupProcessor();
        return;
    }

    throw new IllegalStateException("Already disposed.");
}
处理器线程的业务
/**
 * The processor's worker loop. Each pass selects with a timeout, handles newly
 * queued sessions, processes selected events, flushes pending writes, removes
 * sessions and sends idle notifications. The loop ends when no session is left
 * or when the selector has been closed; afterwards disposal is completed if it
 * was requested.
 */
private class Processor implements Runnable {
/**
 * Runs the loop described on the class until there is nothing left to do,
 * then performs {@code doDispose()} when {@code disposing} is set and always
 * sets {@code disposalFuture} to {@code true}.
 */
public void run() {
// Running count of sessions: incremented by handleNewSessions(),
// decremented by removeSessions().
int nSessions = 0;
lastIdleCheckTime = System.currentTimeMillis();
for (;;) {
try {
// This select has a timeout so that we can manage
// idle session when we get out of the select every
// second. (note : this is a hack to avoid creating
// a dedicated thread).
// Select once per timeout period (one second per the note above);
// the call blocks while no session is ready.
int selected = select(SELECT_TIMEOUT);
// Manage newly created session first
// Handle the newly created sessions.
// Call chain traced by the note author (not visible in this block;
// verify against the MINA sources):
// AbstractPollingIoProcessor.addNow()--->listeners.fireSessionCreated(session);
// IoServiceListenerSupport.fireSessionCreated()-->DefaultIoFilterChain.fireSessionOpened()
// ExecutorFilter.sessionOpened() creates an IoFilterEvent task that has a fire method;
// fire then walks the filter chain until it reaches the user-defined handler.
// Once the handler has finished, the write goes back out through the
// filter chain ---->filterChain.fireFilterWrite(writeRequest);
// IoFilterEvent.fire() write request -- next filter write
nSessions += handleNewSessions();
updateTrafficMask();
// Now, if we have had some incoming or outgoing events,
// deal with them
if (selected > 0) {
//LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
process();
}
// Write the pending requests
long currentTime = System.currentTimeMillis();
flush(currentTime);
// And manage removed sessions
nSessions -= removeSessions();
// Last, not least, send Idle events to the idle sessions
notifyIdleSessions(currentTime);
// Get a chance to exit the infinite loop if there are no
// more sessions on this Processor
if (nSessions == 0) {
synchronized (lock) {
// Re-check under the lock; leave only when no new session is
// queued and the selector is empty.
if (newSessions.isEmpty() && isSelectorEmpty()) {
processor = null;
break;
}
}
}
// Disconnect all sessions immediately if disposal has been
// requested so that we exit this loop eventually.
if (isDisposing()) {
for (Iterator<S> i = allSessions(); i.hasNext();) {
scheduleRemove(i.next());
}
wakeup();
}
} catch (ClosedSelectorException cse) {
// If the selector has been closed, we can exit the loop
break;
} catch (Throwable t) {
// Any other failure is reported and the loop continues after a
// one-second pause.
ExceptionMonitor.getInstance().exceptionCaught(t);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// NOTE(review): the interrupt is reported but the thread's interrupt
// flag is not restored here; confirm this is intended.
ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}
// The loop has ended: complete the disposal if one was requested.
try {
synchronized (disposalLock) {
if (disposing) {
doDispose();
}
}
} catch (Throwable t) {
ExceptionMonitor.getInstance().exceptionCaught(t);
} finally {
// Set unconditionally, even if doDispose() failed or was not needed.
disposalFuture.setValue(true);
}
}
}