NPS Pthread Pool
Data Structures | Defines | Typedefs | Functions | Variables
thr_pool.c File Reference

The NPS Pthread Pool implementation. More...

#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <sched.h>
#include <time.h>
#include <string.h>
#include <pthread.h>
#include "thr_pool.h"
Include dependency graph for thr_pool.c:

Data Structures

struct  job
 Linked list of queued jobs. More...
struct  thr_pool
 State for a Thread Pool. More...

Defines

#define DEBUG_THR_POOL   0
 debug level
#define dbugf(a, s)
 Prints to stderr "TID: function:line <string>\n".
#define dbugdf(a, s, d)
 Prints to stderr "TID: function:line <string> <integer>\n".
#define dbugg(a, f,...)
 Prints to stderr "TID: function:line <formatted string>\n".
#define THR_POOL_COPY_MODE   0x01
 flag: job input is copied
#define THR_POOL_DESTROY_REQUEST   0x01
 destroying pool
#define THR_POOL_WAKE_WORKER   0x02
 idle worker has been signaled
#define THR_POOL_WORKER_WOKE   0x04
 ack from worker that woke
#define THR_POOL_INITIALIZED   (THR_POOL_LAST - 1)
 The "initialized" module state.
#define THR_POOL_LOCK_ERROR   (THR_POOL_LAST - 2)
 Internal error: a lock-related error.
#define THR_POOL_LOCK_POOL_ERROR   (THR_POOL_LAST - 3)
 Internal error: a lock-related error that affects the pool.
#define THR_POOL_PTHREAD_ERROR   (THR_POOL_LAST - 4)
 Internal error: a pthread-related error.
#define THR_POOL_PTHREAD_POOL_ERROR   (THR_POOL_LAST - 5)
 Internal error: a pthread-related error that affects the pool.
#define THR_POOL_MEM_ERROR   (THR_POOL_LAST - 6)
 Internal error: a memory-related error.
#define THR_POOL_MEM_POOL_ERROR   (THR_POOL_LAST - 7)
 Internal error: a memory-related error that affects the pool.

Typedefs

typedef struct job job_t
 Simplifies defining linked lists.
typedef struct thr_pool thr_pool_t
 Simplifies defining lists of pools.
typedef void(* push_fn_t )(void *)
 Simplifies casting function pointers.

Functions

static int thrint_create_worker (thr_pool_t *pool)
static void * thrint_worker_function (void *arg)
static int thrint_error_check (int error)
static int thrint_pool_destroy (thr_pool_t *pool)
static void thrint_job_free_handler (job_t *job)
static void thrint_job_cleanup_handler (thr_pool_t *pool)
static void thrint_worker_cleanup_handler (thr_pool_t *pool)
static void thrint_lock_all (void)
static void thrint_unlock_all (void)
static void thrint_fork_child_handler (void)
int thr_pool_create (const unsigned int pool_min, const unsigned int pool_max, const unsigned int timeout, const pthread_attr_t *attr, thr_pool_handle_t *pool_handle)
int thr_pool_queue (const thr_pool_handle_t pool_handle, const int mode, const size_t len, void *(*func)(void *), void *arg)
int thr_pool_free (void)
int thr_pool_destroy (const thr_pool_handle_t pool_handle)
int thr_pool_init (void)

Variables

static int thr_pools_state = THR_POOL_NOT_INITIALIZED
 Module state. One of: THR_POOL_INITIALIZED, THR_POOL_NOT_INITIALIZED, THR_POOL_UNRECOV_ERROR.
static int thr_pools_atfork = 0
 Module state: remembers if the atfork handler was installed.
static thr_pool_t * thr_pools = NULL
 List of pools.
static thr_pool_handle_t next_pool_handle = -1
 Next valid handle.
static sigset_t fillset
 A signal set filled with all signals.
static pthread_rwlock_t thr_pools_lock = PTHREAD_RWLOCK_INITIALIZER
 Module lock. Protects: job::next pointers, thr_pools, thr_pools_state, next_pool_handle.

