workflow充当客户端

redis任务

不需要回调 可以做写类型的指令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <workflow/WFFailities.h>
#include <signal.h>
static WFFacilities:WAitGrou waitGroup(1);//static表示只在本文件内部生效

void sigHandler(int signum){
waitGroup.done();
}


int main(){
signal(SIGINT,siganlder);
//创建redis任务
WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,nullptr);
//更改请求属性
prtocol::RedisRequest = *req = redisTask->get_req();
//可以执行redis指令
req->set_request("set",{"key","value"});
redisTask->start();
waitGroup.wait();
return 0;
}

需要回调,进行读类型的指令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <workflow/WFFailities.h>
#include <signal.h>
static WFFacilities:WAitGrou waitGroup(1);//static表示只在本文件内部生效

void sigHandler(int signum){
waitGroup.done();
}

void redisCallback(WFRedisTask * redisTask){
int state = redisTask->get_state();
int error = redisTask->get_error();
switch(state){
case WFT_STATE_SYS_ERROR:
cerr << "system error:" << strerror(error) << "\n";
break;
case WFT_STATE_DNS_ERROR:
cerr << "DNS error:" << gai_strerror(error) << '\n';
break;
case WFT_STATE_SSL_ERROR:
cerr << "SSL error :" << error << '\n';
break;
case WFT_STATE_TASK_ERROR:
cerr << "Task error :" << error << '\n';
break;
case WFT_STATE_SUCCESS:
break;
}//只能检查连接错误 不能检查业务的语法错误
//检查业务的语法错误
protocol:RedisResponse *resp = redisTask->get_resp();
potocol:RedisValue result;//用来保存redis返回的结果
resp->get_result(result);
if(result.is_error()){
state = WFT_STATE_TASK_ERROR;
}
if(state != WFT_STATE_SUCCESS){
cerr << "Failed. Press Ctrl-C to exit.\n";
return;
}
if(result.is_string()){
cerr << "result = " << result.string_value() << '\n';
}
}

int main(){
signal(SIGINT,siganlder);
//创建redis任务
WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);
//更改请求属性
prtocol::RedisRequest *req = redisTask->get_req();
//可以执行redis指令
req->set_request("get",{"key"});
redisTask->start();
waitGroup.wait();
return 0;
}

使用hgetall指令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include <workflow/WFFailities.h>
#include <signal.h>
static WFFacilities:WAitGrou waitGroup(1);//static表示只在本文件内部生效

void sigHandler(int signum){
waitGroup.done();
}

void redisCallback(WFRedisTask * redisTask){
int state = redisTask->get_state();
int error = redisTask->get_error();
switch(state){
case WFT_STATE_SYS_ERROR:
cerr << "system error:" << strerror(error) << "\n";
break;
case WFT_STATE_DNS_ERROR:
cerr << "DNS error:" << gai_strerror(error) << '\n';
break;
case WFT_STATE_SSL_ERROR:
cerr << "SSL error :" << error << '\n';
break;
case WFT_STATE_TASK_ERROR:
cerr << "Task error :" << error << '\n';
break;
case WFT_STATE_SUCCESS:
break;
}//只能检查连接错误 不能检查业务的语法错误
//检查业务的语法错误
protocol:RedisResponse *resp = redisTask->get_resp();
potocol:RedisValue result;//用来保存redis返回的结果
resp->get_result(result);
if(result.is_error()){
state = WFT_STATE_TASK_ERROR;
}
if(state != WFT_STATE_SUCCESS){
cerr << "Failed. Press Ctrl-C to exit.\n";
return;
}
if(result.is_string()){
cerr << "result = " << result.string_value() << '\n';
}
else if(result.is_array()){
for(int i = 0; i < result.arr_size();++i){
cerr << "i = " << i
<< "arr[i] = "
<< result.arr_at(i).string_value() << '\n';
}
}
}

int main(){
signal(SIGINT,siganlder);
//创建redis任务
WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);
//更改请求属性
prtocol::RedisRequest *req = redisTask->get_req();
//可以执行redis指令
req->set_request("hgetall",{"key"});
redisTask->start();
waitGroup.wait();
return 0;
}

需求:先set再get,要求第一个set做完之后再来做get

创建两个redis任务是不可以的,因为workflow是异步的,没法保证get一定再set之前执行

思路:把第二个任务的创建放在回调函数里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <workflow/WFFailities.h>
#include <signal.h>
static WFFacilities:WAitGrou waitGroup(1);//static表示只在本文件内部生效

void sigHandler(int signum){
waitGroup.done();
}
void redisCallback2(WFRedisTask * redisTask){
int state = redisTask->get_state();
int error = redisTask->get_error();
switch(state){
case WFT_STATE_SYS_ERROR:
cerr << "system error:" << strerror(error) << "\n";
break;
case WFT_STATE_DNS_ERROR:
cerr << "DNS error:" << gai_strerror(error) << '\n';
break;
case WFT_STATE_SSL_ERROR:
cerr << "SSL error :" << error << '\n';
break;
case WFT_STATE_TASK_ERROR:
cerr << "Task error :" << error << '\n';
break;
case WFT_STATE_SUCCESS:
break;
}//只能检查连接错误 不能检查业务的语法错误
//检查业务的语法错误
protocol:RedisResponse *resp = redisTask->get_resp();
potocol:RedisValue result;//用来保存redis返回的结果
resp->get_result(result);
if(result.is_error()){
state = WFT_STATE_TASK_ERROR;
}
if(state != WFT_STATE_SUCCESS){
cerr << "Failed. Press Ctrl-C to exit.\n";
return;
}
if(result.is_string()){
cerr << "result = " << result.string_value() << '\n';
}
else if(result.is_array()){
for(int i = 0; i < result.arr_size();++i){
cerr << "i = " << i
<< "arr[i] = "
<< result.arr_at(i).string_value() << '\n';
}
}

}

