StatRunner.java 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. package com.zskk.dicom.monitor.schedules;
  2. import com.zskk.dicom.monitor.config.Configs;
  3. import com.zskk.dicom.monitor.queue.*;
  4. import com.zskk.dicom.monitor.runner.*;
  5. import java.util.HashSet;
  6. import java.util.Set;
  7. import java.util.concurrent.ExecutorService;
  8. import java.util.concurrent.Executors;
  9. import java.util.concurrent.TimeUnit;
  10. /**
  11. *
  12. * 功能描述:
  13. *
  14. * @param: dirMonitorPools 监控文件线程池 oldUploadPools 旧文件
  15. * @return:
  16. * @auther: zzp
  17. * @date: 2019/3/10 10:35
  18. */
  19. public class StatRunner {
  20. /**
  21. * 休眠30s等待线程全部退出
  22. */
  23. private static final int WAIT_TIME = 30;
  24. /**
  25. * dcm文件产生文件夹监听线程(池)
  26. */
  27. private static ExecutorService dirMonitorPools = null;
  28. /**
  29. * 重试文件夹监听线程(池)
  30. */
  31. private static ExecutorService retryMonitorPools = null;
  32. /**
  33. * 把遗留上传文件添加进入队列的线程(池)
  34. */
  35. private static ExecutorService addHistoryFile2QueuePools = null;
  36. /**
  37. * 把遗留重试上传文件添加进入队列的线程(池)
  38. */
  39. private static ExecutorService addHistoryRetryFile2QueuePools = null;
  40. /**
  41. * 上传当前重试文件线程(池)
  42. */
  43. private static ExecutorService currentRetryPools = null;
  44. /**
  45. * 把文件移动到错误文件的线程(池)
  46. */
  47. private static ExecutorService errorPools = null;
  48. /**
  49. * 把文件移动到重试文件的线程(池)
  50. */
  51. private static ExecutorService remove2RetryPools = null;
  52. /**
  53. * 删除空文件夹的线程(池)
  54. */
  55. private static ExecutorService deleteEmptyDirPools = null;
  56. /**
  57. * 删除空重试文件夹的线程(池)
  58. */
  59. private static ExecutorService deleteEmptyRetryDirPools = null;
  60. /**
  61. * 上传遗留上传文件线程池(默认线程数 4 最小线程数 1)
  62. */
  63. private static ExecutorService oldUploadPools = null;
  64. /**
  65. * 上传当前上传文件线程池(默认线程数 4 最小线程数 1)
  66. */
  67. private static ExecutorService currentUploadPools = null;
  68. /**
  69. * 上传遗留重试文件线程池(默认线程数 2 最小线程数 1)
  70. */
  71. private static ExecutorService oldRetryPools = null;
  72. /**
  73. * 把文件移动到备份文件的线程池(默认线程数 2 最小线程数 1)
  74. */
  75. private static ExecutorService successPools = null;
  76. /**
  77. * 队列集合数组
  78. */
  79. private static Set<IQueue<String>> queueSet = null;
  80. /**
  81. * 线程池启动次数
  82. */
  83. private static int poolsStartCount = 0;
  84. /**
  85. * 初始化各种线程池并启动线程
  86. */
  87. public static void start() {
  88. Configs.sysLog.info("call start runner count: " + poolsStartCount);
  89. if(poolsStartCount == 0) {
  90. initPools();
  91. initQueueSet();
  92. startRunner();
  93. poolsStartCount++;
  94. return;
  95. }
  96. // 线程池启动标识设置为true
  97. // Configs.poolsStartingFlag = true;
  98. // 销毁队列集合
  99. destroyQueueSet();
  100. // 初始化线程池
  101. initPools();
  102. // 等待runner销毁
  103. waitRunningDestroy();
  104. // 初始化队列集合
  105. initQueueSet();
  106. // 线程池启动标识设置为false
  107. // Configs.poolsStartingFlag = false;
  108. // 启动线程
  109. startRunner();
  110. // 线程启动次数加一
  111. poolsStartCount++;
  112. }
  113. /**
  114. * 启动各个线程
  115. */
  116. private static void startRunner() {
  117. // 监听目标文件
  118. startDirMonitorRunner();
  119. // 监听重试文件
  120. startRetryMonitorRunner();
  121. // 老文件加入老上传队列
  122. startAddHistoryFile2UploadRunner();
  123. // 老重试文件加入老重试上传队列
  124. startAddHistoryRetryFile2UploadRunner();
  125. // 删除空文件夹
  126. startDeleteEmptyDirPools();
  127. // 删除空重试文件夹
  128. startDeleteEmptyRetryDirPools();
  129. // 新重试上传
  130. startCurrentRetryRunner();
  131. // 重试移动文件
  132. startRemove2RetryRunner();
  133. // 错误移动文件
  134. startErrorRunner();
  135. // 成功移动
  136. startSuccessRunner(Math.max(Configs.successPoolsLength, 1));
  137. // 当前上传
  138. startCurrentUploadRunner(Math.max(Configs.currentUploadPoolsLength, 1));
  139. try {
  140. TimeUnit.SECONDS.sleep(1);
  141. } catch (InterruptedException e) {
  142. e.printStackTrace();
  143. }
  144. // 老文件上传
  145. startOldUploadRunner(Math.max(Configs.oldUploadPoolsLength, 1));
  146. // 老重试上传
  147. startOldRetryRunner(Math.max(Configs.oldRetryPoolsLength, 1));
  148. }
  149. /**
  150. * 启动删除重试文件夹的空目录
  151. */
  152. private static void startDeleteEmptyRetryDirPools() {
  153. deleteEmptyRetryDirPools.execute(new BaseDeleteRunner(Configs.monitorRetryDir));
  154. }
  155. /**
  156. * 启动删除监听文件夹的空目录
  157. */
  158. private static void startDeleteEmptyDirPools() {
  159. deleteEmptyDirPools.execute(new BaseDeleteRunner(Configs.monitorDir));
  160. }
  161. /**
  162. * 启动遗留重试文件上传的线程池
  163. * @param oldRetryPoolsLength 线程池数量
  164. */
  165. private static void startOldRetryRunner(int oldRetryPoolsLength) {
  166. int count = 0;
  167. while (count < oldRetryPoolsLength) {
  168. oldRetryPools.execute(new RetryHistoryUploadRunner(OldRetryQueue.getInstance()));
  169. count++;
  170. }
  171. }
  172. /**
  173. * 启动遗留文件上传的线程池
  174. * @param oldUploadPoolsLength 线程池数量
  175. */
  176. private static void startOldUploadRunner(int oldUploadPoolsLength) {
  177. int count = 0;
  178. while (count < oldUploadPoolsLength) {
  179. oldUploadPools.execute(new HistoryUploadRunner(OldUploadQueue.getInstance()));
  180. count++;
  181. }
  182. }
  183. /**
  184. * 启动当前文件上传的线程池
  185. * @param currentUploadPoolsLength 线程池数量
  186. */
  187. private static void startCurrentUploadRunner(int currentUploadPoolsLength) {
  188. int count = 0;
  189. while (count < currentUploadPoolsLength) {
  190. currentUploadPools.execute(new CurrentUploadRunner(CurrentUploadQueue.getInstance()));
  191. count++;
  192. }
  193. }
  194. /**
  195. * 启动上传成功后文件移动到备份文件夹的线程池
  196. * @param successPoolsLength 线程池数量
  197. */
  198. private static void startSuccessRunner(int successPoolsLength) {
  199. int count = 0;
  200. while (count < successPoolsLength) {
  201. successPools.execute(new SuccessRunner(SuccessQueue.getInstance()));
  202. count++;
  203. }
  204. }
  205. /**
  206. * 启动文件移动到重试文件夹的线程池(单线程)
  207. */
  208. private static void startErrorRunner() {
  209. errorPools.execute(new ErrorRunner(ErrorQueue.getInstance()));
  210. }
  211. /**
  212. * 启动文件移动到重试文件夹的线程池(单线程)
  213. */
  214. private static void startRemove2RetryRunner() {
  215. remove2RetryPools.execute(new RemoveFile2RetryRunner(RemoveFile2RetryQueue.getInstance()));
  216. }
  217. /**
  218. * 启动重试上传的线程池(单线程)
  219. */
  220. private static void startCurrentRetryRunner() {
  221. currentRetryPools.execute(new RetryCurrentUploadRunner(CurrentRetryUploadQueue.getInstance()));
  222. }
  223. /**
  224. * 启动遍历dcm产生文件夹并把遗留上传文件放入上传队列的线程池(单线程)
  225. */
  226. private static void startAddHistoryRetryFile2UploadRunner() {
  227. addHistoryRetryFile2QueuePools.execute(new AddRetryHistory2QueueRunner());
  228. }
  229. /**
  230. * 启动遍历dcm产生文件夹并把遗留上传文件放入上传队列的线程池(单线程)
  231. */
  232. private static void startAddHistoryFile2UploadRunner() {
  233. addHistoryFile2QueuePools.execute(new AddHistory2QueueRunner());
  234. }
  235. /**
  236. * 启动重试文件夹监听的线程池(单线程)
  237. */
  238. private static void startRetryMonitorRunner() {
  239. retryMonitorPools.execute(new RetryDirMonitor());
  240. }
  241. /**
  242. * 启动目标dcm产生文件夹监听的线程池(单线程)
  243. */
  244. private static void startDirMonitorRunner() {
  245. dirMonitorPools.execute(new DirMonitor());
  246. }
  247. /**
  248. * 等待runner销毁
  249. */
  250. private static void waitRunningDestroy() {
  251. try {
  252. TimeUnit.SECONDS.sleep(WAIT_TIME);
  253. } catch (InterruptedException e) {
  254. e.printStackTrace();
  255. }
  256. }
  257. /**
  258. * 初始化线程池
  259. */
  260. private static void initPools() {
  261. // 监听目标文件线程池
  262. initDirMonitorPools();
  263. // 监听重试文件线程池
  264. initRetryMonitorPools();
  265. // 老文件加入老上传队列线程池
  266. initAddHistoryFile2UploadPools();
  267. // 老重试文件加入老重试上传队列线程池
  268. initAddHistoryRetryFile2UploadPools();
  269. // 新重试上传线程池
  270. initCurrentRetryPools();
  271. // 重试移动文件线程池
  272. initRemove2RetryPools();
  273. // 错误移动文件线程池
  274. initErrorPools();
  275. // 删除空文件夹线程池
  276. initDeleteEmptyDirPools();
  277. // 删除空重试文件夹线程池
  278. initDeleteEmptyRetryDirPools();
  279. // 成功移动线程池
  280. initSuccessPools(Math.max(Configs.successPoolsLength, 1));
  281. // 当前上传线程池
  282. initCurrentUploadPools(Math.max(Configs.currentUploadPoolsLength, 1));
  283. // 老文件上传线程池
  284. initOldUploadPools(Math.max(Configs.oldUploadPoolsLength, 1));
  285. // 老重试上传线程池
  286. initOldRetryPools(Math.max(Configs.oldRetryPoolsLength, 1));
  287. }
  288. /**
  289. * 初始化删除空文件夹的线程池
  290. */
  291. private static void initDeleteEmptyDirPools() {
  292. deleteEmptyDirPools = generateSinglePools(deleteEmptyDirPools);
  293. }
  294. /**
  295. * 初始化删除空重试文件夹的线程池
  296. */
  297. private static void initDeleteEmptyRetryDirPools() {
  298. deleteEmptyRetryDirPools = generateSinglePools(deleteEmptyRetryDirPools);
  299. }
  300. /**
  301. * 初始化遗留重试文件上传的线程池
  302. * @param oldRetryPoolsLength 线程池数量
  303. */
  304. private static void initOldRetryPools(int oldRetryPoolsLength) {
  305. oldRetryPools = generatePools(oldRetryPools, oldRetryPoolsLength);
  306. }
  307. /**
  308. * 初始化遗留文件上传的线程池
  309. * @param oldUploadPoolsLength 线程池数量
  310. */
  311. private static void initOldUploadPools(int oldUploadPoolsLength) {
  312. oldUploadPools = generatePools(oldUploadPools, oldUploadPoolsLength);
  313. }
  314. /**
  315. * 初始化当前文件上传的线程池
  316. * @param currentUploadPoolsLength 线程池数量
  317. */
  318. private static void initCurrentUploadPools(int currentUploadPoolsLength) {
  319. currentUploadPools = generatePools(currentUploadPools, currentUploadPoolsLength);
  320. }
  321. /**
  322. * 初始化上传成功后文件移动到备份文件夹的线程池
  323. * @param successPoolsLength 线程池数量
  324. */
  325. private static void initSuccessPools(int successPoolsLength) {
  326. successPools = generatePools(successPools, successPoolsLength);
  327. }
  328. /**
  329. * 初始化文件移动到错误文件夹的线程池(单线程)
  330. */
  331. private static void initErrorPools() {
  332. errorPools = generateSinglePools(errorPools);
  333. }
  334. /**
  335. * 初始化文件移动到重试文件夹的线程池(单线程)
  336. */
  337. private static void initRemove2RetryPools() {
  338. remove2RetryPools = generateSinglePools(remove2RetryPools);
  339. }
  340. /**
  341. * 初始化重试上传的线程池(单线程)
  342. */
  343. private static void initCurrentRetryPools() {
  344. currentRetryPools = generateSinglePools(currentRetryPools);
  345. }
  346. /**
  347. * 初始化遍历重试文件夹并把遗留重试文件放入队列的线程池(单线程)
  348. */
  349. private static void initAddHistoryRetryFile2UploadPools() {
  350. addHistoryRetryFile2QueuePools = generateSinglePools(addHistoryRetryFile2QueuePools);
  351. }
  352. /**
  353. * 初始化遍历dcm产生文件夹并把遗留上传文件放入上传队列的线程池(单线程)
  354. */
  355. private static void initAddHistoryFile2UploadPools() {
  356. addHistoryFile2QueuePools = generateSinglePools(addHistoryFile2QueuePools);
  357. }
  358. /**
  359. * 初始化重试文件夹监听的线程池(单线程)
  360. */
  361. private static void initRetryMonitorPools() {
  362. retryMonitorPools = generateSinglePools(retryMonitorPools);
  363. }
  364. /**
  365. * 初始化目标dcm产生文件夹监听的线程池(单线程)
  366. */
  367. private static void initDirMonitorPools() {
  368. dirMonitorPools = generateSinglePools(dirMonitorPools);
  369. }
  370. /**
  371. * 销毁队列集合
  372. */
  373. private static void destroyQueueSet() {
  374. if(queueSet != null) {
  375. queueSet = destroyQueue(queueSet);
  376. }
  377. if(queueSet != null) {
  378. Configs.sysLog.info("queueSet destroy fail!");
  379. queueSet.clear();
  380. queueSet = null;
  381. }
  382. }
  383. /**
  384. * 销毁已存在的队列集合
  385. * @param set
  386. * @return
  387. */
  388. private static Set<IQueue<String>> destroyQueue(Set<IQueue<String>> set) {
  389. for (IQueue<String> queue : queueSet) {
  390. if(queue != null) {
  391. queue.destroy();
  392. }
  393. }
  394. return null;
  395. }
  396. /**
  397. * 初始化队列集合
  398. */
  399. private static void initQueueSet() {
  400. queueSet = new HashSet<>();
  401. createQueue(queueSet);
  402. initQueue(queueSet);
  403. }
  404. /**
  405. * 创建队列
  406. * @param set
  407. */
  408. private static void createQueue(Set<IQueue<String>> set) {
  409. set.add(CurrentRetryUploadQueue.getInstance());
  410. set.add(CurrentUploadQueue.getInstance());
  411. set.add(ErrorQueue.getInstance());
  412. set.add(OldRetryQueue.getInstance());
  413. set.add(OldUploadQueue.getInstance());
  414. set.add(RemoveFile2RetryQueue.getInstance());
  415. set.add(SuccessQueue.getInstance());
  416. }
  417. /**
  418. * 初始化所有队列
  419. * @param queueSet
  420. */
  421. private static void initQueue(Set<IQueue<String>> queueSet) {
  422. for (IQueue<String> queue : queueSet) {
  423. if(queue != null) {
  424. queue.init();
  425. }
  426. }
  427. }
  428. /**
  429. * 生成线程池
  430. * @param oldPool 老线程池
  431. * @param length 线程数量
  432. * @return
  433. */
  434. public static ExecutorService generatePools(ExecutorService oldPool, int length) {
  435. if (oldPool != null && !oldPool.isTerminated()) {
  436. oldPool.shutdown();
  437. while (oldPool.isTerminated()) {
  438. try {
  439. oldPool.awaitTermination(200, TimeUnit.MILLISECONDS);
  440. } catch (InterruptedException e) {
  441. e.printStackTrace();
  442. }
  443. }
  444. }
  445. if(length == 1) {
  446. return Executors.newSingleThreadExecutor();
  447. }
  448. return Executors.newFixedThreadPool(length);
  449. }
  450. /**
  451. * 生成只有一个线程的线程池
  452. * @param oldPool 老线程池
  453. * @return
  454. */
  455. public static ExecutorService generateSinglePools(ExecutorService oldPool) {
  456. return generatePools(oldPool, 1);
  457. }
  458. }