C语言-线程池代码

简介: github 地址:常用的C工具代码,这里的工具包含了C语言实现的线程池,hashtable,list,md5,字符串操作,消息队列等很多常用的工具,我这里就不一一说明了,感兴趣的朋友可以自行下载研究,工作中肯定用的上。

说明

github 地址:常用的C工具代码,这里的工具包含了C语言实现的线程池,hashtable,list,md5,字符串操作,消息队列等很多常用的工具,我这里就不一一说明了,感兴趣的朋友可以自行下载研究,工作中肯定用的上。


这里的代码是直接拷贝的,直接复制到自己的项目就能使用。


如果你对线程池还不太熟悉,或者只需要一个仅支持 Linux 的线程池,可以参考我的另一篇博客:简单版的 Linux 线程池。


代码

thread-pool.h头文件

#ifndef _threadpool_h_
#define _threadpool_h_
#ifdef __cplusplus
extern "C" {
#endif
/// Opaque handle to a thread pool (created by thread_pool_create).
typedef void* thread_pool_t;
/**
 * @brief thread_pool_create Create a thread pool.
 * @param num number of threads started immediately
 * @param min minimum number of threads
 * @param max maximum number of threads
 * @return 0-error, other-thread pool id
 */
thread_pool_t thread_pool_create(int num, int min, int max);
/**
 * @brief thread_pool_destroy Destroy a thread pool.
 * @param pool handle returned by thread_pool_create
 */
void thread_pool_destroy(thread_pool_t pool);
/**
 * @brief Get the number of threads currently in the pool.
 * @param pool handle returned by thread_pool_create
 * @return <0-error code, >=0-thread count
 */
int thread_pool_threads_count(thread_pool_t pool);
/// Task callback: receives the param that was passed to thread_pool_push.
typedef void (*thread_pool_proc)(void *param);
/**
 * @brief thread_pool_push Submit one task to the pool.
 * @param pool thread pool handle
 * @param proc function to run for this task
 * @param param user-defined argument handed to proc
 * @return =0-ok, <0-error code
 */
int thread_pool_push(thread_pool_t pool, thread_pool_proc proc, void *param);
#ifdef __cplusplus
}
#endif
#endif /* !_threadpool_h_ */

依赖头文件

locker.h

#ifndef _platform_locker_h_
#define _platform_locker_h_
#include <errno.h>
#if defined(WIN32)
#include <Windows.h>
typedef CRITICAL_SECTION  locker_t;
#else
#include <pthread.h>
typedef pthread_mutex_t  locker_t;
#endif
//-------------------------------------------------------------------------------------
// int locker_create(locker_t* locker);
// int locker_destroy(locker_t* locker);
// int locker_lock(locker_t* locker);
// int locker_unlock(locker_t* locker);
// int locker_trylock(locker_t* locker);
//-------------------------------------------------------------------------------------
/// Initialise a recursive lock (Windows: critical section, otherwise a
/// recursive pthread mutex).
/// @param locker storage for the lock
/// @return 0-ok, other-pthread error code (Windows always returns 0)
static inline int locker_create(locker_t* locker)
{
#if defined(WIN32)
  InitializeCriticalSection(locker);
  return 0;
#else
  // create a recursive locker
  int r;
  pthread_mutexattr_t attr;
  r = pthread_mutexattr_init(&attr);
  if(0 != r)
    return r; // attr was never initialised: nothing to destroy, nothing to use
  // http://linux.die.net/man/3/pthread_mutexattr_settype
  // Application Usage:
  // It is advised that an application should not use a PTHREAD_MUTEX_RECURSIVE mutex
  // with condition variables because the implicit unlock performed for a pthread_cond_timedwait()
  // or pthread_cond_wait() may not actually release the mutex (if it had been locked multiple times).
  // If this happens, no other thread can satisfy the condition of the predicate.
#if defined(OS_LINUX) && defined(__GLIBC__)
  r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP);
#else
  r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
#endif
  // Callers rely on recursion, so do not silently fall back to a
  // non-recursive mutex when the type could not be set.
  if(0 == r)
    r = pthread_mutex_init(locker, &attr);
  pthread_mutexattr_destroy(&attr);
  return r;
#endif
}
/// Release the resources of a lock made by locker_create.
/// @return 0 on Windows; otherwise the result of pthread_mutex_destroy
static inline int locker_destroy(locker_t* locker)
{
  int rc = 0;
#if defined(WIN32)
  DeleteCriticalSection(locker);
#else
  rc = pthread_mutex_destroy(locker);
#endif
  return rc;
}
/// Acquire the lock, blocking until it is available.
/// @return 0 on Windows; otherwise the result of pthread_mutex_lock
static inline int locker_lock(locker_t* locker)
{
  int rc = 0;
#if defined(WIN32)
  EnterCriticalSection(locker);
#else
  // These functions shall not return an error code of [EINTR].
  rc = pthread_mutex_lock(locker);
#endif
  return rc;
}
/// Release the lock.
/// linux: the unlocking thread must be the thread that locked it.
/// @return 0 on Windows; otherwise the result of pthread_mutex_unlock
static inline int locker_unlock(locker_t* locker)
{
  int rc = 0;
#if defined(WIN32)
  LeaveCriticalSection(locker);
#else
  rc = pthread_mutex_unlock(locker);
#endif
  return rc;
}
/// Try to acquire the lock without blocking.
/// @return 0-acquired; on failure Windows returns -1, while the pthread
///         build returns whatever pthread_mutex_trylock reports
static inline int locker_trylock(locker_t* locker)
{
  int rc;
#if defined(WIN32)
  rc = TryEnterCriticalSection(locker) ? 0 : -1;
#else
  rc = pthread_mutex_trylock(locker);
#endif
  return rc;
}
#endif /* !_platform_locker_h_ */

system.h

