123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483 |
- // BusThread.cpp : 定义 DLL 应用程序的导出函数。
- //
- #include "BusThread.h"
- //#include "Logger.h"
- //#include "CDInterface.h"
- //#include <iostream>
- //#include "PacketAnalizer.h"
- //#include "LogicClient.h"
- //#include "LocalConfig.h"
- //#include "DevBusManager.h"
- //#include "ShareMemWR.h"
- //#include "ClientManager.h"
- /*
// Returns the current local wall-clock time formatted as
// " YYYY-MM-DD HH:MM:SS " (with one leading and one trailing space) so it can
// be streamed directly in front of a log message.
// Returns an empty string if the time cannot be converted to local time.
inline std::string CurrentDateTime()
{
    const std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

    // std::localtime() returns a pointer to storage shared by every caller,
    // so two threads logging at the same moment can overwrite each other's
    // result. Use the variant that fills a caller-owned struct instead.
    std::tm localTm{};
#if defined(_WIN32)
    if (localtime_s(&localTm, &now) != 0)
    {
        return std::string();
    }
#else
    if (localtime_r(&now, &localTm) == nullptr)
    {
        return std::string();
    }
#endif

    char buf[100] = { 0 };
    std::strftime(buf, sizeof(buf), " %Y-%m-%d %H:%M:%S ", &localTm);
    return buf;
}
- BusThread::BusThread(void)
- {
- m_bLocal = true;
- }
- BusThread::~BusThread(void)
- {
- }
- bool BusThread::ReceivedFromLocalBus(ResDataObject& packet)
- {
- bool ret = true;
- //mLog::FINFO("Packet from Local Bus {$} ", packet.encode());
- //correct it's direction
- try {
- PacketAnalizer::UpdatePacketRoute(packet, CCOS_PACKET_ROUTE_LOCAL);
- //do the normal dispatch
- ret = ReceivedFromBus(packet);
- }
- catch (...)
- {
- ret = false;
- }
- return ret;
- }
- bool BusThread::ReceivedFromEthBus(ResDataObject& packet)
- {
- bool ret = true;
- //mLog::FINFO( "Packet from Eth Bus {$}",packet.encode());
- //correct it's direction
- try {
- PacketAnalizer::UpdatePacketRoute(packet, CCOS_PACKET_ROUTE_ETH);
- //do the normal dispatch
- ret = ReceivedFromBus(packet);
- }
- catch (...)
- {
- ret = false;
- }
- return ret;
- }
- //1.command comes from client
- //2.dispatch command comes from device
- bool BusThread::ReceivedFromBus(ResDataObject& packet)
- {
- bool ret = true;
- //RES_//mLog::FINFO(packet,"Packet from Bus");
- PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&packet);
- PACKET_TYPE type = PacketAnalizer::GetPacketType(&packet);
- if (PACKET_TYPE_REQ == type)
- {
- switch (cmd) {
- case PACKET_CMD_OPEN:
- case PACKET_CMD_CLOSE:
- ret = HandleOpenClose(packet, cmd);
- break;
- case PACKET_CMD_GET:
- case PACKET_CMD_ADD:
- case PACKET_CMD_DEL:
- case PACKET_CMD_UPDATE://更新对象
- case PACKET_CMD_EXE:
- case PACKET_CMD_DATA:
- case PACKET_CMD_MSG:
- case PACKET_CMD_ONLINE://设备掉线后重新注册到Root
- case PACKET_CMD_PART_UPDATE://只更新对象的子节点
- ret = HandleOthers(packet, cmd);
- break;
- }
- }
- return ret;
- }
- CMD_ECHO DoSendPacket(ResDataObject& packet, bool Local, bool Block, string& TargetbusId)
- {
- //do send
- //block or packet
- if (Block)
- {
- //get context
- ResDataObject resImg;
- ImgDataInfo ImgData;
- string MachineId;
- if (PacketAnalizer::GetDestinationMachineId(packet, MachineId) == false)
- {
- return CMD_ECHO_IGNORE;
- }
- printf("Got Req of PACKET_CMD_DATA!!!---------------------\n");
- if (MachineId != (const char*)getLocalMachineId())
- {
- PacketAnalizer::GetPacketContext(&packet, resImg);
- ImgData.SetResDataObject(resImg);
- printf("Going to send SMPacket!!!---------------------\n");
- //target is not in this machine
- if (BusSendSMPacket(packet, Local, TargetbusId, (ImgData.nShareMemID)) == false)
- {
- //failed to send
- return CMD_ECHO_NOTARGET;
- }
- return CMD_ECHO_OK;
- }
- //same machineId,just send it as normal packet
- }
- if (BusSendPacket(packet, Local, TargetbusId) == false)
- {
- //failed to send
- return CMD_ECHO_NOTARGET;
- }
- return CMD_ECHO_OK;
- }
- bool BusThread::SendCcosPacket(ResDataObject& packet)
- {
- //std::cout << CurrentDateTime << " BusThread::SendCcosPacket Send Packet " << packet.encode() << endl;
- CCOS_PACKET_ROUTE Route;
- PacketAnalizer::GetPacketRoute(&packet, Route);
-
- bool sendtoLocalMachine = false;
- bool sendBlockData = false;
- //packet cmd
- PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&packet);
- if (cmd == PACKET_CMD_DATA)
- {
- sendBlockData = true;
- }
- string DesBusId;
- if (!PacketAnalizer::GetDestinationBusId(packet, DesBusId))
- {
- return false;
- }
- //Precaculate direction
- if (Route == CCOS_PACKET_ROUTE_ANY)
- {
- //choose one
- Route = g_BusManager.GetBusIdDirection(DesBusId);
- }
- //caculate again
- if (Route == CCOS_PACKET_ROUTE_LOCAL)
- {
- //target in this machine
- sendtoLocalMachine = true;
- }
- else if (Route == CCOS_PACKET_ROUTE_ETH)
- {
- //target is not in this machine
- }
- else if (Route == CCOS_PACKET_ROUTE_ANY)
- {
- //if still any direction,then try local and try eth
- //local first
- std::cout << "Send local first...... to " << DesBusId << endl;
- if (DoSendPacket(packet, true, sendBlockData, DesBusId) == CMD_ECHO_OK)
- {
- return CMD_ECHO_OK;
- }
- std::cout << "Send Eth......to " << DesBusId << endl;
- //try eth
- return DoSendPacket(packet, false, sendBlockData, DesBusId);
- }
- else
- {
- //ignore
- return CMD_ECHO_IGNORE;
- }
- //do send
- std::cout << "Send ?????.....to " << DesBusId << " ? local " << sendtoLocalMachine << endl;
- return DoSendPacket(packet, sendtoLocalMachine, sendBlockData, DesBusId);
- }
// Returns a copy of pszKey in which every '/', '{', '}' and '-' is replaced
// by '_'; all other characters are kept as they are.
// A null pszKey yields an empty string.
// The copy is held in a std::string, so keys of any length are handled (the
// previous fixed 256-byte buffer overflowed on longer keys).
static std::string ProcessKey(const char* pszKey) {
    if (pszKey == nullptr)
    {
        return std::string();
    }

    std::string sanitized(pszKey);
    for (char& ch : sanitized)
    {
        if (ch == '/' || ch == '{' || ch == '}' || ch == '-')
        {
            ch = '_';
        }
    }
    return sanitized;
}
- bool BusThread::HandleOpenClose(ResDataObject& packet, PACKET_CMD cmd)
- {
- // stringstream stream;
- //stream << "LogicClient_" << szClientName << "_" << GetCurrentProcessId();
- //m_strClientName = stream.str();
- if (cmd == PACKET_CMD_OPEN)
- {
- char szClientID[512];
- UINT64 cltid = 0, addr = 0;
- string openKey = PacketAnalizer::GetPacketKey(&packet);
- //if(openKey.length() > 48)
- // openKey.erase(48,openKey.length()-48);
- string findkey = ProcessKey(openKey.c_str());
- PacketAnalizer::GetPacketHandleProcId(&packet, cltid, false);
- PacketAnalizer::GetPacketHandleAddr(&packet, addr, false);
- LogicClient* pClient = g_ClientManager.GetClient(cltid, addr, findkey.c_str());
- sprintf_s(szClientID, "BUSCLT_%d_%d_%d_%s", cltid, addr, GetCurrentProcessId(), findkey.c_str());
- std::cout << CurrentDateTime << "Get Open Request From Ebus Open Key " << openKey << endl;
- std::cout << "Use Client id to connect mqtt " << szClientID << " gened by findkey " << findkey << endl;
- ResDataObject ebusHandle;
- PacketAnalizer::GetPacketHandle(&packet, ebusHandle);
- CcosDevFileHandle* pHandle = new CcosDevFileHandle;
- pHandle->SetResDataObject(ebusHandle);
- if (pClient != nullptr)
- {
- std::cout << "Connect seconed times .Reuse Connection ......." << endl;
-
- ////直接复用连接,考虑过一段时间重新关闭打开
- //std::cout << szClientID << " Had alread opend reuse it" << endl;
- //ResDataObject resRes, resResponse;
- //pClient->GetDeviceResource(&resRes);
- //PacketAnalizer::MakeOpenResponse(packet, resResponse, resRes);
- //PacketAnalizer::UpdateOpenRequest(resResponse, getLocalMachineId(), getLocalEbusId(), (UINT64)GetCurrentProcessId(), (UINT64)pClient);
- ////ResDataObject resTopic;
- ////PacketAnalizer::GetContextTopic(pCmd, resTopic);
- //std::cout << "Transfer packet org " << packet.encode() << endl;
- //std::cout << "Dest packet : " << resResponse.encode() << endl;
- //SendCcosPacket(resResponse);
- //return true;
- pClient->Close();
- }
- else
- {
- std::cout << "Connect First times .crate Connection ......." << endl;
- pClient = new LogicClient(szClientID, ("EBUS"+(string)getRootpath()).c_str());
-
- }
- std::cout << "" << CurrentDateTime() << "EUBS-CLIENT Set LogicClient Ebus handle [" << pClient->GetClientName() << "] Handle " << ebusHandle.encode() << endl;
- pClient->SetEbusFileHandle(pHandle);
- if (pClient->Open(openKey.c_str(), ALL_ACCESS, 1002) == RET_SUCCEED)
- {
- std::cout << "" << CurrentDateTime() << szClientID << " EUBS-CLIENT Open Mqtt Server Succeed.." << endl;
- //把客户端对象存进去
- g_ClientManager.RegistClient((UINT64)pClient);
- g_ClientManager.RegistClient(cltid, addr, findkey.c_str(), pClient);
- //PacketAnalizer::UpdateOpenRequest(packet, getLocalMachineId(), getLocalEbusId(), GetCurrentProcessId(), (UINT64)pClient);
- //CcosDevFileHandle handle;
- //ResDataObject handleRes;
- //PacketAnalizer::GetPacketHandle(&packet, handleRes);
- //handle.SetResDataObject(handleRes);
- ResDataObject resRes, resResponse;
- pClient->GetDeviceResource(&resRes);
- PacketAnalizer::MakeOpenResponse(packet, resResponse, resRes);
- PacketAnalizer::UpdateOpenRequest(resResponse, getLocalMachineId(), getLocalEbusId(), (UINT64)GetCurrentProcessId(), (UINT64)pClient);
- PacketAnalizer::UpdateHandleId(&resResponse,(UINT64)pClient); //把LogicClient 的对象指针作为HnaleID 传回去
- //ResDataObject resTopic;
- //PacketAnalizer::GetContextTopic(pCmd, resTopic);
- std::cout << "" << CurrentDateTime() << " Open Success .. Transfer packet org " << endl;//<< packet.encode()
- //std::cout << "Dest packet : " << resResponse.encode() << endl;
- SendCcosPacket(resResponse);
-
- std::cout << "###################### " << pClient->GetClientName() << " Make Use Point as HandleID " << (UINT64)pClient << endl;
- pClient->SetPacketArrivedCallback([this, pClient](ResDataObject* res, const char* topic, void* conn_void) {
- ccos_mqtt_connection* conn = (ccos_mqtt_connection*)conn_void;
- //std::cout << "Send back to Ebus packet " << res->encode() << endl;
- UINT64 addr, proc;
- addr = proc = 0;
- PacketAnalizer::GetPacketHandleAddr(res, addr, false);
- PacketAnalizer::GetPacketHandleProcId(res, proc, false);
- //g_ClientManager.GetClient((UINT32)proc, (UINT32)addr, )
- if (addr == 0 && proc == 0)
- {
- //这里是Notify
- //ResDataObject resEbusHandle;
- CcosDevFileHandle* pHandle = pClient->GetEbusFileHanle();
- //pHandle->GetResDataObject(resEbusHandle);
- PacketAnalizer::UpdateNotifyHandle(*res, *pHandle);
- PacketAnalizer::UpdateOpenRequest(*res, getLocalMachineId(), getLocalEbusId(), (UINT64)GetCurrentProcessId(), (UINT64)pClient);
- PacketAnalizer::UpdateHandleId(res, (UINT64)pClient); //把LogicClient 的对象指针作为HnaleID 传回去
- std::cout << "###################### " << pClient->GetClientName() << " Used Point as HandleID" << (UINT64)pClient << endl;
- }
- SendCcosPacket(*res);
- });
- }
- else {
- std::cout << "" << CurrentDateTime() << szClientID << " EUBS-CLIENT Open Mqtt Server Failed.." << endl;
- return false;
- }
- }
- else if (cmd == PACKET_CMD_CLOSE)
- {
- UINT64 conn;
- UINT64 cltid = 0, addr = 0;
- PacketAnalizer::GetPacketHandleAddr(&packet, conn, true);
- PacketAnalizer::GetPacketHandleProcId(&packet, cltid, false);
- PacketAnalizer::GetPacketHandleAddr(&packet, addr, false);
- //要判断下指针是否我分配过
- if (g_ClientManager.FindClient(conn))
- {
- g_ClientManager.UnRegistClient(conn);
- LogicClient* pClient = (LogicClient*)conn;
- delete pClient;
- }
- //CloseConnection((ccos_mqtt_connection*)conn);
- }
- return true;
- }
- bool BusThread::HandleOthers(ResDataObject& packet, PACKET_CMD cmd)
- {
- std::cout << "BusThread::HandleOthers " << cmd << endl;
- UINT64 conn = 0;
- //PacketAnalizer::GetPacketHandleAddr(&packet, conn, true);
- PacketAnalizer::GetHandleId(&packet, conn);
- string key;
- ResDataObject resParam, resHandle;
- //要判断下指针是否我分配过
- if (g_ClientManager.FindClient(conn))
- {
- LogicClient* pClient = (LogicClient*)conn;
- switch (cmd) {
- case PACKET_CMD_GET:
- break;
- case PACKET_CMD_ADD:
- case PACKET_CMD_DEL:
- break;
- case PACKET_CMD_UPDATE://更新对象
- break;
- case PACKET_CMD_EXE:
- //std::cout << pClient->get
- //todo 将Owner透传过去
- pClient->Action_Trans(&packet);
- break;
- case PACKET_CMD_DATA:
- case PACKET_CMD_MSG:
- break;
- case PACKET_CMD_ONLINE://设备掉线后重新注册到Root
- case PACKET_CMD_PART_UPDATE://只更新对象的子节点
- break;
- }
- }
- return false;
- }
- //work
- bool BusThread::Exec()
- {
- while (WaitForInQue())
- {
- bool status = true;
- //pump In&Out que
- while (status)
- {
- ResDataObject Packet;
- status = false;
- //check req first
- if (PopReqDataObject(Packet))
- {
- status = true;
- //send it to cdi
- PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&Packet);
- PACKET_TYPE type = PacketAnalizer::GetPacketType(& Packet);
- string key = PacketAnalizer::GetPacketKey(&Packet);
- std::cout << "------------ From Ebus type: [" << type << "] cmd : [" << cmd << "] key [" << key << "]" << endl;
- //CDInterface *p = CDInterface::GetCDI();
- if (m_bLocal)
- {
- ReceivedFromLocalBus(Packet);
- }
- else
- {
- ReceivedFromEthBus(Packet);
- }
- //p->ReceivedFromBus(Packet);
- }
- }
- }
- return true;
- }
- bool BusThread::OnStartThread(void)
- {
- printf("========== BusThread::OnStartThread 0X%08X \n", GetCurrentThreadId());
- #ifndef TEMP_COMP
- //mLog::FINFO("Bus Thread Begin Entry.....");
- #endif
- return true;
- }
- bool BusThread::OnEndThread(void)
- {
- printf("========== BusThread::OnEndThread 0X%08X \n", GetCurrentThreadId());
- #ifndef TEMP_COMP
- Logger *pLog = (Logger*)m_pThreadLogger;
- //mLog::FINFO( "Bus Thread End Exit.....");
- #endif
- return true;
- }
- */
|