MsgCircle.h 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. #pragma once
  2. //#include "Logger.h"
  3. #include "CcosLock.h"
  4. #include <queue>
  5. #include <memory>
  6. #include "Definitions.h"
  7. #include "LinuxEvent.h"
  8. using namespace std;
  9. template<typename T> class MsgCircle
  10. {
  11. DWORD m_MaxThreshold;
  12. DWORD m_CurThreshold;
  13. CcosLock m_Lock;
  14. std::shared_ptr<LinuxEvent> m_NotifyInQue;
  15. deque<T> m_DataQueue;
  16. public:
  17. MsgCircle(void)
  18. {
  19. m_CurThreshold = 0;
  20. m_MaxThreshold = 2500000;
  21. m_NotifyInQue = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET,false);
  22. };
  23. ~MsgCircle(void)
  24. {
  25. Clear();
  26. };
  27. bool Clear()
  28. {
  29. m_NotifyInQue->ResetEvent();
  30. Lock();
  31. m_DataQueue.clear();
  32. UnLock();
  33. return true;
  34. };
  35. bool Peek(T &rMsg)
  36. {
  37. bool ret = false;
  38. Lock();
  39. //DebugPrint("before deQue size:%d\n",m_DataQueue.size());
  40. if (m_DataQueue.size() > 0)
  41. {
  42. rMsg = m_DataQueue.front();
  43. ret = true;
  44. }
  45. //DebugPrint("after deQue size:%d\n",m_DataQueue.size());
  46. UnLock();
  47. return ret;
  48. };
  49. bool DeQueue(T &rMsg)
  50. {
  51. bool ret = false;
  52. Lock();
  53. //DebugPrint("before deQue size:%d\n",m_DataQueue.size());
  54. if(m_DataQueue.size() > 0)
  55. {
  56. rMsg = m_DataQueue.front();
  57. m_DataQueue.pop_front();
  58. ret = true;
  59. }
  60. //DebugPrint("after deQue size:%d\n",m_DataQueue.size());
  61. if (m_DataQueue.size() == 0)
  62. {
  63. m_NotifyInQue->ResetEvent();
  64. }
  65. UnLock();
  66. return ret;
  67. };
  68. MsgCircle& operator = (const MsgCircle &tValue)
  69. {
  70. if(this != &tValue)
  71. {
  72. Lock();
  73. tValue.Lock();
  74. m_DataQueue = tValue.m_DataQueue;
  75. UnLock();
  76. tValue.UnLock();
  77. }
  78. return (*this);
  79. };
  80. void Lock()
  81. {
  82. m_Lock.Thread_Lock();
  83. };
  84. void UnLock()
  85. {
  86. m_Lock.Thread_UnLock();
  87. };
  88. bool InQueue(T &rMsg)
  89. {
  90. bool ret = false;
  91. Lock();
  92. DWORD curSize = (DWORD)m_DataQueue.size();
  93. #ifdef _DEBUG
  94. if (curSize > m_CurThreshold)
  95. {
  96. printf("Raising The Threshold:%d\n", curSize);
  97. m_CurThreshold = curSize;
  98. }
  99. #endif
  100. //printf("Thread:%d,Pop REQ one\n", GetCurrentThreadId());
  101. if (curSize < m_MaxThreshold)
  102. {
  103. ret = true;
  104. m_DataQueue.push_back(rMsg);
  105. }
  106. else
  107. {
  108. printf("WARNING:Hitting MaxThreshold:%d\n", curSize);
  109. printf("WARNING:Losing the Packet------------!!!!\n");
  110. //TPRINTA_FATAL("WARNING:Hitting MaxThreshold : %d.Losing packets\n", curSize);
  111. }
  112. UnLock();
  113. m_NotifyInQue->SetEvent();
  114. return ret;
  115. };
  116. DWORD size()
  117. {
  118. Lock();
  119. DWORD size = (DWORD)m_DataQueue.size();
  120. UnLock();
  121. return size;
  122. };
  123. T& front()
  124. {
  125. return m_DataQueue.front();
  126. };
  127. std::shared_ptr<LinuxEvent> GetNotifyHandle()
  128. {
  129. return m_NotifyInQue;
  130. };
  131. //must use with lock&unlock method
  132. T& operator [] (DWORD idx)
  133. {
  134. if(idx > m_DataQueue.size())
  135. {
  136. assert(0);
  137. }
  138. return m_DataQueue[idx];
  139. };
  140. DWORD WaitForInQue(DWORD timeout)
  141. {
  142. DWORD ret = WAIT_TIMEOUT;
  143. Lock();
  144. if (m_DataQueue.size() > 0)
  145. {
  146. ret = WAIT_OBJECT_0;
  147. }
  148. UnLock();
  149. if (ret == WAIT_TIMEOUT)
  150. {
  151. ret = m_NotifyInQue->Wait(timeout);
  152. if (ret == WAIT_TIMEOUT)
  153. {
  154. return ret;
  155. }
  156. //check again
  157. //make sure data is in the que
  158. Lock();
  159. if (m_DataQueue.size() > 0)
  160. {
  161. ret = WAIT_OBJECT_0;
  162. }
  163. UnLock();
  164. }
  165. return ret;
  166. };
  167. };