基於併發伺服器幾種實現方法(總結)

NO IMAGE
1 Star2 Stars3 Stars4 Stars5 Stars 給文章打分!
Loading...

今天主題是實現併發伺服器,實現方法有多種版本,先從簡單的單程序程式碼實現到多程序,多執行緒的實現,最終引入一些高階模組來實現併發TCP伺服器。

說到TCP,想起吐槽大會有個段子提到三次握手,也只有程式猿(媛)能get。

UDP伺服器資料傳輸不可靠,這裡就忽略了。

>>:

簡單的單程序TCP伺服器

假程式碼:

#建立tcp伺服器套接字

#繫結埠

#設定正常情況退出的伺服器下,埠可以重用

#設定監聽,變為主動監聽

# 等待客戶端的連結,返回新的socket和地址

#關閉tcp伺服器套接字


from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR
#建立tcp伺服器套接字
server_socket = socket(AF_INET,SOCK_STREAM)
#繫結埠
server_socket.bind(("",9999))
#設定正常情況退出的伺服器下,埠可以重用
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
#設定監聽,變為主動監聽
server_socket.listen(5)
while True:
# 等待客戶端的連結,返回新的socket和地址
new_socket,new_address = server_socket.accept()
#接收資料,並且傳送資料
try:
while True:
recv_data = new_socket.recv(1024)
#當有客戶端關閉後,recv解除阻塞,並且返回長度為0
if len(recv_data) > 0:
recv_content = recv_data.decode("gb2312")
print("收到:%s的資訊是:%s" % (str(new_address),recv_content))
new_socket.send("thank you!".encode("gb2312"))
else:
print("客戶端%s已經關閉" % (str(new_address)))
break
finally:
new_socket.close()
print("關閉%s客戶端" % (str(new_address)))
#關閉tcp伺服器套接字
server_socket.close()

多程序TCP伺服器


from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR
from multiprocessing import Process
#在子程序中接收訊息
def recv_data(new_socket,new_address):
while True:
recv_data = new_socket.recv(1024)
# 當有客戶端關閉後,recv解除阻塞,並且返回長度為0
if len(recv_data) > 0:
recv_content = recv_data.decode("gb2312")
print("收到:%s的資訊是:%s" % (str(new_address), recv_content))
new_socket.send("thank you!".encode("gb2312"))
else:
print("客戶端%s已經關閉" % (str(new_address)))
break
#關閉與客戶端的連線
print("關閉與客戶端的連線")
new_socket.close()
def main():
#建立tcp伺服器套接字
server_socket = socket(AF_INET,SOCK_STREAM)
#繫結埠
server_socket.bind(("",8888))
#設定正常情況退出的伺服器下,埠可以重用
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
#設定監聽,變為被動連線
server_socket.listen(3)
try:
while True:
# 等待客戶端的連結,返回新的socket和地址
new_socket,new_address = server_socket.accept()
#接收資料,並且傳送資料
Process(target=recv_data,args=(new_socket,new_address)).start()
#因為主程序和子程序不共享資料
#如果我們直接關閉new_socket,只是關閉主程序的new_socket,而子程序的不受影響
new_socket.close()
finally:
#關閉tcp伺服器套接字
server_socket.close()
if __name__ == "__main__":
main()

多程序TCP伺服器


from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR
from multiprocessing import Process
#在子程序中接收訊息
def recv_data(new_socket,new_address):
while True:
recv_data = new_socket.recv(1024)
# 當有客戶端關閉後,recv解除阻塞,並且返回長度為0
if len(recv_data) > 0:
recv_content = recv_data.decode("gb2312")
print("收到:%s的資訊是:%s" % (str(new_address), recv_content))
new_socket.send("thank you!".encode("gb2312"))
else:
print("客戶端%s已經關閉" % (str(new_address)))
break
#關閉與客戶端的連線
print("關閉與客戶端的連線")
new_socket.close()
def main():
#建立tcp伺服器套接字
server_socket = socket(AF_INET,SOCK_STREAM)
#繫結埠
server_socket.bind(("",8888))
#設定正常情況退出的伺服器下,埠可以重用
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
#設定監聽,變為被動連線
server_socket.listen(3)
try:
while True:
# 等待客戶端的連結,返回新的socket和地址
new_socket,new_address = server_socket.accept()
#接收資料,並且傳送資料
Process(target=recv_data,args=(new_socket,new_address)).start()
#因為主程序和子程序不共享資料
#如果我們直接關閉new_socket,只是關閉主程序的new_socket,而子程序的不受影響
new_socket.close()
finally:
#關閉tcp伺服器套接字
server_socket.close()
if __name__ == "__main__":
main()

