pthread同步简单实例

pthread同步实例:线程池实现和producer_consumer问题解决

一.线程池的实现

在本实例中,我们创建并管理一个线程池,其synchronization实现是用pthread和POSIX.用户可以用pool_init()来初始化该线程池,并调用pool_submit(void(*somefunction)(void *p), void *p)向其中提交任务。在最后用pool_shutdown()将线程池关闭

1.sem_信号量框架

POSIX下定义的信号量定义在头文件<semaphore.h>中。在本项目中我都使用了未命名的信号量,其使用方法为:

1
2
3
4
5
6
7
8
sem_t sem;//声明一个信号量
sem_init(&sem,0,1);//信号量初始化,其中0表示该信号量只能在对应的线程中使用,若使用一个非0的值我们就可以在不同的线程中共用该信号量了。

sem_wait(&sem);//等待直到该信号量的值大于0,即有空余信号可以使用
/*危险区*/
sem_post(&sem);释放该信号量


2.mutex互斥锁框架

pthread使用pthread_mutex_t 数据类型来定义互斥锁。其使用方法为:

1
2
3
4
5
pthread_mutex_t mutex;//定义一个互斥锁
pthread_mutex_init(&mutex,NULL);//初始化,其中NULL表示使用默认属性初始化
pthread_mutex_lock(&mutex);//加锁
/*危险区*/
pthread_mutex_unlock(&mutex);//解锁

3.线程池框架

该说不说,这个线程池的框架还是比较巧妙的。
定义任务类:

1
2
3
4
5
6
typedef struct 
{
void (*function)(void *p);
void *data;
}
task;

基于该任务类定义了一个任务队列,一个线程队列,一个指示队列,用来指示线程池中线程的使用情况。
pool_submit()函数将新的任务提交到任务队列中,同时查找是否有可用的线程,如果有,那么创建线程,新建的线程的任务函数是worker(),该函数从任务队列中取出队首的元素,执行其任务类的任务。
当然,相应的enqueue()和dequeue()函数就用来实现往任务队列中的插入和删除。这里我们还是用数组来存因为玩数组玩的比较熟。但是我们数组的最大的容量如果不够,将阻止该插入,并且报错。

4.代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/**
* Implementation of thread pool.
*/

#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <semaphore.h>
#include <unistd.h>
#include "threadpool.h"

#define QUEUE_SIZE 10
#define NUMBER_OF_THREADS 3

#define True 1
#define False 0

// this represents work that has to be
// completed by a thread in the pool
typedef struct
{
void (*function)(void *p);
void *data;
}
task;

task taskqueue[QUEUE_SIZE];
int cuttasknum=0;
int lenth()
{
return cuttasknum;
}

pthread_mutex_t qlock;
sem_t tsem;
// the work queue
task worktodo;

// the worker bee
pthread_t bee;

// insert a task into the queue
//TODO
// returns 0 if successful or 1 otherwise,
int enqueue(task t)
{
pthread_mutex_lock(&qlock);
if(lenth()>=QUEUE_SIZE)
{
printf("fail to enqueue\n");
return 1;
}

taskqueue[cuttasknum]=t;
++cuttasknum;
pthread_mutex_unlock(&qlock);
return 0;
}

// remove a task from the queue
//TODO
task dequeue()
{
pthread_mutex_lock(&qlock);
if(lenth()<0)
{
printf("oops,an error happens during dequeue\n");
worktodo.data=worktodo.function=NULL;
}

worktodo=taskqueue[0];
for(int i = 1;i<lenth();i++)
{
taskqueue[i-1]=taskqueue[i];
}
--cuttasknum;
pthread_mutex_unlock(&qlock);
return worktodo;
}

// the worker thread in the thread pool
int state[NUMBER_OF_THREADS];
pthread_t mythread[NUMBER_OF_THREADS];
// this function is the function called by the working thread in pthread_create()
void *worker(void *param)
{
int *a=(int *)param;
int index=*a;
//state[index]=False;//busy
worktodo=dequeue();
// execute the task
if(worktodo.data!=NULL)
{
execute(worktodo.function, worktodo.data);
state[index]=True;//availiable
}
sem_post(&tsem);
pthread_exit(0);
}

