#include #include #include "RawCircleBuff.h" #ifndef SCF_MAX_LIMITED_BUFF_SIZE #define SCF_MAX_LIMITED_BUFF_SIZE (32 * 1024 * 1024) // 32MB #endif CircleBuff::CircleBuff() { m_TotalSize = 0; m_UsedSize = 0; m_BasePos = 0; m_HeadPos = 0; m_Limit = SCF_MAX_LIMITED_BUFF_SIZE; m_pBuff = NULL; pthread_mutex_init(&m_Mutex, NULL); } CircleBuff::~CircleBuff() { if (m_pBuff) { delete[] m_pBuff; } pthread_mutex_destroy(&m_Mutex); } void CircleBuff::Lock() { pthread_mutex_lock(&m_Mutex); } void CircleBuff::Unlock() { pthread_mutex_unlock(&m_Mutex); } void CircleBuff::SetBuffMaxLimit(uint32_t limit) { Lock(); m_Limit = limit; Unlock(); } bool CircleBuff::ReSetBuffSize(uint32_t Size) { Lock(); if (m_pBuff && m_TotalSize > Size) { Unlock(); return true; } if (Size > m_Limit) { Unlock(); return false; } char* pOldBuff = m_pBuff; MsgQueue OldPacketQue; m_pBuff = new char[Size]; if (m_pBuff != NULL) { uint32_t OldTotalSize = m_TotalSize; m_TotalSize = Size; m_UsedSize = 0; m_BasePos = 0; m_HeadPos = 0; if (pOldBuff) { Packet_CB pcb; while (m_PacketQue.DeQueue(pcb)) { OldPacketQue.InQueue(pcb); } while (OldPacketQue.DeQueue(pcb)) { uint32_t copied = 0; uint32_t offset = m_BasePos; if ((pcb.m_Offset + pcb.m_PacketSize) > OldTotalSize) { copied += PushBlock(&pOldBuff[pcb.m_Offset], OldTotalSize - pcb.m_Offset); copied += PushBlock(&pOldBuff[0], pcb.m_PacketSize - (OldTotalSize - pcb.m_Offset)); } else { copied += PushBlock(&pOldBuff[pcb.m_Offset], pcb.m_PacketSize); } Packet_CB packet(offset, pcb.m_PacketSize); m_PacketQue.InQueue(packet); } delete[] pOldBuff; } Unlock(); return true; } m_TotalSize = 0; Unlock(); return false; } uint32_t CircleBuff::PushBlock(const char* pContext, uint32_t Size) { if (m_TotalSize < Size + m_UsedSize) { return 0; } uint32_t copied = 0; uint32_t offset = m_HeadPos; if (m_HeadPos >= m_BasePos) { copied = std::min(m_TotalSize - m_HeadPos, Size); memcpy(&m_pBuff[m_HeadPos], pContext, copied); if (copied == Size) { m_HeadPos += copied; if (m_HeadPos == m_TotalSize) { m_HeadPos = 0; } } else { memcpy(&m_pBuff[0], &pContext[copied], Size - copied); m_HeadPos = Size - copied; } } else { memcpy(&m_pBuff[m_HeadPos], pContext, Size); m_HeadPos += Size; } m_UsedSize += Size; return Size; } uint32_t CircleBuff::Push(const char* pContext, uint32_t Size) { if (Size == 0) { return 0; } Lock(); uint32_t offset = m_HeadPos; uint32_t copied = PushBlock(pContext, Size); if (copied == 0) { uint32_t newSize = m_TotalSize + std::max(Size + 4096, 1024 * 1024u); if (newSize > m_Limit) newSize = m_Limit; if (newSize <= m_TotalSize || !ReSetBuffSize(newSize)) { Unlock(); return 0; } copied = PushBlock(pContext, Size); } if (copied == Size) { Packet_CB packet(offset, Size); m_PacketQue.InQueue(packet); } Unlock(); return copied; } int CircleBuff::Pop(char* pContext, uint32_t ContextSize) { uint32_t size = 0; Packet_CB pack; Lock(); if (m_PacketQue.DeQueue(pack)) { if (ContextSize < pack.m_PacketSize) { m_PacketQue.InQueue(pack); Unlock(); return LOW_REQUEST_SIZE; } uint32_t copied = std::min(m_TotalSize - pack.m_Offset, pack.m_PacketSize); memcpy(pContext, &m_pBuff[pack.m_Offset], copied); m_BasePos += copied; if (m_BasePos == m_TotalSize) { m_BasePos = 0; } if (copied < pack.m_PacketSize) { memcpy(&pContext[copied], &m_pBuff[0], pack.m_PacketSize - copied); m_BasePos = pack.m_PacketSize - copied; } m_UsedSize -= pack.m_PacketSize; size = pack.m_PacketSize; } Unlock(); return size; } uint32_t CircleBuff::GetPacketCount() { return m_PacketQue.size(); } uint32_t CircleBuff::GetFrontPacketSize() { uint32_t size = 0; Lock(); if (m_PacketQue.size() > 0) { Packet_CB pack = m_PacketQue.front(); size = pack.m_PacketSize; } Unlock(); return size; }