ReactorV2 与V1的区别 在Reactor中增加了epoll监听,并采用了函数回调机制,让处理新连接、处理新消息以及处理连接断开更加灵活
类图 首先是不增加回调的类图
不增加回调会有一些问题:例如客户端断开连接后,服务端对应的连接描述符会一直处于读就绪状态;并且如果处理消息的那部分要实现别的功能,就需要修改源码。所以采用回调函数的方式可扩展性更强
增加回调的类图
代码实现 Socket.hpp
#ifndef SOCKET
#define SOCKET

/// Non-copyable holder of a single socket file descriptor.
///
/// The member functions are implemented in Socket.cc.
class Socket
{
public:
    /// Builds a Socket that creates its own descriptor.
    Socket();

    /// Builds a Socket around an existing descriptor @p fd.
    explicit Socket(int fd);

    ~Socket();

    // Copying is forbidden so two objects never refer to the same descriptor.
    Socket(const Socket &rhs) = delete;
    Socket &operator=(const Socket &rhs) = delete;

    /// @return the descriptor held by this object.
    int getFd();

private:
    int _fd;
};

#endif
Socket.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #include "Socket.hpp" #include <my_header.h> Socket::Socket (){ _fd = socket (AF_INET,SOCK_STREAM,0 ); if (_fd < 0 ){ perror ("_fd error" ); return ; } } Socket::Socket (int fd) :_fd(fd) {} Socket::~Socket () { close (_fd); } int Socket::getFd () { return _fd; }
InetAddress.hpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #ifndef INETADDRESS #define INETADDRESS #include <string> #include <my_header.h> using namespace std;class InetAddress { public : InetAddress (const string & ip,unsigned short port); InetAddress (const struct sockaddr_in &); ~InetAddress (); string IP () ; unsigned short port () ; struct sockaddr_in * getInetAddress (); private : struct sockaddr_in _addr; }; #endif
InetAddress.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #include "InetAddress.hpp" InetAddress::InetAddress (const string & ip,unsigned short port) { memset (&_addr,0 ,sizeof (_addr)); _addr.sin_family = AF_INET; _addr.sin_port = htons (port); _addr.sin_addr.s_addr = inet_addr (ip.c_str ()); } InetAddress::InetAddress (const struct sockaddr_in & addr) :_addr(addr) {} InetAddress::~InetAddress (){} string InetAddress::IP () { return string (inet_ntoa (_addr.sin_addr)); } unsigned short InetAddress::port () { return ntohs (_addr.sin_port); } struct sockaddr_in * InetAddress::getInetAddress (){ return &_addr; }
Acceptor.hpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 #ifndef ACCEPTOR #define ACCEPTOR #include "InetAddress.hpp" #include "Socket.hpp" class Acceptor { public : Acceptor (const string &ip, unsigned short port); ~Acceptor () {} int accept () ; void ready () ; int fd () ; private : void setReuseAddr () ; void setReusePort () ; void bind () ; void listen () ; private : Socket _sock; InetAddress _addr; }; #endif
Acceptor.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 #include "Acceptor.hpp" Acceptor::Acceptor (const string &ip, unsigned short port) :_sock() ,_addr(ip,port) {} void Acceptor::setReuseAddr () { int opt = 1 ; int ret = setsockopt (_sock.getFd (),SOL_SOCKET,SO_REUSEADDR,&opt,sizeof (opt)); if (-1 == ret){ perror ("setsockopt ip error" ); return ; } } void Acceptor::setReusePort () { int opt = 1 ; int ret = setsockopt (_sock.getFd (),SOL_SOCKET,SO_REUSEADDR,&opt,sizeof (opt)); if (-1 == ret){ perror ("setsockopt ip error" ); return ; } } void Acceptor::bind () { int ret = ::bind (_sock.getFd (),(struct sockaddr *)_addr.getInetAddress (),sizeof (struct sockaddr)); if (-1 == ret){ perror ("bind error" ); return ; } } void Acceptor::listen () { int ret = ::listen (_sock.getFd (),128 ); if (-1 == ret){ perror ("listen error" ); return ; } } void Acceptor::ready () { setReuseAddr (); setReusePort (); Acceptor::bind (); Acceptor::listen (); } int Acceptor::accept () { int connetFd = ::accept (_sock.getFd (),nullptr ,nullptr ); if (-1 == connetFd){ perror ("accept error" ); return -1 ; } return connetFd; } int Acceptor::fd () { return _sock.getFd (); }
TcpConnection.hpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 #ifndef TCPCONNECTION #define TCPCONNECTION #include <string> #include <memory> #include <functional> #include "SocketIO.hpp" using std::string;using namespace std;class TcpConnection :public enable_shared_from_this<TcpConnection> { using TcpConnectionPtr = shared_ptr<TcpConnection>; using TcpConnectionCallBack = function<void (const TcpConnectionPtr &)>; public : explicit TcpConnection (int fd) ; ~TcpConnection () {} string receive () ; void send (const string & msg) ; public : void setNewConnectionCallBack (const TcpConnectionCallBack & cb) ; void setMessageCallBack (const TcpConnectionCallBack & cb) ; void setCloseCallBack (const TcpConnectionCallBack & cb) ; void handleNewConeectionCallBack () ; void handleMessageCallBack () ; void handleCloseCallBack () ; bool isclose (int fd) const ; private : SocketIO _socketIO ; TcpConnectionCallBack _onNewConnectionCB; TcpConnectionCallBack _onMessageCB; TcpConnectionCallBack _onCloseCB; }; #endif
TcpConnection.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 #include "TcpConnection.hpp" #include <iostream> #include <my_header.h> TcpConnection::TcpConnection (int fd) :_socketIO(fd) {} string TcpConnection::receive () { char buf[65535 ] = {0 }; _socketIO.readLine (buf,sizeof (buf)); return string (buf); } void TcpConnection::send (const string & msg) { _socketIO.writen (msg.c_str (),msg.size ()); } void TcpConnection::setNewConnectionCallBack (const TcpConnectionCallBack & cb) { _onNewConnectionCB = cb; } void TcpConnection::setMessageCallBack (const TcpConnectionCallBack & cb) { _onMessageCB = cb; } void TcpConnection::setCloseCallBack (const TcpConnectionCallBack & cb) { _onCloseCB = cb; } void TcpConnection::handleNewConeectionCallBack () { if (_onNewConnectionCB == nullptr ){ cout << "_onNewConnectionCB is nullptr" << endl; }else { _onNewConnectionCB(shared_from_this ()); } } void TcpConnection::handleMessageCallBack () { if (nullptr == _onMessageCB){ cout << "_onMessageCB is nullptr" << endl; }else { _onMessageCB(shared_from_this ()); } } void TcpConnection::handleCloseCallBack () { if (nullptr == _onCloseCB){ cout << "_onCloseCB is nullptr" << endl; }else { _onCloseCB(shared_from_this ()); } } bool TcpConnection::isclose (int fd) const { char buf[10 ] = {0 }; int ret = recv (fd,buf,sizeof (buf),MSG_PEEK); return 0 == ret; }
SocketIO.hpp
#ifndef SOCKETIO
#define SOCKETIO

