MsgQueue.h 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. #pragma once
  2. #include "DiosLock.h"
  3. #include "Logger.h"
  4. #include <queue>
  5. #include "Definitions.h"
  6. using namespace std;
  7. template<typename T> class MsgQueue
  8. {
  9. DWORD m_MaxThreshold;
  10. DWORD m_CurThreshold;
  11. DWORD m_OwnerTid;
  12. DiosLock m_Lock;
  13. HANDLE m_NotifyInQue;
  14. queue<T> m_DataQueue;
  15. public:
  16. MsgQueue(void)
  17. {
  18. m_CurThreshold = 0;
  19. m_MaxThreshold = 2500000;
  20. m_OwnerTid = 0;
  21. m_NotifyInQue = CreateEvent(0,1,0,0);
  22. };
  23. ~MsgQueue(void)
  24. {
  25. Clear();
  26. CloseHandle(m_NotifyInQue);
  27. };
  28. bool Clear()
  29. {
  30. ResetEvent(m_NotifyInQue);
  31. m_Lock.Thread_Lock();
  32. m_OwnerTid = GetCurrentThreadId();
  33. while(m_DataQueue.size() > 0)
  34. {
  35. m_DataQueue.pop();
  36. }
  37. m_Lock.Thread_UnLock();
  38. return true;
  39. };
  40. HANDLE GetNotifyHandle()
  41. {
  42. return m_NotifyInQue;
  43. };
  44. bool DeQueue(T &rMsg)
  45. {
  46. bool ret = false;
  47. m_Lock.Thread_Lock();
  48. //DebugPrint("before deQue size:%d\n",m_DataQueue.size());
  49. if(m_DataQueue.size() > 0)
  50. {
  51. rMsg = m_DataQueue.front();
  52. m_DataQueue.pop();
  53. ret = true;
  54. }
  55. //DebugPrint("after deQue size:%d\n",m_DataQueue.size());
  56. if (m_DataQueue.size() == 0)
  57. {
  58. ResetEvent(m_NotifyInQue);
  59. }
  60. m_Lock.Thread_UnLock();
  61. return ret;
  62. };
  63. MsgQueue& operator = (const MsgQueue &tValue)
  64. {
  65. m_Lock.Thread_Lock();
  66. tValue.m_Lock.Thread_Lock();
  67. m_DataQueue = tValue.m_DataQueue;
  68. m_Lock.Thread_UnLock();
  69. tValue.m_Lock.Thread_UnLock();
  70. return (*this);
  71. };
  72. void Lock(DWORD Timeout = INFINITE)
  73. {
  74. m_Lock.Thread_Lock(Timeout);
  75. };
  76. void UnLock()
  77. {
  78. m_Lock.Thread_UnLock();
  79. };
  80. bool InQueue(T &rMsg)
  81. {
  82. bool ret = false;
  83. m_Lock.Thread_Lock();
  84. DWORD curSize = (DWORD)m_DataQueue.size();
  85. #ifdef _DEBUG
  86. if (curSize > m_CurThreshold)
  87. {
  88. printf("Raising The Threshold:%d\n", curSize);
  89. m_CurThreshold = curSize;
  90. }
  91. #endif
  92. //printf("Thread:%d,Pop REQ one\n", GetCurrentThreadId());
  93. if (curSize < m_MaxThreshold)
  94. {
  95. ret = true;
  96. }
  97. else
  98. {
  99. printf("WARNING:Hitting MaxThreshold:%d\n", curSize);
  100. printf("WARNING:Losing the Packet------------!!!!\n");
  101. TPRINTA_FATAL("WARNING:Hitting MaxThreshold : %d.Losing packets\n", curSize);
  102. }
  103. m_DataQueue.push(rMsg);
  104. m_Lock.Thread_UnLock();
  105. SetEvent(m_NotifyInQue);
  106. return ret;
  107. };
  108. DWORD size()
  109. {
  110. m_Lock.Thread_Lock();
  111. DWORD size = (DWORD)m_DataQueue.size();
  112. m_Lock.Thread_UnLock();
  113. return size;
  114. };
  115. T& front()
  116. {
  117. return m_DataQueue.front();
  118. };
  119. DWORD WaitForInQue(DWORD timeout)
  120. {
  121. DWORD ret = WAIT_TIMEOUT;
  122. m_Lock.Thread_Lock();
  123. if (m_DataQueue.size() > 0)
  124. {
  125. ret = WAIT_OBJECT_0;
  126. }
  127. m_Lock.Thread_UnLock();
  128. if (ret == WAIT_TIMEOUT)
  129. {
  130. ret = WaitForSingleObject(m_NotifyInQue, timeout);
  131. if (ret == WAIT_TIMEOUT)
  132. {
  133. return ret;
  134. }
  135. //check again
  136. //make sure data is in the que
  137. m_Lock.Thread_Lock();
  138. if (m_DataQueue.size() > 0)
  139. {
  140. ret = WAIT_OBJECT_0;
  141. }
  142. m_Lock.Thread_UnLock();
  143. }
  144. return ret;
  145. };
  146. };