void redisCallback1(WFRedisTask * redisTask){
int state = redisTask->get_state();
int error = redisTask->get_error();
switch(state){
case WFT_STATE_SYS_ERROR:
cerr << "system error:" << strerror(error) << "\n";
break;
case WFT_STATE_DNS_ERROR:
cerr << "DNS error:" << gai_strerror(error) << '\n';
break;
case WFT_STATE_SSL_ERROR:
cerr << "SSL error :" << error << '\n';
break;
case WFT_STATE_TASK_ERROR:
cerr << "Task error :" << error << '\n';
break;
case WFT_STATE_SUCCESS:
break;
}//只能检查连接错误 不能检查业务的语法错误
//检查业务的语法错误
protocol:RedisResponse *resp = redisTask->get_resp();
potocol:RedisValue result;//用来保存redis返回的结果
resp->get_result(result);
if(result.is_error()){
state = WFT_STATE_TASK_ERROR;
}
if(state != WFT_STATE_SUCCESS){
cerr << "Failed. Press Ctrl-C to exit.\n";
return;
}
if(result.is_string()){
cerr << "result = " << result.string_value() << '\n';
}
else if(result.is_array()){
for(int i = 0; i < result.arr_size();++i){
cerr << "i = " << i
<< "arr[i] = "
<< result.arr_at(i).string_value() << '\n';
}
}

WFRedisTask *redisTask2 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2);
//更改请求属性
prtocol::RedisRequest *req2 = redisTask2->get_req();
//可以执行redis指令
req->set_request("get",{"key"});
redisTask2->start();
}

int main(){
signal(SIGINT,siganlder);
//创建redis任务
WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback1);
//更改请求属性
prtocol::RedisRequest *req = redisTask->get_req();
//可以执行redis指令
req->set_request("set",{"key"});
redisTask->start();
waitGroup.wait();
return 0;
}

task2执行的时候,task1的回调不一定执行完了,为了解决这个问题框架提供了序列机制,是一个任务容器(队列),可以确保前面的任务做完了再做下一个任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#include <workflow/WFFailities.h>
#include <signal.h>
#include <workflow/workflow.h>
static WFFacilities:WAitGrou waitGroup(1);//static表示只在本文件内部生效

void sigHandler(int signum){
waitGroup.done();
}

void redisCallback2(WFRedisTask * redisTask){
int state = redisTask->get_state();
int error = redisTask->get_error();
switch(state){
case WFT_STATE_SYS_ERROR:
cerr << "system error:" << strerror(error) << "\n";
break;
case WFT_STATE_DNS_ERROR:
cerr << "DNS error:" << gai_strerror(error) << '\n';
break;
case WFT_STATE_SSL_ERROR:
cerr << "SSL error :" << error << '\n';
break;
case WFT_STATE_TASK_ERROR:
cerr << "Task error :" << error << '\n';
break;
case WFT_STATE_SUCCESS:
break;
}//只能检查连接错误 不能检查业务的语法错误
//检查业务的语法错误
protocol:RedisResponse *resp = redisTask->get_resp();
potocol:RedisValue result;//用来保存redis返回的结果
resp->get_result(result);
if(result.is_error()){
state = WFT_STATE_TASK_ERROR;
}
if(state != WFT_STATE_SUCCESS){
cerr << "Failed. Press Ctrl-C to exit.\n";
return;
}
if(result.is_string()){
cerr << "result = " << result.string_value() << '\n';
}
else if(result.is_array()){
for(int i = 0; i < result.arr_size();++i){
cerr << "i = " << i
<< "arr[i] = "
<< result.arr_at(i).string_value() << '\n';
}
}

}

void redisCallback1(WFRedisTask * redisTask){
int state = redisTask->get_state();
int error = redisTask->get_error();
switch(state){
case WFT_STATE_SYS_ERROR:
cerr << "system error:" << strerror(error) << "\n";
break;
case WFT_STATE_DNS_ERROR:
cerr << "DNS error:" << gai_strerror(error) << '\n';
break;
case WFT_STATE_SSL_ERROR:
cerr << "SSL error :" << error << '\n';
break;
case WFT_STATE_TASK_ERROR:
cerr << "Task error :" << error << '\n';
break;
case WFT_STATE_SUCCESS:
break;
}//只能检查连接错误 不能检查业务的语法错误
//检查业务的语法错误
protocol:RedisResponse *resp = redisTask->get_resp();
potocol:RedisValue result;//用来保存redis返回的结果
resp->get_result(result);
if(result.is_error()){
state = WFT_STATE_TASK_ERROR;
}
if(state != WFT_STATE_SUCCESS){
cerr << "Failed. Press Ctrl-C to exit.\n";
return;
}
if(result.is_string()){
cerr << "result = " << result.string_value() << '\n';
}
else if(result.is_array()){
for(int i = 0; i < result.arr_size();++i){
cerr << "i = " << i
<< "arr[i] = "
<< result.arr_at(i).string_value() << '\n';
}
}
}

