#include "CcosThread.h"
#include "MsgQueue.h"

#include <pthread.h>
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#include <ctime>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>

namespace
{
    // Fills `deadline` with the CLOCK_REALTIME instant that lies `timeoutMs`
    // milliseconds in the future, i.e. the absolute form expected by
    // pthread_timedjoin_np(). Returns false (after reporting through perror)
    // when the clock cannot be read; `deadline` must not be used in that case.
    bool MakeAbsoluteDeadline(unsigned long timeoutMs, struct timespec& deadline)
    {
        if (clock_gettime(CLOCK_REALTIME, &deadline) == -1)
        {
            perror("clock_gettime");
            return false;
        }

        // Split the millisecond timeout into whole seconds and nanoseconds.
        deadline.tv_sec += timeoutMs / 1000;
        deadline.tv_nsec += (timeoutMs % 1000) * 1000000;

        // Carry a nanosecond overflow into the seconds field.
        if (deadline.tv_nsec >= 1000000000)
        {
            deadline.tv_sec += 1;
            deadline.tv_nsec -= 1000000000;
        }
        return true;
    }

    // Creates one pthread running `entry(arg)` with the requested
    // scheduling-inheritance attribute and returns pthread_create()'s result.
    // Used for both the first attempt and the retry so that the two attempts
    // always run with identical attributes.
    int CreateWorker(pthread_t* thread, bool inherit, void* (*entry)(void*), void* arg)
    {
        pthread_attr_t attr;
        pthread_attr_init(&attr);
        pthread_attr_setinheritsched(&attr, inherit ? PTHREAD_INHERIT_SCHED : PTHREAD_EXPLICIT_SCHED);
        const int result = pthread_create(thread, &attr, entry, arg);
        pthread_attr_destroy(&attr);
        return result;
    }

    // Recovers the queue object stored behind one of Work_Thread's opaque
    // void* queue members.
    // NOTE(review): if MsgQueue is declared as a class template in MsgQueue.h,
    // its template argument list has to be spelled out here and in the
    // Work_Thread constructor — confirm against the header.
    MsgQueue* AsQueue(void* queue)
    {
        return static_cast<MsgQueue*>(queue);
    }
}

// ---------------------------------------------------------------------------
// Thread_Base implementation
// ---------------------------------------------------------------------------

// Creates the three events used by the thread life cycle:
//   m_ExitFlag - manual reset, set when the thread should stop / has stopped
//   m_WorkFlag - auto reset, set by NotifyThreadWork()
//   m_RunFlag  - manual reset, set while the worker reports itself running
// No thread is started here; see StartThread().
Thread_Base::Thread_Base(void)
    : m_Base_Thread(0),
    m_ThreadID(0),
    m_pThreadLogger(0),
    m_ThreadRunning(false),
    m_ExitRequested(false)
{
    std::cout << "LinuxEvent::CreateEvent : m_ExitFlag m_WorkFlag m_RunFlag" << std::endl;
    m_ExitFlag = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);
    m_WorkFlag = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false);
    m_RunFlag = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);
}

// Stops the worker (if any) using StopThread()'s default timeout.
Thread_Base::~Thread_Base(void)
{
    StopThread();
}

// Stores an opaque logger pointer; this file never dereferences it.
void Thread_Base::SetLogger(PVOID pLoger)
{
    m_pThreadLogger = pLoger;
}

// Returns the pointer previously passed to SetLogger() (null by default).
PVOID Thread_Base::GetLogger()
{
    return m_pThreadLogger;
}

