#pragma once #include "ScopeLock.h" #include "Definitions.h" #include "LinuxEvent.h" #include #include #include #include #ifndef INFINITE #define INFINITE 0xFFFFFFFF #endif #ifndef WAIT_OBJECT_0 #define WAIT_OBJECT_0 0 #endif #ifndef WAIT_TIMEOUT #define WAIT_TIMEOUT 258 #endif template class MsgQueue { uint32_t m_MaxThreshold; uint32_t m_CurThreshold; uint32_t m_OwnerTid; CCriticalSec m_Lock; std::shared_ptr m_NotifyInQue; std::queue m_DataQueue; public: MsgQueue() { m_NotifyInQue = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false); m_CurThreshold = 0; m_MaxThreshold = 2500000; m_OwnerTid = 0; }; ~MsgQueue() { Clear(); }; bool Clear() { CScopeLock lock(m_Lock); m_NotifyInQue->ResetEvent(); m_OwnerTid = pthread_self(); while (!m_DataQueue.empty()) { m_DataQueue.pop(); } return true; }; std::shared_ptr GetNotifyHandle() { return m_NotifyInQue; } bool DeQueue(T& rMsg) { bool ret = false; //m_Lock.Thread_Lock(); CScopeLock lock(m_Lock); //DebugPrint("before deQue size:%d\n",m_DataQueue.size()); if (m_DataQueue.size() > 0) { rMsg = m_DataQueue.front(); m_DataQueue.pop(); ret = true; } //DebugPrint("after deQue size:%d\n",m_DataQueue.size()); if (m_DataQueue.size() == 0) { m_NotifyInQue->ResetEvent(); } //m_Lock.Thread_UnLock(); return ret; }; MsgQueue& operator = (const MsgQueue& tValue) { //m_Lock.Thread_Lock(); //tValue.m_Lock.Thread_Lock(); CScopeLock lock(m_Lock); m_DataQueue = tValue.m_DataQueue; //m_Lock.Thread_UnLock(); //tValue.m_Lock.Thread_UnLock(); return (*this); }; bool InQueue(T& rMsg) { bool ret = false; CScopeLock lock(m_Lock); DWORD curSize = (DWORD)m_DataQueue.size(); //printf("Thread:%d,Pop REQ one\n", GetCurrentThreadId()); if (curSize < m_MaxThreshold) { ret = true; } else { // printf("WARNING:Hitting MaxThreshold:%d\n", curSize); //printf("WARNING:Losing the Packet------------!!!!\n"); //TPRINTA_FATAL("WARNING:Hitting MaxThreshold : %d.Losing packets\n", curSize); } m_DataQueue.push(rMsg); //m_Lock.Thread_UnLock(); m_NotifyInQue->SetEvent(); return ret; }; size_t size() { //m_Lock.Thread_Lock(); CScopeLock lock(m_Lock); DWORD size = (DWORD)m_DataQueue.size(); //m_Lock.Thread_UnLock(); return size; }; T& front() { return m_DataQueue.front(); }; DWORD WaitForInQue(uint32_t timeout) { DWORD ret = WAIT_TIMEOUT; //m_Lock.Thread_Lock(); CScopeLock lock(m_Lock); if (m_DataQueue.size() > 0) { ret = 0; } //m_Lock.Thread_UnLock(); if (ret == WAIT_TIMEOUT) { ret = m_NotifyInQue->Wait(timeout); if (ret == WAIT_TIMEOUT) { return ret; } //check again //make sure data is in the que //m_Lock.Thread_Lock(); if (m_DataQueue.size() > 0) { ret = 0; } //m_Lock.Thread_UnLock(); } return ret; }; };