int main(){
signal(SIGINT,siganlder);
//创建redis任务
WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);
//更改请求属性
prtocol::RedisRequest *req = redisTask->get_req();
//可以执行redis指令
req->set_request("set",{"key","value"});
WFRedisTask *redisTask2 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2);
//更改请求属性
prtocol::RedisRequest *req2 = redisTask2->get_req();
//可以执行redis指令
req->set_request("get",{"key"});
redisTask2->start();
//创建一个序列,往序列中添加任务
SeriesWork * series = workflow::create_series_work(redisTask1,nullptr);
series->push_back(redisTask2);
series->start();
waitGroup.wait();
return 0;
}

步骤:

  1. 创建序列中所有的任务
  2. 根据第一个任务去创建序列
  3. 将后续的任务push_back进序列里面
  4. 调用start启动序列

如果序列运行一半想加任务怎么把,workflow其实是吧每一个任务都转成一个序列再运行,所以workflow的最小执行单元是一个序列,所以任务在执行过程中,一定位于某一个序列中,所以要加任务就变成了已知任务要找序列,一般在任务的callback中调用series_of(找到任务当前的序列),因为在回调函数运行过程中,任务一定处于某一个序列中,所以一定可以找到序列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#include <workflow/WFFailities.h>
#include <signal.h>
#include <workflow/workflow.h>
static WFFacilities:WAitGrou waitGroup(1);//static表示只在本文件内部生效

void sigHandler(int signum){
waitGroup.done();
}
void sigHandler(int signum){
waitGroup.done();
}

void redisCallback2(WFRedisTask * redisTask){
int state = redisTask->get_state();
int error = redisTask->get_error();
switch(state){
case WFT_STATE_SYS_ERROR:
cerr << "system error:" << strerror(error) << "\n";
break;
case WFT_STATE_DNS_ERROR:
cerr << "DNS error:" << gai_strerror(error) << '\n';
break;
case WFT_STATE_SSL_ERROR:
cerr << "SSL error :" << error << '\n';
break;
case WFT_STATE_TASK_ERROR:
cerr << "Task error :" << error << '\n';
break;
case WFT_STATE_SUCCESS:
break;
}//只能检查连接错误 不能检查业务的语法错误
//检查业务的语法错误
protocol:RedisResponse *resp = redisTask->get_resp();
potocol:RedisValue result;//用来保存redis返回的结果
resp->get_result(result);
if(result.is_error()){
state = WFT_STATE_TASK_ERROR;
}
if(state != WFT_STATE_SUCCESS){
cerr << "Failed. Press Ctrl-C to exit.\n";
return;
}
if(result.is_string()){
cerr << "result = " << result.string_value() << '\n';
}
else if(result.is_array()){
for(int i = 0; i < result.arr_size();++i){
cerr << "i = " << i
<< "arr[i] = "
<< result.arr_at(i).string_value() << '\n';
}
}

}

void redisCallback1(WFRedisTask * redisTask){
int state = redisTask->get_state();
int error = redisTask->get_error();
switch(state){
case WFT_STATE_SYS_ERROR:
cerr << "system error:" << strerror(error) << "\n";
break;
case WFT_STATE_DNS_ERROR:
cerr << "DNS error:" << gai_strerror(error) << '\n';
break;
case WFT_STATE_SSL_ERROR:
cerr << "SSL error :" << error << '\n';
break;
case WFT_STATE_TASK_ERROR:
cerr << "Task error :" << error << '\n';
break;
case WFT_STATE_SUCCESS:
break;
}//只能检查连接错误 不能检查业务的语法错误
//检查业务的语法错误
protocol:RedisResponse *resp = redisTask->get_resp();
potocol:RedisValue result;//用来保存redis返回的结果
resp->get_result(result);
if(result.is_error()){
state = WFT_STATE_TASK_ERROR;
}
if(state != WFT_STATE_SUCCESS){
cerr << "Failed. Press Ctrl-C to exit.\n";
return;
}
if(result.is_string()){
cerr << "result = " << result.string_value() << '\n';
}
else if(result.is_array()){
for(int i = 0; i < result.arr_size();++i){
cerr << "i = " << i
<< "arr[i] = "
<< result.arr_at(i).string_value() << '\n';
}
}

SeriesWork *series = series_of(redisTask);
//创建redis任务
WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2);
//更改请求属性
prtocol::RedisRequest *req = redisTask->get_req();
//可以执行redis指令
req->set_request("set",{"key","value"});
//往正在运行的序列中加一个任务
series->push_back(redisTask);

}

int main(){
signal(SIGINT,siganlder);
//创建redis任务
WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);
//更改请求属性
prtocol::RedisRequest *req = redisTask->get_req();
//可以执行redis指令
req->set_request("set",{"key","value"});
WFRedisTask *redisTask2 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2);
//更改请求属性
prtocol::RedisRequest *req2 = redisTask2->get_req();
//可以执行redis指令
req->set_request("get",{"key"});
redisTask2->start();
//创建一个序列,往序列中添加任务
SeriesWork * series = workflow::create_series_work(redisTask1,nullptr);
series->push_back(redisTask2);
series->start();
waitGroup.wait();
return 0;
}

序列的其他机制

在序列的任务之间共享数据

e20299cef1948c7d9083eb7488fad9f7.png

具体使用

需求,任务一创建context内存,并写入数据,任务二读取context,并修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#include <workflow/WFFailities.h>
#include <signal.h>
#include <workflow/workflow.h>

using namespace std;
//自定义的类 用于共享数据 使用struct是为了默认访问权限都是public
struct SerisContext{
int id;
string name;
}

