ReactorV3V4V5

ReactorV3

ReactorV3其实就是在ReactorV2的版本上进行了一个小封装,增加了一个新的类
pkhHdMR.png

这个类就是把TestTcpConnection这个测试文件中的启动服务器的流程进行了封装简化,即改为了这样

1
2
3
4
5
6
7
8
Acceptor acc("127.0.0.1",8888);
acc.ready();
EventLoop loop(acc);
loop.setNewConnectionCallBack(handleNewConeection);
loop.setMessageCallBack(handleMessage);
loop.setCloseCallBack(handleClose);
loop.loop();

TcpServer类的具体实现为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#ifndef TCPSEVER
#define TCPSEVER
#include "Acceptor.hpp"
#include "EventLoop.hpp"

class TcpSever
{
public:
TcpSever(const string ip,unsigned short port);
~TcpSever();

void start();
void stop();

void setAllCallBack(TcpConnectionCallBack && cb1,
TcpConnectionCallBack && cb2,
TcpConnectionCallBack && cb3);

private:
Acceptor _accpeptor;
EventLoop _loop;
};

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include "TcpSever.hpp"

TcpSever::TcpSever(const string ip,unsigned short port)
:_accpeptor(ip,port)
,_loop(_accpeptor)
{


}
TcpSever::~TcpSever(){}

void TcpSever::start(){
_accpeptor.ready();
_loop.loop();
}

void TcpSever::stop(){
_loop.unloop();
}

void TcpSever::setAllCallBack(TcpConnectionCallBack && cb1,
TcpConnectionCallBack && cb2,
TcpConnectionCallBack && cb3){
_loop.setNewConnectionCallBack(move(cb1));
_loop.setMessageCallBack(move(cb2));
_loop.setCloseCallBack(move(cb3));

}

封装完成之后启动服务器只需要这样

1
2
3
4
TcpSever tcp("127.0.0.1",8888);
tcp.setAllCallBack(move(handleNewConeection),move(handleMessage),move(handleClose));

tcp.start();

没什么好说的

ReactorV4

这个增加了一些新的策略,即将处理msg(任务)的过程交给了线程池中的线程,而Reactor(EventLoop)只负责msg的收发,这样提高了并发度,原理图为

pkhHrdK.png

这里面有几个值得探讨的问题,ReactorV2已经实现了通过回调函数来对连接建立,信息处理,连接关闭这三个事件的扩展(回调函数实现多态)即

1
2
3
4
5
6
7
8
9
10
11
12
13
void handleNewConeection(const TcpConnectionPtr & cb){
cout << "connect !!!" << endl;
}

void handleMessage(const TcpConnectionPtr & cb){
string msg = cb->receive();
cout << ">> client send msg :" << msg << endl;
cb->send(msg);
}

void handleClose(const TcpConnectionPtr & cb){
cout << "close !!!" << endl;
}

先不谈关闭连接和新建连接,因为这两个没有涉及到数据的发送,重点需要讨论的是处理消息这个,因为cb->send(msg)它涉及到了数据的发送,如果引入线程池,那么就会变成这样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class MyTask{
public:
MyTask(string msg,const TcpConnectionPtr &tcpptr)
:_msg(msg)
,_tcpptr(tcpptr)
{}

void process(){
_tcpptr->sendInLoop(_msg);
}
private:
string _msg;
TcpConnectionPtr _tcpptr;
};
void handleMessage(const TcpConnectionPtr & cb){
string msg = cb->receive();
cout << ">> client send msg :" << msg << endl;
MyTask task(msg,cb);
gpool->addTask(bind(&MyTask::process,task));

}

这样确实也能实现功能,但是,问题是发送数据给客户端成线程池内线程做的事情了,想像一种情况,如果数据很多,都需要发送,那么线程池内的线程反而要排队等待数据一个一个发送了,这样并发度仍然很低,所以解决办法是啥?把发送数据跟处理数据分离,线程池的线程就只处理任务, 而发送数据都交给Reactor,这样Reactor发送他的数据,线程池只处理它的任务,这样并发度就提高了。

但是问题是处理好的数据如何发给Reactor?
发送给Reactor之后,它如何发送给客户端?(Reactor没有数据发送功能,只有TcpConnection有数据发送的功能)

