#pragma once #include "DiosLock.h" #include #include "Definitions.h" using namespace std; template class MsgQueue { DWORD m_MaxThreshold; DWORD m_CurThreshold; DWORD m_OwnerTid; DiosLock m_Lock; HANDLE m_NotifyInQue; queue m_DataQueue; public: MsgQueue(void) { m_CurThreshold = 0; m_MaxThreshold = 2500000; m_OwnerTid = 0; m_NotifyInQue = CreateEvent(0,1,0,0); }; ~MsgQueue(void) { Clear(); CloseHandle(m_NotifyInQue); }; bool Clear() { ResetEvent(m_NotifyInQue); m_Lock.Thread_Lock(); m_OwnerTid = GetCurrentThreadId(); while(m_DataQueue.size() > 0) { m_DataQueue.pop(); } m_Lock.Thread_UnLock(); return true; }; HANDLE GetNotifyHandle() { return m_NotifyInQue; }; bool DeQueue(T &rMsg) { bool ret = false; m_Lock.Thread_Lock(); //DebugPrint("before deQue size:%d\n",m_DataQueue.size()); if(m_DataQueue.size() > 0) { rMsg = m_DataQueue.front(); m_DataQueue.pop(); ret = true; } //DebugPrint("after deQue size:%d\n",m_DataQueue.size()); if (m_DataQueue.size() == 0) { ResetEvent(m_NotifyInQue); } m_Lock.Thread_UnLock(); return ret; }; MsgQueue& operator = (const MsgQueue &tValue) { m_Lock.Thread_Lock(); tValue.m_Lock.Thread_Lock(); m_DataQueue = tValue.m_DataQueue; m_Lock.Thread_UnLock(); tValue.m_Lock.Thread_UnLock(); return (*this); }; void Lock(DWORD Timeout = INFINITE) { m_Lock.Thread_Lock(Timeout); }; void UnLock() { m_Lock.Thread_UnLock(); }; bool InQueue(T &rMsg) { bool ret = false; m_Lock.Thread_Lock(); DWORD curSize = (DWORD)m_DataQueue.size(); #ifdef _DEBUG if (curSize > m_CurThreshold) { printf("Raising The Threshold:%d\n", curSize); m_CurThreshold = curSize; } #endif //printf("Thread:%d,Pop REQ one\n", GetCurrentThreadId()); if (curSize < m_MaxThreshold) { ret = true; } else { printf("WARNING:Hitting MaxThreshold:%d\n", curSize); printf("WARNING:Losing the Packet------------!!!!\n"); //TPRINTA_FATAL("WARNING:Hitting MaxThreshold : %d.Losing packets\n", curSize); } m_DataQueue.push(rMsg); m_Lock.Thread_UnLock(); SetEvent(m_NotifyInQue); return ret; }; DWORD size() { m_Lock.Thread_Lock(); DWORD size = (DWORD)m_DataQueue.size(); m_Lock.Thread_UnLock(); return size; }; T& front() { return m_DataQueue.front(); }; DWORD WaitForInQue(DWORD timeout) { DWORD ret = WAIT_TIMEOUT; m_Lock.Thread_Lock(); if (m_DataQueue.size() > 0) { ret = WAIT_OBJECT_0; } m_Lock.Thread_UnLock(); if (ret == WAIT_TIMEOUT) { ret = WaitForSingleObject(m_NotifyInQue, timeout); if (ret == WAIT_TIMEOUT) { return ret; } //check again //make sure data is in the que m_Lock.Thread_Lock(); if (m_DataQueue.size() > 0) { ret = WAIT_OBJECT_0; } m_Lock.Thread_UnLock(); } return ret; }; };