mina启动

目的

分析mina的客户端和服务端在启动时是如何运行的;

服务端启动

服务端典型的代表: NioSocketAcceptor

new NioSocketAcceptor()对象创建的时候会调用一系列的super构造器

public NioSocketAcceptor(int processorCount) {
    super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
    ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}


/**
 * Creates the acceptor with its own processor pool.
 *
 * Delegates to the four-argument constructor with a {@code null} executor,
 * a new {@link SimpleIoProcessorPool} built from {@code processorClass} and
 * {@code processorCount}, and {@code createdProcessor = true}.
 *
 * @param sessionConfig  the session configuration passed through unchanged
 * @param processorClass the processor implementation the pool is built from
 * @param processorCount the size requested for the processor pool
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
        Class<? extends IoProcessor<T>> processorClass, int processorCount) {
    this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
}
创建了一个SimpleIoProcessorPool : 可以理解为processor的池子 ; 
默认会创建Runtime.getRuntime().availableProcessors() + 1 个处理器

private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
            Executor executor, IoProcessor<T> processor,
            boolean createdProcessor)
            
 调用init() -->       selector = Selector.open();
 
 
 acceptor.bind 的时候    ---->AbstractPollingIoAcceptor.bindInternal()
 创建一个AcceptorOperationFuture的request对象放入到registerQueue
 startupAcceptor();--->开启一个Acceptor的线程 用于监听和处理客户端的连接

客户端启动

NioSocketConnector

new NioSocketConnector的时候
public NioSocketConnector() {
    super(new DefaultSocketSessionConfig(), NioProcessor.class);
    ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}

/**
 * Creates the connector with its own processor pool.
 *
 * Delegates to a four-argument constructor, passing {@code null}, a new
 * {@link SimpleIoProcessorPool} built from {@code processorClass}, and {@code true}.
 * NOTE(review): the target constructor is not shown here; presumably the
 * {@code null} is the executor and {@code true} marks the pool as self-created,
 * mirroring the acceptor -- confirm against the full class.
 *
 * @param sessionConfig  the session configuration passed through unchanged
 * @param processorClass the processor implementation the pool is built from
 */
protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
    this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
}
依然会创建一个processor的池子
connector.connect() -->AbstractPollingIoConnector.connect0()
/**
 * Starts a connection to {@code remoteAddress}.
 *
 * If {@code connect(handle, remoteAddress)} reports {@code true}, a session is
 * created, initialised and added to its processor, and a new
 * {@link DefaultConnectFuture} is returned. Otherwise a {@code ConnectionRequest}
 * is queued on {@code connectQueue}, the worker is started and woken up, and
 * the request itself is returned as the future.
 *
 * @param remoteAddress      the address to connect to
 * @param localAddress       the local address used to create the handle
 * @param sessionInitializer passed to {@code initSession} or to the queued request
 * @return the connect future; a failed future if any exception is thrown
 *         while creating the handle or connecting
 */
protected final ConnectFuture connect0(
        SocketAddress remoteAddress, SocketAddress localAddress,
        IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
    H handle = null;
    // Tracks whether the try block finished normally; the finally block
    // closes the handle only when it did not.
    boolean success = false;
    try {
        // Create the handle.
        handle = newHandle(localAddress);
        if (connect(handle, remoteAddress)) {
            ConnectFuture future = new DefaultConnectFuture();
            // Create a new session.
            T session = newSession(processor, handle);
            // Initialise the session.
            initSession(session, future, sessionInitializer);
            // Forward the remaining process to the IoProcessor.
            // Look up the session's processor and add the session to it
            // (how that processor is chosen is a separate question).
            session.getProcessor().add(session);
            success = true;
            return future;
        }

        success = true;
    } catch (Exception e) {
        return DefaultConnectFuture.newFailedFuture(e);
    } finally {
        if (!success && handle != null) {
            try {
                close(handle);
            } catch (Exception e) {
                ExceptionMonitor.getInstance().exceptionCaught(e);
            }
        }
    }

    // Create a connection request.
    ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
    // Add it to the queue.
    connectQueue.add(request);
    // Per the original note: starts a Connector thread that monitors and
    // maintains connections (startupWorker is not shown here).
    startupWorker();
    wakeup();

    return request;
}

Processor获取

在SimpleIoProcessorPool中之前有说过维护了一个processor的池子 , 那么这么多处理器怎么知道获取哪个处理器呢?
/**
 * Returns the processor associated with the given session, choosing one
 * from the pool on first use.
 *
 * The chosen processor is stored in the session's {@code PROCESSOR} attribute,
 * so later calls for the same session read it back instead of recomputing.
 *
 * @param session the session whose processor is requested
 * @return the processor stored on the session, or the newly chosen one
 * @throws IllegalStateException if no processor is stored yet and the pool
 *         is disposed or being disposed
 */