接下来一个一个解决这两个问题,首先是处理好的数据如果发送给Reactor的问题,先看类图

pkhHsIO.png

在看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class MyTask{
public:
MyTask(string msg,const TcpConnectionPtr &tcpptr)
:_msg(msg)
,_tcpptr(tcpptr)
{}

void process(){
_tcpptr->sendInLoop(_msg);
}
private:
string _msg;
TcpConnectionPtr _tcpptr;
};
void handleMessage(const TcpConnectionPtr & cb){
string msg = cb->receive();
cout << ">> client send msg :" << msg << endl;
MyTask task(msg,cb);
gpool->addTask(bind(&MyTask::process,task));

}

这里当初设计的ReactorV2版本会有一个TcpConnectionPtr,这是当时为了发送数据给客户端设计的,那么在这里它可以发挥什么作用呢,他可以把数据发送给TcpConnection,但是这个和Reactor有什么关系呢?

关系大啦,因为TcpConnection是可以把这个数据发送给Reactor的,只需要TcpConnection直到EventLoop的存在即可,即加一个EventLoop 类型的指针,然后写一个成员函数,将数据发送给Reactor就行了,所以就可以把TcpConnection改进为这样
pkhHws1.png

这里发数据了,那么Reactor那边就需要接受一下,因此就可以EventLoop就多一个runInLoop成员函数,为什么叫这个呢?

因为通过TcpConnection将msg发给Reactor还有一个好处,别忘了他是拥有数据发送的能力的,那么岂不是可以用bind绑定一个可以发送msg的TcpConnection对象给Reactor,然后Reactor调用这个对象发送数据就完成了数据发送,这样第二个问题也随之解决了。

想象很完美,但是现实很骨感,因为忽略的一个问题,EventLoop怎么知道msg处理完了可以发送了?

所以这里就要引入一个新的机制eventfd

这里简单说一下他的作用,他也是一个文件描述符,但是他可以用在进程之间或者线程之间,用于一个线程通知另一个线程信号,这样岂不是在这里刚刚好,MyTask的process的代码的最后一句调用TcpConnection的sendInLoop,把数据发送到EventLoop当中,这个时候岂不是可以通知EventLoop可以发送数据给客户端了吗。

所以要掌握Eventfd的用法,他本质上就是文件描述符,所以他可以放在epoll的红黑树上进行监听,那么EventLoop监听到他就绪之后,就说明可以发送数据了,这时候就发送数据。

Eventfd的原理

eventfd系统调用返回的是文件描述符,该文件描述符与以前学习的文件描述符一样,可以读、写、监听。read函数:如果计数器A的值不为0时,读取成功,获得到该值;如果A的值为0,非阻塞模式时,会直

接返回失败,并把error置为EINVAL;如果为阻塞模式,一直会阻塞到A为非0为止。

write函数:将缓冲区写入的8字节整形值加到内核计数器上,即会增加8字节的整数在计数器A上,如果

其值达到0xfffffffffffffffe时,就会阻塞(在阻塞模式下),直到A的值被read。

select/poll/epoll:支持被io多路复用监听。当内核计数器的值发生变化时,就会触发事件。

通过对eventfd函数返回的文件描述符进行通信。一个进程或者线程A执行read操作,如果内核计数器的

值为0,并且是阻塞模式,那么A就会阻塞;另外一个进程或者线程B执行write操作,就会向内核计数器

写,那么阻塞的A发现内核计数器的值不为0,就会被触发,那么两个进程或者线程A与B就达到通信的

目的了。

在C语言中的基本用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#include <sys/eventfd.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h> /* Definition of uint64_t */

#define handle_error(msg) \
do { perror(msg); exit(EXIT_FAILURE); } while (0)