#ifndef _platform_system_h_
#define _platform_system_h_
#include <stdint.h>
#if defined(WIN32)
#include <Windows.h>
typedef HMODULE module_t;
typedef uint32_t useconds_t;
typedef FARPROC funcptr_t;
#else
#include <sys/types.h>
#include <sys/utsname.h>
#include <unistd.h>
#include <pthread.h>
#include <dlfcn.h>
#include <errno.h>
#include <time.h>
typedef void* module_t;
typedef void (*funcptr_t)(void);
#endif
#if defined(OS_MAC)
#include <sys/param.h>
#include <sys/sysctl.h>
#include <mach/mach_time.h>  
#endif
#include <stdint.h>
#include <stdio.h>
//-----------------------------------------------------------------------
// void system_sleep(useconds_t millisecond);
// uint64_t system_time(void);
// uint64_t system_clock(void);
// int64_t system_getcyclecount(void);
// size_t system_getcpucount(void);
//
// int system_version(int* major, int* minor);
// module_t system_load(const char* module);
// int system_unload(module_t module);
// funcptr_t system_getproc(module_t module, const char* producer);
//-----------------------------------------------------------------------
//
///
/// implement
///
//
/// Suspend the calling thread for (at least) the given number of milliseconds.
/// The non-Windows path uses nanosleep: usleep(ms*1000) overflows useconds_t
/// for long sleeps, and POSIX allows usleep to reject values >= 1,000,000.
/// As before, the sleep is not restarted if it is cut short.
static inline void system_sleep(useconds_t milliseconds)
{
#if defined(WIN32)
  Sleep(milliseconds);
#else
  struct timespec req;
  req.tv_sec = (time_t)(milliseconds / 1000);
  req.tv_nsec = (long)(milliseconds % 1000) * 1000000L;
  nanosleep(&req, NULL);
#endif
}
/// Number of processors reported by the OS.
/// @return processor count; on the non-Windows paths a failed or
///         non-positive query is reported as 1 instead of wrapping to a
///         huge size_t
static inline size_t system_getcpucount(void)
{
#if defined(WIN32)
  SYSTEM_INFO sysinfo;
  GetSystemInfo(&sysinfo);
  return sysinfo.dwNumberOfProcessors;
#elif defined(OS_MAC) || defined(_FREEBSD_) || defined(_NETBSD_) || defined(_OPENBSD_)
  // FreeBSD, MacOS X, NetBSD, OpenBSD, etc.:
  // The value is read into an int so the result does not depend on byte
  // order when the kernel stores fewer bytes than sizeof(size_t).
  int mib[2];
  int num = 0;
  size_t len = sizeof(num);
  mib[0] = CTL_HW;
  mib[1] = HW_AVAILCPU; // alternatively, try HW_NCPU;
  if(0 != sysctl(mib, 2, &num, &len, NULL, 0) || num < 1)
  {
    mib[1] = HW_NCPU;
    num = 0;
    len = sizeof(num); // the first call may have modified len
    if(0 != sysctl(mib, 2, &num, &len, NULL, 0) || num < 1)
      num = 1;
  }
  return (size_t)num;
#elif defined(_HPUX_)
  // HPUX:
  long n = (long)mpctl(MPC_GETNUMSPUS, NULL, NULL);
  return n < 1 ? 1 : (size_t)n;
#elif defined(_IRIX_)
  // IRIX:
  long n = sysconf(_SC_NPROC_ONLN);
  return n < 1 ? 1 : (size_t)n;
#else
  // linux, Solaris, & AIX
  // sysconf returns -1 on error; never let that become SIZE_MAX.
  long n = sysconf(_SC_NPROCESSORS_ONLN);
  return n < 1 ? 1 : (size_t)n;
  //"cat /proc/cpuinfo | grep processor | wc -l"
#endif
}
/// Raw high-resolution counter value.
/// Windows: the QueryPerformanceCounter tick count (previously this was
/// queried and then thrown away). Other platforms: not implemented,
/// always 0.
static inline int64_t system_getcyclecount(void)
{
#if defined(WIN32)
  LARGE_INTEGER count;
  if(!QueryPerformanceCounter(&count))
    return 0;
  return (int64_t)count.QuadPart;
#else
  return 0;
#endif
}
/// milliseconds since the Epoch(1970-01-01 00:00:00 +0000 (UTC))
/// The former OS_MAC branch returned mach_absolute_time(), which is a
/// relative tick counter rather than wall-clock time; every non-Windows
/// platform now takes the same wall-clock path.
static inline uint64_t system_time(void)
{
#if defined(WIN32)
  uint64_t t;
  FILETIME ft;
  GetSystemTimeAsFileTime(&ft);
  t = (uint64_t)ft.dwHighDateTime << 32 | ft.dwLowDateTime;
  return t / 10000 - 11644473600000ULL; /* Jan 1, 1601 */
#else
#if defined(CLOCK_REALTIME)
  struct timespec tp;
  clock_gettime(CLOCK_REALTIME, &tp);
  return (uint64_t)tp.tv_sec * 1000 + (uint64_t)tp.tv_nsec / 1000000;
#else
  // POSIX.1-2008 marks gettimeofday() as obsolete, recommending the use of clock_gettime(2) instead.
  // NOTE(review): this fallback needs <sys/time.h> to be visible -- confirm for the target platform.
  struct timeval tv;
  gettimeofday(&tv, NULL);
  return (uint64_t)tv.tv_sec * 1000 + (uint64_t)tv.tv_usec / 1000;
#endif
#endif
}
/// Relative clock for measuring intervals.
///@return milliseconds(relative time)
static inline uint64_t system_clock(void)
{
  uint64_t ms;
#if defined(WIN32)
  LARGE_INTEGER ticks_per_sec;
  LARGE_INTEGER ticks;
  QueryPerformanceFrequency(&ticks_per_sec);
  QueryPerformanceCounter(&ticks);
  ms = (uint64_t)ticks.QuadPart * 1000 / ticks_per_sec.QuadPart;
#elif defined(OS_MAC)
  uint64_t ticks;
  mach_timebase_info_data_t base;
  ticks = mach_absolute_time();
  mach_timebase_info(&base);
  ms = ticks * base.numer / base.denom / 1000000;
#else
#if defined(CLOCK_MONOTONIC)
  struct timespec now;
  clock_gettime(CLOCK_MONOTONIC, &now);
  ms = (uint64_t)now.tv_sec * 1000 + now.tv_nsec / 1000000;
#else
  // POSIX.1-2008 marks gettimeofday() as obsolete, recommending the use of clock_gettime(2) instead.
  struct timeval now;
  gettimeofday(&now, NULL);
  ms = (uint64_t)now.tv_sec * 1000 + now.tv_usec / 1000;
#endif
#endif
  return ms;
}
#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable: 4996) // GetVersionEx
#pragma warning(disable: 28159)
#endif
/// Query the operating system version.
/// Windows: major/minor from GetVersionEx. Elsewhere: the first two
/// numeric fields of uname().release.
/// @param major [out] major version
/// @param minor [out] minor version
/// @return 0-ok; Windows: GetLastError() if the query fails;
///         elsewhere: errno if uname fails, -1 if release cannot be parsed
static inline int system_version(int* major, int* minor)
{
#if defined(WIN32)
  /*
  Operating system  Version number
  Windows 8    6.2
  Windows Server 2012  6.2
  Windows 7    6.1
  Windows Server 2008 R2  6.1
  Windows Server 2008  6.0
  Windows Vista   6.0
  Windows Server 2003 R2  5.2
  Windows Server 2003  5.2
  Windows XP    5.1
  Windows 2000    5.0
  Windows Me    4.90
  Windows 98    4.10
  Windows NT 4.0    4.0
  Windows 95    4.0
  */
  OSVERSIONINFO version;
  memset(&version, 0, sizeof(version));
  version.dwOSVersionInfoSize = sizeof(version);
  // Previously a failed query still reported success with version 0.0.
  if(!GetVersionEx(&version))
    return (int)GetLastError();
  *major = (int)(version.dwMajorVersion);
  *minor = (int)(version.dwMinorVersion);
  return 0;
#else
  struct utsname ver;
  if(0 != uname(&ver))
    return errno;
  if(2 != sscanf(ver.release, "%8d.%8d", major, minor))
    return -1;
  return 0;
#endif
}
#if defined(_MSC_VER)
#pragma warning(pop)
#endif
//
///
/// dynamic module load/unload
///
//
/// Load a dynamic module (Windows: LoadLibraryExA, otherwise dlopen).
/// @param module path or name handed to the platform loader
/// @return the handle produced by the platform loader
static inline module_t system_load(const char* module)
{
  module_t handle;
#if defined(WIN32)
  handle = LoadLibraryExA(module, NULL, LOAD_WITH_ALTERED_SEARCH_PATH);
#else
  handle = dlopen(module, RTLD_LAZY|RTLD_LOCAL);
#endif
  return handle;
}
/// Unload a module obtained from system_load.
/// @return the raw result of FreeLibrary (Windows) or dlclose (elsewhere);
///         NOTE(review): those two APIs use opposite success conventions,
///         so callers must interpret the value per platform
static inline int system_unload(module_t module)
{
  int rc;
#if defined(WIN32)
  rc = FreeLibrary(module);
#else
  rc = dlclose(module);
#endif
  return rc;
}
/// Look up an exported symbol in a loaded module.
/// @param module handle from system_load
/// @param producer symbol name
/// @return the address from GetProcAddress/dlsym as a generic function pointer
static inline funcptr_t system_getproc(module_t module, const char* producer)
{
  funcptr_t proc;
#if defined(WIN32)
  proc = GetProcAddress(module, producer);
#else
  // https://linux.die.net/man/3/dlsym
  // cosine = (double (*)(double)) dlsym(handle, "cos")
  // ===> *(void **) (&cosine) = dlsym(handle, "cos");
  proc = (funcptr_t)dlsym(module, producer);
#endif
  return proc;
}
#endif /* !_platform_system_h_ */

