paho_c_pub.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. /*******************************************************************************
  2. * Copyright (c) 2012, 2020 IBM Corp., and others
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial contribution
  15. * Guilherme Maciel Ferreira - add keep alive option
  16. * Ian Craggs - add full capability
  17. *******************************************************************************/
  18. #include "MQTTAsync.h"
  19. #include "pubsub_opts.h"
  20. #include <stdio.h>
  21. #include <signal.h>
  22. #include <string.h>
  23. #include <stdlib.h>
  24. #if defined(_WIN32)
  25. #include <windows.h>
  26. #define sleep Sleep
  27. #else
  28. #include <unistd.h>
  29. #include <sys/time.h>
  30. #include <unistd.h>
  31. #endif
  32. #if defined(_WRS_KERNEL)
  33. #include <OsWrapper.h>
  34. #endif
  35. volatile int toStop = 0;
  36. struct pubsub_opts opts =
  37. {
  38. 1, 0, 0, 0, "\n", 100, /* debug/app options */
  39. NULL, NULL, 1, 0, 0, /* message options */
  40. MQTTVERSION_DEFAULT, NULL, "paho-c-pub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */
  41. NULL, NULL, 0, 0, /* will options */
  42. 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
  43. 0, {NULL, NULL}, /* MQTT V5 options */
  44. NULL, NULL, /* HTTP and HTTPS proxies */
  45. };
  46. MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
  47. MQTTProperty property;
  48. MQTTProperties props = MQTTProperties_initializer;
  /**
   * Pause the caller for the given number of milliseconds.
   * @param ms the delay, in milliseconds
   */
  void mysleep(int ms)
  {
  #if !defined(_WIN32)
      /* usleep() takes microseconds, so scale the millisecond count */
      usleep(ms * 1000);
  #else
      /* the Win32 Sleep() call already takes milliseconds */
      Sleep(ms);
  #endif
  }
  57. void cfinish(int sig)
  58. {
  59. signal(SIGINT, NULL);
  60. toStop = 1;
  61. }
  62. int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
  63. {
  64. /* not expecting any messages */
  65. return 1;
  66. }
  67. static int disconnected = 0;
  68. void onDisconnect5(void* context, MQTTAsync_successData5* response)
  69. {
  70. disconnected = 1;
  71. }
  72. void onDisconnect(void* context, MQTTAsync_successData* response)
  73. {
  74. disconnected = 1;
  75. }
  76. static int connected = 0;
  77. void myconnect(MQTTAsync client);
  78. int mypublish(MQTTAsync client, int datalen, char* data);
  79. void onConnectFailure5(void* context, MQTTAsync_failureData5* response)
  80. {
  81. fprintf(stderr, "Connect failed, rc %s reason code %s\n",
  82. MQTTAsync_strerror(response->code),
  83. MQTTReasonCode_toString(response->reasonCode));
  84. connected = -1;
  85. MQTTAsync client = (MQTTAsync)context;
  86. }
  87. void onConnectFailure(void* context, MQTTAsync_failureData* response)
  88. {
  89. fprintf(stderr, "Connect failed, rc %s\n", response ? MQTTAsync_strerror(response->code) : "none");
  90. connected = -1;
  91. MQTTAsync client = (MQTTAsync)context;
  92. }
  93. void onConnect5(void* context, MQTTAsync_successData5* response)
  94. {
  95. MQTTAsync client = (MQTTAsync)context;
  96. int rc = 0;
  97. if (opts.verbose)
  98. printf("Connected\n");
  99. if (opts.null_message == 1)
  100. rc = mypublish(client, 0, "");
  101. else if (opts.message)
  102. rc = mypublish(client, (int)strlen(opts.message), opts.message);
  103. else if (opts.filename)
  104. {
  105. int data_len = 0;
  106. char* buffer = readfile(&data_len, &opts);
  107. if (buffer == NULL)
  108. toStop = 1;
  109. else
  110. {
  111. rc = mypublish(client, data_len, buffer);
  112. free(buffer);
  113. }
  114. }
  115. connected = 1;
  116. }
  117. void onConnect(void* context, MQTTAsync_successData* response)
  118. {
  119. MQTTAsync client = (MQTTAsync)context;
  120. int rc = 0;
  121. if (opts.verbose)
  122. printf("Connected\n");
  123. if (opts.null_message == 1)
  124. rc = mypublish(client, 0, "");
  125. else if (opts.message)
  126. rc = mypublish(client, (int)strlen(opts.message), opts.message);
  127. else if (opts.filename)
  128. {
  129. int data_len = 0;
  130. char* buffer = readfile(&data_len, &opts);
  131. if (buffer == NULL)
  132. toStop = 1;
  133. else
  134. {
  135. rc = mypublish(client, data_len, buffer);
  136. free(buffer);
  137. }
  138. }
  139. connected = 1;
  140. }
  141. static int published = 0;
  142. void onPublishFailure5(void* context, MQTTAsync_failureData5* response)
  143. {
  144. if (opts.verbose)
  145. fprintf(stderr, "Publish failed, rc %s reason code %s\n",
  146. MQTTAsync_strerror(response->code),
  147. MQTTReasonCode_toString(response->reasonCode));
  148. published = -1;
  149. }
  150. void onPublishFailure(void* context, MQTTAsync_failureData* response)
  151. {
  152. if (opts.verbose)
  153. fprintf(stderr, "Publish failed, rc %s\n", MQTTAsync_strerror(response->code));
  154. published = -1;
  155. }
  156. void onPublish5(void* context, MQTTAsync_successData5* response)
  157. {
  158. if (opts.verbose)
  159. printf("Publish succeeded, reason code %s\n",
  160. MQTTReasonCode_toString(response->reasonCode));
  161. if (opts.null_message || opts.message || opts.filename)
  162. toStop = 1;
  163. published = 1;
  164. }
  165. void onPublish(void* context, MQTTAsync_successData* response)
  166. {
  167. if (opts.verbose)
  168. printf("Publish succeeded\n");
  169. if (opts.null_message || opts.message || opts.filename)
  170. toStop = 1;
  171. published = 1;
  172. }
  /**
   * TLS error callback installed through MQTTAsync_SSLOptions.ssl_error_cb.
   * Writes the error text to stderr.
   *
   * @param str the error text, expected to be a NUL-terminated string
   * @param len the length of the error text (not used)
   * @param context the pointer set as ssl_error_context (not used)
   * @return the value returned by fprintf(): the number of characters written,
   *         or a negative value on an output error
   */
  static int onSSLError(const char *str, size_t len, void *context)
  {
      (void)len;
      (void)context;
      return fprintf(stderr, "SSL error: %s\n", str);
  }
  178. static unsigned int onPSKAuth(const char* hint,
  179. char* identity,
  180. unsigned int max_identity_len,
  181. unsigned char* psk,
  182. unsigned int max_psk_len,
  183. void* context)
  184. {
  185. int psk_len;
  186. int k, n;
  187. int rc = 0;
  188. struct pubsub_opts* opts = context;
  189. /* printf("Trying TLS-PSK auth with hint: %s\n", hint);*/
  190. if (opts->psk == NULL || opts->psk_identity == NULL)
  191. {
  192. /* printf("No PSK entered\n"); */
  193. goto exit;
  194. }
  195. /* psk should be array of bytes. This is a quick and dirty way to
  196. * convert hex to bytes without input validation */
  197. psk_len = (int)strlen(opts->psk) / 2;
  198. if (psk_len > max_psk_len)
  199. {
  200. fprintf(stderr, "PSK too long\n");
  201. goto exit;
  202. }
  203. for (k=0, n=0; k < psk_len; k++, n += 2)
  204. {
  205. sscanf(&opts->psk[n], "%2hhx", &psk[k]);
  206. }
  207. /* identity should be NULL terminated string */
  208. strncpy(identity, opts->psk_identity, max_identity_len);
  209. if (identity[max_identity_len - 1] != '\0')
  210. {
  211. fprintf(stderr, "Identity too long\n");
  212. goto exit;
  213. }
  214. /* Function should return length of psk on success. */
  215. rc = psk_len;
  216. exit:
  217. return rc;
  218. }
  219. void myconnect(MQTTAsync client)
  220. {
  221. MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  222. MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
  223. MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
  224. int rc = 0;
  225. if (opts.verbose)
  226. printf("Connecting\n");
  227. if (opts.MQTTVersion == MQTTVERSION_5)
  228. {
  229. MQTTAsync_connectOptions conn_opts5 = MQTTAsync_connectOptions_initializer5;
  230. conn_opts = conn_opts5;
  231. conn_opts.onSuccess5 = onConnect5;
  232. conn_opts.onFailure5 = onConnectFailure5;
  233. conn_opts.cleanstart = 1;
  234. }
  235. else
  236. {
  237. conn_opts.onSuccess = onConnect;
  238. conn_opts.onFailure = onConnectFailure;
  239. conn_opts.cleansession = 1;
  240. }
  241. conn_opts.keepAliveInterval = opts.keepalive;
  242. conn_opts.username = opts.username;
  243. conn_opts.password = opts.password;
  244. conn_opts.MQTTVersion = opts.MQTTVersion;
  245. conn_opts.context = client;
  246. conn_opts.automaticReconnect = 1;
  247. conn_opts.httpProxy = opts.http_proxy;
  248. conn_opts.httpsProxy = opts.https_proxy;
  249. if (opts.will_topic) /* will options */
  250. {
  251. will_opts.message = opts.will_payload;
  252. will_opts.topicName = opts.will_topic;
  253. will_opts.qos = opts.will_qos;
  254. will_opts.retained = opts.will_retain;
  255. conn_opts.will = &will_opts;
  256. }
  257. if (opts.connection && (strncmp(opts.connection, "ssl://", 6) == 0 ||
  258. strncmp(opts.connection, "wss://", 6) == 0))
  259. {
  260. if (opts.insecure)
  261. ssl_opts.verify = 0;
  262. else
  263. ssl_opts.verify = 1;
  264. ssl_opts.CApath = opts.capath;
  265. ssl_opts.keyStore = opts.cert;
  266. ssl_opts.trustStore = opts.cafile;
  267. ssl_opts.privateKey = opts.key;
  268. ssl_opts.privateKeyPassword = opts.keypass;
  269. ssl_opts.enabledCipherSuites = opts.ciphers;
  270. ssl_opts.ssl_error_cb = onSSLError;
  271. ssl_opts.ssl_error_context = client;
  272. ssl_opts.ssl_psk_cb = onPSKAuth;
  273. ssl_opts.ssl_psk_context = &opts;
  274. conn_opts.ssl = &ssl_opts;
  275. }
  276. connected = 0;
  277. if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  278. {
  279. fprintf(stderr, "Failed to start connect, return code %s\n", MQTTAsync_strerror(rc));
  280. exit(EXIT_FAILURE);
  281. }
  282. }
  283. int mypublish(MQTTAsync client, int datalen, char* data)
  284. {
  285. int rc;
  286. if (opts.verbose)
  287. printf("Publishing data of length %d\n", datalen);
  288. rc = MQTTAsync_send(client, opts.topic, datalen, data, opts.qos, opts.retained, &pub_opts);
  289. if (opts.verbose && rc != MQTTASYNC_SUCCESS && !opts.quiet)
  290. fprintf(stderr, "Error from MQTTAsync_send: %s\n", MQTTAsync_strerror(rc));
  291. return rc;
  292. }
  293. void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
  294. {
  295. fprintf(stderr, "Trace : %d, %s\n", level, message);
  296. }
  297. int main(int argc, char** argv)
  298. {
  299. MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
  300. MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
  301. MQTTAsync client;
  302. char* buffer = NULL;
  303. char* url = NULL;
  304. int url_allocated = 0;
  305. int rc = 0;
  306. const char* version = NULL;
  307. const char* program_name = "paho_c_pub";
  308. MQTTAsync_nameValue* infos = MQTTAsync_getVersionInfo();
  309. #if !defined(_WIN32)
  310. struct sigaction sa;
  311. #endif
  312. if (argc < 2)
  313. usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
  314. if (getopts(argc, argv, &opts) != 0)
  315. usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
  316. if (opts.connection)
  317. url = opts.connection;
  318. else
  319. {
  320. url = malloc(100);
  321. url_allocated = 1;
  322. sprintf(url, "%s:%s", opts.host, opts.port);
  323. }
  324. if (opts.verbose)
  325. printf("URL is %s\n", url);
  326. if (opts.tracelevel > 0)
  327. {
  328. MQTTAsync_setTraceCallback(trace_callback);
  329. MQTTAsync_setTraceLevel(opts.tracelevel);
  330. }
  331. create_opts.sendWhileDisconnected = 1;
  332. if (opts.MQTTVersion >= MQTTVERSION_5)
  333. create_opts.MQTTVersion = MQTTVERSION_5;
  334. rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
  335. if (rc != MQTTASYNC_SUCCESS)
  336. {
  337. if (!opts.quiet)
  338. fprintf(stderr, "Failed to create client, return code: %s\n", MQTTAsync_strerror(rc));
  339. exit(EXIT_FAILURE);
  340. }
  341. #if defined(_WIN32)
  342. signal(SIGINT, cfinish);
  343. signal(SIGTERM, cfinish);
  344. #else
  345. memset(&sa, 0, sizeof(struct sigaction));
  346. sa.sa_handler = cfinish;
  347. sa.sa_flags = 0;
  348. sigaction(SIGINT, &sa, NULL);
  349. sigaction(SIGTERM, &sa, NULL);
  350. #endif
  351. rc = MQTTAsync_setCallbacks(client, client, NULL, messageArrived, NULL);
  352. if (rc != MQTTASYNC_SUCCESS)
  353. {
  354. if (!opts.quiet)
  355. fprintf(stderr, "Failed to set callbacks, return code: %s\n", MQTTAsync_strerror(rc));
  356. exit(EXIT_FAILURE);
  357. }
  358. if (opts.MQTTVersion >= MQTTVERSION_5)
  359. {
  360. pub_opts.onSuccess5 = onPublish5;
  361. pub_opts.onFailure5 = onPublishFailure5;
  362. if (opts.message_expiry > 0)
  363. {
  364. property.identifier = MQTTPROPERTY_CODE_MESSAGE_EXPIRY_INTERVAL;
  365. property.value.integer4 = opts.message_expiry;
  366. MQTTProperties_add(&props, &property);
  367. }
  368. if (opts.user_property.name)
  369. {
  370. property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
  371. property.value.data.data = opts.user_property.name;
  372. property.value.data.len = (int)strlen(opts.user_property.name);
  373. property.value.value.data = opts.user_property.value;
  374. property.value.value.len = (int)strlen(opts.user_property.value);
  375. MQTTProperties_add(&props, &property);
  376. }
  377. pub_opts.properties = props;
  378. }
  379. else
  380. {
  381. pub_opts.onSuccess = onPublish;
  382. pub_opts.onFailure = onPublishFailure;
  383. }
  384. myconnect(client);
  385. while (!toStop)
  386. {
  387. int data_len = 0;
  388. int delim_len = 0;
  389. if (opts.stdin_lines)
  390. {
  391. buffer = malloc(opts.maxdatalen);
  392. delim_len = (int)strlen(opts.delimiter);
  393. do
  394. {
  395. buffer[data_len++] = getchar();
  396. if (data_len > delim_len)
  397. {
  398. if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
  399. break;
  400. }
  401. } while (data_len < opts.maxdatalen);
  402. rc = mypublish(client, data_len, buffer);
  403. }
  404. else
  405. mysleep(100);
  406. }
  407. if (opts.message == 0 && opts.null_message == 0 && opts.filename == 0)
  408. free(buffer);
  409. if (opts.MQTTVersion >= MQTTVERSION_5)
  410. disc_opts.onSuccess5 = onDisconnect5;
  411. else
  412. disc_opts.onSuccess = onDisconnect;
  413. if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
  414. {
  415. if (!opts.quiet)
  416. fprintf(stderr, "Failed to start disconnect, return code: %s\n", MQTTAsync_strerror(rc));
  417. exit(EXIT_FAILURE);
  418. }
  419. while (!disconnected)
  420. mysleep(100);
  421. MQTTAsync_destroy(&client);
  422. if (url_allocated)
  423. free(url);
  424. return EXIT_SUCCESS;
  425. }