C++线程池

使用面向对象的方式实现:

5baa8f26dcbaee6244be7b57a31279e1.png

代码实现:

Task.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
#ifndef TASK
#define TASK


// Abstract base class for a unit of work handed to the thread pool.
//
// Concrete tasks derive from Task and override process(); the pool only
// ever sees them through a Task pointer.
class Task
{
public:
    Task() {}
    // Virtual so that destroying a derived task through a Task* (for example
    // from a unique_ptr<Task>) runs the derived destructor instead of being
    // undefined behaviour.
    virtual ~Task() {}
    // Performs the work of this task; must be provided by the derived class.
    virtual void process() = 0;

};
#endif

Task是一个含有纯虚函数process()的抽象类,所以每当有新的任务类型时,就继承Task并重写process(),通过多态的方式由派生类完成具体任务的实现

TaskQueue.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#ifndef TASKQUEUE
#define TASKQUEUE

#include <queue>
#include <mutex>
#include <condition_variable>
class Task;
using namespace std;

// Bounded FIFO of Task pointers shared between producers and consumers.
//
// push() blocks while the queue is full, pop() blocks while it is empty.
// weakAll() sets a flag and wakes every blocked consumer; from then on pop()
// returns nullptr. The queue stores raw pointers and never deletes them.
class TaskQueue
{
private:
    using ElemType = Task *;

public:
    // Creates an empty queue that holds at most `queuesize` elements.
    TaskQueue(size_t queuesize);

    // Appends `task`, waiting for free space if the queue is full.
    void push(ElemType task);
    // Removes and returns the oldest element, waiting while the queue is
    // empty; returns nullptr once weakAll() has been called.
    ElemType pop();

    // Size checks; they do not lock _mutex themselves.
    bool isEmpty();
    bool isFull();

    // Sets the wake-up flag and notifies all consumers waiting in pop().
    void weakAll();

private:
    size_t _queueSize;                  // maximum number of queued elements
    std::queue<ElemType> _que;          // pending tasks, oldest first
    std::mutex _mutex;                  // held by push() and pop()
    std::condition_variable _notEmpty;  // signalled by push() and weakAll()
    std::condition_variable _notFull;   // signalled by pop()
    bool _flag;                         // set by weakAll(); makes pop() return nullptr
};


#endif

TaskQueue.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <iostream>
#include "TaskQueue.hpp"

TaskQueue::TaskQueue(size_t queuesize)
:_queueSize(queuesize)
,_que()
,_mutex()
,_notEmpty()
,_notFull()
,_flag(false)
{}

// Appends `task` to the queue.
//
// Blocks on _notFull for as long as the queue is at capacity, then stores the
// pointer and signals one consumer waiting on _notEmpty.
void TaskQueue::push(ElemType task) {
    unique_lock<mutex> guard(_mutex);
    // The predicate overload re-checks the condition after every wake-up.
    _notFull.wait(guard, [this] { return !isFull(); });
    _que.push(task);
    _notEmpty.notify_one();
}

// Removes and returns the oldest queued element.
//
// Waits on _notEmpty while the queue is empty and the wake-up flag is clear.
// Once the flag is set the function returns nullptr without touching the
// queue; otherwise it pops the front element and signals one producer
// waiting on _notFull.
TaskQueue::ElemType TaskQueue::pop() {
    unique_lock<mutex> guard(_mutex);
    _notEmpty.wait(guard, [this] { return !isEmpty() || _flag; });

    if (_flag) {
        return nullptr;
    }

    ElemType oldest = _que.front();
    _que.pop();
    _notFull.notify_one();
    return oldest;
}


// Reports whether no element is queued. Does not lock _mutex itself;
// push() and pop() call it while already holding the lock.
bool TaskQueue::isEmpty() {
    return _que.empty();
}

// Reports whether the number of queued elements equals the capacity given
// to the constructor. Does not lock _mutex itself.
bool TaskQueue::isFull() {
    const size_t queued = _que.size();
    return _queueSize == queued;
}

void TaskQueue::weakAll(){
_flag = true;
_notEmpty.notify_all();
}

ThreadPool.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#ifndef THREADPOOL
#define THREADPOOL
#include <iostream>
#include <thread>
#include <vector>
#include "TaskQueue.hpp"
using namespace std;