static WFFacilities:WAitGrou waitGroup(1);//static表示只在本文件内部生效

void sigHandler(int signum){
waitGroup.done();
}
void sigHandler(int signum){
waitGroup.done();
}

void redisCallback2(WFRedisTask * redisTask){
int state = redisTask->get_state();
int error = redisTask->get_error();
switch(state){
case WFT_STATE_SYS_ERROR:
cerr << "system error:" << strerror(error) << "\n";
break;
case WFT_STATE_DNS_ERROR:
cerr << "DNS error:" << gai_strerror(error) << '\n';
break;
case WFT_STATE_SSL_ERROR:
cerr << "SSL error :" << error << '\n';
break;
case WFT_STATE_TASK_ERROR:
cerr << "Task error :" << error << '\n';
break;
case WFT_STATE_SUCCESS:
break;
}//只能检查连接错误 不能检查业务的语法错误
//检查业务的语法错误
protocol:RedisResponse *resp = redisTask->get_resp();
potocol:RedisValue result;//用来保存redis返回的结果
resp->get_result(result);
if(result.is_error()){
state = WFT_STATE_TASK_ERROR;
}
if(state != WFT_STATE_SUCCESS){
cerr << "Failed. Press Ctrl-C to exit.\n";
return;
}
if(result.is_string()){
cerr << "result = " << result.string_value() << '\n';
}
else if(result.is_array()){
for(int i = 0; i < result.arr_size();++i){
cerr << "i = " << i
<< "arr[i] = "
<< result.arr_at(i).string_value() << '\n';
}
}

}

void redisCallback1(WFRedisTask * redisTask){
int state = redisTask->get_state();
int error = redisTask->get_error();
switch(state){
case WFT_STATE_SYS_ERROR:
cerr << "system error:" << strerror(error) << "\n";
break;
case WFT_STATE_DNS_ERROR:
cerr << "DNS error:" << gai_strerror(error) << '\n';
break;
case WFT_STATE_SSL_ERROR:
cerr << "SSL error :" << error << '\n';
break;
case WFT_STATE_TASK_ERROR:
cerr << "Task error :" << error << '\n';
break;
case WFT_STATE_SUCCESS:
break;
}//只能检查连接错误 不能检查业务的语法错误
//检查业务的语法错误
protocol:RedisResponse *resp = redisTask->get_resp();
potocol:RedisValue result;//用来保存redis返回的结果
resp->get_result(result);
if(result.is_error()){
state = WFT_STATE_TASK_ERROR;
}
if(state != WFT_STATE_SUCCESS){
cerr << "Failed. Press Ctrl-C to exit.\n";
return;
}
if(result.is_string()){
cerr << "result = " << result.string_value() << '\n';
}
else if(result.is_array()){
for(int i = 0; i < result.arr_size();++i){
cerr << "i = " << i
<< "arr[i] = "
<< result.arr_at(i).string_value() << '\n';
}
}
//----------------------------------

//申请一片内存
SeriesContext * context = new SeriesContext();
context->id = 1000;
context->name = "redisTask1";
//set_context 将内存的首地址作为序列的属性存起来
series_of(redisTask)->set_context(context);

//-----------------------------------
SeriesWork *series = series_of(redisTask);
//创建redis任务
WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2);
//更改请求属性
prtocol::RedisRequest *req = redisTask->get_req();
//可以执行redis指令
req->set_request("set",{"key","value"});
//往正在运行的序列中加一个任务
series->push_back(redisTask);

}

int main(){
signal(SIGINT,siganlder);
//创建redis任务
WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);
//更改请求属性
prtocol::RedisRequest *req = redisTask->get_req();
//可以执行redis指令
req->set_request("set",{"key","value"});
WFRedisTask *redisTask2 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2);
//更改请求属性
prtocol::RedisRequest *req2 = redisTask2->get_req();
//可以执行redis指令
req->set_request("get",{"key"});
redisTask2->start();
//创建一个序列,往序列中添加任务
SeriesWork * series = workflow::create_series_work(redisTask1,nullptr);
series->push_back(redisTask2);
series->start();
waitGroup.wait();
return 0;
}

第一步完成 做第二步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#include <workflow/WFFailities.h>
#include <signal.h>
#include <workflow/workflow.h>

using namespace std;
//自定义的类 用于共享数据 使用struct是为了默认访问权限都是public
struct SerisContext{
int id;
string name;
}

static WFFacilities:WAitGrou waitGroup(1);//static表示只在本文件内部生效

void sigHandler(int signum){
waitGroup.done();
}
void sigHandler(int signum){
waitGroup.done();
}

void redisCallback2(WFRedisTask * redisTask){
int state = redisTask->get_state();
int error = redisTask->get_error();
switch(state){
case WFT_STATE_SYS_ERROR:
cerr << "system error:" << strerror(error) << "\n";
break;
case WFT_STATE_DNS_ERROR:
cerr << "DNS error:" << gai_strerror(error) << '\n';
break;
case WFT_STATE_SSL_ERROR:
cerr << "SSL error :" << error << '\n';
break;
case WFT_STATE_TASK_ERROR:
cerr << "Task error :" << error << '\n';
break;
case WFT_STATE_SUCCESS:
break;
}//只能检查连接错误 不能检查业务的语法错误
//检查业务的语法错误
protocol:RedisResponse *resp = redisTask->get_resp();
potocol:RedisValue result;//用来保存redis返回的结果
resp->get_result(result);
if(result.is_error()){
state = WFT_STATE_TASK_ERROR;
}
if(state != WFT_STATE_SUCCESS){
cerr << "Failed. Press Ctrl-C to exit.\n";
return;
}
if(result.is_string()){
cerr << "result = " << result.string_value() << '\n';
}
else if(result.is_array()){
for(int i = 0; i < result.arr_size();++i){
cerr << "i = " << i
<< "arr[i] = "
<< result.arr_at(i).string_value() << '\n';
}
}
//----------------------------------
//获取并把void*进行类型转换
SeriesContext *context = static_cast<SeriesCOntext *>(series_of(redisTask)->get_context());

context->id = 1001;
context->name = "redisTask2";

//----------------------------------
}