int main(int argc, char *argv[])
{
int efd, j;
uint64_t u;
ssize_t s;

if (argc < 2)
{
fprintf(stderr, "Usage: %s <num>...\n", argv[0]);
exit(EXIT_FAILURE);
}

efd = eventfd(10, 0);//eventfd的第一个参数代表的是内核上的计数器
if (efd == -1)
{
handle_error("eventfd");
}

switch (fork())
{
case 0:
//case 0部分是子线程的执行流
for (j = 1; j < argc; j++)
{
//打印命令行参数的值
printf("Child writing %s to efd\n", argv[j]);
//将命令行参数从字符串转换为整型
u = strtoull(argv[j], NULL, 0);/* strtoull() allows various bases */

//write可以写多次, 每执行一次,就会执行一次加法
s = write(efd, &u, sizeof(uint64_t));
if (s != sizeof(uint64_t))
{
handle_error("write");
}

sleep(1);
}
printf("Child completed write loop\n");

exit(EXIT_SUCCESS);

default:
//父线程的一个执行流
sleep(2);

for(int idx = 2; idx < argc; ++idx)
{
printf("Parent about to read\n");
//read操作会将计数器的值清0
s = read(efd, &u, sizeof(uint64_t));
if (s != sizeof(uint64_t))
{
handle_error("read");
}

//将从内核计数器读到的值以不同进制的形式打印出来
printf("Parent read %llu (0x%llx) from efd\n",
(unsigned long long) u, (unsigned long long) u);
sleep(1);
}
exit(EXIT_SUCCESS);

case -1:
handle_error("fork");
}
}

这里因为EventFd也是一个文件描述符,所以有一个大胆的想法,那把他放在epoll上监听岂不美哉,这样的话,就只需要把上面这段代码柔和在EventLoop当中就可以了,所以类似于epollfd,也需要一个createEventFd,并且监听就绪之后还需要一个read函数,又因为需要通知eventloop读就绪了,那么还需要一个wakeup来唤醒EventLoop,那读就绪之后也把内核的数据移除之后就该办正事了,即把线程处理好的数据发送给客户端,所以还需要一个doTask成员函数。

大概想法是这样,接下来需要详细设计,因为线程池有很多线程,所以任务肯定也不止一个,所以用一个变量来接受任务肯定不行,所以需要用一个容器来放处理完成后需要发送给客户端的任务,又因为这个容器主线程,线程池中的线程都要读写,所以他是一个互斥资源,所以还需要一把锁,这样就可以把EventLoop设计完即

pkhHDZ6.png

注意到多了一个Functor,这就上面说的TcpConnection用bind把可以发送的TcpConnection对象和要发送的数据msg这两个绑定后发给EventLoop的类型,接下来实现这两个类

TcpConnection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#ifndef TCPCONNECTION
#define TCPCONNECTION
#include <string>
#include <memory>
#include <functional>
#include "SocketIO.hpp"
using std::string;
using namespace std;
class EventLoop;

class TcpConnection
:public enable_shared_from_this<TcpConnection>
{
using TcpConnectionPtr = shared_ptr<TcpConnection>;
using TcpConnectionCallBack = function<void(const TcpConnectionPtr &)>;

public:
explicit TcpConnection(int fd,EventLoop *loop);
~TcpConnection() {}

string receive();
void send(const string & msg);
public:
void setNewConnectionCallBack(const TcpConnectionCallBack & cb);

void setMessageCallBack(const TcpConnectionCallBack & cb);

void setCloseCallBack(const TcpConnectionCallBack & cb);

void handleNewConeectionCallBack();

void handleMessageCallBack();

void handleCloseCallBack();

bool isclose(int fd)const ;

void sendInLoop(const string & msg);
private:
SocketIO _socketIO ;
TcpConnectionCallBack _onNewConnectionCB;
TcpConnectionCallBack _onMessageCB;
TcpConnectionCallBack _onCloseCB;

EventLoop *_loop;
};


#endif

实现文件就多了这么一个函数

1
2
3
4
void TcpConnection::sendInLoop(const string & msg){
//将有发送能力的tcpConnection的对象和msg发给reactor/EventLoop 然后那边唤醒发送就完事了
_loop->runInLoop(bind(&TcpConnection::send,this,msg));
}

EventLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#ifndef EVENTLOOP
#define EVENTLOOP
#include <my_header.h>
#include <vector>
#include <map>
#include <memory>
#include <functional>
#include <mutex>
using namespace std;
class TcpConnection;
class Acceptor;

