123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- #pragma once
- //#include "Logger.h"
- #include "CcosLock.h"
- #include <queue>
- #include <memory>
- #include "Definitions.h"
- #include "LinuxEvent.h"
- using namespace std;
- template<typename T> class MsgCircle
- {
- DWORD m_MaxThreshold;
- DWORD m_CurThreshold;
- CcosLock m_Lock;
- std::shared_ptr<LinuxEvent> m_NotifyInQue;
- deque<T> m_DataQueue;
- public:
- MsgCircle(void)
- {
- m_CurThreshold = 0;
- m_MaxThreshold = 2500000;
- m_NotifyInQue = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET,false);
- };
- ~MsgCircle(void)
- {
- Clear();
- };
- bool Clear()
- {
- m_NotifyInQue->ResetEvent();
- Lock();
- m_DataQueue.clear();
- UnLock();
- return true;
- };
- bool Peek(T &rMsg)
- {
- bool ret = false;
- Lock();
- //DebugPrint("before deQue size:%d\n",m_DataQueue.size());
- if (m_DataQueue.size() > 0)
- {
- rMsg = m_DataQueue.front();
- ret = true;
- }
- //DebugPrint("after deQue size:%d\n",m_DataQueue.size());
- UnLock();
- return ret;
- };
- bool DeQueue(T &rMsg)
- {
- bool ret = false;
- Lock();
- //DebugPrint("before deQue size:%d\n",m_DataQueue.size());
- if(m_DataQueue.size() > 0)
- {
- rMsg = m_DataQueue.front();
- m_DataQueue.pop_front();
- ret = true;
- }
- //DebugPrint("after deQue size:%d\n",m_DataQueue.size());
- if (m_DataQueue.size() == 0)
- {
- m_NotifyInQue->ResetEvent();
- }
- UnLock();
- return ret;
- };
- MsgCircle& operator = (const MsgCircle &tValue)
- {
- if(this != &tValue)
- {
- Lock();
- tValue.Lock();
-
- m_DataQueue = tValue.m_DataQueue;
-
- UnLock();
- tValue.UnLock();
-
- }
- return (*this);
- };
- void Lock()
- {
- m_Lock.Thread_Lock();
- };
- void UnLock()
- {
- m_Lock.Thread_UnLock();
- };
- bool InQueue(T &rMsg)
- {
- bool ret = false;
- Lock();
- DWORD curSize = (DWORD)m_DataQueue.size();
- #ifdef _DEBUG
- if (curSize > m_CurThreshold)
- {
- printf("Raising The Threshold:%d\n", curSize);
- m_CurThreshold = curSize;
- }
- #endif
- //printf("Thread:%d,Pop REQ one\n", GetCurrentThreadId());
- if (curSize < m_MaxThreshold)
- {
- ret = true;
- m_DataQueue.push_back(rMsg);
- }
- else
- {
- printf("WARNING:Hitting MaxThreshold:%d\n", curSize);
- printf("WARNING:Losing the Packet------------!!!!\n");
- //TPRINTA_FATAL("WARNING:Hitting MaxThreshold : %d.Losing packets\n", curSize);
- }
- UnLock();
- m_NotifyInQue->SetEvent();
- return ret;
- };
- DWORD size()
- {
- Lock();
- DWORD size = (DWORD)m_DataQueue.size();
- UnLock();
- return size;
- };
- T& front()
- {
- return m_DataQueue.front();
- };
-
- std::shared_ptr<LinuxEvent> GetNotifyHandle()
- {
- return m_NotifyInQue;
- };
-
- //must use with lock&unlock method
- T& operator [] (DWORD idx)
- {
- if(idx > m_DataQueue.size())
- {
- assert(0);
- }
- return m_DataQueue[idx];
- };
-
- DWORD WaitForInQue(DWORD timeout)
- {
- DWORD ret = WAIT_TIMEOUT;
- Lock();
- if (m_DataQueue.size() > 0)
- {
- ret = WAIT_OBJECT_0;
- }
- UnLock();
- if (ret == WAIT_TIMEOUT)
- {
- ret = m_NotifyInQue->Wait(timeout);
- if (ret == WAIT_TIMEOUT)
- {
- return ret;
- }
- //check again
- //make sure data is in the que
- Lock();
- if (m_DataQueue.size() > 0)
- {
- ret = WAIT_OBJECT_0;
- }
- UnLock();
- }
- return ret;
- };
- };
|