#include "CcosThread.h"
#include "MsgQueue.h"

#include <pthread.h>
#include <time.h>
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#include <iostream>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <vector>

#ifdef __GLIBCXX__
#include <cxxabi.h>  // abi::__forced_unwind (thread cancellation unwinding)
#endif

// Serializes access to std::cout / std::cerr across threads.
static std::mutex g_logMutex;

// The do/while(0) wrapper makes each macro behave as a single statement,
// so it is safe inside un-braced if/else bodies.
#define LOG(...)                                          \
    do {                                                  \
        std::lock_guard<std::mutex> lock(g_logMutex);     \
        std::cout << __VA_ARGS__ << std::endl;            \
    } while (0)

#define ERR_LOG(...)                                      \
    do {                                                  \
        std::lock_guard<std::mutex> lock(g_logMutex);     \
        std::cerr << __VA_ARGS__ << std::endl;            \
    } while (0)

// With glibc/libstdc++, pthread_cancel unwinds the target thread by throwing
// abi::__forced_unwind. A catch (...) that swallows it aborts the process, so
// it must be rethrown ahead of any catch-all handler.
#ifdef __GLIBCXX__
#define CCOS_RETHROW_FORCED_UNWIND catch (abi::__forced_unwind&) { throw; }
#else
#define CCOS_RETHROW_FORCED_UNWIND
#endif

namespace {

// NOTE(review): template arguments were lost from the text this file was
// recovered from; the event handle type is presumed to be
// std::shared_ptr<LinuxEvent> - confirm against CcosThread.h.
using EventList = std::vector<std::shared_ptr<LinuxEvent>>;

// NOTE(review): if MsgQueue is a class template, its argument was lost as
// well and must be restored here (this alias is the only place to change).
using WorkQueue = MsgQueue;

constexpr long kNanosPerSecond = 1000000000L;
constexpr unsigned long kCancelGraceMs = 5000;  // wait after pthread_cancel

// Views an opaque queue pointer stored in Work_Thread as a WorkQueue.
inline WorkQueue* AsQueue(void* raw)
{
    return static_cast<WorkQueue*>(raw);
}

// Fills `deadline` with CLOCK_REALTIME "now + timeoutMs".
// Returns false (after perror) when the clock cannot be read.
bool MakeDeadline(unsigned long timeoutMs, struct timespec& deadline)
{
    if (clock_gettime(CLOCK_REALTIME, &deadline) == -1) {
        std::perror("clock_gettime");
        return false;
    }

    deadline.tv_sec += static_cast<time_t>(timeoutMs / 1000);
    deadline.tv_nsec += static_cast<long>(timeoutMs % 1000) * 1000000L;
    if (deadline.tv_nsec >= kNanosPerSecond) {
        deadline.tv_sec += 1;
        deadline.tv_nsec -= kNanosPerSecond;
    }
    return true;
}

// Joins `thread`, giving up at `deadline` (glibc) or after polling for
// `timeoutMs` milliseconds (other C libraries).
// Returns 0 when joined, ETIMEDOUT on timeout, or another errno value.
int JoinBefore(pthread_t thread, const struct timespec& deadline, unsigned long timeoutMs)
{
#ifdef __GNU_LIBRARY__
    (void)timeoutMs;
    return pthread_timedjoin_np(thread, nullptr, &deadline);
#else
    (void)deadline;
    const unsigned long pollMs = 100;
    // Try at least once so an already-finished thread is reaped even when
    // the timeout is zero.
    for (unsigned long elapsed = 0;; elapsed += pollMs) {
        if (pthread_tryjoin_np(thread, nullptr) == 0) {
            return 0;
        }
        if (elapsed >= timeoutMs) {
            return ETIMEDOUT;
        }
        usleep(pollMs * 1000);
    }
#endif
}

}  // namespace

// ---------------------------------------------------------------------------
// Thread_Base
// ---------------------------------------------------------------------------

/// Creates the exit/work/run events and the state condition variable.
/// @throws std::runtime_error if an event or the condition variable cannot
///         be created.
Thread_Base::Thread_Base(void)
    : m_Base_Thread(0),
      m_ThreadID(0),
      m_ThreadRunning(false),
      m_ExitRequested(false),
      m_strName(""),
      m_pThreadLogger(nullptr)
{
    LOG("Creating exit/work/run events");

    m_ExitFlag = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);
    m_WorkFlag = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false);
    m_RunFlag = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);

    if (!m_ExitFlag || !m_WorkFlag || !m_RunFlag) {
        throw std::runtime_error("Failed to create event objects");
    }

    if (pthread_cond_init(&m_StateCond, nullptr) != 0) {
        throw std::runtime_error("Failed to initialize condition variable");
    }
}

