RawCircleBuff.cpp 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. #include <cstring>
  2. #include <algorithm>
  #include <new>
  3. #include "RawCircleBuff.h"
  // Default upper bound, in bytes, for the buffer size. The CircleBuff
  // constructor copies this value into m_Limit. The #ifndef guard lets a
  // build define its own value before this point instead.
  4. #ifndef SCF_MAX_LIMITED_BUFF_SIZE
  5. #define SCF_MAX_LIMITED_BUFF_SIZE (32 * 1024 * 1024) // 32MB
  6. #endif
  7. CircleBuff::CircleBuff()
  8. {
  9. m_TotalSize = 0;
  10. m_UsedSize = 0;
  11. m_BasePos = 0;
  12. m_HeadPos = 0;
  13. m_Limit = SCF_MAX_LIMITED_BUFF_SIZE;
  14. m_pBuff = NULL;
  15. pthread_mutex_init(&m_Mutex, NULL);
  16. }
  17. CircleBuff::~CircleBuff()
  18. {
  19. if (m_pBuff) {
  20. delete[] m_pBuff;
  21. }
  22. pthread_mutex_destroy(&m_Mutex);
  23. }
  24. void CircleBuff::Lock() {
  25. pthread_mutex_lock(&m_Mutex);
  26. }
  27. void CircleBuff::Unlock() {
  28. pthread_mutex_unlock(&m_Mutex);
  29. }
  30. void CircleBuff::SetBuffMaxLimit(uint32_t limit)
  31. {
  32. Lock();
  33. m_Limit = limit;
  34. Unlock();
  35. }
  36. bool CircleBuff::ReSetBuffSize(uint32_t Size)
  37. {
  38. Lock();
  39. if (m_pBuff && m_TotalSize > Size) {
  40. Unlock();
  41. return true;
  42. }
  43. if (Size > m_Limit) {
  44. Unlock();
  45. return false;
  46. }
  47. char* pOldBuff = m_pBuff;
  48. MsgQueue<Packet_CB> OldPacketQue;
  49. m_pBuff = new char[Size];
  50. if (m_pBuff != NULL) {
  51. uint32_t OldTotalSize = m_TotalSize;
  52. m_TotalSize = Size;
  53. m_UsedSize = 0;
  54. m_BasePos = 0;
  55. m_HeadPos = 0;
  56. if (pOldBuff) {
  57. Packet_CB pcb;
  58. while (m_PacketQue.DeQueue(pcb)) {
  59. OldPacketQue.InQueue(pcb);
  60. }
  61. while (OldPacketQue.DeQueue(pcb)) {
  62. uint32_t copied = 0;
  63. uint32_t offset = m_BasePos;
  64. if ((pcb.m_Offset + pcb.m_PacketSize) > OldTotalSize) {
  65. copied += PushBlock(&pOldBuff[pcb.m_Offset], OldTotalSize - pcb.m_Offset);
  66. copied += PushBlock(&pOldBuff[0], pcb.m_PacketSize - (OldTotalSize - pcb.m_Offset));
  67. }
  68. else {
  69. copied += PushBlock(&pOldBuff[pcb.m_Offset], pcb.m_PacketSize);
  70. }
  71. Packet_CB packet(offset, pcb.m_PacketSize);
  72. m_PacketQue.InQueue(packet);
  73. }
  74. delete[] pOldBuff;
  75. }
  76. Unlock();
  77. return true;
  78. }
  79. m_TotalSize = 0;
  80. Unlock();
  81. return false;
  82. }
  83. uint32_t CircleBuff::PushBlock(const char* pContext, uint32_t Size)
  84. {
  85. if (m_TotalSize < Size + m_UsedSize) {
  86. return 0;
  87. }
  88. uint32_t copied = 0;
  89. uint32_t offset = m_HeadPos;
  90. if (m_HeadPos >= m_BasePos) {
  91. copied = std::min(m_TotalSize - m_HeadPos, Size);
  92. memcpy(&m_pBuff[m_HeadPos], pContext, copied);
  93. if (copied == Size) {
  94. m_HeadPos += copied;
  95. if (m_HeadPos == m_TotalSize) {
  96. m_HeadPos = 0;
  97. }
  98. }
  99. else {
  100. memcpy(&m_pBuff[0], &pContext[copied], Size - copied);
  101. m_HeadPos = Size - copied;
  102. }
  103. }
  104. else {
  105. memcpy(&m_pBuff[m_HeadPos], pContext, Size);
  106. m_HeadPos += Size;
  107. }
  108. m_UsedSize += Size;
  109. return Size;
  110. }
  111. uint32_t CircleBuff::Push(const char* pContext, uint32_t Size)
  112. {
  113. if (Size == 0) {
  114. return 0;
  115. }
  116. Lock();
  117. uint32_t offset = m_HeadPos;
  118. uint32_t copied = PushBlock(pContext, Size);
  119. if (copied == 0) {
  120. uint32_t newSize = m_TotalSize + std::max(Size + 4096, 1024 * 1024u);
  121. if (newSize > m_Limit) newSize = m_Limit;
  122. if (newSize <= m_TotalSize || !ReSetBuffSize(newSize)) {
  123. Unlock();
  124. return 0;
  125. }
  126. copied = PushBlock(pContext, Size);
  127. }
  128. if (copied == Size) {
  129. Packet_CB packet(offset, Size);
  130. m_PacketQue.InQueue(packet);
  131. }
  132. Unlock();
  133. return copied;
  134. }
  135. int CircleBuff::Pop(char* pContext, uint32_t ContextSize)
  136. {
  137. uint32_t size = 0;
  138. Packet_CB pack;
  139. Lock();
  140. if (m_PacketQue.DeQueue(pack)) {
  141. if (ContextSize < pack.m_PacketSize) {
  142. m_PacketQue.InQueue(pack);
  143. Unlock();
  144. return LOW_REQUEST_SIZE;
  145. }
  146. uint32_t copied = std::min(m_TotalSize - pack.m_Offset, pack.m_PacketSize);
  147. memcpy(pContext, &m_pBuff[pack.m_Offset], copied);
  148. m_BasePos += copied;
  149. if (m_BasePos == m_TotalSize) {
  150. m_BasePos = 0;
  151. }
  152. if (copied < pack.m_PacketSize) {
  153. memcpy(&pContext[copied], &m_pBuff[0], pack.m_PacketSize - copied);
  154. m_BasePos = pack.m_PacketSize - copied;
  155. }
  156. m_UsedSize -= pack.m_PacketSize;
  157. size = pack.m_PacketSize;
  158. }
  159. Unlock();
  160. return size;
  161. }
  162. uint32_t CircleBuff::GetPacketCount()
  163. {
  164. return m_PacketQue.size();
  165. }
  166. uint32_t CircleBuff::GetFrontPacketSize()
  167. {
  168. uint32_t size = 0;
  169. Lock();
  170. if (m_PacketQue.size() > 0) {
  171. Packet_CB pack = m_PacketQue.front();
  172. size = pack.m_PacketSize;
  173. }
  174. Unlock();
  175. return size;
  176. }