MsgQueue.h 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. #pragma once
  2. #include "DiosLock.h"
  3. #include <queue>
  4. #include "Definitions.h"
  5. using namespace std;
  6. template<typename T> class MsgQueue
  7. {
  8. DWORD m_MaxThreshold;
  9. DWORD m_CurThreshold;
  10. DWORD m_OwnerTid;
  11. DiosLock m_Lock;
  12. HANDLE m_NotifyInQue;
  13. queue<T> m_DataQueue;
  14. public:
  15. MsgQueue(void)
  16. {
  17. m_CurThreshold = 0;
  18. m_MaxThreshold = 2500000;
  19. m_OwnerTid = 0;
  20. m_NotifyInQue = CreateEvent(0,1,0,0);
  21. };
  22. ~MsgQueue(void)
  23. {
  24. Clear();
  25. CloseHandle(m_NotifyInQue);
  26. };
  27. bool Clear()
  28. {
  29. ResetEvent(m_NotifyInQue);
  30. m_Lock.Thread_Lock();
  31. m_OwnerTid = GetCurrentThreadId();
  32. while(m_DataQueue.size() > 0)
  33. {
  34. m_DataQueue.pop();
  35. }
  36. m_Lock.Thread_UnLock();
  37. return true;
  38. };
  39. HANDLE GetNotifyHandle()
  40. {
  41. return m_NotifyInQue;
  42. };
  43. bool DeQueue(T &rMsg)
  44. {
  45. bool ret = false;
  46. m_Lock.Thread_Lock();
  47. //DebugPrint("before deQue size:%d\n",m_DataQueue.size());
  48. if(m_DataQueue.size() > 0)
  49. {
  50. rMsg = m_DataQueue.front();
  51. m_DataQueue.pop();
  52. ret = true;
  53. }
  54. //DebugPrint("after deQue size:%d\n",m_DataQueue.size());
  55. if (m_DataQueue.size() == 0)
  56. {
  57. ResetEvent(m_NotifyInQue);
  58. }
  59. m_Lock.Thread_UnLock();
  60. return ret;
  61. };
  62. MsgQueue& operator = (const MsgQueue &tValue)
  63. {
  64. m_Lock.Thread_Lock();
  65. tValue.m_Lock.Thread_Lock();
  66. m_DataQueue = tValue.m_DataQueue;
  67. m_Lock.Thread_UnLock();
  68. tValue.m_Lock.Thread_UnLock();
  69. return (*this);
  70. };
  71. void Lock(DWORD Timeout = INFINITE)
  72. {
  73. m_Lock.Thread_Lock(Timeout);
  74. };
  75. void UnLock()
  76. {
  77. m_Lock.Thread_UnLock();
  78. };
  79. bool InQueue(T &rMsg)
  80. {
  81. bool ret = false;
  82. m_Lock.Thread_Lock();
  83. DWORD curSize = (DWORD)m_DataQueue.size();
  84. #ifdef _DEBUG
  85. if (curSize > m_CurThreshold)
  86. {
  87. printf("Raising The Threshold:%d\n", curSize);
  88. m_CurThreshold = curSize;
  89. }
  90. #endif
  91. //printf("Thread:%d,Pop REQ one\n", GetCurrentThreadId());
  92. if (curSize < m_MaxThreshold)
  93. {
  94. ret = true;
  95. }
  96. else
  97. {
  98. printf("WARNING:Hitting MaxThreshold:%d\n", curSize);
  99. printf("WARNING:Losing the Packet------------!!!!\n");
  100. //TPRINTA_FATAL("WARNING:Hitting MaxThreshold : %d.Losing packets\n", curSize);
  101. }
  102. m_DataQueue.push(rMsg);
  103. m_Lock.Thread_UnLock();
  104. SetEvent(m_NotifyInQue);
  105. return ret;
  106. };
  107. DWORD size()
  108. {
  109. m_Lock.Thread_Lock();
  110. DWORD size = (DWORD)m_DataQueue.size();
  111. m_Lock.Thread_UnLock();
  112. return size;
  113. };
  114. T& front()
  115. {
  116. return m_DataQueue.front();
  117. };
  118. DWORD WaitForInQue(DWORD timeout)
  119. {
  120. DWORD ret = WAIT_TIMEOUT;
  121. m_Lock.Thread_Lock();
  122. if (m_DataQueue.size() > 0)
  123. {
  124. ret = WAIT_OBJECT_0;
  125. }
  126. m_Lock.Thread_UnLock();
  127. if (ret == WAIT_TIMEOUT)
  128. {
  129. ret = WaitForSingleObject(m_NotifyInQue, timeout);
  130. if (ret == WAIT_TIMEOUT)
  131. {
  132. return ret;
  133. }
  134. //check again
  135. //make sure data is in the que
  136. m_Lock.Thread_Lock();
  137. if (m_DataQueue.size() > 0)
  138. {
  139. ret = WAIT_OBJECT_0;
  140. }
  141. m_Lock.Thread_UnLock();
  142. }
  143. return ret;
  144. };
  145. };