C语言-线程池代码

简介: github 地址:常用的C工具代码,这里的工具包含了C语言实现的线程池,hashtable,list,md5,字符串操作,消息队列等很多常用的工具,我这里就不一一说明了,感兴趣的朋友可以自行下载研究,工作中肯定用得上。

说明

github 地址:常用的C工具代码,这里的工具包含了C语言实现的线程池,hashtable,list,md5,字符串操作,消息队列等很多常用的工具,我这里就不一一说明了,感兴趣的朋友可以自行下载研究,工作中肯定用得上。


这里的代码是直接拷贝的,直接复制到自己的项目就能使用。


如果你对线程池不是很熟悉,或者只需要一个仅支持 Linux 的线程池,可以参考我另一篇博客中的简单版 Linux 线程池。


代码

thread-pool.h头文件

#ifndef _threadpool_h_
#define _threadpool_h_
#ifdef __cplusplus
extern "C" {
#endif
/* Handle to a thread pool; the value is produced by thread_pool_create(). */
typedef void* thread_pool_t;
/**
 * @brief thread_pool_create Create a thread pool.
 * @param num number of threads started when the pool is created
 * @param min minimum number of threads
 * @param max maximum number of threads
 * @return 0-error, other-thread pool id
 */
thread_pool_t thread_pool_create(int num, int min, int max);
/**
 * @brief thread_pool_destroy Destroy a thread pool.
 * @param pool handle returned by thread_pool_create()
 */
void thread_pool_destroy(thread_pool_t pool);
/**
 * @brief Get the number of threads currently in the pool.
 * @param pool handle returned by thread_pool_create()
 * @return <0-error code, >=0-thread count
 */
int thread_pool_threads_count(thread_pool_t pool);
/// Task callback: invoked with the param given to thread_pool_push().
typedef void (*thread_pool_proc)(void *param);
/**
 * @brief thread_pool_push Queue one task on the thread pool.
 * @param pool thread pool handle
 * @param proc function pointer of the task
 * @param param user-defined argument passed to proc
 * @return =0-ok, <0-error code
 */
int thread_pool_push(thread_pool_t pool, thread_pool_proc proc, void *param);
#ifdef __cplusplus
}
#endif
#endif /* !_threadpool_h_ */

依赖头文件

locker.h

#ifndef _platform_locker_h_
#define _platform_locker_h_
#include <errno.h>
#if defined(WIN32)
#include <Windows.h>
typedef CRITICAL_SECTION  locker_t;
#else
#include <pthread.h>
typedef pthread_mutex_t  locker_t;
#endif
//-------------------------------------------------------------------------------------
// int locker_create(locker_t* locker);
// int locker_destroy(locker_t* locker);
// int locker_lock(locker_t* locker);
// int locker_unlock(locker_t* locker);
// int locker_trylock(locker_t* locker);
//-------------------------------------------------------------------------------------
/**
 * Initialize a recursive lock.
 *
 * Windows: wraps InitializeCriticalSection and always returns 0.
 * Elsewhere: creates a recursive pthread mutex.
 *
 * @param locker storage for the lock to initialize
 * @return 0 on success, otherwise the error code of the failing pthread call
 */
static inline int locker_create(locker_t* locker)
{
#if defined(WIN32)
  InitializeCriticalSection(locker);
  return 0;
#else
  // create a recursive locker
  int r;
  pthread_mutexattr_t attr;
  r = pthread_mutexattr_init(&attr);
  if(0 != r)
    return r; // attr is unusable, never hand it to pthread_mutex_init
  // http://linux.die.net/man/3/pthread_mutexattr_settype
  // Application Usage:
  // It is advised that an application should not use a PTHREAD_MUTEX_RECURSIVE mutex
  // with condition variables because the implicit unlock performed for a pthread_cond_timedwait()
  // or pthread_cond_wait() may not actually release the mutex (if it had been locked multiple times).
  // If this happens, no other thread can satisfy the condition of the predicate.
#if defined(OS_LINUX) && defined(__GLIBC__)
  r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP);
#else
  r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
#endif
  // Only build the mutex when the recursive type was accepted; a silently
  // non-recursive lock would deadlock callers that re-enter it.
  if(0 == r)
    r = pthread_mutex_init(locker, &attr);
  pthread_mutexattr_destroy(&attr);
  return r;
#endif
}
/**
 * Release the resources held by a lock created with locker_create().
 * @return 0 on Windows; the pthread_mutex_destroy() result elsewhere
 */
static inline int locker_destroy(locker_t* locker)
{
#if !defined(WIN32)
  const int rc = pthread_mutex_destroy(locker);
  return rc;
#else
  DeleteCriticalSection(locker);
  return 0;
#endif
}
/**
 * Acquire the lock, blocking until it is available.
 * @return 0 on Windows; the pthread_mutex_lock() result elsewhere
 */
static inline int locker_lock(locker_t* locker)
{
#if !defined(WIN32)
  // pthread_mutex_lock shall not return [EINTR], so no retry loop is needed.
  const int rc = pthread_mutex_lock(locker);
  return rc;
#else
  EnterCriticalSection(locker);
  return 0;
#endif
}
/**
 * Release the lock.
 * On Linux the unlocking thread must be the thread that locked it.
 * @return 0 on Windows; the pthread_mutex_unlock() result elsewhere
 */
static inline int locker_unlock(locker_t* locker)
{
#if !defined(WIN32)
  const int rc = pthread_mutex_unlock(locker);
  return rc;
#else
  LeaveCriticalSection(locker);
  return 0;
#endif
}
/**
 * Try to acquire the lock without blocking.
 * @return 0 when the lock was taken. On failure Windows yields -1 while the
 *         pthread build yields the pthread_mutex_trylock() error code.
 */
static inline int locker_trylock(locker_t* locker)
{
#if !defined(WIN32)
  const int rc = pthread_mutex_trylock(locker);
  return rc;
#else
  if(TryEnterCriticalSection(locker))
    return 0;
  return -1;
#endif
}
#endif /* !_platform_locker_h_ */

system.h

