P2PModule.cpp 24 KB


  1. #include "P2PModule.h"
  2. #include <sys/time.h>
  3. #include <sys/select.h>
  4. #include <netinet/tcp.h>
  5. #include <cstdio>
  6. #include "P2PModule.h"
  7. #include "DeviceBus.h"
  8. #include "common_api.h"
  9. //using namespace std;
  10. #define P2P_CHECK_PERIOD (2000)
  11. string FormatLinkError(int ErrorCode)
  12. {
  13. switch (ErrorCode)
  14. {
  15. case 7: return "sckOutOfMemory";// 7 内存不足
  16. case 380: return "sckInvalidPropertyValue";// 380 属性值无效
  17. case 394: return "sckGetNotSupported";// 394 属性不可读
  18. case 383: return "sckGetNotSupported";// 383 属性是只读的
  19. case 40006: return "sckBadState";// 40006 所请求的事务或请求本身的错误协议或者错误连接状态
  20. case 40014: return "sckInvalidArg";// 40014 传递给函数的参数格式不确定,或者不在指定范围内
  21. case 40017: return "sckSuccess";// 40017 成功
  22. case 40018: return "sckUnsupported";// 40018 不支持的变量类型
  23. case 40020: return "sckInvalidOp";// 40020 在当前状态下的无效操作
  24. case 40021: return "sckOutOfRange";// 40021 参数越界
  25. case 40026: return "sckWrongProtocol";// 40026 所请求的事务或请求本身的错误协议
  26. case 10004: return "sckOpCanceled";// 10004 取消操作
  27. case 10014: return "sckInvalidArgument";// 10014 所请求的地址是广播地址,但未设置标记
  28. case 10035: return "sckWouldBlock";// 10035 套接字不成块,而指定操作将使之成块
  29. case 10036: return "sckInProgress";// 10036 制造块的Winsock操作在进行之中
  30. case 10037: return "sckAlreadyComplete";// 10037 完成操作。未进行制作块的操作
  31. case 10038: return "sckNotSocket";// 10038 描述符不是套接字
  32. case 10040: return "sckMsgTooBig";// 10040 数据太大,不适于缓冲区的要求,因而被截断
  33. case 10043: return "sckPortNotSupported";// 10043 不支持指定的端口
  34. case 10048: return "sckAddressInUse";// 10048 地址在使用中
  35. case 10049: return "sckAddressNotAvailable";// 10049 来自本地机器的不可用地址
  36. case 10050: return "sckNetworkSubsystemFailed";// 10050 网络子系统失败
  37. case 10051: return "sckNetworkUnreachable";// 10051 当前不能从主机到达网络
  38. case 10052: return "sckNetReset 10052";// 在设置SO_KEEPALIVE时连接超时
  39. case 10053: return "sckConnectAborted";// 10053 由于超时或者其它失败而中止接连
  40. case 10054: return "sckConnectionReset";// 10054 通过远端重新设置连接
  41. case 10055: return "sckNoBufferSpace";// 10055 没有可用的缓存空间
  42. case 10056: return "sckAlreadyConnected";// 10056 已连接的套接字
  43. case 10057: return "sckNotConnected";// 10057 未接连套接字
  44. case 10058: return "sckSockedShutdown";// 10058 已关闭套接字
  45. case 10060: return "sckTimedout";// 10060 套接字超时
  46. case 10061: return "sckConnectionRefused";// 10061 强行拒绝连接
  47. case 10093: return "sckNotInitialized";// 10093 套接字没有初始化
  48. case 11001: return "sckHostNotFound";// 11001 授权应答:未找到主机
  49. case 11002: return "sckHostNotFoundTryAgain";// 11002 非授权应答:未找到主机,重试
  50. case 11003: return "sckNonRecoverableError";// 11003 不可恢复的错误
  51. case 11004: return "sckNoData";// 11004 无效名,对所请求的类型无数据记录
  52. default: return string("Socket error %d", ErrorCode);
  53. }
  54. }
  55. int socket_set_keepalive(int &fd)
  56. {
  57. int keepalive = 1;
  58. int keepidle = 10; // 10秒无活动开始探测
  59. int keepintvl = 5; // 5秒探测间隔
  60. int keepcnt = 3; // 3次探测失败断开
  61. if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive)))
  62. {
  63. return -1;
  64. }
  65. if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &keepidle, sizeof(keepidle))) {
  66. return -2;
  67. }
  68. if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &keepintvl, sizeof(keepintvl))) {
  69. return -3;
  70. }
  71. if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &keepcnt, sizeof(keepcnt))) {
  72. return -4;
  73. }
  74. return 0;
  75. }
  76. P2P_Module_Base::P2P_Module_Base()
  77. {
  78. m_Port = 0;
  79. m_MessageBuffSize = 1024 * 1024;//set 1M limits
  80. m_pszMessageBuff = new char[m_MessageBuffSize];
  81. m_pMainDevBus = NULL;
  82. Socketfd = -1;
  83. m_SocketTarget = -1;
  84. m_pszIpAddress = new char[256];
  85. m_pszLocalBusId = new char[256];
  86. m_pszRemoteBusId = new char[256];
  87. memset(m_pszIpAddress, 0, 256);
  88. memset(m_pszLocalBusId, 0, 256);
  89. memset(m_pszRemoteBusId, 0, 256);
  90. m_ConnectionReady = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET,false);
  91. }
  92. P2P_Module_Base::~P2P_Module_Base()
  93. {
  94. Disconnect();
  95. delete[]m_pszIpAddress;
  96. delete[]m_pszLocalBusId;
  97. delete[]m_pszRemoteBusId;
  98. m_pszIpAddress = NULL;
  99. m_pszLocalBusId = NULL;
  100. m_pszRemoteBusId = NULL;
  101. delete[]m_pszMessageBuff;
  102. m_pszMessageBuff = NULL;
  103. }
  104. //init
  105. void P2P_Module_Base::InitP2P(const char *pszIp, const char *pszLocalBusId, bool AsServer, PVOID pParent)
  106. {
  107. strncpy(m_pszIpAddress, pszIp, 255);
  108. strncpy(m_pszLocalBusId, pszLocalBusId, 255);
  109. m_Port = (unsigned short)(CCOS_HW_CHANNEL/10) + 3;//80000 is out of WORD limits
  110. m_AsServer = AsServer;
  111. m_pMainDevBus = pParent;
  112. /*string logfile = GetProcessDirectory() + "\\logs\\Local_p2pDispathlog.log";
  113. Logger *pLog = CreateLogger();
  114. pLog->SetLogFilepath(logfile.c_str());
  115. SetLogger(pLog);*/
  116. }
  117. //connect
  118. bool P2P_Module_Base::ConnectP2P()
  119. {
  120. return false;
  121. }
  122. //disconnect
  123. void P2P_Module_Base::Disconnect()
  124. {
  125. }
  126. bool P2P_Module_Base::IsConnected()
  127. {
  128. return false;
  129. }
  130. //Blob的定义
  131. //4字节 大长度:4字节Msg的长度:Msg内容,4字节Block的长度:Block内容
  132. //必须存在项为 大长度,Msg长度,Block长度,如果无此内容,其值为0
  133. bool P2P_Module_Base::SendRaw(int &sck, const char *pData, DWORD size, bool Sync)
  134. {
  135. uint32_t Sented = 0;
  136. int Sendonce = 0;
  137. while (Sented < size) {
  138. Thread_Lock();
  139. if (WaitTheThreadEndSign(0)) {
  140. Thread_UnLock();
  141. return false;
  142. }
  143. if (sck != -1) {
  144. if (!Sync) {
  145. fd_set WriteSet;
  146. struct timeval tm;
  147. tm.tv_sec = P2P_CHECK_PERIOD / 1000;
  148. tm.tv_usec = (P2P_CHECK_PERIOD % 1000) * 1000;
  149. FD_ZERO(&WriteSet);
  150. FD_SET(sck, &WriteSet);
  151. int sel_ret = select(sck + 1, NULL, &WriteSet, NULL, &tm);
  152. if (sel_ret <= 0) {
  153. // 处理错误或超时
  154. }
  155. }
  156. Sendonce = send(sck, pData + Sented, size - Sented, 0);
  157. if (Sendonce > 0) {
  158. Sented += Sendonce;
  159. }
  160. else {
  161. close(sck);
  162. sck = -1;
  163. Thread_UnLock();
  164. return false;
  165. }
  166. }
  167. else {
  168. Thread_UnLock();
  169. return false;
  170. }
  171. Thread_UnLock();
  172. }
  173. return true;
  174. }
  175. bool P2P_Module_Base::ReadRawSync(int &sck, char *pszBuff, DWORD ExpectedSize)
  176. {
  177. DWORD Total = 0;
  178. int Received = 0;
  179. //////mLog::FINFO("Enter ReadRawSync");
  180. while (Total < ExpectedSize)
  181. {
  182. Thread_Lock();
  183. if (WaitTheThreadEndSign(0) == false)
  184. {
  185. if (sck != -1)
  186. {
  187. //ok
  188. //////mLog::FINFO("br--");
  189. Received = recv(sck, (char*)&pszBuff[Total], ExpectedSize - Total, 0);
  190. //////mLog::FINFO("ar--:{$}", Received);
  191. if (Received > 0)
  192. {
  193. Total += Received;
  194. }
  195. else if (Received == 0) // 对端关闭连接
  196. {
  197. close(sck);
  198. sck = -1;
  199. Thread_UnLock();
  200. //////mLog::FERROR("socket closed by peer\n");
  201. return false;
  202. }
  203. else
  204. {
  205. int err = errno;
  206. close(sck);
  207. sck = -1;
  208. Thread_UnLock();
  209. // 处理常见的非阻塞错误
  210. if (err == EAGAIN || err == EWOULDBLOCK) {
  211. // 对于同步读取,这通常表示超时,但在同步模式中不应发生
  212. //////mLog::FERROR("socket timeout: %d\n", err);
  213. }
  214. else {
  215. //////mLog::FERROR("socket error: %d\n", err);
  216. }
  217. //socket error,or closed
  218. //string errinfo = FormatLinkError(WSAGetLastError());
  219. //////mLog::FERROR("socket error:{$}\n", errinfo.c_str());
  220. return false;
  221. }
  222. }
  223. }
  224. Thread_UnLock();
  225. }
  226. //////mLog::FINFO("Leave ReadRawSync OK");
  227. return true;
  228. }
  229. bool P2P_Module_Base::ReadRaw(int &sck, char *pszBuff, DWORD ExpectedSize)
  230. {
  231. DWORD Total = 0;
  232. int Received = 0;
  233. //////mLog::FINFO("Enter ReadRaw");
  234. while (Total < ExpectedSize)
  235. {
  236. Thread_Lock();
  237. if (WaitTheThreadEndSign(0) == false)
  238. {
  239. if (sck != -1)
  240. {
  241. //check readable
  242. // 超时时间
  243. struct timeval tm;
  244. fd_set ReadSet;
  245. tm.tv_sec = P2P_CHECK_PERIOD / 1000;
  246. tm.tv_usec = P2P_CHECK_PERIOD % 1000 * 1000;
  247. FD_ZERO(&ReadSet);
  248. FD_SET(sck, &ReadSet);
  249. //////mLog::FINFO("bs--");
  250. int sel_ret = select(sck + 1, &ReadSet, NULL, NULL, &tm);
  251. //////mLog::FINFO("as--");
  252. if (sel_ret > 0)
  253. {
  254. //ok
  255. //////mLog::FINFO("br--");
  256. Received = recv(sck, (char*)&pszBuff[Total], ExpectedSize - Total, 0);
  257. //////mLog::FINFO("ar--:{$}", Received);
  258. if (Received > 0)
  259. {
  260. Total += Received;
  261. }
  262. else if (Received == 0) // 对端关闭连接
  263. {
  264. close(sck);
  265. sck = -1;
  266. Thread_UnLock();
  267. //////mLog::FERROR("socket closed by peer\n");
  268. return false;
  269. }
  270. else
  271. {
  272. int err = errno;
  273. close(sck);
  274. sck = -1;
  275. Thread_UnLock();
  276. if (err == EAGAIN || err == EWOULDBLOCK) {
  277. //////mLog::FERROR("socket timeout: %d\n", err);
  278. }
  279. else {
  280. //////mLog::FERROR("socket recv error: %s (%d)\n", strerror(err), err);
  281. }
  282. return false;
  283. }
  284. }
  285. else if (sel_ret < 0)
  286. {
  287. int err = errno;
  288. close(sck);
  289. sck = -1;
  290. Thread_UnLock();
  291. //////mLog::FERROR("select error: %s (%d)\n", strerror(err), err);
  292. return false;
  293. }
  294. //timeout
  295. }
  296. else
  297. {
  298. //socket erroe
  299. Thread_UnLock();
  300. //////mLog::FERROR("socket error:using empty sck.\n");
  301. return false;
  302. }
  303. }
  304. else
  305. {
  306. if (sck)
  307. {
  308. close(sck);
  309. sck = -1;
  310. }
  311. Thread_UnLock();
  312. //////mLog::FINFO("exit thread.close socket .\n");
  313. return false;
  314. }
  315. Thread_UnLock();
  316. }
  317. //////mLog::FINFO("Leave ReadRaw OK");
  318. return true;
  319. }
  320. //send block
  321. bool P2P_Module_Base::SendBlob(int &sck, const char *pMsg, const char *pBlock, DWORD size, bool Sync)
  322. {
  323. bool ret = false;
  324. DWORD MsgLen = 0;
  325. //Logger *pLog = (Logger*)GetLogger();
  326. if (pMsg)
  327. {
  328. MsgLen = (DWORD)strlen(pMsg);
  329. }
  330. DWORD BlockLen = size;
  331. DWORD TotalLen = MsgLen + BlockLen + 4 + 4;
  332. if (MsgLen > m_MessageBuffSize - 12)
  333. {
  334. //////mLog::FERROR( "SendBlob error.Message size is too big\n");
  335. return false;
  336. }
  337. int MsgPartSize = 0;
  338. //copy total Len
  339. memcpy(&m_pszMessageBuff[MsgPartSize], (const char*)&TotalLen, sizeof(TotalLen));
  340. MsgPartSize += sizeof(TotalLen);
  341. //copy msg Len
  342. memcpy(&m_pszMessageBuff[MsgPartSize], (const char*)&MsgLen, sizeof(MsgLen));
  343. MsgPartSize += sizeof(MsgLen);
  344. //copy msg
  345. memcpy(&m_pszMessageBuff[MsgPartSize], pMsg, MsgLen);
  346. MsgPartSize += MsgLen;
  347. //copy BlockLen
  348. memcpy(&m_pszMessageBuff[MsgPartSize], (const char*)&BlockLen, sizeof(BlockLen));
  349. MsgPartSize += sizeof(BlockLen);
  350. if (MsgLen > 0)
  351. {
  352. //////mLog::FINFO( "TotalLen = {$},MsgLen = {$},BlobLen = {$}.MsgInfo:{\n{$}}", TotalLen, MsgLen, BlockLen, pMsg);
  353. }
  354. //connection is steady
  355. Thread_Lock();
  356. if (sck)
  357. {
  358. bool Sent = SendRaw(sck, m_pszMessageBuff, MsgPartSize);
  359. if (Sent)
  360. {
  361. ////mLog::FINFO( "write head part succeed.size = {$}", MsgPartSize);
  362. if (BlockLen > 0)
  363. {
  364. Sent = SendRaw(sck, (const char*)pBlock, BlockLen);
  365. if (Sent)
  366. {
  367. ////mLog::FINFO( "write Block data: {$} succeed\n", BlockLen);
  368. ////mLog::FINFO( "write Blob: succeed----------------\n");
  369. ret = true;
  370. }
  371. }
  372. else
  373. {
  374. ////mLog::FINFO( "write MessageOnly: succeed----------------\n");
  375. ret = true;
  376. }
  377. }
  378. }
  379. if (ret == false)
  380. {
  381. ////mLog::FERROR( "SendBlob error.close p2p socket\n");
  382. }
  383. Thread_UnLock();
  384. return ret;
  385. }
  386. void P2P_Module_Base::Quit()
  387. {
  388. }
  389. bool P2P_Module_Base::SendBlob(const char *pMsg, const char *pBlock, DWORD size)
  390. {
  391. bool ret = false;
  392. return ret;
  393. }
  394. bool P2P_Module_Base::ReadBlobSync(int &sck, char *pMsg, DWORD &MsgSize, char *pBlock, DWORD &size)
  395. {
  396. bool ret = false;
  397. DWORD TotalLen = 0;
  398. DWORD MsgLen = 0;
  399. DWORD BlockLen = 0;
  400. do {
  401. //head
  402. ret = ReadRawSync(sck, (char*)&TotalLen, sizeof(TotalLen));
  403. if (ret == false || TotalLen < 8)
  404. {
  405. break;
  406. }
  407. ////mLog::FDEBUG("read head:{$}\n", TotalLen);
  408. //msg len
  409. ret = ReadRawSync(sck, (char*)&MsgLen, sizeof(MsgLen));
  410. if (ret == false || MsgLen + 8 > TotalLen || (MsgLen > MsgSize))
  411. {
  412. break;
  413. }
  414. ////mLog::FDEBUG("read MsgLen:{$}\n", MsgLen);
  415. //msg info
  416. if (MsgLen > 0)
  417. {
  418. ret = ReadRawSync(sck, pMsg, MsgLen);
  419. if (ret == false)
  420. {
  421. break;
  422. }
  423. pMsg[MsgLen] = 0;
  424. ////mLog::FDEBUG("read Msg:{$}\n", pMsg);
  425. }
  426. //Block Len
  427. ret = ReadRawSync(sck, (char*)&BlockLen, sizeof(BlockLen));
  428. if (ret == false || BlockLen + MsgLen + 8 > TotalLen || (BlockLen > size))
  429. {
  430. break;
  431. }
  432. ////mLog::FDEBUG("read BlockLen:{$}\n", BlockLen);
  433. //block info
  434. if (BlockLen > 0)
  435. {
  436. if (pBlock == NULL)
  437. {
  438. break;
  439. }
  440. ret = ReadRawSync(sck, pBlock, BlockLen);
  441. if (ret == false)
  442. {
  443. break;
  444. }
  445. ////mLog::FDEBUG("read Block data Succeed\n");
  446. }
  447. MsgSize = MsgLen;
  448. size = BlockLen;
  449. ////mLog::FDEBUG("read Block Succeed-\n");
  450. return true;
  451. } while (0);
  452. return false;
  453. }
  454. //Read block
  455. bool P2P_Module_Base::ReadBlob(int &sck, char *pMsg, DWORD &MsgSize, char *pBlock, DWORD &size)
  456. {
  457. bool ret = false;
  458. DWORD TotalLen = 0;
  459. DWORD MsgLen = 0;
  460. DWORD BlockLen = 0;
  461. do {
  462. //head
  463. ret = ReadRaw(sck,(char*)&TotalLen, sizeof(TotalLen));
  464. if (ret == false || TotalLen < 8)
  465. {
  466. break;
  467. }
  468. ////mLog::FINFO("read head:{$}\n", TotalLen);
  469. //msg len
  470. ret = ReadRaw(sck, (char*)&MsgLen, sizeof(MsgLen));
  471. if (ret == false || MsgLen + 8 > TotalLen || (MsgLen > MsgSize))
  472. {
  473. break;
  474. }
  475. ////mLog::FINFO("read MsgLen:{$}\n", MsgLen);
  476. //msg info
  477. if (MsgLen > 0)
  478. {
  479. ret = ReadRaw(sck, pMsg, MsgLen);
  480. if (ret == false)
  481. {
  482. break;
  483. }
  484. pMsg[MsgLen] = 0;
  485. ////mLog::FINFO("read Msg:{$}\n", pMsg);
  486. }
  487. //Block Len
  488. ret = ReadRaw(sck, (char*)&BlockLen, sizeof(BlockLen));
  489. if (ret == false || BlockLen + MsgLen + 8 > TotalLen || (BlockLen > size))
  490. {
  491. break;
  492. }
  493. ////mLog::FINFO("read BlockLen:{$}\n", BlockLen);
  494. //block info
  495. if (BlockLen > 0)
  496. {
  497. if (pBlock == NULL)
  498. {
  499. break;
  500. }
  501. ret = ReadRaw(sck, pBlock, BlockLen);
  502. if (ret == false)
  503. {
  504. break;
  505. }
  506. ////mLog::FINFO("read Block data Succeed\n");
  507. }
  508. MsgSize = MsgLen;
  509. size = BlockLen;
  510. ////mLog::FINFO("read Block Succeed-\n");
  511. return true;
  512. } while (0);
  513. return false;
  514. }
  515. //callback
  516. void P2P_Module_Base::OnBlob(char *pMsg,unsigned char *pData, DWORD size)
  517. {
  518. //if (m_pMainDevBus)
  519. //{
  520. // ((DeviceBUS*)m_pMainDevBus)->BlobDataArrived(pMsg, pData, size);
  521. //}
  522. }
  523. bool P2P_Module_Base::Exec()
  524. {
  525. return false;
  526. }
  527. bool P2P_Module_Base::IsTargetBusId(const char *pszTargetBusId)
  528. {
  529. bool ret = false;
  530. //Thread_Lock();
  531. //only server can do this check....
  532. if ((m_pszRemoteBusId != NULL) && (pszTargetBusId != NULL) && (m_AsServer == true))
  533. {
  534. ret = (strcmp(m_pszRemoteBusId, pszTargetBusId) == 0);
  535. }
  536. //Thread_UnLock();
  537. return ret;
  538. }
  539. //---------------------------------------------------------------------------------------------------------------
  540. P2P_Module_Server::P2P_Module_Server()
  541. {
  542. m_HeartBeatTimeout = 0;
  543. }
  544. P2P_Module_Server::~P2P_Module_Server()
  545. {
  546. }
  547. //connect
  548. bool P2P_Module_Server::ConnectP2P()
  549. {
  550. //start thread
  551. return StartThread();
  552. }
  553. void P2P_Module_Server::Quit()
  554. {
  555. StopThread(1000);
  556. }
  557. //disconnect
  558. void P2P_Module_Server::Disconnect()
  559. {
  560. //stop thread
  561. //不能给太长时间,因为内部用的Socket是阻塞模式
  562. //StopThread(P2P_CHECK_PERIOD + 1000);
  563. // close socket in the internal thread
  564. }
  565. bool P2P_Module_Server::IsConnected()
  566. {
  567. if (m_SocketTarget != -1)
  568. {
  569. return m_ConnectionReady->Wait(0);
  570. }
  571. return false;
  572. }
  573. //-1:exit,0:socket error,1:succeed
  574. int P2P_Module_Server::TryInit()
  575. {
  576. while (WaitTheThreadEndSign(0) == false)
  577. {
  578. //init socket
  579. Socketfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  580. if (Socketfd == -1)
  581. {
  582. return 0;
  583. }
  584. sockaddr_in service;
  585. memset(&service, 0, sizeof(service));
  586. service.sin_family = AF_INET;
  587. service.sin_addr.s_addr = inet_addr(m_pszIpAddress);
  588. service.sin_port = htons(m_Port);
  589. //bind
  590. int reuse = 1;
  591. setsockopt(Socketfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
  592. if (bind(Socketfd, (struct sockaddr*)&service, sizeof(service)) < 0) {
  593. close(Socketfd);
  594. return 0;
  595. }
  596. //buff size
  597. //listen
  598. if (listen(Socketfd, SOMAXCONN) < 0) {
  599. close(Socketfd);
  600. return 0;
  601. }
  602. // 设置非阻塞
  603. int flags = fcntl(Socketfd, F_GETFL, 0);
  604. fcntl(Socketfd, F_SETFL, flags | O_NONBLOCK);
  605. return 1;
  606. }
  607. return -1;
  608. }
  609. //-1:exit,0:socket error,1:succeed
  610. int P2P_Module_Server::TryAccept()
  611. {
  612. while (WaitTheThreadEndSign(0) == false)
  613. {
  614. int sClient;
  615. fd_set ReadSet;
  616. struct sockaddr_in remoteAddr;
  617. socklen_t nAddrlen = sizeof(remoteAddr);
  618. // 超时时间
  619. struct timeval tm;
  620. tm.tv_sec = P2P_CHECK_PERIOD / 1000;
  621. tm.tv_usec = P2P_CHECK_PERIOD % 1000 * 1000;
  622. FD_ZERO(&ReadSet);
  623. FD_SET(Socketfd, &ReadSet);
  624. int max_fd = Socketfd + 1;
  625. int sel_ret = select(max_fd, &ReadSet, NULL, NULL, &tm);
  626. if (sel_ret > 0)
  627. {
  628. // 检查是否有连接请求
  629. if (FD_ISSET(Socketfd, &ReadSet))
  630. {
  631. //ok
  632. sClient = accept(Socketfd, (sockaddr*)&remoteAddr, &nAddrlen);
  633. if (sClient == -1)
  634. {
  635. int err = errno;
  636. if (err == EAGAIN || err == EWOULDBLOCK) {
  637. // 非阻塞模式下可能发生,继续尝试
  638. continue;
  639. }
  640. // 其他错误,记录并返回
  641. ////mLog::FERROR("accept failed: %s (%d)", strerror(err), err);
  642. return 0;
  643. }
  644. //accept succeed
  645. Thread_Lock();
  646. m_SocketTarget = sClient;
  647. Thread_UnLock();
  648. return 1;
  649. }
  650. }
  651. else if (sel_ret < 0)
  652. {
  653. int err = errno;
  654. if (err == EINTR) {
  655. // 被信号中断,继续尝试
  656. continue;
  657. }
  658. ////mLog::FERROR("select failed: %s (%d)", strerror(err), err);
  659. close(Socketfd);
  660. return 0;
  661. }
  662. //timeout
  663. }
  664. return -1;
  665. }
  666. //-1:exit,0:socket error
  667. int P2P_Module_Server::TryCheckConnection()
  668. {
  669. while (WaitTheThreadEndSign(100) == false)
  670. {
  671. {
  672. DWORD ret = Thread_Lock(1000);
  673. if (ret == WAIT_OBJECT_0)
  674. {
  675. //got lock
  676. if (m_SocketTarget)
  677. {
  678. //check connection
  679. // 超时时间
  680. fd_set ErrSet;
  681. struct timeval tm;
  682. tm.tv_sec = 0 / 1000;
  683. tm.tv_usec = 0 % 1000*1000;
  684. FD_ZERO(&ErrSet);
  685. FD_SET(Socketfd, &ErrSet);
  686. int sel_ret = select(-1, NULL, NULL, &ErrSet, &tm);
  687. if (sel_ret < 0)
  688. {
  689. int err = errno;
  690. close(m_SocketTarget);
  691. m_SocketTarget = -1;
  692. close(Socketfd);
  693. Thread_UnLock();
  694. //string errinfo = FormatLinkError(WSAGetLastError());
  695. ////mLog::FERROR("socket error: %s (%d)", strerror(err), err);
  696. return 0;//do over
  697. }
  698. //timeout or ok
  699. if ((GetTickCount() - m_HeartBeatTimeout) > 20000)
  700. {
  701. //heartbeat
  702. //server -> client
  703. //it might cause the m_SocketTarget gets empty
  704. if (SendBlob(m_pszLocalBusId, 0, 0))
  705. {
  706. ////mLog::FINFO("Server:Send heart beat\n");
  707. }
  708. }
  709. }
  710. if (m_SocketTarget == -1)
  711. {
  712. int err = errno;
  713. close(Socketfd);
  714. Thread_UnLock();
  715. //string errinfo = FormatLinkError(WSAGetLastError());
  716. ////mLog::FERROR("socket error: %s (%d)", strerror(err), err);
  717. return 0;//do over
  718. }
  719. Thread_UnLock();
  720. }
  721. }
  722. }
  723. //exit
  724. ////mLog::FINFO("Server:Stop Thread.exiting...\n");
  725. DWORD ret = Thread_Lock(1000);
  726. if (ret == WAIT_OBJECT_0)
  727. {
  728. //got lock
  729. if (m_SocketTarget)
  730. {
  731. close(m_SocketTarget);
  732. m_SocketTarget = -1;
  733. close(Socketfd);
  734. }
  735. Thread_UnLock();
  736. }
  737. else
  738. {
  739. //no lock ,no fxxx...
  740. ////mLog::FINFO("P2PServer:Failed CLose Socket...\n");
  741. }
  742. return -1;
  743. }
  744. bool P2P_Module_Server::Exec()
  745. {
  746. int ret = 0;
  747. ////mLog::FINFO("P2PServer:init Entry\n");
  748. m_ConnectionReady->ResetEvent();
  749. if (WaitTheThreadEndSign(100) == true)
  750. {
  751. return false;
  752. }
  753. //init socket
  754. ret = TryInit();
  755. if (ret < 0)
  756. {
  757. return false;
  758. }
  759. else if (ret == 0)
  760. {
  761. return true;
  762. }
  763. ////mLog::FINFO("P2PServer:try init succeed\n");
  764. //try accept
  765. ret = TryAccept();
  766. if (ret < 0)
  767. {
  768. return false;
  769. }
  770. else if (ret == 0)
  771. {
  772. return true;
  773. }
  774. ////mLog::FINFO("P2PServer:try accept succeed\n");
  775. //protocal one: read client's BusId
  776. //read targetbusId
  777. DWORD MsgSize = 256;
  778. DWORD BlockSize = 0;
  779. memset(m_pszRemoteBusId, 0, MsgSize);
  780. if (ReadBlob(m_SocketTarget,m_pszRemoteBusId, MsgSize, NULL, BlockSize) == false)
  781. {
  782. //handshake failed
  783. close(Socketfd);
  784. return true;
  785. }
  786. ////mLog::FINFO("P2PServer:try handshake succeed.{$}\n", m_pszRemoteBusId);
  787. int on = 1;
  788. //tcp no delay
  789. setsockopt(m_SocketTarget, IPPROTO_TCP, TCP_NODELAY, (const char *)&on, sizeof(on));
  790. int optVal = 1024 * 1280 * 6 + 6000;
  791. socklen_t optLen = sizeof(int);
  792. setsockopt(m_SocketTarget, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, optLen);
  793. ////mLog::FINFO("try set send buff size:{$}\n", optVal);
  794. getsockopt(m_SocketTarget, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, &optLen);
  795. ////mLog::FINFO("after try set send buff size:{$}\n", optVal);
  796. m_ConnectionReady->SetEvent();
  797. m_HeartBeatTimeout = GetTickCount();
  798. //do while and try check connection + exitflag
  799. ret = TryCheckConnection();
  800. if (ret < 0)
  801. {
  802. return false;
  803. }
  804. return true;
  805. }
  806. bool P2P_Module_Server::SendBlob(const char *pMsg, const char *pBlock, DWORD size)
  807. {
  808. bool ret = P2P_Module_Base::SendBlob(m_SocketTarget, pMsg, pBlock, size);
  809. if (ret)
  810. {
  811. m_HeartBeatTimeout = GetTickCount();
  812. }
  813. return ret;
  814. }
  815. //---------------------------------------------------------------------------------------------------------------
  816. P2P_Module_Client::P2P_Module_Client()
  817. {
  818. m_pszMsgBuff = new char[1024 * 1024];
  819. m_pszBlockBuff = new unsigned char[1024 * 1024 * 4];
  820. }
  821. P2P_Module_Client::~P2P_Module_Client()
  822. {
  823. delete []m_pszMsgBuff;
  824. m_pszMsgBuff = NULL;
  825. delete[]m_pszBlockBuff;
  826. m_pszBlockBuff = NULL;
  827. }
  828. //connect
  829. bool P2P_Module_Client::ConnectP2P()
  830. {
  831. return StartThread();
  832. }
  833. //disconnect
  834. void P2P_Module_Client::Disconnect()
  835. {
  836. StopThread(P2P_CHECK_PERIOD + 1000);
  837. }
  838. //send block,one way to client no send to anyone
  839. //virtual bool SendBlob(char *pMsg, char *pBlock, DWORD size);
  840. //callback
  841. //void P2P_Module_Client::OnBlob(char *pMsg, char *pData, DWORD size)
  842. //{
  843. //
  844. //}
  845. bool P2P_Module_Client::TryConnectServer()
  846. {
  847. while (WaitTheThreadEndSign(100) == false)
  848. {
  849. Socketfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  850. if (Socketfd == -1)
  851. {
  852. return false;
  853. }
  854. //u_long iMode = 1;
  855. //int ret = ioctlsocket(Socketfd, FIONBIO, &iMode);//non-blocking mode
  856. //if (ret == 0)
  857. {
  858. sockaddr_in addrSrv;
  859. memset(&addrSrv, 0, sizeof(addrSrv));
  860. addrSrv.sin_family = AF_INET;
  861. addrSrv.sin_port = htons(m_Port);
  862. inet_pton(AF_INET, m_pszIpAddress, &addrSrv.sin_addr);
  863. if (connect(Socketfd, (struct sockaddr*)&addrSrv, sizeof(addrSrv)) == 0) {
  864. socket_set_keepalive(Socketfd);
  865. return true;
  866. }
  867. //else
  868. //{
  869. // // 超时时间
  870. // struct timeval tm;
  871. // tm.tv_sec = P2P_CHECK_PERIOD / 1000;
  872. // tm.tv_usec = P2P_CHECK_PERIOD % 1000;
  873. // int selret = 0;
  874. // fd_set set;
  875. // FD_ZERO(&set);
  876. // FD_SET(Socketfd, &set);
  877. // Sleep(1000);
  878. // selret = select(-1, NULL, &set, NULL, &tm);
  879. // if (selret > 0)
  880. // {
  881. // int error = -1;
  882. // int optLen = sizeof(int);
  883. // getsockopt(Socketfd, SOL_SOCKET, SO_ERROR, (char*)&error, &optLen);
  884. // // 之所以下面的程序不写成三目运算符的形式, 是为了更直观, 便于注释
  885. // if (0 == error)
  886. // {
  887. // //just try it
  888. // socket_set_keepalive(Socketfd);
  889. // return StartThread();
  890. // }
  891. // }
  892. //}
  893. }
  894. close(Socketfd);
  895. Socketfd = -1;
  896. }
  897. return false;
  898. }
  899. bool P2P_Module_Client::Exec()
  900. {
  901. if (TryConnectServer())
  902. {
  903. int on = 1;
  904. //tcp no delay
  905. setsockopt(Socketfd, IPPROTO_TCP, TCP_NODELAY, (const char *)&on, sizeof(on));
  906. int optVal = 1024 * 1280 * 6 + 6000;
  907. socklen_t optLen = sizeof(int);
  908. setsockopt(Socketfd, SOL_SOCKET, SO_RCVBUF, (char*)&optVal, optLen);
  909. ////mLog::FINFO("try set read buff size:{$}\n", optVal);
  910. getsockopt(Socketfd, SOL_SOCKET, SO_RCVBUF, (char*)&optVal, &optLen);
  911. ////mLog::FINFO("after try set read buff size:{$}\n", optVal);
  912. //send local BusId
  913. bool ret = P2P_Module_Base::SendBlob(Socketfd, m_pszLocalBusId, 0, 0, true);
  914. if (ret == false)
  915. {
  916. close(Socketfd);
  917. Socketfd = -1;
  918. ////mLog::FERROR("Send Handshake failed");
  919. return false;
  920. }
  921. ////mLog::FINFO("Handshake succeed:\n");
  922. //u_long iMode = 0;
  923. //ioctlsocket(Socketfd, FIONBIO, &iMode);//blocking mode
  924. DWORD MsgSize = 1024 * 1024;
  925. DWORD BlockSize = 1024 * 1024 * 4;
  926. while (ReadBlobSync(Socketfd, m_pszMsgBuff, MsgSize, (char*)m_pszBlockBuff, BlockSize))
  927. {
  928. ////mLog::FINFO("Read Block Data succeed:\n");
  929. if (BlockSize > 0)
  930. {
  931. //////mLog::FINFO("Notify Block Data begin\n");
  932. OnBlob(m_pszMsgBuff, m_pszBlockBuff, BlockSize);
  933. //////mLog::FINFO("Notify Block Data end\n");
  934. }
  935. else
  936. {
  937. //ignore others
  938. //printf("got Heartbeat--------------------------\n\n\n");
  939. ////mLog::FINFO("got Heartbeat--------------------------");
  940. }
  941. //reinit the buff
  942. MsgSize = 1024 * 1024;
  943. BlockSize = 1024 * 1024 * 4;
  944. }
  945. ////mLog::FINFO("Exit Thread--------------------------");
  946. if (Thread_Lock(1000) == WAIT_OBJECT_0)
  947. {
  948. if (Socketfd)
  949. {
  950. close(Socketfd);
  951. Socketfd = -1;
  952. }
  953. Thread_UnLock();
  954. }
  955. else
  956. {
  957. //no can do...
  958. }
  959. }
  960. if (WaitTheThreadEndSign(0))
  961. {
  962. return false;
  963. }
  964. return true;
  965. }