说明
GitHub 地址:常用的 C 工具代码。这个工具集包含了用 C 语言实现的线程池、hashtable、list、md5、字符串操作、消息队列等很多常用工具,这里就不一一说明了,感兴趣的朋友可以自行下载研究,工作中肯定用得上。
这里的代码是直接拷贝的,直接复制到自己的项目就能使用。
如果你对线程池还不太熟悉,或者只需要一个仅支持 Linux 的线程池,可以参考我另一篇博客中的简化版 Linux 线程池。
代码
thread-pool.h头文件
#ifndef _threadpool_h_
#define _threadpool_h_

#ifdef __cplusplus
extern "C" {
#endif

/** Opaque thread pool handle. */
typedef void* thread_pool_t;

/**
 * @brief thread_pool_create create a thread pool
 * @param num number of threads started at creation time
 * @param min minimum number of threads
 * @param max maximum number of threads
 * @return 0-error, other-thread pool id
 */
thread_pool_t thread_pool_create(int num, int min, int max);

/**
 * @brief thread_pool_destroy destroy a thread pool
 * @param pool thread pool returned by thread_pool_create
 */
void thread_pool_destroy(thread_pool_t pool);

/**
 * @brief thread_pool_threads_count get the number of threads in the pool
 * @param pool thread pool returned by thread_pool_create
 * @return <0-error code, >=0-thread count
 */
int thread_pool_threads_count(thread_pool_t pool);

/** Task callback; param is the pointer handed to thread_pool_push. */
typedef void (*thread_pool_proc)(void *param);

/**
 * @brief thread_pool_push put one task into the thread pool
 * @param pool thread pool returned by thread_pool_create
 * @param proc task function pointer
 * @param param user-defined argument passed to proc
 * @return =0-ok, <0-error code
 */
int thread_pool_push(thread_pool_t pool, thread_pool_proc proc, void *param);

#ifdef __cplusplus
}
#endif

#endif /* !_threadpool_h_ */
依赖头文件
locker.h
#ifndef _platform_locker_h_
#define _platform_locker_h_

#include <errno.h>

#if defined(WIN32)
#include <Windows.h>
typedef CRITICAL_SECTION	locker_t;
#else
#include <pthread.h>
typedef pthread_mutex_t		locker_t;
#endif

//-------------------------------------------------------------------------------------
// int locker_create(locker_t* locker);
// int locker_destroy(locker_t* locker);
// int locker_lock(locker_t* locker);
// int locker_unlock(locker_t* locker);
// int locker_trylock(locker_t* locker);
//-------------------------------------------------------------------------------------

/// Initialise a recursive lock.
/// @param[out] locker storage for the lock
/// @return 0-ok, other-error code of the pthread call that failed
static inline int locker_create(locker_t* locker)
{
#if defined(WIN32)
	InitializeCriticalSection(locker);
	return 0;
#else
	// create a recursive locker
	int r;
	pthread_mutexattr_t attr;

	r = pthread_mutexattr_init(&attr);
	if (0 != r)
		return r;

	// http://linux.die.net/man/3/pthread_mutexattr_settype
	// Application Usage:
	// It is advised that an application should not use a PTHREAD_MUTEX_RECURSIVE mutex
	// with condition variables because the implicit unlock performed for a pthread_cond_timedwait()
	// or pthread_cond_wait() may not actually release the mutex (if it had been locked multiple times).
	// If this happens, no other thread can satisfy the condition of the predicate.
#if defined(OS_LINUX) && defined(__GLIBC__)
	r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP);
#else
	r = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
#endif

	// Do not hand back a silently non-recursive mutex when settype failed.
	if (0 == r)
		r = pthread_mutex_init(locker, &attr);

	pthread_mutexattr_destroy(&attr);
	return r;
#endif
}

/// Release the resources of a lock initialised by locker_create.
/// @return 0-ok, other-error
static inline int locker_destroy(locker_t* locker)
{
#if defined(WIN32)
	DeleteCriticalSection(locker);
	return 0;
#else
	return pthread_mutex_destroy(locker);
#endif
}

/// Acquire the lock, blocking until it is available.
/// @return 0-ok, other-error
static inline int locker_lock(locker_t* locker)
{
#if defined(WIN32)
	EnterCriticalSection(locker);
	return 0;
#else
	// These functions shall not return an error code of [EINTR].
	return pthread_mutex_lock(locker);
#endif
}

/// Release the lock.
/// linux: the unlocking thread must be the thread that holds the lock
/// @return 0-ok, other-error
static inline int locker_unlock(locker_t* locker)
{
#if defined(WIN32)
	LeaveCriticalSection(locker);
	return 0;
#else
	return pthread_mutex_unlock(locker);
#endif
}

/// Try to acquire the lock without blocking.
/// @return 0-lock acquired; otherwise -1 on Windows, or the
///         pthread_mutex_trylock() error code on other platforms
static inline int locker_trylock(locker_t* locker)
{
#if defined(WIN32)
	return TryEnterCriticalSection(locker) ? 0 : -1;
#else
	return pthread_mutex_trylock(locker);
#endif
}

#endif /* !_platform_locker_h_ */
system.h
#ifndef _platform_system_h_
#define _platform_system_h_

#include <stdint.h>

#if defined(WIN32)
#include <Windows.h>