Detailed Description

The NPS Pthread Pool implementation.


Define Documentation

#define dbugdf (   a,
  s,
  d 
)
Value:
do { \
    if (a) { \
        fprintf(stderr, "%5ld: ", pthread_self()); \
        fprintf(stderr, "%s:%d ", __FUNCTION__, __LINE__); \
        fprintf(stderr, "%s %d\n", s, d); \
        fflush(stderr); \
    } \
} while(0)

Prints to stderr "TID: function:line <string> <integer>\n" (the location is taken from __FUNCTION__ and __LINE__).

#define dbugf (   a,
  s 
)
Value:
do { \
    if (a) { \
        fprintf(stderr, "%5ld: ", pthread_self()); \
        fprintf(stderr, "%s:%d ", __FUNCTION__, __LINE__); \
        fprintf(stderr, "%s\n", s); \
        fflush(stderr); \
    } \
} while(0)

Prints to stderr "TID: function:line <string>\n" (the location is taken from __FUNCTION__ and __LINE__).

#define dbugg (   a,
  f,
  ... 
)
Value:
do { \
    if (a) { \
        fprintf(stderr, "%5ld: ", pthread_self()); \
        fprintf(stderr, "%s:%d ", __FUNCTION__, __LINE__); \
        fprintf(stderr, f, __VA_ARGS__); \
        fprintf(stderr, "\n"); \
        fflush(stderr); \
    } \
} while(0)

Prints to stderr "TID: function:line <formatted string>\n" (the location is taken from __FUNCTION__ and __LINE__).

#define DEBUG_THR_POOL   0

debug level

#define THR_POOL_COPY_MODE   0x01

flag: job input is copied

#define THR_POOL_DESTROY_REQUEST   0x01

destroying pool

#define THR_POOL_INITIALIZED   (THR_POOL_LAST - 1)

The "initialized" module state.

#define THR_POOL_LOCK_ERROR   (THR_POOL_LAST - 2)

Internal error: a lock-related error.

#define THR_POOL_LOCK_POOL_ERROR   (THR_POOL_LAST - 3)

Internal error: a lock-related error that affects the pool.

#define THR_POOL_MEM_ERROR   (THR_POOL_LAST - 6)

Internal error: a memory-related error.

#define THR_POOL_MEM_POOL_ERROR   (THR_POOL_LAST - 7)

Internal error: a memory-related error that affects the pool.

#define THR_POOL_PTHREAD_ERROR   (THR_POOL_LAST - 4)

Internal error: a pthread-related error.

#define THR_POOL_PTHREAD_POOL_ERROR   (THR_POOL_LAST - 5)

Internal error: a pthread-related error that affects the pool.

#define THR_POOL_WAKE_WORKER   0x02

idle worker has been signaled

#define THR_POOL_WORKER_WOKE   0x04

ack from worker that woke


Typedef Documentation

typedef struct job job_t

Simplifies defining linked lists.

typedef void(* push_fn_t)(void *)

Simplifies casting function pointers.

typedef struct thr_pool thr_pool_t

Simplifies defining lists of pools.


Function Documentation

int thr_pool_create ( const unsigned int  pool_min,
const unsigned int  pool_max,
const unsigned int  timeout,
const pthread_attr_t *  attr,
thr_pool_handle_t *  pool_handle 
)

Create and initialize the thread pool.

