Netty網路聊天室之基礎閘道器搭建

最近在學習Netty框架,使用的學習教材是李林鋒著的《Netty權威指南》。國內關於netty的書籍幾乎沒有,這本書算是比較好的入門資源了。

我始終覺得,學習一個新的框架,除了研究框架的原始碼之外,還應該使用該框架開發一個實際的小應用。為此,我選擇Netty作為通訊框架,開發一個模仿QQ的聊天室。

基本框架是這樣設計的,使用Netty作為通訊閘道器,使用JavaFX開發客戶端介面,使用Spring作為IOC容器,使用MyBatics支援持久化。本文將著重介紹Netty閘道器的私有協議棧開發。

Netty服務端程式示例

啟動Reactor執行緒組監聽客戶端鏈路的連線與IO網路讀寫。

public class ChatServer {
private Logger logger = LoggerFactory.getLogger(ChatServer.class);
//避免使用預設執行緒數引數
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private	EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
public void bind(int port) throws Exception {
logger.info("服務端已啟動,正在監聽使用者的請求......");
try{
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
ChannelFuture f = b.bind(new InetSocketAddress(port))
.sync();
f.channel().closeFuture().sync();
}catch(Exception e){
logger.error("", e);
throw e;
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public void close() {
try{
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}catch(Exception e){
logger.error("", e);
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
ChannelPipeline pipeline = arg0.pipeline();
pipeline.addLast(new PacketDecoder(1024*4,0,4,0,4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new PacketEncoder());
//客戶端300秒沒收發包,便會觸發UserEventTriggered事件到MessageTransportHandler
pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, 300));
pipeline.addLast(new IoHandler());
}
}
}

通訊私有協議棧的設計

私有協議棧主要用於跨程序的資料通訊,只能用於企業內部,協議設計比較靈巧方便。

在這裡,訊息定義將訊息頭和訊息體融為一體。將訊息的第一個short資料視為訊息的型別,服務端將根據訊息型別處理不同的業務邏輯。定義Packet抽象類,抽象方法

 readFromBuff(ByteBuf buf) 和  writePacketMsg(ByteBuf buf) 作為讀寫資料的抽象行為,而具體的讀寫方式由相應的子類去實現。程式碼如下:

package com.kingston.net;
import io.netty.buffer.ByteBuf;
import java.io.UnsupportedEncodingException;
public abstract  class Packet {
//	protected String userId;
public void writeToBuff(ByteBuf buf){
buf.writeShort(getPacketType().getType());
writePacketMsg(buf);
}
abstract public void  writePacketMsg(ByteBuf buf);
abstract public void  readFromBuff(ByteBuf buf);
abstract public PacketType  getPacketType();
abstract public void execPacket();
protected  String readUTF8(ByteBuf buf){
int strSize = buf.readInt();
byte[] content = new byte[strSize];
buf.readBytes(content);
try {
return new String(content,"UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return "";
}
}
protected  void writeUTF8(ByteBuf buf,String msg){
byte[] content ;
try {
content = msg.getBytes("UTF-8");
buf.writeInt(content.length);
buf.writeBytes(content);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}

需要注意的是,由於Netty通訊本質上傳送的是byte資料,無法直接傳送String欄位串,需要先經過簡單的編解碼成位元組陣列才能傳送。

POJO物件的編碼與解碼

資料傳送方傳送載體為ByteBuf,因此在發包時,需要將POJO物件進行編碼。本專案使用Netty自帶的編碼器MessageToByteEncoder,實現自定義的編碼方式。程式碼如下:

package com.kingston.net;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class PacketEncoder extends MessageToByteEncoder<Packet> {
@Override
protected void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out)
throws Exception {
msg.writeToBuff(out);
}
}

接收方實際接收ByteBuf資料,需要將其解碼成對應的POJO物件,才能處理對應的邏輯。本專案使用Netty自帶的解碼器ByteToMessageDecoder(LengthFieldBasedFrameDecoder繼承自ByteToMessageDecoder,其作用見下文),實現自定義的解碼方式。程式碼如下:

package com.kingston.net.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import com.kingston.net.Packet;
import com.kingston.net.PacketManager;
public class PacketDecoder extends LengthFieldBasedFrameDecoder{
public PacketDecoder(int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip
) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength,
lengthAdjustment, initialBytesToStrip);
}
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = (ByteBuf)(super.decode(ctx, in));
if(frame.readableBytes() <= 0) return null ;
short packetType = frame.readShort();
Packet packet = PacketManager.createNewPacket(packetType);
packet.readFromBuff(frame);
return packet;
}
}