thread.h

#ifndef _platform_thread_h_
#define _platform_thread_h_
#if defined(WIN32)
#include <Windows.h>
#include <process.h>
#ifndef STDCALL
#define STDCALL __stdcall
#endif
// Windows stand-in for pthread_t: a thread id paired with its handle.
typedef struct
{
  DWORD id; // thread id (written by thread_create2 / thread_self)
  HANDLE handle; // thread handle (closed by thread_destroy / thread_detach)
} pthread_t;
typedef DWORD tid_t;
#else
#include <pthread.h>
#include <sched.h>
typedef pthread_t tid_t;
#ifndef STDCALL
#define STDCALL
#endif
// Priority constants for the non-Windows build (this enum lives in the
// #else branch of the WIN32 check). The values span 1..99.
enum thread_priority
{
  THREAD_PRIORITY_IDLE    = 1,
  THREAD_PRIORITY_LOWEST    = 25,
  THREAD_PRIORITY_BELOW_NORMAL  = 40,
  THREAD_PRIORITY_NORMAL    = 50,
  THREAD_PRIORITY_ABOVE_NORMAL  = 60,
  THREAD_PRIORITY_HIGHEST   = 75,
  THREAD_PRIORITY_TIME_CRITICAL = 99,
};
#endif
//-------------------------------------------------------------------------------------
// int thread_create(pthread_t* thread, thread_proc func, void* param);
// int thread_destroy(pthread_t thread);
// int thread_detach(pthread_t thread);
// int thread_getpriority(pthread_t thread, int* priority);
// int thread_setpriority(pthread_t thread, int priority);
// int thread_isself(pthread_t thread);
// int thread_valid(pthread_t thread);
// int thread_yield(void);
// tid_t thread_getid(pthread_t thread);
// pthread_t thread_self(void);
//-------------------------------------------------------------------------------------
// Thread entry point: receives the param given to thread_create/thread_create2 and returns an int.
typedef int (STDCALL *thread_proc)(void* param);
/// Start a thread with an explicit stack size.
/// @param thread [out] receives the new thread's identity
/// @param stacksize requested stack size in bytes, 0 for the platform default
/// @param func thread entry point
/// @param param argument handed to func
/// @return 0-ok; Windows: -1 on failure; elsewhere: pthread error code
static inline int thread_create2(pthread_t* thread, unsigned int stacksize, thread_proc func, void* param)
{
#if defined(WIN32)
  // https://msdn.microsoft.com/en-us/library/windows/desktop/ms682453.aspx
  // CreateThread function: By default, every thread has one megabyte of stack space.
  // http://msdn.microsoft.com/en-us/library/windows/desktop/ms682453%28v=vs.85%29.aspx
  // A thread in an executable that calls the C run-time library (CRT)
  // should use the _beginthreadex and _endthreadex functions for thread management
  // rather than CreateThread and ExitThread;
  typedef unsigned int(__stdcall *thread_routine)(void *);
  thread->handle = (HANDLE)_beginthreadex(NULL, stacksize, (thread_routine)func, param, 0, (unsigned int*)&thread->id);
  return NULL == thread->handle ? -1 : 0;
#else
  // https://linux.die.net/man/3/pthread_create
  // On Linux/x86-32, the default stack size for a new thread is 2 megabytes(10M 64bits)
  // http://udrepper.livejournal.com/20948.html
  // mallopt(M_ARENA_MAX, cpu); // limit multithread virtual memory
  // NOTE(review): func returns int but is invoked through a void*(*)(void*)
  // pointer; this relies on the platform ABI tolerating the mismatch.
  typedef void* (*linux_thread_routine)(void*);
  int r;
  pthread_attr_t attr;
  r = pthread_attr_init(&attr);
  if(0 != r)
    return r; // never hand an uninitialised attr to pthread_create
  // 0 means "platform default". A rejected size is still ignored, as it
  // was before, so the thread then starts with the default stack.
  if(0 != stacksize)
    pthread_attr_setstacksize(&attr, stacksize);
  r = pthread_create(thread, &attr, (linux_thread_routine)func, param);
  pthread_attr_destroy(&attr);
  return r;
#endif
}
/// Start a thread with the default stack size (forwards to thread_create2
/// with stacksize 0).
/// @return whatever thread_create2 returns
static inline int thread_create(pthread_t* thread, thread_proc func, void* param)
{
  const unsigned int default_stacksize = 0;
  return thread_create2(thread, default_stacksize, func, param);
}
/// Wait for a thread to finish and release it. When called on the calling
/// thread itself there is nothing to wait for: the handle is just closed
/// (Windows) or the thread detached (pthreads).
/// @return 0 on Windows; otherwise the pthread_join/pthread_detach result
static inline int thread_destroy(pthread_t thread)
{
#if defined(WIN32)
  const int is_self = (thread.id == GetCurrentThreadId());
  if(!is_self)
    WaitForSingleObjectEx(thread.handle, INFINITE, TRUE);
  CloseHandle(thread.handle);
  return 0;
#else
  void* exit_value = NULL;
  if(!pthread_equal(pthread_self(), thread))
    return pthread_join(thread, &exit_value);
  return pthread_detach(thread);
#endif
}
/// Give up the ability to wait for a thread (Windows: close its handle,
/// pthreads: pthread_detach).
/// @return 0 on Windows; otherwise the pthread_detach result
static inline int thread_detach(pthread_t thread)
{
  int rc = 0;
#if defined(WIN32)
  CloseHandle(thread.handle);
#else
  rc = pthread_detach(thread);
#endif
  return rc;
}
// Read a thread's priority.
// Windows: the value from GetThreadPriority (original note: range
// [-15, 15], 0: normal / -15: idle / 15: critical).
// pthreads: sched_priority as reported by pthread_getschedparam.
// Returns 0 on success (priority written), otherwise an error code
// (priority untouched).
static inline int thread_getpriority(pthread_t thread, int* priority)
{
#if defined(WIN32)
  const int value = GetThreadPriority(thread.handle);
  if(THREAD_PRIORITY_ERROR_RETURN == value)
    return (int)GetLastError();
  *priority = value;
  return 0;
#else
  int policy;
  struct sched_param sp;
  const int rc = pthread_getschedparam(thread, &policy, &sp);
  if(0 != rc)
    return rc;
  *priority = sp.sched_priority;
  return 0;
#endif
}
/// Change a thread's priority.
/// @param thread target thread
/// @param priority Windows: value for SetThreadPriority; pthreads: the
///        sched_priority to apply, used only under SCHED_FIFO/SCHED_RR
/// @return 0-ok, other-error code. The Windows branch used to return 1 on
///         success and 0 on failure, the opposite of every other function
///         in this header; it now follows the same 0-ok convention.
static inline int thread_setpriority(pthread_t thread, int priority)
{
#if defined(WIN32)
  return SetThreadPriority(thread.handle, priority) ? 0 : (int)GetLastError();
#else
  int r;
  int policy = SCHED_RR;
  struct sched_param sched;
  r = pthread_getschedparam(thread, &policy, &sched);
  if(0 != r)
    return r; // sched was not filled in: do not write it back
  // For processes scheduled under one of the normal scheduling policies
  // (SCHED_OTHER, SCHED_IDLE, SCHED_BATCH),
  // sched_priority is not used in scheduling decisions (it must be specified as 0).
  // Processes scheduled under one of the real-time policies(SCHED_FIFO, SCHED_RR)
  // have a sched_priority value in the range 1 (low)to 99 (high)
  sched.sched_priority = (SCHED_FIFO==policy || SCHED_RR==policy) ? priority : 0;
  return pthread_setschedparam(thread, policy, &sched);
#endif
}
/// Identity of the calling thread.
/// Windows: GetCurrentThread() handle plus GetCurrentThreadId();
/// pthreads: pthread_self().
static inline pthread_t thread_self(void)
{
#if defined(WIN32)
  pthread_t self;
  self.id = GetCurrentThreadId();
  self.handle = GetCurrentThread();
  return self;
#else
  const pthread_t self = pthread_self();
  return self;
#endif
}
static inline tid_t thread_getid(pthread_t thread)
{
#if defined(WIN32)
  //return GetThreadId(thread.handle); // >= vista
  return thread.id;
#else
  return thread;
#endif
}
/// Tell whether the given thread is the calling thread.
/// @return non-zero if it is, 0 otherwise
static inline int thread_isself(pthread_t thread)
{
#if defined(WIN32)
  if(GetCurrentThreadId() == thread.id)
    return 1;
  return 0;
#else
  const pthread_t me = pthread_self();
  return pthread_equal(me, thread);
#endif
}
/// Tell whether a thread object looks initialised (non-zero id on
/// Windows, non-zero pthread_t elsewhere).
/// @return 1-valid, 0-not valid
static inline int thread_valid(pthread_t thread)
{
#if defined(WIN32)
  if(0 == thread.id)
    return 0;
  return 1;
#else
  if(0 == thread)
    return 0;
  return 1;
#endif
}
/// Offer the processor to another runnable thread.
/// @return Windows: 0 if SwitchToThread switched, -1 otherwise;
///         elsewhere: the sched_yield result
static inline int thread_yield(void)
{
  int rc;
#if defined(WIN32)
  // Causes the calling thread to yield execution to another thread that is ready to run
  // on the current processor. The operating system selects the next thread to be executed.
  rc = SwitchToThread() ? 0 : -1;
#else
  rc = sched_yield();
#endif
  return rc;
}
#if defined(WIN32_XP)
// Local declarations used by thread_getid_xp to call
// ntdll's NtQueryInformationThread (compiled only when WIN32_XP is defined).
typedef DWORD KPRIORITY;
// Process/thread id pair; thread_getid_xp reads UniqueThread.
typedef struct _CLIENT_ID
{
  PVOID UniqueProcess;
  PVOID UniqueThread;
} CLIENT_ID, *PCLIENT_ID;
// Buffer filled by the information-class-0 query issued in thread_getid_xp.
typedef struct _THREAD_BASIC_INFORMATION
{
  NTSTATUS                ExitStatus;
  PVOID                   TebBaseAddress;
  CLIENT_ID               ClientId;
  KAFFINITY               AffinityMask;
  KPRIORITY               Priority;
  KPRIORITY               BasePriority;
} THREAD_BASIC_INFORMATION, *PTHREAD_BASIC_INFORMATION;
// Signature of the function looked up by name in ntdll.dll.
typedef NTSTATUS(__stdcall *NtQueryInformationThread)(HANDLE ThreadHandle, int ThreadInformationClass, PVOID ThreadInformation, ULONG ThreadInformationLength, PULONG ReturnLength);
/// Resolve a thread id from a thread handle by calling ntdll's
/// NtQueryInformationThread, looked up at run time.
/// @param handle thread handle
/// @return the thread id, or 0 if ntdll or the entry point is unavailable
///         or the query leaves the zero-initialised buffer untouched
static inline tid_t thread_getid_xp(HANDLE handle)
{
  // NT_TIB* tib = (NT_TIB*)__readfsdword(0x18);
  HMODULE module;
  NtQueryInformationThread fp;
  THREAD_BASIC_INFORMATION tbi;
  memset(&tbi, 0, sizeof(tbi));
  module = GetModuleHandleA("ntdll.dll");
  if(NULL == module)
    return 0;
  fp = (NtQueryInformationThread)GetProcAddress(module, "NtQueryInformationThread");
  if(NULL == fp)
    return 0; // calling a NULL function pointer would crash
  fp(handle, 0/*ThreadBasicInformation*/, &tbi, sizeof(tbi), NULL);
  // Go through an integer of pointer width so 64-bit builds do not
  // truncate the pointer in a single cast.
  return (tid_t)(ULONG_PTR)tbi.ClientId.UniqueThread;
}
#endif
#endif /* !_platform_thread_h_ */