Parameters:
[in] pool_min: The minimum number of threads in the pool. If no jobs are present for the pool, these threads are present, but idle.
[in] pool_max: The maximum number of active threads allowed in the queue; pool_max should be non-zero, no less than pool_min, and no greater than THR_POOL_MAX_THREADS.
[in] timeout: A value, interpreted as seconds, at most THR_POOL_MAX_TIMEOUT; if the number of active threads in the thread pool is in excess of pool_min, any thread that becomes idle will wait for timeout seconds. If no tasks are present in the queue or entered in the queue during this time, the thread will be removed from the pool and the system resources for the thread will be reclaimed.
[in] attr: Pointer to the POSIX thread attribute used to specify the attributes of any thread created in the thread pool. If NULL, the default attributes of pthread_create() are used.
[out] pool_handle: A handle for the thread pool.
Returns:
An indicator of whether or not the operation was successful. On success: THR_POOL_SUCCESS is returned. On error: THR_POOL_BAD_INPUT, THR_POOL_RESOURCE_ERROR, THR_POOL_NOT_INITIALIZED, THR_POOL_ERROR, or THR_POOL_UNRECOV_ERROR is returned.

Here is the call graph for this function:

int thr_pool_destroy ( const thr_pool_handle_t  pool_handle)

Cancels all queued jobs, kills all active threads, and frees all internal module resources related to the thread pool.

Parameters:
[in] pool_handle: The handle for the thread pool.
Returns:
An indicator of whether or not the operation was successful. On success: THR_POOL_SUCCESS is returned. On error: THR_POOL_INVALID_HANDLE, THR_POOL_NOT_INITIALIZED, THR_POOL_ERROR, or THR_POOL_UNRECOV_ERROR is returned.

Here is the call graph for this function:

int thr_pool_free ( void  )

Cancels all queued jobs, kills all active threads, and frees all internal module resources. All thread pools being managed are destroyed.

Returns:
An indicator of whether or not the operation was successful. On success: THR_POOL_SUCCESS is returned. On error: THR_POOL_NOT_INITIALIZED or THR_POOL_UNRECOV_ERROR is returned.

Here is the call graph for this function:

int thr_pool_init ( void  )

Initialize the thread management module data.

Returns:
An indicator of whether or not the operation was successful. On success: THR_POOL_SUCCESS is returned. On error: THR_POOL_RESOURCE_ERROR, THR_POOL_ERROR, or THR_POOL_UNRECOV_ERROR is returned.

Here is the call graph for this function:

int thr_pool_queue ( const thr_pool_handle_t  pool_handle,
const int  mode,
const size_t  len,
void *(*)(void *)  func,
void *  arg 
)

If there are idle threads in the pool, awaken one to perform the job. If there are no idle threads but the maximum number of threads in the pool has not been reached, create a new thread to perform the job. If no idle threads exist and the pool already has the maximum number of threads, then add the job to the queue and return success; some existing worker thread will perform the job when it becomes idle. If mode is 1, the len bytes at arg will be copied locally, and the job (when executed) will use the local copy of arg.

Parameters:
[in] pool_handle: The handle for the thread pool.
[in] mode: Can take any value. If 1, it indicates arg should be (shallow) copied. This is useful in many scenarios, e.g. when arg is a pointer to something on the caller's stack. If arguments are not copied, the caller must ensure arg persists until a thread is scheduled for the job, and throughout the duration of that job.
[in] len: The length of arg.
[in] func: A pointer to the job function.
[in,out] arg: A pointer to the arguments for the job function. If the argument is copied when enqueued, it cannot be used as an output argument.
Returns:
An indicator of whether or not the operation was successful. On success: THR_POOL_SUCCESS is returned. On error: THR_POOL_BAD_INPUT, THR_POOL_RESOURCE_ERROR, THR_POOL_INVALID_HANDLE, THR_POOL_NOT_INITIALIZED, THR_POOL_ERROR, or THR_POOL_UNRECOV_ERROR is returned.

Here is the call graph for this function:

static int thrint_create_worker ( thr_pool_t *  pool) [static]

Creates a worker thread.

Precondition:
Caller must acquire lock to pool.
Parameters:
[in] pool: A pointer to our pool.
Returns:
An indicator of whether or not the operation was successful. On success: THR_POOL_SUCCESS is returned. On error: THR_POOL_BAD_INPUT, or THR_POOL_PTHREAD_POOL_ERROR is returned.

Here is the call graph for this function:

Here is the caller graph for this function:

static int thrint_error_check ( int  error) [static]

Some module errors require actions to be taken on their behalf, such as changing the module state to the unrecoverable-error state (THR_POOL_UNRECOV_ERROR). This function handles those conditions, and translates the internal errors into sensible, external errors that may be returned to the caller.

Precondition:
Unlike other internal functions, this function acquires the module lock; caller should not already hold a module lock.
Parameters:
[in] error: An error code.
Returns:
A constant that corresponds in meaning to error; if error is not a meaningful error, it is returned unchanged.

Here is the caller graph for this function:

static void thrint_fork_child_handler ( void  ) [static]

A post-fork handler for the child. Puts the child's copy of the module into a sensible state.

Returns:
None.

Here is the call graph for this function:

Here is the caller graph for this function:

static void thrint_job_cleanup_handler ( thr_pool_t *  pool) [static]

A cleanup handler, executed by each worker thread upon completing a job. Takes the calling thread out of the list of active threads.

Postcondition:
Leaves pool->lock acquired.
Parameters:
[in] pool: A pointer to our pool.
Returns:
None.

Here is the call graph for this function:

Here is the caller graph for this function:

static void thrint_job_free_handler ( job_t *  job) [static]

A cleanup handler, executed by each worker thread upon completing a job. Frees the resources associated with the job.

Parameters:
[in] job: A pointer to our job.
Returns:
None

Here is the caller graph for this function:

static void thrint_lock_all ( void  ) [static]

A pre-fork handler. Holds all locks, so they don't enter a bad state during a fork.

Returns:
None.

Here is the call graph for this function:

Here is the caller graph for this function:

static int thrint_pool_destroy ( thr_pool_t *  pool) [static]

Cancels all queued jobs, kills all active threads, and frees all internal module resources related to the thread pool.

Parameters:
[in] pool: A pointer to the thread pool.
Returns:
An indicator of whether or not the operation was successful. On success: THR_POOL_SUCCESS is returned. On error: THR_POOL_LOCK_POOL_ERROR or THR_POOL_PTHREAD_POOL_ERROR is returned.

Here is the call graph for this function:

Here is the caller graph for this function:

static void thrint_unlock_all ( void  ) [static]

A post-fork handler for the parent. Returns all locks.

Returns:
None

Here is the call graph for this function:

Here is the caller graph for this function:

static void thrint_worker_cleanup_handler ( thr_pool_t *  pool) [static]

A cleanup handler, executed by each worker thread upon exiting. Takes the calling thread out of the thread pool. If the caller exited prematurely, it creates a new worker to replace it. If the caller exited as a response to the pool being destroyed, it signals anyone waiting (ie, anyone who called thr_pool_destroy()) in the case the pool is now empty.

Precondition:
Caller must have acquired pool->lock.
Parameters:
[in] pool: A pointer to our pool.
Returns:
None

Here is the call graph for this function:

Here is the caller graph for this function:

static void * thrint_worker_function ( void *  arg) [static]

The main loop for a worker thread.

Parameters:
[in] arg: A pointer to our pool.
Returns:
None.

Here is the call graph for this function:

Here is the caller graph for this function:


Variable Documentation

sigset_t fillset [static]

A signal set filled with all signals.

thr_pool_handle_t next_pool_handle = -1 [static]

Next valid handle.

thr_pool_t* thr_pools = NULL [static]

List of pools.

int thr_pools_atfork = 0 [static]

Module state: remembers if the atfork handler was installed.

pthread_rwlock_t thr_pools_lock = PTHREAD_RWLOCK_INITIALIZER [static]

Module lock. Protects: job::next pointers, thr_pools, thr_pools_state, next_pool_handle.

int thr_pools_state = THR_POOL_NOT_INITIALIZED [static]

Module state. One of: THR_POOL_INITIALIZED, THR_POOL_NOT_INITIALIZED, THR_POOL_UNRECOV_ERROR.


Generated for NPS Pthread Pool by  doxygen