/// Stops the thread (if any) and destroys the condition variable.
Thread_Base::~Thread_Base(void)
{
    StopThread();
    pthread_cond_destroy(&m_StateCond);
}

/// Stores an opaque logger pointer; it is not used inside this file.
void Thread_Base::SetLogger(PVOID pLoger)
{
    m_pThreadLogger = pLoger;
}

/// Returns the pointer previously passed to SetLogger().
PVOID Thread_Base::GetLogger()
{
    return m_pThreadLogger;
}

/// pthread entry point. `pPara` is the owning Thread_Base.
///
/// Runs RegistThread() and OnStartThread(), then loops on Exec() until it
/// returns false or the exit event is observed. ThreadExitProcedure() runs
/// on every exit path except cancellation, which StopThread() cleans up
/// itself.
void* Thread_Base::Thread_Base_Thread(void* pPara)
{
    Thread_Base* const handle = static_cast<Thread_Base*>(pPara);
    if (!handle) {
        ERR_LOG("Invalid thread parameter");
        return nullptr;
    }

    try {
        usleep(30000);  // 30 ms start-up delay (replaces Sleep(30))

        if (handle->RegistThread() && handle->OnStartThread()) {
            LOG("Thread " << handle->m_strName << " starting");
            handle->SetThreadOnTheRun(true);

            while (handle->Exec()) {
                // Zero timeout: only checks whether the exit event is set.
                if (handle->WaitTheIncommingEvent(0) == 0) {
                    break;
                }
            }
        }

        LOG("Thread " << handle->m_strName << " exiting");
    }
    CCOS_RETHROW_FORCED_UNWIND
    catch (const std::exception& e) {
        ERR_LOG("[Thread] Exception: " << e.what());
    }
    catch (...) {
        ERR_LOG("[Thread] Unknown exception");
    }

    // Always publish the "stopped" state, even after an exception; otherwise
    // m_ThreadRunning stays true and the exit event is never set.
    try {
        handle->ThreadExitProcedure();
    }
    CCOS_RETHROW_FORCED_UNWIND
    catch (const std::exception& e) {
        ERR_LOG("[Thread] Exception: " << e.what());
    }
    catch (...) {
        ERR_LOG("[Thread] Unknown exception");
    }

    return nullptr;
}

/// Creates the worker thread.
/// @param Sync    when true, block (up to 300 s) until the thread reports
///                that it is running.
/// @param Inherit when true, the new thread inherits the creator's
///                scheduling attributes; otherwise they are explicit.
/// @return true if the thread was created (and, with Sync, started), or if
///         it is already running; false on failure.
bool Thread_Base::StartThread(bool Sync, bool Inherit)
{
    if (m_strName.empty()) {
        LOG("Warning: Thread name is empty");
    }

    if (m_ThreadRunning) {
#ifdef __GNU_LIBRARY__
        // Non-standard call, so only used with the GNU C library.
        const int tryJoin = pthread_tryjoin_np(m_Base_Thread, nullptr);
        if (tryJoin == EBUSY) {
            LOG("Thread " << m_strName << " is already running");
            return true;
        }
        // The thread has been reaped (or the handle is invalid). It must not
        // be joined a second time: joining a joined thread is undefined.
        LOG("Thread " << m_strName << " has ended, cleaning up");
#else
        LOG("Thread " << m_strName << " has ended, cleaning up");
        pthread_join(m_Base_Thread, nullptr);
#endif
        m_Base_Thread = 0;
        m_ThreadID = 0;
        m_ThreadRunning = false;
    }

    m_ExitFlag->ResetEvent();

    pthread_attr_t attr;
    if (pthread_attr_init(&attr) != 0) {
        ERR_LOG("Failed to initialize thread attributes");
        return false;
    }

    const size_t stack_size = 2 * 1024 * 1024;  // 2 MB
    if (pthread_attr_setstacksize(&attr, stack_size) != 0) {
        ERR_LOG("Failed to set thread stack size");
        pthread_attr_destroy(&attr);
        return false;
    }

    pthread_attr_setinheritsched(&attr, Inherit ? PTHREAD_INHERIT_SCHED : PTHREAD_EXPLICIT_SCHED);

    LOG("Creating thread " << m_strName);
    const int result = pthread_create(&m_Base_Thread, &attr, Thread_Base_Thread, this);
    pthread_attr_destroy(&attr);

    if (result != 0) {
        ERR_LOG("Failed to create thread " << m_strName << ", error: " << result);
        // The handle's contents are undefined after a failed create; clear
        // it so StopThread() never tries to join garbage.
        m_Base_Thread = 0;
        m_ThreadID = 0;
        m_ExitFlag->SetEvent();
        return false;
    }

    m_ThreadRunning = true;
    m_ThreadID = static_cast<decltype(m_ThreadID)>(m_Base_Thread);
    LOG("Thread " << m_strName << " created, ID: " << m_ThreadID);

    if (Sync) {
        LOG("Waiting for thread " << m_strName << " to start");
        return WaitTheThreadOnTheRun(300000);  // 300 s timeout
    }

    return true;
}