void redisCallback1(WFRedisTask * redisTask){
int state = redisTask->get_state();
int error = redisTask->get_error();
switch(state){
case WFT_STATE_SYS_ERROR:
cerr << "system error:" << strerror(error) << "\n";
break;
case WFT_STATE_DNS_ERROR:
cerr << "DNS error:" << gai_strerror(error) << '\n';
break;
case WFT_STATE_SSL_ERROR:
cerr << "SSL error :" << error << '\n';
break;
case WFT_STATE_TASK_ERROR:
cerr << "Task error :" << error << '\n';
break;
case WFT_STATE_SUCCESS:
break;
}//只能检查连接错误 不能检查业务的语法错误
//检查业务的语法错误
protocol:RedisResponse *resp = redisTask->get_resp();
potocol:RedisValue result;//用来保存redis返回的结果
resp->get_result(result);
if(result.is_error()){
state = WFT_STATE_TASK_ERROR;
}
if(state != WFT_STATE_SUCCESS){
cerr << "Failed. Press Ctrl-C to exit.\n";
return;
}
if(result.is_string()){
cerr << "result = " << result.string_value() << '\n';
}
else if(result.is_array()){
for(int i = 0; i < result.arr_size();++i){
cerr << "i = " << i
<< "arr[i] = "
<< result.arr_at(i).string_value() << '\n';
}
}


//申请一片内存
SeriesContext * context = new SeriesContext();
context->id = 1000;
context->name = "redisTask1";
//set_context 将内存的首地址作为序列的属性存起来
series_of(redisTask)->set_context(context);


SeriesWork *series = series_of(redisTask);
//创建redis任务
WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2);
//更改请求属性
protocol::RedisRequest *req = redisTask->get_req();
//可以执行redis指令
req->set_request("set",{"key","value"});
//往正在运行的序列中加一个任务
series->push_back(redisTask);

}

int main(){
signal(SIGINT,siganlder);
//创建redis任务
WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);
//更改请求属性
prtocol::RedisRequest *req = redisTask->get_req();
//可以执行redis指令
req->set_request("set",{"key","value"});
WFRedisTask *redisTask2 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2);
//更改请求属性
prtocol::RedisRequest *req2 = redisTask2->get_req();
//可以执行redis指令
req->set_request("get",{"key"});
redisTask2->start();
//创建一个序列,往序列中添加任务
SeriesWork * series = workflow::create_series_work(redisTask1,nullptr);
series->push_back(redisTask2);
series->start();
waitGroup.wait();
return 0;
}

上述写法会出现内存泄漏,所以可以在序列的回调函数把申请的共享内存给释放掉

方式一 设置回调

1
2
3
4
5
6
7
8
9
10
void seriesCallback(const SeriesWork *sries){
//提取context
SeriesContext *context = static_cast<SeriesCOntext *>(series->get_context());
delete context;
}
void redisCallback1(){
...;
series_of(redisTask)>set_callback(seriesCallback)
}

方式二 使用lambda表达式在new之后立刻给序列加一个callback,callback里面复制delete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
void redisCallback1(WFRedisTask * redisTask){
int state = redisTask->get_state();
int error = redisTask->get_error();
switch(state){
case WFT_STATE_SYS_ERROR:
cerr << "system error:" << strerror(error) << "\n";
break;
case WFT_STATE_DNS_ERROR:
cerr << "DNS error:" << gai_strerror(error) << '\n';
break;
case WFT_STATE_SSL_ERROR:
cerr << "SSL error :" << error << '\n';
break;
case WFT_STATE_TASK_ERROR:
cerr << "Task error :" << error << '\n';
break;
case WFT_STATE_SUCCESS:
break;
}//只能检查连接错误 不能检查业务的语法错误
//检查业务的语法错误
protocol:RedisResponse *resp = redisTask->get_resp();
potocol:RedisValue result;//用来保存redis返回的结果
resp->get_result(result);
if(result.is_error()){
state = WFT_STATE_TASK_ERROR;
}
if(state != WFT_STATE_SUCCESS){
cerr << "Failed. Press Ctrl-C to exit.\n";
return;
}
if(result.is_string()){
cerr << "result = " << result.string_value() << '\n';
}
else if(result.is_array()){
for(int i = 0; i < result.arr_size();++i){
cerr << "i = " << i
<< "arr[i] = "
<< result.arr_at(i).string_value() << '\n';
}
}


//申请一片内存
SeriesContext * context = new SeriesContext();
context->id = 1000;
context->name = "redisTask1";
//set_context 将内存的首地址作为序列的属性存起来
series_of(redisTask)->set_context(context);
//----------------------------------
series_of(redisTask)>set_callback([context](const SeriesWork *series){
delete context;
})
//---------------------------------
SeriesWork *series = series_of(redisTask);
//创建redis任务
WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2);
//更改请求属性
prtocol::RedisRequest *req = redisTask->get_req();
//可以执行redis指令
req->set_request("set",{"key","value"});
//往正在运行的序列中加一个任务
series->push_back(redisTask);

}

