┌─────────────────────────────────────────────────────────────────┐
│ 公网环境 │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 公网应用服务器 (Cloud Server) │ │
│ │ - 接收数据同步请求 │ │
│ │ - 提供数据查询接口 │ │
│ │ - 多租户数据隔离 │ │
│ │ - 同步状态监控 │ │
│ └──────────────────────────────────────────────────────────┘ │
│ ↑ │
│ │ HTTPS + 令牌认证 │
│ │ │
└───────────────────────────┼──────────────────────────────────────┘
│
│ 防火墙/NAT
│
┌───────────────────────────┼──────────────────────────────────────┐
│ 内网环境 A │
│ ┌────────────────────────┼──────────────────────────────────┐ │
│ │ 内网应用服务器 │ │ │
│ │ - 业务数据产生 │ │ │
│ │ - 定时同步任务 ───────┘ │ │
│ │ - 增量数据捕获 │ │
│ │ - 断点续传支持 │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 内网环境 B │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 内网应用服务器 │ │
│ │ - 独立租户标识 │ │
│ │ - 自动数据同步 │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
核心技术栈:
- 数据捕获:Canal (阿里开源MySQL binlog解析工具) / 触发器 / AOP拦截
- 数据传输:HTTPS RESTful API
- 消息队列:RocketMQ / RabbitMQ (可选,异步处理)
- 任务调度:XXL-Job / Spring @Scheduled
- 数据加密:AES-256
- 认证授权:JWT Token
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 增量推送(推荐) | 实时性好、网络压力小、可控性强 | 需要开发同步逻辑 | 数据量大、实时性要求高 |
| 数据库复制 | 实现简单、一致性好 | 需要开通数据库端口、安全风险高 | 内网可直连公网数据库 |
| 定时全量导出 | 实现简单、可靠性高 | 数据重复、效率低 | 数据量小、实时性要求低 |
| 消息队列同步 | 解耦性好、高可用 | 复杂度高、需要中间件 | 高并发、分布式场景 |
-- Per-tenant sync configuration: one row per intranet environment, holding
-- the target cloud endpoint, credentials and scheduling parameters.
-- IF NOT EXISTS keeps this migration script idempotent / safely re-runnable.
CREATE TABLE IF NOT EXISTS `sync_config` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '配置ID',
  `tenant_id` VARCHAR(50) NOT NULL COMMENT '租户ID(区分不同内网环境)',
  `tenant_name` VARCHAR(100) DEFAULT NULL COMMENT '租户名称',
  `sync_mode` VARCHAR(20) NOT NULL DEFAULT 'auto' COMMENT '同步模式:auto-自动,manual-手动,schedule-定时',
  `sync_interval` INT(11) DEFAULT 300 COMMENT '同步间隔(秒),定时模式使用',
  `cron_expression` VARCHAR(100) DEFAULT NULL COMMENT 'Cron表达式,定时模式使用',
  `sync_tables` TEXT COMMENT '需要同步的表名列表,JSON数组格式',
  `cloud_api_url` VARCHAR(255) NOT NULL COMMENT '公网API地址',
  `access_token` VARCHAR(500) NOT NULL COMMENT '访问令牌',
  `secret_key` VARCHAR(100) DEFAULT NULL COMMENT '数据加密密钥',
  `enabled` TINYINT(1) DEFAULT 1 COMMENT '是否启用:0-禁用,1-启用',
  `retry_times` INT(11) DEFAULT 3 COMMENT '失败重试次数',
  `timeout` INT(11) DEFAULT 30000 COMMENT '请求超时时间(毫秒)',
  `last_sync_time` DATETIME DEFAULT NULL COMMENT '最后同步时间',
  `remark` VARCHAR(500) DEFAULT NULL COMMENT '备注',
  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据同步配置表';
