|
@@ -2176,6 +2176,7 @@ void resubscribe_topic(void* client, void* reconnect_date)
|
|
|
//int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
|
|
//int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
|
|
|
void msgarrivd(void* client, message_data_t* message)
|
|
void msgarrivd(void* client, message_data_t* message)
|
|
|
{
|
|
{
|
|
|
|
|
+ std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] "<<endl;
|
|
|
if (client == nullptr)
|
|
if (client == nullptr)
|
|
|
{
|
|
{
|
|
|
//printf("************************ somthing happend...");
|
|
//printf("************************ somthing happend...");
|
|
@@ -2322,6 +2323,7 @@ void* MqttSendThreadFunc(void* pPara)
|
|
|
mqtt_client* pConn = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
|
|
mqtt_client* pConn = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
|
|
|
std::string client_id = std::get<CLINET_ID_ID>(*connection);
|
|
std::string client_id = std::get<CLINET_ID_ID>(*connection);
|
|
|
sem_t* hSemaphore = std::get<SEMAPHORE_HANDLE_ID>(*connection);
|
|
sem_t* hSemaphore = std::get<SEMAPHORE_HANDLE_ID>(*connection);
|
|
|
|
|
+ mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
|
|
|
|
|
|
|
|
// 线程启动日志
|
|
// 线程启动日志
|
|
|
std::cout << "[" << client_id << "] The MQTT sending thread is started" << std::endl;
|
|
std::cout << "[" << client_id << "] The MQTT sending thread is started" << std::endl;
|
|
@@ -2338,7 +2340,7 @@ void* MqttSendThreadFunc(void* pPara)
|
|
|
std::cout << "[" << client_id << "] Received the message semaphore and ready to process the message" << std::endl;
|
|
std::cout << "[" << client_id << "] Received the message semaphore and ready to process the message" << std::endl;
|
|
|
|
|
|
|
|
pLock->Thread_Lock();
|
|
pLock->Thread_Lock();
|
|
|
- std::cout << "[" << client_id << "] Successfully obtained the sending lock" << std::endl;
|
|
|
|
|
|
|
+ //std::cout << "[" << client_id << "] Successfully obtained the sending lock" << std::endl;
|
|
|
|
|
|
|
|
if (pList->empty()) {
|
|
if (pList->empty()) {
|
|
|
std::cout << "[" << client_id << "] The message list is empty, release the lock." << std::endl;
|
|
std::cout << "[" << client_id << "] The message list is empty, release the lock." << std::endl;
|
|
@@ -2348,41 +2350,43 @@ void* MqttSendThreadFunc(void* pPara)
|
|
|
|
|
|
|
|
ResDataObject* pMsg = pList->front();
|
|
ResDataObject* pMsg = pList->front();
|
|
|
pList->pop_front();
|
|
pList->pop_front();
|
|
|
- std::cout << "[" << client_id << "] Get a message from the message list, the number of remaining messages: " << pList->size() << std::endl;
|
|
|
|
|
-
|
|
|
|
|
|
|
+ //std::cout << "[" << client_id << "] Get a message from the message list, the number of remaining messages: " << pList->size() << std::endl;
|
|
|
|
|
+ std::string m_strSendMessage;
|
|
|
for (int x = 0; x < pMsg->size(); x++) {
|
|
for (int x = 0; x < pMsg->size(); x++) {
|
|
|
const char* pTopic = pMsg->GetKey(x);
|
|
const char* pTopic = pMsg->GetKey(x);
|
|
|
- std::string message = (std::string)(*pMsg)[pTopic];
|
|
|
|
|
-
|
|
|
|
|
- std::cout << "[" << client_id << "] Prepare to publish messages to the topic: " << pTopic
|
|
|
|
|
- << ", Message length: " << message.length() << "Byte" << std::endl;
|
|
|
|
|
-
|
|
|
|
|
- mqtt_message_t msg;
|
|
|
|
|
- memset(&msg, 0, sizeof(msg));
|
|
|
|
|
- msg.payload = (void*)message.c_str();
|
|
|
|
|
- msg.payloadlen = message.length();
|
|
|
|
|
- msg.qos = QOS1;
|
|
|
|
|
- // 发布消息
|
|
|
|
|
- int publishResult = mqtt_publish(pConn, pTopic, &msg);
|
|
|
|
|
- if (publishResult == 0) {
|
|
|
|
|
- std::cout << "[" << client_id << "] Message published successfully to the topic: " << pTopic << std::endl;
|
|
|
|
|
- }
|
|
|
|
|
- else {
|
|
|
|
|
- std::cout << "[" << client_id << "] Message publishing failed to the topic: " << pTopic
|
|
|
|
|
- << ", Error code: " << publishResult << std::endl;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ m_strSendMessage = (std::string)(*pMsg)[pTopic];
|
|
|
|
|
+
|
|
|
|
|
+ std::cout<< CurrentDateTime() << " [" << client_id << "] Prepare to publish messages to the topic: " << pTopic
|
|
|
|
|
+ << ", Message length: " << m_strSendMessage.length() << "Byte" << std::endl;
|
|
|
|
|
+ PublishActionWithoutLock(m_strSendMessage, pTopic, pConn, client_id, pTopicList->size() <= 0 ? QOS2 : MQTT_QOS);
|
|
|
|
|
+ //mqtt_message_t msg;
|
|
|
|
|
+ //memset(&msg, 0, sizeof(msg));
|
|
|
|
|
+ //msg.payload = (void*)message.c_str();
|
|
|
|
|
+ //msg.payloadlen = message.length();
|
|
|
|
|
+ //msg.qos = QOS1;
|
|
|
|
|
+ //// 发布消息
|
|
|
|
|
+ //int publishResult = mqtt_publish(pConn, pTopic, &msg);
|
|
|
|
|
+ //if (publishResult == 0) {
|
|
|
|
|
+ // std::cout << "[" << client_id << "] Message published successfully to the topic: " << pTopic << std::endl;
|
|
|
|
|
+ //}
|
|
|
|
|
+ //else {
|
|
|
|
|
+ // std::cout << "[" << client_id << "] Message publishing failed to the topic: " << pTopic
|
|
|
|
|
+ // << ", Error code: " << publishResult << std::endl;
|
|
|
|
|
+ //}
|
|
|
}
|
|
}
|
|
|
- delete pMsg;
|
|
|
|
|
std::cout << "[" << client_id << "] The message processing is completed, and the message object has been released." << std::endl;
|
|
std::cout << "[" << client_id << "] The message processing is completed, and the message object has been released." << std::endl;
|
|
|
|
|
+ delete pMsg;
|
|
|
pLock->Thread_UnLock();
|
|
pLock->Thread_UnLock();
|
|
|
- std::cout << "[" << client_id << "] Release the sending lock" << std::endl;
|
|
|
|
|
|
|
+ //std::cout << "[" << client_id << "] Release the sending lock" << std::endl;
|
|
|
}
|
|
}
|
|
|
else if (errno == ETIMEDOUT) {
|
|
else if (errno == ETIMEDOUT) {
|
|
|
//std::cout << "[" << client_id << "] The semaphore wait timed out, continue waiting." << std::endl;
|
|
//std::cout << "[" << client_id << "] The semaphore wait timed out, continue waiting." << std::endl;
|
|
|
|
|
+ continue;
|
|
|
}
|
|
}
|
|
|
else {
|
|
else {
|
|
|
// 其他错误情况
|
|
// 其他错误情况
|
|
|
std::cout << "[" << client_id << "] Semaphore wait error, error code: " << errno << std::endl;
|
|
std::cout << "[" << client_id << "] Semaphore wait error, error code: " << errno << std::endl;
|
|
|
|
|
+ continue;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
return NULL;
|
|
return NULL;
|
|
@@ -2474,7 +2478,7 @@ mqtt_client_t* InnerConnect(ccos_mqtt_connection* connection) {
|
|
|
try {
|
|
try {
|
|
|
while (true) {
|
|
while (true) {
|
|
|
attempt_count++;
|
|
attempt_count++;
|
|
|
- std::cout << "[ATTEMPT #" << attempt_count << "] Trying MQTT connect..." << std::endl;
|
|
|
|
|
|
|
+ std::cout << CurrentDateTime()<< " [ATTEMPT #" << attempt_count << "] Trying MQTT connect..." << std::endl;
|
|
|
rc = mqtt_connect(pMqttClient);
|
|
rc = mqtt_connect(pMqttClient);
|
|
|
|
|
|
|
|
if (rc != MQTT_SUCCESS_ERROR) {
|
|
if (rc != MQTT_SUCCESS_ERROR) {
|
|
@@ -2615,10 +2619,12 @@ ccos_mqtt_connection* LogicDevice::NewConnection(const char* pszServer,const cha
|
|
|
sem_init(semaphore, 0, 0);
|
|
sem_init(semaphore, 0, 0);
|
|
|
std::get<SEMAPHORE_HANDLE_ID>(*connection) = semaphore;
|
|
std::get<SEMAPHORE_HANDLE_ID>(*connection) = semaphore;
|
|
|
|
|
|
|
|
|
|
+ std::get<THREAD_RUNNING_ID>(*connection) = true;
|
|
|
// 创建发送线程
|
|
// 创建发送线程
|
|
|
pthread_t threadId;
|
|
pthread_t threadId;
|
|
|
if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) {
|
|
if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) {
|
|
|
std::cerr << "Failed to create MQTT send thread" << std::endl;
|
|
std::cerr << "Failed to create MQTT send thread" << std::endl;
|
|
|
|
|
+ std::get<THREAD_RUNNING_ID>(*connection) = false;
|
|
|
// 错误处理...
|
|
// 错误处理...
|
|
|
sem_destroy(semaphore);
|
|
sem_destroy(semaphore);
|
|
|
delete semaphore;
|
|
delete semaphore;
|
|
@@ -2628,7 +2634,6 @@ ccos_mqtt_connection* LogicDevice::NewConnection(const char* pszServer,const cha
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
std::get<CONNECTED_HANDLE_ID>(*connection) = threadId;
|
|
std::get<CONNECTED_HANDLE_ID>(*connection) = threadId;
|
|
|
- std::get<THREAD_RUNNING_ID>(*connection) = true;
|
|
|
|
|
std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl;
|
|
std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl;
|
|
|
|
|
|
|
|
pLock->Thread_UnLock();
|
|
pLock->Thread_UnLock();
|
|
@@ -2735,10 +2740,12 @@ LOGICDEVICE_API ccos_mqtt_connection* NewConnection(const char* pszClientID, cco
|
|
|
sem_init(semaphore, 0, 0);
|
|
sem_init(semaphore, 0, 0);
|
|
|
std::get<SEMAPHORE_HANDLE_ID>(*connection) = semaphore;
|
|
std::get<SEMAPHORE_HANDLE_ID>(*connection) = semaphore;
|
|
|
|
|
|
|
|
|
|
+ std::get<THREAD_RUNNING_ID>(*connection) = true;
|
|
|
// 创建发送线程
|
|
// 创建发送线程
|
|
|
pthread_t threadId;
|
|
pthread_t threadId;
|
|
|
if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) {
|
|
if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) {
|
|
|
std::cerr << "Failed to create MQTT send thread" << std::endl;
|
|
std::cerr << "Failed to create MQTT send thread" << std::endl;
|
|
|
|
|
+ std::get<THREAD_RUNNING_ID>(*connection) = false;
|
|
|
// 错误处理...
|
|
// 错误处理...
|
|
|
sem_destroy(semaphore);
|
|
sem_destroy(semaphore);
|
|
|
delete semaphore;
|
|
delete semaphore;
|
|
@@ -2748,7 +2755,6 @@ LOGICDEVICE_API ccos_mqtt_connection* NewConnection(const char* pszClientID, cco
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
std::get<CONNECTED_HANDLE_ID>(*connection) = threadId;
|
|
std::get<CONNECTED_HANDLE_ID>(*connection) = threadId;
|
|
|
- std::get<THREAD_RUNNING_ID>(*connection) = true;
|
|
|
|
|
std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl;
|
|
std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl;
|
|
|
|
|
|
|
|
pLock->Thread_UnLock();
|
|
pLock->Thread_UnLock();
|
|
@@ -2885,8 +2891,11 @@ LOGICDEVICE_API void CloseConnection(ccos_mqtt_connection* hConnection)
|
|
|
//主动订阅主题
|
|
//主动订阅主题
|
|
|
LOGICDEVICE_API int SubscribeTopic(ccos_mqtt_connection* hConnection, const char* pszTopic, bool isShare)
|
|
LOGICDEVICE_API int SubscribeTopic(ccos_mqtt_connection* hConnection, const char* pszTopic, bool isShare)
|
|
|
{
|
|
{
|
|
|
|
|
+ std::cout << "SubscribeTopic called. Topic: " << (pszTopic ? pszTopic : "null")
|
|
|
|
|
+ << ", isShare: " << (isShare ? "true" : "false") << std::endl;
|
|
|
if (hConnection == nullptr)
|
|
if (hConnection == nullptr)
|
|
|
{
|
|
{
|
|
|
|
|
+ std::cout << "SubscribeTopic error: hConnection is nullptr" << std::endl;
|
|
|
return 0;
|
|
return 0;
|
|
|
}
|
|
}
|
|
|
mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
|
|
mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
|
|
@@ -2916,7 +2925,7 @@ LOGICDEVICE_API int SubscribeTopic(ccos_mqtt_connection* hConnection, const cha
|
|
|
int ret = mqtt_subscribe(pMqttClient, topicToSubscribe, MQTT_QOS, msgarrivd);
|
|
int ret = mqtt_subscribe(pMqttClient, topicToSubscribe, MQTT_QOS, msgarrivd);
|
|
|
//int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
|
|
//int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
|
|
|
////mLog::FWARN("mqtt {$} Subscribe ReUse {$} topic num {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret, pTopicList->size());
|
|
////mLog::FWARN("mqtt {$} Subscribe ReUse {$} topic num {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret, pTopicList->size());
|
|
|
- std::cout << "mqtt [" << std::get<CLINET_ID_ID>(*hConnection) << " ]Subscribe ReUse " << pszTopic << endl;
|
|
|
|
|
|
|
+ std::cout << CurrentDateTime() << " mqtt [" << std::get<CLINET_ID_ID>(*hConnection) << " ]Subscribe ReUse " << pszTopic << " ret: "<<ret <<endl;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (rc != MQTT_SUCCESS_ERROR)
|
|
if (rc != MQTT_SUCCESS_ERROR)
|
|
@@ -3014,7 +3023,7 @@ LOGICDEVICE_API int PublishAction(ResDataObject* pAction, const char* pszTopic,
|
|
|
std::cout << CurrentDateTime() << "Who ????? " << "Publish to [" << pszTopic << "] Action Body: "<< pAction->encode() << endl; //<< pAction->encode()
|
|
std::cout << CurrentDateTime() << "Who ????? " << "Publish to [" << pszTopic << "] Action Body: "<< pAction->encode() << endl; //<< pAction->encode()
|
|
|
return 0;
|
|
return 0;
|
|
|
}
|
|
}
|
|
|
- std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " Publish Action to ["<< pszTopic << "] Action Body: " << endl; //<< pAction->encode()
|
|
|
|
|
|
|
+ std::cout << CurrentDateTime()<<" " << std::get<CLINET_ID_ID>(*hConnection) << " Publish Action to [" << pszTopic << "] Action Body: " << endl; //<< pAction->encode()
|
|
|
|
|
|
|
|
string topic = pszTopic;
|
|
string topic = pszTopic;
|
|
|
if (topic.length() <= 0)
|
|
if (topic.length() <= 0)
|
|
@@ -3156,9 +3165,10 @@ int PublishActionWithoutLock(string& message, const char* pszTopic, mqtt_client
|
|
|
}
|
|
}
|
|
|
} while (rc != MQTT_SUCCESS_ERROR && (nTryTimes <= 2 || GetTickCount() - dwTick < dwTimeout));
|
|
} while (rc != MQTT_SUCCESS_ERROR && (nTryTimes <= 2 || GetTickCount() - dwTick < dwTimeout));
|
|
|
|
|
|
|
|
- if(nTryTimes > 1)
|
|
|
|
|
|
|
+ //if(nTryTimes > 1)
|
|
|
////mLog::FWARN("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
|
|
////mLog::FWARN("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
|
|
|
|
|
|
|
|
|
|
+ cout << CurrentDateTime() << " CLT " << client_id.c_str() << " PublishAction to " << pszTopic << " send Times " << nTryTimes << " result " << rc << endl;
|
|
|
////mLog::FDEBUG("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
|
|
////mLog::FDEBUG("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
|
|
|
|
|
|
|
|
if (rc < 0)
|
|
if (rc < 0)
|
|
@@ -3419,7 +3429,7 @@ LOGICDEVICE_API int ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnecti
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
pLock->Thread_UnLock();
|
|
pLock->Thread_UnLock();
|
|
|
- std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << "try [" << pszTopic << "] getresp ok Use Time[" << dwTick << "]ms" << endl;
|
|
|
|
|
|
|
+ std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " try [" << pszTopic << "] getresp ok Use Time[" << dwTick << "]ms" << endl;
|
|
|
////mLog::FDEBUG("CLT {$} try {$} getresp ok Use Time {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, dwTick);
|
|
////mLog::FDEBUG("CLT {$} try {$} getresp ok Use Time {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, dwTick);
|
|
|
//std::get<FILTER_RES_OBJ_ID>(*pfilter) = nullptr;
|
|
//std::get<FILTER_RES_OBJ_ID>(*pfilter) = nullptr;
|
|
|
return 2;
|
|
return 2;
|