12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183 |
- #include "P2PModule.h"
- #include <sys/time.h>
- #include <sys/select.h>
- #include <netinet/tcp.h>
- #include <cstdio>
- #include "P2PModule.h"
- #include "DeviceBus.h"
- #include "common_api.h"
- //using namespace std;
- #define P2P_CHECK_PERIOD (2000)
// Translates a Winsock-control style error code into its symbolic name.
//
// ErrorCode: numeric code (property errors 7/380/383/394, control errors
//            400xx, socket errors 10xxx/11xxx).
// Returns the symbolic name for a known code, otherwise the text
// "Socket error <code>".
string FormatLinkError(int ErrorCode)
{
	switch (ErrorCode)
	{
	case 7: return "sckOutOfMemory";              // 7     out of memory
	case 380: return "sckInvalidPropertyValue";   // 380   invalid property value
	case 394: return "sckGetNotSupported";        // 394   property cannot be read
	case 383: return "sckSetNotSupported";        // 383   property is read-only
	case 40006: return "sckBadState";             // 40006 wrong protocol or connection state for the requested transaction or request
	case 40014: return "sckInvalidArg";           // 40014 argument has an invalid format or is outside the specified range
	case 40017: return "sckSuccess";              // 40017 success
	case 40018: return "sckUnsupported";          // 40018 unsupported variable type
	case 40020: return "sckInvalidOp";            // 40020 operation is invalid in the current state
	case 40021: return "sckOutOfRange";           // 40021 argument out of range
	case 40026: return "sckWrongProtocol";        // 40026 wrong protocol for the requested transaction or request
	case 10004: return "sckOpCanceled";           // 10004 operation was cancelled
	case 10014: return "sckInvalidArgument";      // 10014 requested address is a broadcast address but the flag was not set
	case 10035: return "sckWouldBlock";           // 10035 socket is non-blocking and the operation would block
	case 10036: return "sckInProgress";           // 10036 a blocking Winsock operation is in progress
	case 10037: return "sckAlreadyComplete";      // 10037 operation already completed; no blocking operation in progress
	case 10038: return "sckNotSocket";            // 10038 descriptor is not a socket
	case 10040: return "sckMsgTooBig";            // 10040 data too large for the buffer and was truncated
	case 10043: return "sckPortNotSupported";     // 10043 specified port is not supported
	case 10048: return "sckAddressInUse";         // 10048 address already in use
	case 10049: return "sckAddressNotAvailable";  // 10049 address not available on the local machine
	case 10050: return "sckNetworkSubsystemFailed"; // 10050 network subsystem failed
	case 10051: return "sckNetworkUnreachable";   // 10051 network cannot be reached from this host right now
	case 10052: return "sckNetReset";             // 10052 connection timed out while SO_KEEPALIVE was set
	case 10053: return "sckConnectAborted";       // 10053 connection aborted because of a timeout or other failure
	case 10054: return "sckConnectionReset";      // 10054 connection reset by the remote side
	case 10055: return "sckNoBufferSpace";        // 10055 no buffer space available
	case 10056: return "sckAlreadyConnected";     // 10056 socket is already connected
	case 10057: return "sckNotConnected";         // 10057 socket is not connected
	case 10058: return "sckSockedShutdown";       // 10058 socket has been shut down
	case 10060: return "sckTimedout";             // 10060 socket timed out
	case 10061: return "sckConnectionRefused";    // 10061 connection was forcibly refused
	case 10093: return "sckNotInitialized";       // 10093 sockets have not been initialised
	case 11001: return "sckHostNotFound";         // 11001 authoritative answer: host not found
	case 11002: return "sckHostNotFoundTryAgain"; // 11002 non-authoritative answer: host not found, try again
	case 11003: return "sckNonRecoverableError";  // 11003 non-recoverable error
	case 11004: return "sckNoData";               // 11004 valid name, but no data record of the requested type
	default:
		// string(const char*, n) would copy n bytes of the literal rather
		// than format the number, so build the text explicitly.
		return "Socket error " + std::to_string(ErrorCode);
	}
}
// Enables TCP keep-alive probing on a socket.
//
// fd: socket descriptor to configure (only read here).
// Returns 0 when every option was applied; otherwise the negative index of
// the first option that setsockopt() rejected:
//   -1 SO_KEEPALIVE, -2 TCP_KEEPIDLE, -3 TCP_KEEPINTVL, -4 TCP_KEEPCNT.
int socket_set_keepalive(int &fd)
{
	struct KeepAliveOption
	{
		int level;
		int name;
		int value;
		int failureCode;
	};
	const KeepAliveOption options[] = {
		{ SOL_SOCKET,  SO_KEEPALIVE,  1,  -1 }, // switch keep-alive on
		{ IPPROTO_TCP, TCP_KEEPIDLE,  10, -2 }, // start probing after 10 s of inactivity
		{ IPPROTO_TCP, TCP_KEEPINTVL, 5,  -3 }, // 5 s between probes
		{ IPPROTO_TCP, TCP_KEEPCNT,   3,  -4 }, // drop the link after 3 failed probes
	};
	const unsigned optionCount = sizeof(options) / sizeof(options[0]);

	for (unsigned i = 0; i < optionCount; ++i)
	{
		const KeepAliveOption &opt = options[i];
		if (setsockopt(fd, opt.level, opt.name, &opt.value, sizeof(opt.value)) != 0)
		{
			return opt.failureCode;
		}
	}
	return 0;
}
- P2P_Module_Base::P2P_Module_Base()
- {
- m_Port = 0;
- m_MessageBuffSize = 1024 * 1024;//set 1M limits
- m_pszMessageBuff = new char[m_MessageBuffSize];
- m_pMainDevBus = NULL;
- Socketfd = -1;
- m_SocketTarget = -1;
- m_pszIpAddress = new char[256];
- m_pszLocalBusId = new char[256];
- m_pszRemoteBusId = new char[256];
- memset(m_pszIpAddress, 0, 256);
- memset(m_pszLocalBusId, 0, 256);
- memset(m_pszRemoteBusId, 0, 256);
- m_ConnectionReady = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET,false);
- }
- P2P_Module_Base::~P2P_Module_Base()
- {
- Disconnect();
- delete[]m_pszIpAddress;
- delete[]m_pszLocalBusId;
- delete[]m_pszRemoteBusId;
- m_pszIpAddress = NULL;
- m_pszLocalBusId = NULL;
- m_pszRemoteBusId = NULL;
- delete[]m_pszMessageBuff;
- m_pszMessageBuff = NULL;
- }
- //init
- void P2P_Module_Base::InitP2P(const char *pszIp, const char *pszLocalBusId, bool AsServer, PVOID pParent)
- {
- strncpy(m_pszIpAddress, pszIp, 255);
- strncpy(m_pszLocalBusId, pszLocalBusId, 255);
- m_Port = (unsigned short)(CCOS_HW_CHANNEL/10) + 3;//80000 is out of WORD limits
- m_AsServer = AsServer;
- m_pMainDevBus = pParent;
- /*string logfile = GetProcessDirectory() + "\\logs\\Local_p2pDispathlog.log";
- Logger *pLog = CreateLogger();
- pLog->SetLogFilepath(logfile.c_str());
- SetLogger(pLog);*/
- }
- //connect
- bool P2P_Module_Base::ConnectP2P()
- {
- return false;
- }
- //disconnect
- void P2P_Module_Base::Disconnect()
- {
- }
- bool P2P_Module_Base::IsConnected()
- {
- return false;
- }
- //Blob的定义
- //4字节 大长度:4字节Msg的长度:Msg内容,4字节Block的长度:Block内容
- //必须存在项为 大长度,Msg长度,Block长度,如果无此内容,其值为0
- bool P2P_Module_Base::SendRaw(int &sck, const char *pData, DWORD size, bool Sync)
- {
- uint32_t Sented = 0;
- int Sendonce = 0;
- while (Sented < size) {
- Thread_Lock();
- if (WaitTheThreadEndSign(0)) {
- Thread_UnLock();
- return false;
- }
- if (sck != -1) {
- if (!Sync) {
- fd_set WriteSet;
- struct timeval tm;
- tm.tv_sec = P2P_CHECK_PERIOD / 1000;
- tm.tv_usec = (P2P_CHECK_PERIOD % 1000) * 1000;
- FD_ZERO(&WriteSet);
- FD_SET(sck, &WriteSet);
- int sel_ret = select(sck + 1, NULL, &WriteSet, NULL, &tm);
- if (sel_ret <= 0) {
- // 处理错误或超时
- }
- }
- Sendonce = send(sck, pData + Sented, size - Sented, 0);
- if (Sendonce > 0) {
- Sented += Sendonce;
- }
- else {
- close(sck);
- sck = -1;
- Thread_UnLock();
- return false;
- }
- }
- else {
- Thread_UnLock();
- return false;
- }
- Thread_UnLock();
- }
- return true;
- }
- bool P2P_Module_Base::ReadRawSync(int &sck, char *pszBuff, DWORD ExpectedSize)
- {
- DWORD Total = 0;
- int Received = 0;
- //////mLog::FINFO("Enter ReadRawSync");
- while (Total < ExpectedSize)
- {
- Thread_Lock();
- if (WaitTheThreadEndSign(0) == false)
- {
- if (sck != -1)
- {
- //ok
- //////mLog::FINFO("br--");
- Received = recv(sck, (char*)&pszBuff[Total], ExpectedSize - Total, 0);
- //////mLog::FINFO("ar--:{$}", Received);
- if (Received > 0)
- {
- Total += Received;
- }
- else if (Received == 0) // 对端关闭连接
- {
- close(sck);
- sck = -1;
- Thread_UnLock();
- //////mLog::FERROR("socket closed by peer\n");
- return false;
- }
- else
- {
- int err = errno;
- close(sck);
- sck = -1;
- Thread_UnLock();
- // 处理常见的非阻塞错误
- if (err == EAGAIN || err == EWOULDBLOCK) {
- // 对于同步读取,这通常表示超时,但在同步模式中不应发生
- //////mLog::FERROR("socket timeout: %d\n", err);
- }
- else {
- //////mLog::FERROR("socket error: %d\n", err);
- }
- //socket error,or closed
- //string errinfo = FormatLinkError(WSAGetLastError());
- //////mLog::FERROR("socket error:{$}\n", errinfo.c_str());
- return false;
- }
- }
- }
- Thread_UnLock();
- }
- //////mLog::FINFO("Leave ReadRawSync OK");
- return true;
- }
- bool P2P_Module_Base::ReadRaw(int &sck, char *pszBuff, DWORD ExpectedSize)
- {
- DWORD Total = 0;
- int Received = 0;
- //////mLog::FINFO("Enter ReadRaw");
- while (Total < ExpectedSize)
- {
- Thread_Lock();
- if (WaitTheThreadEndSign(0) == false)
- {
- if (sck != -1)
- {
- //check readable
- // 超时时间
- struct timeval tm;
- fd_set ReadSet;
- tm.tv_sec = P2P_CHECK_PERIOD / 1000;
- tm.tv_usec = P2P_CHECK_PERIOD % 1000 * 1000;
- FD_ZERO(&ReadSet);
- FD_SET(sck, &ReadSet);
-
- //////mLog::FINFO("bs--");
- int sel_ret = select(sck + 1, &ReadSet, NULL, NULL, &tm);
- //////mLog::FINFO("as--");
- if (sel_ret > 0)
- {
- //ok
- //////mLog::FINFO("br--");
- Received = recv(sck, (char*)&pszBuff[Total], ExpectedSize - Total, 0);
- //////mLog::FINFO("ar--:{$}", Received);
- if (Received > 0)
- {
- Total += Received;
- }
- else if (Received == 0) // 对端关闭连接
- {
- close(sck);
- sck = -1;
- Thread_UnLock();
- //////mLog::FERROR("socket closed by peer\n");
- return false;
- }
- else
- {
- int err = errno;
- close(sck);
- sck = -1;
- Thread_UnLock();
- if (err == EAGAIN || err == EWOULDBLOCK) {
- //////mLog::FERROR("socket timeout: %d\n", err);
- }
- else {
- //////mLog::FERROR("socket recv error: %s (%d)\n", strerror(err), err);
- }
- return false;
- }
- }
- else if (sel_ret < 0)
- {
- int err = errno;
- close(sck);
- sck = -1;
- Thread_UnLock();
- //////mLog::FERROR("select error: %s (%d)\n", strerror(err), err);
- return false;
- }
- //timeout
- }
- else
- {
- //socket erroe
- Thread_UnLock();
- //////mLog::FERROR("socket error:using empty sck.\n");
- return false;
- }
- }
- else
- {
- if (sck)
- {
- close(sck);
- sck = -1;
- }
- Thread_UnLock();
- //////mLog::FINFO("exit thread.close socket .\n");
- return false;
- }
- Thread_UnLock();
- }
- //////mLog::FINFO("Leave ReadRaw OK");
- return true;
- }
- //send block
- bool P2P_Module_Base::SendBlob(int &sck, const char *pMsg, const char *pBlock, DWORD size, bool Sync)
- {
- bool ret = false;
- DWORD MsgLen = 0;
- //Logger *pLog = (Logger*)GetLogger();
- if (pMsg)
- {
- MsgLen = (DWORD)strlen(pMsg);
- }
- DWORD BlockLen = size;
- DWORD TotalLen = MsgLen + BlockLen + 4 + 4;
- if (MsgLen > m_MessageBuffSize - 12)
- {
- //////mLog::FERROR( "SendBlob error.Message size is too big\n");
- return false;
- }
- int MsgPartSize = 0;
- //copy total Len
- memcpy(&m_pszMessageBuff[MsgPartSize], (const char*)&TotalLen, sizeof(TotalLen));
- MsgPartSize += sizeof(TotalLen);
- //copy msg Len
- memcpy(&m_pszMessageBuff[MsgPartSize], (const char*)&MsgLen, sizeof(MsgLen));
- MsgPartSize += sizeof(MsgLen);
- //copy msg
- memcpy(&m_pszMessageBuff[MsgPartSize], pMsg, MsgLen);
- MsgPartSize += MsgLen;
- //copy BlockLen
- memcpy(&m_pszMessageBuff[MsgPartSize], (const char*)&BlockLen, sizeof(BlockLen));
- MsgPartSize += sizeof(BlockLen);
- if (MsgLen > 0)
- {
- //////mLog::FINFO( "TotalLen = {$},MsgLen = {$},BlobLen = {$}.MsgInfo:{\n{$}}", TotalLen, MsgLen, BlockLen, pMsg);
- }
- //connection is steady
- Thread_Lock();
- if (sck)
- {
- bool Sent = SendRaw(sck, m_pszMessageBuff, MsgPartSize);
- if (Sent)
- {
- ////mLog::FINFO( "write head part succeed.size = {$}", MsgPartSize);
- if (BlockLen > 0)
- {
- Sent = SendRaw(sck, (const char*)pBlock, BlockLen);
- if (Sent)
- {
- ////mLog::FINFO( "write Block data: {$} succeed\n", BlockLen);
- ////mLog::FINFO( "write Blob: succeed----------------\n");
- ret = true;
- }
- }
- else
- {
- ////mLog::FINFO( "write MessageOnly: succeed----------------\n");
- ret = true;
- }
- }
- }
- if (ret == false)
- {
- ////mLog::FERROR( "SendBlob error.close p2p socket\n");
- }
- Thread_UnLock();
- return ret;
- }
- void P2P_Module_Base::Quit()
- {
- }
- bool P2P_Module_Base::SendBlob(const char *pMsg, const char *pBlock, DWORD size)
- {
- bool ret = false;
- return ret;
- }
- bool P2P_Module_Base::ReadBlobSync(int &sck, char *pMsg, DWORD &MsgSize, char *pBlock, DWORD &size)
- {
- bool ret = false;
- DWORD TotalLen = 0;
- DWORD MsgLen = 0;
- DWORD BlockLen = 0;
- do {
- //head
- ret = ReadRawSync(sck, (char*)&TotalLen, sizeof(TotalLen));
- if (ret == false || TotalLen < 8)
- {
- break;
- }
- ////mLog::FDEBUG("read head:{$}\n", TotalLen);
- //msg len
- ret = ReadRawSync(sck, (char*)&MsgLen, sizeof(MsgLen));
- if (ret == false || MsgLen + 8 > TotalLen || (MsgLen > MsgSize))
- {
- break;
- }
- ////mLog::FDEBUG("read MsgLen:{$}\n", MsgLen);
- //msg info
- if (MsgLen > 0)
- {
- ret = ReadRawSync(sck, pMsg, MsgLen);
- if (ret == false)
- {
- break;
- }
- pMsg[MsgLen] = 0;
- ////mLog::FDEBUG("read Msg:{$}\n", pMsg);
- }
- //Block Len
- ret = ReadRawSync(sck, (char*)&BlockLen, sizeof(BlockLen));
- if (ret == false || BlockLen + MsgLen + 8 > TotalLen || (BlockLen > size))
- {
- break;
- }
- ////mLog::FDEBUG("read BlockLen:{$}\n", BlockLen);
- //block info
- if (BlockLen > 0)
- {
- if (pBlock == NULL)
- {
- break;
- }
- ret = ReadRawSync(sck, pBlock, BlockLen);
- if (ret == false)
- {
- break;
- }
- ////mLog::FDEBUG("read Block data Succeed\n");
- }
- MsgSize = MsgLen;
- size = BlockLen;
- ////mLog::FDEBUG("read Block Succeed-\n");
- return true;
- } while (0);
- return false;
- }
- //Read block
- bool P2P_Module_Base::ReadBlob(int &sck, char *pMsg, DWORD &MsgSize, char *pBlock, DWORD &size)
- {
- bool ret = false;
- DWORD TotalLen = 0;
- DWORD MsgLen = 0;
- DWORD BlockLen = 0;
- do {
- //head
- ret = ReadRaw(sck,(char*)&TotalLen, sizeof(TotalLen));
- if (ret == false || TotalLen < 8)
- {
- break;
- }
- ////mLog::FINFO("read head:{$}\n", TotalLen);
- //msg len
- ret = ReadRaw(sck, (char*)&MsgLen, sizeof(MsgLen));
- if (ret == false || MsgLen + 8 > TotalLen || (MsgLen > MsgSize))
- {
- break;
- }
- ////mLog::FINFO("read MsgLen:{$}\n", MsgLen);
- //msg info
- if (MsgLen > 0)
- {
- ret = ReadRaw(sck, pMsg, MsgLen);
- if (ret == false)
- {
- break;
- }
- pMsg[MsgLen] = 0;
- ////mLog::FINFO("read Msg:{$}\n", pMsg);
- }
- //Block Len
- ret = ReadRaw(sck, (char*)&BlockLen, sizeof(BlockLen));
- if (ret == false || BlockLen + MsgLen + 8 > TotalLen || (BlockLen > size))
- {
- break;
- }
- ////mLog::FINFO("read BlockLen:{$}\n", BlockLen);
- //block info
- if (BlockLen > 0)
- {
- if (pBlock == NULL)
- {
- break;
- }
- ret = ReadRaw(sck, pBlock, BlockLen);
- if (ret == false)
- {
- break;
- }
- ////mLog::FINFO("read Block data Succeed\n");
- }
- MsgSize = MsgLen;
- size = BlockLen;
- ////mLog::FINFO("read Block Succeed-\n");
- return true;
- } while (0);
- return false;
- }
- //callback
- void P2P_Module_Base::OnBlob(char *pMsg,unsigned char *pData, DWORD size)
- {
- //if (m_pMainDevBus)
- //{
- // ((DeviceBUS*)m_pMainDevBus)->BlobDataArrived(pMsg, pData, size);
- //}
- }
- bool P2P_Module_Base::Exec()
- {
- return false;
- }
- bool P2P_Module_Base::IsTargetBusId(const char *pszTargetBusId)
- {
- bool ret = false;
- //Thread_Lock();
- //only server can do this check....
- if ((m_pszRemoteBusId != NULL) && (pszTargetBusId != NULL) && (m_AsServer == true))
- {
- ret = (strcmp(m_pszRemoteBusId, pszTargetBusId) == 0);
- }
- //Thread_UnLock();
- return ret;
- }
- //---------------------------------------------------------------------------------------------------------------
- P2P_Module_Server::P2P_Module_Server()
- {
- m_HeartBeatTimeout = 0;
- }
- P2P_Module_Server::~P2P_Module_Server()
- {
- }
- //connect
- bool P2P_Module_Server::ConnectP2P()
- {
- //start thread
- return StartThread();
- }
- void P2P_Module_Server::Quit()
- {
- StopThread(1000);
- }
- //disconnect
- void P2P_Module_Server::Disconnect()
- {
- //stop thread
- //不能给太长时间,因为内部用的Socket是阻塞模式
- //StopThread(P2P_CHECK_PERIOD + 1000);
- // close socket in the internal thread
- }
- bool P2P_Module_Server::IsConnected()
- {
- if (m_SocketTarget != -1)
- {
- return m_ConnectionReady->Wait(0);
- }
- return false;
- }
- //-1:exit,0:socket error,1:succeed
- int P2P_Module_Server::TryInit()
- {
- while (WaitTheThreadEndSign(0) == false)
- {
- //init socket
- Socketfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
- if (Socketfd == -1)
- {
- return 0;
- }
- sockaddr_in service;
- memset(&service, 0, sizeof(service));
- service.sin_family = AF_INET;
- service.sin_addr.s_addr = inet_addr(m_pszIpAddress);
- service.sin_port = htons(m_Port);
- //bind
- int reuse = 1;
- setsockopt(Socketfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
- if (bind(Socketfd, (struct sockaddr*)&service, sizeof(service)) < 0) {
- close(Socketfd);
- return 0;
- }
- //buff size
- //listen
- if (listen(Socketfd, SOMAXCONN) < 0) {
- close(Socketfd);
- return 0;
- }
- // 设置非阻塞
- int flags = fcntl(Socketfd, F_GETFL, 0);
- fcntl(Socketfd, F_SETFL, flags | O_NONBLOCK);
- return 1;
- }
- return -1;
- }
- //-1:exit,0:socket error,1:succeed
- int P2P_Module_Server::TryAccept()
- {
- while (WaitTheThreadEndSign(0) == false)
- {
- int sClient;
- fd_set ReadSet;
- struct sockaddr_in remoteAddr;
- socklen_t nAddrlen = sizeof(remoteAddr);
- // 超时时间
- struct timeval tm;
- tm.tv_sec = P2P_CHECK_PERIOD / 1000;
- tm.tv_usec = P2P_CHECK_PERIOD % 1000 * 1000;
- FD_ZERO(&ReadSet);
- FD_SET(Socketfd, &ReadSet);
- int max_fd = Socketfd + 1;
- int sel_ret = select(max_fd, &ReadSet, NULL, NULL, &tm);
- if (sel_ret > 0)
- {
- // 检查是否有连接请求
- if (FD_ISSET(Socketfd, &ReadSet))
- {
- //ok
- sClient = accept(Socketfd, (sockaddr*)&remoteAddr, &nAddrlen);
- if (sClient == -1)
- {
- int err = errno;
- if (err == EAGAIN || err == EWOULDBLOCK) {
- // 非阻塞模式下可能发生,继续尝试
- continue;
- }
- // 其他错误,记录并返回
- ////mLog::FERROR("accept failed: %s (%d)", strerror(err), err);
- return 0;
- }
- //accept succeed
- Thread_Lock();
- m_SocketTarget = sClient;
- Thread_UnLock();
- return 1;
- }
- }
- else if (sel_ret < 0)
- {
- int err = errno;
- if (err == EINTR) {
- // 被信号中断,继续尝试
- continue;
- }
- ////mLog::FERROR("select failed: %s (%d)", strerror(err), err);
- close(Socketfd);
- return 0;
- }
- //timeout
- }
- return -1;
- }
- //-1:exit,0:socket error
- int P2P_Module_Server::TryCheckConnection()
- {
- while (WaitTheThreadEndSign(100) == false)
- {
- {
- DWORD ret = Thread_Lock(1000);
- if (ret == WAIT_OBJECT_0)
- {
- //got lock
- if (m_SocketTarget)
- {
- //check connection
- // 超时时间
- fd_set ErrSet;
- struct timeval tm;
- tm.tv_sec = 0 / 1000;
- tm.tv_usec = 0 % 1000*1000;
- FD_ZERO(&ErrSet);
- FD_SET(Socketfd, &ErrSet);
- int sel_ret = select(-1, NULL, NULL, &ErrSet, &tm);
- if (sel_ret < 0)
- {
- int err = errno;
- close(m_SocketTarget);
- m_SocketTarget = -1;
- close(Socketfd);
- Thread_UnLock();
- //string errinfo = FormatLinkError(WSAGetLastError());
- ////mLog::FERROR("socket error: %s (%d)", strerror(err), err);
- return 0;//do over
- }
- //timeout or ok
- if ((GetTickCount() - m_HeartBeatTimeout) > 20000)
- {
- //heartbeat
- //server -> client
- //it might cause the m_SocketTarget gets empty
- if (SendBlob(m_pszLocalBusId, 0, 0))
- {
- ////mLog::FINFO("Server:Send heart beat\n");
- }
- }
- }
-
- if (m_SocketTarget == -1)
- {
- int err = errno;
- close(Socketfd);
- Thread_UnLock();
- //string errinfo = FormatLinkError(WSAGetLastError());
- ////mLog::FERROR("socket error: %s (%d)", strerror(err), err);
- return 0;//do over
- }
- Thread_UnLock();
- }
- }
- }
- //exit
- ////mLog::FINFO("Server:Stop Thread.exiting...\n");
- DWORD ret = Thread_Lock(1000);
- if (ret == WAIT_OBJECT_0)
- {
- //got lock
- if (m_SocketTarget)
- {
- close(m_SocketTarget);
- m_SocketTarget = -1;
- close(Socketfd);
- }
- Thread_UnLock();
- }
- else
- {
- //no lock ,no fxxx...
- ////mLog::FINFO("P2PServer:Failed CLose Socket...\n");
- }
- return -1;
- }
- bool P2P_Module_Server::Exec()
- {
- int ret = 0;
- ////mLog::FINFO("P2PServer:init Entry\n");
- m_ConnectionReady->ResetEvent();
- if (WaitTheThreadEndSign(100) == true)
- {
- return false;
- }
- //init socket
- ret = TryInit();
- if (ret < 0)
- {
- return false;
- }
- else if (ret == 0)
- {
- return true;
- }
- ////mLog::FINFO("P2PServer:try init succeed\n");
- //try accept
- ret = TryAccept();
- if (ret < 0)
- {
- return false;
- }
- else if (ret == 0)
- {
- return true;
- }
- ////mLog::FINFO("P2PServer:try accept succeed\n");
- //protocal one: read client's BusId
- //read targetbusId
- DWORD MsgSize = 256;
- DWORD BlockSize = 0;
- memset(m_pszRemoteBusId, 0, MsgSize);
- if (ReadBlob(m_SocketTarget,m_pszRemoteBusId, MsgSize, NULL, BlockSize) == false)
- {
- //handshake failed
- close(Socketfd);
- return true;
- }
- ////mLog::FINFO("P2PServer:try handshake succeed.{$}\n", m_pszRemoteBusId);
- int on = 1;
- //tcp no delay
- setsockopt(m_SocketTarget, IPPROTO_TCP, TCP_NODELAY, (const char *)&on, sizeof(on));
- int optVal = 1024 * 1280 * 6 + 6000;
- socklen_t optLen = sizeof(int);
- setsockopt(m_SocketTarget, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, optLen);
- ////mLog::FINFO("try set send buff size:{$}\n", optVal);
- getsockopt(m_SocketTarget, SOL_SOCKET, SO_SNDBUF, (char*)&optVal, &optLen);
- ////mLog::FINFO("after try set send buff size:{$}\n", optVal);
- m_ConnectionReady->SetEvent();
- m_HeartBeatTimeout = GetTickCount();
- //do while and try check connection + exitflag
- ret = TryCheckConnection();
- if (ret < 0)
- {
- return false;
- }
- return true;
- }
- bool P2P_Module_Server::SendBlob(const char *pMsg, const char *pBlock, DWORD size)
- {
- bool ret = P2P_Module_Base::SendBlob(m_SocketTarget, pMsg, pBlock, size);
- if (ret)
- {
- m_HeartBeatTimeout = GetTickCount();
- }
- return ret;
- }
- //---------------------------------------------------------------------------------------------------------------
- P2P_Module_Client::P2P_Module_Client()
- {
- m_pszMsgBuff = new char[1024 * 1024];
- m_pszBlockBuff = new unsigned char[1024 * 1024 * 4];
- }
- P2P_Module_Client::~P2P_Module_Client()
- {
- delete []m_pszMsgBuff;
- m_pszMsgBuff = NULL;
- delete[]m_pszBlockBuff;
- m_pszBlockBuff = NULL;
- }
- //connect
- bool P2P_Module_Client::ConnectP2P()
- {
- return StartThread();
- }
- //disconnect
- void P2P_Module_Client::Disconnect()
- {
- StopThread(P2P_CHECK_PERIOD + 1000);
- }
- //send block,one way to client no send to anyone
- //virtual bool SendBlob(char *pMsg, char *pBlock, DWORD size);
- //callback
- //void P2P_Module_Client::OnBlob(char *pMsg, char *pData, DWORD size)
- //{
- //
- //}
- bool P2P_Module_Client::TryConnectServer()
- {
- while (WaitTheThreadEndSign(100) == false)
- {
- Socketfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
- if (Socketfd == -1)
- {
- return false;
- }
- //u_long iMode = 1;
- //int ret = ioctlsocket(Socketfd, FIONBIO, &iMode);//non-blocking mode
- //if (ret == 0)
- {
- sockaddr_in addrSrv;
- memset(&addrSrv, 0, sizeof(addrSrv));
- addrSrv.sin_family = AF_INET;
- addrSrv.sin_port = htons(m_Port);
- inet_pton(AF_INET, m_pszIpAddress, &addrSrv.sin_addr);
- if (connect(Socketfd, (struct sockaddr*)&addrSrv, sizeof(addrSrv)) == 0) {
- socket_set_keepalive(Socketfd);
- return true;
- }
- //else
- //{
- // // 超时时间
- // struct timeval tm;
- // tm.tv_sec = P2P_CHECK_PERIOD / 1000;
- // tm.tv_usec = P2P_CHECK_PERIOD % 1000;
- // int selret = 0;
- // fd_set set;
- // FD_ZERO(&set);
- // FD_SET(Socketfd, &set);
- // Sleep(1000);
- // selret = select(-1, NULL, &set, NULL, &tm);
- // if (selret > 0)
- // {
- // int error = -1;
- // int optLen = sizeof(int);
- // getsockopt(Socketfd, SOL_SOCKET, SO_ERROR, (char*)&error, &optLen);
- // // 之所以下面的程序不写成三目运算符的形式, 是为了更直观, 便于注释
- // if (0 == error)
- // {
- // //just try it
- // socket_set_keepalive(Socketfd);
- // return StartThread();
- // }
- // }
- //}
- }
- close(Socketfd);
- Socketfd = -1;
- }
- return false;
- }
- bool P2P_Module_Client::Exec()
- {
- if (TryConnectServer())
- {
- int on = 1;
- //tcp no delay
- setsockopt(Socketfd, IPPROTO_TCP, TCP_NODELAY, (const char *)&on, sizeof(on));
- int optVal = 1024 * 1280 * 6 + 6000;
- socklen_t optLen = sizeof(int);
- setsockopt(Socketfd, SOL_SOCKET, SO_RCVBUF, (char*)&optVal, optLen);
- ////mLog::FINFO("try set read buff size:{$}\n", optVal);
- getsockopt(Socketfd, SOL_SOCKET, SO_RCVBUF, (char*)&optVal, &optLen);
- ////mLog::FINFO("after try set read buff size:{$}\n", optVal);
- //send local BusId
- bool ret = P2P_Module_Base::SendBlob(Socketfd, m_pszLocalBusId, 0, 0, true);
- if (ret == false)
- {
- close(Socketfd);
- Socketfd = -1;
- ////mLog::FERROR("Send Handshake failed");
- return false;
- }
- ////mLog::FINFO("Handshake succeed:\n");
- //u_long iMode = 0;
- //ioctlsocket(Socketfd, FIONBIO, &iMode);//blocking mode
- DWORD MsgSize = 1024 * 1024;
- DWORD BlockSize = 1024 * 1024 * 4;
- while (ReadBlobSync(Socketfd, m_pszMsgBuff, MsgSize, (char*)m_pszBlockBuff, BlockSize))
- {
- ////mLog::FINFO("Read Block Data succeed:\n");
- if (BlockSize > 0)
- {
- //////mLog::FINFO("Notify Block Data begin\n");
- OnBlob(m_pszMsgBuff, m_pszBlockBuff, BlockSize);
- //////mLog::FINFO("Notify Block Data end\n");
- }
- else
- {
- //ignore others
- //printf("got Heartbeat--------------------------\n\n\n");
- ////mLog::FINFO("got Heartbeat--------------------------");
- }
- //reinit the buff
- MsgSize = 1024 * 1024;
- BlockSize = 1024 * 1024 * 4;
- }
- ////mLog::FINFO("Exit Thread--------------------------");
- if (Thread_Lock(1000) == WAIT_OBJECT_0)
- {
- if (Socketfd)
- {
- close(Socketfd);
- Socketfd = -1;
- }
- Thread_UnLock();
- }
- else
- {
- //no can do...
- }
- }
- if (WaitTheThreadEndSign(0))
- {
- return false;
- }
- return true;
- }
|