// Fixed-size pool of worker threads fed from a bounded TaskQueue.
//
// Usage in this file: construct, start(), addTask() any number of times,
// then stop(). Tasks are passed as raw pointers and are not owned by the pool.
class ThreadPool
{
public:
    // `threadNum` workers are created by start(); `queSize` is the capacity
    // of the task queue.
    ThreadPool(size_t threadNum, size_t queSize);
    ~ThreadPool();

    // Spawns the worker threads, each running doTask().
    void start();
    // Waits for the queue to drain, then tells the workers to exit and joins them.
    void stop();
    // Queues `ptask`; null pointers are ignored.
    void addTask(Task * ptask);

private:
    // Worker loop: fetch and run tasks until _isExit is set.
    void doTask();
    // Takes the next task from the queue (nullptr once the queue was woken for exit).
    Task * getTask();

private:
    size_t _threadNum;                  // number of workers start() creates
    std::vector<std::thread> _threads;  // the worker threads
    size_t _queSize;                    // queue capacity as passed to the constructor
    TaskQueue _taskQue;                 // pending tasks
    bool _isExit;                       // set by stop(); ends the worker loops
};



#endif

ThreadPool.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <iostream>
#include "ThreadPool.hpp"
#include "Task.hpp"
ThreadPool::ThreadPool(size_t threadNum, size_t queSize)
:_threadNum(threadNum)
,_queSize(queSize)
,_taskQue(queSize)
,_isExit(false)
{}

// Shuts the pool down if the owner forgot to call stop().
//
// Destroying a std::thread that is still joinable calls std::terminate, so
// workers that were started but never stopped must be joined here. Nothing is
// done when stop() already ran or start() was never called.
ThreadPool::~ThreadPool() {
    if (!_isExit && !_threads.empty()) {
        stop();
    }
}

// Launches _threadNum worker threads; every worker runs doTask() on this pool.
void ThreadPool::start() {
    for (size_t spawned = 0; spawned != _threadNum; ++spawned) {
        // Construct the thread directly inside the vector.
        _threads.emplace_back(&ThreadPool::doTask, this);
    }
}

// Drains the queue, asks the workers to exit and joins them.
//
// Safe to call more than once: threads that were already joined are skipped.
void ThreadPool::stop() {
    // As long as queued tasks remain, keep the calling thread from going on.
    while (!_taskQue.isEmpty()) {
        this_thread::sleep_for(chrono::seconds(1));
    }
    // A worker may already have passed the _isExit check in doTask() and be
    // blocked waiting for a task by the time the flag is set here. Slowing
    // the workers down (e.g. sleeping after every task) would hide that, but
    // is a poor fix. Instead, wake every waiting worker once more after the
    // flag is set, so that none of them re-enters the loop.
    _isExit = true;
    _taskQue.weakAll();
    for (auto & td : _threads) {
        // join() on a thread that is no longer joinable throws
        // std::system_error, which a repeated stop() would otherwise trigger.
        if (td.joinable()) {
            td.join();
        }
    }
}

// Queues `ptask` for execution by a worker. A null pointer is silently
// ignored; otherwise the call may block while the queue is full.
void ThreadPool::addTask(Task * ptask) {
    if (!ptask) {
        return;
    }
    _taskQue.push(ptask);
}


// Fetches the next task from the queue; forwards whatever TaskQueue::pop()
// returns, including nullptr.
Task * ThreadPool::getTask() {
    Task * next = _taskQue.pop();
    return next;
}

void ThreadPool::doTask(){
while(!_isExit){
Task *ptask = getTask();
if(ptask){
ptask->process();
}else{
cout << "ptask is nullptr" << endl;
}
}
}

Test.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <iostream>
#include <memory>
#include <random>

#include <time.h>
#include <unistd.h>

#include "ThreadPool.hpp"
#include "Task.hpp"

// Demo task: prints a random number in [0, 99].
class MyTask
    : public Task
{
public:
    void process() override {
        // One generator per worker thread, seeded once. Reseeding rand() from
        // clock() on every call repeats values whenever two calls fall into
        // the same clock tick, and rand() shares hidden state between threads.
        thread_local std::mt19937 engine{std::random_device{}()};
        std::uniform_int_distribution<int> pick(0, 99);
        const int num = pick(engine);
        cout << "<< Task process :" << num << endl;
    }
};

void test(){

unique_ptr<Task> spt(new MyTask());
ThreadPool tp(4,10);
tp.start();
for(int i = 0; i < 20; ++i){
tp.addTask(spt.get());
/* cout << "count = " << i << endl; */
}

tp.stop();
}