// pthread entry point. `pPara` is the Thread_Base instance handed to
// pthread_create() by StartThread().
//
// Sequence: RegistThread() -> OnStartThread() -> mark running -> loop on
// Exec() until it returns false or the exit event is observed -> always run
// ThreadExitProcedure(). Always returns NULL.
void* Thread_Base::Thread_Base_Thread(void* pPara)
{
    Thread_Base* handle = static_cast<Thread_Base*>(pPara);

    // Stands in for the former Sleep(30).
    // NOTE(review): presumably gives StartThread() time to finish its own
    // bookkeeping before the worker touches shared members — confirm.
    usleep(30000);

    if (handle->RegistThread() && handle->OnStartThread())
    {
        std::cout << "handle->SetThreadOnTheRun(true)" << std::endl;
        handle->SetThreadOnTheRun(true);

        while (handle->Exec())
        {
            // Zero timeout: only polls the events; 0 means the exit event
            // (or a missing thread handle) was seen.
            if (handle->WaitTheIncommingEvent(0) == 0)
            {
                break;
            }
        }
    }

    std::cout << "handle->ThreadExitProcedure()" << std::endl;
    handle->ThreadExitProcedure();
    return NULL;
}

// Starts the worker thread.
//
// Sync    - when true, block until the worker signals m_RunFlag (up to 300 s)
//           and return the result of that wait.
// Inherit - true: the worker inherits the creator's scheduling attributes;
//           false: it uses the (default) attributes of the attribute object.
//
// Returns true when the thread was created (or is already running), false
// when pthread_create() failed twice or the synchronous wait failed.
bool Thread_Base::StartThread(bool Sync, bool Inherit)
{
    if (m_strName.empty())
    {
        std::cout << "Thread name is empty" << std::endl;
    }

    // Deal with a worker left over from an earlier StartThread() call.
    if (m_ThreadRunning)
    {
        const int tryJoin = pthread_tryjoin_np(m_Base_Thread, NULL);
        if (tryJoin == EBUSY)
        {
            // Existing behaviour: an already running worker is reported as success.
            std::cout << "StartThread Failed.it's [" << m_strName << "] in running state" << std::endl;
            return true;
        }

        // The worker has ended. A successful pthread_tryjoin_np() has already
        // reaped it, so it must NOT be joined a second time.
        std::cout << "Thread [" << m_strName << "] has ended, cleaning up resources" << std::endl;
        m_Base_Thread = 0;
        m_ThreadRunning = false;
        std::cout << "Thread [" << m_strName << "] resources cleaned up" << std::endl;
    }
    else if (m_Base_Thread != 0)
    {
        // The previous worker cleared m_ThreadRunning itself but was never
        // joined. Reap it without blocking; if it is still winding down,
        // detach it so its resources are released when it finishes instead
        // of being leaked when the handle is overwritten below.
        if (pthread_tryjoin_np(m_Base_Thread, NULL) == EBUSY)
        {
            pthread_detach(m_Base_Thread);
        }
        m_Base_Thread = 0;
    }

    // Clear a stale exit request before the new worker starts.
    if (m_ExitFlag == nullptr)
    {
        std::cerr << getLogPrefix() << "Thread_Base::StartThread: m_ExitFlag is NULL! Thread ID: "
            << std::this_thread::get_id() << std::endl;
    }
    else
    {
        try
        {
            std::cout << "Resetting exit flag for thread [" << m_strName << "]" << std::endl;
            m_ExitFlag->ResetEvent();
        }
        catch (...)
        {
            std::cerr << getLogPrefix() << "Thread_Base::StartThread: ResetEvent failed for Thread ID: "
                << std::this_thread::get_id() << std::endl;
        }
    }

    std::cout << "Preparing thread attributes for [" << m_strName << "]" << std::endl;
    if (Inherit)
    {
        std::cout << "Setting thread [" << m_strName << "] to inherit scheduling" << std::endl;
    }
    else
    {
        std::cout << "Setting thread [" << m_strName << "] to explicit scheduling" << std::endl;
    }

    std::cout << "Attempting to create thread [" << m_strName << "]" << std::endl;
    int result = CreateWorker(&m_Base_Thread, Inherit, Thread_Base_Thread, this);

    // One retry after 100 ms, with exactly the same attributes.
    if (result != 0)
    {
        std::cerr << "Failed to create thread [" << m_strName << "]. Error code: " << result
            << ". Retrying..." << std::endl;
        usleep(100 * 1000);
        result = CreateWorker(&m_Base_Thread, Inherit, Thread_Base_Thread, this);
    }

    if (result != 0)
    {
        std::cerr << "Failed to create thread [" << m_strName << "] after retry. Error code: "
            << result << std::endl;

        // POSIX leaves *thread undefined after a failed pthread_create();
        // reset the handle so StopThread()/the destructor never join garbage.
        m_Base_Thread = 0;
        if (m_ExitFlag != nullptr)
        {
            m_ExitFlag->SetEvent();
        }
        m_ThreadRunning = false;
        return false;
    }

    m_ThreadRunning = true;
    std::cout << "Thread [" << m_strName << "] created successfully" << std::endl;

    // The pthread handle value doubles as the numeric thread id.
    m_ThreadID = (unsigned long)m_Base_Thread;
    std::cout << "Thread [" << m_strName << "] ID: " << m_ThreadID << std::endl;

    if (Sync)
    {
        std::cout << "Waiting for thread [" << m_strName << "] to start (timeout: 300000ms)" << std::endl;
        return WaitTheThreadOnTheRun(300000);
    }

    return true;
}

