c++线程池
使用面向对象的方式实现:

代码实现:
Task.hpp
1 2 3 4 5 6 7 8 9 10 11 12 13
| #ifndef TASK #define TASK
class Task { public: Task() {} ~Task() {} virtual void process() = 0;
}; #endif
|
Task是一个纯虚函数,所以有任务来就继承Task,用多态方式来实现派生类任务的实现
TaskQueue.hpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| #ifndef TASKQUEUE #define TASKQUEUE
#include <queue> #include <mutex> #include <condition_variable> class Task; using namespace std;
class TaskQueue {
using ElemType = Task *; public: TaskQueue(size_t queuesize); void push(ElemType task);
ElemType pop();
bool isEmpty(); bool isFull();
void weakAll();
private: size_t _queueSize; queue<ElemType> _que; mutex _mutex; condition_variable _notEmpty; condition_variable _notFull; bool _flag; };
#endif
|
TaskQueue.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| #include <iostream> #include "TaskQueue.hpp"
TaskQueue::TaskQueue(size_t queuesize) :_queueSize(queuesize) ,_que() ,_mutex() ,_notEmpty() ,_notFull() ,_flag(false) {} void TaskQueue::push(ElemType task){ unique_lock<mutex> ul(_mutex); while(isFull()){ _notFull.wait(ul); } _que.push(task); _notEmpty.notify_one(); }
TaskQueue::ElemType TaskQueue::pop(){
unique_lock<mutex> ul(_mutex); while(isEmpty()&&! _flag){ _notEmpty.wait(ul); } if(!_flag){ ElemType ret = _que.front(); _que.pop(); _notFull.notify_one(); return ret; } return nullptr; }
bool TaskQueue::isEmpty(){ return _que.size() == 0; } bool TaskQueue::isFull(){ return _que.size() == _queueSize; }
void TaskQueue::weakAll(){ _flag = true; _notEmpty.notify_all(); }
|
ThreadPool.hpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| #ifndef THREADPOOL #define THREADPOOL #include <iostream> #include <thread> #include <vector> #include "TaskQueue.hpp" using namespace std;
class ThreadPool { public: ThreadPool(size_t threadNum, size_t queSize); ~ThreadPool();
void start();
void stop();
void addTask(Task * ptask); private: Task * getTask();
void doTask(); private: size_t _threadNum; vector<thread> _threads; size_t _queSize; TaskQueue _taskQue; bool _isExit; };
#endif
|
ThreadPool.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| #include <iostream> #include "ThreadPool.hpp" #include "Task.hpp" ThreadPool::ThreadPool(size_t threadNum, size_t queSize) :_threadNum(threadNum) ,_queSize(queSize) ,_taskQue(queSize) ,_isExit(false) {}
ThreadPool::~ThreadPool(){}
void ThreadPool::start(){ for(size_t i = 0; i < _threadNum; ++i){ _threads.push_back(thread(&ThreadPool::doTask,this)); }
}
void ThreadPool::stop(){ while(!_taskQue.isEmpty()){ this_thread::sleep_for(chrono::seconds(1)); } _isExit = true; _taskQue.weakAll(); for(auto & td:_threads){ td.join(); } }
void ThreadPool::addTask(Task * ptask){ if(ptask){ _taskQue.push(ptask); } }
Task * ThreadPool::getTask(){ return _taskQue.pop(); }
void ThreadPool::doTask(){ while(!_isExit){ Task *ptask = getTask(); if(ptask){ ptask->process(); }else{ cout << "ptask is nullptr" << endl; } } }
|
Test.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| #include <iostream> #include "ThreadPool.hpp" #include "Task.hpp" #include <unistd.h> #include <time.h> #include <memory>
class MyTask :public Task { public: void process() override{ ::srand(::clock()); int num = ::rand()%100; cout << "<< Task process :"<< num << endl; } };
void test(){
unique_ptr<Task> spt(new MyTask()); ThreadPool tp(4,10); tp.start(); for(int i = 0; i < 20; ++i){ tp.addTask(spt.get()); }
tp.stop(); }
int main() { test(); return 0; }
|
基于对象的实现
使用bind和function来不使用继承的方式来实现多态