#ifndef _platform_system_h_
#define _platform_system_h_
#include <stdint.h>
#if defined(WIN32)
#include <Windows.h>
typedef HMODULE module_t;
typedef uint32_t useconds_t;
typedef FARPROC funcptr_t;
#else
#include <sys/types.h>
#include <sys/time.h>
#include <sys/utsname.h>
#include <unistd.h>
#include <pthread.h>
#include <dlfcn.h>
#include <errno.h>
#include <time.h>
typedef void* module_t;
typedef void (*funcptr_t)(void);
#endif
#if defined(OS_MAC)
#include <sys/param.h>
#include <sys/sysctl.h>
#include <mach/mach_time.h>  
#endif
#include <stdint.h>
#include <stdio.h>
//-----------------------------------------------------------------------
// void system_sleep(useconds_t millisecond);
// uint64_t system_time(void);
// uint64_t system_clock(void);
// int64_t system_getcyclecount(void);
// size_t system_getcpucount(void);
//
// int system_version(int* major, int* minor);
// module_t system_load(const char* module);
// int system_unload(module_t module);
// funcptr_t system_getproc(module_t module, const char* producer);
//-----------------------------------------------------------------------
//
///
/// implement
///
//
/**
 * Suspend the calling thread for the given number of milliseconds.
 *
 * The non-Windows branch uses nanosleep() rather than usleep(ms*1000):
 * the multiplication overflows useconds_t for long sleeps, and POSIX allows
 * usleep() to reject arguments of one second or more.
 * As before, the sleep may end early when a signal is delivered.
 *
 * @param milliseconds time to sleep
 */
static inline void system_sleep(useconds_t milliseconds)
{
#if defined(WIN32)
  Sleep(milliseconds);
#else
  struct timespec ts;
  ts.tv_sec = (time_t)(milliseconds / 1000);
  ts.tv_nsec = (long)(milliseconds % 1000) * 1000000L;
  nanosleep(&ts, NULL);
#endif
}
/**
 * Number of processors reported by the operating system.
 *
 * @return the processor count; on the non-Windows branches a failed or
 *         non-positive query is reported as 1 instead of being cast to a
 *         huge size_t
 */
static inline size_t system_getcpucount(void)
{
#if defined(WIN32)
  SYSTEM_INFO sysinfo;
  GetSystemInfo(&sysinfo);
  return sysinfo.dwNumberOfProcessors;
#elif defined(OS_MAC) || defined(_FREEBSD_) || defined(_NETBSD_) || defined(_OPENBSD_)
  // FreeBSD, MacOS X, NetBSD, OpenBSD, etc.:
  int mib[4];
  size_t num;
  size_t len;
  mib[0] = CTL_HW;
  mib[1] = HW_AVAILCPU; // alternatively, try HW_NCPU;
  num = 0;
  len = sizeof(num);
  sysctl(mib, 2, &num, &len, NULL, 0);
  if(num < 1)
  {
    mib[1] = HW_NCPU;
    num = 0;
    len = sizeof(num); // the first call may have modified len
    sysctl(mib, 2, &num, &len, NULL, 0);
    if(num < 1)
      num = 1;
  }
  return num;
#elif defined(_HPUX_)
  // HPUX:
  long n = mpctl(MPC_GETNUMSPUS, NULL, NULL);
  return n < 1 ? 1 : (size_t)n;
#elif defined(_IRIX_)
  // IRIX:
  long n = sysconf(_SC_NPROC_ONLN);
  return n < 1 ? 1 : (size_t)n;
#else
  // linux, Solaris, & AIX
  // sysconf() returns -1 on error; never let that become SIZE_MAX.
  long n = sysconf(_SC_NPROCESSORS_ONLN);
  return n < 1 ? 1 : (size_t)n;
  //"cat /proc/cpuinfo | grep processor | wc -l"
#endif
}
/**
 * Read a high-resolution tick counter.
 *
 * Windows: returns the raw QueryPerformanceCounter() value (previously the
 * counter was read and then discarded). Returns 0 if the query fails.
 * Other platforms: not implemented, always returns 0.
 *
 * @return counter ticks, or 0 when unavailable
 */
static inline int64_t system_getcyclecount(void)
{
#if defined(WIN32)
  LARGE_INTEGER count;
  if(!QueryPerformanceCounter(&count))
    return 0;
  return (int64_t)count.QuadPart;
#else
  return 0;
#endif
}
/// Wall-clock time in milliseconds since the Epoch (1970-01-01 00:00:00 +0000 (UTC)).
///
/// The former OS_MAC branch returned mach_absolute_time(), which counts from
/// boot rather than from the Epoch; every non-Windows platform now uses
/// clock_gettime(CLOCK_REALTIME) when available and gettimeofday() otherwise.
static inline uint64_t system_time(void)
{
#if defined(WIN32)
  uint64_t t;
  FILETIME ft;
  GetSystemTimeAsFileTime(&ft);
  // FILETIME counts 100ns intervals since Jan 1, 1601
  t = (uint64_t)ft.dwHighDateTime << 32 | ft.dwLowDateTime;
  return t / 10000 - 11644473600000ULL; /* Jan 1, 1601 */
#elif defined(CLOCK_REALTIME)
  struct timespec tp;
  clock_gettime(CLOCK_REALTIME, &tp);
  return (uint64_t)tp.tv_sec * 1000 + (uint64_t)tp.tv_nsec / 1000000;
#else
  // POSIX.1-2008 marks gettimeofday() as obsolete, recommending the use of clock_gettime(2) instead.
  struct timeval tv;
  gettimeofday(&tv, NULL);
  return (uint64_t)tv.tv_sec * 1000 + (uint64_t)tv.tv_usec / 1000;
#endif
}
/// Relative clock for measuring intervals.
///@return milliseconds(relative time)
static inline uint64_t system_clock(void)
{
#if defined(WIN32)
  LARGE_INTEGER ticks_per_sec;
  LARGE_INTEGER ticks;
  QueryPerformanceFrequency(&ticks_per_sec);
  QueryPerformanceCounter(&ticks);
  return (uint64_t)ticks.QuadPart * 1000 / ticks_per_sec.QuadPart;
#elif defined(OS_MAC)
  mach_timebase_info_data_t info;
  uint64_t now;
  now = mach_absolute_time();
  mach_timebase_info(&info);
  // ticks -> nanoseconds -> milliseconds
  return now * info.numer / info.denom / 1000000;
#elif defined(CLOCK_MONOTONIC)
  struct timespec mono;
  clock_gettime(CLOCK_MONOTONIC, &mono);
  return (uint64_t)mono.tv_sec * 1000 + mono.tv_nsec / 1000000;
#else
  // POSIX.1-2008 marks gettimeofday() as obsolete, recommending the use of clock_gettime(2) instead.
  struct timeval wall;
  gettimeofday(&wall, NULL);
  return (uint64_t)wall.tv_sec * 1000 + wall.tv_usec / 1000;
#endif
}
#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable: 4996) // GetVersionEx
#pragma warning(disable: 28159)
#endif
/**
 * Query the operating system version.
 *
 * Windows: reads the major/minor numbers from GetVersionEx().
 * Elsewhere: parses the leading "major.minor" of uname()'s release string.
 *
 * @param major receives the major version
 * @param minor receives the minor version
 * @return 0 on success; on non-Windows errno when uname() fails, or -1 when
 *         the release string does not start with two numbers
 */
