tft每日頭條

 > 科技

 > io流操作的固定流程

io流操作的固定流程

科技 更新时间:2024-06-29 10:05:01
I/O多路複用之poll

poll函數接口

#include <poll.h> int POLL(struct pollfd *fds, nfds_t nfds, int timeout);

參數解釋:

fds是一個poll函數監聽的結構列表. 每一個元素中, 包含了三部分内容: 文件描述符, 監聽的事件集合, 返回的事件集合.

pollfd結構體:

struct pollfd { int fd; //要監控的文件描述符 short events; //設置我們監控的描述符發生的事件 常見事件類型: POLLIN 可讀事件 POLLOUT 可寫事件 POLLIN | POLLOUT 用按位或的方式可以表示可讀可寫事件 short revents; //當關心的事件發生時,返回實際發生的事件 };

nfds:表示fds數組的長度.

程序員需要在代碼當中先定義一個事件結構數組; struct pollfd fd_arr[10]; fd_arr[0].fd= 3; //設置文件描述符 fd_arr[0].events = POLLIN; //設置可讀事件

timeout:表示poll函數的超時時間, 單位是毫秒(ms).

大于0 :帶有超時時間的監控 等于0 :非阻塞 小于0 :阻塞

events和revents的取值:

io流操作的固定流程(并發服務器IO多路複用之poll)1

返回值

  • 返回值小于0, 表示出錯;
  • 返回值等于0, 表示poll函數等待超時;
  • 返回值大于0, 表示poll由于監聽的文件描述符就緒而返回.

poll示例: 使用poll監控标準輸入

#include <poll.h> #include <unistd.h> #include <stdio.h> int main() { struct pollfd poll_fd; poll_fd.fd = 0; poll_fd.events = POLLIN;//組織事件結構 ,監控标準輸入的可讀事件 while(1) { int ret = poll(&poll_fd, 1, 1000);//帶有超時時間的監控 if (ret < 0) { perror("poll"); continue; } if (ret == 0) { printf("poll timeout\n"); continue; } if (poll_fd.revents == POLLIN) { //返回就緒事件為可讀,即進行IO操作讀取标準輸入 char buf[1024] = {0}; read(0, buf, sizeof(buf) - 1); printf("stdin:%s", buf); } } return 0; }

io流操作的固定流程(并發服務器IO多路複用之poll)2

poll的優點

  • 不同于select使用三個事件結構(位圖)來表示三個fdset的方式,poll使用一個pollfd事件結構的指針實現,簡化了代碼的編寫
  • pollfd結構包含了要監視的event和發生的event,不再使用select“參數-值”傳遞的方式. 接口使用比select更方便.
  • poll并沒有最大數量限制 (但是數量過大後性能也是會下降).

poll的缺點

  • poll跨平台移植性不如select, poll隻能在linux環境下使用,
  • 和select函數一樣,poll返回後,需要輪詢pollfd來獲取就緒的描述符.
  • 每次調用poll都需要把大量的pollfd結構從用戶态拷貝到内核中.
  • 同時連接的大量客戶端在一時刻可能隻有很少的處于就緒狀态, 因此随着監視的描述符數量的增長, 其效率也會線性下降.
I/O多路複用之epoll

epoll初識

按照man手冊的說法: 是為處理大批量句柄而作了改進的poll.

它是在2.5.44内核中被引進的(epoll(4) is a new API introduced in Linux kernel 2.5.44)

它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法.

epoll接口使用方便: 雖然拆分成了三個系統調用函數. 但是反而使用起來更方便高效.

EPOLL的使用過程就是三部曲:

  • 調用epoll_create創建一個epoll句柄;
  • 調用epoll_ctl, 将要監控的文件描述符進行注冊;
  • 調用epoll_wait, 等待文件描述符就緒;

epoll_create 創建epoll操作句柄

int epoll_create(int size); size :本來的含義是定義epoll最大能夠監控的文件描述符個數 但在linux内核版本2.6.8之後.該參數size就已經被棄用了.内存現在采用的是擴容的方式 size是不可以傳入負數的! ! 用完之後, 必須調用close()關閉. 返回值:返回epoll操作句柄,說白了,就是操作struct eventpoll結構體的的鑰匙

從内核角度分析:此函數在内核當中創建一個結構體, struct eventpoll結構體,此結構體裡有兩個數據結構:紅黑樹,雙向鍊表。

io流操作的固定流程(并發服務器IO多路複用之poll)3

而紅黑樹,衆所周知,查找效率很高,而epoll便是将要監控的描述符組織成紅黑樹的數據結構,這樣查找有IO事件觸發的的描述符時效率就比select和poll的輪詢遍曆快了不少(都不是一個數量級了);

而雙向鍊表,用來保存紅黑樹中返回的有IO事件觸發的文件描述符(就緒的文件描述符);這樣從監控到最後的IO讀寫,時間效率大大提升,且不受要監控的描述符增多的影響。

epoll_ctl epoll的事件注冊函數

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

它不同于select()是在監聽事件時告訴内核要監聽什麼類型的事件, 而是在這裡先注冊要監聽的事件類型.

第一個參數是 epfd: epoll_create()的返回值(即epoll操作句柄).

第二個參數op(option) : 表示動作:讓epoll ctl函數做什麼事情,用三個宏來表示.

