[小專案]-netty實現聊天功能

Netty是一個Java的NIO客戶端服務端框架可以快速的開發網路應用程式,比如客戶端和服務端的協議,大大簡化了網路程式的開發過程。我們知道Netty的整體架構主要由3部分組成:緩衝(buffer)、通道(channel)、事件模型(event model)。所有的高階也行都構建於這三個元件之上。下面我們基於這個架構實現一個簡單的網路聊天功能。

1.環境:

JDK 7
Maven3
Netty 4.1
IDEA14

2.服務端

服務端的handler

netty的所有IO處理都是基於事件驅動的,所以對於服務端我們先從服務端的Handler開始:
這裡我新建了SimpleChatServerHandler類,讓他繼承於SimpleChannelInboundHandler。並重寫父類的一些方法,原始碼如下:

package netty.cookbook.simplechat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
* Created by louyuting on 16/12/8.
* 服務端處理IO
*/
public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String>{
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 每當服務端收到新的客戶端連線時,客戶端的channel存入ChannelGroup列表中,並通知列表中其他客戶端channel
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//獲取連線的channel
Channel incomming = ctx.channel();
//通知所有已經連線到伺服器的客戶端,有一個新的通道加入
for(Channel channel:channels){
channel.writeAndFlush("[SERVER]-" incomming.remoteAddress() "加入\n");
}
channels.add(ctx.channel());
}
/**
*每當服務端斷開客戶端連線時,客戶端的channel從ChannelGroup中移除,並通知列表中其他客戶端channel
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
//獲取連線的channel
Channel incomming = ctx.channel();
for(Channel channel:channels){
channel.writeAndFlush("[SERVER]-" incomming.remoteAddress() "離開\n");
}
//從服務端的channelGroup中移除當前離開的客戶端
channels.remove(ctx.channel());
}
/**
* 每當從服務端讀到客戶端寫入資訊時,將資訊轉發給其他客戶端的Channel.
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel incomming = ctx.channel();
//將收到的資訊轉發給全部的客戶端channel
for(Channel channel:channels){
if(channel != incomming) {
channel.writeAndFlush("["   incomming.remoteAddress()   "]"   msg   "\n");
}else{
channel.writeAndFlush("[You]" msg "\n");
}
}
}
/**
* 服務端監聽到客戶端活動
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//服務端接收到客戶端上線通知
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:"   incoming.remoteAddress() "線上");
}
/**
* 服務端監聽到客戶端不活動
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//服務端接收到客戶端掉線通知
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:"   incoming.remoteAddress() "掉線");
}
/**
* 當服務端的IO 丟擲異常時被呼叫
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//super.exceptionCaught(ctx, cause);
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:"   incoming.remoteAddress() "異常");
//異常出現就關閉連線
cause.printStackTrace();
ctx.close();
}
}

對上面的handler做如下說明:
1、SimpleChatServerHandler繼承自SimpleChannelInboundHandler,這個類實現了ChannelInboundHandler介面,ChannelInboundHandler提供了很多事件處理的介面方法,我們僅僅需要繼承SimpleChannelInboundHandler並重寫這些方法。

2、覆蓋了父類的handlerAdded(ChannelHandlerContext ctx)事件處理方法,每當從服務端收到新的客戶端連線時,客戶端的Channel存入ChannelGroup列表中,並通知列表中的其他客戶端。在這個方法中我獲取到了新連線的channel,並通知所有已經連線到伺服器的channel有一個新的客戶端連線進來(注意這裡的通知不會在伺服器端顯示),然後把新連線的客戶端channel新增到服務端的channelGroup。

3、覆蓋了handlerRemoved()事件處理方法。每當從服務端收到客戶端斷開時,客戶端的Channel從ChannelGroup列表中移除,並通知列表中的其他客戶端。這個方法的實現和handlerAdded()方法完全相反,它通知所有已經連線到伺服器的channel有一個客戶端從伺服器斷開(注意這裡的通知不會在伺服器端顯示),然後把這個客戶端channel從服務端的channelGroup中移除。

4、覆蓋了 channelRead0() 事件處理方法。每當從服務端讀到客戶端寫入資訊時,將資訊轉發給其他所有的客戶端的Channel。

5、覆蓋了 channelActive() 事件處理方法。服務端監聽到客戶端正在活動時呼叫(線上)。

6、覆蓋了 channelInactive() 事件處理方法。服務端監聽到客戶端不活動是呼叫(離線).

7、exceptionCaught() 事件處理方法是:當出現 Throwable 物件才會被呼叫,即當 Netty 由於 IO 錯誤或者處理器在處理事件時丟擲的異常時出現。在大部分情況下,捕獲的異常應該被記錄下來並且把關聯的 channel 給關閉掉。然而這個方法的處理方式會在遇到不同異常的情況下有不同的實現,比如你可能想在關閉連線之前傳送一個錯誤碼的響應訊息。

所以上面的handler中函式一個比較常規執行順序是:

  1. handlerAdded()
  2. channelActive()
  3. channelRead0()
  4. channelInactive()
  5. handlerRemoved()

服務端的handler容器ServerInitializer

SimpleChatServerInitializer 用來增加多個的handler處理類到ChannelPipeline上,ChannelPipeline簡單理解就可以看成是一個handler容器,包括編碼、解碼、SimpleChatServerHandler等。我實現的原始碼如下:

package netty.cookbook.simplechat;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* Created by louyuting on 16/12/8.
* 用來增加多個的處理類到ChannelPipeline上:包括編碼,解碼,SimpleChatServerHandler
*/
public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new SimpleChatServerHandler());
System.out.println("SimpleChatClient:"   ch.remoteAddress() "連線上伺服器");
}
}