并行任务

需求:访问三个任务,取其中最长的一个放到redis里面,是先并联再串联

826994355cc1edf3387598a586483422.png

思路:可否把访问三个任务这个并行任务看成一个整体,然后后访问redis构成一个串联序列

并行任务的特点:

  1. 并行任务是任务,可以放入序列里面
  2. 并行任务很“大”,其内部有多个并行的序列(小序列),小序列之间是并行的,
  3. 但是只有内部的所有小序列都执行完了,并行任务的基本工作才算做完。

放在workflow.h里面这里面放的并联和串联,两个大特性构成workflow的基石

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//获取所有的请求信息和响应信息
#include <workflow/WFFailities.h>
//WFTaskFactory.h文件里面包含了所有创建任务相关的函数
//包含处理http的一些工具
#include <workflow/HttpUtil.h>
#include <workflow/WFTaskFactory.h>
#include <signal.h>
#include <workflow/workflow.h>
#include <vector>
static WFFacilities:WAitGrou waitGroup(1);//static表示只在本文件内部生效
struct SeriesContext{
string url;
size_t size;//长度
};


void sigHandler(int signum){
waitGroup.done();
}

void httpCallback(WFHttpTask *httpTask){
const void *body;
size_t size;
httpTask->get_resp()->get_parsed_body(&body,&size);
SeriesContext *context = static_cast<SeriesContext *>(series_of(httpTask)->get_context());
context->size = size;//获取请求长度
}


int main(){
signal(SIGINT,siganlder);
//创建并行任务集,但只是个空壳子,里面还没加任务
ParallelWork *parallel = Workflow::create_parallel_work(nullptr);
vector<string> urls = {"http://www.baidu.com","http://www.jd.com","http://www.taobao.com"};
for(int i = 0; i < 3; ++i){
WFHttpTask *httpTask = WFTaskFactory::create_http_task(urls[i],10,10,httpCallback);//创建好了一个任务,根据这个任务创建序列
//申请一个context 希望保留访问的网址
SeriesContext*context = new SeriesContext();
context->url = urls[i];

SeriesWork *series= Workflow::create_series_work(httpTask,nullptr);//要每次请求完成之后都保存报文的长度,放到context中存
series->set_context(context);
//把创建好的series加入并行任务的那个壳子里
parallel->add_series(series);

}
//并行任务里面放着三个并行的序列
//启动并行任务
parallel->start();
waitGroup.wait();
}

结构图

28df6f943de64b90195489200fa3529f.png

三个请求都完成了,接下来要找最长的报文了,所以接下来要走并行任务的回调了

689da501a2622c2a025746ebe890e6be.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
//获取所有的请求信息和响应信息
#include <workflow/WFFailities.h>
//WFTaskFactory.h文件里面包含了所有创建任务相关的函数
//包含处理http的一些工具
#include <workflow/HttpUtil.h>
#include <workflow/WFTaskFactory.h>
#include <signal.h>
#include <workflow/workflow.h>
#include <vector>
static WFFacilities:WAitGrou waitGroup(1);//static表示只在本文件内部生效
struct SeriesContext{
string url;
size_t size;//长度
};


void sigHandler(int signum){
waitGroup.done();
}

void httpCallback(WFHttpTask *httpTask){
const void *body;
size_t size;
httpTask->get_resp()->get_parsed_body(&body,&size);
SeriesContext *context = static_cast<SeriesContext *>(series_of(httpTask)>get_context());
context->seze = size;//获取请求长度
}
//--------------------------------
void parallelCallback(const ParallelWork *parallel){
//在并行任务的回调当中,可以访问所有的子序列以及其context
string key;
size_t value = 0;

for(int i = 0; i < parallel->size(); ++i){//size可以获取子序列的个数
SeriesContext *context = static_cast<SeriesContext *>(parallel->series_at(i)->get_context());
//获取context
//找最大size
if(context->size > value){
key = context->url;
value = context->size;
}
delete context;
}
}
//----------------------------------
int main(){
signal(SIGINT,siganlder);
//创建并行任务集,但只是个空壳子,里面还没加任务
ParallelWork *parallel = Workflow::create_parallel_work(parallelCallback);
vector<string> urls = {"http://www.baidu.com","http://www.jd.com","http://www.taobao.com"};
for(int i = 0; i < 3; ++i){
WFHttpTask *httpTask = WFTaskFactory::create_http_task(urls[i],10,10,httpCallback);//创建好了一个任务,根据这个任务创建序列
//申请一个context 希望保留访问的网址
SeriesContext*context = new SeriesContext();
context->url = urls[i];

SeriesWork *series= Workflow::create_series_work(httpTask,nullptr);//要每次请求完成之后都保存报文的长度,放到context中存
series->set_context(context);
//把创建好的series加入并行任务的那个壳子里
parallel->add_series(series);

}
//并行任务里面放着三个并行的序列
//启动并行任务
parallel->start();
waitGroup.wait();
}

并行任务完成之后,要把最长的交给redis,相当于在并行任务的后面又串联了一个任务

