数据同步核心代码实现.md 48 KB

数据同步核心代码实现

目录结构

com.zskk.pacsonline
├── modules
│   └── sync
│       ├── entity          # 实体类
│       │   ├── SyncConfig.java
│       │   ├── SyncLog.java
│       │   ├── DataChangeLog.java
│       │   └── TenantInfo.java
│       ├── dto             # 数据传输对象
│       │   ├── SyncDataRequest.java
│       │   ├── SyncDataItem.java
│       │   └── DataReceiveResult.java
│       ├── mapper          # Mapper接口
│       │   ├── SyncConfigMapper.java
│       │   ├── SyncLogMapper.java
│       │   ├── DataChangeLogMapper.java
│       │   └── TenantInfoMapper.java
│       ├── service         # 服务接口
│       │   ├── SyncConfigService.java
│       │   ├── SyncLogService.java
│       │   ├── DataChangeLogService.java
│       │   ├── DataSyncService.java
│       │   └── DataReceiveService.java
│       ├── service.impl    # 服务实现
│       │   ├── SyncConfigServiceImpl.java
│       │   ├── SyncLogServiceImpl.java
│       │   ├── DataChangeLogServiceImpl.java
│       │   ├── DataSyncServiceImpl.java
│       │   └── DataReceiveServiceImpl.java
│       ├── controller      # 控制器
│       │   ├── SyncController.java (公网端)
│       │   └── SyncManageController.java (内网端)
│       ├── aop             # AOP拦截器
│       │   └── DataChangeCaptureAop.java
│       └── scheduler       # 定时任务
│           └── DataSyncScheduler.java
└── utils
    ├── DataEncryptUtil.java    # 数据加密工具
    └── HttpClientUtil.java      # HTTP客户端工具

1. 实体类 (Entity)

1.1 SyncConfig.java

package com.zskk.pacsonline.modules.sync.entity;

import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;

/**
 * 数据同步配置表
 */
@Data
@TableName("sync_config")
public class SyncConfig {

    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 租户ID(唯一标识)
     */
    private String tenantId;

    /**
     * 租户名称
     */
    private String tenantName;

    /**
     * 同步模式:auto-自动,manual-手动,schedule-定时
     */
    private String syncMode;

    /**
     * 同步间隔(秒)
     */
    private Integer syncInterval;

    /**
     * Cron表达式
     */
    private String cronExpression;

    /**
     * 需要同步的表名列表(JSON数组)
     */
    private String syncTables;

    /**
     * 公网API地址
     */
    private String cloudApiUrl;

    /**
     * 访问令牌
     */
    private String accessToken;

    /**
     * 数据加密密钥
     */
    private String secretKey;

    /**
     * 是否启用:0-禁用,1-启用
     */
    private Integer enabled;

    /**
     * 失败重试次数
     */
    private Integer retryTimes;

    /**
     * 请求超时时间(毫秒)
     */
    private Integer timeout;

    /**
     * 最后同步时间
     */
    private LocalDateTime lastSyncTime;

    /**
     * 备注
     */
    private String remark;

    /**
     * 创建时间
     */
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime createTime;

    /**
     * 更新时间
     */
    @TableField(fill = FieldFill.INSERT_UPDATE)
    private LocalDateTime updateTime;
}

1.2 SyncLog.java

package com.zskk.pacsonline.modules.sync.entity;

import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;

/**
 * 数据同步日志表
 */
@Data
@TableName("sync_log")
public class SyncLog {

    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 租户ID
     */
    private String tenantId;

    /**
     * 批次ID
     */
    private String batchId;

    /**
     * 同步的表名
     */
    private String tableName;

    /**
     * 操作类型
     */
    private String operationType;

    /**
     * 本次同步记录数
     */
    private Integer recordCount;

    /**
     * 成功数量
     */
    private Integer successCount;

    /**
     * 失败数量
     */
    private Integer failCount;

    /**
     * 同步状态:pending/processing/success/failed
     */
    private String syncStatus;

    /**
     * 开始时间
     */
    private LocalDateTime startTime;

    /**
     * 结束时间
     */
    private LocalDateTime endTime;

    /**
     * 耗时(毫秒)
     */
    private Long useTime;

    /**
     * 错误信息
     */
    private String errorMsg;

    /**
     * 数据快照(JSON格式)
     */
    private String dataSnapshot;

    /**
     * 重试次数
     */
    private Integer retryCount;

    /**
     * 备注
     */
    private String remark;

    /**
     * 创建时间
     */
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime createTime;
}

1.3 DataChangeLog.java

package com.zskk.pacsonline.modules.sync.entity;

import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;

/**
 * 数据变更记录表
 */
@Data
@TableName("data_change_log")
public class DataChangeLog {

    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 表名
     */
    private String tableName;

    /**
     * 主键值
     */
    private String primaryKey;

    /**
     * 操作类型:INSERT/UPDATE/DELETE
     */
    private String operationType;

    /**
     * 变更前数据(JSON)
     */
    private String beforeData;

    /**
     * 变更后数据(JSON)
     */
    private String afterData;

    /**
     * 变更时间
     */
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime changeTime;

    /**
     * 同步状态:0-未同步,1-已同步,2-同步失败
     */
    private Byte syncStatus;