Task不再是一个纯虚函数了
1 2 3 4 5 6 7 8 9 10 11 12 13
| #ifndef MYTASK #define MYTASK
class MyTask { public: MyTask() {} ~MyTask() {} void process();
}; #endif
|
Task.cc
1 2 3 4 5 6 7 8 9
| #include <iostream> #include "Task.hpp" using std::cout; using std::endl;
void MyTask::process(){ cout << "i am process" << endl; }
|
TaskQueue.hpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| #ifndef TASKQUEUE #define TASKQUEUE
#include <queue> #include <mutex> #include <condition_variable> #include <functional>
using namespace std; using Task = function<void()>;
class TaskQueue {
using Task = function<void()>; public: TaskQueue(size_t queuesize); void push(Task &&task);
Task pop();
bool isEmpty(); bool isFull();
void weakAll();
private: size_t _queueSize; queue<Task> _que; mutex _mutex; condition_variable _notEmpty; condition_variable _notFull; bool _flag; };
#endif
|
TaskQueue.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| #include <iostream> #include "TaskQueue.hpp"
TaskQueue::TaskQueue(size_t queuesize) :_queueSize(queuesize) ,_que() ,_mutex() ,_notEmpty() ,_notFull() ,_flag(false) {} void TaskQueue::push(Task && task){ unique_lock<mutex> ul(_mutex); while(isFull()){ _notFull.wait(ul); } _que.push(move(task)); _notEmpty.notify_one(); }
TaskQueue::Task TaskQueue::pop(){
unique_lock<mutex> ul(_mutex); while(isEmpty()&&! _flag){ _notEmpty.wait(ul); } if(!_flag){ Task ret = _que.front(); _que.pop(); _notFull.notify_one(); return ret; } return nullptr; }
bool TaskQueue::isEmpty(){ return _que.size() == 0; } bool TaskQueue::isFull(){ return _que.size() == _queueSize; }
void TaskQueue::weakAll(){ _flag = true; _notEmpty.notify_all(); }
|
ThreadPool.hpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| #ifndef THREADPOOL #define THREADPOOL #include <iostream> #include <thread> #include <vector> #include <functional> #include "TaskQueue.hpp" using namespace std;
class ThreadPool { public: ThreadPool(size_t threadNum, size_t queSize); ~ThreadPool();
void start();
void stop();
void addTask(Task && ptask); private: Task getTask();
void doTask(); private: size_t _threadNum; vector<thread> _threads; size_t _queSize; TaskQueue _taskQue; bool _isExit; };
#endif
|
ThreadPool.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| #include <iostream> #include "ThreadPool.hpp" #include "Task.hpp" ThreadPool::ThreadPool(size_t threadNum, size_t queSize) :_threadNum(threadNum) ,_queSize(queSize) ,_taskQue(queSize) ,_isExit(false) {}
ThreadPool::~ThreadPool(){}
void ThreadPool::start(){ for(size_t i = 0; i < _threadNum; ++i){ _threads.push_back(thread(&ThreadPool::doTask,this)); }
}
void ThreadPool::stop(){ while(!_taskQue.isEmpty()){ this_thread::sleep_for(chrono::seconds(1)); } _isExit = true; _taskQue.weakAll(); for(auto & td:_threads){ td.join(); } }
void ThreadPool::addTask(Task && ptask){ if(ptask){ _taskQue.push(move(ptask)); } }
Task ThreadPool::getTask(){ return _taskQue.pop(); }
void ThreadPool::doTask(){ while(!_isExit){ Task ptask = getTask(); if(ptask){ ptask(); }else{ cout << "ptask is nullptr" << endl; } } }
|
Test.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| #include <iostream> #include "ThreadPool.hpp" #include "Task.hpp" #include <unistd.h> #include <time.h> #include <memory> #include <functional>
void test(){
MyTask tk; ThreadPool tp(4,10); tp.start(); for(int i = 0; i < 20; ++i){ tp.addTask(bind(&MyTask::process,&tk)); }
tp.stop(); }
int main() { test(); return 0; }
|