Netty Source Code Reading: Server Creation


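I used Netty in an earlier project, so I'm using some free time between interviews to go through the Netty source again, starting with how the server is created. Without further ado, on to the code.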

First, the overall code for creating a Netty server looks roughly like this:

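public void bind(int port) throws Exception {
    // workGroup: event loops that handle I/O on accepted connections
    EventLoopGroup workGroup = new NioEventLoopGroup();
    // bossGroup: event loops that accept new connections
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workGroup)                       // parent (boss) group, child (worker) group
                .channel(NioServerSocketChannel.class)      // server channel type
                .option(ChannelOption.SO_BACKLOG, 24)       // option for the server channel itself
                .childHandler(new ChildChannelHandler());   // installed on every accepted channel
        ChannelFuture f = b.bind(port).sync();
        // Block until the server channel is closed
        f.channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
}

private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        System.out.println("server initChannel..");
        // ServerHandler is the application's own inbound handler (not shown here)
        socketChannel.pipeline().addLast(new ServerHandler());
    }
}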

OK, first let's see what happens inside new NioEventLoopGroup():

public NioEventLoopGroup() {
    this(0);
}

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (ThreadFactory) null);
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    this(nThreads, threadFactory, SelectorProvider.provider());
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(nThreads, threadFactory, new Object[]{selectorProvider});
}

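After several layers of constructor chaining, we end up in the last constructor, which simply calls the parent class constructor with three arguments: the number of threads (0 here, meaning "use the default"), a threadFactory that is still null, and an Object[] wrapping a SelectorProvider. As the name suggests, SelectorProvider is the factory that supplies Selectors (and NIO channels). Its source confirms exactly that. If it looks unfamiliar, it's worth brushing up on Java NIO first.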

public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}
public abstract AbstractSelector openSelector()
        throws IOException;
public abstract ServerSocketChannel openServerSocketChannel()
        throws IOException;
    
public abstract SocketChannel openSocketChannel()
        throws IOException;
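To see what SelectorProvider does in practice, here is a small JDK-only sketch (the class and method names are made up for illustration). It opens a Selector and a ServerSocketChannel from the default provider, the same one NioEventLoopGroup() passes down, and checks that both report that provider as their creator.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

public class SelectorProviderDemo {
    // Opens a Selector and a ServerSocketChannel from the default provider and checks
    // that both were created by that same provider.
    static boolean sameProvider() {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector();
             ServerSocketChannel server = provider.openServerSocketChannel()) {
            return selector.provider() == provider && server.provider() == provider;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // provider() caches its result, so every call returns the same instance
        System.out.println(SelectorProvider.provider() == SelectorProvider.provider());
        System.out.println(SelectorProvider.provider().getClass().getName());
        System.out.println("same provider: " + sameProvider());
    }
}
```

The concrete provider class name printed depends on the platform and JDK, which is exactly the indirection provider() exists to hide.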

As mentioned, super calls the parent class constructor. The parent of NioEventLoopGroup is MultithreadEventLoopGroup; here is its constructor:

private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
        "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
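DEFAULT_EVENT_LOOP_THREADS and the nThreads == 0 check combine as follows. This sketch uses the JDK's Integer.getInteger in place of Netty's SystemPropertyUtil.getInt (an assumption: the two behave alike for valid integer values), and the class name is hypothetical.

```java
public class DefaultThreads {
    // Same shape as DEFAULT_EVENT_LOOP_THREADS: the system property wins if set,
    // otherwise 2 * available processors, and never less than 1.
    static int defaultEventLoopThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", fallback));
    }

    // What MultithreadEventLoopGroup passes up for a requested nThreads
    static int resolveThreads(int nThreads) {
        return nThreads == 0 ? defaultEventLoopThreads() : nThreads;
    }

    public static void main(String[] args) {
        System.out.println("new NioEventLoopGroup()  -> " + resolveThreads(0));
        System.out.println("new NioEventLoopGroup(4) -> " + resolveThreads(4));
    }
}
```

So new NioEventLoopGroup() on an 8-core machine ends up with 16 event loops unless io.netty.eventLoopThreads is set.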

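This is where nThreads == 0 is replaced by DEFAULT_EVENT_LOOP_THREADS, i.e. twice the number of available processors unless the io.netty.eventLoopThreads system property says otherwise. MultithreadEventLoopGroup in turn calls its own parent constructor. One more level up is MultithreadEventExecutorGroup, whose constructor looks like this: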

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }
    children = new SingleThreadEventExecutor[nThreads];
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }
    for (int i = 0; i < nThreads; i++) {
        boolean success = false;
        try {
            children[i] = newChild(threadFactory, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j++) {
                    children[j].shutdownGracefully();
                }
                for (int j = 0; j < i; j++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
}

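Looks familiar? Recall how Netty works: this constructor builds the group of event loops that Netty's Reactor model runs on. children holds the worker threads, one SingleThreadEventExecutor each, and chooser picks the next child for each new channel in round-robin order (not at random): PowerOfTwoEventExecutorChooser when the number of children is a power of two, GenericEventExecutorChooser otherwise. Let's look at SingleThreadEventExecutor itself.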


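Its methods, such as execute() and runAllTasks() (the latter reappears in NioEventLoop.run() below), show that SingleThreadEventExecutor is the class that actually runs tasks. Suddenly it all makes sense.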
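Because the chooser source is not shown above, here is a sketch of the two strategies, reconstructed from the round-robin behavior described (a bit mask for power-of-two sizes, a modulo otherwise). It hands out indices rather than EventExecutors, and the class names are illustrative, not Netty's own.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ChooserDemo {
    interface Chooser { int next(); }

    // Power-of-two size: "& (n - 1)" gives the same result as "% n", without a division
    static final class PowerOfTwoChooser implements Chooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final int n;
        PowerOfTwoChooser(int n) { this.n = n; }
        public int next() { return idx.getAndIncrement() & (n - 1); }
    }

    // Any size: plain modulo; Math.abs keeps the index non-negative once the counter wraps
    static final class GenericChooser implements Chooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final int n;
        GenericChooser(int n) { this.n = n; }
        public int next() { return Math.abs(idx.getAndIncrement() % n); }
    }

    // True when exactly one bit is set (callers already guarantee v > 0)
    static boolean isPowerOfTwo(int v) { return (v & -v) == v; }

    static Chooser newChooser(int n) {
        return isPowerOfTwo(n) ? new PowerOfTwoChooser(n) : new GenericChooser(n);
    }

    public static void main(String[] args) {
        Chooser c4 = newChooser(4);
        Chooser c3 = newChooser(3);
        StringBuilder a = new StringBuilder();
        StringBuilder b = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            a.append(c4.next());
            b.append(c3.next());
        }
        System.out.println(a + " " + b); // 012301 012012
    }
}
```

Either way, new channels are spread evenly over the children; the power-of-two variant only avoids a modulo on a hot path.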

Next, let's follow how a client connection is handled.

Remember this line in the MultithreadEventExecutorGroup constructor?

children[i] = newChild(threadFactory, args);

Let's see what it does:

@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}

public final class NioEventLoop extends SingleThreadEventLoop {
    NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
        super(parent, threadFactory, false);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        provider = selectorProvider;
        selector = openSelector();
    }
    // ...
}

So this line creates a NioEventLoop, and each NioEventLoop opens its own Selector; the I/O events of client connections are mostly handled by this class. Here is its run() method:

protected void run() {
    for (;;) {
        boolean oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                selectNow();
            } else {
                select(oldWakenUp);
                // 'wakenUp.compareAndSet(false, true)' is always evaluated
                // before calling 'selector.wakeup()' to reduce the wake-up
                // overhead. (Selector.wakeup() is an expensive operation.)
                //
                // However, there is a race condition in this approach.
                // The race condition is triggered when 'wakenUp' is set to
                // true too early.
                //
                // 'wakenUp' is set to true too early if:
                // 1) Selector is waken up between 'wakenUp.set(false)' and
                //    'selector.select(...)'. (BAD)
                // 2) Selector is waken up between 'selector.select(...)' and
                //    'if (wakenUp.get()) { ... }'. (OK)
                //
                // In the first case, 'wakenUp' is set to true and the
                // following 'selector.select(...)' will wake up immediately.
                // Until 'wakenUp' is set to false again in the next round,
                // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                // any attempt to wake up the Selector will fail, too, causing
                // the following 'selector.select(...)' call to block
                // unnecessarily.
                //
                // To fix this problem, we wake up the selector again if wakenUp
                // is true immediately after selector.select(...).
                // It is inefficient in that it wakes up the selector for both
                // the first case (BAD - wake-up required) and the second case
                // (OK - no wake-up required).
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();
                processSelectedKeys();
                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);
            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}
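The task budget in run() is plain arithmetic: if processSelectedKeys() took ioTime nanoseconds, runAllTasks may use ioTime * (100 - ioRatio) / ioRatio, so I/O gets roughly ioRatio percent of each loop iteration. A small sketch with a hypothetical helper name makes the numbers concrete; ioRatio == 100 is excluded because run() handles it separately with an unbounded runAllTasks().

```java
public class IoRatioDemo {
    // Given how long processSelectedKeys() took, how long may runAllTasks(timeout) run?
    // Same expression as in run(): ioTime * (100 - ioRatio) / ioRatio
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99 here: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long io = 2_000_000L; // suppose handling I/O took 2 ms
        System.out.println(taskBudgetNanos(io, 50)); // 2000000: tasks get as long as I/O took
        System.out.println(taskBudgetNanos(io, 80)); // 500000: I/O gets 80% of the iteration
        System.out.println(taskBudgetNanos(io, 20)); // 8000000: tasks get 80%
    }
}
```

With Netty's default ioRatio of 50, queued tasks get as much time as the preceding I/O pass took.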

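The run() loop roughly works like this: if tasks are already queued, it calls selectNow(), which does not block; otherwise it blocks in select(oldWakenUp) until an I/O event arrives or the loop is woken up. It then handles the ready I/O events in processSelectedKeys() and runs queued tasks with runAllTasks(); when ioRatio is below 100, the time spent on tasks is capped in proportion to the time just spent on I/O. Let's look at processSelectedKeys():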

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys[i] = null;
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            for (;;) {
                if (selectedKeys[i] == null) {
                    break;
                }
                selectedKeys[i] = null;
                i++;
            }
            selectAgain();
            // Need to flip the optimized selectedKeys to get the right reference to the array
            // and reset the index to -1 which will then set to 0 on the for loop
            // to start over again.
            //
            // See https://github.com/netty/netty/issues/1523
            selectedKeys = this.selectedKeys.flip();
            i = -1;
        }
    }
}

Now let's see what processSelectedKey does:

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }
    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
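processSelectedKey dispatches purely on bit tests against readyOps. As a JDK-only sketch (the class and method names are hypothetical, and the early return for an already-closed channel is left out), the branch selection can be reproduced on plain int values using the real SelectionKey constants:

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

public class ReadyOpsDemo {
    // Returns which branches of processSelectedKey(...) would run for a given readyOps value
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        // OP_ACCEPT (server channel) and OP_READ (accepted channel) both lead to unsafe.read();
        // readyOps == 0 is treated the same way to work around a JDK spin-loop bug
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT));                       // [read]
        System.out.println(actionsFor(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); // [read, forceFlush]
        System.out.println(actionsFor(SelectionKey.OP_CONNECT));                      // [finishConnect]
    }
}
```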

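When readyOps contains OP_READ or OP_ACCEPT (or is 0, as a workaround for a JDK bug), processSelectedKey calls unsafe.read(). For the server's NioServerSocketChannel the ready operation is OP_ACCEPT, i.e. a new client connection. unsafe.read() is abstract at this level; the implementation that matters for the server channel is NioMessageUnsafe, an inner class of AbstractNioMessageChannel (the parent class of NioServerSocketChannel):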

@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    if (!config.isAutoRead() && !isReadPending()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
        removeReadOp();
        return;
    }
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    final ChannelPipeline pipeline = pipeline();
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            for (;;) {
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                // stop reading and remove op
                if (!config.isAutoRead()) {
                    break;
                }
                if (readBuf.size() >= maxMessagesPerRead) {
                    break;
                }
            }
        } catch (Throwable t) {
            exception = t;
        }
        setReadPending(false);
        int size = readBuf.size();
        for (int i = 0; i < size; i++) {
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        pipeline.fireChannelReadComplete();
        if (exception != null) {
            if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
                // ServerChannel should not be closed even on IOException because it can often continue
                // accepting incoming connections. (e.g. too many open files)
                closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
            }
            pipeline.fireExceptionCaught(exception);
        }
        if (closed) {
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}

Now look at the line int localRead = doReadMessages(readBuf); in NioServerSocketChannel it is implemented as:

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();
    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);
        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }
    return 0;
}

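This is simply the logic that accepts client connections: the JDK ServerSocketChannel.accept() returns the new SocketChannel, which Netty wraps in a NioSocketChannel (with the server channel as its parent) and adds to readBuf. Because the server channel is non-blocking, accept() returns null when no connection is pending; doReadMessages then returns 0, which ends the read loop.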
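The accept path can be reproduced with plain JDK NIO. The sketch below (hypothetical class name; loopback address and an OS-chosen ephemeral port) registers a non-blocking ServerSocketChannel for OP_ACCEPT, connects one client, and then keeps calling accept() until it returns null, the same stop condition as localRead == 0 in NioMessageUnsafe.read(). Netty would wrap each accepted SocketChannel in a NioSocketChannel; here it is only counted and closed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class AcceptDemo {
    // Returns how many connections the accept loop picked up after select() reported OP_ACCEPT
    static int acceptOneConnection() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // ephemeral port
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // Nothing pending yet: a non-blocking accept() returns null (doReadMessages returns 0)
            if (server.accept() != null) {
                throw new IllegalStateException("unexpected connection");
            }

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(); // blocks until the server key is acceptable
                int accepted = 0;
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isAcceptable()) {
                        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                        SocketChannel ch;
                        // Keep accepting until null, like the for(;;) loop in read()
                        while ((ch = ssc.accept()) != null) {
                            accepted++; // Netty would wrap ch in a NioSocketChannel here
                            ch.close();
                        }
                    }
                }
                selector.selectedKeys().clear();
                return accepted;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted " + acceptOneConnection() + " connection(s)");
    }
}
```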

Now look at the line final ChannelPipeline pipeline = pipeline(); it turns out to be:

@Override
public ChannelPipeline pipeline() {
    return pipeline;
}

private final ChannelPipeline pipeline;

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
    static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
    static final NotYetConnectedException NOT_YET_CONNECTED_EXCEPTION = new NotYetConnectedException();
    static {
        CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
        NOT_YET_CONNECTED_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
    }
    private MessageSizeEstimator.Handle estimatorHandle;
    private final Channel parent;
    private final long hashCode = ThreadLocalRandom.current().nextLong();
    private final Unsafe unsafe;
    private final ChannelPipeline pipeline;
    private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
    private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final CloseFuture closeFuture = new CloseFuture(this);
    private volatile SocketAddress localAddress;
    private volatile SocketAddress remoteAddress;
    private volatile EventLoop eventLoop;
    private volatile boolean registered;
    /** Cache for the string representation of this channel */
    private boolean strValActive;
    private String strVal;
    // ...
}

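Note that pipeline is final: each Channel creates exactly one pipeline when it is constructed and keeps it for its whole lifetime. The same holds for other core fields such as parent, unsafe and closeFuture, whereas state that changes after creation (localAddress, remoteAddress, eventLoop, registered) lives in volatile fields. Seeing which objects a Channel bundles, and which of them can never change, gives a bit more insight into Netty's design.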

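Once it has the pipeline, read() calls pipeline.fireChannelRead(readBuf.get(i)) for each accepted channel. DefaultChannelPipeline delegates this to its head context, whose fireChannelRead looks like this: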

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
    return this;
}

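Then we see that next.invokeChannelRead(msg) calls the handler's channelRead method. On the server channel, msg is the NioSocketChannel that was just accepted, which explains why a channelRead callback fires on the server side once a connection is established.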

private void invokeChannelRead(Object msg) {
    try {
        ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
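fireChannelRead either calls the next handler directly or, when the current thread is not that handler's event loop thread, wraps the call in a task and hands it to the executor, so a handler is always invoked on its own event loop thread. Below is a minimal JDK-only sketch of that pattern, assuming a single-thread ExecutorService as a stand-in for an EventLoop; InEventLoopDemo and its methods are hypothetical names, not Netty API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

public class InEventLoopDemo {
    private final AtomicReference<Thread> loopThread = new AtomicReference<>();
    // Single-threaded executor standing in for an EventLoop; it remembers its one thread
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "demo-event-loop");
        t.setDaemon(true);
        loopThread.set(t);
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread.get();
    }

    // Same shape as fireChannelRead: run inline on the loop thread, otherwise enqueue.
    // Returns true if the call ran inline.
    boolean fire(Runnable handlerCall) {
        if (inEventLoop()) {
            handlerCall.run();
            return true;
        }
        executor.execute(handlerCall);
        return false;
    }

    // Calls fire(...) from inside the loop thread and reports whether it ran inline
    boolean fireFromLoop(Runnable handlerCall) {
        Callable<Boolean> task = () -> fire(handlerCall);
        return await(executor.submit(task));
    }

    // Blocks until everything queued so far has run
    void drain() {
        await(executor.submit(() -> { }));
    }

    void shutdown() {
        executor.shutdown();
    }

    private static <T> T await(Future<T> f) {
        try {
            return f.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        InEventLoopDemo loop = new InEventLoopDemo();
        AtomicReference<String> ranOn = new AtomicReference<>();
        Runnable call = () -> ranOn.set(Thread.currentThread().getName());
        System.out.println("from main, inline? " + loop.fire(call));         // false
        loop.drain();
        System.out.println("handler ran on " + ranOn.get());                 // demo-event-loop
        System.out.println("from loop, inline? " + loop.fireFromLoop(call)); // true
        loop.shutdown();
    }
}
```

Running inline when already on the right thread avoids a queue hop on the common path, while the fallback keeps a handler from ever being called concurrently from two threads.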

If, like me, you've always been curious how this callback chain works, there is more to discover. Let's trace this line:

final AbstractChannelHandlerContext next = findContextInbound();

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

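This reveals how ChannelHandlerContext is designed: every context holds next and prev references, so the pipeline is a doubly linked list of contexts, and Netty uses this list to manage each handler's context along the processing chain. findContextInbound() simply walks forward along next, skipping contexts that are not inbound. Doesn't that remind you of Java's LinkedList?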

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
    private final boolean inbound;
    private final boolean outbound;
    private final AbstractChannel channel;
    private final DefaultChannelPipeline pipeline;
    private final String name;
    private boolean removed;
    // Will be set to null if no child executor should be used, otherwise it will be set to the
    // child executor.
    final EventExecutor executor;
    private ChannelFuture succeededFuture;
    // Lazily instantiated tasks used to trigger events to a handler with different executor.
    // These needs to be volatile as otherwise an other Thread may see an half initialized instance.
    // See the JMM for more details
    private volatile Runnable invokeChannelReadCompleteTask;
    private volatile Runnable invokeReadTask;
    private volatile Runnable invokeChannelWritableStateChangedTask;
    private volatile Runnable invokeFlushTask;
    // ...
}
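The next/prev links and findContextInbound() can be mimicked with a tiny doubly linked list. This is a simplified sketch, not Netty's DefaultChannelPipeline: Ctx, PipelineDemo and inboundPath() are hypothetical names, and, as an assumption modeled on Netty 4.0, head is marked outbound-only while tail is inbound, so an inbound walk always stops at tail at the latest.

```java
import java.util.ArrayList;
import java.util.List;

public class PipelineDemo {
    // Minimal stand-in for AbstractChannelHandlerContext: a name, a direction flag, two links
    static final class Ctx {
        final String name;
        final boolean inbound;
        Ctx next;
        Ctx prev;

        Ctx(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }

        // Same loop as findContextInbound(): walk forward, skipping non-inbound contexts
        Ctx findContextInbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }
    }

    final Ctx head = new Ctx("head", false);
    final Ctx tail = new Ctx("tail", true);

    PipelineDemo() {
        head.next = tail;
        tail.prev = head;
    }

    // Links a new context just before tail, like addLast(...)
    void addLast(String name, boolean inbound) {
        Ctx ctx = new Ctx(name, inbound);
        Ctx before = tail.prev;
        ctx.prev = before;
        ctx.next = tail;
        before.next = ctx;
        tail.prev = ctx;
    }

    // Names of the contexts an inbound event visits, starting from head
    List<String> inboundPath() {
        List<String> path = new ArrayList<>();
        Ctx ctx = head;
        while (ctx != tail) {
            ctx = ctx.findContextInbound();
            path.add(ctx.name);
        }
        return path;
    }

    public static void main(String[] args) {
        PipelineDemo p = new PipelineDemo();
        p.addLast("decoder", true);
        p.addLast("encoder", false);
        p.addLast("business", true);
        System.out.println(p.inboundPath()); // [decoder, business, tail]
    }
}
```

Because insertion and removal only rewire neighboring links, handlers can be added or removed at runtime without copying the whole chain.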

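Back to channelRead: on the server channel, the inbound handler reached here is ServerBootstrapAcceptor (not "ServerBootstrapAdapter"), which ServerBootstrap adds to the server channel's pipeline. Its channelRead adds childHandler to the new child channel's pipeline, applies childOptions and childAttrs, and registers the child with childGroup (the worker EventLoopGroup), which assigns it an event loop and thus that loop's Selector. With that, the whole connection process is clear.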

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }
    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}