数据同步解决方案.md 38 KB

内网局域网与公网数据同步解决方案

1. 架构设计概述

1.1 部署架构

┌─────────────────────────────────────────────────────────────────┐
│                          公网环境                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │  公网应用服务器 (Cloud Server)                             │   │
│  │  - 接收数据同步请求                                        │   │
│  │  - 提供数据查询接口                                        │   │
│  │  - 多租户数据隔离                                          │   │
│  │  - 同步状态监控                                            │   │
│  └──────────────────────────────────────────────────────────┘   │
│                           ↑                                      │
│                           │ HTTPS + 令牌认证                      │
│                           │                                      │
└───────────────────────────┼──────────────────────────────────────┘
                            │
                            │ 防火墙/NAT
                            │
┌───────────────────────────┼──────────────────────────────────────┐
│                          内网环境 A                                │
│  ┌────────────────────────┼──────────────────────────────────┐   │
│  │  内网应用服务器         │                                   │   │
│  │  - 业务数据产生        │                                   │   │
│  │  - 定时同步任务 ───────┘                                   │   │
│  │  - 增量数据捕获                                            │   │
│  │  - 断点续传支持                                            │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│                          内网环境 B                                │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │  内网应用服务器                                            │   │
│  │  - 独立租户标识                                            │   │
│  │  - 自动数据同步                                            │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

1.2 核心设计原则

  1. 内网主动推送:内网服务器主动向公网推送数据(因为公网无法主动访问内网)
  2. 增量同步:只同步变更的数据,减少网络传输
  3. 多租户隔离:公网支持多个内网环境的数据隔离存储
  4. 安全传输:HTTPS + JWT令牌 + 数据加密
  5. 断点续传:支持网络中断后继续同步
  6. 可配置策略:支持实时同步、定时同步、手动同步

2. 技术方案选型

2.1 推荐方案:增量数据推送 + CDC

核心技术栈:
- 数据捕获:Canal (阿里开源MySQL binlog解析工具) / 触发器 / AOP拦截
- 数据传输:HTTPS RESTful API
- 消息队列:RocketMQ / RabbitMQ (可选,异步处理)
- 任务调度:XXL-Job / Spring @Scheduled
- 数据加密:AES-256
- 认证授权:JWT Token

2.2 方案对比

方案 优点 缺点 适用场景
增量推送(推荐) 实时性好、网络压力小、可控性强 需要开发同步逻辑 数据量大、实时性要求高
数据库复制 实现简单、一致性好 需要开通数据库端口、安全风险高 内网可直连公网数据库
定时全量导出 实现简单、可靠性高 数据重复、效率低 数据量小、实时性要求低
消息队列同步 解耦性好、高可用 复杂度高、需要中间件 高并发、分布式场景

3. 数据库设计

3.1 同步配置表 (sync_config)

CREATE TABLE `sync_config` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '配置ID',
  `tenant_id` VARCHAR(50) NOT NULL COMMENT '租户ID(区分不同内网环境)',
  `tenant_name` VARCHAR(100) DEFAULT NULL COMMENT '租户名称',
  `sync_mode` VARCHAR(20) NOT NULL DEFAULT 'auto' COMMENT '同步模式:auto-自动,manual-手动,schedule-定时',
  `sync_interval` INT(11) DEFAULT 300 COMMENT '同步间隔(秒),定时模式使用',
  `cron_expression` VARCHAR(100) DEFAULT NULL COMMENT 'Cron表达式,定时模式使用',
  `sync_tables` TEXT COMMENT '需要同步的表名列表,JSON数组格式',
  `cloud_api_url` VARCHAR(255) NOT NULL COMMENT '公网API地址',
  `access_token` VARCHAR(500) NOT NULL COMMENT '访问令牌',
  `secret_key` VARCHAR(100) DEFAULT NULL COMMENT '数据加密密钥',
  `enabled` TINYINT(1) DEFAULT 1 COMMENT '是否启用:0-禁用,1-启用',
  `retry_times` INT(11) DEFAULT 3 COMMENT '失败重试次数',
  `timeout` INT(11) DEFAULT 30000 COMMENT '请求超时时间(毫秒)',
  `last_sync_time` DATETIME DEFAULT NULL COMMENT '最后同步时间',
  `remark` VARCHAR(500) DEFAULT NULL COMMENT '备注',
  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据同步配置表';