    /**
     * 同步时间
     */
    private LocalDateTime syncTime;

    /**
     * 同步批次ID
     */
    private String syncBatchId;

    /**
     * 操作人ID
     */
    private Long operatorId;

    /**
     * 操作人姓名
     */
    private String operatorName;
}

1.4 TenantInfo.java

package com.zskk.pacsonline.modules.sync.entity;

import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;

/**
 * 租户信息表(公网端)
 */
@Data
@TableName("tenant_info")
public class TenantInfo {

    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 租户唯一标识
     */
    private String tenantId;

    /**
     * 租户名称
     */
    private String tenantName;

    /**
     * 租户类型:intranet-内网,cloud-公网
     */
    private String tenantType;

    /**
     * 访问令牌
     */
    private String accessToken;

    /**
     * 数据加密密钥
     */
    private String secretKey;

    /**
     * IP白名单(JSON数组)
     */
    private String ipWhitelist;

    /**
     * 最大存储空间(字节)
     */
    private Long maxStorage;

    /**
     * 已用存储空间(字节)
     */
    private Long usedStorage;

    /**
     * 状态:0-禁用,1-启用
     */
    private Integer status;

    /**
     * 过期时间
     */
    private LocalDateTime expireTime;

    /**
     * 最后同步时间
     */
    private LocalDateTime lastSyncTime;

    /**
     * 联系人
     */
    private String contactPerson;

    /**
     * 联系电话
     */
    private String contactPhone;

    /**
     * 联系邮箱
     */
    private String contactEmail;

    /**
     * 备注
     */
    private String remark;

    /**
     * 创建时间
     */
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime createTime;

    /**
     * 更新时间
     */
    @TableField(fill = FieldFill.INSERT_UPDATE)
    private LocalDateTime updateTime;
}

2. 数据传输对象 (DTO)

2.1 SyncDataRequest.java

package com.zskk.pacsonline.modules.sync.dto;

import lombok.Data;
import java.util.List;

/**
 * 数据同步请求对象
 */
@Data
public class SyncDataRequest {

    /**
     * 租户ID
     */
    private String tenantId;

    /**
     * 表名
     */
    private String tableName;

    /**
     * 变更记录列表
     */
    private List<SyncDataItem> changeLogs;
}

2.2 SyncDataItem.java

package com.zskk.pacsonline.modules.sync.dto;

import lombok.Data;
import java.time.LocalDateTime;

/**
 * 同步数据项
 */
@Data
public class SyncDataItem {

    /**
     * 主键值
     */
    private String primaryKey;

    /**
     * 操作类型:INSERT/UPDATE/DELETE
     */
    private String operationType;

    /**
     * 变更前数据(JSON)
     */
    private String beforeData;

    /**
     * 变更后数据(JSON)
     */
    private String afterData;

    /**
     * 变更时间
     */
    private LocalDateTime changeTime;
}

2.3 DataReceiveResult.java

package com.zskk.pacsonline.modules.sync.dto;

import lombok.Data;
import java.util.ArrayList;
import java.util.List;

/**
 * 数据接收结果
 */
@Data
public class DataReceiveResult {

    /**
     * 成功数量
     */
    private Integer successCount = 0;

    /**
     * 失败数量
     */
    private Integer failCount = 0;

    /**
     * 失败的数据项
     */
    private List<FailedItem> failedItems = new ArrayList<>();

    /**
     * 添加失败项
     */
    public void addFailedItem(SyncDataItem item, String errorMsg) {
        FailedItem failedItem = new FailedItem();
        failedItem.setItem(item);
        failedItem.setErrorMsg(errorMsg);
        this.failedItems.add(failedItem);
    }

    @Data
    public static class FailedItem {
        private SyncDataItem item;
        private String errorMsg;
    }
}

3. Mapper接口

3.1 SyncConfigMapper.java

package com.zskk.pacsonline.modules.sync.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zskk.pacsonline.modules.sync.entity.SyncConfig;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface SyncConfigMapper extends BaseMapper<SyncConfig> {
}

3.2 SyncLogMapper.java

package com.zskk.pacsonline.modules.sync.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zskk.pacsonline.modules.sync.entity.SyncLog;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface SyncLogMapper extends BaseMapper<SyncLog> {
}

3.3 DataChangeLogMapper.java

package com.zskk.pacsonline.modules.sync.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zskk.pacsonline.modules.sync.entity.DataChangeLog;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface DataChangeLogMapper extends BaseMapper<DataChangeLog> {
}

3.4 TenantInfoMapper.java

package com.zskk.pacsonline.modules.sync.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zskk.pacsonline.modules.sync.entity.TenantInfo;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface TenantInfoMapper extends BaseMapper<TenantInfo> {
}

4. Service接口

4.1 DataChangeLogService.java

package com.zskk.pacsonline.modules.sync.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.zskk.pacsonline.modules.sync.entity.DataChangeLog;

/**
 * 数据变更记录服务
 */
public interface DataChangeLogService extends IService<DataChangeLog> {

    /**
     * 记录数据变更(异步)
     */
    void recordChangeAsync(String tableName, String operationType, Object entity);

    /**
     * 检查是否有未同步的数据
     */
    boolean hasUnsyncedData();
}

4.2 DataSyncService.java

package com.zskk.pacsonline.modules.sync.service;