static inline int system_version(int* major, int* minor)
{
#if defined(WIN32)
  /*
  Operating system  Version number
  Windows 8    6.2
  Windows Server 2012  6.2
  Windows 7    6.1
  Windows Server 2008 R2  6.1
  Windows Server 2008  6.0
  Windows Vista   6.0
  Windows Server 2003 R2  5.2
  Windows Server 2003  5.2
  Windows XP    5.1
  Windows 2000    5.0
  Windows Me    4.90
  Windows 98    4.10
  Windows NT 4.0    4.0
  Windows 95    4.0
  */
  OSVERSIONINFO info;
  memset(&info, 0, sizeof(info));
  info.dwOSVersionInfoSize = sizeof(info);
  GetVersionEx(&info);
  *major = (int)(info.dwMajorVersion);
  *minor = (int)(info.dwMinorVersion);
  return 0;
#else
  struct utsname names;
  int fields;
  if(uname(&names) != 0)
    return errno;
  fields = sscanf(names.release, "%8d.%8d", major, minor);
  return (2 == fields) ? 0 : -1;
#endif
}
#if defined(_MSC_VER)
#pragma warning(pop)
#endif
//
///
/// dynamic module load/unload
///
//
/**
 * Load a dynamic module (shared library / DLL).
 * @param module path or name of the module
 * @return module handle as returned by dlopen() / LoadLibraryExA()
 */
static inline module_t system_load(const char* module)
{
#if !defined(WIN32)
  const int flags = RTLD_LAZY | RTLD_LOCAL;
  return dlopen(module, flags);
#else
  return LoadLibraryExA(module, NULL, LOAD_WITH_ALTERED_SEARCH_PATH);
#endif
}
/**
 * Unload a module obtained from system_load().
 * @return the raw result of dlclose() (non-Windows) or FreeLibrary() (Windows).
 *         NOTE(review): these two APIs use opposite success conventions, so
 *         callers cannot test the result portably - confirm intended contract.
 */
static inline int system_unload(module_t module)
{
#if !defined(WIN32)
  const int rc = dlclose(module);
  return rc;
#else
  return FreeLibrary(module);
#endif
}
/**
 * Look up an exported symbol in a loaded module.
 * @param module handle from system_load()
 * @param producer symbol name
 * @return the symbol address as a generic function pointer
 */
static inline funcptr_t system_getproc(module_t module, const char* producer)
{
#if !defined(WIN32)
  // https://linux.die.net/man/3/dlsym
  // cosine = (double (*)(double)) dlsym(handle, "cos")
  // ===> *(void **) (&cosine) = dlsym(handle, "cos");
  void* symbol = dlsym(module, producer);
  return (funcptr_t)symbol;
#else
  return GetProcAddress(module, producer);
#endif
}
#endif /* !_platform_system_h_ */

thread.h

#ifndef _platform_thread_h_
#define _platform_thread_h_
#if defined(WIN32)
#include <Windows.h>
#include <process.h>
#ifndef STDCALL
#define STDCALL __stdcall
#endif
/* Windows stand-in for pthread_t: pairs the thread id with its handle. */
typedef struct
{
  DWORD id;      /* thread id (filled in by thread_create2 / thread_self) */
  HANDLE handle; /* thread handle (closed by thread_destroy / thread_detach) */
} pthread_t;
typedef DWORD tid_t;
#else
#include <pthread.h>
#include <sched.h>
typedef pthread_t tid_t;
#ifndef STDCALL
#define STDCALL
#endif
/* Priority constants for the non-Windows build; the values lie in the
 * 1 (low) .. 99 (high) range used by the real-time scheduling policies. */
enum thread_priority
{
  THREAD_PRIORITY_IDLE    = 1,
  THREAD_PRIORITY_LOWEST    = 25,
  THREAD_PRIORITY_BELOW_NORMAL  = 40,
  THREAD_PRIORITY_NORMAL    = 50,
  THREAD_PRIORITY_ABOVE_NORMAL  = 60,
  THREAD_PRIORITY_HIGHEST   = 75,
  THREAD_PRIORITY_TIME_CRITICAL = 99,
};
#endif
//-------------------------------------------------------------------------------------
// int thread_create(pthread_t* thread, thread_proc func, void* param);
// int thread_destroy(pthread_t thread);
// int thread_detach(pthread_t thread);
// int thread_getpriority(pthread_t thread, int* priority);
// int thread_setpriority(pthread_t thread, int priority);
// int thread_isself(pthread_t thread);
// int thread_valid(pthread_t thread);
// int thread_yield(void);
// tid_t thread_getid(pthread_t thread);
// pthread_t thread_self(void);
//-------------------------------------------------------------------------------------
typedef int (STDCALL *thread_proc)(void* param);
/**
 * Create a thread with an explicit stack size.
 *
 * @param thread    receives the new thread's identity
 * @param stacksize requested stack size in bytes; 0 selects the platform default
 * @param func      thread entry point
 * @param param     argument passed to func
 * @return 0 on success; -1 on Windows failure, otherwise the pthread error code
 */
static inline int thread_create2(pthread_t* thread, unsigned int stacksize, thread_proc func, void* param)
{
#if defined(WIN32)
  // https://msdn.microsoft.com/en-us/library/windows/desktop/ms682453.aspx
  // CreateThread function: By default, every thread has one megabyte of stack space.
  // http://msdn.microsoft.com/en-us/library/windows/desktop/ms682453%28v=vs.85%29.aspx
  // A thread in an executable that calls the C run-time library (CRT)
  // should use the _beginthreadex and _endthreadex functions for thread management
  // rather than CreateThread and ExitThread;
  typedef unsigned int(__stdcall *thread_routine)(void *);
  thread->handle = (HANDLE)_beginthreadex(NULL, stacksize, (thread_routine)func, param, 0, (unsigned int*)&thread->id);
  return NULL == thread->handle ? -1 : 0;
#else
  // https://linux.die.net/man/3/pthread_create
  // On Linux/x86-32, the default stack size for a new thread is 2 megabytes(10M 64bits)
  // http://udrepper.livejournal.com/20948.html
  // mallopt(M_ARENA_MAX, cpu); // limit multithread virtual memory
  typedef void* (*linux_thread_routine)(void*);
  int r;
  pthread_attr_t attr;
  r = pthread_attr_init(&attr);
  if(0 != r)
    return r; // never pass an uninitialized attr to pthread_create
  // Only override the stack size when one was requested. A rejected size is
  // still ignored (best effort, as before) and the default stack is used.
  if(0 != stacksize)
    pthread_attr_setstacksize(&attr, stacksize);
  r = pthread_create(thread, &attr, (linux_thread_routine)func, param);
  pthread_attr_destroy(&attr);
  return r;
#endif
}
/**
 * Create a thread with the default stack size.
 * @return the thread_create2() result
 */
