# 内网局域网与公网数据同步解决方案 ## 1. 架构设计概述 ### 1.1 部署架构 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 公网环境 │ │ ┌──────────────────────────────────────────────────────────┐ │ │ │ 公网应用服务器 (Cloud Server) │ │ │ │ - 接收数据同步请求 │ │ │ │ - 提供数据查询接口 │ │ │ │ - 多租户数据隔离 │ │ │ │ - 同步状态监控 │ │ │ └──────────────────────────────────────────────────────────┘ │ │ ↑ │ │ │ HTTPS + 令牌认证 │ │ │ │ └───────────────────────────┼──────────────────────────────────────┘ │ │ 防火墙/NAT │ ┌───────────────────────────┼──────────────────────────────────────┐ │ 内网环境 A │ │ ┌────────────────────────┼──────────────────────────────────┐ │ │ │ 内网应用服务器 │ │ │ │ │ - 业务数据产生 │ │ │ │ │ - 定时同步任务 ───────┘ │ │ │ │ - 增量数据捕获 │ │ │ │ - 断点续传支持 │ │ │ └──────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────┐ │ 内网环境 B │ │ ┌──────────────────────────────────────────────────────────┐ │ │ │ 内网应用服务器 │ │ │ │ - 独立租户标识 │ │ │ │ - 自动数据同步 │ │ │ └──────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ ``` ### 1.2 核心设计原则 1. **内网主动推送**:内网服务器主动向公网推送数据(因为公网无法主动访问内网) 2. **增量同步**:只同步变更的数据,减少网络传输 3. **多租户隔离**:公网支持多个内网环境的数据隔离存储 4. **安全传输**:HTTPS + JWT令牌 + 数据加密 5. **断点续传**:支持网络中断后继续同步 6. **可配置策略**:支持实时同步、定时同步、手动同步 ## 2. 技术方案选型 ### 2.1 推荐方案:增量数据推送 + CDC ``` 核心技术栈: - 数据捕获:Canal (阿里开源MySQL binlog解析工具) / 触发器 / AOP拦截 - 数据传输:HTTPS RESTful API - 消息队列:RocketMQ / RabbitMQ (可选,异步处理) - 任务调度:XXL-Job / Spring @Scheduled - 数据加密:AES-256 - 认证授权:JWT Token ``` ### 2.2 方案对比 | 方案 | 优点 | 缺点 | 适用场景 | |------|------|------|----------| | **增量推送(推荐)** | 实时性好、网络压力小、可控性强 | 需要开发同步逻辑 | 数据量大、实时性要求高 | | 数据库复制 | 实现简单、一致性好 | 需要开通数据库端口、安全风险高 | 内网可直连公网数据库 | | 定时全量导出 | 实现简单、可靠性高 | 数据重复、效率低 | 数据量小、实时性要求低 | | 消息队列同步 | 解耦性好、高可用 | 复杂度高、需要中间件 | 高并发、分布式场景 | ## 3. 数据库设计 ### 3.1 同步配置表 (sync_config) ```sql CREATE TABLE `sync_config` ( `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '配置ID', `tenant_id` VARCHAR(50) NOT NULL COMMENT '租户ID(区分不同内网环境)', `tenant_name` VARCHAR(100) DEFAULT NULL COMMENT '租户名称', `sync_mode` VARCHAR(20) NOT NULL DEFAULT 'auto' COMMENT '同步模式:auto-自动,manual-手动,schedule-定时', `sync_interval` INT(11) DEFAULT 300 COMMENT '同步间隔(秒),定时模式使用', `cron_expression` VARCHAR(100) DEFAULT NULL COMMENT 'Cron表达式,定时模式使用', `sync_tables` TEXT COMMENT '需要同步的表名列表,JSON数组格式', `cloud_api_url` VARCHAR(255) NOT NULL COMMENT '公网API地址', `access_token` VARCHAR(500) NOT NULL COMMENT '访问令牌', `secret_key` VARCHAR(100) DEFAULT NULL COMMENT '数据加密密钥', `enabled` TINYINT(1) DEFAULT 1 COMMENT '是否启用:0-禁用,1-启用', `retry_times` INT(11) DEFAULT 3 COMMENT '失败重试次数', `timeout` INT(11) DEFAULT 30000 COMMENT '请求超时时间(毫秒)', `last_sync_time` DATETIME DEFAULT NULL COMMENT '最后同步时间', `remark` VARCHAR(500) DEFAULT NULL COMMENT '备注', `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `uk_tenant_id` (`tenant_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据同步配置表'; ``` ### 3.2 同步日志表 (sync_log) ```sql CREATE TABLE `sync_log` ( `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '日志ID', `tenant_id` VARCHAR(50) NOT NULL COMMENT '租户ID', `batch_id` VARCHAR(50) NOT NULL COMMENT '批次ID(同一次同步任务的唯一标识)', `table_name` VARCHAR(100) NOT NULL COMMENT '同步的表名', `operation_type` VARCHAR(20) NOT NULL COMMENT '操作类型:INSERT/UPDATE/DELETE', `record_count` INT(11) DEFAULT 0 COMMENT '本次同步记录数', `success_count` INT(11) DEFAULT 0 COMMENT '成功数量', `fail_count` INT(11) DEFAULT 0 COMMENT '失败数量', `sync_status` VARCHAR(20) NOT NULL COMMENT '同步状态:pending-待同步,processing-同步中,success-成功,failed-失败', `start_time` DATETIME NOT NULL COMMENT '开始时间', `end_time` DATETIME DEFAULT NULL COMMENT '结束时间', `use_time` BIGINT(20) DEFAULT NULL COMMENT '耗时(毫秒)', `error_msg` TEXT DEFAULT NULL COMMENT '错误信息', `data_snapshot` LONGTEXT DEFAULT NULL COMMENT '数据快照(JSON格式,仅记录失败的数据)', `retry_count` INT(11) DEFAULT 0 COMMENT '重试次数', `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', PRIMARY KEY (`id`), KEY `idx_tenant_id` (`tenant_id`), KEY `idx_batch_id` (`batch_id`), KEY `idx_sync_status` (`sync_status`), KEY `idx_create_time` (`create_time`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据同步日志表'; ``` ### 3.3 数据变更记录表 (data_change_log) ```sql CREATE TABLE `data_change_log` ( `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '变更ID', `table_name` VARCHAR(100) NOT NULL COMMENT '表名', `primary_key` VARCHAR(100) NOT NULL COMMENT '主键值', `operation_type` VARCHAR(20) NOT NULL COMMENT '操作类型:INSERT/UPDATE/DELETE', `before_data` LONGTEXT DEFAULT NULL COMMENT '变更前数据(JSON格式)', `after_data` LONGTEXT DEFAULT NULL COMMENT '变更后数据(JSON格式)', `change_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '变更时间', `sync_status` TINYINT(1) DEFAULT 0 COMMENT '同步状态:0-未同步,1-已同步,2-同步失败', `sync_time` DATETIME DEFAULT NULL COMMENT '同步时间', `sync_batch_id` VARCHAR(50) DEFAULT NULL COMMENT '同步批次ID', `operator_id` BIGINT(20) DEFAULT NULL COMMENT '操作人ID', `operator_name` VARCHAR(50) DEFAULT NULL COMMENT '操作人姓名', PRIMARY KEY (`id`), KEY `idx_table_name` (`table_name`), KEY `idx_sync_status` (`sync_status`), KEY `idx_change_time` (`change_time`), KEY `idx_sync_batch_id` (`sync_batch_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据变更记录表'; ``` ### 3.4 租户信息表 (tenant_info) - 仅公网 ```sql CREATE TABLE `tenant_info` ( `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '租户ID', `tenant_id` VARCHAR(50) NOT NULL COMMENT '租户唯一标识', `tenant_name` VARCHAR(100) NOT NULL COMMENT '租户名称', `tenant_type` VARCHAR(20) DEFAULT 'intranet' COMMENT '租户类型:intranet-内网,cloud-公网', `access_token` VARCHAR(500) NOT NULL COMMENT '访问令牌', `secret_key` VARCHAR(100) DEFAULT NULL COMMENT '数据加密密钥', `ip_whitelist` TEXT DEFAULT NULL COMMENT 'IP白名单,JSON数组格式', `max_storage` BIGINT(20) DEFAULT NULL COMMENT '最大存储空间(字节)', `used_storage` BIGINT(20) DEFAULT 0 COMMENT '已用存储空间(字节)', `status` TINYINT(1) DEFAULT 1 COMMENT '状态:0-禁用,1-启用', `expire_time` DATETIME DEFAULT NULL COMMENT '过期时间', `last_sync_time` DATETIME DEFAULT NULL COMMENT '最后同步时间', `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `uk_tenant_id` (`tenant_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='租户信息表'; ``` ## 4. 核心功能实现 ### 4.1 数据变更捕获(内网端) #### 方案一:AOP拦截(推荐,简单易用) ```java /** * 数据变更拦截器 * 拦截所有Service层的增删改操作,自动记录到data_change_log表 */ @Aspect @Component public class DataChangeCaptureAop { @Autowired private DataChangeLogService dataChangeLogService; /** * 拦截所有Mapper的insert/update/delete方法 */ @Around("execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.insert*(..)) || " + "execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.update*(..)) || " + "execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.delete*(..))") public Object captureDataChange(ProceedingJoinPoint jp) throws Throwable { // 判断操作类型 String methodName = jp.getSignature().getName(); String operationType = getOperationType(methodName); // 获取表名和实体数据 String tableName = getTableName(jp); Object entity = getEntity(jp.getArgs()); // 执行原方法 Object result = jp.proceed(); // 记录变更日志(异步) dataChangeLogService.recordChange(tableName, operationType, entity); return result; } } ``` #### 方案二:数据库触发器(无侵入,但性能影响大) ```sql -- 以sys_user表为例 DELIMITER $$ CREATE TRIGGER tr_sys_user_insert AFTER INSERT ON sys_user FOR EACH ROW BEGIN INSERT INTO data_change_log ( table_name, primary_key, operation_type, after_data, change_time ) VALUES ( 'sys_user', NEW.id, 'INSERT', JSON_OBJECT( 'id', NEW.id, 'username', NEW.username, 'nickname', NEW.nickname -- 其他字段... ), NOW() ); END$$ CREATE TRIGGER tr_sys_user_update AFTER UPDATE ON sys_user FOR EACH ROW BEGIN INSERT INTO data_change_log ( table_name, primary_key, operation_type, before_data, after_data, change_time ) VALUES ( 'sys_user', NEW.id, 'UPDATE', JSON_OBJECT('id', OLD.id, 'username', OLD.username, ...), JSON_OBJECT('id', NEW.id, 'username', NEW.username, ...), NOW() ); END$$ CREATE TRIGGER tr_sys_user_delete AFTER DELETE ON sys_user FOR EACH ROW BEGIN INSERT INTO data_change_log ( table_name, primary_key, operation_type, before_data, change_time ) VALUES ( 'sys_user', OLD.id, 'DELETE', JSON_OBJECT('id', OLD.id, 'username', OLD.username, ...), NOW() ); END$$ DELIMITER ; ``` ### 4.2 定时同步任务(内网端) ```java /** * 数据同步定时任务 */ @Component public class DataSyncScheduler { @Autowired private SyncConfigService syncConfigService; @Autowired private DataSyncService dataSyncService; /** * 定时检查并执行同步任务 * 每分钟检查一次 */ @Scheduled(cron = "0 * * * * ?") public void checkAndSync() { // 获取启用的同步配置 List configs = syncConfigService.getEnabledConfigs(); for (SyncConfig config : configs) { // 判断是否需要同步 if (shouldSync(config)) { // 异步执行同步任务 CompletableFuture.runAsync(() -> { dataSyncService.syncData(config); }); } } } /** * 判断是否需要同步 */ private boolean shouldSync(SyncConfig config) { if ("manual".equals(config.getSyncMode())) { return false; // 手动模式不自动同步 } if ("auto".equals(config.getSyncMode())) { // 实时模式:检查是否有未同步的数据 return dataSyncService.hasUnsyncedData(); } if ("schedule".equals(config.getSyncMode())) { // 定时模式:判断是否到达同步时间 LocalDateTime lastSyncTime = config.getLastSyncTime(); if (lastSyncTime == null) { return true; } int interval = config.getSyncInterval(); LocalDateTime nextSyncTime = lastSyncTime.plusSeconds(interval); return LocalDateTime.now().isAfter(nextSyncTime); } return false; } } ``` ### 4.3 数据同步核心服务(内网端) ```java /** * 数据同步服务 */ @Service public class DataSyncServiceImpl implements DataSyncService { @Autowired private DataChangeLogMapper dataChangeLogMapper; @Autowired private SyncLogService syncLogService; @Autowired private RestTemplate restTemplate; private static final Logger logger = LoggerFactory.getLogger(DataSyncServiceImpl.class); /** * 执行数据同步 */ @Override @Transactional public void syncData(SyncConfig config) { String batchId = UUID.randomUUID().toString(); logger.info("开始同步数据,租户:{},批次:{}", config.getTenantId(), batchId); try { // 1. 查询未同步的数据变更记录 List changeLogs = dataChangeLogMapper.selectList( new LambdaQueryWrapper() .eq(DataChangeLog::getSyncStatus, 0) .orderByAsc(DataChangeLog::getChangeTime) .last("LIMIT 1000") // 每批次最多1000条 ); if (changeLogs.isEmpty()) { logger.info("没有需要同步的数据"); return; } // 2. 按表名分组 Map> groupedLogs = changeLogs.stream() .collect(Collectors.groupingBy(DataChangeLog::getTableName)); // 3. 逐表同步 for (Map.Entry> entry : groupedLogs.entrySet()) { String tableName = entry.getKey(); List logs = entry.getValue(); syncTable(config, batchId, tableName, logs); } // 4. 更新最后同步时间 config.setLastSyncTime(LocalDateTime.now()); syncConfigService.updateById(config); logger.info("同步完成,批次:{}", batchId); } catch (Exception e) { logger.error("同步失败,批次:" + batchId, e); } } /** * 同步单个表的数据 */ private void syncTable(SyncConfig config, String batchId, String tableName, List logs) { SyncLog syncLog = new SyncLog(); syncLog.setTenantId(config.getTenantId()); syncLog.setBatchId(batchId); syncLog.setTableName(tableName); syncLog.setRecordCount(logs.size()); syncLog.setSyncStatus("processing"); syncLog.setStartTime(LocalDateTime.now()); syncLogService.save(syncLog); int successCount = 0; int failCount = 0; List failedLogs = new ArrayList<>(); try { // 构建同步数据 SyncDataRequest request = new SyncDataRequest(); request.setTenantId(config.getTenantId()); request.setTableName(tableName); request.setChangeLogs(logs.stream().map(log -> { SyncDataItem item = new SyncDataItem(); item.setPrimaryKey(log.getPrimaryKey()); item.setOperationType(log.getOperationType()); item.setBeforeData(log.getBeforeData()); item.setAfterData(log.getAfterData()); item.setChangeTime(log.getChangeTime()); return item; }).collect(Collectors.toList())); // 发送HTTP请求到公网 HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); headers.setBearerAuth(config.getAccessToken()); HttpEntity entity = new HttpEntity<>(request, headers); ResponseEntity response = restTemplate.exchange( config.getCloudApiUrl() + "/api/sync/receiveData", HttpMethod.POST, entity, RestResult.class ); if (response.getStatusCode() == HttpStatus.OK && response.getBody() != null && response.getBody().getCode() == 200) { // 同步成功,更新状态 successCount = logs.size(); for (DataChangeLog log : logs) { log.setSyncStatus((byte) 1); log.setSyncTime(LocalDateTime.now()); log.setSyncBatchId(batchId); dataChangeLogMapper.updateById(log); } syncLog.setSyncStatus("success"); } else { // 同步失败 failCount = logs.size(); failedLogs.addAll(logs); syncLog.setSyncStatus("failed"); syncLog.setErrorMsg("公网返回错误:" + response.getBody()); } } catch (Exception e) { logger.error("同步表 " + tableName + " 失败", e); failCount = logs.size(); failedLogs.addAll(logs); syncLog.setSyncStatus("failed"); syncLog.setErrorMsg(e.getMessage()); } // 更新同步日志 syncLog.setSuccessCount(successCount); syncLog.setFailCount(failCount); syncLog.setEndTime(LocalDateTime.now()); syncLog.setUseTime( Duration.between(syncLog.getStartTime(), syncLog.getEndTime()).toMillis() ); if (!failedLogs.isEmpty()) { syncLog.setDataSnapshot(JSON.toJSONString(failedLogs)); } syncLogService.updateById(syncLog); } /** * 检查是否有未同步的数据 */ @Override public boolean hasUnsyncedData() { Long count = dataChangeLogMapper.selectCount( new LambdaQueryWrapper() .eq(DataChangeLog::getSyncStatus, 0) ); return count > 0; } } ``` ### 4.4 公网接收接口(公网端) ```java /** * 数据同步接收接口(公网端) */ @RestController @RequestMapping("/api/sync") public class SyncController { @Autowired private TenantInfoService tenantInfoService; @Autowired private DataReceiveService dataReceiveService; private static final Logger logger = LoggerFactory.getLogger(SyncController.class); /** * 接收内网同步的数据 */ @PostMapping("/receiveData") public RestResult receiveData(@RequestBody SyncDataRequest request, @RequestHeader("Authorization") String token) { try { // 1. 验证租户和令牌 if (token == null || !token.startsWith("Bearer ")) { return RestResult.error("无效的访问令牌"); } String accessToken = token.substring(7); TenantInfo tenant = tenantInfoService.validateToken(request.getTenantId(), accessToken); if (tenant == null) { return RestResult.error("租户验证失败"); } if (tenant.getStatus() == 0) { return RestResult.error("租户已被禁用"); } // 2. 检查租户存储空间 if (tenant.getMaxStorage() != null && tenant.getUsedStorage() >= tenant.getMaxStorage()) { return RestResult.error("存储空间已满"); } // 3. 接收并保存数据 DataReceiveResult result = dataReceiveService.receiveData(tenant, request); // 4. 更新租户最后同步时间 tenant.setLastSyncTime(LocalDateTime.now()); tenantInfoService.updateById(tenant); logger.info("接收数据成功,租户:{},表:{},记录数:{}", request.getTenantId(), request.getTableName(), request.getChangeLogs().size()); return RestResult.ok("同步成功", result); } catch (Exception e) { logger.error("接收数据失败", e); return RestResult.error("同步失败:" + e.getMessage()); } } /** * 查询同步状态 */ @GetMapping("/status/{tenantId}") public RestResult getSyncStatus(@PathVariable String tenantId, @RequestHeader("Authorization") String token) { // 验证租户 String accessToken = token.substring(7); TenantInfo tenant = tenantInfoService.validateToken(tenantId, accessToken); if (tenant == null) { return RestResult.error("租户验证失败"); } // 查询同步状态 Map status = new HashMap<>(); status.put("tenantId", tenant.getTenantId()); status.put("tenantName", tenant.getTenantName()); status.put("lastSyncTime", tenant.getLastSyncTime()); status.put("usedStorage", tenant.getUsedStorage()); status.put("maxStorage", tenant.getMaxStorage()); return RestResult.ok(status); } } ``` ### 4.5 数据接收服务(公网端) ```java /** * 数据接收服务(公网端) */ @Service public class DataReceiveServiceImpl implements DataReceiveService { @Autowired private JdbcTemplate jdbcTemplate; @Autowired private TenantInfoService tenantInfoService; private static final Logger logger = LoggerFactory.getLogger(DataReceiveServiceImpl.class); /** * 接收并保存数据 */ @Override @Transactional public DataReceiveResult receiveData(TenantInfo tenant, SyncDataRequest request) { DataReceiveResult result = new DataReceiveResult(); result.setSuccessCount(0); result.setFailCount(0); String tableName = request.getTableName(); // 为多租户数据添加租户前缀,实现数据隔离 String targetTableName = tenant.getTenantId() + "_" + tableName; // 检查目标表是否存在,不存在则创建 ensureTableExists(targetTableName, tableName); for (SyncDataItem item : request.getChangeLogs()) { try { switch (item.getOperationType()) { case "INSERT": insertData(targetTableName, item.getAfterData()); break; case "UPDATE": updateData(targetTableName, item.getPrimaryKey(), item.getAfterData()); break; case "DELETE": deleteData(targetTableName, item.getPrimaryKey()); break; } result.setSuccessCount(result.getSuccessCount() + 1); } catch (Exception e) { logger.error("处理数据失败:" + item, e); result.setFailCount(result.getFailCount() + 1); result.addFailedItem(item, e.getMessage()); } } return result; } /** * 确保目标表存在 */ private void ensureTableExists(String targetTableName, String sourceTableName) { // 检查表是否存在 String checkSql = "SELECT COUNT(*) FROM information_schema.tables " + "WHERE table_schema = DATABASE() AND table_name = ?"; Integer count = jdbcTemplate.queryForObject(checkSql, Integer.class, targetTableName); if (count == 0) { // 表不存在,根据源表结构创建 String createSql = "CREATE TABLE " + targetTableName + " LIKE " + sourceTableName; jdbcTemplate.execute(createSql); logger.info("创建租户表:{}", targetTableName); } } /** * 插入数据 */ private void insertData(String tableName, String jsonData) { JSONObject data = JSON.parseObject(jsonData); StringBuilder columns = new StringBuilder(); StringBuilder values = new StringBuilder(); List params = new ArrayList<>(); for (String key : data.keySet()) { if (columns.length() > 0) { columns.append(", "); values.append(", "); } columns.append("`").append(key).append("`"); values.append("?"); params.add(data.get(key)); } String sql = "INSERT INTO " + tableName + " (" + columns + ") VALUES (" + values + ")"; jdbcTemplate.update(sql, params.toArray()); } /** * 更新数据 */ private void updateData(String tableName, String primaryKey, String jsonData) { JSONObject data = JSON.parseObject(jsonData); StringBuilder sets = new StringBuilder(); List params = new ArrayList<>(); Object pkValue = null; for (String key : data.keySet()) { if ("id".equals(key)) { pkValue = data.get(key); continue; } if (sets.length() > 0) { sets.append(", "); } sets.append("`").append(key).append("` = ?"); params.add(data.get(key)); } params.add(pkValue); String sql = "UPDATE " + tableName + " SET " + sets + " WHERE id = ?"; jdbcTemplate.update(sql, params.toArray()); } /** * 删除数据 */ private void deleteData(String tableName, String primaryKey) { String sql = "DELETE FROM " + tableName + " WHERE id = ?"; jdbcTemplate.update(sql, primaryKey); } } ``` ## 5. 配置文件 ### 5.1 内网端配置 (application-intranet.yml) ```yaml # 应用配置 spring: application: name: pacsonline-intranet profiles: active: intranet # 数据同步配置 sync: # 是否启用数据同步 enabled: true # 租户ID(唯一标识) tenant-id: ${TENANT_ID:tenant_001} # 租户名称 tenant-name: ${TENANT_NAME:内网环境A} # 公网API地址 cloud-api-url: ${CLOUD_API_URL:https://cloud.example.com} # 访问令牌 access-token: ${ACCESS_TOKEN:your_access_token_here} # 数据加密密钥 secret-key: ${SECRET_KEY:your_secret_key_here} # 同步模式:auto-自动,manual-手动,schedule-定时 sync-mode: auto # 定时同步间隔(秒) sync-interval: 300 # 需要同步的表 sync-tables: - sys_user - sys_role - sys_menu - business_data # 失败重试次数 retry-times: 3 # 请求超时时间(毫秒) timeout: 30000 # HTTP客户端配置 http: client: connect-timeout: 5000 read-timeout: 30000 max-connections: 200 ``` ### 5.2 公网端配置 (application-cloud.yml) ```yaml # 应用配置 spring: application: name: pacsonline-cloud profiles: active: cloud # 多租户配置 multi-tenant: # 是否启用多租户 enabled: true # 租户表前缀模式 table-prefix-mode: true # 最大租户数量 max-tenants: 1000 # 数据同步配置 sync: # 是否启用数据接收 enabled: true # 最大批次大小 max-batch-size: 1000 # 数据保留天数 data-retention-days: 365 ``` ## 6. 部署步骤 ### 6.1 公网端部署 ```bash # 1. 创建数据库 mysql -u root -p CREATE DATABASE pacsonline_cloud CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; # 2. 执行数据库脚本 mysql -u root -p pacsonline_cloud < sync_config.sql mysql -u root -p pacsonline_cloud < sync_log.sql mysql -u root -p pacsonline_cloud < tenant_info.sql # 3. 初始化租户信息 INSERT INTO tenant_info ( tenant_id, tenant_name, tenant_type, access_token, secret_key, status ) VALUES ( 'tenant_001', '内网环境A', 'intranet', 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...', -- JWT令牌 'AES256EncryptionKey123456', 1 ); # 4. 打包部署 mvn clean package -Pcloud java -jar target/pacsonline-cloud.jar --spring.profiles.active=cloud ``` ### 6.2 内网端部署 ```bash # 1. 创建数据库 mysql -u root -p CREATE DATABASE pacsonline_intranet CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; # 2. 执行数据库脚本(包含业务表和同步相关表) mysql -u root -p pacsonline_intranet < all_tables.sql mysql -u root -p pacsonline_intranet < data_change_log.sql # 3. 配置环境变量 export TENANT_ID=tenant_001 export TENANT_NAME=内网环境A export CLOUD_API_URL=https://cloud.example.com export ACCESS_TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9... # 4. 打包部署 mvn clean package -Pintranet java -jar target/pacsonline-intranet.jar --spring.profiles.active=intranet ``` ## 7. 监控与运维 ### 7.1 同步状态监控 ```sql -- 查询同步状态统计 SELECT DATE(create_time) as sync_date, sync_status, COUNT(*) as count, SUM(record_count) as total_records, AVG(use_time) as avg_time_ms FROM sync_log WHERE create_time >= DATE_SUB(NOW(), INTERVAL 7 DAY) GROUP BY DATE(create_time), sync_status ORDER BY sync_date DESC; -- 查询未同步的数据 SELECT table_name, COUNT(*) as unsynced_count, MIN(change_time) as oldest_change FROM data_change_log WHERE sync_status = 0 GROUP BY table_name; -- 查询失败的同步任务 SELECT * FROM sync_log WHERE sync_status = 'failed' ORDER BY create_time DESC LIMIT 100; ``` ### 7.2 数据清理策略 ```sql -- 清理已同步的变更记录(保留30天) DELETE FROM data_change_log WHERE sync_status = 1 AND sync_time < DATE_SUB(NOW(), INTERVAL 30 DAY); -- 清理同步日志(保留90天) DELETE FROM sync_log WHERE create_time < DATE_SUB(NOW(), INTERVAL 90 DAY); ``` ### 7.3 性能优化建议 1. **批量同步**:每次同步1000条记录,避免单次传输过大 2. **异步处理**:使用异步任务执行同步,不阻塞主业务 3. **索引优化**:为sync_status、change_time等字段添加索引 4. **数据压缩**:传输前压缩JSON数据,减少网络流量 5. **分表策略**:公网端按租户分表,避免单表数据过大 ## 8. 安全加固 ### 8.1 HTTPS配置 ```yaml server: port: 8443 ssl: enabled: true key-store: classpath:keystore.p12 key-store-password: your_password key-store-type: PKCS12 key-alias: tomcat ``` ### 8.2 数据加密 ```java /** * 数据加密工具 */ public class DataEncryptUtil { /** * AES加密 */ public static String encrypt(String data, String secretKey) throws Exception { SecretKeySpec key = new SecretKeySpec(secretKey.getBytes(), "AES"); Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding"); cipher.init(Cipher.ENCRYPT_MODE, key); byte[] encrypted = cipher.doFinal(data.getBytes()); return Base64.getEncoder().encodeToString(encrypted); } /** * AES解密 */ public static String decrypt(String encryptedData, String secretKey) throws Exception { SecretKeySpec key = new SecretKeySpec(secretKey.getBytes(), "AES"); Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding"); cipher.init(Cipher.DECRYPT_MODE, key); byte[] decrypted = cipher.doFinal(Base64.getDecoder().decode(encryptedData)); return new String(decrypted); } } ``` ### 8.3 IP白名单验证 ```java @Component public class IpWhitelistFilter extends OncePerRequestFilter { @Autowired private TenantInfoService tenantInfoService; @Override protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException { String requestPath = request.getRequestURI(); // 只对同步接口进行IP验证 if (requestPath.startsWith("/api/sync/")) { String clientIp = IpUtils.getIpAddr(request); String tenantId = request.getHeader("X-Tenant-Id"); if (!tenantInfoService.isIpAllowed(tenantId, clientIp)) { response.setStatus(HttpServletResponse.SC_FORBIDDEN); response.getWriter().write("IP not in whitelist"); return; } } filterChain.doFilter(request, response); } } ``` ## 9. 常见问题 ### Q1: 内网网络不稳定,同步经常失败怎么办? **A**: 实现了失败重试机制和断点续传: - 同步失败的数据会保留在`data_change_log`表中 - 下次同步时会自动重试 - 可以通过`retry_times`配置重试次数 - 建议设置合理的`sync_interval`,避免频繁同步 ### Q2: 如何处理公网和内网的数据冲突? **A**: - 内网是数据源,公网只接收不回传 - 公网数据仅用于查询和报表 - 如需双向同步,需要实现冲突检测和解决策略(如时间戳比较、版本号等) ### Q3: 数据同步延迟多久? **A**: - 实时模式(auto):检测到数据变更后,1分钟内同步 - 定时模式(schedule):根据配置的间隔时间 - 手动模式(manual):需要手动触发 ### Q4: 如何保证数据安全? **A**: - HTTPS加密传输 - JWT令牌认证 - IP白名单限制 - 可选:敏感字段AES加密 - 公网数据按租户隔离 ### Q5: 公网存储空间不足怎么办? **A**: - 设置`max_storage`限制每个租户的存储空间 - 定期清理历史数据(data_retention_days) - 只同步核心业务数据,非必要数据不同步 - 可以实现数据归档策略 ## 10. 扩展方案 ### 10.1 使用消息队列(高级方案) 适用场景:数据量大、并发高、需要高可用 ```yaml # 架构调整 内网端 → RocketMQ → 公网消费者 → 公网数据库 优点: - 解耦性好,内网发送不需要等待公网响应 - 消息持久化,不会丢失数据 - 支持集群部署,高可用 - 流量削峰 缺点: - 需要部署MQ服务器 - 增加系统复杂度 - 需要公网MQ服务器可被内网访问 ``` ### 10.2 使用Canal监听binlog(无侵入方案) 适用场景:不想修改业务代码,纯技术层面解决 ```yaml 内网MySQL → Canal Server → 自定义Client → 公网API 优点: - 无需修改业务代码 - 实时性高,秒级同步 - 准确捕获所有数据变更 缺点: - 需要开启MySQL binlog - 需要部署Canal服务 - 配置复杂度较高 ``` ## 11. 总结 本方案提供了完整的内网到公网数据同步解决方案,具有以下特点: ✅ **灵活部署**:支持纯内网、纯公网、内网+公网混合部署 ✅ **多租户支持**:公网可同时接收多个内网环境的数据 ✅ **安全可靠**:HTTPS + JWT + 数据加密 + IP白名单 ✅ **性能优化**:增量同步 + 异步处理 + 批量传输 ✅ **易于监控**:完整的同步日志和状态查询 ✅ **容错机制**:失败重试 + 断点续传 根据实际需求选择合适的数据捕获方式: - **AOP拦截**:推荐,简单易用,适合新项目 - **数据库触发器**:无侵入,但性能影响大 - **Canal监听binlog**:最优方案,但配置复杂 --- **文档版本**:v1.0 **最后更新**:2025-10-30 **维护者**:开发团队