import com.zskk.pacsonline.modules.sync.entity.SyncConfig;

/**
 * 数据同步服务(内网端)
 */
public interface DataSyncService {

    /**
     * 执行数据同步
     */
    void syncData(SyncConfig config);

    /**
     * 手动触发同步
     */
    void manualSync();

    /**
     * 检查是否有未同步的数据
     */
    boolean hasUnsyncedData();
}

4.3 DataReceiveService.java

package com.zskk.pacsonline.modules.sync.service;

import com.zskk.pacsonline.modules.sync.dto.DataReceiveResult;
import com.zskk.pacsonline.modules.sync.dto.SyncDataRequest;
import com.zskk.pacsonline.modules.sync.entity.TenantInfo;

/**
 * 数据接收服务(公网端)
 */
public interface DataReceiveService {

    /**
     * 接收并保存数据
     */
    DataReceiveResult receiveData(TenantInfo tenant, SyncDataRequest request);
}

5. Service实现(关键代码)

5.1 DataChangeLogServiceImpl.java

package com.zskk.pacsonline.modules.sync.service.impl;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zskk.pacsonline.modules.sync.entity.DataChangeLog;
import com.zskk.pacsonline.modules.sync.mapper.DataChangeLogMapper;
import com.zskk.pacsonline.modules.sync.service.DataChangeLogService;
import com.zskk.pacsonline.security.LoginUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;

/**
 * 数据变更记录服务实现
 */
@Service
public class DataChangeLogServiceImpl extends ServiceImpl<DataChangeLogMapper, DataChangeLog>
        implements DataChangeLogService {

    private static final Logger logger = LoggerFactory.getLogger(DataChangeLogServiceImpl.class);

    @Value("${sync.enabled:false}")
    private Boolean syncEnabled;

    @Value("${sync.sync-tables:}")
    private String syncTables;

    /**
     * 记录数据变更(异步)
     */
    @Override
    @Async
    public void recordChangeAsync(String tableName, String operationType, Object entity) {
        // 检查是否启用同步
        if (!syncEnabled) {
            return;
        }

        // 检查表是否在同步列表中
        if (!isTableSyncEnabled(tableName)) {
            return;
        }

        try {
            DataChangeLog changeLog = new DataChangeLog();
            changeLog.setTableName(tableName);
            changeLog.setOperationType(operationType);

            // 获取主键值
            String primaryKey = extractPrimaryKey(entity);
            changeLog.setPrimaryKey(primaryKey);

            // 记录数据
            String jsonData = JSON.toJSONString(entity);
            if ("INSERT".equals(operationType)) {
                changeLog.setAfterData(jsonData);
            } else if ("UPDATE".equals(operationType)) {
                changeLog.setAfterData(jsonData);
                // TODO: 获取变更前的数据(需要在AOP中传递)
            } else if ("DELETE".equals(operationType)) {
                changeLog.setBeforeData(jsonData);
            }

            changeLog.setSyncStatus((byte) 0); // 未同步
            changeLog.setChangeTime(LocalDateTime.now());

            // 获取操作人信息
            try {
                Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
                if (authentication != null && authentication.getPrincipal() instanceof LoginUser) {
                    LoginUser loginUser = (LoginUser) authentication.getPrincipal();
                    changeLog.setOperatorId(loginUser.getUser().getId());
                    changeLog.setOperatorName(loginUser.getUser().getUsername());
                }
            } catch (Exception e) {
                logger.warn("获取操作人信息失败", e);
            }

            // 保存变更记录
            this.save(changeLog);

            logger.debug("记录数据变更:表={}, 操作={}, 主键={}", tableName, operationType, primaryKey);

        } catch (Exception e) {
            logger.error("记录数据变更失败", e);
        }
    }

    /**
     * 检查表是否启用同步
     */
    private boolean isTableSyncEnabled(String tableName) {
        if (syncTables == null || syncTables.trim().isEmpty()) {
            return false;
        }

        // 解析JSON数组
        try {
            List<String> tables = JSON.parseArray(syncTables, String.class);
            return tables.contains(tableName);
        } catch (Exception e) {
            // 简单逗号分隔
            String[] tables = syncTables.split(",");
            return Arrays.asList(tables).contains(tableName);
        }
    }

    /**
     * 提取主键值
     */
    private String extractPrimaryKey(Object entity) {
        try {
            // 反射获取id字段
            java.lang.reflect.Field idField = entity.getClass().getDeclaredField("id");
            idField.setAccessible(true);
            Object id = idField.get(entity);
            return id != null ? id.toString() : "";
        } catch (Exception e) {
            logger.warn("提取主键失败", e);
            return "";
        }
    }

    /**
     * 检查是否有未同步的数据
     */
    @Override
    public boolean hasUnsyncedData() {
        Long count = this.count(new LambdaQueryWrapper<DataChangeLog>()
                .eq(DataChangeLog::getSyncStatus, 0));
        return count > 0;
    }
}

5.2 DataSyncServiceImpl.java(内网端核心)