static inline int thread_create(pthread_t* thread, thread_proc func, void* param)
{
  const unsigned int default_stack = 0; // 0 is forwarded as the stack size
  return thread_create2(thread, default_stack, func, param);
}
/**
 * Wait for a thread to finish and release it.
 * When called on the current thread, it does not wait: the pthread build
 * detaches instead of joining, the Windows build only closes the handle.
 * @return 0 on Windows; the pthread_join()/pthread_detach() result elsewhere
 */
static inline int thread_destroy(pthread_t thread)
{
#if !defined(WIN32)
  void* exit_value = NULL;
  if(!pthread_equal(pthread_self(), thread))
    return pthread_join(thread, &exit_value);
  return pthread_detach(thread);
#else
  const int is_self = (thread.id == GetCurrentThreadId());
  if(!is_self)
    WaitForSingleObjectEx(thread.handle, INFINITE, TRUE);
  CloseHandle(thread.handle);
  return 0;
#endif
}
/**
 * Detach a thread so that it does not need to be joined.
 * @return 0 on Windows (the handle is closed); the pthread_detach() result elsewhere
 */
static inline int thread_detach(pthread_t thread)
{
#if !defined(WIN32)
  const int rc = pthread_detach(thread);
  return rc;
#else
  CloseHandle(thread.handle);
  return 0;
#endif
}
// Windows priority: [-15, 15]
// 0: normal / -15: idle / 15: critical
// The pthread build reports sched_priority from pthread_getschedparam().
// *priority is written only on success.
static inline int thread_getpriority(pthread_t thread, int* priority)
{
#if !defined(WIN32)
  int policy;
  struct sched_param sched;
  const int rc = pthread_getschedparam(thread, &policy, &sched);
  if(0 != rc)
    return rc;
  *priority = sched.sched_priority;
  return 0;
#else
  const int level = GetThreadPriority(thread.handle);
  if(THREAD_PRIORITY_ERROR_RETURN == level)
    return (int)GetLastError();
  *priority = level;
  return 0;
#endif
}
/**
 * Change a thread's scheduling priority.
 *
 * pthread build: keeps the thread's current policy. The priority is applied
 * only under SCHED_FIFO / SCHED_RR; other policies require 0.
 *
 * @param thread   target thread
 * @param priority requested priority
 * @return pthread build: 0 on success, otherwise the pthread error code.
 *         Windows build: 1 on success, 0 on failure.
 *         NOTE(review): the Windows convention is the opposite of the
 *         pthread one - confirm before relying on it in portable code.
 */
static inline int thread_setpriority(pthread_t thread, int priority)
{
#if defined(WIN32)
  BOOL r = SetThreadPriority(thread.handle, priority);
  return TRUE==r?1:0;
#else
  int r;
  int policy = SCHED_RR;
  struct sched_param sched;
  // Bail out if the current parameters cannot be read: continuing would
  // pass an uninitialized sched_param to pthread_setschedparam().
  r = pthread_getschedparam(thread, &policy, &sched);
  if(0 != r)
    return r;
  // For processes scheduled under one of the normal scheduling policies
  // (SCHED_OTHER, SCHED_IDLE, SCHED_BATCH),
  // sched_priority is not used in scheduling decisions (it must be specified as 0).
  // Processes scheduled under one of the real-time policies(SCHED_FIFO, SCHED_RR)
  // have a sched_priority value in the range 1 (low)to 99 (high)
  sched.sched_priority = (SCHED_FIFO==policy || SCHED_RR==policy) ? priority : 0;
  return pthread_setschedparam(thread, policy, &sched);
#endif
}
/**
 * Identity of the calling thread.
 * Windows: the handle field holds the GetCurrentThread() value.
 */
static inline pthread_t thread_self(void)
{
#if !defined(WIN32)
  const pthread_t me = pthread_self();
  return me;
#else
  pthread_t me;
  me.id = GetCurrentThreadId();
  me.handle = GetCurrentThread();
  return me;
#endif
}
/**
 * Thread id of the given thread.
 * Windows: the id stored at creation time; elsewhere the pthread_t itself.
 */
static inline tid_t thread_getid(pthread_t thread)
{
#if !defined(WIN32)
  const tid_t id = thread;
  return id;
#else
  //return GetThreadId(thread.handle); // >= vista
  const tid_t id = thread.id;
  return id;
#endif
}
/**
 * Test whether `thread` is the calling thread.
 * @return non-zero when it is, 0 otherwise
 */
static inline int thread_isself(pthread_t thread)
{
#if !defined(WIN32)
  const pthread_t me = pthread_self();
  return pthread_equal(me, thread);
#else
  if(thread.id == GetCurrentThreadId())
    return 1;
  return 0;
#endif
}
/**
 * Cheap validity check: a zero id / zero pthread_t is treated as "no thread".
 * @return 1 when the value is non-zero, 0 otherwise
 */
static inline int thread_valid(pthread_t thread)
{
#if !defined(WIN32)
  if(0 != thread)
    return 1;
  return 0;
#else
  if(0 != thread.id)
    return 1;
  return 0;
#endif
}
/**
 * Give up the processor to another runnable thread.
 * @return Windows: 0 if SwitchToThread() switched, -1 otherwise;
 *         elsewhere the sched_yield() result
 */
static inline int thread_yield(void)
{
#if !defined(WIN32)
  const int rc = sched_yield();
  return rc;
#else
  // Causes the calling thread to yield execution to another thread that is ready to run
  // on the current processor. The operating system selects the next thread to be executed.
  if(SwitchToThread())
    return 0;
  return -1;
#endif
}
#if defined(WIN32_XP)
/* Local declarations of the types needed to call NtQueryInformationThread
 * (resolved from ntdll.dll at run time by thread_getid_xp). */
