com.zskk.pacsonline
├── modules
│ └── sync
│ ├── entity # 实体类
│ │ ├── SyncConfig.java
│ │ ├── SyncLog.java
│ │ ├── DataChangeLog.java
│ │ └── TenantInfo.java
│ ├── dto # 数据传输对象
│ │ ├── SyncDataRequest.java
│ │ ├── SyncDataItem.java
│ │ └── DataReceiveResult.java
│ ├── mapper # Mapper接口
│ │ ├── SyncConfigMapper.java
│ │ ├── SyncLogMapper.java
│ │ ├── DataChangeLogMapper.java
│ │ └── TenantInfoMapper.java
│ ├── service # 服务接口
│ │ ├── SyncConfigService.java
│ │ ├── SyncLogService.java
│ │ ├── DataChangeLogService.java
│ │ ├── DataSyncService.java
│ │ └── DataReceiveService.java
│ ├── service.impl # 服务实现
│ │ ├── SyncConfigServiceImpl.java
│ │ ├── SyncLogServiceImpl.java
│ │ ├── DataChangeLogServiceImpl.java
│ │ ├── DataSyncServiceImpl.java
│ │ └── DataReceiveServiceImpl.java
│ ├── controller # 控制器
│ │ ├── SyncController.java (公网端)
│ │ └── SyncManageController.java (内网端)
│ ├── aop # AOP拦截器
│ │ └── DataChangeCaptureAop.java
│ └── scheduler # 定时任务
│ └── DataSyncScheduler.java
└── utils
├── DataEncryptUtil.java # 数据加密工具
└── HttpClientUtil.java # HTTP客户端工具
package com.zskk.pacsonline.modules.sync.entity;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 数据同步配置表
*/
@Data
@TableName("sync_config")
public class SyncConfig {
@TableId(type = IdType.AUTO)
private Long id;
/**
* 租户ID(唯一标识)
*/
private String tenantId;
/**
* 租户名称
*/
private String tenantName;
/**
* 同步模式:auto-自动,manual-手动,schedule-定时
*/
private String syncMode;
/**
* 同步间隔(秒)
*/
private Integer syncInterval;
/**
* Cron表达式
*/
private String cronExpression;
/**
* 需要同步的表名列表(JSON数组)
*/
private String syncTables;
/**
* 公网API地址
*/
private String cloudApiUrl;
/**
* 访问令牌
*/
private String accessToken;
/**
* 数据加密密钥
*/
private String secretKey;
/**
* 是否启用:0-禁用,1-启用
*/
private Integer enabled;
/**
* 失败重试次数
*/
private Integer retryTimes;
/**
* 请求超时时间(毫秒)
*/
private Integer timeout;
/**
* 最后同步时间
*/
private LocalDateTime lastSyncTime;
/**
* 备注
*/
private String remark;
/**
* 创建时间
*/
@TableField(fill = FieldFill.INSERT)
private LocalDateTime createTime;
/**
* 更新时间
*/
@TableField(fill = FieldFill.INSERT_UPDATE)
private LocalDateTime updateTime;
}
package com.zskk.pacsonline.modules.sync.entity;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 数据同步日志表
*/
@Data
@TableName("sync_log")
public class SyncLog {
@TableId(type = IdType.AUTO)
private Long id;
/**
* 租户ID
*/
private String tenantId;
/**
* 批次ID
*/
private String batchId;
/**
* 同步的表名
*/
private String tableName;
/**
* 操作类型
*/
private String operationType;
/**
* 本次同步记录数
*/
private Integer recordCount;
/**
* 成功数量
*/
private Integer successCount;
/**
* 失败数量
*/
private Integer failCount;
/**
* 同步状态:pending/processing/success/failed
*/
private String syncStatus;
/**
* 开始时间
*/
private LocalDateTime startTime;
/**
* 结束时间
*/
private LocalDateTime endTime;
/**
* 耗时(毫秒)
*/
private Long useTime;
/**
* 错误信息
*/
private String errorMsg;
/**
* 数据快照(JSON格式)
*/
private String dataSnapshot;
/**
* 重试次数
*/
private Integer retryCount;
/**
* 备注
*/
private String remark;
/**
* 创建时间
*/
@TableField(fill = FieldFill.INSERT)
private LocalDateTime createTime;
}
package com.zskk.pacsonline.modules.sync.entity;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 数据变更记录表
*/
@Data
@TableName("data_change_log")
public class DataChangeLog {
@TableId(type = IdType.AUTO)
private Long id;
/**
* 表名
*/
private String tableName;
/**
* 主键值
*/
private String primaryKey;
/**
* 操作类型:INSERT/UPDATE/DELETE
*/
private String operationType;
/**
* 变更前数据(JSON)
*/
private String beforeData;
/**
* 变更后数据(JSON)
*/
private String afterData;
/**
* 变更时间
*/
@TableField(fill = FieldFill.INSERT)
private LocalDateTime changeTime;
/**
* 同步状态:0-未同步,1-已同步,2-同步失败
*/
private Byte syncStatus;
/**
* 同步时间
*/
private LocalDateTime syncTime;
/**
* 同步批次ID
*/
private String syncBatchId;
/**
* 操作人ID
*/
private Long operatorId;
/**
* 操作人姓名
*/
private String operatorName;
}
package com.zskk.pacsonline.modules.sync.entity;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 租户信息表(公网端)
*/
@Data
@TableName("tenant_info")
public class TenantInfo {
@TableId(type = IdType.AUTO)
private Long id;
/**
* 租户唯一标识
*/
private String tenantId;
/**
* 租户名称
*/
private String tenantName;
/**
* 租户类型:intranet-内网,cloud-公网
*/
private String tenantType;
/**
* 访问令牌
*/
private String accessToken;
/**
* 数据加密密钥
*/
private String secretKey;
/**
* IP白名单(JSON数组)
*/
private String ipWhitelist;
/**
* 最大存储空间(字节)
*/
private Long maxStorage;
/**
* 已用存储空间(字节)
*/
private Long usedStorage;
/**
* 状态:0-禁用,1-启用
*/
private Integer status;
/**
* 过期时间
*/
private LocalDateTime expireTime;
/**
* 最后同步时间
*/
private LocalDateTime lastSyncTime;
/**
* 联系人
*/
private String contactPerson;
/**
* 联系电话
*/
private String contactPhone;
/**
* 联系邮箱
*/
private String contactEmail;
/**
* 备注
*/
private String remark;
/**
* 创建时间
*/
@TableField(fill = FieldFill.INSERT)
private LocalDateTime createTime;
/**
* 更新时间
*/
@TableField(fill = FieldFill.INSERT_UPDATE)
private LocalDateTime updateTime;
}
package com.zskk.pacsonline.modules.sync.dto;
import lombok.Data;
import java.util.List;
/**
* 数据同步请求对象
*/
@Data
public class SyncDataRequest {
/**
* 租户ID
*/
private String tenantId;
/**
* 表名
*/
private String tableName;
/**
* 变更记录列表
*/
private List<SyncDataItem> changeLogs;
}
package com.zskk.pacsonline.modules.sync.dto;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 同步数据项
*/
@Data
public class SyncDataItem {
/**
* 主键值
*/
private String primaryKey;
/**
* 操作类型:INSERT/UPDATE/DELETE
*/
private String operationType;
/**
* 变更前数据(JSON)
*/
private String beforeData;
/**
* 变更后数据(JSON)
*/
private String afterData;
/**
* 变更时间
*/
private LocalDateTime changeTime;
}
package com.zskk.pacsonline.modules.sync.dto;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
/**
* 数据接收结果
*/
@Data
public class DataReceiveResult {
/**
* 成功数量
*/
private Integer successCount = 0;
/**
* 失败数量
*/
private Integer failCount = 0;
/**
* 失败的数据项
*/
private List<FailedItem> failedItems = new ArrayList<>();
/**
* 添加失败项
*/
public void addFailedItem(SyncDataItem item, String errorMsg) {
FailedItem failedItem = new FailedItem();
failedItem.setItem(item);
failedItem.setErrorMsg(errorMsg);
this.failedItems.add(failedItem);
}
@Data
public static class FailedItem {
private SyncDataItem item;
private String errorMsg;
}
}
package com.zskk.pacsonline.modules.sync.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zskk.pacsonline.modules.sync.entity.SyncConfig;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface SyncConfigMapper extends BaseMapper<SyncConfig> {
}
package com.zskk.pacsonline.modules.sync.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zskk.pacsonline.modules.sync.entity.SyncLog;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface SyncLogMapper extends BaseMapper<SyncLog> {
}
package com.zskk.pacsonline.modules.sync.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zskk.pacsonline.modules.sync.entity.DataChangeLog;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface DataChangeLogMapper extends BaseMapper<DataChangeLog> {
}
package com.zskk.pacsonline.modules.sync.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zskk.pacsonline.modules.sync.entity.TenantInfo;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface TenantInfoMapper extends BaseMapper<TenantInfo> {
}
package com.zskk.pacsonline.modules.sync.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.zskk.pacsonline.modules.sync.entity.DataChangeLog;
/**
* 数据变更记录服务
*/
public interface DataChangeLogService extends IService<DataChangeLog> {
/**
* 记录数据变更(异步)
*/
void recordChangeAsync(String tableName, String operationType, Object entity);
/**
* 检查是否有未同步的数据
*/
boolean hasUnsyncedData();
}
package com.zskk.pacsonline.modules.sync.service;
import com.zskk.pacsonline.modules.sync.entity.SyncConfig;
/**
* 数据同步服务(内网端)
*/
public interface DataSyncService {
/**
* 执行数据同步
*/
void syncData(SyncConfig config);
/**
* 手动触发同步
*/
void manualSync();
/**
* 检查是否有未同步的数据
*/
boolean hasUnsyncedData();
}
package com.zskk.pacsonline.modules.sync.service;
import com.zskk.pacsonline.modules.sync.dto.DataReceiveResult;
import com.zskk.pacsonline.modules.sync.dto.SyncDataRequest;
import com.zskk.pacsonline.modules.sync.entity.TenantInfo;
/**
* 数据接收服务(公网端)
*/
public interface DataReceiveService {
/**
* 接收并保存数据
*/
DataReceiveResult receiveData(TenantInfo tenant, SyncDataRequest request);
}
package com.zskk.pacsonline.modules.sync.service.impl;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zskk.pacsonline.modules.sync.entity.DataChangeLog;
import com.zskk.pacsonline.modules.sync.mapper.DataChangeLogMapper;
import com.zskk.pacsonline.modules.sync.service.DataChangeLogService;
import com.zskk.pacsonline.security.LoginUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
/**
* 数据变更记录服务实现
*/
@Service
public class DataChangeLogServiceImpl extends ServiceImpl<DataChangeLogMapper, DataChangeLog>
implements DataChangeLogService {
private static final Logger logger = LoggerFactory.getLogger(DataChangeLogServiceImpl.class);
@Value("${sync.enabled:false}")
private Boolean syncEnabled;
@Value("${sync.sync-tables:}")
private String syncTables;
/**
* 记录数据变更(异步)
*/
@Override
@Async
public void recordChangeAsync(String tableName, String operationType, Object entity) {
// 检查是否启用同步
if (!syncEnabled) {
return;
}
// 检查表是否在同步列表中
if (!isTableSyncEnabled(tableName)) {
return;
}
try {
DataChangeLog changeLog = new DataChangeLog();
changeLog.setTableName(tableName);
changeLog.setOperationType(operationType);
// 获取主键值
String primaryKey = extractPrimaryKey(entity);
changeLog.setPrimaryKey(primaryKey);
// 记录数据
String jsonData = JSON.toJSONString(entity);
if ("INSERT".equals(operationType)) {
changeLog.setAfterData(jsonData);
} else if ("UPDATE".equals(operationType)) {
changeLog.setAfterData(jsonData);
// TODO: 获取变更前的数据(需要在AOP中传递)
} else if ("DELETE".equals(operationType)) {
changeLog.setBeforeData(jsonData);
}
changeLog.setSyncStatus((byte) 0); // 未同步
changeLog.setChangeTime(LocalDateTime.now());
// 获取操作人信息
try {
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
if (authentication != null && authentication.getPrincipal() instanceof LoginUser) {
LoginUser loginUser = (LoginUser) authentication.getPrincipal();
changeLog.setOperatorId(loginUser.getUser().getId());
changeLog.setOperatorName(loginUser.getUser().getUsername());
}
} catch (Exception e) {
logger.warn("获取操作人信息失败", e);
}
// 保存变更记录
this.save(changeLog);
logger.debug("记录数据变更:表={}, 操作={}, 主键={}", tableName, operationType, primaryKey);
} catch (Exception e) {
logger.error("记录数据变更失败", e);
}
}
/**
* 检查表是否启用同步
*/
private boolean isTableSyncEnabled(String tableName) {
if (syncTables == null || syncTables.trim().isEmpty()) {
return false;
}
// 解析JSON数组
try {
List<String> tables = JSON.parseArray(syncTables, String.class);
return tables.contains(tableName);
} catch (Exception e) {
// 简单逗号分隔
String[] tables = syncTables.split(",");
return Arrays.asList(tables).contains(tableName);
}
}
/**
* 提取主键值
*/
private String extractPrimaryKey(Object entity) {
try {
// 反射获取id字段
java.lang.reflect.Field idField = entity.getClass().getDeclaredField("id");
idField.setAccessible(true);
Object id = idField.get(entity);
return id != null ? id.toString() : "";
} catch (Exception e) {
logger.warn("提取主键失败", e);
return "";
}
}
/**
* 检查是否有未同步的数据
*/
@Override
public boolean hasUnsyncedData() {
Long count = this.count(new LambdaQueryWrapper<DataChangeLog>()
.eq(DataChangeLog::getSyncStatus, 0));
return count > 0;
}
}
package com.zskk.pacsonline.modules.sync.service.impl;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.zskk.pacsonline.component.response.RestResult;
import com.zskk.pacsonline.modules.sync.dto.DataReceiveResult;
import com.zskk.pacsonline.modules.sync.dto.SyncDataItem;
import com.zskk.pacsonline.modules.sync.dto.SyncDataRequest;
import com.zskk.pacsonline.modules.sync.entity.DataChangeLog;
import com.zskk.pacsonline.modules.sync.entity.SyncConfig;
import com.zskk.pacsonline.modules.sync.entity.SyncLog;
import com.zskk.pacsonline.modules.sync.mapper.DataChangeLogMapper;
import com.zskk.pacsonline.modules.sync.service.DataChangeLogService;
import com.zskk.pacsonline.modules.sync.service.DataSyncService;
import com.zskk.pacsonline.modules.sync.service.SyncConfigService;
import com.zskk.pacsonline.modules.sync.service.SyncLogService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.*;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
/**
* 数据同步服务实现(内网端)
*/
@Service
public class DataSyncServiceImpl implements DataSyncService {
private static final Logger logger = LoggerFactory.getLogger(DataSyncServiceImpl.class);
@Autowired
private DataChangeLogMapper dataChangeLogMapper;
@Autowired
private DataChangeLogService dataChangeLogService;
@Autowired
private SyncConfigService syncConfigService;
@Autowired
private SyncLogService syncLogService;
@Autowired
private RestTemplate restTemplate;
/**
* 执行数据同步
*/
@Override
@Transactional
public void syncData(SyncConfig config) {
String batchId = UUID.randomUUID().toString().replace("-", "");
logger.info("====== 开始同步数据 ======");
logger.info("租户:{}", config.getTenantId());
logger.info("批次:{}", batchId);
try {
// 1. 查询未同步的数据变更记录
List<DataChangeLog> changeLogs = dataChangeLogMapper.selectList(
new LambdaQueryWrapper<DataChangeLog>()
.eq(DataChangeLog::getSyncStatus, 0)
.orderByAsc(DataChangeLog::getChangeTime)
.last("LIMIT 1000") // 每批次最多1000条
);
if (changeLogs.isEmpty()) {
logger.info("没有需要同步的数据");
return;
}
logger.info("待同步数据:{} 条", changeLogs.size());
// 2. 按表名分组
Map<String, List<DataChangeLog>> groupedLogs = changeLogs.stream()
.collect(Collectors.groupingBy(DataChangeLog::getTableName));
// 3. 逐表同步
int totalSuccess = 0;
int totalFail = 0;
for (Map.Entry<String, List<DataChangeLog>> entry : groupedLogs.entrySet()) {
String tableName = entry.getKey();
List<DataChangeLog> logs = entry.getValue();
logger.info("同步表:{}, 记录数:{}", tableName, logs.size());
int[] result = syncTable(config, batchId, tableName, logs);
totalSuccess += result[0];
totalFail += result[1];
}
// 4. 更新最后同步时间
config.setLastSyncTime(LocalDateTime.now());
syncConfigService.updateById(config);
logger.info("====== 同步完成 ======");
logger.info("成功:{} 条, 失败:{} 条", totalSuccess, totalFail);
} catch (Exception e) {
logger.error("同步失败,批次:" + batchId, e);
}
}
/**
* 同步单个表的数据
* @return [成功数, 失败数]
*/
private int[] syncTable(SyncConfig config, String batchId,
String tableName, List<DataChangeLog> logs) {
SyncLog syncLog = new SyncLog();
syncLog.setTenantId(config.getTenantId());
syncLog.setBatchId(batchId);
syncLog.setTableName(tableName);
syncLog.setRecordCount(logs.size());
syncLog.setSyncStatus("processing");
syncLog.setStartTime(LocalDateTime.now());
syncLog.setRetryCount(0);
syncLogService.save(syncLog);
int successCount = 0;
int failCount = 0;
List<DataChangeLog> failedLogs = new ArrayList<>();
try {
// 构建同步数据请求
SyncDataRequest request = new SyncDataRequest();
request.setTenantId(config.getTenantId());
request.setTableName(tableName);
request.setChangeLogs(logs.stream().map(log -> {
SyncDataItem item = new SyncDataItem();
item.setPrimaryKey(log.getPrimaryKey());
item.setOperationType(log.getOperationType());
item.setBeforeData(log.getBeforeData());
item.setAfterData(log.getAfterData());
item.setChangeTime(log.getChangeTime());
return item;
}).collect(Collectors.toList()));
// 发送HTTP请求到公网
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setBearerAuth(config.getAccessToken());
HttpEntity<SyncDataRequest> entity = new HttpEntity<>(request, headers);
String url = config.getCloudApiUrl() + "/api/sync/receiveData";
logger.info("发送请求:{}", url);
ResponseEntity<RestResult> response = restTemplate.exchange(
url,
HttpMethod.POST,
entity,
RestResult.class
);
logger.info("响应状态:{}", response.getStatusCode());
if (response.getStatusCode() == HttpStatus.OK &&
response.getBody() != null &&
response.getBody().getCode() == 200) {
// 同步成功,更新状态
successCount = logs.size();
for (DataChangeLog log : logs) {
log.setSyncStatus((byte) 1);
log.setSyncTime(LocalDateTime.now());
log.setSyncBatchId(batchId);
dataChangeLogMapper.updateById(log);
}
syncLog.setSyncStatus("success");
logger.info("同步成功:{} 条", successCount);
} else {
// 同步失败
failCount = logs.size();
failedLogs.addAll(logs);
syncLog.setSyncStatus("failed");
syncLog.setErrorMsg("公网返回错误:" + (response.getBody() != null ? response.getBody().getMessage() : "未知错误"));
logger.error("同步失败:{}", syncLog.getErrorMsg());
}
} catch (Exception e) {
logger.error("同步表 " + tableName + " 失败", e);
failCount = logs.size();
failedLogs.addAll(logs);
// 标记为同步失败
for (DataChangeLog log : logs) {
log.setSyncStatus((byte) 2);
dataChangeLogMapper.updateById(log);
}
syncLog.setSyncStatus("failed");
syncLog.setErrorMsg(e.getMessage());
}
// 更新同步日志
syncLog.setSuccessCount(successCount);
syncLog.setFailCount(failCount);
syncLog.setEndTime(LocalDateTime.now());
syncLog.setUseTime(
Duration.between(syncLog.getStartTime(), syncLog.getEndTime()).toMillis()
);
if (!failedLogs.isEmpty()) {
syncLog.setDataSnapshot(JSON.toJSONString(failedLogs));
}
syncLogService.updateById(syncLog);
return new int[]{successCount, failCount};
}
/**
* 手动触发同步
*/
@Override
public void manualSync() {
SyncConfig config = syncConfigService.getOne(
new LambdaQueryWrapper<SyncConfig>()
.eq(SyncConfig::getEnabled, 1)
.last("LIMIT 1")
);
if (config == null) {
logger.warn("未找到启用的同步配置");
return;
}
syncData(config);
}
/**
* 检查是否有未同步的数据
*/
@Override
public boolean hasUnsyncedData() {
return dataChangeLogService.hasUnsyncedData();
}
}
package com.zskk.pacsonline.modules.sync.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zskk.pacsonline.modules.sync.dto.DataReceiveResult;
import com.zskk.pacsonline.modules.sync.dto.SyncDataItem;
import com.zskk.pacsonline.modules.sync.dto.SyncDataRequest;
import com.zskk.pacsonline.modules.sync.entity.TenantInfo;
import com.zskk.pacsonline.modules.sync.service.DataReceiveService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.List;
/**
* 数据接收服务实现(公网端)
*/
@Service
public class DataReceiveServiceImpl implements DataReceiveService {
private static final Logger logger = LoggerFactory.getLogger(DataReceiveServiceImpl.class);
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* 接收并保存数据
*/
@Override
@Transactional
public DataReceiveResult receiveData(TenantInfo tenant, SyncDataRequest request) {
logger.info("接收数据:租户={}, 表={}, 记录数={}",
tenant.getTenantId(), request.getTableName(), request.getChangeLogs().size());
DataReceiveResult result = new DataReceiveResult();
result.setSuccessCount(0);
result.setFailCount(0);
String tableName = request.getTableName();
// 多租户数据隔离:添加租户前缀
String targetTableName = tenant.getTenantId() + "_" + tableName;
// 确保目标表存在
ensureTableExists(targetTableName, tableName);
// 处理每条数据
for (SyncDataItem item : request.getChangeLogs()) {
try {
switch (item.getOperationType()) {
case "INSERT":
insertData(targetTableName, item.getAfterData());
break;
case "UPDATE":
updateData(targetTableName, item.getPrimaryKey(), item.getAfterData());
break;
case "DELETE":
deleteData(targetTableName, item.getPrimaryKey());
break;
default:
logger.warn("未知的操作类型:{}", item.getOperationType());
continue;
}
result.setSuccessCount(result.getSuccessCount() + 1);
} catch (Exception e) {
logger.error("处理数据失败:" + item, e);
result.setFailCount(result.getFailCount() + 1);
result.addFailedItem(item, e.getMessage());
}
}
logger.info("接收完成:成功={}, 失败={}", result.getSuccessCount(), result.getFailCount());
return result;
}
/**
* 确保目标表存在
*/
private void ensureTableExists(String targetTableName, String sourceTableName) {
String checkSql = "SELECT COUNT(*) FROM information_schema.tables " +
"WHERE table_schema = DATABASE() AND table_name = ?";
Integer count = jdbcTemplate.queryForObject(checkSql, Integer.class, targetTableName);
if (count == null || count == 0) {
// 表不存在,根据源表结构创建
String createSql = "CREATE TABLE " + targetTableName + " LIKE " + sourceTableName;
jdbcTemplate.execute(createSql);
logger.info("创建租户表:{}", targetTableName);
}
}
/**
* 插入数据
*/
private void insertData(String tableName, String jsonData) {
if (jsonData == null) {
return;
}
JSONObject data = JSON.parseObject(jsonData);
StringBuilder columns = new StringBuilder();
StringBuilder values = new StringBuilder();
List<Object> params = new ArrayList<>();
for (String key : data.keySet()) {
if (columns.length() > 0) {
columns.append(", ");
values.append(", ");
}
columns.append("`").append(key).append("`");
values.append("?");
params.add(data.get(key));
}
String sql = "INSERT INTO " + tableName + " (" + columns + ") VALUES (" + values + ")";
jdbcTemplate.update(sql, params.toArray());
logger.debug("插入数据:表={}, SQL={}", tableName, sql);
}
/**
* 更新数据
*/
private void updateData(String tableName, String primaryKey, String jsonData) {
if (jsonData == null) {
return;
}
JSONObject data = JSON.parseObject(jsonData);
StringBuilder sets = new StringBuilder();
List<Object> params = new ArrayList<>();
Object pkValue = null;
for (String key : data.keySet()) {
if ("id".equals(key)) {
pkValue = data.get(key);
continue;
}
if (sets.length() > 0) {
sets.append(", ");
}
sets.append("`").append(key).append("` = ?");
params.add(data.get(key));
}
params.add(pkValue);
String sql = "UPDATE " + tableName + " SET " + sets + " WHERE id = ?";
jdbcTemplate.update(sql, params.toArray());
logger.debug("更新数据:表={}, 主键={}", tableName, primaryKey);
}
/**
* 删除数据
*/
private void deleteData(String tableName, String primaryKey) {
String sql = "DELETE FROM " + tableName + " WHERE id = ?";
jdbcTemplate.update(sql, primaryKey);
logger.debug("删除数据:表={}, 主键={}", tableName, primaryKey);
}
}
package com.zskk.pacsonline.modules.sync.controller;
import com.zskk.pacsonline.component.response.RestResult;
import com.zskk.pacsonline.modules.sync.dto.DataReceiveResult;
import com.zskk.pacsonline.modules.sync.dto.SyncDataRequest;
import com.zskk.pacsonline.modules.sync.entity.TenantInfo;
import com.zskk.pacsonline.modules.sync.service.DataReceiveService;
import com.zskk.pacsonline.modules.sync.service.TenantInfoService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
/**
* 数据同步控制器(公网端)
*/
@RestController
@RequestMapping("/api/sync")
public class SyncController {
private static final Logger logger = LoggerFactory.getLogger(SyncController.class);
@Autowired
private TenantInfoService tenantInfoService;
@Autowired
private DataReceiveService dataReceiveService;
/**
* 接收内网同步的数据
*/
@PostMapping("/receiveData")
public RestResult<?> receiveData(@RequestBody SyncDataRequest request,
@RequestHeader("Authorization") String token) {
try {
// 1. 验证令牌
if (token == null || !token.startsWith("Bearer ")) {
return RestResult.error("无效的访问令牌");
}
String accessToken = token.substring(7);
// 2. 验证租户
TenantInfo tenant = tenantInfoService.validateToken(request.getTenantId(), accessToken);
if (tenant == null) {
logger.warn("租户验证失败:{}", request.getTenantId());
return RestResult.error("租户验证失败");
}
if (tenant.getStatus() == 0) {
return RestResult.error("租户已被禁用");
}
// 3. 检查存储空间
if (tenant.getMaxStorage() != null &&
tenant.getUsedStorage() != null &&
tenant.getUsedStorage() >= tenant.getMaxStorage()) {
return RestResult.error("存储空间已满");
}
// 4. 接收并保存数据
DataReceiveResult result = dataReceiveService.receiveData(tenant, request);
// 5. 更新租户最后同步时间
tenant.setLastSyncTime(LocalDateTime.now());
tenantInfoService.updateById(tenant);
logger.info("接收数据成功:租户={}, 表={}, 记录数={}, 成功={}, 失败={}",
request.getTenantId(),
request.getTableName(),
request.getChangeLogs().size(),
result.getSuccessCount(),
result.getFailCount());
return RestResult.ok("同步成功", result);
} catch (Exception e) {
logger.error("接收数据失败", e);
return RestResult.error("同步失败:" + e.getMessage());
}
}
/**
* 查询同步状态
*/
@GetMapping("/status/{tenantId}")
public RestResult<?> getSyncStatus(@PathVariable String tenantId,
@RequestHeader("Authorization") String token) {
// 验证租户
String accessToken = token.substring(7);
TenantInfo tenant = tenantInfoService.validateToken(tenantId, accessToken);
if (tenant == null) {
return RestResult.error("租户验证失败");
}
// 查询同步状态
Map<String, Object> status = new HashMap<>();
status.put("tenantId", tenant.getTenantId());
status.put("tenantName", tenant.getTenantName());
status.put("lastSyncTime", tenant.getLastSyncTime());
status.put("usedStorage", tenant.getUsedStorage());
status.put("maxStorage", tenant.getMaxStorage());
status.put("status", tenant.getStatus());
return RestResult.ok(status);
}
}
package com.zskk.pacsonline.modules.sync.controller;
import com.zskk.pacsonline.component.response.RestResult;
import com.zskk.pacsonline.modules.sync.service.DataSyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* 数据同步管理控制器(内网端)
*/
@RestController
@RequestMapping("/api/syncManage")
public class SyncManageController {
@Autowired
private DataSyncService dataSyncService;
/**
* 手动触发同步
*/
@PostMapping("/manual")
public RestResult<?> manualSync() {
try {
dataSyncService.manualSync();
return RestResult.ok("同步任务已启动");
} catch (Exception e) {
return RestResult.error("同步失败:" + e.getMessage());
}
}
/**
* 检查未同步数据
*/
@GetMapping("/checkUnsynced")
public RestResult<?> checkUnsynced() {
boolean hasUnsynced = dataSyncService.hasUnsyncedData();
return RestResult.ok(hasUnsynced ? "有未同步的数据" : "所有数据已同步");
}
}
package com.zskk.pacsonline.modules.sync.aop;
import com.zskk.pacsonline.modules.sync.service.DataChangeLogService;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 数据变更捕获AOP
* 拦截所有Mapper的insert/update/delete操作
*/
@Aspect
@Component
public class DataChangeCaptureAop {
private static final Logger logger = LoggerFactory.getLogger(DataChangeCaptureAop.class);
@Autowired
private DataChangeLogService dataChangeLogService;
/**
* 拦截Mapper的insert/update/delete方法
*/
@Around("execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.insert*(..)) || " +
"execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.update*(..)) || " +
"execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.delete*(..))")
public Object captureDataChange(ProceedingJoinPoint jp) throws Throwable {
// 判断操作类型
String methodName = jp.getSignature().getName();
String operationType = getOperationType(methodName);
// 获取表名
String tableName = getTableName(jp);
// 获取实体对象
Object entity = getEntity(jp.getArgs());
// 执行原方法
Object result = jp.proceed();
// 记录变更日志(异步)
if (entity != null && tableName != null) {
try {
dataChangeLogService.recordChangeAsync(tableName, operationType, entity);
} catch (Exception e) {
logger.error("记录数据变更失败", e);
}
}
return result;
}
/**
* 获取操作类型
*/
private String getOperationType(String methodName) {
if (methodName.startsWith("insert")) {
return "INSERT";
} else if (methodName.startsWith("update")) {
return "UPDATE";
} else if (methodName.startsWith("delete")) {
return "DELETE";
}
return "UNKNOWN";
}
/**
* 获取表名
*/
private String getTableName(ProceedingJoinPoint jp) {
try {
// 从Mapper接口名推断表名
String className = jp.getTarget().getClass().getSimpleName();
// 例如:SysUserMapper -> sys_user
String entityName = className.replace("Mapper", "")
.replaceAll("([A-Z])", "_$1")
.toLowerCase()
.substring(1);
return entityName;
} catch (Exception e) {
logger.warn("获取表名失败", e);
return null;
}
}
/**
* 获取实体对象
*/
private Object getEntity(Object[] args) {
if (args == null || args.length == 0) {
return null;
}
// 第一个参数通常是实体对象
for (Object arg : args) {
if (arg != null && !isPrimitiveOrWrapper(arg.getClass())) {
return arg;
}
}
return null;
}
/**
* 判断是否是基本类型或包装类
*/
private boolean isPrimitiveOrWrapper(Class<?> clazz) {
return clazz.isPrimitive() ||
clazz == String.class ||
clazz == Integer.class ||
clazz == Long.class ||
clazz == Double.class ||
clazz == Float.class ||
clazz == Boolean.class ||
clazz == Byte.class ||
clazz == Short.class ||
clazz == Character.class;
}
}
package com.zskk.pacsonline.modules.sync.scheduler;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.zskk.pacsonline.modules.sync.entity.SyncConfig;
import com.zskk.pacsonline.modules.sync.service.DataSyncService;
import com.zskk.pacsonline.modules.sync.service.SyncConfigService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* 数据同步定时任务
*/
@Component
public class DataSyncScheduler {
private static final Logger logger = LoggerFactory.getLogger(DataSyncScheduler.class);
@Autowired
private SyncConfigService syncConfigService;
@Autowired
private DataSyncService dataSyncService;
/**
* 定时检查并执行同步任务
* 每分钟检查一次
*/
@Scheduled(cron = "0 * * * * ?")
public void checkAndSync() {
logger.debug("检查同步任务...");
// 获取启用的同步配置
List<SyncConfig> configs = syncConfigService.list(
new LambdaQueryWrapper<SyncConfig>()
.eq(SyncConfig::getEnabled, 1)
);
if (configs.isEmpty()) {
return;
}
for (SyncConfig config : configs) {
// 判断是否需要同步
if (shouldSync(config)) {
// 异步执行同步任务
CompletableFuture.runAsync(() -> {
try {
dataSyncService.syncData(config);
} catch (Exception e) {
logger.error("同步任务执行失败", e);
}
});
}
}
}
/**
* 判断是否需要同步
*/
private boolean shouldSync(SyncConfig config) {
String syncMode = config.getSyncMode();
if ("manual".equals(syncMode)) {
return false; // 手动模式不自动同步
}
if ("auto".equals(syncMode)) {
// 实时模式:检查是否有未同步的数据
return dataSyncService.hasUnsyncedData();
}
if ("schedule".equals(syncMode)) {
// 定时模式:判断是否到达同步时间
LocalDateTime lastSyncTime = config.getLastSyncTime();
if (lastSyncTime == null) {
return true; // 从未同步过
}
Integer interval = config.getSyncInterval();
if (interval == null || interval <= 0) {
return false;
}
LocalDateTime nextSyncTime = lastSyncTime.plusSeconds(interval);
return LocalDateTime.now().isAfter(nextSyncTime);
}
return false;
}
}
package com.zskk.pacsonline.utils;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.util.Base64;
/**
* 数据加密工具类
*/
public class DataEncryptUtil {
private static final String ALGORITHM = "AES";
private static final String TRANSFORMATION = "AES/ECB/PKCS5Padding";
/**
* AES加密
*/
public static String encrypt(String data, String secretKey) {
try {
// 密钥长度必须是16/24/32字节
byte[] keyBytes = secretKey.getBytes();
if (keyBytes.length != 16 && keyBytes.length != 24 && keyBytes.length != 32) {
throw new IllegalArgumentException("密钥长度必须是16、24或32字节");
}
SecretKeySpec key = new SecretKeySpec(keyBytes, ALGORITHM);
Cipher cipher = Cipher.getInstance(TRANSFORMATION);
cipher.init(Cipher.ENCRYPT_MODE, key);
byte[] encrypted = cipher.doFinal(data.getBytes("UTF-8"));
return Base64.getEncoder().encodeToString(encrypted);
} catch (Exception e) {
throw new RuntimeException("加密失败", e);
}
}
/**
* AES解密
*/
public static String decrypt(String encryptedData, String secretKey) {
try {
byte[] keyBytes = secretKey.getBytes();
if (keyBytes.length != 16 && keyBytes.length != 24 && keyBytes.length != 32) {
throw new IllegalArgumentException("密钥长度必须是16、24或32字节");
}
SecretKeySpec key = new SecretKeySpec(keyBytes, ALGORITHM);
Cipher cipher = Cipher.getInstance(TRANSFORMATION);
cipher.init(Cipher.DECRYPT_MODE, key);
byte[] decrypted = cipher.doFinal(Base64.getDecoder().decode(encryptedData));
return new String(decrypted, "UTF-8");
} catch (Exception e) {
throw new RuntimeException("解密失败", e);
}
}
/**
* 生成随机密钥(32字节)
*/
public static String generateKey() {
String chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
StringBuilder key = new StringBuilder();
for (int i = 0; i < 32; i++) {
int index = (int) (Math.random() * chars.length());
key.append(chars.charAt(index));
}
return key.toString();
}
}
package com.zskk.pacsonline.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
/**
* RestTemplate配置
*/
@Configuration
public class RestTemplateConfig {
@Bean
public RestTemplate restTemplate() {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setConnectTimeout(5000); // 连接超时5秒
factory.setReadTimeout(30000); // 读取超时30秒
return new RestTemplate(factory);
}
}
添加依赖(pom.xml已包含)
配置application.yml
sync:
enabled: true
tenant-id: tenant_001
tenant-name: 内网环境A
cloud-api-url: https://cloud.example.com
access-token: your_jwt_token_here
sync-mode: auto
sync-interval: 300
sync-tables: '["sys_user","sys_role","sys_menu"]'
bash
mysql -u root -p your_database < doc/sql/sync_tables.sql
启动应用
数据变更会自动记录到data_change_log表
定时任务每分钟检查一次,自动同步到公网
配置application.yml
multi-tenant:
enabled: true
sql
INSERT INTO tenant_info (tenant_id, tenant_name, access_token, status)
VALUES ('tenant_001', '内网环境A', 'your_jwt_token_here', 1);
启动应用
等待内网端推送数据
数据会自动保存到tenant_001_表名的表中
文档版本:v1.0 最后更新:2025-10-30