|
- package com.zskk.dicom.monitor.schedules;
- import com.zskk.dicom.monitor.config.Configs;
- import com.zskk.dicom.monitor.queue.*;
- import com.zskk.dicom.monitor.runner.*;
- import java.util.HashSet;
- import java.util.Set;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- /**
- *
- * 功能描述:
- *
- * @param: dirMonitorPools 监控文件线程池 oldUploadPools 旧文件
- * @return:
- * @auther: zzp
- * @date: 2019/3/10 10:35
- */
- public class StatRunner {
- /**
- * 休眠30s等待线程全部退出
- */
- private static final int WAIT_TIME = 30;
- /**
- * dcm文件产生文件夹监听线程(池)
- */
- private static ExecutorService dirMonitorPools = null;
- /**
- * 重试文件夹监听线程(池)
- */
- private static ExecutorService retryMonitorPools = null;
- /**
- * 把遗留上传文件添加进入队列的线程(池)
- */
- private static ExecutorService addHistoryFile2QueuePools = null;
- /**
- * 把遗留重试上传文件添加进入队列的线程(池)
- */
- private static ExecutorService addHistoryRetryFile2QueuePools = null;
- /**
- * 上传当前重试文件线程(池)
- */
- private static ExecutorService currentRetryPools = null;
- /**
- * 把文件移动到错误文件的线程(池)
- */
- private static ExecutorService errorPools = null;
- /**
- * 把文件移动到重试文件的线程(池)
- */
- private static ExecutorService remove2RetryPools = null;
- /**
- * 删除空文件夹的线程(池)
- */
- private static ExecutorService deleteEmptyDirPools = null;
- /**
- * 删除空重试文件夹的线程(池)
- */
- private static ExecutorService deleteEmptyRetryDirPools = null;
- /**
- * 上传遗留上传文件线程池(默认线程数 4 最小线程数 1)
- */
- private static ExecutorService oldUploadPools = null;
- /**
- * 上传当前上传文件线程池(默认线程数 4 最小线程数 1)
- */
- private static ExecutorService currentUploadPools = null;
- /**
- * 上传遗留重试文件线程池(默认线程数 2 最小线程数 1)
- */
- private static ExecutorService oldRetryPools = null;
- /**
- * 把文件移动到备份文件的线程池(默认线程数 2 最小线程数 1)
- */
- private static ExecutorService successPools = null;
- /**
- * 队列集合数组
- */
- private static Set<IQueue<String>> queueSet = null;
- /**
- * 线程池启动次数
- */
- private static int poolsStartCount = 0;
- /**
- * 初始化各种线程池并启动线程
- */
- public static void start() {
- Configs.sysLog.info("call start runner count: " + poolsStartCount);
- if(poolsStartCount == 0) {
- initPools();
- initQueueSet();
- startRunner();
- poolsStartCount++;
- return;
- }
- // 线程池启动标识设置为true
- // Configs.poolsStartingFlag = true;
- // 销毁队列集合
- destroyQueueSet();
- // 初始化线程池
- initPools();
- // 等待runner销毁
- waitRunningDestroy();
- // 初始化队列集合
- initQueueSet();
- // 线程池启动标识设置为false
- // Configs.poolsStartingFlag = false;
- // 启动线程
- startRunner();
- // 线程启动次数加一
- poolsStartCount++;
- }
- /**
- * 启动各个线程
- */
- private static void startRunner() {
- // 监听目标文件
- startDirMonitorRunner();
- // 监听重试文件
- startRetryMonitorRunner();
- // 老文件加入老上传队列
- startAddHistoryFile2UploadRunner();
- // 老重试文件加入老重试上传队列
- startAddHistoryRetryFile2UploadRunner();
- // 删除空文件夹
- startDeleteEmptyDirPools();
- // 删除空重试文件夹
- startDeleteEmptyRetryDirPools();
- // 新重试上传
- startCurrentRetryRunner();
- // 重试移动文件
- startRemove2RetryRunner();
- // 错误移动文件
- startErrorRunner();
- // 成功移动
- startSuccessRunner(Math.max(Configs.successPoolsLength, 1));
- // 当前上传
- startCurrentUploadRunner(Math.max(Configs.currentUploadPoolsLength, 1));
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // 老文件上传
- startOldUploadRunner(Math.max(Configs.oldUploadPoolsLength, 1));
- // 老重试上传
- startOldRetryRunner(Math.max(Configs.oldRetryPoolsLength, 1));
- }
- /**
- * 启动删除重试文件夹的空目录
- */
- private static void startDeleteEmptyRetryDirPools() {
- deleteEmptyRetryDirPools.execute(new BaseDeleteRunner(Configs.monitorRetryDir));
- }
- /**
- * 启动删除监听文件夹的空目录
- */
- private static void startDeleteEmptyDirPools() {
- deleteEmptyDirPools.execute(new BaseDeleteRunner(Configs.monitorDir));
- }
- /**
- * 启动遗留重试文件上传的线程池
- * @param oldRetryPoolsLength 线程池数量
- */
- private static void startOldRetryRunner(int oldRetryPoolsLength) {
- int count = 0;
- while (count < oldRetryPoolsLength) {
- oldRetryPools.execute(new RetryHistoryUploadRunner(OldRetryQueue.getInstance()));
- count++;
- }
- }
- /**
- * 启动遗留文件上传的线程池
- * @param oldUploadPoolsLength 线程池数量
- */
- private static void startOldUploadRunner(int oldUploadPoolsLength) {
- int count = 0;
- while (count < oldUploadPoolsLength) {
- oldUploadPools.execute(new HistoryUploadRunner(OldUploadQueue.getInstance()));
- count++;
- }
- }
- /**
- * 启动当前文件上传的线程池
- * @param currentUploadPoolsLength 线程池数量
- */
- private static void startCurrentUploadRunner(int currentUploadPoolsLength) {
- int count = 0;
- while (count < currentUploadPoolsLength) {
- currentUploadPools.execute(new CurrentUploadRunner(CurrentUploadQueue.getInstance()));
- count++;
- }
- }
- /**
- * 启动上传成功后文件移动到备份文件夹的线程池
- * @param successPoolsLength 线程池数量
- */
- private static void startSuccessRunner(int successPoolsLength) {
- int count = 0;
- while (count < successPoolsLength) {
- successPools.execute(new SuccessRunner(SuccessQueue.getInstance()));
- count++;
- }
- }
- /**
- * 启动文件移动到重试文件夹的线程池(单线程)
- */
- private static void startErrorRunner() {
- errorPools.execute(new ErrorRunner(ErrorQueue.getInstance()));
- }
- /**
- * 启动文件移动到重试文件夹的线程池(单线程)
- */
- private static void startRemove2RetryRunner() {
- remove2RetryPools.execute(new RemoveFile2RetryRunner(RemoveFile2RetryQueue.getInstance()));
- }
- /**
- * 启动重试上传的线程池(单线程)
- */
- private static void startCurrentRetryRunner() {
- currentRetryPools.execute(new RetryCurrentUploadRunner(CurrentRetryUploadQueue.getInstance()));
- }
- /**
- * 启动遍历dcm产生文件夹并把遗留上传文件放入上传队列的线程池(单线程)
- */
- private static void startAddHistoryRetryFile2UploadRunner() {
- addHistoryRetryFile2QueuePools.execute(new AddRetryHistory2QueueRunner());
- }
- /**
- * 启动遍历dcm产生文件夹并把遗留上传文件放入上传队列的线程池(单线程)
- */
- private static void startAddHistoryFile2UploadRunner() {
- addHistoryFile2QueuePools.execute(new AddHistory2QueueRunner());
- }
- /**
- * 启动重试文件夹监听的线程池(单线程)
- */
- private static void startRetryMonitorRunner() {
- retryMonitorPools.execute(new RetryDirMonitor());
- }
- /**
- * 启动目标dcm产生文件夹监听的线程池(单线程)
- */
- private static void startDirMonitorRunner() {
- dirMonitorPools.execute(new DirMonitor());
- }
- /**
- * 等待runner销毁
- */
- private static void waitRunningDestroy() {
- try {
- TimeUnit.SECONDS.sleep(WAIT_TIME);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- /**
- * 初始化线程池
- */
- private static void initPools() {
- // 监听目标文件线程池
- initDirMonitorPools();
- // 监听重试文件线程池
- initRetryMonitorPools();
- // 老文件加入老上传队列线程池
- initAddHistoryFile2UploadPools();
- // 老重试文件加入老重试上传队列线程池
- initAddHistoryRetryFile2UploadPools();
- // 新重试上传线程池
- initCurrentRetryPools();
- // 重试移动文件线程池
- initRemove2RetryPools();
- // 错误移动文件线程池
- initErrorPools();
- // 删除空文件夹线程池
- initDeleteEmptyDirPools();
- // 删除空重试文件夹线程池
- initDeleteEmptyRetryDirPools();
- // 成功移动线程池
- initSuccessPools(Math.max(Configs.successPoolsLength, 1));
- // 当前上传线程池
- initCurrentUploadPools(Math.max(Configs.currentUploadPoolsLength, 1));
- // 老文件上传线程池
- initOldUploadPools(Math.max(Configs.oldUploadPoolsLength, 1));
- // 老重试上传线程池
- initOldRetryPools(Math.max(Configs.oldRetryPoolsLength, 1));
- }
- /**
- * 初始化删除空文件夹的线程池
- */
- private static void initDeleteEmptyDirPools() {
- deleteEmptyDirPools = generateSinglePools(deleteEmptyDirPools);
- }
- /**
- * 初始化删除空重试文件夹的线程池
- */
- private static void initDeleteEmptyRetryDirPools() {
- deleteEmptyRetryDirPools = generateSinglePools(deleteEmptyRetryDirPools);
- }
- /**
- * 初始化遗留重试文件上传的线程池
- * @param oldRetryPoolsLength 线程池数量
- */
- private static void initOldRetryPools(int oldRetryPoolsLength) {
- oldRetryPools = generatePools(oldRetryPools, oldRetryPoolsLength);
- }
- /**
- * 初始化遗留文件上传的线程池
- * @param oldUploadPoolsLength 线程池数量
- */
- private static void initOldUploadPools(int oldUploadPoolsLength) {
- oldUploadPools = generatePools(oldUploadPools, oldUploadPoolsLength);
- }
- /**
- * 初始化当前文件上传的线程池
- * @param currentUploadPoolsLength 线程池数量
- */
- private static void initCurrentUploadPools(int currentUploadPoolsLength) {
- currentUploadPools = generatePools(currentUploadPools, currentUploadPoolsLength);
- }
- /**
- * 初始化上传成功后文件移动到备份文件夹的线程池
- * @param successPoolsLength 线程池数量
- */
- private static void initSuccessPools(int successPoolsLength) {
- successPools = generatePools(successPools, successPoolsLength);
- }
- /**
- * 初始化文件移动到错误文件夹的线程池(单线程)
- */
- private static void initErrorPools() {
- errorPools = generateSinglePools(errorPools);
- }
- /**
- * 初始化文件移动到重试文件夹的线程池(单线程)
- */
- private static void initRemove2RetryPools() {
- remove2RetryPools = generateSinglePools(remove2RetryPools);
- }
- /**
- * 初始化重试上传的线程池(单线程)
- */
- private static void initCurrentRetryPools() {
- currentRetryPools = generateSinglePools(currentRetryPools);
- }
- /**
- * 初始化遍历重试文件夹并把遗留重试文件放入队列的线程池(单线程)
- */
- private static void initAddHistoryRetryFile2UploadPools() {
- addHistoryRetryFile2QueuePools = generateSinglePools(addHistoryRetryFile2QueuePools);
- }
- /**
- * 初始化遍历dcm产生文件夹并把遗留上传文件放入上传队列的线程池(单线程)
- */
- private static void initAddHistoryFile2UploadPools() {
- addHistoryFile2QueuePools = generateSinglePools(addHistoryFile2QueuePools);
- }
- /**
- * 初始化重试文件夹监听的线程池(单线程)
- */
- private static void initRetryMonitorPools() {
- retryMonitorPools = generateSinglePools(retryMonitorPools);
- }
- /**
- * 初始化目标dcm产生文件夹监听的线程池(单线程)
- */
- private static void initDirMonitorPools() {
- dirMonitorPools = generateSinglePools(dirMonitorPools);
- }
- /**
- * 销毁队列集合
- */
- private static void destroyQueueSet() {
- if(queueSet != null) {
- queueSet = destroyQueue(queueSet);
- }
- if(queueSet != null) {
- Configs.sysLog.info("queueSet destroy fail!");
- queueSet.clear();
- queueSet = null;
- }
- }
- /**
- * 销毁已存在的队列集合
- * @param set
- * @return
- */
- private static Set<IQueue<String>> destroyQueue(Set<IQueue<String>> set) {
- for (IQueue<String> queue : queueSet) {
- if(queue != null) {
- queue.destroy();
- }
- }
- return null;
- }
- /**
- * 初始化队列集合
- */
- private static void initQueueSet() {
- queueSet = new HashSet<>();
- createQueue(queueSet);
- initQueue(queueSet);
- }
- /**
- * 创建队列
- * @param set
- */
- private static void createQueue(Set<IQueue<String>> set) {
- set.add(CurrentRetryUploadQueue.getInstance());
- set.add(CurrentUploadQueue.getInstance());
- set.add(ErrorQueue.getInstance());
- set.add(OldRetryQueue.getInstance());
- set.add(OldUploadQueue.getInstance());
- set.add(RemoveFile2RetryQueue.getInstance());
- set.add(SuccessQueue.getInstance());
- }
- /**
- * 初始化所有队列
- * @param queueSet
- */
- private static void initQueue(Set<IQueue<String>> queueSet) {
- for (IQueue<String> queue : queueSet) {
- if(queue != null) {
- queue.init();
- }
- }
- }
- /**
- * 生成线程池
- * @param oldPool 老线程池
- * @param length 线程数量
- * @return
- */
- public static ExecutorService generatePools(ExecutorService oldPool, int length) {
- if (oldPool != null && !oldPool.isTerminated()) {
- oldPool.shutdown();
- while (oldPool.isTerminated()) {
- try {
- oldPool.awaitTermination(200, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- if(length == 1) {
- return Executors.newSingleThreadExecutor();
- }
- return Executors.newFixedThreadPool(length);
- }
- /**
- * 生成只有一个线程的线程池
- * @param oldPool 老线程池
- * @return
- */
- public static ExecutorService generateSinglePools(ExecutorService oldPool) {
- return generatePools(oldPool, 1);
- }
- }
|