123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
#include <cstring>
#include <algorithm>
#include <new>
#include "RawCircleBuff.h"
- #ifndef SCF_MAX_LIMITED_BUFF_SIZE
- #define SCF_MAX_LIMITED_BUFF_SIZE (32 * 1024 * 1024) // 32MB
- #endif
- CircleBuff::CircleBuff()
- {
- m_TotalSize = 0;
- m_UsedSize = 0;
- m_BasePos = 0;
- m_HeadPos = 0;
- m_Limit = SCF_MAX_LIMITED_BUFF_SIZE;
- m_pBuff = NULL;
- pthread_mutex_init(&m_Mutex, NULL);
- }
- CircleBuff::~CircleBuff()
- {
- if (m_pBuff) {
- delete[] m_pBuff;
- }
- pthread_mutex_destroy(&m_Mutex);
- }
- void CircleBuff::Lock() {
- pthread_mutex_lock(&m_Mutex);
- }
- void CircleBuff::Unlock() {
- pthread_mutex_unlock(&m_Mutex);
- }
- void CircleBuff::SetBuffMaxLimit(uint32_t limit)
- {
- Lock();
- m_Limit = limit;
- Unlock();
- }
- bool CircleBuff::ReSetBuffSize(uint32_t Size)
- {
- Lock();
- if (m_pBuff && m_TotalSize > Size) {
- Unlock();
- return true;
- }
- if (Size > m_Limit) {
- Unlock();
- return false;
- }
- char* pOldBuff = m_pBuff;
- MsgQueue<Packet_CB> OldPacketQue;
- m_pBuff = new char[Size];
- if (m_pBuff != NULL) {
- uint32_t OldTotalSize = m_TotalSize;
- m_TotalSize = Size;
- m_UsedSize = 0;
- m_BasePos = 0;
- m_HeadPos = 0;
- if (pOldBuff) {
- Packet_CB pcb;
- while (m_PacketQue.DeQueue(pcb)) {
- OldPacketQue.InQueue(pcb);
- }
- while (OldPacketQue.DeQueue(pcb)) {
- uint32_t copied = 0;
- uint32_t offset = m_BasePos;
- if ((pcb.m_Offset + pcb.m_PacketSize) > OldTotalSize) {
- copied += PushBlock(&pOldBuff[pcb.m_Offset], OldTotalSize - pcb.m_Offset);
- copied += PushBlock(&pOldBuff[0], pcb.m_PacketSize - (OldTotalSize - pcb.m_Offset));
- }
- else {
- copied += PushBlock(&pOldBuff[pcb.m_Offset], pcb.m_PacketSize);
- }
- Packet_CB packet(offset, pcb.m_PacketSize);
- m_PacketQue.InQueue(packet);
- }
- delete[] pOldBuff;
- }
- Unlock();
- return true;
- }
- m_TotalSize = 0;
- Unlock();
- return false;
- }
- uint32_t CircleBuff::PushBlock(const char* pContext, uint32_t Size)
- {
- if (m_TotalSize < Size + m_UsedSize) {
- return 0;
- }
- uint32_t copied = 0;
- uint32_t offset = m_HeadPos;
- if (m_HeadPos >= m_BasePos) {
- copied = std::min(m_TotalSize - m_HeadPos, Size);
- memcpy(&m_pBuff[m_HeadPos], pContext, copied);
- if (copied == Size) {
- m_HeadPos += copied;
- if (m_HeadPos == m_TotalSize) {
- m_HeadPos = 0;
- }
- }
- else {
- memcpy(&m_pBuff[0], &pContext[copied], Size - copied);
- m_HeadPos = Size - copied;
- }
- }
- else {
- memcpy(&m_pBuff[m_HeadPos], pContext, Size);
- m_HeadPos += Size;
- }
- m_UsedSize += Size;
- return Size;
- }
- uint32_t CircleBuff::Push(const char* pContext, uint32_t Size)
- {
- if (Size == 0) {
- return 0;
- }
- Lock();
- uint32_t offset = m_HeadPos;
- uint32_t copied = PushBlock(pContext, Size);
- if (copied == 0) {
- uint32_t newSize = m_TotalSize + std::max(Size + 4096, 1024 * 1024u);
- if (newSize > m_Limit) newSize = m_Limit;
- if (newSize <= m_TotalSize || !ReSetBuffSize(newSize)) {
- Unlock();
- return 0;
- }
- copied = PushBlock(pContext, Size);
- }
- if (copied == Size) {
- Packet_CB packet(offset, Size);
- m_PacketQue.InQueue(packet);
- }
- Unlock();
- return copied;
- }
- int CircleBuff::Pop(char* pContext, uint32_t ContextSize)
- {
- uint32_t size = 0;
- Packet_CB pack;
- Lock();
- if (m_PacketQue.DeQueue(pack)) {
- if (ContextSize < pack.m_PacketSize) {
- m_PacketQue.InQueue(pack);
- Unlock();
- return LOW_REQUEST_SIZE;
- }
- uint32_t copied = std::min(m_TotalSize - pack.m_Offset, pack.m_PacketSize);
- memcpy(pContext, &m_pBuff[pack.m_Offset], copied);
- m_BasePos += copied;
- if (m_BasePos == m_TotalSize) {
- m_BasePos = 0;
- }
- if (copied < pack.m_PacketSize) {
- memcpy(&pContext[copied], &m_pBuff[0], pack.m_PacketSize - copied);
- m_BasePos = pack.m_PacketSize - copied;
- }
- m_UsedSize -= pack.m_PacketSize;
- size = pack.m_PacketSize;
- }
- Unlock();
- return size;
- }
- uint32_t CircleBuff::GetPacketCount()
- {
- return m_PacketQue.size();
- }
- uint32_t CircleBuff::GetFrontPacketSize()
- {
- uint32_t size = 0;
- Lock();
- if (m_PacketQue.size() > 0) {
- Packet_CB pack = m_PacketQue.front();
- size = pack.m_PacketSize;
- }
- Unlock();
- return size;
- }
|