/// Signals the exit event and waits up to `timeperiod` milliseconds for the
/// thread to finish. On timeout the thread is cancelled and, if it still
/// does not finish within a further 5 s, detached.
/// @return true if the thread stopped (or the call came from the thread
///         itself); false if it had to be detached or the join failed.
bool Thread_Base::StopThread(unsigned long timeperiod)
{
    m_ExitFlag->SetEvent();

    // A thread cannot join itself; the exit event is all it needs.
    if (pthread_equal(pthread_self(), m_Base_Thread)) {
        return true;
    }

    bool ret = true;

    if (m_Base_Thread) {
        struct timespec deadline;
        if (!MakeDeadline(timeperiod, deadline)) {
            return false;
        }

        const int joinResult = JoinBefore(m_Base_Thread, deadline, timeperiod);

        if (joinResult == ETIMEDOUT) {
            ERR_LOG("Timeout stopping thread " << m_strName);

            pthread_cancel(m_Base_Thread);

            // Give the cancelled thread a short grace period to unwind.
            struct timespec grace;
            const bool stopped = MakeDeadline(kCancelGraceMs, grace) &&
                                 JoinBefore(m_Base_Thread, grace, kCancelGraceMs) == 0;
            if (!stopped) {
                // Detach only when the join did NOT succeed; a joined thread
                // must not be detached.
                ERR_LOG("Failed to stop thread " << m_strName << ", detaching");
                pthread_detach(m_Base_Thread);
                ret = false;
            }

            // The cancelled thread never reached its own exit procedure.
            ThreadExitProcedure();
        } else if (joinResult != 0) {
            ERR_LOG("Failed to join thread " << m_strName << ", error: " << joinResult);
            ret = false;
        }

        m_Base_Thread = 0;
    }

    m_ThreadID = 0;
    m_ThreadRunning = false;

    return ret;
}

/// Sets the exit event without waiting for the thread.
void Thread_Base::NotifyExit()
{
    m_ExitFlag->SetEvent();
}

/// Waits up to `waittime` for the exit or work event.
/// @return 0 if the exit event fired (or no thread exists), 1 if the work
///         event fired, -1 on timeout or error.
int Thread_Base::WaitTheIncommingEvent(unsigned long waittime)
{
    if (!m_Base_Thread) {
        return 0;  // no thread object: treat as "exit"
    }

    EventList waitEvents = { m_ExitFlag, m_WorkFlag };
    const int signalled = LinuxEvent::WaitForMultipleEvents(waitEvents, waittime);
    return (signalled == 0 || signalled == 1) ? signalled : -1;
}

/// Joins the thread, waiting at most `waittime` milliseconds.
/// @return true if there is no thread or it was joined; false on timeout or
///         error.
bool Thread_Base::WaitTheThreadEnd(unsigned long waittime)
{
    if (m_Base_Thread == 0) {
        return true;
    }

    struct timespec deadline;
    if (!MakeDeadline(waittime, deadline)) {
        return false;
    }

    if (JoinBefore(m_Base_Thread, deadline, waittime) != 0) {
        return false;
    }

    // The thread has been joined; forget the handle so StopThread() (also
    // called from the destructor) does not join it a second time.
    m_Base_Thread = 0;
    m_ThreadID = 0;
    m_ThreadRunning = false;
    return true;
}

/// Waits up to `waittime` for the exit event; true when no thread exists.
bool Thread_Base::WaitTheThreadEndSign(unsigned long waittime)
{
    return m_Base_Thread ? m_ExitFlag->Wait(waittime) : true;
}

/// Sets or resets the "running" event. Always returns true.
bool Thread_Base::SetThreadOnTheRun(bool OnTheRun)
{
    if (OnTheRun) {
        m_RunFlag->SetEvent();
    } else {
        m_RunFlag->ResetEvent();
    }
    return true;
}

