Skip to content

从 0 到 1 构建企业级视频智能分析平台:完整工程实践(二)

约 4109 字大约 14 分钟

MediaMTXFFmpegWebSocket

2026-02-13

从 0 到 1 构建企业级视频智能分析平台:完整工程实践

一份可直接落地的全栈技术方案 覆盖相机接入、流媒体编排、存储治理、智能分析全链路 包含架构设计、核心代码、数据模型、运维体系与性能基准


目录

第一部分:平台基础与相机接入

  1. 问题定义与架构设计
  2. 技术选型与核心组件
  3. 数据模型设计
  4. ONVIF 协议深度解析与实现
  5. MediaMTX 流媒体网关集成
  6. FFmpeg 编解码治理
  7. 相机接入完整流程
  8. 设备状态巡检与事件闭环

第二部分:录像存储系统

  1. 录像系统架构设计
  2. 存储卷治理与容量管理
  3. 录像策略编排
  4. 文件索引与回放检索

第三部分:智能分析平台

  1. 算法任务模型设计
  2. 算法节点管理与调度
  3. WebSocket 协议规范
  4. 结果处理与告警推送
  5. 预览链路实现

第四部分:运维与演进

  1. 配置管理
  2. 可观测性体系
  3. 典型问题与解决方案
  4. FAQ 与最佳实践
  5. 性能基准与压测
  6. 架构演进路线

第二部分:录像存储系统

9. 录像系统架构设计

9.1 录像链路的核心挑战

问题 1:策略落不下去

现象:

  • 数据库中录像计划显示已启用
  • MediaMTX 路径配置未生效或参数错误
  • 文件系统无录像文件产生

根因:

  • 配置下发与数据库操作不在同一事务
  • 下发失败未回滚数据库状态

问题 2:存储不可治理

现象:

  • 不知道当前写入哪个盘
  • 磁盘满后录像中断,无自动切换
  • 跨盘迁移时路径混乱

根因:

  • 缺少存储卷抽象
  • 容量监控与切换策略分离

问题 3:文件可写不可查

现象:

  • 物理文件存在于 /recordings 目录
  • 回放接口查询为空
  • 必须通过目录遍历才能找到文件

根因:

  • 缺少文件索引
  • 录像与检索解耦不彻底

9.2 录像系统分层设计

┌─────────────────────────────────────────┐
          策略层 (Policy Layer)           
  - 录像计划管理                          
  - 参数归一化                            
  - 调度决策                              
└──────────────┬──────────────────────────┘
               
┌──────────────┴──────────────────────────┐
         媒体层 (Media Layer)             
  - MediaMTX 路径配置                     
  - 切片参数控制                          
  - 生命周期管理                          
└──────────────┬──────────────────────────┘
               
┌──────────────┴──────────────────────────┐
        存储层 (Storage Layer)            
  - 存储卷管理                            
  - 容量监控                              
  - 写入路径分配                          
└──────────────┬──────────────────────────┘
               
┌──────────────┴──────────────────────────┐
         索引层 (Index Layer)             
  - 文件发现扫描                          
  - 元信息提取                            
  - 检索优化                              
└─────────────────────────────────────────┘

9.3 录像时序图

[用户]  [录像接口]  [计划服务]  [存储卷服务]
                                      
                                      └→ 查询可写卷
                                      └→ 返回 rootPath
                        
                        ├→ 构建录像路径模板
                        ├→ [MediaMTX 客户端]
                              
                              ├→ DELETE /paths/{pathName}
                              └→ POST /paths/add/{pathName}
                                      (record=true, recordPath=...)
                        
                        └→ [数据库] 保存录像计划
                        
[定时任务]  [文件索引器]  扫描 rootPath 目录
                             
                             ├→ 解析文件名 (时间戳提取)
                             ├→ 计算文件大小
                             └→ [数据库] upsert recording_file

10. 存储卷治理与容量管理

10.1 存储卷表设计