package com.zskk.pacsonline.modules.sync.service.impl;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.zskk.pacsonline.component.response.RestResult;
import com.zskk.pacsonline.modules.sync.dto.DataReceiveResult;
import com.zskk.pacsonline.modules.sync.dto.SyncDataItem;
import com.zskk.pacsonline.modules.sync.dto.SyncDataRequest;
import com.zskk.pacsonline.modules.sync.entity.DataChangeLog;
import com.zskk.pacsonline.modules.sync.entity.SyncConfig;
import com.zskk.pacsonline.modules.sync.entity.SyncLog;
import com.zskk.pacsonline.modules.sync.mapper.DataChangeLogMapper;
import com.zskk.pacsonline.modules.sync.service.DataChangeLogService;
import com.zskk.pacsonline.modules.sync.service.DataSyncService;
import com.zskk.pacsonline.modules.sync.service.SyncConfigService;
import com.zskk.pacsonline.modules.sync.service.SyncLogService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.*;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;

/**
 * 数据同步服务实现(内网端)
 */
@Service
public class DataSyncServiceImpl implements DataSyncService {

    private static final Logger logger = LoggerFactory.getLogger(DataSyncServiceImpl.class);

    @Autowired
    private DataChangeLogMapper dataChangeLogMapper;

    @Autowired
    private DataChangeLogService dataChangeLogService;

    @Autowired
    private SyncConfigService syncConfigService;

    @Autowired
    private SyncLogService syncLogService;

    @Autowired
    private RestTemplate restTemplate;

    /**
     * 执行数据同步
     */
    @Override
    @Transactional
    public void syncData(SyncConfig config) {
        String batchId = UUID.randomUUID().toString().replace("-", "");
        logger.info("====== 开始同步数据 ======");
        logger.info("租户:{}", config.getTenantId());
        logger.info("批次:{}", batchId);

        try {
            // 1. 查询未同步的数据变更记录
            List<DataChangeLog> changeLogs = dataChangeLogMapper.selectList(
                    new LambdaQueryWrapper<DataChangeLog>()
                            .eq(DataChangeLog::getSyncStatus, 0)
                            .orderByAsc(DataChangeLog::getChangeTime)
                            .last("LIMIT 1000") // 每批次最多1000条
            );

            if (changeLogs.isEmpty()) {
                logger.info("没有需要同步的数据");
                return;
            }

            logger.info("待同步数据:{} 条", changeLogs.size());

            // 2. 按表名分组
            Map<String, List<DataChangeLog>> groupedLogs = changeLogs.stream()
                    .collect(Collectors.groupingBy(DataChangeLog::getTableName));

            // 3. 逐表同步
            int totalSuccess = 0;
            int totalFail = 0;

            for (Map.Entry<String, List<DataChangeLog>> entry : groupedLogs.entrySet()) {
                String tableName = entry.getKey();
                List<DataChangeLog> logs = entry.getValue();

                logger.info("同步表:{}, 记录数:{}", tableName, logs.size());

                int[] result = syncTable(config, batchId, tableName, logs);
                totalSuccess += result[0];
                totalFail += result[1];
            }

            // 4. 更新最后同步时间
            config.setLastSyncTime(LocalDateTime.now());
            syncConfigService.updateById(config);

            logger.info("====== 同步完成 ======");
            logger.info("成功:{} 条, 失败:{} 条", totalSuccess, totalFail);

        } catch (Exception e) {
            logger.error("同步失败,批次:" + batchId, e);
        }
    }

    /**
     * 同步单个表的数据
     * @return [成功数, 失败数]
     */
    private int[] syncTable(SyncConfig config, String batchId,
                            String tableName, List<DataChangeLog> logs) {

        SyncLog syncLog = new SyncLog();
        syncLog.setTenantId(config.getTenantId());
        syncLog.setBatchId(batchId);
        syncLog.setTableName(tableName);
        syncLog.setRecordCount(logs.size());
        syncLog.setSyncStatus("processing");
        syncLog.setStartTime(LocalDateTime.now());
        syncLog.setRetryCount(0);
        syncLogService.save(syncLog);

        int successCount = 0;
        int failCount = 0;
        List<DataChangeLog> failedLogs = new ArrayList<>();

        try {
            // 构建同步数据请求
            SyncDataRequest request = new SyncDataRequest();
            request.setTenantId(config.getTenantId());
            request.setTableName(tableName);
            request.setChangeLogs(logs.stream().map(log -> {
                SyncDataItem item = new SyncDataItem();
                item.setPrimaryKey(log.getPrimaryKey());
                item.setOperationType(log.getOperationType());
                item.setBeforeData(log.getBeforeData());
                item.setAfterData(log.getAfterData());
                item.setChangeTime(log.getChangeTime());
                return item;
            }).collect(Collectors.toList()));

            // 发送HTTP请求到公网
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);
            headers.setBearerAuth(config.getAccessToken());

            HttpEntity<SyncDataRequest> entity = new HttpEntity<>(request, headers);

            String url = config.getCloudApiUrl() + "/api/sync/receiveData";
            logger.info("发送请求:{}", url);

            ResponseEntity<RestResult> response = restTemplate.exchange(
                    url,
                    HttpMethod.POST,
                    entity,
                    RestResult.class
            );

            logger.info("响应状态:{}", response.getStatusCode());

            if (response.getStatusCode() == HttpStatus.OK &&
                    response.getBody() != null &&
                    response.getBody().getCode() == 200) {

                // 同步成功,更新状态
                successCount = logs.size();
                for (DataChangeLog log : logs) {
                    log.setSyncStatus((byte) 1);
                    log.setSyncTime(LocalDateTime.now());
                    log.setSyncBatchId(batchId);
                    dataChangeLogMapper.updateById(log);
                }

                syncLog.setSyncStatus("success");
                logger.info("同步成功:{} 条", successCount);

            } else {
                // 同步失败
                failCount = logs.size();
                failedLogs.addAll(logs);
                syncLog.setSyncStatus("failed");
                syncLog.setErrorMsg("公网返回错误:" + (response.getBody() != null ? response.getBody().getMessage() : "未知错误"));
                logger.error("同步失败:{}", syncLog.getErrorMsg());
            }

        } catch (Exception e) {
            logger.error("同步表 " + tableName + " 失败", e);
            failCount = logs.size();
            failedLogs.addAll(logs);

            // 标记为同步失败
            for (DataChangeLog log : logs) {
                log.setSyncStatus((byte) 2);
                dataChangeLogMapper.updateById(log);
            }

            syncLog.setSyncStatus("failed");
            syncLog.setErrorMsg(e.getMessage());
        }