/// Signals the work event.
void Thread_Base::NotifyThreadWork()
{
    m_WorkFlag->SetEvent();
}

/// Waits up to `waittime` for the "running" event.
/// @return false if no thread exists or the wait did not succeed.
bool Thread_Base::WaitTheThreadOnTheRun(unsigned long waittime)
{
    LOG("Waiting for thread " << m_strName << " to run");

    if (m_Base_Thread == 0) {
        ERR_LOG("Thread " << m_strName << " does not exist");
        return false;
    }

    return m_RunFlag->Wait(waittime);
}

// Default implementations of the overridable hooks.
bool Thread_Base::Exec() { return true; }
bool Thread_Base::OnStartThread() { return true; }
bool Thread_Base::OnEndThread() { return true; }
bool Thread_Base::RegistThread() { return true; }
bool Thread_Base::UnRegistThread() { return true; }

/// Returns the pthread handle (0 when no thread exists).
pthread_t Thread_Base::GetTID()
{
    return m_Base_Thread;
}

/// Sets the thread name used in log messages; a null pointer is ignored.
void Thread_Base::SetName(const char* pszThreadName)
{
    if (pszThreadName) {
        m_strName = pszThreadName;
    }
}

/// Returns the work event.
std::shared_ptr<LinuxEvent> Thread_Base::GetWorkEvt()
{
    return m_WorkFlag;
}

/// Returns the exit event.
std::shared_ptr<LinuxEvent> Thread_Base::GetExitEvt()
{
    return m_ExitFlag;
}

/// Marks the thread as stopped, sets the exit event, runs the OnEndThread()
/// and UnRegistThread() hooks, and resets the "running" event.
void Thread_Base::ThreadExitProcedure()
{
    m_ThreadRunning = false;
    m_ExitFlag->SetEvent();
    OnEndThread();
    UnRegistThread();
    SetThreadOnTheRun(false);
}

// ---------------------------------------------------------------------------
// Work_Thread
// ---------------------------------------------------------------------------

/// Creates the pause/resume/sleeping/working events and the request and
/// response queues.
/// @throws std::runtime_error if an event cannot be created.
Work_Thread::Work_Thread()
{
    m_PauseEvt = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);
    m_ResumeEvt = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);
    m_SleepingEvt = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);
    m_WorkingEvt = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);

    if (!m_PauseEvt || !m_SleepingEvt || !m_ResumeEvt || !m_WorkingEvt) {
        throw std::runtime_error("Work_Thread event creation failed");
    }

    // Hold both queues in owners until both allocations have succeeded, so
    // the first is not leaked if the second throws.
    std::unique_ptr<WorkQueue> request(new WorkQueue);
    std::unique_ptr<WorkQueue> response(new WorkQueue);
    m_pWorkQueReq = request.release();
    m_pWorkQueRes = response.release();
}

/// Destroys the request and response queues.
Work_Thread::~Work_Thread()
{
    delete AsQueue(m_pWorkQueReq);
    m_pWorkQueReq = nullptr;

    delete AsQueue(m_pWorkQueRes);
    m_pWorkQueRes = nullptr;
}

/// Dequeues one request into `obj`; returns the queue's DeQueue() result.
bool Work_Thread::PopReqDataObject(ResDataObject& obj)
{
    return static_cast<bool>(AsQueue(m_pWorkQueReq)->DeQueue(obj));
}

/// Enqueues a request and signals the work event.
/// @return false (without enqueuing) when the exit event is already set.
bool Work_Thread::PushReqDataObject(ResDataObject& obj)
{
    if (WaitTheThreadEndSign(0)) {
        return false;  // thread is exiting: cannot accept the request
    }

    AsQueue(m_pWorkQueReq)->InQueue(obj);
    NotifyThreadWork();
    return true;
}

/// Dequeues one response into `obj`; returns the queue's DeQueue() result.
bool Work_Thread::PopResDataObject(ResDataObject& obj)
{
    return static_cast<bool>(AsQueue(m_pWorkQueRes)->DeQueue(obj));
}

/// Enqueues a response and signals the work event.
/// @return false (without enqueuing) when the exit event is already set.
bool Work_Thread::PushResDataObject(ResDataObject& obj)
{
    if (WaitTheThreadEndSign(0)) {
        return false;  // thread is exiting: cannot accept the response
    }

    AsQueue(m_pWorkQueRes)->InQueue(obj);
    NotifyThreadWork();
    return true;
}