/**
* Executes the task provided to the thread pool
*/
void execute(void (*somefunction)(void *p), void *p)
{
(*somefunction)(p);
}

/**
* Submits work to the pool.
*/
int pool_submit(void (*somefunction)(void *p), void *p)
{
//sleep(1);
sem_wait(&tsem);
worktodo.function = somefunction;
worktodo.data = p;
if(enqueue(worktodo)!=0)
{
printf("nani?\n");
return 1;
}

for(int i=0;i<NUMBER_OF_THREADS;i++)
{
if(state[i]==True)
{
state[i]=False;//set to busy
pthread_create(&mythread[i],NULL,&worker,&i);
break;
}
}

return 0;
}

// initialize the thread pool
void pool_init(void)
{
cuttasknum=0;
for(int i=0;i<NUMBER_OF_THREADS;i++)
{
state[i]=True;
}
sem_init(&tsem,0,NUMBER_OF_THREADS);
pthread_mutex_init(&qlock,NULL);
}

// shutdown the thread pool
void pool_shutdown(void)
{
for(int i=0;i<NUMBER_OF_THREADS;i++)
pthread_join(mythread[i],NULL);
pthread_mutex_destroy(&qlock);
sem_destroy(&tsem);
}

5.测试

基于示例代码,我们又往里面填了三个任务,使得任务总量能够大于线程池提供的线程数目以测试我们的代码:

1
2
3
4
5
6
7
8
9
10
11
12
work2.a=6;
work2.b=11;
work3.a=7;
work3.b=12;
work4.a=8;
work4.b=13;
pool_submit(&add,&work2);

pool_submit(&add,&work3);

pool_submit(&add,&work4);

测试结果如图所示:

二.producer_consumer问题解决

1.producer和consumer实现

同步领域的经典问题,在这个项目中我们创建了不少的consumer和producer,他们要么安静的睡觉,要么往缓冲区里存东西/写东西。
存东西的insert_item()函数就是往一个线性表中存值,我使用了一个cutindex下表指针来指示下一个元素应该被插入的位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int insert_item(buffer_item item) 
{
if(cutindex>=BUFFER_SIZE)
{
printf("oops!insert failed\n");
return -1;
}
pthread_mutex_lock(&mutex);
buffer[cutindex]=item;
++cutindex;
pthread_mutex_unlock(&mutex);

return 0;
/* insert item into buffer
return 0 if successful, otherwise
return -1 indicating an error condition */
}

其中修改buffer的行为属于危险行为,所以给他加了一把互斥锁。
如果consumer只消耗最后一个元素就太逊了,我们让他随机消耗一个元素:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int remove_item(buffer_item *item) {
if(cutindex<0)
{
printf("oops,fail to insert\n");
return -1;
}
pthread_mutex_lock(&mutex);
int index=(int)(rand()%cutindex);//要消耗的元素的位置
(*item)=buffer[index];
for(int i=index+1;i<cutindex-1;i++)
{
buffer[i]=buffer[i+1];
}
buffer[cutindex-1]=NOITEM;
--cutindex;
pthread_mutex_unlock(&mutex);
return 0;
/* remove an object from buffer
placing it in item
return 0 if successful, otherwise
return -1 indicating an error condition */
}

这样一来我们的producer和consumer只需要在睡醒的时候在自己的任务函数里面调用以上两个函数就可以实现插入和删除了。注意调用的时候使用了两个信号量empty和full,其具体用法见恐龙书7.1节。其次这应该是一个死循环,由main函数觉得应该终止的时候加以停止和回收线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void *producer(void *param) {
buffer_item item;
while (true)
/* sleep for a random period of time */
{
int sleep_period = rand()%4;
sleep(sleep_period);
/* generate a random number */
sem_wait(&empty);
item = rand();
if (insert_item(item))
printf("report error condition");
else
printf("producer produced %d\n",item);
sem_post(&full);
}
}
void *consumer(void *param) {
buffer_item item;
while (true)
/* sleep for a random period of time */
{
int sleep_period = rand()%4;
sleep(sleep_period);
sem_wait(&full);
if (remove_item(&item))
printf("report error condition");
else
printf("consumer consumed %d\n",item);
sem_post(&empty);
}
}