// Entry point: runs the thread pool demo once and exits with status 0.
int main()
{
    test();
    // Falling off the end of main is equivalent to `return 0;`.
}

基于对象的实现

使用bind和function,在不使用继承的情况下实现多态

5779c07095ccf57da93f5febc2ff403a.png

Task不再是一个含有纯虚函数的抽象基类了:任务类型改为function<void()>的别名(见下文的TaskQueue.hpp),具体的任务由不需要继承任何基类的普通类MyTask提供

1
2
3
4
5
6
7
8
9
10
11
12
13
#ifndef MYTASK
#define MYTASK


// Plain (non-polymorphic) task class for the std::function based pool.
// Its process() member is bound into a callable before being queued.
class MyTask
{
public:
    MyTask() = default;
    ~MyTask() = default;

    // The work performed by this task; defined in the accompanying source file.
    void process();
};
#endif

Task.cc

1
2
3
4
5
6
7
8
9
#include <iostream>
#include "Task.hpp"
using std::cout;
using std::endl;

// Writes a fixed message followed by a newline to stdout and flushes it.
void MyTask::process() {
    std::cout << "i am process" << std::endl;
}

TaskQueue.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#ifndef TASKQUEUE
#define TASKQUEUE

#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>

using namespace std;
// A task is any callable taking no arguments and returning nothing.
using Task = std::function<void()>;

// Bounded FIFO of callables shared between producers and consumers.
//
// push() blocks while the queue is full, pop() blocks while it is empty.
// weakAll() sets a flag and wakes every blocked consumer; from then on pop()
// returns an empty Task.
class TaskQueue
{
private:
    using Task = std::function<void()>;

public:
    // Creates an empty queue that holds at most `queuesize` tasks.
    TaskQueue(size_t queuesize);

    // Moves `task` into the queue, waiting for free space if it is full.
    void push(Task &&task);
    // Removes and returns the oldest task, waiting while the queue is empty;
    // returns an empty Task once weakAll() has been called.
    Task pop();

    // Size checks; they do not lock _mutex themselves.
    bool isEmpty();
    bool isFull();

    // Sets the wake-up flag and notifies all consumers waiting in pop().
    void weakAll();

private:
    size_t _queueSize;                  // maximum number of queued tasks
    std::queue<Task> _que;              // pending tasks, oldest first
    std::mutex _mutex;                  // held by push() and pop()
    std::condition_variable _notEmpty;  // signalled by push() and weakAll()
    std::condition_variable _notFull;   // signalled by pop()
    bool _flag;                         // set by weakAll(); makes pop() return an empty Task
};


#endif

TaskQueue.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <iostream>
#include "TaskQueue.hpp"

TaskQueue::TaskQueue(size_t queuesize)
:_queueSize(queuesize)
,_que()
,_mutex()
,_notEmpty()
,_notFull()
,_flag(false)
{}

// Moves `task` into the queue.
//
// Blocks on _notFull for as long as the queue is at capacity, then stores the
// callable and signals one consumer waiting on _notEmpty.
void TaskQueue::push(Task && task) {
    unique_lock<mutex> guard(_mutex);
    // The predicate overload re-checks the condition after every wake-up.
    _notFull.wait(guard, [this] { return !isFull(); });
    _que.push(std::move(task));
    _notEmpty.notify_one();
}

// Removes and returns the oldest queued task.
//
// Waits on _notEmpty while the queue is empty and the wake-up flag is clear.
// Once the flag is set, an empty Task is returned without touching the queue;
// otherwise the front task is taken out and one producer waiting on _notFull
// is signalled.
TaskQueue::Task TaskQueue::pop() {
    unique_lock<mutex> ul(_mutex);
    while (isEmpty() && !_flag) {
        _notEmpty.wait(ul);
    }
    if (_flag) {
        return nullptr;
    }
    // The front element is discarded on the next line, so move the callable
    // out instead of copying it (a copy may allocate and duplicates whatever
    // state the callable holds).
    Task ret = std::move(_que.front());
    _que.pop();
    _notFull.notify_one();
    return ret;
}


// Reports whether no task is queued. Does not lock _mutex itself;
// push() and pop() call it while already holding the lock.
bool TaskQueue::isEmpty() {
    return _que.empty();
}

// Reports whether the number of queued tasks equals the capacity given to
// the constructor. Does not lock _mutex itself.
bool TaskQueue::isFull() {
    const size_t queued = _que.size();
    return _queueSize == queued;
}

