Selaa lähdekoodia

修改质控任务并行处理的死锁问题

gengjunfang 1 viikko sitten
vanhempi
commit
a00e944709

+ 27 - 11
src/main/java/com/zskk/qcns/modules/qc/quartz/AutoQcJob.java

@@ -9,15 +9,15 @@ import org.springframework.stereotype.Component;
 
 /**
  * 自动质控任务Job
- * 由Quartz定时调度执行
+ * 由Quartz定时调度执行,支持中断
  *
  * @author system
  * @date 2026-02-03
  */
 @Slf4j
 @Component
-@DisallowConcurrentExecution  // 禁止并发执行同一任务
-public class AutoQcJob implements Job {
+@DisallowConcurrentExecution
+public class AutoQcJob implements InterruptableJob {
 
     @Autowired
     private QcTaskExecutionService taskExecutionService;
@@ -25,41 +25,57 @@ public class AutoQcJob implements Job {
     @Autowired
     private QcExecutionService qcExecutionService;
 
+    private volatile boolean interrupted = false;
+    /**
+     * Interrupt callback required by {@code InterruptableJob}: logs the request and sets the
+     * volatile {@code interrupted} flag that {@code execute} reads.
+     *
+     * NOTE(review): this only sets a flag; it does not call {@code Thread.interrupt()} on the
+     * thread running {@code execute}. {@code execute} reads the flag only after
+     * {@code qcExecutionService.executeQcTask(taskId)} has returned or thrown, and
+     * {@code QcExecutionServiceImpl} polls {@code Thread.currentThread().isInterrupted()},
+     * which this flag does not affect. Confirm that a running task really stops early.
+     *
+     * @throws UnableToInterruptJobException declared by the interface; this body does not throw it
+     */
+    @Override
+    public void interrupt() throws UnableToInterruptJobException {
+        log.info("收到中断信号,准备停止质控任务");
+        interrupted = true;
+    }
+
     @Override
     public void execute(JobExecutionContext context) throws JobExecutionException {
         String taskId = null;
         Long executionId = null;
 
         try {
-            // 1. 从JobDataMap获取任务ID
             JobDataMap dataMap = context.getJobDetail().getJobDataMap();
             taskId = dataMap.getString("taskId");
 
             log.info("自动质控任务开始执行,taskId={}", taskId);
 
-            // 2. 创建执行记录
             executionId = taskExecutionService.createExecution(taskId);
 
-            // 3. 执行质控(使用原有的QcExecutionService)
             boolean success = qcExecutionService.executeQcTask(taskId);
 
+            if (interrupted) {
+                log.info("质控任务被中断,taskId={}", taskId);
+                if (executionId != null) {
+                    taskExecutionService.failExecution(executionId, "任务被用户手动停用", null);
+                }
+                return;
+            }
+
             if (!success) {
                 throw new RuntimeException("质控任务执行失败");
             }
 
-            // 4. 更新执行记录状态为完成
             taskExecutionService.completeExecution(executionId);
-
             log.info("自动质控任务执行完成,taskId={}, executionId={}", taskId, executionId);
 
         } catch (Exception e) {
-            log.error("自动质控任务执行失败,taskId={}, executionId={}", taskId, executionId, e);
+            if (interrupted) {
+                log.info("质控任务被中断,taskId={}", taskId);
+                if (executionId != null) {
+                    taskExecutionService.failExecution(executionId, "任务被用户手动停用", null);
+                }
+                return;
+            }
 
-            // 记录失败信息
+            log.error("自动质控任务执行失败,taskId={}, executionId={}", taskId, executionId, e);
             if (executionId != null) {
                 taskExecutionService.failExecution(executionId, e.getMessage(), getStackTrace(e));
             }
-
             throw new JobExecutionException(e);
         }
     }

+ 8 - 0
src/main/java/com/zskk/qcns/modules/qc/service/QcJobService.java

@@ -56,6 +56,14 @@ public interface QcJobService {
      */
     void updateJobCronExpression(String taskId, String cronExpression);
 
+    /**
+     * Interrupts a task that is currently executing.
+     *
+     * @param taskId task ID
+     * @return true if the interrupt succeeded; false if the task was not executing
+     */
+    boolean interruptJob(String taskId);
+
     /**
      * 检查任务是否存在
      *

+ 64 - 25
src/main/java/com/zskk/qcns/modules/qc/service/impl/QcExecutionServiceImpl.java

@@ -150,7 +150,6 @@ public class QcExecutionServiceImpl implements QcExecutionService {
     private ThreadPoolTaskExecutor qcTaskThreadPool;
 
     @Override
-    @Transactional(rollbackFor = Exception.class)
     public boolean executeQcTask(String taskId) {
         try {
             log.info("开始执行质控任务:taskId={}", taskId);
@@ -162,7 +161,7 @@ public class QcExecutionServiceImpl implements QcExecutionService {
                 return false;
             }
 
-            // 2. 更新任务状态为执行中
+            // 2. 更新任务状态为执行中(独立操作,无需事务包裹)
             task.setStatus(1);
             task.setExecuteTime(LocalDateTime.now());
             qcTaskMapper.updateById(task);
@@ -173,7 +172,6 @@ public class QcExecutionServiceImpl implements QcExecutionService {
             List<StudyInfo> studies = getStudiesToCheck(task);
             log.info("========== 获取到 {} 个待检查数据 ==========", studies.size());
 
-            // 打印每个检查的 study_id
             for (StudyInfo study : studies) {
                 log.info("待检查:studyId={}, studyInstanceUid={}, modality={}",
                     study.getStudyId(), study.getStudyInstanceUid(), study.getModality());
@@ -193,45 +191,75 @@ public class QcExecutionServiceImpl implements QcExecutionService {
             log.info("智能并发控制:总检查数={}, 总影像数={}, 最优并发度={}",
                     studies.size(), totalImages, optimalConcurrency);
 
-            // 5. 使用并行流执行质控检查
+            // 5. 使用线程池执行质控检查(不使用 parallelStream,避免 ForkJoinPool 事务问题)
             int totalCount = studies.size();
             AtomicInteger passCount = new AtomicInteger(0);
             AtomicInteger failCount = new AtomicInteger(0);
             AtomicInteger checkedCount = new AtomicInteger(0);
+            final String institutionId = task.getInstitutionId();
 
-            // 将列表分割为批次,每批大小等于并发度
             List<List<StudyInfo>> batches = partitionList(studies, optimalConcurrency);
 
             for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) {
+                // 检查当前线程是否被中断(支持停用任务)
+                if (Thread.currentThread().isInterrupted()) {
+                    log.info("质控任务被中断,停止执行:taskId={}", taskId);
+                    updateTaskStatusToFailed(taskId, passCount.get(), failCount.get(), totalCount);
+                    return false;
+                }
+
                 List<StudyInfo> batch = batches.get(batchIndex);
                 log.info("执行批次 [{}/{}]:本批次检查数={}", batchIndex + 1, batches.size(), batch.size());
 
-                // 并行执行当前批次的所有检查
-                batch.parallelStream().forEach(study -> {
-                    int currentCount = checkedCount.incrementAndGet();
-                    log.info("========== 并行执行检查质控 [{}/{}]:studyId={}, studyInstanceUid={} ==========",
-                            currentCount, totalCount, study.getStudyId(), study.getStudyInstanceUid());
+                // 提交当前批次的所有任务到线程池
+                List<Future<?>> futures = new ArrayList<>();
+                for (StudyInfo study : batch) {
+                    Future<?> future = qcTaskThreadPool.submit(() -> {
+                        int currentCount = checkedCount.incrementAndGet();
+                        log.info("========== 并行执行检查质控 [{}/{}]:studyId={}, studyInstanceUid={} ==========",
+                                currentCount, totalCount, study.getStudyId(), study.getStudyInstanceUid());
 
-                    try {
-                        StudyQcResult result = executeStudyQc(taskId, study.getStudyId(), task.getInstitutionId());
+                        try {
+                            StudyQcResult result = executeStudyQc(taskId, study.getStudyId(), institutionId);
 
-                        if (result.isSuccess()) {
-                            passCount.addAndGet(result.getPassCount());
-                            failCount.addAndGet(result.getFailCount());
-                        } else {
+                            if (result.isSuccess()) {
+                                passCount.addAndGet(result.getPassCount());
+                                failCount.addAndGet(result.getFailCount());
+                            } else {
+                                failCount.incrementAndGet();
+                                log.error("检查质控失败:studyId={}, error={}", study.getStudyId(), result.getErrorMessage());
+                            }
+                        } catch (Exception e) {
+                            log.error("执行检查质控异常:studyId={}, error={}", study.getStudyId(), e.getMessage(), e);
                             failCount.incrementAndGet();
-                            log.error("检查质控失败:studyId={}, error={}", study.getStudyId(), result.getErrorMessage());
                         }
-                    } catch (Exception e) {
-                        log.error("执行检查质控异常:studyId={}, error={}", study.getStudyId(), e.getMessage(), e);
-                        failCount.incrementAndGet();
+                    });
+                    futures.add(future);
+                }
+
+                // 等待当前批次完成
+                for (Future<?> future : futures) {
+                    try {
+                        future.get();
+                    } catch (InterruptedException e) {
+                        log.info("质控任务被中断,取消剩余任务:taskId={}", taskId);
+                        // 取消所有未完成的任务
+                        for (Future<?> f : futures) {
+                            f.cancel(true);
+                        }
+                        Thread.currentThread().interrupt();
+                        updateTaskStatusToFailed(taskId, passCount.get(), failCount.get(), totalCount);
+                        return false;
+                    } catch (java.util.concurrent.ExecutionException e) {
+                        log.error("质控子任务执行异常:{}", e.getMessage(), e);
                     }
-                });
+                }
 
                 log.info("批次 [{}/{}] 完成", batchIndex + 1, batches.size());
             }
 
-            // 6. 更新任务状态为已完成
+            // 6. 更新任务状态为已完成(独立操作)
+            task = qcTaskMapper.selectById(taskId);
             task.setStatus(2);
             task.setCompleteTime(LocalDateTime.now());
             task.setTotalCount(totalCount);
@@ -246,16 +274,27 @@ public class QcExecutionServiceImpl implements QcExecutionService {
 
         } catch (Exception e) {
             log.error("执行质控任务失败:taskId={}, error={}", taskId, e.getMessage(), e);
+            updateTaskStatusToFailed(taskId, 0, 0, 0);
+            return false;
+        }
+    }
 
-            // 更新任务状态为失败
+    /**
+     * 更新任务状态为失败
+     */
+    private void updateTaskStatusToFailed(String taskId, int passCount, int failCount, int totalCount) {
+        try {
             QcTask task = qcTaskMapper.selectById(taskId);
             if (task != null) {
                 task.setStatus(3);
                 task.setCompleteTime(LocalDateTime.now());
+                task.setTotalCount(totalCount);
+                task.setPassCount(passCount);
+                task.setFailCount(failCount);
                 qcTaskMapper.updateById(task);
             }
-
-            return false;
+        } catch (Exception ex) {
+            log.error("更新任务失败状态异常:taskId={}", taskId, ex);
         }
     }
 

+ 10 - 0
src/main/java/com/zskk/qcns/modules/qc/service/impl/QcJobServiceImpl.java

@@ -112,6 +112,16 @@ public class QcJobServiceImpl implements QcJobService {
         }
     }
 
+    /**
+     * Asks the scheduler to interrupt the job registered under the key derived from
+     * {@code taskId}.
+     *
+     * @param taskId task ID used to look up the job key
+     * @return the value returned by {@code scheduler.interrupt(...)}, or {@code false} when the
+     *         scheduler throws {@link UnableToInterruptJobException}
+     */
+    @Override
+    public boolean interruptJob(String taskId) {
+        try {
+            return scheduler.interrupt(getJobKey(taskId));
+        } catch (UnableToInterruptJobException e) {
+            // Pass the exception as the last argument so the cause and stack trace are logged;
+            // NOTE(review): presumably the scheduler returns false (rather than throwing) for a
+            // job that is simply not running, so this branch likely signals a real failure.
+            log.warn("中断任务失败(任务可能未在执行):taskId={}", taskId, e);
+            return false;
+        }
+    }
+
     @Override
     public boolean checkJobExists(String taskId) {
         try {

+ 2 - 0
src/main/java/com/zskk/qcns/modules/qc/service/impl/QcTaskServiceImpl.java

@@ -378,6 +378,8 @@ public class QcTaskServiceImpl extends ServiceImpl<QcTaskMapper, QcTask> impleme
             if (isEnabled) {
                 qcJobService.resumeJob(taskId);
             } else {
+                // 先中断正在执行的任务,再暂停调度
+                qcJobService.interruptJob(taskId);
                 qcJobService.pauseJob(taskId);
             }
         }