// Requests the worker to exit and waits up to `timeperiod` milliseconds for
// it to be joined.
//
// Returns true when the worker exited in time, when there is no worker, or
// when called from the worker itself (which cannot join itself). Returns
// false when the clock could not be read, the join failed, or the worker had
// to be cancelled/detached after the timeout.
bool Thread_Base::StopThread(unsigned long timeperiod)
{
    bool ret = true;

    // Signal the exit request. Guarded because this also runs from the destructor.
    if (m_ExitFlag != nullptr)
    {
        m_ExitFlag->SetEvent();
    }

    // Called from inside the worker: only the exit request can be delivered.
    if (pthread_equal(pthread_self(), m_Base_Thread))
    {
        return true;
    }

    if (m_Base_Thread)
    {
        struct timespec deadline;
        if (!MakeAbsoluteDeadline(timeperiod, deadline))
        {
            return false;
        }

        const int joinResult = pthread_timedjoin_np(m_Base_Thread, NULL, &deadline);
        if (joinResult == ETIMEDOUT)
        {
            std::cerr << "Warning!!!!!!\nWarning!!!!!!\nWarning!!!!!!\n"
                << "StopThread timed out for thread: " << m_strName << std::endl;

            // Last resort: cancel the worker, then give it one more second.
            pthread_cancel(m_Base_Thread);

            struct timespec graceDeadline;
            const bool reaped = MakeAbsoluteDeadline(1000, graceDeadline)
                && pthread_timedjoin_np(m_Base_Thread, NULL, &graceDeadline) == 0;
            if (!reaped)
            {
                // Still not gone: detach it so it is reclaimed whenever it ends.
                pthread_detach(m_Base_Thread);
                std::cerr << "Failed to stop thread: " << m_strName << ". Thread detached." << std::endl;
            }

            // A forced stop is reported as failure either way.
            ret = false;

            // The worker may not have run its own exit path; run it here.
            ThreadExitProcedure();
        }
        else if (joinResult != 0)
        {
            ret = false;
        }

        m_Base_Thread = 0;
    }

    m_ThreadID = 0;
    return ret;
}

// Sets the exit event without waiting for the worker to finish.
void Thread_Base::NotifyExit()
{
    m_ExitFlag->SetEvent();
}

// Waits up to `waittime` (milliseconds) for the exit or the work event.
// Returns 0 for the exit event (or when no thread handle exists), 1 for the
// work event and -1 for a timeout or any other wait result.
int Thread_Base::WaitTheIncommingEvent(unsigned long waittime)
{
    if (!m_Base_Thread)
    {
        // Without a thread object the caller is told to exit.
        return 0;
    }

    std::vector<std::shared_ptr<LinuxEvent>> waitEvents = { m_ExitFlag, m_WorkFlag };
    const int wait = LinuxEvent::WaitForMultipleEvents(waitEvents, waittime);
    if (wait == 0 || wait == 1)
    {
        return wait;
    }
    return -1;
}

