MsgCircle.h 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. #pragma once
  2. #include "Logger.h"
  3. #include "DiosLock.h"
  4. #include <queue>
  5. #include "Definitions.h"
  6. using namespace std;
  7. template<typename T> class MsgCircle
  8. {
  9. DWORD m_MaxThreshold;
  10. DWORD m_CurThreshold;
  11. DiosLock m_Lock;
  12. HANDLE m_NotifyInQue;
  13. deque<T> m_DataQueue;
  14. public:
  15. MsgCircle(void)
  16. {
  17. m_CurThreshold = 0;
  18. m_MaxThreshold = 2500000;
  19. m_NotifyInQue = CreateEvent(0,1,0,0);
  20. };
  21. ~MsgCircle(void)
  22. {
  23. Clear();
  24. CloseHandle(m_NotifyInQue);
  25. };
  26. bool Clear()
  27. {
  28. ResetEvent(m_NotifyInQue);
  29. Lock();
  30. m_DataQueue.clear();
  31. UnLock();
  32. return true;
  33. };
  34. bool Peek(T &rMsg)
  35. {
  36. bool ret = false;
  37. Lock();
  38. //DebugPrint("before deQue size:%d\n",m_DataQueue.size());
  39. if (m_DataQueue.size() > 0)
  40. {
  41. rMsg = m_DataQueue.front();
  42. ret = true;
  43. }
  44. //DebugPrint("after deQue size:%d\n",m_DataQueue.size());
  45. UnLock();
  46. return ret;
  47. };
  48. bool DeQueue(T &rMsg)
  49. {
  50. bool ret = false;
  51. Lock();
  52. //DebugPrint("before deQue size:%d\n",m_DataQueue.size());
  53. if(m_DataQueue.size() > 0)
  54. {
  55. rMsg = m_DataQueue.front();
  56. m_DataQueue.pop_front();
  57. ret = true;
  58. }
  59. //DebugPrint("after deQue size:%d\n",m_DataQueue.size());
  60. if (m_DataQueue.size() == 0)
  61. {
  62. ResetEvent(m_NotifyInQue);
  63. }
  64. UnLock();
  65. return ret;
  66. };
  67. MsgCircle& operator = (const MsgCircle &tValue)
  68. {
  69. if(this != &tValue)
  70. {
  71. Lock();
  72. tValue.Lock();
  73. m_DataQueue = tValue.m_DataQueue;
  74. UnLock();
  75. tValue.UnLock();
  76. }
  77. return (*this);
  78. };
  79. void Lock()
  80. {
  81. m_Lock.Thread_Lock();
  82. };
  83. void UnLock()
  84. {
  85. m_Lock.Thread_UnLock();
  86. };
  87. bool InQueue(T &rMsg)
  88. {
  89. bool ret = false;
  90. Lock();
  91. DWORD curSize = (DWORD)m_DataQueue.size();
  92. #ifdef _DEBUG
  93. if (curSize > m_CurThreshold)
  94. {
  95. printf("Raising The Threshold:%d\n", curSize);
  96. m_CurThreshold = curSize;
  97. }
  98. #endif
  99. //printf("Thread:%d,Pop REQ one\n", GetCurrentThreadId());
  100. if (curSize < m_MaxThreshold)
  101. {
  102. ret = true;
  103. m_DataQueue.push_back(rMsg);
  104. }
  105. else
  106. {
  107. printf("WARNING:Hitting MaxThreshold:%d\n", curSize);
  108. printf("WARNING:Losing the Packet------------!!!!\n");
  109. TPRINTA_FATAL("WARNING:Hitting MaxThreshold : %d.Losing packets\n", curSize);
  110. }
  111. UnLock();
  112. SetEvent(m_NotifyInQue);
  113. return ret;
  114. };
  115. DWORD size()
  116. {
  117. Lock();
  118. DWORD size = (DWORD)m_DataQueue.size();
  119. UnLock();
  120. return size;
  121. };
  122. T& front()
  123. {
  124. return m_DataQueue.front();
  125. };
  126. HANDLE GetNotifyHandle()
  127. {
  128. return m_NotifyInQue;
  129. };
  130. //must use with lock&unlock method
  131. T& operator [] (DWORD idx)
  132. {
  133. if(idx > m_DataQueue.size())
  134. {
  135. assert(0);
  136. }
  137. return m_DataQueue[idx];
  138. };
  139. DWORD WaitForInQue(DWORD timeout)
  140. {
  141. DWORD ret = WAIT_TIMEOUT;
  142. Lock();
  143. if (m_DataQueue.size() > 0)
  144. {
  145. ret = WAIT_OBJECT_0;
  146. }
  147. UnLock();
  148. if (ret == WAIT_TIMEOUT)
  149. {
  150. ret = WaitForSingleObject(m_NotifyInQue, timeout);
  151. if (ret == WAIT_TIMEOUT)
  152. {
  153. return ret;
  154. }
  155. //check again
  156. //make sure data is in the que
  157. Lock();
  158. if (m_DataQueue.size() > 0)
  159. {
  160. ret = WAIT_OBJECT_0;
  161. }
  162. UnLock();
  163. }
  164. return ret;
  165. };
  166. };