        // 更新同步日志
        syncLog.setSuccessCount(successCount);
        syncLog.setFailCount(failCount);
        syncLog.setEndTime(LocalDateTime.now());
        syncLog.setUseTime(
                Duration.between(syncLog.getStartTime(), syncLog.getEndTime()).toMillis()
        );

        if (!failedLogs.isEmpty()) {
            syncLog.setDataSnapshot(JSON.toJSONString(failedLogs));
        }

        syncLogService.updateById(syncLog);

        return new int[]{successCount, failCount};
    }

    /**
     * 手动触发同步
     */
    @Override
    public void manualSync() {
        SyncConfig config = syncConfigService.getOne(
                new LambdaQueryWrapper<SyncConfig>()
                        .eq(SyncConfig::getEnabled, 1)
                        .last("LIMIT 1")
        );

        if (config == null) {
            logger.warn("未找到启用的同步配置");
            return;
        }

        syncData(config);
    }

    /**
     * 检查是否有未同步的数据
     */
    @Override
    public boolean hasUnsyncedData() {
        return dataChangeLogService.hasUnsyncedData();
    }
}

5.3 DataReceiveServiceImpl.java(公网端核心)

package com.zskk.pacsonline.modules.sync.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zskk.pacsonline.modules.sync.dto.DataReceiveResult;
import com.zskk.pacsonline.modules.sync.dto.SyncDataItem;
import com.zskk.pacsonline.modules.sync.dto.SyncDataRequest;
import com.zskk.pacsonline.modules.sync.entity.TenantInfo;
import com.zskk.pacsonline.modules.sync.service.DataReceiveService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.List;

/**
 * 数据接收服务实现(公网端)
 */
@Service
public class DataReceiveServiceImpl implements DataReceiveService {

