ReactorV3V4V5 ReactorV3 ReactorV3其实就是在ReactorV2的版本上进行了一个小封装,增加了一个新的类
这个类就是把TestTcpConnection这个测试文件中的启动服务器的流程进行了封装简化,即改为了这样
1 2 3 4 5 6 7 8 Acceptor acc ("127.0.0.1" ,8888 ) ;acc.ready (); EventLoop loop (acc) ;loop.setNewConnectionCallBack (handleNewConeection); loop.setMessageCallBack (handleMessage); loop.setCloseCallBack (handleClose); loop.loop ();
TcpServer类的具体实现为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #ifndef TCPSEVER #define TCPSEVER #include "Acceptor.hpp" #include "EventLoop.hpp" class TcpSever { public : TcpSever (const string ip,unsigned short port); ~TcpSever (); void start () ; void stop () ; void setAllCallBack (TcpConnectionCallBack && cb1, TcpConnectionCallBack && cb2, TcpConnectionCallBack && cb3) ;private : Acceptor _accpeptor; EventLoop _loop; }; #endif
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 #include "TcpSever.hpp" TcpSever::TcpSever (const string ip,unsigned short port) :_accpeptor(ip,port) ,_loop(_accpeptor) { } TcpSever::~TcpSever (){} void TcpSever::start () { _accpeptor.ready (); _loop.loop (); } void TcpSever::stop () { _loop.unloop (); } void TcpSever::setAllCallBack (TcpConnectionCallBack && cb1, TcpConnectionCallBack && cb2, TcpConnectionCallBack && cb3) { _loop.setNewConnectionCallBack (move (cb1)); _loop.setMessageCallBack (move (cb2)); _loop.setCloseCallBack (move (cb3)); }
封装完成之后启动服务器只需要这样
1 2 3 4 TcpSever tcp ("127.0.0.1" ,8888 ) ; tcp.setAllCallBack (move (handleNewConeection),move (handleMessage),move (handleClose)); tcp.start ();
没什么好说的
ReactorV4 这个增加了一些新的策略,即将处理msg(任务)的过程交给了线程池中的线程,而Reactor(EventLoop)只负责msg的收发,这样提高了并发度,原理图为
这里面有几个值得探讨的问题,ReactorV2已经实现了通过回调函数来对连接建立,信息处理,连接关闭这三个事件的扩展(回调函数实现多态)即
1 2 3 4 5 6 7 8 9 10 11 12 13 void handleNewConeection (const TcpConnectionPtr & cb) { cout << "connect !!!" << endl; } void handleMessage (const TcpConnectionPtr & cb) { string msg = cb->receive (); cout << ">> client send msg :" << msg << endl; cb->send (msg); } void handleClose (const TcpConnectionPtr & cb) { cout << "close !!!" << endl; }
先不谈关闭连接和新建连接,因为这两个没有涉及到数据的发送,重点需要讨论的是处理消息这个,因为cb->send(msg)
它涉及到了数据的发送,如果引入线程池,那么就会变成这样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class MyTask {public : MyTask (string msg,const TcpConnectionPtr &tcpptr) :_msg(msg) ,_tcpptr(tcpptr) {} void process () { _tcpptr->sendInLoop (_msg); } private : string _msg; TcpConnectionPtr _tcpptr; }; void handleMessage (const TcpConnectionPtr & cb) { string msg = cb->receive (); cout << ">> client send msg :" << msg << endl; MyTask task (msg,cb) ; gpool->addTask (bind (&MyTask::process,task)); }
这样确实也能实现功能,但是,问题是发送数据给客户端成线程池内线程做的事情了,想像一种情况,如果数据很多,都需要发送,那么线程池内的线程反而要排队等待数据一个一个发送了,这样并发度仍然很低,所以解决办法是啥?把发送数据跟处理数据分离,线程池的线程就只处理任务, 而发送数据都交给Reactor,这样Reactor发送他的数据,线程池只处理它的任务,这样并发度就提高了。
但是问题是处理好的数据如何发给Reactor? 发送给Reactor之后,它如何发送给客户端?(Reactor没有数据发送功能,只有TcpConnection有数据发送的功能)
接下来一个一个解决这两个问题,首先是处理好的数据如果发送给Reactor的问题,先看类图
在看代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class MyTask {public : MyTask (string msg,const TcpConnectionPtr &tcpptr) :_msg(msg) ,_tcpptr(tcpptr) {} void process () { _tcpptr->sendInLoop (_msg); } private : string _msg; TcpConnectionPtr _tcpptr; }; void handleMessage (const TcpConnectionPtr & cb) { string msg = cb->receive (); cout << ">> client send msg :" << msg << endl; MyTask task (msg,cb) ; gpool->addTask (bind (&MyTask::process,task)); }
这里当初设计的ReactorV2版本会有一个TcpConnectionPtr,这是当时为了发送数据给客户端设计的,那么在这里它可以发挥什么作用呢,他可以把数据发送给TcpConnection,但是这个和Reactor有什么关系呢?
关系大啦,因为TcpConnection是可以把这个数据发送给Reactor的,只需要TcpConnection直到EventLoop的存在即可,即加一个EventLoop 类型的指针,然后写一个成员函数,将数据发送给Reactor就行了,所以就可以把TcpConnection改进为这样
这里发数据了,那么Reactor那边就需要接受一下,因此就可以EventLoop就多一个runInLoop成员函数,为什么叫这个呢?
因为通过TcpConnection将msg发给Reactor还有一个好处,别忘了他是拥有数据发送的能力的,那么岂不是可以用bind绑定一个可以发送msg的TcpConnection对象给Reactor,然后Reactor调用这个对象发送数据就完成了数据发送,这样第二个问题也随之解决了。
想象很完美,但是现实很骨感,因为忽略的一个问题,EventLoop怎么知道msg处理完了可以发送了?
所以这里就要引入一个新的机制eventfd
这里简单说一下他的作用,他也是一个文件描述符,但是他可以用在进程之间或者线程之间,用于一个线程通知另一个线程信号,这样岂不是在这里刚刚好,MyTask的process的代码的最后一句调用TcpConnection的sendInLoop,把数据发送到EventLoop当中,这个时候岂不是可以通知EventLoop可以发送数据给客户端了吗。
所以要掌握Eventfd的用法,他本质上就是文件描述符,所以他可以放在epoll的红黑树上进行监听,那么EventLoop监听到他就绪之后,就说明可以发送数据了,这时候就发送数据。
Eventfd的原理 :
eventfd系统调用返回的是文件描述符,该文件描述符与以前学习的文件描述符一样,可以读、写、监听。read函数:如果计数器A的值不为0时,读取成功,获得到该值;如果A的值为0,非阻塞模式时,会直
接返回失败,并把error置为EINVAL;如果为阻塞模式,一直会阻塞到A为非0为止。
write函数:将缓冲区写入的8字节整形值加到内核计数器上,即会增加8字节的整数在计数器A上,如果
其值达到0xfffffffffffffffe时,就会阻塞(在阻塞模式下),直到A的值被read。
select/poll/epoll:支持被io多路复用监听。当内核计数器的值发生变化时,就会触发事件。
通过对eventfd函数返回的文件描述符进行通信。一个进程或者线程A执行read操作,如果内核计数器的
值为0,并且是阻塞模式,那么A就会阻塞;另外一个进程或者线程B执行write操作,就会向内核计数器
写,那么阻塞的A发现内核计数器的值不为0,就会被触发,那么两个进程或者线程A与B就达到通信的
目的了。
在C语言中的基本用法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 #include <sys/eventfd.h> #include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <stdint.h> #define handle_error(msg) \ do { perror(msg); exit(EXIT_FAILURE); } while (0) int main (int argc, char *argv[]) { int efd, j; uint64_t u; ssize_t s; if (argc < 2 ) { fprintf (stderr, "Usage: %s <num>...\n" , argv[0 ]); exit (EXIT_FAILURE); } efd = eventfd (10 , 0 ); if (efd == -1 ) { handle_error ("eventfd" ); } switch (fork()) { case 0 : for (j = 1 ; j < argc; j++) { printf ("Child writing %s to efd\n" , argv[j]); u = strtoull (argv[j], NULL , 0 ); s = write (efd, &u, sizeof (uint64_t )); if (s != sizeof (uint64_t )) { handle_error ("write" ); } sleep (1 ); } printf ("Child completed write loop\n" ); exit (EXIT_SUCCESS); default : sleep (2 ); for (int idx = 2 ; idx < argc; ++idx) { printf ("Parent about to read\n" ); s = read (efd, &u, sizeof (uint64_t )); if (s != sizeof (uint64_t )) { handle_error ("read" ); } printf ("Parent read %llu (0x%llx) from efd\n" , (unsigned long long ) u, (unsigned long long ) u); sleep (1 ); } exit (EXIT_SUCCESS); case -1 : handle_error ("fork" ); } }
这里因为EventFd也是一个文件描述符,所以有一个大胆的想法,那把他放在epoll上监听岂不美哉,这样的话,就只需要把上面这段代码柔和在EventLoop当中就可以了,所以类似于epollfd,也需要一个createEventFd,并且监听就绪之后还需要一个read函数,又因为需要通知eventloop读就绪了,那么还需要一个wakeup来唤醒EventLoop,那读就绪之后也把内核的数据移除之后就该办正事了,即把线程处理好的数据发送给客户端,所以还需要一个doTask成员函数。
大概想法是这样,接下来需要详细设计,因为线程池有很多线程,所以任务肯定也不止一个,所以用一个变量来接受任务肯定不行,所以需要用一个容器来放处理完成后需要发送给客户端的任务,又因为这个容器主线程,线程池中的线程都要读写,所以他是一个互斥资源,所以还需要一把锁,这样就可以把EventLoop设计完即
注意到多了一个Functor,这就上面说的TcpConnection用bind把可以发送的TcpConnection对象和要发送的数据msg这两个绑定后发给EventLoop的类型,接下来实现这两个类
TcpConnection
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 #ifndef TCPCONNECTION #define TCPCONNECTION #include <string> #include <memory> #include <functional> #include "SocketIO.hpp" using std::string;using namespace std;class EventLoop ;class TcpConnection :public enable_shared_from_this<TcpConnection> { using TcpConnectionPtr = shared_ptr<TcpConnection>; using TcpConnectionCallBack = function<void (const TcpConnectionPtr &)>; public : explicit TcpConnection (int fd,EventLoop *loop) ; ~TcpConnection () {} string receive () ; void send (const string & msg) ; public : void setNewConnectionCallBack (const TcpConnectionCallBack & cb) ; void setMessageCallBack (const TcpConnectionCallBack & cb) ; void setCloseCallBack (const TcpConnectionCallBack & cb) ; void handleNewConeectionCallBack () ; void handleMessageCallBack () ; void handleCloseCallBack () ; bool isclose (int fd) const ; void sendInLoop (const string & msg) ; private : SocketIO _socketIO ; TcpConnectionCallBack _onNewConnectionCB; TcpConnectionCallBack _onMessageCB; TcpConnectionCallBack _onCloseCB; EventLoop *_loop; }; #endif
实现文件就多了这么一个函数
1 2 3 4 void TcpConnection::sendInLoop (const string & msg) { _loop->runInLoop (bind (&TcpConnection::send,this ,msg)); }
EventLoop
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 #ifndef EVENTLOOP #define EVENTLOOP #include <my_header.h> #include <vector> #include <map> #include <memory> #include <functional> #include <mutex> using namespace std;class TcpConnection ;class Acceptor ;using TcpConnectionPtr = shared_ptr<TcpConnection>;using TcpConnectionCallBack = function<void (const TcpConnectionPtr &)>;using Functor = function<void ()>;class EventLoop { public : EventLoop (Acceptor & acceptor ) ; ~EventLoop (); void loop () ; void unloop () ; private : void waitEpollFd () ; void handleNewConeection () ; void handMessage (int fd) ; int createEpollFd () ; void addEpollReadFd (int fd) ; void delEpollReadFd (int fd) ; public : void setNewConnectionCallBack (TcpConnectionCallBack && cb) ; void setMessageCallBack (TcpConnectionCallBack && cb) ; void setCloseCallBack (TcpConnectionCallBack && cb) ; int createEventFd () ; void handleRead () ; void wakeup () ; void doPendingFunctors () ; void runInLoop (Functor && cb) ; private : int _epfd; vector<struct epoll_event> _evtList; bool _isLooping; Acceptor & _acceptor; map<int ,TcpConnectionPtr> _conns; TcpConnectionCallBack _onNewConnectionCB; TcpConnectionCallBack _onMessageCB; TcpConnectionCallBack _onCloseCB; vector<Functor> _pengdings; mutex _mutex; int _evtfd; }; #endif
实现文件要把eventfd也加入epoll 的监听当中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 EventLoop::EventLoop (Acceptor & acceptor ) :_epfd(createEpollFd ()) ,_evtList(1024 ) ,_isLooping(false ) ,_acceptor(acceptor) ,_pengdings() ,_mutex() ,_evtfd(createEventFd ()) { int listenFd = _acceptor.fd (); addEpollReadFd (listenFd); addEpollReadFd (_evtfd); }
析构的时候需要关闭的文件描述符也多一个
1 2 3 4 EventLoop::~EventLoop (){ close (_epfd); close (_evtfd); }
监听到子线程发通知了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 void EventLoop::waitEpollFd () { while (_isLooping){ int nready = epoll_wait (_epfd,&*_evtList.begin (),_evtList.size (),3000 ); if (nready == -1 && errno == EPOLLIN){ continue ; }else if (nready == -1 ){ perror ("nready == -1" ); delEpollReadFd (_epfd); return ; }else if (0 == nready){ cout << ">> epoll_wait timeout!" << endl; }else { if (nready == (int )_evtList.size ()){ _evtList.resize (2 *_evtList.size ()); } for (int i = 0 ; i < nready; ++i){ int fd = _evtList[i].data.fd; if (fd == _acceptor.fd ()){ handleNewConeection (); }else if (fd == _evtfd){ handleRead (); doPendingFunctors (); } else { handMessage (fd); } } } } }
然后是与eventfd相关的几个函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 int EventLoop::createEventFd () { int fd = eventfd (0 ,0 ); if (fd < 0 ){ perror ("eventfd error" ); return -1 ; } return fd; } void EventLoop::handleRead () { uint64_t one = 1 ; ssize_t ret = read (_evtfd,&one,sizeof (uint64_t )); if (ret != sizeof (uint64_t )){ perror ("handleRead error" ); return ; } } void EventLoop::wakeup () { uint64_t one = 1 ; ssize_t ret = write (_evtfd,&one,sizeof (uint64_t )); if (ret != sizeof (uint64_t )){ perror ("handleRead error" ); return ; } } void EventLoop::doPendingFunctors () { vector<Functor> tmp; unique_lock<mutex> ul (_mutex) ; tmp.swap (_pengdings); ul.unlock (); for (auto & mission:tmp){ mission (); } } void EventLoop::runInLoop (Functor && cb) { unique_lock<mutex> ul (_mutex) ; _pengdings.push_back (move (cb)); ul.unlock (); wakeup (); }
ReactorV5 这个也没实现啥新功能,只是又进行了进一步的封装,即把
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 #include <iostream> #include "TcpConnection.hpp" #include "Acceptor.hpp" #include "EventLoop.hpp" #include <memory> #include "ThreadPool.hpp" #include "TcpSever.hpp" ThreadPool *gpool; class MyTask {public : MyTask (string msg,const TcpConnectionPtr &tcpptr) :_msg(msg) ,_tcpptr(tcpptr) {} void process () { _tcpptr->sendInLoop (_msg); } private : string _msg; TcpConnectionPtr _tcpptr; }; void handleNewConeection (const TcpConnectionPtr & cb) { cout << "connect !!!" << endl; } void handleMessage (const TcpConnectionPtr & cb) { string msg = cb->receive (); cout << ">> client send msg :" << msg << endl; MyTask task (msg,cb) ; gpool->addTask (bind (&MyTask::process,task)); } void handleClose (const TcpConnectionPtr & cb) { cout << "close !!!" << endl; } void test () { ThreadPool pool (4 ,10 ) ; gpool = &pool; gpool->start (); TcpSever tcp ("127.0.0.1" ,8888 ) ; tcp.setAllCallBack (move (handleNewConeection),move (handleMessage),move (handleClose)); tcp.start (); } int main () { test (); return 0 ; }
这是ReactorV4的测试用例,但是有个问题是他用到了一个全局变量,如何把这个全局变量消掉,那就是用一个类把这整个过程封装起来,让这个gloop成为一个成员变量就可以了,所以再封装
类图
实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 #ifndef ECHOSERVER #define ECHOSERVER #include "TcpSever.hpp" #include "ThreadPool.hpp" class EchoServer { public : EchoServer (size_t ThreadNum,size_t queSize,const string & ip,unsigned short port); ~EchoServer (); void start () ; void stop () ; void onNewConnection (const TcpConnectionPtr & cons) ; void onMessage (const TcpConnectionPtr & cons) ; void onClose (const TcpConnectionPtr & conns) ; private : ThreadPool _pool; TcpSever _server; }; #endif
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 #include "EchoServer.hpp" #include "TcpConnection.hpp" class MyTask {public : MyTask (string msg,const TcpConnectionPtr &tcpptr) :_msg(msg) ,_tcpptr(tcpptr) {} void process () { _tcpptr->sendInLoop (_msg); } private : string _msg; TcpConnectionPtr _tcpptr; }; EchoServer::EchoServer (size_t ThreadNum,size_t queSize,const string & ip,unsigned short port) :_pool(ThreadNum,queSize) ,_server(ip,port) {} EchoServer::~EchoServer (){} void EchoServer::start () { _pool.start (); _server.setAllCallBack (bind (&EchoServer::onNewConnection,this ,placeholders::_1), bind (&EchoServer::onMessage,this ,placeholders::_1), bind (&EchoServer::onClose,this ,placeholders::_1)); _server.start (); } void EchoServer::stop () { _pool.stop (); _server.stop (); } void EchoServer::onNewConnection (const TcpConnectionPtr & cons) { cout <<"connect!!" << endl; } void EchoServer::onMessage (const TcpConnectionPtr & cons) { string msg = cons->receive (); cout << ">> client send msg :" << msg << endl; MyTask task (msg,cons) ; _pool.addTask (bind (&MyTask::process,task)); } void EchoServer::onClose (const TcpConnectionPtr & conns) { cout << "close!!" << endl; }
最后的测试用例就可以简化为
1 2 3 4 5 6 7 8 9 10 #include <iostream> #include "EchoServer.hpp" int main () { EchoServer ec (4 ,10 ,"127.0.0.1" ,8888 ) ; ec.start (); return 0 ; }