P2PModule.cpp 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184
  1. #include "P2PModule.h"
  2. #include <sys/time.h>
  3. #include <sys/select.h>
  4. #include <netinet/tcp.h>
  5. #include <cstdio>
  6. #include "DeviceBus.h"
  7. #include "common_api.h"
  8. #include "LogLocalHelper.h"
  9. #include "Log4CPP.h"
  10. //using namespace std;
  11. #define P2P_CHECK_PERIOD (2000)
  12. string FormatLinkError(int ErrorCode)
  13. {
  14. switch (ErrorCode)
  15. {
  16. case 7: return "sckOutOfMemory";// 7 内存不足
  17. case 380: return "sckInvalidPropertyValue";// 380 属性值无效
  18. case 394: return "sckGetNotSupported";// 394 属性不可读
  19. case 383: return "sckGetNotSupported";// 383 属性是只读的
  20. case 40006: return "sckBadState";// 40006 所请求的事务或请求本身的错误协议或者错误连接状态
  21. case 40014: return "sckInvalidArg";// 40014 传递给函数的参数格式不确定,或者不在指定范围内
  22. case 40017: return "sckSuccess";// 40017 成功
  23. case 40018: return "sckUnsupported";// 40018 不支持的变量类型
  24. case 40020: return "sckInvalidOp";// 40020 在当前状态下的无效操作
  25. case 40021: return "sckOutOfRange";// 40021 参数越界
  26. case 40026: return "sckWrongProtocol";// 40026 所请求的事务或请求本身的错误协议
  27. case 10004: return "sckOpCanceled";// 10004 取消操作
  28. case 10014: return "sckInvalidArgument";// 10014 所请求的地址是广播地址,但未设置标记
  29. case 10035: return "sckWouldBlock";// 10035 套接字不成块,而指定操作将使之成块
  30. case 10036: return "sckInProgress";// 10036 制造块的Winsock操作在进行之中
  31. case 10037: return "sckAlreadyComplete";// 10037 完成操作。未进行制作块的操作
  32. case 10038: return "sckNotSocket";// 10038 描述符不是套接字
  33. case 10040: return "sckMsgTooBig";// 10040 数据太大,不适于缓冲区的要求,因而被截断
  34. case 10043: return "sckPortNotSupported";// 10043 不支持指定的端口
  35. case 10048: return "sckAddressInUse";// 10048 地址在使用中
  36. case 10049: return "sckAddressNotAvailable";// 10049 来自本地机器的不可用地址
  37. case 10050: return "sckNetworkSubsystemFailed";// 10050 网络子系统失败
  38. case 10051: return "sckNetworkUnreachable";// 10051 当前不能从主机到达网络
  39. case 10052: return "sckNetReset 10052";// 在设置SO_KEEPALIVE时连接超时
  40. case 10053: return "sckConnectAborted";// 10053 由于超时或者其它失败而中止接连
  41. case 10054: return "sckConnectionReset";// 10054 通过远端重新设置连接
  42. case 10055: return "sckNoBufferSpace";// 10055 没有可用的缓存空间
  43. case 10056: return "sckAlreadyConnected";// 10056 已连接的套接字
  44. case 10057: return "sckNotConnected";// 10057 未接连套接字
  45. case 10058: return "sckSockedShutdown";// 10058 已关闭套接字
  46. case 10060: return "sckTimedout";// 10060 套接字超时
  47. case 10061: return "sckConnectionRefused";// 10061 强行拒绝连接
  48. case 10093: return "sckNotInitialized";// 10093 套接字没有初始化
  49. case 11001: return "sckHostNotFound";// 11001 授权应答:未找到主机
  50. case 11002: return "sckHostNotFoundTryAgain";// 11002 非授权应答:未找到主机,重试
  51. case 11003: return "sckNonRecoverableError";// 11003 不可恢复的错误
  52. case 11004: return "sckNoData";// 11004 无效名,对所请求的类型无数据记录
  53. default: return string("Socket error %d", ErrorCode);
  54. }
  55. }
  56. int socket_set_keepalive(int &fd)
  57. {
  58. int keepalive = 1;
  59. int keepidle = 10; // 10秒无活动开始探测
  60. int keepintvl = 5; // 5秒探测间隔
  61. int keepcnt = 3; // 3次探测失败断开
  62. if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive)))
  63. {
  64. return -1;
  65. }
  66. if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &keepidle, sizeof(keepidle))) {
  67. return -2;
  68. }
  69. if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &keepintvl, sizeof(keepintvl))) {
  70. return -3;
  71. }
  72. if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &keepcnt, sizeof(keepcnt))) {
  73. return -4;
  74. }
  75. return 0;
  76. }
  77. P2P_Module_Base::P2P_Module_Base()
  78. {
  79. m_Port = 0;
  80. m_MessageBuffSize = 1024 * 1024;//set 1M limits
  81. m_pszMessageBuff = new char[m_MessageBuffSize];
  82. m_pMainDevBus = NULL;
  83. Socketfd = -1;
  84. m_SocketTarget = -1;
  85. m_pszIpAddress = new char[256];
  86. m_pszLocalBusId = new char[256];
  87. m_pszRemoteBusId = new char[256];
  88. memset(m_pszIpAddress, 0, 256);
  89. memset(m_pszLocalBusId, 0, 256);
  90. memset(m_pszRemoteBusId, 0, 256);
  91. m_ConnectionReady = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET,false);
  92. }
  93. P2P_Module_Base::~P2P_Module_Base()
  94. {
  95. Disconnect();
  96. delete[]m_pszIpAddress;
  97. delete[]m_pszLocalBusId;
  98. delete[]m_pszRemoteBusId;
  99. m_pszIpAddress = NULL;
  100. m_pszLocalBusId = NULL;
  101. m_pszRemoteBusId = NULL;
  102. delete[]m_pszMessageBuff;
  103. m_pszMessageBuff = NULL;
  104. }
  105. //init
  106. void P2P_Module_Base::InitP2P(const char *pszIp, const char *pszLocalBusId, bool AsServer, PVOID pParent)
  107. {
  108. strncpy(m_pszIpAddress, pszIp, 255);
  109. strncpy(m_pszLocalBusId, pszLocalBusId, 255);
  110. m_Port = (unsigned short)(CCOS_HW_CHANNEL/10) + 3;//80000 is out of WORD limits
  111. m_AsServer = AsServer;
  112. m_pMainDevBus = pParent;
  113. /*string logfile = GetProcessDirectory() + "\\logs\\Local_p2pDispathlog.log";
  114. Logger *pLog = CreateLogger();
  115. pLog->SetLogFilepath(logfile.c_str());
  116. SetLogger(pLog);*/
  117. }
  118. //connect
  119. bool P2P_Module_Base::ConnectP2P()
  120. {
  121. return false;
  122. }
  123. //disconnect
  124. void P2P_Module_Base::Disconnect()
  125. {
  126. }
  127. bool P2P_Module_Base::IsConnected()
  128. {
  129. return false;
  130. }
  131. //Blob的定义
  132. //4字节 大长度:4字节Msg的长度:Msg内容,4字节Block的长度:Block内容
  133. //必须存在项为 大长度,Msg长度,Block长度,如果无此内容,其值为0
  134. bool P2P_Module_Base::SendRaw(int &sck, const char *pData, DWORD size, bool Sync)
  135. {
  136. uint32_t Sented = 0;
  137. int Sendonce = 0;
  138. while (Sented < size) {
  139. Thread_Lock();
  140. if (WaitTheThreadEndSign(0)) {
  141. Thread_UnLock();
  142. return false;
  143. }
  144. if (sck != -1) {
  145. if (!Sync) {
  146. fd_set WriteSet;
  147. struct timeval tm;
  148. tm.tv_sec = P2P_CHECK_PERIOD / 1000;
  149. tm.tv_usec = (P2P_CHECK_PERIOD % 1000) * 1000;
  150. FD_ZERO(&WriteSet);
  151. FD_SET(sck, &WriteSet);
  152. int sel_ret = select(sck + 1, NULL, &WriteSet, NULL, &tm);
  153. if (sel_ret <= 0) {
  154. // 处理错误或超时
  155. }
  156. }
  157. Sendonce = send(sck, pData + Sented, size - Sented, 0);
  158. if (Sendonce > 0) {
  159. Sented += Sendonce;
  160. }
  161. else {
  162. close(sck);
  163. sck = -1;
  164. Thread_UnLock();
  165. return false;
  166. }
  167. }
  168. else {
  169. Thread_UnLock();
  170. return false;
  171. }
  172. Thread_UnLock();
  173. }
  174. return true;
  175. }
  176. bool P2P_Module_Base::ReadRawSync(int &sck, char *pszBuff, DWORD ExpectedSize)
  177. {
  178. DWORD Total = 0;
  179. int Received = 0;
  180. //FINFO("Enter ReadRawSync");
  181. while (Total < ExpectedSize)
  182. {
  183. Thread_Lock();
  184. if (WaitTheThreadEndSign(0) == false)
  185. {
  186. if (sck != -1)
  187. {
  188. //ok
  189. //FINFO("br--");
  190. Received = recv(sck, (char*)&pszBuff[Total], ExpectedSize - Total, 0);
  191. //FINFO("ar--:{$}", Received);
  192. if (Received > 0)
  193. {
  194. Total += Received;
  195. }
  196. else if (Received == 0) // 对端关闭连接
  197. {
  198. close(sck);
  199. sck = -1;
  200. Thread_UnLock();
  201. //FERROR("socket closed by peer\n");
  202. return false;
  203. }
  204. else
  205. {
  206. int err = errno;
  207. close(sck);
  208. sck = -1;
  209. Thread_UnLock();
  210. // 处理常见的非阻塞错误
  211. if (err == EAGAIN || err == EWOULDBLOCK) {
  212. // 对于同步读取,这通常表示超时,但在同步模式中不应发生
  213. //FERROR("socket timeout: %d\n", err);
  214. }
  215. else {
  216. //FERROR("socket error: %d\n", err);
  217. }
  218. //socket error,or closed
  219. //string errinfo = FormatLinkError(WSAGetLastError());
  220. //FERROR("socket error:{$}\n", errinfo.c_str());
  221. return false;
  222. }
  223. }
  224. }
  225. Thread_UnLock();
  226. }
  227. //FINFO("Leave ReadRawSync OK");
  228. return true;
  229. }
  230. bool P2P_Module_Base::ReadRaw(int &sck, char *pszBuff, DWORD ExpectedSize)
  231. {
  232. DWORD Total = 0;
  233. int Received = 0;
  234. //FINFO("Enter ReadRaw");
  235. while (Total < ExpectedSize)
  236. {
  237. Thread_Lock();
  238. if (WaitTheThreadEndSign(0) == false)
  239. {
  240. if (sck != -1)
  241. {
  242. //check readable
  243. // 超时时间
  244. struct timeval tm;
  245. fd_set ReadSet;
  246. tm.tv_sec = P2P_CHECK_PERIOD / 1000;
  247. tm.tv_usec = P2P_CHECK_PERIOD % 1000 * 1000;
  248. FD_ZERO(&ReadSet);
  249. FD_SET(sck, &ReadSet);
  250. //FINFO("bs--");
  251. int sel_ret = select(sck + 1, &ReadSet, NULL, NULL, &tm);
  252. //FINFO("as--");
  253. if (sel_ret > 0)
  254. {
  255. //ok
  256. //FINFO("br--");
  257. Received = recv(sck, (char*)&pszBuff[Total], ExpectedSize - Total, 0);
  258. //FINFO("ar--:{$}", Received);
  259. if (Received > 0)
  260. {
  261. Total += Received;
  262. }
  263. else if (Received == 0) // 对端关闭连接
  264. {
  265. close(sck);
  266. sck = -1;
  267. Thread_UnLock();
  268. //FERROR("socket closed by peer\n");
  269. return false;
  270. }
  271. else
  272. {
  273. int err = errno;
  274. close(sck);
  275. sck = -1;
  276. Thread_UnLock();
  277. if (err == EAGAIN || err == EWOULDBLOCK) {
  278. //FERROR("socket timeout: %d\n", err);
  279. }
  280. else {
  281. //FERROR("socket recv error: %s (%d)\n", strerror(err), err);
  282. }
  283. return false;
  284. }
  285. }
  286. else if (sel_ret < 0)
  287. {
  288. int err = errno;
  289. close(sck);
  290. sck = -1;
  291. Thread_UnLock();
  292. //FERROR("select error: %s (%d)\n", strerror(err), err);
  293. return false;
  294. }
  295. //timeout
  296. }
  297. else
  298. {
  299. //socket erroe
  300. Thread_UnLock();
  301. //FERROR("socket error:using empty sck.\n");
  302. return false;
  303. }
  304. }
  305. else
  306. {
  307. if (sck)
  308. {
  309. close(sck);
  310. sck = -1;
  311. }
  312. Thread_UnLock();
  313. //FINFO("exit thread.close socket .\n");
  314. return false;
  315. }
  316. Thread_UnLock();
  317. }
  318. //FINFO("Leave ReadRaw OK");
  319. return true;
  320. }
  321. //send block
  322. bool P2P_Module_Base::SendBlob(int &sck, const char *pMsg, const char *pBlock, DWORD size, bool Sync)
  323. {
  324. bool ret = false;
  325. DWORD MsgLen = 0;
  326. //Logger *pLog = (Logger*)GetLogger();
  327. if (pMsg)
  328. {
  329. MsgLen = (DWORD)strlen(pMsg);
  330. }
  331. DWORD BlockLen = size;
  332. DWORD TotalLen = MsgLen + BlockLen + 4 + 4;
  333. if (MsgLen > m_MessageBuffSize - 12)
  334. {
  335. //FERROR( "SendBlob error.Message size is too big\n");
  336. return false;
  337. }
  338. int MsgPartSize = 0;
  339. //copy total Len
  340. memcpy(&m_pszMessageBuff[MsgPartSize], (const char*)&TotalLen, sizeof(TotalLen));
  341. MsgPartSize += sizeof(TotalLen);
  342. //copy msg Len
  343. memcpy(&m_pszMessageBuff[MsgPartSize], (const char*)&MsgLen, sizeof(MsgLen));
  344. MsgPartSize += sizeof(MsgLen);
  345. //copy msg
  346. memcpy(&m_pszMessageBuff[MsgPartSize], pMsg, MsgLen);
  347. MsgPartSize += MsgLen;
  348. //copy BlockLen
  349. memcpy(&m_pszMessageBuff[MsgPartSize], (const char*)&BlockLen, sizeof(BlockLen));
  350. MsgPartSize += sizeof(BlockLen);
  351. if (MsgLen > 0)
  352. {
  353. //FINFO( "TotalLen = {$},MsgLen = {$},BlobLen = {$}.MsgInfo:{\n{$}}", TotalLen, MsgLen, BlockLen, pMsg);
  354. }
  355. //connection is steady
  356. Thread_Lock();
  357. if (sck)
  358. {
  359. bool Sent = SendRaw(sck, m_pszMessageBuff, MsgPartSize);
  360. if (Sent)
  361. {
  362. FINFO( "write head part succeed.size = {$}", MsgPartSize);
  363. if (BlockLen > 0)
  364. {
  365. Sent = SendRaw(sck, (const char*)pBlock, BlockLen);
  366. if (Sent)
  367. {
  368. FINFO( "write Block data: {$} succeed\n", BlockLen);
  369. FINFO( "write Blob: succeed----------------\n");
  370. ret = true;
  371. }
  372. }
  373. else
  374. {
  375. FINFO( "write MessageOnly: succeed----------------\n");
  376. ret = true;
  377. }
  378. }
  379. }
  380. if (ret == false)
  381. {
  382. FERROR( "SendBlob error.close p2p socket\n");
  383. }
  384. Thread_UnLock();
  385. return ret;
  386. }
  387. void P2P_Module_Base::Quit()
  388. {
  389. }
  390. bool P2P_Module_Base::SendBlob(const char *pMsg, const char *pBlock, DWORD size)
  391. {
  392. bool ret = false;
  393. return ret;
  394. }
  395. bool P2P_Module_Base::ReadBlobSync(int &sck, char *pMsg, DWORD &MsgSize, char *pBlock, DWORD &size)
  396. {
  397. bool ret = false;
  398. DWORD TotalLen = 0;
  399. DWORD MsgLen = 0;
  400. DWORD BlockLen = 0;
  401. do {
  402. //head
  403. ret = ReadRawSync(sck, (char*)&TotalLen, sizeof(TotalLen));
  404. if (ret == false || TotalLen < 8)
  405. {
  406. break;
  407. }
  408. FDEBUG("read head:{$}\n", TotalLen);
  409. //msg len
  410. ret = ReadRawSync(sck, (char*)&MsgLen, sizeof(MsgLen));
  411. if (ret == false || MsgLen + 8 > TotalLen || (MsgLen > MsgSize))
  412. {
  413. break;
  414. }
  415. FDEBUG("read MsgLen:{$}\n", MsgLen);
  416. //msg info
  417. if (MsgLen > 0)
  418. {
  419. ret = ReadRawSync(sck, pMsg, MsgLen);
  420. if (ret == false)
  421. {
  422. break;
  423. }
  424. pMsg[MsgLen] = 0;
  425. FDEBUG("read Msg:{$}\n", pMsg);
  426. }
  427. //Block Len
  428. ret = ReadRawSync(sck, (char*)&BlockLen, sizeof(BlockLen));
  429. if (ret == false || BlockLen + MsgLen + 8 > TotalLen || (BlockLen > size))
  430. {
  431. break;
  432. }
  433. FDEBUG("read BlockLen:{$}\n", BlockLen);
  434. //block info
  435. if (BlockLen > 0)
  436. {
  437. if (pBlock == NULL)
  438. {
  439. break;
  440. }
  441. ret = ReadRawSync(sck, pBlock, BlockLen);
  442. if (ret == false)
  443. {
  444. break;
  445. }
  446. FDEBUG("read Block data Succeed\n");
  447. }
  448. MsgSize = MsgLen;
  449. size = BlockLen;
  450. FDEBUG("read Block Succeed-\n");
  451. return true;
  452. } while (0);
  453. return false;
  454. }
  455. //Read block
  456. bool P2P_Module_Base::ReadBlob(int &sck, char *pMsg, DWORD &MsgSize, char *pBlock, DWORD &size)
  457. {
  458. bool ret = false;
  459. DWORD TotalLen = 0;
  460. DWORD MsgLen = 0;
  461. DWORD BlockLen = 0;
  462. do {
  463. //head
  464. ret = ReadRaw(sck,(char*)&TotalLen, sizeof(TotalLen));
  465. if (ret == false || TotalLen < 8)
  466. {
  467. break;
  468. }
  469. FINFO("read head:{$}\n", TotalLen);
  470. //msg len
  471. ret = ReadRaw(sck, (char*)&MsgLen, sizeof(MsgLen));
  472. if (ret == false || MsgLen + 8 > TotalLen || (MsgLen > MsgSize))
  473. {
  474. break;
  475. }
  476. FINFO("read MsgLen:{$}\n", MsgLen);
  477. //msg info
  478. if (MsgLen > 0)
  479. {
  480. ret = ReadRaw(sck, pMsg, MsgLen);
  481. if (ret == false)
  482. {
  483. break;
  484. }
  485. pMsg[MsgLen] = 0;
  486. FINFO("read Msg:{$}\n", pMsg);
  487. }
  488. //Block Len
  489. ret = ReadRaw(sck, (char*)&BlockLen, sizeof(BlockLen));
  490. if (ret == false || BlockLen + MsgLen + 8 > TotalLen || (BlockLen > size))
  491. {
  492. break;
  493. }
  494. FINFO("read BlockLen:{$}\n", BlockLen);
  495. //block info
  496. if (BlockLen > 0)
  497. {
  498. if (pBlock == NULL)
  499. {
  500. break;
  501. }
  502. ret = ReadRaw(sck, pBlock, BlockLen);
  503. if (ret == false)
  504. {
  505. break;
  506. }
  507. FINFO("read Block data Succeed\n");
  508. }
  509. MsgSize = MsgLen;
  510. size = BlockLen;
  511. FINFO("read Block Succeed-\n");
  512. return true;
  513. } while (0);
  514. return false;
  515. }
  516. //callback
  517. void P2P_Module_Base::OnBlob(char *pMsg,unsigned char *pData, DWORD size)
  518. {
  519. //if (m_pMainDevBus)
  520. //{
  521. // ((DeviceBUS*)m_pMainDevBus)->BlobDataArrived(pMsg, pData, size);
  522. //}
  523. }
  524. bool P2P_Module_Base::Exec()
  525. {
  526. return false;
  527. }
  528. bool P2P_Module_Base::IsTargetBusId(const char *pszTargetBusId)
  529. {
  530. bool ret = false;
  531. //Thread_Lock();
  532. //only server can do this check....
  533. if ((m_pszRemoteBusId != NULL) && (pszTargetBusId != NULL) && (m_AsServer == true))
  534. {
  535. ret = (strcmp(m_pszRemoteBusId, pszTargetBusId) == 0);
  536. }
  537. //Thread_UnLock();
  538. return ret;
  539. }
  540. //---------------------------------------------------------------------------------------------------------------
  541. P2P_Module_Server::P2P_Module_Server()
  542. {
  543. m_HeartBeatTimeout = 0;
  544. }
  545. P2P_Module_Server::~P2P_Module_Server()
  546. {
  547. }
  548. //connect
  549. bool P2P_Module_Server::ConnectP2P()
  550. {
  551. //start thread
  552. return StartThread();
  553. }
  554. void P2P_Module_Server::Quit()
  555. {
  556. StopThread(1000);
  557. }
  558. //disconnect
  559. void P2P_Module_Server::Disconnect()
  560. {
  561. //stop thread
  562. //不能给太长时间,因为内部用的Socket是阻塞模式
  563. //StopThread(P2P_CHECK_PERIOD + 1000);
  564. // close socket in the internal thread
  565. }
  566. bool P2P_Module_Server::IsConnected()
  567. {
  568. if (m_SocketTarget != -1)
  569. {
  570. return m_ConnectionReady->Wait(0);
  571. }
  572. return false;
  573. }
  574. //-1:exit,0:socket error,1:succeed
  575. int P2P_Module_Server::TryInit()
  576. {
  577. while (WaitTheThreadEndSign(0) == false)
  578. {
  579. //init socket
  580. Socketfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  581. if (Socketfd == -1)
  582. {
  583. return 0;
  584. }
  585. sockaddr_in service;
  586. memset(&service, 0, sizeof(service));
  587. service.sin_family = AF_INET;
  588. service.sin_addr.s_addr = inet_addr(m_pszIpAddress);
  589. service.sin_port = htons(m_Port);
  590. //bind
  591. int reuse = 1;
  592. setsockopt(Socketfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
  593. if (bind(Socketfd, (struct sockaddr*)&service, sizeof(service)) < 0) {
  594. close(Socketfd);
  595. return 0;
  596. }
  597. //buff size
  598. //listen
  599. if (listen(Socketfd, SOMAXCONN) < 0) {
  600. close(Socketfd);
  601. return 0;
  602. }
  603. // 设置非阻塞
  604. int flags = fcntl(Socketfd, F_GETFL, 0);
  605. fcntl(Socketfd, F_SETFL, flags | O_NONBLOCK);
  606. return 1;
  607. }
  608. return -1;
  609. }
  610. //-1:exit,0:socket error,1:succeed
  611. int P2P_Module_Server::TryAccept()
  612. {
  613. while (WaitTheThreadEndSign(0) == false)
  614. {
  615. int sClient;
  616. fd_set ReadSet;
  617. struct sockaddr_in remoteAddr;
  618. socklen_t nAddrlen = sizeof(remoteAddr);
  619. // 超时时间
  620. struct timeval tm;
  621. tm.tv_sec = P2P_CHECK_PERIOD / 1000;
  622. tm.tv_usec = P2P_CHECK_PERIOD % 1000 * 1000;
  623. FD_ZERO(&ReadSet);
  624. FD_SET(Socketfd, &ReadSet);
  625. int max_fd = Socketfd + 1;
  626. int sel_ret = select(max_fd, &ReadSet, NULL, NULL, &tm);
  627. if (sel_ret > 0)
  628. {
  629. // 检查是否有连接请求
  630. if (FD_ISSET(Socketfd, &ReadSet))
  631. {
  632. //ok
  633. sClient = accept(Socketfd, (sockaddr*)&remoteAddr, &nAddrlen);
  634. if (sClient == -1)
  635. {
  636. int err = errno;
  637. if (err == EAGAIN || err == EWOULDBLOCK) {
  638. // 非阻塞模式下可能发生,继续尝试
  639. continue;
  640. }
  641. // 其他错误,记录并返回
  642. FERROR("accept failed: %s (%d)", strerror(err), err);
  643. return 0;
  644. }
  645. //accept succeed
  646. Thread_Lock();
  647. m_SocketTarget = sClient;
  648. Thread_UnLock();
  649. return 1;
  650. }
  651. }
  652. else if (sel_ret < 0)
  653. {
  654. int err = errno;
  655. if (err == EINTR) {
  656. // 被信号中断,继续尝试
  657. continue;
  658. }
  659. FERROR("select failed: %s (%d)", strerror(err), err);
  660. close(Socketfd);
  661. return 0;
  662. }
  663. //timeout
  664. }
  665. return -1;
  666. }
  667. //-1:exit,0:socket error
  668. int P2P_Module_Server::TryCheckConnection()
  669. {
  670. while (WaitTheThreadEndSign(100) == false)
  671. {
  672. {
  673. DWORD ret = Thread_Lock(1000);
  674. if (ret == WAIT_OBJECT_0)
  675. {
  676. //got lock
  677. if (m_SocketTarget)
  678. {
  679. //check connection
  680. // 超时时间
  681. fd_set ErrSet;
  682. struct timeval tm;
  683. tm.tv_sec = 0 / 1000;
  684. tm.tv_usec = 0 % 1000*1000;
  685. FD_ZERO(&ErrSet);
  686. FD_SET(Socketfd, &ErrSet);
  687. int sel_ret = select(-1, NULL, NULL, &ErrSet, &tm);
  688. if (sel_ret < 0)
  689. {
  690. int err = errno;
  691. close(m_SocketTarget);
  692. m_SocketTarget = -1;
  693. close(Socketfd);
  694. Thread_UnLock();
  695. //string errinfo = FormatLinkError(WSAGetLastError());
  696. FERROR("socket error: %s (%d)", strerror(err), err);
  697. return 0;//do over
  698. }
  699. //timeout or ok
  700. if ((GetTickCount() - m_HeartBeatTimeout) > 20000)
  701. {
  702. //heartbeat
  703. //server -> client
  704. //it might cause the m_SocketTarget gets empty
  705. if (SendBlob(m_pszLocalBusId, 0, 0))
  706. {
  707. FINFO("Server:Send heart beat\n");
  708. }
  709. }
  710. }
  711. if (m_SocketTarget == -1)
  712. {
  713. int err = errno;
  714. close(Socketfd);
  715. Thread_UnLock();
  716. //string errinfo = FormatLinkError(WSAGetLastError());
  717. FERROR("socket error: %s (%d)", strerror(err), err);
  718. return 0;//do over
  719. }
  720. Thread_UnLock();
  721. }
  722. }
  723. }
  724. //exit
  725. FINFO("Server:Stop Thread.exiting...\n");
  726. DWORD ret = Thread_Lock(1000);
  727. if (ret == WAIT_OBJECT_0)
  728. {
  729. //got lock
  730. if (m_SocketTarget)
  731. {
  732. close(m_SocketTarget);
  733. m_SocketTarget = -1;
  734. close(Socketfd);
  735. }
  736. Thread_UnLock();
  737. }
  738. else
  739. {
  740. //no lock ,no fxxx...
  741. FINFO("P2PServer:Failed CLose Socket...\n");
  742. }
  743. return -1;
  744. }
  745. bool P2P_Module_Server::Exec()
  746. {
  747. int ret = 0;
  748. FINFO("P2PServer:init Entry\n");
  749. m_ConnectionReady->ResetEvent();
  750. if (WaitTheThreadEndSign(100) == true)
  751. {
  752. return false;
  753. }
  754. //init socket
  755. ret = TryInit();
  756. if (ret < 0)
  757. {
  758. return false;
  759. }
  760. else if (ret == 0)
  761. {
  762. return true;
  763. }
  764. FINFO("P2PServer:try init succeed\n");
  765. //try accept
  766. ret = TryAccept();
  767. if (ret < 0)
  768. {
  769. return false;
  770. }
  771. else if (ret == 0)
  772. {
  773. return true;
  774. }
  775. FINFO("P2PServer:try accept succeed\n");
  776. //protocal one: read client's BusId
  777. //read targetbusId
  778. DWORD MsgSize = 256;
  779. DWORD BlockSize = 0;
  780. memset(m_pszRemoteBusId, 0, MsgSize);
  781. if (ReadBlob(m_SocketTarget,m_pszRemoteBusId, MsgSize, NULL, BlockSize) == false)
  782. {
  783. //handshake failed
  784. close(Socketfd);
  785. return true;
  786. }
  787. FINFO("P2PServer:try handshake succeed.{$}\n", m_pszRemoteBusId);
  788. int on = 1;
  789. //tcp no delay
  790. setsockopt(m_SocketTarget, IPPROTO_TCP, TCP_NODELAY, (const char *)&on, sizeof(on));
  791. int optVal = 1024 * 1280 * 6 + 6000;
  792. socklen_t optLen = sizeof(int);
  793. setsockopt(m_SocketTarget, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, optLen);
  794. FINFO("try set send buff size:{$}\n", optVal);
  795. getsockopt(m_SocketTarget, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, &optLen);
  796. FINFO("after try set send buff size:{$}\n", optVal);
  797. m_ConnectionReady->SetEvent();
  798. m_HeartBeatTimeout = GetTickCount();
  799. //do while and try check connection + exitflag
  800. ret = TryCheckConnection();
  801. if (ret < 0)
  802. {
  803. return false;
  804. }
  805. return true;
  806. }
  807. bool P2P_Module_Server::SendBlob(const char *pMsg, const char *pBlock, DWORD size)
  808. {
  809. bool ret = P2P_Module_Base::SendBlob(m_SocketTarget, pMsg, pBlock, size);
  810. if (ret)
  811. {
  812. m_HeartBeatTimeout = GetTickCount();
  813. }
  814. return ret;
  815. }
  816. //---------------------------------------------------------------------------------------------------------------
  817. P2P_Module_Client::P2P_Module_Client()
  818. {
  819. m_pszMsgBuff = new char[1024 * 1024];
  820. m_pszBlockBuff = new unsigned char[1024 * 1024 * 4];
  821. }
  822. P2P_Module_Client::~P2P_Module_Client()
  823. {
  824. delete []m_pszMsgBuff;
  825. m_pszMsgBuff = NULL;
  826. delete[]m_pszBlockBuff;
  827. m_pszBlockBuff = NULL;
  828. }
  829. //connect
  830. bool P2P_Module_Client::ConnectP2P()
  831. {
  832. return StartThread();
  833. }
  834. //disconnect
  835. void P2P_Module_Client::Disconnect()
  836. {
  837. StopThread(P2P_CHECK_PERIOD + 1000);
  838. }
  839. //send block,one way to client no send to anyone
  840. //virtual bool SendBlob(char *pMsg, char *pBlock, DWORD size);
  841. //callback
  842. //void P2P_Module_Client::OnBlob(char *pMsg, char *pData, DWORD size)
  843. //{
  844. //
  845. //}
  846. bool P2P_Module_Client::TryConnectServer()
  847. {
  848. while (WaitTheThreadEndSign(100) == false)
  849. {
  850. Socketfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  851. if (Socketfd == -1)
  852. {
  853. return false;
  854. }
  855. //u_long iMode = 1;
  856. //int ret = ioctlsocket(Socketfd, FIONBIO, &iMode);//non-blocking mode
  857. //if (ret == 0)
  858. {
  859. sockaddr_in addrSrv;
  860. memset(&addrSrv, 0, sizeof(addrSrv));
  861. addrSrv.sin_family = AF_INET;
  862. addrSrv.sin_port = htons(m_Port);
  863. inet_pton(AF_INET, m_pszIpAddress, &addrSrv.sin_addr);
  864. if (connect(Socketfd, (struct sockaddr*)&addrSrv, sizeof(addrSrv)) == 0) {
  865. socket_set_keepalive(Socketfd);
  866. return true;
  867. }
  868. //else
  869. //{
  870. // // 超时时间
  871. // struct timeval tm;
  872. // tm.tv_sec = P2P_CHECK_PERIOD / 1000;
  873. // tm.tv_usec = P2P_CHECK_PERIOD % 1000;
  874. // int selret = 0;
  875. // fd_set set;
  876. // FD_ZERO(&set);
  877. // FD_SET(Socketfd, &set);
  878. // Sleep(1000);
  879. // selret = select(-1, NULL, &set, NULL, &tm);
  880. // if (selret > 0)
  881. // {
  882. // int error = -1;
  883. // int optLen = sizeof(int);
  884. // getsockopt(Socketfd, SOL_SOCKET, SO_ERROR, (char*)&error, &optLen);
  885. // // 之所以下面的程序不写成三目运算符的形式, 是为了更直观, 便于注释
  886. // if (0 == error)
  887. // {
  888. // //just try it
  889. // socket_set_keepalive(Socketfd);
  890. // return StartThread();
  891. // }
  892. // }
  893. //}
  894. }
  895. close(Socketfd);
  896. Socketfd = -1;
  897. }
  898. return false;
  899. }
  900. bool P2P_Module_Client::Exec()
  901. {
  902. if (TryConnectServer())
  903. {
  904. int on = 1;
  905. //tcp no delay
  906. setsockopt(Socketfd, IPPROTO_TCP, TCP_NODELAY, (const char *)&on, sizeof(on));
  907. int optVal = 1024 * 1280 * 6 + 6000;
  908. socklen_t optLen = sizeof(int);
  909. setsockopt(Socketfd, SOL_SOCKET, SO_RCVBUF, (char*)&optVal, optLen);
  910. FINFO("try set read buff size:{$}\n", optVal);
  911. getsockopt(Socketfd, SOL_SOCKET, SO_RCVBUF, (char*)&optVal, &optLen);
  912. FINFO("after try set read buff size:{$}\n", optVal);
  913. //send local BusId
  914. bool ret = P2P_Module_Base::SendBlob(Socketfd, m_pszLocalBusId, 0, 0, true);
  915. if (ret == false)
  916. {
  917. close(Socketfd);
  918. Socketfd = -1;
  919. FERROR("Send Handshake failed");
  920. return false;
  921. }
  922. FINFO("Handshake succeed:\n");
  923. //u_long iMode = 0;
  924. //ioctlsocket(Socketfd, FIONBIO, &iMode);//blocking mode
  925. DWORD MsgSize = 1024 * 1024;
  926. DWORD BlockSize = 1024 * 1024 * 4;
  927. while (ReadBlobSync(Socketfd, m_pszMsgBuff, MsgSize, (char*)m_pszBlockBuff, BlockSize))
  928. {
  929. FINFO("Read Block Data succeed:\n");
  930. if (BlockSize > 0)
  931. {
  932. //FINFO("Notify Block Data begin\n");
  933. OnBlob(m_pszMsgBuff, m_pszBlockBuff, BlockSize);
  934. //FINFO("Notify Block Data end\n");
  935. }
  936. else
  937. {
  938. //ignore others
  939. //printf("got Heartbeat--------------------------\n\n\n");
  940. FINFO("got Heartbeat--------------------------");
  941. }
  942. //reinit the buff
  943. MsgSize = 1024 * 1024;
  944. BlockSize = 1024 * 1024 * 4;
  945. }
  946. FINFO("Exit Thread--------------------------");
  947. if (Thread_Lock(1000) == WAIT_OBJECT_0)
  948. {
  949. if (Socketfd)
  950. {
  951. close(Socketfd);
  952. Socketfd = -1;
  953. }
  954. Thread_UnLock();
  955. }
  956. else
  957. {
  958. //no can do...
  959. }
  960. }
  961. if (WaitTheThreadEndSign(0))
  962. {
  963. return false;
  964. }
  965. return true;
  966. }