3.2 同步日志表 (sync_log)

CREATE TABLE `sync_log` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '日志ID',
  `tenant_id` VARCHAR(50) NOT NULL COMMENT '租户ID',
  `batch_id` VARCHAR(50) NOT NULL COMMENT '批次ID(同一次同步任务的唯一标识)',
  `table_name` VARCHAR(100) NOT NULL COMMENT '同步的表名',
  `operation_type` VARCHAR(20) NOT NULL COMMENT '操作类型:INSERT/UPDATE/DELETE',
  `record_count` INT(11) DEFAULT 0 COMMENT '本次同步记录数',
  `success_count` INT(11) DEFAULT 0 COMMENT '成功数量',
  `fail_count` INT(11) DEFAULT 0 COMMENT '失败数量',
  `sync_status` VARCHAR(20) NOT NULL COMMENT '同步状态:pending-待同步,processing-同步中,success-成功,failed-失败',
  `start_time` DATETIME NOT NULL COMMENT '开始时间',
  `end_time` DATETIME DEFAULT NULL COMMENT '结束时间',
  `use_time` BIGINT(20) DEFAULT NULL COMMENT '耗时(毫秒)',
  `error_msg` TEXT DEFAULT NULL COMMENT '错误信息',
  `data_snapshot` LONGTEXT DEFAULT NULL COMMENT '数据快照(JSON格式,仅记录失败的数据)',
  `retry_count` INT(11) DEFAULT 0 COMMENT '重试次数',
  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`),
  KEY `idx_tenant_id` (`tenant_id`),
  KEY `idx_batch_id` (`batch_id`),
  KEY `idx_sync_status` (`sync_status`),
  KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据同步日志表';

3.3 数据变更记录表 (data_change_log)

CREATE TABLE `data_change_log` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '变更ID',
  `table_name` VARCHAR(100) NOT NULL COMMENT '表名',
  `primary_key` VARCHAR(100) NOT NULL COMMENT '主键值',
  `operation_type` VARCHAR(20) NOT NULL COMMENT '操作类型:INSERT/UPDATE/DELETE',
  `before_data` LONGTEXT DEFAULT NULL COMMENT '变更前数据(JSON格式)',
  `after_data` LONGTEXT DEFAULT NULL COMMENT '变更后数据(JSON格式)',
  `change_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '变更时间',
  `sync_status` TINYINT(1) DEFAULT 0 COMMENT '同步状态:0-未同步,1-已同步,2-同步失败',
  `sync_time` DATETIME DEFAULT NULL COMMENT '同步时间',
  `sync_batch_id` VARCHAR(50) DEFAULT NULL COMMENT '同步批次ID',
  `operator_id` BIGINT(20) DEFAULT NULL COMMENT '操作人ID',
  `operator_name` VARCHAR(50) DEFAULT NULL COMMENT '操作人姓名',
  PRIMARY KEY (`id`),
  KEY `idx_table_name` (`table_name`),
  KEY `idx_sync_status` (`sync_status`),
  KEY `idx_change_time` (`change_time`),
  KEY `idx_sync_batch_id` (`sync_batch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据变更记录表';

3.4 租户信息表 (tenant_info) - 仅公网

CREATE TABLE `tenant_info` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '租户ID',
  `tenant_id` VARCHAR(50) NOT NULL COMMENT '租户唯一标识',
  `tenant_name` VARCHAR(100) NOT NULL COMMENT '租户名称',
  `tenant_type` VARCHAR(20) DEFAULT 'intranet' COMMENT '租户类型:intranet-内网,cloud-公网',
  `access_token` VARCHAR(500) NOT NULL COMMENT '访问令牌',
  `secret_key` VARCHAR(100) DEFAULT NULL COMMENT '数据加密密钥',
  `ip_whitelist` TEXT DEFAULT NULL COMMENT 'IP白名单,JSON数组格式',
  `max_storage` BIGINT(20) DEFAULT NULL COMMENT '最大存储空间(字节)',
  `used_storage` BIGINT(20) DEFAULT 0 COMMENT '已用存储空间(字节)',
  `status` TINYINT(1) DEFAULT 1 COMMENT '状态:0-禁用,1-启用',
  `expire_time` DATETIME DEFAULT NULL COMMENT '过期时间',
  `last_sync_time` DATETIME DEFAULT NULL COMMENT '最后同步时间',
  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='租户信息表';

4. 核心功能实现

4.1 数据变更捕获(内网端)

方案一:AOP拦截(推荐,简单易用)

/**
 * 数据变更拦截器
 * 拦截所有Service层的增删改操作,自动记录到data_change_log表
 */
@Aspect
@Component
public class DataChangeCaptureAop {

    @Autowired
    private DataChangeLogService dataChangeLogService;

    /**
     * 拦截所有Mapper的insert/update/delete方法
     */
    @Around("execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.insert*(..)) || " +
            "execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.update*(..)) || " +
            "execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.delete*(..))")
    public Object captureDataChange(ProceedingJoinPoint jp) throws Throwable {
        // 判断操作类型
        String methodName = jp.getSignature().getName();
        String operationType = getOperationType(methodName);

        // 获取表名和实体数据
        String tableName = getTableName(jp);
        Object entity = getEntity(jp.getArgs());

        // 执行原方法
        Object result = jp.proceed();

        // 记录变更日志(异步)
        dataChangeLogService.recordChange(tableName, operationType, entity);

        return result;
    }
}

方案二:数据库触发器(无侵入,但性能影响大)

-- 以sys_user表为例
DELIMITER $$

CREATE TRIGGER tr_sys_user_insert
AFTER INSERT ON sys_user
FOR EACH ROW
BEGIN
    INSERT INTO data_change_log (
        table_name, primary_key, operation_type, after_data, change_time
    ) VALUES (
        'sys_user',
        NEW.id,
        'INSERT',
        JSON_OBJECT(
            'id', NEW.id,
            'username', NEW.username,
            'nickname', NEW.nickname
            -- 其他字段...
        ),
        NOW()
    );
END$$

CREATE TRIGGER tr_sys_user_update
AFTER UPDATE ON sys_user
FOR EACH ROW
BEGIN
    INSERT INTO data_change_log (
        table_name, primary_key, operation_type,
        before_data, after_data, change_time
    ) VALUES (
        'sys_user',
        NEW.id,
        'UPDATE',
        JSON_OBJECT('id', OLD.id, 'username', OLD.username, ...),
        JSON_OBJECT('id', NEW.id, 'username', NEW.username, ...),
        NOW()
    );
END$$

CREATE TRIGGER tr_sys_user_delete
AFTER DELETE ON sys_user
FOR EACH ROW
BEGIN
    INSERT INTO data_change_log (
        table_name, primary_key, operation_type, before_data, change_time
    ) VALUES (
        'sys_user',
        OLD.id,
        'DELETE',
        JSON_OBJECT('id', OLD.id, 'username', OLD.username, ...),
        NOW()
    );
END$$

DELIMITER ;

4.2 定时同步任务(内网端)

/**
 * 数据同步定时任务
 */
@Component
public class DataSyncScheduler {

    @Autowired
    private SyncConfigService syncConfigService;

    @Autowired
    private DataSyncService dataSyncService;

    /**
     * 定时检查并执行同步任务
     * 每分钟检查一次
     */
    @Scheduled(cron = "0 * * * * ?")
    public void checkAndSync() {
        // 获取启用的同步配置
        List<SyncConfig> configs = syncConfigService.getEnabledConfigs();

        for (SyncConfig config : configs) {
            // 判断是否需要同步
            if (shouldSync(config)) {
                // 异步执行同步任务
                CompletableFuture.runAsync(() -> {
                    dataSyncService.syncData(config);
                });
            }
        }
    }

    /**
     * 判断是否需要同步
     */
    private boolean shouldSync(SyncConfig config) {
        if ("manual".equals(config.getSyncMode())) {
            return false; // 手动模式不自动同步
        }

        if ("auto".equals(config.getSyncMode())) {
            // 实时模式:检查是否有未同步的数据
            return dataSyncService.hasUnsyncedData();
        }

        if ("schedule".equals(config.getSyncMode())) {
            // 定时模式:判断是否到达同步时间
            LocalDateTime lastSyncTime = config.getLastSyncTime();
            if (lastSyncTime == null) {
                return true;
            }

            int interval = config.getSyncInterval();
            LocalDateTime nextSyncTime = lastSyncTime.plusSeconds(interval);
            return LocalDateTime.now().isAfter(nextSyncTime);
        }

        return false;
    }
}

4.3 数据同步核心服务(内网端)

/**
 * 数据同步服务
 */
@Service
public class DataSyncServiceImpl implements DataSyncService {

    @Autowired
    private DataChangeLogMapper dataChangeLogMapper;

    @Autowired
    private SyncLogService syncLogService;

    @Autowired
    private RestTemplate restTemplate;

    private static final Logger logger = LoggerFactory.getLogger(DataSyncServiceImpl.class);

    /**
     * 执行数据同步
     */
    @Override
    @Transactional
    public void syncData(SyncConfig config) {
        String batchId = UUID.randomUUID().toString();
        logger.info("开始同步数据,租户:{},批次:{}", config.getTenantId(), batchId);

        try {
            // 1. 查询未同步的数据变更记录
            List<DataChangeLog> changeLogs = dataChangeLogMapper.selectList(
                new LambdaQueryWrapper<DataChangeLog>()
                    .eq(DataChangeLog::getSyncStatus, 0)
                    .orderByAsc(DataChangeLog::getChangeTime)
                    .last("LIMIT 1000") // 每批次最多1000条
            );

            if (changeLogs.isEmpty()) {
                logger.info("没有需要同步的数据");
                return;
            }

            // 2. 按表名分组
            Map<String, List<DataChangeLog>> groupedLogs = changeLogs.stream()
                .collect(Collectors.groupingBy(DataChangeLog::getTableName));

            // 3. 逐表同步
            for (Map.Entry<String, List<DataChangeLog>> entry : groupedLogs.entrySet()) {
                String tableName = entry.getKey();
                List<DataChangeLog> logs = entry.getValue();

                syncTable(config, batchId, tableName, logs);
            }

            // 4. 更新最后同步时间
            config.setLastSyncTime(LocalDateTime.now());
            syncConfigService.updateById(config);

            logger.info("同步完成,批次:{}", batchId);

        } catch (Exception e) {
            logger.error("同步失败,批次:" + batchId, e);
        }
    }

    /**
     * 同步单个表的数据
     */
    private void syncTable(SyncConfig config, String batchId,
                          String tableName, List<DataChangeLog> logs) {

        SyncLog syncLog = new SyncLog();
        syncLog.setTenantId(config.getTenantId());
        syncLog.setBatchId(batchId);
        syncLog.setTableName(tableName);
        syncLog.setRecordCount(logs.size());
        syncLog.setSyncStatus("processing");
        syncLog.setStartTime(LocalDateTime.now());
        syncLogService.save(syncLog);

        int successCount = 0;
        int failCount = 0;
        List<DataChangeLog> failedLogs = new ArrayList<>();

        try {
            // 构建同步数据
            SyncDataRequest request = new SyncDataRequest();
            request.setTenantId(config.getTenantId());
            request.setTableName(tableName);
            request.setChangeLogs(logs.stream().map(log -> {
                SyncDataItem item = new SyncDataItem();
                item.setPrimaryKey(log.getPrimaryKey());
                item.setOperationType(log.getOperationType());
                item.setBeforeData(log.getBeforeData());
                item.setAfterData(log.getAfterData());
                item.setChangeTime(log.getChangeTime());
                return item;
            }).collect(Collectors.toList()));

            // 发送HTTP请求到公网
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);
            headers.setBearerAuth(config.getAccessToken());

            HttpEntity<SyncDataRequest> entity = new HttpEntity<>(request, headers);

            ResponseEntity<RestResult> response = restTemplate.exchange(
                config.getCloudApiUrl() + "/api/sync/receiveData",
                HttpMethod.POST,
                entity,
                RestResult.class
            );

            if (response.getStatusCode() == HttpStatus.OK &&
                response.getBody() != null &&
                response.getBody().getCode() == 200) {

                // 同步成功,更新状态
                successCount = logs.size();
                for (DataChangeLog log : logs) {
                    log.setSyncStatus((byte) 1);
                    log.setSyncTime(LocalDateTime.now());
                    log.setSyncBatchId(batchId);
                    dataChangeLogMapper.updateById(log);
                }

                syncLog.setSyncStatus("success");

            } else {
                // 同步失败
                failCount = logs.size();
                failedLogs.addAll(logs);
                syncLog.setSyncStatus("failed");
                syncLog.setErrorMsg("公网返回错误:" + response.getBody());
            }

        } catch (Exception e) {
            logger.error("同步表 " + tableName + " 失败", e);
            failCount = logs.size();
            failedLogs.addAll(logs);
            syncLog.setSyncStatus("failed");
            syncLog.setErrorMsg(e.getMessage());
        }

        // 更新同步日志
        syncLog.setSuccessCount(successCount);
        syncLog.setFailCount(failCount);
        syncLog.setEndTime(LocalDateTime.now());
        syncLog.setUseTime(
            Duration.between(syncLog.getStartTime(), syncLog.getEndTime()).toMillis()
        );

        if (!failedLogs.isEmpty()) {
            syncLog.setDataSnapshot(JSON.toJSONString(failedLogs));
        }

        syncLogService.updateById(syncLog);
    }

    /**
     * 检查是否有未同步的数据
     */
    @Override
    public boolean hasUnsyncedData() {
        Long count = dataChangeLogMapper.selectCount(
            new LambdaQueryWrapper<DataChangeLog>()
                .eq(DataChangeLog::getSyncStatus, 0)
        );
        return count > 0;
    }
}

4.4 公网接收接口(公网端)

/**
 * 数据同步接收接口(公网端)
 */
@RestController
@RequestMapping("/api/sync")
public class SyncController {

    @Autowired
    private TenantInfoService tenantInfoService;

    @Autowired
    private DataReceiveService dataReceiveService;

    private static final Logger logger = LoggerFactory.getLogger(SyncController.class);

    /**
     * 接收内网同步的数据
     */
    @PostMapping("/receiveData")
    public RestResult<?> receiveData(@RequestBody SyncDataRequest request,
                                     @RequestHeader("Authorization") String token) {

        try {
            // 1. 验证租户和令牌
            if (token == null || !token.startsWith("Bearer ")) {
                return RestResult.error("无效的访问令牌");
            }

            String accessToken = token.substring(7);
            TenantInfo tenant = tenantInfoService.validateToken(request.getTenantId(), accessToken);

            if (tenant == null) {
                return RestResult.error("租户验证失败");
            }

            if (tenant.getStatus() == 0) {
                return RestResult.error("租户已被禁用");
            }

            // 2. 检查租户存储空间
            if (tenant.getMaxStorage() != null &&
                tenant.getUsedStorage() >= tenant.getMaxStorage()) {
                return RestResult.error("存储空间已满");
            }

            // 3. 接收并保存数据
            DataReceiveResult result = dataReceiveService.receiveData(tenant, request);

            // 4. 更新租户最后同步时间
            tenant.setLastSyncTime(LocalDateTime.now());
            tenantInfoService.updateById(tenant);

            logger.info("接收数据成功,租户:{},表:{},记录数:{}",
                request.getTenantId(), request.getTableName(), request.getChangeLogs().size());

            return RestResult.ok("同步成功", result);

        } catch (Exception e) {
            logger.error("接收数据失败", e);
            return RestResult.error("同步失败:" + e.getMessage());
        }
    }

    /**
     * 查询同步状态
     */
    @GetMapping("/status/{tenantId}")
    public RestResult<?> getSyncStatus(@PathVariable String tenantId,
                                       @RequestHeader("Authorization") String token) {

        // 验证租户
        String accessToken = token.substring(7);
        TenantInfo tenant = tenantInfoService.validateToken(tenantId, accessToken);

        if (tenant == null) {
            return RestResult.error("租户验证失败");
        }

        // 查询同步状态
        Map<String, Object> status = new HashMap<>();
        status.put("tenantId", tenant.getTenantId());
        status.put("tenantName", tenant.getTenantName());
        status.put("lastSyncTime", tenant.getLastSyncTime());
        status.put("usedStorage", tenant.getUsedStorage());
        status.put("maxStorage", tenant.getMaxStorage());

        return RestResult.ok(status);
    }
}

4.5 数据接收服务(公网端)

/**
 * 数据接收服务(公网端)
 */
@Service
public class DataReceiveServiceImpl implements DataReceiveService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private TenantInfoService tenantInfoService;

    private static final Logger logger = LoggerFactory.getLogger(DataReceiveServiceImpl.class);

    /**
     * 接收并保存数据
     */
    @Override
    @Transactional
    public DataReceiveResult receiveData(TenantInfo tenant, SyncDataRequest request) {

        DataReceiveResult result = new DataReceiveResult();
        result.setSuccessCount(0);
        result.setFailCount(0);

        String tableName = request.getTableName();
        // 为多租户数据添加租户前缀,实现数据隔离
        String targetTableName = tenant.getTenantId() + "_" + tableName;

        // 检查目标表是否存在,不存在则创建
        ensureTableExists(targetTableName, tableName);

        for (SyncDataItem item : request.getChangeLogs()) {
            try {
                switch (item.getOperationType()) {
                    case "INSERT":
                        insertData(targetTableName, item.getAfterData());
                        break;
                    case "UPDATE":
                        updateData(targetTableName, item.getPrimaryKey(), item.getAfterData());
                        break;
                    case "DELETE":
                        deleteData(targetTableName, item.getPrimaryKey());
                        break;
                }
                result.setSuccessCount(result.getSuccessCount() + 1);

            } catch (Exception e) {
                logger.error("处理数据失败:" + item, e);
                result.setFailCount(result.getFailCount() + 1);
                result.addFailedItem(item, e.getMessage());
            }
        }

        return result;
    }

    /**
     * 确保目标表存在
     */
    private void ensureTableExists(String targetTableName, String sourceTableName) {
        // 检查表是否存在
        String checkSql = "SELECT COUNT(*) FROM information_schema.tables " +
                         "WHERE table_schema = DATABASE() AND table_name = ?";

        Integer count = jdbcTemplate.queryForObject(checkSql, Integer.class, targetTableName);

        if (count == 0) {
            // 表不存在,根据源表结构创建
            String createSql = "CREATE TABLE " + targetTableName + " LIKE " + sourceTableName;
            jdbcTemplate.execute(createSql);
            logger.info("创建租户表:{}", targetTableName);
        }
    }

    /**
     * 插入数据
     */
    private void insertData(String tableName, String jsonData) {
        JSONObject data = JSON.parseObject(jsonData);

        StringBuilder columns = new StringBuilder();
        StringBuilder values = new StringBuilder();
        List<Object> params = new ArrayList<>();

        for (String key : data.keySet()) {
            if (columns.length() > 0) {
                columns.append(", ");
                values.append(", ");
            }
            columns.append("`").append(key).append("`");
            values.append("?");
            params.add(data.get(key));
        }

        String sql = "INSERT INTO " + tableName + " (" + columns + ") VALUES (" + values + ")";
        jdbcTemplate.update(sql, params.toArray());
    }

    /**
     * 更新数据
     */
    private void updateData(String tableName, String primaryKey, String jsonData) {
        JSONObject data = JSON.parseObject(jsonData);

        StringBuilder sets = new StringBuilder();
        List<Object> params = new ArrayList<>();

        Object pkValue = null;
        for (String key : data.keySet()) {
            if ("id".equals(key)) {
                pkValue = data.get(key);
                continue;
            }

            if (sets.length() > 0) {
                sets.append(", ");
            }
            sets.append("`").append(key).append("` = ?");
            params.add(data.get(key));
        }

        params.add(pkValue);

        String sql = "UPDATE " + tableName + " SET " + sets + " WHERE id = ?";
        jdbcTemplate.update(sql, params.toArray());
    }

    /**
     * 删除数据
     */
    private void deleteData(String tableName, String primaryKey) {
        String sql = "DELETE FROM " + tableName + " WHERE id = ?";
        jdbcTemplate.update(sql, primaryKey);
    }
}

5. 配置文件

5.1 内网端配置 (application-intranet.yml)

# 应用配置
spring:
  application:
    name: pacsonline-intranet
  profiles:
    active: intranet

# 数据同步配置
sync:
  # 是否启用数据同步
  enabled: true
  # 租户ID(唯一标识)
  tenant-id: ${TENANT_ID:tenant_001}
  # 租户名称
  tenant-name: ${TENANT_NAME:内网环境A}
  # 公网API地址
  cloud-api-url: ${CLOUD_API_URL:https://cloud.example.com}
  # 访问令牌
  access-token: ${ACCESS_TOKEN:your_access_token_here}
  # 数据加密密钥
  secret-key: ${SECRET_KEY:your_secret_key_here}
  # 同步模式:auto-自动,manual-手动,schedule-定时
  sync-mode: auto
  # 定时同步间隔(秒)
  sync-interval: 300
  # 需要同步的表
  sync-tables:
    - sys_user
    - sys_role
    - sys_menu
    - business_data
  # 失败重试次数
  retry-times: 3
  # 请求超时时间(毫秒)
  timeout: 30000

# HTTP客户端配置
http:
  client:
    connect-timeout: 5000
    read-timeout: 30000
    max-connections: 200

5.2 公网端配置 (application-cloud.yml)

# 应用配置
spring:
  application:
    name: pacsonline-cloud
  profiles:
    active: cloud

# 多租户配置
multi-tenant:
  # 是否启用多租户
  enabled: true
  # 租户表前缀模式
  table-prefix-mode: true
  # 最大租户数量
  max-tenants: 1000

# 数据同步配置
sync:
  # 是否启用数据接收
  enabled: true
  # 最大批次大小
  max-batch-size: 1000
  # 数据保留天数
  data-retention-days: 365

6. 部署步骤

6.1 公网端部署

# 1. 创建数据库
mysql -u root -p
CREATE DATABASE pacsonline_cloud CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

# 2. 执行数据库脚本
mysql -u root -p pacsonline_cloud < sync_config.sql
mysql -u root -p pacsonline_cloud < sync_log.sql
mysql -u root -p pacsonline_cloud < tenant_info.sql

# 3. 初始化租户信息
INSERT INTO tenant_info (
    tenant_id, tenant_name, tenant_type, access_token, secret_key, status
) VALUES (
    'tenant_001',
    '内网环境A',
    'intranet',
    'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...', -- JWT令牌
    'AES256EncryptionKey123456',
    1
);

# 4. 打包部署
mvn clean package -Pcloud
java -jar target/pacsonline-cloud.jar --spring.profiles.active=cloud

6.2 内网端部署

# 1. 创建数据库
mysql -u root -p
CREATE DATABASE pacsonline_intranet CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

# 2. 执行数据库脚本(包含业务表和同步相关表)
mysql -u root -p pacsonline_intranet < all_tables.sql
mysql -u root -p pacsonline_intranet < data_change_log.sql

# 3. 配置环境变量
export TENANT_ID=tenant_001
export TENANT_NAME=内网环境A
export CLOUD_API_URL=https://cloud.example.com
export ACCESS_TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...

# 4. 打包部署
mvn clean package -Pintranet
java -jar target/pacsonline-intranet.jar --spring.profiles.active=intranet

7. 监控与运维

7.1 同步状态监控

-- 查询同步状态统计
SELECT
    DATE(create_time) as sync_date,
    sync_status,
    COUNT(*) as count,
    SUM(record_count) as total_records,
    AVG(use_time) as avg_time_ms
FROM sync_log
WHERE create_time >= DATE_SUB(NOW(), INTERVAL 7 DAY)
GROUP BY DATE(create_time), sync_status
ORDER BY sync_date DESC;

-- 查询未同步的数据
SELECT
    table_name,
    COUNT(*) as unsynced_count,
    MIN(change_time) as oldest_change
FROM data_change_log
WHERE sync_status = 0
GROUP BY table_name;

-- 查询失败的同步任务
SELECT *
FROM sync_log
WHERE sync_status = 'failed'
ORDER BY create_time DESC
LIMIT 100;

7.2 数据清理策略

-- 清理已同步的变更记录(保留30天)
DELETE FROM data_change_log
WHERE sync_status = 1
  AND sync_time < DATE_SUB(NOW(), INTERVAL 30 DAY);

-- 清理同步日志(保留90天)
DELETE FROM sync_log
WHERE create_time < DATE_SUB(NOW(), INTERVAL 90 DAY);

7.3 性能优化建议

  1. 批量同步:每次同步1000条记录,避免单次传输过大
  2. 异步处理:使用异步任务执行同步,不阻塞主业务
  3. 索引优化:为sync_status、change_time等字段添加索引
  4. 数据压缩:传输前压缩JSON数据,减少网络流量
  5. 分表策略:公网端按租户分表,避免单表数据过大

8. 安全加固

8.1 HTTPS配置

server:
  port: 8443
  ssl:
    enabled: true
    key-store: classpath:keystore.p12
    key-store-password: your_password
    key-store-type: PKCS12
    key-alias: tomcat

8.2 数据加密

/**
 * 数据加密工具
 */
public class DataEncryptUtil {

    /**
     * AES加密
     */
    public static String encrypt(String data, String secretKey) throws Exception {
        SecretKeySpec key = new SecretKeySpec(secretKey.getBytes(), "AES");
        Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
        cipher.init(Cipher.ENCRYPT_MODE, key);
        byte[] encrypted = cipher.doFinal(data.getBytes());
        return Base64.getEncoder().encodeToString(encrypted);
    }

    /**
     * AES解密
     */
    public static String decrypt(String encryptedData, String secretKey) throws Exception {
        SecretKeySpec key = new SecretKeySpec(secretKey.getBytes(), "AES");
        Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
        cipher.init(Cipher.DECRYPT_MODE, key);
        byte[] decrypted = cipher.doFinal(Base64.getDecoder().decode(encryptedData));
        return new String(decrypted);
    }
}

8.3 IP白名单验证

@Component
public class IpWhitelistFilter extends OncePerRequestFilter {

    @Autowired
    private TenantInfoService tenantInfoService;

    @Override
    protected void doFilterInternal(HttpServletRequest request,
                                   HttpServletResponse response,
                                   FilterChain filterChain) throws ServletException, IOException {

        String requestPath = request.getRequestURI();

        // 只对同步接口进行IP验证
        if (requestPath.startsWith("/api/sync/")) {
            String clientIp = IpUtils.getIpAddr(request);
            String tenantId = request.getHeader("X-Tenant-Id");

            if (!tenantInfoService.isIpAllowed(tenantId, clientIp)) {
                response.setStatus(HttpServletResponse.SC_FORBIDDEN);
                response.getWriter().write("IP not in whitelist");
                return;
            }
        }

        filterChain.doFilter(request, response);
    }
}

9. 常见问题

Q1: 内网网络不稳定,同步经常失败怎么办?

A: 实现了失败重试机制和断点续传:

  • 同步失败的数据会保留在data_change_log表中
  • 下次同步时会自动重试
  • 可以通过retry_times配置重试次数
  • 建议设置合理的sync_interval,避免频繁同步

Q2: 如何处理公网和内网的数据冲突?

A:

  • 内网是数据源,公网只接收不回传
  • 公网数据仅用于查询和报表
  • 如需双向同步,需要实现冲突检测和解决策略(如时间戳比较、版本号等)

Q3: 数据同步延迟多久?

A:

  • 实时模式(auto):检测到数据变更后,1分钟内同步
  • 定时模式(schedule):根据配置的间隔时间
  • 手动模式(manual):需要手动触发

Q4: 如何保证数据安全?

A:

  • HTTPS加密传输
  • JWT令牌认证
  • IP白名单限制
  • 可选:敏感字段AES加密
  • 公网数据按租户隔离

Q5: 公网存储空间不足怎么办?

A:

  • 设置max_storage限制每个租户的存储空间
  • 定期清理历史数据(data_retention_days)
  • 只同步核心业务数据,非必要数据不同步
  • 可以实现数据归档策略

10. 扩展方案

10.1 使用消息队列(高级方案)

适用场景:数据量大、并发高、需要高可用

# 架构调整
内网端 → RocketMQ → 公网消费者 → 公网数据库

优点:
- 解耦性好,内网发送不需要等待公网响应
- 消息持久化,不会丢失数据
- 支持集群部署,高可用
- 流量削峰

缺点:
- 需要部署MQ服务器
- 增加系统复杂度
- 需要公网MQ服务器可被内网访问

10.2 使用Canal监听binlog(无侵入方案)

适用场景:不想修改业务代码,纯技术层面解决

内网MySQL → Canal Server → 自定义Client → 公网API

优点:
- 无需修改业务代码
- 实时性高,秒级同步
- 准确捕获所有数据变更

缺点:
- 需要开启MySQL binlog
- 需要部署Canal服务
- 配置复杂度较高

11. 总结

本方案提供了完整的内网到公网数据同步解决方案,具有以下特点:

灵活部署:支持纯内网、纯公网、内网+公网混合部署 ✅ 多租户支持:公网可同时接收多个内网环境的数据 ✅ 安全可靠:HTTPS + JWT + 数据加密 + IP白名单 ✅ 性能优化:增量同步 + 异步处理 + 批量传输 ✅ 易于监控:完整的同步日志和状态查询 ✅ 容错机制:失败重试 + 断点续传

根据实际需求选择合适的数据捕获方式:

  • AOP拦截:推荐,简单易用,适合新项目
  • 数据库触发器:无侵入,但性能影响大
  • Canal监听binlog:最优方案,但配置复杂

文档版本:v1.0 最后更新:2025-10-30 维护者:开发团队