MsgQueue.h 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. #pragma once
  2. #include "ScopeLock.h"
  3. #include "Definitions.h"
  4. #include "LinuxEvent.h"
  5. #include <queue>
  6. #include <pthread.h>
  7. #include <ctime>
  8. #include <cerrno>
  9. #ifndef INFINITE
  10. #define INFINITE 0xFFFFFFFF
  11. #endif
  12. #ifndef WAIT_OBJECT_0
  13. #define WAIT_OBJECT_0 0
  14. #endif
  15. #ifndef WAIT_TIMEOUT
  16. #define WAIT_TIMEOUT 258
  17. #endif
  18. template<typename T> class MsgQueue
  19. {
  20. uint32_t m_MaxThreshold;
  21. uint32_t m_CurThreshold;
  22. uint32_t m_OwnerTid;
  23. CCriticalSec m_Lock;
  24. std::shared_ptr<LinuxEvent> m_NotifyInQue;
  25. std::queue<T> m_DataQueue;
  26. public:
  27. MsgQueue()
  28. {
  29. m_NotifyInQue = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);
  30. m_CurThreshold = 0;
  31. m_MaxThreshold = 2500000;
  32. m_OwnerTid = 0;
  33. };
  34. ~MsgQueue()
  35. {
  36. Clear();
  37. };
  38. bool Clear()
  39. {
  40. CScopeLock lock(m_Lock);
  41. m_NotifyInQue->ResetEvent();
  42. m_OwnerTid = pthread_self();
  43. while (!m_DataQueue.empty()) {
  44. m_DataQueue.pop();
  45. }
  46. return true;
  47. };
  48. std::shared_ptr<LinuxEvent> GetNotifyHandle()
  49. {
  50. return m_NotifyInQue;
  51. }
  52. bool DeQueue(T& rMsg)
  53. {
  54. bool ret = false;
  55. //m_Lock.Thread_Lock();
  56. CScopeLock lock(m_Lock);
  57. //DebugPrint("before deQue size:%d\n",m_DataQueue.size());
  58. if (m_DataQueue.size() > 0)
  59. {
  60. rMsg = m_DataQueue.front();
  61. m_DataQueue.pop();
  62. ret = true;
  63. }
  64. //DebugPrint("after deQue size:%d\n",m_DataQueue.size());
  65. if (m_DataQueue.size() == 0)
  66. {
  67. m_NotifyInQue->ResetEvent();
  68. }
  69. //m_Lock.Thread_UnLock();
  70. return ret;
  71. };
  72. MsgQueue& operator = (const MsgQueue& tValue)
  73. {
  74. //m_Lock.Thread_Lock();
  75. //tValue.m_Lock.Thread_Lock();
  76. CScopeLock lock(m_Lock);
  77. m_DataQueue = tValue.m_DataQueue;
  78. //m_Lock.Thread_UnLock();
  79. //tValue.m_Lock.Thread_UnLock();
  80. return (*this);
  81. };
  82. bool InQueue(T& rMsg)
  83. {
  84. bool ret = false;
  85. CScopeLock lock(m_Lock);
  86. DWORD curSize = (DWORD)m_DataQueue.size();
  87. //printf("Thread:%d,Pop REQ one\n", GetCurrentThreadId());
  88. if (curSize < m_MaxThreshold)
  89. {
  90. ret = true;
  91. }
  92. else
  93. {
  94. // printf("WARNING:Hitting MaxThreshold:%d\n", curSize);
  95. //printf("WARNING:Losing the Packet------------!!!!\n");
  96. //TPRINTA_FATAL("WARNING:Hitting MaxThreshold : %d.Losing packets\n", curSize);
  97. }
  98. m_DataQueue.push(rMsg);
  99. //m_Lock.Thread_UnLock();
  100. m_NotifyInQue->SetEvent();
  101. return ret;
  102. };
  103. size_t size()
  104. {
  105. //m_Lock.Thread_Lock();
  106. CScopeLock lock(m_Lock);
  107. DWORD size = (DWORD)m_DataQueue.size();
  108. //m_Lock.Thread_UnLock();
  109. return size;
  110. };
  111. T& front()
  112. {
  113. return m_DataQueue.front();
  114. };
  115. DWORD WaitForInQue(uint32_t timeout)
  116. {
  117. DWORD ret = WAIT_TIMEOUT;
  118. //m_Lock.Thread_Lock();
  119. CScopeLock lock(m_Lock);
  120. if (m_DataQueue.size() > 0)
  121. {
  122. ret = 0;
  123. }
  124. //m_Lock.Thread_UnLock();
  125. if (ret == WAIT_TIMEOUT)
  126. {
  127. ret = m_NotifyInQue->Wait(timeout);
  128. if (ret == WAIT_TIMEOUT)
  129. {
  130. return ret;
  131. }
  132. //check again
  133. //make sure data is in the que
  134. //m_Lock.Thread_Lock();
  135. if (m_DataQueue.size() > 0)
  136. {
  137. ret = 0;
  138. }
  139. //m_Lock.Thread_UnLock();
  140. }
  141. return ret;
  142. };
  143. };