ReactorV2

与V1的区别

在Reactor中增加了epoll监听,然后采用了函数回调机制,让处理新连接,处理新消息已经处理断开更加的灵活

类图

首先是不增加回调的类图

不增加回调会有一些问题,例如客户端断开连接,服务端就会一直就绪,并且如果处理信息那部分要实现别的功能的话,就需要修改源码,所以采用回调函数的可扩展性更强

增加回调的类图

代码实现

Socket.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#ifndef SOCKET
#define SOCKET
class Socket
{
public:
Socket() ;
explicit Socket(int fd);
~Socket() ;
int getFd();

Socket(const Socket & rhs) = delete ;
Socket & operator=(const Socket & rhs) = delete ;

private:
int _fd;
};

#endif

Socket.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include "Socket.hpp"
#include <my_header.h>

Socket::Socket(){
_fd = socket(AF_INET,SOCK_STREAM,0);
if(_fd < 0){
perror("_fd error");
return;
}
}


Socket::Socket(int fd)
:_fd(fd)
{}

Socket::~Socket() {
close(_fd);
}

int Socket::getFd(){
return _fd;
}

InetAddress.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#ifndef INETADDRESS
#define INETADDRESS
#include <string>
#include <my_header.h>
using namespace std;

class InetAddress
{
public:
InetAddress(const string & ip,unsigned short port);
InetAddress(const struct sockaddr_in &);
~InetAddress();
string IP();
unsigned short port();

struct sockaddr_in * getInetAddress();


private:
struct sockaddr_in _addr;
};

#endif

InetAddress.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include "InetAddress.hpp"

InetAddress::InetAddress(const string & ip,unsigned short port)
{
memset(&_addr,0,sizeof(_addr));
_addr.sin_family = AF_INET;
_addr.sin_port = htons(port);
_addr.sin_addr.s_addr = inet_addr(ip.c_str());
}


InetAddress::InetAddress(const struct sockaddr_in & addr)
:_addr(addr)
{}


InetAddress::~InetAddress(){}


string InetAddress::IP(){
return string(inet_ntoa(_addr.sin_addr));
}


unsigned short InetAddress::port(){
return ntohs(_addr.sin_port);
}


struct sockaddr_in * InetAddress::getInetAddress(){
return &_addr;
}

Acceptor.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#ifndef ACCEPTOR
#define ACCEPTOR
#include "InetAddress.hpp"
#include "Socket.hpp"


class Acceptor
{
public:
Acceptor(const string &ip, unsigned short port);
~Acceptor() {}

int accept();
void ready();

int fd();
private:
void setReuseAddr();
void setReusePort();

void bind();
void listen();



private:
Socket _sock;
InetAddress _addr;
};

#endif

Acceptor.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include "Acceptor.hpp"

Acceptor::Acceptor(const string &ip, unsigned short port)
:_sock()
,_addr(ip,port)
{}