void TaskQueue::weakAll(){
_flag = true;
_notEmpty.notify_all();
}

ThreadPool.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#ifndef THREADPOOL
#define THREADPOOL
#include <iostream>
#include <thread>
#include <vector>
#include <functional>
#include "TaskQueue.hpp"
using namespace std;


// Fixed-size pool of worker threads fed from a bounded TaskQueue of callables.
//
// Usage in this file: construct, start(), addTask() any number of times,
// then stop().
class ThreadPool
{
public:
    // `threadNum` workers are created by start(); `queSize` is the capacity
    // of the task queue.
    ThreadPool(size_t threadNum, size_t queSize);
    ~ThreadPool();

    // Spawns the worker threads, each running doTask().
    void start();
    // Waits for the queue to drain, then tells the workers to exit and joins them.
    void stop();
    // Queues `ptask`; empty callables are ignored.
    void addTask(Task && ptask);

private:
    // Worker loop: fetch and run tasks until _isExit is set.
    void doTask();
    // Takes the next task from the queue (empty once the queue was woken for exit).
    Task getTask();

private:
    size_t _threadNum;                  // number of workers start() creates
    std::vector<std::thread> _threads;  // the worker threads
    size_t _queSize;                    // queue capacity as passed to the constructor
    TaskQueue _taskQue;                 // pending tasks
    bool _isExit;                       // set by stop(); ends the worker loops
};



#endif

ThreadPool.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <iostream>
#include "ThreadPool.hpp"
#include "Task.hpp"
ThreadPool::ThreadPool(size_t threadNum, size_t queSize)
:_threadNum(threadNum)
,_queSize(queSize)
,_taskQue(queSize)
,_isExit(false)
{}

// Shuts the pool down if the owner forgot to call stop().
//
// Destroying a std::thread that is still joinable calls std::terminate, so
// workers that were started but never stopped must be joined here. Nothing is
// done when stop() already ran or start() was never called.
ThreadPool::~ThreadPool() {
    if (!_isExit && !_threads.empty()) {
        stop();
    }
}

// Launches _threadNum worker threads; every worker runs doTask() on this pool.
void ThreadPool::start() {
    for (size_t spawned = 0; spawned != _threadNum; ++spawned) {
        // Construct the thread directly inside the vector.
        _threads.emplace_back(&ThreadPool::doTask, this);
    }
}

// Drains the queue, asks the workers to exit and joins them.
//
// Safe to call more than once: threads that were already joined are skipped.
void ThreadPool::stop() {
    // As long as queued tasks remain, keep the calling thread from going on.
    while (!_taskQue.isEmpty()) {
        this_thread::sleep_for(chrono::seconds(1));
    }
    // A worker may already have passed the _isExit check in doTask() and be
    // blocked waiting for a task by the time the flag is set here. Slowing
    // the workers down (e.g. sleeping after every task) would hide that, but
    // is a poor fix. Instead, wake every waiting worker once more after the
    // flag is set, so that none of them re-enters the loop.
    _isExit = true;
    _taskQue.weakAll();
    for (auto & td : _threads) {
        // join() on a thread that is no longer joinable throws
        // std::system_error, which a repeated stop() would otherwise trigger.
        if (td.joinable()) {
            td.join();
        }
    }
}

// Queues `ptask` for execution by a worker. An empty callable is silently
// ignored; otherwise the call may block while the queue is full.
void ThreadPool::addTask(Task && ptask) {
    if (!ptask) {
        return;
    }
    _taskQue.push(std::move(ptask));
}


// Fetches the next task from the queue; forwards whatever TaskQueue::pop()
// returns, including an empty Task.
Task ThreadPool::getTask() {
    Task next = _taskQue.pop();
    return next;
}

void ThreadPool::doTask(){
while(!_isExit){
Task ptask = getTask();
if(ptask){
ptask();
}else{
cout << "ptask is nullptr" << endl;
}
}
}

Test.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <time.h>
#include <memory>
#include <functional>


void test(){

MyTask tk;
ThreadPool tp(4,10);
tp.start();
for(int i = 0; i < 20; ++i){
/* cout << "count = " << i << endl; */
tp.addTask(bind(&MyTask::process,&tk));
}

tp.stop();
}


// Entry point: runs the thread pool demo once and exits with status 0.
int main()
{
    test();
    // Falling off the end of main is equivalent to `return 0;`.
}