// Joins the worker, waiting at most `waittime` milliseconds.
// Returns true when the worker ended (or none exists), false on timeout or
// any join/clock error.
bool Thread_Base::WaitTheThreadEnd(unsigned long waittime)
{
    if (m_Base_Thread == 0)
    {
        return true;
    }

    struct timespec deadline;
    if (!MakeAbsoluteDeadline(waittime, deadline))
    {
        return false;
    }

    if (pthread_timedjoin_np(m_Base_Thread, NULL, &deadline) != 0)
    {
        return false;
    }

    // The thread has been reaped; drop the handle so that StopThread() and
    // the destructor do not join it a second time.
    m_Base_Thread = 0;
    m_ThreadID = 0;
    return true;
}

// Waits up to `waittime` for the exit event. Returns true when it is set or
// when no thread handle exists, false otherwise.
bool Thread_Base::WaitTheThreadEndSign(unsigned long waittime)
{
    if (!m_Base_Thread)
    {
        return true;
    }
    return m_ExitFlag->Wait(waittime) ? true : false;
}

// Sets (true) or resets (false) the "running" event. Always returns true.
bool Thread_Base::SetThreadOnTheRun(bool OnTheRun)
{
    if (OnTheRun)
    {
        m_RunFlag->SetEvent();
    }
    else
    {
        m_RunFlag->ResetEvent();
    }
    return true;
}

// Signals the work event so a worker waiting in WaitTheIncommingEvent()
// (or Work_Thread::WaitForInQue()) wakes up.
void Thread_Base::NotifyThreadWork()
{
    m_WorkFlag->SetEvent();
}

// Waits up to `waittime` for the worker to report itself running.
// Returns false when no thread exists, when the thread has already ended, or
// when the running event is not set within the timeout.
bool Thread_Base::WaitTheThreadOnTheRun(unsigned long waittime)
{
    std::cout << "Enter WaitTheThreadOnTheRun for [" << m_strName << "]" << std::endl;

    if (m_Base_Thread == 0)
    {
        std::cout << "WaitTheThreadOnTheRun Failed. no thread exist?? [" << m_strName << "]" << std::endl;
        return false;
    }

    // A successful try-join means the worker has already ended (and has now
    // been reaped), so forget the handle to avoid joining it again later.
    if (pthread_tryjoin_np(m_Base_Thread, NULL) == 0)
    {
        std::cout << "WaitTheThreadOnTheRun Thread already Exit [" << m_strName << "]" << std::endl;
        m_Base_Thread = 0;
        m_ThreadID = 0;
        return false;
    }

    const unsigned long startTick = GetTickCount();
    const bool running = m_RunFlag->Wait(waittime);
    const unsigned long elapsed = GetTickCount() - startTick;

    if (running)
    {
        std::cout << " WaitTheThreadOnTheRun Wait for Run ok [" << m_strName << "] use time ["
            << elapsed << "ms]" << std::endl;
        return true;
    }

    std::cout << " WaitTheThreadOnTheRun Wait for Run timeout [" << m_strName << "] use time ["
        << elapsed << "ms]" << std::endl;
    return false;
}

// Sets the name used in this class's log output.
void Thread_Base::SetName(const char* pszThreadName)
{
    m_strName = pszThreadName;
}

// Returns the pthread handle of the worker (0 when none is tracked).
pthread_t Thread_Base::GetTID()
{
    return m_Base_Thread;
}

// Default hook run first on the worker; returning false skips the work loop.
bool Thread_Base::RegistThread()
{
    return true;
}

// Default hook run from ThreadExitProcedure() after OnEndThread().
bool Thread_Base::UnRegistThread()
{
    return true;
}

// Default hook run on the worker before the work loop; returning false skips the loop.
bool Thread_Base::OnStartThread()
{
    return true;
}

// Default hook run from ThreadExitProcedure().
bool Thread_Base::OnEndThread()
{
    return true;
}

// Returns the work event.
std::shared_ptr<LinuxEvent> Thread_Base::GetWorkEvt()
{
    return m_WorkFlag;
}

// Returns the exit event.
std::shared_ptr<LinuxEvent> Thread_Base::GetExitEvt()
{
    return m_ExitFlag;
}

// Default work step: returns false, which ends the work loop immediately.
bool Thread_Base::Exec()
{
    return false;
}