多執行緒TCP伺服器


from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR
from threading import Thread
#接收訊息
def recv_data(new_socket,new_address):
while True:
recv_data = new_socket.recv(1024)
# 當有客戶端關閉後,recv解除阻塞,並且返回長度為0
if len(recv_data) > 0:
recv_content = recv_data.decode("gb2312")
print("收到:%s的資訊是:%s" % (str(new_address), recv_content))
new_socket.send("thank you!".encode("gb2312"))
else:
print("客戶端%s已經關閉" % (str(new_address)))
break
def main():
#建立tcp伺服器套接字
server_socket = socket(AF_INET,SOCK_STREAM)
#繫結埠
server_socket.bind(("",9999))
#設定正常情況退出的伺服器下,埠可以重用
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
#設定監聽,變為被動連線
server_socket.listen(3)
try:
while True:
# 等待客戶端的連結,返回新的socket和地址
new_socket,new_address = server_socket.accept()
#接收資料,並且傳送資料
Thread(target=recv_data,args=(new_socket,new_address)).start()
finally:
#關閉tcp伺服器套接字
server_socket.close()
if __name__ == "__main__":
main()

多工協程實現 ——

greenlet和gevent


#coding=utf-8
from greenlet import greenlet
import time
def test1():
while True:
print "---A--"
gr2.switch()
time.sleep(0.5)
def test2():
while True:
print "---B--"
gr1.switch()
time.sleep(0.5)
gr1 = greenlet(test1)
gr2 = greenlet(test2)
#切換到gr1中執行
gr1.switch()

import gevent
#函式
def f(n):
for i in range(n):
print("%s:%s" % (gevent.getcurrent(),i))
f1 = gevent.spawn(f,5)
f2 = gevent.spawn(f,5)
f3 = gevent.spawn(f,5)
#讓主執行緒等待三個協程執行完畢,否則沒有機會執行
f1.join()
f2.join()
f3.join()
#可以看到,3個greenlet是依次執行而不是交替執行。要讓greenlet交替執行,可以通過gevent.sleep()交出控制權。

#coding=utf-8
import gevent
def f(n):
for i in range(n):
print gevent.getcurrent(), i
#用來模擬一個耗時操作,注意不是time模組中的sleep
gevent.sleep(1)
g1 = gevent.spawn(f, 5)
g2 = gevent.spawn(f, 5)
g3 = gevent.spawn(f, 5)
#下面三行程式碼意思:主執行緒等待各個協成支援完,否則協成沒有機會執行
g1.join()
g2.join()
g3.join()

單程序TCP伺服器 ——

非堵塞式


from socket import AF_INET,socket,SO_REUSEADDR,SOCK_STREAM,SOL_SOCKET
def main():
#建立tcp的socket套接字
server_socket = socket(AF_INET,SOCK_STREAM)
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
#繫結埠
server_socket.bind(("",9999))
#設定非阻塞,也就是說accept方法不阻塞了,
# 但是在沒有客戶端連結且被執行的時候會報錯
#有客戶端連結的時候正常執行
server_socket.setblocking(False)
#設定監聽
server_socket.listen(5)
#客戶端列表
client_lists = []
try:
#不斷呼叫accept
while True:
try:
# print("accept--111")
new_socket,new_address = server_socket.accept()
print("accept--2222")
except Exception as result:
# print(result)
pass
else:
print("新的客戶%s連結上" % str(new_address))
#新連結的new_sokect預設也是阻塞,也設定為非阻塞後,recv為非阻塞
new_socket.setblocking(False)
client_lists.append((new_socket,new_address))
# print(111)
for client_sokect,client_address in client_lists:
#接收資料
try:
recv_data = client_sokect.recv(1024)
except Exception as result:
# print(result)
pass
else:
# print("正常資料:%s" %recv_data)
if len(recv_data) > 0 :
print("收到%s:%s" % (str(client_address),recv_data))
client_sokect.send("thank you!".encode("gb2312"))
else:
#客戶端已經埠,要把該客戶端從列表中異常
client_lists.remove((client_sokect,new_address))
client_sokect.close()
print("%s已經斷開" % str(new_address))
finally:
#關閉套接字
server_socket.close()
if __name__ == "__main__":
main()

