123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
#pragma once

#include "ScopeLock.h"
#include "Definitions.h"
#include "LinuxEvent.h"

#include <pthread.h>

#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <ctime>
#include <memory>
#include <queue>
#include <utility>
// Win32-style wait constants. Each definition is guarded, so a value that an
// earlier include has already provided is left untouched.

// Presumably the "wait forever" timeout value; it is not referenced by name in
// this header - confirm against the LinuxEvent::Wait contract.
#ifndef INFINITE
#define INFINITE 0xFFFFFFFF
#endif

// Result code 0; MsgQueue::WaitForInQue returns this value when data is queued.
#ifndef WAIT_OBJECT_0
#define WAIT_OBJECT_0 0
#endif

// Result code that MsgQueue::WaitForInQue compares against the value returned
// by LinuxEvent::Wait to detect an expired timeout.
#ifndef WAIT_TIMEOUT
#define WAIT_TIMEOUT 258
#endif
- template<typename T> class MsgQueue
- {
- uint32_t m_MaxThreshold;
- uint32_t m_CurThreshold;
- uint32_t m_OwnerTid;
- CCriticalSec m_Lock;
- std::shared_ptr<LinuxEvent> m_NotifyInQue;
- std::queue<T> m_DataQueue;
- public:
- MsgQueue()
- {
- m_NotifyInQue = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);
- m_CurThreshold = 0;
- m_MaxThreshold = 2500000;
- m_OwnerTid = 0;
- };
- ~MsgQueue()
- {
- Clear();
- };
- bool Clear()
- {
- CScopeLock lock(m_Lock);
- m_NotifyInQue->ResetEvent();
- m_OwnerTid = pthread_self();
- while (!m_DataQueue.empty()) {
- m_DataQueue.pop();
- }
- return true;
- };
- std::shared_ptr<LinuxEvent> GetNotifyHandle()
- {
- return m_NotifyInQue;
- }
- bool DeQueue(T& rMsg)
- {
- bool ret = false;
- //m_Lock.Thread_Lock();
- CScopeLock lock(m_Lock);
- //DebugPrint("before deQue size:%d\n",m_DataQueue.size());
- if (m_DataQueue.size() > 0)
- {
- rMsg = m_DataQueue.front();
- m_DataQueue.pop();
- ret = true;
- }
- //DebugPrint("after deQue size:%d\n",m_DataQueue.size());
- if (m_DataQueue.size() == 0)
- {
- m_NotifyInQue->ResetEvent();
- }
- //m_Lock.Thread_UnLock();
- return ret;
- };
- MsgQueue& operator = (const MsgQueue& tValue)
- {
- //m_Lock.Thread_Lock();
- //tValue.m_Lock.Thread_Lock();
- CScopeLock lock(m_Lock);
- m_DataQueue = tValue.m_DataQueue;
- //m_Lock.Thread_UnLock();
- //tValue.m_Lock.Thread_UnLock();
- return (*this);
- };
- bool InQueue(T& rMsg)
- {
- bool ret = false;
- CScopeLock lock(m_Lock);
- DWORD curSize = (DWORD)m_DataQueue.size();
- //printf("Thread:%d,Pop REQ one\n", GetCurrentThreadId());
- if (curSize < m_MaxThreshold)
- {
- ret = true;
- }
- else
- {
- // printf("WARNING:Hitting MaxThreshold:%d\n", curSize);
- //printf("WARNING:Losing the Packet------------!!!!\n");
- //TPRINTA_FATAL("WARNING:Hitting MaxThreshold : %d.Losing packets\n", curSize);
- }
- m_DataQueue.push(rMsg);
- //m_Lock.Thread_UnLock();
- m_NotifyInQue->SetEvent();
- return ret;
- };
- size_t size()
- {
- //m_Lock.Thread_Lock();
- CScopeLock lock(m_Lock);
- DWORD size = (DWORD)m_DataQueue.size();
- //m_Lock.Thread_UnLock();
- return size;
- };
- T& front()
- {
- return m_DataQueue.front();
- };
- DWORD WaitForInQue(uint32_t timeout)
- {
- DWORD ret = WAIT_TIMEOUT;
- //m_Lock.Thread_Lock();
- CScopeLock lock(m_Lock);
- if (m_DataQueue.size() > 0)
- {
- ret = 0;
- }
- //m_Lock.Thread_UnLock();
- if (ret == WAIT_TIMEOUT)
- {
- ret = m_NotifyInQue->Wait(timeout);
- if (ret == WAIT_TIMEOUT)
- {
- return ret;
- }
- //check again
- //make sure data is in the que
- //m_Lock.Thread_Lock();
- if (m_DataQueue.size() > 0)
- {
- ret = 0;
- }
- //m_Lock.Thread_UnLock();
- }
- return ret;
- };
- };
|