2.比较有意思的main函数

首先这个main函数在命令行里传了三个参数——要睡多久?有几个producer?有几个consumer?

1
2
3
4
5
int main(int argc, char *argv[]) {
int sleeptime = atoi(argv[1]);
int producernum = atoi(argv[2]);
int consumernum = atoi(argv[3]);//arg[0]是文件名,参数从1开始
……

然后在做完初始化之后(初始化包括创建线程和那些sem和mutex的初始化和变量定义那一趟),休眠一会,任由线程来自己玩。

1
sleep(sleeptime);

最后料理后事,注意要使用pthread_cancel()而不是pthread_join()来终止线程,不然停不下来。

3.代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include<stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
#include <unistd.h>
#include "buffer.h"
#define NOITEM 0
#define true 1
#define false 0
/* the buffer */
sem_t empty,full;
pthread_mutex_t mutex;
buffer_item buffer[BUFFER_SIZE];
int cutindex=0;
int insert_item(buffer_item item)
{

if(cutindex>=BUFFER_SIZE)
{
printf("oops!insert failed\n");
return -1;
}
pthread_mutex_lock(&mutex);
buffer[cutindex]=item;
++cutindex;
pthread_mutex_unlock(&mutex);

return 0;

/* insert item into buffer
return 0 if successful, otherwise
return -1 indicating an error condition */


}
int remove_item(buffer_item *item) {
if(cutindex<0)
{
printf("oops,fail to insert\n");
return -1;
}
pthread_mutex_lock(&mutex);
int index=(int)(rand()%cutindex);
(*item)=buffer[index];
for(int i=index+1;i<cutindex-1;i++)
{
buffer[i]=buffer[i+1];
}
buffer[cutindex-1]=NOITEM;
--cutindex;
pthread_mutex_unlock(&mutex);
return 0;
/* remove an object from buffer
placing it in item
return 0 if successful, otherwise
return -1 indicating an error condition */
}

void *producer(void *param) {
buffer_item item;
while (true)
/* sleep for a random period of time */
{
int sleep_period = rand()%4;
sleep(sleep_period);
/* generate a random number */
sem_wait(&empty);
item = rand();
if (insert_item(item))
printf("report error condition");
else
printf("producer produced %d\n",item);
sem_post(&full);
}
}
void *consumer(void *param) {
buffer_item item;
while (true)
/* sleep for a random period of time */
{
int sleep_period = rand()%4;
sleep(sleep_period);
sem_wait(&full);
if (remove_item(&item))
printf("report error condition");
else
printf("consumer consumed %d\n",item);
sem_post(&empty);
}
}
int main(int argc, char *argv[]) {
int sleeptime = atoi(argv[1]);
int producernum = atoi(argv[2]);
int consumernum = atoi(argv[3]);
//initialization
pthread_mutex_init(&mutex,NULL);
sem_init(&empty,0,BUFFER_SIZE);
sem_init(&full,0,0);

pthread_t producer_threads[producernum];
pthread_t consumer_threads[consumernum];

for(int i=0;i<producernum;i++)
{
pthread_create(&producer_threads[i],NULL,producer,NULL);
}

for(int i=0;i<consumernum;i++)
{
pthread_create(&consumer_threads[i],NULL,consumer,NULL);
}
//sleep
sleep(sleeptime);
//exit
for(int i=0;i<producernum;i++)
{
//pthread_join(producer_threads[i],NULL);
pthread_cancel(producer_threads[i]);
}
for(int i=0;i<consumernum;i++)
{
// pthread_join(consumer_threads[i],NULL);
pthread_cancel(consumer_threads[i]);
}
sem_destroy(&empty);
sem_destroy(&full);
pthread_mutex_destroy(&mutex);
return 0;

/* 1. Get command line arguments argv[1],argv[2],argv[3] */
/* 2. Initialize buffer */
/* 3. Create producer thread(s) */
/* 4. Create consumer thread(s) */
/* 5. Sleep */
/* 6. Exit */
}


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!