通訊協議將包頭的第一個short資料視為包型別,根據包型別反射拿到對應的包class定義,呼叫抽象讀取方法完成訊息體的讀取。

訊息協議的解析與執行

訊息使用第一個short資料作為訊息的型別。為了區分每一個訊息協議包,需要有一個資料結構快取各種協議的型別與對應的訊息包定義。為此,使用列舉類定義所有的協議包。程式碼如下:

public enum PacketType {
//業務上行資料包
//連結心跳包
ReqHeartBeat((short)0x0001, ReqHeartBeatPacket.class),
//新使用者註冊
ReqUserRegister((short)0x0100, ReqUserRegisterPacket.class),
//使用者登陸
ReqUserLogin((short)0x0101, ReqUserLoginPacket.class),
//聊天
ReqChat((short)0x0102, ReqChatPacket.class),
//業務下行資料包
RespHeartBeat((short)0x2001, RespHeartBeatPacket.class),
//新使用者註冊
ResUserRegister((short)0x2100, ResUserRegisterPacket.class),
RespLogin((short)0x2102, RespUserLoginPacket.class),
RespChat((short)0x2103, RespChatPacket.class),
;
private short type;
private Class<? extends AbstractPacket> packetClass;
private static Map<Short,Class<? extends AbstractPacket>> PACKET_CLASS_MAP = new HashMap<Short,Class<? extends AbstractPacket>>();
public static void initPackets() {
Set<Short> typeSet = new HashSet<Short>();
Set<Class<?>> packets = new HashSet<>();
for(PacketType p:PacketType.values()){
Short type = p.getType();
if(typeSet.contains(type)){
throw new IllegalStateException("packet type 協議型別重複" type);
}
Class<?> packet = p.getPacketClass();
if (packets.contains(packet)) {
throw new IllegalStateException("packet定義重複" p);
}
PACKET_CLASS_MAP.put(type,p.getPacketClass());
typeSet.add(type);
packets.add(packet);
}
}
PacketType(short type,Class<? extends AbstractPacket> packetClass){
this.setType(type);
this.packetClass = packetClass;
}
public short getType() {
return type;
}
public void setType(short type) {
this.type = type;
}
public Class<? extends AbstractPacket> getPacketClass() {
return packetClass;
}
public void setPacketClass(Class<? extends AbstractPacket> packetClass) {
this.packetClass = packetClass;
}
public static  Class<? extends AbstractPacket> getPacketClassBy(short packetType){
return PACKET_CLASS_MAP.get(packetType);
}
}

PacketType列舉類中有一個初始化方法initPackets(),用於快取所有包型別與對應的實體類的對映關係。這樣,就可以根據包型別,直接拿到對應的Packet子類。

經過解碼反射得到完整的訊息包定義後,就可以通過反射機制,呼叫相應的業務方法。該步驟由包執行器完成,程式碼如下:

package com.kingston.net;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
public class PacketExecutor {
public static void execPacket(Packet pact){
if(pact == null) return;
try {
Method m = pact.getClass().getMethod("execPacket");
m.invoke(pact, null);
} catch (NoSuchMethodException | SecurityException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}

包執行器其實是根據反射,呼叫對應子類訊息包的業務處理方法。

到這裡,讀者應該可以感受抽象包Packet的定義是該通訊機制的精華部分。正是有了abstract public void  readFromBuff(ByteBuf buf); abstract public void writePacketMsg(ByteBuf buf); abstract public void execPacket()三個抽象方法,才能將各種訊息包的讀寫、業務邏輯相互隔離。

寫到這裡,我不禁回想起大學期間做過的一個聊天室課程設計。當初,我採用Java作為伺服器,flash作為客戶端,基於socket進行通訊。通訊訊息體只有一個長字串,通訊雙方根據不同訊息型別將字串作多次分隔。如果當初協議型別再多幾個的話,估計想死的心都有了。

Netty的半包讀寫解決之道

MessageToByteEncoder 和 ByteToMessageDecoder兩個類只是解決POJO的編解碼,並沒有處理粘包,拆包的異常情況。在本例中,使用LengthFieldBasedFrameDecoder和LengthFieldPrepender兩個工具類,就可以輕鬆解決半包讀寫異常。

服務端與客戶端資料通訊方式

客戶端tcp鏈路建立後,服務端必須快取對應的ChannelHandlerContext物件。這樣,服務端就可以向所有連線的使用者傳送資料了。傳送資料基礎服務類程式碼如下:

package com.kingston.base;
import io.netty.channel.ChannelHandlerContext;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.kingston.net.Packet;
import com.kingston.util.StringUtil;
public class ServerManager {
//快取所有登入使用者對應的通訊上下文環境(主要用於業務資料處理)
private static Map<Integer,ChannelHandlerContext> USER_CHANNEL_MAP  = new ConcurrentHashMap<>();
//快取通訊上下文環境對應的登入使用者(主要用於服務)
private static Map<ChannelHandlerContext,Integer> CHANNEL_USER_MAP  = new ConcurrentHashMap<>();
public static void sendPacketTo(Packet pact,String userId){
if(pact == null || StringUtil.isEmpty(userId)) return;
Map<Integer,ChannelHandlerContext> contextMap  = USER_CHANNEL_MAP;
if(StringUtil.isEmpty(contextMap)) return;
ChannelHandlerContext targetContext = contextMap.get(userId);
if(targetContext == null) return;
targetContext.writeAndFlush(pact);
}
/**
*  向所有線上使用者傳送資料包
*/
public static void sendPacketToAllUsers(Packet pact){
if(pact == null ) return;
Map<Integer,ChannelHandlerContext> contextMap  = USER_CHANNEL_MAP;
if(StringUtil.isEmpty(contextMap)) return;
contextMap.values().forEach( (ctx) -> ctx.writeAndFlush(pact));
}
/**
*  向單一線上使用者傳送資料包
*/
public static void sendPacketTo(Packet pact,ChannelHandlerContext targetContext ){
if(pact == null || targetContext == null) return;
targetContext.writeAndFlush(pact);
}
public static ChannelHandlerContext getOnlineContextBy(String userId){
return USER_CHANNEL_MAP.get(userId);
}
public static void addOnlineContext(Integer userId,ChannelHandlerContext context){
if(context == null){
throw new NullPointerException();
}
USER_CHANNEL_MAP.put(userId,context);
CHANNEL_USER_MAP.put(context, userId);
}
/**
*  登出使用者通訊渠道
*/
public static void ungisterUserContext(ChannelHandlerContext context ){
if(context  != null){
int userId = CHANNEL_USER_MAP.getOrDefault(context,0);
CHANNEL_USER_MAP.remove(context);
USER_CHANNEL_MAP.remove(userId);
context.close();
}
}
}

模擬使用者登入的服務端demo

1. demo流程為客戶端傳送一個以Req開頭命名的上行包到服務端,服務端接受資料後,直接傳送一個以Resp開頭命名的響應包到客戶端。

上行包ReqUserLogin程式碼如下:

public class ReqUserLoginPacket extends Packet{
private long userId;
private String userPwd; 
@Override
public void writePacketBody(ByteBuf buf) {
buf.writeLong(userId);
writeUTF8(buf, userPwd);
}
@Override
public void readPacketBody(ByteBuf buf) {
this.userId = buf.readLong();
this.userPwd =readUTF8(buf);
System.err.println("id=" userId ",pwd=" userPwd);
}
@Override
public PacketType getPacketType() {
return PacketType.ReqUserLogin;
}
@Override
public void execPacket() {
}
public String getUserPwd() {
return userPwd;
}
public void setUserPwd(String userPwd) {
this.userPwd = userPwd;
}
public long getUserId() {
return userId;
}
public void setUserId(long userId) {
this.userId = userId;
}
}

2. 業務邏輯服務,收到登入包後,呼叫對應的業務處理方法進行處理

@Component
public class LoginService {
@Autowired
private UserDao userDao;
public void validateLogin(Channel channel, long userId, String password) {
User user = validate(userId, password);
IoSession session = ChannelUtils.getSessionBy(channel);
RespUserLoginPacket resp = new RespUserLoginPacket();
if(user != null) {
resp.setIsValid((byte)1);
resp.setAlertMsg("登入成功");
ServerManager.INSTANCE.registerSession(user, session);
}else{
resp.setAlertMsg("帳號或密碼錯誤");
}
ServerManager.INSTANCE.sendPacketTo(session, resp);
}
/**
*  驗證帳號密碼是否一致
*/
private User validate(long userId, String password){
if (userId <= 0 || StringUtils.isEmpty(password)) {
return null;
}
User user = userDao.findById(userId);
if (user != null &&
user.getPassword().equals(password)) {
return user;
}
return null;
}
}

3. 業務處理後,下發一個響應包。下行包RespUserLogin程式碼如下:

public class RespUserLoginPacket extends AbstractPacket{
private String alertMsg;
private byte isValid;
@Override
public void writePacketBody(ByteBuf buf) {
writeUTF8(buf, alertMsg);
buf.writeByte(isValid);
}
@Override
public void readPacketBody(ByteBuf buf) {
this.alertMsg = readUTF8(buf);
this.isValid = buf.readByte();
}
@Override
public PacketType getPacketType() {
return PacketType.RespUserLogin;
}
@Override
public void execPacket() {
System.err.println("receive login "  alertMsg);
LoginManager.getInstance().handleLoginResponse(this);
}
public String getAlertMsg() {
return alertMsg;
}
public void setAlertMsg(String alertMsg) {
this.alertMsg = alertMsg;
}
public byte getIsValid() {
return isValid;
}
public void setIsValid(byte isValid) {
this.isValid = isValid;
}
}

至此,服務端主要通訊邏輯基本完成。

模擬使用者登入的客戶端demo

客戶端私有協議跟編解碼方式跟服務端完全一致。客戶端主要關注資料介面的展示。下面只給出啟動應用程式的程式碼,以及測試通訊的示例程式碼。
1.啟動Reactor執行緒組建立與服務端的的連線,以及處理IO網路讀寫。

public class SocketClient {  
/** 當前重接次數*/
private int reconnectTimes = 0;
public void start() {
try{
connect(ClientConfigs.REMOTE_SERVER_IP,
ClientConfigs.REMOTE_SERVER_PORT);
}catch(Exception e){
}
}
public void connect(String host,int port) throws Exception {  
EventLoopGroup group = new NioEventLoopGroup(1);  
try{  
Bootstrap b  = new Bootstrap();  
b.group(group).channel(NioSocketChannel.class)  
.handler(new ChannelInitializer<SocketChannel>(){  
@Override  
protected void initChannel(SocketChannel arg0)  
throws Exception {  
ChannelPipeline pipeline = arg0.pipeline();  
pipeline.addLast(new PacketDecoder(1024*1, 0,4,0,4));  
pipeline.addLast(new LengthFieldPrepender(4));  
pipeline.addLast(new PacketEncoder());  
pipeline.addLast(new ClientTransportHandler());  
}  
});  
ChannelFuture f = b.connect(new InetSocketAddress(host, port),  
new InetSocketAddress(ClientConfigs.LOCAL_SERVER_IP, ClientConfigs.LOCAL_SERVER_PORT))  
.sync();  
f.channel().closeFuture().sync();  
}catch(Exception e){  
e.printStackTrace();  
}finally{  
//          group.shutdownGracefully();  //這裡不再是優雅關閉了  
//設定最大重連次數,防止服務端正常關閉導致的空迴圈
if (reconnectTimes < ClientConfigs.MAX_RECONNECT_TIMES) {
reConnectServer();  
}
}  
}  
}

2.處理業務邏輯的ClientTransportHandler程式碼如下:

public class ClientTransportHandler extends ChannelHandlerAdapter{
public ClientTransportHandler(){
}
@Override
public void channelActive(ChannelHandlerContext ctx){
//註冊session
ClientBaseService.INSTANCE.registerSession(ctx.channel());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception{
AbstractPacket  packet = (AbstractPacket)msg;
PacketManager.INSTANCE.execPacket(packet);
}
@Override
public void close(ChannelHandlerContext ctx,ChannelPromise promise){
System.err.println("TCP closed...");
ctx.close(promise);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.err.println("客戶端關閉1");
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.disconnect(promise);
System.err.println("客戶端關閉2");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.err.println("客戶端關閉3");
Channel channel = ctx.channel();
cause.printStackTrace();
if(channel.isActive()){
System.err.println("simpleclient" channel.remoteAddress() "異常");
}
}
}

3. 先啟動伺服器,再啟動JavaFX客戶端(ClientStartup),即可看到登入介面

至此,聊天室的登入流程基本完成。限於篇幅,此demo例子並沒有出現spring,mybatic相關程式碼,但是私有協議通訊方式程式碼已全部給出。有了一個使用者登入的例子,相信構建其他得業務邏輯也不會太困難。

最後,說下寫程式碼的歷程。這個demo是我春節宅家期間,利用零碎時間做的,平均一天一個小時。很多開發人員應該有這樣的經歷,看書的時候往往覺得都能理解,但實際上自己動手就會遇到各種卡思路。在做這個demo時,我更多時間是花在查資料上。

我也會繼續往這個專案新增功能,讓它看起來越來越“炫”。(^-^)

全部程式碼已在github上託管(程式碼經過多次重構,與部落格上的程式碼略有不同)

完整服務端程式碼請移步 –> netty聊天室伺服器

完整客戶端程式碼請移步 –> netty聊天室客戶端