單程序TCP伺服器 ——

select版

select 原理

其他語言(c或者c )也有使用select實現多工伺服器。

select 能夠完成一些套接字的檢查,從頭到尾檢查一遍後,標記哪些套接字是否可以收資料,返回的時候,就返回能接收資料的套接字,返回的是列表。select是由作業系統提供的,效率要高些,非常快的方式檢測哪些套接字可以接收資料。select是跨平臺的,在window也可以用。

io多路複用:沒有使用多程序和多執行緒的情況下完成多個套接字的使用。


from socket import AF_INET,socket,SO_REUSEADDR,SOCK_STREAM,SOL_SOCKET
from select import select
import sys
def main():
#建立tcp的socket套接字
server_socket = socket(AF_INET,SOCK_STREAM)
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
#繫結埠
server_socket.bind(("",9999))
#設定監聽
server_socket.listen(5)
#客戶端列表
socket_lists = [server_socket,sys.stdin]
wirte_list = []
#是否退出
is_run = False
try:
while True:
#檢測列表client_lists那些socket可以接收資料,
#檢測列表[]那些套接字(socket)可否傳送資料
#檢測列表[]那些套接字(socket)是否產生了異常
print("select--111")
#這個select函式預設是堵塞,當有客戶端連結的時候解除阻塞,
# 當有資料可以接收的時候解除阻塞,當客戶端斷開的時候解除阻塞
readable, wirteable,excep = select(socket_lists,wirte_list,[])
# print("select--2222")
# print(111)
for sock in wirteable:
#這個會一直髮送,因為他是處於已經發的狀態
sock.send("thank you!".encode("gb2312"))
for sock in readable:
#接收資料
if sock == server_socket:
print("sock == server_socket")
#有新的客戶端連結進來
new_socket,new_address = sock.accept()
#新的socket新增到列表中,便於下次socket的時候能檢查到
socket_lists.append(new_socket)
elif sock == sys.stdin:
cmd = sys.stdin.readline()
print(cmd)
is_run = cmd
else:
# print("sock.recv(1024)....")
#此時的套接字sock是直接可以取資料的
recv_data = sock.recv(1024)
if len(recv_data) > 0:
print("從[%s]:%s" % (str(new_address),recv_data))
sock.send(recv_data)
#把連結上有訊息接收的socket新增到監聽寫的列表中
wirte_list.append(sock)
else:
print("客戶端已經斷開")
#客戶端已經斷開,要移除
sock.close()
socket_lists.remove(sock)
#是否退出程式
if is_run:
break
finally:
#關閉套接字
server_socket.close()
if __name__ == "__main__":
main()

單程序TCP伺服器 ——

epoll版


