workflow充当客户端
redis任务
不需要回调 可以做写类型的指令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| #include <workflow/WFFailities.h> #include <signal.h> static WFFacilities:WAitGrou waitGroup(1);
void sigHandler(int signum){ waitGroup.done(); }
int main(){ signal(SIGINT,siganlder); WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,nullptr); prtocol::RedisRequest = *req = redisTask->get_req(); req->set_request("set",{"key","value"}); redisTask->start(); waitGroup.wait(); return 0; }
|
需要回调,进行读类型的指令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| #include <workflow/WFFailities.h> #include <signal.h> static WFFacilities:WAitGrou waitGroup(1);
void sigHandler(int signum){ waitGroup.done(); }
// Callback of the Redis task: reports framework-level failures, then prints a
// string reply to stderr.
void redisCallback(WFRedisTask *redisTask){
    int state = redisTask->get_state();
    int error = redisTask->get_error();
    switch(state){
    case WFT_STATE_SYS_ERROR:  cerr << "system error:" << strerror(error) << "\n"; break;
    case WFT_STATE_DNS_ERROR:  cerr << "DNS error:" << gai_strerror(error) << '\n'; break;
    case WFT_STATE_SSL_ERROR:  cerr << "SSL error :" << error << '\n'; break;
    case WFT_STATE_TASK_ERROR: cerr << "Task error :" << error << '\n'; break;
    case WFT_STATE_SUCCESS:    break;
    }
    protocol::RedisResponse *resp = redisTask->get_resp();
    protocol::RedisValue result;
    resp->get_result(result);
    if(result.is_error()){
        state = WFT_STATE_TASK_ERROR;  // the reply itself is an error value
    }
    if(state != WFT_STATE_SUCCESS){
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }
    if(result.is_string()){
        cerr << "result = " << result.string_value() << '\n';
    }
}
int main(){ signal(SIGINT,siganlder); WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback); prtocol::RedisRequest *req = redisTask->get_req(); req->set_request("get",{"key"}); redisTask->start(); waitGroup.wait(); return 0; }
|
使用hgetall指令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| #include <workflow/WFFailities.h> #include <signal.h> static WFFacilities:WAitGrou waitGroup(1);
void sigHandler(int signum){ waitGroup.done(); }
// Callback of the Redis task: reports framework-level failures, then prints
// the reply to stderr (a single string, or every element of an array reply
// such as the one HGETALL produces).
void redisCallback(WFRedisTask *redisTask){
    int state = redisTask->get_state();
    int error = redisTask->get_error();
    switch(state){
    case WFT_STATE_SYS_ERROR:  cerr << "system error:" << strerror(error) << "\n"; break;
    case WFT_STATE_DNS_ERROR:  cerr << "DNS error:" << gai_strerror(error) << '\n'; break;
    case WFT_STATE_SSL_ERROR:  cerr << "SSL error :" << error << '\n'; break;
    case WFT_STATE_TASK_ERROR: cerr << "Task error :" << error << '\n'; break;
    case WFT_STATE_SUCCESS:    break;
    }
    protocol::RedisResponse *resp = redisTask->get_resp();
    protocol::RedisValue result;
    resp->get_result(result);
    if(result.is_error()){
        state = WFT_STATE_TASK_ERROR;  // the reply itself is an error value
    }
    if(state != WFT_STATE_SUCCESS){
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }
    if(result.is_string()){
        cerr << "result = " << result.string_value() << '\n';
    }
    else if(result.is_array()){
        for(size_t i = 0; i < result.arr_size(); ++i){
            cerr << "i = " << i << "arr[i] = " << result.arr_at(i).string_value() << '\n';
        }
    }
}
int main(){ signal(SIGINT,siganlder); WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback); prtocol::RedisRequest *req = redisTask->get_req(); req->set_request("hgetall",{"key"}); redisTask->start(); waitGroup.wait(); return 0; }
|
需求:先set再get,要求第一个set做完之后再来做get
直接创建两个redis任务并分别start是不可以的,因为workflow是异步的,没法保证get一定在set之后执行
思路:把第二个任务的创建放在回调函数里面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
| #include <workflow/WFFailities.h> #include <signal.h> static WFFacilities:WAitGrou waitGroup(1);
// SIGINT handler: releases the wait group so that main() can return.
void sigHandler(int signum){
    (void)signum;
    waitGroup.done();
}

