#include "StdAfx.h" #include "MsgQueue.h" #include //#include "CDInterface.h" #include "DiosThread.h" //#include "stacktrace.hpp" using namespace std; DWORD WINAPI Thread_Base::Thread_Base_Thread(LPVOID pPara) { INT ret = 0; int waitevent = 0; Thread_Base *handle = (Thread_Base *)pPara; //prev work Sleep(30); if (handle->RegistThread() == false) { goto endwork_entry; } if (handle->OnStartThread() == false) { goto endwork_entry; } handle->SetThreadOnTheRun(true);//设置线程已经开始工作了 while (true) { //do work if (handle->Exec() == false) { break; } //check exit flag waitevent = handle->WaitTheIncommingEvent(0); if (waitevent == 0) { break; } } endwork_entry: //end work //SetEvent(handle->m_ExitFlag); //handle->OnEndThread(); //handle->UnRegistThread(); //handle->SetThreadOnTheRun(false);//设置线程已经结束工作了 handle->ThreadExitProcedure(); return 0; } Thread_Base::Thread_Base(void) { m_Base_Thread = 0; m_ThreadID = 0; m_pThreadLogger = 0; m_ExitFlag = CreateEvent(0, true, false, 0); m_WorkFlag = CreateEvent(0, false, false, 0); m_RunFlag = CreateEvent(0, true, false, 0); } Thread_Base::~Thread_Base(void) { StopThread(); CloseHandle(m_ExitFlag); CloseHandle(m_WorkFlag); CloseHandle(m_RunFlag); m_ThreadID = 0; m_Base_Thread = 0; m_pThreadLogger = 0; } bool Thread_Base::RegistThread() { return true; //return CDInterface::GetCDI()->RegistThread(this); } bool Thread_Base::UnRegistThread() { return true; //CDInterface::GetCDI()->UnRegistThread(m_ThreadID); //return true; } bool Thread_Base::OnStartThread() { return true; } bool Thread_Base::OnEndThread() { return true; } bool Thread_Base::Exec() { return false; } bool Thread_Base::StartThread(bool Sync,bool Inherit) { DWORD wait; if (m_strName.length() <= 0) { std::cout << "" << endl; } //std::cout << "========== Thread_Base::StartThread [" << m_strName.c_str() << "] by TID [" << GetCurrentThreadId() << "]" << endl; if (m_Base_Thread) { wait = WaitForSingleObject(m_Base_Thread, 0); if (wait == WAIT_TIMEOUT) { //already running //GPRINTA_ERROR("StartThread Failed. it's in running state"); std::cout << "StartThread Failed.it's [" << m_strName.c_str() << "] in running state" << endl; return true; } CloseHandle(m_Base_Thread); } ResetEvent(m_ExitFlag); if (Inherit) { SECURITY_ATTRIBUTES sa = { 0 }; sa.bInheritHandle = TRUE; sa.nLength = sizeof(SECURITY_ATTRIBUTES); m_Base_Thread = CreateThread(&sa, 0, Thread_Base_Thread, this, 0, &m_ThreadID); } else { m_Base_Thread = CreateThread(0, 0, Thread_Base_Thread, this, 0, &m_ThreadID); } //create not success, try again. if (m_Base_Thread == NULL) { std::cout << "========== Thread_Base::StartThread [" << m_strName.c_str() << "] failed by [" << GetCurrentThreadId() << "] " << endl; //wait for 100 ms WaitForSingleObject(m_ExitFlag, 100); if (Inherit) { SECURITY_ATTRIBUTES sa = { 0 }; sa.bInheritHandle = TRUE; sa.nLength = sizeof(SECURITY_ATTRIBUTES); m_Base_Thread = CreateThread(&sa, 0, Thread_Base_Thread, this, 0, &m_ThreadID); } else { m_Base_Thread = CreateThread(0, 0, Thread_Base_Thread, this, 0, &m_ThreadID); } } if (m_Base_Thread == NULL) { //GPRINTA_ERROR("CreateThread Failed. ErrorCode:%ld", GetLastError()); SetEvent(m_ExitFlag); printf("CreateThread Failed. [%s] WTF??.ErrCode:%d\n", m_strName.c_str(), GetLastError()); return false; } //std::cout << "========== Thread_Base::StartThread [" << m_strName.c_str() << "] succeeded " << GetThreadId(m_Base_Thread) << " by [" << GetCurrentThreadId() << "]" << endl; if (Sync) { //for test //return WaitTheThreadOnTheRun(INFINITE); return WaitTheThreadOnTheRun(300000); } return true; } bool Thread_Base::WaitTheThreadEnd(DWORD waittime) { bool ret = false; if (m_Base_Thread) { DWORD wait = WaitForSingleObject(m_Base_Thread, waittime); if (wait == WAIT_OBJECT_0) { ret = true; } } else { ret = true; } return ret; } bool Thread_Base::SetThreadOnTheRun(bool OnTheRun) { if (OnTheRun) { SetEvent(m_RunFlag); } else { ResetEvent(m_RunFlag); } return true; } bool Thread_Base::WaitTheThreadOnTheRun(DWORD waittime) { if (m_Base_Thread) { DWORD wait = WaitForSingleObject(m_Base_Thread, 0); if (wait == WAIT_OBJECT_0) { //thread already terminated //StackTrace st; //printf("WaitTheThreadOnTheRun Failed.WTF??\n"); std::cout << "WaitTheThreadOnTheRun Thread already Exit [" << m_strName.c_str() << "]" << endl; return false; } DWORD dwTick = GetTickCount(); wait = WaitForSingleObject(m_RunFlag, waittime); dwTick = GetTickCount() - dwTick; if (wait == WAIT_OBJECT_0) { std::cout << " WaitTheThreadOnTheRun Wait for Run ok [" << m_strName.c_str() << "] use time [" << dwTick << "ms]" << endl; return true; } std::cout << " WaitTheThreadOnTheRun Wait for Run timeout ["<< m_strName.c_str() << "] use time [" << dwTick << "ms]" << endl; } else { std::cout << "WaitTheThreadOnTheRun Failed.no thread exist?? [" << m_strName.c_str() << "]" << endl; } return false; } bool Thread_Base::WaitTheThreadEndSign(DWORD waittime) { if (m_Base_Thread) { DWORD wait = WaitForSingleObject(m_ExitFlag, waittime); if (wait == WAIT_OBJECT_0) { return true; } return false; } return true; } void Thread_Base::NotifyExit() { SetEvent(m_ExitFlag); } bool Thread_Base::StopThread(DWORD timeperiod) { bool ret = true; SetEvent(m_ExitFlag); //std::cout << "========== Thread_Base::StopThread ["<< m_strName.c_str() <<"] ThreadID [" << GetThreadId(m_Base_Thread) << "] by [" << GetCurrentThreadId() << endl; //me call me if (GetTID() == GetCurrentThreadId()) { return true; } while (m_Base_Thread) { DWORD wait = WaitForSingleObject(m_Base_Thread, timeperiod); if (wait == WAIT_TIMEOUT) { //printf("Warning!!!!!!\nWarning!!!!!!\nWarning!!!!!!\n StopThread Using Terminate Method\n"); TerminateThread(m_Base_Thread, 0); //if it's Terminated in unknown reason without notice,we need check it out //if (WaitTheThreadOnTheRun(1000)) { //Terminated ThreadExitProcedure(); } //CloseHandle(m_Base_Thread); ret = false; } else { CloseHandle(m_Base_Thread); } m_Base_Thread = NULL; } m_ThreadID = 0; return ret; } DWORD Thread_Base::GetTID() { return m_ThreadID; } void Thread_Base::NotifyThreadWork() { SetEvent(m_WorkFlag); } void Thread_Base::SetName(const char* pszThreadName) { m_strName = pszThreadName; } INT Thread_Base::WaitTheIncommingEvent(DWORD waittime) { HANDLE waits[2] = { m_ExitFlag, m_WorkFlag }; if (m_Base_Thread) { DWORD wait = WaitForMultipleObjects(2, waits, false, waittime); if (wait == WAIT_OBJECT_0) { return 0; } else if (wait == WAIT_OBJECT_0 + 1) { return 1; } else { //timeout return -1; } } //WTF?? //treat it as exist thread return 0; } void Thread_Base::SetLogger(PVOID pLoger) { m_pThreadLogger = pLoger; } PVOID Thread_Base::GetLogger() { return m_pThreadLogger; } void Thread_Base::ThreadExitProcedure() { SetEvent(m_ExitFlag); OnEndThread(); UnRegistThread(); SetThreadOnTheRun(false);//设置线程已经结束工作了 //CloseHandle(m_Base_Thread); } HANDLE Thread_Base::GetWorkEvt() { return m_WorkFlag; } HANDLE Thread_Base::GetExitEvt() { return m_ExitFlag; } //------------------------work thread-------------------------------------- Work_Thread::Work_Thread(void) { MsgQueue *p; p = new MsgQueue; m_pWorkQueReq = (HANDLE)p; p = new MsgQueue; m_pWorkQueRes = (HANDLE)p; m_PauseEvt = CreateEvent(0, TRUE, 0, 0); m_ResumeEvt = CreateEvent(0, TRUE, 0, 0); m_SleepingEvt = CreateEvent(0, TRUE, 0, 0); m_WorkingEvt = CreateEvent(0, TRUE, 0, 0);//初始状态就是FINISH } Work_Thread::~Work_Thread(void) { MsgQueue *p; p = (MsgQueue *)m_pWorkQueReq; delete p; m_pWorkQueReq = NULL; p = (MsgQueue *)m_pWorkQueRes; delete p; m_pWorkQueRes = NULL; CloseHandle(m_PauseEvt); m_PauseEvt = NULL; CloseHandle(m_SleepingEvt); m_SleepingEvt = NULL; CloseHandle(m_ResumeEvt); m_ResumeEvt = NULL; CloseHandle(m_WorkingEvt); m_WorkingEvt = NULL; } bool Work_Thread::PopReqDataObject(ResDataObject &obj) { return (bool)((MsgQueue *)m_pWorkQueReq)->DeQueue(obj); } bool Work_Thread::PushReqDataObject(ResDataObject &obj) { if (WaitTheThreadEndSign(0)) { return false;//not possible to send it } //printf("Thread:%d,push REQ one\n", GetCurrentThreadId()); ((MsgQueue *)m_pWorkQueReq)->InQueue(obj); NotifyThreadWork(); return true; } bool Work_Thread::PopResDataObject(ResDataObject &obj) { //printf("Thread:%d,Pop Res one\n", GetCurrentThreadId()); return (bool)((MsgQueue *)m_pWorkQueRes)->DeQueue(obj); } bool Work_Thread::PushResDataObject(ResDataObject &obj) { if (WaitTheThreadEndSign(0)) { return false;//not possible to send it } //printf("Thread:%d,push Res one\n",GetCurrentThreadId()); ((MsgQueue *)m_pWorkQueRes)->InQueue(obj); NotifyThreadWork(); return true; } bool Work_Thread::WaitForInQue(DWORD m_Timeout) { HANDLE wait[5] = { m_ExitFlag, m_PauseEvt, m_WorkFlag ,0,0}; wait[3] = ((MsgQueue *)m_pWorkQueReq)->GetNotifyHandle(); wait[4] = ((MsgQueue *)m_pWorkQueRes)->GetNotifyHandle(); DWORD ret = WaitForMultipleObjects(5,wait,0, 0); if ((ret >= WAIT_OBJECT_0) && (ret <= WAIT_OBJECT_0 + 1)) { return false; } if (((MsgQueue *)m_pWorkQueReq)->WaitForInQue(0) == WAIT_OBJECT_0) { return true; } if (((MsgQueue *)m_pWorkQueRes)->WaitForInQue(0) == WAIT_OBJECT_0) { return true; } ret = WaitForMultipleObjects(5, wait, FALSE, m_Timeout); if ((ret >= WAIT_OBJECT_0 + 2) && (ret < (WAIT_OBJECT_0 + 5))) { return true; } return false; } //finished=No,Pause=Yes,Others=No //only Pause=Yes bool Work_Thread::CheckForPause() { DWORD retEvt = 0; bool ret = false; HANDLE waitExp[2] = { m_ExitFlag, m_PauseEvt }; retEvt = WaitForMultipleObjects(2, waitExp, FALSE, 0); if (retEvt == WAIT_OBJECT_0 + 1) { ResetEvent(m_WorkingEvt); SetEvent(m_SleepingEvt); ret = true; } return ret; } //only Resume=Yes bool Work_Thread::WaitforResume() { bool Exret = false; HANDLE wait[2] = { m_ExitFlag, m_ResumeEvt }; DWORD ret = 0; ret = WaitForMultipleObjects(2, wait, FALSE, INFINITE); if (ret == WAIT_OBJECT_0 + 1) { //拿到Resume ResetEvent(m_SleepingEvt); SetEvent(m_WorkingEvt); Exret = true; } return Exret; } bool Work_Thread::PauseThread(DWORD timeout) { bool ret = false; DWORD retEvt = 0; HANDLE wait[2] = { m_ExitFlag, m_SleepingEvt }; if (GetCurrentThreadId() == m_ThreadID) { return false; } if (WaitTheThreadOnTheRun(0) == false) { //thread is not running return false; } SetEvent(m_PauseEvt); //get response retEvt = WaitForMultipleObjects(2, wait, FALSE, timeout); if (retEvt == WAIT_OBJECT_0 + 1) { ret = true; } ResetEvent(m_PauseEvt); return ret; } bool Work_Thread::ResumeThread(DWORD timeout) { bool ret = 0; DWORD retEvt = 0; HANDLE wait[3] = { m_ExitFlag, m_WorkingEvt }; if (GetCurrentThreadId() == m_ThreadID) { return false; } if (WaitTheThreadOnTheRun(0) == false) { //thread is not running return false; } SetEvent(m_ResumeEvt); retEvt = WaitForMultipleObjects(2, wait, FALSE, timeout); if (retEvt == WAIT_OBJECT_0 + 1) { ret = true; } ResetEvent(m_ResumeEvt); return ret; } //------------------------dios thread-------------------------------------- Dios_Thread::Dios_Thread(void) { } Dios_Thread::~Dios_Thread(void) { }