typedef DWORD KPRIORITY;
/* Process/thread id pair embedded in THREAD_BASIC_INFORMATION. */
typedef struct _CLIENT_ID
{
  PVOID UniqueProcess;
  PVOID UniqueThread;
} CLIENT_ID, *PCLIENT_ID;
/* Output buffer filled by NtQueryInformationThread for information class 0. */
typedef struct _THREAD_BASIC_INFORMATION
{
  NTSTATUS                ExitStatus;
  PVOID                   TebBaseAddress;
  CLIENT_ID               ClientId;
  KAFFINITY               AffinityMask;
  KPRIORITY               Priority;
  KPRIORITY               BasePriority;
} THREAD_BASIC_INFORMATION, *PTHREAD_BASIC_INFORMATION;
/* Signature of the NtQueryInformationThread entry point. */
typedef NTSTATUS(__stdcall *NtQueryInformationThread)(HANDLE ThreadHandle, int ThreadInformationClass, PVOID ThreadInformation, ULONG ThreadInformationLength, PULONG ReturnLength);
/**
 * Get the thread id for a thread handle by calling NtQueryInformationThread
 * from ntdll.dll (alternative to GetThreadId, which needs Vista or later).
 *
 * @param handle thread handle
 * @return the thread id, or 0 when ntdll.dll or the entry point cannot be
 *         resolved or the query leaves the zeroed buffer untouched
 */
static inline tid_t thread_getid_xp(HANDLE handle)
{
  // NT_TIB* tib = (NT_TIB*)__readfsdword(0x18);
  HMODULE module;
  NtQueryInformationThread fp;
  THREAD_BASIC_INFORMATION tbi;
  memset(&tbi, 0, sizeof(tbi));
  module = GetModuleHandleA("ntdll.dll");
  if(NULL == module)
    return 0;
  fp = (NtQueryInformationThread)GetProcAddress(module, "NtQueryInformationThread");
  if(NULL == fp)
    return 0; // calling through a NULL pointer would crash
  fp(handle, 0/*ThreadBasicInformation*/, &tbi, sizeof(tbi), NULL);
  // go through ULONG_PTR so the pointer-to-DWORD narrowing is explicit on 64-bit
  return (tid_t)(ULONG_PTR)tbi.ClientId.UniqueThread;
}
#endif
#endif /* !_platform_thread_h_ */

event.h

#ifndef _platform_event_h_
#define _platform_event_h_
#if defined(WIN32)
#include <Windows.h>
typedef HANDLE  event_t;
#else
#include <sys/time.h> // gettimeofday
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h> // clock_gettime
/* Non-Windows event object: a condition variable plus a flag, both guarded
 * by the mutex. */
typedef struct
{
  int count; // set to 1 by event_signal and cleared by the wait/reset calls, so a signal sent before the wait is not lost (fixes pthread_cond_signal/pthread_cond_wait call order)
  pthread_cond_t event;
  pthread_mutex_t mutex;
} event_t;
#ifndef WAIT_TIMEOUT
#define WAIT_TIMEOUT  ETIMEDOUT
#endif
#endif
/// event: Windows Event/Linux condition variable
/// multi-processor: no
//-------------------------------------------------------------------------------------
// int event_create(event_t* event);
// int event_destroy(event_t* event);
// int event_wait(event_t* event);
// int event_timewait(event_t* event, int timeout);
// int event_signal(event_t* event);
// int event_reset(event_t* event);
//-------------------------------------------------------------------------------------
/**
 * Create an auto-reset event that starts non-signaled.
 *
 * Windows: wraps CreateEvent.
 * Elsewhere: initializes the mutex, condition variable and flag of event_t.
 * With OS_LINUX, CLOCK_MONOTONIC and __USE_XOPEN2K defined, the condition
 * variable is bound to CLOCK_MONOTONIC.
 *
 * @param event storage for the event
 * @return 0 on success, otherwise GetLastError() / the pthread error code;
 *         on failure nothing is left initialized
 */
static inline int event_create(event_t* event)
{
#if defined(WIN32)
  HANDLE h = CreateEvent(NULL, FALSE, FALSE, NULL); // auto-reset, initially non-signaled
  if(NULL==h)
    return (int)GetLastError();
  *event = h;
  return 0;
#else
  int r;
  pthread_mutexattr_t mutex;
#if defined(OS_LINUX) && defined(CLOCK_MONOTONIC) && defined(__USE_XOPEN2K)
  pthread_condattr_t attr;
#endif
  r = pthread_mutexattr_init(&mutex);
  if(0 != r)
    return r;
#if defined(OS_LINUX)
  pthread_mutexattr_settype(&mutex, PTHREAD_MUTEX_RECURSIVE_NP);
#else
  pthread_mutexattr_settype(&mutex, PTHREAD_MUTEX_RECURSIVE);
#endif
  r = pthread_mutex_init(&event->mutex, &mutex);
  pthread_mutexattr_destroy(&mutex); // the attribute object is no longer needed
  if(0 != r)
    return r;
#if defined(OS_LINUX) && defined(CLOCK_MONOTONIC) && defined(__USE_XOPEN2K)
  r = pthread_condattr_init(&attr);
  if(0 == r)
  {
    // event_timewait computes deadlines on CLOCK_MONOTONIC in this
    // configuration, so a condvar on another clock must not be created.
    r = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
    if(0 == r)
      r = pthread_cond_init(&event->event, &attr);
    pthread_condattr_destroy(&attr);
  }
#else
  r = pthread_cond_init(&event->event, NULL);
#endif
  if(0 != r)
  {
    pthread_mutex_destroy(&event->mutex); // do not leak the mutex on failure
    return r;
  }
  event->count = 0;
  return 0;
#endif
}
/**
 * Destroy an event created by event_create().
 * The pthread build retries every millisecond while the condition variable
 * reports EBUSY, then destroys the mutex.
 * @return 0 on success, otherwise GetLastError() / the pthread_cond_destroy() result
 */
static inline int event_destroy(event_t* event)
{
#if !defined(WIN32)
  int rc;
  for(;;)
  {
    rc = pthread_cond_destroy(&event->event);
    if(EBUSY != rc)
      break;
    usleep(1000);
  }
  pthread_mutex_destroy(&event->mutex);
  return rc;
#else
  if(CloseHandle(*event))
    return 0;
  return (int)GetLastError();
#endif
}
/**
 * Block until the event is signaled, then reset it (auto-reset semantics).
 *
 * The pthread build waits in a loop: pthread_cond_wait() may wake up
 * spuriously, so only the flag set by event_signal() counts as a signal.
 *
 * @return 0-success, other-error
 */
