Marine Library  1.0
C++ library for Linux Networking Development
lock_queue.hh
1 #ifndef DOZERG_LOCKED_QUEUE_H_20070901
2 #define DOZERG_LOCKED_QUEUE_H_20070901
3 
4 /*
5  加锁保护的FIFO消息队列,适合线程间传递数据 CLockQueue History 20070925 把pthread_cond_t和pthread_mutex_t替换成CCondMutex 20071025 加入capacity_,并把CCondMutex分成CMutex和CCondition 20080128 加入pushFront(),把元素放到队列前面 20080203 加入Mutex(),统一接口 20080903 增加popAll(),避免频繁pop() 20080911 增加pushAll(),避免频繁push() 20080912 增加_append(),对适用pushAll()的类型进行接口统一 20080920 增加lock_type,adapter_type和guard_type,修改getLock(),lock()和unlock() 20111204 去掉对CList的支持,将Container改成成员变量 修改waitNotEmpty()和waitNotFull(),严格检验size 20130603 把broadcast()修改成signal() 去掉getLock(),lock(),unlock() 20131016 去掉adapter_type //*/ #include "mutex.hh" #include "single_list.hh" NS_SERVER_BEGIN template<class T, class Container = CSingleList<T> > class CLockQueue { typedef CLockQueue<T, Container> __Myt; public: typedef CMutex lock_type; typedef CGuard<lock_type> guard_type; typedef Container container_type; typedef typename container_type::value_type value_type; typedef typename container_type::size_type size_type; typedef typename container_type::reference reference; typedef typename container_type::iterator iterator; typedef typename container_type::const_reference const_reference; typedef typename container_type::const_iterator const_iterator; private: static const size_t kCapacityDefault = 10000; //默认容量 template<class E, class A> static void _append(CSingleList<E, A> & to, CSingleList<E, A> & from){ to.append(from); } public: explicit CLockQueue(size_t capacity = kCapacityDefault) : capacity_(capacity) , top_size_(0) {} explicit CLockQueue(const container_type & con, size_t capacity = kCapacityDefault) : con_(con) , capacity_(capacity) , top_size_(con.size()) {} size_type capacity() const{return capacity_;} void capacity(size_type c){capacity_ = c;} bool empty() const volatile{ guard_type g(lock_); return c().empty(); } size_type size() const volatile{ guard_type g(lock_); return c().size(); } //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool push(const_reference v, int32_t timeMs = -1){ guard_type g(lock_); const int sig = waitNotFull(timeMs, 1); if(sig < 0) return false; con_.push_back(v); const size_t sz = con_.size(); if(sz > top_size_) //for statistic top_size_ = sz; not_empty_.signal(); return true; } //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool pushFront(const_reference v, int32_t timeMs = -1){ guard_type g(lock_); const int sig = waitNotFull(timeMs, 1); if(sig < 0) return false; con_.push_front(v); const size_t sz = con_.size(); if(sz > top_size_) //for statistic top_size_ = sz; not_empty_.signal(); return true; } //把con的所有元素加到队列尾 //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 //只有重载了CLockQueue::_append函数的类型,才能使用此操作 bool pushAll(container_type & con, int32_t timeMs = -1){ if(con.empty()) return true; guard_type g(lock_); const int sig = waitNotFull(timeMs, con.size()); if(sig < 0) return false; _append(con_, con); const size_t sz = con_.size(); if(sz > top_size_) //for statistic top_size_ = sz; not_empty_.broadcast(); return true; } //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool pop(reference v, int32_t timeMs = -1){ guard_type g(lock_); const int sig = waitNotEmpty(timeMs, 1); if(sig < 0) return false; v = con_.front(); con_.pop_front(); not_full_.signal(); return true; } //把队列的所有元素转移到con里 //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool popAll(container_type & con, int32_t timeMs = -1){ con.clear(); guard_type g(lock_); const int sig = waitNotEmpty(timeMs, (con_.empty() ? capacity_ : con_.size())); if(sig < 0) return false; con_.swap(con); not_full_.broadcast(); return true; } size_t topSize() const volatile{ //队列长度的峰值 guard_type g(lock_); return top_size_; } size_t resetTopSize() volatile{ guard_type g(lock_); size_t ret = top_size_; top_size_ = c().size(); return ret; } private: /* Return Value: -1 failed 0 not full 1 full //*/ int waitNotEmpty(int32_t timeMs, size_t need){ if(need > capacity_) return -1; while(con_.empty()){ if(timeMs < 0){ not_empty_.wait(lock_); }else if(!timeMs || !not_empty_.timeWait(lock_, timeMs)) return -1; } need = (con_.size() <= need ? 0 : con_.size() - need); //size after pop return (con_.size() >= capacity_ && need < capacity_ ? 1 : 0); } /* Return Value: -1 failed 0 not empty 1 empty //*/ int waitNotFull(int32_t timeMs, size_t need){ assert(need > 0); if(need > capacity_) return -1; while(con_.size() + need > capacity_){ if(timeMs < 0){ not_full_.wait(lock_); }else if(!timeMs || !not_full_.timeWait(lock_, timeMs)) return -1; } return (con_.empty() ? 1 : 0); } container_type & c() volatile{return const_cast<container_type &>(con_);} const container_type & c() const volatile{return const_cast<const container_type &>(con_);} CLockQueue(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields container_type con_; size_type capacity_; //队列最大长度,达到capacity_后push会阻塞 size_type top_size_; //con_.size()的峰值,统计用 lock_type lock_; CCondition not_empty_, not_full_; }; NS_SERVER_END #endif
6  CLockQueue
7  History
8  20070925 把pthread_cond_t和pthread_mutex_t替换成CCondMutex
9  20071025 加入capacity_,并把CCondMutex分成CMutex和CCondition
10  20080128 加入pushFront(),把元素放到队列前面
11  20080203 加入Mutex(),统一接口 20080903 增加popAll(),避免频繁pop() 20080911 增加pushAll(),避免频繁push() 20080912 增加_append(),对适用pushAll()的类型进行接口统一 20080920 增加lock_type,adapter_type和guard_type,修改getLock(),lock()和unlock() 20111204 去掉对CList的支持,将Container改成成员变量 修改waitNotEmpty()和waitNotFull(),严格检验size 20130603 把broadcast()修改成signal() 去掉getLock(),lock(),unlock() 20131016 去掉adapter_type //*/ #include "mutex.hh" #include "single_list.hh" NS_SERVER_BEGIN template<class T, class Container = CSingleList<T> > class CLockQueue { typedef CLockQueue<T, Container> __Myt; public: typedef CMutex lock_type; typedef CGuard<lock_type> guard_type; typedef Container container_type; typedef typename container_type::value_type value_type; typedef typename container_type::size_type size_type; typedef typename container_type::reference reference; typedef typename container_type::iterator iterator; typedef typename container_type::const_reference const_reference; typedef typename container_type::const_iterator const_iterator; private: static const size_t kCapacityDefault = 10000; //默认容量 template<class E, class A> static void _append(CSingleList<E, A> & to, CSingleList<E, A> & from){ to.append(from); } public: explicit CLockQueue(size_t capacity = kCapacityDefault) : capacity_(capacity) , top_size_(0) {} explicit CLockQueue(const container_type & con, size_t capacity = kCapacityDefault) : con_(con) , capacity_(capacity) , top_size_(con.size()) {} size_type capacity() const{return capacity_;} void capacity(size_type c){capacity_ = c;} bool empty() const volatile{ guard_type g(lock_); return c().empty(); } size_type size() const volatile{ guard_type g(lock_); return c().size(); } //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool push(const_reference v, int32_t timeMs = -1){ guard_type g(lock_); const int sig = waitNotFull(timeMs, 1); if(sig < 0) return false; con_.push_back(v); const size_t sz = con_.size(); if(sz > top_size_) //for statistic top_size_ = sz; not_empty_.signal(); return true; } //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool pushFront(const_reference v, int32_t timeMs = -1){ guard_type g(lock_); const int sig = waitNotFull(timeMs, 1); if(sig < 0) return false; con_.push_front(v); const size_t sz = con_.size(); if(sz > top_size_) //for statistic top_size_ = sz; not_empty_.signal(); return true; } //把con的所有元素加到队列尾 //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 //只有重载了CLockQueue::_append函数的类型,才能使用此操作 bool pushAll(container_type & con, int32_t timeMs = -1){ if(con.empty()) return true; guard_type g(lock_); const int sig = waitNotFull(timeMs, con.size()); if(sig < 0) return false; _append(con_, con); const size_t sz = con_.size(); if(sz > top_size_) //for statistic top_size_ = sz; not_empty_.broadcast(); return true; } //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool pop(reference v, int32_t timeMs = -1){ guard_type g(lock_); const int sig = waitNotEmpty(timeMs, 1); if(sig < 0) return false; v = con_.front(); con_.pop_front(); not_full_.signal(); return true; } //把队列的所有元素转移到con里 //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool popAll(container_type & con, int32_t timeMs = -1){ con.clear(); guard_type g(lock_); const int sig = waitNotEmpty(timeMs, (con_.empty() ? capacity_ : con_.size())); if(sig < 0) return false; con_.swap(con); not_full_.broadcast(); return true; } size_t topSize() const volatile{ //队列长度的峰值 guard_type g(lock_); return top_size_; } size_t resetTopSize() volatile{ guard_type g(lock_); size_t ret = top_size_; top_size_ = c().size(); return ret; } private: /* Return Value: -1 failed 0 not full 1 full //*/ int waitNotEmpty(int32_t timeMs, size_t need){ if(need > capacity_) return -1; while(con_.empty()){ if(timeMs < 0){ not_empty_.wait(lock_); }else if(!timeMs || !not_empty_.timeWait(lock_, timeMs)) return -1; } need = (con_.size() <= need ? 0 : con_.size() - need); //size after pop return (con_.size() >= capacity_ && need < capacity_ ? 1 : 0); } /* Return Value: -1 failed 0 not empty 1 empty //*/ int waitNotFull(int32_t timeMs, size_t need){ assert(need > 0); if(need > capacity_) return -1; while(con_.size() + need > capacity_){ if(timeMs < 0){ not_full_.wait(lock_); }else if(!timeMs || !not_full_.timeWait(lock_, timeMs)) return -1; } return (con_.empty() ? 1 : 0); } container_type & c() volatile{return const_cast<container_type &>(con_);} const container_type & c() const volatile{return const_cast<const container_type &>(con_);} CLockQueue(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields container_type con_; size_type capacity_; //队列最大长度,达到capacity_后push会阻塞 size_type top_size_; //con_.size()的峰值,统计用 lock_type lock_; CCondition not_empty_, not_full_; }; NS_SERVER_END #endif
12  20080903 增加popAll(),避免频繁pop()
13  20080911 增加pushAll(),避免频繁push()
14  20080912 增加_append(),对适用pushAll()的类型进行接口统一
15  20080920 增加lock_type,adapter_type和guard_type,修改getLock(),lock()和unlock()
16  20111204 去掉对CList的支持,将Container改成成员变量
17  修改waitNotEmpty()和waitNotFull(),严格检验size
18  20130603 把broadcast()修改成signal()
19  去掉getLock(),lock(),unlock()
20  20131016 去掉adapter_type
21 //*/
22 
23 #include "mutex.hh"
24 #include "single_list.hh"
25 
26 NS_SERVER_BEGIN
27 
28 template<class T, class Container = CSingleList<T> >
30 {
32 public:
33  typedef CMutex lock_type;
35  typedef Container container_type;
36  typedef typename container_type::value_type value_type;
37  typedef typename container_type::size_type size_type;
38  typedef typename container_type::reference reference;
39  typedef typename container_type::iterator iterator;
40  typedef typename container_type::const_reference const_reference;
41  typedef typename container_type::const_iterator const_iterator;
42 private:
43  static const size_t kCapacityDefault = 10000; //默认容量
44  template<class E, class A>
45  static void _append(CSingleList<E, A> & to, CSingleList<E, A> & from){
46  to.append(from);
47  }
48 public:
49  explicit CLockQueue(size_t capacity = kCapacityDefault)
50  : capacity_(capacity)
51  , top_size_(0)
52  {}
53  explicit CLockQueue(const container_type & con, size_t capacity = kCapacityDefault)
54  : con_(con)
55  , capacity_(capacity)
56  , top_size_(con.size())
57  {}
58  size_type capacity() const{return capacity_;}
59  void capacity(size_type c){capacity_ = c;}
60  bool empty() const volatile{
61  guard_type g(lock_);
62  return c().empty();
63  }
64  size_type size() const volatile{
65  guard_type g(lock_);
66  return c().size();
67  }
68  //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool push(const_reference v, int32_t timeMs = -1){ guard_type g(lock_); const int sig = waitNotFull(timeMs, 1); if(sig < 0) return false; con_.push_back(v); const size_t sz = con_.size(); if(sz > top_size_) //for statistic top_size_ = sz; not_empty_.signal(); return true; } //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool pushFront(const_reference v, int32_t timeMs = -1){ guard_type g(lock_); const int sig = waitNotFull(timeMs, 1); if(sig < 0) return false; con_.push_front(v); const size_t sz = con_.size(); if(sz > top_size_) //for statistic top_size_ = sz; not_empty_.signal(); return true; } //把con的所有元素加到队列尾 //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 //只有重载了CLockQueue::_append函数的类型,才能使用此操作 bool pushAll(container_type & con, int32_t timeMs = -1){ if(con.empty()) return true; guard_type g(lock_); const int sig = waitNotFull(timeMs, con.size()); if(sig < 0) return false; _append(con_, con); const size_t sz = con_.size(); if(sz > top_size_) //for statistic top_size_ = sz; not_empty_.broadcast(); return true; } //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool pop(reference v, int32_t timeMs = -1){ guard_type g(lock_); const int sig = waitNotEmpty(timeMs, 1); if(sig < 0) return false; v = con_.front(); con_.pop_front(); not_full_.signal(); return true; } //把队列的所有元素转移到con里 //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool popAll(container_type & con, int32_t timeMs = -1){ con.clear(); guard_type g(lock_); const int sig = waitNotEmpty(timeMs, (con_.empty() ? capacity_ : con_.size())); if(sig < 0) return false; con_.swap(con); not_full_.broadcast(); return true; } size_t topSize() const volatile{ //队列长度的峰值 guard_type g(lock_); return top_size_; } size_t resetTopSize() volatile{ guard_type g(lock_); size_t ret = top_size_; top_size_ = c().size(); return ret; } private: /* Return Value: -1 failed 0 not full 1 full //*/ int waitNotEmpty(int32_t timeMs, size_t need){ if(need > capacity_) return -1; while(con_.empty()){ if(timeMs < 0){ not_empty_.wait(lock_); }else if(!timeMs || !not_empty_.timeWait(lock_, timeMs)) return -1; } need = (con_.size() <= need ? 0 : con_.size() - need); //size after pop return (con_.size() >= capacity_ && need < capacity_ ? 1 : 0); } /* Return Value: -1 failed 0 not empty 1 empty //*/ int waitNotFull(int32_t timeMs, size_t need){ assert(need > 0); if(need > capacity_) return -1; while(con_.size() + need > capacity_){ if(timeMs < 0){ not_full_.wait(lock_); }else if(!timeMs || !not_full_.timeWait(lock_, timeMs)) return -1; } return (con_.empty() ? 1 : 0); } container_type & c() volatile{return const_cast<container_type &>(con_);} const container_type & c() const volatile{return const_cast<const container_type &>(con_);} CLockQueue(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields container_type con_; size_type capacity_; //队列最大长度,达到capacity_后push会阻塞 size_type top_size_; //con_.size()的峰值,统计用 lock_type lock_; CCondition not_empty_, not_full_; }; NS_SERVER_END #endif
69  bool push(const_reference v, int32_t timeMs = -1){
70  guard_type g(lock_);
71  const int sig = waitNotFull(timeMs, 1);
72  if(sig < 0)
73  return false;
74  con_.push_back(v);
75  const size_t sz = con_.size();
76  if(sz > top_size_) //for statistic
77  top_size_ = sz;
78  not_empty_.signal();
79  return true;
80  }
81  //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool pushFront(const_reference v, int32_t timeMs = -1){ guard_type g(lock_); const int sig = waitNotFull(timeMs, 1); if(sig < 0) return false; con_.push_front(v); const size_t sz = con_.size(); if(sz > top_size_) //for statistic top_size_ = sz; not_empty_.signal(); return true; } //把con的所有元素加到队列尾 //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 //只有重载了CLockQueue::_append函数的类型,才能使用此操作 bool pushAll(container_type & con, int32_t timeMs = -1){ if(con.empty()) return true; guard_type g(lock_); const int sig = waitNotFull(timeMs, con.size()); if(sig < 0) return false; _append(con_, con); const size_t sz = con_.size(); if(sz > top_size_) //for statistic top_size_ = sz; not_empty_.broadcast(); return true; } //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool pop(reference v, int32_t timeMs = -1){ guard_type g(lock_); const int sig = waitNotEmpty(timeMs, 1); if(sig < 0) return false; v = con_.front(); con_.pop_front(); not_full_.signal(); return true; } //把队列的所有元素转移到con里 //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool popAll(container_type & con, int32_t timeMs = -1){ con.clear(); guard_type g(lock_); const int sig = waitNotEmpty(timeMs, (con_.empty() ? capacity_ : con_.size())); if(sig < 0) return false; con_.swap(con); not_full_.broadcast(); return true; } size_t topSize() const volatile{ //队列长度的峰值 guard_type g(lock_); return top_size_; } size_t resetTopSize() volatile{ guard_type g(lock_); size_t ret = top_size_; top_size_ = c().size(); return ret; } private: /* Return Value: -1 failed 0 not full 1 full //*/ int waitNotEmpty(int32_t timeMs, size_t need){ if(need > capacity_) return -1; while(con_.empty()){ if(timeMs < 0){ not_empty_.wait(lock_); }else if(!timeMs || !not_empty_.timeWait(lock_, timeMs)) return -1; } need = (con_.size() <= need ? 0 : con_.size() - need); //size after pop return (con_.size() >= capacity_ && need < capacity_ ? 1 : 0); } /* Return Value: -1 failed 0 not empty 1 empty //*/ int waitNotFull(int32_t timeMs, size_t need){ assert(need > 0); if(need > capacity_) return -1; while(con_.size() + need > capacity_){ if(timeMs < 0){ not_full_.wait(lock_); }else if(!timeMs || !not_full_.timeWait(lock_, timeMs)) return -1; } return (con_.empty() ? 1 : 0); } container_type & c() volatile{return const_cast<container_type &>(con_);} const container_type & c() const volatile{return const_cast<const container_type &>(con_);} CLockQueue(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields container_type con_; size_type capacity_; //队列最大长度,达到capacity_后push会阻塞 size_type top_size_; //con_.size()的峰值,统计用 lock_type lock_; CCondition not_empty_, not_full_; }; NS_SERVER_END #endif
82  bool pushFront(const_reference v, int32_t timeMs = -1){
83  guard_type g(lock_);
84  const int sig = waitNotFull(timeMs, 1);
85  if(sig < 0)
86  return false;
87  con_.push_front(v);
88  const size_t sz = con_.size();
89  if(sz > top_size_) //for statistic
90  top_size_ = sz;
91  not_empty_.signal();
92  return true;
93  }
94  //把con的所有元素加到队列尾
95  //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 //只有重载了CLockQueue::_append函数的类型,才能使用此操作 bool pushAll(container_type & con, int32_t timeMs = -1){ if(con.empty()) return true; guard_type g(lock_); const int sig = waitNotFull(timeMs, con.size()); if(sig < 0) return false; _append(con_, con); const size_t sz = con_.size(); if(sz > top_size_) //for statistic top_size_ = sz; not_empty_.broadcast(); return true; } //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool pop(reference v, int32_t timeMs = -1){ guard_type g(lock_); const int sig = waitNotEmpty(timeMs, 1); if(sig < 0) return false; v = con_.front(); con_.pop_front(); not_full_.signal(); return true; } //把队列的所有元素转移到con里 //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool popAll(container_type & con, int32_t timeMs = -1){ con.clear(); guard_type g(lock_); const int sig = waitNotEmpty(timeMs, (con_.empty() ? capacity_ : con_.size())); if(sig < 0) return false; con_.swap(con); not_full_.broadcast(); return true; } size_t topSize() const volatile{ //队列长度的峰值 guard_type g(lock_); return top_size_; } size_t resetTopSize() volatile{ guard_type g(lock_); size_t ret = top_size_; top_size_ = c().size(); return ret; } private: /* Return Value: -1 failed 0 not full 1 full //*/ int waitNotEmpty(int32_t timeMs, size_t need){ if(need > capacity_) return -1; while(con_.empty()){ if(timeMs < 0){ not_empty_.wait(lock_); }else if(!timeMs || !not_empty_.timeWait(lock_, timeMs)) return -1; } need = (con_.size() <= need ? 0 : con_.size() - need); //size after pop return (con_.size() >= capacity_ && need < capacity_ ? 1 : 0); } /* Return Value: -1 failed 0 not empty 1 empty //*/ int waitNotFull(int32_t timeMs, size_t need){ assert(need > 0); if(need > capacity_) return -1; while(con_.size() + need > capacity_){ if(timeMs < 0){ not_full_.wait(lock_); }else if(!timeMs || !not_full_.timeWait(lock_, timeMs)) return -1; } return (con_.empty() ? 1 : 0); } container_type & c() volatile{return const_cast<container_type &>(con_);} const container_type & c() const volatile{return const_cast<const container_type &>(con_);} CLockQueue(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields container_type con_; size_type capacity_; //队列最大长度,达到capacity_后push会阻塞 size_type top_size_; //con_.size()的峰值,统计用 lock_type lock_; CCondition not_empty_, not_full_; }; NS_SERVER_END #endif
96  //只有重载了CLockQueue::_append函数的类型,才能使用此操作 bool pushAll(container_type & con, int32_t timeMs = -1){ if(con.empty()) return true; guard_type g(lock_); const int sig = waitNotFull(timeMs, con.size()); if(sig < 0) return false; _append(con_, con); const size_t sz = con_.size(); if(sz > top_size_) //for statistic top_size_ = sz; not_empty_.broadcast(); return true; } //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool pop(reference v, int32_t timeMs = -1){ guard_type g(lock_); const int sig = waitNotEmpty(timeMs, 1); if(sig < 0) return false; v = con_.front(); con_.pop_front(); not_full_.signal(); return true; } //把队列的所有元素转移到con里 //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool popAll(container_type & con, int32_t timeMs = -1){ con.clear(); guard_type g(lock_); const int sig = waitNotEmpty(timeMs, (con_.empty() ? capacity_ : con_.size())); if(sig < 0) return false; con_.swap(con); not_full_.broadcast(); return true; } size_t topSize() const volatile{ //队列长度的峰值 guard_type g(lock_); return top_size_; } size_t resetTopSize() volatile{ guard_type g(lock_); size_t ret = top_size_; top_size_ = c().size(); return ret; } private: /* Return Value: -1 failed 0 not full 1 full //*/ int waitNotEmpty(int32_t timeMs, size_t need){ if(need > capacity_) return -1; while(con_.empty()){ if(timeMs < 0){ not_empty_.wait(lock_); }else if(!timeMs || !not_empty_.timeWait(lock_, timeMs)) return -1; } need = (con_.size() <= need ? 0 : con_.size() - need); //size after pop return (con_.size() >= capacity_ && need < capacity_ ? 1 : 0); } /* Return Value: -1 failed 0 not empty 1 empty //*/ int waitNotFull(int32_t timeMs, size_t need){ assert(need > 0); if(need > capacity_) return -1; while(con_.size() + need > capacity_){ if(timeMs < 0){ not_full_.wait(lock_); }else if(!timeMs || !not_full_.timeWait(lock_, timeMs)) return -1; } return (con_.empty() ? 1 : 0); } container_type & c() volatile{return const_cast<container_type &>(con_);} const container_type & c() const volatile{return const_cast<const container_type &>(con_);} CLockQueue(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields container_type con_; size_type capacity_; //队列最大长度,达到capacity_后push会阻塞 size_type top_size_; //con_.size()的峰值,统计用 lock_type lock_; CCondition not_empty_, not_full_; }; NS_SERVER_END #endif
97  bool pushAll(container_type & con, int32_t timeMs = -1){
98  if(con.empty())
99  return true;
100  guard_type g(lock_);
101  const int sig = waitNotFull(timeMs, con.size());
102  if(sig < 0)
103  return false;
104  _append(con_, con);
105  const size_t sz = con_.size();
106  if(sz > top_size_) //for statistic
107  top_size_ = sz;
108  not_empty_.broadcast();
109  return true;
110  }
111  //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool pop(reference v, int32_t timeMs = -1){ guard_type g(lock_); const int sig = waitNotEmpty(timeMs, 1); if(sig < 0) return false; v = con_.front(); con_.pop_front(); not_full_.signal(); return true; } //把队列的所有元素转移到con里 //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool popAll(container_type & con, int32_t timeMs = -1){ con.clear(); guard_type g(lock_); const int sig = waitNotEmpty(timeMs, (con_.empty() ? capacity_ : con_.size())); if(sig < 0) return false; con_.swap(con); not_full_.broadcast(); return true; } size_t topSize() const volatile{ //队列长度的峰值 guard_type g(lock_); return top_size_; } size_t resetTopSize() volatile{ guard_type g(lock_); size_t ret = top_size_; top_size_ = c().size(); return ret; } private: /* Return Value: -1 failed 0 not full 1 full //*/ int waitNotEmpty(int32_t timeMs, size_t need){ if(need > capacity_) return -1; while(con_.empty()){ if(timeMs < 0){ not_empty_.wait(lock_); }else if(!timeMs || !not_empty_.timeWait(lock_, timeMs)) return -1; } need = (con_.size() <= need ? 0 : con_.size() - need); //size after pop return (con_.size() >= capacity_ && need < capacity_ ? 1 : 0); } /* Return Value: -1 failed 0 not empty 1 empty //*/ int waitNotFull(int32_t timeMs, size_t need){ assert(need > 0); if(need > capacity_) return -1; while(con_.size() + need > capacity_){ if(timeMs < 0){ not_full_.wait(lock_); }else if(!timeMs || !not_full_.timeWait(lock_, timeMs)) return -1; } return (con_.empty() ? 1 : 0); } container_type & c() volatile{return const_cast<container_type &>(con_);} const container_type & c() const volatile{return const_cast<const container_type &>(con_);} CLockQueue(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields container_type con_; size_type capacity_; //队列最大长度,达到capacity_后push会阻塞 size_type top_size_; //con_.size()的峰值,统计用 lock_type lock_; CCondition not_empty_, not_full_; }; NS_SERVER_END #endif
112  bool pop(reference v, int32_t timeMs = -1){
113  guard_type g(lock_);
114  const int sig = waitNotEmpty(timeMs, 1);
115  if(sig < 0)
116  return false;
117  v = con_.front();
118  con_.pop_front();
119  not_full_.signal();
120  return true;
121  }
122  //把队列的所有元素转移到con里
123  //timeMs < 0,阻塞式;timeMs >= 0,等待timeMs毫秒 bool popAll(container_type & con, int32_t timeMs = -1){ con.clear(); guard_type g(lock_); const int sig = waitNotEmpty(timeMs, (con_.empty() ? capacity_ : con_.size())); if(sig < 0) return false; con_.swap(con); not_full_.broadcast(); return true; } size_t topSize() const volatile{ //队列长度的峰值 guard_type g(lock_); return top_size_; } size_t resetTopSize() volatile{ guard_type g(lock_); size_t ret = top_size_; top_size_ = c().size(); return ret; } private: /* Return Value: -1 failed 0 not full 1 full //*/ int waitNotEmpty(int32_t timeMs, size_t need){ if(need > capacity_) return -1; while(con_.empty()){ if(timeMs < 0){ not_empty_.wait(lock_); }else if(!timeMs || !not_empty_.timeWait(lock_, timeMs)) return -1; } need = (con_.size() <= need ? 0 : con_.size() - need); //size after pop return (con_.size() >= capacity_ && need < capacity_ ? 1 : 0); } /* Return Value: -1 failed 0 not empty 1 empty //*/ int waitNotFull(int32_t timeMs, size_t need){ assert(need > 0); if(need > capacity_) return -1; while(con_.size() + need > capacity_){ if(timeMs < 0){ not_full_.wait(lock_); }else if(!timeMs || !not_full_.timeWait(lock_, timeMs)) return -1; } return (con_.empty() ? 1 : 0); } container_type & c() volatile{return const_cast<container_type &>(con_);} const container_type & c() const volatile{return const_cast<const container_type &>(con_);} CLockQueue(const __Myt &); //disable copy and assignment __Myt & operator =(const __Myt &); //fields container_type con_; size_type capacity_; //队列最大长度,达到capacity_后push会阻塞 size_type top_size_; //con_.size()的峰值,统计用 lock_type lock_; CCondition not_empty_, not_full_; }; NS_SERVER_END #endif
124  bool popAll(container_type & con, int32_t timeMs = -1){
125  con.clear();
126  guard_type g(lock_);
127  const int sig = waitNotEmpty(timeMs, (con_.empty() ? capacity_ : con_.size()));
128  if(sig < 0)
129  return false;
130  con_.swap(con);
131  not_full_.broadcast();
132  return true;
133  }
134  size_t topSize() const volatile{ //队列长度的峰值
135  guard_type g(lock_);
136  return top_size_;
137  }
138  size_t resetTopSize() volatile{
139  guard_type g(lock_);
140  size_t ret = top_size_;
141  top_size_ = c().size();
142  return ret;
143  }
144 private:
145  /*
146  Return Value:
147  -1 failed
148  0 not full
149  1 full
150  //*/
151  int waitNotEmpty(int32_t timeMs, size_t need){
152  if(need > capacity_)
153  return -1;
154  while(con_.empty()){
155  if(timeMs < 0){
156  not_empty_.wait(lock_);
157  }else if(!timeMs || !not_empty_.timeWait(lock_, timeMs))
158  return -1;
159  }
160  need = (con_.size() <= need ? 0 : con_.size() - need); //size after pop
161  return (con_.size() >= capacity_ && need < capacity_ ? 1 : 0);
162  }
163  /*
164  Return Value:
165  -1 failed
166  0 not empty
167  1 empty
168  //*/
169  int waitNotFull(int32_t timeMs, size_t need){
170  assert(need > 0);
171  if(need > capacity_)
172  return -1;
173  while(con_.size() + need > capacity_){
174  if(timeMs < 0){
175  not_full_.wait(lock_);
176  }else if(!timeMs || !not_full_.timeWait(lock_, timeMs))
177  return -1;
178  }
179  return (con_.empty() ? 1 : 0);
180  }
181  container_type & c() volatile{return const_cast<container_type &>(con_);}
182  const container_type & c() const volatile{return const_cast<const container_type &>(con_);}
183  CLockQueue(const __Myt &); //disable copy and assignment
184  __Myt & operator =(const __Myt &);
185  //fields
186  container_type con_;
187  size_type capacity_; //队列最大长度,达到capacity_后push会阻塞 size_type top_size_; //con_.size()的峰值,统计用 lock_type lock_; CCondition not_empty_, not_full_; }; NS_SERVER_END #endif
188  size_type top_size_; //con_.size()的峰值,统计用 lock_type lock_; CCondition not_empty_, not_full_; }; NS_SERVER_END #endif
189  lock_type lock_;
190  CCondition not_empty_, not_full_;
191 };
192 
193 NS_SERVER_END
194 
195 #endif
196 
Definition: mutex.hh:73
Definition: mutex.hh:430
Definition: mutex.hh:146
Definition: single_list.hh:28
Definition: lock_queue.hh:29