/// Blocking read/write helpers over a descriptor that this class does not own
/// (the destructor does nothing with it).
class SocketIO
{
public:
    /// @param fd descriptor to perform I/O on
    explicit SocketIO(int fd);
    ~SocketIO() {}

    /// Reads up to @p len bytes into @p buf, stopping early at end of stream.
    /// @return the number of bytes read, or -1 on a read error.
    int readn(char *buf, int len);

    /// Reads through the next '\n' (at most len-1 bytes) into @p buf and
    /// NUL-terminates it.
    /// @return -1 on a receive error.
    int readLine(char *buf, int len);

    /// Writes up to @p len bytes from @p buf.
    /// @return the number of bytes actually written.
    int writen(const char *buf, int len);

private:
    int _fd;
};

#endif
SocketIO.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 #include "SocketIO.hpp" #include <my_header.h> SocketIO::SocketIO (int fd) :_fd(fd) {} int SocketIO::readn (char * buf,int len) { int left = len; char *pstr = buf; int ret = 0 ; while (left > 0 ){ ret = read (_fd,pstr,left); if (-1 == ret && errno == EINTR){ continue ; }else if (-1 == ret){ perror ("read error" ); return -1 ; }else if (0 == ret){ break ; }else { pstr += ret; left -= ret; } } return len - left; } int SocketIO::readLine (char *buf,int len) { int left = len - 1 ; char *pstr = buf; int ret = 0 ,total = 0 ; while (left > 0 ){ ret = recv (_fd,pstr,left,MSG_PEEK); if (-1 == ret && errno == EINTR){ continue ; }else if (0 == ret){ break ; }else if (-1 == ret){ perror ("read error" ); return -1 ; }else { for (int i = 0 ; i < ret; ++i){ if (pstr[i] == '\n' ){ int sz = i + 1 ; readn (pstr,sz); pstr += sz; *pstr = '\0' ; return total + sz; } } readn (pstr,ret); total += ret; pstr += ret; left -= ret; } } *pstr = '\0' ; return total - left; } int SocketIO::writen (const char *buf,int len) { int left = len; const char * pstr = buf; int ret = 0 ; while (left > 0 ){ ret = write (_fd,pstr,left); if (-1 == ret && errno == EINTR){ continue ; }else if (-1 == ret){ perror ("write error" ); break ; }else if (0 == ret){ break ; }else { left -= ret; pstr += ret; } } return len - left; }
EventLoop.hpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 #ifndef EVENTLOOP #define EVENTLOOP #include <my_header.h> #include <vector> #include <map> #include <memory> #include <functional> using namespace std;class TcpConnection ;class Acceptor ;using TcpConnectionPtr = shared_ptr<TcpConnection>;using TcpConnectionCallBack = function<void (const TcpConnectionPtr &)>;class EventLoop { public : EventLoop (Acceptor & acceptor ) ; ~EventLoop (); void loop () ; void unloop () ; private : void waitEpollFd () ; void handleNewConeection () ; void handMessage (int fd) ; int createEpollFd () ; void addEpollReadFd (int fd) ; void delEpollReadFd (int fd) ; public : void setNewConnectionCallBack (TcpConnectionCallBack && cb) ; void setMessageCallBack (TcpConnectionCallBack && cb) ; void setCloseCallBack (TcpConnectionCallBack && cb) ; private : int _epfd; vector<struct epoll_event> _evtList; bool _isLooping; Acceptor & _acceptor; map<int ,TcpConnectionPtr> _conns; TcpConnectionCallBack _onNewConnectionCB; TcpConnectionCallBack _onMessageCB; TcpConnectionCallBack _onCloseCB; }; #endif
EventLoop.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 #include "EventLoop.hpp" #include "Acceptor.hpp" #include "TcpConnection.hpp" #include <iostream> EventLoop::EventLoop (Acceptor & acceptor ) :_epfd(createEpollFd ()) ,_evtList(1024 ) ,_isLooping(false ) ,_acceptor(acceptor) { int listenFd = _acceptor.fd (); addEpollReadFd (listenFd); } EventLoop::~EventLoop (){ close (_epfd); } void EventLoop::loop () { _isLooping = true ; waitEpollFd (); } void EventLoop::unloop () { _isLooping = false ; } void EventLoop::waitEpollFd () { while (_isLooping){ int nready = epoll_wait (_epfd,&*_evtList.begin (),_evtList.size (),3000 ); if (nready == -1 && errno == EPOLLIN){ continue ; }else if (nready == -1 ){ perror ("nready == -1" ); delEpollReadFd (_epfd); return ; }else if (0 == nready){ cout << ">> epoll_wait timeout!" 
<< endl; }else { if (nready == (int )_evtList.size ()){ _evtList.resize (2 *_evtList.size ()); } for (int i = 0 ; i < nready; ++i){ int fd = _evtList[i].data.fd; if (fd == _acceptor.fd ()){ handleNewConeection (); }else { handMessage (fd); } } } } } void EventLoop::handleNewConeection () { int connfd = _acceptor.accept (); if (-1 == connfd){ perror ("accept error" ); return ; } addEpollReadFd (connfd); TcpConnectionPtr tcp (new TcpConnection(connfd)) ; tcp->setNewConnectionCallBack (move (_onNewConnectionCB)); tcp->setMessageCallBack (move (_onMessageCB)); tcp->setCloseCallBack (move (_onCloseCB)); _conns[connfd] = tcp; tcp->handleNewConeectionCallBack (); } void EventLoop::handMessage (int fd) { auto it = _conns.find (fd); if (it != _conns.end ()){ int flag = it->second->isclose (fd); if (flag){ it->second->handleCloseCallBack (); delEpollReadFd (fd); _conns.erase (it); } else { it->second->handleMessageCallBack (); } }else { cout << "error fd" << endl; } } int EventLoop::createEpollFd () { int fd = epoll_create (10 ); if (fd < 0 ){ cout << "epoll_ctreate error" << endl; return -1 ; } return fd; } void EventLoop::addEpollReadFd (int fd) { struct epoll_event evt; evt.events = EPOLLIN; evt.data.fd = fd; int ret = epoll_ctl (_epfd,EPOLL_CTL_ADD,fd,&evt); if (ret < 0 ){ cout << "epoll_ctl error" << endl; return ; } } void EventLoop::delEpollReadFd (int fd) { struct epoll_event evt; evt.events = EPOLLIN; evt.data.fd = fd; int ret = epoll_ctl (_epfd,EPOLL_CTL_DEL,fd,&evt); if (ret < 0 ){ cout << "epoll_ctl error" << endl; return ; } } void EventLoop::setNewConnectionCallBack (TcpConnectionCallBack && cb) { _onNewConnectionCB = move (cb); } void EventLoop::setMessageCallBack (TcpConnectionCallBack && cb) { _onMessageCB = move (cb); } void EventLoop::setCloseCallBack (TcpConnectionCallBack && cb) { _onCloseCB = move (cb); }
Test.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 #include <iostream> #include "TcpConnection.hpp" #include "Acceptor.hpp" #include "EventLoop.hpp" #include <memory> void test1 () { Acceptor acc ("127.0.0.1" ,8888 ) ; acc.ready (); TcpConnection tcpc (acc.accept()) ; cout << "连接建立!" << endl; while (1 ){ cout << ">> client :" <<tcpc.receive () << endl; tcpc.send ("i am server!\n" ); } } void handleNewConeection (const TcpConnectionPtr & cb) { cout << "connect !!!" << endl; } void handleMessage (const TcpConnectionPtr & cb) { string msg = cb->receive (); cout << ">> client send msg :" << msg << endl; cb->send (msg); } void handleClose (const TcpConnectionPtr & cb) { cout << "close !!!" << endl; } void test () { Acceptor acc ("127.0.0.1" ,8888 ) ; acc.ready (); EventLoop loop (acc) ; loop.setNewConnectionCallBack (handleNewConeection); loop.setMessageCallBack (handleMessage); loop.setCloseCallBack (handleClose); loop.loop (); } int main () { test (); return 0 ; }