Netty之 netty原始碼學習之netty server端原始碼初讀(下)

Netty之 netty原始碼學習之netty server端原始碼初讀(下)

上一篇簡單的分析了一下NioEventLoop,ServerBootstrap等元件的建立過程做的一些操作

現在我們一起看下當SingleThreadEventExecutor.java中的thread啟動後,netty做的一些最最重要的一些操作

我們接著昨天的程式碼看,昨天分析到register之後,呼叫這段程式碼:

[java] view
plain
 copy

 在CODE上檢視程式碼片派生到我的程式碼片

  1. eventLoop.execute(new OneTimeTask() {  
  2.                         @Override  
  3.                         public void run() {  
  4.                             register0(promise);  
  5.                         }  
  6. });  

接著下去就是SingleThreadEventExecutor.java execute方法:

[java] view
plain
 copy

 在CODE上檢視程式碼片派生到我的程式碼片

  1. @Override  
  2.     public void execute(Runnable task) {  
  3.         if (task == null) {  
  4.             throw new NullPointerException(“task”);  
  5.         }  
  6.   
  7.         boolean inEventLoop = inEventLoop();  
  8.         if (inEventLoop) {  
  9.             addTask(task);  
  10.         } else {  
  11.             startThread();  
  12.             addTask(task);  
  13.             if (isShutdown() && removeTask(task)) {  
  14.                 reject();  
  15.             }  
  16.         }  
  17.   
  18.         if (!addTaskWakesUp && wakesUpForTask(task)) {  
  19.             wakeup(inEventLoop);  
  20.         }  
  21.     }  

startThread這個方法就是呼叫thread.start的方法

最後就會呼叫我們上篇講述的片段二的程式碼SingleThreadEventExecutor.this.run():

最後追述到NioEventLoop.java中的run方法:

[java] view
plain
 copy

 在CODE上檢視程式碼片派生到我的程式碼片

  1. @Override  
  2.     protected void run() {  
  3.         for (;;) {  
  4.             oldWakenUp = wakenUp.getAndSet(false);  
  5.             try {  
  6.                 if (hasTasks()) {  
  7.                     selectNow();  
  8.                 } else {  
  9.                     select();  
  10.                     if (wakenUp.get()) {  
  11.                         selector.wakeup();  
  12.                     }  
  13.                 }  
  14.   
  15.                 cancelledKeys = 0;  
  16.                 needsToSelectAgain = false;  
  17.                 final int ioRatio = this.ioRatio;  
  18.                 if (ioRatio == 100) {  
  19.                     processSelectedKeys();  
  20.                     runAllTasks();  
  21.                 } else {  
  22.                     final long ioStartTime = System.nanoTime();  
  23.   
  24.                     processSelectedKeys();  
  25.   
  26.                     final long ioTime = System.nanoTime() – ioStartTime;  
  27.                     runAllTasks(ioTime * (100 – ioRatio) / ioRatio);  
  28.                 }  
  29.   
  30.                 if (isShuttingDown()) {  
  31.                     closeAll();  
  32.                     if (confirmShutdown()) {  
  33.                         break;  
  34.                     }  
  35.                 }  
  36.             } catch (Throwable t) {  
  37.                 logger.warn(“Unexpected exception in the selector loop.”, t);  
  38.   
  39.                 // Prevent possible consecutive immediate failures that lead to  
  40.                 // excessive CPU consumption.  
  41.                 try {  
  42.                     Thread.sleep(1000);  
  43.                 } catch (InterruptedException e) {  
  44.                     // Ignore.  
  45.                 }  
  46.             }  
  47.         }  
  48.     }  

首先這個方法是無限自迴圈,沒有shutdown之前是不會停止的,講述這個方法之前,先再理清一些概念,首先是這個run方法的執行執行緒是thread,也就是之前講述nioEventLoopGroup時,說netty為每一個NioEventLoop繫結了一個自己的執行緒,它去執行佇列和延遲佇列中的task任務,除了執行這兩種任務之外,還執行一些IO操作,何為IO操作,例如OP_ACCEPT、OP_CONNECT、OP_READ、OP_WRITE,相反那些佇列中的task則為非IO操作,例如bind,register,channelRead,channelActive等等