using TcpConnectionPtr = shared_ptr<TcpConnection>;
using TcpConnectionCallBack = function<void(const TcpConnectionPtr &)>;
using Functor = function<void()>;

class EventLoop
{
public:
EventLoop(Acceptor & acceptor ) ;
~EventLoop();

void loop();
void unloop();

private:

void waitEpollFd();

void handleNewConeection();

void handMessage(int fd);

int createEpollFd();

void addEpollReadFd(int fd);

void delEpollReadFd(int fd);

public:
void setNewConnectionCallBack(TcpConnectionCallBack && cb);

void setMessageCallBack(TcpConnectionCallBack && cb);

void setCloseCallBack(TcpConnectionCallBack && cb);

int createEventFd();

void handleRead();

void wakeup();

void doPendingFunctors();

void runInLoop(Functor && cb);
private:
int _epfd;
vector<struct epoll_event> _evtList;
bool _isLooping;
Acceptor & _acceptor;
map<int,TcpConnectionPtr> _conns;
TcpConnectionCallBack _onNewConnectionCB;
TcpConnectionCallBack _onMessageCB;
TcpConnectionCallBack _onCloseCB;

vector<Functor> _pengdings;
mutex _mutex;
int _evtfd;

};


#endif

实现文件要把eventfd也加入epoll 的监听当中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
EventLoop::EventLoop(Acceptor & acceptor )
:_epfd(createEpollFd())
,_evtList(1024)
,_isLooping(false)
,_acceptor(acceptor)
,_pengdings()
,_mutex()
,_evtfd(createEventFd())
{
//将listenFd加入红黑树监听
int listenFd = _acceptor.fd();
addEpollReadFd(listenFd);
//将evtfd也加入红黑树监听
addEpollReadFd(_evtfd);
}

析构的时候需要关闭的文件描述符也多一个

1
2
3
4
EventLoop::~EventLoop(){
close(_epfd);
close(_evtfd);
}

监听到子线程发通知了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void EventLoop::waitEpollFd(){
while(_isLooping){
int nready = epoll_wait(_epfd,&*_evtList.begin(),_evtList.size(),3000);
if(nready == -1 && errno == EPOLLIN){
continue;
}else if(nready == -1){
perror("nready == -1");
delEpollReadFd(_epfd);
return;
}else if(0 == nready){
cout << ">> epoll_wait timeout!" << endl;
}else{
//有可能容量不够 需要手动扩容
if(nready == (int)_evtList.size()){
_evtList.resize(2*_evtList.size());
}
//有就绪连接
for(int i = 0; i < nready; ++i){
int fd = _evtList[i].data.fd;
if(fd == _acceptor.fd()){
//有新连接到来
handleNewConeection();
}else if(fd == _evtfd){
//子线程处理完任务了,该发送给客户端了
handleRead();

doPendingFunctors();
}
else{
//旧连接有消息
handMessage(fd);
}
}//end for
}//end else
}//end while
}

然后是与eventfd相关的几个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
int EventLoop::createEventFd(){
int fd = eventfd(0,0);
if(fd < 0){
perror("eventfd error");
return -1;
}
return fd;
}

void EventLoop::handleRead(){
uint64_t one = 1;
ssize_t ret = read(_evtfd,&one,sizeof(uint64_t));
if(ret != sizeof(uint64_t)){
perror("handleRead error");
return;
}
}

void EventLoop::wakeup(){
uint64_t one = 1;
ssize_t ret = write(_evtfd,&one,sizeof(uint64_t));
if(ret != sizeof(uint64_t)){
perror("handleRead error");
return;
}

}

void EventLoop::doPendingFunctors(){
//为了提高并发度,可以在这里新建一个临时数组,将pending中的任务复制到这个
//临时数组里面,然后这样线程池就可以往pending中加任务的同时,主线程还可以给客户端
//发数据了
//pending是临界资源,访问要加锁
vector<Functor> tmp;
unique_lock<mutex> ul(_mutex);
tmp.swap(_pengdings);
ul.unlock();
//接下来主线程执行任务就行了
for(auto & mission:tmp){
mission();
}
}