static inline int event_wait(event_t* event)
{
#if defined(WIN32)
  DWORD r = WaitForSingleObjectEx(*event, INFINITE, TRUE);
  return WAIT_FAILED==r ? GetLastError() : r;
#else
  int r = 0;
  pthread_mutex_lock(&event->mutex);
  while(0 == event->count && 0 == r)
    r = pthread_cond_wait(&event->event, &event->mutex); // These functions shall not return an error code of [EINTR].
  if(0 != event->count)
  {
    // consume the signal; a pending signal wins over a wait error
    event->count = 0;
    r = 0;
  }
  pthread_mutex_unlock(&event->mutex);
  return r;
#endif
}
/**
 * Wait for the event with a timeout, resetting it when it was signaled.
 *
 * pthread build:
 *  - the deadline uses the same clock selection as event_create();
 *  - the wait is a loop, because pthread_cond_timedwait() may wake spuriously;
 *  - a signal that arrives while the timeout fires is returned as success
 *    instead of being cleared and reported as a timeout.
 *
 * @param event   event to wait on
 * @param timeout timeout in milliseconds
 * @return 0-success, WAIT_TIMEOUT-timeout, other-error
 */
static inline int event_timewait(event_t* event, int timeout)
{
#if defined(WIN32)
  DWORD r = WaitForSingleObjectEx(*event, timeout, TRUE);
  return WAIT_FAILED==r ? GetLastError() : r;
#else
  int r = 0;
  struct timespec ts;
#if defined(OS_LINUX) && defined(CLOCK_REALTIME)
#if defined(CLOCK_MONOTONIC) && defined(__USE_XOPEN2K)
  clock_gettime(CLOCK_MONOTONIC, &ts);
#else
  clock_gettime(CLOCK_REALTIME, &ts);
#endif
  ts.tv_sec += timeout/1000;
  ts.tv_nsec += (timeout%1000)*1000000;
#else
  struct timeval tv;
  gettimeofday(&tv, NULL);
  ts.tv_sec = tv.tv_sec + timeout/1000;
  ts.tv_nsec = tv.tv_usec * 1000 + (timeout%1000)*1000000;
#endif
  // tv_nsec >= 1000000000 ==> EINVAL
  ts.tv_sec += ts.tv_nsec / 1000000000;
  ts.tv_nsec %= 1000000000;
  pthread_mutex_lock(&event->mutex);
  while(0 == event->count && 0 == r)
    r = pthread_cond_timedwait(&event->event, &event->mutex, &ts); // These functions shall not return an error code of [EINTR].
  if(0 != event->count)
  {
    // The event was signaled (possibly just as the timeout fired):
    // consume it and report success so the signal is not lost.
    event->count = 0;
    r = 0;
  }
  pthread_mutex_unlock(&event->mutex);
  return r;
#endif
}
/**
 * Signal the event.
 * pthread build: sets the flag under the mutex and signals the condition variable.
 * @return 0 on success, otherwise GetLastError() / the pthread_cond_signal() result
 */
static inline int event_signal(event_t* event)
{
#if !defined(WIN32)
  int rc;
  pthread_mutex_lock(&event->mutex);
  event->count = 1;
  rc = pthread_cond_signal(&event->event);
  pthread_mutex_unlock(&event->mutex);
  return rc;
#else
  if(SetEvent(*event))
    return 0;
  return (int)GetLastError();
#endif
}
/**
 * Put the event back into the non-signaled state.
 * @return 0 on success; on Windows GetLastError() when ResetEvent() fails
 */
static inline int event_reset(event_t* event)
{
#if !defined(WIN32)
  pthread_mutex_lock(&event->mutex);
  event->count = 0; // drop any pending signal
  pthread_mutex_unlock(&event->mutex);
  return 0;
#else
  if(ResetEvent(*event))
    return 0;
  return (int)GetLastError();
#endif
}
#endif /* !_platform_event_h_ */

thread-pool.c源文件

#include "thread-pool.h"
#include "locker.h"
#include "system.h"
#include "thread.h"
#include "event.h"
#include <stdlib.h>
#include <string.h>
#include <assert.h>
struct _thread_pool_context_t;
// Singly linked list node describing one worker thread
typedef struct _thread_list_t
{
    struct _thread_list_t *next; // next thread in the list
    struct _thread_pool_context_t *pool; // pool that owns this thread
    pthread_t thread; // thread id and handle
} thread_list_t;
// Singly linked list node describing one queued task
typedef struct _thread_task_list_t
{
    struct _thread_task_list_t *next; // next task
    thread_pool_proc proc; // task function to execute
    void *param; // argument passed to proc
} thread_task_list_t;
// State of one thread pool
typedef struct _thread_pool_context_t
{
    int run; // running flag: 1 while the pool is alive, cleared by thread_pool_destroy
    int idle_max; // a worker exits when thread_count_idle exceeds this value
    int threshold; // set to num / 2 at creation; not read elsewhere in this file
    int thread_count; // number of threads in the pool
    int thread_count_min; // minimum number of threads (stored at creation)
    int thread_count_max; // maximum number of threads
    int thread_count_idle; // number of idle threads
    int task_count; // number of queued tasks
    thread_task_list_t *tasks; // task list
    thread_task_list_t *recycle_tasks; // finished task nodes kept for reuse (avoids repeated allocation)
    thread_list_t *task_threads; // list of all threads in the pool
    locker_t locker; // lock
    event_t event; // event object / condition variable
} thread_pool_context_t;
static void thread_pool_destroy_thread(thread_pool_context_t *context);
// Worker thread body (identical for every thread in the pool).
// param is the thread_list_t node of this worker. The pool lock is held
// except while a task runs or while waiting on the event.
// Always returns 0.
static int STDCALL thread_pool_worker(void *param)
{
    thread_list_t* threads;
    thread_task_list_t *task;
    thread_pool_context_t *context;
    threads = (thread_list_t*)param;
    context = threads->pool;
    locker_lock(&context->locker);
    while(context->run) // keep going while the pool is running
    {
        // pop task
        task = context->tasks;
        while(task && context->run) // drain tasks for as long as there are any and the pool is running
        {
            // remove task from task list
            context->tasks = task->next;
            --context->task_count;
            // do task procedure
            --context->thread_count_idle;
            locker_unlock(&context->locker);
            task->proc(task->param); // run the task without holding the lock
            locker_lock(&context->locker);
            ++context->thread_count_idle;
            // recycle task: push task to recycle list
            task->next = context->recycle_tasks;
            context->recycle_tasks = task;
            // do next
            task = context->tasks;
        }
        // delete idle thread
        if(context->thread_count_idle > context->idle_max
                || !context->run)
            break;
        // wait for task
        locker_unlock(&context->locker);
        event_timewait(&context->event, 60*1000); // wait up to 1 minute for a task signal; on signal or timeout, loop and re-check the queue
        locker_lock(&context->locker);
    }
    // leaving: update the counters and remove this thread's node from the list
    --context->thread_count;
    --context->thread_count_idle;
    thread_pool_destroy_thread(context);
    locker_unlock(&context->locker);
    return 0;
}
/**
 * @brief thread_pool_create_thread Allocate a thread node and start its worker.
 * @param context pool that will own the thread
 * @return the new node, or NULL when allocation or thread creation fails
 */
