# 数据同步核心代码实现 ## 目录结构 ``` com.zskk.pacsonline ├── modules │ └── sync │ ├── entity # 实体类 │ │ ├── SyncConfig.java │ │ ├── SyncLog.java │ │ ├── DataChangeLog.java │ │ └── TenantInfo.java │ ├── dto # 数据传输对象 │ │ ├── SyncDataRequest.java │ │ ├── SyncDataItem.java │ │ └── DataReceiveResult.java │ ├── mapper # Mapper接口 │ │ ├── SyncConfigMapper.java │ │ ├── SyncLogMapper.java │ │ ├── DataChangeLogMapper.java │ │ └── TenantInfoMapper.java │ ├── service # 服务接口 │ │ ├── SyncConfigService.java │ │ ├── SyncLogService.java │ │ ├── DataChangeLogService.java │ │ ├── DataSyncService.java │ │ └── DataReceiveService.java │ ├── service.impl # 服务实现 │ │ ├── SyncConfigServiceImpl.java │ │ ├── SyncLogServiceImpl.java │ │ ├── DataChangeLogServiceImpl.java │ │ ├── DataSyncServiceImpl.java │ │ └── DataReceiveServiceImpl.java │ ├── controller # 控制器 │ │ ├── SyncController.java (公网端) │ │ └── SyncManageController.java (内网端) │ ├── aop # AOP拦截器 │ │ └── DataChangeCaptureAop.java │ └── scheduler # 定时任务 │ └── DataSyncScheduler.java └── utils ├── DataEncryptUtil.java # 数据加密工具 └── HttpClientUtil.java # HTTP客户端工具 ``` --- ## 1. 实体类 (Entity) ### 1.1 SyncConfig.java ```java package com.zskk.pacsonline.modules.sync.entity; import com.baomidou.mybatisplus.annotation.*; import lombok.Data; import java.time.LocalDateTime; /** * 数据同步配置表 */ @Data @TableName("sync_config") public class SyncConfig { @TableId(type = IdType.AUTO) private Long id; /** * 租户ID(唯一标识) */ private String tenantId; /** * 租户名称 */ private String tenantName; /** * 同步模式:auto-自动,manual-手动,schedule-定时 */ private String syncMode; /** * 同步间隔(秒) */ private Integer syncInterval; /** * Cron表达式 */ private String cronExpression; /** * 需要同步的表名列表(JSON数组) */ private String syncTables; /** * 公网API地址 */ private String cloudApiUrl; /** * 访问令牌 */ private String accessToken; /** * 数据加密密钥 */ private String secretKey; /** * 是否启用:0-禁用,1-启用 */ private Integer enabled; /** * 失败重试次数 */ private Integer retryTimes; /** * 请求超时时间(毫秒) */ private Integer timeout; /** * 最后同步时间 */ private LocalDateTime lastSyncTime; /** * 备注 */ private String remark; /** * 创建时间 */ @TableField(fill = FieldFill.INSERT) private LocalDateTime createTime; /** * 更新时间 */ @TableField(fill = FieldFill.INSERT_UPDATE) private LocalDateTime updateTime; } ``` ### 1.2 SyncLog.java ```java package com.zskk.pacsonline.modules.sync.entity; import com.baomidou.mybatisplus.annotation.*; import lombok.Data; import java.time.LocalDateTime; /** * 数据同步日志表 */ @Data @TableName("sync_log") public class SyncLog { @TableId(type = IdType.AUTO) private Long id; /** * 租户ID */ private String tenantId; /** * 批次ID */ private String batchId; /** * 同步的表名 */ private String tableName; /** * 操作类型 */ private String operationType; /** * 本次同步记录数 */ private Integer recordCount; /** * 成功数量 */ private Integer successCount; /** * 失败数量 */ private Integer failCount; /** * 同步状态:pending/processing/success/failed */ private String syncStatus; /** * 开始时间 */ private LocalDateTime startTime; /** * 结束时间 */ private LocalDateTime endTime; /** * 耗时(毫秒) */ private Long useTime; /** * 错误信息 */ private String errorMsg; /** * 数据快照(JSON格式) */ private String dataSnapshot; /** * 重试次数 */ private Integer retryCount; /** * 备注 */ private String remark; /** * 创建时间 */ @TableField(fill = FieldFill.INSERT) private LocalDateTime createTime; } ``` ### 1.3 DataChangeLog.java ```java package com.zskk.pacsonline.modules.sync.entity; import com.baomidou.mybatisplus.annotation.*; import lombok.Data; import java.time.LocalDateTime; /** * 数据变更记录表 */ @Data @TableName("data_change_log") public class DataChangeLog { @TableId(type = IdType.AUTO) private Long id; /** * 表名 */ private String tableName; /** * 主键值 */ private String primaryKey; /** * 操作类型:INSERT/UPDATE/DELETE */ private String operationType; /** * 变更前数据(JSON) */ private String beforeData; /** * 变更后数据(JSON) */ private String afterData; /** * 变更时间 */ @TableField(fill = FieldFill.INSERT) private LocalDateTime changeTime; /** * 同步状态:0-未同步,1-已同步,2-同步失败 */ private Byte syncStatus; /** * 同步时间 */ private LocalDateTime syncTime; /** * 同步批次ID */ private String syncBatchId; /** * 操作人ID */ private Long operatorId; /** * 操作人姓名 */ private String operatorName; } ``` ### 1.4 TenantInfo.java ```java package com.zskk.pacsonline.modules.sync.entity; import com.baomidou.mybatisplus.annotation.*; import lombok.Data; import java.time.LocalDateTime; /** * 租户信息表(公网端) */ @Data @TableName("tenant_info") public class TenantInfo { @TableId(type = IdType.AUTO) private Long id; /** * 租户唯一标识 */ private String tenantId; /** * 租户名称 */ private String tenantName; /** * 租户类型:intranet-内网,cloud-公网 */ private String tenantType; /** * 访问令牌 */ private String accessToken; /** * 数据加密密钥 */ private String secretKey; /** * IP白名单(JSON数组) */ private String ipWhitelist; /** * 最大存储空间(字节) */ private Long maxStorage; /** * 已用存储空间(字节) */ private Long usedStorage; /** * 状态:0-禁用,1-启用 */ private Integer status; /** * 过期时间 */ private LocalDateTime expireTime; /** * 最后同步时间 */ private LocalDateTime lastSyncTime; /** * 联系人 */ private String contactPerson; /** * 联系电话 */ private String contactPhone; /** * 联系邮箱 */ private String contactEmail; /** * 备注 */ private String remark; /** * 创建时间 */ @TableField(fill = FieldFill.INSERT) private LocalDateTime createTime; /** * 更新时间 */ @TableField(fill = FieldFill.INSERT_UPDATE) private LocalDateTime updateTime; } ``` --- ## 2. 数据传输对象 (DTO) ### 2.1 SyncDataRequest.java ```java package com.zskk.pacsonline.modules.sync.dto; import lombok.Data; import java.util.List; /** * 数据同步请求对象 */ @Data public class SyncDataRequest { /** * 租户ID */ private String tenantId; /** * 表名 */ private String tableName; /** * 变更记录列表 */ private List changeLogs; } ``` ### 2.2 SyncDataItem.java ```java package com.zskk.pacsonline.modules.sync.dto; import lombok.Data; import java.time.LocalDateTime; /** * 同步数据项 */ @Data public class SyncDataItem { /** * 主键值 */ private String primaryKey; /** * 操作类型:INSERT/UPDATE/DELETE */ private String operationType; /** * 变更前数据(JSON) */ private String beforeData; /** * 变更后数据(JSON) */ private String afterData; /** * 变更时间 */ private LocalDateTime changeTime; } ``` ### 2.3 DataReceiveResult.java ```java package com.zskk.pacsonline.modules.sync.dto; import lombok.Data; import java.util.ArrayList; import java.util.List; /** * 数据接收结果 */ @Data public class DataReceiveResult { /** * 成功数量 */ private Integer successCount = 0; /** * 失败数量 */ private Integer failCount = 0; /** * 失败的数据项 */ private List failedItems = new ArrayList<>(); /** * 添加失败项 */ public void addFailedItem(SyncDataItem item, String errorMsg) { FailedItem failedItem = new FailedItem(); failedItem.setItem(item); failedItem.setErrorMsg(errorMsg); this.failedItems.add(failedItem); } @Data public static class FailedItem { private SyncDataItem item; private String errorMsg; } } ``` --- ## 3. Mapper接口 ### 3.1 SyncConfigMapper.java ```java package com.zskk.pacsonline.modules.sync.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.zskk.pacsonline.modules.sync.entity.SyncConfig; import org.apache.ibatis.annotations.Mapper; @Mapper public interface SyncConfigMapper extends BaseMapper { } ``` ### 3.2 SyncLogMapper.java ```java package com.zskk.pacsonline.modules.sync.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.zskk.pacsonline.modules.sync.entity.SyncLog; import org.apache.ibatis.annotations.Mapper; @Mapper public interface SyncLogMapper extends BaseMapper { } ``` ### 3.3 DataChangeLogMapper.java ```java package com.zskk.pacsonline.modules.sync.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.zskk.pacsonline.modules.sync.entity.DataChangeLog; import org.apache.ibatis.annotations.Mapper; @Mapper public interface DataChangeLogMapper extends BaseMapper { } ``` ### 3.4 TenantInfoMapper.java ```java package com.zskk.pacsonline.modules.sync.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.zskk.pacsonline.modules.sync.entity.TenantInfo; import org.apache.ibatis.annotations.Mapper; @Mapper public interface TenantInfoMapper extends BaseMapper { } ``` --- ## 4. Service接口 ### 4.1 DataChangeLogService.java ```java package com.zskk.pacsonline.modules.sync.service; import com.baomidou.mybatisplus.extension.service.IService; import com.zskk.pacsonline.modules.sync.entity.DataChangeLog; /** * 数据变更记录服务 */ public interface DataChangeLogService extends IService { /** * 记录数据变更(异步) */ void recordChangeAsync(String tableName, String operationType, Object entity); /** * 检查是否有未同步的数据 */ boolean hasUnsyncedData(); } ``` ### 4.2 DataSyncService.java ```java package com.zskk.pacsonline.modules.sync.service; import com.zskk.pacsonline.modules.sync.entity.SyncConfig; /** * 数据同步服务(内网端) */ public interface DataSyncService { /** * 执行数据同步 */ void syncData(SyncConfig config); /** * 手动触发同步 */ void manualSync(); /** * 检查是否有未同步的数据 */ boolean hasUnsyncedData(); } ``` ### 4.3 DataReceiveService.java ```java package com.zskk.pacsonline.modules.sync.service; import com.zskk.pacsonline.modules.sync.dto.DataReceiveResult; import com.zskk.pacsonline.modules.sync.dto.SyncDataRequest; import com.zskk.pacsonline.modules.sync.entity.TenantInfo; /** * 数据接收服务(公网端) */ public interface DataReceiveService { /** * 接收并保存数据 */ DataReceiveResult receiveData(TenantInfo tenant, SyncDataRequest request); } ``` --- ## 5. Service实现(关键代码) ### 5.1 DataChangeLogServiceImpl.java ```java package com.zskk.pacsonline.modules.sync.service.impl; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.zskk.pacsonline.modules.sync.entity.DataChangeLog; import com.zskk.pacsonline.modules.sync.mapper.DataChangeLogMapper; import com.zskk.pacsonline.modules.sync.service.DataChangeLogService; import com.zskk.pacsonline.security.LoginUser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.util.Arrays; import java.util.List; /** * 数据变更记录服务实现 */ @Service public class DataChangeLogServiceImpl extends ServiceImpl implements DataChangeLogService { private static final Logger logger = LoggerFactory.getLogger(DataChangeLogServiceImpl.class); @Value("${sync.enabled:false}") private Boolean syncEnabled; @Value("${sync.sync-tables:}") private String syncTables; /** * 记录数据变更(异步) */ @Override @Async public void recordChangeAsync(String tableName, String operationType, Object entity) { // 检查是否启用同步 if (!syncEnabled) { return; } // 检查表是否在同步列表中 if (!isTableSyncEnabled(tableName)) { return; } try { DataChangeLog changeLog = new DataChangeLog(); changeLog.setTableName(tableName); changeLog.setOperationType(operationType); // 获取主键值 String primaryKey = extractPrimaryKey(entity); changeLog.setPrimaryKey(primaryKey); // 记录数据 String jsonData = JSON.toJSONString(entity); if ("INSERT".equals(operationType)) { changeLog.setAfterData(jsonData); } else if ("UPDATE".equals(operationType)) { changeLog.setAfterData(jsonData); // TODO: 获取变更前的数据(需要在AOP中传递) } else if ("DELETE".equals(operationType)) { changeLog.setBeforeData(jsonData); } changeLog.setSyncStatus((byte) 0); // 未同步 changeLog.setChangeTime(LocalDateTime.now()); // 获取操作人信息 try { Authentication authentication = SecurityContextHolder.getContext().getAuthentication(); if (authentication != null && authentication.getPrincipal() instanceof LoginUser) { LoginUser loginUser = (LoginUser) authentication.getPrincipal(); changeLog.setOperatorId(loginUser.getUser().getId()); changeLog.setOperatorName(loginUser.getUser().getUsername()); } } catch (Exception e) { logger.warn("获取操作人信息失败", e); } // 保存变更记录 this.save(changeLog); logger.debug("记录数据变更:表={}, 操作={}, 主键={}", tableName, operationType, primaryKey); } catch (Exception e) { logger.error("记录数据变更失败", e); } } /** * 检查表是否启用同步 */ private boolean isTableSyncEnabled(String tableName) { if (syncTables == null || syncTables.trim().isEmpty()) { return false; } // 解析JSON数组 try { List tables = JSON.parseArray(syncTables, String.class); return tables.contains(tableName); } catch (Exception e) { // 简单逗号分隔 String[] tables = syncTables.split(","); return Arrays.asList(tables).contains(tableName); } } /** * 提取主键值 */ private String extractPrimaryKey(Object entity) { try { // 反射获取id字段 java.lang.reflect.Field idField = entity.getClass().getDeclaredField("id"); idField.setAccessible(true); Object id = idField.get(entity); return id != null ? id.toString() : ""; } catch (Exception e) { logger.warn("提取主键失败", e); return ""; } } /** * 检查是否有未同步的数据 */ @Override public boolean hasUnsyncedData() { Long count = this.count(new LambdaQueryWrapper() .eq(DataChangeLog::getSyncStatus, 0)); return count > 0; } } ``` ### 5.2 DataSyncServiceImpl.java(内网端核心) ```java package com.zskk.pacsonline.modules.sync.service.impl; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.zskk.pacsonline.component.response.RestResult; import com.zskk.pacsonline.modules.sync.dto.DataReceiveResult; import com.zskk.pacsonline.modules.sync.dto.SyncDataItem; import com.zskk.pacsonline.modules.sync.dto.SyncDataRequest; import com.zskk.pacsonline.modules.sync.entity.DataChangeLog; import com.zskk.pacsonline.modules.sync.entity.SyncConfig; import com.zskk.pacsonline.modules.sync.entity.SyncLog; import com.zskk.pacsonline.modules.sync.mapper.DataChangeLogMapper; import com.zskk.pacsonline.modules.sync.service.DataChangeLogService; import com.zskk.pacsonline.modules.sync.service.DataSyncService; import com.zskk.pacsonline.modules.sync.service.SyncConfigService; import com.zskk.pacsonline.modules.sync.service.SyncLogService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.*; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.client.RestTemplate; import java.time.Duration; import java.time.LocalDateTime; import java.util.*; import java.util.stream.Collectors; /** * 数据同步服务实现(内网端) */ @Service public class DataSyncServiceImpl implements DataSyncService { private static final Logger logger = LoggerFactory.getLogger(DataSyncServiceImpl.class); @Autowired private DataChangeLogMapper dataChangeLogMapper; @Autowired private DataChangeLogService dataChangeLogService; @Autowired private SyncConfigService syncConfigService; @Autowired private SyncLogService syncLogService; @Autowired private RestTemplate restTemplate; /** * 执行数据同步 */ @Override @Transactional public void syncData(SyncConfig config) { String batchId = UUID.randomUUID().toString().replace("-", ""); logger.info("====== 开始同步数据 ======"); logger.info("租户:{}", config.getTenantId()); logger.info("批次:{}", batchId); try { // 1. 查询未同步的数据变更记录 List changeLogs = dataChangeLogMapper.selectList( new LambdaQueryWrapper() .eq(DataChangeLog::getSyncStatus, 0) .orderByAsc(DataChangeLog::getChangeTime) .last("LIMIT 1000") // 每批次最多1000条 ); if (changeLogs.isEmpty()) { logger.info("没有需要同步的数据"); return; } logger.info("待同步数据:{} 条", changeLogs.size()); // 2. 按表名分组 Map> groupedLogs = changeLogs.stream() .collect(Collectors.groupingBy(DataChangeLog::getTableName)); // 3. 逐表同步 int totalSuccess = 0; int totalFail = 0; for (Map.Entry> entry : groupedLogs.entrySet()) { String tableName = entry.getKey(); List logs = entry.getValue(); logger.info("同步表:{}, 记录数:{}", tableName, logs.size()); int[] result = syncTable(config, batchId, tableName, logs); totalSuccess += result[0]; totalFail += result[1]; } // 4. 更新最后同步时间 config.setLastSyncTime(LocalDateTime.now()); syncConfigService.updateById(config); logger.info("====== 同步完成 ======"); logger.info("成功:{} 条, 失败:{} 条", totalSuccess, totalFail); } catch (Exception e) { logger.error("同步失败,批次:" + batchId, e); } } /** * 同步单个表的数据 * @return [成功数, 失败数] */ private int[] syncTable(SyncConfig config, String batchId, String tableName, List logs) { SyncLog syncLog = new SyncLog(); syncLog.setTenantId(config.getTenantId()); syncLog.setBatchId(batchId); syncLog.setTableName(tableName); syncLog.setRecordCount(logs.size()); syncLog.setSyncStatus("processing"); syncLog.setStartTime(LocalDateTime.now()); syncLog.setRetryCount(0); syncLogService.save(syncLog); int successCount = 0; int failCount = 0; List failedLogs = new ArrayList<>(); try { // 构建同步数据请求 SyncDataRequest request = new SyncDataRequest(); request.setTenantId(config.getTenantId()); request.setTableName(tableName); request.setChangeLogs(logs.stream().map(log -> { SyncDataItem item = new SyncDataItem(); item.setPrimaryKey(log.getPrimaryKey()); item.setOperationType(log.getOperationType()); item.setBeforeData(log.getBeforeData()); item.setAfterData(log.getAfterData()); item.setChangeTime(log.getChangeTime()); return item; }).collect(Collectors.toList())); // 发送HTTP请求到公网 HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); headers.setBearerAuth(config.getAccessToken()); HttpEntity entity = new HttpEntity<>(request, headers); String url = config.getCloudApiUrl() + "/api/sync/receiveData"; logger.info("发送请求:{}", url); ResponseEntity response = restTemplate.exchange( url, HttpMethod.POST, entity, RestResult.class ); logger.info("响应状态:{}", response.getStatusCode()); if (response.getStatusCode() == HttpStatus.OK && response.getBody() != null && response.getBody().getCode() == 200) { // 同步成功,更新状态 successCount = logs.size(); for (DataChangeLog log : logs) { log.setSyncStatus((byte) 1); log.setSyncTime(LocalDateTime.now()); log.setSyncBatchId(batchId); dataChangeLogMapper.updateById(log); } syncLog.setSyncStatus("success"); logger.info("同步成功:{} 条", successCount); } else { // 同步失败 failCount = logs.size(); failedLogs.addAll(logs); syncLog.setSyncStatus("failed"); syncLog.setErrorMsg("公网返回错误:" + (response.getBody() != null ? response.getBody().getMessage() : "未知错误")); logger.error("同步失败:{}", syncLog.getErrorMsg()); } } catch (Exception e) { logger.error("同步表 " + tableName + " 失败", e); failCount = logs.size(); failedLogs.addAll(logs); // 标记为同步失败 for (DataChangeLog log : logs) { log.setSyncStatus((byte) 2); dataChangeLogMapper.updateById(log); } syncLog.setSyncStatus("failed"); syncLog.setErrorMsg(e.getMessage()); } // 更新同步日志 syncLog.setSuccessCount(successCount); syncLog.setFailCount(failCount); syncLog.setEndTime(LocalDateTime.now()); syncLog.setUseTime( Duration.between(syncLog.getStartTime(), syncLog.getEndTime()).toMillis() ); if (!failedLogs.isEmpty()) { syncLog.setDataSnapshot(JSON.toJSONString(failedLogs)); } syncLogService.updateById(syncLog); return new int[]{successCount, failCount}; } /** * 手动触发同步 */ @Override public void manualSync() { SyncConfig config = syncConfigService.getOne( new LambdaQueryWrapper() .eq(SyncConfig::getEnabled, 1) .last("LIMIT 1") ); if (config == null) { logger.warn("未找到启用的同步配置"); return; } syncData(config); } /** * 检查是否有未同步的数据 */ @Override public boolean hasUnsyncedData() { return dataChangeLogService.hasUnsyncedData(); } } ``` ### 5.3 DataReceiveServiceImpl.java(公网端核心) ```java package com.zskk.pacsonline.modules.sync.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.zskk.pacsonline.modules.sync.dto.DataReceiveResult; import com.zskk.pacsonline.modules.sync.dto.SyncDataItem; import com.zskk.pacsonline.modules.sync.dto.SyncDataRequest; import com.zskk.pacsonline.modules.sync.entity.TenantInfo; import com.zskk.pacsonline.modules.sync.service.DataReceiveService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; import java.util.List; /** * 数据接收服务实现(公网端) */ @Service public class DataReceiveServiceImpl implements DataReceiveService { private static final Logger logger = LoggerFactory.getLogger(DataReceiveServiceImpl.class); @Autowired private JdbcTemplate jdbcTemplate; /** * 接收并保存数据 */ @Override @Transactional public DataReceiveResult receiveData(TenantInfo tenant, SyncDataRequest request) { logger.info("接收数据:租户={}, 表={}, 记录数={}", tenant.getTenantId(), request.getTableName(), request.getChangeLogs().size()); DataReceiveResult result = new DataReceiveResult(); result.setSuccessCount(0); result.setFailCount(0); String tableName = request.getTableName(); // 多租户数据隔离:添加租户前缀 String targetTableName = tenant.getTenantId() + "_" + tableName; // 确保目标表存在 ensureTableExists(targetTableName, tableName); // 处理每条数据 for (SyncDataItem item : request.getChangeLogs()) { try { switch (item.getOperationType()) { case "INSERT": insertData(targetTableName, item.getAfterData()); break; case "UPDATE": updateData(targetTableName, item.getPrimaryKey(), item.getAfterData()); break; case "DELETE": deleteData(targetTableName, item.getPrimaryKey()); break; default: logger.warn("未知的操作类型:{}", item.getOperationType()); continue; } result.setSuccessCount(result.getSuccessCount() + 1); } catch (Exception e) { logger.error("处理数据失败:" + item, e); result.setFailCount(result.getFailCount() + 1); result.addFailedItem(item, e.getMessage()); } } logger.info("接收完成:成功={}, 失败={}", result.getSuccessCount(), result.getFailCount()); return result; } /** * 确保目标表存在 */ private void ensureTableExists(String targetTableName, String sourceTableName) { String checkSql = "SELECT COUNT(*) FROM information_schema.tables " + "WHERE table_schema = DATABASE() AND table_name = ?"; Integer count = jdbcTemplate.queryForObject(checkSql, Integer.class, targetTableName); if (count == null || count == 0) { // 表不存在,根据源表结构创建 String createSql = "CREATE TABLE " + targetTableName + " LIKE " + sourceTableName; jdbcTemplate.execute(createSql); logger.info("创建租户表:{}", targetTableName); } } /** * 插入数据 */ private void insertData(String tableName, String jsonData) { if (jsonData == null) { return; } JSONObject data = JSON.parseObject(jsonData); StringBuilder columns = new StringBuilder(); StringBuilder values = new StringBuilder(); List params = new ArrayList<>(); for (String key : data.keySet()) { if (columns.length() > 0) { columns.append(", "); values.append(", "); } columns.append("`").append(key).append("`"); values.append("?"); params.add(data.get(key)); } String sql = "INSERT INTO " + tableName + " (" + columns + ") VALUES (" + values + ")"; jdbcTemplate.update(sql, params.toArray()); logger.debug("插入数据:表={}, SQL={}", tableName, sql); } /** * 更新数据 */ private void updateData(String tableName, String primaryKey, String jsonData) { if (jsonData == null) { return; } JSONObject data = JSON.parseObject(jsonData); StringBuilder sets = new StringBuilder(); List params = new ArrayList<>(); Object pkValue = null; for (String key : data.keySet()) { if ("id".equals(key)) { pkValue = data.get(key); continue; } if (sets.length() > 0) { sets.append(", "); } sets.append("`").append(key).append("` = ?"); params.add(data.get(key)); } params.add(pkValue); String sql = "UPDATE " + tableName + " SET " + sets + " WHERE id = ?"; jdbcTemplate.update(sql, params.toArray()); logger.debug("更新数据:表={}, 主键={}", tableName, primaryKey); } /** * 删除数据 */ private void deleteData(String tableName, String primaryKey) { String sql = "DELETE FROM " + tableName + " WHERE id = ?"; jdbcTemplate.update(sql, primaryKey); logger.debug("删除数据:表={}, 主键={}", tableName, primaryKey); } } ``` --- ## 6. 控制器 ### 6.1 SyncController.java(公网端) ```java package com.zskk.pacsonline.modules.sync.controller; import com.zskk.pacsonline.component.response.RestResult; import com.zskk.pacsonline.modules.sync.dto.DataReceiveResult; import com.zskk.pacsonline.modules.sync.dto.SyncDataRequest; import com.zskk.pacsonline.modules.sync.entity.TenantInfo; import com.zskk.pacsonline.modules.sync.service.DataReceiveService; import com.zskk.pacsonline.modules.sync.service.TenantInfoService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import java.time.LocalDateTime; import java.util.HashMap; import java.util.Map; /** * 数据同步控制器(公网端) */ @RestController @RequestMapping("/api/sync") public class SyncController { private static final Logger logger = LoggerFactory.getLogger(SyncController.class); @Autowired private TenantInfoService tenantInfoService; @Autowired private DataReceiveService dataReceiveService; /** * 接收内网同步的数据 */ @PostMapping("/receiveData") public RestResult receiveData(@RequestBody SyncDataRequest request, @RequestHeader("Authorization") String token) { try { // 1. 验证令牌 if (token == null || !token.startsWith("Bearer ")) { return RestResult.error("无效的访问令牌"); } String accessToken = token.substring(7); // 2. 验证租户 TenantInfo tenant = tenantInfoService.validateToken(request.getTenantId(), accessToken); if (tenant == null) { logger.warn("租户验证失败:{}", request.getTenantId()); return RestResult.error("租户验证失败"); } if (tenant.getStatus() == 0) { return RestResult.error("租户已被禁用"); } // 3. 检查存储空间 if (tenant.getMaxStorage() != null && tenant.getUsedStorage() != null && tenant.getUsedStorage() >= tenant.getMaxStorage()) { return RestResult.error("存储空间已满"); } // 4. 接收并保存数据 DataReceiveResult result = dataReceiveService.receiveData(tenant, request); // 5. 更新租户最后同步时间 tenant.setLastSyncTime(LocalDateTime.now()); tenantInfoService.updateById(tenant); logger.info("接收数据成功:租户={}, 表={}, 记录数={}, 成功={}, 失败={}", request.getTenantId(), request.getTableName(), request.getChangeLogs().size(), result.getSuccessCount(), result.getFailCount()); return RestResult.ok("同步成功", result); } catch (Exception e) { logger.error("接收数据失败", e); return RestResult.error("同步失败:" + e.getMessage()); } } /** * 查询同步状态 */ @GetMapping("/status/{tenantId}") public RestResult getSyncStatus(@PathVariable String tenantId, @RequestHeader("Authorization") String token) { // 验证租户 String accessToken = token.substring(7); TenantInfo tenant = tenantInfoService.validateToken(tenantId, accessToken); if (tenant == null) { return RestResult.error("租户验证失败"); } // 查询同步状态 Map status = new HashMap<>(); status.put("tenantId", tenant.getTenantId()); status.put("tenantName", tenant.getTenantName()); status.put("lastSyncTime", tenant.getLastSyncTime()); status.put("usedStorage", tenant.getUsedStorage()); status.put("maxStorage", tenant.getMaxStorage()); status.put("status", tenant.getStatus()); return RestResult.ok(status); } } ``` ### 6.2 SyncManageController.java(内网端) ```java package com.zskk.pacsonline.modules.sync.controller; import com.zskk.pacsonline.component.response.RestResult; import com.zskk.pacsonline.modules.sync.service.DataSyncService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; /** * 数据同步管理控制器(内网端) */ @RestController @RequestMapping("/api/syncManage") public class SyncManageController { @Autowired private DataSyncService dataSyncService; /** * 手动触发同步 */ @PostMapping("/manual") public RestResult manualSync() { try { dataSyncService.manualSync(); return RestResult.ok("同步任务已启动"); } catch (Exception e) { return RestResult.error("同步失败:" + e.getMessage()); } } /** * 检查未同步数据 */ @GetMapping("/checkUnsynced") public RestResult checkUnsynced() { boolean hasUnsynced = dataSyncService.hasUnsyncedData(); return RestResult.ok(hasUnsynced ? "有未同步的数据" : "所有数据已同步"); } } ``` --- ## 7. AOP拦截器 ### 7.1 DataChangeCaptureAop.java ```java package com.zskk.pacsonline.modules.sync.aop; import com.zskk.pacsonline.modules.sync.service.DataChangeLogService; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 数据变更捕获AOP * 拦截所有Mapper的insert/update/delete操作 */ @Aspect @Component public class DataChangeCaptureAop { private static final Logger logger = LoggerFactory.getLogger(DataChangeCaptureAop.class); @Autowired private DataChangeLogService dataChangeLogService; /** * 拦截Mapper的insert/update/delete方法 */ @Around("execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.insert*(..)) || " + "execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.update*(..)) || " + "execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.delete*(..))") public Object captureDataChange(ProceedingJoinPoint jp) throws Throwable { // 判断操作类型 String methodName = jp.getSignature().getName(); String operationType = getOperationType(methodName); // 获取表名 String tableName = getTableName(jp); // 获取实体对象 Object entity = getEntity(jp.getArgs()); // 执行原方法 Object result = jp.proceed(); // 记录变更日志(异步) if (entity != null && tableName != null) { try { dataChangeLogService.recordChangeAsync(tableName, operationType, entity); } catch (Exception e) { logger.error("记录数据变更失败", e); } } return result; } /** * 获取操作类型 */ private String getOperationType(String methodName) { if (methodName.startsWith("insert")) { return "INSERT"; } else if (methodName.startsWith("update")) { return "UPDATE"; } else if (methodName.startsWith("delete")) { return "DELETE"; } return "UNKNOWN"; } /** * 获取表名 */ private String getTableName(ProceedingJoinPoint jp) { try { // 从Mapper接口名推断表名 String className = jp.getTarget().getClass().getSimpleName(); // 例如:SysUserMapper -> sys_user String entityName = className.replace("Mapper", "") .replaceAll("([A-Z])", "_$1") .toLowerCase() .substring(1); return entityName; } catch (Exception e) { logger.warn("获取表名失败", e); return null; } } /** * 获取实体对象 */ private Object getEntity(Object[] args) { if (args == null || args.length == 0) { return null; } // 第一个参数通常是实体对象 for (Object arg : args) { if (arg != null && !isPrimitiveOrWrapper(arg.getClass())) { return arg; } } return null; } /** * 判断是否是基本类型或包装类 */ private boolean isPrimitiveOrWrapper(Class clazz) { return clazz.isPrimitive() || clazz == String.class || clazz == Integer.class || clazz == Long.class || clazz == Double.class || clazz == Float.class || clazz == Boolean.class || clazz == Byte.class || clazz == Short.class || clazz == Character.class; } } ``` --- ## 8. 定时任务 ### 8.1 DataSyncScheduler.java ```java package com.zskk.pacsonline.modules.sync.scheduler; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.zskk.pacsonline.modules.sync.entity.SyncConfig; import com.zskk.pacsonline.modules.sync.service.DataSyncService; import com.zskk.pacsonline.modules.sync.service.SyncConfigService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.util.List; import java.util.concurrent.CompletableFuture; /** * 数据同步定时任务 */ @Component public class DataSyncScheduler { private static final Logger logger = LoggerFactory.getLogger(DataSyncScheduler.class); @Autowired private SyncConfigService syncConfigService; @Autowired private DataSyncService dataSyncService; /** * 定时检查并执行同步任务 * 每分钟检查一次 */ @Scheduled(cron = "0 * * * * ?") public void checkAndSync() { logger.debug("检查同步任务..."); // 获取启用的同步配置 List configs = syncConfigService.list( new LambdaQueryWrapper() .eq(SyncConfig::getEnabled, 1) ); if (configs.isEmpty()) { return; } for (SyncConfig config : configs) { // 判断是否需要同步 if (shouldSync(config)) { // 异步执行同步任务 CompletableFuture.runAsync(() -> { try { dataSyncService.syncData(config); } catch (Exception e) { logger.error("同步任务执行失败", e); } }); } } } /** * 判断是否需要同步 */ private boolean shouldSync(SyncConfig config) { String syncMode = config.getSyncMode(); if ("manual".equals(syncMode)) { return false; // 手动模式不自动同步 } if ("auto".equals(syncMode)) { // 实时模式:检查是否有未同步的数据 return dataSyncService.hasUnsyncedData(); } if ("schedule".equals(syncMode)) { // 定时模式:判断是否到达同步时间 LocalDateTime lastSyncTime = config.getLastSyncTime(); if (lastSyncTime == null) { return true; // 从未同步过 } Integer interval = config.getSyncInterval(); if (interval == null || interval <= 0) { return false; } LocalDateTime nextSyncTime = lastSyncTime.plusSeconds(interval); return LocalDateTime.now().isAfter(nextSyncTime); } return false; } } ``` --- ## 9. 工具类 ### 9.1 DataEncryptUtil.java ```java package com.zskk.pacsonline.utils; import javax.crypto.Cipher; import javax.crypto.spec.SecretKeySpec; import java.util.Base64; /** * 数据加密工具类 */ public class DataEncryptUtil { private static final String ALGORITHM = "AES"; private static final String TRANSFORMATION = "AES/ECB/PKCS5Padding"; /** * AES加密 */ public static String encrypt(String data, String secretKey) { try { // 密钥长度必须是16/24/32字节 byte[] keyBytes = secretKey.getBytes(); if (keyBytes.length != 16 && keyBytes.length != 24 && keyBytes.length != 32) { throw new IllegalArgumentException("密钥长度必须是16、24或32字节"); } SecretKeySpec key = new SecretKeySpec(keyBytes, ALGORITHM); Cipher cipher = Cipher.getInstance(TRANSFORMATION); cipher.init(Cipher.ENCRYPT_MODE, key); byte[] encrypted = cipher.doFinal(data.getBytes("UTF-8")); return Base64.getEncoder().encodeToString(encrypted); } catch (Exception e) { throw new RuntimeException("加密失败", e); } } /** * AES解密 */ public static String decrypt(String encryptedData, String secretKey) { try { byte[] keyBytes = secretKey.getBytes(); if (keyBytes.length != 16 && keyBytes.length != 24 && keyBytes.length != 32) { throw new IllegalArgumentException("密钥长度必须是16、24或32字节"); } SecretKeySpec key = new SecretKeySpec(keyBytes, ALGORITHM); Cipher cipher = Cipher.getInstance(TRANSFORMATION); cipher.init(Cipher.DECRYPT_MODE, key); byte[] decrypted = cipher.doFinal(Base64.getDecoder().decode(encryptedData)); return new String(decrypted, "UTF-8"); } catch (Exception e) { throw new RuntimeException("解密失败", e); } } /** * 生成随机密钥(32字节) */ public static String generateKey() { String chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; StringBuilder key = new StringBuilder(); for (int i = 0; i < 32; i++) { int index = (int) (Math.random() * chars.length()); key.append(chars.charAt(index)); } return key.toString(); } } ``` --- ## 10. 配置类 ### 10.1 RestTemplateConfig.java ```java package com.zskk.pacsonline.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.client.SimpleClientHttpRequestFactory; import org.springframework.web.client.RestTemplate; /** * RestTemplate配置 */ @Configuration public class RestTemplateConfig { @Bean public RestTemplate restTemplate() { SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); factory.setConnectTimeout(5000); // 连接超时5秒 factory.setReadTimeout(30000); // 读取超时30秒 return new RestTemplate(factory); } } ``` --- ## 11. 使用说明 ### 11.1 内网端部署步骤 1. **添加依赖**(pom.xml已包含) 2. **配置application.yml** ```yaml sync: enabled: true tenant-id: tenant_001 tenant-name: 内网环境A cloud-api-url: https://cloud.example.com access-token: your_jwt_token_here sync-mode: auto sync-interval: 300 sync-tables: '["sys_user","sys_role","sys_menu"]' ``` 3. **执行SQL脚本** ```bash mysql -u root -p your_database < doc/sql/sync_tables.sql ``` 4. **启动应用** - 数据变更会自动记录到`data_change_log`表 - 定时任务每分钟检查一次,自动同步到公网 ### 11.2 公网端部署步骤 1. **配置application.yml** ```yaml multi-tenant: enabled: true ``` 2. **执行SQL脚本,创建租户** ```sql INSERT INTO tenant_info (tenant_id, tenant_name, access_token, status) VALUES ('tenant_001', '内网环境A', 'your_jwt_token_here', 1); ``` 3. **启动应用** - 等待内网端推送数据 - 数据会自动保存到`tenant_001_表名`的表中 --- **文档版本**:v1.0 **最后更新**:2025-10-30