    private static final Logger logger = LoggerFactory.getLogger(DataReceiveServiceImpl.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * 接收并保存数据
     */
    @Override
    @Transactional
    public DataReceiveResult receiveData(TenantInfo tenant, SyncDataRequest request) {

        logger.info("接收数据:租户={}, 表={}, 记录数={}",
                tenant.getTenantId(), request.getTableName(), request.getChangeLogs().size());

        DataReceiveResult result = new DataReceiveResult();
        result.setSuccessCount(0);
        result.setFailCount(0);

        String tableName = request.getTableName();
        // 多租户数据隔离:添加租户前缀
        String targetTableName = tenant.getTenantId() + "_" + tableName;

        // 确保目标表存在
        ensureTableExists(targetTableName, tableName);

        // 处理每条数据
        for (SyncDataItem item : request.getChangeLogs()) {
            try {
                switch (item.getOperationType()) {
                    case "INSERT":
                        insertData(targetTableName, item.getAfterData());
                        break;
                    case "UPDATE":
                        updateData(targetTableName, item.getPrimaryKey(), item.getAfterData());
                        break;
                    case "DELETE":
                        deleteData(targetTableName, item.getPrimaryKey());
                        break;
                    default:
                        logger.warn("未知的操作类型:{}", item.getOperationType());
                        continue;
                }
                result.setSuccessCount(result.getSuccessCount() + 1);

            } catch (Exception e) {
                logger.error("处理数据失败:" + item, e);
                result.setFailCount(result.getFailCount() + 1);
                result.addFailedItem(item, e.getMessage());
            }
        }

        logger.info("接收完成:成功={}, 失败={}", result.getSuccessCount(), result.getFailCount());

        return result;
    }

    /**
     * 确保目标表存在
     */
    private void ensureTableExists(String targetTableName, String sourceTableName) {
        String checkSql = "SELECT COUNT(*) FROM information_schema.tables " +
                "WHERE table_schema = DATABASE() AND table_name = ?";

        Integer count = jdbcTemplate.queryForObject(checkSql, Integer.class, targetTableName);

        if (count == null || count == 0) {
            // 表不存在,根据源表结构创建
            String createSql = "CREATE TABLE " + targetTableName + " LIKE " + sourceTableName;
            jdbcTemplate.execute(createSql);
            logger.info("创建租户表:{}", targetTableName);
        }
    }

    /**
     * 插入数据
     */
    private void insertData(String tableName, String jsonData) {
        if (jsonData == null) {
            return;
        }

        JSONObject data = JSON.parseObject(jsonData);

        StringBuilder columns = new StringBuilder();
        StringBuilder values = new StringBuilder();
        List<Object> params = new ArrayList<>();

        for (String key : data.keySet()) {
            if (columns.length() > 0) {
                columns.append(", ");
                values.append(", ");
            }
            columns.append("`").append(key).append("`");
            values.append("?");
            params.add(data.get(key));
        }

        String sql = "INSERT INTO " + tableName + " (" + columns + ") VALUES (" + values + ")";
        jdbcTemplate.update(sql, params.toArray());

        logger.debug("插入数据:表={}, SQL={}", tableName, sql);
    }

    /**
     * 更新数据
     */
    private void updateData(String tableName, String primaryKey, String jsonData) {
        if (jsonData == null) {
            return;
        }

        JSONObject data = JSON.parseObject(jsonData);

        StringBuilder sets = new StringBuilder();
        List<Object> params = new ArrayList<>();

        Object pkValue = null;
        for (String key : data.keySet()) {
            if ("id".equals(key)) {
                pkValue = data.get(key);
                continue;
            }

            if (sets.length() > 0) {
                sets.append(", ");
            }
            sets.append("`").append(key).append("` = ?");
            params.add(data.get(key));
        }

        params.add(pkValue);

        String sql = "UPDATE " + tableName + " SET " + sets + " WHERE id = ?";
        jdbcTemplate.update(sql, params.toArray());

        logger.debug("更新数据:表={}, 主键={}", tableName, primaryKey);
    }

    /**
     * 删除数据
     */
    private void deleteData(String tableName, String primaryKey) {
        String sql = "DELETE FROM " + tableName + " WHERE id = ?";
        jdbcTemplate.update(sql, primaryKey);

        logger.debug("删除数据:表={}, 主键={}", tableName, primaryKey);
    }
}

6. 控制器

6.1 SyncController.java(公网端)

package com.zskk.pacsonline.modules.sync.controller;

import com.zskk.pacsonline.component.response.RestResult;
import com.zskk.pacsonline.modules.sync.dto.DataReceiveResult;
import com.zskk.pacsonline.modules.sync.dto.SyncDataRequest;
import com.zskk.pacsonline.modules.sync.entity.TenantInfo;
import com.zskk.pacsonline.modules.sync.service.DataReceiveService;
import com.zskk.pacsonline.modules.sync.service.TenantInfoService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

/**
 * 数据同步控制器(公网端)
 */
@RestController
@RequestMapping("/api/sync")
public class SyncController {

    private static final Logger logger = LoggerFactory.getLogger(SyncController.class);

    @Autowired
    private TenantInfoService tenantInfoService;

    @Autowired
    private DataReceiveService dataReceiveService;

    /**
     * 接收内网同步的数据
     */
    @PostMapping("/receiveData")
    public RestResult<?> receiveData(@RequestBody SyncDataRequest request,
                                     @RequestHeader("Authorization") String token) {

        try {
            // 1. 验证令牌
            if (token == null || !token.startsWith("Bearer ")) {
                return RestResult.error("无效的访问令牌");
            }

            String accessToken = token.substring(7);

            // 2. 验证租户
            TenantInfo tenant = tenantInfoService.validateToken(request.getTenantId(), accessToken);

            if (tenant == null) {
                logger.warn("租户验证失败:{}", request.getTenantId());
                return RestResult.error("租户验证失败");
            }

            if (tenant.getStatus() == 0) {
                return RestResult.error("租户已被禁用");
            }

            // 3. 检查存储空间
            if (tenant.getMaxStorage() != null &&
                    tenant.getUsedStorage() != null &&
                    tenant.getUsedStorage() >= tenant.getMaxStorage()) {
                return RestResult.error("存储空间已满");
            }

            // 4. 接收并保存数据
            DataReceiveResult result = dataReceiveService.receiveData(tenant, request);

            // 5. 更新租户最后同步时间
            tenant.setLastSyncTime(LocalDateTime.now());
            tenantInfoService.updateById(tenant);

            logger.info("接收数据成功:租户={}, 表={}, 记录数={}, 成功={}, 失败={}",
                    request.getTenantId(),
                    request.getTableName(),
                    request.getChangeLogs().size(),
                    result.getSuccessCount(),
                    result.getFailCount());

            return RestResult.ok("同步成功", result);

        } catch (Exception e) {
            logger.error("接收数据失败", e);
            return RestResult.error("同步失败:" + e.getMessage());
        }
    }

    /**
     * 查询同步状态
     */
    @GetMapping("/status/{tenantId}")
    public RestResult<?> getSyncStatus(@PathVariable String tenantId,
                                       @RequestHeader("Authorization") String token) {

        // 验证租户
        String accessToken = token.substring(7);
        TenantInfo tenant = tenantInfoService.validateToken(tenantId, accessToken);

        if (tenant == null) {
            return RestResult.error("租户验证失败");
        }

        // 查询同步状态
        Map<String, Object> status = new HashMap<>();
        status.put("tenantId", tenant.getTenantId());
        status.put("tenantName", tenant.getTenantName());
        status.put("lastSyncTime", tenant.getLastSyncTime());
        status.put("usedStorage", tenant.getUsedStorage());
        status.put("maxStorage", tenant.getMaxStorage());
        status.put("status", tenant.getStatus());

        return RestResult.ok(status);
    }
}

6.2 SyncManageController.java(内网端)

package com.zskk.pacsonline.modules.sync.controller;

import com.zskk.pacsonline.component.response.RestResult;
import com.zskk.pacsonline.modules.sync.service.DataSyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * 数据同步管理控制器(内网端)
 */
@RestController
@RequestMapping("/api/syncManage")
public class SyncManageController {