event.h

#ifndef _platform_event_h_
#define _platform_event_h_
#if defined(WIN32)
#include <Windows.h>
typedef HANDLE  event_t;
#else
#include <sys/time.h> // gettimeofday
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h> // clock_gettime
// pthread emulation of an event: a condition variable plus a "signaled" flag.
typedef struct
{
  int count; // fixed pthread_cond_signal/pthread_cond_wait call order: set to 1 by event_signal, cleared by the wait functions and event_reset
  pthread_cond_t event; // condition variable the waiters block on
  pthread_mutex_t mutex; // protects count and is used with the condition variable
} event_t;
#ifndef WAIT_TIMEOUT
#define WAIT_TIMEOUT  ETIMEDOUT
#endif
#endif
/// event: Windows Event/Linux condition variable
/// multi-processor: no
//-------------------------------------------------------------------------------------
// int event_create(event_t* event);
// int event_destroy(event_t* event);
// int event_wait(event_t* event);
// int event_timewait(event_t* event, int timeout);
// int event_signal(event_t* event);
// int event_reset(event_t* event);
//-------------------------------------------------------------------------------------
/// Create an event in the non-signaled state.
/// Windows: auto-reset event object. Elsewhere: mutex + condition variable
/// + flag; the condition variable uses CLOCK_MONOTONIC when the build
/// macros allow it.
/// @param event [out] event to initialise
/// @return 0-ok, other-error code (nothing is left allocated on failure)
static inline int event_create(event_t* event)
{
#if defined(WIN32)
  HANDLE h = CreateEvent(NULL, FALSE, FALSE, NULL); // auto-reset, initially non-signaled
  if(NULL==h)
    return (int)GetLastError();
  *event = h;
  return 0;
#else
  int r;
#if defined(OS_LINUX) && defined(CLOCK_MONOTONIC) && defined(__USE_XOPEN2K)
  pthread_condattr_t attr;
#endif
  pthread_mutexattr_t mutex;
  r = pthread_mutexattr_init(&mutex);
  if(0 != r)
    return r;
#if defined(OS_LINUX)
  pthread_mutexattr_settype(&mutex, PTHREAD_MUTEX_RECURSIVE_NP);
#else
  pthread_mutexattr_settype(&mutex, PTHREAD_MUTEX_RECURSIVE);
#endif
  r = pthread_mutex_init(&event->mutex, &mutex);
  pthread_mutexattr_destroy(&mutex); // was never released before
  if(0 != r)
    return r;
#if defined(OS_LINUX) && defined(CLOCK_MONOTONIC) && defined(__USE_XOPEN2K)
  r = pthread_condattr_init(&attr);
  if(0 == r)
  {
    // event_timewait computes its deadline on CLOCK_MONOTONIC under the
    // same macros, so a failure to select that clock must be reported.
    r = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
    if(0 == r)
      r = pthread_cond_init(&event->event, &attr);
    pthread_condattr_destroy(&attr);
  }
#else
  r = pthread_cond_init(&event->event, NULL);
#endif
  if(0 != r)
  {
    pthread_mutex_destroy(&event->mutex); // do not leak the mutex
    return r;
  }
  event->count = 0;
  return 0;
#endif
}
/// Destroy an event. On the pthread build the condition variable is
/// retried every millisecond while it reports EBUSY, then the mutex is
/// destroyed.
/// @return 0-ok, other-error code
static inline int event_destroy(event_t* event)
{
#if defined(WIN32)
  if(CloseHandle(*event))
    return 0;
  return (int)GetLastError();
#else
  int rc;
  for(;;)
  {
    rc = pthread_cond_destroy(&event->event);
    if(EBUSY != rc)
      break;
    usleep(1000);
  }
  pthread_mutex_destroy(&event->mutex);
  return rc;
#endif
}
// Block until the event is signaled, then consume the signal.
// 0-success, other-error
static inline int event_wait(event_t* event)
{
#if defined(WIN32)
  DWORD r = WaitForSingleObjectEx(*event, INFINITE, TRUE);
  return WAIT_FAILED==r ? GetLastError() : r;
#else
  int r = 0;
  pthread_mutex_lock(&event->mutex);
  // pthread_cond_wait may wake up spuriously, so keep waiting until the
  // flag is actually set (or the wait itself fails).
  while(0 == event->count && 0 == r)
    r = pthread_cond_wait(&event->event, &event->mutex); // These functions shall not return an error code of [EINTR].
  if(0 != event->count)
  {
    event->count = 0; // consume the signal
    r = 0;
  }
  pthread_mutex_unlock(&event->mutex);
  return r;
#endif
}
// Wait for the event for at most `timeout` milliseconds and consume the
// signal if one arrived.
// 0-success, WAIT_TIMEOUT-timeout, other-error
static inline int event_timewait(event_t* event, int timeout)
{
#if defined(WIN32)
  DWORD r = WaitForSingleObjectEx(*event, timeout, TRUE);
  return WAIT_FAILED==r ? GetLastError() : r;
#else
#if defined(OS_LINUX) && defined(CLOCK_REALTIME)
  int r = 0;
  struct timespec ts;
#if defined(CLOCK_MONOTONIC) && defined(__USE_XOPEN2K)
  clock_gettime(CLOCK_MONOTONIC, &ts);
#else
  clock_gettime(CLOCK_REALTIME, &ts);
#endif
  ts.tv_sec += timeout/1000;
  ts.tv_nsec += (timeout%1000)*1000000;
#else
  int r = 0;
  struct timeval tv;
  struct timespec ts;
  gettimeofday(&tv, NULL);
  ts.tv_sec = tv.tv_sec + timeout/1000;
  ts.tv_nsec = tv.tv_usec * 1000 + (timeout%1000)*1000000;
#endif
  // tv_nsec >= 1000000000 ==> EINVAL
  ts.tv_sec += ts.tv_nsec / 1000000000;
  ts.tv_nsec %= 1000000000;
  pthread_mutex_lock(&event->mutex);
  // ts is an absolute deadline, so re-waiting after a spurious wakeup does
  // not extend the total timeout.
  while(0 == event->count && 0 == r)
    r = pthread_cond_timedwait(&event->event, &event->mutex, &ts); // These functions shall not return an error code of [EINTR].
  // A signal that arrived while the wait was timing out still counts:
  // report success instead of discarding it.
  if(0 != event->count)
  {
    event->count = 0;
    r = 0;
  }
  pthread_mutex_unlock(&event->mutex);
  return r;
#endif
}
/// Put the event into the signaled state and wake a waiter.
/// @return 0-ok, other-error code
static inline int event_signal(event_t* event)
{
#if defined(WIN32)
  if(SetEvent(*event))
    return 0;
  return (int)GetLastError();
#else
  int rc;
  pthread_mutex_lock(&event->mutex);
  event->count = 1; // remembered even if nobody is waiting yet
  rc = pthread_cond_signal(&event->event);
  pthread_mutex_unlock(&event->mutex);
  return rc;
#endif
}
/// Put the event back into the non-signaled state.
/// @return 0-ok; Windows: GetLastError() if ResetEvent fails
static inline int event_reset(event_t* event)
{
#if defined(WIN32)
  if(ResetEvent(*event))
    return 0;
  return (int)GetLastError();
#else
  pthread_mutex_t *guard = &event->mutex;
  pthread_mutex_lock(guard);
  event->count = 0;
  pthread_mutex_unlock(guard);
  return 0;
#endif
}
#endif /* !_platform_event_h_ */