好的,理清這個大前提之後,我們接著看這個方法

先進行判斷,檢視hasTask方法,這個方法主要檢視佇列裡面是否有未執行的task,如果有任務,就呼叫selectNow方法,立即返回,不阻塞,這邊很好理解啊,如果佇列中有任務當然立即返回,否則一直阻塞在那邊,你等別人請你吃午飯啊,如果hasTask方法返回false的時候,說明佇列中沒有任務,則呼叫select()方法,這邊需要解釋一下,如果阻塞呼叫的時候,就不怕在阻塞的期間佇列中有新的task嗎,這邊下面的分析的時候會做解釋,先放在這邊,我們接著把這個run方法分析完整

接下來我們會根據我們配置的ioRatio的值,分配IO和非IO任務執行的時間比例,預設情況下,ioRatio為50,走else分支,processSelectedKeys這是是執行IO操作,runAllTasks是執行佇列任務,整個run方法就是做這麼多事情,去檢視自己獨自持有的selector上不停的select或者selectNow,去檢視是否有IO事件發生,與此同時,如果有佇列任務,則順便執行了

就比如我們上一篇做的那個channel註冊selector的那個register方法一樣,就是呼叫的 runAllTasks(ioTime * (100 – ioRatio) / ioRatio)的方法,所以繫結channel就是一個非IO操作

到此為止,我們簡單的一起分析了run方法的主要流程,我們還沒有對processSelectedKeys()這個處理IO的方法進行分析,並且還有一個疑問就是我們剛才說明的那個問題,為什麼當前時刻沒有task就可以呼叫阻塞方法select方法而不當心阻塞的過程中有task加入到佇列的問題了

接下來,我們將啟動一個client來連線server端,通過連線,我們真實場景下進行debug,簡單地分析一下Netty的啟動過程,首先在debug之前,我們先說明一下,如果Client端連線Server併傳送一個資訊的話,我們需要注意的事情

1)因為Netty的Boss/worker的程式碼是複用的,我們在debug的時候,需要注意上下文,要區分此時的執行緒是boss的thread還是worker的thread

2)client端連線server端併傳送資訊,這會觸發一些什麼事件,我們也需要注意,上文講述過,任務分兩種IO任務和非IO任務(我們站在Server端的角度)

IO任務:OP_ACCEPT,OP_READ

非IO任務:channelActive和channelRead方法

當client啟動後,我們進入在server端進入debug模式

進入debug模式後,我們聚焦NioEventLoop的run方法

processSelectedKeys方法是處理在selector上的IO事件,進入processSelectedKeys方法接著看

可以看到在select方法操作之後,seletedKeys上的確有事件發生,如上圖所示,接著debug:

因為我們目前debug的是boss執行緒繫結的NioEventLoop上的thread 執行的run方法,而boss執行緒的selector繫結的channel是NioServerSocketChannel,也就是我們上一篇文章介紹的那個register方法,因為繫結的selector上有事件發生(OP_ACCEPTOR事件)接著看processSelectedKeysOptimized這個方法中的processSelectedKey子方法:

我們知道NioServerSocketChannel對應的unsafe物件是NioMessageUnsafe物件,並且這邊k.readyOps()的值是16也就是SelectionKey.OP_ACCEPT的值,所以我們需要處理客戶端的連線操作,processSelectedKey的方法使用unsafe.read()的方法進行處理,也就是這邊的NioMessageUnsafe來處理,我們進入NioMessageUnsafe的read方法

在這個方法中,我們會獲取到NioSeverSocketChannel上的管道,我們上文說過,netty在管道中偷偷的塞入了一個ServerBootstrapAcceptor,看這個類的名字就知道,是用來處理ServerBootstrap接收其他連結用的一個處理器,獲取到ServerBootstrapAcceptor之後,我們接著看doReadMessages這個方法