// Common exit path: clears the running flag, sets the exit event, runs the
// OnEndThread()/UnRegistThread() hooks and resets the "running" event.
// Called by the worker when it leaves its loop and by StopThread() after a
// forced stop.
void Thread_Base::ThreadExitProcedure()
{
    m_ThreadRunning = false;

    if (m_ExitFlag == nullptr)
    {
        std::cerr << getLogPrefix() << "Thread_Base::ThreadExitProcedure: m_pEvent is NULL! Thread ID: "
            << std::this_thread::get_id() << std::endl;
    }
    else
    {
        try
        {
            m_ExitFlag->SetEvent();
        }
        catch (...)
        {
            std::cerr << getLogPrefix() << "Thread_Base::ThreadExitProcedure: SetEvent failed for Thread ID: "
                << std::this_thread::get_id() << std::endl;
        }
    }

    OnEndThread();
    UnRegistThread();
    SetThreadOnTheRun(false);
}

// ---------------------------------------------------------------------------
// Work_Thread implementation
// ---------------------------------------------------------------------------

// Creates the pause/resume handshake events (all manual reset, initially
// unset) and the request/response queues, which are kept behind void* members.
Work_Thread::Work_Thread()
{
    m_PauseEvt = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);
    m_ResumeEvt = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);
    m_SleepingEvt = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);
    m_WorkingEvt = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);

    m_pWorkQueReq = static_cast<void*>(new MsgQueue);
    m_pWorkQueRes = static_cast<void*>(new MsgQueue);
}

// Destroys both queues.
Work_Thread::~Work_Thread()
{
    delete AsQueue(m_pWorkQueReq);
    m_pWorkQueReq = NULL;

    delete AsQueue(m_pWorkQueRes);
    m_pWorkQueRes = NULL;
}

// Dequeues one request into `obj`; returns the queue's DeQueue() result as bool.
bool Work_Thread::PopReqDataObject(ResDataObject& obj)
{
    return (bool)AsQueue(m_pWorkQueReq)->DeQueue(obj);
}

// Enqueues a request and wakes the worker. Returns false, without queuing,
// when the exit event is already set (or no thread exists).
bool Work_Thread::PushReqDataObject(ResDataObject& obj)
{
    if (WaitTheThreadEndSign(0))
    {
        return false; // the worker is gone or going; nothing would consume it
    }

    AsQueue(m_pWorkQueReq)->InQueue(obj);
    NotifyThreadWork();
    return true;
}

// Dequeues one response into `obj`; returns the queue's DeQueue() result as bool.
bool Work_Thread::PopResDataObject(ResDataObject& obj)
{
    return (bool)AsQueue(m_pWorkQueRes)->DeQueue(obj);
}

// Enqueues a response and wakes the worker. Returns false, without queuing,
// when the exit event is already set (or no thread exists).
bool Work_Thread::PushResDataObject(ResDataObject& obj)
{
    if (WaitTheThreadEndSign(0))
    {
        return false; // the worker is gone or going; nothing would consume it
    }

    AsQueue(m_pWorkQueRes)->InQueue(obj);
    NotifyThreadWork();
    return true;
}

// Waits up to `m_Timeout` for queued work.
// Returns false when exit or pause is already pending, or when the wait ends
// on exit/pause/timeout; returns true when either queue already holds data
// or when the work event / a queue notification fires within the timeout.
bool Work_Thread::WaitForInQue(unsigned long m_Timeout)
{
    // Index 0: exit, 1: pause, 2: work, 3: request queue, 4: response queue.
    std::vector<std::shared_ptr<LinuxEvent>> waitEvents = {
        m_ExitFlag,
        m_PauseEvt,
        m_WorkFlag,
        AsQueue(m_pWorkQueReq)->GetNotifyHandle(),
        AsQueue(m_pWorkQueRes)->GetNotifyHandle()
    };

    // Exit and pause take priority over any pending data.
    DWORD ret = LinuxEvent::WaitForMultipleEvents(waitEvents, 0);
    if ((ret >= WAIT_OBJECT_0) && (ret <= WAIT_OBJECT_0 + 1))
    {
        return false;
    }

    // Data that is already queued needs no further waiting.
    if (AsQueue(m_pWorkQueReq)->WaitForInQue(0) == WAIT_OBJECT_0)
    {
        return true;
    }
    if (AsQueue(m_pWorkQueRes)->WaitForInQue(0) == WAIT_OBJECT_0)
    {
        return true;
    }

    ret = LinuxEvent::WaitForMultipleEvents(waitEvents, m_Timeout);
    return (ret >= WAIT_OBJECT_0 + 2) && (ret < (WAIT_OBJECT_0 + 5));
}

