RawCircleBuff.cpp 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. #include <cstring>
  2. #include <algorithm>
  3. #include "RawCircleBuff.h"
  4. #ifndef SCF_MAX_LIMITED_BUFF_SIZE
  5. #define SCF_MAX_LIMITED_BUFF_SIZE (32 * 1024 * 1024) // 32MB
  6. #endif
  7. CircleBuff::CircleBuff()
  8. {
  9. m_TotalSize = 0;
  10. m_UsedSize = 0;
  11. m_BasePos = 0;
  12. m_HeadPos = 0;
  13. m_Limit = SCF_MAX_LIMITED_BUFF_SIZE;
  14. m_pBuff = NULL;
  15. pthread_mutex_init(&m_Mutex, NULL);
  16. }
  17. CircleBuff::~CircleBuff()
  18. {
  19. if (m_pBuff) {
  20. delete[] m_pBuff;
  21. }
  22. pthread_mutex_destroy(&m_Mutex);
  23. }
  24. void CircleBuff::Lock() {
  25. pthread_mutex_lock(&m_Mutex);
  26. }
  27. void CircleBuff::Unlock() {
  28. pthread_mutex_unlock(&m_Mutex);
  29. }
  30. void CircleBuff::SetBuffMaxLimit(uint32_t limit)
  31. {
  32. Lock();
  33. m_Limit = limit;
  34. Unlock();
  35. }
  36. bool CircleBuff::ReSetBuffSize(uint32_t Size)
  37. {
  38. Lock(); // 开头加锁
  39. bool ret = false; // 统一返回标志
  40. char* pOldBuff = m_pBuff; // 提前保存旧缓冲区(避免异常后丢失)
  41. do { // 用do-while实现“一键break退出,确保解锁”
  42. // 1. 容量检查:若当前容量足够,直接返回
  43. if (m_pBuff && m_TotalSize > Size) {
  44. ret = true;
  45. break;
  46. }
  47. // 2. 超过上限,返回失败
  48. if (Size > m_Limit) {
  49. ret = false;
  50. break;
  51. }
  52. // 3. 用nothrow new分配内存(失败返回NULL,不抛异常)
  53. char* pNewBuff = new (std::nothrow) char[Size];
  54. if (pNewBuff == NULL) {
  55. std::cerr << "ReSetBuffSize: new failed (Size=" << Size << ")" << std::endl;
  56. ret = false;
  57. break;
  58. }
  59. // 4. 初始化新缓冲区参数
  60. uint32_t OldTotalSize = m_TotalSize;
  61. m_pBuff = pNewBuff;
  62. m_TotalSize = Size;
  63. m_UsedSize = 0;
  64. m_BasePos = 0;
  65. m_HeadPos = 0;
  66. // 5. 处理旧数据(仅当旧缓冲区存在时)
  67. if (pOldBuff != NULL) {
  68. Packet_CB pcb;
  69. MsgQueue<Packet_CB> OldPacketQue;
  70. // 转移旧包队列
  71. while (m_PacketQue.DeQueue(pcb)) {
  72. OldPacketQue.InQueue(pcb);
  73. }
  74. // 重新写入旧数据
  75. while (OldPacketQue.DeQueue(pcb)) {
  76. uint32_t offset = m_BasePos;
  77. uint32_t copied = 0;
  78. if ((pcb.m_Offset + pcb.m_PacketSize) > OldTotalSize) {
  79. copied += PushBlock(&pOldBuff[pcb.m_Offset], OldTotalSize - pcb.m_Offset);
  80. copied += PushBlock(&pOldBuff[0], pcb.m_PacketSize - (OldTotalSize - pcb.m_Offset));
  81. }
  82. else {
  83. copied += PushBlock(&pOldBuff[pcb.m_Offset], pcb.m_PacketSize);
  84. }
  85. // 入队新包(offset已更新为新缓冲区的位置)
  86. Packet_CB newPcb(offset, pcb.m_PacketSize);
  87. m_PacketQue.InQueue(newPcb);
  88. }
  89. delete[] pOldBuff; // 释放旧缓冲区
  90. }
  91. ret = true; // 所有步骤成功,返回true
  92. } while (false);
  93. Unlock(); // 关键:无论成功/失败,都解锁!
  94. return ret;
  95. }
  96. uint32_t CircleBuff::PushBlock(const char* pContext, uint32_t Size)
  97. {
  98. if (m_pBuff == NULL || pContext == NULL || Size == 0) {
  99. return 0;
  100. }
  101. if (m_TotalSize < Size + m_UsedSize) {
  102. return 0;
  103. }
  104. uint32_t copied = 0;
  105. uint32_t offset = m_HeadPos;
  106. if (m_HeadPos >= m_BasePos) {
  107. copied = std::min(m_TotalSize - m_HeadPos, Size);
  108. memcpy(&m_pBuff[m_HeadPos], pContext, copied);
  109. if (copied == Size) {
  110. m_HeadPos += copied;
  111. if (m_HeadPos == m_TotalSize) {
  112. m_HeadPos = 0;
  113. }
  114. }
  115. else {
  116. memcpy(&m_pBuff[0], &pContext[copied], Size - copied);
  117. m_HeadPos = Size - copied;
  118. }
  119. }
  120. else {
  121. memcpy(&m_pBuff[m_HeadPos], pContext, Size);
  122. m_HeadPos += Size;
  123. }
  124. m_UsedSize += Size;
  125. return Size;
  126. }
  127. uint32_t CircleBuff::Push(const char* pContext, uint32_t Size)
  128. {
  129. if (Size == 0 || pContext == NULL) {
  130. // 用cerr + flush确保即时输出
  131. std::cerr << "Push: invalid input (Size=0 or pContext=NULL)" << std::endl;
  132. return 0;
  133. }
  134. // 所有日志输出后加flush,强制刷新缓冲区
  135. std::cerr << "Push: request Size=" << Size << ", pre-Lock UsedSize=" << m_UsedSize << ", TotalSize=" << m_TotalSize << std::endl;
  136. Lock();
  137. std::cerr << "Push: post-Lock UsedSize=" << m_UsedSize << ", TotalSize=" << m_TotalSize << std::endl;
  138. uint32_t offset = m_HeadPos;
  139. uint32_t copied = PushBlock(pContext, Size);
  140. // 将cout改为cerr(无缓冲),并加flush
  141. std::cerr << "PushBlock copied: " << copied << ", UsedSize: " << m_UsedSize << ", TotalSize: " << m_TotalSize << std::endl;
  142. if (copied == 0) {
  143. uint32_t newSize = m_TotalSize + std::max(Size + 4096, 1024 * 1024u);
  144. if (newSize > m_Limit) newSize = m_Limit;
  145. if (newSize <= m_TotalSize || !ReSetBuffSize(newSize)) {
  146. Unlock();
  147. return 0;
  148. }
  149. copied = PushBlock(pContext, Size);
  150. }
  151. // 同样改为cerr + flush
  152. std::cerr << "After ReSetBuffSize copied: " << copied << ", UsedSize: " << m_UsedSize << ", TotalSize: " << m_TotalSize << std::endl;
  153. if (copied == Size) {
  154. Packet_CB packet(offset, Size);
  155. m_PacketQue.InQueue(packet);
  156. }
  157. Unlock();
  158. std::cerr << "Push copied: " << copied << ", UsedSize: " << m_UsedSize << ", TotalSize: " << m_TotalSize << std::endl;
  159. return copied;
  160. }
  161. int CircleBuff::Pop(char* pContext, uint32_t ContextSize)
  162. {
  163. uint32_t size = 0;
  164. Packet_CB pack;
  165. Lock();
  166. if (m_PacketQue.DeQueue(pack)) {
  167. if (ContextSize < pack.m_PacketSize) {
  168. m_PacketQue.InQueue(pack);
  169. Unlock();
  170. return LOW_REQUEST_SIZE;
  171. }
  172. uint32_t copied = std::min(m_TotalSize - pack.m_Offset, pack.m_PacketSize);
  173. memcpy(pContext, &m_pBuff[pack.m_Offset], copied);
  174. m_BasePos += copied;
  175. if (m_BasePos == m_TotalSize) {
  176. m_BasePos = 0;
  177. }
  178. if (copied < pack.m_PacketSize) {
  179. memcpy(&pContext[copied], &m_pBuff[0], pack.m_PacketSize - copied);
  180. m_BasePos = pack.m_PacketSize - copied;
  181. }
  182. m_UsedSize -= pack.m_PacketSize;
  183. size = pack.m_PacketSize;
  184. }
  185. Unlock();
  186. return size;
  187. }
  188. uint32_t CircleBuff::GetPacketCount()
  189. {
  190. return m_PacketQue.size();
  191. }
  192. uint32_t CircleBuff::GetFrontPacketSize()
  193. {
  194. uint32_t size = 0;
  195. Lock();
  196. if (m_PacketQue.size() > 0) {
  197. Packet_CB pack = m_PacketQue.front();
  198. size = pack.m_PacketSize;
  199. }
  200. Unlock();
  201. return size;
  202. }