可以看到此時才進行accpt操作(不知道我理解的對不對,BACKLOG的值就是建立好的連線和為建立好的連線之和,建立好的連線和未建立好的連結的區分點就是是否accept),言歸正傳,下面將建立好的Channel封裝成NioSocketChannel,這邊要注意的是每一個channel都有一個Unsafe物件,我們可以看新建的NioSocketChannel的Unsafe物件與NioServerSocketChannel的NioMessageUnsafe物件有何不同,我們進入new NioSocketChannel(this,ch)的方法內部,最後debug到:

可以看到NioSocketChannel對應的NioByteUnsafe,同時也沒有在管道中偷偷塞入類似ServerBootstrapAcceptor這樣的處理器

可以看到defaultChannelPipeline目前還是空的,接著回來看doReadMessages方法,因為這個方法是在無限for迴圈中的,當下次accept沒有獲取到新的channel的時候,就會退出迴圈,接著看

pipeline就是我們剛才說的ServerBootstrapAcceptor這個物件,我們看看ServerBootstrapAcceptor對新的NioSocketChannel做了什麼,其實想想我們應該知道做什麼,我們已經獲取到了新的channel,我們只要讓其繫結8個worker中的任意worker,然後讓channel繫結selector,再順便啟動一下worker的thread,boss執行緒要乾的事情就算做完了,而且我們在分析ServerBootstrapAcceptor的時候,我們應該想起,ServerBootstrapAcceptor的建構函式裡面就有workGroup,這樣一想感覺順理成章,我們接著看

debug下看的確最後呼叫的是ServerBootstrapAcceptor這個handler。接著看

對channel做了一些屬性的配置之後,果然走到了最後一步register,其實上一節我們分析了register的方法的。我們這邊再一起分析一下,溫故而知新嘛

因為8個worker要均勻的分配,所以我們會看到最後呼叫的next方法是:

最後也就是呼叫了register方法,與上文沒有差別,且順便借用register之手,啟動了其中一個worker的執行緒

好了,到此為止,boss執行緒的NioEventLoop分析就到此為止了,我們接著看worker執行緒的NioEventLoop的run方法

可以看到此時的AbstractNioChannel已經變成了我們剛才封裝初始化繫結的NioSocketChannel,此時為啥會觸發這個方法呢,因為客戶端傳送了一個資訊到server端了,

這邊的readyOps的值是1,也就是OP_READ,unsafe物件也是我們剛才說的NioByteUnsafe

pipeline中的也是我們自定義的handler

好了,server端的啟動流程基本分析結束了,boss的執行緒,worker的啟動,都簡單的說明了一些,關於如何處理client端的資訊,我們下次在一起分析一下~

關於那個為什麼佇列中沒有任務的時候,就可以阻塞操作select的原因始因為我覺得(不一定對)絕大多數的非IO操作也是因為IO事件產生的,也就是說如果當前時刻沒有

IO事件的時候,Netty不會無腦會產生一些非IO的任務的,例如channelActive,bind等等的操作

我們總結一下,Netty的server端流程其實是很清晰的

1)先初始化好boss和worker的NioEventLoopGroup,並初始化好Group中的每一個NioEventLoop,為每一個NioEventLoop都穿件一個selector物件

2)Netty會在bind的動作上,去讓boss的NioserverSocketChannel去繫結selector,並啟動與boss捆綁在一起的thread,進入無盡的迴圈

3)在這個生命不息,迴圈不止的方法中,主要做了兩件事情,1是去select,不管是selectNow()方法還是select()方法,其主要目的就是去檢視boss關注的selector是否有事件發生

4)當有事件發生的時候,一般就是因為有client連結,如果有連結,boss執行緒就需要做的事情就是初始化封裝好新來的SocketChannel

5)封裝好的NioSocketChannel也會有自己的Unsafe物件,這個物件是用來做一些其他的操作的,例如讀操作,這與boss的Unsafe物件不一樣,boss的Unsafe物件是NioMessageUnsafe是用來進行繫結channel

6)NioSocketChannel用Boss執行緒管道中的ServerBootstrapAcceptor物件繫結確定屬於自己的worker執行緒之後,繫結好worker執行緒的selector之後就開始呼叫自己的run方法

用來監聽selector上的IO事件

7)要說明白的一點就是一個worker處理的channel在多連結的場景下,一個worker會處理不止一個SocketChannel