这是一个用户层线程池的示例代码
c#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#define THREAD_POOL_SIZE 4
#define QUEUE_SIZE 100
typedef struct {
void (*function)(void *);
void *argument;
} task_t;
typedef struct {
task_t tasks[QUEUE_SIZE];
int head;
int tail;
int count;
pthread_mutex_t lock;
pthread_cond_t notify;
} task_queue_t;
typedef struct {
pthread_t threads[THREAD_POOL_SIZE];
task_queue_t queue;
int shutdown;
} thread_pool_t;
void *worker_thread(void *arg) {
thread_pool_t *pool = (thread_pool_t *)arg;
while (1) {
pthread_mutex_lock(&(pool->queue.lock));
while (pool->queue.count == 0 && !pool->shutdown) {
pthread_cond_wait(&(pool->queue.notify), &(pool->queue.lock));
}
if (pool->shutdown) {
pthread_mutex_unlock(&(pool->queue.lock));
pthread_exit(NULL);
}
task_t task = pool->queue.tasks[pool->queue.head];
pool->queue.head = (pool->queue.head + 1) % QUEUE_SIZE;
pool->queue.count--;
pthread_mutex_unlock(&(pool->queue.lock));
(*(task.function))(task.argument);
}
return NULL;
}
int thread_pool_init(thread_pool_t *pool) {
pool->queue.head = 0;
pool->queue.tail = 0;
pool->queue.count = 0;
pool->shutdown = 0;
pthread_mutex_init(&(pool->queue.lock), NULL);
pthread_cond_init(&(pool->queue.notify), NULL);
for (int i = 0; i < THREAD_POOL_SIZE; i++) {
if (pthread_create(&(pool->threads[i]), NULL, worker_thread, (void *)pool) != 0) {
return -1;
}
}
return 0;
}
void thread_pool_submit(thread_pool_t *pool, void (*function)(void *), void *arg) {
pthread_mutex_lock(&(pool->queue.lock));
if (pool->queue.count == QUEUE_SIZE) {
pthread_mutex_unlock(&(pool->queue.lock));
return;
}
pool->queue.tasks[pool->queue.tail].function = function;
pool->queue.tasks[pool->queue.tail].argument = arg;
pool->queue.tail = (pool->queue.tail + 1) % QUEUE_SIZE;
pool->queue.count++;
pthread_cond_signal(&(pool->queue.notify));
pthread_mutex_unlock(&(pool->queue.lock));
}
void thread_pool_destroy(thread_pool_t *pool) {
pool->shutdown = 1;
pthread_mutex_lock(&(pool->queue.lock));
pthread_cond_broadcast(&(pool->queue.notify));
pthread_mutex_unlock(&(pool->queue.lock));
for (int i = 0; i < THREAD_POOL_SIZE; i++) {
pthread_join(pool->threads[i], NULL);
}
pthread_mutex_destroy(&(pool->queue.lock));
pthread_cond_destroy(&(pool->queue.notify));
}
// Example task function
void example_task(void *arg) {
int *num = (int *)arg;
printf("Task executed with argument: %d\n", *num);
sleep(1);
}
int main() {
thread_pool_t pool;
if (thread_pool_init(&pool) != 0) {
fprintf(stderr, "Failed to initialize thread pool\n");
return 1;
}
int task_args[10];
for (int i = 0; i < 10; i++) {
task_args[i] = i;
thread_pool_submit(&pool, example_task, &task_args[i]);
}
sleep(5); // Wait for tasks to complete
thread_pool_destroy(&pool);
return 0;
}
编译方式如下:
gcc -o thread_pool thread_pool.c -lpthread
以下是各个函数的功能
void *worker_thread(void *arg)作用:工作线程的主函数,负责从任务队列中取出任务并执行。
详细说明:
pthread_cond_wait 进入等待状态。shutdown 标志为 1),线程会退出。关键点:
pthread_cond_wait 等待任务。pthread_mutex_lock 和 pthread_mutex_unlock 保护对任务队列的访问。int thread_pool_init(thread_pool_t *pool)作用:初始化线程池。
详细说明:
head、tail、count)。pthread_mutex_init)和条件变量(pthread_cond_init)。pthread_create),并将它们绑定到 worker_thread 函数。关键点:
-1 表示初始化失败。0。void thread_pool_submit(thread_pool_t *pool, void (*function)(void *), void *arg)作用:向线程池提交任务。
详细说明:
pthread_cond_signal 唤醒一个等待的工作线程。关键点:
pthread_mutex_lock 和 pthread_mutex_unlock 保护对任务队列的访问。pthread_cond_signal 通知工作线程有新任务。void thread_pool_destroy(thread_pool_t *pool)作用:销毁线程池。
详细说明:
shutdown 标志为 1,通知所有工作线程退出。pthread_cond_broadcast 唤醒所有等待的工作线程。pthread_join)。关键点:
void example_task(void *arg)作用:示例任务函数,用于测试线程池。
详细说明:
关键点:
int main()作用:主函数,用于测试线程池。
详细说明:
thread_pool_init)。thread_pool_submit)。sleep(5))。thread_pool_destroy)。关键点:
| 函数名 | 作用 |
|---|---|
worker_thread | 工作线程的主函数,负责从任务队列中取出任务并执行。 |
thread_pool_init | 初始化线程池,创建线程并初始化任务队列、互斥锁和条件变量。 |
thread_pool_submit | 向线程池提交任务,唤醒一个工作线程处理任务。 |
thread_pool_destroy | 销毁线程池,通知所有线程退出并释放资源。 |
example_task | 示例任务函数,用于测试线程池。 |
main | 主函数,用于测试线程池的功能。 |
c#define THREAD_POOL_SIZE 4
#define QUEUE_SIZE 100
THREAD_POOL_SIZE:定义线程池中线程的数量,这里设置为 4。QUEUE_SIZE:定义任务队列的最大容量,这里设置为 100。task_t)ctypedef struct {
void (*function)(void *);
void *argument;
} task_t;
function:指向任务函数的指针,线程会执行这个函数。argument:任务函数的参数,传递给 function。void * 参数,支持任意类型的任务。task_queue_t)ctypedef struct {
task_t tasks[QUEUE_SIZE];
int head;
int tail;
int count;
pthread_mutex_t lock;
pthread_cond_t notify;
} task_queue_t;
tasks:固定大小的数组,用于存储任务。head:指向队列头部,用于取出任务。tail:指向队列尾部,用于插入任务。count:当前队列中的任务数量。lock:互斥锁,保护队列的并发访问。notify:条件变量,用于通知等待的线程有新任务到来。lock)和条件变量(notify)确保多线程环境下任务队列的安全访问。head、tail 和 count 管理任务的插入和取出。thread_pool_t)ctypedef struct {
pthread_t threads[THREAD_POOL_SIZE];
task_queue_t queue;
int shutdown;
} thread_pool_t;
threads:线程数组,存储线程池中的所有线程。queue:任务队列,用于存储待执行的任务。shutdown:线程池关闭标志,用于优雅地关闭线程池。threads 数组管理线程池中的所有线程。queue 实现任务的调度和执行。shutdown 标志实现线程池的安全关闭,避免任务丢失或线程阻塞。任务封装:
task_t 封装任务函数和参数,方便任务的传递和执行。任务队列:
线程池:
资源控制:
优雅关闭:
shutdown 标志实现线程池的安全关闭,确保所有任务都能被处理。初始化线程池:
THREAD_POOL_SIZE 个线程,每个线程从任务队列中取出任务并执行。添加任务:
task_t,插入到任务队列中。执行任务:
关闭线程池:
shutdown 标志,等待所有线程退出。这种设计通过任务封装、环形队列、线程管理和资源控制,实现了一个高效、线程安全的线程池。它适用于需要并发处理大量任务的场景,例如网络服务器、数据处理等。
本文作者:Ryohei010
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!