typedef HMODULE module_t;
typedef uint32_t useconds_t;
typedef FARPROC funcptr_t;
#else
#include <sys/types.h>
#include <sys/time.h> // gettimeofday (fallback clock source)
#include <sys/utsname.h>
#include <unistd.h>
#include <pthread.h>
#include <dlfcn.h>
#include <errno.h>
#include <time.h>

typedef void* module_t;
typedef void (*funcptr_t)(void);
#endif

#if defined(OS_MAC) || defined(_FREEBSD_) || defined(_NETBSD_) || defined(_OPENBSD_)
#include <sys/param.h>
#include <sys/sysctl.h>
#endif

#if defined(OS_MAC)
#include <mach/mach_time.h>
#endif

#include <stddef.h>
#include <stdio.h>
#include <string.h> // memset (system_version on Windows)

//-----------------------------------------------------------------------
// void system_sleep(useconds_t millisecond);
// uint64_t system_time(void);
// uint64_t system_clock(void);
// int64_t system_getcyclecount(void);
// size_t system_getcpucount(void);
//
// int system_version(int* major, int* minor);
// module_t system_load(const char* module);
// int system_unload(module_t module);
// funcptr_t system_getproc(module_t module, const char* producer);
//-----------------------------------------------------------------------

//
///
/// implement
///
//

/// Suspend the calling thread.
/// @param[in] milliseconds time to sleep, in milliseconds
static inline void system_sleep(useconds_t milliseconds)
{
#if defined(WIN32)
	Sleep(milliseconds);
#else
	// nanosleep instead of usleep(milliseconds*1000): the multiplication
	// overflows for long sleeps and POSIX allows usleep to reject
	// intervals of one second or more.
	struct timespec ts;
	ts.tv_sec = (time_t)(milliseconds / 1000);
	ts.tv_nsec = (long)(milliseconds % 1000) * 1000000L;
	nanosleep(&ts, NULL);
#endif
}

/// @return number of processors, never less than 1 on the sysctl/sysconf paths
static inline size_t system_getcpucount(void)
{
#if defined(WIN32)
	SYSTEM_INFO sysinfo;
	GetSystemInfo(&sysinfo);
	return sysinfo.dwNumberOfProcessors;

#elif defined(OS_MAC) || defined(_FREEBSD_) || defined(_NETBSD_) || defined(_OPENBSD_)
	// FreeBSD, MacOS X, NetBSD, OpenBSD, etc.:
	int mib[2];
	int num = 0; // hw.ncpu / hw.availcpu are int-sized values
	size_t len;

	mib[0] = CTL_HW;
#if defined(HW_AVAILCPU)
	mib[1] = HW_AVAILCPU;
	len = sizeof(num);
	if (0 != sysctl(mib, 2, &num, &len, NULL, 0))
		num = 0;
#endif
	if (num < 1)
	{
		mib[1] = HW_NCPU;
		len = sizeof(num);
		if (0 != sysctl(mib, 2, &num, &len, NULL, 0) || num < 1)
			num = 1;
	}
	return (size_t)num;

#elif defined(_HPUX_)
	// HPUX:
	return mpctl(MPC_GETNUMSPUS, NULL, NULL);

#elif defined(_IRIX_)
	// IRIX:
	return sysconf(_SC_NPROC_ONLN);

#else
	// linux, Solaris, & AIX
	// "cat /proc/cpuinfo | grep processor | wc -l"
	// sysconf returns -1 on error; do not let that become a huge size_t.
	long num = sysconf(_SC_NPROCESSORS_ONLN);
	return num < 1 ? 1 : (size_t)num;
#endif
}

/// Placeholder: no cycle counter is implemented on any platform.
/// @return always 0
static inline int64_t system_getcyclecount(void)
{
	return 0;
}