啟動伺服器

最後來編寫一個main方法來啟動伺服器:原始碼如下:

package netty.cookbook.simplechat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import utils.LogUtil;
/**
* Created by louyuting on 16/12/8.
* 啟動服務端
*/
public class SimpleChatServer {
private int port;
public SimpleChatServer(int port){
this.port = port;
}
public void run() throws Exception{
//NioEventLoopGroup是用來處理IO操作的多執行緒事件迴圈器
//boss用來接收進來的連線
EventLoopGroup bossGroup = new NioEventLoopGroup();
//用來處理已經被接收的連線;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
//是一個啟動NIO服務的輔助啟動類
ServerBootstrap sBootstrap = new ServerBootstrap();
//These EventLoopGroup's are used to handle all the events and IO for ServerChannel and Channel's.
//為bootstrap設定acceptor的EventLoopGroup和client的EventLoopGroup
//這些EventLoopGroups用於處理所有的IO事件
//?這裡為什麼設定兩個group呢?
sBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new SimpleChatServerInitializer())
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
LogUtil.log_debug("SimpleChatServer 啟動了");
//繫結埠,開始接收進來的連線
ChannelFuture future = sBootstrap.bind(port).sync();
//等待伺服器socket關閉
//在本例子中不會發生,這時可以關閉伺服器了
future.channel().closeFuture().sync();
} finally {
//
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
LogUtil.log_debug("SimpleChatServer 關閉了");
}
}
public static void main(String[] args) throws Exception {
new SimpleChatServer(8080).run();
}
}

啟動伺服器端的程式碼基本都是一個套路:
1、首先定義NioEventLoopGroup,這個NioEventLoopGroup是用來處理IO操作的多執行緒事件迴圈器。在這個服務端應用中,我建立了兩個NioEventLoopGroup,一個稱謂boss,一個稱為worker。這裡的這兩個NioEventLoopGroup是有明確的分工任務的,boss用來接收進來的連線、用來處理已經被接收的連線。一旦boss接收到連線,就會把連線資訊註冊到worker上面,然後worker處理連線。這裡通過把接收請求和處理連線解耦,大大增強了服務端接收請求和處理連線的能力。

2、ServerBootstrap是一個啟動NIO服務的輔助啟動類。在這個類上我們需要配置伺服器的各種資訊,配置事件迴圈器、配置通道型別(NioServerSocketChannel)、新增childHandler、設定通道的可選引數等等。

3、剩下的就是繫結埠然後啟動服務。這裡我們在機器上繫結了機器所有網絡卡上的 8080 埠。當然 現在你可以多次呼叫 bind() 方法(基於不同繫結地址)。

至此、我們已經完成了基於Netty的聊天服務端的程式。

3. 客戶端

客戶端的實現程式碼其實基本上都和服務端差別不大。

客戶端的handler

客戶端的這個handler比較簡單,只需要列印出其餘客戶端傳送的資訊就行了。

package netty.cookbook.simplechat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import utils.LogUtil;
/**
* Created by louyuting on 16/12/8.
* 客戶端處理IO,只需要將讀到的資訊列印出來就OK了
*/
public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String>{
/**
* 每當從服務端讀到客戶端寫入資訊時,將資訊轉發給其他客戶端的Channel.
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
LogUtil.log_debug(msg);
}
}

客戶端的ChannelInitializer

與服務端類似:

package netty.cookbook.simplechat;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* Created by louyuting on 16/12/8.
* 用來增加多個的處理類到ChannelPipeline上:包括編碼,解碼,SimpleChatServerHandler
*/
public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new SimpleChatClientHandler());
}
}

客戶端啟動程式

編寫main啟動客戶端

package netty.cookbook.simplechat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
* Created by louyuting on 16/12/8.
* 啟動服務端
*/
public class SimpleChatClient {
private final int port;
private final String host;
public SimpleChatClient(String host, int port){
this.host = host;
this.port = port;
}
public void run() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try{
//是一個啟動NIO服務的輔助啟動類
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new SimpleChatClientInitializer());
Channel channel = bootstrap.connect(host, port).sync().channel();
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
while (true){
channel.writeAndFlush(in.readLine() "\r\n");
}
} catch (Exception e){
e.printStackTrace();
} finally{
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new SimpleChatClient("localhost",8080).run();
}
}

啟動之後執行:

首先啟動服務端,然後再啟動兩個客戶端。執行之後截圖如下:
服務端:
服務端

客戶端1
客戶端1

客戶端2
客戶端2

所有原始碼的Github地址
https://github.com/leetcode-hust/leetcode/tree/master/louyuting/src/netty/cookbook/simplechat