-- Per-batch, per-table sync outcome log (success/failure counts, timing,
-- snapshot of failed rows for replay).
-- IF NOT EXISTS keeps this migration script idempotent / safely re-runnable.
CREATE TABLE IF NOT EXISTS `sync_log` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '日志ID',
  `tenant_id` VARCHAR(50) NOT NULL COMMENT '租户ID',
  `batch_id` VARCHAR(50) NOT NULL COMMENT '批次ID(同一次同步任务的唯一标识)',
  `table_name` VARCHAR(100) NOT NULL COMMENT '同步的表名',
  `operation_type` VARCHAR(20) NOT NULL COMMENT '操作类型:INSERT/UPDATE/DELETE',
  `record_count` INT(11) DEFAULT 0 COMMENT '本次同步记录数',
  `success_count` INT(11) DEFAULT 0 COMMENT '成功数量',
  `fail_count` INT(11) DEFAULT 0 COMMENT '失败数量',
  `sync_status` VARCHAR(20) NOT NULL COMMENT '同步状态:pending-待同步,processing-同步中,success-成功,failed-失败',
  `start_time` DATETIME NOT NULL COMMENT '开始时间',
  `end_time` DATETIME DEFAULT NULL COMMENT '结束时间',
  `use_time` BIGINT(20) DEFAULT NULL COMMENT '耗时(毫秒)',
  `error_msg` TEXT DEFAULT NULL COMMENT '错误信息',
  `data_snapshot` LONGTEXT DEFAULT NULL COMMENT '数据快照(JSON格式,仅记录失败的数据)',
  `retry_count` INT(11) DEFAULT 0 COMMENT '重试次数',
  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`),
  KEY `idx_tenant_id` (`tenant_id`),
  KEY `idx_batch_id` (`batch_id`),
  KEY `idx_sync_status` (`sync_status`),
  KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据同步日志表';
-- Row-level change capture queue on the intranet side: one row per
-- INSERT/UPDATE/DELETE, with before/after JSON images; sync_status = 0 marks
-- rows still waiting to be pushed (this is what enables resumable sync).
-- IF NOT EXISTS keeps this migration script idempotent / safely re-runnable.
CREATE TABLE IF NOT EXISTS `data_change_log` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '变更ID',
  `table_name` VARCHAR(100) NOT NULL COMMENT '表名',
  `primary_key` VARCHAR(100) NOT NULL COMMENT '主键值',
  `operation_type` VARCHAR(20) NOT NULL COMMENT '操作类型:INSERT/UPDATE/DELETE',
  `before_data` LONGTEXT DEFAULT NULL COMMENT '变更前数据(JSON格式)',
  `after_data` LONGTEXT DEFAULT NULL COMMENT '变更后数据(JSON格式)',
  `change_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '变更时间',
  `sync_status` TINYINT(1) DEFAULT 0 COMMENT '同步状态:0-未同步,1-已同步,2-同步失败',
  `sync_time` DATETIME DEFAULT NULL COMMENT '同步时间',
  `sync_batch_id` VARCHAR(50) DEFAULT NULL COMMENT '同步批次ID',
  `operator_id` BIGINT(20) DEFAULT NULL COMMENT '操作人ID',
  `operator_name` VARCHAR(50) DEFAULT NULL COMMENT '操作人姓名',
  PRIMARY KEY (`id`),
  KEY `idx_table_name` (`table_name`),
  KEY `idx_sync_status` (`sync_status`),
  KEY `idx_change_time` (`change_time`),
  KEY `idx_sync_batch_id` (`sync_batch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据变更记录表';
-- Cloud-side tenant registry: credentials, IP whitelist and storage quota
-- for each intranet environment allowed to push data.
-- IF NOT EXISTS keeps this migration script idempotent / safely re-runnable.
CREATE TABLE IF NOT EXISTS `tenant_info` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '租户ID',
  `tenant_id` VARCHAR(50) NOT NULL COMMENT '租户唯一标识',
  `tenant_name` VARCHAR(100) NOT NULL COMMENT '租户名称',
  `tenant_type` VARCHAR(20) DEFAULT 'intranet' COMMENT '租户类型:intranet-内网,cloud-公网',
  `access_token` VARCHAR(500) NOT NULL COMMENT '访问令牌',
  `secret_key` VARCHAR(100) DEFAULT NULL COMMENT '数据加密密钥',
  `ip_whitelist` TEXT DEFAULT NULL COMMENT 'IP白名单,JSON数组格式',
  `max_storage` BIGINT(20) DEFAULT NULL COMMENT '最大存储空间(字节)',
  `used_storage` BIGINT(20) DEFAULT 0 COMMENT '已用存储空间(字节)',
  `status` TINYINT(1) DEFAULT 1 COMMENT '状态:0-禁用,1-启用',
  `expire_time` DATETIME DEFAULT NULL COMMENT '过期时间',
  `last_sync_time` DATETIME DEFAULT NULL COMMENT '最后同步时间',
  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='租户信息表';
/**
 * AOP-based data-change interceptor (one of the capture options described in
 * this document). Wraps Mapper-layer insert/update/delete calls and forwards
 * the affected entity to DataChangeLogService for recording in the
 * data_change_log table.
 *
 * NOTE(review): getOperationType, getTableName and getEntity are called below
 * but not defined in this snippet — confirm they exist in the full class.
 */
@Aspect
@Component
public class DataChangeCaptureAop {

    // Persists captured changes. The original comment claims recording is
    // asynchronous — presumably recordChange is @Async; verify in the service.
    @Autowired
    private DataChangeLogService dataChangeLogService;

    /**
     * Around-advice matching every insert/update/delete-prefixed method on
     * any Mapper under com.zskk.pacsonline.modules.
     *
     * @param jp join point for the intercepted Mapper call
     * @return the intercepted method's return value, unchanged
     * @throws Throwable whatever the intercepted method throws
     */
    @Around("execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.insert*(..)) || " +
            "execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.update*(..)) || " +
            "execution(* com.zskk.pacsonline.modules.*.mapper.*Mapper.delete*(..))")
    public Object captureDataChange(ProceedingJoinPoint jp) throws Throwable {
        // Derive the operation type (INSERT/UPDATE/DELETE) from the method name.
        String methodName = jp.getSignature().getName();
        String operationType = getOperationType(methodName);
        // Resolve the target table and the entity argument being written.
        String tableName = getTableName(jp);
        Object entity = getEntity(jp.getArgs());
        // Run the intercepted Mapper method first; capture only if it succeeds.
        Object result = jp.proceed();
        // Record the change after the write completes.
        dataChangeLogService.recordChange(tableName, operationType, entity);
        return result;
    }
}
-- Example change-capture triggers for sys_user (trigger-based capture option).
-- DROP ... IF EXISTS keeps the script re-runnable. Extend each JSON_OBJECT to
-- cover every column that must be synchronised.
-- FIX: the original UPDATE/DELETE triggers contained literal "..." inside
-- JSON_OBJECT, which is invalid SQL; columns are now listed explicitly with a
-- comment placeholder, matching the INSERT trigger.
DELIMITER $$

DROP TRIGGER IF EXISTS tr_sys_user_insert$$
CREATE TRIGGER tr_sys_user_insert
AFTER INSERT ON sys_user
FOR EACH ROW
BEGIN
    INSERT INTO data_change_log (
        table_name, primary_key, operation_type, after_data, change_time
    ) VALUES (
        'sys_user',
        NEW.id,
        'INSERT',
        JSON_OBJECT(
            'id', NEW.id,
            'username', NEW.username,
            'nickname', NEW.nickname
            -- add remaining synced columns here
        ),
        NOW()
    );
END$$

DROP TRIGGER IF EXISTS tr_sys_user_update$$
CREATE TRIGGER tr_sys_user_update
AFTER UPDATE ON sys_user
FOR EACH ROW
BEGIN
    INSERT INTO data_change_log (
        table_name, primary_key, operation_type,
        before_data, after_data, change_time
    ) VALUES (
        'sys_user',
        NEW.id,
        'UPDATE',
        JSON_OBJECT(
            'id', OLD.id,
            'username', OLD.username,
            'nickname', OLD.nickname
            -- add remaining synced columns here
        ),
        JSON_OBJECT(
            'id', NEW.id,
            'username', NEW.username,
            'nickname', NEW.nickname
            -- add remaining synced columns here
        ),
        NOW()
    );
END$$

DROP TRIGGER IF EXISTS tr_sys_user_delete$$
CREATE TRIGGER tr_sys_user_delete
AFTER DELETE ON sys_user
FOR EACH ROW
BEGIN
    INSERT INTO data_change_log (
        table_name, primary_key, operation_type, before_data, change_time
    ) VALUES (
        'sys_user',
        OLD.id,
        'DELETE',
        JSON_OBJECT(
            'id', OLD.id,
            'username', OLD.username,
            'nickname', OLD.nickname
            -- add remaining synced columns here
        ),
        NOW()
    );
END$$

DELIMITER ;
/**
 * Scheduled driver for the data-sync pipeline.
 * Once a minute it loads every enabled sync configuration and, for each one
 * that is due, kicks off an asynchronous sync run.
 */
@Component
public class DataSyncScheduler {

    @Autowired
    private SyncConfigService syncConfigService;

    @Autowired
    private DataSyncService dataSyncService;

    /**
     * Fires at the top of every minute and dispatches due sync jobs
     * asynchronously, so one slow tenant cannot delay the others.
     */
    @Scheduled(cron = "0 * * * * ?")
    public void checkAndSync() {
        for (SyncConfig cfg : syncConfigService.getEnabledConfigs()) {
            if (!shouldSync(cfg)) {
                continue;
            }
            CompletableFuture.runAsync(() -> dataSyncService.syncData(cfg));
        }
    }

    /**
     * Decides whether a configuration is due for a sync run:
     *  - manual:   never triggered automatically
     *  - auto:     run whenever unsynced change records exist
     *  - schedule: run once lastSyncTime + syncInterval has elapsed
     */
    private boolean shouldSync(SyncConfig cfg) {
        String mode = cfg.getSyncMode();
        if ("manual".equals(mode)) {
            // Manual mode is only ever triggered by an operator.
            return false;
        }
        if ("auto".equals(mode)) {
            // Near-real-time mode: run as soon as pending changes appear.
            return dataSyncService.hasUnsyncedData();
        }
        if ("schedule".equals(mode)) {
            LocalDateTime last = cfg.getLastSyncTime();
            if (last == null) {
                // Never synced before — due immediately.
                return true;
            }
            LocalDateTime due = last.plusSeconds(cfg.getSyncInterval());
            return LocalDateTime.now().isAfter(due);
        }
        // Unknown mode: do nothing.
        return false;
    }
}
/**
* 数据同步服务
*/
@Service
public class DataSyncServiceImpl implements DataSyncService {
@Autowired
private DataChangeLogMapper dataChangeLogMapper;
@Autowired
private SyncLogService syncLogService;
@Autowired
private RestTemplate restTemplate;
private static final Logger logger = LoggerFactory.getLogger(DataSyncServiceImpl.class);
/**
* 执行数据同步
*/
@Override
@Transactional
public void syncData(SyncConfig config) {
String batchId = UUID.randomUUID().toString();
logger.info("开始同步数据,租户:{},批次:{}", config.getTenantId(), batchId);
try {
// 1. 查询未同步的数据变更记录
List<DataChangeLog> changeLogs = dataChangeLogMapper.selectList(
new LambdaQueryWrapper<DataChangeLog>()
.eq(DataChangeLog::getSyncStatus, 0)
.orderByAsc(DataChangeLog::getChangeTime)
.last("LIMIT 1000") // 每批次最多1000条
);
if (changeLogs.isEmpty()) {
logger.info("没有需要同步的数据");
return;
}
// 2. 按表名分组
Map<String, List<DataChangeLog>> groupedLogs = changeLogs.stream()
.collect(Collectors.groupingBy(DataChangeLog::getTableName));
// 3. 逐表同步
for (Map.Entry<String, List<DataChangeLog>> entry : groupedLogs.entrySet()) {
String tableName = entry.getKey();
List<DataChangeLog> logs = entry.getValue();
syncTable(config, batchId, tableName, logs);
}
// 4. 更新最后同步时间
config.setLastSyncTime(LocalDateTime.now());
syncConfigService.updateById(config);
logger.info("同步完成,批次:{}", batchId);
} catch (Exception e) {
logger.error("同步失败,批次:" + batchId, e);
}
}
/**
* 同步单个表的数据
*/
private void syncTable(SyncConfig config, String batchId,
String tableName, List<DataChangeLog> logs) {
SyncLog syncLog = new SyncLog();
syncLog.setTenantId(config.getTenantId());
syncLog.setBatchId(batchId);
syncLog.setTableName(tableName);
syncLog.setRecordCount(logs.size());
syncLog.setSyncStatus("processing");
syncLog.setStartTime(LocalDateTime.now());
syncLogService.save(syncLog);
int successCount = 0;
int failCount = 0;
List<DataChangeLog> failedLogs = new ArrayList<>();
try {
// 构建同步数据
SyncDataRequest request = new SyncDataRequest();
request.setTenantId(config.getTenantId());
request.setTableName(tableName);
request.setChangeLogs(logs.stream().map(log -> {
SyncDataItem item = new SyncDataItem();
item.setPrimaryKey(log.getPrimaryKey());
item.setOperationType(log.getOperationType());
item.setBeforeData(log.getBeforeData());
item.setAfterData(log.getAfterData());
item.setChangeTime(log.getChangeTime());
return item;
}).collect(Collectors.toList()));
// 发送HTTP请求到公网
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setBearerAuth(config.getAccessToken());
HttpEntity<SyncDataRequest> entity = new HttpEntity<>(request, headers);
ResponseEntity<RestResult> response = restTemplate.exchange(
config.getCloudApiUrl() + "/api/sync/receiveData",
HttpMethod.POST,
entity,
RestResult.class
);
if (response.getStatusCode() == HttpStatus.OK &&
response.getBody() != null &&
response.getBody().getCode() == 200) {
// 同步成功,更新状态
successCount = logs.size();
for (DataChangeLog log : logs) {
log.setSyncStatus((byte) 1);
log.setSyncTime(LocalDateTime.now());
log.setSyncBatchId(batchId);
dataChangeLogMapper.updateById(log);
}
syncLog.setSyncStatus("success");
} else {
// 同步失败
failCount = logs.size();
failedLogs.addAll(logs);
syncLog.setSyncStatus("failed");
syncLog.setErrorMsg("公网返回错误:" + response.getBody());
}
} catch (Exception e) {
logger.error("同步表 " + tableName + " 失败", e);
failCount = logs.size();
failedLogs.addAll(logs);
syncLog.setSyncStatus("failed");
syncLog.setErrorMsg(e.getMessage());
}
// 更新同步日志
syncLog.setSuccessCount(successCount);
syncLog.setFailCount(failCount);
syncLog.setEndTime(LocalDateTime.now());
syncLog.setUseTime(
Duration.between(syncLog.getStartTime(), syncLog.getEndTime()).toMillis()
);
if (!failedLogs.isEmpty()) {
syncLog.setDataSnapshot(JSON.toJSONString(failedLogs));
}
syncLogService.updateById(syncLog);
}
/**
* 检查是否有未同步的数据
*/
@Override
public boolean hasUnsyncedData() {
Long count = dataChangeLogMapper.selectCount(
new LambdaQueryWrapper<DataChangeLog>()
.eq(DataChangeLog::getSyncStatus, 0)
);
return count > 0;
}
}
/**
 * Cloud-side sync endpoints: receives pushed change batches from intranet
 * tenants and exposes per-tenant sync status.
 */
@RestController
@RequestMapping("/api/sync")
public class SyncController {

    private static final Logger logger = LoggerFactory.getLogger(SyncController.class);

    @Autowired
    private TenantInfoService tenantInfoService;

    @Autowired
    private DataReceiveService dataReceiveService;

    /**
     * Receives one batch of change records from an intranet environment.
     * Validates the Bearer token, tenant status and storage quota before
     * handing the payload to DataReceiveService.
     *
     * @param request batch payload (tenant id, table name, change items)
     * @param token   "Bearer <jwt>" Authorization header
     */
    @PostMapping("/receiveData")
    public RestResult<?> receiveData(@RequestBody SyncDataRequest request,
                                     @RequestHeader("Authorization") String token) {
        try {
            // 1. Token must be "Bearer <jwt>" and match the claimed tenant.
            if (token == null || !token.startsWith("Bearer ")) {
                return RestResult.error("无效的访问令牌");
            }
            String accessToken = token.substring(7);
            TenantInfo tenant = tenantInfoService.validateToken(request.getTenantId(), accessToken);
            if (tenant == null) {
                return RestResult.error("租户验证失败");
            }
            if (tenant.getStatus() == 0) {
                return RestResult.error("租户已被禁用");
            }
            // 2. Enforce the tenant's storage quota.
            if (tenant.getMaxStorage() != null &&
                    tenant.getUsedStorage() >= tenant.getMaxStorage()) {
                return RestResult.error("存储空间已满");
            }
            // 3. Apply the change records.
            DataReceiveResult result = dataReceiveService.receiveData(tenant, request);
            // 4. Record the sync watermark for monitoring.
            tenant.setLastSyncTime(LocalDateTime.now());
            tenantInfoService.updateById(tenant);
            logger.info("接收数据成功,租户:{},表:{},记录数:{}",
                    request.getTenantId(), request.getTableName(), request.getChangeLogs().size());
            return RestResult.ok("同步成功", result);
        } catch (Exception e) {
            logger.error("接收数据失败", e);
            return RestResult.error("同步失败:" + e.getMessage());
        }
    }

    /**
     * Returns sync status (last sync time, storage usage) for one tenant.
     *
     * @param tenantId tenant identifier from the path
     * @param token    "Bearer <jwt>" Authorization header
     */
    @GetMapping("/status/{tenantId}")
    public RestResult<?> getSyncStatus(@PathVariable String tenantId,
                                       @RequestHeader("Authorization") String token) {
        // FIX: validate the "Bearer " prefix before substring(7) — previously
        // a missing or malformed header threw StringIndexOutOfBoundsException
        // or produced a garbage token. Now consistent with receiveData().
        if (token == null || !token.startsWith("Bearer ")) {
            return RestResult.error("无效的访问令牌");
        }
        String accessToken = token.substring(7);
        TenantInfo tenant = tenantInfoService.validateToken(tenantId, accessToken);
        if (tenant == null) {
            return RestResult.error("租户验证失败");
        }
        Map<String, Object> status = new HashMap<>();
        status.put("tenantId", tenant.getTenantId());
        status.put("tenantName", tenant.getTenantName());
        status.put("lastSyncTime", tenant.getLastSyncTime());
        status.put("usedStorage", tenant.getUsedStorage());
        status.put("maxStorage", tenant.getMaxStorage());
        return RestResult.ok(status);
    }
}
/**
 * Cloud-side receive service: applies change records pushed from intranet
 * tenants into per-tenant shadow tables named "<tenantId>_<table>".
 */
@Service
public class DataReceiveServiceImpl implements DataReceiveService {

    private static final Logger logger = LoggerFactory.getLogger(DataReceiveServiceImpl.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private TenantInfoService tenantInfoService;

    /**
     * Applies one batch of change records for a single tenant/table.
     * Items are applied independently: one bad record is counted as failed and
     * reported back without aborting the rest of the batch.
     *
     * @param tenant  already-authenticated tenant pushing the data
     * @param request batch payload (table name + ordered change items)
     * @return per-item success/failure counts plus failed-item details
     */
    @Override
    @Transactional
    public DataReceiveResult receiveData(TenantInfo tenant, SyncDataRequest request) {
        DataReceiveResult result = new DataReceiveResult();
        result.setSuccessCount(0);
        result.setFailCount(0);

        // SECURITY FIX: the table name arrives in the remote request and is
        // concatenated into SQL text below. Identifiers cannot be bound as
        // JDBC parameters, so they must be whitelisted to block SQL injection.
        String tableName = safeIdentifier(request.getTableName());
        // Tenant prefix isolates each tenant's data in its own table.
        String targetTableName = safeIdentifier(tenant.getTenantId()) + "_" + tableName;

        // Create the tenant shadow table on first use.
        ensureTableExists(targetTableName, tableName);

        for (SyncDataItem item : request.getChangeLogs()) {
            try {
                switch (item.getOperationType()) {
                    case "INSERT":
                        insertData(targetTableName, item.getAfterData());
                        break;
                    case "UPDATE":
                        updateData(targetTableName, item.getPrimaryKey(), item.getAfterData());
                        break;
                    case "DELETE":
                        deleteData(targetTableName, item.getPrimaryKey());
                        break;
                }
                result.setSuccessCount(result.getSuccessCount() + 1);
            } catch (Exception e) {
                // Count and report the failure; keep applying the rest.
                logger.error("处理数据失败:" + item, e);
                result.setFailCount(result.getFailCount() + 1);
                result.addFailedItem(item, e.getMessage());
            }
        }
        return result;
    }

    /**
     * Rejects any SQL identifier that is not strictly [A-Za-z0-9_]+.
     * Identifiers are spliced into SQL text (they cannot be parameterized),
     * so this whitelist is the injection barrier for table/column names.
     */
    private static String safeIdentifier(String name) {
        if (name == null || !name.matches("[A-Za-z0-9_]+")) {
            throw new IllegalArgumentException("非法的SQL标识符:" + name);
        }
        return name;
    }

    /**
     * Creates the tenant shadow table (cloned from the source table's
     * structure) if it does not exist yet.
     */
    private void ensureTableExists(String targetTableName, String sourceTableName) {
        String checkSql = "SELECT COUNT(*) FROM information_schema.tables " +
                "WHERE table_schema = DATABASE() AND table_name = ?";
        Integer count = jdbcTemplate.queryForObject(checkSql, Integer.class, targetTableName);
        if (count == null || count == 0) {
            // Both names were validated by safeIdentifier() before reaching here.
            String createSql = "CREATE TABLE " + targetTableName + " LIKE " + sourceTableName;
            jdbcTemplate.execute(createSql);
            logger.info("创建租户表:{}", targetTableName);
        }
    }

    /**
     * Inserts one row built from a JSON object; column names are whitelisted,
     * values are bound as JDBC parameters.
     */
    private void insertData(String tableName, String jsonData) {
        JSONObject data = JSON.parseObject(jsonData);
        StringBuilder columns = new StringBuilder();
        StringBuilder values = new StringBuilder();
        List<Object> params = new ArrayList<>();
        for (String key : data.keySet()) {
            if (columns.length() > 0) {
                columns.append(", ");
                values.append(", ");
            }
            // SECURITY FIX: column names come from remote JSON — whitelist them.
            columns.append("`").append(safeIdentifier(key)).append("`");
            values.append("?");
            params.add(data.get(key));
        }
        String sql = "INSERT INTO " + tableName + " (" + columns + ") VALUES (" + values + ")";
        jdbcTemplate.update(sql, params.toArray());
    }

    /**
     * Updates one row by primary key. The "id" field inside the payload wins;
     * otherwise the change log's primaryKey is used.
     */
    private void updateData(String tableName, String primaryKey, String jsonData) {
        JSONObject data = JSON.parseObject(jsonData);
        StringBuilder sets = new StringBuilder();
        List<Object> params = new ArrayList<>();
        Object pkValue = null;
        for (String key : data.keySet()) {
            if ("id".equals(key)) {
                pkValue = data.get(key);
                continue;
            }
            if (sets.length() > 0) {
                sets.append(", ");
            }
            // SECURITY FIX: whitelist remote-supplied column names.
            sets.append("`").append(safeIdentifier(key)).append("` = ?");
            params.add(data.get(key));
        }
        // FIX: a payload without an "id" field previously produced
        // "WHERE id = NULL", which silently matched no rows; fall back to the
        // primary key recorded in the change log.
        if (pkValue == null) {
            pkValue = primaryKey;
        }
        params.add(pkValue);
        String sql = "UPDATE " + tableName + " SET " + sets + " WHERE id = ?";
        jdbcTemplate.update(sql, params.toArray());
    }

    /**
     * Deletes one row by primary key (value is bound as a parameter).
     */
    private void deleteData(String tableName, String primaryKey) {
        String sql = "DELETE FROM " + tableName + " WHERE id = ?";
        jdbcTemplate.update(sql, primaryKey);
    }
}
# 应用配置
spring:
application:
name: pacsonline-intranet
profiles:
active: intranet
# 数据同步配置
sync:
# 是否启用数据同步
enabled: true
# 租户ID(唯一标识)
tenant-id: ${TENANT_ID:tenant_001}
# 租户名称
tenant-name: ${TENANT_NAME:内网环境A}
# 公网API地址
cloud-api-url: ${CLOUD_API_URL:https://cloud.example.com}
# 访问令牌
access-token: ${ACCESS_TOKEN:your_access_token_here}
# 数据加密密钥
secret-key: ${SECRET_KEY:your_secret_key_here}
# 同步模式:auto-自动,manual-手动,schedule-定时
sync-mode: auto
# 定时同步间隔(秒)
sync-interval: 300
# 需要同步的表
sync-tables:
- sys_user
- sys_role
- sys_menu
- business_data
# 失败重试次数
retry-times: 3
# 请求超时时间(毫秒)
timeout: 30000
# HTTP客户端配置
http:
client:
connect-timeout: 5000
read-timeout: 30000
max-connections: 200
# 应用配置
spring:
application:
name: pacsonline-cloud
profiles:
active: cloud
# 多租户配置
multi-tenant:
# 是否启用多租户
enabled: true
# 租户表前缀模式
table-prefix-mode: true
# 最大租户数量
max-tenants: 1000
# 数据同步配置
sync:
# 是否启用数据接收
enabled: true
# 最大批次大小
max-batch-size: 1000
# 数据保留天数
data-retention-days: 365
# 1. 创建数据库
mysql -u root -p
CREATE DATABASE pacsonline_cloud CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
# 2. 执行数据库脚本
mysql -u root -p pacsonline_cloud < sync_config.sql
mysql -u root -p pacsonline_cloud < sync_log.sql
mysql -u root -p pacsonline_cloud < tenant_info.sql
# 3. 初始化租户信息
INSERT INTO tenant_info (
tenant_id, tenant_name, tenant_type, access_token, secret_key, status
) VALUES (
'tenant_001',
'内网环境A',
'intranet',
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...', -- JWT令牌
'AES256EncryptionKey123456',
1
);
# 4. 打包部署
mvn clean package -Pcloud
java -jar target/pacsonline-cloud.jar --spring.profiles.active=cloud
# 1. 创建数据库
mysql -u root -p
CREATE DATABASE pacsonline_intranet CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
# 2. 执行数据库脚本(包含业务表和同步相关表)
mysql -u root -p pacsonline_intranet < all_tables.sql
mysql -u root -p pacsonline_intranet < data_change_log.sql
# 3. 配置环境变量
export TENANT_ID=tenant_001
export TENANT_NAME=内网环境A
export CLOUD_API_URL=https://cloud.example.com
export ACCESS_TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
# 4. 打包部署
mvn clean package -Pintranet
java -jar target/pacsonline-intranet.jar --spring.profiles.active=intranet
-- Sync status statistics for the last 7 days: per-day, per-status batch
-- counts, total records moved, and average batch latency in milliseconds.
SELECT
    DATE(create_time) as sync_date,
    sync_status,
    COUNT(*) as count,
    SUM(record_count) as total_records,
    AVG(use_time) as avg_time_ms
FROM sync_log
WHERE create_time >= DATE_SUB(NOW(), INTERVAL 7 DAY)
GROUP BY DATE(create_time), sync_status
ORDER BY sync_date DESC;
-- Pending (unsynced) change records per table, with the age of the oldest
-- one — a growing oldest_change indicates a stuck sync pipeline.
SELECT
    table_name,
    COUNT(*) as unsynced_count,
    MIN(change_time) as oldest_change
FROM data_change_log
WHERE sync_status = 0
GROUP BY table_name;
-- Most recent failed sync batches (error_msg / data_snapshot hold details).
SELECT *
FROM sync_log
WHERE sync_status = 'failed'
ORDER BY create_time DESC
LIMIT 100;
-- Housekeeping: purge change records that synced successfully more than
-- 30 days ago (rows with sync_status <> 1 are deliberately kept for retry).
DELETE FROM data_change_log
WHERE sync_status = 1
AND sync_time < DATE_SUB(NOW(), INTERVAL 30 DAY);
-- Housekeeping: purge sync logs older than 90 days.
DELETE FROM sync_log
WHERE create_time < DATE_SUB(NOW(), INTERVAL 90 DAY);
server:
port: 8443
ssl:
enabled: true
key-store: classpath:keystore.p12
key-store-password: your_password
key-store-type: PKCS12
key-alias: tomcat
/**
* 数据加密工具
*/
public class DataEncryptUtil {
/**
* AES加密
*/
public static String encrypt(String data, String secretKey) throws Exception {
SecretKeySpec key = new SecretKeySpec(secretKey.getBytes(), "AES");
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
cipher.init(Cipher.ENCRYPT_MODE, key);
byte[] encrypted = cipher.doFinal(data.getBytes());
return Base64.getEncoder().encodeToString(encrypted);
}
/**
* AES解密
*/
public static String decrypt(String encryptedData, String secretKey) throws Exception {
SecretKeySpec key = new SecretKeySpec(secretKey.getBytes(), "AES");
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
cipher.init(Cipher.DECRYPT_MODE, key);
byte[] decrypted = cipher.doFinal(Base64.getDecoder().decode(encryptedData));
return new String(decrypted);
}
}
/**
 * Servlet filter enforcing per-tenant IP whitelists on the sync API.
 * Requests outside /api/sync/ pass through untouched.
 */
@Component
public class IpWhitelistFilter extends OncePerRequestFilter {

    @Autowired
    private TenantInfoService tenantInfoService;

    /**
     * Rejects sync-API requests whose client IP is not on the tenant's
     * whitelist; everything else is forwarded down the chain.
     */
    @Override
    protected void doFilterInternal(HttpServletRequest request,
                                    HttpServletResponse response,
                                    FilterChain filterChain) throws ServletException, IOException {
        String requestPath = request.getRequestURI();
        // Only the sync endpoints are IP-restricted.
        if (requestPath.startsWith("/api/sync/")) {
            String clientIp = IpUtils.getIpAddr(request);
            String tenantId = request.getHeader("X-Tenant-Id");
            // FIX: a request without the X-Tenant-Id header previously reached
            // isIpAllowed(null, ip); reject it explicitly so the whitelist
            // check cannot be skipped or misrouted.
            if (tenantId == null || tenantId.isEmpty()) {
                response.setStatus(HttpServletResponse.SC_FORBIDDEN);
                response.getWriter().write("Missing X-Tenant-Id header");
                return;
            }
            if (!tenantInfoService.isIpAllowed(tenantId, clientIp)) {
                response.setStatus(HttpServletResponse.SC_FORBIDDEN);
                response.getWriter().write("IP not in whitelist");
                return;
            }
        }
        filterChain.doFilter(request, response);
    }
}
A: 实现了失败重试机制和断点续传:data_change_log 表记录每条变更的同步状态,
   同步失败的记录会按 sync_config 中 retry_times 配置的次数自动重试;
   未同步的记录(sync_status = 0)会在下一次同步任务中继续传输,实现断点续传。
A: 通过 sync_interval 控制同步间隔,避免频繁同步对业务系统造成压力。
A: 通过 tenant_info 表中的 max_storage 限制每个租户的存储空间。
适用场景:数据量大、并发高、需要高可用。
# 架构调整
内网端 → RocketMQ → 公网消费者 → 公网数据库
优点:
- 解耦性好,内网发送不需要等待公网响应
- 消息持久化,不会丢失数据
- 支持集群部署,高可用
- 流量削峰
缺点:
- 需要部署MQ服务器
- 增加系统复杂度
- 需要公网MQ服务器可被内网访问
适用场景:不想修改业务代码,纯技术层面解决
内网MySQL → Canal Server → 自定义Client → 公网API
优点:
- 无需修改业务代码
- 实时性高,秒级同步
- 准确捕获所有数据变更
缺点:
- 需要开启MySQL binlog
- 需要部署Canal服务
- 配置复杂度较高
本方案提供了完整的内网到公网数据同步解决方案,具有以下特点:
- ✅ 灵活部署:支持纯内网、纯公网、内网+公网混合部署
- ✅ 多租户支持:公网可同时接收多个内网环境的数据
- ✅ 安全可靠:HTTPS + JWT + 数据加密 + IP白名单
- ✅ 性能优化:增量同步 + 异步处理 + 批量传输
- ✅ 易于监控:完整的同步日志和状态查询
- ✅ 容错机制:失败重试 + 断点续传
可根据实际需求选择合适的数据捕获方式:AOP 拦截(无需改动数据库)、数据库触发器(对应用透明)或 Canal binlog 订阅(无需修改业务代码、实时性最高)。
文档版本:v1.0 最后更新:2025-10-30 维护者:开发团队