351aa35fc4437b8543ff318d7d8383de.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
//获取所有的请求信息和响应信息
#include <workflow/WFFailities.h>
//WFTaskFactory.h文件里面包含了所有创建任务相关的函数
//包含处理http的一些工具
#include <workflow/HttpUtil.h>
#include <workflow/WFTaskFactory.h>
#include <signal.h>
#include <workflow/workflow.h>
#include <vector>
static WFFacilities:WAitGrou waitGroup(1);//static表示只在本文件内部生效
struct SeriesContext{
string url;
size_t size;//长度
};


void sigHandler(int signum){
waitGroup.done();
}

void httpCallback(WFHttpTask *httpTask){
const void *body;
size_t size;
httpTask->get_resp()->get_parsed_body(&body,&size);
SeriesContext *context = static_cast<SeriesContext *>(series_of(httpTask)>get_context());
context->seze = size;//获取请求长度
}

void parallelCallback(const ParallelWork *parallel){
//在并行任务的回调当中,可以访问所有的子序列以及其context
string key;
size_t value = 0;

for(int i = 0; i < parallel->size(); ++i){//size可以获取子序列的个数
SeriesContext *context = static_cast<SeriesContext *>(parallel->series_at(i)->get_context());
//获取context
//找最大size
if(context->size > value){
key = context->url;
value = context->size;
}
delete context;
}
//-----------------------------
WFRedisTask * rediask = WFTaskFactory::create_redis_task("redis::127.0.0.1:6379",10,nullptr);
redisTask->get_req()->set_request("SET",{key,std::t_string(value)});
//并行任务所在的大序列
series_of(parallel)->push_back(redisTask);
//-----------------------------
}

int main(){
signal(SIGINT,siganlder);
//创建并行任务集,但只是个空壳子,里面还没加任务
ParallelWork *parallel = Workflow::create_parallel_work(parallelCallback);
vector<string> urls = {"http://www.baidu.com","http://www.jd.com","http://www.taobao.com"};
for(int i = 0; i < 3; ++i){
WFHttpTask *httpTask = WFTaskFactory::create_http_task(urls[i],10,10,httpCallback);//创建好了一个任务,根据这个任务创建序列
//申请一个context 希望保留访问的网址
SeriesContext*context = new SeriesContext();
context->url = urls[i];

SeriesWork *series= Workflow::create_series_work(httpTask,nullptr);//要每次请求完成之后都保存报文的长度,放到context中存
series->set_context(context);
//把创建好的series加入并行任务的那个壳子里
parallel->add_series(series);

}
//并行任务里面放着三个并行的序列
//启动并行任务
parallel->start();
waitGroup.wait();
}

总结

1.创建并行任务:Workflow:create_parallel_work

2.创建小任务->小序列

3.add_series 把小序列添加到并行任务中

4.等到所有的并行序列执行完以后,执行回调,在回调中可以访问小序列的信息,也可以在大序列中添加新的任务

workflow充当服务端

当设计一个服务端的时候,首先要考虑被动,所以意味着要存在一个对象,并且该对象要永远存在,负责客户端的接入&数据传输,并决定创建序列和任务,执行业务逻辑

客户端的任务逻辑和服务端有区别

客户端方面:需要做的任务分为

1.准备请求 –>设置任务属性

2.发请求&等待&手响应–>对于所有的用户都一样,交给框架写,实现任务的基本工作

3.根据响应执行业务—>回调函数

服务端方面:

1.收请求–>这部分都是一样的

2.根据请求内容去执行不同的业务 –>任务的基本工作是用户的代码

3.回复响应–>这部分是一样的

workflow针对服务端任务做了特殊设计

  1. 存在一个server对象,等待新用户的连接,被动的处理客户端的连接和发送数据

  2. 一旦有客户端连接了server对象,server对象就创建一个特殊的任务(服务端的http任务)和序列

    这个对象的工作由用户的代码(process)决定,用于生成响应,在process执行的过程中,可以往序列中加新的任务

    回复消息的时机是序列中新加的任务做完那一刻。

    process的回调放在回复消息之后执行

    62005f91a2aac003fae9a05cf12d1bf2.png

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <workflow/WFFailities.h>
#include <workflow/WFHttpServer.h>
//WFHttpServer.h包含了server类的设计
#include <signal.h>
static WFFacilities:WAitGrou waitGroup(1);//static表示只在本文件内部生效

void sigHandler(int signum){
waitGroup.done();
}

void process(WFHttpTask *serverTask){
//用于连接来一个就打一个helloworld
cout << "helloworld" << endl;
}

int main(){
signal(SIGINT,siganlder);
WFHttpServer server(process);
if(server.start(12345) == 0){//start(int port) 让服务器处理等待连接状态
waitGroup.wait();
server.stop();
}else{
//启动失败
perror("server start failed!");
return -1;
}

return 0;
}

Echo业务

客户端发什么报文体,服务端回什么样的报文体

思路:

  1. 要收请求
  2. 回响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <workflow/WFFailities.h>
#include <workflow/WFHttpServer.h>
//WFHttpServer.h包含了server类的设计
#include <signal.h>
static WFFacilities:WAitGrou waitGroup(1);//static表示只在本文件内部生效

void sigHandler(int signum){
waitGroup.done();
}

void process(WFHttpTask *serverTask){
//echo业务
//收请求
protocol::HttpRequest *req = serverTask->get_req();
const void *body;
size_t size;
req->get_parsed_body(&body,&size);

//找到响应的地址
protocol::HttpResponse *resp = serverTask0->get_resp();
resp->append_output_body(body,size);//把一段内存内容作为报文回复
}

