123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550 |
- package com.zskk.dicom.monitor.schedules;
- import com.zskk.dicom.monitor.config.Configs;
- import com.zskk.dicom.monitor.queue.*;
- import com.zskk.dicom.monitor.runner.*;
- import java.util.HashSet;
- import java.util.Set;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- /**
- *
- * 功能描述:
- *
- * @param: dirMonitorPools 监控文件线程池 oldUploadPools 旧文件
- * @return:
- * @auther: zzp
- * @date: 2019/3/10 10:35
- */
- public class StatRunner {
- /**
- * 休眠30s等待线程全部退出
- */
- private static final int WAIT_TIME = 30;
- /**
- * dcm文件产生文件夹监听线程(池)
- */
- private static ExecutorService dirMonitorPools = null;
- /**
- * 重试文件夹监听线程(池)
- */
- private static ExecutorService retryMonitorPools = null;
- /**
- * 把遗留上传文件添加进入队列的线程(池)
- */
- private static ExecutorService addHistoryFile2QueuePools = null;
- /**
- * 把遗留重试上传文件添加进入队列的线程(池)
- */
- private static ExecutorService addHistoryRetryFile2QueuePools = null;
- /**
- * 上传当前重试文件线程(池)
- */
- private static ExecutorService currentRetryPools = null;
- /**
- * 把文件移动到错误文件的线程(池)
- */
- private static ExecutorService errorPools = null;
- /**
- * 把文件移动到重试文件的线程(池)
- */
- private static ExecutorService remove2RetryPools = null;
- /**
- * 删除空文件夹的线程(池)
- */
- private static ExecutorService deleteEmptyDirPools = null;
- /**
- * 删除空重试文件夹的线程(池)
- */
- private static ExecutorService deleteEmptyRetryDirPools = null;
- /**
- * 上传遗留上传文件线程池(默认线程数 4 最小线程数 1)
- */
- private static ExecutorService oldUploadPools = null;
- /**
- * 上传当前上传文件线程池(默认线程数 4 最小线程数 1)
- */
- private static ExecutorService currentUploadPools = null;
- /**
- * 上传遗留重试文件线程池(默认线程数 2 最小线程数 1)
- */
- private static ExecutorService oldRetryPools = null;
- /**
- * 把文件移动到备份文件的线程池(默认线程数 2 最小线程数 1)
- */
- private static ExecutorService successPools = null;
- /**
- * 队列集合数组
- */
- private static Set<IQueue<String>> queueSet = null;
- /**
- * 线程池启动次数
- */
- private static int poolsStartCount = 0;
- /**
- * 初始化各种线程池并启动线程
- */
- public static void start() {
- Configs.sysLog.info("call start runner count: " + poolsStartCount);
- if(poolsStartCount == 0) {
- initPools();
- initQueueSet();
- startRunner();
- poolsStartCount++;
- return;
- }
- // 线程池启动标识设置为true
- // Configs.poolsStartingFlag = true;
- // 销毁队列集合
- destroyQueueSet();
- // 初始化线程池
- initPools();
- // 等待runner销毁
- waitRunningDestroy();
- // 初始化队列集合
- initQueueSet();
- // 线程池启动标识设置为false
- // Configs.poolsStartingFlag = false;
- // 启动线程
- startRunner();
- // 线程启动次数加一
- poolsStartCount++;
- }
- /**
- * 启动各个线程
- */
- private static void startRunner() {
- // 监听目标文件
- startDirMonitorRunner();
- // 监听重试文件
- startRetryMonitorRunner();
- // 老文件加入老上传队列
- startAddHistoryFile2UploadRunner();
- // 老重试文件加入老重试上传队列
- startAddHistoryRetryFile2UploadRunner();
- // 删除空文件夹
- startDeleteEmptyDirPools();
- // 删除空重试文件夹
- startDeleteEmptyRetryDirPools();
- // 新重试上传
- startCurrentRetryRunner();
- // 重试移动文件
- startRemove2RetryRunner();
- // 错误移动文件
- startErrorRunner();
- // 成功移动
- startSuccessRunner(Math.max(Configs.successPoolsLength, 1));
- // 当前上传
- startCurrentUploadRunner(Math.max(Configs.currentUploadPoolsLength, 1));
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // 老文件上传
- startOldUploadRunner(Math.max(Configs.oldUploadPoolsLength, 1));
- // 老重试上传
- startOldRetryRunner(Math.max(Configs.oldRetryPoolsLength, 1));
- }
- /**
- * 启动删除重试文件夹的空目录
- */
- private static void startDeleteEmptyRetryDirPools() {
- deleteEmptyRetryDirPools.execute(new BaseDeleteRunner(Configs.monitorRetryDir));
- }
- /**
- * 启动删除监听文件夹的空目录
- */
- private static void startDeleteEmptyDirPools() {
- deleteEmptyDirPools.execute(new BaseDeleteRunner(Configs.monitorDir));
- }
- /**
- * 启动遗留重试文件上传的线程池
- * @param oldRetryPoolsLength 线程池数量
- */
- private static void startOldRetryRunner(int oldRetryPoolsLength) {
- int count = 0;
- while (count < oldRetryPoolsLength) {
- oldRetryPools.execute(new RetryHistoryUploadRunner(OldRetryQueue.getInstance()));
- count++;
- }
- }
- /**
- * 启动遗留文件上传的线程池
- * @param oldUploadPoolsLength 线程池数量
- */
- private static void startOldUploadRunner(int oldUploadPoolsLength) {
- int count = 0;
- while (count < oldUploadPoolsLength) {
- oldUploadPools.execute(new HistoryUploadRunner(OldUploadQueue.getInstance()));
- count++;
- }
- }
- /**
- * 启动当前文件上传的线程池
- * @param currentUploadPoolsLength 线程池数量
- */
- private static void startCurrentUploadRunner(int currentUploadPoolsLength) {
- int count = 0;
- while (count < currentUploadPoolsLength) {
- currentUploadPools.execute(new CurrentUploadRunner(CurrentUploadQueue.getInstance()));
- count++;
- }
- }
- /**
- * 启动上传成功后文件移动到备份文件夹的线程池
- * @param successPoolsLength 线程池数量
- */
- private static void startSuccessRunner(int successPoolsLength) {
- int count = 0;
- while (count < successPoolsLength) {
- successPools.execute(new SuccessRunner(SuccessQueue.getInstance()));
- count++;
- }
- }
- /**
- * 启动文件移动到重试文件夹的线程池(单线程)
- */
- private static void startErrorRunner() {
- errorPools.execute(new ErrorRunner(ErrorQueue.getInstance()));
- }
- /**
- * 启动文件移动到重试文件夹的线程池(单线程)
- */
- private static void startRemove2RetryRunner() {
- remove2RetryPools.execute(new RemoveFile2RetryRunner(RemoveFile2RetryQueue.getInstance()));
- }
- /**
- * 启动重试上传的线程池(单线程)
- */
- private static void startCurrentRetryRunner() {
- currentRetryPools.execute(new RetryCurrentUploadRunner(CurrentRetryUploadQueue.getInstance()));
- }
- /**
- * 启动遍历dcm产生文件夹并把遗留上传文件放入上传队列的线程池(单线程)
- */
- private static void startAddHistoryRetryFile2UploadRunner() {
- addHistoryRetryFile2QueuePools.execute(new AddRetryHistory2QueueRunner());
- }
- /**
- * 启动遍历dcm产生文件夹并把遗留上传文件放入上传队列的线程池(单线程)
- */
- private static void startAddHistoryFile2UploadRunner() {
- addHistoryFile2QueuePools.execute(new AddHistory2QueueRunner());
- }
- /**
- * 启动重试文件夹监听的线程池(单线程)
- */
- private static void startRetryMonitorRunner() {
- retryMonitorPools.execute(new RetryDirMonitor());
- }
- /**
- * 启动目标dcm产生文件夹监听的线程池(单线程)
- */
- private static void startDirMonitorRunner() {
- dirMonitorPools.execute(new DirMonitor());
- }
- /**
- * 等待runner销毁
- */
- private static void waitRunningDestroy() {
- try {
- TimeUnit.SECONDS.sleep(WAIT_TIME);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- /**
- * 初始化线程池
- */
- private static void initPools() {
- // 监听目标文件线程池
- initDirMonitorPools();
- // 监听重试文件线程池
- initRetryMonitorPools();
- // 老文件加入老上传队列线程池
- initAddHistoryFile2UploadPools();
- // 老重试文件加入老重试上传队列线程池
- initAddHistoryRetryFile2UploadPools();
- // 新重试上传线程池
- initCurrentRetryPools();
- // 重试移动文件线程池
- initRemove2RetryPools();
- // 错误移动文件线程池
- initErrorPools();
- // 删除空文件夹线程池
- initDeleteEmptyDirPools();
- // 删除空重试文件夹线程池
- initDeleteEmptyRetryDirPools();
- // 成功移动线程池
- initSuccessPools(Math.max(Configs.successPoolsLength, 1));
- // 当前上传线程池
- initCurrentUploadPools(Math.max(Configs.currentUploadPoolsLength, 1));
- // 老文件上传线程池
- initOldUploadPools(Math.max(Configs.oldUploadPoolsLength, 1));
- // 老重试上传线程池
- initOldRetryPools(Math.max(Configs.oldRetryPoolsLength, 1));
- }
- /**
- * 初始化删除空文件夹的线程池
- */
- private static void initDeleteEmptyDirPools() {
- deleteEmptyDirPools = generateSinglePools(deleteEmptyDirPools);
- }
- /**
- * 初始化删除空重试文件夹的线程池
- */
- private static void initDeleteEmptyRetryDirPools() {
- deleteEmptyRetryDirPools = generateSinglePools(deleteEmptyRetryDirPools);
- }
- /**
- * 初始化遗留重试文件上传的线程池
- * @param oldRetryPoolsLength 线程池数量
- */
- private static void initOldRetryPools(int oldRetryPoolsLength) {
- oldRetryPools = generatePools(oldRetryPools, oldRetryPoolsLength);
- }
- /**
- * 初始化遗留文件上传的线程池
- * @param oldUploadPoolsLength 线程池数量
- */
- private static void initOldUploadPools(int oldUploadPoolsLength) {
- oldUploadPools = generatePools(oldUploadPools, oldUploadPoolsLength);
- }
- /**
- * 初始化当前文件上传的线程池
- * @param currentUploadPoolsLength 线程池数量
- */
- private static void initCurrentUploadPools(int currentUploadPoolsLength) {
- currentUploadPools = generatePools(currentUploadPools, currentUploadPoolsLength);
- }
- /**
- * 初始化上传成功后文件移动到备份文件夹的线程池
- * @param successPoolsLength 线程池数量
- */
- private static void initSuccessPools(int successPoolsLength) {
- successPools = generatePools(successPools, successPoolsLength);
- }
- /**
- * 初始化文件移动到错误文件夹的线程池(单线程)
- */
- private static void initErrorPools() {
- errorPools = generateSinglePools(errorPools);
- }
- /**
- * 初始化文件移动到重试文件夹的线程池(单线程)
- */
- private static void initRemove2RetryPools() {
- remove2RetryPools = generateSinglePools(remove2RetryPools);
- }
- /**
- * 初始化重试上传的线程池(单线程)
- */
- private static void initCurrentRetryPools() {
- currentRetryPools = generateSinglePools(currentRetryPools);
- }
- /**
- * 初始化遍历重试文件夹并把遗留重试文件放入队列的线程池(单线程)
- */
- private static void initAddHistoryRetryFile2UploadPools() {
- addHistoryRetryFile2QueuePools = generateSinglePools(addHistoryRetryFile2QueuePools);
- }
- /**
- * 初始化遍历dcm产生文件夹并把遗留上传文件放入上传队列的线程池(单线程)
- */
- private static void initAddHistoryFile2UploadPools() {
- addHistoryFile2QueuePools = generateSinglePools(addHistoryFile2QueuePools);
- }
- /**
- * 初始化重试文件夹监听的线程池(单线程)
- */
- private static void initRetryMonitorPools() {
- retryMonitorPools = generateSinglePools(retryMonitorPools);
- }
- /**
- * 初始化目标dcm产生文件夹监听的线程池(单线程)
- */
- private static void initDirMonitorPools() {
- dirMonitorPools = generateSinglePools(dirMonitorPools);
- }
- /**
- * 销毁队列集合
- */
- private static void destroyQueueSet() {
- if(queueSet != null) {
- queueSet = destroyQueue(queueSet);
- }
- if(queueSet != null) {
- Configs.sysLog.info("queueSet destroy fail!");
- queueSet.clear();
- queueSet = null;
- }
- }
- /**
- * 销毁已存在的队列集合
- * @param set
- * @return
- */
- private static Set<IQueue<String>> destroyQueue(Set<IQueue<String>> set) {
- for (IQueue<String> queue : queueSet) {
- if(queue != null) {
- queue.destroy();
- }
- }
- return null;
- }
- /**
- * 初始化队列集合
- */
- private static void initQueueSet() {
- queueSet = new HashSet<>();
- createQueue(queueSet);
- initQueue(queueSet);
- }
- /**
- * 创建队列
- * @param set
- */
- private static void createQueue(Set<IQueue<String>> set) {
- set.add(CurrentRetryUploadQueue.getInstance());
- set.add(CurrentUploadQueue.getInstance());
- set.add(ErrorQueue.getInstance());
- set.add(OldRetryQueue.getInstance());
- set.add(OldUploadQueue.getInstance());
- set.add(RemoveFile2RetryQueue.getInstance());
- set.add(SuccessQueue.getInstance());
- }
- /**
- * 初始化所有队列
- * @param queueSet
- */
- private static void initQueue(Set<IQueue<String>> queueSet) {
- for (IQueue<String> queue : queueSet) {
- if(queue != null) {
- queue.init();
- }
- }
- }
- /**
- * 生成线程池
- * @param oldPool 老线程池
- * @param length 线程数量
- * @return
- */
- public static ExecutorService generatePools(ExecutorService oldPool, int length) {
- if (oldPool != null && !oldPool.isTerminated()) {
- oldPool.shutdown();
- while (oldPool.isTerminated()) {
- try {
- oldPool.awaitTermination(200, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- if(length == 1) {
- return Executors.newSingleThreadExecutor();
- }
- return Executors.newFixedThreadPool(length);
- }
- /**
- * 生成只有一个线程的线程池
- * @param oldPool 老线程池
- * @return
- */
- public static ExecutorService generateSinglePools(ExecutorService oldPool) {
- return generatePools(oldPool, 1);
- }
- }
|