thread-pool.c源文件

#include "thread-pool.h"
#include "locker.h"
#include "system.h"
#include "thread.h"
#include "event.h"
#include <stdlib.h>
#include <string.h>
#include <assert.h>
struct _thread_pool_context_t;
// Singly linked list node describing one pool thread.
typedef struct _thread_list_t
{
    struct _thread_list_t *next;// next thread node
    struct _thread_pool_context_t *pool;// pool this thread belongs to
    pthread_t thread; // thread id / handle
} thread_list_t;
// Singly linked list node describing one pending (or recycled) task.
typedef struct _thread_task_list_t
{
    struct _thread_task_list_t *next;// next task node
    thread_pool_proc proc; // function to run
    void *param;// argument passed to proc
} thread_task_list_t;
// State of one thread pool; the workers and the public API access it
// while holding `locker`.
typedef struct _thread_pool_context_t
{
    int run;// non-zero while the pool is running; cleared by thread_pool_destroy
    int idle_max;// a worker exits when the idle count exceeds this (set to `num` at creation)
    int threshold;// set to num / 2 at creation
    int thread_count;// number of threads in the pool
    int thread_count_min;// minimum number of threads (value passed to thread_pool_create)
    int thread_count_max;// maximum number of threads; thread_pool_push will not grow past it
    int thread_count_idle;// number of idle threads
    int task_count;// number of tasks currently in `tasks`
    thread_task_list_t *tasks;// pending task list
    thread_task_list_t *recycle_tasks;// finished task nodes kept for reuse (saves allocations)
    thread_list_t *task_threads;// list of all threads in the pool
    locker_t locker;// lock guarding the fields above
    event_t event;// event / condition variable used to wake idle workers
} thread_pool_context_t;
static void thread_pool_destroy_thread(thread_pool_context_t *context);
// Worker routine shared by every pool thread.
// param is the thread_list_t node created for this thread. The worker runs
// the tasks it finds at the head of context->tasks, then waits on the pool
// event for up to 60 seconds at a time. It leaves when the pool stops or
// when more threads are idle than idle_max; on the way out it updates the
// counters and frees its own list node. Always returns 0.
static int STDCALL thread_pool_worker(void *param)
{
    thread_list_t* threads;
    thread_task_list_t *task;
    thread_pool_context_t *context;
    threads = (thread_list_t*)param;
    context = threads->pool;
    locker_lock(&context->locker);
    while(context->run)// keep serving while the pool is running
    {
        // pop task
        task = context->tasks;
        while(task && context->run)// there is a task and the pool is running (keeps going while tasks remain)
        {
            // remove task from task list
            context->tasks = task->next;
            --context->task_count;
            // do task procedure
            --context->thread_count_idle;
            locker_unlock(&context->locker);// the lock is not held while the callback runs
            task->proc(task->param);// run the task
            locker_lock(&context->locker);
            ++context->thread_count_idle;
            // recycle task: push task to recycle list so the node can be reused
            task->next = context->recycle_tasks;
            context->recycle_tasks = task;
            // do next
            task = context->tasks;
        }
        // delete idle thread
        if(context->thread_count_idle > context->idle_max
                || !context->run)
            break;
        // wait for task
        locker_unlock(&context->locker);
        event_timewait(&context->event, 60*1000);// wait up to 1 minute for a signal; on signal or timeout, loop and look for tasks again
        locker_lock(&context->locker);
    }
    // still holding the lock: account for this thread leaving and drop its node
    --context->thread_count;
    --context->thread_count_idle;
    thread_pool_destroy_thread(context);
    locker_unlock(&context->locker);
    return 0;
}
/**
 * @brief thread_pool_create_thread Allocate a list node and start one worker thread for it.
 * @param context pool the new thread will serve
 * @return the new node (not yet linked into the pool list), NULL on failure
 */