  • EPOLL_ CTL ADD :添加一個文件描述符對應的事件結構到紅黑樹當中
  • EPOLL_ CTL MOD:修改一個已經在紅黑樹當中的事件結構
  • EPOLL_ CTL DEL :從epoll的紅黑樹當中删除一個文件描述符對應的事件結構

第三個參數是fd: 告訴epoll用戶關心的文件描述符

第四個參數event: 是告訴内核需要監聽什麼事.

類型是struct epoll_event結構體,即epoll的事件結構

struct epoll_event結構如下:

struct epoll_event { uint32_t events; //用戶對描述符關心的事件 epoll_data_t data; // epoll_data類型的 用戶數據變量 } __EPOLL_PACKED;

events事件可以是以下幾個宏的集合:

  • EPOLLIN : 表示對應的文件描述符可以讀 (包括對端socket正常關閉);
  • EPOLLOUT : 表示對應的文件描述符可以寫;
  • EPOLLPRI : 表示對應的文件描述符有緊急的數據可讀 (這裡應該表示有帶外數據到來);
  • EPOLLERR : 表示對應的文件描述符發生錯誤;
  • EPOLLHUP : 表示對應的文件描述符被挂斷;
  • EPOLLET : 将EPOLL設為邊緣觸發(Edge Triggered)模式, 這是相對于水平觸發(Level Triggered)來說的.
  • EPOLLONESHOT:隻監聽一次事件, 當監聽完這次事件之後, 如果還需要繼續監聽這個socket的話, 需要再次把這個socket加入到EPOLL隊列裡.

相關視頻推薦

【高性能服務器】6種epoll的做法,每一種都有自己适用的業務場景

網絡原理tcp/udp,網絡編程epoll/reactor,面試中常見“八股文”

學習地址:C/C Linux服務器開發/後台架構師【零聲教育】-學習視頻教程-騰訊課堂

需要C/C Linux服務器架構師學習資料加qun812855908獲取(資料包括C/C ,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等),免費分享

io流操作的固定流程(并發服務器IO多路複用之poll)4

epoll_data 聯合結構體:

typedef union epoll_data { void *ptr; //可以傳遞一些信息,當epoll監控該描述符就緒的時候,返回之後,程序也就可以拿到這些信息 int fd; //用戶關心的文件描述符,可以當做文件描述符事件就緒之後,返回給程序員看的 uint32_t u32; uint64_t u64; } epoll_data_t;

對于 ptr 和 fd共用一塊内存,兩者在使用的時候,隻能任選其一:

ptr :傳入一個結構體"struct my_ epoll_ data{ int fd}" ,必須在結構體當中包含一個文件描述符

fd : fd的取值為文件描述符數值

一般都是使用fd成員。

epoll_wait 監控

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

epfd : epolI操作句柄

events : 分配好的epoll_event結構體數組,epoll将會把發生的事件賦值到events數組中

—》出參,返回就緒的事件結構(每一個事件結構都對應一個文件描述符)

maxevents :maxevents告之内核這個events有多大,這個 maxevents的值不能大于創建epoll_create()時的size.即最大能夠拷貝多少個事件結構,

timeout:

大于0 :帶有超時時間,單位為毫秒

等于0:非阻塞.

小于0 :阻塞

返回值:

大于0 :返回就緒的文件描述符個數

等于0 :等待超時

小于0 :監控出錯

epoll工作原理

io流操作的固定流程(并發服務器IO多路複用之poll)5

io流操作的固定流程(并發服務器IO多路複用之poll)6

當某一進程調用epoll_create方法時,Linux内核會創建一個eventpoll結構體,這個結構體中有兩個成員與epoll的使用方式密切相關.

struct eventpoll{ .... /*紅黑樹的根節點,這顆樹中存儲着所有添加到epoll中的需要監控的事件*/ struct rb_root rbr; /*雙鍊表中則存放着将要通過epoll_wait返回給用戶的滿足條件的事件*/ struct list_head rdlist; .... };

每一個epoll對象都有一個獨立的eventpoll結構體,用于存放通過epoll_ctl方法向epoll對象中添加進來的事件.

這些事件都會挂載在紅黑樹中,如此,重複添加的事件就可以通過紅黑樹而高效的識别出來(紅黑樹的插入時間效率是lgn,其中n為樹的高度).

而所有添加到epoll中的事件都會與設備(網卡)驅動程序建立回調關系,也就是說,當響應的事件發生時會調用這個回調方法.

這個回調方法在内核中叫ep_poll_callback,它會将發生的事件添加到rdlist雙鍊表中.

在epoll中,對于每一個事件,都會建立一個epitem結構體.

struct epitem{ struct rb_node rbn;//紅黑樹節點 struct list_head rdllink;//雙向鍊表節點 struct epoll_filefd ffd; //事件句柄信息 struct eventpoll *ep; //指向其所屬的eventpoll對象 struct epoll_event event; //期待發生的事件類型 }

當調用epoll_wait檢查是否有事件發生時,隻需要檢查eventpoll對象中的rdlist雙鍊表中是否有epitem元素即可.

如果rdlist不為空,則把發生的事件結構通過頁表映射到用戶态虛拟地址空間,同時将事件數量返回給用戶. 這個操作的時間複雜度是O(1).

epoll的優點(和 select 的缺點對應)

  • 接口使用方便: 雖然拆分成了三個函數, 但是反而使用起來更方便高效. 不需要每次循環都設置關注的文件描述符, 也做到了輸入輸出參數分離開
  • 數據拷貝輕量: 隻在合适的時候調用 EPOLL_CTL_ADD 将文件描述符結構拷貝到内核中(挂載到紅黑樹上), 這個操作并不頻繁(而select/poll都是每次循環都要進行拷貝)
  • 事件回調機制: 避免使用遍曆, 而是使用回調函數的方式, 将就緒的文件描述符結構加入到就緒隊列中,epoll_wait 返回直接訪問就緒隊列就知道哪些文件描述符就緒. 這個操作時間複雜度O(1). 即使文件描述符數目很多, 效率也不會受到影響.
  • 沒有數量限制: 文件描述符數目無上限.

epoll工作方式

epoll有2種工作方式: 水平觸發(LT) 和 邊緣觸發(ET)。

假如有這樣一個例子:

我們已經把一個tcp socket添加到epoll描述符(即已就緒) 這個時候socket的另一端被寫入了2KB的數據 調用epoll_wait,并且它檢測到IO事件觸發後會立即返回,說明它已經準備好讀取操作 然後調用read讀取數據, 隻讀取了1KB的數據(由于緩沖區裡時字節流,讀取的字節大小可以自己定義) (由于沒有讀取完)繼續調用epoll_wait......

水平觸發Level Triggered 工作模式

epoll默認狀态下就是LT工作模式.

  • 當epoll檢測到socket上事件就緒的時候, 可以不立刻進行處理. 或者隻處理一部分.

如上面的例子, 由于隻讀了1K數據, 緩沖區中還剩1K數據, 在第二次調用 epoll_wait 時, epoll_wait仍然會立刻返回并通知socket讀事件就緒.

  • 直到緩沖區上所有的數據都被處理完, epoll_wait 才不會立刻返回.
  • 支持阻塞讀寫和非阻塞讀寫

即:

對于可讀事件:隻要接收緩沖區當中的數據大于低水位标記(1字節) ,就會一直觸發可讀事件就緒,直到接收緩沖區當中沒有數據可讀

對于可寫事件: 隻要發送緩沖區當中的數據空間大小大于低水位标記( 1字節), 就會一直觸發可寫事件就緒,直到發送緩沖區當中沒有空間可寫

邊緣觸發Edge Triggered工作模式

如果我們在第1步将socket添加到epoll描述符的時候使用了EPOLLET标志, epoll進入ET工作模式.

當epoll檢測到socket上事件就緒時, 必須立刻處理.如上面的例子, 雖然隻讀了1K的數據, 緩沖區還剩1K的數據, 在第二次調用 epoll_wait 的時候, epoll_wait 不會再返回了.

也就是說, ET模式下, 文件描述符上的事件就緒後, 隻有一次處理機會,直到有新的事件就緒,才會再返回,但是由于是字節流的問題,可能一份數據要分好幾次讀,所以就要使用while循環來利用這一次僅有的機會,把一份數據順利讀完。

ET的性能比LT性能更高( epoll_wait 返回的次數少了很多). Nginx默認采用ET模式使用epoll.

隻支持非阻塞的讀寫(需要使用fnctl函數設置文件描述符屬性為非阻塞)

即:

對于可讀事件:隻有當新就緒事件到來的時候,才會一次觸發可讀處理。如果應用程序沒有将接收緩沖區當中的數據讀走或者讀完,也不會在再通知;直到又來一個新就緒事件,才會觸發可讀事件;

對于可寫事件:隻有發送緩沖區剩餘空間從不可寫變成可寫才會觸發一次可寫事件就緒(同上)。

select和poll其實也是工作在LT模式下. 而epoll既可以支持LT, 也可以支持ET.

epoll示例: 使用epoll監控标準輸入(水平觸發LT模式)

#include <stdio.h> #include <unistd.h> #include <sys/epoll.h> int main() { int epollfd = epoll_create(10);//創建epoll操作句柄 if(epollfd < 0) { perror("epoll_create"); return 0; } struct epoll_event ev;//組織事件結構 ev.events = EPOLLIN; ev.data.fd = 0; epoll_ctl(epollfd, EPOLL_CTL_ADD, 0, &ev);//将其添加進要内核監視的結構中 while(1) { struct epoll_event fd_arr[10];//保存就緒返回的文件描述符結構 int ret = epoll_wait(epollfd, fd_arr, sizeof(fd_arr)/sizeof(fd_arr[0]), 3000); //監控 if(ret < 0) { perror("epoll_wait"); return 0; } else if(ret == 0) // { printf("timeout out\n"); continue; } //監控返回,即有 IO事件觸發,讀數據 //epoll默認為水平觸發,隻要接收緩沖區不為空,監視函數就會一直返回,通知用戶讀取數據 for(int i = 0; i < ret; i ) { if(fd_arr[i].data.fd == 0) { //char buf[1024] = {0}; char buf[3]={0};//将接收緩沖區容量設置為3(模拟一次讀不完緩沖區全部數據的場景),測試水平觸發應接收緩沖區不為空而不斷返回通知讀取的工作流程 read(fd_arr[i].data.fd, buf, sizeof(buf) - 1); printf("buf is %s\n", buf); } } } return 0; }

buf容量足夠大時:char buf[1024] = {0};

io流操作的固定流程(并發服務器IO多路複用之poll)7

buf容量小的不足以一次讀完緩沖區裡的數據時:char buf[3] = {0};

io流操作的固定流程(并發服務器IO多路複用之poll)8

epoll示例2: 使用epoll監控标準輸入(邊緣觸發ET模式)

對于ET模式必須利用while循環把一份數據順利讀完,那麼我們怎麼判斷是否将一個完整的數據讀完呢?

則可以根據read返回讀取成功的有效字節數來判斷

即:

如果判斷read函數的返回值比我們準備的buf的最大接收能力(設置的緩沖區長度)還小,那就說明讀完了,退出循環。

但是也要考慮一種情況:如果該數據長度是buf長度的整數倍呢,比如,四個字節的數據abcd,每次讀兩個字節,讀兩次,每次read返回值不小于buf長度,其實兩次已經讀完了,但是根據那一個條件無法判斷是否讀完接着繼續循環,而再去第三次時,由于文件描述符默認為阻塞屬性,而接收緩沖區為空,read就會陷入饑餓狀态(即阻塞在read中,等待),所以對于ET模式循環讀,要避免read的讀饑餓,所以要提前設置文件描述符為非阻塞屬性。因為對于非阻塞 IO 讀數據, 如果接受緩沖區為空, 就會返回錯誤,錯誤碼為 EAGAIN 或者 EWOULDBLOCK, 本意是需要重試,但是我們可以根據這個錯誤碼來解決整數倍的問題,即判斷若為錯誤碼,則說明正好讀完了緩沖區裡的數據,跳出循環。

#include<stdio.h> #include<unistd.h> #include<sys/epoll.h> #include<fcntl.h> #include<string> #include<errno.h> using namespace std; void SetfdNoBlock(int fd)//設置非阻塞屬性 { int fl=fcntl(fd,F_GETFL); if(fl < 0) { perror("fcntl"); return ; } fcntl(fd,F_SETFL, fl|O_NONBLOCK); } int main() { //1.将标準輸入文件描述符設置為非阻塞屬性(用于邊緣觸發ET模式隻通知一次,所以必須使用循環讀,來判斷是否讀取完 整條數據) SetfdNoBlock(0); //2.創建epoll結構 返回操作句柄 int epollfd=epoll_create(10); if(epollfd < 0) { perror("epoll_create"); return 0; } //3.組織事件結構,再将其加入監視 struct epoll_event ev; ev.data.fd=0; ev.events=EPOLLIN |EPOLLET ; //ET模式 epoll_ctl(epollfd,EPOLL_CTL_ADD,0,&ev); //4.監視 while(1) { epoll_event event_arr[10]; int ret=epoll_wait(epollfd,event_arr,10,-1); if(ret < 0) { perror("epoll_wait"); continue; } //有IO事件觸發,監視函數檢測到後返回觸發個數 for(int i = 0;i < ret; i ) { if(event_arr[i].events == EPOLLIN ) { string read_ret; while(1)//由于ET模式隻會通知一次,所以必須加循環将緩沖區的所有數據讀完 { char buf[3]={0}; ssize_t readsize = read(0,buf,sizeof(buf)-1); if(readsize < 0) { // 對于非阻塞 IO 讀數據, 如果 接受緩沖區為空, 就會返回錯誤 // 錯誤碼為 EAGAIN 或者 EWOULDBLOCK , 需要重試 if(errno == EAGAIN || errno == EWOULDBLOCK ) { //說明數據正好讀完,跳出循環 goto overend; } perror("read"); return 0; } read_ret =buf; // 如果當前讀到的數據長度小于嘗試讀的緩沖區的長度, 就退出循環 // 這種寫法其實不算特别嚴謹(沒有考慮粘包問題) if(readsize <(ssize_t)sizeof(buf)-1) { overend: printf("stdin: %s\n",read_ret.c_str()); break; } } } } } return 0; }

io流操作的固定流程(并發服務器IO多路複用之poll)9

基于epoll的并發TCP服務器(水平觸發LT)

封裝epoll操作

#pragma once #include "tcpclass.hpp" #include<vector> #include<stdio.h> #include<unistd.h> #include<sys/epoll.h> class EpollSever { public: EpollSever() { epoll_fd=-1; } ~EpollSever() {} bool init_create(int size) { epoll_fd = epoll_create(size);//創建epoll if(epoll_fd < 0) { return false; } return true; } bool Add_events(int fd)//往epoll結構裡添加要監視的事件 { struct epoll_event ev; ev.data.fd=fd; ev.events=EPOLLIN; int ret= epoll_ctl(epoll_fd,EPOLL_CTL_ADD,fd,&ev); if(ret < 0) { perror("epoll_ctl"); return false; } return true; } bool Del_events(int fd)//删除事件 { int ret=epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fd,NULL); if(ret < 0) { perror("epoll_ctl"); return false; } return true; } bool Epoll_Listen(vector<Tcpsc>* v)//監視 { struct epoll_event event_arr[10]; size_t ret=epoll_wait(epoll_fd,event_arr,sizeof(event_arr)/sizeof(event_arr[0]),-1); if(ret < 0) { perror("epoll_wait"); return false; } else if(ret == 0) { printf("timeout!"); return false; } if(ret > sizeof(event_arr)/sizeof(event_arr[0])) //防止數組越界 { ret = sizeof(event_arr)/sizeof(event_arr[0]); } for(size_t i= 0;i < ret; i )//将就緒的IO事件封裝到tcp類中,由那邊具體使用 { Tcpsc tc; tc.Setfd(event_arr[i].data.fd); v->push_back(tc); } return true; } private: int epoll_fd;//epoll操作句柄 };

socket操作類

#pragma once #include<cstdio> #include<cstdlib> #include<unistd.h> #include<string> #include<sys/socket.h> #include<arpa/inet.h> #include<iostream> #include<netinet/in.h> #include<sys/types.h> using namespace std; class Tcpsc { public: Tcpsc() { sock_=-1; } ~Tcpsc() { } //創建套接字 bool CreateSocket() { sock_=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(sock_ < 0) { perror("socket"); return false; } int opt=1; setsockopt(sock_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt));//地址複用 return true; } //綁定地址信息 bool Bind(std::string& ip,uint16_t port) { struct sockaddr_in addr;//組織成ipv4地址結構 addr.sin_family =AF_INET; addr.sin_port=htons(port); addr.sin_addr.s_addr=inet_addr(ip.c_str()); int ret=bind(sock_,(struct sockaddr*)&addr,sizeof(addr)); if(ret < 0) { perror("bind"); return false; } return true; } //監聽 bool Listen(int backlog=5) { int ret=listen(sock_,backlog); if(ret < 0) { perror("listen"); return false; } return true; } //accept 服務器獲取連接 //bool Accept(struct sockaddr_in* peeraddr,int* newfd) //peeraddr :出參。保存的是客戶端的地址信息,newfd:出參,表示完成連接的可以進行通信的新創建出來的套接字描述符 bool Accept(struct sockaddr_in* peeraddr,Tcpsc* newsc)//這裡用一個類的實例化指針,把數據傳出去 { socklen_t addrlen=sizeof(struct sockaddr_in);//記錄地址信息長度 int newserverfd=accept(sock_,(struct sockaddr*)peeraddr,&addrlen); if(newserverfd < 0) { perror("accept"); return false; } newsc->sock_=newserverfd;//傳出去新創建出來的用來通信的套接字 return true; } //connect 客戶端調用來連接服務端 bool Connect(string& ip,uint16_t port) { struct sockaddr_in addr;//還是先組織服務端地址信息 addr.sin_family =AF_INET; addr.sin_port=htons(port); addr.sin_addr.s_addr=inet_addr(ip.c_str()); int ret=connect(sock_,(struct sockaddr*)&addr,sizeof(addr)); if(ret < 0) { perror("connect"); return false; } return true; } //因為是已經建立連接了的,所以參數就隻是數據,和已完成連接的可以進行通信的socket套接字 //發送數據 bool Send(string& data) { int sendsize=send(sock_,data.c_str(),data.size(),0); if(sendsize < 0) { perror("sned"); return false; } return true; } //接收數據 bool Recv(string* data)//出參,保留信息 { char buf[1024]={0}; int recvsize=recv(sock_,buf,sizeof(buf)-1,0); if(recvsize < 0) { perror("recv"); return false; } else if(recvsize==0)//對端已關閉close { printf("peer is close connect"); return false; } (*data).assign(buf,recvsize);//賦值給傳出型參數 return true; } //關閉套接字 void Close() { close(sock_); sock_=-1; } int Getfd() { return sock_; } void Setfd(int fd) { sock_=fd; } private: int sock_; };

客戶端連接操作及收發數據

#include"tcpclass.hpp" int main(int argc,char* argv[]) { if(argc!=3) { printf("please enter true server_ip and port!"); return 0; } string ip=argv[1]; uint16_t port=atoi(argv[2]); Tcpsc sc; if(!sc.CreateSocket()) { return 0; } if(!sc.Connect(ip,port)) { return 0; } //連接完成,開始收發數據 while(1) { //發送數據 printf("cli say:"); fflush(stdout); string buf; cin>>buf; sc.Send(buf); //接收服務端回複的數據 sc.Recv(&buf); printf("server reply:%s\n",buf.c_str()); } sc.Close();//其實進程結束後會自動關閉描述符的 return 0; }

主函數邏輯:

還是服務器端的基本邏輯 創建套接字–》綁定地址信息–》轉化為監聽套接字–》 加入epoll結構 --》使用epoll進行監聽事件 --》返回就緒的文件描述符 --?判斷是新連接還是數據到來—?若是是新連接就調用accpet函數創建新的用于通信的套接字,并将其加入epoll結構,等待事件就緒。 若是數據到來,即讀取數據

#include"epoll_lt_tcpsvr.hpp" #define CHECK_RET(q) if(!q) {return -1;} int main() { Tcpsc listen_ts; CHECK_RET(listen_ts.CreateSocket()); string ip("0.0.0.0"); CHECK_RET(listen_ts.Bind(ip,19999)); CHECK_RET(listen_ts.Listen()); EpollSever es; CHECK_RET(es.init_create(10)); es.Add_events(listen_ts.Getfd());//先将監控描述符添加到epoll結構中 while(1) { //監控 vector<Tcpsc> v; if(!es.Epoll_Listen(&v)) { continue; } //返回就緒事件,判斷是新連接還是數據到來 for(size_t i = 0; i < v.size();i ) { if(v[i].Getfd() == listen_ts.Getfd())//是偵聽套接字上的就緒事件,說明是新連接 { //調用 accept函數創建新的套接字用于通信,并将其添加到epoll中 struct sockaddr_in peeraddr;//對端的地址信息 Tcpsc newts;//用于保存新創建出來的套接字 listen_ts.Accept(&peeraddr,&newts); printf("新的客戶端連接----->[ip]:%s,[port]:%d\n",inet_ntoa(peeraddr.sin_addr),peeraddr.sin_port); //再将其添加進去 es.Add_events(newts.Getfd()); } else //否則,就是新數據到來,讀取操作 { string read_data; bool ret=v[i].Recv(&read_data); if(!ret) { es.Del_events(v[i].Getfd()); v[i].Close(); } printf("客戶端向你說話:%s\n",read_data.c_str()); } } } return 0; }

io流操作的固定流程(并發服務器IO多路複用之poll)10

基于epoll的并發TCP服務器(邊緣觸發ET)

epoll功能封裝

epoll_et_tcpsvr.hpp #pragma once #include<stdio.h> #include<sys/epoll.h> #include<unistd.h> #include<stdlib.h> #include<vector> #include<errno.h> #include"tcpclass.hpp" class EpollSvr { public: EpollSvr() { epoll_fd=-1; } ~EpollSvr() { } bool InitSvr(int size) { //創建epoll 操作句柄 epoll_fd = epoll_create(size); if(epoll_fd < 0) { perror("epoll_create"); return false; } return true; } //對于ET模式,還需要再添加事件結構的跟上模式的指定 bool Addevent(int fd,bool is_ET=false) { //組織事件結構、 struct epoll_event ev; ev.data.fd = fd; if(is_ET) ev.events = EPOLLIN | EPOLLET; else ev.events =EPOLLIN; //添加此事件結構到epoll中 int ret=epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd,&ev); if(ret < 0) { perror("epoll_ctl"); return false; } return true; } //從epoll中删除事件 bool Delevent(int fd) { int ret = epoll_ctl(epoll_fd, EPOLL_CTL_DEL,fd ,NULL); if(ret < 0) { perror("epoll_ctl"); return false; } return true; } //監視 bool EventListen(vector<Tcpsc>* v) { struct epoll_event event_arr[10]; int ret=epoll_wait(epoll_fd,event_arr,sizeof(event_arr)/sizeof(event_arr[0]),-1); if(ret < 0) { perror("epoll_wait"); return false; } //監視返回,即有事件觸發,将其包裝為 Tcpsc類對象,返回給主函數判斷及使用 for(int i=0;i < ret;i ) { Tcpsc ts; ts.Setfd(event_arr[i].data.fd); v->push_back(ts); } return true; } private: int epoll_fd;//epoll操作句柄 };

tcp服務器功能封裝

相較于之前的,由于ET模式的特性加了設置非阻塞屬性、非阻塞接收和非阻塞寫的接口。

#pragma once #include<cstdio> #include<cstdlib> #include<unistd.h> #include<string> #include<sys/socket.h> #include<arpa/inet.h> #include<iostream> #include<netinet/in.h> #include<sys/types.h> #include<fcntl.h> using namespace std; class Tcpsc { public: Tcpsc() { sock_=-1; } ~Tcpsc() { } //創建套接字 bool CreateSocket() { sock_=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(sock_ < 0) { perror("socket"); return false; } int opt=1; setsockopt(sock_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt));//地址複用 return true; } //綁定地址信息 bool Bind(std::string& ip,uint16_t port) { struct sockaddr_in addr;//組織成ipv4地址結構 addr.sin_family =AF_INET; addr.sin_port=htons(port); addr.sin_addr.s_addr=inet_addr(ip.c_str()); int ret=bind(sock_,(struct sockaddr*)&addr,sizeof(addr)); if(ret < 0) { perror("bind"); return false; } return true; } //監聽 bool Listen(int backlog=5) { int ret=listen(sock_,backlog); if(ret < 0) { perror("listen"); return false; } return true; } //accept 服務器獲取連接 //bool Accept(struct sockaddr_in* peeraddr,int* newfd) //peeraddr :出參。保存的是客戶端的地址信息,newfd:出參,表示完成連接的可以進行通信的新創建出來的套接字描述符 bool Accept(struct sockaddr_in* peeraddr,Tcpsc* newsc)//這裡用一個類的實例化指針,把數據傳出去 { socklen_t addrlen=sizeof(struct sockaddr_in);//記錄地址信息長度 int newserverfd=accept(sock_,(struct sockaddr*)peeraddr,&addrlen); if(newserverfd < 0) { perror("accept"); return false; } newsc->sock_=newserverfd;//傳出去新創建出來的用來通信的套接字 return true; } //connect 客戶端調用來連接服務端 bool Connect(string& ip,uint16_t port) { struct sockaddr_in addr;//還是先組織服務端地址信息 addr.sin_family =AF_INET; addr.sin_port=htons(port); addr.sin_addr.s_addr=inet_addr(ip.c_str()); int ret=connect(sock_,(struct sockaddr*)&addr,sizeof(addr)); if(ret < 0) { perror("connect"); return false; } return true; } //因為是已經建立連接了的,所以參數就隻是數據,和已完成連接的可以進行通信的socket套接字 //發送數據 bool Send(string& data) { int sendsize=send(sock_,data.c_str(),data.size(),0); if(sendsize < 0) { perror("sned"); return false; } return true; } //接收數據 bool Recv(string* data)//出參,保留信息 { char buf[1024]={0}; int recvsize=recv(sock_,buf,sizeof(buf)-1,0); if(recvsize < 0) { perror("recv"); return false; } else if(recvsize==0)//對端已關閉close { printf("peer is close connect"); return false; } (*data).assign(buf,recvsize);//賦值給傳出型參數 return true; } //關閉套接字 void Close() { close(sock_); sock_=-1; } int Getfd() { return sock_; } void Setfd(int fd) { sock_=fd; } //ET模式下的非阻塞接收和非阻塞發送 //利用while循環 ,将數據保存到出參data裡 bool RecvNoBlock(string* data ) { while(1) { //sockfd_ 不是偵聽套接字,而是已連接的用于通信的套接字描述符了 //sockfd_ 已經被設置加上了非阻塞屬性,所以在判斷返回值時候,需要注意 緩沖區為空(正好被接收完)的時候,recv函數返回有EAGAIN或者EWOULDBLOCK的情況産生 char buf[3]={0}; ssize_t readsize = recv(sock_,buf,sizeof(buf)-1,0); if(readsize < 0) { if(errno == EAGAIN || errno == EWOULDBLOCK) { break; } perror("recv"); return false; } else if(readsize == 0) { printf("對端關閉了連接!"); return false; } *data = buf; if(readsize < (ssize_t)sizeof(buf)-1) { break; } } return true; } //非阻塞發送 bool SendNoBlock(string& buf)//将傳進來的數據發送出去 { //使用指針和字節數 來确保數據全部發送完 ssize_t pos=0; //記錄當前寫到的位置 ssize_t lensize = buf.size(); //記錄剩餘字節數 while(1) { //對于非阻塞IO寫入,如果tcp的發送緩沖區已經滿了,則寫操作也會返回 錯誤碼提示 ssize_t sendsize = send(sock_, buf.data() pos,lensize, 0); if(sendsize < 0 ) { if(errno == EAGAIN || errno == EWOULDBLOCK) { //即使發送緩沖區滿了,可能也沒有把所有數據全部寫入,所以繼續重新寫入 continue; } perror("send"); return false; } //更新指針位置 和 剩餘字節數 即加減實際發送字節數 pos = sendsize; lensize -= sendsize; //推出條件,即真正寫完了 if(lensize <= 0) { break; } } return true; } //将文件描述符設置為非阻塞屬性 ET模式下 void SetNoBlock() { int fl =fcntl(sock_,F_GETFL); if(fl < 0) { perror("fcntl"); return ; } fcntl(sock_,F_SETFL,fl| O_NONBLOCK); } private: int sock_; };

簡單的客戶端邏輯:

#include"tcpclass.hpp" int main(int argc,char* argv[]) { if(argc!=3) { printf("please enter true server_ip and port!"); return 0; } string ip=argv[1]; uint16_t port=atoi(argv[2]); Tcpsc sc; if(!sc.CreateSocket()) { return 0; } if(!sc.Connect(ip,port)) { return 0; } //連接完成,開始收發數據 while(1) { //發送數據 printf("i am client, say:"); fflush(stdout); string buf; cin>>buf; sc.Send(buf); //接收服務端回複的數據 sc.Recv(&buf); printf("peer server reply:%s\n",buf.c_str()); } sc.Close();//其實進程結束後會自動關閉描述符的 return 0; }

main.cpp邏輯:

還是服務器端的基本邏輯 創建套接字–》綁定地址信息–》轉化為監聽套接字–》 加入epoll結構 --》使用epoll進行監聽事件 --》返回就緒的文件描述符 --?判斷是新連接還是數據到來—?若是是新連接就調用accpet函數創建新的用于通信的套接字,先設置為非阻塞模式,并将其加入epoll結構,等待事件就緒。 若是數據到來,即讀取數據(ET模式下的非阻塞循環讀(RecvNoBlock接口))

#include"epoll_et_tcpsvr.hpp" #define CHECK_RET(q) if(!q) {return -1;} int main() { Tcpsc listen_ts; CHECK_RET(listen_ts.CreateSocket()); string ip("0.0.0.0"); CHECK_RET(listen_ts.Bind(ip,19999)); CHECK_RET(listen_ts.Listen()); EpollSvr es; es.InitSvr(10);//創建epoll操作句柄 es.Addevent(listen_ts.Getfd());//先将監控描述符添加到epoll結構中 ,并設置為ET邊緣觸發 while(1) { //監控 vector<Tcpsc> v; if(!es.EventListen(&v)) { continue; } //返回就緒事件,判斷是新連接還是數據到來 for(size_t i = 0; i < v.size();i ) { if(v[i].Getfd() == listen_ts.Getfd())//是偵聽套接字上的就緒事件,說明是新連接 { //調用 accept函數創建新的套接字用于通信,并将其添加到epoll中 struct sockaddr_in peeraddr;//對端的地址信息 Tcpsc newts;//用于保存新創建出來的套接字 listen_ts.Accept(&peeraddr,&newts); printf("新的客戶端連接----->[ip]:%s,[port]:%d\n",inet_ntoa(peeraddr.sin_addr),peeraddr.sin_port); newts.SetNoBlock();//設置為非阻塞屬性 //再将其添加進去 es.Addevent(newts.Getfd(),true); } else //否則,就是新數據到來,讀取操作,為ET模式時,要調用非阻塞recv方法 { string read_data; bool ret=v[i].RecvNoBlock(&read_data); if(!ret) { es.Delevent(v[i].Getfd()); v[i].Close(); continue; } printf("客戶端向你說話----》:%s\n",read_data.c_str()); read_data.clear(); read_data.assign("OK!本服務器已收到!"); v[i].SendNoBlock(read_data); } } } return 0; }

兩個客戶端示例:

io流操作的固定流程(并發服務器IO多路複用之poll)11

io流操作的固定流程(并發服務器IO多路複用之poll)12

tcp服務端的業務處理:

io流操作的固定流程(并發服務器IO多路複用之poll)13

基于IO多路複用的tcp服務器的優點:

  • 一個基于I/O多路複用的事件驅動服務器是運行在單一進程上下文中的,因此每個邏輯流都能訪問該進程的全部地址空間。這使得在流之間共享數據變得很容易。一個與作為單個進程運行相關的優點是,你可以利用熟悉的調試工具,例如GDB,來調試你的并發服務器,就像對順序程序那樣。
  • 并且事件驅動設計常常比基于進程的設計要高效得多,因為它們不需要進程上下文切換來調度新的流。
,

更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!

查看全部

相关科技资讯推荐

热门科技资讯推荐

网友关注

Copyright 2023-2024 - www.tftnews.com All Rights Reserved