Marine Library  1.0
C++ library for Linux Networking Development
threads.hh
1 #ifndef DOZERG_THREADS_H_20130222
2 #define DOZERG_THREADS_H_20130222
3 
4 /*
5  POSIX线程封装
6  CThreadAttr 线程的属性 CThread 单个线程 CThreadPool 固定数目的线程池 CThreadManager 自动伸缩的线程池 //*/ #include <pthread.h> #include <vector> #include <list> #include <cassert> #include "lock_int.hh" #include "lock_queue.hh" #include "logger.hh" #include "atomic_sync.hh" #include "impl/threads_impl.hh" NS_SERVER_BEGIN class CThread; class CThreadAttr { friend class CThread; typedef CThreadAttr __Myt; public: CThreadAttr() throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_init(&a_)); } ~CThreadAttr(){ ::pthread_attr_destroy(&a_); } //设置/获取栈大小(字节) void stackSize(size_t sz) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setstacksize(&a_, sz)); } size_t stackSize() const throw(std::runtime_error){ size_t sz = 0; THROW_RT_IF_FAIL(::pthread_attr_getstacksize(&a_, &sz)); return sz; } //设置/获取栈保护空间大小(字节) void guardSize(size_t sz) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setguardsize(&a_, sz)); } size_t guardSize() const throw(std::runtime_error){ size_t sz = 0; THROW_RT_IF_FAIL(::pthread_attr_getguardsize(&a_, &sz)); return sz; } //设置/获取线程状态 void detach(bool on) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setdetachstate(&a_ , (on ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE))); } bool detach() const throw(std::runtime_error){ int state = 0; THROW_RT_IF_FAIL(::pthread_attr_getdetachstate(&a_, &state)); return (PTHREAD_CREATE_DETACHED == state); } #ifdef __API_HAS_PTHREAD_ATTR_SETSTACK //设置/获取栈地址和大小 void stack(void * addr, size_t sz) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setstack(&a_, addr, sz)); } void * stack(size_t * sz) const throw(std::runtime_error){ void * addr = NULL; size_t s = 0; THROW_RT_IF_FAIL(::pthread_attr_getstack(&a_, &addr, &s)); if(sz) *sz = s; return addr; } #endif private: CThreadAttr(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); pthread_attr_t * a(){return &a_;} const pthread_attr_t * a() const{return &a_;} //fields pthread_attr_t a_; }; class CThread { typedef CThread __Myt; typedef void * (*__ThreadProc)(void *); public: CThread():ret_(NULL), id_(0){} //启动线程 //proc: 线程函数 //arg: 线程函数的参数 //attr: 线程属性 //注意: // 重复调用start(),会导致之前的线程id等信息丢失 bool start(__ThreadProc proc, void * arg = NULL){ return (0 == ::pthread_create(&id_, NULL, proc, arg)); } bool start(__ThreadProc proc, void * arg, const CThreadAttr & attr){ return (0 == ::pthread_create(&id_, attr.a(), proc, arg)); } //通知线程停止 bool stop(){ return (0 == ::pthread_cancel(id_)); } //等待线程停止,并存储返回值 //仅对joinable状态的线程有效 bool join(){ return (0 == ::pthread_join(id_, &ret_)); } //获取线程返回值 //仅对joinable状态的线程有效 void * retval() const{return ret_;} //detach线程 //仅对joinable状态的线程有效 bool detach(){ return (0 == ::pthread_detach(id_)); } //给线程发送信号 bool signal(int sig) const{ return (0 == ::pthread_kill(id_, sig)); } //判断线程id是否相等 bool operator ==(const __Myt & t) const{ return operator ==(t.id_); } bool operator ==(pthread_t t) const{ return (0 != ::pthread_equal(id_, t)); } bool operator !=(const __Myt & t) const{ return !operator ==(t); } bool operator !=(pthread_t t) const{ return !operator ==(t); } //转化成可读字符串 std::string toString() const{ CToString oss; oss<<"{id_="<<id_ <<", ret_="<<ret_ <<'}'; return oss.str(); } private: //fields void * ret_; pthread_t id_; }; inline bool operator ==(pthread_t t1, const CThread & t2) { return (t2.operator ==(t1)); } inline bool operator !=(pthread_t t1, const CThread & t2) { return (t2.operator !=(t1)); } class CThreadPool { typedef CThreadPool __Myt; typedef CAtomicSync<unsigned int> __Count; typedef std::vector<CThread> __Threads; public: CThreadPool(){} virtual ~CThreadPool(){} //启动所有线程 //子类可以重载,但最后应该调用CThreadPool::startThreads() //threadCount: 线程数 //attr: 线程属性 //return: // <0 出错,所有线程都无法启动 // 其他 实际启动的线程数 virtual int startThreads(unsigned int threadCount = 1){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this)) threads_.push_back(t); } return threads_.size(); } virtual int startThreads(unsigned int threadCount, const CThreadAttr & attr){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this, attr)) threads_.push_back(t); } return threads_.size(); } //获取运行的线程数 unsigned int runningCount() const volatile{return cnt_;} //判断所有线程是否都已停止 bool stopped() const volatile{return (0 == runningCount());} //通知所有线程停止 //子类可以重载,但最后应该调用CThreadPool::stopThreads() virtual void stopThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->stop(); } //等待所有线程退出,回收资源 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::joinThreads() virtual void joinThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->join(); } //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
7  CThread 单个线程 CThreadPool 固定数目的线程池 CThreadManager 自动伸缩的线程池 //*/ #include <pthread.h> #include <vector> #include <list> #include <cassert> #include "lock_int.hh" #include "lock_queue.hh" #include "logger.hh" #include "atomic_sync.hh" #include "impl/threads_impl.hh" NS_SERVER_BEGIN class CThread; class CThreadAttr { friend class CThread; typedef CThreadAttr __Myt; public: CThreadAttr() throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_init(&a_)); } ~CThreadAttr(){ ::pthread_attr_destroy(&a_); } //设置/获取栈大小(字节) void stackSize(size_t sz) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setstacksize(&a_, sz)); } size_t stackSize() const throw(std::runtime_error){ size_t sz = 0; THROW_RT_IF_FAIL(::pthread_attr_getstacksize(&a_, &sz)); return sz; } //设置/获取栈保护空间大小(字节) void guardSize(size_t sz) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setguardsize(&a_, sz)); } size_t guardSize() const throw(std::runtime_error){ size_t sz = 0; THROW_RT_IF_FAIL(::pthread_attr_getguardsize(&a_, &sz)); return sz; } //设置/获取线程状态 void detach(bool on) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setdetachstate(&a_ , (on ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE))); } bool detach() const throw(std::runtime_error){ int state = 0; THROW_RT_IF_FAIL(::pthread_attr_getdetachstate(&a_, &state)); return (PTHREAD_CREATE_DETACHED == state); } #ifdef __API_HAS_PTHREAD_ATTR_SETSTACK //设置/获取栈地址和大小 void stack(void * addr, size_t sz) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setstack(&a_, addr, sz)); } void * stack(size_t * sz) const throw(std::runtime_error){ void * addr = NULL; size_t s = 0; THROW_RT_IF_FAIL(::pthread_attr_getstack(&a_, &addr, &s)); if(sz) *sz = s; return addr; } #endif private: CThreadAttr(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); pthread_attr_t * a(){return &a_;} const pthread_attr_t * a() const{return &a_;} //fields pthread_attr_t a_; }; class CThread { typedef CThread __Myt; typedef void * (*__ThreadProc)(void *); public: CThread():ret_(NULL), id_(0){} //启动线程 //proc: 线程函数 //arg: 线程函数的参数 //attr: 线程属性 //注意: // 重复调用start(),会导致之前的线程id等信息丢失 bool start(__ThreadProc proc, void * arg = NULL){ return (0 == ::pthread_create(&id_, NULL, proc, arg)); } bool start(__ThreadProc proc, void * arg, const CThreadAttr & attr){ return (0 == ::pthread_create(&id_, attr.a(), proc, arg)); } //通知线程停止 bool stop(){ return (0 == ::pthread_cancel(id_)); } //等待线程停止,并存储返回值 //仅对joinable状态的线程有效 bool join(){ return (0 == ::pthread_join(id_, &ret_)); } //获取线程返回值 //仅对joinable状态的线程有效 void * retval() const{return ret_;} //detach线程 //仅对joinable状态的线程有效 bool detach(){ return (0 == ::pthread_detach(id_)); } //给线程发送信号 bool signal(int sig) const{ return (0 == ::pthread_kill(id_, sig)); } //判断线程id是否相等 bool operator ==(const __Myt & t) const{ return operator ==(t.id_); } bool operator ==(pthread_t t) const{ return (0 != ::pthread_equal(id_, t)); } bool operator !=(const __Myt & t) const{ return !operator ==(t); } bool operator !=(pthread_t t) const{ return !operator ==(t); } //转化成可读字符串 std::string toString() const{ CToString oss; oss<<"{id_="<<id_ <<", ret_="<<ret_ <<'}'; return oss.str(); } private: //fields void * ret_; pthread_t id_; }; inline bool operator ==(pthread_t t1, const CThread & t2) { return (t2.operator ==(t1)); } inline bool operator !=(pthread_t t1, const CThread & t2) { return (t2.operator !=(t1)); } class CThreadPool { typedef CThreadPool __Myt; typedef CAtomicSync<unsigned int> __Count; typedef std::vector<CThread> __Threads; public: CThreadPool(){} virtual ~CThreadPool(){} //启动所有线程 //子类可以重载,但最后应该调用CThreadPool::startThreads() //threadCount: 线程数 //attr: 线程属性 //return: // <0 出错,所有线程都无法启动 // 其他 实际启动的线程数 virtual int startThreads(unsigned int threadCount = 1){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this)) threads_.push_back(t); } return threads_.size(); } virtual int startThreads(unsigned int threadCount, const CThreadAttr & attr){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this, attr)) threads_.push_back(t); } return threads_.size(); } //获取运行的线程数 unsigned int runningCount() const volatile{return cnt_;} //判断所有线程是否都已停止 bool stopped() const volatile{return (0 == runningCount());} //通知所有线程停止 //子类可以重载,但最后应该调用CThreadPool::stopThreads() virtual void stopThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->stop(); } //等待所有线程退出,回收资源 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::joinThreads() virtual void joinThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->join(); } //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
8  CThreadPool 固定数目的线程池 CThreadManager 自动伸缩的线程池 //*/ #include <pthread.h> #include <vector> #include <list> #include <cassert> #include "lock_int.hh" #include "lock_queue.hh" #include "logger.hh" #include "atomic_sync.hh" #include "impl/threads_impl.hh" NS_SERVER_BEGIN class CThread; class CThreadAttr { friend class CThread; typedef CThreadAttr __Myt; public: CThreadAttr() throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_init(&a_)); } ~CThreadAttr(){ ::pthread_attr_destroy(&a_); } //设置/获取栈大小(字节) void stackSize(size_t sz) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setstacksize(&a_, sz)); } size_t stackSize() const throw(std::runtime_error){ size_t sz = 0; THROW_RT_IF_FAIL(::pthread_attr_getstacksize(&a_, &sz)); return sz; } //设置/获取栈保护空间大小(字节) void guardSize(size_t sz) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setguardsize(&a_, sz)); } size_t guardSize() const throw(std::runtime_error){ size_t sz = 0; THROW_RT_IF_FAIL(::pthread_attr_getguardsize(&a_, &sz)); return sz; } //设置/获取线程状态 void detach(bool on) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setdetachstate(&a_ , (on ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE))); } bool detach() const throw(std::runtime_error){ int state = 0; THROW_RT_IF_FAIL(::pthread_attr_getdetachstate(&a_, &state)); return (PTHREAD_CREATE_DETACHED == state); } #ifdef __API_HAS_PTHREAD_ATTR_SETSTACK //设置/获取栈地址和大小 void stack(void * addr, size_t sz) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setstack(&a_, addr, sz)); } void * stack(size_t * sz) const throw(std::runtime_error){ void * addr = NULL; size_t s = 0; THROW_RT_IF_FAIL(::pthread_attr_getstack(&a_, &addr, &s)); if(sz) *sz = s; return addr; } #endif private: CThreadAttr(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); pthread_attr_t * a(){return &a_;} const pthread_attr_t * a() const{return &a_;} //fields pthread_attr_t a_; }; class CThread { typedef CThread __Myt; typedef void * (*__ThreadProc)(void *); public: CThread():ret_(NULL), id_(0){} //启动线程 //proc: 线程函数 //arg: 线程函数的参数 //attr: 线程属性 //注意: // 重复调用start(),会导致之前的线程id等信息丢失 bool start(__ThreadProc proc, void * arg = NULL){ return (0 == ::pthread_create(&id_, NULL, proc, arg)); } bool start(__ThreadProc proc, void * arg, const CThreadAttr & attr){ return (0 == ::pthread_create(&id_, attr.a(), proc, arg)); } //通知线程停止 bool stop(){ return (0 == ::pthread_cancel(id_)); } //等待线程停止,并存储返回值 //仅对joinable状态的线程有效 bool join(){ return (0 == ::pthread_join(id_, &ret_)); } //获取线程返回值 //仅对joinable状态的线程有效 void * retval() const{return ret_;} //detach线程 //仅对joinable状态的线程有效 bool detach(){ return (0 == ::pthread_detach(id_)); } //给线程发送信号 bool signal(int sig) const{ return (0 == ::pthread_kill(id_, sig)); } //判断线程id是否相等 bool operator ==(const __Myt & t) const{ return operator ==(t.id_); } bool operator ==(pthread_t t) const{ return (0 != ::pthread_equal(id_, t)); } bool operator !=(const __Myt & t) const{ return !operator ==(t); } bool operator !=(pthread_t t) const{ return !operator ==(t); } //转化成可读字符串 std::string toString() const{ CToString oss; oss<<"{id_="<<id_ <<", ret_="<<ret_ <<'}'; return oss.str(); } private: //fields void * ret_; pthread_t id_; }; inline bool operator ==(pthread_t t1, const CThread & t2) { return (t2.operator ==(t1)); } inline bool operator !=(pthread_t t1, const CThread & t2) { return (t2.operator !=(t1)); } class CThreadPool { typedef CThreadPool __Myt; typedef CAtomicSync<unsigned int> __Count; typedef std::vector<CThread> __Threads; public: CThreadPool(){} virtual ~CThreadPool(){} //启动所有线程 //子类可以重载,但最后应该调用CThreadPool::startThreads() //threadCount: 线程数 //attr: 线程属性 //return: // <0 出错,所有线程都无法启动 // 其他 实际启动的线程数 virtual int startThreads(unsigned int threadCount = 1){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this)) threads_.push_back(t); } return threads_.size(); } virtual int startThreads(unsigned int threadCount, const CThreadAttr & attr){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this, attr)) threads_.push_back(t); } return threads_.size(); } //获取运行的线程数 unsigned int runningCount() const volatile{return cnt_;} //判断所有线程是否都已停止 bool stopped() const volatile{return (0 == runningCount());} //通知所有线程停止 //子类可以重载,但最后应该调用CThreadPool::stopThreads() virtual void stopThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->stop(); } //等待所有线程退出,回收资源 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::joinThreads() virtual void joinThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->join(); } //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
9  CThreadManager 自动伸缩的线程池//*/ #include <pthread.h> #include <vector> #include <list> #include <cassert> #include "lock_int.hh" #include "lock_queue.hh" #include "logger.hh" #include "atomic_sync.hh" #include "impl/threads_impl.hh" NS_SERVER_BEGIN class CThread; class CThreadAttr { friend class CThread; typedef CThreadAttr __Myt; public: CThreadAttr() throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_init(&a_)); } ~CThreadAttr(){ ::pthread_attr_destroy(&a_); } //设置/获取栈大小(字节) void stackSize(size_t sz) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setstacksize(&a_, sz)); } size_t stackSize() const throw(std::runtime_error){ size_t sz = 0; THROW_RT_IF_FAIL(::pthread_attr_getstacksize(&a_, &sz)); return sz; } //设置/获取栈保护空间大小(字节) void guardSize(size_t sz) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setguardsize(&a_, sz)); } size_t guardSize() const throw(std::runtime_error){ size_t sz = 0; THROW_RT_IF_FAIL(::pthread_attr_getguardsize(&a_, &sz)); return sz; } //设置/获取线程状态 void detach(bool on) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setdetachstate(&a_ , (on ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE))); } bool detach() const throw(std::runtime_error){ int state = 0; THROW_RT_IF_FAIL(::pthread_attr_getdetachstate(&a_, &state)); return (PTHREAD_CREATE_DETACHED == state); } #ifdef __API_HAS_PTHREAD_ATTR_SETSTACK //设置/获取栈地址和大小 void stack(void * addr, size_t sz) throw(std::runtime_error){ THROW_RT_IF_FAIL(::pthread_attr_setstack(&a_, addr, sz)); } void * stack(size_t * sz) const throw(std::runtime_error){ void * addr = NULL; size_t s = 0; THROW_RT_IF_FAIL(::pthread_attr_getstack(&a_, &addr, &s)); if(sz) *sz = s; return addr; } #endif private: CThreadAttr(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); pthread_attr_t * a(){return &a_;} const pthread_attr_t * a() const{return &a_;} //fields pthread_attr_t a_; }; class CThread { typedef CThread __Myt; typedef void * (*__ThreadProc)(void *); public: CThread():ret_(NULL), id_(0){} //启动线程 //proc: 线程函数 //arg: 线程函数的参数 //attr: 线程属性 //注意: // 重复调用start(),会导致之前的线程id等信息丢失 bool start(__ThreadProc proc, void * arg = NULL){ return (0 == ::pthread_create(&id_, NULL, proc, arg)); } bool start(__ThreadProc proc, void * arg, const CThreadAttr & attr){ return (0 == ::pthread_create(&id_, attr.a(), proc, arg)); } //通知线程停止 bool stop(){ return (0 == ::pthread_cancel(id_)); } //等待线程停止,并存储返回值 //仅对joinable状态的线程有效 bool join(){ return (0 == ::pthread_join(id_, &ret_)); } //获取线程返回值 //仅对joinable状态的线程有效 void * retval() const{return ret_;} //detach线程 //仅对joinable状态的线程有效 bool detach(){ return (0 == ::pthread_detach(id_)); } //给线程发送信号 bool signal(int sig) const{ return (0 == ::pthread_kill(id_, sig)); } //判断线程id是否相等 bool operator ==(const __Myt & t) const{ return operator ==(t.id_); } bool operator ==(pthread_t t) const{ return (0 != ::pthread_equal(id_, t)); } bool operator !=(const __Myt & t) const{ return !operator ==(t); } bool operator !=(pthread_t t) const{ return !operator ==(t); } //转化成可读字符串 std::string toString() const{ CToString oss; oss<<"{id_="<<id_ <<", ret_="<<ret_ <<'}'; return oss.str(); } private: //fields void * ret_; pthread_t id_; }; inline bool operator ==(pthread_t t1, const CThread & t2) { return (t2.operator ==(t1)); } inline bool operator !=(pthread_t t1, const CThread & t2) { return (t2.operator !=(t1)); } class CThreadPool { typedef CThreadPool __Myt; typedef CAtomicSync<unsigned int> __Count; typedef std::vector<CThread> __Threads; public: CThreadPool(){} virtual ~CThreadPool(){} //启动所有线程 //子类可以重载,但最后应该调用CThreadPool::startThreads() //threadCount: 线程数 //attr: 线程属性 //return: // <0 出错,所有线程都无法启动 // 其他 实际启动的线程数 virtual int startThreads(unsigned int threadCount = 1){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this)) threads_.push_back(t); } return threads_.size(); } virtual int startThreads(unsigned int threadCount, const CThreadAttr & attr){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this, attr)) threads_.push_back(t); } return threads_.size(); } //获取运行的线程数 unsigned int runningCount() const volatile{return cnt_;} //判断所有线程是否都已停止 bool stopped() const volatile{return (0 == runningCount());} //通知所有线程停止 //子类可以重载,但最后应该调用CThreadPool::stopThreads() virtual void stopThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->stop(); } //等待所有线程退出,回收资源 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::joinThreads() virtual void joinThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->join(); } //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
10 //*/
11 
12 #include <pthread.h>
13 #include <vector>
14 #include <list>
15 #include <cassert>
16 #include "lock_int.hh"
17 #include "lock_queue.hh"
18 #include "logger.hh"
19 #include "atomic_sync.hh"
20 #include "impl/threads_impl.hh"
21 
22 NS_SERVER_BEGIN
23 
24 class CThread;
25 
27 {
28  friend class CThread;
29  typedef CThreadAttr __Myt;
30 public:
31  CThreadAttr() throw(std::runtime_error){
32  THROW_RT_IF_FAIL(::pthread_attr_init(&a_));
33  }
34  ~CThreadAttr(){
35  ::pthread_attr_destroy(&a_);
36  }
37  //设置/获取栈大小(字节)
38  void stackSize(size_t sz) throw(std::runtime_error){
39  THROW_RT_IF_FAIL(::pthread_attr_setstacksize(&a_, sz));
40  }
41  size_t stackSize() const throw(std::runtime_error){
42  size_t sz = 0;
43  THROW_RT_IF_FAIL(::pthread_attr_getstacksize(&a_, &sz));
44  return sz;
45  }
46  //设置/获取栈保护空间大小(字节)
47  void guardSize(size_t sz) throw(std::runtime_error){
48  THROW_RT_IF_FAIL(::pthread_attr_setguardsize(&a_, sz));
49  }
50  size_t guardSize() const throw(std::runtime_error){
51  size_t sz = 0;
52  THROW_RT_IF_FAIL(::pthread_attr_getguardsize(&a_, &sz));
53  return sz;
54  }
55  //设置/获取线程状态
56  void detach(bool on) throw(std::runtime_error){
57  THROW_RT_IF_FAIL(::pthread_attr_setdetachstate(&a_
58  , (on ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE)));
59  }
60  bool detach() const throw(std::runtime_error){
61  int state = 0;
62  THROW_RT_IF_FAIL(::pthread_attr_getdetachstate(&a_, &state));
63  return (PTHREAD_CREATE_DETACHED == state);
64  }
65 #ifdef __API_HAS_PTHREAD_ATTR_SETSTACK
66  //设置/获取栈地址和大小
67  void stack(void * addr, size_t sz) throw(std::runtime_error){
68  THROW_RT_IF_FAIL(::pthread_attr_setstack(&a_, addr, sz));
69  }
70  void * stack(size_t * sz) const throw(std::runtime_error){
71  void * addr = NULL;
72  size_t s = 0;
73  THROW_RT_IF_FAIL(::pthread_attr_getstack(&a_, &addr, &s));
74  if(sz)
75  *sz = s;
76  return addr;
77  }
78 #endif
79 private:
80  CThreadAttr(const __Myt &); //disable copy and assignment
81  __Myt & operator =(const __Myt &);
82  pthread_attr_t * a(){return &a_;}
83  const pthread_attr_t * a() const{return &a_;}
84  //fields
85  pthread_attr_t a_;
86 };
87 
88 class CThread
89 {
90  typedef CThread __Myt;
91  typedef void * (*__ThreadProc)(void *);
92 public:
93  CThread():ret_(NULL), id_(0){}
94  //启动线程 //proc: 线程函数 //arg: 线程函数的参数 //attr: 线程属性 //注意: // 重复调用start(),会导致之前的线程id等信息丢失 bool start(__ThreadProc proc, void * arg = NULL){ return (0 == ::pthread_create(&id_, NULL, proc, arg)); } bool start(__ThreadProc proc, void * arg, const CThreadAttr & attr){ return (0 == ::pthread_create(&id_, attr.a(), proc, arg)); } //通知线程停止 bool stop(){ return (0 == ::pthread_cancel(id_)); } //等待线程停止,并存储返回值 //仅对joinable状态的线程有效 bool join(){ return (0 == ::pthread_join(id_, &ret_)); } //获取线程返回值 //仅对joinable状态的线程有效 void * retval() const{return ret_;} //detach线程 //仅对joinable状态的线程有效 bool detach(){ return (0 == ::pthread_detach(id_)); } //给线程发送信号 bool signal(int sig) const{ return (0 == ::pthread_kill(id_, sig)); } //判断线程id是否相等 bool operator ==(const __Myt & t) const{ return operator ==(t.id_); } bool operator ==(pthread_t t) const{ return (0 != ::pthread_equal(id_, t)); } bool operator !=(const __Myt & t) const{ return !operator ==(t); } bool operator !=(pthread_t t) const{ return !operator ==(t); } //转化成可读字符串 std::string toString() const{ CToString oss; oss<<"{id_="<<id_ <<", ret_="<<ret_ <<'}'; return oss.str(); } private: //fields void * ret_; pthread_t id_; }; inline bool operator ==(pthread_t t1, const CThread & t2) { return (t2.operator ==(t1)); } inline bool operator !=(pthread_t t1, const CThread & t2) { return (t2.operator !=(t1)); } class CThreadPool { typedef CThreadPool __Myt; typedef CAtomicSync<unsigned int> __Count; typedef std::vector<CThread> __Threads; public: CThreadPool(){} virtual ~CThreadPool(){} //启动所有线程 //子类可以重载,但最后应该调用CThreadPool::startThreads() //threadCount: 线程数 //attr: 线程属性 //return: // <0 出错,所有线程都无法启动 // 其他 实际启动的线程数 virtual int startThreads(unsigned int threadCount = 1){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this)) threads_.push_back(t); } return threads_.size(); } virtual int startThreads(unsigned int threadCount, const CThreadAttr & attr){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this, attr)) threads_.push_back(t); } return threads_.size(); } //获取运行的线程数 unsigned int runningCount() const volatile{return cnt_;} //判断所有线程是否都已停止 bool stopped() const volatile{return (0 == runningCount());} //通知所有线程停止 //子类可以重载,但最后应该调用CThreadPool::stopThreads() virtual void stopThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->stop(); } //等待所有线程退出,回收资源 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::joinThreads() virtual void joinThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->join(); } //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
95  //proc: 线程函数
96  //arg: 线程函数的参数 //attr: 线程属性 //注意: // 重复调用start(),会导致之前的线程id等信息丢失 bool start(__ThreadProc proc, void * arg = NULL){ return (0 == ::pthread_create(&id_, NULL, proc, arg)); } bool start(__ThreadProc proc, void * arg, const CThreadAttr & attr){ return (0 == ::pthread_create(&id_, attr.a(), proc, arg)); } //通知线程停止 bool stop(){ return (0 == ::pthread_cancel(id_)); } //等待线程停止,并存储返回值 //仅对joinable状态的线程有效 bool join(){ return (0 == ::pthread_join(id_, &ret_)); } //获取线程返回值 //仅对joinable状态的线程有效 void * retval() const{return ret_;} //detach线程 //仅对joinable状态的线程有效 bool detach(){ return (0 == ::pthread_detach(id_)); } //给线程发送信号 bool signal(int sig) const{ return (0 == ::pthread_kill(id_, sig)); } //判断线程id是否相等 bool operator ==(const __Myt & t) const{ return operator ==(t.id_); } bool operator ==(pthread_t t) const{ return (0 != ::pthread_equal(id_, t)); } bool operator !=(const __Myt & t) const{ return !operator ==(t); } bool operator !=(pthread_t t) const{ return !operator ==(t); } //转化成可读字符串 std::string toString() const{ CToString oss; oss<<"{id_="<<id_ <<", ret_="<<ret_ <<'}'; return oss.str(); } private: //fields void * ret_; pthread_t id_; }; inline bool operator ==(pthread_t t1, const CThread & t2) { return (t2.operator ==(t1)); } inline bool operator !=(pthread_t t1, const CThread & t2) { return (t2.operator !=(t1)); } class CThreadPool { typedef CThreadPool __Myt; typedef CAtomicSync<unsigned int> __Count; typedef std::vector<CThread> __Threads; public: CThreadPool(){} virtual ~CThreadPool(){} //启动所有线程 //子类可以重载,但最后应该调用CThreadPool::startThreads() //threadCount: 线程数 //attr: 线程属性 //return: // <0 出错,所有线程都无法启动 // 其他 实际启动的线程数 virtual int startThreads(unsigned int threadCount = 1){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this)) threads_.push_back(t); } return threads_.size(); } virtual int startThreads(unsigned int threadCount, const CThreadAttr & attr){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this, attr)) threads_.push_back(t); } return threads_.size(); } //获取运行的线程数 unsigned int runningCount() const volatile{return cnt_;} //判断所有线程是否都已停止 bool stopped() const volatile{return (0 == runningCount());} //通知所有线程停止 //子类可以重载,但最后应该调用CThreadPool::stopThreads() virtual void stopThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->stop(); } //等待所有线程退出,回收资源 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::joinThreads() virtual void joinThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->join(); } //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
97  //attr: 线程属性 //注意: // 重复调用start(),会导致之前的线程id等信息丢失 bool start(__ThreadProc proc, void * arg = NULL){ return (0 == ::pthread_create(&id_, NULL, proc, arg)); } bool start(__ThreadProc proc, void * arg, const CThreadAttr & attr){ return (0 == ::pthread_create(&id_, attr.a(), proc, arg)); } //通知线程停止 bool stop(){ return (0 == ::pthread_cancel(id_)); } //等待线程停止,并存储返回值 //仅对joinable状态的线程有效 bool join(){ return (0 == ::pthread_join(id_, &ret_)); } //获取线程返回值 //仅对joinable状态的线程有效 void * retval() const{return ret_;} //detach线程 //仅对joinable状态的线程有效 bool detach(){ return (0 == ::pthread_detach(id_)); } //给线程发送信号 bool signal(int sig) const{ return (0 == ::pthread_kill(id_, sig)); } //判断线程id是否相等 bool operator ==(const __Myt & t) const{ return operator ==(t.id_); } bool operator ==(pthread_t t) const{ return (0 != ::pthread_equal(id_, t)); } bool operator !=(const __Myt & t) const{ return !operator ==(t); } bool operator !=(pthread_t t) const{ return !operator ==(t); } //转化成可读字符串 std::string toString() const{ CToString oss; oss<<"{id_="<<id_ <<", ret_="<<ret_ <<'}'; return oss.str(); } private: //fields void * ret_; pthread_t id_; }; inline bool operator ==(pthread_t t1, const CThread & t2) { return (t2.operator ==(t1)); } inline bool operator !=(pthread_t t1, const CThread & t2) { return (t2.operator !=(t1)); } class CThreadPool { typedef CThreadPool __Myt; typedef CAtomicSync<unsigned int> __Count; typedef std::vector<CThread> __Threads; public: CThreadPool(){} virtual ~CThreadPool(){} //启动所有线程 //子类可以重载,但最后应该调用CThreadPool::startThreads() //threadCount: 线程数 //attr: 线程属性 //return: // <0 出错,所有线程都无法启动 // 其他 实际启动的线程数 virtual int startThreads(unsigned int threadCount = 1){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this)) threads_.push_back(t); } return threads_.size(); } virtual int startThreads(unsigned int threadCount, const CThreadAttr & attr){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this, attr)) threads_.push_back(t); } return threads_.size(); } //获取运行的线程数 unsigned int runningCount() const volatile{return cnt_;} //判断所有线程是否都已停止 bool stopped() const volatile{return (0 == runningCount());} //通知所有线程停止 //子类可以重载,但最后应该调用CThreadPool::stopThreads() virtual void stopThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->stop(); } //等待所有线程退出,回收资源 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::joinThreads() virtual void joinThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->join(); } //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
98  //注意:
99  // 重复调用start(),会导致之前的线程id等信息丢失
100  bool start(__ThreadProc proc, void * arg = NULL){
101  return (0 == ::pthread_create(&id_, NULL, proc, arg));
102  }
103  bool start(__ThreadProc proc, void * arg, const CThreadAttr & attr){
104  return (0 == ::pthread_create(&id_, attr.a(), proc, arg));
105  }
106  //通知线程停止
107  bool stop(){
108  return (0 == ::pthread_cancel(id_));
109  }
110  //等待线程停止,并存储返回值
111  //仅对joinable状态的线程有效
112  bool join(){
113  return (0 == ::pthread_join(id_, &ret_));
114  }
115  //获取线程返回值
116  //仅对joinable状态的线程有效
117  void * retval() const{return ret_;}
118  //detach线程 //仅对joinable状态的线程有效 bool detach(){ return (0 == ::pthread_detach(id_)); } //给线程发送信号 bool signal(int sig) const{ return (0 == ::pthread_kill(id_, sig)); } //判断线程id是否相等 bool operator ==(const __Myt & t) const{ return operator ==(t.id_); } bool operator ==(pthread_t t) const{ return (0 != ::pthread_equal(id_, t)); } bool operator !=(const __Myt & t) const{ return !operator ==(t); } bool operator !=(pthread_t t) const{ return !operator ==(t); } //转化成可读字符串 std::string toString() const{ CToString oss; oss<<"{id_="<<id_ <<", ret_="<<ret_ <<'}'; return oss.str(); } private: //fields void * ret_; pthread_t id_; }; inline bool operator ==(pthread_t t1, const CThread & t2) { return (t2.operator ==(t1)); } inline bool operator !=(pthread_t t1, const CThread & t2) { return (t2.operator !=(t1)); } class CThreadPool { typedef CThreadPool __Myt; typedef CAtomicSync<unsigned int> __Count; typedef std::vector<CThread> __Threads; public: CThreadPool(){} virtual ~CThreadPool(){} //启动所有线程 //子类可以重载,但最后应该调用CThreadPool::startThreads() //threadCount: 线程数 //attr: 线程属性 //return: // <0 出错,所有线程都无法启动 // 其他 实际启动的线程数 virtual int startThreads(unsigned int threadCount = 1){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this)) threads_.push_back(t); } return threads_.size(); } virtual int startThreads(unsigned int threadCount, const CThreadAttr & attr){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this, attr)) threads_.push_back(t); } return threads_.size(); } //获取运行的线程数 unsigned int runningCount() const volatile{return cnt_;} //判断所有线程是否都已停止 bool stopped() const volatile{return (0 == runningCount());} //通知所有线程停止 //子类可以重载,但最后应该调用CThreadPool::stopThreads() virtual void stopThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->stop(); } //等待所有线程退出,回收资源 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::joinThreads() virtual void joinThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->join(); } //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
119  //仅对joinable状态的线程有效
120  bool detach(){
121  return (0 == ::pthread_detach(id_));
122  }
123  //给线程发送信号 bool signal(int sig) const{ return (0 == ::pthread_kill(id_, sig)); } //判断线程id是否相等 bool operator ==(const __Myt & t) const{ return operator ==(t.id_); } bool operator ==(pthread_t t) const{ return (0 != ::pthread_equal(id_, t)); } bool operator !=(const __Myt & t) const{ return !operator ==(t); } bool operator !=(pthread_t t) const{ return !operator ==(t); } //转化成可读字符串 std::string toString() const{ CToString oss; oss<<"{id_="<<id_ <<", ret_="<<ret_ <<'}'; return oss.str(); } private: //fields void * ret_; pthread_t id_; }; inline bool operator ==(pthread_t t1, const CThread & t2) { return (t2.operator ==(t1)); } inline bool operator !=(pthread_t t1, const CThread & t2) { return (t2.operator !=(t1)); } class CThreadPool { typedef CThreadPool __Myt; typedef CAtomicSync<unsigned int> __Count; typedef std::vector<CThread> __Threads; public: CThreadPool(){} virtual ~CThreadPool(){} //启动所有线程 //子类可以重载,但最后应该调用CThreadPool::startThreads() //threadCount: 线程数 //attr: 线程属性 //return: // <0 出错,所有线程都无法启动 // 其他 实际启动的线程数 virtual int startThreads(unsigned int threadCount = 1){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this)) threads_.push_back(t); } return threads_.size(); } virtual int startThreads(unsigned int threadCount, const CThreadAttr & attr){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this, attr)) threads_.push_back(t); } return threads_.size(); } //获取运行的线程数 unsigned int runningCount() const volatile{return cnt_;} //判断所有线程是否都已停止 bool stopped() const volatile{return (0 == runningCount());} //通知所有线程停止 //子类可以重载,但最后应该调用CThreadPool::stopThreads() virtual void stopThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->stop(); } //等待所有线程退出,回收资源 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::joinThreads() virtual void joinThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->join(); } //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
124  bool signal(int sig) const{
125  return (0 == ::pthread_kill(id_, sig));
126  }
127  //判断线程id是否相等 bool operator ==(const __Myt & t) const{ return operator ==(t.id_); } bool operator ==(pthread_t t) const{ return (0 != ::pthread_equal(id_, t)); } bool operator !=(const __Myt & t) const{ return !operator ==(t); } bool operator !=(pthread_t t) const{ return !operator ==(t); } //转化成可读字符串 std::string toString() const{ CToString oss; oss<<"{id_="<<id_ <<", ret_="<<ret_ <<'}'; return oss.str(); } private: //fields void * ret_; pthread_t id_; }; inline bool operator ==(pthread_t t1, const CThread & t2) { return (t2.operator ==(t1)); } inline bool operator !=(pthread_t t1, const CThread & t2) { return (t2.operator !=(t1)); } class CThreadPool { typedef CThreadPool __Myt; typedef CAtomicSync<unsigned int> __Count; typedef std::vector<CThread> __Threads; public: CThreadPool(){} virtual ~CThreadPool(){} //启动所有线程 //子类可以重载,但最后应该调用CThreadPool::startThreads() //threadCount: 线程数 //attr: 线程属性 //return: // <0 出错,所有线程都无法启动 // 其他 实际启动的线程数 virtual int startThreads(unsigned int threadCount = 1){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this)) threads_.push_back(t); } return threads_.size(); } virtual int startThreads(unsigned int threadCount, const CThreadAttr & attr){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this, attr)) threads_.push_back(t); } return threads_.size(); } //获取运行的线程数 unsigned int runningCount() const volatile{return cnt_;} //判断所有线程是否都已停止 bool stopped() const volatile{return (0 == runningCount());} //通知所有线程停止 //子类可以重载,但最后应该调用CThreadPool::stopThreads() virtual void stopThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->stop(); } //等待所有线程退出,回收资源 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::joinThreads() virtual void joinThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->join(); } //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
128  bool operator ==(const __Myt & t) const{
129  return operator ==(t.id_);
130  }
131  bool operator ==(pthread_t t) const{
132  return (0 != ::pthread_equal(id_, t));
133  }
134  bool operator !=(const __Myt & t) const{
135  return !operator ==(t);
136  }
137  bool operator !=(pthread_t t) const{
138  return !operator ==(t);
139  }
140  //转化成可读字符串
141  std::string toString() const{
142  CToString oss;
143  oss<<"{id_="<<id_
144  <<", ret_="<<ret_
145  <<'}';
146  return oss.str();
147  }
148 private:
149  //fields
150  void * ret_;
151  pthread_t id_;
152 };
153 
154 inline bool operator ==(pthread_t t1, const CThread & t2)
155 {
156  return (t2.operator ==(t1));
157 }
158 
159 inline bool operator !=(pthread_t t1, const CThread & t2)
160 {
161  return (t2.operator !=(t1));
162 }
163 
165 {
166  typedef CThreadPool __Myt;
168  typedef std::vector<CThread> __Threads;
169 public:
170  CThreadPool(){}
171  virtual ~CThreadPool(){}
172  //启动所有线程 //子类可以重载,但最后应该调用CThreadPool::startThreads() //threadCount: 线程数 //attr: 线程属性 //return: // <0 出错,所有线程都无法启动 // 其他 实际启动的线程数 virtual int startThreads(unsigned int threadCount = 1){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this)) threads_.push_back(t); } return threads_.size(); } virtual int startThreads(unsigned int threadCount, const CThreadAttr & attr){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this, attr)) threads_.push_back(t); } return threads_.size(); } //获取运行的线程数 unsigned int runningCount() const volatile{return cnt_;} //判断所有线程是否都已停止 bool stopped() const volatile{return (0 == runningCount());} //通知所有线程停止 //子类可以重载,但最后应该调用CThreadPool::stopThreads() virtual void stopThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->stop(); } //等待所有线程退出,回收资源 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::joinThreads() virtual void joinThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->join(); } //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
173  //子类可以重载,但最后应该调用CThreadPool::startThreads()
174  //threadCount: 线程数 //attr: 线程属性 //return: // <0 出错,所有线程都无法启动 // 其他 实际启动的线程数 virtual int startThreads(unsigned int threadCount = 1){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this)) threads_.push_back(t); } return threads_.size(); } virtual int startThreads(unsigned int threadCount, const CThreadAttr & attr){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this, attr)) threads_.push_back(t); } return threads_.size(); } //获取运行的线程数 unsigned int runningCount() const volatile{return cnt_;} //判断所有线程是否都已停止 bool stopped() const volatile{return (0 == runningCount());} //通知所有线程停止 //子类可以重载,但最后应该调用CThreadPool::stopThreads() virtual void stopThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->stop(); } //等待所有线程退出,回收资源 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::joinThreads() virtual void joinThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->join(); } //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
175  //attr: 线程属性 //return: // <0 出错,所有线程都无法启动 // 其他 实际启动的线程数 virtual int startThreads(unsigned int threadCount = 1){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this)) threads_.push_back(t); } return threads_.size(); } virtual int startThreads(unsigned int threadCount, const CThreadAttr & attr){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this, attr)) threads_.push_back(t); } return threads_.size(); } //获取运行的线程数 unsigned int runningCount() const volatile{return cnt_;} //判断所有线程是否都已停止 bool stopped() const volatile{return (0 == runningCount());} //通知所有线程停止 //子类可以重载,但最后应该调用CThreadPool::stopThreads() virtual void stopThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->stop(); } //等待所有线程退出,回收资源 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::joinThreads() virtual void joinThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->join(); } //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
176  //return:
177  // <0 出错,所有线程都无法启动
178  // 其他 实际启动的线程数 virtual int startThreads(unsigned int threadCount = 1){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this)) threads_.push_back(t); } return threads_.size(); } virtual int startThreads(unsigned int threadCount, const CThreadAttr & attr){ if(!threads_.empty()) return -1; //re-start threads_.reserve(threadCount); while(threadCount-- > 0){ CThread t; if(t.start(threadProc, this, attr)) threads_.push_back(t); } return threads_.size(); } //获取运行的线程数 unsigned int runningCount() const volatile{return cnt_;} //判断所有线程是否都已停止 bool stopped() const volatile{return (0 == runningCount());} //通知所有线程停止 //子类可以重载,但最后应该调用CThreadPool::stopThreads() virtual void stopThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->stop(); } //等待所有线程退出,回收资源 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::joinThreads() virtual void joinThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->join(); } //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
179  virtual int startThreads(unsigned int threadCount = 1){
180  if(!threads_.empty())
181  return -1; //re-start
182  threads_.reserve(threadCount);
183  while(threadCount-- > 0){
184  CThread t;
185  if(t.start(threadProc, this))
186  threads_.push_back(t);
187  }
188  return threads_.size();
189  }
190  virtual int startThreads(unsigned int threadCount, const CThreadAttr & attr){
191  if(!threads_.empty())
192  return -1; //re-start
193  threads_.reserve(threadCount);
194  while(threadCount-- > 0){
195  CThread t;
196  if(t.start(threadProc, this, attr))
197  threads_.push_back(t);
198  }
199  return threads_.size();
200  }
201  //获取运行的线程数 unsigned int runningCount() const volatile{return cnt_;} //判断所有线程是否都已停止 bool stopped() const volatile{return (0 == runningCount());} //通知所有线程停止 //子类可以重载,但最后应该调用CThreadPool::stopThreads() virtual void stopThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->stop(); } //等待所有线程退出,回收资源 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::joinThreads() virtual void joinThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->join(); } //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
202  unsigned int runningCount() const volatile{return cnt_;}
203  //判断所有线程是否都已停止
204  bool stopped() const volatile{return (0 == runningCount());}
205  //通知所有线程停止
206  //子类可以重载,但最后应该调用CThreadPool::stopThreads()
207  virtual void stopThreads(){
208  for(__Threads::iterator i = threads_.begin();i != threads_.end();++i)
209  i->stop();
210  }
211  //等待所有线程退出,回收资源
212  //仅对joinable状态的线程有效
213  //子类可以重载,但最后应该调用CThreadPool::joinThreads()
214  virtual void joinThreads(){
215  for(__Threads::iterator i = threads_.begin();i != threads_.end();++i)
216  i->join();
217  }
218  //detach所有线程 //仅对joinable状态的线程有效 //子类可以重载,但最后应该调用CThreadPool::detachThreads() virtual void detachThreads(){ for(__Threads::iterator i = threads_.begin();i != threads_.end();++i) i->detach(); } //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
219  //仅对joinable状态的线程有效
220  //子类可以重载,但最后应该调用CThreadPool::detachThreads()
221  virtual void detachThreads(){
222  for(__Threads::iterator i = threads_.begin();i != threads_.end();++i)
223  i->detach();
224  }
225  //给所有线程发送信号 void signalThreads(int sig) const{ for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i) i->signal(sig); } //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{cnt_="<<cnt_ <<", threads_=("<<threads_.size()<<"){"; for(size_t i = 0;i < threads_.size();++i){ if(i) oss<<", "; oss<<'['<<i<<"]="<<threads_[i].toString(); } oss<<"}}"; return oss.str(); } protected: //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
226  void signalThreads(int sig) const{
227  for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i)
228  i->signal(sig);
229  }
230  //转换成可读字符串
231  std::string toString() const{
232  CToString oss;
233  oss<<"{cnt_="<<cnt_
234  <<", threads_=("<<threads_.size()<<"){";
235  for(size_t i = 0;i < threads_.size();++i){
236  if(i)
237  oss<<", ";
238  oss<<'['<<i<<"]="<<threads_[i].toString();
239  }
240  oss<<"}}";
241  return oss.str();
242  }
243 protected:
244  //工作函数,子类必须重载 //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
245  //注意:函数退出后,线程也会结束 virtual void run() = 0; private: CThreadPool(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); static void * threadProc(void * arg){ typedef NS_IMPL::CActive<__Count> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); __Active a(self.cnt_); self.run(); return NULL; } //fields __Threads threads_; __Count cnt_; }; template<class Task> class CThreadManager { typedef CThreadManager<Task> __Myt; typedef CLockInt<unsigned int> __Count; typedef CLockIntMax<unsigned int> __CountMax; typedef std::list<CThread> __Workers; typedef CMutex __WorkLock; typedef CGuard<__WorkLock> __WorkGuard; protected: typedef Task __Task; typedef CLockQueue<__Task> __Queue; static const int kScheduleInterval = 500; //ms, 默认调度间隔 public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
246  virtual void run() = 0;
247 private:
248  CThreadPool(const __Myt &); //disable copy and assignment
249  __Myt & operator =(const __Myt &);
250  static void * threadProc(void * arg){
251  typedef NS_IMPL::CActive<__Count> __Active;
252  LOGGER_CRASH_HANDLER();
253  assert(arg);
254  __Myt & self = *static_cast<__Myt *>(arg);
255  __Active a(self.cnt_);
256  self.run();
257  return NULL;
258  }
259  //fields
260  __Threads threads_;
261  __Count cnt_;
262 };
263 
264 template<class Task>
266 {
267  typedef CThreadManager<Task> __Myt;
270  typedef std::list<CThread> __Workers;
271  typedef CMutex __WorkLock;
273 protected:
274  typedef Task __Task;
275  typedef CLockQueue<__Task> __Queue;
276  static const int kScheduleInterval = 500; //ms, 默认调度间隔public: CThreadManager() : inputQue_(NULL) , threadCountMin_(1) , threadCountMax_(0) , interval_(kScheduleInterval) {} virtual ~CThreadManager(){} //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
277 public:
279  : inputQue_(NULL)
280  , threadCountMin_(1)
281  , threadCountMax_(0)
282  , interval_(kScheduleInterval)
283  {}
284  virtual ~CThreadManager(){}
285  //设置/获取最少线程数 void threadCountMin(unsigned int count){threadCountMin_ = count;} unsigned int threadCountMin() const{return threadCountMin_;} //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
286  void threadCountMin(unsigned int count){threadCountMin_ = count;}
287  unsigned int threadCountMin() const{return threadCountMin_;}
288  //设置/获取最多线程数 void threadCountMax(unsigned int count){threadCountMax_ = count;} unsigned int threadCountMax() const{return threadCountMax_;} //设置/获取调度线程的处理间隔时间(毫秒) void scheduleInterval(int timeMs){interval_ = timeMs;} int scheduleInterval() const{return interval_;} //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
289  void threadCountMax(unsigned int count){threadCountMax_ = count;}
290  unsigned int threadCountMax() const{return threadCountMax_;}
291  //设置/获取调度线程的处理间隔时间(毫秒)
292  void scheduleInterval(int timeMs){interval_ = timeMs;}
293  int scheduleInterval() const{return interval_;}
294  //启动线程 //子类可以重载,但最后应该调用CThreadManager::startThreads() //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
295  //子类可以重载,但最后应该调用CThreadManager::startThreads()
296  //inputQue: 任务队列 //initCount: 初始线程数目 // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
297  //initCount: 初始线程数目
298  // <threadCountMin_ 启动threadCountMin_个线程 // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
299  // >threadCountMax_ 启动threadCountMax_个线程 // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
300  // 其他 启动initCount个线程 //stackSz: 每个线程的栈大小 // 0 系统默认 // 其他 指定字节大小 //return: // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
301  //stackSz: 每个线程的栈大小
302  // 0 系统默认
303  // 其他 指定字节大小
304  //return:
305  // <0 出错 // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
306  // 其他 实际启动的worker线程数 virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){ //check if(NULL != inputQue_) return -1; //re-start //attr attr_.detach(true); if(stackSz) attr_.stackSize(stackSz); //schedule if(!sched_.start(threadSchedule, this, attr_)) return -1; inputQue_ = &inputQue; //worker return addThreads(adjustThreadCount(initCount)); } //通知所有线程停止 //子类可以重载,但最后应该调用CThreadManager::stopThreads() virtual void stopThreads(){ sched_.stop(); __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) it->stop(); workers_.clear(); } //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
307  virtual int startThreads(__Queue & inputQue, unsigned int initCount = 1, size_t stackSz = 0){
308  //check
309  if(NULL != inputQue_)
310  return -1; //re-start
311  //attr
312  attr_.detach(true);
313  if(stackSz)
314  attr_.stackSize(stackSz);
315  //schedule
316  if(!sched_.start(threadSchedule, this, attr_))
317  return -1;
318  inputQue_ = &inputQue;
319  //worker
320  return addThreads(adjustThreadCount(initCount));
321  }
322  //通知所有线程停止
323  //子类可以重载,但最后应该调用CThreadManager::stopThreads()
324  virtual void stopThreads(){
325  sched_.stop();
326  __Workers::iterator it;
327  __WorkGuard g(workLock_);
328  for(it = workers_.begin();it != workers_.end();++it)
329  it->stop();
330  workers_.clear();
331  }
332  //获取worker线程数统计 //countMax: // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
333  //countMax:
334  // NULL 忽略 // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
335  // 其他 返回最大worker线程数目,且重新开始统计 //return: 当前worker线程数目 unsigned int runningCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = workerCount_.resetMax(); return workerCount_.load(); } //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
336  //return: 当前worker线程数目
337  unsigned int runningCount(unsigned int * countMax = NULL) volatile{
338  if(NULL != countMax)
339  *countMax = workerCount_.resetMax();
340  return workerCount_.load();
341  }
342  //获取活跃线程统计 //countMax: // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
343  //countMax:
344  // NULL 忽略 // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
345  // 其他 返回最大活跃线程数目,且重新开始统计 //return: 当前活跃线程数目 unsigned int activeCount(unsigned int * countMax = NULL) volatile{ if(NULL != countMax) *countMax = activeCount_.resetMax(); return activeCount_.load(); } //检查所有线程是否停止 bool stopped() const volatile{return (0 == workerCount_.load());} //转换成可读字符串 std::string toString() const{ CToString oss; oss<<"{sched_="<<sched_.toString() <<", workerCount_="<<workerCount_.load() <<", activeCount_="<<activeCount_.load() <<", deleteCount_="<<deleteCount_.load() <<", inputQue_="<<inputQue_ <<", threadCountMin_="<<threadCountMin_ <<", threadCountMax_="<<threadCountMax_ <<", interval_="<<interval_ <<'}'; return oss.str(); } protected: //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
346  //return: 当前活跃线程数目
347  unsigned int activeCount(unsigned int * countMax = NULL) volatile{
348  if(NULL != countMax)
349  *countMax = activeCount_.resetMax();
350  return activeCount_.load();
351  }
352  //检查所有线程是否停止
353  bool stopped() const volatile{return (0 == workerCount_.load());}
354  //转换成可读字符串
355  std::string toString() const{
356  CToString oss;
357  oss<<"{sched_="<<sched_.toString()
358  <<", workerCount_="<<workerCount_.load()
359  <<", activeCount_="<<activeCount_.load()
360  <<", deleteCount_="<<deleteCount_.load()
361  <<", inputQue_="<<inputQue_
362  <<", threadCountMin_="<<threadCountMin_
363  <<", threadCountMax_="<<threadCountMax_
364  <<", interval_="<<interval_
365  <<'}';
366  return oss.str();
367  }
368 protected:
369  //工作函数,负责处理一个任务,完成后返回 //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
370  //子类必须重载 //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
371  //task: 从任务队列中取出的任务 virtual void run(__Task & task) = 0; private: //schedule thread static void * threadSchedule(void * arg){ LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); ::sleep(1); for(unsigned int expect = 0;;){ ::usleep(self.interval_ * 1000); const int kStrategy = 1; //策略选择 const unsigned int count = self.workerCount_.load(); const unsigned int active = self.activeCount_.load(); int add = 0; if(0 == kStrategy){ expect = self.adjustThreadCount(active << 1); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; }else if(1 == kStrategy){ expect = (expect + 1) >> 1; unsigned int max = self.adjustThreadCount(count); if(count > max){ add = max - count; }else{ expect = self.adjustThreadCount(expect + active); if(count > expect + 4 && count > expect + (expect >> 1)){ add = expect - count; }else if(count < expect) //need more threads add = expect - count; } } if(add > 0) self.addThreads(add); else if(add < 0) self.deleteCount_ = -add; //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}"); } return NULL; } //worker thread static void * threadWorker(void * arg){ typedef NS_IMPL::CActive<__CountMax> __Active; LOGGER_CRASH_HANDLER(); assert(arg); __Myt & self = *static_cast<__Myt *>(arg); assert(self.inputQue_); __Active a(self.workerCount_); for(__Task task;self.querySurvive();){ if(self.inputQue_->pop(task)){ __Active b(self.activeCount_); ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); self.run(task); ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); } } self.rmWorker(::pthread_self()); return NULL; } unsigned int adjustThreadCount(unsigned int count) const{ const unsigned int t_min = threadCountMin_; const unsigned int t_max = threadCountMax_; if(count < t_min || t_max <= t_min) return t_min; if(count > t_max) return t_max; return count; } bool querySurvive(){ typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred; return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL); } int addThreads(unsigned int count){ if(0 == count) return 0; int ret = 0; while(count-- > 0){ CThread t; __WorkGuard g(workLock_); //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
372  virtual void run(__Task & task) = 0;
373 private:
374  //schedule thread
375  static void * threadSchedule(void * arg){
376  LOGGER_CRASH_HANDLER();
377  assert(arg);
378  __Myt & self = *static_cast<__Myt *>(arg);
379  ::sleep(1);
380  for(unsigned int expect = 0;;){
381  ::usleep(self.interval_ * 1000);
382  const int kStrategy = 1; //策略选择
383  const unsigned int count = self.workerCount_.load();
384  const unsigned int active = self.activeCount_.load();
385  int add = 0;
386  if(0 == kStrategy){
387  expect = self.adjustThreadCount(active << 1);
388  if(count > expect + 4 && count > expect + (expect >> 1)){
389  add = expect - count;
390  }else if(count < expect) //need more threads
391  add = expect - count;
392  }else if(1 == kStrategy){
393  expect = (expect + 1) >> 1;
394  unsigned int max = self.adjustThreadCount(count);
395  if(count > max){
396  add = max - count;
397  }else{
398  expect = self.adjustThreadCount(expect + active);
399  if(count > expect + 4 && count > expect + (expect >> 1)){
400  add = expect - count;
401  }else if(count < expect) //need more threads
402  add = expect - count;
403  }
404  }
405  if(add > 0)
406  self.addThreads(add);
407  else if(add < 0)
408  self.deleteCount_ = -add;
409  //FATAL("stats={"<<count<<", "<<active<<", "<<expect<<", "<<add<<"}");
410  }
411  return NULL;
412  }
413  //worker thread
414  static void * threadWorker(void * arg){
415  typedef NS_IMPL::CActive<__CountMax> __Active;
416  LOGGER_CRASH_HANDLER();
417  assert(arg);
418  __Myt & self = *static_cast<__Myt *>(arg);
419  assert(self.inputQue_);
420  __Active a(self.workerCount_);
421  for(__Task task;self.querySurvive();){
422  if(self.inputQue_->pop(task)){
423  __Active b(self.activeCount_);
424  ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
425  self.run(task);
426  ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
427  }
428  }
429  self.rmWorker(::pthread_self());
430  return NULL;
431  }
432  unsigned int adjustThreadCount(unsigned int count) const{
433  const unsigned int t_min = threadCountMin_;
434  const unsigned int t_max = threadCountMax_;
435  if(count < t_min || t_max <= t_min)
436  return t_min;
437  if(count > t_max)
438  return t_max;
439  return count;
440  }
441  bool querySurvive(){
442  typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred;
443  return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL);
444  }
445  int addThreads(unsigned int count){
446  if(0 == count)
447  return 0;
448  int ret = 0;
449  while(count-- > 0){
450  CThread t;
451  __WorkGuard g(workLock_);
452  //避免其他线程调用stopThreads()后,schedule线程继续创建worker线程 ::pthread_testcancel(); if(t.start(threadWorker, this, attr_)){ workers_.push_back(t); ++ret; } } return ret; } void rmWorker(pthread_t t){ __Workers::iterator it; __WorkGuard g(workLock_); for(it = workers_.begin();it != workers_.end();++it) if(*it == t){ workers_.erase(it); break; } } CThreadManager(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields CThread sched_; __Workers workers_; __WorkLock workLock_; CThreadAttr attr_; __CountMax workerCount_; __CountMax activeCount_; __Count deleteCount_; //thread count to be deleted __Queue * inputQue_; volatile unsigned int threadCountMin_; volatile unsigned int threadCountMax_; volatile int interval_; //ms, schedule interval }; NS_SERVER_END #endif
453  ::pthread_testcancel();
454  if(t.start(threadWorker, this, attr_)){
455  workers_.push_back(t);
456  ++ret;
457  }
458  }
459  return ret;
460  }
461  void rmWorker(pthread_t t){
462  __Workers::iterator it;
463  __WorkGuard g(workLock_);
464  for(it = workers_.begin();it != workers_.end();++it)
465  if(*it == t){
466  workers_.erase(it);
467  break;
468  }
469  }
470  CThreadManager(const __Myt &); //disable copy and assignment
471  __Myt & operator =(const __Myt &);
472  //fields
473  CThread sched_;
474  __Workers workers_;
475  __WorkLock workLock_;
476  CThreadAttr attr_;
477  __CountMax workerCount_;
478  __CountMax activeCount_;
479  __Count deleteCount_; //thread count to be deleted
480  __Queue * inputQue_;
481  volatile unsigned int threadCountMin_;
482  volatile unsigned int threadCountMax_;
483  volatile int interval_; //ms, schedule interval
484 };
485 
486 NS_SERVER_END
487 
488 #endif
489 
Definition: threads.hh:26
Definition: to_string.hh:43
Definition: threads.hh:88
Definition: mutex.hh:73
Definition: threads.hh:164
Definition: threads.hh:265
Lock-free atomic operations for integral types.
Definition: mutex.hh:430
Definition: lock_queue.hh:29