static thread_list_t* thread_pool_create_thread(thread_pool_context_t *context)
{
    // calloc hands back a zero-filled node, same as malloc + memset
    thread_list_t *node = (thread_list_t*)calloc(1, sizeof(thread_list_t));
    if(NULL == node)
        return NULL;
    node->pool = context;
    if(thread_create(&node->thread, thread_pool_worker, node) == 0)
        return node;
    free(node);
    return NULL;
}
/**
 * @brief thread_pool_destroy_thread Remove the calling worker from the pool's
 *        thread list and release its node.
 *
 * Called by a worker on its way out, with the pool lock held. The thread is
 * detached before the node is freed: nobody joins pool workers, so without
 * this every exited worker kept its thread resources (pthread) or its
 * handle (Windows) until process exit.
 *
 * @param context thread pool state
 */
static void thread_pool_destroy_thread(thread_pool_context_t *context)
{
    thread_list_t **head;
    thread_list_t *self;
    head = &context->task_threads;
    while(*head) // find the node that belongs to the calling thread
    {
        if(thread_isself((*head)->thread))
        {
            self = *head;
            *head = self->next; // unlink
            thread_detach(self->thread);
            free(self);
            break;
        }
        head = &(*head)->next;
    }
}
/**
 * @brief thread_pool_create_threads Start up to `num` workers and add them to the pool.
 *
 * Stops at the first creation failure. thread_count and thread_count_idle
 * grow by the number of workers actually started.
 *
 * @param context thread pool state
 * @param num number of threads to create
 */
static void thread_pool_create_threads(thread_pool_context_t *context,
                                       int num)
{
    int created = 0;
    while(created < num)
    {
        thread_list_t *node = thread_pool_create_thread(context);
        if(NULL == node)
            break;
        // insert at the list head
        node->next = context->task_threads;
        context->task_threads = node;
        ++created;
    }
    context->thread_count += created;
    context->thread_count_idle += created;
}
/**
 * @brief thread_pool_destroy_threads Destroy every thread in a list and free
 *        the nodes (currently not called anywhere in this file).
 * @param threads head of the thread list
 */
static void thread_pool_destroy_threads(thread_list_t *threads)
{
    thread_list_t *node;
    thread_list_t *following;
    for(node = threads; node; node = following)
    {
        following = node->next; // read before the node is freed
        thread_destroy(node->thread);
        free(node);
    }
}
/**
 * @brief thread_pool_create_task Build a task node for (proc, param).
 *
 * Reuses a node from the recycle list when one is available and allocates a
 * new one otherwise. The node is not linked into the task list here.
 *
 * @param context thread pool state
 * @param proc task function
 * @param param argument for proc
 * @return the zero-initialized node with proc/param set, or NULL when out of memory
 */
static thread_task_list_t* thread_pool_create_task(thread_pool_context_t *context,
                                                   thread_pool_proc proc,
                                                   void* param)
{
    thread_task_list_t *node = context->recycle_tasks;
    if(node)
        context->recycle_tasks = node->next; // pop from the recycle list
    else
        node = (thread_task_list_t*)malloc(sizeof(*node));
    if(NULL == node)
        return NULL;
    memset(node, 0, sizeof(*node)); // clear stale contents
    node->proc = proc;
    node->param = param;
    return node;
}
/**
 * @brief thread_pool_destroy_tasks Free every node of a task list.
 * @param tasks head of the task list (may be NULL)
 */
static void thread_pool_destroy_tasks(thread_task_list_t *tasks)
{
    thread_task_list_t *node;
    thread_task_list_t *following;
    for(node = tasks; node; node = following)
    {
        following = node->next; // read before the node is freed
        free(node);
    }
}
/**
 * @brief thread_pool_create Create a thread pool and start its initial workers.
 *
 * Fewer than `num` workers may be running if thread creation fails part way;
 * thread_pool_threads_count() reports the actual number.
 *
 * @param num number of threads started now; also used as the idle limit
 * @param min minimum number of threads (stored in the pool state)
 * @param max maximum number of threads thread_pool_push() may grow the pool to
 * @return the pool handle, or NULL on allocation / lock / event failure
 */
thread_pool_t thread_pool_create(int num, int min, int max)
{
    thread_pool_context_t *ctx;
    ctx = (thread_pool_context_t*)malloc(sizeof(thread_pool_context_t));
    if(!ctx)
        return NULL;
    memset(ctx, 0, sizeof(thread_pool_context_t));
    ctx->thread_count_min = min;
    ctx->thread_count_max = max;
    ctx->threshold = num / 2;
    ctx->idle_max = num;
    ctx->run = 1; // running
    if(0 != locker_create(&ctx->locker))
    {
        free(ctx);
        return NULL;
    }
    if(0 != event_create(&ctx->event))
    {
        locker_destroy(&ctx->locker);
        free(ctx);
        return NULL;
    }
    // Workers take the pool lock as their first action. Holding it here keeps
    // them from reading the thread list and counters while they are still
    // being filled in.
    locker_lock(&ctx->locker);
    thread_pool_create_threads(ctx, num); // start the initial workers
    locker_unlock(&ctx->locker);
    return ctx;
}
/**
 * @brief thread_pool_destroy Stop all workers and free the pool.
 *
 * Clears the run flag, then signals the event every 100 ms until every
 * worker has left. Tasks still queued are discarded without being run.
 * Must not be called from a pool task: that worker could never exit and the
 * call would not return.
 *
 * @param pool handle returned by thread_pool_create(); NULL is ignored
 */
void thread_pool_destroy(thread_pool_t pool)
{
    thread_pool_context_t *ctx;
    ctx = (thread_pool_context_t*)pool;
    if(!ctx)
        return;
    locker_lock(&ctx->locker);
    ctx->run = 0; // workers read this flag under the lock, so write it under the lock too
    while(ctx->thread_count)
    {
        event_signal(&ctx->event); // wake a waiting worker so it can see run == 0 and exit
        locker_unlock(&ctx->locker);
        system_sleep(100);
        locker_lock(&ctx->locker);
    }
    locker_unlock(&ctx->locker);
    //thread_pool_destroy_threads(ctx->task_threads);
    thread_pool_destroy_tasks(ctx->recycle_tasks);
    thread_pool_destroy_tasks(ctx->tasks);
    event_destroy(&ctx->event);
    locker_destroy(&ctx->locker);
    free(ctx);
}
/**
 * @brief thread_pool_threads_count Number of threads currently in the pool.
 * @param pool handle returned by thread_pool_create()
 * @return <0-error code (NULL pool), >=0-thread count
 */