/// @return milliseconds since the Epoch(1970-01-01 00:00:00 +0000 (UTC))
static inline uint64_t system_time(void)
{
#if defined(WIN32)
	uint64_t t;
	FILETIME ft;
	GetSystemTimeAsFileTime(&ft);
	t = (uint64_t)ft.dwHighDateTime << 32 | ft.dwLowDateTime;
	return t / 10000 - 11644473600000ULL; /* Jan 1, 1601 */
#elif defined(CLOCK_REALTIME) && !defined(OS_MAC)
	struct timespec tp;
	clock_gettime(CLOCK_REALTIME, &tp);
	return (uint64_t)tp.tv_sec * 1000 + tp.tv_nsec / 1000000;
#else
	// OS_MAC lands here too: mach_absolute_time() counts from boot,
	// so it cannot provide time since the Epoch.
	// POSIX.1-2008 marks gettimeofday() as obsolete, recommending the use of clock_gettime(2) instead.
	struct timeval tv;
	gettimeofday(&tv, NULL);
	return (uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
#endif
}

/// @return milliseconds(relative time)
static inline uint64_t system_clock(void)
{
#if defined(WIN32)
	LARGE_INTEGER freq;
	LARGE_INTEGER count;
	QueryPerformanceFrequency(&freq);
	QueryPerformanceCounter(&count);
	return (uint64_t)count.QuadPart * 1000 / freq.QuadPart;
#elif defined(OS_MAC)
	uint64_t tick;
	mach_timebase_info_data_t timebase;
	tick = mach_absolute_time();
	mach_timebase_info(&timebase);
	return tick * timebase.numer / timebase.denom / 1000000;
#elif defined(CLOCK_MONOTONIC)
	struct timespec tp;
	clock_gettime(CLOCK_MONOTONIC, &tp);
	return (uint64_t)tp.tv_sec * 1000 + tp.tv_nsec / 1000000;
#else
	// POSIX.1-2008 marks gettimeofday() as obsolete, recommending the use of clock_gettime(2) instead.
	struct timeval tv;
	gettimeofday(&tv, NULL);
	return (uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
#endif
}

#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable: 4996) // GetVersionEx
#pragma warning(disable: 28159)
#endif
/// Query the operating system version.
/// @param[out] major major version number
/// @param[out] minor minor version number
/// @return 0-ok, other-error (errno of uname, or -1 if the release string cannot be parsed)
static inline int system_version(int* major, int* minor)
{
#if defined(WIN32)
	/*
	Operating system		Version number
	Windows 8				6.2
	Windows Server 2012		6.2
	Windows 7				6.1
	Windows Server 2008 R2	6.1
	Windows Server 2008		6.0
	Windows Vista			6.0
	Windows Server 2003 R2	5.2
	Windows Server 2003		5.2
	Windows XP				5.1
	Windows 2000			5.0
	Windows Me				4.90
	Windows 98				4.10
	Windows NT 4.0			4.0
	Windows 95				4.0
	*/
	OSVERSIONINFO version;
	memset(&version, 0, sizeof(version));
	version.dwOSVersionInfoSize = sizeof(version);
	GetVersionEx(&version);
	*major = (int)(version.dwMajorVersion);
	*minor = (int)(version.dwMinorVersion);
	return 0;
#else
	struct utsname ver;
	if (0 != uname(&ver))
		return errno;
	if (2 != sscanf(ver.release, "%8d.%8d", major, minor))
		return -1;
	return 0;
#endif
}
#if defined(_MSC_VER)
#pragma warning(pop)
#endif

//
///
/// dynamic module load/unload
///
//

/// Load a dynamic library.
/// @param[in] module library path/name
/// @return handle returned by LoadLibraryExA / dlopen
static inline module_t system_load(const char* module)
{
#if defined(WIN32)
	return LoadLibraryExA(module, NULL, LOAD_WITH_ALTERED_SEARCH_PATH);
#else
	return dlopen(module, RTLD_LAZY | RTLD_LOCAL);
#endif
}

/// Unload a library returned by system_load.
/// @return the raw FreeLibrary (Windows) or dlclose (other platforms) result.
///         NOTE(review): success is non-zero for FreeLibrary but 0 for dlclose,
///         so portable callers cannot compare this against one value.
static inline int system_unload(module_t module)
{
#if defined(WIN32)
	return FreeLibrary(module);
#else
	return dlclose(module);
#endif
}

/// Look up an exported symbol.
/// @param[in] module handle returned by system_load
/// @param[in] producer symbol name
/// @return result of GetProcAddress / dlsym converted to a function pointer
static inline funcptr_t system_getproc(module_t module, const char* producer)
{
#if defined(WIN32)
	return GetProcAddress(module, producer);
#else
	// https://linux.die.net/man/3/dlsym
	// cosine = (double (*)(double)) dlsym(handle, "cos")
	// ===> *(void **) (&cosine) = dlsym(handle, "cos");
	return (funcptr_t)dlsym(module, producer);
#endif
}

#endif /* !_platform_system_h_ */
thread.h
#ifndef _platform_thread_h_
#define _platform_thread_h_

#if defined(WIN32)
#include <Windows.h>
#include <process.h>
#include <string.h> // memset (thread_getid_xp)

#ifndef STDCALL
#define STDCALL __stdcall
#endif

typedef struct
{
	DWORD id;
	HANDLE handle;
} pthread_t;

typedef DWORD tid_t;

#else
#include <pthread.h>
#include <sched.h>
#include <stdint.h>
#include <stdlib.h>
#include <errno.h>

typedef pthread_t tid_t;

#ifndef STDCALL
#define STDCALL
#endif

enum thread_priority
{
	THREAD_PRIORITY_IDLE			= 1,
	THREAD_PRIORITY_LOWEST			= 25,
	THREAD_PRIORITY_BELOW_NORMAL	= 40,
	THREAD_PRIORITY_NORMAL			= 50,
	THREAD_PRIORITY_ABOVE_NORMAL	= 60,
	THREAD_PRIORITY_HIGHEST			= 75,
	THREAD_PRIORITY_TIME_CRITICAL	= 99,
};

#endif

//-------------------------------------------------------------------------------------
// int thread_create(pthread_t* thread, thread_proc func, void* param);
// int thread_destroy(pthread_t thread);
// int thread_detach(pthread_t thread);
// int thread_getpriority(pthread_t thread, int* priority);
// int thread_setpriority(pthread_t thread, int priority);
// int thread_isself(pthread_t thread);
// int thread_valid(pthread_t thread);
// int thread_yield(void);
// tid_t thread_getid(pthread_t thread);
// pthread_t thread_self(void);
//-------------------------------------------------------------------------------------

/// Thread entry point.
typedef int (STDCALL *thread_proc)(void* param);

#if !defined(WIN32)
/// Start record handed to a new POSIX thread; owned (and freed) by the new thread.
struct thread_start_t
{
	thread_proc func;
	void* param;
};

/// pthread-compatible entry point. Calling a thread_proc through a
/// void*(*)(void*) cast is undefined behaviour, so the user function is
/// invoked with its real type here and its int result is widened.
static inline void* thread_start_routine(void* arg)
{
	struct thread_start_t start = *(struct thread_start_t*)arg;
	free(arg);
	return (void*)(intptr_t)start.func(start.param);
}
#endif

/// Create a thread with an explicit stack size.
/// @param[out] thread receives the new thread
/// @param[in] stacksize stack size in bytes, 0 for the platform default
/// @param[in] func thread entry point
/// @param[in] param argument passed to func
/// @return 0-ok, other-error
static inline int thread_create2(pthread_t* thread, unsigned int stacksize, thread_proc func, void* param)
{
#if defined(WIN32)
	// https://msdn.microsoft.com/en-us/library/windows/desktop/ms682453.aspx
	// CreateThread function: By default, every thread has one megabyte of stack space.

	// http://msdn.microsoft.com/en-us/library/windows/desktop/ms682453%28v=vs.85%29.aspx
	// A thread in an executable that calls the C run-time library (CRT)
	// should use the _beginthreadex and _endthreadex functions for thread management
	// rather than CreateThread and ExitThread;

	//thread->handle = CreateThread(NULL, stacksize, (LPTHREAD_START_ROUTINE)func, param, 0, &thread->id);
	typedef unsigned int(__stdcall *thread_routine)(void *);
	thread->handle = (HANDLE)_beginthreadex(NULL, stacksize, (thread_routine)func, param, 0, (unsigned int*)&thread->id);
	return NULL == thread->handle ? -1 : 0;
#else
	// https://linux.die.net/man/3/pthread_create
	// On Linux/x86-32, the default stack size for a new thread is 2 megabytes(10M 64bits)

	// http://udrepper.livejournal.com/20948.html
	// mallopt(M_ARENA_MAX, cpu); // limit multithread virtual memory
	int r;
	pthread_attr_t attr;
	struct thread_start_t* start;

	start = (struct thread_start_t*)malloc(sizeof(*start));
	if (NULL == start)
		return ENOMEM;
	start->func = func;
	start->param = param;

	r = pthread_attr_init(&attr);
	if (0 != r)
	{
		free(start);
		return r;
	}

	// 0 means "platform default"; handing 0 to pthread_attr_setstacksize
	// only produces EINVAL. A rejected size is ignored (best effort) and
	// the default stack size stays in effect.
	if (stacksize > 0)
		pthread_attr_setstacksize(&attr, stacksize);

	r = pthread_create(thread, &attr, thread_start_routine, start);
	pthread_attr_destroy(&attr);
	if (0 != r)
		free(start); // the thread never started, so nobody else frees it
	return r;
#endif
}

/// Create a thread with the default stack size.
/// @return 0-ok, other-error
static inline int thread_create(pthread_t* thread, thread_proc func, void* param)
{
	return thread_create2(thread, 0, func, param);
}

/// Wait for a thread to finish and release it.
/// When called on the current thread it cannot wait for itself:
/// POSIX detaches instead, Windows only closes the handle.
/// @return 0-ok, other-error
static inline int thread_destroy(pthread_t thread)
{
#if defined(WIN32)
	if (thread.id != GetCurrentThreadId())
		WaitForSingleObjectEx(thread.handle, INFINITE, TRUE);
	CloseHandle(thread.handle);
	return 0;
#else
	void* value = NULL;
	if (pthread_equal(pthread_self(), thread))
		return pthread_detach(thread);
	else
		return pthread_join(thread, &value);
#endif
}

/// Release a thread without waiting for it.
/// @return 0-ok, other-error
static inline int thread_detach(pthread_t thread)
{
#if defined(WIN32)
	CloseHandle(thread.handle);
	return 0;
#else
	return pthread_detach(thread);
#endif
}

/// Read a thread's priority.
/// Windows: the value is whatever GetThreadPriority reports.
/// Other platforms: the value is sched_priority from pthread_getschedparam.
/// @param[out] priority receives the priority (only written on success)
/// @return 0-ok, other-error
static inline int thread_getpriority(pthread_t thread, int* priority)
{
#if defined(WIN32)
	int r = GetThreadPriority(thread.handle);
	if (THREAD_PRIORITY_ERROR_RETURN == r)
		return (int)GetLastError();

	*priority = r;
	return 0;
#else
	int policy;
	struct sched_param sched;
	int r = pthread_getschedparam(thread, &policy, &sched);
	if (0 == r)
		*priority = sched.sched_priority;
	return r;
#endif
}

/// Change a thread's priority.
/// @param[in] priority Windows: passed to SetThreadPriority;
///            other platforms: only used for SCHED_FIFO/SCHED_RR threads
/// @return 0-ok, other-error
static inline int thread_setpriority(pthread_t thread, int priority)
{
#if defined(WIN32)
	// 0 on success, like the POSIX branch and thread_getpriority.
	return SetThreadPriority(thread.handle, priority) ? 0 : (int)GetLastError();
#else
	int r;
	int policy = SCHED_RR;
	struct sched_param sched;

	r = pthread_getschedparam(thread, &policy, &sched);
	if (0 != r)
		return r; // sched would be uninitialised

	// For processes scheduled under one of the normal scheduling policies
	// (SCHED_OTHER, SCHED_IDLE, SCHED_BATCH),
	// sched_priority is not used in scheduling decisions (it must be specified as 0).
	// Processes scheduled under one of the real-time policies(SCHED_FIFO, SCHED_RR)
	// have a sched_priority value in the range 1 (low)to 99 (high)
	sched.sched_priority = (SCHED_FIFO == policy || SCHED_RR == policy) ? priority : 0;
	return pthread_setschedparam(thread, policy, &sched);
#endif
}

/// @return the calling thread
static inline pthread_t thread_self(void)
{
#if defined(WIN32)
	pthread_t t;
	t.handle = GetCurrentThread();
	t.id = GetCurrentThreadId();
	return t;
#else
	return pthread_self();
#endif
}

/// @return the thread id of a thread object
static inline tid_t thread_getid(pthread_t thread)
{
#if defined(WIN32)
	//return GetThreadId(thread.handle); // >= vista
	return thread.id;
#else
	return thread;
#endif
}

/// @return non-zero if thread is the calling thread, 0 otherwise
static inline int thread_isself(pthread_t thread)
{
#if defined(WIN32)
	return thread.id == GetCurrentThreadId() ? 1 : 0;
#else
	return pthread_equal(pthread_self(), thread);
#endif
}

/// @return 1 if the thread object is non-zero, 0 otherwise
static inline int thread_valid(pthread_t thread)
{
#if defined(WIN32)
	return 0 != thread.id ? 1 : 0;
#else
	return 0 != thread ? 1 : 0;
#endif
}

/// Give up the processor.
/// @return 0-ok, other-error
static inline int thread_yield(void)
{
#if defined(WIN32)
	// Causes the calling thread to yield execution to another thread that is ready to run
	// on the current processor. The operating system selects the next thread to be executed.
	return SwitchToThread() ? 0 : -1;
#else
	return sched_yield();
#endif
}

#if defined(WIN32_XP)
typedef DWORD KPRIORITY;

typedef struct _CLIENT_ID
{
	PVOID UniqueProcess;
	PVOID UniqueThread;
} CLIENT_ID, *PCLIENT_ID;

typedef struct _THREAD_BASIC_INFORMATION
{
	NTSTATUS	ExitStatus;
	PVOID		TebBaseAddress;
	CLIENT_ID	ClientId;
	KAFFINITY	AffinityMask;
	KPRIORITY	Priority;
	KPRIORITY	BasePriority;
} THREAD_BASIC_INFORMATION, *PTHREAD_BASIC_INFORMATION;

typedef NTSTATUS(__stdcall *NtQueryInformationThread)(HANDLE ThreadHandle, int ThreadInformationClass, PVOID ThreadInformation, ULONG ThreadInformationLength, PULONG ReturnLength);

/// Get the id of a thread from its handle through ntdll's NtQueryInformationThread.
/// @return thread id, 0 if the query function cannot be located
static inline tid_t thread_getid_xp(HANDLE handle)
{
	// NT_TIB* tib = (NT_TIB*)__readfsdword(0x18);
	HMODULE module;
	NtQueryInformationThread fp;
	THREAD_BASIC_INFORMATION tbi;

	memset(&tbi, 0, sizeof(tbi));
	module = GetModuleHandleA("ntdll.dll");
	if (NULL == module)
		return 0;

	fp = (NtQueryInformationThread)GetProcAddress(module, "NtQueryInformationThread");
	if (NULL == fp)
		return 0;

	fp(handle, 0/*ThreadBasicInformation*/, &tbi, sizeof(tbi), NULL);
	return (tid_t)(ULONG_PTR)tbi.ClientId.UniqueThread;
}
#endif

#endif /* !_platform_thread_h_ */
event.h
#ifndef _platform_event_h_
#define _platform_event_h_

#if defined(WIN32)
#include <Windows.h>
typedef HANDLE event_t;
#else
#include <sys/time.h> // gettimeofday
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h> // clock_gettime

typedef struct
{
	int count; // 1 while a signal is pending; fixes pthread_cond_signal/pthread_cond_wait call order
	pthread_cond_t event;
	pthread_mutex_t mutex;
} event_t;

#ifndef WAIT_TIMEOUT
#define WAIT_TIMEOUT		ETIMEDOUT
#endif
#endif

/// event: Windows Event/Linux condition variable
/// multi-processor: no

//-------------------------------------------------------------------------------------
// int event_create(event_t* event);
// int event_destroy(event_t* event);
// int event_wait(event_t* event);
// int event_timewait(event_t* event, int timeout);
// int event_signal(event_t* event);
// int event_reset(event_t* event);
//-------------------------------------------------------------------------------------

/// Create an auto-reset event that starts non-signalled.
/// @return 0-success, other-error
static inline int event_create(event_t* event)
{
#if defined(WIN32)
	HANDLE h = CreateEvent(NULL, FALSE, FALSE, NULL); // auto-reset, initially non-signalled
	if (NULL == h)
		return (int)GetLastError();
	*event = h;
	return 0;
#else
	int r;
#if defined(OS_LINUX) && defined(CLOCK_MONOTONIC) && defined(__USE_XOPEN2K)
	pthread_condattr_t attr;
#endif

	// A default (non-recursive) mutex: it is never locked twice by one
	// thread in this file, and POSIX advises against pairing recursive
	// mutexes with condition variables.
	r = pthread_mutex_init(&event->mutex, NULL);
	if (0 != r)
		return r;

#if defined(OS_LINUX) && defined(CLOCK_MONOTONIC) && defined(__USE_XOPEN2K)
	// event_timewait computes its deadline from CLOCK_MONOTONIC under the
	// same condition, so the condition variable must use that clock too.
	r = pthread_condattr_init(&attr);
	if (0 == r)
	{
		r = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
		if (0 == r)
			r = pthread_cond_init(&event->event, &attr);
		pthread_condattr_destroy(&attr);
	}
#else
	r = pthread_cond_init(&event->event, NULL);
#endif

	if (0 != r)
	{
		pthread_mutex_destroy(&event->mutex);
		return r;
	}

	event->count = 0;
	return 0;
#endif
}

/// Destroy an event created by event_create.
/// @return 0-success, other-error
static inline int event_destroy(event_t* event)
{
#if defined(WIN32)
	BOOL r = CloseHandle(*event);
	return r ? 0 : (int)GetLastError();
#else
	int r = pthread_cond_destroy(&event->event);
	while (EBUSY == r)
	{
		// retry every millisecond while the condition variable is reported busy
		usleep(1000);
		r = pthread_cond_destroy(&event->event);
	}
	pthread_mutex_destroy(&event->mutex);
	return r;
#endif
}

/// Block until the event is signalled, then consume the signal.
/// @return 0-success, other-error
static inline int event_wait(event_t* event)
{
#if defined(WIN32)
	DWORD r = WaitForSingleObjectEx(*event, INFINITE, TRUE);
	return WAIT_FAILED == r ? GetLastError() : r;
#else
	int r = 0;
	pthread_mutex_lock(&event->mutex);
	// Loop: pthread_cond_wait may wake up without a signal.
	// These functions shall not return an error code of [EINTR].
	while (0 == event->count && 0 == r)
		r = pthread_cond_wait(&event->event, &event->mutex);
	if (0 != event->count)
	{
		event->count = 0; // auto-reset
		r = 0;
	}
	pthread_mutex_unlock(&event->mutex);
	return r;
#endif
}

/// Block until the event is signalled or the timeout elapses.
/// @param[in] timeout time to wait, in milliseconds
/// @return 0-success, WAIT_TIMEOUT-timeout, other-error
static inline int event_timewait(event_t* event, int timeout)
{
#if defined(WIN32)
	DWORD r = WaitForSingleObjectEx(*event, timeout, TRUE);
	return WAIT_FAILED == r ? GetLastError() : r;
#else
	int r = 0;
	struct timespec ts;
#if defined(OS_LINUX) && defined(CLOCK_REALTIME)
#if defined(CLOCK_MONOTONIC) && defined(__USE_XOPEN2K)
	clock_gettime(CLOCK_MONOTONIC, &ts);
#else
	clock_gettime(CLOCK_REALTIME, &ts);
#endif
	ts.tv_sec += timeout / 1000;
	ts.tv_nsec += (timeout % 1000) * 1000000;
#else
	struct timeval tv;
	gettimeofday(&tv, NULL);
	ts.tv_sec = tv.tv_sec + timeout / 1000;
	ts.tv_nsec = tv.tv_usec * 1000 + (timeout % 1000) * 1000000;
#endif

	// tv_nsec outside [0, 1000000000) ==> EINVAL
	ts.tv_sec += ts.tv_nsec / 1000000000;
	ts.tv_nsec %= 1000000000;
	if (ts.tv_nsec < 0)
	{
		// negative timeout: borrow one second
		ts.tv_nsec += 1000000000;
		ts.tv_sec -= 1;
	}

	pthread_mutex_lock(&event->mutex);
	// Loop: pthread_cond_timedwait may wake up without a signal.
	// These functions shall not return an error code of [EINTR].
	while (0 == event->count && 0 == r)
		r = pthread_cond_timedwait(&event->event, &event->mutex, &ts);
	if (0 != event->count)
	{
		// A signal that raced with the timeout still counts as success;
		// on a real timeout count is left untouched so nothing is lost.
		event->count = 0; // auto-reset
		r = 0;
	}
	pthread_mutex_unlock(&event->mutex);
	return r;
#endif
}

/// Signal the event; the signal stays pending until one wait consumes it
/// or event_reset clears it.
/// @return 0-success, other-error
static inline int event_signal(event_t* event)
{
#if defined(WIN32)
	return SetEvent(*event) ? 0 : (int)GetLastError();
#else
	int r;
	pthread_mutex_lock(&event->mutex);
	event->count = 1;
	r = pthread_cond_signal(&event->event);
	pthread_mutex_unlock(&event->mutex);
	return r;
#endif
}

/// Clear a pending signal.
/// @return 0-success, other-error
static inline int event_reset(event_t* event)
{
#if defined(WIN32)
	return ResetEvent(*event) ? 0 : (int)GetLastError();
#else
	pthread_mutex_lock(&event->mutex);
	event->count = 0;
	pthread_mutex_unlock(&event->mutex);
	return 0;
#endif
}

#endif /* !_platform_event_h_ */
thread-pool.c源文件
#include "thread-pool.h"
#include "locker.h"
#include "system.h"
#include "thread.h"
#include "event.h"
#include <stdlib.h>
#include <string.h>
#include <assert.h>

enum
{
	THREAD_POOL_IDLE_WAIT_MS = 60 * 1000,	// how long an idle worker waits for a task before re-checking
	THREAD_POOL_EXIT_POLL_MS = 100,			// how often thread_pool_destroy re-checks for running workers
};

struct _thread_pool_context_t;

// list of worker threads
typedef struct _thread_list_t
{
	struct _thread_list_t *next;			// next worker
	struct _thread_pool_context_t *pool;	// owning pool
	pthread_t thread;						// thread id / handle
} thread_list_t;

// list of tasks
typedef struct _thread_task_list_t
{
	struct _thread_task_list_t *next;	// next task
	thread_pool_proc proc;				// task function
	void *param;						// task argument
} thread_task_list_t;

// All fields except locker/event are protected by locker.
typedef struct _thread_pool_context_t
{
	int run;				// non-zero while the pool is running
	int idle_max;			// a worker exits when more than this many workers are idle
	int threshold;			// set to num/2 at creation; not read anywhere in this file
	int thread_count;		// number of worker threads
	int thread_count_min;	// minimum worker count (stored only; not read in this file)
	int thread_count_max;	// maximum worker count
	int thread_count_idle;	// number of workers not running a task

	int task_count;						// number of pending tasks
	thread_task_list_t *tasks;			// pending tasks (pushed and popped at the head)
	thread_task_list_t *recycle_tasks;	// finished task nodes kept for reuse (saves malloc/free)

	thread_list_t *task_threads;		// all worker threads of the pool

	locker_t locker;	// lock
	event_t event;		// event object / condition variable used to wake workers
} thread_pool_context_t;

static void thread_pool_destroy_thread(thread_pool_context_t *context);

/**
 * @brief thread_pool_worker body of every worker thread
 * @param param the worker's own thread_list_t node
 * @return always 0
 */
static int STDCALL thread_pool_worker(void *param)
{
	thread_list_t *self;
	thread_task_list_t *task;
	thread_pool_context_t *context;

	self = (thread_list_t*)param;
	context = self->pool;

	locker_lock(&context->locker);
	while (context->run)
	{
		// drain the task list; the lock is released only while a task runs
		task = context->tasks;
		while (task && context->run)
		{
			// remove task from task list
			context->tasks = task->next;
			--context->task_count;

			// do task procedure
			--context->thread_count_idle;
			locker_unlock(&context->locker);
			task->proc(task->param);
			locker_lock(&context->locker);
			++context->thread_count_idle;

			// recycle task: push task to recycle list
			task->next = context->recycle_tasks;
			context->recycle_tasks = task;

			// do next
			task = context->tasks;
		}

		// delete idle thread
		if (context->thread_count_idle > context->idle_max || !context->run)
			break;

		// wait for task: returns when the (auto-reset) event is signalled
		// or after the idle timeout, then the task list is checked again
		locker_unlock(&context->locker);
		event_timewait(&context->event, THREAD_POOL_IDLE_WAIT_MS);
		locker_lock(&context->locker);
	}

	--context->thread_count;
	--context->thread_count_idle;
	thread_pool_destroy_thread(context);
	// context must not be touched after this unlock: thread_pool_destroy
	// frees it once thread_count reaches 0
	locker_unlock(&context->locker);

	return 0;
}

/**
 * @brief thread_pool_create_thread create one worker thread
 * @param context thread pool (its lock must be held by the caller)
 * @return the worker's list node (not yet linked), NULL on error
 */
static thread_list_t* thread_pool_create_thread(thread_pool_context_t *context)
{
	thread_list_t *node;

	node = (thread_list_t*)malloc(sizeof(thread_list_t));
	if (!node)
		return NULL;

	memset(node, 0, sizeof(thread_list_t));
	node->pool = context;

	if (0 != thread_create(&node->thread, thread_pool_worker, node))
	{
		free(node);
		return NULL;
	}

	return node;
}

/**
 * @brief thread_pool_destroy_thread unlink and release the calling worker
 *
 * Called by a worker, with the pool lock held, right before it exits.
 * Nothing ever joins a worker, so the thread is detached here; otherwise
 * its thread resources (and on Windows its handle) are never released.
 *
 * @param context thread pool
 */
static void thread_pool_destroy_thread(thread_pool_context_t *context)
{
	thread_list_t **link;
	thread_list_t *node;

	// find the calling thread's node
	for (link = &context->task_threads; *link; link = &(*link)->next)
	{
		node = *link;
		if (thread_isself(node->thread))
		{
			*link = node->next;
			thread_detach(node->thread);
			free(node);
			break;
		}
	}
}

/**
 * @brief thread_pool_create_threads create worker threads
 *
 * The caller must hold the pool lock: new workers block on it until the
 * list and the counters are consistent.
 *
 * @param context thread pool
 * @param num number of threads to create; creation stops at the first failure
 */
static void thread_pool_create_threads(thread_pool_context_t *context, int num)
{
	int i;
	thread_list_t *node;

	for (i = 0; i < num; i++)
	{
		node = thread_pool_create_thread(context);
		if (!node)
			break;

		// add to list head
		node->next = context->task_threads;
		context->task_threads = node;
	}

	context->thread_count += i;
	context->thread_count_idle += i;
}

/**
 * @brief thread_pool_create_task get a task node, reusing a recycled one when possible
 * @param context thread pool (its lock must be held by the caller)
 * @param proc task function
 * @param param task argument
 * @return the task node (not yet queued), NULL on allocation failure
 */
static thread_task_list_t* thread_pool_create_task(thread_pool_context_t *context, thread_pool_proc proc, void* param)
{
	thread_task_list_t *task;

	if (context->recycle_tasks)
	{
		task = context->recycle_tasks;
		context->recycle_tasks = task->next;
	}
	else
	{
		task = (thread_task_list_t*)malloc(sizeof(thread_task_list_t));
	}

	if (!task)
		return NULL;

	memset(task, 0, sizeof(thread_task_list_t)); // clear recycled content
	task->param = param;
	task->proc = proc;
	return task;
}

/**
 * @brief thread_pool_destroy_tasks free a task list
 * @param tasks head of the list
 */
static void thread_pool_destroy_tasks(thread_task_list_t *tasks)
{
	thread_task_list_t *next;

	while (tasks)
	{
		next = tasks->next;
		free(tasks);
		tasks = next;
	}
}

/**
 * @brief thread_pool_create create a thread pool
 * @param num number of threads started immediately; also the idle limit
 *            above which idle workers exit
 * @param min minimum number of threads (stored)
 * @param max maximum number of threads thread_pool_push may grow the pool to
 * @return NULL on error, otherwise the pool
 */
thread_pool_t thread_pool_create(int num, int min, int max)
{
	thread_pool_context_t *ctx;

	ctx = (thread_pool_context_t*)malloc(sizeof(thread_pool_context_t));
	if (!ctx)
		return NULL;

	memset(ctx, 0, sizeof(thread_pool_context_t));
	ctx->thread_count_min = min;
	ctx->thread_count_max = max;
	ctx->threshold = num / 2;
	ctx->idle_max = num;
	ctx->run = 1; // running

	if (0 != locker_create(&ctx->locker))
	{
		free(ctx);
		return NULL;
	}

	if (0 != event_create(&ctx->event))
	{
		locker_destroy(&ctx->locker);
		free(ctx);
		return NULL;
	}

	// Hold the lock while workers are started: they read the counters and
	// the thread list, which are only complete once creation has finished.
	locker_lock(&ctx->locker);
	thread_pool_create_threads(ctx, num);
	locker_unlock(&ctx->locker);

	return ctx;
}

/**
 * @brief thread_pool_destroy stop all workers and free the pool
 *
 * Blocks until every worker has exited. Tasks still pending are discarded
 * without being run.
 *
 * @param pool thread pool (NULL is ignored)
 */
void thread_pool_destroy(thread_pool_t pool)
{
	thread_pool_context_t *ctx;

	ctx = (thread_pool_context_t*)pool;
	if (!ctx)
		return;

	locker_lock(&ctx->locker);
	ctx->run = 0; // written under the lock: workers read it under the lock
	while (ctx->thread_count)
	{
		// the event wakes one waiter per signal, so keep signalling
		// until every worker has noticed run == 0 and exited
		event_signal(&ctx->event);
		locker_unlock(&ctx->locker);
		system_sleep(THREAD_POOL_EXIT_POLL_MS);
		locker_lock(&ctx->locker);
	}
	locker_unlock(&ctx->locker);

	// workers detach and free their own list nodes, so only tasks remain
	thread_pool_destroy_tasks(ctx->recycle_tasks);
	thread_pool_destroy_tasks(ctx->tasks);

	event_destroy(&ctx->event);
	locker_destroy(&ctx->locker);
	free(ctx);
}

/**
 * @brief thread_pool_threads_count get the number of worker threads
 * @param pool thread pool
 * @return <0-error (pool is NULL), >=0-thread count
 */
int thread_pool_threads_count(thread_pool_t pool)
{
	int count;
	thread_pool_context_t *ctx;

	ctx = (thread_pool_context_t*)pool;
	if (!ctx)
		return -1;

	locker_lock(&ctx->locker);
	count = ctx->thread_count;
	locker_unlock(&ctx->locker);
	return count;
}

/**
 * @brief thread_pool_push queue one task
 *
 * NOTE(review): tasks are inserted at the head of the list and workers
 * also take from the head, so pending tasks run newest-first.
 *
 * @param pool thread pool
 * @param proc task function
 * @param param argument passed to proc
 * @return =0-ok, <0-error (NULL pool/proc or out of memory)
 */
int thread_pool_push(thread_pool_t pool, thread_pool_proc proc, void *param)
{
	thread_task_list_t *task;
	thread_pool_context_t *context;

	context = (thread_pool_context_t*)pool;
	if (!context || !proc)
		return -1;

	locker_lock(&context->locker);
	task = thread_pool_create_task(context, proc, param);
	if (!task)
	{
		locker_unlock(&context->locker);
		return -1;
	}

	// add to task list
	task->next = context->tasks;
	context->tasks = task;
	++context->task_count;

	// add a new thread to handle the task when no worker is idle
	// and the maximum thread count has not been reached
	if (context->thread_count_idle < 1
		&& context->thread_count < context->thread_count_max)
		thread_pool_create_threads(context, 1);

	event_signal(&context->event); // tell a worker there is a task
	locker_unlock(&context->locker);
	return 0;
}