// Callback of the second (GET) task: reports framework-level failures, then
// prints the reply to stderr.
void redisCallback2(WFRedisTask *redisTask){
    int state = redisTask->get_state();
    int error = redisTask->get_error();
    switch(state){
    case WFT_STATE_SYS_ERROR:  cerr << "system error:" << strerror(error) << "\n"; break;
    case WFT_STATE_DNS_ERROR:  cerr << "DNS error:" << gai_strerror(error) << '\n'; break;
    case WFT_STATE_SSL_ERROR:  cerr << "SSL error :" << error << '\n'; break;
    case WFT_STATE_TASK_ERROR: cerr << "Task error :" << error << '\n'; break;
    case WFT_STATE_SUCCESS:    break;
    }
    protocol::RedisResponse *resp = redisTask->get_resp();
    protocol::RedisValue result;
    resp->get_result(result);
    if(result.is_error()){
        state = WFT_STATE_TASK_ERROR;  // the reply itself is an error value
    }
    if(state != WFT_STATE_SUCCESS){
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }
    if(result.is_string()){
        cerr << "result = " << result.string_value() << '\n';
    }
    else if(result.is_array()){
        for(size_t i = 0; i < result.arr_size(); ++i){
            cerr << "i = " << i << "arr[i] = " << result.arr_at(i).string_value() << '\n';
        }
    }
}
// Callback of the first (SET) task: reports failures, prints the reply, and
// only then creates and starts the GET task, so that GET is issued after SET
// has completed.
void redisCallback1(WFRedisTask *redisTask){
    int state = redisTask->get_state();
    int error = redisTask->get_error();
    switch(state){
    case WFT_STATE_SYS_ERROR:  cerr << "system error:" << strerror(error) << "\n"; break;
    case WFT_STATE_DNS_ERROR:  cerr << "DNS error:" << gai_strerror(error) << '\n'; break;
    case WFT_STATE_SSL_ERROR:  cerr << "SSL error :" << error << '\n'; break;
    case WFT_STATE_TASK_ERROR: cerr << "Task error :" << error << '\n'; break;
    case WFT_STATE_SUCCESS:    break;
    }
    protocol::RedisResponse *resp = redisTask->get_resp();
    protocol::RedisValue result;
    resp->get_result(result);
    if(result.is_error()){
        state = WFT_STATE_TASK_ERROR;  // the reply itself is an error value
    }
    if(state != WFT_STATE_SUCCESS){
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }
    if(result.is_string()){
        cerr << "result = " << result.string_value() << '\n';
    }
    else if(result.is_array()){
        for(size_t i = 0; i < result.arr_size(); ++i){
            cerr << "i = " << i << "arr[i] = " << result.arr_at(i).string_value() << '\n';
        }
    }

    WFRedisTask *redisTask2 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379", 10, redisCallback2);
    protocol::RedisRequest *req2 = redisTask2->get_req();
    req2->set_request("get", {"key"});  // must configure the new task's own request
    redisTask2->start();
}
int main(){ signal(SIGINT,siganlder); WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback1); prtocol::RedisRequest *req = redisTask->get_req(); req->set_request("set",{"key"}); redisTask->start(); waitGroup.wait(); return 0; }
|
task2执行的时候,task1的回调不一定执行完了,为了解决这个问题框架提供了序列机制,是一个任务容器(队列),可以确保前面的任务做完了再做下一个任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
| #include <workflow/WFFailities.h> #include <signal.h> #include <workflow/workflow.h> static WFFacilities:WAitGrou waitGroup(1);
void sigHandler(int signum){ waitGroup.done(); }
// Callback of the second task in the series: reports framework-level
// failures, then prints the reply to stderr.
void redisCallback2(WFRedisTask *redisTask){
    int state = redisTask->get_state();
    int error = redisTask->get_error();
    switch(state){
    case WFT_STATE_SYS_ERROR:  cerr << "system error:" << strerror(error) << "\n"; break;
    case WFT_STATE_DNS_ERROR:  cerr << "DNS error:" << gai_strerror(error) << '\n'; break;
    case WFT_STATE_SSL_ERROR:  cerr << "SSL error :" << error << '\n'; break;
    case WFT_STATE_TASK_ERROR: cerr << "Task error :" << error << '\n'; break;
    case WFT_STATE_SUCCESS:    break;
    }
    protocol::RedisResponse *resp = redisTask->get_resp();
    protocol::RedisValue result;
    resp->get_result(result);
    if(result.is_error()){
        state = WFT_STATE_TASK_ERROR;  // the reply itself is an error value
    }
    if(state != WFT_STATE_SUCCESS){
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }
    if(result.is_string()){
        cerr << "result = " << result.string_value() << '\n';
    }
    else if(result.is_array()){
        for(size_t i = 0; i < result.arr_size(); ++i){
            cerr << "i = " << i << "arr[i] = " << result.arr_at(i).string_value() << '\n';
        }
    }
}
// Callback of the first task in the series: reports framework-level
// failures, then prints the reply to stderr.
void redisCallback1(WFRedisTask *redisTask){
    int state = redisTask->get_state();
    int error = redisTask->get_error();
    switch(state){
    case WFT_STATE_SYS_ERROR:  cerr << "system error:" << strerror(error) << "\n"; break;
    case WFT_STATE_DNS_ERROR:  cerr << "DNS error:" << gai_strerror(error) << '\n'; break;
    case WFT_STATE_SSL_ERROR:  cerr << "SSL error :" << error << '\n'; break;
    case WFT_STATE_TASK_ERROR: cerr << "Task error :" << error << '\n'; break;
    case WFT_STATE_SUCCESS:    break;
    }
    protocol::RedisResponse *resp = redisTask->get_resp();
    protocol::RedisValue result;
    resp->get_result(result);
    if(result.is_error()){
        state = WFT_STATE_TASK_ERROR;  // the reply itself is an error value
    }
    if(state != WFT_STATE_SUCCESS){
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }
    if(result.is_string()){
        cerr << "result = " << result.string_value() << '\n';
    }
    else if(result.is_array()){
        for(size_t i = 0; i < result.arr_size(); ++i){
            cerr << "i = " << i << "arr[i] = " << result.arr_at(i).string_value() << '\n';
        }
    }
}
int main(){ signal(SIGINT,siganlder); WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback); prtocol::RedisRequest *req = redisTask->get_req(); req->set_request("set",{"key","value"}); WFRedisTask *redisTask2 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2); prtocol::RedisRequest *req2 = redisTask2->get_req(); req->set_request("get",{"key"}); redisTask2->start(); SeriesWork * series = workflow::create_series_work(redisTask1,nullptr); series->push_back(redisTask2); series->start(); waitGroup.wait(); return 0; }
|
步骤:
- 创建序列中所有的任务
- 根据第一个任务去创建序列
- 将后续的任务push_back进序列里面
- 调用start启动序列
如果序列运行到一半想加任务怎么办?workflow其实是把每一个任务都转成一个序列再运行,所以workflow的最小执行单元是一个序列,任务在执行过程中一定位于某一个序列中。于是"加任务"就变成了"已知任务找序列":一般在任务的callback中调用series_of(找到任务当前所在的序列),因为在回调函数运行过程中,任务一定处于某一个序列中,所以一定可以找到序列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
| #include <workflow/WFFailities.h> #include <signal.h> #include <workflow/workflow.h> static WFFacilities:WAitGrou waitGroup(1);
void sigHandler(int signum){ waitGroup.done(); } void sigHandler(int signum){ waitGroup.done(); }
// Callback used by the follow-up tasks: reports framework-level failures,
// then prints the reply to stderr.
void redisCallback2(WFRedisTask *redisTask){
    int state = redisTask->get_state();
    int error = redisTask->get_error();
    switch(state){
    case WFT_STATE_SYS_ERROR:  cerr << "system error:" << strerror(error) << "\n"; break;
    case WFT_STATE_DNS_ERROR:  cerr << "DNS error:" << gai_strerror(error) << '\n'; break;
    case WFT_STATE_SSL_ERROR:  cerr << "SSL error :" << error << '\n'; break;
    case WFT_STATE_TASK_ERROR: cerr << "Task error :" << error << '\n'; break;
    case WFT_STATE_SUCCESS:    break;
    }
    protocol::RedisResponse *resp = redisTask->get_resp();
    protocol::RedisValue result;
    resp->get_result(result);
    if(result.is_error()){
        state = WFT_STATE_TASK_ERROR;  // the reply itself is an error value
    }
    if(state != WFT_STATE_SUCCESS){
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }
    if(result.is_string()){
        cerr << "result = " << result.string_value() << '\n';
    }
    else if(result.is_array()){
        for(size_t i = 0; i < result.arr_size(); ++i){
            cerr << "i = " << i << "arr[i] = " << result.arr_at(i).string_value() << '\n';
        }
    }
}
// Callback of the first task: reports failures, prints the reply, then looks
// up the series this task is running in and appends one more Redis task to it.
void redisCallback1(WFRedisTask *redisTask){
    int state = redisTask->get_state();
    int error = redisTask->get_error();
    switch(state){
    case WFT_STATE_SYS_ERROR:  cerr << "system error:" << strerror(error) << "\n"; break;
    case WFT_STATE_DNS_ERROR:  cerr << "DNS error:" << gai_strerror(error) << '\n'; break;
    case WFT_STATE_SSL_ERROR:  cerr << "SSL error :" << error << '\n'; break;
    case WFT_STATE_TASK_ERROR: cerr << "Task error :" << error << '\n'; break;
    case WFT_STATE_SUCCESS:    break;
    }
    protocol::RedisResponse *resp = redisTask->get_resp();
    protocol::RedisValue result;
    resp->get_result(result);
    if(result.is_error()){
        state = WFT_STATE_TASK_ERROR;  // the reply itself is an error value
    }
    if(state != WFT_STATE_SUCCESS){
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }
    if(result.is_string()){
        cerr << "result = " << result.string_value() << '\n';
    }
    else if(result.is_array()){
        for(size_t i = 0; i < result.arr_size(); ++i){
            cerr << "i = " << i << "arr[i] = " << result.arr_at(i).string_value() << '\n';
        }
    }

    // The new task needs its own name: redeclaring `redisTask` would clash
    // with the parameter.
    SeriesWork *series = series_of(redisTask);
    WFRedisTask *nextTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379", 10, redisCallback2);
    protocol::RedisRequest *nextReq = nextTask->get_req();
    nextReq->set_request("set", {"key", "value"});
    series->push_back(nextTask);
}
int main(){ signal(SIGINT,siganlder); WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback); prtocol::RedisRequest *req = redisTask->get_req(); req->set_request("set",{"key","value"}); WFRedisTask *redisTask2 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2); prtocol::RedisRequest *req2 = redisTask2->get_req(); req->set_request("get",{"key"}); redisTask2->start(); SeriesWork * series = workflow::create_series_work(redisTask1,nullptr); series->push_back(redisTask2); series->start(); waitGroup.wait(); return 0; }
|
序列的其他机制
在序列的任务之间共享数据

