NPS Pthread Pool
|
The NPS Pthread Pool implementation. More...
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <sched.h>
#include <time.h>
#include <string.h>
#include <pthread.h>
#include "thr_pool.h"
Data Structures | |
struct | job |
Linked list of queued jobs. More... | |
struct | thr_pool |
State for a Thread Pool. More... | |
Defines | |
#define | DEBUG_THR_POOL 0 |
debug level | |
#define | dbugf(a, s) |
Prints to stderr "TID: function:line <string>\n". | |
#define | dbugdf(a, s, d) |
Prints to stderr "TID: function:line <string> <integer>\n". | |
#define | dbugg(a, f,...) |
Prints to stderr "TID: function:line <format string>\n". | |
#define | THR_POOL_COPY_MODE 0x01 |
flag: job input is copied | |
#define | THR_POOL_DESTROY_REQUEST 0x01 |
destroying pool | |
#define | THR_POOL_WAKE_WORKER 0x02 |
idle worker has been signaled | |
#define | THR_POOL_WORKER_WOKE 0x04 |
ack from worker that woke | |
#define | THR_POOL_INITIALIZED (THR_POOL_LAST - 1) |
The "initialized" module state. | |
#define | THR_POOL_LOCK_ERROR (THR_POOL_LAST - 2) |
Internal error: a lock-related error. | |
#define | THR_POOL_LOCK_POOL_ERROR (THR_POOL_LAST - 3) |
Internal error: a lock-related error that affects the pool. | |
#define | THR_POOL_PTHREAD_ERROR (THR_POOL_LAST - 4) |
Internal error: a pthread-related error. | |
#define | THR_POOL_PTHREAD_POOL_ERROR (THR_POOL_LAST - 5) |
Internal error: a pthread-related error that affects the pool. | |
#define | THR_POOL_MEM_ERROR (THR_POOL_LAST - 6) |
Internal error: a memory-related error. | |
#define | THR_POOL_MEM_POOL_ERROR (THR_POOL_LAST - 7) |
Internal error: a memory-related error that affects the pool. | |
Typedefs | |
typedef struct job | job_t |
Simplifies defining linked lists. | |
typedef struct thr_pool | thr_pool_t |
Simplifies defining lists of pools. | |
typedef void(* | push_fn_t )(void *) |
Simplifies casting function pointers. | |
Functions | |
static int | thrint_create_worker (thr_pool_t *pool) |
static void * | thrint_worker_function (void *arg) |
static int | thrint_error_check (int error) |
static int | thrint_pool_destroy (thr_pool_t *pool) |
static void | thrint_job_free_handler (job_t *job) |
static void | thrint_job_cleanup_handler (thr_pool_t *pool) |
static void | thrint_worker_cleanup_handler (thr_pool_t *pool) |
static void | thrint_lock_all (void) |
static void | thrint_unlock_all (void) |
static void | thrint_fork_child_handler (void) |
int | thr_pool_create (const unsigned int pool_min, const unsigned int pool_max, const unsigned int timeout, const pthread_attr_t *attr, thr_pool_handle_t *pool_handle) |
int | thr_pool_queue (const thr_pool_handle_t pool_handle, const int mode, const size_t len, void *(*func)(void *), void *arg) |
int | thr_pool_free (void) |
int | thr_pool_destroy (const thr_pool_handle_t pool_handle) |
int | thr_pool_init (void) |
Variables | |
static int | thr_pools_state = THR_POOL_NOT_INITIALIZED |
Module state. One of: THR_POOL_INITIALIZED, THR_POOL_NOT_INITIALIZED, THR_POOL_UNRECOV_ERROR. | |
static int | thr_pools_atfork = 0 |
Module state: remembers if the atfork handler was installed. | |
static thr_pool_t * | thr_pools = NULL |
List of pools. | |
static thr_pool_handle_t | next_pool_handle = -1 |
Next valid handle. | |
static sigset_t | fillset |
A signal set filled with all signals. | |
static pthread_rwlock_t | thr_pools_lock = PTHREAD_RWLOCK_INITIALIZER |
Module lock. Protects: job::next pointers, thr_pools, thr_pools_state, next_pool_handle. |
The NPS Pthread Pool implementation.
#define dbugdf | ( | a, | |
s, | |||
d | |||
) |
do { \ if (a) { \ fprintf(stderr, "%5ld: ", pthread_self()); \ fprintf(stderr, "%s:%d ", __FUNCTION__, __LINE__); \ fprintf(stderr, "%s %d\n", s, d); \ fflush(stderr); \ } \ } while(0)
Prints to stderr "TID: function:line <string> <integer>\n".
#define dbugf | ( | a, | |
s | |||
) |
do { \ if (a) { \ fprintf(stderr, "%5ld: ", pthread_self()); \ fprintf(stderr, "%s:%d ", __FUNCTION__, __LINE__); \ fprintf(stderr, "%s\n", s); \ fflush(stderr); \ } \ } while(0)
Prints to stderr "TID: function:line <string>\n".
#define dbugg | ( | a, | |
f, | |||
... | |||
) |
do { \ if (a) { \ fprintf(stderr, "%5ld: ", pthread_self()); \ fprintf(stderr, "%s:%d ", __FUNCTION__, __LINE__); \ fprintf(stderr, f, __VA_ARGS__); \ fprintf(stderr, "\n"); \ fflush(stderr); \ } \ } while(0)
Prints to stderr "TID: function:line <format string>\n".
#define DEBUG_THR_POOL 0 |
debug level
#define THR_POOL_COPY_MODE 0x01 |
flag: job input is copied
#define THR_POOL_DESTROY_REQUEST 0x01 |
destroying pool
#define THR_POOL_INITIALIZED (THR_POOL_LAST - 1) |
The "initialized" module state.
#define THR_POOL_LOCK_ERROR (THR_POOL_LAST - 2) |
Internal error: a lock-related error.
#define THR_POOL_LOCK_POOL_ERROR (THR_POOL_LAST - 3) |
Internal error: a lock-related error that affects the pool.
#define THR_POOL_MEM_ERROR (THR_POOL_LAST - 6) |
Internal error: a memory-related error.
#define THR_POOL_MEM_POOL_ERROR (THR_POOL_LAST - 7) |
Internal error: a memory-related error that affects the pool.
#define THR_POOL_PTHREAD_ERROR (THR_POOL_LAST - 4) |
Internal error: a pthread-related error.
#define THR_POOL_PTHREAD_POOL_ERROR (THR_POOL_LAST - 5) |
Internal error: a pthread-related error that affects the pool.
#define THR_POOL_WAKE_WORKER 0x02 |
idle worker has been signaled
#define THR_POOL_WORKER_WOKE 0x04 |
ack from worker that woke
typedef void(* push_fn_t)(void *) |
Simplifies casting function pointers.
typedef struct thr_pool thr_pool_t |
Simplifies defining lists of pools.
int thr_pool_create | ( | const unsigned int | pool_min, |
const unsigned int | pool_max, | ||
const unsigned int | timeout, | ||
const pthread_attr_t * | attr, | ||
thr_pool_handle_t * | pool_handle | ||
) |
Create and initialize the thread pool.
[in] | pool_min | The minimum number of threads in the pool. If no jobs are present for the pool, these threads are present, but idle. |
[in] | pool_max | The maximum number of active threads allowed in the pool; pool_max should be non-zero, no less than pool_min, and no greater than THR_POOL_MAX_THREADS. |
[in] | timeout | A value, interpreted as seconds, at most THR_POOL_MAX_TIMEOUT. If the number of active threads in the thread pool exceeds pool_min, any thread that becomes idle will wait for timeout seconds; if no tasks are present in the queue or entered in the queue during this time, the thread will be removed from the pool and the system resources for the thread will be reclaimed. |
[in] | attr | Pointer to the POSIX thread attribute used to specify the attributes of any thread created in the thread pool. If NULL, the default attributes of pthread_create() are used. |
[out] | pool_handle | A handle for the thread pool. |
int thr_pool_destroy | ( | const thr_pool_handle_t | pool_handle | ) |
Cancels all queued jobs, kills all active threads, and frees all internal module resources related to the thread pool.
[in] | pool_handle | The handle for the thread pool. |
int thr_pool_free | ( | void | ) |
Cancels all queued jobs, kills all active threads, and frees all internal module resources. All thread pools being managed are destroyed.
int thr_pool_init | ( | void | ) |
Initialize the thread management module data.
int thr_pool_queue | ( | const thr_pool_handle_t | pool_handle, |
const int | mode, | ||
const size_t | len, | ||
void *(*)(void *) | func, | ||
void * | arg | ||
) |
If there are idle threads in the pool, awaken one to perform the job. If there are no idle threads but the maximum number of threads in the pool has not been reached, create a new thread to perform the job. If no idle threads exist and the pool already has the maximum number of threads, then add the job to the queue and return success; some existing worker thread will perform the job when it becomes idle. If mode is 1, the len bytes at arg will be copied locally, and the job (when executed) will use the local copy of arg.
[in] | pool_handle | The handle for the thread pool. |
[in] | mode | Can take any value. If 1, it indicates arg should be (shallow) copied. This is useful in many scenarios, e.g. when arg is a pointer to something on the caller's stack. If arguments are not copied, the caller must ensure arg persists until a thread is scheduled for the job, and throughout the duration of that job. |
[in] | len | The length of arg. |
[in] | func | A pointer to the job function. |
[in,out] | arg | A pointer to the arguments for the job function. If argument is copied when enqueued, it cannot be used as an output argument. |
static int thrint_create_worker | ( | thr_pool_t * | pool | ) | [static] |
Creates a worker thread.
[in] | pool | A pointer to our pool |
static int thrint_error_check | ( | int | error | ) | [static] |
Some module errors require actions to be taken on their behalf, such as changing the module state to the unrecoverable error state (THR_POOL_UNRECOV_ERROR). This function handles those conditions, and translates the internal errors into sensible, external errors that may be returned to the caller.
[in] | error | An error code |
static void thrint_fork_child_handler | ( | void | ) | [static] |
A post-fork handler for the child. Puts the child's copy of the module into a sensible state.
static void thrint_job_cleanup_handler | ( | thr_pool_t * | pool | ) | [static] |
A cleanup handler, executed by each worker thread upon completing a job. Takes the calling thread out of the list of active threads.
[in] | pool | A pointer to our pool. |
static void thrint_job_free_handler | ( | job_t * | job | ) | [static] |
A cleanup handler, executed by each worker thread upon completing a job. Frees the resources associated with the job.
[in] | job | A pointer to our job. |
static void thrint_lock_all | ( | void | ) | [static] |
A pre-fork handler. Holds all locks, so they don't enter a bad state during a fork.
static int thrint_pool_destroy | ( | thr_pool_t * | pool | ) | [static] |
Cancels all queued jobs, kills all active threads, and frees all internal module resources related to the thread pool.
[in] | pool | A pointer to the thread pool. |
static void thrint_unlock_all | ( | void | ) | [static] |
A post-fork handler for the parent. Returns all locks.
static void thrint_worker_cleanup_handler | ( | thr_pool_t * | pool | ) | [static] |
A cleanup handler, executed by each worker thread upon exiting. Takes the calling thread out of the thread pool. If the caller exited prematurely, it creates a new worker to replace it. If the caller exited as a response to the pool being destroyed, it signals anyone waiting (i.e., anyone who called thr_pool_destroy()) if the pool is now empty.
[in] | pool | A pointer to our pool. |
static void * thrint_worker_function | ( | void * | arg | ) | [static] |
The main loop for a worker thread.
[in] | arg | A pointer to our pool. |
sigset_t fillset [static] |
A signal set filled with all signals.
thr_pool_handle_t next_pool_handle = -1 [static] |
Next valid handle.
thr_pool_t* thr_pools = NULL [static] |
List of pools.
int thr_pools_atfork = 0 [static] |
Module state: remembers if the atfork handler was installed.
pthread_rwlock_t thr_pools_lock = PTHREAD_RWLOCK_INITIALIZER [static] |
Module lock. Protects: job::next pointers, thr_pools, thr_pools_state, next_pool_handle.
int thr_pools_state = THR_POOL_NOT_INITIALIZED [static] |
Module state. One of: THR_POOL_INITIALIZED, THR_POOL_NOT_INITIALIZED, THR_POOL_UNRECOV_ERROR.