1 #include2 #include 3 #include 4 #include 5 #include 6 7 typedef struct CThread_worker 8 { 9 void *(*process)(void *arg); 10 void *arg; 11 struct CThread_worker *next; 12 }CThread_worker; 13 14 15 typedef struct 16 { 17 pthread_mutex_t queue_head; 18 pthread_cond_t queue_ready; 19 20 struct CThread_worker *queue_worker; 21 pthread_t *pthread_id; 22 23 int max_task_num; 24 int cur_task_num; 25 int shutdown; 26 27 }CThread_pool; 28 29 static CThread_pool *pool = NULL; 30 31 void pool_init(int); 32 void add_task_to_pool(void *(*)(void *), void *); 33 void *pthread_fun(void *); 34 void pool_destroy(); 35 void *my_process(void *); 36 37 int main(int argc, char *argv[]) 38 { 39 int max_task_num = 0; 40 int i = 0; 41 max_task_num = 5; 42 43 pool_init(max_task_num); 44 45 int *worker_num; 46 worker_num = (int *)malloc(10 * sizeof(int)); 47 for (i=0; i<10; i++) 48 { 49 worker_num[i] = i; 50 add_task_to_pool(my_process, &worker_num[i]); 51 } 52 53 sleep(12); 54 pool_destroy(); 55 56 free(worker_num); 57 worker_num = NULL; 58 59 return 0; 60 } 61 62 63 void pool_init(int num) 64 { 65 int i = 0; 66 pool = (CThread_pool *)malloc(sizeof(CThread_pool)); 67 68 pthread_mutex_init(&(pool->queue_head), NULL); 69 pthread_cond_init(&(pool->queue_ready), NULL); 70 71 pool->queue_worker = NULL; 72 pool->max_task_num = num; 73 pool->cur_task_num = 0; 74 pool->shutdown = 0; 75 pool->pthread_id = (pthread_t *)malloc(num * sizeof(pthread_t)); 76 77 for (i=0; i pthread_id[i]), NULL, pthread_fun, NULL); 80 } 81 82 } 83 84 void *pthread_fun(void *arg) 85 { 86 CThread_worker *worker = NULL; 87 88 printf("pthread %u is starting\n", (unsigned int)pthread_self()); 89 while (1) 90 { 91 pthread_mutex_lock(&(pool->queue_head)); 92 while (pool->cur_task_num == 0 && !pool->shutdown) 93 { 94 printf("pthread %u is waiting task...\n\n", (unsigned int)(pthread_self())); 95 pthread_cond_wait(&(pool->queue_ready), &(pool->queue_head)); 96 } 97 if (pool->shutdown) 98 { 99 /*线程退出之前,必须解锁,以让其他的线程得以访问该共享资源.*/100 pthread_mutex_unlock(&(pool->queue_head));101 printf("pthread %u is exiting\n", (unsigned int)pthread_self());102 pthread_exit(NULL);103 }104 105 pool->cur_task_num--;106 107 worker = pool->queue_worker;108 pool->queue_worker = worker->next;109 /*110 while (worker->next != NULL)111 {112 worker = worker->next;113 } 114 */ 115 pthread_mutex_unlock(&(pool->queue_head));116 117 (*(worker->process))(worker->arg);118 }119 pthread_exit(NULL);120 }121 122 void add_task_to_pool(void *(*my_process)(void *), void *arg)123 {124 CThread_worker *worker = NULL;125 CThread_worker *p = NULL;126 127 worker = (CThread_worker *)malloc(sizeof(CThread_worker));128 worker->process = my_process;129 worker->arg = arg;130 worker->next = NULL;131 132 pthread_mutex_lock(&(pool->queue_head));133 134 p = pool->queue_worker;135 if ( p == NULL)136 {137 pool->queue_worker = worker;138 }139 else140 {141 while (p->next != NULL)142 {143 p = p->next;144 }145 p->next = worker;146 }147 148 pool->cur_task_num++;149 150 pthread_mutex_unlock(&(pool->queue_head));151 152 pthread_cond_signal(&(pool->queue_ready));153 154 }155 156 157 void pool_destroy()158 {159 int i = 0;160 CThread_worker *p = NULL;161 162 pool->shutdown = 1;163 /*唤醒等待该条件的所有线程.否则线程将处于阻塞状态,等待条件满足.*/164 pthread_cond_broadcast(&(pool->queue_ready)); 165 /*阻塞等待线程退出,否则成为僵尸线程.*/166 for (i=0; i max_task_num; ++i)167 {168 pthread_join(pool->pthread_id[i], NULL);169 }170 free (pool->pthread_id);171 172 while (pool->queue_worker != NULL)173 {174 p = pool->queue_worker;175 pool->queue_worker = p->next;176 free(p);177 }178 /*信号量和条件变量需要销毁.*/ 179 pthread_mutex_destroy(&(pool->queue_head));180 pthread_cond_destroy(&(pool->queue_ready));181 182 free(pool);183 /*为避免pool成为野指针,将其赋值为空.*/184 pool = NULL; 185 }186 187 void *my_process(void *task_id)188 {189 printf("thread %u is doing task %d\n",(unsigned int)pthread_self(), *(int *)task_id);190 sleep(1);191 }