/// Waits up to `m_Timeout` for queued work.
/// @return true when either queue has data or the work event / a queue
///         notification fires; false when exit or pause is signalled, or on
///         timeout.
bool Work_Thread::WaitForInQue(unsigned long m_Timeout)
{
    WorkQueue* const requests = AsQueue(m_pWorkQueReq);
    WorkQueue* const responses = AsQueue(m_pWorkQueRes);

    // Index: 0 exit, 1 pause, 2 work, 3 request-queue notify, 4 response-queue notify.
    EventList waitEvents = { m_ExitFlag, m_PauseEvt, m_WorkFlag, nullptr, nullptr };
    waitEvents[3] = requests->GetNotifyHandle();
    waitEvents[4] = responses->GetNotifyHandle();

    // Exit and pause take priority over pending data.
    int ret = LinuxEvent::WaitForMultipleEvents(waitEvents, 0);
    if ((ret >= WAIT_OBJECT_0) && (ret <= WAIT_OBJECT_0 + 1)) {
        return false;
    }

    // Data already waiting in either queue.
    if (requests->WaitForInQue(0) == WAIT_OBJECT_0) {
        return true;
    }
    if (responses->WaitForInQue(0) == WAIT_OBJECT_0) {
        return true;
    }

    ret = LinuxEvent::WaitForMultipleEvents(waitEvents, m_Timeout);
    return (ret >= WAIT_OBJECT_0 + 2) && (ret < (WAIT_OBJECT_0 + 5));
}

/// Non-blocking check for a pause request. When the pause event is the one
/// reported, resets the working event, sets the sleeping event and returns
/// true.
bool Work_Thread::CheckForPause()
{
    EventList waitExp = { m_ExitFlag, m_PauseEvt };

    const DWORD retEvt = LinuxEvent::WaitForMultipleEvents(waitExp, 0);
    if (retEvt != WAIT_OBJECT_0 + 1) {
        return false;
    }

    m_WorkingEvt->ResetEvent();
    m_SleepingEvt->SetEvent();
    return true;
}

/// Waits for the exit or resume event, passing -1 as the timeout. When the
/// resume event is the one reported, resets the sleeping event, sets the
/// working event and returns true.
bool Work_Thread::WaitforResume()
{
    EventList wait = { m_ExitFlag, m_ResumeEvt };

    const DWORD ret = LinuxEvent::WaitForMultipleEvents(wait, -1);
    if (ret != WAIT_OBJECT_0 + 1) {
        return false;
    }

    m_SleepingEvt->ResetEvent();
    m_WorkingEvt->SetEvent();
    return true;
}

/// Asks the worker to pause and waits up to `timeout` for it to report
/// "sleeping".
/// @return false when called from the worker itself, when the worker is not
///         running, or when the sleeping event was not the one reported.
bool Work_Thread::PauseThread(unsigned long timeout)
{
    EventList wait = { m_ExitFlag, m_SleepingEvt };

    if (pthread_equal(pthread_self(), GetTID())) {
        return false;
    }

    if (!WaitTheThreadOnTheRun(0)) {
        return false;
    }

    m_PauseEvt->SetEvent();

    // Wait for the worker's acknowledgement.
    const DWORD retEvt = LinuxEvent::WaitForMultipleEvents(wait, timeout);
    const bool ret = (retEvt == WAIT_OBJECT_0 + 1);

    m_PauseEvt->ResetEvent();
    return ret;
}

/// Asks the worker to resume and waits up to `timeout` for it to report
/// "working".
/// @return false when called from the worker itself, when the worker is not
///         running, or when the working event was not the one reported.
bool Work_Thread::ResumeThread(unsigned long timeout)
{
    EventList wait = { m_ExitFlag, m_WorkingEvt };

    if (pthread_equal(pthread_self(), GetTID())) {
        return false;
    }

    if (!WaitTheThreadOnTheRun(0)) {
        return false;
    }

    m_ResumeEvt->SetEvent();

    // Wait for the worker's acknowledgement.
    const DWORD retEvt = LinuxEvent::WaitForMultipleEvents(wait, timeout);
    const bool ret = (retEvt == WAIT_OBJECT_0 + 1);

    m_ResumeEvt->ResetEvent();
    return ret;
}

// ---------------------------------------------------------------------------
// Ccos_Thread
// ---------------------------------------------------------------------------

Ccos_Thread::Ccos_Thread() {}

Ccos_Thread::~Ccos_Thread() {}