NO IMAGE

寫過web的同學們應該對Session這個東西很熟悉。瀏覽器第一次與伺服器建立連線的時候,伺服器就會自動為之分配一個Session。Session可以用來判斷使用者是否經過登入驗證,也可以儲存使用者的各種資訊。

其實,Session是很常用的技術。不管是WEB,還是遊戲服務,還是聯網的桌面程式,都有session的身影。有了Session,我們可以向裡面儲存各種個人引數,還可以利用session來向客戶端傳送訊息。極大方便了程式對客戶端的管理。

Mina IO框架預設有IoSession這個物件,Netty可就沒有了。所以我們可以自己建立一個Session抽象。

關於Session,我們希望它有這樣的作用。

1. 客戶端鏈路第一次啟用時,服務端為之建立一個Session;

2. 可以使用Session向使用者傳送訊息;

3. 可以儲存一些重要的且不需要持久化的使用者資訊;

4. 只能由服務端控制它的生命週期消亡;

public class IoSession {
private static final Logger logger = LoggerFactory.getLogger(IoSession.class);
/** 網路連線channel */
private Channel channel;
private User user;
/** ip地址 */
private String ipAddr;
private boolean reconnected;
/** 拓展用,儲存一些個人資料  */
private Map<String, Object> attrs = new HashMap<>();
public IoSession() {
}
public IoSession(Channel channel) {
this.channel = channel;
this.ipAddr = ChannelUtils.getIp(channel);
}
public void setUser(User user) {
this.user = user;
}
/**
* 向客戶端傳送訊息
* @param packet
*/
public void sendPacket(Packet packet) {
if (packet == null) {
return;
}
if (channel != null) {
channel.writeAndFlush(packet);
}
}
public String getIpAddr() {
return ipAddr;
}
public void setIpAddr(String ipAddr) {
this.ipAddr = ipAddr;
}
public boolean isReconnected() {
return reconnected;
}
public void setReconnected(boolean reconnected) {
this.reconnected = reconnected;
}
public User getUser() {
return user;
}
public boolean isClose() {
if (channel == null) {
return true;
}
return !channel.isActive() ||
!channel.isOpen();
}
/**
* 關閉session 
* @param reason {@link SessionCloseReason}
*/
public void close(SessionCloseReason reason) {
try{
if (this.channel == null) {
return;
}
if (channel.isOpen()) {
channel.close();
logger.info("close session[{}], reason is {}", getUser().getUserId(), reason);
}else{
logger.info("session[{}] already close, reason is {}", getUser().getUserId(), reason);
}
}catch(Exception e){
}
}
}

Session被關閉可以有一系列原因,所以我們最後有一個列舉儲存各種原因,像這樣

package com.kingston.net;
public enum SessionCloseReason {
/** 正常退出 */
NORMAL,
/** 連結超時 */
OVER_TIME,
}

在Netty,channel是通訊的載體,為了方便對channel的各種操作,加了一個channel的工具類(ChannelUtils.java)

public final class ChannelUtils {
public static AttributeKey<IoSession> SESSION_KEY = AttributeKey.valueOf("session");
/**
* 新增新的會話
* @param channel
* @param session
* @return
*/
public static boolean addChannelSession(Channel channel, IoSession session) {
Attribute<IoSession> sessionAttr = channel.attr(SESSION_KEY);
return sessionAttr.compareAndSet(null, session);
}
public static IoSession getSessionBy(Channel channel) {
Attribute<IoSession> sessionAttr = channel.attr(SESSION_KEY);
return sessionAttr.get() ;
}
public static String getIp(Channel channel) {
return ((InetSocketAddress)channel.remoteAddress()).getAddress().toString().substring(1);
}
}

使用了IoSession,先前用於管理使用者通訊的工具類,也相應發生變化

public enum ServerManager {
INSTANCE;
private Logger logger = LoggerFactory.getLogger(ServerManager.class);
/** 快取通訊上下文環境對應的登入使用者(主要用於服務) */ 
private Map<IoSession, Long> session2UserIds  = new ConcurrentHashMap<>();
/** 快取使用者id與對應的會話 */
private ConcurrentMap<Long, IoSession> userId2Sessions = new ConcurrentHashMap<>();
public void sendPacketTo(Packet pact,Long userId){
if(pact == null || userId <= 0) return;
IoSession session = userId2Sessions.get(userId);
if (session != null) {
session.sendPacket(pact);
}
}
/**
*  向所有線上使用者傳送資料包
*/
public void sendPacketToAllUsers(Packet pact){
if(pact == null ) return;
userId2Sessions.values().forEach( (session) -> session.sendPacket(pact));
}
/**
*  向單一線上使用者傳送資料包
*/
public void sendPacketTo(Packet pact,ChannelHandlerContext targetContext ){
if(pact == null || targetContext == null) return;
targetContext.writeAndFlush(pact);
}
public IoSession getSessionBy(long userId) {
return this.userId2Sessions.get(userId);
}
public boolean registerSession(User user, IoSession session) {
session.setUser(user);
userId2Sessions.put(user.getUserId(), session);
logger.info("[{}] registered...", user.getUserId());
return true;
}
/**
*   登出使用者通訊渠道
*/
public void ungisterUserContext(Channel context ){
if(context  == null){
return;
}
IoSession session = ChannelUtils.getSessionBy(context);
Long userId = session2UserIds.remove(session);
userId2Sessions.remove(userId);
if (session != null) {
session.close(SessionCloseReason.OVER_TIME);
}
}
}

加入IoSession後,先前的業務需要做點修改,比如在客戶端鏈路建立後,需要建立新的session物件。

在MessageTransportHandler類增加方法

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (!ChannelUtils.addChannelSession(ctx.channel(), new IoSession(ctx.channel()))) {
ctx.channel().close();
logger.error("Duplicate session,IP=[{}]",ChannelUtils.getIp(ctx.channel()));
}
}

全部程式碼已在github上託管

服務端程式碼請移步 –> netty聊天室伺服器

客戶端程式碼請移步 –> netty聊天室客戶端