int main(){
signal(SIGINT,siganlder);
WFHttpServer server(process);
if(server.start(12345) == 0){//start(int port) 让服务器处理等待连接状态
waitGroup.wait();
server.stop();
}else{
//启动失败
perror("server start failed!");
return -1;
}

return 0;
}

登陆业务

这次单纯的process不行了,还需要访问redis,(这里服务器访问redis,对于redis来说服务器是客户端)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include <workflow/WFFailities.h>
#include <workflow/WFHttpServer.h>
//WFHttpServer.h包含了server类的设计
#include <signal.h>
static WFFacilities:WAitGrou waitGroup(1);//static表示只在本文件内部生效

struct SeriesContext{
string name;
string password;
};

void sigHandler(int signum){
waitGroup.done();
}


void redisCallback(WFRedisTask * redisTask){
//获取redis的执行结果
protocol::RedisValue result;
protocol::RedisResponse *resp = redisTask->get_resp();
resp->get_result(result);
//访问context
SeriesContext *context = static_cast<SeriesContext *>(series_of(redisTask)->get_context());
if(result.is_string() && result.string_value() == context->password){
cout << context->name << "登录成功" << endl;
}else{
cout << context->name << "fail\n";
}
}

void process(WFHttpTask *serverTask){
//获取uri
protocol::HttpRequest *req = serverTask->get_req();
string uri = req->get_reques_uri();
//获取用户名和密码
string nameKV = uri.substr(0,uri.find('&'));
string passwordKV = uri.substr(uri.find('&')+1);
string name = nameKV.substr(nameKV.find("=")+1);
string password =passwordKV.substr(passwordKV.find("=")+1);

//申请一篇内存空间,用于序列内的任务共享数据
SeriesContext *cotext = new SeriesContext();
context->name = name;
context->password = password;
series_of(serverTask)->set_context(context);


//创建一个redis任务
WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);
//要查询redis看密码对不对 hget userdb username
redisTask->get_req()->set_request("HGET",{"userdb",name});
series_of(serverTask)->push_back(redisTask);
}

int main(){
signal(SIGINT,siganlder);
WFHttpServer server(process);
if(server.start(12345) == 0){//start(int port) 让服务器处理等待连接状态
waitGroup.wait();
server.stop();
}else{
//启动失败
perror("server start failed!");
return -1;
}

return 0;
}

现在服务器可以判断是否登录成功了,但是没有把响应发送给客户端,发送给客户端的响应是在process的参数serverTask的resp决定的,为了构造响应,在SeriesContext添加上resp数据成员

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <workflow/WFFailities.h>
#include <workflow/WFHttpServer.h>
//WFHttpServer.h包含了server类的设计
#include <signal.h>
static WFFacilities:WAitGrou waitGroup(1);//static表示只在本文件内部生效

struct SeriesContext{
string name;
string password;
protocol::HttpRequest *req
};

void sigHandler(int signum){
waitGroup.done();
}


void redisCallback(WFRedisTask * redisTask){
//获取redis的执行结果
protocol::RedisValue result;
protocol::RedisResponse *resp = redisTask->get_resp();
resp->get_result(result);
//访问context
SeriesContext *context = static_cast<SeriesContext *>(series_of(redisTask)->get_context());
if(result.is_string() && result.string_value() == context->password){
cout << context->name << "登录成功" << endl;
context->resp->addpend_output_body("<html>success</html>");
}else{
cout << context->name << "fail\n";
context->resp->addpend_output_body("<html>fail</html>");
}
}

void process(WFHttpTask *serverTask){
//获取uri
protocol::HttpRequest *req = serverTask->get_req();
string uri = req->get_reques_uri();
//获取用户名和密码
string nameKV = uri.substr(0,uri.find('&'));
string passwordKV = uri.substr(uri.find('&')+1);
string name = nameKV.substr(nameKV.find("=")+1);
string password =passwordKV.substr(passwordKV.find("=")+1);

//申请一篇内存空间,用于序列内的任务共享数据
SeriesContext *cotext = new SeriesContext();
context->name = name;
context->password = password;
context->resp = serverTask->get_resp();
series_of(serverTask)->set_context(context);


//创建一个redis任务
WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback);
//要查询redis看密码对不对 hget userdb username
redisTask->get_req()->set_request("HGET",{"userdb",name});
series_of(serverTask)->push_back(redisTask);
}

int main(){
signal(SIGINT,siganlder);
WFHttpServer server(process);
if(server.start(12345) == 0){//start(int port) 让服务器处理等待连接状态
waitGroup.wait();
server.stop();
}else{
//启动失败
perror("server start failed!");
return -1;
}

return 0;
}

最后上述代码有内存泄漏,释放内存可以放在series的回调中释放

实现反向代理(和登录业务同理)

d96b9e9be40cbd5c5e3ae2e2e1f2cb19.png

实现静态资源服务器(和登陆业务同理)

708a5ae36a732f50cb94d53353f05bdb.png

workflow

  • 基本任务 http任务 redis任务

  • 序列 先一个任务 再一个任务

    context机制 在序列中的不同任务里面共享数据

  • 并行任务 ”大”任务

服务端

 因为被动执行,需要一个server

​ 当有客户端接入时,立刻创建一个序列

​ 序列以特殊的serverTask开始(serverTask基本工作 时用户写的,是process函数)

​ 在process后面添加的任务执行完成以后,在他的回调执行之前,回复给客户端的响应。