#include "P2PModule.h" #include #include #include #include #include "P2PModule.h" #include "DeviceBus.h" #include "common_api.h" //using namespace std; #define P2P_CHECK_PERIOD (2000) string FormatLinkError(int ErrorCode) { switch (ErrorCode) { case 7: return "sckOutOfMemory";// 7 内存不足 case 380: return "sckInvalidPropertyValue";// 380 属性值无效 case 394: return "sckGetNotSupported";// 394 属性不可读 case 383: return "sckGetNotSupported";// 383 属性是只读的 case 40006: return "sckBadState";// 40006 所请求的事务或请求本身的错误协议或者错误连接状态 case 40014: return "sckInvalidArg";// 40014 传递给函数的参数格式不确定,或者不在指定范围内 case 40017: return "sckSuccess";// 40017 成功 case 40018: return "sckUnsupported";// 40018 不支持的变量类型 case 40020: return "sckInvalidOp";// 40020 在当前状态下的无效操作 case 40021: return "sckOutOfRange";// 40021 参数越界 case 40026: return "sckWrongProtocol";// 40026 所请求的事务或请求本身的错误协议 case 10004: return "sckOpCanceled";// 10004 取消操作 case 10014: return "sckInvalidArgument";// 10014 所请求的地址是广播地址,但未设置标记 case 10035: return "sckWouldBlock";// 10035 套接字不成块,而指定操作将使之成块 case 10036: return "sckInProgress";// 10036 制造块的Winsock操作在进行之中 case 10037: return "sckAlreadyComplete";// 10037 完成操作。未进行制作块的操作 case 10038: return "sckNotSocket";// 10038 描述符不是套接字 case 10040: return "sckMsgTooBig";// 10040 数据太大,不适于缓冲区的要求,因而被截断 case 10043: return "sckPortNotSupported";// 10043 不支持指定的端口 case 10048: return "sckAddressInUse";// 10048 地址在使用中 case 10049: return "sckAddressNotAvailable";// 10049 来自本地机器的不可用地址 case 10050: return "sckNetworkSubsystemFailed";// 10050 网络子系统失败 case 10051: return "sckNetworkUnreachable";// 10051 当前不能从主机到达网络 case 10052: return "sckNetReset 10052";// 在设置SO_KEEPALIVE时连接超时 case 10053: return "sckConnectAborted";// 10053 由于超时或者其它失败而中止接连 case 10054: return "sckConnectionReset";// 10054 通过远端重新设置连接 case 10055: return "sckNoBufferSpace";// 10055 没有可用的缓存空间 case 10056: return "sckAlreadyConnected";// 10056 已连接的套接字 case 10057: return "sckNotConnected";// 10057 未接连套接字 case 10058: return "sckSockedShutdown";// 10058 已关闭套接字 case 10060: return "sckTimedout";// 10060 套接字超时 case 10061: return "sckConnectionRefused";// 10061 强行拒绝连接 case 10093: return "sckNotInitialized";// 10093 套接字没有初始化 case 11001: return "sckHostNotFound";// 11001 授权应答:未找到主机 case 11002: return "sckHostNotFoundTryAgain";// 11002 非授权应答:未找到主机,重试 case 11003: return "sckNonRecoverableError";// 11003 不可恢复的错误 case 11004: return "sckNoData";// 11004 无效名,对所请求的类型无数据记录 default: return string("Socket error %d", ErrorCode); } } int socket_set_keepalive(int &fd) { int keepalive = 1; int keepidle = 10; // 10秒无活动开始探测 int keepintvl = 5; // 5秒探测间隔 int keepcnt = 3; // 3次探测失败断开 if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive))) { return -1; } if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &keepidle, sizeof(keepidle))) { return -2; } if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &keepintvl, sizeof(keepintvl))) { return -3; } if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &keepcnt, sizeof(keepcnt))) { return -4; } return 0; } P2P_Module_Base::P2P_Module_Base() { m_Port = 0; m_MessageBuffSize = 1024 * 1024;//set 1M limits m_pszMessageBuff = new char[m_MessageBuffSize]; m_pMainDevBus = NULL; Socketfd = -1; m_SocketTarget = -1; m_pszIpAddress = new char[256]; m_pszLocalBusId = new char[256]; m_pszRemoteBusId = new char[256]; memset(m_pszIpAddress, 0, 256); memset(m_pszLocalBusId, 0, 256); memset(m_pszRemoteBusId, 0, 256); m_ConnectionReady = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET,false); } P2P_Module_Base::~P2P_Module_Base() { Disconnect(); delete[]m_pszIpAddress; delete[]m_pszLocalBusId; delete[]m_pszRemoteBusId; m_pszIpAddress = NULL; m_pszLocalBusId = NULL; m_pszRemoteBusId = NULL; delete[]m_pszMessageBuff; m_pszMessageBuff = NULL; } //init void P2P_Module_Base::InitP2P(const char *pszIp, const char *pszLocalBusId, bool AsServer, PVOID pParent) { strncpy(m_pszIpAddress, pszIp, 255); strncpy(m_pszLocalBusId, pszLocalBusId, 255); m_Port = (unsigned short)(CCOS_HW_CHANNEL/10) + 3;//80000 is out of WORD limits m_AsServer = AsServer; m_pMainDevBus = pParent; /*string logfile = GetProcessDirectory() + "\\logs\\Local_p2pDispathlog.log"; Logger *pLog = CreateLogger(); pLog->SetLogFilepath(logfile.c_str()); SetLogger(pLog);*/ } //connect bool P2P_Module_Base::ConnectP2P() { return false; } //disconnect void P2P_Module_Base::Disconnect() { } bool P2P_Module_Base::IsConnected() { return false; } //Blob的定义 //4字节 大长度:4字节Msg的长度:Msg内容,4字节Block的长度:Block内容 //必须存在项为 大长度,Msg长度,Block长度,如果无此内容,其值为0 bool P2P_Module_Base::SendRaw(int &sck, const char *pData, DWORD size, bool Sync) { uint32_t Sented = 0; int Sendonce = 0; while (Sented < size) { Thread_Lock(); if (WaitTheThreadEndSign(0)) { Thread_UnLock(); return false; } if (sck != -1) { if (!Sync) { fd_set WriteSet; struct timeval tm; tm.tv_sec = P2P_CHECK_PERIOD / 1000; tm.tv_usec = (P2P_CHECK_PERIOD % 1000) * 1000; FD_ZERO(&WriteSet); FD_SET(sck, &WriteSet); int sel_ret = select(sck + 1, NULL, &WriteSet, NULL, &tm); if (sel_ret <= 0) { // 处理错误或超时 } } Sendonce = send(sck, pData + Sented, size - Sented, 0); if (Sendonce > 0) { Sented += Sendonce; } else { close(sck); sck = -1; Thread_UnLock(); return false; } } else { Thread_UnLock(); return false; } Thread_UnLock(); } return true; } bool P2P_Module_Base::ReadRawSync(int &sck, char *pszBuff, DWORD ExpectedSize) { DWORD Total = 0; int Received = 0; //////mLog::FINFO("Enter ReadRawSync"); while (Total < ExpectedSize) { Thread_Lock(); if (WaitTheThreadEndSign(0) == false) { if (sck != -1) { //ok //////mLog::FINFO("br--"); Received = recv(sck, (char*)&pszBuff[Total], ExpectedSize - Total, 0); //////mLog::FINFO("ar--:{$}", Received); if (Received > 0) { Total += Received; } else if (Received == 0) // 对端关闭连接 { close(sck); sck = -1; Thread_UnLock(); //////mLog::FERROR("socket closed by peer\n"); return false; } else { int err = errno; close(sck); sck = -1; Thread_UnLock(); // 处理常见的非阻塞错误 if (err == EAGAIN || err == EWOULDBLOCK) { // 对于同步读取,这通常表示超时,但在同步模式中不应发生 //////mLog::FERROR("socket timeout: %d\n", err); } else { //////mLog::FERROR("socket error: %d\n", err); } //socket error,or closed //string errinfo = FormatLinkError(WSAGetLastError()); //////mLog::FERROR("socket error:{$}\n", errinfo.c_str()); return false; } } } Thread_UnLock(); } //////mLog::FINFO("Leave ReadRawSync OK"); return true; } bool P2P_Module_Base::ReadRaw(int &sck, char *pszBuff, DWORD ExpectedSize) { DWORD Total = 0; int Received = 0; //////mLog::FINFO("Enter ReadRaw"); while (Total < ExpectedSize) { Thread_Lock(); if (WaitTheThreadEndSign(0) == false) { if (sck != -1) { //check readable // 超时时间 struct timeval tm; fd_set ReadSet; tm.tv_sec = P2P_CHECK_PERIOD / 1000; tm.tv_usec = P2P_CHECK_PERIOD % 1000 * 1000; FD_ZERO(&ReadSet); FD_SET(sck, &ReadSet); //////mLog::FINFO("bs--"); int sel_ret = select(sck + 1, &ReadSet, NULL, NULL, &tm); //////mLog::FINFO("as--"); if (sel_ret > 0) { //ok //////mLog::FINFO("br--"); Received = recv(sck, (char*)&pszBuff[Total], ExpectedSize - Total, 0); //////mLog::FINFO("ar--:{$}", Received); if (Received > 0) { Total += Received; } else if (Received == 0) // 对端关闭连接 { close(sck); sck = -1; Thread_UnLock(); //////mLog::FERROR("socket closed by peer\n"); return false; } else { int err = errno; close(sck); sck = -1; Thread_UnLock(); if (err == EAGAIN || err == EWOULDBLOCK) { //////mLog::FERROR("socket timeout: %d\n", err); } else { //////mLog::FERROR("socket recv error: %s (%d)\n", strerror(err), err); } return false; } } else if (sel_ret < 0) { int err = errno; close(sck); sck = -1; Thread_UnLock(); //////mLog::FERROR("select error: %s (%d)\n", strerror(err), err); return false; } //timeout } else { //socket erroe Thread_UnLock(); //////mLog::FERROR("socket error:using empty sck.\n"); return false; } } else { if (sck) { close(sck); sck = -1; } Thread_UnLock(); //////mLog::FINFO("exit thread.close socket .\n"); return false; } Thread_UnLock(); } //////mLog::FINFO("Leave ReadRaw OK"); return true; } //send block bool P2P_Module_Base::SendBlob(int &sck, const char *pMsg, const char *pBlock, DWORD size, bool Sync) { bool ret = false; DWORD MsgLen = 0; //Logger *pLog = (Logger*)GetLogger(); if (pMsg) { MsgLen = (DWORD)strlen(pMsg); } DWORD BlockLen = size; DWORD TotalLen = MsgLen + BlockLen + 4 + 4; if (MsgLen > m_MessageBuffSize - 12) { //////mLog::FERROR( "SendBlob error.Message size is too big\n"); return false; } int MsgPartSize = 0; //copy total Len memcpy(&m_pszMessageBuff[MsgPartSize], (const char*)&TotalLen, sizeof(TotalLen)); MsgPartSize += sizeof(TotalLen); //copy msg Len memcpy(&m_pszMessageBuff[MsgPartSize], (const char*)&MsgLen, sizeof(MsgLen)); MsgPartSize += sizeof(MsgLen); //copy msg memcpy(&m_pszMessageBuff[MsgPartSize], pMsg, MsgLen); MsgPartSize += MsgLen; //copy BlockLen memcpy(&m_pszMessageBuff[MsgPartSize], (const char*)&BlockLen, sizeof(BlockLen)); MsgPartSize += sizeof(BlockLen); if (MsgLen > 0) { //////mLog::FINFO( "TotalLen = {$},MsgLen = {$},BlobLen = {$}.MsgInfo:{\n{$}}", TotalLen, MsgLen, BlockLen, pMsg); } //connection is steady Thread_Lock(); if (sck) { bool Sent = SendRaw(sck, m_pszMessageBuff, MsgPartSize); if (Sent) { ////mLog::FINFO( "write head part succeed.size = {$}", MsgPartSize); if (BlockLen > 0) { Sent = SendRaw(sck, (const char*)pBlock, BlockLen); if (Sent) { ////mLog::FINFO( "write Block data: {$} succeed\n", BlockLen); ////mLog::FINFO( "write Blob: succeed----------------\n"); ret = true; } } else { ////mLog::FINFO( "write MessageOnly: succeed----------------\n"); ret = true; } } } if (ret == false) { ////mLog::FERROR( "SendBlob error.close p2p socket\n"); } Thread_UnLock(); return ret; } void P2P_Module_Base::Quit() { } bool P2P_Module_Base::SendBlob(const char *pMsg, const char *pBlock, DWORD size) { bool ret = false; return ret; } bool P2P_Module_Base::ReadBlobSync(int &sck, char *pMsg, DWORD &MsgSize, char *pBlock, DWORD &size) { bool ret = false; DWORD TotalLen = 0; DWORD MsgLen = 0; DWORD BlockLen = 0; do { //head ret = ReadRawSync(sck, (char*)&TotalLen, sizeof(TotalLen)); if (ret == false || TotalLen < 8) { break; } ////mLog::FDEBUG("read head:{$}\n", TotalLen); //msg len ret = ReadRawSync(sck, (char*)&MsgLen, sizeof(MsgLen)); if (ret == false || MsgLen + 8 > TotalLen || (MsgLen > MsgSize)) { break; } ////mLog::FDEBUG("read MsgLen:{$}\n", MsgLen); //msg info if (MsgLen > 0) { ret = ReadRawSync(sck, pMsg, MsgLen); if (ret == false) { break; } pMsg[MsgLen] = 0; ////mLog::FDEBUG("read Msg:{$}\n", pMsg); } //Block Len ret = ReadRawSync(sck, (char*)&BlockLen, sizeof(BlockLen)); if (ret == false || BlockLen + MsgLen + 8 > TotalLen || (BlockLen > size)) { break; } ////mLog::FDEBUG("read BlockLen:{$}\n", BlockLen); //block info if (BlockLen > 0) { if (pBlock == NULL) { break; } ret = ReadRawSync(sck, pBlock, BlockLen); if (ret == false) { break; } ////mLog::FDEBUG("read Block data Succeed\n"); } MsgSize = MsgLen; size = BlockLen; ////mLog::FDEBUG("read Block Succeed-\n"); return true; } while (0); return false; } //Read block bool P2P_Module_Base::ReadBlob(int &sck, char *pMsg, DWORD &MsgSize, char *pBlock, DWORD &size) { bool ret = false; DWORD TotalLen = 0; DWORD MsgLen = 0; DWORD BlockLen = 0; do { //head ret = ReadRaw(sck,(char*)&TotalLen, sizeof(TotalLen)); if (ret == false || TotalLen < 8) { break; } ////mLog::FINFO("read head:{$}\n", TotalLen); //msg len ret = ReadRaw(sck, (char*)&MsgLen, sizeof(MsgLen)); if (ret == false || MsgLen + 8 > TotalLen || (MsgLen > MsgSize)) { break; } ////mLog::FINFO("read MsgLen:{$}\n", MsgLen); //msg info if (MsgLen > 0) { ret = ReadRaw(sck, pMsg, MsgLen); if (ret == false) { break; } pMsg[MsgLen] = 0; ////mLog::FINFO("read Msg:{$}\n", pMsg); } //Block Len ret = ReadRaw(sck, (char*)&BlockLen, sizeof(BlockLen)); if (ret == false || BlockLen + MsgLen + 8 > TotalLen || (BlockLen > size)) { break; } ////mLog::FINFO("read BlockLen:{$}\n", BlockLen); //block info if (BlockLen > 0) { if (pBlock == NULL) { break; } ret = ReadRaw(sck, pBlock, BlockLen); if (ret == false) { break; } ////mLog::FINFO("read Block data Succeed\n"); } MsgSize = MsgLen; size = BlockLen; ////mLog::FINFO("read Block Succeed-\n"); return true; } while (0); return false; } //callback void P2P_Module_Base::OnBlob(char *pMsg,unsigned char *pData, DWORD size) { //if (m_pMainDevBus) //{ // ((DeviceBUS*)m_pMainDevBus)->BlobDataArrived(pMsg, pData, size); //} } bool P2P_Module_Base::Exec() { return false; } bool P2P_Module_Base::IsTargetBusId(const char *pszTargetBusId) { bool ret = false; //Thread_Lock(); //only server can do this check.... if ((m_pszRemoteBusId != NULL) && (pszTargetBusId != NULL) && (m_AsServer == true)) { ret = (strcmp(m_pszRemoteBusId, pszTargetBusId) == 0); } //Thread_UnLock(); return ret; } //--------------------------------------------------------------------------------------------------------------- P2P_Module_Server::P2P_Module_Server() { m_HeartBeatTimeout = 0; } P2P_Module_Server::~P2P_Module_Server() { } //connect bool P2P_Module_Server::ConnectP2P() { //start thread return StartThread(); } void P2P_Module_Server::Quit() { StopThread(1000); } //disconnect void P2P_Module_Server::Disconnect() { //stop thread //不能给太长时间,因为内部用的Socket是阻塞模式 //StopThread(P2P_CHECK_PERIOD + 1000); // close socket in the internal thread } bool P2P_Module_Server::IsConnected() { if (m_SocketTarget != -1) { return m_ConnectionReady->Wait(0); } return false; } //-1:exit,0:socket error,1:succeed int P2P_Module_Server::TryInit() { while (WaitTheThreadEndSign(0) == false) { //init socket Socketfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (Socketfd == -1) { return 0; } sockaddr_in service; memset(&service, 0, sizeof(service)); service.sin_family = AF_INET; service.sin_addr.s_addr = inet_addr(m_pszIpAddress); service.sin_port = htons(m_Port); //bind int reuse = 1; setsockopt(Socketfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); if (bind(Socketfd, (struct sockaddr*)&service, sizeof(service)) < 0) { close(Socketfd); return 0; } //buff size //listen if (listen(Socketfd, SOMAXCONN) < 0) { close(Socketfd); return 0; } // 设置非阻塞 int flags = fcntl(Socketfd, F_GETFL, 0); fcntl(Socketfd, F_SETFL, flags | O_NONBLOCK); return 1; } return -1; } //-1:exit,0:socket error,1:succeed int P2P_Module_Server::TryAccept() { while (WaitTheThreadEndSign(0) == false) { int sClient; fd_set ReadSet; struct sockaddr_in remoteAddr; socklen_t nAddrlen = sizeof(remoteAddr); // 超时时间 struct timeval tm; tm.tv_sec = P2P_CHECK_PERIOD / 1000; tm.tv_usec = P2P_CHECK_PERIOD % 1000 * 1000; FD_ZERO(&ReadSet); FD_SET(Socketfd, &ReadSet); int max_fd = Socketfd + 1; int sel_ret = select(max_fd, &ReadSet, NULL, NULL, &tm); if (sel_ret > 0) { // 检查是否有连接请求 if (FD_ISSET(Socketfd, &ReadSet)) { //ok sClient = accept(Socketfd, (sockaddr*)&remoteAddr, &nAddrlen); if (sClient == -1) { int err = errno; if (err == EAGAIN || err == EWOULDBLOCK) { // 非阻塞模式下可能发生,继续尝试 continue; } // 其他错误,记录并返回 ////mLog::FERROR("accept failed: %s (%d)", strerror(err), err); return 0; } //accept succeed Thread_Lock(); m_SocketTarget = sClient; Thread_UnLock(); return 1; } } else if (sel_ret < 0) { int err = errno; if (err == EINTR) { // 被信号中断,继续尝试 continue; } ////mLog::FERROR("select failed: %s (%d)", strerror(err), err); close(Socketfd); return 0; } //timeout } return -1; } //-1:exit,0:socket error int P2P_Module_Server::TryCheckConnection() { while (WaitTheThreadEndSign(100) == false) { { DWORD ret = Thread_Lock(1000); if (ret == WAIT_OBJECT_0) { //got lock if (m_SocketTarget) { //check connection // 超时时间 fd_set ErrSet; struct timeval tm; tm.tv_sec = 0 / 1000; tm.tv_usec = 0 % 1000*1000; FD_ZERO(&ErrSet); FD_SET(Socketfd, &ErrSet); int sel_ret = select(-1, NULL, NULL, &ErrSet, &tm); if (sel_ret < 0) { int err = errno; close(m_SocketTarget); m_SocketTarget = -1; close(Socketfd); Thread_UnLock(); //string errinfo = FormatLinkError(WSAGetLastError()); ////mLog::FERROR("socket error: %s (%d)", strerror(err), err); return 0;//do over } //timeout or ok if ((GetTickCount() - m_HeartBeatTimeout) > 20000) { //heartbeat //server -> client //it might cause the m_SocketTarget gets empty if (SendBlob(m_pszLocalBusId, 0, 0)) { ////mLog::FINFO("Server:Send heart beat\n"); } } } if (m_SocketTarget == -1) { int err = errno; close(Socketfd); Thread_UnLock(); //string errinfo = FormatLinkError(WSAGetLastError()); ////mLog::FERROR("socket error: %s (%d)", strerror(err), err); return 0;//do over } Thread_UnLock(); } } } //exit ////mLog::FINFO("Server:Stop Thread.exiting...\n"); DWORD ret = Thread_Lock(1000); if (ret == WAIT_OBJECT_0) { //got lock if (m_SocketTarget) { close(m_SocketTarget); m_SocketTarget = -1; close(Socketfd); } Thread_UnLock(); } else { //no lock ,no fxxx... ////mLog::FINFO("P2PServer:Failed CLose Socket...\n"); } return -1; } bool P2P_Module_Server::Exec() { int ret = 0; ////mLog::FINFO("P2PServer:init Entry\n"); m_ConnectionReady->ResetEvent(); if (WaitTheThreadEndSign(100) == true) { return false; } //init socket ret = TryInit(); if (ret < 0) { return false; } else if (ret == 0) { return true; } ////mLog::FINFO("P2PServer:try init succeed\n"); //try accept ret = TryAccept(); if (ret < 0) { return false; } else if (ret == 0) { return true; } ////mLog::FINFO("P2PServer:try accept succeed\n"); //protocal one: read client's BusId //read targetbusId DWORD MsgSize = 256; DWORD BlockSize = 0; memset(m_pszRemoteBusId, 0, MsgSize); if (ReadBlob(m_SocketTarget,m_pszRemoteBusId, MsgSize, NULL, BlockSize) == false) { //handshake failed close(Socketfd); return true; } ////mLog::FINFO("P2PServer:try handshake succeed.{$}\n", m_pszRemoteBusId); int on = 1; //tcp no delay setsockopt(m_SocketTarget, IPPROTO_TCP, TCP_NODELAY, (const char *)&on, sizeof(on)); int optVal = 1024 * 1280 * 6 + 6000; socklen_t optLen = sizeof(int); setsockopt(m_SocketTarget, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, optLen); ////mLog::FINFO("try set send buff size:{$}\n", optVal); getsockopt(m_SocketTarget, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, &optLen); ////mLog::FINFO("after try set send buff size:{$}\n", optVal); m_ConnectionReady->SetEvent(); m_HeartBeatTimeout = GetTickCount(); //do while and try check connection + exitflag ret = TryCheckConnection(); if (ret < 0) { return false; } return true; } bool P2P_Module_Server::SendBlob(const char *pMsg, const char *pBlock, DWORD size) { bool ret = P2P_Module_Base::SendBlob(m_SocketTarget, pMsg, pBlock, size); if (ret) { m_HeartBeatTimeout = GetTickCount(); } return ret; } //--------------------------------------------------------------------------------------------------------------- P2P_Module_Client::P2P_Module_Client() { m_pszMsgBuff = new char[1024 * 1024]; m_pszBlockBuff = new unsigned char[1024 * 1024 * 4]; } P2P_Module_Client::~P2P_Module_Client() { delete []m_pszMsgBuff; m_pszMsgBuff = NULL; delete[]m_pszBlockBuff; m_pszBlockBuff = NULL; } //connect bool P2P_Module_Client::ConnectP2P() { return StartThread(); } //disconnect void P2P_Module_Client::Disconnect() { StopThread(P2P_CHECK_PERIOD + 1000); } //send block,one way to client no send to anyone //virtual bool SendBlob(char *pMsg, char *pBlock, DWORD size); //callback //void P2P_Module_Client::OnBlob(char *pMsg, char *pData, DWORD size) //{ // //} bool P2P_Module_Client::TryConnectServer() { while (WaitTheThreadEndSign(100) == false) { Socketfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (Socketfd == -1) { return false; } //u_long iMode = 1; //int ret = ioctlsocket(Socketfd, FIONBIO, &iMode);//non-blocking mode //if (ret == 0) { sockaddr_in addrSrv; memset(&addrSrv, 0, sizeof(addrSrv)); addrSrv.sin_family = AF_INET; addrSrv.sin_port = htons(m_Port); inet_pton(AF_INET, m_pszIpAddress, &addrSrv.sin_addr); if (connect(Socketfd, (struct sockaddr*)&addrSrv, sizeof(addrSrv)) == 0) { socket_set_keepalive(Socketfd); return true; } //else //{ // // 超时时间 // struct timeval tm; // tm.tv_sec = P2P_CHECK_PERIOD / 1000; // tm.tv_usec = P2P_CHECK_PERIOD % 1000; // int selret = 0; // fd_set set; // FD_ZERO(&set); // FD_SET(Socketfd, &set); // Sleep(1000); // selret = select(-1, NULL, &set, NULL, &tm); // if (selret > 0) // { // int error = -1; // int optLen = sizeof(int); // getsockopt(Socketfd, SOL_SOCKET, SO_ERROR, (char*)&error, &optLen); // // 之所以下面的程序不写成三目运算符的形式, 是为了更直观, 便于注释 // if (0 == error) // { // //just try it // socket_set_keepalive(Socketfd); // return StartThread(); // } // } //} } close(Socketfd); Socketfd = -1; } return false; } bool P2P_Module_Client::Exec() { if (TryConnectServer()) { int on = 1; //tcp no delay setsockopt(Socketfd, IPPROTO_TCP, TCP_NODELAY, (const char *)&on, sizeof(on)); int optVal = 1024 * 1280 * 6 + 6000; socklen_t optLen = sizeof(int); setsockopt(Socketfd, SOL_SOCKET, SO_RCVBUF, (char*)&optVal, optLen); ////mLog::FINFO("try set read buff size:{$}\n", optVal); getsockopt(Socketfd, SOL_SOCKET, SO_RCVBUF, (char*)&optVal, &optLen); ////mLog::FINFO("after try set read buff size:{$}\n", optVal); //send local BusId bool ret = P2P_Module_Base::SendBlob(Socketfd, m_pszLocalBusId, 0, 0, true); if (ret == false) { close(Socketfd); Socketfd = -1; ////mLog::FERROR("Send Handshake failed"); return false; } ////mLog::FINFO("Handshake succeed:\n"); //u_long iMode = 0; //ioctlsocket(Socketfd, FIONBIO, &iMode);//blocking mode DWORD MsgSize = 1024 * 1024; DWORD BlockSize = 1024 * 1024 * 4; while (ReadBlobSync(Socketfd, m_pszMsgBuff, MsgSize, (char*)m_pszBlockBuff, BlockSize)) { ////mLog::FINFO("Read Block Data succeed:\n"); if (BlockSize > 0) { //////mLog::FINFO("Notify Block Data begin\n"); OnBlob(m_pszMsgBuff, m_pszBlockBuff, BlockSize); //////mLog::FINFO("Notify Block Data end\n"); } else { //ignore others //printf("got Heartbeat--------------------------\n\n\n"); ////mLog::FINFO("got Heartbeat--------------------------"); } //reinit the buff MsgSize = 1024 * 1024; BlockSize = 1024 * 1024 * 4; } ////mLog::FINFO("Exit Thread--------------------------"); if (Thread_Lock(1000) == WAIT_OBJECT_0) { if (Socketfd) { close(Socketfd); Socketfd = -1; } Thread_UnLock(); } else { //no can do... } } if (WaitTheThreadEndSign(0)) { return false; } return true; }