NO IMAGE

Netty原始碼分析:NioEventLoopGroup

在工作之餘,看到自己公司的超哥(俞超)關於Netty的系列博文,講解的很好,因此,自己在學習之餘也跟了下原始碼,來了解Netty,也做了相關的筆記,將形成系列博文,這是第一篇。超哥的博文地址在這裡:http://www.jianshu.com/p/c5068caab217

Netty版本:4.0.23.Final

借用超哥的例子,一般服務端的程式碼如下所示:

    package com.wrh.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Created by wuranghao on 2017/9/4.
*/
public final class SimpleServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new SimpleServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded");
}
}
}

為更好的理解,我們將分析main函式中的每一行程式碼背後做了哪些工作。

下面將分析第一、二行程式碼,看下NioEventLoopGroup類的建構函式幹了些什麼。其餘的部分將在其他博文中分析。

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

NioEventLoopGroup建構函式分析

NioEventLoopGroup的建構函式的程式碼如下

    public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider);
} 

NioEventLoopGroup類中的建構函式最終都是呼叫的父類MultithreadEventLoopGroup如下的建構函式:

    protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

從上面的建構函式可以得到 如果使用EventLoopGroup workerGroup = new NioEventLoopGroup()來建立物件,即不指定執行緒個數,則netty給我們使用預設的執行緒個數,如果指定則用我們指定的執行緒個數。

預設執行緒個數相關的程式碼如下:

    static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}

而SystemPropertyUtil.getInt函式的功能為:得到系統屬性中指定key(這裡:key=”io.netty.eventLoopThreads”)所對應的value,如果獲取不到獲取失敗則返回預設值,這裡的預設值為:cpu的核數的2倍。

結論:如果沒有設定程式啟動引數(或者說沒有指定key=”io.netty.eventLoopThreads”的屬性值),那麼預設情況下執行緒的個數為cpu的核數乘以2。

其中,SystemPropertyUtil.java 類中的getInt方法的程式碼如下:

    public static int getInt(String key, int def) {
String value = get(key);
if (value == null) {
return def;
}
value = value.trim().toLowerCase();
//Pattern INTEGER_PATTERN = Pattern.compile("-?[0-9] ");
if (INTEGER_PATTERN.matcher(value).matches()) {
try {
return Integer.parseInt(value);
} catch (Exception e) {
// Ignore
}
}
log(
"Unable to parse the integer system property '"   key   "':"   value   " - "  
"using the default value: "   def);
return def;
} 

讀原始碼有很多好處,就是可以學習到很多優雅的寫法,例如,在上面的函式中,將String型別轉換為數字之前的正則匹配,我覺得就很優雅。

繼續看,由於由於MultithreadEventLoopGroup的建構函式是呼叫的是其父類MultithreadEventExecutorGroup的建構函式,因此,看下此類的建構函式

    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}
children = new SingleThreadEventExecutor[nThreads];
//根據執行緒個數是否為2的冪次方,採用不同策略初始化chooser
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
//產生nTreads個NioEventLoop物件儲存在children陣列中
for (int i = 0; i < nThreads; i   ) {
boolean success = false;
try {
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
//如果newChild方法執行失敗,則對前面執行new成功的幾個NioEventLoop進行shutdown處理
if (!success) {
for (int j = 0; j < i; j   ) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j   ) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}

該建構函式幹了如下三件事:

1、產生了一個執行緒工場:threadFactory = newDefaultThreadFactory();

MultithreadEventExecutorGroup.java
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());//getClass()為:NioEventLoopGroup.class
}
DefaultThreadFactory.java    
public DefaultThreadFactory(Class<?> poolType) {
this(poolType, false, Thread.NORM_PRIORITY);
}

2、根據執行緒個數是否為2的冪次方,採用不同策略初始化chooser

判斷一個數是否為2的冪次方就很巧妙,程式碼如下,這些都是我們需要學習的點。

    private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}

3、 產生nTreads個NioEventLoop物件儲存在children陣列中 ,執行緒都是通過呼叫newChild方法來產生的。

上面的newChild函式在NioEventLoopGroup進行來過載,函式功能為:new一個NioEventLoop物件,具體如下:

    @Override
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
} 

這裡傳給NioEventLoop建構函式的引數為:NioEventLoopGroup、DefaultThreadFactory、SelectorProvider。

這裡貼下NioEventLoopGroup類的繼承結構圖,如下所示

該圖是由idea工具自動生成的哈,具體操作為:雙擊選中該類後,點選滑鼠右鍵,選擇最後一個diagram選項,選擇show diagram 即可。

NioEventLoop建構函式分析

既然上面提到來new一個NioEventLoop物件,下面我們就看下這個類以及其父類。NioEventLoop繼承結構圖如下:

NioEventLoop這個類的構造方法

    NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}

繼續看父類 SingleThreadEventLoop的建構函式

    protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp);
} 

又是直接呼叫來父類SingleThreadEventExecutor的建構函式,繼續看

    protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp;//false
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
//呼叫NioEventLoop類的run方法
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error(
"Buggy "   EventExecutor.class.getSimpleName()   " implementation; "  
SingleThreadEventExecutor.class.getSimpleName()   ".confirmShutdown() must be called "  
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with "  
"non-empty task queue ("   taskQueue.size()   ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
taskQueue = newTaskQueue();
} 
protected Queue<Runnable> newTaskQueue() {
return new LinkedBlockingQueue<Runnable>();
}

該建構函式程式碼很長哈,比較嚇人,但是主要幹來如下兩件事:

1、利用ThreadFactory建立來一個Thread,傳入了一個Runnable物件,該Runnable重寫的run程式碼比較長,不過重點僅僅是呼叫NioEventLoop類的run方法。其他的異常處理和收尾工作我們這裡先不管。

2、使用LinkedBlockingQueue類初始化taskQueue 。

其中,newThread方法的程式碼如下:

DefaultThreadFactory.java

    @Override
public Thread newThread(Runnable r) {
Thread t = newThread(new DefaultRunnableDecorator(r), prefix   nextId.incrementAndGet());
try {
//判斷是否是守護執行緒,並進行設定
if (t.isDaemon()) {
if (!daemon) {
t.setDaemon(false);
}
} else {
if (daemon) {
t.setDaemon(true);
}
}
//設定其優先順序
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(r, name);
}

FastThreadLocalThread.java

    public FastThreadLocalThread(Runnable target, String name) {
super(target, name);// FastThreadLocalThread extends Thread 
} 

到這裡,可以看到底層還是藉助於類似於Thread thread = new Thread(r)這種方式來建立執行緒。

關於NioEventLoop物件可以得到的點有,初始化了如下4個屬性。

1、NioEventLoopGroup (在父類SingleThreadEventExecutor中)

2、selector

3、provider

4、thread (在父類SingleThreadEventExecutor中)

總結

目前關於NioEventLoopGroup就先瞭解到這裡哈,總結如下

1、 如果不指定執行緒數,則執行緒數為:CPU的核數*2

2、根據執行緒個數是否為2的冪次方,採用不同策略初始化chooser

3、產生nThreads個NioEventLoop物件儲存在children陣列中。

可以理解NioEventLoop就是一個執行緒,執行緒NioEventLoop中裡面有如下幾個屬性:

1、NioEventLoopGroup (在父類SingleThreadEventExecutor中)

2、selector

3、provider

4、thread (在父類SingleThreadEventExecutor中)

更通俗點就是:NioEventLoopGroup就是一個執行緒池,NioEventLoop就是一個執行緒。NioEventLoopGroup執行緒池中有N個NioEventLoop執行緒。