具体使用
需求,任务一创建context内存,并写入数据,任务二读取context,并修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
| #include <workflow/WFFailities.h> #include <signal.h> #include <workflow/workflow.h>
using namespace std;
struct SerisContext{ int id; string name; }
static WFFacilities:WAitGrou waitGroup(1);
void sigHandler(int signum){ waitGroup.done(); } void sigHandler(int signum){ waitGroup.done(); }
// Callback used by the follow-up tasks: reports framework-level failures,
// then prints the reply to stderr.
void redisCallback2(WFRedisTask *redisTask){
    int state = redisTask->get_state();
    int error = redisTask->get_error();
    switch(state){
    case WFT_STATE_SYS_ERROR:  cerr << "system error:" << strerror(error) << "\n"; break;
    case WFT_STATE_DNS_ERROR:  cerr << "DNS error:" << gai_strerror(error) << '\n'; break;
    case WFT_STATE_SSL_ERROR:  cerr << "SSL error :" << error << '\n'; break;
    case WFT_STATE_TASK_ERROR: cerr << "Task error :" << error << '\n'; break;
    case WFT_STATE_SUCCESS:    break;
    }
    protocol::RedisResponse *resp = redisTask->get_resp();
    protocol::RedisValue result;
    resp->get_result(result);
    if(result.is_error()){
        state = WFT_STATE_TASK_ERROR;  // the reply itself is an error value
    }
    if(state != WFT_STATE_SUCCESS){
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }
    if(result.is_string()){
        cerr << "result = " << result.string_value() << '\n';
    }
    else if(result.is_array()){
        for(size_t i = 0; i < result.arr_size(); ++i){
            cerr << "i = " << i << "arr[i] = " << result.arr_at(i).string_value() << '\n';
        }
    }
}
// Callback of the first task: reports failures, prints the reply, allocates
// the series context and fills it in, then appends one more Redis task to the
// current series.
// NOTE: the context allocated here is never freed in this version.
void redisCallback1(WFRedisTask *redisTask){
    int state = redisTask->get_state();
    int error = redisTask->get_error();
    switch(state){
    case WFT_STATE_SYS_ERROR:  cerr << "system error:" << strerror(error) << "\n"; break;
    case WFT_STATE_DNS_ERROR:  cerr << "DNS error:" << gai_strerror(error) << '\n'; break;
    case WFT_STATE_SSL_ERROR:  cerr << "SSL error :" << error << '\n'; break;
    case WFT_STATE_TASK_ERROR: cerr << "Task error :" << error << '\n'; break;
    case WFT_STATE_SUCCESS:    break;
    }
    protocol::RedisResponse *resp = redisTask->get_resp();
    protocol::RedisValue result;
    resp->get_result(result);
    if(result.is_error()){
        state = WFT_STATE_TASK_ERROR;  // the reply itself is an error value
    }
    if(state != WFT_STATE_SUCCESS){
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }
    if(result.is_string()){
        cerr << "result = " << result.string_value() << '\n';
    }
    else if(result.is_array()){
        for(size_t i = 0; i < result.arr_size(); ++i){
            cerr << "i = " << i << "arr[i] = " << result.arr_at(i).string_value() << '\n';
        }
    }

    SeriesContext *context = new SeriesContext();
    context->id = 1000;
    context->name = "redisTask1";
    series_of(redisTask)->set_context(context);

    // The new task needs its own name: redeclaring `redisTask` would clash
    // with the parameter.
    SeriesWork *series = series_of(redisTask);
    WFRedisTask *nextTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379", 10, redisCallback2);
    protocol::RedisRequest *nextReq = nextTask->get_req();
    nextReq->set_request("set", {"key", "value"});
    series->push_back(nextTask);
}
int main(){ signal(SIGINT,siganlder); WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback); prtocol::RedisRequest *req = redisTask->get_req(); req->set_request("set",{"key","value"}); WFRedisTask *redisTask2 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2); prtocol::RedisRequest *req2 = redisTask2->get_req(); req->set_request("get",{"key"}); redisTask2->start(); SeriesWork * series = workflow::create_series_work(redisTask1,nullptr); series->push_back(redisTask2); series->start(); waitGroup.wait(); return 0; }
|
第一步完成 做第二步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
| #include <workflow/WFFailities.h> #include <signal.h> #include <workflow/workflow.h>
using namespace std;
struct SerisContext{ int id; string name; }
static WFFacilities:WAitGrou waitGroup(1);
void sigHandler(int signum){ waitGroup.done(); } void sigHandler(int signum){ waitGroup.done(); }
// Callback of the follow-up tasks: reports failures, prints the reply, then
// reads the series context written by redisCallback1 and overwrites it.
void redisCallback2(WFRedisTask *redisTask){
    int state = redisTask->get_state();
    int error = redisTask->get_error();
    switch(state){
    case WFT_STATE_SYS_ERROR:  cerr << "system error:" << strerror(error) << "\n"; break;
    case WFT_STATE_DNS_ERROR:  cerr << "DNS error:" << gai_strerror(error) << '\n'; break;
    case WFT_STATE_SSL_ERROR:  cerr << "SSL error :" << error << '\n'; break;
    case WFT_STATE_TASK_ERROR: cerr << "Task error :" << error << '\n'; break;
    case WFT_STATE_SUCCESS:    break;
    }
    protocol::RedisResponse *resp = redisTask->get_resp();
    protocol::RedisValue result;
    resp->get_result(result);
    if(result.is_error()){
        state = WFT_STATE_TASK_ERROR;  // the reply itself is an error value
    }
    if(state != WFT_STATE_SUCCESS){
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }
    if(result.is_string()){
        cerr << "result = " << result.string_value() << '\n';
    }
    else if(result.is_array()){
        for(size_t i = 0; i < result.arr_size(); ++i){
            cerr << "i = " << i << "arr[i] = " << result.arr_at(i).string_value() << '\n';
        }
    }

    SeriesContext *context = static_cast<SeriesContext *>(series_of(redisTask)->get_context());
    // redisCallback1 returns before set_context() when its task fails, so the
    // context may be absent here.
    if(context != nullptr){
        context->id = 1001;
        context->name = "redisTask2";
    }
}
// Callback of the first task: reports failures, prints the reply, allocates
// the series context and fills it in, then appends one more Redis task to the
// current series.
// NOTE: the context allocated here is never freed in this version.
void redisCallback1(WFRedisTask *redisTask){
    int state = redisTask->get_state();
    int error = redisTask->get_error();
    switch(state){
    case WFT_STATE_SYS_ERROR:  cerr << "system error:" << strerror(error) << "\n"; break;
    case WFT_STATE_DNS_ERROR:  cerr << "DNS error:" << gai_strerror(error) << '\n'; break;
    case WFT_STATE_SSL_ERROR:  cerr << "SSL error :" << error << '\n'; break;
    case WFT_STATE_TASK_ERROR: cerr << "Task error :" << error << '\n'; break;
    case WFT_STATE_SUCCESS:    break;
    }
    protocol::RedisResponse *resp = redisTask->get_resp();
    protocol::RedisValue result;
    resp->get_result(result);
    if(result.is_error()){
        state = WFT_STATE_TASK_ERROR;  // the reply itself is an error value
    }
    if(state != WFT_STATE_SUCCESS){
        cerr << "Failed. Press Ctrl-C to exit.\n";
        return;
    }
    if(result.is_string()){
        cerr << "result = " << result.string_value() << '\n';
    }
    else if(result.is_array()){
        for(size_t i = 0; i < result.arr_size(); ++i){
            cerr << "i = " << i << "arr[i] = " << result.arr_at(i).string_value() << '\n';
        }
    }

    SeriesContext *context = new SeriesContext();
    context->id = 1000;
    context->name = "redisTask1";
    series_of(redisTask)->set_context(context);

    // The new task needs its own name: redeclaring `redisTask` would clash
    // with the parameter.
    SeriesWork *series = series_of(redisTask);
    WFRedisTask *nextTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379", 10, redisCallback2);
    protocol::RedisRequest *nextReq = nextTask->get_req();
    nextReq->set_request("set", {"key", "value"});
    series->push_back(nextTask);
}
int main(){ signal(SIGINT,siganlder); WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback); prtocol::RedisRequest *req = redisTask->get_req(); req->set_request("set",{"key","value"}); WFRedisTask *redisTask2 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2); prtocol::RedisRequest *req2 = redisTask2->get_req(); req->set_request("get",{"key"}); redisTask2->start(); SeriesWork * series = workflow::create_series_work(redisTask1,nullptr); series->push_back(redisTask2); series->start(); waitGroup.wait(); return 0; }
|
上述写法会出现内存泄漏,所以可以在序列的回调函数把申请的共享内存给释放掉
方式一 设置回调
1 2 3 4 5 6 7 8 9 10
| void seriesCallback(const SeriesWork *sries){ SeriesContext *context = static_cast<SeriesCOntext *>(series->get_context()); delete context; } void redisCallback1(){ ...; series_of(redisTask)>set_callback(seriesCallback) }
|
方式二 使用lambda表达式:在new之后立刻给序列加一个callback,callback里面负责delete
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| void redisCallback1(WFRedisTask * redisTask){ int state = redisTask->get_state(); int error = redisTask->get_error(); switch(state){ case WFT_STATE_SYS_ERROR: cerr << "system error:" << strerror(error) << "\n"; break; case WFT_STATE_DNS_ERROR: cerr << "DNS error:" << gai_strerror(error) << '\n'; break; case WFT_STATE_SSL_ERROR: cerr << "SSL error :" << error << '\n'; break; case WFT_STATE_TASK_ERROR: cerr << "Task error :" << error << '\n'; break; case WFT_STATE_SUCCESS: break; } protocol:RedisResponse *resp = redisTask->get_resp(); potocol:RedisValue result; resp->get_result(result); if(result.is_error()){ state = WFT_STATE_TASK_ERROR; } if(state != WFT_STATE_SUCCESS){ cerr << "Failed. Press Ctrl-C to exit.\n"; return; } if(result.is_string()){ cerr << "result = " << result.string_value() << '\n'; } else if(result.is_array()){ for(int i = 0; i < result.arr_size();++i){ cerr << "i = " << i << "arr[i] = " << result.arr_at(i).string_value() << '\n'; } }
SeriesContext * context = new SeriesContext(); context->id = 1000; context->name = "redisTask1"; series_of(redisTask)->set_context(context);
series_of(redisTask)>set_callback([context](const SeriesWork *series){ delete context; })
SeriesWork *series = series_of(redisTask); WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379",10,redisCallback2); prtocol::RedisRequest *req = redisTask->get_req(); req->set_request("set",{"key","value"}); series->push_back(redisTask); }
|
并行任务
需求:并行访问三个网站,取其中响应报文体最长的一个,把结果放到redis里面,是先并联再串联

思路:可否把"访问三个网站"这个并行任务看成一个整体,然后和后面访问redis的任务构成一个串联序列
并行任务的特点:
- 并行任务是任务,可以放入序列里面
- 并行任务很“大”,其内部有多个并行的序列(小序列),小序列之间是并行的,
- 但是只有内部的所有小序列都执行完了,并行任务的基本工作才算做完。
并联和串联的接口都放在Workflow.h里面,这两大特性构成了workflow的基石
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
#include <workflow/WFFacilities.h>
#include <workflow/HttpUtil.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>
#include <signal.h>
#include <string>
#include <vector>
using namespace std;

// Keeps main() blocked; the single count is released by sigHandler.
static WFFacilities::WaitGroup waitGroup(1);

// Per-series data: the URL fetched by the series and the size of its body.
struct SeriesContext{
    string url;
    size_t size;
};

// SIGINT handler: releases the wait group so that main() can return.
void sigHandler(int signum){
    (void)signum;
    waitGroup.done();
}
// Callback of each HTTP task: stores the size of the response body in the
// context of the series the task belongs to.
void httpCallback(WFHttpTask *httpTask){
    // Initialised so that a well-defined 0 is recorded if get_parsed_body()
    // leaves its outputs untouched (presumably on a failed request — confirm).
    const void *body = nullptr;
    size_t size = 0;
    httpTask->get_resp()->get_parsed_body(&body, &size);
    SeriesContext *context = static_cast<SeriesContext *>(series_of(httpTask)->get_context());
    context->size = size;
}
int main(){ signal(SIGINT,siganlder); ParallelWork *parallel = Workflow::create_parallel_work(nullptr); vector<string> urls = {"http://www.baidu.com","http://www.jd.com","http://www.taobao.com"}; for(int i = 0; i < 3; ++i){ WFHttpTask *httpTask = WFTaskFactory::create_http_task(urls[i],10,10,httpCallback); SeriesContext*context = new SeriesContext(); context->url = urls[i]; SeriesWork *series= Workflow::create_series_work(httpTask,nullptr); series->set_context(context); parallel->add_series(series); } parallel->start(); waitGroup.wait(); }
|
结构图

三个请求都完成了,接下来要找最长的报文了,所以接下来要走并行任务的回调了

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
#include <workflow/WFFacilities.h>
#include <workflow/HttpUtil.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>
#include <signal.h>
#include <string>
#include <vector>
using namespace std;

// Keeps main() blocked; the single count is released by sigHandler.
static WFFacilities::WaitGroup waitGroup(1);

// Per-series data: the URL fetched by the series and the size of its body.
struct SeriesContext{
    string url;
    size_t size;
};

// SIGINT handler: releases the wait group so that main() can return.
void sigHandler(int signum){
    (void)signum;
    waitGroup.done();
}
// Callback of each HTTP task: stores the size of the response body in the
// context of the series the task belongs to.
void httpCallback(WFHttpTask *httpTask){
    // Initialised so that a well-defined 0 is recorded if get_parsed_body()
    // leaves its outputs untouched (presumably on a failed request — confirm).
    const void *body = nullptr;
    size_t size = 0;
    httpTask->get_resp()->get_parsed_body(&body, &size);
    SeriesContext *context = static_cast<SeriesContext *>(series_of(httpTask)->get_context());
    context->size = size;
}
void parallelCallback(const ParallelWork *parallel){ string key; size_t value = 0; for(int i = 0; i < parallel->size(); ++i){ SeriesContext *context = static_cast<SeriesContext *>(parallel->series_at(i)->get_context()); if(context->size > value){ key = context->url; value = context->size; } delete context; } }
int main(){ signal(SIGINT,siganlder); ParallelWork *parallel = Workflow::create_parallel_work(parallelCallback); vector<string> urls = {"http://www.baidu.com","http://www.jd.com","http://www.taobao.com"}; for(int i = 0; i < 3; ++i){ WFHttpTask *httpTask = WFTaskFactory::create_http_task(urls[i],10,10,httpCallback); SeriesContext*context = new SeriesContext(); context->url = urls[i]; SeriesWork *series= Workflow::create_series_work(httpTask,nullptr); series->set_context(context); parallel->add_series(series); } parallel->start(); waitGroup.wait(); }
|
并行任务完成之后,要把最长的交给redis,相当于在并行任务的后面又串联了一个任务

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
#include <workflow/WFFacilities.h>
#include <workflow/HttpUtil.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/Workflow.h>
#include <signal.h>
#include <string>
#include <vector>
using namespace std;

// Keeps main() blocked; the single count is released by sigHandler.
static WFFacilities::WaitGroup waitGroup(1);

// Per-series data: the URL fetched by the series and the size of its body.
struct SeriesContext{
    string url;
    size_t size;
};

// SIGINT handler: releases the wait group so that main() can return.
void sigHandler(int signum){
    (void)signum;
    waitGroup.done();
}
// Callback of each HTTP task: stores the size of the response body in the
// context of the series the task belongs to.
void httpCallback(WFHttpTask *httpTask){
    // Initialised so that a well-defined 0 is recorded if get_parsed_body()
    // leaves its outputs untouched (presumably on a failed request — confirm).
    const void *body = nullptr;
    size_t size = 0;
    httpTask->get_resp()->get_parsed_body(&body, &size);
    SeriesContext *context = static_cast<SeriesContext *>(series_of(httpTask)->get_context());
    context->size = size;
}
void parallelCallback(const ParallelWork *parallel){ string key; size_t value = 0; for(int i = 0; i < parallel->size(); ++i){ SeriesContext *context = static_cast<SeriesContext *>(parallel->series_at(i)->get_context()); if(context->size > value){ key = context->url; value = context->size; } delete context; }
WFRedisTask * rediask = WFTaskFactory::create_redis_task("redis::127.0.0.1:6379",10,nullptr); redisTask->get_req()->set_request("SET",{key,std::t_string(value)}); series_of(parallel)->push_back(redisTask);
}
int main(){ signal(SIGINT,siganlder); ParallelWork *parallel = Workflow::create_parallel_work(parallelCallback); vector<string> urls = {"http://www.baidu.com","http://www.jd.com","http://www.taobao.com"}; for(int i = 0; i < 3; ++i){ WFHttpTask *httpTask = WFTaskFactory::create_http_task(urls[i],10,10,httpCallback); SeriesContext*context = new SeriesContext(); context->url = urls[i]; SeriesWork *series= Workflow::create_series_work(httpTask,nullptr); series->set_context(context); parallel->add_series(series); } parallel->start(); waitGroup.wait(); }
|
总结
1.创建并行任务:Workflow::create_parallel_work
2.创建小任务->小序列
3.add_series 把小序列添加到并行任务中
4.等到所有的并行序列执行完以后,执行回调,在回调中可以访问小序列的信息,也可以在大序列中添加新的任务
workflow充当服务端
当设计一个服务端的时候,首先要考虑被动,所以意味着要存在一个对象,并且该对象要永远存在,负责客户端的接入&数据传输,并决定创建序列和任务,执行业务逻辑
客户端的任务逻辑和服务端有区别
客户端方面:需要做的任务分为
1.准备请求 –>设置任务属性
2.发请求&等待&收响应–>对于所有的用户都一样,交给框架写,实现任务的基本工作
3.根据响应执行业务—>回调函数
服务端方面:
1.收请求–>这部分都是一样的
2.根据请求内容去执行不同的业务 –>任务的基本工作是用户的代码
3.回复响应–>这部分是一样的
workflow针对服务端任务做了特殊设计
存在一个server对象,等待新用户的连接,被动的处理客户端的连接和发送数据
一旦有客户端连接了server对象,server对象就创建一个特殊的任务(服务端的http任务)和序列
这个对象的工作由用户的代码(process)决定,用于生成响应,在process执行的过程中,可以往序列中加新的任务
回复消息的时机是序列中新加的任务做完那一刻。
process的回调放在回复消息之后执行

代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| #include <workflow/WFFailities.h> #include <workflow/WFHttpServer.h>
#include <signal.h> static WFFacilities:WAitGrou waitGroup(1);
void sigHandler(int signum){ waitGroup.done(); }
// Handler passed to WFHttpServer: only writes a greeting to stdout and leaves
// the response untouched.
void process(WFHttpTask *serverTask){
    std::cout << "helloworld" << std::endl;
}
int main(){ signal(SIGINT,siganlder); WFHttpServer server(process); if(server.start(12345) == 0){ waitGroup.wait(); server.stop(); }else{ perror("server start failed!"); return -1; } return 0; }
|
Echo业务
客户端发什么报文体,服务端回什么样的报文体
思路:
- 要收请求
- 回响应
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| #include <workflow/WFFailities.h> #include <workflow/WFHttpServer.h>
#include <signal.h> static WFFacilities:WAitGrou waitGroup(1);
void sigHandler(int signum){ waitGroup.done(); }
// Echo handler: copies the body of the request into the body of the response.
void process(WFHttpTask *serverTask){
    protocol::HttpRequest *req = serverTask->get_req();
    const void *body = nullptr;
    size_t size = 0;
    req->get_parsed_body(&body, &size);
    protocol::HttpResponse *resp = serverTask->get_resp();
    resp->append_output_body(body, size);
}
int main(){ signal(SIGINT,siganlder); WFHttpServer server(process); if(server.start(12345) == 0){ waitGroup.wait(); server.stop(); }else{ perror("server start failed!"); return -1; } return 0; }
|
登陆业务
这次单纯的process不行了,还需要访问redis,(这里服务器访问redis,对于redis来说服务器是客户端)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| #include <workflow/WFFailities.h> #include <workflow/WFHttpServer.h>
#include <signal.h> static WFFacilities:WAitGrou waitGroup(1);
struct SeriesContext{ string name; string password; };
void sigHandler(int signum){ waitGroup.done(); }
// Callback of the HGET task: compares the reply with the password kept in
// the series context and prints the outcome to stdout.
void redisCallback(WFRedisTask * redisTask){
    protocol::RedisValue reply;
    redisTask->get_resp()->get_result(reply);
    SeriesContext *ctx = static_cast<SeriesContext *>(series_of(redisTask)->get_context());
    const bool matched = reply.is_string() && reply.string_value() == ctx->password;
    if(!matched){
        cout << ctx->name << "fail\n";
        return;
    }
    cout << ctx->name << "登录成功" << endl;
}
// Login handler: extracts name and password from the request URI (the text
// after '=' on each side of the first '&'), stores them in the series
// context, and appends an HGET on "userdb" to the server task's series.
// NOTE: the context allocated here is never freed in this version.
void process(WFHttpTask *serverTask){
    protocol::HttpRequest *req = serverTask->get_req();
    string uri = req->get_request_uri();
    string nameKV = uri.substr(0, uri.find('&'));
    string passwordKV = uri.substr(uri.find('&') + 1);
    string name = nameKV.substr(nameKV.find("=") + 1);
    string password = passwordKV.substr(passwordKV.find("=") + 1);

    SeriesContext *context = new SeriesContext();
    context->name = name;
    context->password = password;
    series_of(serverTask)->set_context(context);

    WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379", 10, redisCallback);
    redisTask->get_req()->set_request("HGET", {"userdb", name});
    series_of(serverTask)->push_back(redisTask);
}
int main(){ signal(SIGINT,siganlder); WFHttpServer server(process); if(server.start(12345) == 0){ waitGroup.wait(); server.stop(); }else{ perror("server start failed!"); return -1; } return 0; }
|
现在服务器可以判断是否登录成功了,但是没有把响应发送给客户端,发送给客户端的响应是在process的参数serverTask的resp决定的,为了构造响应,在SeriesContext添加上resp数据成员
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| #include <workflow/WFFailities.h> #include <workflow/WFHttpServer.h>
#include <signal.h> static WFFacilities:WAitGrou waitGroup(1);
struct SeriesContext{ string name; string password; protocol::HttpRequest *req };
void sigHandler(int signum){ waitGroup.done(); }
// Callback of the HGET task: compares the reply with the password kept in
// the series context, prints the outcome, and writes the matching HTML body
// into the HTTP response stored in the context.
void redisCallback(WFRedisTask * redisTask){
    protocol::RedisValue result;
    protocol::RedisResponse *resp = redisTask->get_resp();
    resp->get_result(result);
    SeriesContext *context = static_cast<SeriesContext *>(series_of(redisTask)->get_context());
    if(result.is_string() && result.string_value() == context->password){
        cout << context->name << "登录成功" << endl;
        context->resp->append_output_body("<html>success</html>");
    }else{
        cout << context->name << "fail\n";
        context->resp->append_output_body("<html>fail</html>");
    }
}
// Login handler: extracts name and password from the request URI (the text
// after '=' on each side of the first '&'), stores them together with the
// server task's response object in the series context, and appends an HGET
// on "userdb" to the server task's series.
// NOTE: the context allocated here is never freed in this version.
void process(WFHttpTask *serverTask){
    protocol::HttpRequest *req = serverTask->get_req();
    string uri = req->get_request_uri();
    string nameKV = uri.substr(0, uri.find('&'));
    string passwordKV = uri.substr(uri.find('&') + 1);
    string name = nameKV.substr(nameKV.find("=") + 1);
    string password = passwordKV.substr(passwordKV.find("=") + 1);

    SeriesContext *context = new SeriesContext();
    context->name = name;
    context->password = password;
    context->resp = serverTask->get_resp();
    series_of(serverTask)->set_context(context);

    WFRedisTask *redisTask = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379", 10, redisCallback);
    redisTask->get_req()->set_request("HGET", {"userdb", name});
    series_of(serverTask)->push_back(redisTask);
}
int main(){ signal(SIGINT,siganlder); WFHttpServer server(process); if(server.start(12345) == 0){ waitGroup.wait(); server.stop(); }else{ perror("server start failed!"); return -1; } return 0; }
|
最后上述代码有内存泄漏,释放内存可以放在series的回调中释放
实现反向代理(和登录业务同理)

实现静态资源服务器(和登陆业务同理)

workflow
服务端
因为被动执行,需要一个server
当有客户端接入时,立刻创建一个序列
序列以特殊的serverTask开始(serverTask的基本工作是用户写的,即process函数)
在process后面添加的任务执行完成以后,在他的回调执行之前,回复给客户端的响应。