1 #ifndef DOZERG_THREADS_H_20130222 2 #define DOZERG_THREADS_H_20130222 16 #include "lock_int.hh" 17 #include "lock_queue.hh" 20 #include "impl/threads_impl.hh" 32 THROW_RT_IF_FAIL(::pthread_attr_init(&a_));
35 ::pthread_attr_destroy(&a_);
38 void stackSize(
size_t sz)
throw(std::runtime_error){
39 THROW_RT_IF_FAIL(::pthread_attr_setstacksize(&a_, sz));
41 size_t stackSize()
const throw(std::runtime_error){
43 THROW_RT_IF_FAIL(::pthread_attr_getstacksize(&a_, &sz));
47 void guardSize(
size_t sz)
throw(std::runtime_error){
48 THROW_RT_IF_FAIL(::pthread_attr_setguardsize(&a_, sz));
50 size_t guardSize()
const throw(std::runtime_error){
52 THROW_RT_IF_FAIL(::pthread_attr_getguardsize(&a_, &sz));
56 void detach(
bool on)
throw(std::runtime_error){
57 THROW_RT_IF_FAIL(::pthread_attr_setdetachstate(&a_
58 , (on ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE)));
60 bool detach()
const throw(std::runtime_error){
62 THROW_RT_IF_FAIL(::pthread_attr_getdetachstate(&a_, &state));
63 return (PTHREAD_CREATE_DETACHED == state);
65 #ifdef __API_HAS_PTHREAD_ATTR_SETSTACK 67 void stack(
void * addr,
size_t sz)
throw(std::runtime_error){
68 THROW_RT_IF_FAIL(::pthread_attr_setstack(&a_, addr, sz));
70 void * stack(
size_t * sz)
const throw(std::runtime_error){
73 THROW_RT_IF_FAIL(::pthread_attr_getstack(&a_, &addr, &s));
81 __Myt & operator =(
const __Myt &);
82 pthread_attr_t * a(){
return &a_;}
83 const pthread_attr_t * a()
const{
return &a_;}
91 typedef void * (*__ThreadProc)(
void *);
100 bool start(__ThreadProc proc,
void * arg = NULL){
101 return (0 == ::pthread_create(&id_, NULL, proc, arg));
103 bool start(__ThreadProc proc,
void * arg,
const CThreadAttr & attr){
104 return (0 == ::pthread_create(&id_, attr.a(), proc, arg));
108 return (0 == ::pthread_cancel(id_));
113 return (0 == ::pthread_join(id_, &ret_));
117 void * retval()
const{
return ret_;}
121 return (0 == ::pthread_detach(id_));
124 bool signal(
int sig)
const{
125 return (0 == ::pthread_kill(id_, sig));
128 bool operator ==(
const __Myt & t)
const{
129 return operator ==(t.id_);
131 bool operator ==(pthread_t t)
const{
132 return (0 != ::pthread_equal(id_, t));
134 bool operator !=(
const __Myt & t)
const{
135 return !operator ==(t);
137 bool operator !=(pthread_t t)
const{
138 return !operator ==(t);
141 std::string toString()
const{
154 inline bool operator ==(pthread_t t1,
const CThread & t2)
156 return (t2.operator ==(t1));
159 inline bool operator !=(pthread_t t1,
const CThread & t2)
161 return (t2.operator !=(t1));
168 typedef std::vector<CThread> __Threads;
179 virtual int startThreads(
unsigned int threadCount = 1){
180 if(!threads_.empty())
182 threads_.reserve(threadCount);
183 while(threadCount-- > 0){
185 if(t.start(threadProc,
this))
186 threads_.push_back(t);
188 return threads_.size();
190 virtual int startThreads(
unsigned int threadCount,
const CThreadAttr & attr){
191 if(!threads_.empty())
193 threads_.reserve(threadCount);
194 while(threadCount-- > 0){
196 if(t.start(threadProc,
this, attr))
197 threads_.push_back(t);
199 return threads_.size();
202 unsigned int runningCount()
const volatile{
return cnt_;}
204 bool stopped()
const volatile{
return (0 == runningCount());}
207 virtual void stopThreads(){
208 for(__Threads::iterator i = threads_.begin();i != threads_.end();++i)
214 virtual void joinThreads(){
215 for(__Threads::iterator i = threads_.begin();i != threads_.end();++i)
221 virtual void detachThreads(){
222 for(__Threads::iterator i = threads_.begin();i != threads_.end();++i)
226 void signalThreads(
int sig)
const{
227 for(__Threads::const_iterator i = threads_.begin();i != threads_.end();++i)
231 std::string toString()
const{
234 <<
", threads_=("<<threads_.size()<<
"){";
235 for(
size_t i = 0;i < threads_.size();++i){
238 oss<<
'['<<i<<
"]="<<threads_[i].toString();
// Pure virtual hook: every concrete subclass has to provide it.
// NOTE(review): presumably invoked from threadProc() on each started thread;
// the call itself is not visible in this excerpt - confirm.
246 virtual void run() = 0;
249 __Myt & operator =(
const __Myt &);
250 static void * threadProc(
void * arg){
251 typedef NS_IMPL::CActive<__Count> __Active;
252 LOGGER_CRASH_HANDLER();
254 __Myt &
self = *
static_cast<__Myt *
>(arg);
255 __Active a(
self.cnt_);
270 typedef std::list<CThread> __Workers;
276 static const int kScheduleInterval = 500;
282 , interval_(kScheduleInterval)
286 void threadCountMin(
unsigned int count){threadCountMin_ = count;}
287 unsigned int threadCountMin()
const{
return threadCountMin_;}
289 void threadCountMax(
unsigned int count){threadCountMax_ = count;}
290 unsigned int threadCountMax()
const{
return threadCountMax_;}
292 void scheduleInterval(
int timeMs){interval_ = timeMs;}
293 int scheduleInterval()
const{
return interval_;}
307 virtual int startThreads(__Queue & inputQue,
unsigned int initCount = 1,
size_t stackSz = 0){
309 if(NULL != inputQue_)
314 attr_.stackSize(stackSz);
316 if(!sched_.start(threadSchedule,
this, attr_))
318 inputQue_ = &inputQue;
320 return addThreads(adjustThreadCount(initCount));
324 virtual void stopThreads(){
326 __Workers::iterator it;
327 __WorkGuard g(workLock_);
328 for(it = workers_.begin();it != workers_.end();++it)
337 unsigned int runningCount(
unsigned int * countMax = NULL)
volatile{
339 *countMax = workerCount_.resetMax();
340 return workerCount_.load();
347 unsigned int activeCount(
unsigned int * countMax = NULL)
volatile{
349 *countMax = activeCount_.resetMax();
350 return activeCount_.load();
353 bool stopped()
const volatile{
return (0 == workerCount_.load());}
355 std::string toString()
const{
357 oss<<
"{sched_="<<sched_.toString()
358 <<
", workerCount_="<<workerCount_.load()
359 <<
", activeCount_="<<activeCount_.load()
360 <<
", deleteCount_="<<deleteCount_.load()
361 <<
", inputQue_="<<inputQue_
362 <<
", threadCountMin_="<<threadCountMin_
363 <<
", threadCountMax_="<<threadCountMax_
364 <<
", interval_="<<interval_
// Pure virtual hook that takes one task by non-const reference; every
// concrete subclass has to provide it.
// NOTE(review): presumably called by threadWorker() for each task popped
// from the input queue; the call is not visible in this excerpt - confirm.
372 virtual void run(__Task & task) = 0;
375 static void * threadSchedule(
void * arg){
376 LOGGER_CRASH_HANDLER();
378 __Myt &
self = *
static_cast<__Myt *
>(arg);
380 for(
unsigned int expect = 0;;){
381 ::usleep(
self.interval_ * 1000);
382 const int kStrategy = 1;
383 const unsigned int count =
self.workerCount_.load();
384 const unsigned int active =
self.activeCount_.load();
387 expect =
self.adjustThreadCount(active << 1);
388 if(count > expect + 4 && count > expect + (expect >> 1)){
389 add = expect - count;
390 }
else if(count < expect)
391 add = expect - count;
392 }
else if(1 == kStrategy){
393 expect = (expect + 1) >> 1;
394 unsigned int max =
self.adjustThreadCount(count);
398 expect =
self.adjustThreadCount(expect + active);
399 if(count > expect + 4 && count > expect + (expect >> 1)){
400 add = expect - count;
401 }
else if(count < expect)
402 add = expect - count;
406 self.addThreads(add);
408 self.deleteCount_ = -add;
414 static void * threadWorker(
void * arg){
415 typedef NS_IMPL::CActive<__CountMax> __Active;
416 LOGGER_CRASH_HANDLER();
418 __Myt &
self = *
static_cast<__Myt *
>(arg);
419 assert(
self.inputQue_);
420 __Active a(
self.workerCount_);
421 for(__Task task;
self.querySurvive();){
422 if(
self.inputQue_->pop(task)){
423 __Active b(
self.activeCount_);
424 ::pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
426 ::pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
429 self.rmWorker(::pthread_self());
432 unsigned int adjustThreadCount(
unsigned int count)
const{
433 const unsigned int t_min = threadCountMin_;
434 const unsigned int t_max = threadCountMax_;
435 if(count < t_min || t_max <= t_min)
442 typedef NS_IMPL::IsPositive<__CountMax::value_type> __Pred;
443 return !deleteCount_.test_sub(__Pred(), 1, NULL, NULL);
445 int addThreads(
unsigned int count){
451 __WorkGuard g(workLock_);
453 ::pthread_testcancel();
454 if(t.start(threadWorker,
this, attr_)){
455 workers_.push_back(t);
461 void rmWorker(pthread_t t){
462 __Workers::iterator it;
463 __WorkGuard g(workLock_);
464 for(it = workers_.begin();it != workers_.end();++it)
471 __Myt & operator =(
const __Myt &);
// Lock taken through __WorkGuard in stopThreads(), addThreads() and rmWorker().
475 __WorkLock workLock_;
// Counter passed to the CActive guard at the top of threadWorker().
477 __CountMax workerCount_;
// Counter passed to the CActive guard around each popped task in threadWorker().
478 __CountMax activeCount_;
// Assigned `-add` by threadSchedule() and consumed with test_sub(..., 1, ...).
// NOTE(review): looks like the number of workers asked to exit - confirm.
479 __Count deleteCount_;
// Tunables written by the public setters; adjustThreadCount() reads the
// min/max and threadSchedule() reads interval_ (milliseconds).
// NOTE(review): `volatile` alone gives neither atomicity nor inter-thread
// ordering - confirm this is sufficient for how these are shared.
481 volatile unsigned int threadCountMin_;
482 volatile unsigned int threadCountMax_;
483 volatile int interval_;
Definition: threads.hh:26
Definition: to_string.hh:43
Definition: threads.hh:88
Definition: threads.hh:164
Definition: threads.hh:265
Lock-free atomic operations for integral types.
Definition: lock_queue.hh:29