从 0 到 1 构建企业级视频智能分析平台:完整工程实践(二)

从 0 到 1 构建企业级视频智能分析平台:完整工程实践
一份可直接落地的全栈技术方案 覆盖相机接入、流媒体编排、存储治理、智能分析全链路 包含架构设计、核心代码、数据模型、运维体系与性能基准
目录
第一部分:平台基础与相机接入
第二部分:录像存储系统
第三部分:智能分析平台
第四部分:运维与演进
第二部分:录像存储系统
9. 录像系统架构设计
9.1 录像链路的核心挑战
问题 1:策略落不下去
现象:
- 数据库中录像计划显示已启用
- MediaMTX 路径配置未生效或参数错误
- 文件系统无录像文件产生
根因:
- 配置下发与数据库操作不在同一事务
- 下发失败未回滚数据库状态
问题 2:存储不可治理
现象:
- 不知道当前写入哪个盘
- 磁盘满后录像中断,无自动切换
- 跨盘迁移时路径混乱
根因:
- 缺少存储卷抽象
- 容量监控与切换策略分离
问题 3:文件可写不可查
现象:
- 物理文件存在于
/recordings目录 - 回放接口查询为空
- 必须通过目录遍历才能找到文件
根因:
- 缺少文件索引
- 录像与检索解耦不彻底
9.2 录像系统分层设计
┌─────────────────────────────────────────┐
│ 策略层 (Policy Layer) │
│ - 录像计划管理 │
│ - 参数归一化 │
│ - 调度决策 │
└──────────────┬──────────────────────────┘
│
┌──────────────┴──────────────────────────┐
│ 媒体层 (Media Layer) │
│ - MediaMTX 路径配置 │
│ - 切片参数控制 │
│ - 生命周期管理 │
└──────────────┬──────────────────────────┘
│
┌──────────────┴──────────────────────────┐
│ 存储层 (Storage Layer) │
│ - 存储卷管理 │
│ - 容量监控 │
│ - 写入路径分配 │
└──────────────┬──────────────────────────┘
│
┌──────────────┴──────────────────────────┐
│ 索引层 (Index Layer) │
│ - 文件发现扫描 │
│ - 元信息提取 │
│ - 检索优化 │
└─────────────────────────────────────────┘9.3 录像时序图
[用户] → [录像接口] → [计划服务] → [存储卷服务]
│ │
│ └→ 查询可写卷
│ └→ 返回 rootPath
│
├→ 构建录像路径模板
├→ [MediaMTX 客户端]
│ │
│ ├→ DELETE /paths/{pathName}
│ └→ POST /paths/add/{pathName}
│ (record=true, recordPath=...)
│
└→ [数据库] 保存录像计划
[定时任务] → [文件索引器] → 扫描 rootPath 目录
│
├→ 解析文件名 (时间戳提取)
├→ 计算文件大小
└→ [数据库] upsert recording_file10. 存储卷治理与容量管理
10.1 存储卷表设计
DROP TABLE IF EXISTS `storage_volume`;
CREATE TABLE `storage_volume` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`volume_code` VARCHAR(64) NOT NULL COMMENT '卷唯一编码',
`name` VARCHAR(64) NOT NULL COMMENT '卷名称',
`root_path` VARCHAR(255) NOT NULL COMMENT '挂载路径',
-- 状态字段
`status` TINYINT NOT NULL DEFAULT 1 COMMENT '1读写 2只读 0离线',
`capacity_total` BIGINT DEFAULT 0 COMMENT '总容量(Byte)',
`capacity_used` BIGINT DEFAULT 0 COMMENT '已用容量(Byte)',
-- 水位线配置
`watermark_high` INT DEFAULT 90 COMMENT '高水位(%)',
`watermark_low` INT DEFAULT 75 COMMENT '低水位(%)',
-- 优先级
`priority` INT NOT NULL DEFAULT 0 COMMENT '优先级(越大越优先)',
`is_full` TINYINT NOT NULL DEFAULT 0 COMMENT '1已满 0未满',
`remark` VARCHAR(255) DEFAULT NULL,
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_volume_code` (`volume_code`),
UNIQUE KEY `uk_root_path` (`root_path`),
KEY `idx_status_priority` (`status`, `priority`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='存储卷表';设计要点:
- 双水位线机制
watermark_high:触发切卷阈值 (例如 90%)watermark_low:恢复写入阈值 (例如 75%)- 避免频繁切换导致抖动
- 优先级调度
- 多个可写卷时,优先选择
priority高的 - 支持手动指定高性能存储
- 多个可写卷时,优先选择
- 路径唯一性
uk_root_path:防止重复挂载同一目录
10.2 容量监控服务
@Service
@Slf4j
public class StorageVolumeService {
@Resource
private StorageVolumeRepository volumeRepository;
@Scheduled(fixedDelayString = "${recording.storage.refresh-interval-ms:15000}")
public void refreshAllVolumes() {
List<StorageVolume> volumes = volumeRepository.findAll();
for (StorageVolume volume : volumes) {
try {
refreshCapacity(volume);
} catch (Exception e) {
log.error("Failed to refresh volume {}", volume.getId(), e);
}
}
}
@Transactional(rollbackFor = Exception.class)
public void refreshCapacity(StorageVolume volume) {
File root = new File(volume.getRootPath());
// 路径校验
if (!root.exists() || !root.isDirectory()) {
volume.setStatus(0); // 离线
volume.setCapacityTotal(0L);
volume.setCapacityUsed(0L);
volumeRepository.updateById(volume);
return;
}
// 容量获取
long total = root.getTotalSpace();
long free = root.getFreeSpace();
long used = total - free;
volume.setCapacityTotal(total);
volume.setCapacityUsed(used);
// 使用率计算
int usage = (total == 0) ? 0 : (int) (used * 100 / total);
// 水位线判断
if (usage >= volume.getWatermarkHigh()) {
volume.setIsFull(1);
volume.setStatus(2); // 只读
log.warn("Volume {} reached high watermark: {}%", volume.getId(), usage);
} else if (usage <= volume.getWatermarkLow()) {
volume.setIsFull(0);
if (volume.getStatus() != 0) {
volume.setStatus(1); // 恢复读写
log.info("Volume {} recovered to writable: {}%", volume.getId(), usage);
}
}
volumeRepository.updateById(volume);
}
/**
* 挑选可写卷
*/
public Optional<StorageVolume> pickWritableVolume() {
return volumeRepository.findByStatusAndIsFullOrderByPriorityDesc(1, 0)
.stream()
.min(Comparator.comparingLong(StorageVolume::getCapacityUsed));
}
}容量刷新频率:
- 推荐:15-30 秒
- 过高:文件系统 I/O 压力
- 过低:水位线切换不及时
10.3 存储卷添加
@Service
public class StorageVolumeService {
@Transactional(rollbackFor = Exception.class)
public StorageVolume addVolume(AddVolumeRequest request) {
// 1. 路径校验
File file = new File(request.getRootPath());
if (!file.exists() || !file.isDirectory()) {
throw new BusinessException("路径不存在或不是目录: " + request.getRootPath());
}
// 2. 重复性校验
if (volumeRepository.existsByRootPath(request.getRootPath())) {
throw new BusinessException("路径已被其他卷使用: " + request.getRootPath());
}
// 3. 构建卷实体
StorageVolume volume = StorageVolume.builder()
.volumeCode(request.getVolumeCode())
.name(request.getName())
.rootPath(normalizeRootPath(request.getRootPath()))
.status(1)
.watermarkHigh(request.getWatermarkHigh() != null ? request.getWatermarkHigh() : 90)
.watermarkLow(request.getWatermarkLow() != null ? request.getWatermarkLow() : 75)
.priority(request.getPriority() != null ? request.getPriority() : 1)
.build();
// 4. 初始容量刷新
refreshCapacity(volume);
// 5. 保存
volumeRepository.save(volume);
return volume;
}
private String normalizeRootPath(String path) {
String normalized = path.replace("\\", "/");
while (normalized.endsWith("/")) {
normalized = normalized.substring(0, normalized.length() - 1);
}
return normalized;
}
}路径规范化的重要性:
避免这类问题:
/recordings
/recordings/
/recordings//导致拼接错误:
/recordings//cam_xxx/2026-02-13/...10.4 自动切卷策略
@Service
@Slf4j
public class RecordingAutoSwitchService {
@Resource
private RecordingPlanRepository planRepository;
@Resource
private StorageVolumeService volumeService;
@Resource
private RecordingPlanService planService;
@Value("${recording.storage.auto-switch-enabled:true}")
private boolean autoSwitchEnabled;
@Value("${recording.storage.migrate-batch-size:20}")
private int migrateBatchSize;
@Scheduled(fixedDelayString = "${recording.storage.auto-switch-interval-ms:60000}")
public void autoSwitch() {
if (!autoSwitchEnabled) {
return;
}
// 1. 查找满盘且有活跃计划的卷
List<RecordingPlan> affectedPlans = planRepository.findByVolumeFullAndStatusRunning();
if (affectedPlans.isEmpty()) {
return;
}
log.info("Found {} plans on full volumes", affectedPlans.size());
// 2. 挑选目标卷
Optional<StorageVolume> targetVolumeOpt = volumeService.pickWritableVolume();
if (!targetVolumeOpt.isPresent()) {
log.error("No writable volume available for migration");
return;
}
StorageVolume targetVolume = targetVolumeOpt.get();
// 3. 分批迁移
List<List<RecordingPlan>> batches = partition(affectedPlans, migrateBatchSize);
for (List<RecordingPlan> batch : batches) {
migrateBatch(batch, targetVolume.getId());
// 避免瞬时风暴
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
private void migrateBatch(List<RecordingPlan> plans, Long targetVolumeId) {
for (RecordingPlan plan : plans) {
try {
planService.switchVolume(plan.getStreamName(), targetVolumeId);
log.info("Migrated plan {} to volume {}", plan.getId(), targetVolumeId);
} catch (Exception e) {
log.error("Failed to migrate plan {}", plan.getId(), e);
}
}
}
private <T> List<List<T>> partition(List<T> list, int size) {
List<List<T>> batches = new ArrayList<>();
for (int i = 0; i < list.size(); i += size) {
batches.add(list.subList(i, Math.min(i + size, list.size())));
}
return batches;
}
}为什么需要分批迁移?
一次性迁移 500 个计划:
- MediaMTX API 并发压力
- 数据库连接池耗尽
- 路径重建风暴
分批策略:
- 每批 20 个,批次间延迟 1 秒
- 总耗时可控,系统压力平滑
11. 录像策略编排
11.1 录像计划表设计
DROP TABLE IF EXISTS `recording_plan`;
CREATE TABLE `recording_plan` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`camera_id` BIGINT NOT NULL COMMENT '相机ID',
`stream_name` VARCHAR(128) NOT NULL COMMENT '媒体路径名',
`rtsp_url` VARCHAR(1024) DEFAULT NULL COMMENT '源RTSP(可回填)',
-- 存储配置
`volume_id` BIGINT NOT NULL COMMENT '写入卷ID',
`status` TINYINT NOT NULL DEFAULT 0 COMMENT '1运行 0停止',
`record_mode` VARCHAR(20) DEFAULT 'auto' COMMENT 'auto/schedule/event',
`week_schedule` JSON DEFAULT NULL COMMENT 'schedule模式的周配置',
-- 录像参数
`retention_days` INT NOT NULL DEFAULT 7 COMMENT '保留天数',
`segment_duration` VARCHAR(16) NOT NULL DEFAULT '10m' COMMENT '切片时长',
`record_format` VARCHAR(16) NOT NULL DEFAULT 'fmp4' COMMENT '录像格式',
`record_path_mask` VARCHAR(512) DEFAULT NULL COMMENT '落盘路径模板',
-- 版本控制
`apply_version` BIGINT NOT NULL DEFAULT 0 COMMENT '下发版本号',
`last_apply_time` DATETIME DEFAULT NULL COMMENT '最后下发时间',
`create_by` VARCHAR(64) DEFAULT 'system',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_stream_name` (`stream_name`),
KEY `idx_camera_status` (`camera_id`, `status`),
KEY `idx_volume_status` (`volume_id`, `status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='录像计划表';设计要点:
流名唯一性
uk_stream_name:一条流只有一个生效计划- 避免重复录像
版本控制
apply_version:每次下发递增- 用于审计和幂等性判断
路径模板落库
record_path_mask:实际使用的路径模板- 便于排障和迁移
11.2 录像路径模板
推荐模板:
{rootPath}/%path/%Y-%m-%d/%H-%M-%S路径示例:
/recordings/cam_10_0_1_20_ch1_main/2026-02-13/14-00-00.mp4
/recordings/cam_10_0_1_20_ch1_main/2026-02-13/14-10-00.mp4
/recordings/cam_10_0_1_20_ch1_main/2026-02-13/14-20-00.mp4模板变量说明:
| 变量 | 含义 | 示例 |
|---|---|---|
%path | 流路径名 | cam_10_0_1_20_ch1_main |
%Y | 年份 | 2026 |
%m | 月份 | 02 |
%d | 日期 | 13 |
%H | 小时 | 14 |
%M | 分钟 | 00 |
%S | 秒 | 00 |
为什么按日期分桶?
- 清理便利:删除过期录像直接
rm -rf 2026-01-01 - 检索优化:时间范围查询可按目录过滤
- 备份友好:按天归档到对象存储
11.3 录像计划应用
@Service
@Slf4j
public class RecordingPlanService {
@Resource
private RecordingPlanRepository planRepository;
@Resource
private StorageVolumeService volumeService;
@Resource
private StreamPathRepository streamPathRepository;
@Resource
private MediaGatewayClient mediaGatewayClient;
@Transactional(rollbackFor = Exception.class)
public void applyPlan(RecordingConfigRequest request) {
// 1. 参数校验
validateRequest(request);
// 2. 解析源地址
String sourceUrl = resolveRtspUrl(request);
// 3. 加载存储卷
StorageVolume volume = volumeService.getById(request.getVolumeId());
if (volume == null || volume.getStatus() != 1) {
throw new BusinessException("存储卷不可用: " + request.getVolumeId());
}
// 4. 构建录像路径
String rootPath = normalizeRootPath(volume.getRootPath());
String recordPath = rootPath + "/%path/%Y-%m-%d/%H-%M-%S";
// 5. 下发 MediaMTX 配置
try {
mediaGatewayClient.updateRecordingConfig(
request.getStreamName(),
sourceUrl,
request.getEnabled(),
recordPath,
request.getRetentionDays(),
request.getSegmentDuration()
);
} catch (Exception e) {
log.error("MediaMTX config failed for {}", request.getStreamName(), e);
throw new MediaGatewayException("录像配置下发失败", e);
}
// 6. 保存/更新计划
RecordingPlan plan = planRepository.findByStreamName(request.getStreamName())
.orElseGet(RecordingPlan::new);
plan.setCameraId(request.getCameraId());
plan.setStreamName(request.getStreamName());
plan.setRtspUrl(sourceUrl);
plan.setVolumeId(request.getVolumeId());
plan.setStatus(Boolean.TRUE.equals(request.getEnabled()) ? 1 : 0);
plan.setRecordMode("auto");
plan.setRetentionDays(request.getRetentionDays());
plan.setSegmentDuration(request.getSegmentDuration());
plan.setRecordFormat("fmp4");
plan.setRecordPathMask(recordPath);
// 版本递增
long nextVersion = (plan.getApplyVersion() == null ? 0 : plan.getApplyVersion()) + 1;
plan.setApplyVersion(nextVersion);
plan.setLastApplyTime(LocalDateTime.now());
planRepository.saveOrUpdate(plan);
log.info("Recording plan applied: stream={}, version={}",
request.getStreamName(), nextVersion);
}
private String resolveRtspUrl(RecordingConfigRequest request) {
if (StringUtils.isNotBlank(request.getRtspUrl())) {
return request.getRtspUrl();
}
return streamPathRepository.findByPathName(request.getStreamName())
.map(StreamPath::getSourceUrl)
.orElseThrow(() -> new BusinessException("未找到流路径: " + request.getStreamName()));
}
private void validateRequest(RecordingConfigRequest request) {
if (StringUtils.isBlank(request.getStreamName())) {
throw new IllegalArgumentException("streamName 不能为空");
}
if (request.getVolumeId() == null) {
throw new IllegalArgumentException("volumeId 不能为空");
}
if (request.getRetentionDays() == null || request.getRetentionDays() <= 0) {
throw new IllegalArgumentException("retentionDays 必须为正数");
}
}
private String normalizeRootPath(String path) {
String normalized = path.replace("\\", "/");
while (normalized.endsWith("/")) {
normalized = normalized.substring(0, normalized.length() - 1);
}
return normalized;
}
}11.4 MediaMTX 录像配置下发
@Component
@Slf4j
public class MediaGatewayClient {
@Value("${media.mtx.apiBaseUrl}")
private String apiBaseUrl;
private final RestTemplate restTemplate;
public void updateRecordingConfig(String pathName,
String sourceUrl,
boolean recordEnabled,
String recordPath,
int retentionDays,
String segmentDuration) {
Map<String, Object> payload = new HashMap<>();
payload.put("name", pathName);
payload.put("source", sourceUrl);
payload.put("sourceProtocol", "tcp");
payload.put("sourceOnDemand", false);
if (recordEnabled) {
payload.put("record", true);
payload.put("recordPath", recordPath);
payload.put("recordFormat", "fmp4");
payload.put("recordPartDuration", "1s");
payload.put("recordSegmentDuration",
StringUtils.isBlank(segmentDuration) ? "10m" : segmentDuration);
payload.put("recordDeleteAfter", retentionDays + "d");
} else {
payload.put("record", false);
}
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Map<String, Object>> request = new HttpEntity<>(payload, headers);
String baseUrl = apiBaseUrl.endsWith("/")
? apiBaseUrl.substring(0, apiBaseUrl.length() - 1)
: apiBaseUrl;
try {
// Delete existing path
try {
restTemplate.delete(baseUrl + "/v3/config/paths/delete/" + pathName);
} catch (HttpClientErrorException.NotFound e) {
// Path doesn't exist, ignore
}
// Add new path
ResponseEntity<String> response = restTemplate.postForEntity(
baseUrl + "/v3/config/paths/add/" + pathName,
request,
String.class
);
if (!response.getStatusCode().is2xxSuccessful()) {
throw new MediaGatewayException("MediaMTX返回错误: " + response.getBody());
}
log.info("Recording config applied: {}", pathName);
} catch (Exception e) {
log.error("MediaMTX API调用失败: {}", e.getMessage());
throw new MediaGatewayException("录像配置下发失败", e);
}
}
}Delete + Add 的必要性:
- 配置清洁:避免旧参数残留
- 立即生效:不需要等待旧切片结束
- 幂等性:重复执行结果一致
风险缓解:
@Retryable(
value = {MediaGatewayException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2)
)
public void updateRecordingConfig(...) {
// 实现
}11.5 切换存储卷
@Service
public class RecordingPlanService {
@Transactional(rollbackFor = Exception.class)
public void switchVolume(String streamName, Long newVolumeId) {
RecordingPlan plan = planRepository.findByStreamName(streamName)
.orElseThrow(() -> new NotFoundException("录像计划不存在: " + streamName));
// 构造切换请求
RecordingConfigRequest request = new RecordingConfigRequest();
request.setCameraId(plan.getCameraId());
request.setStreamName(streamName);
request.setRtspUrl(null); // 自动回查
request.setVolumeId(newVolumeId);
request.setEnabled(plan.getStatus() == 1);
request.setRetentionDays(plan.getRetentionDays());
request.setSegmentDuration(plan.getSegmentDuration());
// 重新应用计划
applyPlan(request);
log.info("Volume switched for {}: {} -> {}",
streamName, plan.getVolumeId(), newVolumeId);
}
@Transactional(rollbackFor = Exception.class)
public void stopRecording(String streamName) {
RecordingPlan plan = planRepository.findByStreamName(streamName)
.orElseThrow(() -> new NotFoundException("录像计划不存在: " + streamName));
RecordingConfigRequest request = new RecordingConfigRequest();
request.setCameraId(plan.getCameraId());
request.setStreamName(streamName);
request.setVolumeId(plan.getVolumeId());
request.setRetentionDays(plan.getRetentionDays());
request.setSegmentDuration(plan.getSegmentDuration());
request.setEnabled(false); applyPlan(request);
log.info("Recording stopped for {}", streamName);
}
}12. 文件索引与回放检索
12.1 录像文件索引表
DROP TABLE IF EXISTS `recording_file`;
CREATE TABLE `recording_file` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`camera_id` BIGINT NOT NULL,
`stream_name` VARCHAR(128) NOT NULL,
`volume_id` BIGINT NOT NULL,
-- 文件信息
`file_path` VARCHAR(1024) NOT NULL COMMENT '完整路径',
`file_name` VARCHAR(255) NOT NULL COMMENT '文件名',
`file_size` BIGINT DEFAULT 0 COMMENT '文件大小(Byte)',
`codec` VARCHAR(16) DEFAULT NULL COMMENT '编码格式',
-- 时间信息
`start_time` DATETIME NOT NULL COMMENT '开始时间',
`end_time` DATETIME DEFAULT NULL COMMENT '结束时间',
`duration_sec` INT DEFAULT 0 COMMENT '时长(秒)',
-- 状态
`status` TINYINT NOT NULL DEFAULT 2 COMMENT '1录制中 2已完成 3异常',
`created_by` VARCHAR(64) DEFAULT 'indexer',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_file_path` (`file_path`),
KEY `idx_stream_time` (`stream_name`, `start_time`),
KEY `idx_camera_time` (`camera_id`, `start_time`),
KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='录像文件索引表';设计要点:
文件路径唯一性
uk_file_path:防止重复索引- 支持幂等扫描
时间检索优化
idx_stream_time:按流名+时间查询idx_camera_time:按相机+时间统计
状态区分
1 录制中:正在写入2 已完成:完整切片3 异常:意外中断
12.2 文件索引器
@Component
@Slf4j
public class RecordingFileIndexer {
private static final Pattern SEGMENT_PATTERN =
Pattern.compile("(\\d{2})-(\\d{2})-(\\d{2})\\.(mp4|m4s)$");
@Resource
private StorageVolumeRepository volumeRepository;
@Resource
private RecordingPlanRepository planRepository;
@Resource
private RecordingFileRepository fileRepository;
@Value("${recording.index.scan-depth:4}")
private int scanDepth;
@Value("${recording.index.lookback-hours:24}")
private int lookbackHours;
@Scheduled(fixedDelayString = "${recording.index.fixed-delay-ms:30000}")
public void scanAndUpsert() {
List<StorageVolume> volumes = volumeRepository.findByStatus(1);
for (StorageVolume volume : volumes) {
try {
scanVolume(volume);
} catch (Exception e) {
log.error("Failed to scan volume {}", volume.getId(), e);
}
}
}
private void scanVolume(StorageVolume volume) {
Path rootPath = Paths.get(volume.getRootPath());
if (!Files.exists(rootPath)) {
log.warn("Volume root not exists: {}", volume.getRootPath());
return;
}
try (Stream<Path> stream = Files.walk(rootPath, scanDepth)) {
stream.filter(Files::isRegularFile)
.filter(this::isSegmentFile)
.filter(this::isRecentFile)
.forEach(file -> upsertSegment(volume, file));
} catch (IOException e) {
log.error("Failed to walk volume {}", volume.getId(), e);
}
}
private boolean isSegmentFile(Path path) {
String name = path.getFileName().toString().toLowerCase();
return name.endsWith(".mp4") || name.endsWith(".m4s");
}
private boolean isRecentFile(Path path) {
try {
FileTime modified = Files.getLastModifiedTime(path);
LocalDateTime modifiedTime = LocalDateTime.ofInstant(
modified.toInstant(),
ZoneId.systemDefault()
);
LocalDateTime cutoff = LocalDateTime.now().minusHours(lookbackHours);
return modifiedTime.isAfter(cutoff);
} catch (IOException e) {
return false;
}
}
private void upsertSegment(StorageVolume volume, Path file) {
try {
SegmentMeta meta = parseSegmentMeta(volume, file);
if (meta == null) {
return;
}
RecordingFile entity = meta.toEntity();
fileRepository.upsert(entity);
} catch (Exception e) {
log.error("Failed to upsert segment: {}", file, e);
}
}
private SegmentMeta parseSegmentMeta(StorageVolume volume, Path file) {
String filePath = file.toString();
String fileName = file.getFileName().toString();
// 解析路径: /recordings/{streamName}/{date}/{time}.mp4
String[] parts = filePath.replace(volume.getRootPath(), "")
.split("/");
if (parts.length < 3) {
log.warn("Invalid file path structure: {}", filePath);
return null;
}
String streamName = parts[parts.length - 3];
String dateStr = parts[parts.length - 2]; // 2026-02-13
// 解析时间
Matcher matcher = SEGMENT_PATTERN.matcher(fileName);
if (!matcher.find()) {
log.warn("Invalid file name pattern: {}", fileName);
return null;
}
String hour = matcher.group(1);
String minute = matcher.group(2);
String second = matcher.group(3);
String startTimeStr = dateStr + " " + hour + ":" + minute + ":" + second;
LocalDateTime startTime = LocalDateTime.parse(startTimeStr,
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
// 查询计划获取相机ID
Long cameraId = planRepository.findByStreamName(streamName)
.map(RecordingPlan::getCameraId)
.orElse(null);
if (cameraId == null) {
log.warn("No plan found for stream: {}", streamName);
return null;
}
// 获取文件大小
long fileSize;
try {
fileSize = Files.size(file);
} catch (IOException e) {
fileSize = 0;
}
return SegmentMeta.builder()
.cameraId(cameraId)
.streamName(streamName)
.volumeId(volume.getId())
.filePath(filePath)
.fileName(fileName)
.fileSize(fileSize)
.startTime(startTime)
.status(2) // 已完成
.build();
}
}
@Data
@Builder
class SegmentMeta {
private Long cameraId;
private String streamName;
private Long volumeId;
private String filePath;
private String fileName;
private Long fileSize;
private LocalDateTime startTime;
private Integer status;
public RecordingFile toEntity() {
RecordingFile file = new RecordingFile();
file.setCameraId(this.cameraId);
file.setStreamName(this.streamName);
file.setVolumeId(this.volumeId);
file.setFilePath(this.filePath);
file.setFileName(this.fileName);
file.setFileSize(this.fileSize);
file.setStartTime(this.startTime);
file.setStatus(this.status);
return file;
}
}索引策略优化:
扫描窗口限制
- 只扫描最近 24 小时的文件
- 避免全盘遍历
幂等性保证
uk_file_path确保重复扫描不产生重复记录upsert语义:存在则更新,不存在则插入
并发控制
- 定时任务单线程执行
- 避免并发扫描导致重复写入
12.3 回放查询服务
@Service
public class RecordingPlaybackService {
@Resource
private RecordingFileRepository fileRepository;
/**
* 按时间范围查询录像片段
*/
public List<RecordingFile> queryByTimeRange(
String streamName,
LocalDateTime start,
LocalDateTime end) {
return fileRepository.findByStreamNameAndTimeRange(streamName, start, end);
}
/**
* 生成 M3U8 播放列表
*/
public String generateM3U8(String streamName,
LocalDateTime start,
LocalDateTime end) {
List<RecordingFile> files = queryByTimeRange(streamName, start, end);
if (files.isEmpty()) {
throw new NotFoundException("未找到录像文件");
}
StringBuilder m3u8 = new StringBuilder();
m3u8.append("#EXTM3U\n");
m3u8.append("#EXT-X-VERSION:3\n");
m3u8.append("#EXT-X-TARGETDURATION:600\n"); // 10分钟
m3u8.append("#EXT-X-MEDIA-SEQUENCE:0\n");
for (RecordingFile file : files) {
m3u8.append("#EXTINF:").append(file.getDurationSec()).append(",\n");
m3u8.append(buildPlayUrl(file)).append("\n");
}
m3u8.append("#EXT-X-ENDLIST\n");
return m3u8.toString();
}
private String buildPlayUrl(RecordingFile file) {
// 返回文件访问地址
// 可以是 HTTP 静态文件服务,也可以是对象存储 URL
return "/recordings/" + file.getStreamName() + "/" + file.getFileName();
}
}Repository 实现:
@Repository
public interface RecordingFileRepository extends BaseMapper<RecordingFile> {
@Select("SELECT * FROM recording_file " +
"WHERE stream_name = #{streamName} " +
"AND start_time < #{end} " +
"AND COALESCE(end_time, DATE_ADD(start_time, INTERVAL duration_sec SECOND)) > #{start} " +
"ORDER BY start_time ASC")
List<RecordingFile> findByStreamNameAndTimeRange(
@Param("streamName") String streamName,
@Param("start") LocalDateTime start,
@Param("end") LocalDateTime end
);
@Insert("INSERT INTO recording_file (...) VALUES (...) " +
"ON DUPLICATE KEY UPDATE file_size=VALUES(file_size), status=VALUES(status)")
void upsert(RecordingFile file);
}查询优化:
时间范围查询的关键:
start_time < end AND end_time > start这个条件能正确处理:
- 完全包含
- 部分重叠
- 边界对齐
贡献者
flycodeu
版权所有
版权归属:flycodeu