int thread_pool_threads_count(thread_pool_t pool)
{
    int count;
    thread_pool_context_t *ctx;
    ctx = (thread_pool_context_t*)pool;
    if(!ctx)
        return -1;
    // workers update thread_count under the lock; read it the same way
    locker_lock(&ctx->locker);
    count = ctx->thread_count;
    locker_unlock(&ctx->locker);
    return count;
}
/**
 * @brief thread_pool_push Queue a task and wake a worker.
 *
 * The task is inserted at the head of the task list, so the most recently
 * pushed task is picked up first. When no worker is idle and the pool is
 * below its maximum size, one more worker is started.
 *
 * @param pool thread pool handle
 * @param proc task function; must not be NULL
 * @param param user-defined argument passed to proc
 * @return =0-ok, <0-error code (NULL pool, NULL proc, or out of memory)
 */
int thread_pool_push(thread_pool_t pool, thread_pool_proc proc, void *param)
{
    thread_task_list_t *task;
    thread_pool_context_t *context;
    context = (thread_pool_context_t*)pool;
    // reject input that would crash here or later inside a worker
    if(!context || !proc)
        return -1;
    locker_lock(&context->locker);
    task = thread_pool_create_task(context, proc, param); // build the task node
    if(!task)
    {
        locker_unlock(&context->locker);
        return -1;
    }
    // add to the task list
    task->next = context->tasks;
    context->tasks = task;
    ++context->task_count;
    // start another worker when none is idle and the maximum is not reached
    if(context->thread_count_idle<1
            && context->thread_count<context->thread_count_max)
        thread_pool_create_threads(context, 1);
    event_signal(&context->event); // tell a worker that a task is available
    locker_unlock(&context->locker);
    return 0;
}
相关文章
|
3月前
|
NoSQL 编译器 程序员
【C语言】揭秘GCC:从平凡到卓越的编译艺术,一场代码与效率的激情碰撞,探索那些不为人知的秘密武器,让你的程序瞬间提速百倍!
【8月更文挑战第20天】GCC,GNU Compiler Collection,是GNU项目中的开源编译器集合,支持C、C++等多种语言。作为C语言程序员的重要工具,GCC具备跨平台性、高度可配置性及丰富的优化选项等特点。通过简单示例,如编译“Hello, GCC!”程序 (`gcc -o hello hello.c`),展示了GCC的基础用法及不同优化级别(`-O0`, `-O1`, `-O3`)对性能的影响。GCC还支持生成调试信息(`-g`),便于使用GDB等工具进行调试。尽管有如Microsoft Visual C++、Clang等竞品,GCC仍因其灵活性和强大的功能被广泛采用。
122 1
|
1月前
|
存储 搜索推荐 C语言
深入C语言指针,使代码更加灵活(二)
深入C语言指针,使代码更加灵活(二)
|
1月前
|
存储 程序员 编译器
深入C语言指针,使代码更加灵活(一)
深入C语言指针,使代码更加灵活(一)
|
1月前
|
C语言
深入C语言指针,使代码更加灵活(三)
深入C语言指针,使代码更加灵活(三)
深入C语言指针,使代码更加灵活(三)
|
2月前
|
安全 C语言
在C语言中,正确使用运算符能提升代码的可读性和效率
在C语言中,运算符的使用需要注意优先级、结合性、自增自减的形式、逻辑运算的短路特性、位运算的类型、条件运算的可读性、类型转换以及使用括号来明确运算顺序。掌握这些注意事项可以帮助编写出更安全和高效的代码。
49 4
|
1月前
|
C语言
C语言练习题代码
C语言练习题代码
|
2月前
|
存储 算法 C语言
数据结构基础详解(C语言):单链表_定义_初始化_插入_删除_查找_建立操作_纯c语言代码注释讲解
本文详细介绍了单链表的理论知识,涵盖单链表的定义、优点与缺点,并通过示例代码讲解了单链表的初始化、插入、删除、查找等核心操作。文中还具体分析了按位序插入、指定节点前后插入、按位序删除及按值查找等算法实现,并提供了尾插法和头插法建立单链表的方法,帮助读者深入理解单链表的基本原理与应用技巧。
505 6
|
2月前
|
存储 C语言 C++
数据结构基础详解(C语言) 顺序表:顺序表静态分配和动态分配增删改查基本操作的基本介绍及c语言代码实现
本文介绍了顺序表的定义及其在C/C++中的实现方法。顺序表通过连续存储空间实现线性表,使逻辑上相邻的元素在物理位置上也相邻。文章详细描述了静态分配与动态分配两种方式下的顺序表定义、初始化、插入、删除、查找等基本操作,并提供了具体代码示例。静态分配方式下顺序表的长度固定,而动态分配则可根据需求调整大小。此外,还总结了顺序表的优点,如随机访问效率高、存储密度大,以及缺点,如扩展不便和插入删除操作成本高等特点。
190 5
|
2月前
|
存储 C语言
数据结构基础详解(C语言): 栈与队列的详解附完整代码
栈是一种仅允许在一端进行插入和删除操作的线性表,常用于解决括号匹配、函数调用等问题。栈分为顺序栈和链栈,顺序栈使用数组存储,链栈基于单链表实现。栈的主要操作包括初始化、销毁、入栈、出栈等。栈的应用广泛,如表达式求值、递归等场景。栈的顺序存储结构由数组和栈顶指针构成,链栈则基于单链表的头插法实现。
362 3
|
2月前
|
存储 算法 C语言
C语言手撕实战代码_二叉排序树(二叉搜索树)_构建_删除_插入操作详解
这份二叉排序树习题集涵盖了二叉搜索树(BST)的基本操作,包括构建、查找、删除等核心功能。通过多个具体示例,如构建BST、查找节点所在层数、删除特定节点及查找小于某个关键字的所有节点等,帮助读者深入理解二叉排序树的工作原理与应用技巧。此外,还介绍了如何将一棵二叉树分解为两棵满足特定条件的BST,以及删除所有关键字小于指定值的节点等高级操作。每个题目均配有详细解释与代码实现,便于学习与实践。