    @Autowired
    private DataSyncService dataSyncService;

    /**
     * 手动触发同步
     */
    @PostMapping("/manual")
    public RestResult<?> manualSync() {
        try {
            dataSyncService.manualSync();
            return RestResult.ok("同步任务已启动");
        } catch (Exception e) {
            return RestResult.error("同步失败:" + e.getMessage());
        }
    }

    /**
     * 检查未同步数据
     */
    @GetMapping("/checkUnsynced")
    public RestResult<?> checkUnsynced() {
        boolean hasUnsynced = dataSyncService.hasUnsyncedData();
        return RestResult.ok(hasUnsynced ? "有未同步的数据" : "所有数据已同步");
    }
}

7. AOP拦截器

7.1 DataChangeCaptureAop.java

package com.zskk.pacsonline.modules.sync.aop;

import com.zskk.pacsonline.modules.sync.service.DataChangeLogService;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 数据变更捕获AOP
 * 拦截所有Mapper的insert/update/delete操作
 */
@Aspect
@Component
public class DataChangeCaptureAop {

    private static final Logger logger = LoggerFactory.getLogger(DataChangeCaptureAop.class);

    @Autowired
    private DataChangeLogService dataChangeLogService;

    /**
     * 拦截Mapper的insert/update/delete方法
     */
    @Around("execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.insert*(..)) || " +
            "execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.update*(..)) || " +
            "execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.delete*(..))")
    public Object captureDataChange(ProceedingJoinPoint jp) throws Throwable {

        // 判断操作类型
        String methodName = jp.getSignature().getName();
        String operationType = getOperationType(methodName);

        // 获取表名
        String tableName = getTableName(jp);

        // 获取实体对象
        Object entity = getEntity(jp.getArgs());

        // 执行原方法
        Object result = jp.proceed();

        // 记录变更日志(异步)
        if (entity != null && tableName != null) {
            try {
                dataChangeLogService.recordChangeAsync(tableName, operationType, entity);
            } catch (Exception e) {
                logger.error("记录数据变更失败", e);
            }
        }

        return result;
    }

    /**
     * 获取操作类型
     */
    private String getOperationType(String methodName) {
        if (methodName.startsWith("insert")) {
            return "INSERT";
        } else if (methodName.startsWith("update")) {
            return "UPDATE";
        } else if (methodName.startsWith("delete")) {
            return "DELETE";
        }
        return "UNKNOWN";
    }

    /**
     * 获取表名
     */
    private String getTableName(ProceedingJoinPoint jp) {
        try {
            // 从Mapper接口名推断表名
            String className = jp.getTarget().getClass().getSimpleName();
            // 例如:SysUserMapper -> sys_user
            String entityName = className.replace("Mapper", "")
                    .replaceAll("([A-Z])", "_$1")
                    .toLowerCase()
                    .substring(1);
            return entityName;
        } catch (Exception e) {
            logger.warn("获取表名失败", e);
            return null;
        }
    }

    /**
     * 获取实体对象
     */
    private Object getEntity(Object[] args) {
        if (args == null || args.length == 0) {
            return null;
        }

        // 第一个参数通常是实体对象
        for (Object arg : args) {
            if (arg != null && !isPrimitiveOrWrapper(arg.getClass())) {
                return arg;
            }
        }

        return null;
    }

    /**
     * 判断是否是基本类型或包装类
     */
    private boolean isPrimitiveOrWrapper(Class<?> clazz) {
        return clazz.isPrimitive() ||
                clazz == String.class ||
                clazz == Integer.class ||
                clazz == Long.class ||
                clazz == Double.class ||
                clazz == Float.class ||
                clazz == Boolean.class ||
                clazz == Byte.class ||
                clazz == Short.class ||
                clazz == Character.class;
    }
}

8. 定时任务

8.1 DataSyncScheduler.java

package com.zskk.pacsonline.modules.sync.scheduler;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.zskk.pacsonline.modules.sync.entity.SyncConfig;
import com.zskk.pacsonline.modules.sync.service.DataSyncService;
import com.zskk.pacsonline.modules.sync.service.SyncConfigService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * 数据同步定时任务
 */
@Component
public class DataSyncScheduler {

    private static final Logger logger = LoggerFactory.getLogger(DataSyncScheduler.class);

    @Autowired
    private SyncConfigService syncConfigService;

    @Autowired
    private DataSyncService dataSyncService;