static thread_list_t* thread_pool_create_thread(thread_pool_context_t *context)
{
    thread_list_t *node = (thread_list_t*)malloc(sizeof(thread_list_t));
    if(NULL == node)
        return NULL;
    memset(node, 0, sizeof(thread_list_t));
    node->pool = context;
    if(0 == thread_create(&node->thread, thread_pool_worker, node))
        return node;
    // the thread never started, so nobody else references the node
    free(node);
    return NULL;
}
/**
 * @brief thread_pool_destroy_thread Remove the calling worker from the pool's thread list.
 *
 * Called by a worker on its way out. The worker is detached before its
 * node is freed: nothing ever joins pool threads, so without the detach
 * every exited worker would keep its thread resources (and, on Windows,
 * its handle) until process exit.
 *
 * @param context pool whose thread list is searched
 */
static void thread_pool_destroy_thread(thread_pool_context_t *context)
{
    thread_list_t **link;
    thread_list_t *self;
    for(link = &context->task_threads; *link; link = &(*link)->next)
    {
        if(!thread_isself((*link)->thread))
            continue;
        // unlink the node that belongs to the calling thread
        self = *link;
        *link = self->next;
        thread_detach(self->thread);
        free(self);
        break;
    }
}
/**
 * @brief thread_pool_create_threads Start up to num worker threads and add them to the pool.
 *
 * Stops at the first thread that cannot be created; the pool counters are
 * increased by the number of threads that actually started.
 *
 * @param context thread pool object
 * @param num number of threads to create
 */
