编辑
2025-03-18
linux开发
0
请注意,本文编写于 297 天前,最后修改于 297 天前,其中某些信息可能已经过时。

目录

用户层线程池
函数功能梳理
1. void worker_thread(void arg)
2. int threadpoolinit(threadpoolt pool)**
3. void threadpoolsubmit(threadpoolt pool, void (function)(void ), void arg)
4. void threadpooldestroy(threadpoolt pool)**
5. void example_task(void arg)**
6. int main()
总结
结构体梳理
1. 宏定义
功能
设计原因
2. 任务结构体(task_t)
功能
设计原因
3. 任务队列结构体(taskqueuet)
功能
设计原因
4. 线程池结构体(threadpoolt)
功能
设计原因
整体设计思路
示例流程
总结

用户层线程池

这是一个用户层线程池的示例代码

c
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> #define THREAD_POOL_SIZE 4 #define QUEUE_SIZE 100 typedef struct { void (*function)(void *); void *argument; } task_t; typedef struct { task_t tasks[QUEUE_SIZE]; int head; int tail; int count; pthread_mutex_t lock; pthread_cond_t notify; } task_queue_t; typedef struct { pthread_t threads[THREAD_POOL_SIZE]; task_queue_t queue; int shutdown; } thread_pool_t; void *worker_thread(void *arg) { thread_pool_t *pool = (thread_pool_t *)arg; while (1) { pthread_mutex_lock(&(pool->queue.lock)); while (pool->queue.count == 0 && !pool->shutdown) { pthread_cond_wait(&(pool->queue.notify), &(pool->queue.lock)); } if (pool->shutdown) { pthread_mutex_unlock(&(pool->queue.lock)); pthread_exit(NULL); } task_t task = pool->queue.tasks[pool->queue.head]; pool->queue.head = (pool->queue.head + 1) % QUEUE_SIZE; pool->queue.count--; pthread_mutex_unlock(&(pool->queue.lock)); (*(task.function))(task.argument); } return NULL; } int thread_pool_init(thread_pool_t *pool) { pool->queue.head = 0; pool->queue.tail = 0; pool->queue.count = 0; pool->shutdown = 0; pthread_mutex_init(&(pool->queue.lock), NULL); pthread_cond_init(&(pool->queue.notify), NULL); for (int i = 0; i < THREAD_POOL_SIZE; i++) { if (pthread_create(&(pool->threads[i]), NULL, worker_thread, (void *)pool) != 0) { return -1; } } return 0; } void thread_pool_submit(thread_pool_t *pool, void (*function)(void *), void *arg) { pthread_mutex_lock(&(pool->queue.lock)); if (pool->queue.count == QUEUE_SIZE) { pthread_mutex_unlock(&(pool->queue.lock)); return; } pool->queue.tasks[pool->queue.tail].function = function; pool->queue.tasks[pool->queue.tail].argument = arg; pool->queue.tail = (pool->queue.tail + 1) % QUEUE_SIZE; pool->queue.count++; pthread_cond_signal(&(pool->queue.notify)); pthread_mutex_unlock(&(pool->queue.lock)); } void thread_pool_destroy(thread_pool_t *pool) { pool->shutdown = 1; pthread_mutex_lock(&(pool->queue.lock)); pthread_cond_broadcast(&(pool->queue.notify)); pthread_mutex_unlock(&(pool->queue.lock)); for (int i = 0; i < THREAD_POOL_SIZE; i++) { pthread_join(pool->threads[i], NULL); } pthread_mutex_destroy(&(pool->queue.lock)); pthread_cond_destroy(&(pool->queue.notify)); } // Example task function void example_task(void *arg) { int *num = (int *)arg; printf("Task executed with argument: %d\n", *num); sleep(1); } int main() { thread_pool_t pool; if (thread_pool_init(&pool) != 0) { fprintf(stderr, "Failed to initialize thread pool\n"); return 1; } int task_args[10]; for (int i = 0; i < 10; i++) { task_args[i] = i; thread_pool_submit(&pool, example_task, &task_args[i]); } sleep(5); // Wait for tasks to complete thread_pool_destroy(&pool); return 0; }

编译方式如下: gcc -o thread_pool thread_pool.c -lpthread

函数功能梳理

以下是各个函数的功能

1. void *worker_thread(void *arg)

作用:工作线程的主函数,负责从任务队列中取出任务并执行。

详细说明

  • 每个工作线程都会运行这个函数。
  • 它在一个无限循环中不断检查任务队列:
    • 如果任务队列为空且线程池未关闭,线程会调用 pthread_cond_wait 进入等待状态。
    • 如果任务队列中有任务,线程会取出任务并执行任务的函数。
  • 如果线程池被关闭(shutdown 标志为 1),线程会退出。

关键点

  • 使用 pthread_cond_wait 等待任务。
  • 使用 pthread_mutex_lockpthread_mutex_unlock 保护对任务队列的访问。

2. int thread_pool_init(thread_pool_t *pool)

作用:初始化线程池。

详细说明

  • 初始化任务队列的成员(headtailcount)。
  • 初始化互斥锁(pthread_mutex_init)和条件变量(pthread_cond_init)。
  • 创建指定数量的工作线程(pthread_create),并将它们绑定到 worker_thread 函数。

关键点

  • 如果线程创建失败,返回 -1 表示初始化失败。
  • 成功时返回 0

3. void thread_pool_submit(thread_pool_t *pool, void (*function)(void *), void *arg)

作用:向线程池提交任务。

