从 0 到 1 构建企业级视频智能分析平台:完整工程实践
一份可直接落地的全栈技术方案 覆盖相机接入、流媒体编排、存储治理、智能分析全链路 包含架构设计、核心代码、数据模型、运维体系与性能基准
目录
第一部分:平台基础与相机接入
第二部分:录像存储系统
第三部分:智能分析平台
第四部分:运维与演进
第二部分:录像存储系统
9. 录像系统架构设计
9.1 录像链路的核心挑战
问题 1:策略落不下去
现象:
- 数据库中录像计划显示已启用
- MediaMTX 路径配置未生效或参数错误
- 文件系统无录像文件产生
根因:
- 配置下发与数据库操作不在同一事务
- 下发失败未回滚数据库状态
问题 2:存储不可治理
现象:
- 不知道当前写入哪个盘
- 磁盘满后录像中断,无自动切换
- 跨盘迁移时路径混乱
根因:
- 缺少存储卷抽象
- 容量监控与切换策略分离
问题 3:文件可写不可查
现象:
- 物理文件存在于
/recordings目录 - 回放接口查询为空
- 必须通过目录遍历才能找到文件
根因:
- 缺少文件索引
- 录像与检索解耦不彻底
9.2 录像系统分层设计
┌─────────────────────────────────────────┐│ 策略层 (Policy Layer) ││ - 录像计划管理 ││ - 参数归一化 ││ - 调度决策 │└──────────────┬──────────────────────────┘ │┌──────────────┴──────────────────────────┐│ 媒体层 (Media Layer) ││ - MediaMTX 路径配置 ││ - 切片参数控制 ││ - 生命周期管理 │└──────────────┬──────────────────────────┘ │┌──────────────┴──────────────────────────┐│ 存储层 (Storage Layer) ││ - 存储卷管理 ││ - 容量监控 ││ - 写入路径分配 │└──────────────┬──────────────────────────┘ │┌──────────────┴──────────────────────────┐│ 索引层 (Index Layer) ││ - 文件发现扫描 ││ - 元信息提取 ││ - 检索优化 │└─────────────────────────────────────────┘9.3 录像时序图
[用户] → [录像接口] → [计划服务] → [存储卷服务] │ │ │ └→ 查询可写卷 │ └→ 返回 rootPath │ ├→ 构建录像路径模板 ├→ [MediaMTX 客户端] │ │ │ ├→ DELETE /paths/{pathName} │ └→ POST /paths/add/{pathName} │ (record=true, recordPath=...) │ └→ [数据库] 保存录像计划
[定时任务] → [文件索引器] → 扫描 rootPath 目录 │ ├→ 解析文件名 (时间戳提取) ├→ 计算文件大小 └→ [数据库] upsert recording_file10. 存储卷治理与容量管理
10.1 存储卷表设计
DROP TABLE IF EXISTS `storage_volume`;CREATE TABLE `storage_volume` ( `id` BIGINT NOT NULL AUTO_INCREMENT, `volume_code` VARCHAR(64) NOT NULL COMMENT '卷唯一编码', `name` VARCHAR(64) NOT NULL COMMENT '卷名称', `root_path` VARCHAR(255) NOT NULL COMMENT '挂载路径',
-- 状态字段 `status` TINYINT NOT NULL DEFAULT 1 COMMENT '1读写 2只读 0离线', `capacity_total` BIGINT DEFAULT 0 COMMENT '总容量(Byte)', `capacity_used` BIGINT DEFAULT 0 COMMENT '已用容量(Byte)',
-- 水位线配置 `watermark_high` INT DEFAULT 90 COMMENT '高水位(%)', `watermark_low` INT DEFAULT 75 COMMENT '低水位(%)',
-- 优先级 `priority` INT NOT NULL DEFAULT 0 COMMENT '优先级(越大越优先)', `is_full` TINYINT NOT NULL DEFAULT 0 COMMENT '1已满 0未满',
`remark` VARCHAR(255) DEFAULT NULL, `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP, `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`), UNIQUE KEY `uk_volume_code` (`volume_code`), UNIQUE KEY `uk_root_path` (`root_path`), KEY `idx_status_priority` (`status`, `priority`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='存储卷表';设计要点:
- 双水位线机制
watermark_high:触发切卷阈值 (例如 90%)watermark_low:恢复写入阈值 (例如 75%)- 避免频繁切换导致抖动
- 优先级调度
- 多个可写卷时,优先选择
priority高的 - 支持手动指定高性能存储
- 多个可写卷时,优先选择
- 路径唯一性
uk_root_path:防止重复挂载同一目录
10.2 容量监控服务
@Service@Slf4jpublic class StorageVolumeService {
@Resource private StorageVolumeRepository volumeRepository;
@Scheduled(fixedDelayString = "${recording.storage.refresh-interval-ms:15000}") public void refreshAllVolumes() { List<StorageVolume> volumes = volumeRepository.findAll();
for (StorageVolume volume : volumes) { try { refreshCapacity(volume); } catch (Exception e) { log.error("Failed to refresh volume {}", volume.getId(), e); } } }
@Transactional(rollbackFor = Exception.class) public void refreshCapacity(StorageVolume volume) { File root = new File(volume.getRootPath());
// 路径校验 if (!root.exists() || !root.isDirectory()) { volume.setStatus(0); // 离线 volume.setCapacityTotal(0L); volume.setCapacityUsed(0L); volumeRepository.updateById(volume); return; }
// 容量获取 long total = root.getTotalSpace(); long free = root.getFreeSpace(); long used = total - free;
volume.setCapacityTotal(total); volume.setCapacityUsed(used);
// 使用率计算 int usage = (total == 0) ? 0 : (int) (used * 100 / total);
// 水位线判断 if (usage >= volume.getWatermarkHigh()) { volume.setIsFull(1); volume.setStatus(2); // 只读 log.warn("Volume {} reached high watermark: {}%", volume.getId(), usage);
} else if (usage <= volume.getWatermarkLow()) { volume.setIsFull(0); if (volume.getStatus() != 0) { volume.setStatus(1); // 恢复读写 log.info("Volume {} recovered to writable: {}%", volume.getId(), usage); } }
volumeRepository.updateById(volume); }
/** * 挑选可写卷 */ public Optional<StorageVolume> pickWritableVolume() { return volumeRepository.findByStatusAndIsFullOrderByPriorityDesc(1, 0) .stream() .min(Comparator.comparingLong(StorageVolume::getCapacityUsed)); }}容量刷新频率:
- 推荐:15-30 秒
- 过高:文件系统 I/O 压力
- 过低:水位线切换不及时
10.3 存储卷添加
@Servicepublic class StorageVolumeService {
@Transactional(rollbackFor = Exception.class) public StorageVolume addVolume(AddVolumeRequest request) { // 1. 路径校验 File file = new File(request.getRootPath()); if (!file.exists() || !file.isDirectory()) { throw new BusinessException("路径不存在或不是目录: " + request.getRootPath()); }
// 2. 重复性校验 if (volumeRepository.existsByRootPath(request.getRootPath())) { throw new BusinessException("路径已被其他卷使用: " + request.getRootPath()); }
// 3. 构建卷实体 StorageVolume volume = StorageVolume.builder() .volumeCode(request.getVolumeCode()) .name(request.getName()) .rootPath(normalizeRootPath(request.getRootPath())) .status(1) .watermarkHigh(request.getWatermarkHigh() != null ? request.getWatermarkHigh() : 90) .watermarkLow(request.getWatermarkLow() != null ? request.getWatermarkLow() : 75) .priority(request.getPriority() != null ? request.getPriority() : 1) .build();
// 4. 初始容量刷新 refreshCapacity(volume);
// 5. 保存 volumeRepository.save(volume);
return volume; }
private String normalizeRootPath(String path) { String normalized = path.replace("\\", "/"); while (normalized.endsWith("/")) { normalized = normalized.substring(0, normalized.length() - 1); } return normalized; }}路径规范化的重要性:
避免这类问题:
/recordings/recordings//recordings//导致拼接错误:
/recordings//cam_xxx/2026-02-13/...10.4 自动切卷策略
@Service@Slf4jpublic class RecordingAutoSwitchService {
@Resource private RecordingPlanRepository planRepository;
@Resource private StorageVolumeService volumeService;
@Resource private RecordingPlanService planService;
@Value("${recording.storage.auto-switch-enabled:true}") private boolean autoSwitchEnabled;
@Value("${recording.storage.migrate-batch-size:20}") private int migrateBatchSize;
@Scheduled(fixedDelayString = "${recording.storage.auto-switch-interval-ms:60000}") public void autoSwitch() { if (!autoSwitchEnabled) { return; }
// 1. 查找满盘且有活跃计划的卷 List<RecordingPlan> affectedPlans = planRepository.findByVolumeFullAndStatusRunning();
if (affectedPlans.isEmpty()) { return; }
log.info("Found {} plans on full volumes", affectedPlans.size());
// 2. 挑选目标卷 Optional<StorageVolume> targetVolumeOpt = volumeService.pickWritableVolume();
if (!targetVolumeOpt.isPresent()) { log.error("No writable volume available for migration"); return; }
StorageVolume targetVolume = targetVolumeOpt.get();
// 3. 分批迁移 List<List<RecordingPlan>> batches = partition(affectedPlans, migrateBatchSize);
for (List<RecordingPlan> batch : batches) { migrateBatch(batch, targetVolume.getId());
// 避免瞬时风暴 try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } }
private void migrateBatch(List<RecordingPlan> plans, Long targetVolumeId) { for (RecordingPlan plan : plans) { try { planService.switchVolume(plan.getStreamName(), targetVolumeId); log.info("Migrated plan {} to volume {}", plan.getId(), targetVolumeId); } catch (Exception e) { log.error("Failed to migrate plan {}", plan.getId(), e); } } }
private <T> List<List<T>> partition(List<T> list, int size) { List<List<T>> batches = new ArrayList<>(); for (int i = 0; i < list.size(); i += size) { batches.add(list.subList(i, Math.min(i + size, list.size()))); } return batches; }}为什么需要分批迁移?
一次性迁移 500 个计划:
- MediaMTX API 并发压力
- 数据库连接池耗尽
- 路径重建风暴
分批策略:
- 每批 20 个,批次间延迟 1 秒
- 总耗时可控,系统压力平滑
11. 录像策略编排
11.1 录像计划表设计
DROP TABLE IF EXISTS `recording_plan`;CREATE TABLE `recording_plan` ( `id` BIGINT NOT NULL AUTO_INCREMENT, `camera_id` BIGINT NOT NULL COMMENT '相机ID', `stream_name` VARCHAR(128) NOT NULL COMMENT '媒体路径名', `rtsp_url` VARCHAR(1024) DEFAULT NULL COMMENT '源RTSP(可回填)',
-- 存储配置 `volume_id` BIGINT NOT NULL COMMENT '写入卷ID', `status` TINYINT NOT NULL DEFAULT 0 COMMENT '1运行 0停止', `record_mode` VARCHAR(20) DEFAULT 'auto' COMMENT 'auto/schedule/event', `week_schedule` JSON DEFAULT NULL COMMENT 'schedule模式的周配置',
-- 录像参数 `retention_days` INT NOT NULL DEFAULT 7 COMMENT '保留天数', `segment_duration` VARCHAR(16) NOT NULL DEFAULT '10m' COMMENT '切片时长', `record_format` VARCHAR(16) NOT NULL DEFAULT 'fmp4' COMMENT '录像格式', `record_path_mask` VARCHAR(512) DEFAULT NULL COMMENT '落盘路径模板',
-- 版本控制 `apply_version` BIGINT NOT NULL DEFAULT 0 COMMENT '下发版本号', `last_apply_time` DATETIME DEFAULT NULL COMMENT '最后下发时间',
`create_by` VARCHAR(64) DEFAULT 'system', `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP, `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`), UNIQUE KEY `uk_stream_name` (`stream_name`), KEY `idx_camera_status` (`camera_id`, `status`), KEY `idx_volume_status` (`volume_id`, `status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='录像计划表';设计要点:
-
流名唯一性
uk_stream_name:一条流只有一个生效计划- 避免重复录像
-
版本控制
apply_version:每次下发递增- 用于审计和幂等性判断
-
路径模板落库
record_path_mask:实际使用的路径模板- 便于排障和迁移
11.2 录像路径模板
推荐模板:
{rootPath}/%path/%Y-%m-%d/%H-%M-%S路径示例:
/recordings/cam_10_0_1_20_ch1_main/2026-02-13/14-00-00.mp4/recordings/cam_10_0_1_20_ch1_main/2026-02-13/14-10-00.mp4/recordings/cam_10_0_1_20_ch1_main/2026-02-13/14-20-00.mp4模板变量说明:
| 变量 | 含义 | 示例 |
|---|---|---|
%path | 流路径名 | cam_10_0_1_20_ch1_main |
%Y | 年份 | 2026 |
%m | 月份 | 02 |
%d | 日期 | 13 |
%H | 小时 | 14 |
%M | 分钟 | 00 |
%S | 秒 | 00 |
为什么按日期分桶?
- 清理便利:删除过期录像直接
rm -rf 2026-01-01 - 检索优化:时间范围查询可按目录过滤
- 备份友好:按天归档到对象存储
11.3 录像计划应用
@Service@Slf4jpublic class RecordingPlanService {
@Resource private RecordingPlanRepository planRepository;
@Resource private StorageVolumeService volumeService;
@Resource private StreamPathRepository streamPathRepository;
@Resource private MediaGatewayClient mediaGatewayClient;
@Transactional(rollbackFor = Exception.class) public void applyPlan(RecordingConfigRequest request) { // 1. 参数校验 validateRequest(request);
// 2. 解析源地址 String sourceUrl = resolveRtspUrl(request);
// 3. 加载存储卷 StorageVolume volume = volumeService.getById(request.getVolumeId()); if (volume == null || volume.getStatus() != 1) { throw new BusinessException("存储卷不可用: " + request.getVolumeId()); }
// 4. 构建录像路径 String rootPath = normalizeRootPath(volume.getRootPath()); String recordPath = rootPath + "/%path/%Y-%m-%d/%H-%M-%S";
// 5. 下发 MediaMTX 配置 try { mediaGatewayClient.updateRecordingConfig( request.getStreamName(), sourceUrl, request.getEnabled(), recordPath, request.getRetentionDays(), request.getSegmentDuration() ); } catch (Exception e) { log.error("MediaMTX config failed for {}", request.getStreamName(), e); throw new MediaGatewayException("录像配置下发失败", e); }
// 6. 保存/更新计划 RecordingPlan plan = planRepository.findByStreamName(request.getStreamName()) .orElseGet(RecordingPlan::new);
plan.setCameraId(request.getCameraId()); plan.setStreamName(request.getStreamName()); plan.setRtspUrl(sourceUrl); plan.setVolumeId(request.getVolumeId()); plan.setStatus(Boolean.TRUE.equals(request.getEnabled()) ? 1 : 0); plan.setRecordMode("auto"); plan.setRetentionDays(request.getRetentionDays()); plan.setSegmentDuration(request.getSegmentDuration()); plan.setRecordFormat("fmp4"); plan.setRecordPathMask(recordPath);
// 版本递增 long nextVersion = (plan.getApplyVersion() == null ? 0 : plan.getApplyVersion()) + 1; plan.setApplyVersion(nextVersion); plan.setLastApplyTime(LocalDateTime.now());
planRepository.saveOrUpdate(plan);
log.info("Recording plan applied: stream={}, version={}", request.getStreamName(), nextVersion); }
private String resolveRtspUrl(RecordingConfigRequest request) { if (StringUtils.isNotBlank(request.getRtspUrl())) { return request.getRtspUrl(); }
return streamPathRepository.findByPathName(request.getStreamName()) .map(StreamPath::getSourceUrl) .orElseThrow(() -> new BusinessException("未找到流路径: " + request.getStreamName())); }
private void validateRequest(RecordingConfigRequest request) { if (StringUtils.isBlank(request.getStreamName())) { throw new IllegalArgumentException("streamName 不能为空"); } if (request.getVolumeId() == null) { throw new IllegalArgumentException("volumeId 不能为空"); } if (request.getRetentionDays() == null || request.getRetentionDays() <= 0) { throw new IllegalArgumentException("retentionDays 必须为正数"); } }
private String normalizeRootPath(String path) { String normalized = path.replace("\\", "/"); while (normalized.endsWith("/")) { normalized = normalized.substring(0, normalized.length() - 1); } return normalized; }}11.4 MediaMTX 录像配置下发
@Component@Slf4jpublic class MediaGatewayClient {
@Value("${media.mtx.apiBaseUrl}") private String apiBaseUrl;
private final RestTemplate restTemplate;
public void updateRecordingConfig(String pathName, String sourceUrl, boolean recordEnabled, String recordPath, int retentionDays, String segmentDuration) {
Map<String, Object> payload = new HashMap<>(); payload.put("name", pathName); payload.put("source", sourceUrl); payload.put("sourceProtocol", "tcp"); payload.put("sourceOnDemand", false);
if (recordEnabled) { payload.put("record", true); payload.put("recordPath", recordPath); payload.put("recordFormat", "fmp4"); payload.put("recordPartDuration", "1s"); payload.put("recordSegmentDuration", StringUtils.isBlank(segmentDuration) ? "10m" : segmentDuration); payload.put("recordDeleteAfter", retentionDays + "d"); } else { payload.put("record", false); }
HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Map<String, Object>> request = new HttpEntity<>(payload, headers);
String baseUrl = apiBaseUrl.endsWith("/") ? apiBaseUrl.substring(0, apiBaseUrl.length() - 1) : apiBaseUrl;
try { // Delete existing path try { restTemplate.delete(baseUrl + "/v3/config/paths/delete/" + pathName); } catch (HttpClientErrorException.NotFound e) { // Path doesn't exist, ignore }
// Add new path ResponseEntity<String> response = restTemplate.postForEntity( baseUrl + "/v3/config/paths/add/" + pathName, request, String.class );
if (!response.getStatusCode().is2xxSuccessful()) { throw new MediaGatewayException("MediaMTX返回错误: " + response.getBody()); }
log.info("Recording config applied: {}", pathName);
} catch (Exception e) { log.error("MediaMTX API调用失败: {}", e.getMessage()); throw new MediaGatewayException("录像配置下发失败", e); } }}Delete + Add 的必要性:
- 配置清洁:避免旧参数残留
- 立即生效:不需要等待旧切片结束
- 幂等性:重复执行结果一致
风险缓解:
@Retryable( value = {MediaGatewayException.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000, multiplier = 2))public void updateRecordingConfig(...) { // 实现}11.5 切换存储卷
@Servicepublic class RecordingPlanService {
@Transactional(rollbackFor = Exception.class) public void switchVolume(String streamName, Long newVolumeId) { RecordingPlan plan = planRepository.findByStreamName(streamName) .orElseThrow(() -> new NotFoundException("录像计划不存在: " + streamName));
// 构造切换请求 RecordingConfigRequest request = new RecordingConfigRequest(); request.setCameraId(plan.getCameraId()); request.setStreamName(streamName); request.setRtspUrl(null); // 自动回查 request.setVolumeId(newVolumeId); request.setEnabled(plan.getStatus() == 1); request.setRetentionDays(plan.getRetentionDays()); request.setSegmentDuration(plan.getSegmentDuration());
// 重新应用计划 applyPlan(request);
log.info("Volume switched for {}: {} -> {}", streamName, plan.getVolumeId(), newVolumeId); }
@Transactional(rollbackFor = Exception.class) public void stopRecording(String streamName) { RecordingPlan plan = planRepository.findByStreamName(streamName) .orElseThrow(() -> new NotFoundException("录像计划不存在: " + streamName));
RecordingConfigRequest request = new RecordingConfigRequest(); request.setCameraId(plan.getCameraId()); request.setStreamName(streamName); request.setVolumeId(plan.getVolumeId()); request.setRetentionDays(plan.getRetentionDays()); request.setSegmentDuration(plan.getSegmentDuration()); request.setEnabled(false); applyPlan(request); log.info("Recording stopped for {}", streamName); }}12. 文件索引与回放检索
12.1 录像文件索引表
DROP TABLE IF EXISTS `recording_file`;CREATE TABLE `recording_file` ( `id` BIGINT NOT NULL AUTO_INCREMENT, `camera_id` BIGINT NOT NULL, `stream_name` VARCHAR(128) NOT NULL, `volume_id` BIGINT NOT NULL,
-- 文件信息 `file_path` VARCHAR(1024) NOT NULL COMMENT '完整路径', `file_name` VARCHAR(255) NOT NULL COMMENT '文件名', `file_size` BIGINT DEFAULT 0 COMMENT '文件大小(Byte)', `codec` VARCHAR(16) DEFAULT NULL COMMENT '编码格式',
-- 时间信息 `start_time` DATETIME NOT NULL COMMENT '开始时间', `end_time` DATETIME DEFAULT NULL COMMENT '结束时间', `duration_sec` INT DEFAULT 0 COMMENT '时长(秒)',
-- 状态 `status` TINYINT NOT NULL DEFAULT 2 COMMENT '1录制中 2已完成 3异常',
`created_by` VARCHAR(64) DEFAULT 'indexer', `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP, `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`), UNIQUE KEY `uk_file_path` (`file_path`), KEY `idx_stream_time` (`stream_name`, `start_time`), KEY `idx_camera_time` (`camera_id`, `start_time`), KEY `idx_status` (`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='录像文件索引表';设计要点:
-
文件路径唯一性
uk_file_path:防止重复索引- 支持幂等扫描
-
时间检索优化
idx_stream_time:按流名+时间查询idx_camera_time:按相机+时间统计
-
状态区分
1 录制中:正在写入2 已完成:完整切片3 异常:意外中断
12.2 文件索引器
@Component@Slf4jpublic class RecordingFileIndexer {
private static final Pattern SEGMENT_PATTERN = Pattern.compile("(\\d{2})-(\\d{2})-(\\d{2})\\.(mp4|m4s)$");
@Resource private StorageVolumeRepository volumeRepository;
@Resource private RecordingPlanRepository planRepository;
@Resource private RecordingFileRepository fileRepository;
@Value("${recording.index.scan-depth:4}") private int scanDepth;
@Value("${recording.index.lookback-hours:24}") private int lookbackHours;
@Scheduled(fixedDelayString = "${recording.index.fixed-delay-ms:30000}") public void scanAndUpsert() { List<StorageVolume> volumes = volumeRepository.findByStatus(1);
for (StorageVolume volume : volumes) { try { scanVolume(volume); } catch (Exception e) { log.error("Failed to scan volume {}", volume.getId(), e); } } }
private void scanVolume(StorageVolume volume) { Path rootPath = Paths.get(volume.getRootPath());
if (!Files.exists(rootPath)) { log.warn("Volume root not exists: {}", volume.getRootPath()); return; }
try (Stream<Path> stream = Files.walk(rootPath, scanDepth)) { stream.filter(Files::isRegularFile) .filter(this::isSegmentFile) .filter(this::isRecentFile) .forEach(file -> upsertSegment(volume, file)); } catch (IOException e) { log.error("Failed to walk volume {}", volume.getId(), e); } }
private boolean isSegmentFile(Path path) { String name = path.getFileName().toString().toLowerCase(); return name.endsWith(".mp4") || name.endsWith(".m4s"); }
private boolean isRecentFile(Path path) { try { FileTime modified = Files.getLastModifiedTime(path); LocalDateTime modifiedTime = LocalDateTime.ofInstant( modified.toInstant(), ZoneId.systemDefault() );
LocalDateTime cutoff = LocalDateTime.now().minusHours(lookbackHours); return modifiedTime.isAfter(cutoff);
} catch (IOException e) { return false; } }
private void upsertSegment(StorageVolume volume, Path file) { try { SegmentMeta meta = parseSegmentMeta(volume, file); if (meta == null) { return; }
RecordingFile entity = meta.toEntity(); fileRepository.upsert(entity);
} catch (Exception e) { log.error("Failed to upsert segment: {}", file, e); } }
private SegmentMeta parseSegmentMeta(StorageVolume volume, Path file) { String filePath = file.toString(); String fileName = file.getFileName().toString();
// 解析路径: /recordings/{streamName}/{date}/{time}.mp4 String[] parts = filePath.replace(volume.getRootPath(), "") .split("/");
if (parts.length < 3) { log.warn("Invalid file path structure: {}", filePath); return null; }
String streamName = parts[parts.length - 3]; String dateStr = parts[parts.length - 2]; // 2026-02-13
// 解析时间 Matcher matcher = SEGMENT_PATTERN.matcher(fileName); if (!matcher.find()) { log.warn("Invalid file name pattern: {}", fileName); return null; }
String hour = matcher.group(1); String minute = matcher.group(2); String second = matcher.group(3);
String startTimeStr = dateStr + " " + hour + ":" + minute + ":" + second; LocalDateTime startTime = LocalDateTime.parse(startTimeStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
// 查询计划获取相机ID Long cameraId = planRepository.findByStreamName(streamName) .map(RecordingPlan::getCameraId) .orElse(null);
if (cameraId == null) { log.warn("No plan found for stream: {}", streamName); return null; }
// 获取文件大小 long fileSize; try { fileSize = Files.size(file); } catch (IOException e) { fileSize = 0; }
return SegmentMeta.builder() .cameraId(cameraId) .streamName(streamName) .volumeId(volume.getId()) .filePath(filePath) .fileName(fileName) .fileSize(fileSize) .startTime(startTime) .status(2) // 已完成 .build(); }}
@Data@Builderclass SegmentMeta { private Long cameraId; private String streamName; private Long volumeId; private String filePath; private String fileName; private Long fileSize; private LocalDateTime startTime; private Integer status;
public RecordingFile toEntity() { RecordingFile file = new RecordingFile(); file.setCameraId(this.cameraId); file.setStreamName(this.streamName); file.setVolumeId(this.volumeId); file.setFilePath(this.filePath); file.setFileName(this.fileName); file.setFileSize(this.fileSize); file.setStartTime(this.startTime); file.setStatus(this.status); return file; }}索引策略优化:
-
扫描窗口限制
- 只扫描最近 24 小时的文件
- 避免全盘遍历
-
幂等性保证
uk_file_path确保重复扫描不产生重复记录upsert语义:存在则更新,不存在则插入
-
并发控制
- 定时任务单线程执行
- 避免并发扫描导致重复写入
12.3 回放查询服务
@Servicepublic class RecordingPlaybackService {
@Resource private RecordingFileRepository fileRepository;
/** * 按时间范围查询录像片段 */ public List<RecordingFile> queryByTimeRange( String streamName, LocalDateTime start, LocalDateTime end) {
return fileRepository.findByStreamNameAndTimeRange(streamName, start, end); }
/** * 生成 M3U8 播放列表 */ public String generateM3U8(String streamName, LocalDateTime start, LocalDateTime end) {
List<RecordingFile> files = queryByTimeRange(streamName, start, end);
if (files.isEmpty()) { throw new NotFoundException("未找到录像文件"); }
StringBuilder m3u8 = new StringBuilder(); m3u8.append("#EXTM3U\n"); m3u8.append("#EXT-X-VERSION:3\n"); m3u8.append("#EXT-X-TARGETDURATION:600\n"); // 10分钟 m3u8.append("#EXT-X-MEDIA-SEQUENCE:0\n");
for (RecordingFile file : files) { m3u8.append("#EXTINF:").append(file.getDurationSec()).append(",\n"); m3u8.append(buildPlayUrl(file)).append("\n"); }
m3u8.append("#EXT-X-ENDLIST\n");
return m3u8.toString(); }
private String buildPlayUrl(RecordingFile file) { // 返回文件访问地址 // 可以是 HTTP 静态文件服务,也可以是对象存储 URL return "/recordings/" + file.getStreamName() + "/" + file.getFileName(); }}Repository 实现:
@Repositorypublic interface RecordingFileRepository extends BaseMapper<RecordingFile> {
@Select("SELECT * FROM recording_file " + "WHERE stream_name = #{streamName} " + "AND start_time < #{end} " + "AND COALESCE(end_time, DATE_ADD(start_time, INTERVAL duration_sec SECOND)) > #{start} " + "ORDER BY start_time ASC") List<RecordingFile> findByStreamNameAndTimeRange( @Param("streamName") String streamName, @Param("start") LocalDateTime start, @Param("end") LocalDateTime end );
@Insert("INSERT INTO recording_file (...) VALUES (...) " + "ON DUPLICATE KEY UPDATE file_size=VALUES(file_size), status=VALUES(status)") void upsert(RecordingFile file);}查询优化:
时间范围查询的关键:
start_time < end AND end_time > start这个条件能正确处理:
- 完全包含
- 部分重叠
- 边界对齐
评论