void Acceptor::setReuseAddr(){
int opt = 1;
int ret = setsockopt(_sock.getFd(),SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
if(-1 == ret){
perror("setsockopt ip error");
return;
}
}


void Acceptor::setReusePort(){
int opt = 1;
int ret = setsockopt(_sock.getFd(),SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
if(-1 == ret){
perror("setsockopt ip error");
return;
}
}

void Acceptor::bind(){
int ret = ::bind(_sock.getFd(),(struct sockaddr *)_addr.getInetAddress(),sizeof(struct sockaddr));
if(-1 == ret){
perror("bind error");
return;
}
}

void Acceptor::listen(){
int ret = ::listen(_sock.getFd(),128);
if(-1 == ret){
perror("listen error");
return;
}
}

void Acceptor::ready(){
setReuseAddr();
setReusePort();
Acceptor::bind();
Acceptor::listen();
}

int Acceptor::accept(){
//接受客户端发来的连接请求
int connetFd = ::accept(_sock.getFd(),nullptr,nullptr);
if(-1 == connetFd){
perror("accept error");
return -1;
}
return connetFd;
}

int Acceptor::fd(){
//返回listenFd
return _sock.getFd();
}

TcpConnection.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#ifndef TCPCONNECTION
#define TCPCONNECTION
#include <string>
#include <memory>
#include <functional>
#include "SocketIO.hpp"
using std::string;
using namespace std;


class TcpConnection
:public enable_shared_from_this<TcpConnection>
{
using TcpConnectionPtr = shared_ptr<TcpConnection>;
using TcpConnectionCallBack = function<void(const TcpConnectionPtr &)>;

public:
explicit TcpConnection(int fd);
~TcpConnection() {}

string receive();
void send(const string & msg);
public:
void setNewConnectionCallBack(const TcpConnectionCallBack & cb);

void setMessageCallBack(const TcpConnectionCallBack & cb);

void setCloseCallBack(const TcpConnectionCallBack & cb);

void handleNewConeectionCallBack();

void handleMessageCallBack();

void handleCloseCallBack();

bool isclose(int fd)const ;
private:
SocketIO _socketIO ;
TcpConnectionCallBack _onNewConnectionCB;
TcpConnectionCallBack _onMessageCB;
TcpConnectionCallBack _onCloseCB;

};


#endif

TcpConnection.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include "TcpConnection.hpp"
#include <iostream>
#include <my_header.h>
TcpConnection::TcpConnection(int fd)
:_socketIO(fd)
{}

string TcpConnection::receive(){
char buf[65535] = {0};
_socketIO.readLine(buf,sizeof(buf));
return string(buf);
}

void TcpConnection::send(const string & msg){
_socketIO.writen(msg.c_str(),msg.size());//这里第三个参数用sizeof会出现乱码
//因为sizeof会多统计一个字符

}
void TcpConnection::setNewConnectionCallBack(const TcpConnectionCallBack & cb){
_onNewConnectionCB = cb;
}

void TcpConnection::setMessageCallBack(const TcpConnectionCallBack & cb){
_onMessageCB = cb;
}

void TcpConnection::setCloseCallBack(const TcpConnectionCallBack & cb){
_onCloseCB = cb;
}

void TcpConnection::handleNewConeectionCallBack(){
if(_onNewConnectionCB == nullptr){
cout << "_onNewConnectionCB is nullptr" << endl;
}else{
_onNewConnectionCB(shared_from_this());
}
}

void TcpConnection::handleMessageCallBack(){
if(nullptr == _onMessageCB){
cout << "_onMessageCB is nullptr" << endl;
}else{
_onMessageCB(shared_from_this());
}
}

void TcpConnection::handleCloseCallBack(){
if(nullptr == _onCloseCB){
cout << "_onCloseCB is nullptr" << endl;
}else{
//传的是this指针,但是这样会导致智能指针托管裸指针,所以用shared_from_this
_onCloseCB(shared_from_this());
}
}

bool TcpConnection::isclose(int fd) const {
char buf[10] = {0};
int ret = recv(fd,buf,sizeof(buf),MSG_PEEK);
return 0 == ret;
}

SocketIO.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#ifndef SOCKETIO
#define SOCKETIO


class SocketIO
{
public:
explicit SocketIO(int fd);
~SocketIO() {}

int readn(char * buf,int len);

int readLine(char *buf,int len);

int writen(const char *buf,int len);

private:
int _fd;
};


#endif

SocketIO.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#include "SocketIO.hpp"
#include <my_header.h>
SocketIO::SocketIO(int fd)
:_fd(fd)
{}

int SocketIO::readn(char * buf,int len){
int left = len;
char *pstr = buf;
int ret = 0;
while(left > 0){
ret = read(_fd,pstr,left);
if(-1 == ret && errno == EINTR){
//碰到中断错误,属于可恢复的错误
continue;
}else if(-1 == ret){
perror("read error");
return -1;
}else if(0 == ret){
//客户端断开连接
break;
}else{
pstr += ret;
left -= ret;
}
}

return len - left;
}

int SocketIO::readLine(char *buf,int len){
//涉及到read和recv的区别,recv可以将内核的数据拷贝一份,而read会直接将内核的数据给读出来,
//并且把内核这部分数据移除,这俩读一行,可能第一个读的长度已经包含好几行了,所以
//用read不行,会把别的行的数据误删掉,所以使用recv先拷贝一份,然后一个一个遍历
//找到\n就把之前的数据read出来
int left = len - 1;//换行符?
char *pstr = buf;
int ret = 0,total = 0;
while(left > 0){
ret = recv(_fd,pstr,left,MSG_PEEK);
if(-1 == ret && errno == EINTR){
continue;
}else if(0 == ret){
break;
}else if(-1 == ret){
perror("read error");
return -1;
}else{
for(int i = 0; i < ret; ++i){
if(pstr[i] == '\n'){
int sz = i + 1;
readn(pstr,sz);
pstr += sz;
*pstr = '\0';
//读取完一行数据
//
return total + sz;
}
}

readn(pstr,ret);
total += ret;
pstr += ret;
left -= ret;
}
}
*pstr = '\0';

return total - left;
}

int SocketIO::writen(const char *buf,int len){
int left = len;
const char * pstr = buf;
int ret = 0;
while(left > 0){
ret = write(_fd,pstr,left);
if(-1 == ret && errno == EINTR){
continue;
}else if(-1 == ret){
perror("write error");
break;
}else if(0 == ret){
break;
}else{
left -= ret;
pstr += ret;
}
}
return len - left;

}

EventLoop.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#ifndef EVENTLOOP
#define EVENTLOOP
#include <my_header.h>
#include <vector>
#include <map>
#include <memory>
#include <functional>
using namespace std;
class TcpConnection;
class Acceptor;

using TcpConnectionPtr = shared_ptr<TcpConnection>;
using TcpConnectionCallBack = function<void(const TcpConnectionPtr &)>;


class EventLoop
{
public:
EventLoop(Acceptor & acceptor ) ;
~EventLoop();

void loop();
void unloop();

private:

void waitEpollFd();

void handleNewConeection();

void handMessage(int fd);

int createEpollFd();

void addEpollReadFd(int fd);

void delEpollReadFd(int fd);

public:
void setNewConnectionCallBack(TcpConnectionCallBack && cb);

void setMessageCallBack(TcpConnectionCallBack && cb);

void setCloseCallBack(TcpConnectionCallBack && cb);

private:
int _epfd;
vector<struct epoll_event> _evtList;
bool _isLooping;
Acceptor & _acceptor;
map<int,TcpConnectionPtr> _conns;
TcpConnectionCallBack _onNewConnectionCB;
TcpConnectionCallBack _onMessageCB;
TcpConnectionCallBack _onCloseCB;


};


#endif

EventLoop.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#include "EventLoop.hpp"
#include "Acceptor.hpp"
#include "TcpConnection.hpp"
#include <iostream>
EventLoop::EventLoop(Acceptor & acceptor )
:_epfd(createEpollFd())
,_evtList(1024)
,_isLooping(false)
,_acceptor(acceptor)
{
//将listenFd加入红黑树监听
int listenFd = _acceptor.fd();
addEpollReadFd(listenFd);
}

EventLoop::~EventLoop(){
close(_epfd);
}

void EventLoop::loop(){
_isLooping = true;
waitEpollFd();
}

void EventLoop::unloop(){
_isLooping = false;
}

void EventLoop::waitEpollFd(){
while(_isLooping){
int nready = epoll_wait(_epfd,&*_evtList.begin(),_evtList.size(),3000);
if(nready == -1 && errno == EPOLLIN){
continue;
}else if(nready == -1){
perror("nready == -1");
delEpollReadFd(_epfd);
return;
}else if(0 == nready){
cout << ">> epoll_wait timeout!" << endl;
}else{
//有可能容量不够 需要手动扩容
if(nready == (int)_evtList.size()){
_evtList.resize(2*_evtList.size());
}
//有就绪连接
for(int i = 0; i < nready; ++i){
int fd = _evtList[i].data.fd;
if(fd == _acceptor.fd()){
//有新连接到来
handleNewConeection();
}else{
//旧连接有消息
handMessage(fd);
}
}//end for
}//end else
}//end while
}

void EventLoop::handleNewConeection(){
//处理新连接
int connfd = _acceptor.accept();
if(-1 == connfd){
perror("accept error");
return;
}
addEpollReadFd(connfd);

//创建TcpConnection对象
TcpConnectionPtr tcp(new TcpConnection(connfd));

//传递回调函数
tcp->setNewConnectionCallBack(move(_onNewConnectionCB));
tcp->setMessageCallBack(move(_onMessageCB));
tcp->setCloseCallBack(move(_onCloseCB));

//将文件描述符和tcpconnection 这对键值对存在map中
_conns[connfd] = tcp;

//到了建立新连接的触发时机了
tcp->handleNewConeectionCallBack();
}

void EventLoop::handMessage(int fd){
//先判断是否为一个合法的文件描述符
auto it = _conns.find(fd);
if(it != _conns.end()){
//判断该文件描述符的连接是否已经关闭了
int flag = it->second->isclose(fd);
if(flag){
//这里就该调用断开连接的回调函数了 因为是对方主动关闭
it->second->handleCloseCallBack();
delEpollReadFd(fd);
_conns.erase(it);

}
else{
//到了回调函数调用的时机
it->second->handleMessageCallBack();
}
}else{
cout << "error fd" << endl;
}
}

int EventLoop::createEpollFd(){
int fd = epoll_create(10);
if(fd < 0){
cout << "epoll_ctreate error" << endl;
return -1;
}
return fd;
}

void EventLoop::addEpollReadFd(int fd){
struct epoll_event evt;
evt.events = EPOLLIN;
evt.data.fd = fd;
int ret = epoll_ctl(_epfd,EPOLL_CTL_ADD,fd,&evt);
if(ret < 0){
cout << "epoll_ctl error" << endl;
return;
}
}

void EventLoop::delEpollReadFd(int fd){
struct epoll_event evt;
evt.events = EPOLLIN;
evt.data.fd = fd;
int ret = epoll_ctl(_epfd,EPOLL_CTL_DEL,fd,&evt);

if(ret < 0){
cout << "epoll_ctl error" << endl;
return;
}

}


void EventLoop::setNewConnectionCallBack(TcpConnectionCallBack && cb){
_onNewConnectionCB = move(cb);
}


void EventLoop::setMessageCallBack(TcpConnectionCallBack && cb){
_onMessageCB = move(cb);
}


void EventLoop::setCloseCallBack(TcpConnectionCallBack && cb){
_onCloseCB = move(cb);
}

Test.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <iostream>
#include "TcpConnection.hpp"
#include "Acceptor.hpp"
#include "EventLoop.hpp"
#include <memory>
void test1(){
Acceptor acc("127.0.0.1",8888);
acc.ready();
TcpConnection tcpc(acc.accept());
cout << "连接建立!" << endl;
while(1){
cout << ">> client :" <<tcpc.receive() << endl;
tcpc.send("i am server!\n");
}
}

void handleNewConeection(const TcpConnectionPtr & cb){
cout << "connect !!!" << endl;
}

void handleMessage(const TcpConnectionPtr & cb){
string msg = cb->receive();
cout << ">> client send msg :" << msg << endl;
cb->send(msg);
}

void handleClose(const TcpConnectionPtr & cb){
cout << "close !!!" << endl;
}

void test(){
Acceptor acc("127.0.0.1",8888);
acc.ready();
EventLoop loop(acc);
loop.setNewConnectionCallBack(handleNewConeection);
loop.setMessageCallBack(handleMessage);
loop.setCloseCallBack(handleClose);
loop.loop();

}


int main()
{
test();
return 0;
}