void EventLoop::runInLoop(Functor && cb){
unique_lock<mutex> ul(_mutex);
//TcpConnection将msg和发送的tcpConnection对象发过来了
_pengdings.push_back(move(cb));
ul.unlock();
//有任务了可以唤醒reactor/EventLoop来去发送数据了
wakeup();
}

ReactorV5

这个也没实现啥新功能,只是又进行了进一步的封装,即把

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <iostream>
#include "TcpConnection.hpp"
#include "Acceptor.hpp"
#include "EventLoop.hpp"
#include <memory>
#include "ThreadPool.hpp"
#include "TcpSever.hpp"

ThreadPool *gpool;

class MyTask{
public:
MyTask(string msg,const TcpConnectionPtr &tcpptr)
:_msg(msg)
,_tcpptr(tcpptr)
{}

void process(){
_tcpptr->sendInLoop(_msg);
}
private:
string _msg;
TcpConnectionPtr _tcpptr;
};

void handleNewConeection(const TcpConnectionPtr & cb){
cout << "connect !!!" << endl;
}

void handleMessage(const TcpConnectionPtr & cb){
string msg = cb->receive();
cout << ">> client send msg :" << msg << endl;
MyTask task(msg,cb);
gpool->addTask(bind(&MyTask::process,task));

}

void handleClose(const TcpConnectionPtr & cb){
cout << "close !!!" << endl;
}


void test(){
//新建和启动线程池
ThreadPool pool(4,10);

gpool = &pool;
gpool->start();

TcpSever tcp("127.0.0.1",8888);
tcp.setAllCallBack(move(handleNewConeection),move(handleMessage),move(handleClose));

tcp.start();

}


int main()
{
test();
return 0;
}

这是ReactorV4的测试用例,但是有个问题是他用到了一个全局变量,如何把这个全局变量消掉,那就是用一个类把这整个过程封装起来,让这个gloop成为一个成员变量就可以了,所以再封装

类图

pkhH0qx.png

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#ifndef ECHOSERVER
#define ECHOSERVER
#include "TcpSever.hpp"
#include "ThreadPool.hpp"

class EchoServer
{
public:
EchoServer(size_t ThreadNum,size_t queSize,const string & ip,unsigned short port);
~EchoServer();

void start();

void stop();

void onNewConnection(const TcpConnectionPtr & cons);

void onMessage(const TcpConnectionPtr & cons);

void onClose(const TcpConnectionPtr & conns);

private:
ThreadPool _pool;
TcpSever _server;

};


#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include "EchoServer.hpp"
#include "TcpConnection.hpp"
class MyTask{
public:
MyTask(string msg,const TcpConnectionPtr &tcpptr)
:_msg(msg)
,_tcpptr(tcpptr)
{}

void process(){
_tcpptr->sendInLoop(_msg);
}
private:
string _msg;
TcpConnectionPtr _tcpptr;
};

EchoServer::EchoServer(size_t ThreadNum,size_t queSize,const string & ip,unsigned short port)
:_pool(ThreadNum,queSize)
,_server(ip,port)
{}

EchoServer::~EchoServer(){}

void EchoServer::start(){
_pool.start();
//这里用一个占位符,这样setAllCallBack的阐述类型就匹配了
_server.setAllCallBack(bind(&EchoServer::onNewConnection,this,placeholders::_1),
bind(&EchoServer::onMessage,this,placeholders::_1),
bind(&EchoServer::onClose,this,placeholders::_1));
_server.start();

}

void EchoServer::stop(){
_pool.stop();
_server.stop();
}

void EchoServer::onNewConnection(const TcpConnectionPtr & cons){
cout <<"connect!!" << endl;
}

void EchoServer::onMessage(const TcpConnectionPtr & cons){
string msg = cons->receive();
cout << ">> client send msg :" << msg << endl;
MyTask task(msg,cons);
_pool.addTask(bind(&MyTask::process,task));

}

void EchoServer::onClose(const TcpConnectionPtr & conns){
cout << "close!!" << endl;
}

最后的测试用例就可以简化为

1
2
3
4
5
6
7
8
9
10
#include <iostream>
#include "EchoServer.hpp"


int main()
{
EchoServer ec(4,10,"127.0.0.1",8888);
ec.start();
return 0;
}