from socket import *
import select
def main():
#建立tcp伺服器套接字
server_socket = socket(AF_INET,SOCK_STREAM)
#設定埠可以重用
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
#繫結埠
server_socket.bind(("",9999))
#設定監聽
server_socket.listen(5)
#用epoll設定監聽收資料
epoll = select.epoll()
#把server_socket註冊到epoll的事件監聽中,如果已經註冊過會發生異常
epoll.register(server_socket.fileno(),select.EPOLLIN|select.EPOLLET)
#裝socket列表
socket_lists = {}
#裝socket對應的地址
socket_address = {}
while True:
#返回套接字列表[(socket的檔案描述符,select.EPOLLIN)],
# 如果有新的連結,有資料發過來,斷開連結等都會解除阻塞
print("epoll.poll--111")
epoll_list = epoll.poll()
print("epoll.poll--222")
print(epoll_list)
for fd,event in epoll_list:
#有新的連結
if fd == server_socket.fileno():
print("新的客戶fd==%s" % fd)
new_sokect,new_address = server_socket.accept()
#往字典新增資料
socket_lists[new_sokect.fileno()] = new_sokect
socket_address[new_sokect.fileno()] = new_address
#註冊新的socket也註冊到epoll的事件監聽中
epoll.register(new_sokect.fileno(), select.EPOLLIN | select.EPOLLET)
elif event ==select.EPOLLIN:
print("收到資料了")
#根據檔案操作符取出對應socket
new_sokect = socket_lists[fd]
address = socket_address[fd]
recv_data = new_sokect.recv(1024)
if len(recv_data) > 0:
print("已經收到[%s]:%s" % (str(address),recv_data.decode("gb2312")))
else:
#客戶端埠,取消監聽
epoll.unregister(fd)
#關閉連結
new_sokect.close()
print("[%s]已經下線" % str(address))
#關閉套接字連結
server_socket.close()
if __name__ == "__main__":
main()

單程序TCP伺服器 ——

gevent版

gevent原理

greenlet已經實現了協程,但是這個還得人工切換,是不是覺得太麻煩了,莫要捉急,python還有一個比greenlet更強大的並且能夠自動切換任務的模組gevent

原理——當一個greenlet遇到IO(指的是input output 輸入輸出,比如網路、檔案操作等)操作時,比如訪問網路,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。

由於IO操作非常耗時,經常使程式處於等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在執行,而不是等待IO.


import sys
import time
import gevent
from gevent import socket,monkey
monkey.patch_all()
def handle_request(conn):
while True:
data = conn.recv(1024)
if not data:
conn.close()
break
print("recv:", data)
conn.send(data)
def server(port):
s = socket.socket()
s.bind(('', port))
s.listen(5)
while True:
newSocket, addr = s.accept()
gevent.spawn(handle_request, newSocket)
if __name__ == '__main__':
server(7788)

首先基於以上程式碼模組,撒點概念問題:

1.什麼是協程?

協程:存線上程中,是比執行緒更小的執行單元,又稱微執行緒,纖程。自帶cpu上下文,操作協程由程式設計師決定,它可以將一個執行緒分解為多個微執行緒,每個協程間共享全域性空間的變數,每秒鐘切換頻率高達百萬次。

2. 什麼是計算密集型和IO密集型

計算密集型:要進行大量的計算,消耗cpu資源。如複雜計算,對視訊進行高清解碼等,全靠cpu的運算能力。而計算密集型任務完成多工切換任務比較耗時,cpu執行任務效率就越低。在python中,多程序適合計算密集型任務。

IO密集型:涉及到網路、磁碟io的任務都是io密集型。cpu消耗少,計算量小,如請求網頁,讀寫檔案等。在python中,使用sleep達到IO密集型任務的目的,多執行緒適合IO密集型任務。

各大實現版本對比:

select:

1)支援跨平臺,最大缺陷是單個程序開啟的FD是有限的,由FD_SETSIZE設定,預設是1024;

2)對socket掃描時是線性掃描,及採用輪詢方式,效率低;

3)需要維護一個存放大量FD的資料結構,使得使用者空間和核心空間在傳遞該資料結構時複製開銷大。

poll:

1)poll與select本質上沒有區別,但poll沒有最大連線數的限制;

2)大量的fd陣列被整體複製於使用者態和核心地址空間之間,不管這樣的複製是不是有意義;

3)‘水平觸發’,如果報告了fd後,沒有被處理,下次poll時還會再次報告該fd。

epoll:

1)是之前poll和select的增強版,epoll更靈活,沒有描述符限制,能開啟的fd遠大於1024(1G的記憶體上能監聽約10萬個埠);

2)‘邊緣出發’,事件通知機制,效率提升,最大的特點在於它只管你活躍的連線,而跟連線總數無關。而epoll對檔案描述符的操作模式之一ET是一種高效的工作方式,很大程度減少事件反覆觸發的次數,核心不會傳送更多的通知(only once)。

以上這篇基於併發伺服器幾種實現方法(總結)就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援指令碼之家。

相關文章

程式語言 最新文章