private IoProcessor<S> getProcessor(S session) {
    IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);

    if (processor == null) {
        if (disposed || disposing) {
            throw new IllegalStateException("A disposed processor cannot be accessed.");
        }

        // Pick a slot from the session id modulo the pool size. The remainder
        // is taken BEFORE the absolute value: Math.abs(Integer.MIN_VALUE) is
        // still negative, so abs-then-mod could produce a negative index when
        // the narrowed id is Integer.MIN_VALUE. For every other id the result
        // is the same as abs-then-mod.
        int slot = Math.abs(((int) session.getId()) % pool.length);
        processor = pool[slot];
        session.setAttributeIfAbsent(PROCESSOR, processor);
    }

    return processor;
}

在添加session到处理器的时候还有一个很重要的操作
/**
 * Queues a session on this processor and makes sure the processor is started.
 *
 * @param session the session to add to the {@code newSessions} queue
 * @throws IllegalStateException if the processor is disposed or being disposed
 */
public final void add(S session) {
    final boolean shutDown = disposed || disposing;
    if (shutDown) {
        throw new IllegalStateException("Already disposed.");
    }

    // Enqueue first, then start the processor.
    newSessions.add(session);
    startupProcessor();
}


处理器线程的业务

/**
 * The processor's main loop.
 *
 * Each iteration selects with a timeout, handles new sessions, updates the
 * traffic mask, processes selected events, flushes pending writes, removes
 * closed sessions and notifies idle sessions. The loop ends when no sessions
 * remain (and nothing is queued or registered) or when the selector has been
 * closed; afterwards disposal is performed if requested and
 * {@code disposalFuture} is completed.
 */
private class Processor implements Runnable {
    public void run() {
        // Running count of sessions: incremented by handleNewSessions(),
        // decremented by removeSessions().
        int nSessions = 0;
        lastIdleCheckTime = System.currentTimeMillis();

        for (;;) {
            try {
                // This select has a timeout so that we can manage
                // idle session when we get out of the select every
                // second. (note : this is a hack to avoid creating
                // a dedicated thread).
                // Select with a timeout so the loop does not stay blocked
                // when no session is connected.
                int selected = select(SELECT_TIMEOUT);

                // Manage newly created session first
                // Handle the newly established sessions.
                // Call chain per the original notes (not visible in this snippet):
                // AbstractPollingIoProcessor.addNow()--->listeners.fireSessionCreated(session);
                // IoServiceListenerSupport.fireSessionCreated()-->DefaultIoFilterChain.fireSessionOpened()
                // ExecutorFilter.sessionOpened() creates an IoFilterEvent task that has a fire method;
                // fire then walks the filterChain and finally reaches our custom handler.
                // After the handler finishes, the way out goes through the filterChain again ---->filterChain.fireFilterWrite(writeRequest);
                // IoFireEvent.fire()  write request  --next filter write
                nSessions += handleNewSessions();
                
                updateTrafficMask();

                // Now, if we have had some incoming or outgoing events,
                // deal with them
                if (selected > 0) {
                    //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
                    process();
                }

                // Write the pending requests
                long currentTime = System.currentTimeMillis();
                flush(currentTime);
                
                // And manage removed sessions
                nSessions -= removeSessions();
                
                // Last, not least, send Idle events to the idle sessions
                notifyIdleSessions(currentTime);

                // Get a chance to exit the infinite loop if there are no
                // more sessions on this Processor
                if (nSessions == 0) {
                    synchronized (lock) {
                        // Re-check under the lock before clearing the
                        // processor reference and leaving the loop.
                        if (newSessions.isEmpty() && isSelectorEmpty()) {
                            processor = null;
                            break;
                        }
                    }
                }

                // Disconnect all sessions immediately if disposal has been
                // requested so that we exit this loop eventually.
                if (isDisposing()) {
                    for (Iterator<S> i = allSessions(); i.hasNext();) {
                        scheduleRemove(i.next());
                    }
                    
                    wakeup();
                }
            } catch (ClosedSelectorException cse) {
                // If the selector has been closed, we can exit the loop
                break;
            } catch (Throwable t) {
                ExceptionMonitor.getInstance().exceptionCaught(t);

                // Pause for a second before the next iteration after an
                // unexpected error.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    // NOTE(review): the interrupt is reported but the thread's
                    // interrupt status is not restored here -- confirm this is intended.
                    ExceptionMonitor.getInstance().exceptionCaught(e1);
                }
            }
        }

        try {
            synchronized (disposalLock) {
                if (disposing) {
                    doDispose();
                }
            }
        } catch (Throwable t) {
            ExceptionMonitor.getInstance().exceptionCaught(t);
        } finally {
            // Always complete the disposal future, even if doDispose() failed.
            disposalFuture.setValue(true);
        }
    }
}