MsgLoop.h 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. #pragma once
  2. #include <queue>
  3. using namespace std;
  4. template<typename T> class MsgLoop
  5. {
  6. size_t m_MaxThreshold;
  7. HANDLE m_Lock;
  8. HANDLE m_NotifyInQue;
  9. deque<T> m_DataQueue;
  10. public:
  11. MsgLoop(void)
  12. {
  13. m_MaxThreshold = 0;
  14. m_Lock = CreateMutex(0,0,0);
  15. m_NotifyInQue = CreateEvent(0,1,0,0);
  16. };
  17. ~MsgLoop(void)
  18. {
  19. Clear();
  20. CloseHandle(m_Lock);
  21. CloseHandle(m_NotifyInQue);
  22. };
  23. void init(size_t LoopThreshold)
  24. {
  25. m_MaxThreshold = LoopThreshold;
  26. };
  27. bool Clear()
  28. {
  29. ResetEvent(m_NotifyInQue);
  30. WaitForSingleObject(m_Lock,INFINITE);
  31. m_DataQueue.clear();
  32. ReleaseMutex(m_Lock);
  33. return true;
  34. };
  35. MsgLoop& operator = (const MsgLoop &tValue)
  36. {
  37. if(this != &tValue)
  38. {
  39. WaitForSingleObject(m_Lock, INFINITE);
  40. WaitForSingleObject(tValue.m_Lock, INFINITE);
  41. m_DataQueue = tValue.m_DataQueue;
  42. ReleaseMutex(m_Lock);
  43. ReleaseMutex(tValue.m_Lock);
  44. }
  45. return (*this);
  46. };
  47. void Lock()
  48. {
  49. WaitForSingleObject(m_Lock, INFINITE);
  50. };
  51. void UnLock()
  52. {
  53. ReleaseMutex(m_Lock);
  54. };
  55. T* PeekLatest()
  56. {
  57. size_t Size = m_DataQueue.size();
  58. if (Size > 0)
  59. {
  60. return &(m_DataQueue.front());
  61. }
  62. return NULL;
  63. }
  64. T* PeekBackware()
  65. {
  66. size_t Size = m_DataQueue.size();
  67. if (Size > 0)
  68. {
  69. if (Size <= m_MaxThreshold)
  70. {
  71. return &(m_DataQueue.back());
  72. }
  73. else
  74. {
  75. return &(m_DataQueue[m_MaxThreshold - 1]);
  76. }
  77. }
  78. return NULL;
  79. }
  80. bool InQueueBackward(T &rMsg)
  81. {
  82. bool ret = false;
  83. WaitForSingleObject(m_Lock, INFINITE);
  84. //printf("Thread:%d,Pop REQ one\n", GetCurrentThreadId());
  85. m_DataQueue.push_back(rMsg);
  86. //while (m_DataQueue.size() > (m_MaxThreshold*2))
  87. //{
  88. // m_DataQueue.pop_front();
  89. //}
  90. ReleaseMutex(m_Lock);
  91. SetEvent(m_NotifyInQue);
  92. return ret;
  93. };
  94. bool InQueue(T &rMsg)
  95. {
  96. bool ret = false;
  97. WaitForSingleObject(m_Lock, INFINITE);
  98. //printf("Thread:%d,Pop REQ one\n", GetCurrentThreadId());
  99. m_DataQueue.push_front(rMsg);
  100. //while (m_DataQueue.size() > (m_MaxThreshold*2))
  101. //{
  102. // m_DataQueue.pop_back();
  103. //}
  104. ReleaseMutex(m_Lock);
  105. SetEvent(m_NotifyInQue);
  106. return ret;
  107. };
  108. void DeQueueLatest()
  109. {
  110. WaitForSingleObject(m_Lock, INFINITE);
  111. if (m_DataQueue.size() > 0)
  112. {
  113. m_DataQueue.pop_front();
  114. }
  115. ReleaseMutex(m_Lock);
  116. };
  117. size_t size()
  118. {
  119. size_t Size = 0;
  120. WaitForSingleObject(m_Lock, INFINITE);
  121. Size = m_DataQueue.size();
  122. ReleaseMutex(m_Lock);
  123. if (Size > m_MaxThreshold)
  124. {
  125. Size = m_MaxThreshold;
  126. }
  127. return Size;
  128. };
  129. HANDLE GetNotifyHandle()
  130. {
  131. return m_NotifyInQue;
  132. };
  133. //must use with lock&unlock method
  134. T& operator [] (size_t idx)
  135. {
  136. if (idx >= m_MaxThreshold)
  137. {
  138. assert(0);
  139. }
  140. return m_DataQueue[idx];
  141. };
  142. DWORD WaitForInQue(DWORD timeout)
  143. {
  144. DWORD ret = WAIT_TIMEOUT;
  145. WaitForSingleObject(m_Lock, INFINITE);
  146. if (m_DataQueue.size() > 0)
  147. {
  148. ret = WAIT_OBJECT_0;
  149. }
  150. ReleaseMutex(m_Lock);
  151. if (ret == WAIT_TIMEOUT)
  152. {
  153. ret = WaitForSingleObject(m_NotifyInQue, timeout);
  154. if (ret == WAIT_TIMEOUT)
  155. {
  156. return ret;
  157. }
  158. //check again
  159. //make sure data is in the que
  160. WaitForSingleObject(m_Lock, INFINITE);
  161. if (m_DataQueue.size() > 0)
  162. {
  163. ret = WAIT_OBJECT_0;
  164. }
  165. ReleaseMutex(m_Lock);
  166. }
  167. return ret;
  168. };
  169. };