NO IMAGE

上一篇BLOG已經介紹了revolver RUDP的傳輸效能、基本的框架和介面,這篇文章我重點講述RUDP的實現細節。在RUDP的模組中最為重要的是其收發緩衝控制和CCC傳送視窗控制、CCC傳送慢啟動控制、CCC快恢復控制等幾個過程。(關於RUDP原始碼實現在revolver開源專案的RUDP目錄:點選開啟連結

資料塊定義

在RUDP模組中,所有傳送的資料被定義成RUDPRecvSegment 和 RUDPSendSegment結構,其中RUDPSendSegment是傳送塊定義,RUDPRecvSegment 是接收塊定義。如下:
//傳送資料片
typedef struct tagRUDPSendSegment
{
uint64_t	seq_;                          //塊序號
uint64_t	push_ts_;		       //進入傳送佇列的時刻
uint64_t	last_send_ts_;		       //最後一次傳送的時刻
uint16_t	send_count_;		       //傳送的次數
uint8_t		data_[MAX_SEGMENT_SIZE];       //塊資料	
uint16_t	data_size_;                    //塊資料長度
}RUDPSendSegment;

typedef struct tagRUDPRecvSegment
{
uint64_t	seq_;                          //塊序號
uint8_t		data_[MAX_SEGMENT_SIZE];       //塊資料
uint16_t	data_size_;	               //塊資料長度
}RUDPRecvSegment;

塊的最大尺寸為MAX_SEGMENT_SIZE = 1408(不能大於MTU,一般MTU是1492)。為了加快記憶體分配的速度,RUDP模組中使用了物件池來保證塊物件的快速申請,物件池定義:

ObjectPool<RUDPSendSegment, RUDP_SEGMENT_POOL_SIZE>	SENDSEGPOOL;
ObjectPool<RUDPRecvSegment, RUDP_SEGMENT_POOL_SIZE>	RECVSEGPOOL;
#define GAIN_SEND_SEG(seg) \
RUDPSendSegment* seg = SENDSEGPOOL.pop_obj();\
seg->reset()
#define RETURN_SEND_SEG(seg) \
if(seg != NULL)\
SENDSEGPOOL.push_obj(seg)
#define GAIN_RECV_SEG(seg) \
RUDPRecvSegment* seg = RECVSEGPOOL.pop_obj(); \
seg->reset()
#define RETURN_RECV_SEG(seg) \
if(seg != NULL)\
RECVSEGPOOL.push_obj(seg)

幾個巨集是作為塊申請和釋放的巨集。以上就是塊的定義介紹,更具體的只有去檢視相關原始碼了。

傳送緩衝區

傳送緩衝區,定義如下:
class RUDPSendBuffer
{
public:
...
//傳送資料介面
int32_t				send(const uint8_t* data, int32_t data_size);
//ACK處理
void				on_ack(uint64_t ack_seq);
//NACK處理
void				on_nack(uint64_t base_seq, const LossIDArray& loss_ids);
//定時器介面
void				on_timer(uint64_t now_ts);
//檢查BUFFER是否可以寫入資料
void				check_buffer();
...
public:
uint64_t			get_buffer_seq() {return buffer_seq_;};
//設定NAGLE演算法	
void				set_nagle(bool nagle = true){nagle_ = nagle;};
bool				get_nagle() const {return nagle_;};
//設定傳送緩衝區的大小
void				set_buffer_size(int32_t buffer_size){buffer_size_ = buffer_size;};
int32_t				get_buffer_size() const {return buffer_size_;};
...
protected:
IRUDPNetChannel*	net_channel_;
//正在傳送的資料片
SendWindowMap		send_window_;
//正在傳送的報文的丟包集合
LossIDSet			loss_set_;
//等待傳送的資料片
SendDataList		send_data_;
//傳送緩衝區的大小
int32_t				buffer_size_;
//當前緩衝資料的大小
int32_t				buffer_data_size_;
//當前BUFFER中最大的SEQ
uint64_t			buffer_seq_;
//當前WINDOW中最大的SEQ
uint64_t			cwnd_max_seq_;
//接收端最大的SEQ
uint64_t			dest_max_seq_;
//速度控制器
RUDPCCCObject*		ccc_;
//是否啟動NAGLE演算法
bool				nagle_;
}

其中send函式是資料寫入函式,在這個函式裡面,緩衝區物件先會對寫入的資料進行報文拼接成傳送塊,讓傳送資料儘量接近MAX_SEGMENT_SIZE,如果傳送的資料大於MAX_SEGMENT_SIZE,也會進行MAX_SEGMENT_SIZE為單元的分片。然後寫入到對應的傳送緩衝列表send_data_當中。最後嘗試進行網路傳送。虛擬碼如下:

int32_t RUDPSendBuffer::send(const uint8_t* data, int32_t data_size)
{
int32_t copy_pos = 0;
int32_t copy_size = 0;
uint8_t* pos = (uint8_t *)data;
uint64_t now_timer = CBaseTimeValue::get_time_value().msec();
if(!send_data_.empty()) //拼接報文,讓其接近MAX_SEGMENT_SIZE
{
//取出send_data_中的最後一片,如果它沒有達到MAX_SEGMENT_SIZE,資料追加到MAX_SEGMENT_SIZE大小為止。
RUDPSendSegment* last_seg = send_data_.back();
if(last_seg != NULL && last_seg->data_size_ < MAX_SEGMENT_SIZE)
{
copy_size = MAX_SEGMENT_SIZE - last_seg->data_size_;
if( copy_size > data_size) 
copy_size = data_size;
memcpy(last_seg->data_   last_seg->data_size_, pos, copy_size);
copy_pos  = copy_size;
pos  = copy_size;
last_seg->data_size_  = copy_size;
}
}
//剩餘資料分成MAX_SEGMENT_SIZE為單位的若干分片
while(copy_pos < data_size)
{
GAIN_SEND_SEG(last_seg);
//設定初始化的的時刻
last_seg->push_ts_ = now_timer; //記錄壓入時間戳
last_seg->seq_ = buffer_seq_;
buffer_seq_   ;
//確定拷貝的塊長度
copy_size = (data_size - copy_pos);
if(copy_size > MAX_SEGMENT_SIZE)
copy_size = MAX_SEGMENT_SIZE;
memcpy(last_seg->data_, pos, copy_size);
copy_pos  = copy_size;
pos  = copy_size;
last_seg->data_size_ = copy_size;
//壓入傳送佇列
send_data_.push_back(last_seg);
}
//記錄緩衝區的資料長度
buffer_data_size_  = copy_pos;
//嘗試傳送,立即傳送
attempt_send(now_timer);
return copy_pos;
}

這裡會觸發attempt_send()函式。這個函式是嘗試傳送的核心函式。在後面的幾個過程裡面也會呼叫到這個函式。以上就是傳送函式的過程。

除了傳送函式以外,傳送緩衝區物件還會響應來自網路的on_ack和on_nack訊息,這兩個訊息分別是處理正常的狀態報告和丟包情況下的網路報告。如果收到on_ack,緩衝區物件會把已經接收端報告過來的報文ID全部從傳送視窗中刪除,然後呼叫attempt_send嘗試新的塊傳送。如果收到的是on_nack,表示對端有丟包,則先會記錄丟包的ID到loss_set中,再呼叫on_ack進行處理。
觸發attempt_send還有可能是定時器Timer,定時器每5MS會檢查一下傳送緩衝區,並呼叫attempt_send嘗試傳送並且會檢查緩衝區是否可寫。
attempt_send函式虛擬碼如下:
void RUDPSendBuffer::attempt_send(uint64_t now_timer)
{
uint32_t cwnd_size = send_window_.size();
uint32_t rtt = ccc_->get_rtt();
uint32_t ccc_cwnd_size = ccc_->get_send_window_size();
RUDPSendSegment* seg = NULL;
uint32_t send_packet_number  = 0;
if(!loss_set_.empty()) //重發丟失的片段
{
//傳送丟包佇列中的報文
uint64_t loss_last_ts = 0;
uint64_t loss_last_seq = 0;
for(LossIDSet::iterator it = loss_set_.begin(); it != loss_set_.end();) //檢查丟失報文是否要重發
{
if(send_packet_number >= ccc_cwnd_size) //超過傳送視窗
break;
SendWindowMap::iterator cwnd_it = send_window_.find(*it);
if(cwnd_it != send_window_.end() && cwnd_it->second->last_send_ts_   rtt < now_timer) //丟失報文必須在視窗中
{
seg = cwnd_it->second;
//UDP網路傳送
net_channel_->send_data(0, seg->seq_, seg->data_, seg->data_size_, now_timer);
if(cwnd_max_seq_ < seg->seq_)
cwnd_max_seq_ = seg->seq_;
//判斷是否可以更改TS
if(loss_last_ts < seg->last_send_ts_)
{
loss_last_ts = seg->last_send_ts_;
if(loss_last_seq < *it) 
loss_last_seq = *it;
}
seg->last_send_ts_ = now_timer;
seg->send_count_   ;
send_packet_number   ;
loss_set_.erase(it   );
//報告CCC有重發
ccc_->add_resend();
}
else
it;
}
//更新重發包範圍內未重發報文的時刻,防止下一次定時器到來時重複傳送
for(SendWindowMap::iterator it = send_window_.begin(); it != send_window_.end();   it)
{
if(it->second->push_ts_ < loss_last_ts && loss_last_seq >= it->first)
it->second->last_send_ts_ = now_timer;
else if(loss_last_seq < it->first)
break;
}
}
else if(send_window_.size() > 0)//丟包佇列為空,重發所有視窗中超時的分片
{
//傳送間時間隔閾值
uint32_t rtt_threshold = (uint32_t)ceil(rtt * 1.25);
rtt_threshold = (core_max(rtt_threshold, 30));
SendWindowMap::iterator end_it = send_window_.end();
for(SendWindowMap::iterator it = send_window_.begin(); it != end_it;   it)
{
if(send_packet_number >= ccc_cwnd_size || (it->second->push_ts_   rtt_threshold > now_timer))
break;
seg = it->second;
//重發塊的觸發條件是上一次傳送的時間距離現在大於特定的閾值或者壓入時間很長並且是屬於傳送緩衝區靠前的塊
if(seg->last_send_ts_   rtt_threshold < now_timer 
|| (seg->push_ts_   rtt_threshold * 5 < now_timer && seg->seq_ < dest_max_seq_   3 && seg->last_send_ts_   rtt_threshold / 2 < now_timer)) 
{
net_channel_->send_data(0, seg->seq_, seg->data_, seg->data_size_, now_timer);
if(cwnd_max_seq_ < seg->seq_)
cwnd_max_seq_ = seg->seq_;
seg->last_send_ts_ = now_timer;
seg->send_count_   ;
send_packet_number   ;
//報告CCC有重發塊
ccc_->add_resend();
}
}
}
//判斷是否可以傳送新的報文
if(ccc_cwnd_size > send_packet_number)
{
while(!send_data_.empty())
{
RUDPSendSegment* seg = send_data_.front();
//判斷NAGLE演算法,NAGLE最少需要在100MS湊1024個位元組報文
if(cwnd_size > 0 && nagle_ && seg->push_ts_   NAGLE_DELAY > now_timer && seg->data_size_ < MAX_SEGMENT_SIZE - 256)
break;
//判斷髮送視窗
if(cwnd_size < ccc_cwnd_size)
{
send_data_.pop_front();
send_window_.insert(SendWindowMap::value_type(seg->seq_, seg));
cwnd_size   ;
seg->push_ts_ = now_timer;
seg->last_send_ts_ = now_timer;
seg->send_count_ = 1;
//UDP網路傳送
net_channel_->send_data(0, seg->seq_, seg->data_, seg->data_size_, now_timer);
if(cwnd_max_seq_ < seg->seq_)
cwnd_max_seq_ = seg->seq_;
}
else //傳送視窗滿,則停止傳送
break;
}
}
}

從上可得知,attempt_send是首先檢查是否可以傳送丟失的報文,然後再檢查視窗中太老的報文是否要重發,最後才加入新的傳送報文。所有的前提約束是不超過傳送視窗。這個函式裡CCC決定的傳送視窗大小和RTT直接控制著傳送速度和傳送策略。在這裡值得一提的是NAGLE的實現,RUDP為了防止小包過多,實現了一個nagle演算法,如果設定了此開關,假如只有1個塊在緩衝佇列中,會等資料達到1024的長度才進行傳送。如果等100MS沒到1024長度也會傳送,也就是最大等100MS.開關可以通過rudp
interface設定的。

接收緩衝區

接收緩衝區相對比較簡單,其主要功能是接收傳送方的資料並生成接收塊、塊排序、丟包判斷和反饋、讀事件通知等。以下是接收緩衝區的定義:
class RUDPRecvBuffer
{
public:
...
//來自網路中的資料
int32_t				on_data(uint64_t seq, const uint8_t* data, int32_t data_size);
//定時事件
void				on_timer(uint64_t now_timer, uint32_t rtc);
//讀取BUFFER中的資料
int32_t				read(uint8_t* data, int32_t data_size);
//檢查緩衝區是否可讀
void				check_buffer();
//檢查丟包
bool				check_loss(uint64_t now_timer, uint32_t rtc);
...
protected:
IRUDPNetChannel*	net_channel_;
//接收視窗
RecvWindowMap		recv_window_;
//已完成的連續資料片
RecvDataList		recv_data_;
//丟包序列
LossIDTSMap			loss_map_;
//當前BUFFER中最大連續資料片的SEQ
uint64_t			first_seq_;
//當期BUFFER中受到的最大的資料片ID
uint64_t			max_seq_;
//最後一次傳送ACK的時刻
uint64_t			last_ack_ts_;
//在上次傳送ACK到現在,受到新的連續報文的標誌	
bool				recv_new_packet_;
...
};

在上面定義中,核心的函式主要是on_data和on_timer。on_data是接收來自傳送端的RUDP資料包文,在這個函式裡面首先會進行接收到報文和緩衝去裡面的報文進行比較判斷是否丟包和重複包。如果有丟包,記錄到loss_map中。如果是重複包,則丟棄。如果接收到的包和緩衝區裡的報文可以組成連續的塊序列。則對上層觸發on_read讀事件。一下是這個函式的虛擬碼:

int32_t RUDPRecvBuffer::on_data(uint64_t seq, const uint8_t* data, int32_t data_size)
{
//報文合法性檢測
if(seq > first_seq_   MAX_SEQ_INTNAL || data_size > MAX_SEGMENT_SIZE)
{
//報告異常
RUDP_RECV_DEBUG("on data exception!!");
net_channel_->on_exception();
return -1;
}
RUDPRecvSegment* seg = NULL;
if(first_seq_   1 == seq)//連續報文
{
recv_new_packet_= true;
//將資料緩衝到佇列中
GAIN_RECV_SEG(seg);
seg->seq_ = seq;
seg->data_size_ = data_size;
memcpy(seg->data_, data, data_size);
recv_data_.push_back(seg);
first_seq_ = seq;
//判斷緩衝區中的塊是否連續,並進行排序
check_recv_window();
//觸發可讀事件
net_channel_->on_read();
//刪除丟包
loss_map_.erase(seq);
}
else if(seq > first_seq_   1) //非連續報文
{
RecvWindowMap::iterator it = recv_window_.find(seq);
if(it == recv_window_.end()) //記錄到接收視窗中
{
//將資料緩衝到佇列中
GAIN_RECV_SEG(seg);
seg->seq_ = seq;
seg->data_size_ = data_size;
memcpy(seg->data_, data, data_size);
recv_window_[seq] = seg;
}
//判斷丟包
if(seq > max_seq_   1)
{
uint64_t ts = CBaseTimeValue::get_time_value().msec();
for(uint64_t i = max_seq_   1; i < seq;     i) //緩衝區中最大的報文和收到的報文之間的報文全部列入丟包範圍中,並記錄丟包時刻
loss_map_[i] = ts;
}
else
{
//刪除丟包
loss_map_.erase(seq);
}
}
//更新緩衝區最大SEQ
if(max_seq_ < seq)
max_seq_ = seq;
return 0;
}

on_timer是定時觸發的,一般是5MS觸發一次。主要是向傳送端傳送報告訊息(ack/nack)、檢查緩衝區是否可讀兩個操作。傳送ack狀態訊息的條件是

uint32_t rtc_threshold = core_min(20, rtc / 2);
if(last_ack_ts_ rtc_threshold <= now_timer && recv_new_packet_){
傳送ack
}
其中rtc是RTT的修正值,由CCC計算得來。間隔不大於20MS傳送一次。recv_new_packet_是一個收到正常連續報文的標誌。如果傳送了NACK,就不傳送ACK,如果有丟包的話,就會觸發傳送nack,在on_timer的時候就會檢測是本定時週期是否有丟包,如果有,就將丟包的序號通過nack發往傳送端做丟包補償。
void RUDPRecvBuffer::on_timer(uint64_t now_timer, uint32_t rtc)
{       //檢查丟包
if(check_loss(now_timer, rtc))
recv_new_packet_ = false;
//檢查是否需要傳送ack
uint32_t rtc_threshold = core_min(20, rtc / 2);
if(last_ack_ts_   rtc_threshold <= now_timer && recv_new_packet_)
send_ack();
//檢查緩衝區是否可讀
if(!recv_data_.empty() && net_channel_ != NULL)
net_channel_->on_read();
}

CCC核心控制

CCC的核心控制就是慢啟動、快恢復、RTT評估三個部分。
慢啟動過程描述如下:
1、傳送端的初始化傳送視窗(send_wnd)為16
2、當傳送端收到第一個ACK時,send_wnd = send_wnd (本ACK週期內傳送成功的報文數量)
3、繼續傳送資料包文,直到下一個ACK。重複2和3步驟,如果send_wnd >= MAX_WND.慢啟動結束,或者慢啟動時間超過10個RTT和出現丟包情況,慢啟動也結束。
其中MAX_WND是通過RTT決定的。RTT與MAX_WND的對照
RTT                                              MAX_WND
< 10ms                                      2048
< 50ms                                      6144
< 100ms                                     8192
其他                                            12288
從上面可得知,RTT越大MAX_WND越大,這樣做的目的是提高高延遲穩定網路之間的吞吐量。
快恢復過程描述如下:
在慢啟動結束後,資料傳輸過程會隨著網路變化策略也要變化。
1、如果1個ACK週期沒有丟包,傳送視窗send_wnd = snd_cwnd * 1.5
2、如果1個ACK週期有丟包,send_wnd  = send_wnd  / 1.25;最小不能低於8
3、如果本地觸發on_timer事件,檢查本地重發報文resend_count > send_wnd  / 8,如果條件滿足send_wnd  = send_wnd  / 1.25;最小不能低於8。
RTT的評估是通過RUDP_KEEPLIVE的迴路得到一個keeplive_rtt為引數如數計算得到rtt和rtt_var.虛擬碼如下:
void RUDPCCCObject::set_rtt(uint32_t keep_live_rtt)
{
...
//第一次計算rtt和rtt修正
if(rtt_first_)
{
rtt_first_ = false;
rtt_ = keep_live_rtt;
rtt_var_ = rtt_ / 2;
}
else //參考了tcp的rtt計算
{
rtt_var_ = (rtt_var_ * 3   core_abs(rtt_, keep_live_rtt)) / 4;
rtt_ = (7 * rtt_   keep_live_rtt) / 8;
}
rtt_ = core_max(5, rtt_);
rtt_var_ = core_max(3, rtt_var_);
...
}

總結,revolver RUDP模組在傳輸速度和穩定性上表現還算優秀,在頻寬達到30M/s以上,CPU上升比較高,一般佔用一個CORE的30%,造成這個原因主要是一個UDP socket傳送比較耗CPU,還有就是大資料量造成傳送和接收視窗增長,使得丟包判定、視窗移動等效率明顯下降。關於視窗移動和傳送以後可以考慮用存C來實現,不依賴C 和STL,應該效率有比較大的提升。