分析一:serverBootstrap.group(parentGroup, childGroup)
AbstractBootstrap 维护了一个group (parentGroup)
ServerBootstrap 维护了一个childGroup
NioEventLoopGroup 继承了MultithreadEventExecutorGroup 内部维护了一个EventExecutor[] children 就是Executor
EventExecutor[] children不仅仅是一个Executor,后来查看别人对netty的分析才知道children[]内部维护的是NioEventLoop
NioEventLoop从字面的意思就是非阻塞io的事件循环;控制着io事件和非io事件的执行;(选择器执行的控制)
还有一个EventExecutorChooser chooser = chooserFactory.newChooser(children);
选择chooser的策略有2种
PowerOfTwoEventExecutorChooser: next的获取方式是executors[idx.getAndIncrement() & executors.lenght -1]
GenericEventExecutorChooser :next的获取方式是executors[Math.abs(idx.getAndIncrement() % executors.lenght)]
分析二:channel(NioServerSocketChannel.class)
ServerBootstrap内部维护了一个channelFactory , 那么怎么创建这个工厂对象的呢? 通过java反射;
从ReflectiveChannelFactory类的名称就可以知道内部采用了java的反射机制,该类通过调用newChannel()方法,
内部采用class.newInstance来创建NioServerSocketChannel的实例;
代码跟踪定位到NioServerSocketChannel的无参数的构造器,该类维护了一个默认的SelectorProvider; 通过provider来开启一个
socketChannel 然后调用有参数的构造器函数;
在AbstractNioChannel构造器的时候调用了ch.configureBlocking(false)
Adjusts this channel's blocking mode.
block - If true then this channel will be placed in blocking mode; if false then it will be placed non-blocking mode
分析三:option(ChannelOption.SO_BACKLOG, 100)
TODO
分析四:handler(new LoggingHandler(LogLevel.INFO))
TODO
分析五:
childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
//timeout idle handler
.addLast("timeout", new IdleStateHandler(hartTime, hartTime, hartTime, TimeUnit.SECONDS))
//send黏包: 发送消息的体较小,客户端将多次发送的请求消息合并一起发送
//receive黏包:接收的数据较小,服务端接收的消息包过小 等待接收的消息达到一定的长度再进行接收
.addLast(new LengthFieldPrepender(4, false))
.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
.addLast(new NettyServiceHandler());
}
});
TODO
分析六: ChannelFuture future = serverBootstrap.bind(bindPort).sync();
bind方法调用了AbstractBootstrap.doBind(localAddress) ; 在该方法内部通过调用initAndRegister()
方法来对channel进行初始化和注册;
初始化:
channel = channelFactory.newChannel(); 通过之前创建的channelFactory工厂来创建channel
init(channel);初始化channel, 通过debug发现会调用ServerBootstrap.init()方法
获取3的options ,对channel设置options
ChannelPipeline p = channel.pipeline(); 获取pipeline对象
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
//获取到4的handler
ChannelHandler handler = config.handler();
if (handler != null) {
//add 到最后
pipeline.addLast(handler);
}
//eventLoop是一个executor 开始执行runnable
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter
//ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler
//所以ServerBootstrapAcceptor本质是一个handler
//handler内部的有channelRead方法来接收和处理客户端的请求
//currentChildHandler 就是5的注册的handler
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
注册:
ChannelFuture regFuture = config().group().register(channel);
通过debug发现最终会去掉用AbstractChannel.register(eventLoop,promise);
对AbstractChannel类的eventLoop赋值,然后通过eventLoop.execute 来执行register0
register0内部的方法
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//开始注册 AbstractNioChannel方法的register()
//Registers this channel with the given selector, returning a selection key.
//将selector注册到改channel上去,并且返回一个SelectionKey对象
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
//调用之前初始化添加的handler处理器
pipeline.invokeHandlerAddedIfNeeded();
//改变result对象的状态 ,调用sync()方法会导致阻塞
//DefaultPromise.setValue0()通过cas来修改RESULT_UPDATER 来notifyListeners 唤醒futureListener
safeSetSuccess(promise);
//调用heandler 的channelRegistered
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
//channelActive
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}