详细说明

  • 将任务(函数指针和参数)添加到任务队列中。
  • 如果任务队列已满,直接返回(不添加任务)。
  • 添加任务后,调用 pthread_cond_signal 唤醒一个等待的工作线程。

关键点

  • 使用 pthread_mutex_lockpthread_mutex_unlock 保护对任务队列的访问。
  • 使用 pthread_cond_signal 通知工作线程有新任务。

4. void thread_pool_destroy(thread_pool_t *pool)

作用:销毁线程池。

详细说明

  • 设置 shutdown 标志为 1,通知所有工作线程退出。
  • 调用 pthread_cond_broadcast 唤醒所有等待的工作线程。
  • 等待所有工作线程退出(pthread_join)。
  • 销毁互斥锁和条件变量。

关键点

  • 确保所有线程安全退出。
  • 释放资源(互斥锁和条件变量)。

5. void example_task(void *arg)

作用:示例任务函数,用于测试线程池。

详细说明

  • 这是一个简单的任务函数,接收一个整数参数并打印它。
  • 用于演示如何向线程池提交任务。

关键点

  • 你可以根据需要实现自己的任务函数。

6. int main()

作用:主函数,用于测试线程池。

详细说明

  • 初始化线程池(thread_pool_init)。
  • 提交 10 个示例任务到线程池(thread_pool_submit)。
  • 等待任务执行完成(sleep(5))。
  • 销毁线程池(thread_pool_destroy)。

关键点

  • 演示了如何使用线程池。

总结

函数名作用
worker_thread工作线程的主函数,负责从任务队列中取出任务并执行。
thread_pool_init初始化线程池,创建线程并初始化任务队列、互斥锁和条件变量。
thread_pool_submit向线程池提交任务,唤醒一个工作线程处理任务。
thread_pool_destroy销毁线程池,通知所有线程退出并释放资源。
example_task示例任务函数,用于测试线程池。
main主函数,用于测试线程池的功能。

结构体梳理


1. 宏定义

c
#define THREAD_POOL_SIZE 4 #define QUEUE_SIZE 100

功能

  • THREAD_POOL_SIZE:定义线程池中线程的数量,这里设置为 4。
  • QUEUE_SIZE:定义任务队列的最大容量,这里设置为 100。

设计原因

  • 控制资源:通过宏定义限制线程数量和任务队列的大小,避免资源过度占用。
  • 灵活性:宏定义可以方便地调整线程池的规模,适应不同的应用场景。

2. 任务结构体(task_t

c
typedef struct { void (*function)(void *); void *argument; } task_t;

功能

  • function:指向任务函数的指针,线程会执行这个函数。
  • argument:任务函数的参数,传递给 function

设计原因

  • 封装任务:将任务函数和参数封装在一起,方便管理和传递。
  • 通用性:通过函数指针和 void * 参数,支持任意类型的任务。

3. 任务队列结构体(task_queue_t

c
typedef struct { task_t tasks[QUEUE_SIZE]; int head; int tail; int count; pthread_mutex_t lock; pthread_cond_t notify; } task_queue_t;

功能

  • tasks:固定大小的数组,用于存储任务。
  • head:指向队列头部,用于取出任务。
  • tail:指向队列尾部,用于插入任务。
  • count:当前队列中的任务数量。
  • lock:互斥锁,保护队列的并发访问。
  • notify:条件变量,用于通知等待的线程有新任务到来。

设计原因

  • 环形队列:使用固定大小的数组实现环形队列,避免动态内存分配的开销。
  • 线程安全:通过互斥锁(lock)和条件变量(notify)确保多线程环境下任务队列的安全访问。
  • 任务管理:通过 headtailcount 管理任务的插入和取出。

4. 线程池结构体(thread_pool_t

c
typedef struct { pthread_t threads[THREAD_POOL_SIZE]; task_queue_t queue; int shutdown; } thread_pool_t;

功能

  • threads:线程数组,存储线程池中的所有线程。
  • queue:任务队列,用于存储待执行的任务。
  • shutdown:线程池关闭标志,用于优雅地关闭线程池。

设计原因

  • 线程管理:通过 threads 数组管理线程池中的所有线程。
  • 任务调度:通过 queue 实现任务的调度和执行。
  • 优雅关闭:通过 shutdown 标志实现线程池的安全关闭,避免任务丢失或线程阻塞。

整体设计思路

  1. 任务封装

    • 使用 task_t 封装任务函数和参数,方便任务的传递和执行。
  2. 任务队列

    • 使用固定大小的数组实现环形队列,避免动态内存分配的开销。
    • 通过互斥锁和条件变量确保线程安全。
  3. 线程池

    • 使用固定数量的线程处理任务,避免频繁创建和销毁线程的开销。
    • 通过任务队列实现任务的调度和执行。
  4. 资源控制

    • 通过宏定义限制线程数量和任务队列的大小,避免资源过度占用。
  5. 优雅关闭

    • 通过 shutdown 标志实现线程池的安全关闭,确保所有任务都能被处理。

示例流程

  1. 初始化线程池

    • 创建 THREAD_POOL_SIZE 个线程,每个线程从任务队列中取出任务并执行。
  2. 添加任务

    • 将任务封装为 task_t,插入到任务队列中。
    • 如果队列已满,等待空闲位置。
  3. 执行任务

    • 线程从任务队列中取出任务,执行任务函数。
  4. 关闭线程池

    • 设置 shutdown 标志,等待所有线程退出。

总结

这种设计通过任务封装、环形队列、线程管理和资源控制,实现了一个高效、线程安全的线程池。它适用于需要并发处理大量任务的场景,例如网络服务器、数据处理等。

本文作者:Ryohei010

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!