// P2PModule.cpp : 定义 DLL 应用程序的导出函数。 // #include "stdafx.h" #include #include #include "P2PModule.h" #include "DeviceBus.h" #include "Logger.h" #include "common_api.h" using namespace std; #define P2P_CHECK_PERIOD (2000) string FormatLinkError(int ErrorCode) { switch (ErrorCode) { case 7: return "sckOutOfMemory";// 7 内存不足 case 380: return "sckInvalidPropertyValue";// 380 属性值无效 case 394: return "sckGetNotSupported";// 394 属性不可读 case 383: return "sckGetNotSupported";// 383 属性是只读的 case 40006: return "sckBadState";// 40006 所请求的事务或请求本身的错误协议或者错误连接状态 case 40014: return "sckInvalidArg";// 40014 传递给函数的参数格式不确定,或者不在指定范围内 case 40017: return "sckSuccess";// 40017 成功 case 40018: return "sckUnsupported";// 40018 不支持的变量类型 case 40020: return "sckInvalidOp";// 40020 在当前状态下的无效操作 case 40021: return "sckOutOfRange";// 40021 参数越界 case 40026: return "sckWrongProtocol";// 40026 所请求的事务或请求本身的错误协议 case 10004: return "sckOpCanceled";// 10004 取消操作 case 10014: return "sckInvalidArgument";// 10014 所请求的地址是广播地址,但未设置标记 case 10035: return "sckWouldBlock";// 10035 套接字不成块,而指定操作将使之成块 case 10036: return "sckInProgress";// 10036 制造块的Winsock操作在进行之中 case 10037: return "sckAlreadyComplete";// 10037 完成操作。未进行制作块的操作 case 10038: return "sckNotSocket";// 10038 描述符不是套接字 case 10040: return "sckMsgTooBig";// 10040 数据太大,不适于缓冲区的要求,因而被截断 case 10043: return "sckPortNotSupported";// 10043 不支持指定的端口 case 10048: return "sckAddressInUse";// 10048 地址在使用中 case 10049: return "sckAddressNotAvailable";// 10049 来自本地机器的不可用地址 case 10050: return "sckNetworkSubsystemFailed";// 10050 网络子系统失败 case 10051: return "sckNetworkUnreachable";// 10051 当前不能从主机到达网络 case 10052: return "sckNetReset 10052";// 在设置SO_KEEPALIVE时连接超时 case 10053: return "sckConnectAborted";// 10053 由于超时或者其它失败而中止接连 case 10054: return "sckConnectionReset";// 10054 通过远端重新设置连接 case 10055: return "sckNoBufferSpace";// 10055 没有可用的缓存空间 case 10056: return "sckAlreadyConnected";// 10056 已连接的套接字 case 10057: return "sckNotConnected";// 10057 未接连套接字 case 10058: return "sckSockedShutdown";// 10058 已关闭套接字 case 10060: return "sckTimedout";// 10060 套接字超时 case 10061: return "sckConnectionRefused";// 10061 强行拒绝连接 case 10093: return "sckNotInitialized";// 10093 套接字没有初始化 case 11001: return "sckHostNotFound";// 11001 授权应答:未找到主机 case 11002: return "sckHostNotFoundTryAgain";// 11002 非授权应答:未找到主机,重试 case 11003: return "sckNonRecoverableError";// 11003 不可恢复的错误 case 11004: return "sckNoData";// 11004 无效名,对所请求的类型无数据记录 default: return string("Socket error %d", ErrorCode); } } int socket_set_keepalive(SOCKET &fd) { //struct tcp_keepalive kavars[1] = { // 1, // 10 * 1000, /* 10 seconds */ // 5 * 1000 /* 5 seconds */ //}; ///* Set: use keepalive on fd */ //int alive = 1; //int ret; //if (setsockopt // (fd, SOL_SOCKET, SO_KEEPALIVE, (const char *)&alive, // sizeof alive) != 0) //{ // //printf("Set keep alive error1: %s.\n", strerror (errno)); // return -1; //} //if (WSAIoctl(fd, SIO_KEEPALIVE_VALS, kavars, sizeof(kavars), NULL, sizeof (int), (LPDWORD)&ret, NULL, // NULL) != 0) //{ // //printf("Set keep alive error2: %s.\n", strerror (WSAGetLastError ())); // return -2; //} return 0; } P2P_Module_Base::P2P_Module_Base() { m_Port = 0; m_MessageBuffSize = 1024 * 1024;//set 1M limits m_pszMessageBuff = new char[m_MessageBuffSize]; m_pMainDevBus = NULL; Socketfd = NULL; m_SocketTarget = NULL; m_pszIpAddress = new char[256]; m_pszLocalBusId = new char[256]; m_pszRemoteBusId = new char[256]; memset(m_pszIpAddress, 0, 256); memset(m_pszLocalBusId, 0, 256); memset(m_pszRemoteBusId, 0, 256); m_ConnectionReady = CreateEvent(0, 1, 0, 0); } P2P_Module_Base::~P2P_Module_Base() { Disconnect(); delete[]m_pszIpAddress; delete[]m_pszLocalBusId; delete[]m_pszRemoteBusId; m_pszIpAddress = NULL; m_pszLocalBusId = NULL; m_pszRemoteBusId = NULL; CloseHandle(m_ConnectionReady); delete[]m_pszMessageBuff; m_pszMessageBuff = NULL; } //init void P2P_Module_Base::InitP2P(const char *pszIp, const char *pszLocalBusId, bool AsServer, PVOID pParent) { strcpy_s(m_pszIpAddress,256, pszIp); strcpy_s(m_pszLocalBusId,256, pszLocalBusId); m_Port = (WORD)(CCOS_HW_CHANNEL/10) + 3;//80000 is out of WORD limits m_AsServer = AsServer; m_pMainDevBus = pParent; string logfile = GetProcessDirectory() + "\\logs\\Local_p2pDispathlog.log"; Logger *pLog = CreateLogger(); pLog->SetLogFilepath(logfile.c_str()); SetLogger(pLog); } //connect bool P2P_Module_Base::ConnectP2P() { return false; } //disconnect void P2P_Module_Base::Disconnect() { } bool P2P_Module_Base::IsConnected() { return false; } //Blob的定义 //4字节 大长度:4字节Msg的长度:Msg内容,4字节Block的长度:Block内容 //必须存在项为 大长度,Msg长度,Block长度,如果无此内容,其值为0 bool P2P_Module_Base::SendRaw(SOCKET &sck, const char *pData, DWORD size, bool Sync) { DWORD Sented = 0; int Sendonce = 0; Logger *pLog = (Logger*)GetLogger(); PRINTA_TRACE(pLog,"Enter SendRaw"); while (Sented < size) { Thread_Lock(); if (WaitTheThreadEndSign(0) == false) { if (sck) { int sel_ret = 1; if (Sync == false) { //check readable // 超时时间 struct timeval tm; FD_SET WriteSet; tm.tv_sec = P2P_CHECK_PERIOD / 1000; tm.tv_usec = P2P_CHECK_PERIOD % 1000; FD_ZERO(&WriteSet); FD_SET(sck, &WriteSet); //PRINTA_TRACE(pLog,"bs--"); sel_ret = select(-1, NULL, &WriteSet, NULL, &tm); } //PRINTA_TRACE(pLog,"as--"); if (sel_ret > 0) { //ok //PRINTA_TRACE(pLog,"bsend--"); Sendonce = send(sck, (char*)&pData[Sented], size - Sented, 0); //PRINTA_TRACE(pLog,"asend--%d", Sendonce); if (Sendonce > 0) { Sented += Sendonce; } else { //socket error,or closed string errinfo = FormatLinkError(WSAGetLastError()); PRINTA_ERROR(pLog, "socket error:%s\n", errinfo.c_str()); closesocket(sck); sck = NULL; Thread_UnLock(); return false; } } else if (sel_ret < 0) { //socket error,or closed string errinfo = FormatLinkError(WSAGetLastError()); PRINTA_ERROR(pLog, "socket error:%s\n", errinfo.c_str()); closesocket(sck); sck = NULL; Thread_UnLock(); return false; } //timeout } else { //socket erroe Thread_UnLock(); PRINTA_ERROR(pLog, "socket error:using empty socket handle\n"); return false; } } else { if (sck) { closesocket(sck); sck = NULL; } Thread_UnLock(); PRINTA_INFO(pLog, "exit thread:close socket\n"); return false; } Thread_UnLock(); } TPRINTA_TRACE("Leave SendRaw OK"); return true; } bool P2P_Module_Base::ReadRawSync(SOCKET &sck, char *pszBuff, DWORD ExpectedSize) { DWORD Total = 0; int Received = 0; TPRINTA_TRACE("Enter ReadRawSync"); while (Total < ExpectedSize) { Thread_Lock(); if (WaitTheThreadEndSign(0) == false) { if (sck) { //ok //TPRINTA_TRACE("br--"); Received = recv(sck, (char*)&pszBuff[Total], ExpectedSize - Total, 0); //TPRINTA_TRACE("ar--:%d", Received); if (Received > 0) { Total += Received; } else { closesocket(sck); sck = NULL; Thread_UnLock(); //socket error,or closed string errinfo = FormatLinkError(WSAGetLastError()); TPRINTA_ERROR("socket error:%s\n", errinfo.c_str()); return false; } } } Thread_UnLock(); } TPRINTA_TRACE("Leave ReadRawSync OK"); return true; } bool P2P_Module_Base::ReadRaw(SOCKET &sck, char *pszBuff, DWORD ExpectedSize) { DWORD Total = 0; int Received = 0; TPRINTA_TRACE("Enter ReadRaw"); while (Total < ExpectedSize) { Thread_Lock(); if (WaitTheThreadEndSign(0) == false) { if (sck) { //check readable // 超时时间 struct timeval tm; FD_SET ReadSet; tm.tv_sec = P2P_CHECK_PERIOD / 1000; tm.tv_usec = P2P_CHECK_PERIOD % 1000; FD_ZERO(&ReadSet); FD_SET(sck, &ReadSet); //TPRINTA_TRACE("bs--"); int sel_ret = select(-1, &ReadSet, NULL, NULL, &tm); //TPRINTA_TRACE("as--"); if (sel_ret > 0) { //ok //TPRINTA_TRACE("br--"); Received = recv(sck, (char*)&pszBuff[Total], ExpectedSize - Total, 0); //TPRINTA_TRACE("ar--:%d", Received); if (Received > 0) { Total += Received; } else { closesocket(sck); sck = NULL; Thread_UnLock(); //socket error,or closed string errinfo = FormatLinkError(WSAGetLastError()); TPRINTA_ERROR("socket error:%s\n", errinfo.c_str()); return false; } } else if (sel_ret < 0) { closesocket(sck); sck = NULL; Thread_UnLock(); string errinfo = FormatLinkError(WSAGetLastError()); TPRINTA_ERROR("socket error:%s\n", errinfo.c_str()); //socket error,or closed return false; } //timeout } else { //socket erroe Thread_UnLock(); TPRINTA_ERROR("socket error:using empty sck.\n"); return false; } } else { if (sck) { closesocket(sck); sck = NULL; } Thread_UnLock(); TPRINTA_INFO("exit thread.close socket .\n"); return false; } Thread_UnLock(); } TPRINTA_TRACE("Leave ReadRaw OK"); return true; } //send block bool P2P_Module_Base::SendBlob(SOCKET &sck, const char *pMsg, const char *pBlock, DWORD size, bool Sync) { bool ret = false; DWORD MsgLen = 0; Logger *pLog = (Logger*)GetLogger(); if (pMsg) { MsgLen = (DWORD)strlen(pMsg); } DWORD BlockLen = size; DWORD TotalLen = MsgLen + BlockLen + 4 + 4; if (MsgLen > m_MessageBuffSize - 12) { PRINTA_ERROR(pLog, "SendBlob error.Message size is too big\n"); return false; } int MsgPartSize = 0; //copy total Len memcpy(&m_pszMessageBuff[MsgPartSize], (const char*)&TotalLen, sizeof(TotalLen)); MsgPartSize += sizeof(TotalLen); //copy msg Len memcpy(&m_pszMessageBuff[MsgPartSize], (const char*)&MsgLen, sizeof(MsgLen)); MsgPartSize += sizeof(MsgLen); //copy msg memcpy(&m_pszMessageBuff[MsgPartSize], pMsg, MsgLen); MsgPartSize += MsgLen; //copy BlockLen memcpy(&m_pszMessageBuff[MsgPartSize], (const char*)&BlockLen, sizeof(BlockLen)); MsgPartSize += sizeof(BlockLen); if (MsgLen > 0) { PRINTA_TRACE(pLog, "TotalLen = %d,MsgLen = %d,BlobLen = %d.MsgInfo:{\n%s}", TotalLen, MsgLen, BlockLen, pMsg); } //connection is steady Thread_Lock(); if (sck) { bool Sent = SendRaw(sck, m_pszMessageBuff, MsgPartSize); if (Sent) { PRINTA_TRACE(pLog, "write head part succeed.size = %d", MsgPartSize); if (BlockLen > 0) { Sent = SendRaw(sck, (const char*)pBlock, BlockLen); if (Sent) { PRINTA_TRACE(pLog, "write Block data: %d succeed\n", BlockLen); PRINTA_TRACE(pLog, "write Blob: succeed----------------\n"); ret = true; } } else { PRINTA_TRACE(pLog, "write MessageOnly: succeed----------------\n"); ret = true; } } } if (ret == false) { PRINTA_ERROR(pLog, "SendBlob error.close p2p socket\n"); } Thread_UnLock(); return ret; } void P2P_Module_Base::Quit() { } bool P2P_Module_Base::SendBlob(const char *pMsg, const char *pBlock, DWORD size) { bool ret = false; return ret; } bool P2P_Module_Base::ReadBlobSync(SOCKET &sck, char *pMsg, DWORD &MsgSize, char *pBlock, DWORD &size) { bool ret = false; DWORD TotalLen = 0; DWORD MsgLen = 0; DWORD BlockLen = 0; do { //head ret = ReadRawSync(sck, (char*)&TotalLen, sizeof(TotalLen)); if (ret == false || TotalLen < 8) { break; } TPRINTA_DEBUG("read head:%d\n", TotalLen); //msg len ret = ReadRawSync(sck, (char*)&MsgLen, sizeof(MsgLen)); if (ret == false || MsgLen + 8 > TotalLen || (MsgLen > MsgSize)) { break; } TPRINTA_DEBUG("read MsgLen:%d\n", MsgLen); //msg info if (MsgLen > 0) { ret = ReadRawSync(sck, pMsg, MsgLen); if (ret == false) { break; } pMsg[MsgLen] = 0; TPRINTA_DEBUG("read Msg:%s\n", pMsg); } //Block Len ret = ReadRawSync(sck, (char*)&BlockLen, sizeof(BlockLen)); if (ret == false || BlockLen + MsgLen + 8 > TotalLen || (BlockLen > size)) { break; } TPRINTA_DEBUG("read BlockLen:%d\n", BlockLen); //block info if (BlockLen > 0) { if (pBlock == NULL) { break; } ret = ReadRawSync(sck, pBlock, BlockLen); if (ret == false) { break; } TPRINTA_DEBUG("read Block data Succeed\n"); } MsgSize = MsgLen; size = BlockLen; TPRINTA_DEBUG("read Block Succeed-\n"); return true; } while (0); return false; } //Read block bool P2P_Module_Base::ReadBlob(SOCKET &sck, char *pMsg, DWORD &MsgSize, char *pBlock, DWORD &size) { bool ret = false; DWORD TotalLen = 0; DWORD MsgLen = 0; DWORD BlockLen = 0; do { //head ret = ReadRaw(sck,(char*)&TotalLen, sizeof(TotalLen)); if (ret == false || TotalLen < 8) { break; } TPRINTA_TRACE("read head:%d\n", TotalLen); //msg len ret = ReadRaw(sck, (char*)&MsgLen, sizeof(MsgLen)); if (ret == false || MsgLen + 8 > TotalLen || (MsgLen > MsgSize)) { break; } TPRINTA_TRACE("read MsgLen:%d\n", MsgLen); //msg info if (MsgLen > 0) { ret = ReadRaw(sck, pMsg, MsgLen); if (ret == false) { break; } pMsg[MsgLen] = 0; TPRINTA_TRACE("read Msg:%s\n", pMsg); } //Block Len ret = ReadRaw(sck, (char*)&BlockLen, sizeof(BlockLen)); if (ret == false || BlockLen + MsgLen + 8 > TotalLen || (BlockLen > size)) { break; } TPRINTA_TRACE("read BlockLen:%d\n", BlockLen); //block info if (BlockLen > 0) { if (pBlock == NULL) { break; } ret = ReadRaw(sck, pBlock, BlockLen); if (ret == false) { break; } TPRINTA_TRACE("read Block data Succeed\n"); } MsgSize = MsgLen; size = BlockLen; TPRINTA_TRACE("read Block Succeed-\n"); return true; } while (0); return false; } //callback void P2P_Module_Base::OnBlob(char *pMsg,unsigned char *pData, DWORD size) { if (m_pMainDevBus) { ((DeviceBUS*)m_pMainDevBus)->BlobDataArrived(pMsg, pData, size); } } bool P2P_Module_Base::Exec() { return false; } bool P2P_Module_Base::IsTargetBusId(const char *pszTargetBusId) { bool ret = false; //Thread_Lock(); //only server can do this check.... if ((m_pszRemoteBusId != NULL) && (pszTargetBusId != NULL) && (m_AsServer == true)) { ret = (strcmp(m_pszRemoteBusId, pszTargetBusId) == 0); } //Thread_UnLock(); return ret; } //--------------------------------------------------------------------------------------------------------------- P2P_Module_Server::P2P_Module_Server() { m_HeartBeatTimeout = 0; } P2P_Module_Server::~P2P_Module_Server() { } //connect bool P2P_Module_Server::ConnectP2P() { //start thread return StartThread(); } void P2P_Module_Server::Quit() { StopThread(1000); } //disconnect void P2P_Module_Server::Disconnect() { //stop thread //不能给太长时间,因为内部用的Socket是阻塞模式 //StopThread(P2P_CHECK_PERIOD + 1000); // close socket in the internal thread } bool P2P_Module_Server::IsConnected() { if (m_SocketTarget != NULL) { return (WaitForSingleObject(m_ConnectionReady, 0) == WAIT_OBJECT_0); } return false; } //-1:exit,0:socket error,1:succeed int P2P_Module_Server::TryInit() { while (WaitTheThreadEndSign(0) == false) { //init socket Socketfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (Socketfd == INVALID_SOCKET) { return 0; } sockaddr_in service; service.sin_family = AF_INET; service.sin_addr.s_addr = inet_addr(m_pszIpAddress); service.sin_port = htons(m_Port); //bind if (bind(Socketfd, (LPSOCKADDR)&service, sizeof(service)) == SOCKET_ERROR) { closesocket(Socketfd); return 0; } //buff size //listen if (listen(Socketfd, SOMAXCONN) == SOCKET_ERROR) { closesocket(Socketfd); return 0; } // 设置为非阻塞模式 ULONG NonBlock = 1; if (ioctlsocket(Socketfd, FIONBIO, &NonBlock) == SOCKET_ERROR) { //printf("ioctlsocket() failed with error %d\n", WSAGetLastError()); closesocket(Socketfd); return 0; } return 1; } return -1; } //-1:exit,0:socket error,1:succeed int P2P_Module_Server::TryAccept() { while (WaitTheThreadEndSign(0) == false) { SOCKET sClient; FD_SET ReadSet; sockaddr_in remoteAddr; int nAddrlen = sizeof(remoteAddr); // 超时时间 struct timeval tm; tm.tv_sec = P2P_CHECK_PERIOD / 1000; tm.tv_usec = P2P_CHECK_PERIOD % 1000; FD_ZERO(&ReadSet); FD_SET(Socketfd, &ReadSet); int sel_ret = select(-1, &ReadSet,NULL, NULL, &tm); if (sel_ret > 0) { //ok sClient = accept(Socketfd, (SOCKADDR *)&remoteAddr, &nAddrlen); if (sClient == INVALID_SOCKET) { continue; } //accept succeed Thread_Lock(); m_SocketTarget = sClient; Thread_UnLock(); return 1; } else if (sel_ret < 0) { closesocket(Socketfd); return 0; } //timeout } return -1; } //-1:exit,0:socket error int P2P_Module_Server::TryCheckConnection() { while (WaitTheThreadEndSign(100) == false) { { DWORD ret = Thread_Lock(1000); if (ret == WAIT_OBJECT_0) { //got lock if (m_SocketTarget) { //check connection // 超时时间 FD_SET ErrSet; struct timeval tm; tm.tv_sec = 0 / 1000; tm.tv_usec = 0 % 1000; FD_ZERO(&ErrSet); FD_SET(Socketfd, &ErrSet); int sel_ret = select(-1, NULL, NULL, &ErrSet, &tm); if (sel_ret < 0) { closesocket(m_SocketTarget); m_SocketTarget = NULL; closesocket(Socketfd); Thread_UnLock(); string errinfo = FormatLinkError(WSAGetLastError()); TPRINTA_ERROR("socket error:%s\n", errinfo.c_str()); return 0;//do over } //timeout or ok if ((GetTickCount() - m_HeartBeatTimeout) > 20000) { //heartbeat //server -> client //it might cause the m_SocketTarget gets empty if (SendBlob(m_pszLocalBusId, 0, 0)) { TPRINTA_INFO("Server:Send heart beat\n"); } } } if (m_SocketTarget == NULL) { closesocket(Socketfd); Thread_UnLock(); string errinfo = FormatLinkError(WSAGetLastError()); TPRINTA_ERROR("socket error:%s\n", errinfo.c_str()); return 0;//do over } Thread_UnLock(); } } } //exit TPRINTA_INFO("Server:Stop Thread.exiting...\n"); DWORD ret = Thread_Lock(1000); if (ret == WAIT_OBJECT_0) { //got lock if (m_SocketTarget) { closesocket(m_SocketTarget); m_SocketTarget = NULL; closesocket(Socketfd); } Thread_UnLock(); } else { //no lock ,no fxxx... TPRINTA_INFO("P2PServer:Failed CLose Socket...\n"); } return -1; } bool P2P_Module_Server::Exec() { int ret = 0; TPRINTA_INFO("P2PServer:init Entry\n"); ResetEvent(m_ConnectionReady); if (WaitTheThreadEndSign(100) == true) { return false; } //init socket ret = TryInit(); if (ret < 0) { return false; } else if (ret == 0) { return true; } TPRINTA_INFO("P2PServer:try init succeed\n"); //try accept ret = TryAccept(); if (ret < 0) { return false; } else if (ret == 0) { return true; } TPRINTA_INFO("P2PServer:try accept succeed\n"); //protocal one: read client's BusId //read targetbusId DWORD MsgSize = 256; DWORD BlockSize = 0; memset(m_pszRemoteBusId, 0, MsgSize); if (ReadBlob(m_SocketTarget,m_pszRemoteBusId, MsgSize, NULL, BlockSize) == false) { //handshake failed closesocket(Socketfd); return true; } TPRINTA_INFO("P2PServer:try handshake succeed.%s\n", m_pszRemoteBusId); int on = 1; //tcp no delay setsockopt(m_SocketTarget, IPPROTO_TCP, TCP_NODELAY, (const char *)&on, sizeof(on)); int optVal = 1024 * 1280 * 6 + 6000; int optLen = sizeof(int); setsockopt(m_SocketTarget, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, optLen); TPRINTA_INFO("try set send buff size:%d\n", optVal); getsockopt(m_SocketTarget, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, &optLen); TPRINTA_INFO("after try set send buff size:%d\n", optVal); SetEvent(m_ConnectionReady); m_HeartBeatTimeout = GetTickCount(); //do while and try check connection + exitflag ret = TryCheckConnection(); if (ret < 0) { return false; } return true; } bool P2P_Module_Server::SendBlob(const char *pMsg, const char *pBlock, DWORD size) { bool ret = P2P_Module_Base::SendBlob(m_SocketTarget, pMsg, pBlock, size); if (ret) { m_HeartBeatTimeout = GetTickCount(); } return ret; } //--------------------------------------------------------------------------------------------------------------- P2P_Module_Client::P2P_Module_Client() { m_pszMsgBuff = new char[1024 * 1024]; m_pszBlockBuff = new unsigned char[1024 * 1024 * 4]; } P2P_Module_Client::~P2P_Module_Client() { delete []m_pszMsgBuff; m_pszMsgBuff = NULL; delete[]m_pszBlockBuff; m_pszBlockBuff = NULL; } //connect bool P2P_Module_Client::ConnectP2P() { return StartThread(); } //disconnect void P2P_Module_Client::Disconnect() { StopThread(P2P_CHECK_PERIOD + 1000); } //send block,one way to client no send to anyone //virtual bool SendBlob(char *pMsg, char *pBlock, DWORD size); //callback //void P2P_Module_Client::OnBlob(char *pMsg, char *pData, DWORD size) //{ // //} bool P2P_Module_Client::TryConnectServer() { while (WaitTheThreadEndSign(100) == false) { Socketfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (Socketfd == INVALID_SOCKET) { return false; } //u_long iMode = 1; //int ret = ioctlsocket(Socketfd, FIONBIO, &iMode);//non-blocking mode //if (ret == 0) { SOCKADDR_IN addrSrv; addrSrv.sin_addr.S_un.S_addr = inet_addr(m_pszIpAddress);//服务器端的地址 addrSrv.sin_family = AF_INET; addrSrv.sin_port = htons(m_Port); int Error = connect(Socketfd, (SOCKADDR*)&addrSrv, sizeof(SOCKADDR)); if (!Error) { //just try it socket_set_keepalive(Socketfd); return true; } //else //{ // // 超时时间 // struct timeval tm; // tm.tv_sec = P2P_CHECK_PERIOD / 1000; // tm.tv_usec = P2P_CHECK_PERIOD % 1000; // int selret = 0; // fd_set set; // FD_ZERO(&set); // FD_SET(Socketfd, &set); // Sleep(1000); // selret = select(-1, NULL, &set, NULL, &tm); // if (selret > 0) // { // int error = -1; // int optLen = sizeof(int); // getsockopt(Socketfd, SOL_SOCKET, SO_ERROR, (char*)&error, &optLen); // // 之所以下面的程序不写成三目运算符的形式, 是为了更直观, 便于注释 // if (0 == error) // { // //just try it // socket_set_keepalive(Socketfd); // return StartThread(); // } // } //} } closesocket(Socketfd); Socketfd = NULL; } return false; } bool P2P_Module_Client::Exec() { if (TryConnectServer()) { int on = 1; //tcp no delay setsockopt(Socketfd, IPPROTO_TCP, TCP_NODELAY, (const char *)&on, sizeof(on)); int optVal = 1024 * 1280 * 6 + 6000; int optLen = sizeof(int); setsockopt(Socketfd, SOL_SOCKET, SO_RCVBUF, (char*)&optVal, optLen); TPRINTA_INFO("try set read buff size:%d\n", optVal); getsockopt(Socketfd, SOL_SOCKET, SO_RCVBUF, (char*)&optVal, &optLen); TPRINTA_INFO("after try set read buff size:%d\n", optVal); //send local BusId bool ret = P2P_Module_Base::SendBlob(Socketfd, m_pszLocalBusId, 0, 0, true); if (ret == false) { closesocket(Socketfd); Socketfd = NULL; TPRINTA_ERROR("Send Handshake failed"); return false; } TPRINTA_INFO("Handshake succeed:\n"); //u_long iMode = 0; //ioctlsocket(Socketfd, FIONBIO, &iMode);//blocking mode DWORD MsgSize = 1024 * 1024; DWORD BlockSize = 1024 * 1024 * 4; while (ReadBlobSync(Socketfd, m_pszMsgBuff, MsgSize, (char*)m_pszBlockBuff, BlockSize)) { TPRINTA_TRACE("Read Block Data succeed:\n"); if (BlockSize > 0) { //TPRINTA_TRACE("Notify Block Data begin\n"); OnBlob(m_pszMsgBuff, m_pszBlockBuff, BlockSize); //TPRINTA_TRACE("Notify Block Data end\n"); } else { //ignore others //printf("got Heartbeat--------------------------\n\n\n"); TPRINTA_INFO("got Heartbeat--------------------------"); } //reinit the buff MsgSize = 1024 * 1024; BlockSize = 1024 * 1024 * 4; } TPRINTA_INFO("Exit Thread--------------------------"); if (Thread_Lock(1000) == WAIT_OBJECT_0) { if (Socketfd) { closesocket(Socketfd); Socketfd = NULL; } Thread_UnLock(); } else { //no can do... } } if (WaitTheThreadEndSign(0)) { return false; } return true; }