#include "stdafx.h" #include "Definitions.h" #include "RawCircleBuff.h" CircleBuff::CircleBuff() { m_TotalSize = 0; m_UsedSize = 0; m_BasePos = 0; m_HeadPos = 0; m_Limit = SCF_MAX_LIMITED_BUFF_SIZE; m_pBuff = NULL; } CircleBuff::~CircleBuff() { if (m_pBuff) { delete[]m_pBuff; } m_pBuff = NULL; } void CircleBuff::SetBuffMaxLimit(DWORD limit) { m_Limit = limit; } bool CircleBuff::ReSetBuffSize(DWORD Size) { if (m_pBuff && m_TotalSize > Size) { return true; } if (Size > m_Limit) { return false; } char *pOldBuff = m_pBuff; MsgQueue OldPacketQue; m_pBuff = new char[Size]; if (m_pBuff != NULL) { DWORD OldTotalSize = m_TotalSize; //init m_TotalSize = Size; m_UsedSize = 0; m_BasePos = 0; m_HeadPos = 0; if (pOldBuff) { //backup the queue Packet_CB pcb; while (m_PacketQue.DeQueue(pcb)) { OldPacketQue.InQueue(pcb); } //do the copy while (OldPacketQue.DeQueue(pcb)) { DWORD copied = 0; DWORD offset = m_BasePos; //copy old to new one if ((pcb.m_Offset + pcb.m_PacketSize) > OldTotalSize) { //two block //first block copied += PushBlock(&pOldBuff[pcb.m_Offset], OldTotalSize - pcb.m_Offset); //second block copied += PushBlock(&pOldBuff[0], pcb.m_PacketSize - (OldTotalSize - pcb.m_Offset)); } else { copied += PushBlock(&pOldBuff[pcb.m_Offset], pcb.m_PacketSize); } // Packet_CB packet(offset, pcb.m_PacketSize); m_PacketQue.InQueue(packet); } } return true; } m_TotalSize = 0; return false; } DWORD CircleBuff::PushBlock(const char *pContext, DWORD Size) { DWORD copied = 0; if (m_TotalSize > (Size + m_UsedSize)) { DWORD offset = m_HeadPos; if (m_HeadPos >= m_BasePos) { copied = min((m_TotalSize - m_HeadPos), Size); memcpy(&m_pBuff[m_HeadPos], pContext, copied); if (copied == Size) { //done here m_HeadPos += copied; if (m_HeadPos == m_TotalSize) { m_HeadPos = 0; } } else { //it must be overloaded memcpy(&m_pBuff[0], &pContext[copied], Size - copied); m_HeadPos = Size - copied; } } else { memcpy(&m_pBuff[m_HeadPos], pContext, Size); m_HeadPos += Size; } m_UsedSize += Size; copied = Size; } return copied; } DWORD CircleBuff::Push(const char *pContext, DWORD Size) { DWORD copied = 0; if (Size == 0) { //wtf?? return 0; } Thread_Lock(); DWORD offset = m_HeadPos; copied = PushBlock(pContext, Size); if (copied == 0 && Size != 0) { //reset & retry if (ReSetBuffSize(m_TotalSize + max(Size + 4096, 1024 * 1024))) { //succeed copied = PushBlock(pContext, Size); } } if (copied == Size) { copied = Size; Packet_CB packet(offset, Size); m_PacketQue.InQueue(packet); } Thread_UnLock(); return copied; } int CircleBuff::Pop(char *pContext, DWORD ContextSize) { DWORD size = 0; Packet_CB pack; Thread_Lock();//lock start here if (m_PacketQue.DeQueue(pack)) { //got a pack if (ContextSize < pack.m_PacketSize || ContextSize == 0) { m_PacketQue.InQueue(pack); Thread_UnLock();//lock end here return LOW_REQUEST_SIZE; } DWORD copied = min((m_TotalSize - pack.m_Offset), pack.m_PacketSize); memcpy(pContext, &(m_pBuff[pack.m_Offset]), copied); m_BasePos += copied; if (m_BasePos == m_TotalSize) { m_BasePos = 0; } if (copied < pack.m_PacketSize) { memcpy(&(pContext[copied]), &(m_pBuff[0]), pack.m_PacketSize - copied); m_BasePos = pack.m_PacketSize - copied; } m_UsedSize -= pack.m_PacketSize; size = pack.m_PacketSize; } Thread_UnLock();//lock end here return size; } DWORD CircleBuff::GetPacketCount() { return m_PacketQue.size(); } DWORD CircleBuff::GetFrontPacketSize() { DWORD size = 0; Thread_Lock();//lock start here if (m_PacketQue.size() > 0) { Packet_CB pack = m_PacketQue.front(); size = pack.m_PacketSize; } Thread_UnLock();//lock end here return size; } HANDLE CircleBuff::GetNotifyHandle() { return m_PacketQue.GetNotifyHandle(); }