    /**
     * 定时检查并执行同步任务
     * 每分钟检查一次
     */
    @Scheduled(cron = "0 * * * * ?")
    public void checkAndSync() {
        logger.debug("检查同步任务...");

        // 获取启用的同步配置
        List<SyncConfig> configs = syncConfigService.list(
                new LambdaQueryWrapper<SyncConfig>()
                        .eq(SyncConfig::getEnabled, 1)
        );

        if (configs.isEmpty()) {
            return;
        }

        for (SyncConfig config : configs) {
            // 判断是否需要同步
            if (shouldSync(config)) {
                // 异步执行同步任务
                CompletableFuture.runAsync(() -> {
                    try {
                        dataSyncService.syncData(config);
                    } catch (Exception e) {
                        logger.error("同步任务执行失败", e);
                    }
                });
            }
        }
    }

    /**
     * 判断是否需要同步
     */
    private boolean shouldSync(SyncConfig config) {
        String syncMode = config.getSyncMode();

        if ("manual".equals(syncMode)) {
            return false; // 手动模式不自动同步
        }

        if ("auto".equals(syncMode)) {
            // 实时模式:检查是否有未同步的数据
            return dataSyncService.hasUnsyncedData();
        }

        if ("schedule".equals(syncMode)) {
            // 定时模式:判断是否到达同步时间
            LocalDateTime lastSyncTime = config.getLastSyncTime();
            if (lastSyncTime == null) {
                return true; // 从未同步过
            }

            Integer interval = config.getSyncInterval();
            if (interval == null || interval <= 0) {
                return false;
            }

            LocalDateTime nextSyncTime = lastSyncTime.plusSeconds(interval);
            return LocalDateTime.now().isAfter(nextSyncTime);
        }

        return false;
    }
}

9. 工具类

9.1 DataEncryptUtil.java

package com.zskk.pacsonline.utils;

import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.util.Base64;

/**
 * 数据加密工具类
 */
public class DataEncryptUtil {

    private static final String ALGORITHM = "AES";
    private static final String TRANSFORMATION = "AES/ECB/PKCS5Padding";

    /**
     * AES加密
     */
    public static String encrypt(String data, String secretKey) {
        try {
            // 密钥长度必须是16/24/32字节
            byte[] keyBytes = secretKey.getBytes();
            if (keyBytes.length != 16 && keyBytes.length != 24 && keyBytes.length != 32) {
                throw new IllegalArgumentException("密钥长度必须是16、24或32字节");
            }

            SecretKeySpec key = new SecretKeySpec(keyBytes, ALGORITHM);
            Cipher cipher = Cipher.getInstance(TRANSFORMATION);
            cipher.init(Cipher.ENCRYPT_MODE, key);

            byte[] encrypted = cipher.doFinal(data.getBytes("UTF-8"));
            return Base64.getEncoder().encodeToString(encrypted);

        } catch (Exception e) {
            throw new RuntimeException("加密失败", e);
        }
    }

    /**
     * AES解密
     */
    public static String decrypt(String encryptedData, String secretKey) {
        try {
            byte[] keyBytes = secretKey.getBytes();
            if (keyBytes.length != 16 && keyBytes.length != 24 && keyBytes.length != 32) {
                throw new IllegalArgumentException("密钥长度必须是16、24或32字节");
            }

            SecretKeySpec key = new SecretKeySpec(keyBytes, ALGORITHM);
            Cipher cipher = Cipher.getInstance(TRANSFORMATION);
            cipher.init(Cipher.DECRYPT_MODE, key);

            byte[] decrypted = cipher.doFinal(Base64.getDecoder().decode(encryptedData));
            return new String(decrypted, "UTF-8");

        } catch (Exception e) {
            throw new RuntimeException("解密失败", e);
        }
    }

    /**
     * 生成随机密钥(32字节)
     */
    public static String generateKey() {
        String chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
        StringBuilder key = new StringBuilder();
        for (int i = 0; i < 32; i++) {
            int index = (int) (Math.random() * chars.length());
            key.append(chars.charAt(index));
        }
        return key.toString();
    }
}

10. 配置类

10.1 RestTemplateConfig.java

package com.zskk.pacsonline.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

/**
 * RestTemplate配置
 */
@Configuration
public class RestTemplateConfig {

    @Bean
    public RestTemplate restTemplate() {
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        factory.setConnectTimeout(5000); // 连接超时5秒
        factory.setReadTimeout(30000);   // 读取超时30秒

        return new RestTemplate(factory);
    }
}

11. 使用说明

11.1 内网端部署步骤

  1. 添加依赖(pom.xml已包含)

  2. 配置application.yml

    sync:
    enabled: true
    tenant-id: tenant_001
    tenant-name: 内网环境A
    cloud-api-url: https://cloud.example.com
    access-token: your_jwt_token_here
    sync-mode: auto
    sync-interval: 300
    sync-tables: '["sys_user","sys_role","sys_menu"]'
    
    1. 执行SQL脚本 bash mysql -u root -p your_database < doc/sql/sync_tables.sql
  3. 启动应用

  4. 数据变更会自动记录到data_change_log

  5. 定时任务每分钟检查一次,自动同步到公网

11.2 公网端部署步骤

  1. 配置application.yml

    multi-tenant:
    enabled: true
    
    1. 执行SQL脚本,创建租户 sql INSERT INTO tenant_info (tenant_id, tenant_name, access_token, status) VALUES ('tenant_001', '内网环境A', 'your_jwt_token_here', 1);
  2. 启动应用

  3. 等待内网端推送数据

  4. 数据会自动保存到tenant_001_表名的表中


文档版本:v1.0 最后更新:2025-10-30