static void thread_pool_create_threads(thread_pool_context_t *context,
                                       int num)
{
    int created = 0;
    thread_list_t *node;
    while(created < num)
    {
        node = thread_pool_create_thread(context);
        if(NULL == node)
            break;
        // insert at the head of the list
        node->next = context->task_threads;
        context->task_threads = node;
        ++created;
    }
    context->thread_count += created;
    context->thread_count_idle += created;
}
/**
 * @brief thread_pool_destroy_threads Destroy every thread in a list and free the nodes (currently unused).
 * @param threads head of the thread list
 */
static void thread_pool_destroy_threads(thread_list_t *threads)
{
    thread_list_t *node;
    thread_list_t *following;
    for(node = threads; node; node = following)
    {
        following = node->next; // read before the node is freed
        thread_destroy(node->thread);
        free(node);
    }
}
/**
 * @brief thread_pool_create_task Build a task node, reusing a recycled node when one is available.
 * @param context pool that owns the recycle list
 * @param proc function the task will run
 * @param param argument for proc
 * @return a zeroed node with proc/param filled in (not yet queued), NULL if allocation fails
 */
static thread_task_list_t* thread_pool_create_task(thread_pool_context_t *context,
                                                   thread_pool_proc proc,
                                                   void* param)
{
    thread_task_list_t *task = context->recycle_tasks;
    if(task)
        context->recycle_tasks = task->next; // take the node off the recycle list
    else
        task = (thread_task_list_t*)malloc(sizeof(thread_task_list_t));
    if(!task)
        return NULL;
    memset(task, 0, sizeof(thread_task_list_t)); // clear stale contents
    task->proc = proc;
    task->param = param;
    return task;
}
/**
 * @brief thread_pool_destroy_tasks Free every node of a task list.
 * @param tasks head of the task list (may be NULL)
 */
