// this represents work that has to be // completed by a thread in the pool typedefstruct { void (*function)(void *p); void *data; } task;
task taskqueue[QUEUE_SIZE]; int cuttasknum=0; intlenth() { return cuttasknum; }
pthread_mutex_t qlock; sem_t tsem; // the work queue task worktodo;
// the worker bee pthread_t bee;
// insert a task into the queue //TODO // returns 0 if successful or 1 otherwise, intenqueue(task t) { pthread_mutex_lock(&qlock); if(lenth()>=QUEUE_SIZE) { printf("fail to enqueue\n"); return1; }
// the worker thread in the thread pool int state[NUMBER_OF_THREADS]; pthread_t mythread[NUMBER_OF_THREADS]; // this function is the function called by the working thread in pthread_create() void *worker(void *param) { int *a=(int *)param; int index=*a; //state[index]=False;//busy worktodo=dequeue(); // execute the task if(worktodo.data!=NULL) { execute(worktodo.function, worktodo.data); state[index]=True;//availiable } sem_post(&tsem); pthread_exit(0); }
/** * Executes the task provided to the thread pool */ voidexecute(void (*somefunction)(void *p), void *p) { (*somefunction)(p); }
/** * Submits work to the pool. */ intpool_submit(void (*somefunction)(void *p), void *p) { //sleep(1); sem_wait(&tsem); worktodo.function = somefunction; worktodo.data = p; if(enqueue(worktodo)!=0) { printf("nani?\n"); return1; }
void *producer(void *param){ buffer_item item; while (true) /* sleep for a random period of time */ { int sleep_period = rand()%4; sleep(sleep_period); /* generate a random number */ sem_wait(&empty); item = rand(); if (insert_item(item)) printf("report error condition"); else printf("producer produced %d\n",item); sem_post(&full); } } void *consumer(void *param){ buffer_item item; while (true) /* sleep for a random period of time */ { int sleep_period = rand()%4; sleep(sleep_period); sem_wait(&full); if (remove_item(&item)) printf("report error condition"); else printf("consumer consumed %d\n",item); sem_post(&empty); } }
/* insert item into buffer return 0 if successful, otherwise return -1 indicating an error condition */
} intremove_item(buffer_item *item){ if(cutindex<0) { printf("oops,fail to insert\n"); return-1; } pthread_mutex_lock(&mutex); int index=(int)(rand()%cutindex); (*item)=buffer[index]; for(int i=index+1;i<cutindex-1;i++) { buffer[i]=buffer[i+1]; } buffer[cutindex-1]=NOITEM; --cutindex; pthread_mutex_unlock(&mutex); return0; /* remove an object from buffer placing it in item return 0 if successful, otherwise return -1 indicating an error condition */ }
void *producer(void *param){ buffer_item item; while (true) /* sleep for a random period of time */ { int sleep_period = rand()%4; sleep(sleep_period); /* generate a random number */ sem_wait(&empty); item = rand(); if (insert_item(item)) printf("report error condition"); else printf("producer produced %d\n",item); sem_post(&full); } } void *consumer(void *param){ buffer_item item; while (true) /* sleep for a random period of time */ { int sleep_period = rand()%4; sleep(sleep_period); sem_wait(&full); if (remove_item(&item)) printf("report error condition"); else printf("consumer consumed %d\n",item); sem_post(&empty); } } intmain(int argc, char *argv[]){ int sleeptime = atoi(argv[1]); int producernum = atoi(argv[2]); int consumernum = atoi(argv[3]); //initialization pthread_mutex_init(&mutex,NULL); sem_init(&empty,0,BUFFER_SIZE); sem_init(&full,0,0);