#include #include #include "RawCircleBuff.h" #ifndef SCF_MAX_LIMITED_BUFF_SIZE #define SCF_MAX_LIMITED_BUFF_SIZE (32 * 1024 * 1024) // 32MB #endif CircleBuff::CircleBuff() { m_TotalSize = 0; m_UsedSize = 0; m_BasePos = 0; m_HeadPos = 0; m_Limit = SCF_MAX_LIMITED_BUFF_SIZE; m_pBuff = NULL; pthread_mutex_init(&m_Mutex, NULL); } CircleBuff::~CircleBuff() { if (m_pBuff) { delete[] m_pBuff; } pthread_mutex_destroy(&m_Mutex); } void CircleBuff::Lock() { pthread_mutex_lock(&m_Mutex); } void CircleBuff::Unlock() { pthread_mutex_unlock(&m_Mutex); } void CircleBuff::SetBuffMaxLimit(uint32_t limit) { Lock(); m_Limit = limit; Unlock(); } bool CircleBuff::ReSetBuffSize(uint32_t Size) { Lock(); // 开头加锁 bool ret = false; // 统一返回标志 char* pOldBuff = m_pBuff; // 提前保存旧缓冲区(避免异常后丢失) do { // 用do-while实现“一键break退出,确保解锁” // 1. 容量检查:若当前容量足够,直接返回 if (m_pBuff && m_TotalSize > Size) { ret = true; break; } // 2. 超过上限,返回失败 if (Size > m_Limit) { ret = false; break; } // 3. 用nothrow new分配内存(失败返回NULL,不抛异常) char* pNewBuff = new (std::nothrow) char[Size]; if (pNewBuff == NULL) { std::cerr << "ReSetBuffSize: new failed (Size=" << Size << ")" << std::endl; ret = false; break; } // 4. 初始化新缓冲区参数 uint32_t OldTotalSize = m_TotalSize; m_pBuff = pNewBuff; m_TotalSize = Size; m_UsedSize = 0; m_BasePos = 0; m_HeadPos = 0; // 5. 处理旧数据(仅当旧缓冲区存在时) if (pOldBuff != NULL) { Packet_CB pcb; MsgQueue OldPacketQue; // 转移旧包队列 while (m_PacketQue.DeQueue(pcb)) { OldPacketQue.InQueue(pcb); } // 重新写入旧数据 while (OldPacketQue.DeQueue(pcb)) { uint32_t offset = m_BasePos; uint32_t copied = 0; if ((pcb.m_Offset + pcb.m_PacketSize) > OldTotalSize) { copied += PushBlock(&pOldBuff[pcb.m_Offset], OldTotalSize - pcb.m_Offset); copied += PushBlock(&pOldBuff[0], pcb.m_PacketSize - (OldTotalSize - pcb.m_Offset)); } else { copied += PushBlock(&pOldBuff[pcb.m_Offset], pcb.m_PacketSize); } // 入队新包(offset已更新为新缓冲区的位置) Packet_CB newPcb(offset, pcb.m_PacketSize); m_PacketQue.InQueue(newPcb); } delete[] pOldBuff; // 释放旧缓冲区 } ret = true; // 所有步骤成功,返回true } while (false); Unlock(); // 关键:无论成功/失败,都解锁! return ret; } uint32_t CircleBuff::PushBlock(const char* pContext, uint32_t Size) { if (m_pBuff == NULL || pContext == NULL || Size == 0) { return 0; } if (m_TotalSize < Size + m_UsedSize) { return 0; } uint32_t copied = 0; uint32_t offset = m_HeadPos; if (m_HeadPos >= m_BasePos) { copied = std::min(m_TotalSize - m_HeadPos, Size); memcpy(&m_pBuff[m_HeadPos], pContext, copied); if (copied == Size) { m_HeadPos += copied; if (m_HeadPos == m_TotalSize) { m_HeadPos = 0; } } else { memcpy(&m_pBuff[0], &pContext[copied], Size - copied); m_HeadPos = Size - copied; } } else { memcpy(&m_pBuff[m_HeadPos], pContext, Size); m_HeadPos += Size; } m_UsedSize += Size; return Size; } uint32_t CircleBuff::Push(const char* pContext, uint32_t Size) { if (Size == 0 || pContext == NULL) { // 用cerr + flush确保即时输出 std::cerr << "Push: invalid input (Size=0 or pContext=NULL)" << std::endl; return 0; } // 所有日志输出后加flush,强制刷新缓冲区 std::cerr << "Push: request Size=" << Size << ", pre-Lock UsedSize=" << m_UsedSize << ", TotalSize=" << m_TotalSize << std::endl; Lock(); std::cerr << "Push: post-Lock UsedSize=" << m_UsedSize << ", TotalSize=" << m_TotalSize << std::endl; uint32_t offset = m_HeadPos; uint32_t copied = PushBlock(pContext, Size); // 将cout改为cerr(无缓冲),并加flush std::cerr << "PushBlock copied: " << copied << ", UsedSize: " << m_UsedSize << ", TotalSize: " << m_TotalSize << std::endl; if (copied == 0) { uint32_t newSize = m_TotalSize + std::max(Size + 4096, 1024 * 1024u); if (newSize > m_Limit) newSize = m_Limit; if (newSize <= m_TotalSize || !ReSetBuffSize(newSize)) { Unlock(); return 0; } copied = PushBlock(pContext, Size); } // 同样改为cerr + flush std::cerr << "After ReSetBuffSize copied: " << copied << ", UsedSize: " << m_UsedSize << ", TotalSize: " << m_TotalSize << std::endl; if (copied == Size) { Packet_CB packet(offset, Size); m_PacketQue.InQueue(packet); } Unlock(); std::cerr << "Push copied: " << copied << ", UsedSize: " << m_UsedSize << ", TotalSize: " << m_TotalSize << std::endl; return copied; } int CircleBuff::Pop(char* pContext, uint32_t ContextSize) { uint32_t size = 0; Packet_CB pack; Lock(); if (m_PacketQue.DeQueue(pack)) { if (ContextSize < pack.m_PacketSize) { m_PacketQue.InQueue(pack); Unlock(); return LOW_REQUEST_SIZE; } uint32_t copied = std::min(m_TotalSize - pack.m_Offset, pack.m_PacketSize); memcpy(pContext, &m_pBuff[pack.m_Offset], copied); m_BasePos += copied; if (m_BasePos == m_TotalSize) { m_BasePos = 0; } if (copied < pack.m_PacketSize) { memcpy(&pContext[copied], &m_pBuff[0], pack.m_PacketSize - copied); m_BasePos = pack.m_PacketSize - copied; } m_UsedSize -= pack.m_PacketSize; size = pack.m_PacketSize; } Unlock(); return size; } uint32_t CircleBuff::GetPacketCount() { return m_PacketQue.size(); } uint32_t CircleBuff::GetFrontPacketSize() { uint32_t size = 0; Lock(); if (m_PacketQue.size() > 0) { Packet_CB pack = m_PacketQue.front(); size = pack.m_PacketSize; } Unlock(); return size; }