static void thread_pool_destroy_tasks(thread_task_list_t *tasks)
{
    while(tasks)
    {
        thread_task_list_t *doomed = tasks;
        tasks = tasks->next;
        free(doomed);
    }
}
/**
 * Create a thread pool and start its initial workers.
 *
 * @param num number of threads started immediately; also used as the idle
 *            limit above which workers exit
 * @param min minimum thread count (stored in the context)
 * @param max maximum thread count thread_pool_push may grow the pool to
 * @return pool handle, NULL on failure
 */
thread_pool_t thread_pool_create(int num, int min, int max)
{
    thread_pool_context_t *ctx;
    ctx = (thread_pool_context_t*)malloc(sizeof(thread_pool_context_t));
    if(!ctx)
        return NULL;
    memset(ctx, 0, sizeof(thread_pool_context_t));
    ctx->thread_count_min = min;
    ctx->thread_count_max = max;
    ctx->threshold = num / 2;
    ctx->idle_max = num;
    ctx->run = 1; // running
    if(0 != locker_create(&ctx->locker))
    {
        free(ctx);
        return NULL;
    }
    if(0 != event_create(&ctx->event))
    {
        locker_destroy(&ctx->locker);
        free(ctx);
        return NULL;
    }
    // Workers take the pool lock as their first action. Holding it here
    // keeps them from observing the thread list and counters (or, on
    // Windows, their own not-yet-stored thread id) while still being set up.
    locker_lock(&ctx->locker);
    thread_pool_create_threads(ctx, num); // start the initial threads
    locker_unlock(&ctx->locker);
    return ctx;
}
/**
 * Stop the pool, wait for every worker to exit, then free all resources.
 * Tasks that are still queued are discarded without being run.
 *
 * @param pool handle from thread_pool_create; NULL is ignored
 */
void thread_pool_destroy(thread_pool_t pool)
{
    thread_pool_context_t *ctx;
    ctx = (thread_pool_context_t*)pool;
    if(!ctx)
        return;
    locker_lock(&ctx->locker);
    // Workers read `run` under the lock, so it must be written under the
    // lock as well.
    ctx->run = 0;
    while(ctx->thread_count)
    {
        event_signal(&ctx->event); // wake a waiting worker so it can see run == 0
        locker_unlock(&ctx->locker);
        system_sleep(100);
        locker_lock(&ctx->locker);
    }
    locker_unlock(&ctx->locker);
    thread_pool_destroy_tasks(ctx->recycle_tasks);
    thread_pool_destroy_tasks(ctx->tasks);
    event_destroy(&ctx->event);
    locker_destroy(&ctx->locker);
    free(ctx);
}
/**
 * Number of threads currently in the pool.
 *
 * @param pool handle from thread_pool_create
 * @return >=0 thread count, -1 if pool is NULL
 */
int thread_pool_threads_count(thread_pool_t pool)
{
    int count;
    thread_pool_context_t *ctx;
    ctx = (thread_pool_context_t*)pool;
    if(!ctx)
        return -1;
    // thread_count is modified by workers under the lock; read it the same way
    locker_lock(&ctx->locker);
    count = ctx->thread_count;
    locker_unlock(&ctx->locker);
    return count;
}
/**
 * Queue one task and wake a worker.
 *
 * The task is inserted at the head of the task list. If no thread is idle
 * and the pool is below its maximum size, one more worker is started.
 *
 * @param pool handle from thread_pool_create
 * @param proc function to run; must not be NULL
 * @param param argument handed to proc
 * @return 0-ok, -1 on invalid arguments or allocation failure
 */
int thread_pool_push(thread_pool_t pool, thread_pool_proc proc, void *param)
{
    thread_task_list_t *task;
    thread_pool_context_t *context;
    context = (thread_pool_context_t*)pool;
    // A NULL proc would only crash later, inside a worker thread.
    if(!context || !proc)
        return -1;
    locker_lock(&context->locker);
    task = thread_pool_create_task(context, proc, param); // build the task node
    if(!task)
    {
        locker_unlock(&context->locker);
        return -1;
    }
    // add to the task list
    task->next = context->tasks;
    context->tasks = task;
    ++context->task_count;
    // start one more thread if nobody is idle and the maximum is not reached
    if(context->thread_count_idle<1
            && context->thread_count<context->thread_count_max)
        thread_pool_create_threads(context, 1);
    event_signal(&context->event); // tell a worker there is work
    locker_unlock(&context->locker);
    return 0;
}
相关文章
|
6天前
|
安全 Python
告别低效编程!Python线程与进程并发技术详解,让你的代码飞起来!
【7月更文挑战第9天】Python并发编程提升效率:**理解并发与并行,线程借助`threading`模块处理IO密集型任务,受限于GIL;进程用`multiprocessing`实现并行,绕过GIL限制。示例展示线程和进程创建及同步。选择合适模型,注意线程安全,利用多核,优化性能,实现高效并发编程。
20 3
|
12天前
|
调度 C语言
深入浅出:C语言线程以及线程锁
线程锁的基本思想是,只有一个线程能持有锁,其他试图获取锁的线程将被阻塞,直到锁被释放。这样,锁就确保了在任何时刻,只有一个线程能够访问临界区(即需要保护的代码段或数据),从而保证了数据的完整性和一致性。 线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。一个进程可以包含一个或多个线程,而每个线程都有自己的指令指针和寄存器状态,它们共享进程的资源,如内存空间、文件句柄和网络连接等。 线程锁的概念
|
21天前
|
机器学习/深度学习 C语言 Windows
C语言的管理系统代码
C语言学生宿舍管理系统代码
|
20天前
|
算法 编译器 C语言
猜数字游戏C语言代码实现
猜数字游戏C语言代码实现
|
21天前
|
自然语言处理 Ubuntu 编译器
|
21天前
|
存储 机器学习/深度学习 编译器
C语言代码学习笔记
<编程精粹:编写高质量C语言代码> 读书笔记
|
21天前
|
JavaScript C语言
|
20天前
|
存储 安全 Serverless
扫雷游戏C语言代码实现——万字长文超详细,手把手教你实现,新手也能学会
扫雷游戏C语言代码实现——万字长文超详细,手把手教你实现,新手也能学会
|
21天前
|
C语言
C语言练习代码第一篇
C语言练习代码第一篇
|
21天前
|
C语言 图形学 C++