Netty 5.0: SingleThreadEventLoop & NioEventLoop

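SingleThreadEventLoop extends SingleThreadEventExecutor, a standard executor implementation backed by a single thread. It differs little from the thread pools in the JDK, and its main job is to execute tasks. NioEventLoop extends SingleThreadEventLoop and serves as the Reactor thread of the NIO framework. Because it must handle network I/O read and write events, it has to aggregate a multiplexer (a Selector).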

References: 《Netty權威指南》 (Netty: The Definitive Guide) by Li Linfeng (李林鋒); http://xw-z1985.iteye.com/blog/1928244

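Class relationships: [figure: class diagram of SingleThreadEventExecutor, SingleThreadEventLoop and NioEventLoop]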

This article analyzes the thread execution logic implemented by NioEventLoop, starting from its run() method:

@Override
protected void run() {
    for (;;) {
        oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                selectNow();
            } else {
                select();
                // 'wakenUp.compareAndSet(false, true)' is always evaluated
                // before calling 'selector.wakeup()' to reduce the wake-up
                // overhead. (Selector.wakeup() is an expensive operation.)
                //
                // However, there is a race condition in this approach.
                // The race condition is triggered when 'wakenUp' is set to
                // true too early.
                //
                // 'wakenUp' is set to true too early if:
                // 1) Selector is waken up between 'wakenUp.set(false)' and
                //    'selector.select(...)'. (BAD)
                // 2) Selector is waken up between 'selector.select(...)' and
                //    'if (wakenUp.get()) { ... }'. (OK)
                //
                // In the first case, 'wakenUp' is set to true and the
                // following 'selector.select(...)' will wake up immediately.
                // Until 'wakenUp' is set to false again in the next round,
                // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                // any attempt to wake up the Selector will fail, too, causing
                // the following 'selector.select(...)' call to block
                // unnecessarily.
                //
                // To fix this problem, we wake up the selector again if wakenUp
                // is true immediately after selector.select(...).
                // It is inefficient in that it wakes up the selector for both
                // the first case (BAD - wake-up required) and the second case
                // (OK - no wake-up required).
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }
            cancelledKeys = 0;
            final long ioStartTime = System.nanoTime();
            needsToSelectAgain = false;
            if (selectedKeys != null) {
                processSelectedKeysOptimized(selectedKeys.flip());
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
            final long ioTime = System.nanoTime() - ioStartTime;
            final int ioRatio = this.ioRatio;
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);
            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}
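
The following sketch makes the overall shape of run() concrete. MiniEventLoop is hypothetical and heavily simplified; it is not a Netty class. It keeps only three ideas from the code above:

- a non-blocking selectNow() when tasks are pending, and a blocking select otherwise;
- draining the task queue after each select;
- a wakenUp flag, so that selector.wakeup() is only called when the flag flips from false to true.

I/O handling, scheduled tasks, ioRatio and the epoll workaround are left out. The 1-second select timeout is an arbitrary choice of this sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, heavily simplified event loop (not a Netty class) that keeps
// only the outline of NioEventLoop.run(): select, run tasks, check for shutdown.
final class MiniEventLoop implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    private volatile boolean shuttingDown;

    MiniEventLoop() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Queues a task; may be called from any thread. */
    void execute(Runnable task) {
        taskQueue.add(task);
        wakeupSelector();
    }

    /** Asks the loop to finish the queued tasks and exit. */
    void shutdown() {
        shuttingDown = true;
        wakeupSelector();
    }

    private void wakeupSelector() {
        // Selector.wakeup() is costly, so only call it when the flag flips false -> true.
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }

    @Override
    public void run() {
        try {
            for (;;) {
                wakenUp.set(false);
                if (!taskQueue.isEmpty()) {
                    selector.selectNow();   // tasks pending: poll I/O without blocking
                } else {
                    selector.select(1000);  // block until I/O, wakeup() or timeout
                }
                // Processing of selector.selectedKeys() would go here.
                Runnable task;
                while ((task = taskQueue.poll()) != null) {
                    task.run();
                }
                if (shuttingDown && taskQueue.isEmpty()) {
                    break;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            try {
                selector.close();
            } catch (IOException ignored) {
                // best effort on the way out
            }
        }
    }
}
```

Usage: start the loop on its own thread, call execute(...) from any thread, then call shutdown(). The oldWakenUp bookkeeping from run() is deliberately left out.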

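1. All of the logic runs inside the for loop. The loop exits only when NioEventLoop receives a shutdown instruction. This is the usual way to implement a thread that processes NIO messages.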

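2. First, wakenUp is reset to false, and its previous value is saved in oldWakenUp. hasTasks() then checks whether the current task queue still holds unprocessed tasks. If it does, selectNow() is called to perform a non-blocking select and see whether any Channel is ready. Its implementation is as follows: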

void selectNow() throws IOException {
    try {
        selector.selectNow();
    } finally {
        // restore wakeup state if needed
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}

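Selector.selectNow() performs a selection operation immediately, without blocking. It returns the number of keys whose ready sets were updated, which is 0 if no channel is ready. After the selection, the code checks wakenUp, that is, whether a wakeup was requested by the user or the system. If so, it calls selector.wakeup() again. This re-arming is needed because selectNow() clears the effect of any previous wakeup() call.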
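
The effect of the finally block can be reproduced with a plain JDK Selector. The sketch below copies the shape of selectNow() into a standalone helper; SelectNowDemo and its methods are hypothetical names. It shows that a wakeup requested before selectNow() still makes the following blocking select() return at once, because the helper re-issues wakeup() when the flag is set.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;

final class SelectNowDemo {
    // Same shape as NioEventLoop.selectNow(): selectNow() cancels a pending
    // wakeup, so re-arm it if a wakeup was requested (wakenUp == true).
    static int selectNow(Selector selector, AtomicBoolean wakenUp) {
        try {
            return selector.selectNow();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (wakenUp.get()) {
                selector.wakeup();
            }
        }
    }

    // Returns how long a blocking select(timeoutMillis) took, in milliseconds.
    static long timeSelect(Selector selector, long timeoutMillis) {
        long start = System.nanoTime();
        try {
            selector.select(timeoutMillis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return (System.nanoTime() - start) / 1_000_000L;
    }
}
```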

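About wakeup (if this is unfamiliar, see http://ifeve.com/selectors/#wakeUp):

1) If a thread has called select() and is blocked, it can be made to return even if no channel is ready. Another thread just calls Selector.wakeup() on the same Selector object, and the thread blocked in select() returns immediately.

2) If another thread calls wakeup() while no thread is blocked in select(), the next thread that calls select() "wakes up" immediately.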
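
Both cases can be observed with a plain JDK Selector. In this sketch, WakeupDemo is a hypothetical helper:

- blockedMillis shows case 1: a worker thread blocks in select() and the calling thread wakes it. If wakeup() happens to run before the worker enters select(), case 2 applies instead; either way the select returns early.
- wakeupThenSelectMillis shows case 2: wakeup() is called first, and the next select() returns immediately.

Timings are only compared against generous bounds, since scheduling delays vary.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

final class WakeupDemo {
    // Case 1: a worker blocks in select(timeoutMillis); after delayMillis the
    // calling thread wakes it. Returns how long the worker stayed in select().
    static long blockedMillis(long timeoutMillis, long delayMillis) throws InterruptedException {
        try (Selector selector = Selector.open()) {
            long[] elapsed = new long[1];
            Thread selecting = new Thread(() -> {
                long start = System.nanoTime();
                try {
                    selector.select(timeoutMillis);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                elapsed[0] = (System.nanoTime() - start) / 1_000_000L;
            });
            selecting.start();
            Thread.sleep(delayMillis);
            selector.wakeup();   // unblocks the thread waiting in select()
            selecting.join();    // join() makes elapsed[0] visible here
            return elapsed[0];
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Case 2: wakeup() while nobody is selecting; the next select() returns at once.
    static long wakeupThenSelectMillis(long timeoutMillis) {
        try (Selector selector = Selector.open()) {
            selector.wakeup();
            long start = System.nanoTime();
            selector.select(timeoutMillis);
            return (System.nanoTime() - start) / 1_000_000L;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```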

3. Back in run(): when there are no pending tasks, select() is called, and the Selector (the multiplexer) polls for ready channels.

private void select() throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt++;
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
                // Selected something,
                // waken up by user, or
                // the task queue has a pending task.
                break;
            }
            if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                        selectCnt);
                rebuildSelector();
                selector = this.selector;
                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            currentTimeNanos = System.nanoTime();
        }
        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
        }
        // Harmless exception - log anyway
    }
}

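1) Get the current system time in nanoseconds and call delayNanos() to obtain the delay until the next scheduled task in the NioEventLoop is due. Their sum is the select deadline, selectDeadLineNanos.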

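2) Compute the remaining time until the next scheduled task fires and convert it to milliseconds, adding 0.5 ms so that the result is rounded to the nearest millisecond. If the resulting timeout is not positive (the task must run immediately or is already overdue), call selector.selectNow() if no select has been done yet, set selectCnt to 1, and exit the loop.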
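
The timeout arithmetic in step 2 can be checked in isolation. timeoutMillis below is a hypothetical helper that copies the expression from select(). For a positive remaining time, adding 500,000 ns before the integer division rounds to the nearest millisecond. Anything under 0.5 ms becomes 0, which sends select() down the selectNow() path.

```java
final class SelectTimeout {
    // Same expression as in select(): remaining nanoseconds until the deadline,
    // plus 0.5 ms, truncated to whole milliseconds.
    static long timeoutMillis(long selectDeadLineNanos, long currentTimeNanos) {
        return (selectDeadLineNanos - currentTimeNanos + 500_000L) / 1_000_000L;
    }
}
```

For example, 1.499999 ms remaining gives 1, 1.5 ms gives 2, and 0.4 ms gives 0. An overdue deadline gives 0 or a negative value, and both satisfy `timeoutMillis <= 0`.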

3) Pass the remaining timeout to select(timeoutMillis). Each completed select operation increments the counter selectCnt.

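4) After the select operation completes, the result is checked. The loop exits if any of the following holds:

        A: selectedKeys is non-zero, i.e. some Channel is ready and there are read/write events to handle;

        B: oldWakenUp is true;

        C: the system or the user called wakeup() to wake the current multiplexer (wakenUp.get() returns true);

        D: the task queue has pending tasks (hasTasks() returns true).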

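5) If this poll of the Selector returned nothing, no wakeup happened, and there are no new tasks to process, it was an empty poll. This may have been triggered by the JDK epoll bug, which makes the Selector spin on empty polls and keeps the I/O thread at 100% CPU. As of the latest JDK 7 at the time of writing, the bug had still not been completely fixed, so Netty has to detect and work around it.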

For the Selector stack trace of bug ID 6403933, see the figure on p. 433 of 《Netty權威指南》 (Netty: The Definitive Guide) by Li Linfeng.

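The workaround is as follows:

(1) Track the Selector's select operations over time;

(2) Count every select operation that returns empty;

(3) If N consecutive empty polls occur within some period (for example 100 ms), the JDK NIO epoll() spin bug has been triggered.

In the select() code above, this is implemented by counting consecutive premature returns in selectCnt. Once the count reaches SELECTOR_AUTO_REBUILD_THRESHOLD (when that threshold is positive), the Selector is rebuilt.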
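
The counting part of the workaround can be sketched as a small state machine. PrematureReturnCounter is a hypothetical class, and rebuildThreshold stands in for SELECTOR_AUTO_REBUILD_THRESHOLD. The caller is assumed to report only select() calls that returned before their deadline. As in select(), a return with ready keys, a wakeup or pending tasks ends the streak, and a non-positive threshold disables rebuilding.

```java
final class PrematureReturnCounter {
    private final int rebuildThreshold; // stands in for SELECTOR_AUTO_REBUILD_THRESHOLD
    private int selectCnt;              // consecutive premature returns so far

    PrematureReturnCounter(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    /**
     * Records one select() that returned before its deadline.
     * Returns true when the Selector should be rebuilt.
     */
    boolean onSelectReturned(int selectedKeys, boolean wokenUpOrHasTasks) {
        if (selectedKeys != 0 || wokenUpOrHasTasks) {
            selectCnt = 0; // a legitimate return ends the streak
            return false;
        }
        selectCnt++;
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            selectCnt = 0; // start counting again on the rebuilt Selector
            return true;
        }
        return false;
    }
}
```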

6) Once the Selector is detected to be spinning, the system is restored by rebuilding the Selector; see the rebuildSelector() method.
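
A rebuild amounts to four steps:

1. open a new Selector;
2. re-register every channel on it with the same interest set and attachment;
3. cancel the old keys;
4. close the old Selector.

The sketch below does only that, using plain JDK calls; SelectorRebuilder is a hypothetical name. Netty's own rebuildSelector() has to do more, for example error handling for each key, and that is omitted here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class SelectorRebuilder {
    // Moves every valid registration from oldSelector to a freshly opened
    // Selector (same interest ops, same attachment), then closes oldSelector.
    static Selector rebuild(Selector oldSelector) {
        try {
            Selector newSelector = Selector.open();
            for (SelectionKey key : oldSelector.keys()) {
                if (!key.isValid()) {
                    continue; // already cancelled or channel closed
                }
                int interestOps = key.interestOps();
                Object attachment = key.attachment();
                key.channel().register(newSelector, interestOps, attachment);
                key.cancel(); // only affects the registration with oldSelector
            }
            oldSelector.close();
            return newSelector;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```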

7) If the poll found ready SocketChannels, their network I/O events must be processed.

See run():

final long ioStartTime = System.nanoTime();
needsToSelectAgain = false;
if (selectedKeys != null) {
    processSelectedKeysOptimized(selectedKeys.flip());
} else {
    processSelectedKeysPlain(selector.selectedKeys());
}
final long ioTime = System.nanoTime() - ioStartTime;

If the selectedKeys optimization is not enabled (selectedKeys is null), execution takes the processSelectedKeysPlain branch. Its implementation is as follows:

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // check if the set is empty and if so just return to not create garbage by
    // creating a new Iterator every time even if there is nothing to process.
    // See https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) {
        return;
    }
    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (!i.hasNext()) {
            break;
        }
        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();
            // Create the iterator again to avoid ConcurrentModificationException
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}

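The method starts with a defensive check and returns if the set of selected keys is empty. Otherwise it iterates over the set. For each SelectionKey it gets the attachment (the object attached when the SocketChannel was registered) and removes the key from the selected-key set through the iterator. The removal is necessary because the Selector itself never removes keys from the selected-key set. A key that is not removed stays in the set and would be processed again after the next select, even if its channel has no new event.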
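
The need for i.remove() is easy to observe with a Pipe. In the sketch below (SelectedKeysDemo is hypothetical), the pipe's source channel is made readable and then selected. The data is read and the Selector selects again. The key is still in the selected-key set afterwards unless it was removed through the iterator.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

final class SelectedKeysDemo {
    // Returns the size of the selected-key set after processing and re-selecting.
    static int remainingAfterProcessing(boolean removeKeys) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ);
                pipe.sink().write(ByteBuffer.wrap(new byte[] {1})); // make source readable
                selector.select(5000);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey k = it.next();
                    if (removeKeys) {
                        it.remove(); // the same step as i.remove() in processSelectedKeysPlain
                    }
                    // Consume the data so the channel is no longer readable.
                    ((Pipe.SourceChannel) k.channel()).read(ByteBuffer.allocate(16));
                }
                selector.selectNow(); // a new selection does not clear the set
                return selector.selectedKeys().size();
            } finally {
                pipe.source().close();
                pipe.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```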

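Next, the type of the attachment is checked:

- If it is an AbstractNioChannel, it is a NioServerSocketChannel or a NioSocketChannel, and I/O read/write operations are needed.
- If it is a NioTask, it is cast and passed to processSelectedKey.

Netty itself does not implement the NioTask interface, so the second branch is normally not taken unless the user registers such a task with the multiplexer.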

8) Processing of the I/O events:

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // Get the inner Unsafe of the NioServerSocketChannel or NioSocketChannel.
    final NioUnsafe unsafe = ch.unsafe();
    // If the selection key is no longer valid, call Unsafe.close() to release the connection resources.
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }
    try {
        // The key is valid: check its ready operation bits.
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // Read or accept: call Unsafe.read(); the Unsafe implementation is polymorphic here.
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Writable: flush to finish sending partially written (half-packet) messages.
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // Connect is ready: clear OP_CONNECT, then check the connection result.
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
    } catch (CancelledKeyException e) {
        unsafe.close(unsafe.voidPromise());
    }
}

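The Unsafe implementation is polymorphic here. For NioServerSocketChannel, a read means accepting a client's TCP connection. For NioSocketChannel, a read means reading data from the SocketChannel into a buffer (a ByteBuf).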
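
The bit tests in processSelectedKey() can be tried out on plain readyOps values using the real SelectionKey constants. In the sketch below, ReadyOpsDispatch is hypothetical:

- actionsFor returns labels for what the method would do, in the same order. The labels are illustrative, not Netty calls.
- withoutConnect shows the interest-set update made before finishConnect().

The early return taken when the channel is closed after a read is omitted.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

final class ReadyOpsDispatch {
    // Mirrors the order of the checks in processSelectedKey().
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");          // unsafe.read(): accept (server) or read bytes (client)
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");    // finish sending what is left in the outbound buffer
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect"); // complete a pending client connection
        }
        return actions;
    }

    // Removes OP_CONNECT from an interest set, as done before finishConnect().
    static int withoutConnect(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }
}
```

Note that readyOps == 0 is routed to the read path, matching the JDK-bug workaround mentioned in the comment.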

9) After the I/O events are handled, NioEventLoop has to run the non-I/O system tasks and the scheduled tasks.

See run():

final int ioRatio = this.ioRatio;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

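NioEventLoop has to handle both I/O events and non-I/O tasks. To ensure both get enough CPU time, Netty lets the user tune the I/O ratio, ioRatio: the percentage of loop time intended for I/O. If there is more I/O work than scheduled tasks and other tasks, the ratio can be raised; otherwise it can be lowered. The default is 50%. The time budget for tasks is ioTime * (100 - ioRatio) / ioRatio. With the default of 50, tasks may therefore run for as long as the preceding I/O took.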
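
A quick worked check of the formula uses a hypothetical helper with the same expression as run(). The range check on ioRatio is an addition of this sketch, not something shown in the Netty code above.

```java
final class IoRatioBudget {
    // Task time budget derived from the time just spent on I/O:
    // ioTime * (100 - ioRatio) / ioRatio, with ioRatio as a percentage.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio must be in (0, 100]: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

With 1000 ns of I/O, ioRatio = 50 gives a 1000 ns budget and ioRatio = 80 gives 1000 × 20 / 80 = 250 ns. ioRatio = 20 gives 1000 × 80 / 20 = 4000 ns.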

The task time budget is thus derived from how long this round's I/O took. Let's look at the implementation of runAllTasks:

/**
 * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
 * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
 */
protected boolean runAllTasks(long timeoutNanos) {
    fetchFromDelayedQueue();
    Runnable task = pollTask();
    if (task == null) {
        return false;
    }
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception.", t);
        }
        runTasks++;
        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

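runAllTasks() first calls fetchFromDelayedQueue(), which takes due tasks from the scheduled-task (delayed) queue. If the delayed queue is empty, its loop exits. Otherwise, the deadline of the task at the head is compared with the current timestamp:

- If the task is due or overdue, it is added to the task queue and removed from the delayed queue.
- If it is not yet due, nothing needs to be done in this round and the loop exits.

The code is as follows: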

private void fetchFromDelayedQueue() {
    long nanoTime = 0L;
    for (;;) {
        ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
        if (delayedTask == null) {
            break;
        }
        if (nanoTime == 0L) {
            nanoTime = ScheduledFutureTask.nanoTime();
        }
        if (delayedTask.deadlineNanos() <= nanoTime) {
            delayedTaskQueue.remove();
            taskQueue.add(delayedTask);
        } else {
            break;
        }
    }
}

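If runAllTasks() does not return early (that is, pollTask() returned a task), it runs the tasks in the task queue. These include both the tasks that were already there and the due or overdue scheduled tasks just moved from the delayed queue.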
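
The move from the delayed queue to the task queue can be sketched with a java.util.PriorityQueue ordered by deadline. DelayedTaskMover and its Scheduled type are hypothetical. Unlike Netty, the task queue here holds the Scheduled objects themselves rather than Runnables, and the current time is passed in instead of being read lazily.

```java
import java.util.PriorityQueue;
import java.util.Queue;

final class DelayedTaskMover {
    // Hypothetical scheduled task: a name plus an absolute deadline in nanoseconds.
    static final class Scheduled implements Comparable<Scheduled> {
        final String name;
        final long deadlineNanos;

        Scheduled(String name, long deadlineNanos) {
            this.name = name;
            this.deadlineNanos = deadlineNanos;
        }

        @Override
        public int compareTo(Scheduled other) {
            return Long.compare(deadlineNanos, other.deadlineNanos);
        }
    }

    // Moves every task with deadline <= nowNanos from the deadline-ordered
    // delayed queue to the task queue; stops at the first task not yet due.
    static int moveDue(PriorityQueue<Scheduled> delayed, Queue<Scheduled> taskQueue, long nowNanos) {
        int moved = 0;
        for (;;) {
            Scheduled head = delayed.peek();
            if (head == null || head.deadlineNanos > nowNanos) {
                break;
            }
            delayed.remove();   // removes the head, i.e. the earliest deadline
            taskQueue.add(head);
            moved++;
        }
        return moved;
    }
}
```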

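Reading the system nanosecond time is relatively expensive, so checking for a timeout on every iteration would hurt performance. Instead, the deadline is checked once every 64 tasks ((runTasks & 0x3F) == 0). If the current time has reached the budget allotted to non-I/O tasks, the loop exits. This prevents a large number of non-I/O tasks from blocking I/O processing for a long time.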
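
The batching can be reproduced with an injectable clock so that its effect is deterministic. runWithBudget is a hypothetical sketch of the loop in runAllTasks(). It has no scheduled tasks, does not record lastExecutionTime, and catches only RuntimeException where Netty catches Throwable. The deadline is only enforced at 64-task boundaries, so a batch can overrun the budget.

```java
import java.util.Queue;
import java.util.function.LongSupplier;

final class BoundedTaskRunner {
    // Runs queued tasks until the queue is empty or, checked every 64 tasks,
    // the clock has passed start + timeoutNanos. Returns the number of tasks run.
    static int runWithBudget(Queue<Runnable> tasks, long timeoutNanos, LongSupplier clock) {
        Runnable task = tasks.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = clock.getAsLong() + timeoutNanos;
        int ran = 0;
        for (;;) {
            try {
                task.run();
            } catch (RuntimeException e) {
                // runAllTasks() logs and carries on; this sketch just carries on
            }
            ran++;
            // Only read the clock on every 64th task, like (runTasks & 0x3F) == 0.
            if ((ran & 0x3F) == 0 && clock.getAsLong() >= deadline) {
                break;
            }
            task = tasks.poll();
            if (task == null) {
                break;
            }
        }
        return ran;
    }
}
```

For example, suppose there are 200 tasks that each advance a fake clock by 10 ns, with a 100 ns budget. The loop stops after exactly 64 tasks, even though the budget was exceeded after the 10th.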

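10) Finally, check whether the system has entered graceful shutdown.

If it is shutting down, closeAll() is called to release resources. Once confirmShutdown() returns true, the NioEventLoop thread exits the loop and terminates. The resource-closing code is as follows: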

private void closeAll() {
    selectAgain();
    Set<SelectionKey> keys = selector.keys();
    Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
    for (SelectionKey k: keys) {
        Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            channels.add((AbstractNioChannel) a);
        } else {
            k.cancel();
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            invokeChannelUnregistered(task, k, null);
        }
    }
    for (AbstractNioChannel ch: channels) {
        ch.unsafe().close(ch.unsafe().voidPromise());
    }
}

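closeAll() first collects all registered Channels. Keys whose attachment is a NioTask are cancelled, and the task is notified through invokeChannelUnregistered. It then calls each Channel's Unsafe.close() to close all links and release the associated resources, such as the ChannelPipeline and ChannelHandlers.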
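
The two-pass shape of closeAll() (collect the channels first, then close them) can be sketched with plain JDK channels. CloseAllDemo is hypothetical. It calls SelectableChannel.close() directly where Netty goes through the channel's Unsafe.close(), and it skips the NioTask branch and the initial selectAgain().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;

final class CloseAllDemo {
    // Pass 1 collects every channel registered with the selector;
    // pass 2 closes them. Returns how many channels were closed.
    static int closeAll(Selector selector) {
        List<SelectableChannel> channels = new ArrayList<>(selector.keys().size());
        for (SelectionKey k : selector.keys()) {
            channels.add(k.channel());
        }
        for (SelectableChannel ch : channels) {
            try {
                ch.close(); // closing a channel also cancels its keys
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return channels.size();
    }
}
```

Cancelled keys are only dropped from the Selector's key set during its next selection operation. The usage check therefore calls selectNow() before inspecting keys().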