// BusThread.cpp : 定义 DLL 应用程序的导出函数。 // #include "BusThread.h" //#include "Logger.h" //#include "CDInterface.h" //#include //#include "PacketAnalizer.h" //#include "LogicClient.h" //#include "LocalConfig.h" //#include "DevBusManager.h" //#include "ShareMemWR.h" //#include "ClientManager.h" /* inline string CurrentDateTime() { std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); char buf[100] = { 0 }; std::strftime(buf, sizeof(buf), " %Y-%m-%d %H:%M:%S ", std::localtime(&now)); return buf; } BusThread::BusThread(void) { m_bLocal = true; } BusThread::~BusThread(void) { } bool BusThread::ReceivedFromLocalBus(ResDataObject& packet) { bool ret = true; //mLog::FINFO("Packet from Local Bus {$} ", packet.encode()); //correct it's direction try { PacketAnalizer::UpdatePacketRoute(packet, CCOS_PACKET_ROUTE_LOCAL); //do the normal dispatch ret = ReceivedFromBus(packet); } catch (...) { ret = false; } return ret; } bool BusThread::ReceivedFromEthBus(ResDataObject& packet) { bool ret = true; //mLog::FINFO( "Packet from Eth Bus {$}",packet.encode()); //correct it's direction try { PacketAnalizer::UpdatePacketRoute(packet, CCOS_PACKET_ROUTE_ETH); //do the normal dispatch ret = ReceivedFromBus(packet); } catch (...) { ret = false; } return ret; } //1.command comes from client //2.dispatch command comes from device bool BusThread::ReceivedFromBus(ResDataObject& packet) { bool ret = true; //RES_//mLog::FINFO(packet,"Packet from Bus"); PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&packet); PACKET_TYPE type = PacketAnalizer::GetPacketType(&packet); if (PACKET_TYPE_REQ == type) { switch (cmd) { case PACKET_CMD_OPEN: case PACKET_CMD_CLOSE: ret = HandleOpenClose(packet, cmd); break; case PACKET_CMD_GET: case PACKET_CMD_ADD: case PACKET_CMD_DEL: case PACKET_CMD_UPDATE://更新对象 case PACKET_CMD_EXE: case PACKET_CMD_DATA: case PACKET_CMD_MSG: case PACKET_CMD_ONLINE://设备掉线后重新注册到Root case PACKET_CMD_PART_UPDATE://只更新对象的子节点 ret = HandleOthers(packet, cmd); break; } } return ret; } CMD_ECHO DoSendPacket(ResDataObject& packet, bool Local, bool Block, string& TargetbusId) { //do send //block or packet if (Block) { //get context ResDataObject resImg; ImgDataInfo ImgData; string MachineId; if (PacketAnalizer::GetDestinationMachineId(packet, MachineId) == false) { return CMD_ECHO_IGNORE; } printf("Got Req of PACKET_CMD_DATA!!!---------------------\n"); if (MachineId != (const char*)getLocalMachineId()) { PacketAnalizer::GetPacketContext(&packet, resImg); ImgData.SetResDataObject(resImg); printf("Going to send SMPacket!!!---------------------\n"); //target is not in this machine if (BusSendSMPacket(packet, Local, TargetbusId, (ImgData.nShareMemID)) == false) { //failed to send return CMD_ECHO_NOTARGET; } return CMD_ECHO_OK; } //same machineId,just send it as normal packet } if (BusSendPacket(packet, Local, TargetbusId) == false) { //failed to send return CMD_ECHO_NOTARGET; } return CMD_ECHO_OK; } bool BusThread::SendCcosPacket(ResDataObject& packet) { //std::cout << CurrentDateTime << " BusThread::SendCcosPacket Send Packet " << packet.encode() << endl; CCOS_PACKET_ROUTE Route; PacketAnalizer::GetPacketRoute(&packet, Route); bool sendtoLocalMachine = false; bool sendBlockData = false; //packet cmd PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&packet); if (cmd == PACKET_CMD_DATA) { sendBlockData = true; } string DesBusId; if (!PacketAnalizer::GetDestinationBusId(packet, DesBusId)) { return false; } //Precaculate direction if (Route == CCOS_PACKET_ROUTE_ANY) { //choose one Route = g_BusManager.GetBusIdDirection(DesBusId); } //caculate again if (Route == CCOS_PACKET_ROUTE_LOCAL) { //target in this machine sendtoLocalMachine = true; } else if (Route == CCOS_PACKET_ROUTE_ETH) { //target is not in this machine } else if (Route == CCOS_PACKET_ROUTE_ANY) { //if still any direction,then try local and try eth //local first std::cout << "Send local first...... to " << DesBusId << endl; if (DoSendPacket(packet, true, sendBlockData, DesBusId) == CMD_ECHO_OK) { return CMD_ECHO_OK; } std::cout << "Send Eth......to " << DesBusId << endl; //try eth return DoSendPacket(packet, false, sendBlockData, DesBusId); } else { //ignore return CMD_ECHO_IGNORE; } //do send std::cout << "Send ?????.....to " << DesBusId << " ? local " << sendtoLocalMachine << endl; return DoSendPacket(packet, sendtoLocalMachine, sendBlockData, DesBusId); } static string ProcessKey(const char* pszKey) { char szKeys[256]; strcpy(szKeys, pszKey); char* pt = szKeys; while (*pt != 0) { if (*pt == '/' || *pt == '{' || *pt == '}' || *pt == '-') *pt = '_'; pt++; } return szKeys; } bool BusThread::HandleOpenClose(ResDataObject& packet, PACKET_CMD cmd) { // stringstream stream; //stream << "LogicClient_" << szClientName << "_" << GetCurrentProcessId(); //m_strClientName = stream.str(); if (cmd == PACKET_CMD_OPEN) { char szClientID[512]; UINT64 cltid = 0, addr = 0; string openKey = PacketAnalizer::GetPacketKey(&packet); //if(openKey.length() > 48) // openKey.erase(48,openKey.length()-48); string findkey = ProcessKey(openKey.c_str()); PacketAnalizer::GetPacketHandleProcId(&packet, cltid, false); PacketAnalizer::GetPacketHandleAddr(&packet, addr, false); LogicClient* pClient = g_ClientManager.GetClient(cltid, addr, findkey.c_str()); sprintf_s(szClientID, "BUSCLT_%d_%d_%d_%s", cltid, addr, GetCurrentProcessId(), findkey.c_str()); std::cout << CurrentDateTime << "Get Open Request From Ebus Open Key " << openKey << endl; std::cout << "Use Client id to connect mqtt " << szClientID << " gened by findkey " << findkey << endl; ResDataObject ebusHandle; PacketAnalizer::GetPacketHandle(&packet, ebusHandle); CcosDevFileHandle* pHandle = new CcosDevFileHandle; pHandle->SetResDataObject(ebusHandle); if (pClient != nullptr) { std::cout << "Connect seconed times .Reuse Connection ......." << endl; ////直接复用连接,考虑过一段时间重新关闭打开 //std::cout << szClientID << " Had alread opend reuse it" << endl; //ResDataObject resRes, resResponse; //pClient->GetDeviceResource(&resRes); //PacketAnalizer::MakeOpenResponse(packet, resResponse, resRes); //PacketAnalizer::UpdateOpenRequest(resResponse, getLocalMachineId(), getLocalEbusId(), (UINT64)GetCurrentProcessId(), (UINT64)pClient); ////ResDataObject resTopic; ////PacketAnalizer::GetContextTopic(pCmd, resTopic); //std::cout << "Transfer packet org " << packet.encode() << endl; //std::cout << "Dest packet : " << resResponse.encode() << endl; //SendCcosPacket(resResponse); //return true; pClient->Close(); } else { std::cout << "Connect First times .crate Connection ......." << endl; pClient = new LogicClient(szClientID, ("EBUS"+(string)getRootpath()).c_str()); } std::cout << "" << CurrentDateTime() << "EUBS-CLIENT Set LogicClient Ebus handle [" << pClient->GetClientName() << "] Handle " << ebusHandle.encode() << endl; pClient->SetEbusFileHandle(pHandle); if (pClient->Open(openKey.c_str(), ALL_ACCESS, 1002) == RET_SUCCEED) { std::cout << "" << CurrentDateTime() << szClientID << " EUBS-CLIENT Open Mqtt Server Succeed.." << endl; //把客户端对象存进去 g_ClientManager.RegistClient((UINT64)pClient); g_ClientManager.RegistClient(cltid, addr, findkey.c_str(), pClient); //PacketAnalizer::UpdateOpenRequest(packet, getLocalMachineId(), getLocalEbusId(), GetCurrentProcessId(), (UINT64)pClient); //CcosDevFileHandle handle; //ResDataObject handleRes; //PacketAnalizer::GetPacketHandle(&packet, handleRes); //handle.SetResDataObject(handleRes); ResDataObject resRes, resResponse; pClient->GetDeviceResource(&resRes); PacketAnalizer::MakeOpenResponse(packet, resResponse, resRes); PacketAnalizer::UpdateOpenRequest(resResponse, getLocalMachineId(), getLocalEbusId(), (UINT64)GetCurrentProcessId(), (UINT64)pClient); PacketAnalizer::UpdateHandleId(&resResponse,(UINT64)pClient); //把LogicClient 的对象指针作为HnaleID 传回去 //ResDataObject resTopic; //PacketAnalizer::GetContextTopic(pCmd, resTopic); std::cout << "" << CurrentDateTime() << " Open Success .. Transfer packet org " << endl;//<< packet.encode() //std::cout << "Dest packet : " << resResponse.encode() << endl; SendCcosPacket(resResponse); std::cout << "###################### " << pClient->GetClientName() << " Make Use Point as HandleID " << (UINT64)pClient << endl; pClient->SetPacketArrivedCallback([this, pClient](ResDataObject* res, const char* topic, void* conn_void) { ccos_mqtt_connection* conn = (ccos_mqtt_connection*)conn_void; //std::cout << "Send back to Ebus packet " << res->encode() << endl; UINT64 addr, proc; addr = proc = 0; PacketAnalizer::GetPacketHandleAddr(res, addr, false); PacketAnalizer::GetPacketHandleProcId(res, proc, false); //g_ClientManager.GetClient((UINT32)proc, (UINT32)addr, ) if (addr == 0 && proc == 0) { //这里是Notify //ResDataObject resEbusHandle; CcosDevFileHandle* pHandle = pClient->GetEbusFileHanle(); //pHandle->GetResDataObject(resEbusHandle); PacketAnalizer::UpdateNotifyHandle(*res, *pHandle); PacketAnalizer::UpdateOpenRequest(*res, getLocalMachineId(), getLocalEbusId(), (UINT64)GetCurrentProcessId(), (UINT64)pClient); PacketAnalizer::UpdateHandleId(res, (UINT64)pClient); //把LogicClient 的对象指针作为HnaleID 传回去 std::cout << "###################### " << pClient->GetClientName() << " Used Point as HandleID" << (UINT64)pClient << endl; } SendCcosPacket(*res); }); } else { std::cout << "" << CurrentDateTime() << szClientID << " EUBS-CLIENT Open Mqtt Server Failed.." << endl; return false; } } else if (cmd == PACKET_CMD_CLOSE) { UINT64 conn; UINT64 cltid = 0, addr = 0; PacketAnalizer::GetPacketHandleAddr(&packet, conn, true); PacketAnalizer::GetPacketHandleProcId(&packet, cltid, false); PacketAnalizer::GetPacketHandleAddr(&packet, addr, false); //要判断下指针是否我分配过 if (g_ClientManager.FindClient(conn)) { g_ClientManager.UnRegistClient(conn); LogicClient* pClient = (LogicClient*)conn; delete pClient; } //CloseConnection((ccos_mqtt_connection*)conn); } return true; } bool BusThread::HandleOthers(ResDataObject& packet, PACKET_CMD cmd) { std::cout << "BusThread::HandleOthers " << cmd << endl; UINT64 conn = 0; //PacketAnalizer::GetPacketHandleAddr(&packet, conn, true); PacketAnalizer::GetHandleId(&packet, conn); string key; ResDataObject resParam, resHandle; //要判断下指针是否我分配过 if (g_ClientManager.FindClient(conn)) { LogicClient* pClient = (LogicClient*)conn; switch (cmd) { case PACKET_CMD_GET: break; case PACKET_CMD_ADD: case PACKET_CMD_DEL: break; case PACKET_CMD_UPDATE://更新对象 break; case PACKET_CMD_EXE: //std::cout << pClient->get //todo 将Owner透传过去 pClient->Action_Trans(&packet); break; case PACKET_CMD_DATA: case PACKET_CMD_MSG: break; case PACKET_CMD_ONLINE://设备掉线后重新注册到Root case PACKET_CMD_PART_UPDATE://只更新对象的子节点 break; } } return false; } //work bool BusThread::Exec() { while (WaitForInQue()) { bool status = true; //pump In&Out que while (status) { ResDataObject Packet; status = false; //check req first if (PopReqDataObject(Packet)) { status = true; //send it to cdi PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&Packet); PACKET_TYPE type = PacketAnalizer::GetPacketType(& Packet); string key = PacketAnalizer::GetPacketKey(&Packet); std::cout << "------------ From Ebus type: [" << type << "] cmd : [" << cmd << "] key [" << key << "]" << endl; //CDInterface *p = CDInterface::GetCDI(); if (m_bLocal) { ReceivedFromLocalBus(Packet); } else { ReceivedFromEthBus(Packet); } //p->ReceivedFromBus(Packet); } } } return true; } bool BusThread::OnStartThread(void) { printf("========== BusThread::OnStartThread 0X%08X \n", GetCurrentThreadId()); #ifndef TEMP_COMP //mLog::FINFO("Bus Thread Begin Entry....."); #endif return true; } bool BusThread::OnEndThread(void) { printf("========== BusThread::OnEndThread 0X%08X \n", GetCurrentThreadId()); #ifndef TEMP_COMP Logger *pLog = (Logger*)m_pThreadLogger; //mLog::FINFO( "Bus Thread End Exit....."); #endif return true; } */