// Worker side of the pause handshake: polls (zero timeout) for a pause
// request. When one is pending, marks the worker as sleeping and returns
// true; otherwise returns false.
bool Work_Thread::CheckForPause()
{
    std::vector<std::shared_ptr<LinuxEvent>> waitExp = { m_ExitFlag, m_PauseEvt };
    const DWORD retEvt = LinuxEvent::WaitForMultipleEvents(waitExp, 0);
    if (retEvt != WAIT_OBJECT_0 + 1)
    {
        return false;
    }

    m_WorkingEvt->ResetEvent();
    m_SleepingEvt->SetEvent();
    return true;
}

// Worker side of the resume handshake: waits (timeout argument -1) for
// either exit or resume. On resume, marks the worker as working and returns
// true; returns false for any other wait result.
bool Work_Thread::WaitforResume()
{
    std::vector<std::shared_ptr<LinuxEvent>> wait = { m_ExitFlag, m_ResumeEvt };
    const DWORD ret = LinuxEvent::WaitForMultipleEvents(wait, -1);
    if (ret != WAIT_OBJECT_0 + 1)
    {
        return false;
    }

    // Resume received.
    m_SleepingEvt->ResetEvent();
    m_WorkingEvt->SetEvent();
    return true;
}

// Controller side of the pause handshake: raises the pause request and waits
// up to `timeout` for the worker to acknowledge it through m_SleepingEvt.
// Returns false when called from the worker itself, when the worker is not
// running, or when the acknowledgement does not arrive (exit or timeout).
bool Work_Thread::PauseThread(unsigned long timeout)
{
    if (pthread_equal(pthread_self(), GetTID()))
    {
        return false;
    }
    if (!WaitTheThreadOnTheRun(0))
    {
        return false;
    }

    std::vector<std::shared_ptr<LinuxEvent>> wait = { m_ExitFlag, m_SleepingEvt };

    m_PauseEvt->SetEvent();
    const DWORD retEvt = LinuxEvent::WaitForMultipleEvents(wait, timeout);
    m_PauseEvt->ResetEvent();

    return retEvt == WAIT_OBJECT_0 + 1;
}

// Controller side of the resume handshake: raises the resume request that
// WaitforResume() is blocked on and waits up to `timeout` for the worker to
// acknowledge it through m_WorkingEvt.
// Returns false when called from the worker itself, when the worker is not
// running, or when the acknowledgement does not arrive (exit or timeout).
bool Work_Thread::ResumeThread(unsigned long timeout)
{
    if (pthread_equal(pthread_self(), GetTID()))
    {
        return false;
    }
    if (!WaitTheThreadOnTheRun(0))
    {
        return false;
    }

    std::vector<std::shared_ptr<LinuxEvent>> wait = { m_ExitFlag, m_WorkingEvt };

    // Signalling the pause event here (as this function used to) can never
    // release a worker that is blocked in WaitforResume().
    m_ResumeEvt->SetEvent();
    const DWORD retEvt = LinuxEvent::WaitForMultipleEvents(wait, timeout);
    m_ResumeEvt->ResetEvent();

    return retEvt == WAIT_OBJECT_0 + 1;
}

// ---------------------------------------------------------------------------
// Ccos_Thread implementation
// ---------------------------------------------------------------------------

// Nothing to set up beyond the base-class construction.
Ccos_Thread::Ccos_Thread()
{}

// Nothing to release beyond the base-class destruction.
Ccos_Thread::~Ccos_Thread()
{}