DROP TABLE IF EXISTS `storage_volume`;
CREATE TABLE `storage_volume` (
  `id`                BIGINT NOT NULL AUTO_INCREMENT,
  `volume_code`       VARCHAR(64)  NOT NULL COMMENT '卷唯一编码',
  `name`              VARCHAR(64)  NOT NULL COMMENT '卷名称',
  `root_path`         VARCHAR(255) NOT NULL COMMENT '挂载路径',
  
  -- 状态字段
  `status`            TINYINT NOT NULL DEFAULT 1 COMMENT '1读写 2只读 0离线',
  `capacity_total`    BIGINT DEFAULT 0 COMMENT '总容量(Byte)',
  `capacity_used`     BIGINT DEFAULT 0 COMMENT '已用容量(Byte)',
  
  -- 水位线配置
  `watermark_high`    INT DEFAULT 90 COMMENT '高水位(%)',
  `watermark_low`     INT DEFAULT 75 COMMENT '低水位(%)',
  
  -- 优先级
  `priority`          INT NOT NULL DEFAULT 0 COMMENT '优先级(越大越优先)',
  `is_full`           TINYINT NOT NULL DEFAULT 0 COMMENT '1已满 0未满',
  
  `remark`            VARCHAR(255) DEFAULT NULL,
  `create_time`       DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time`       DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_volume_code` (`volume_code`),
  UNIQUE KEY `uk_root_path` (`root_path`),
  KEY `idx_status_priority` (`status`, `priority`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='存储卷表';

设计要点:

  1. 双水位线机制
    • watermark_high:触发切卷阈值 (例如 90%)
    • watermark_low:恢复写入阈值 (例如 75%)
    • 避免频繁切换导致抖动
  2. 优先级调度
    • 多个可写卷时,优先选择 priority 高的
    • 支持手动指定高性能存储
  3. 路径唯一性
    • uk_root_path:防止重复挂载同一目录

10.2 容量监控服务

@Service
@Slf4j
public class StorageVolumeService {
    
    @Resource
    private StorageVolumeRepository volumeRepository;
    
    @Scheduled(fixedDelayString = "${recording.storage.refresh-interval-ms:15000}")
    public void refreshAllVolumes() {
        List<StorageVolume> volumes = volumeRepository.findAll();
        
        for (StorageVolume volume : volumes) {
            try {
                refreshCapacity(volume);
            } catch (Exception e) {
                log.error("Failed to refresh volume {}", volume.getId(), e);
            }
        }
    }
    
    @Transactional(rollbackFor = Exception.class)
    public void refreshCapacity(StorageVolume volume) {
        File root = new File(volume.getRootPath());
        
        // 路径校验
        if (!root.exists() || !root.isDirectory()) {
            volume.setStatus(0); // 离线
            volume.setCapacityTotal(0L);
            volume.setCapacityUsed(0L);
            volumeRepository.updateById(volume);
            return;
        }
        
        // 容量获取
        long total = root.getTotalSpace();
        long free = root.getFreeSpace();
        long used = total - free;
        
        volume.setCapacityTotal(total);
        volume.setCapacityUsed(used);
        
        // 使用率计算
        int usage = (total == 0) ? 0 : (int) (used * 100 / total);
        
        // 水位线判断
        if (usage >= volume.getWatermarkHigh()) {
            volume.setIsFull(1);
            volume.setStatus(2); // 只读
            log.warn("Volume {} reached high watermark: {}%", volume.getId(), usage);
            
        } else if (usage <= volume.getWatermarkLow()) {
            volume.setIsFull(0);
            if (volume.getStatus() != 0) {
                volume.setStatus(1); // 恢复读写
                log.info("Volume {} recovered to writable: {}%", volume.getId(), usage);
            }
        }
        
        volumeRepository.updateById(volume);
    }
    
    /**
     * 挑选可写卷
     */
    public Optional<StorageVolume> pickWritableVolume() {
        return volumeRepository.findByStatusAndIsFullOrderByPriorityDesc(1, 0)
            .stream()
            .min(Comparator.comparingLong(StorageVolume::getCapacityUsed));
    }
}

容量刷新频率:

  • 推荐:15-30 秒
  • 过高:文件系统 I/O 压力
  • 过低:水位线切换不及时

10.3 存储卷添加

@Service
public class StorageVolumeService {
    
    @Transactional(rollbackFor = Exception.class)
    public StorageVolume addVolume(AddVolumeRequest request) {
        // 1. 路径校验
        File file = new File(request.getRootPath());
        if (!file.exists() || !file.isDirectory()) {
            throw new BusinessException("路径不存在或不是目录: " + request.getRootPath());
        }
        
        // 2. 重复性校验
        if (volumeRepository.existsByRootPath(request.getRootPath())) {
            throw new BusinessException("路径已被其他卷使用: " + request.getRootPath());
        }
        
        // 3. 构建卷实体
        StorageVolume volume = StorageVolume.builder()
            .volumeCode(request.getVolumeCode())
            .name(request.getName())
            .rootPath(normalizeRootPath(request.getRootPath()))
            .status(1)
            .watermarkHigh(request.getWatermarkHigh() != null ? request.getWatermarkHigh() : 90)
            .watermarkLow(request.getWatermarkLow() != null ? request.getWatermarkLow() : 75)
            .priority(request.getPriority() != null ? request.getPriority() : 1)
            .build();
        
        // 4. 初始容量刷新
        refreshCapacity(volume);
        
        // 5. 保存
        volumeRepository.save(volume);
        
        return volume;
    }
    
    private String normalizeRootPath(String path) {
        String normalized = path.replace("\\", "/");
        while (normalized.endsWith("/")) {
            normalized = normalized.substring(0, normalized.length() - 1);
        }
        return normalized;
    }
}

路径规范化的重要性:

避免这类问题:

/recordings  
/recordings/
/recordings//

导致拼接错误:

/recordings//cam_xxx/2026-02-13/...

10.4 自动切卷策略

@Service
@Slf4j
public class RecordingAutoSwitchService {
    
    @Resource
    private RecordingPlanRepository planRepository;
    
    @Resource
    private StorageVolumeService volumeService;
    
    @Resource
    private RecordingPlanService planService;
    
    @Value("${recording.storage.auto-switch-enabled:true}")
    private boolean autoSwitchEnabled;
    
    @Value("${recording.storage.migrate-batch-size:20}")
    private int migrateBatchSize;
    
    @Scheduled(fixedDelayString = "${recording.storage.auto-switch-interval-ms:60000}")
    public void autoSwitch() {
        if (!autoSwitchEnabled) {
            return;
        }
        
        // 1. 查找满盘且有活跃计划的卷
        List<RecordingPlan> affectedPlans = planRepository.findByVolumeFullAndStatusRunning();
        
        if (affectedPlans.isEmpty()) {
            return;
        }
        
        log.info("Found {} plans on full volumes", affectedPlans.size());
        
        // 2. 挑选目标卷
        Optional<StorageVolume> targetVolumeOpt = volumeService.pickWritableVolume();
        
        if (!targetVolumeOpt.isPresent()) {
            log.error("No writable volume available for migration");
            return;
        }
        
        StorageVolume targetVolume = targetVolumeOpt.get();
        
        // 3. 分批迁移
        List<List<RecordingPlan>> batches = partition(affectedPlans, migrateBatchSize);
        
        for (List<RecordingPlan> batch : batches) {
            migrateBatch(batch, targetVolume.getId());
            
            // 避免瞬时风暴
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    
    private void migrateBatch(List<RecordingPlan> plans, Long targetVolumeId) {
        for (RecordingPlan plan : plans) {
            try {
                planService.switchVolume(plan.getStreamName(), targetVolumeId);
                log.info("Migrated plan {} to volume {}", plan.getId(), targetVolumeId);
            } catch (Exception e) {
                log.error("Failed to migrate plan {}", plan.getId(), e);
            }
        }
    }
    
    private <T> List<List<T>> partition(List<T> list, int size) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            batches.add(list.subList(i, Math.min(i + size, list.size())));
        }
        return batches;
    }
}

为什么需要分批迁移?

一次性迁移 500 个计划:

  • MediaMTX API 并发压力
  • 数据库连接池耗尽
  • 路径重建风暴

分批策略:

  • 每批 20 个,批次间延迟 1 秒
  • 总耗时可控,系统压力平滑

11. 录像策略编排

11.1 录像计划表设计

DROP TABLE IF EXISTS `recording_plan`;
CREATE TABLE `recording_plan` (
  `id`                 BIGINT NOT NULL AUTO_INCREMENT,
  `camera_id`          BIGINT NOT NULL COMMENT '相机ID',
  `stream_name`        VARCHAR(128) NOT NULL COMMENT '媒体路径名',
  `rtsp_url`           VARCHAR(1024) DEFAULT NULL COMMENT '源RTSP(可回填)',
  
  -- 存储配置
  `volume_id`          BIGINT NOT NULL COMMENT '写入卷ID',
  `status`             TINYINT NOT NULL DEFAULT 0 COMMENT '1运行 0停止',
  `record_mode`        VARCHAR(20) DEFAULT 'auto' COMMENT 'auto/schedule/event',
  `week_schedule`      JSON DEFAULT NULL COMMENT 'schedule模式的周配置',
  
  -- 录像参数
  `retention_days`     INT NOT NULL DEFAULT 7 COMMENT '保留天数',
  `segment_duration`   VARCHAR(16) NOT NULL DEFAULT '10m' COMMENT '切片时长',
  `record_format`      VARCHAR(16) NOT NULL DEFAULT 'fmp4' COMMENT '录像格式',
  `record_path_mask`   VARCHAR(512) DEFAULT NULL COMMENT '落盘路径模板',
  
  -- 版本控制
  `apply_version`      BIGINT NOT NULL DEFAULT 0 COMMENT '下发版本号',
  `last_apply_time`    DATETIME DEFAULT NULL COMMENT '最后下发时间',
  
  `create_by`          VARCHAR(64) DEFAULT 'system',
  `create_time`        DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time`        DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_stream_name` (`stream_name`),
  KEY `idx_camera_status` (`camera_id`, `status`),
  KEY `idx_volume_status` (`volume_id`, `status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='录像计划表';

设计要点:

  1. 流名唯一性

    • uk_stream_name:一条流只有一个生效计划
    • 避免重复录像
  2. 版本控制

    • apply_version:每次下发递增
    • 用于审计和幂等性判断
  3. 路径模板落库

    • record_path_mask:实际使用的路径模板
    • 便于排障和迁移

11.2 录像路径模板

推荐模板:

{rootPath}/%path/%Y-%m-%d/%H-%M-%S

路径示例:

/recordings/cam_10_0_1_20_ch1_main/2026-02-13/14-00-00.mp4
/recordings/cam_10_0_1_20_ch1_main/2026-02-13/14-10-00.mp4
/recordings/cam_10_0_1_20_ch1_main/2026-02-13/14-20-00.mp4

模板变量说明:

变量含义示例
%path流路径名cam_10_0_1_20_ch1_main
%Y年份2026
%m月份02
%d日期13
%H小时14
%M分钟00
%S00

为什么按日期分桶?

  1. 清理便利:删除过期录像直接 rm -rf 2026-01-01
  2. 检索优化:时间范围查询可按目录过滤
  3. 备份友好:按天归档到对象存储

11.3 录像计划应用

@Service
@Slf4j
public class RecordingPlanService {
    
    @Resource
    private RecordingPlanRepository planRepository;
    
    @Resource
    private StorageVolumeService volumeService;
    
    @Resource
    private StreamPathRepository streamPathRepository;
    
    @Resource
    private MediaGatewayClient mediaGatewayClient;
    
    @Transactional(rollbackFor = Exception.class)
    public void applyPlan(RecordingConfigRequest request) {
        // 1. 参数校验
        validateRequest(request);
        
        // 2. 解析源地址
        String sourceUrl = resolveRtspUrl(request);
        
        // 3. 加载存储卷
        StorageVolume volume = volumeService.getById(request.getVolumeId());
        if (volume == null || volume.getStatus() != 1) {
            throw new BusinessException("存储卷不可用: " + request.getVolumeId());
        }
        
        // 4. 构建录像路径
        String rootPath = normalizeRootPath(volume.getRootPath());
        String recordPath = rootPath + "/%path/%Y-%m-%d/%H-%M-%S";
        
        // 5. 下发 MediaMTX 配置
        try {
            mediaGatewayClient.updateRecordingConfig(
                request.getStreamName(),
                sourceUrl,
                request.getEnabled(),
                recordPath,
                request.getRetentionDays(),
                request.getSegmentDuration()
            );
        } catch (Exception e) {
            log.error("MediaMTX config failed for {}", request.getStreamName(), e);
            throw new MediaGatewayException("录像配置下发失败", e);
        }
        
        // 6. 保存/更新计划
        RecordingPlan plan = planRepository.findByStreamName(request.getStreamName())
            .orElseGet(RecordingPlan::new);
        
        plan.setCameraId(request.getCameraId());
        plan.setStreamName(request.getStreamName());
        plan.setRtspUrl(sourceUrl);
        plan.setVolumeId(request.getVolumeId());
        plan.setStatus(Boolean.TRUE.equals(request.getEnabled()) ? 1 : 0);
        plan.setRecordMode("auto");
        plan.setRetentionDays(request.getRetentionDays());
        plan.setSegmentDuration(request.getSegmentDuration());
        plan.setRecordFormat("fmp4");
        plan.setRecordPathMask(recordPath);
        
        // 版本递增
        long nextVersion = (plan.getApplyVersion() == null ? 0 : plan.getApplyVersion()) + 1;
        plan.setApplyVersion(nextVersion);
        plan.setLastApplyTime(LocalDateTime.now());
        
        planRepository.saveOrUpdate(plan);
        
        log.info("Recording plan applied: stream={}, version={}", 
            request.getStreamName(), nextVersion);
    }
    
    private String resolveRtspUrl(RecordingConfigRequest request) {
        if (StringUtils.isNotBlank(request.getRtspUrl())) {
            return request.getRtspUrl();
        }
        
        return streamPathRepository.findByPathName(request.getStreamName())
            .map(StreamPath::getSourceUrl)
            .orElseThrow(() -> new BusinessException("未找到流路径: " + request.getStreamName()));
    }
    
    private void validateRequest(RecordingConfigRequest request) {
        if (StringUtils.isBlank(request.getStreamName())) {
            throw new IllegalArgumentException("streamName 不能为空");
        }
        if (request.getVolumeId() == null) {
            throw new IllegalArgumentException("volumeId 不能为空");
        }
        if (request.getRetentionDays() == null || request.getRetentionDays() <= 0) {
            throw new IllegalArgumentException("retentionDays 必须为正数");
        }
    }
    
    private String normalizeRootPath(String path) {
        String normalized = path.replace("\\", "/");
        while (normalized.endsWith("/")) {
            normalized = normalized.substring(0, normalized.length() - 1);
        }
        return normalized;
    }
}

11.4 MediaMTX 录像配置下发

@Component
@Slf4j
public class MediaGatewayClient {
    
    @Value("${media.mtx.apiBaseUrl}")
    private String apiBaseUrl;
    
    private final RestTemplate restTemplate;
    
    public void updateRecordingConfig(String pathName,
                                      String sourceUrl,
                                      boolean recordEnabled,
                                      String recordPath,
                                      int retentionDays,
                                      String segmentDuration) {
        
        Map<String, Object> payload = new HashMap<>();
        payload.put("name", pathName);
        payload.put("source", sourceUrl);
        payload.put("sourceProtocol", "tcp");
        payload.put("sourceOnDemand", false);
        
        if (recordEnabled) {
            payload.put("record", true);
            payload.put("recordPath", recordPath);
            payload.put("recordFormat", "fmp4");
            payload.put("recordPartDuration", "1s");
            payload.put("recordSegmentDuration", 
                StringUtils.isBlank(segmentDuration) ? "10m" : segmentDuration);
            payload.put("recordDeleteAfter", retentionDays + "d");
        } else {
            payload.put("record", false);
        }
        
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        
        HttpEntity<Map<String, Object>> request = new HttpEntity<>(payload, headers);
        
        String baseUrl = apiBaseUrl.endsWith("/") 
            ? apiBaseUrl.substring(0, apiBaseUrl.length() - 1) 
            : apiBaseUrl;
        
        try {
            // Delete existing path
            try {
                restTemplate.delete(baseUrl + "/v3/config/paths/delete/" + pathName);
            } catch (HttpClientErrorException.NotFound e) {
                // Path doesn't exist, ignore
            }
            
            // Add new path
            ResponseEntity<String> response = restTemplate.postForEntity(
                baseUrl + "/v3/config/paths/add/" + pathName,
                request,
                String.class
            );
            
            if (!response.getStatusCode().is2xxSuccessful()) {
                throw new MediaGatewayException("MediaMTX返回错误: " + response.getBody());
            }
            
            log.info("Recording config applied: {}", pathName);
            
        } catch (Exception e) {
            log.error("MediaMTX API调用失败: {}", e.getMessage());
            throw new MediaGatewayException("录像配置下发失败", e);
        }
    }
}

Delete + Add 的必要性:

  1. 配置清洁:避免旧参数残留
  2. 立即生效:不需要等待旧切片结束
  3. 幂等性:重复执行结果一致

风险缓解:

@Retryable(
    value = {MediaGatewayException.class},
    maxAttempts = 3,
    backoff = @Backoff(delay = 1000, multiplier = 2)
)
public void updateRecordingConfig(...) {
    // 实现
}

11.5 切换存储卷

@Service
public class RecordingPlanService {
    
    @Transactional(rollbackFor = Exception.class)
    public void switchVolume(String streamName, Long newVolumeId) {
        RecordingPlan plan = planRepository.findByStreamName(streamName)
            .orElseThrow(() -> new NotFoundException("录像计划不存在: " + streamName));
        
        // 构造切换请求
        RecordingConfigRequest request = new RecordingConfigRequest();
        request.setCameraId(plan.getCameraId());
        request.setStreamName(streamName);
        request.setRtspUrl(null); // 自动回查
        request.setVolumeId(newVolumeId);
        request.setEnabled(plan.getStatus() == 1);
        request.setRetentionDays(plan.getRetentionDays());
        request.setSegmentDuration(plan.getSegmentDuration());
        
        // 重新应用计划
        applyPlan(request);
        
        log.info("Volume switched for {}: {} -> {}", 
            streamName, plan.getVolumeId(), newVolumeId);
    }
    
    @Transactional(rollbackFor = Exception.class)
    public void stopRecording(String streamName) {
        RecordingPlan plan = planRepository.findByStreamName(streamName)
            .orElseThrow(() -> new NotFoundException("录像计划不存在: " + streamName));
        
        RecordingConfigRequest request = new RecordingConfigRequest();
		request.setCameraId(plan.getCameraId());
		request.setStreamName(streamName);
		request.setVolumeId(plan.getVolumeId());
		request.setRetentionDays(plan.getRetentionDays());
		request.setSegmentDuration(plan.getSegmentDuration());
		request.setEnabled(false);    applyPlan(request);    
        log.info("Recording stopped for {}", streamName);
	}
}

12. 文件索引与回放检索

12.1 录像文件索引表

DROP TABLE IF EXISTS `recording_file`;
CREATE TABLE `recording_file` (
  `id`               BIGINT NOT NULL AUTO_INCREMENT,
  `camera_id`        BIGINT NOT NULL,
  `stream_name`      VARCHAR(128) NOT NULL,
  `volume_id`        BIGINT NOT NULL,
  
  -- 文件信息
  `file_path`        VARCHAR(1024) NOT NULL COMMENT '完整路径',
  `file_name`        VARCHAR(255) NOT NULL COMMENT '文件名',
  `file_size`        BIGINT DEFAULT 0 COMMENT '文件大小(Byte)',
  `codec`            VARCHAR(16) DEFAULT NULL COMMENT '编码格式',
  
  -- 时间信息
  `start_time`       DATETIME NOT NULL COMMENT '开始时间',
  `end_time`         DATETIME DEFAULT NULL COMMENT '结束时间',
  `duration_sec`     INT DEFAULT 0 COMMENT '时长(秒)',
  
  -- 状态
  `status`           TINYINT NOT NULL DEFAULT 2 COMMENT '1录制中 2已完成 3异常',
  
  `created_by`       VARCHAR(64) DEFAULT 'indexer',
  `create_time`      DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time`      DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_file_path` (`file_path`),
  KEY `idx_stream_time` (`stream_name`, `start_time`),
  KEY `idx_camera_time` (`camera_id`, `start_time`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='录像文件索引表';

设计要点:

  1. 文件路径唯一性

    • uk_file_path:防止重复索引
    • 支持幂等扫描
  2. 时间检索优化

    • idx_stream_time:按流名+时间查询
    • idx_camera_time:按相机+时间统计
  3. 状态区分

    • 1 录制中:正在写入
    • 2 已完成:完整切片
    • 3 异常:意外中断

12.2 文件索引器

@Component
@Slf4j
public class RecordingFileIndexer {
    
    private static final Pattern SEGMENT_PATTERN = 
        Pattern.compile("(\\d{2})-(\\d{2})-(\\d{2})\\.(mp4|m4s)$");
    
    @Resource
    private StorageVolumeRepository volumeRepository;
    
    @Resource
    private RecordingPlanRepository planRepository;
    
    @Resource
    private RecordingFileRepository fileRepository;
    
    @Value("${recording.index.scan-depth:4}")
    private int scanDepth;
    
    @Value("${recording.index.lookback-hours:24}")
    private int lookbackHours;
    
    @Scheduled(fixedDelayString = "${recording.index.fixed-delay-ms:30000}")
    public void scanAndUpsert() {
        List<StorageVolume> volumes = volumeRepository.findByStatus(1);
        
        for (StorageVolume volume : volumes) {
            try {
                scanVolume(volume);
            } catch (Exception e) {
                log.error("Failed to scan volume {}", volume.getId(), e);
            }
        }
    }
    
    private void scanVolume(StorageVolume volume) {
        Path rootPath = Paths.get(volume.getRootPath());
        
        if (!Files.exists(rootPath)) {
            log.warn("Volume root not exists: {}", volume.getRootPath());
            return;
        }
        
        try (Stream<Path> stream = Files.walk(rootPath, scanDepth)) {
            stream.filter(Files::isRegularFile)
                .filter(this::isSegmentFile)
                .filter(this::isRecentFile)
                .forEach(file -> upsertSegment(volume, file));
        } catch (IOException e) {
            log.error("Failed to walk volume {}", volume.getId(), e);
        }
    }
    
    private boolean isSegmentFile(Path path) {
        String name = path.getFileName().toString().toLowerCase();
        return name.endsWith(".mp4") || name.endsWith(".m4s");
    }
    
    private boolean isRecentFile(Path path) {
        try {
            FileTime modified = Files.getLastModifiedTime(path);
            LocalDateTime modifiedTime = LocalDateTime.ofInstant(
                modified.toInstant(),
                ZoneId.systemDefault()
            );
            
            LocalDateTime cutoff = LocalDateTime.now().minusHours(lookbackHours);
            return modifiedTime.isAfter(cutoff);
            
        } catch (IOException e) {
            return false;
        }
    }
    
    private void upsertSegment(StorageVolume volume, Path file) {
        try {
            SegmentMeta meta = parseSegmentMeta(volume, file);
            if (meta == null) {
                return;
            }
            
            RecordingFile entity = meta.toEntity();
            fileRepository.upsert(entity);
            
        } catch (Exception e) {
            log.error("Failed to upsert segment: {}", file, e);
        }
    }
    
    private SegmentMeta parseSegmentMeta(StorageVolume volume, Path file) {
        String filePath = file.toString();
        String fileName = file.getFileName().toString();
        
        // 解析路径: /recordings/{streamName}/{date}/{time}.mp4
        String[] parts = filePath.replace(volume.getRootPath(), "")
            .split("/");
        
        if (parts.length < 3) {
            log.warn("Invalid file path structure: {}", filePath);
            return null;
        }
        
        String streamName = parts[parts.length - 3];
        String dateStr = parts[parts.length - 2]; // 2026-02-13
        
        // 解析时间
        Matcher matcher = SEGMENT_PATTERN.matcher(fileName);
        if (!matcher.find()) {
            log.warn("Invalid file name pattern: {}", fileName);
            return null;
        }
        
        String hour = matcher.group(1);
        String minute = matcher.group(2);
        String second = matcher.group(3);
        
        String startTimeStr = dateStr + " " + hour + ":" + minute + ":" + second;
        LocalDateTime startTime = LocalDateTime.parse(startTimeStr,
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        
        // 查询计划获取相机ID
        Long cameraId = planRepository.findByStreamName(streamName)
            .map(RecordingPlan::getCameraId)
            .orElse(null);
        
        if (cameraId == null) {
            log.warn("No plan found for stream: {}", streamName);
            return null;
        }
        
        // 获取文件大小
        long fileSize;
        try {
            fileSize = Files.size(file);
        } catch (IOException e) {
            fileSize = 0;
        }
        
        return SegmentMeta.builder()
            .cameraId(cameraId)
            .streamName(streamName)
            .volumeId(volume.getId())
            .filePath(filePath)
            .fileName(fileName)
            .fileSize(fileSize)
            .startTime(startTime)
            .status(2) // 已完成
            .build();
    }
}

@Data
@Builder
class SegmentMeta {
    private Long cameraId;
    private String streamName;
    private Long volumeId;
    private String filePath;
    private String fileName;
    private Long fileSize;
    private LocalDateTime startTime;
    private Integer status;
    
    public RecordingFile toEntity() {
        RecordingFile file = new RecordingFile();
        file.setCameraId(this.cameraId);
        file.setStreamName(this.streamName);
        file.setVolumeId(this.volumeId);
        file.setFilePath(this.filePath);
        file.setFileName(this.fileName);
        file.setFileSize(this.fileSize);
        file.setStartTime(this.startTime);
        file.setStatus(this.status);
        return file;
    }
}

索引策略优化:

  1. 扫描窗口限制

    • 只扫描最近 24 小时的文件
    • 避免全盘遍历
  2. 幂等性保证

    • uk_file_path 确保重复扫描不产生重复记录
    • upsert 语义:存在则更新,不存在则插入
  3. 并发控制

    • 定时任务单线程执行
    • 避免并发扫描导致重复写入

12.3 回放查询服务

@Service
public class RecordingPlaybackService {
    
    @Resource
    private RecordingFileRepository fileRepository;
    
    /**
     * 按时间范围查询录像片段
     */
    public List<RecordingFile> queryByTimeRange(
            String streamName,
            LocalDateTime start,
            LocalDateTime end) {
        
        return fileRepository.findByStreamNameAndTimeRange(streamName, start, end);
    }
    
    /**
     * 生成 M3U8 播放列表
     */
    public String generateM3U8(String streamName, 
                               LocalDateTime start, 
                               LocalDateTime end) {
        
        List<RecordingFile> files = queryByTimeRange(streamName, start, end);
        
        if (files.isEmpty()) {
            throw new NotFoundException("未找到录像文件");
        }
        
        StringBuilder m3u8 = new StringBuilder();
        m3u8.append("#EXTM3U\n");
        m3u8.append("#EXT-X-VERSION:3\n");
        m3u8.append("#EXT-X-TARGETDURATION:600\n"); // 10分钟
        m3u8.append("#EXT-X-MEDIA-SEQUENCE:0\n");
        
        for (RecordingFile file : files) {
            m3u8.append("#EXTINF:").append(file.getDurationSec()).append(",\n");
            m3u8.append(buildPlayUrl(file)).append("\n");
        }
        
        m3u8.append("#EXT-X-ENDLIST\n");
        
        return m3u8.toString();
    }
    
    private String buildPlayUrl(RecordingFile file) {
        // 返回文件访问地址
        // 可以是 HTTP 静态文件服务,也可以是对象存储 URL
        return "/recordings/" + file.getStreamName() + "/" + file.getFileName();
    }
}

Repository 实现:

@Repository
public interface RecordingFileRepository extends BaseMapper<RecordingFile> {
    
    @Select("SELECT * FROM recording_file " +
            "WHERE stream_name = #{streamName} " +
            "AND start_time < #{end} " +
            "AND COALESCE(end_time, DATE_ADD(start_time, INTERVAL duration_sec SECOND)) > #{start} " +
            "ORDER BY start_time ASC")
    List<RecordingFile> findByStreamNameAndTimeRange(
        @Param("streamName") String streamName,
        @Param("start") LocalDateTime start,
        @Param("end") LocalDateTime end
    );
    
    @Insert("INSERT INTO recording_file (...) VALUES (...) " +
            "ON DUPLICATE KEY UPDATE file_size=VALUES(file_size), status=VALUES(status)")
    void upsert(RecordingFile file);
}

查询优化:

时间范围查询的关键:

start_time < end AND end_time > start

这个条件能正确处理:

  • 完全包含
  • 部分重叠
  • 边界对齐

贡献者

  • flycodeuflycodeu

公告板

2025-03-04正式迁移知识库到此项目