Skip to content

从 0 到 1 构建企业级视频智能分析平台:完整工程实践(一)

约 9480 字大约 32 分钟

MediaMTXFFmpegWebSocket

2026-02-13

从 0 到 1 构建企业级视频智能分析平台:完整工程实践

一份可直接落地的全栈技术方案 覆盖相机接入、流媒体编排、存储治理、智能分析全链路 包含架构设计、核心代码、数据模型、运维体系与性能基准


目录

第一部分:平台基础与相机接入

  1. 问题定义与架构设计
  2. 技术选型与核心组件
  3. 数据模型设计
  4. ONVIF 协议深度解析与实现
  5. MediaMTX 流媒体网关集成
  6. FFmpeg 编解码治理
  7. 相机接入完整流程
  8. 设备状态巡检与事件闭环

第二部分:录像存储系统

  1. 录像系统架构设计
  2. 存储卷治理与容量管理
  3. 录像策略编排
  4. 文件索引与回放检索

第三部分:智能分析平台

  1. 算法任务模型设计
  2. 算法节点管理与调度
  3. WebSocket 协议规范
  4. 结果处理与告警推送
  5. 预览链路实现

第四部分:运维与演进

  1. 配置管理
  2. 可观测性体系
  3. 典型问题与解决方案
  4. FAQ 与最佳实践
  5. 性能基准与压测
  6. 架构演进路线

第一部分:平台基础与相机接入

1. 问题定义与架构设计

1.1 业务背景与核心挑战

在构建视频智能分析平台的过程中,我们面临着多个维度的工程挑战:

设备层异构性

  • 不同厂商的 ONVIF 协议实现质量参差不齐,部分设备仅支持核心功能子集
  • 编码格式混乱:H.264、H.265、MJPEG 共存,浏览器兼容性差异显著
  • 网络拓扑复杂:设备位于内网,播放端在外网,NAT 穿透成为常态

状态管理复杂性

  • 控制平面可达但媒体平面不可达的"假在线"现象
  • 设备离线后缺少事件追踪,无法形成运维闭环
  • 流地址变化频繁,缺少统一的路径抽象

扩展性瓶颈

  • 录像、算法等下游服务直接依赖设备原始 RTSP,耦合过深
  • 缺少统一的媒体出口,后续功能扩展需要大量重复工作
  • 编码转换策略不清晰,成本与质量难以平衡

1.2 设计目标

基于上述问题,我们设定了分阶段的建设目标:

第一阶段:建立可扩展底座

  1. 统一接入层:设备能力发现、凭据校验、元信息持久化
  2. 统一流模型:自动生成主/子码流路径,建立路径到设备的映射关系
  3. 统一媒体出口:通过 MediaMTX 提供可控、可观测的播放入口
  4. 编码治理:先探测、后决策,按需转码,控制成本
  5. 状态闭环:在线巡检 + 离线事件追踪,形成完整生命周期管理

第二阶段:录像与存储

  1. 录像策略模型化,支持多种录制模式
  2. 存储卷治理,容量可观测、可切换
  3. 文件索引构建,回放检索高效化

第三阶段:智能分析

  1. 一个任务绑定一台相机,支持多算法场景并行
  2. 算法节点能力建模与调度
  3. 实时结果处理与告警分发

1.3 总体架构

我们采用分层解耦的架构设计:

┌─────────────────────────────────────────────────────────┐
                      API Gateway                        
            (RESTful API + WebSocket)                    
└────────────────┬────────────────────────────────────────┘
                 
┌────────────────┴────────────────────────────────────────┐
                  Application Layer                      
  ┌──────────────┬──────────────┬─────────────────────┐ 
   相机接入服务   录像编排服务   算法任务编排服务      
  └──────────────┴──────────────┴─────────────────────┘ 
└────────────────┬────────────────────────────────────────┘
                 
┌────────────────┴────────────────────────────────────────┐
              Infrastructure Layer                       
  ┌──────────────┬──────────────┬─────────────────────┐ 
   ONVIF 适配层  MediaMTX网关   FFmpeg编码引擎       
  └──────────────┴──────────────┴─────────────────────┘ 
  ┌──────────────┬──────────────┬─────────────────────┐ 
     MySQL       存储卷管理     算法节点池            
  └──────────────┴──────────────┴─────────────────────┘ 
└─────────────────────────────────────────────────────────┘

核心设计原则

  1. 控制面与媒体面解耦
    • ONVIF 负责设备能力发现和控制指令
    • MediaMTX 负责媒体流路径管理和分发
    • 两者通过数据库中的路径映射关联
  2. 路径抽象统一化
    • 所有下游服务(录像/算法/播放)只依赖平台分配的路径
    • 设备地址变化时,只需更新路径配置,不影响消费端
  3. 异步化与容错
    • 接入流程中的流同步采用异步执行
    • 媒体注册失败支持延迟重试
    • 关键操作支持事务回滚

2. 技术选型与核心组件

2.1 ONVIF 协议详解

什么是 ONVIF

ONVIF (Open Network Video Interface Forum) 是一个开放的网络视频设备接口标准,旨在实现不同厂商的 IP 摄像机、录像机和视频管理软件之间的互操作性。ONVIF 基于 Web Services 技术栈,核心协议包括:

  • SOAP (Simple Object Access Protocol):消息封装协议
  • WSDL (Web Services Description Language):服务描述语言
  • WS-Security:安全认证机制
  • WS-Discovery:设备发现协议

ONVIF 服务架构

ONVIF 将功能划分为多个服务端点:

  1. Device Service (/onvif/device_service)
    • 设备基础信息 (厂商、型号、序列号、固件版本)
    • 系统时间同步
    • 能力集发现 (Capabilities)
    • 网络配置
  2. Media Service (/onvif/media_service)
    • Profile 管理 (流配置)
    • 获取 RTSP URI
    • 获取快照 URI
    • 视频编码器配置
  3. PTZ Service (/onvif/ptz_service)
    • 云台控制
    • 预置位管理
  4. Events Service (/onvif/events_service)
    • 事件订阅
    • 运动检测通知

为什么选择 ONVIF

  1. 标准化程度高:主流厂商均支持,减少适配成本
  2. 功能完整性:覆盖设备管理到媒体控制的全生命周期
  3. 可扩展性强:通过 Profile 机制支持不同能力级别
  4. 生态成熟:丰富的开源库和工具链

ONVIF 的局限性与应对

  1. 厂商实现差异
    • 问题:部分厂商仅实现 Profile S (Streaming),缺少 PTZ 等扩展能力
    • 应对:设计时采用能力发现机制,降级处理缺失功能
  2. 性能开销
    • 问题:SOAP 协议 XML 解析开销较大
    • 应对:缓存设备信息,减少重复调用;关键路径使用连接池
  3. 时间同步敏感
    • 问题:WS-Security 要求客户端与设备时间误差小于 5 秒
    • 应对:定期校准服务器 NTP;捕获时间窗口错误并重试

2.2 MediaMTX 流媒体网关

什么是 MediaMTX

MediaMTX (前身为 rtsp-simple-server) 是一个现代化的实时流媒体服务器,支持多种协议的统一接入和分发:

  • 输入协议:RTSP、RTMP、HLS、WebRTC、SRT
  • 输出协议:RTSP、HLS、WebRTC
  • 核心特性:
    • 按需拉流 (sourceOnDemand)
    • 按需转码 (runOnDemand)
    • RESTful API 动态配置
    • 低延迟 WebRTC 推流

MediaMTX 的技术优势

  1. 轻量级架构
    • 单一 Go 二进制,资源占用低
    • 无外部依赖,部署简单
  2. 协议转换能力
    • 统一的内部流模型
    • 自动协议适配,降低客户端复杂度
  3. 动态路径管理
    • 支持运行时添加/删除路径
    • 版本化配置接口

MediaMTX 在本架构中的作用

  1. 统一媒体出口
    • 所有 RTSP 源流接入 MediaMTX
    • 对外提供标准化的播放地址
  2. 按需资源调度
    • sourceOnDemand:无客户端时不拉流,节省带宽
    • runOnDemand:触发 FFmpeg 转码进程
  3. 录像与预览集成
    • 原生支持切片录像
    • 实时预览与录像共享同一路径

MediaMTX 路径配置示例

paths:
  cam_10_0_1_20_ch1_main:
    source: rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101
    sourceOnDemand: yes
    sourceOnDemandStartTimeout: 10s
    sourceOnDemandCloseAfter: 10s
    
    # 录像配置
    record: yes
    recordPath: /recordings/%path/%Y-%m-%d/%H-%M-%S
    recordFormat: fmp4
    recordSegmentDuration: 10m
    recordDeleteAfter: 7d

2.3 FFmpeg 编解码工具链

什么是 FFmpeg

FFmpeg 是一个完整的跨平台音视频处理工具集,包含:

  • ffmpeg:转码工具
  • ffprobe:流分析工具
  • ffplay:播放器
  • libav*:编解码库

核心概念

  1. 容器 (Container)
    • 文件格式,如 MP4、FLV、TS
    • 存储音视频流的封装层
  2. 编解码器 (Codec)
    • 视频:H.264、H.265 (HEVC)、VP9、AV1
    • 音频:AAC、MP3、Opus
  3. 滤镜 (Filter)
    • 视频处理:缩放、裁剪、叠加
    • 音频处理:混音、变速

常用命令详解

1. ffprobe 流探测

ffprobe -v error \
  -rtsp_transport tcp \
  -analyzeduration 1000000 \
  -probesize 32768 \
  -select_streams v:0 \
  -show_entries stream=codec_name,width,height,avg_frame_rate \
  -of json \
  "rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101"

参数解释:

  • -v error:仅输出错误信息
  • -rtsp_transport tcp:强制使用 TCP (避免 UDP 丢包)
  • -analyzeduration:分析时长 (微秒)
  • -probesize:探测数据大小 (字节)
  • -select_streams v:0:仅选择第一个视频流
  • -show_entries:指定输出字段
  • -of json:输出格式为 JSON

输出示例:

{
  "streams": [{
    "codec_name": "h264",
    "width": 1920,
    "height": 1080,
    "avg_frame_rate": "25/1"
  }]
}

2. H.265 转 H.264

ffmpeg -hide_banner -loglevel warning \
  -rtsp_transport tcp \
  -stimeout 5000000 \
  -i "rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101" \
  -an \
  -c:v libx264 \
  -preset veryfast \
  -crf 23 \
  -tune zerolatency \
  -pix_fmt yuv420p \
  -g 50 \
  -keyint_min 50 \
  -sc_threshold 0 \
  -x264-params repeat-headers=1 \
  -f rtsp \
  -rtsp_transport tcp \
  "rtsp://127.0.0.1:8554/output"

关键参数说明:

  • -an:禁用音频 (减少处理开销)
  • -c:v libx264:使用 x264 编码器
  • -preset veryfast:编码速度预设 (速度优先)
  • -crf 23:恒定质量因子 (18-28,值越小质量越高)
  • -tune zerolatency:优化低延迟场景
  • -pix_fmt yuv420p:像素格式 (浏览器兼容)
  • -g 50:GOP 大小 (关键帧间隔)
  • -sc_threshold 0:禁用场景切换检测
  • -x264-params repeat-headers=1:每个关键帧重复 SPS/PPS

3. 硬件加速转码 (NVIDIA)

ffmpeg -hwaccel cuda -hwaccel_output_format cuda \
  -i input.mp4 \
  -c:v h264_nvenc \
  -preset p4 \
  -b:v 3000k \
  output.mp4

参数说明:

  • -hwaccel cuda:启用 CUDA 硬件加速
  • -hwaccel_output_format cuda:保持 GPU 内存格式
  • -c:v h264_nvenc:使用 NVENC 编码器
  • -preset p4:性能预设 (p1 最快, p7 最慢)

FFmpeg 在本平台中的应用

  1. 编码探测
    • 接入时探测设备编码格式
    • 决策是否需要转码
  2. 实时转码
    • H.265 -> H.264 兼容性转换
    • 分辨率/码率自适应
  3. 录像导出
    • 按时间范围裁剪
    • 多切片拼接

3. 数据模型设计

数据模型是整个系统的基础,直接决定了后续查询效率、扩展性和运维复杂度。我们遵循以下设计原则:

  1. 职责单一:每张表只负责一类业务实体
  2. 冗余可控:适度冗余提升查询性能,但避免数据不一致
  3. 索引精准:基于实际查询模式建立索引
  4. 字段语义化:使用明确的枚举值,避免魔法数字

3.1 设备主档表

DROP TABLE IF EXISTS `camera_device`;
CREATE TABLE `camera_device` (
  `id`               BIGINT NOT NULL COMMENT '主键ID',
  `camera_name`      VARCHAR(100) DEFAULT NULL COMMENT '设备名称',
  `group_id`         BIGINT DEFAULT NULL COMMENT '分组ID',

  -- ONVIF 连接信息
  `ip`               VARCHAR(50) NOT NULL COMMENT '设备IP',
  `port`             INT NOT NULL DEFAULT 80 COMMENT 'ONVIF端口',
  `username`         VARCHAR(50) NOT NULL COMMENT 'ONVIF用户名',
  `password`         VARCHAR(255) NOT NULL COMMENT 'ONVIF密码(AES加密)',

  -- 设备元信息
  `serial_number`    VARCHAR(100) DEFAULT NULL COMMENT '序列号',
  `manufacturer`     VARCHAR(100) DEFAULT NULL COMMENT '厂商',
  `model`            VARCHAR(100) DEFAULT NULL COMMENT '型号',
  `firmware_version` VARCHAR(100) DEFAULT NULL COMMENT '固件版本',

  -- 状态字段
  `camera_status`    INT DEFAULT 0 COMMENT '1在线 0离线',
  `last_online_time` DATETIME DEFAULT NULL COMMENT '最后在线时间',

  -- 审计字段
  `create_user`      BIGINT DEFAULT NULL,
  `create_dept`      BIGINT DEFAULT NULL,
  `create_time`      DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_user`      BIGINT DEFAULT NULL,
  `update_time`      DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `status`           INT DEFAULT 1 COMMENT '1启用 0停用',
  `is_deleted`       INT DEFAULT 0 COMMENT '1已删除 0正常',

  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_ip_is_deleted` (`ip`, `is_deleted`),
  KEY `idx_group_id` (`group_id`),
  KEY `idx_camera_status` (`camera_status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='相机设备主档表';

设计要点:

  1. 唯一性约束
    • uk_ip_is_deleted:同一 IP 的有效设备全局唯一
    • 支持软删除后重新接入
  2. 状态索引
    • idx_camera_status:支持在线/离线设备快速筛选
    • 用于首页统计看板
  3. 安全性
    • password 字段应使用 AES 加密存储
    • 日志输出时需脱敏处理

3.2 分组表

DROP TABLE IF EXISTS `camera_group`;
CREATE TABLE `camera_group` (
  `id`          BIGINT NOT NULL AUTO_INCREMENT,
  `group_name`  VARCHAR(100) NOT NULL COMMENT '分组名称',
  `parent_id`   BIGINT DEFAULT NULL COMMENT '父分组ID',
  `group_type`  TINYINT DEFAULT 0 COMMENT '0普通分组 1地理分组 2功能分组',
  `sort_order`  INT DEFAULT 0 COMMENT '排序',
  `icon`        VARCHAR(50) DEFAULT NULL COMMENT '图标',
  `description` VARCHAR(500) DEFAULT NULL COMMENT '描述',

  `create_user` BIGINT DEFAULT NULL,
  `create_dept` BIGINT DEFAULT NULL,
  `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_user` BIGINT DEFAULT NULL,
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `status`      INT DEFAULT 1,
  `is_deleted`  TINYINT DEFAULT 0,

  PRIMARY KEY (`id`),
  KEY `idx_parent_id` (`parent_id`),
  KEY `idx_group_type` (`group_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='相机分组表';

设计要点:

  1. 树形结构
    • 支持多级分组
    • 通过 parent_id 构建层级关系
  2. 类型扩展
    • group_type 预留不同分组语义
    • 便于后续按地理位置、功能区域等维度组织

3.3 流路径表

DROP TABLE IF EXISTS `camera_stream_path`;
CREATE TABLE `camera_stream_path` (
  `id`             BIGINT NOT NULL COMMENT '主键ID',
  `camera_id`      BIGINT NOT NULL COMMENT '设备ID',
  `path_name`      VARCHAR(255) NOT NULL COMMENT '媒体路径主键',
  `source_url`     VARCHAR(1000) DEFAULT NULL COMMENT 'RTSP源地址',

  -- ONVIF Profile 信息
  `profile_token`  VARCHAR(64) DEFAULT NULL COMMENT 'ONVIF ProfileToken',
  `profile_name`   VARCHAR(100) DEFAULT NULL COMMENT 'Profile名称',
  `video_encoding` VARCHAR(20) DEFAULT NULL COMMENT '编码格式(原始)',
  `video_codec`    VARCHAR(16) DEFAULT NULL COMMENT '标准化编码(H264/H265/HEVC)',
  `resolution`     VARCHAR(20) DEFAULT NULL COMMENT '分辨率',
  `subtype`        INT NOT NULL DEFAULT 0 COMMENT '0主码流 1子码流',
  `channel`        INT DEFAULT NULL COMMENT '通道号',
  `need_transcode` TINYINT(1) NOT NULL DEFAULT 0 COMMENT '是否转码',

  -- 状态字段
  `enabled`        TINYINT NOT NULL DEFAULT 1 COMMENT '1启用 0停用',
  `status`         INT NOT NULL DEFAULT 1 COMMENT '1可用 0失败',

  `create_time`    DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time`    DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_path_name` (`path_name`),
  KEY `idx_camera_id` (`camera_id`),
  KEY `idx_codec_transcode` (`video_codec`, `need_transcode`),
  KEY `idx_channel_subtype` (`camera_id`, `channel`, `subtype`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='流路径表';

设计要点:

  1. 路径唯一性
    • uk_path_name:全局唯一的媒体入口标识
    • 所有下游服务依赖此路径
  2. 编码决策字段
    • video_codec:标准化编码格式
    • need_transcode:转码决策结果
    • 索引支持"按编码类型统计转码比例"
  3. 流类型约束
    • (camera_id, channel, subtype):保证同设备同通道同类型只有一个生效路径

3.4 离线事件表

DROP TABLE IF EXISTS `camera_offline_event`;
CREATE TABLE `camera_offline_event` (
  `id`              BIGINT NOT NULL AUTO_INCREMENT,
  `device_id`       BIGINT NOT NULL COMMENT '设备ID',
  `device_ip`       VARCHAR(50) DEFAULT NULL COMMENT '设备IP(冗余)',
  `group_id`        BIGINT DEFAULT NULL COMMENT '分组ID(冗余)',

  -- 事件时间线
  `start_time`      DATETIME NOT NULL COMMENT '离线开始时间',
  `last_check_time` DATETIME NOT NULL COMMENT '最后检测时间',
  `end_time`        DATETIME DEFAULT NULL COMMENT '恢复在线时间',
  `duration`        BIGINT DEFAULT 0 COMMENT '离线时长(秒)',

  -- 事件状态
  `is_resolved`     TINYINT DEFAULT 0 COMMENT '1已恢复 0未恢复',
  `alarm_status`    TINYINT DEFAULT 0 COMMENT '1已告警 0未告警',
  `remark`          VARCHAR(255) DEFAULT NULL COMMENT '备注',

  `create_time`     DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time`     DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

  PRIMARY KEY (`id`),
  KEY `idx_device_status` (`device_id`, `is_resolved`),
  KEY `idx_time` (`start_time`, `end_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备离线事件表';

设计要点:

  1. 事件闭环

    • is_resolved:区分进行中和已结束事件
    • duration:离线时长统计
  2. 冗余字段

    • device_ip/group_id:避免关联查询
    • 用于告警消息快速组装
  3. 时间索引

    • 支持按时间范围查询历史离线记录
    • 用于生成离线率报表

4. ONVIF 协议深度解析与实现

4.1 ONVIF 调用最小可用集

在实际生产环境中,并非所有 ONVIF 接口都需要实现。我们根据业务需求,定义了最小可用集:

接口服务用途调用时机
GetDeviceInformationDevice获取厂商/型号/序列号/固件版本设备接入时
GetCapabilitiesDevice获取各服务端点地址设备接入时
GetProfilesMedia获取流配置列表流同步时
GetStreamUriMedia获取 RTSP 播放地址流同步时
GetSnapshotUriMedia获取快照地址流同步时
GetSystemDateAndTimeDevice获取设备时间在线心跳

为什么是这 6 个接口?

  1. 接入必需:GetDeviceInformationGetCapabilities 是设备身份识别和能力发现的基础
  2. 流管理核心:GetProfilesGetStreamUriGetSnapshotUri 构成完整的媒体流管理
  3. 状态监控:GetSystemDateAndTime 作为心跳接口,开销小且稳定

4.2 设备侧配置要求

很多接入失败的根因不是代码问题,而是设备侧未正确配置。建议在接入前逐项确认:

1. 服务开启

  • ✅ ONVIF 服务已启用
  • ✅ RTSP 服务已启用
  • ❌ 常见错误:只开启了 Web 管理,未开启 ONVIF

2. 账号权限

建议创建专用 ONVIF 账号,而非复用管理员账号
最小权限集:
  - 读取设备信息
  - 读取媒体配置
  - 读取流 URI
不需要:
  - PTZ 控制权限
  - 参数修改权限

3. 时间同步

# NTP 配置示例 (设备端)
NTP Server: ntp.aliyun.com
Time Zone: GMT+8
Sync Interval: 3600s

原因:WS-Security 要求客户端与设备时间误差 < 5 秒

4. 网络配置

防火墙规则:
  允许入站: TCP 80 (ONVIF), TCP 554 (RTSP)
  允许出站: TCP 任意 (用于 RTSP 数据回传)

跨网段访问:
  确保路由可达
  确认 NAT 规则 (若设备在内网)

4.3 WS-Security 认证实现

ONVIF 使用 UsernameToken 认证方式,核心步骤:

1. 生成 Nonce

byte[] nonce = new byte[16];
SecureRandom.getInstanceStrong().nextBytes(nonce);
String nonceBase64 = Base64.getEncoder().encodeToString(nonce);

2. 获取 UTC 时间

String created = ZonedDateTime.now(ZoneOffset.UTC)
    .format(DateTimeFormatter.ISO_INSTANT);
// 示例: 2026-02-13T14:30:00.000Z

3. 计算 PasswordDigest

byte[] digest = MessageDigest.getInstance("SHA-1").digest(
    (nonce + created + password).getBytes(StandardCharsets.UTF_8)
);
String passwordDigest = Base64.getEncoder().encodeToString(digest);

完整 SOAP 请求封装

public class OnvifSoapClient {
    
    public Document sendSoapRequest(String url, String username, String password, String body) 
            throws Exception {
        String envelope = buildSoapEnvelope(username, password, body);
        
        HttpResponse response = HttpRequest.post(url)
            .header("Content-Type", "application/soap+xml; charset=utf-8")
            .timeout(5000)
            .body(envelope)
            .execute();
        
        if (!response.isOk()) {
            throw new OnvifException("HTTP " + response.getStatus());
        }
        
        Document doc = XmlUtil.readXML(response.body());
        validateSoapFault(doc);
        return doc;
    }
    
    private String buildSoapEnvelope(String username, String password, String body) {
        String created = utcNow();
        byte[] nonce = generateNonce();
        String nonceBase64 = Base64.getEncoder().encodeToString(nonce);
        String digest = calculateDigest(nonce, created, password);
        
        return "<?xml version=\"1.0\" encoding=\"utf-8\"?>" +
            "<s:Envelope xmlns:s=\"http://www.w3.org/2003/05/soap-envelope\">" +
            "<s:Header>" +
            "<Security s:mustUnderstand=\"1\" " +
            "xmlns=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd\">" +
            "<UsernameToken>" +
            "<Username>" + username + "</Username>" +
            "<Password Type=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordDigest\">" +
            digest + "</Password>" +
            "<Nonce EncodingType=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary\">" +
            nonceBase64 + "</Nonce>" +
            "<Created xmlns=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd\">" +
            created + "</Created>" +
            "</UsernameToken>" +
            "</Security>" +
            "</s:Header>" +
            "<s:Body>" + body + "</s:Body>" +
            "</s:Envelope>";
    }
    
    private void validateSoapFault(Document doc) throws OnvifFaultException {
        NodeList faults = doc.getElementsByTagNameNS("*", "Fault");
        if (faults.getLength() > 0) {
            Element fault = (Element) faults.item(0);
            String code = getTextContent(fault, "Code", "Value");
            String reason = getTextContent(fault, "Reason", "Text");
            throw new OnvifFaultException(code, reason);
        }
    }
}

为什么必须验证 SOAP Fault?

大量设备会返回 HTTP 200 + SOAP Fault,如果不解析 Fault,会导致:

  1. 接入误成功,产生脏数据
  2. 在线误判,影响运维决策
  3. 错误告警,消耗人工排查成本

常见 SOAP Fault 示例

<s:Envelope>
  <s:Body>
    <s:Fault>
      <s:Code>
        <s:Value>s:Sender</s:Value>
        <s:Subcode>
          <s:Value>ter:InvalidArgVal</s:Value>
        </s:Subcode>
      </s:Code>
      <s:Reason>
        <s:Text xml:lang="en">Invalid ProfileToken</s:Text>
      </s:Reason>
    </s:Fault>
  </s:Body>
</s:Envelope>

4.4 核心接口调用示例

1. GetDeviceInformation

public class OnvifDeviceService {
    
    public DeviceInfo getDeviceInformation(String ip, int port, String user, String pass) 
            throws OnvifException {
        String url = String.format("http://%s:%d/onvif/device_service", ip, port);
        String body = "<tds:GetDeviceInformation xmlns:tds=\"http://www.onvif.org/ver10/device/wsdl\" />";
        
        Document doc = soapClient.sendSoapRequest(url, user, pass, body);
        
        return DeviceInfo.builder()
            .manufacturer(getText(doc, "Manufacturer"))
            .model(getText(doc, "Model"))
            .firmwareVersion(getText(doc, "FirmwareVersion"))
            .serialNumber(getText(doc, "SerialNumber"))
            .hardwareId(getText(doc, "HardwareId"))
            .build();
    }
}

响应示例:

<GetDeviceInformationResponse>
  <Manufacturer>Hikvision</Manufacturer>
  <Model>DS-2CD2142FWD-I</Model>
  <FirmwareVersion>V5.5.0</FirmwareVersion>
  <SerialNumber>DS-2CD2142FWD-I20180101AAWRB12345678</SerialNumber>
  <HardwareId>88</HardwareId>
</GetDeviceInformationResponse>

2. GetCapabilities

public Capabilities getCapabilities(String ip, int port, String user, String pass) 
        throws OnvifException {
    String url = String.format("http://%s:%d/onvif/device_service", ip, port);
    String body = 
        "<tds:GetCapabilities xmlns:tds=\"http://www.onvif.org/ver10/device/wsdl\">" +
        "<tds:Category>Media</tds:Category>" +
        "</tds:GetCapabilities>";
    
    Document doc = soapClient.sendSoapRequest(url, user, pass, body);
    
    String mediaXAddr = getText(doc, "Media", "XAddr");
    if (mediaXAddr == null || mediaXAddr.isEmpty()) {
        // 降级处理:使用 Device Service 地址
        mediaXAddr = url;
    }
    
    return Capabilities.builder()
        .mediaXAddr(mediaXAddr)
        .build();
}

为什么需要 GetCapabilities?

不同设备的服务端点地址可能不同:

  • 标准地址:http://10.0.1.20/onvif/media_service
  • 厂商定制:http://10.0.1.20/onvif/device_service
  • 带端口:http://10.0.1.20:8080/onvif/Media

通过 GetCapabilities 动态获取,避免硬编码。

3. GetProfiles

public List<OnvifProfile> getProfiles(String mediaXAddr, String user, String pass) 
        throws OnvifException {
    String body = "<trt:GetProfiles xmlns:trt=\"http://www.onvif.org/ver10/media/wsdl\" />";
    
    Document doc = soapClient.sendSoapRequest(mediaXAddr, user, pass, body);
    
    List<OnvifProfile> profiles = new ArrayList<>();
    NodeList nodes = doc.getElementsByTagNameNS("*", "Profiles");
    
    for (int i = 0; i < nodes.getLength(); i++) {
        Element elem = (Element) nodes.item(i);
        
        OnvifProfile profile = OnvifProfile.builder()
            .token(elem.getAttribute("token"))
            .name(getText(elem, "Name"))
            .videoSourceToken(getText(elem, "VideoSourceConfiguration", "SourceToken"))
            .videoEncodingToken(getText(elem, "VideoEncoderConfiguration", "token"))
            .encoding(getText(elem, "VideoEncoderConfiguration", "Encoding"))
            .width(getInt(elem, "VideoEncoderConfiguration", "Resolution", "Width"))
            .height(getInt(elem, "VideoEncoderConfiguration", "Resolution", "Height"))
            .build();
        
        profiles.add(profile);
    }
    
    return profiles;
}

Profile 解析要点:

  1. Token 提取
    • token 属性是后续获取流地址的关键
  2. 通道号提取
   //  SourceToken 提取通道号
   // 例如: VideoSource_1 -> 1
   private int extractChannel(String sourceToken) {
       if (sourceToken == null) return 1;
       Matcher m = Pattern.compile("\\d+").matcher(sourceToken);
       return m.find() ? Integer.parseInt(m.group()) : 1;
   }
  1. 主/子码流判断
   private int detectSubtype(int width, int height, String name) {
       if (width >= 1280 || height >= 720) return 0; // 主码流
       if (name != null && name.toLowerCase().contains("sub")) return 1;
       return 1; // 默认子码流
   }

4. GetStreamUri

public String getStreamUri(String mediaXAddr, String user, String pass, String profileToken) 
        throws OnvifException {
    String body = 
        "<trt:GetStreamUri xmlns:trt=\"http://www.onvif.org/ver10/media/wsdl\" " +
        "xmlns:tt=\"http://www.onvif.org/ver10/schema\">" +
        "<trt:StreamSetup>" +
        "<tt:Stream>RTP-Unicast</tt:Stream>" +
        "<tt:Transport><tt:Protocol>RTSP</tt:Protocol></tt:Transport>" +
        "</trt:StreamSetup>" +
        "<trt:ProfileToken>" + profileToken + "</trt:ProfileToken>" +
        "</trt:GetStreamUri>";
    
    Document doc = soapClient.sendSoapRequest(mediaXAddr, user, pass, body);
    
    String uri = getText(doc, "Uri");
    
    // 注入凭据
    return injectCredentials(uri, user, pass);
}

private String injectCredentials(String rtspUrl, String user, String pass) {
    try {
        URI uri = new URI(rtspUrl);
        String userInfo = user + ":" + pass;
        return new URI(
            uri.getScheme(),
            userInfo,
            uri.getHost(),
            uri.getPort(),
            uri.getPath(),
            uri.getQuery(),
            uri.getFragment()
        ).toString();
    } catch (URISyntaxException e) {
        throw new IllegalArgumentException("Invalid RTSP URL: " + rtspUrl, e);
    }
}

为什么要注入凭据?

大部分设备返回的 RTSP URI 不包含认证信息:

rtsp://10.0.1.20:554/Streaming/Channels/101

需要转换为:

rtsp://admin:password@10.0.1.20:554/Streaming/Channels/101

5. GetSystemDateAndTime (心跳)

public void checkOnline(String ip, int port, String user, String pass) throws OnvifException {
    String url = String.format("http://%s:%d/onvif/device_service", ip, port);
    String body = "<tds:GetSystemDateAndTime xmlns:tds=\"http://www.onvif.org/ver10/device/wsdl\" />";
    
    // 超时设置为 3 
    Document doc = soapClient.sendSoapRequest(url, user, pass, body, 3000);
    
    // 能成功调用即表示在线,无需解析返回值
}

为什么选择 GetSystemDateAndTime 作为心跳?

  1. 轻量级:不涉及媒体资源,响应快
  2. 稳定性高:几乎所有设备都实现
  3. 无副作用:只读操作,不会改变设备状态

4.5 厂商差异处理

问题 1: Media XAddr 为空

部分低端设备 GetCapabilities 返回空地址。

解决方案:

String mediaXAddr = capabilities.getMediaXAddr();
if (StringUtils.isBlank(mediaXAddr)) {
    // 降级使用 Device Service 地址
    mediaXAddr = String.format("http://%s:%d/onvif/device_service", ip, port);
    log.warn("Media XAddr empty, fallback to device service for camera {}", cameraId);
}

问题 2: ProfileToken 格式不一致

  • 海康:Profile_1
  • 大华:MediaProfile000
  • 宇视:profile_1_h264

解决方案:

// 不依赖 token 格式,按索引或分辨率选择
OnvifProfile mainStream = profiles.stream()
    .filter(p -> p.getWidth() >= 1280)
    .findFirst()
    .orElse(profiles.get(0));

问题 3: 时间窗口错误

错误示例:

SOAP Fault: The security token could not be authenticated or authorized

原因:服务器时间与设备时间误差 > 5 秒

解决方案:

@Scheduled(cron = "0 */30 * * * ?") //  30 分钟执行
public void syncServerTime() {
    try {
        ProcessBuilder pb = new ProcessBuilder("ntpdate", "ntp.aliyun.com");
        Process process = pb.start();
        process.waitFor(10, TimeUnit.SECONDS);
    } catch (Exception e) {
        log.error("NTP sync failed", e);
    }
}

4.6 ONVIF 调用性能优化

1. 连接池复用

@Configuration
public class OnvifHttpConfig {
    
    @Bean
    public CloseableHttpClient httpClient() {
        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        cm.setMaxTotal(200);
        cm.setDefaultMaxPerRoute(50);
        
        return HttpClients.custom()
            .setConnectionManager(cm)
            .setDefaultRequestConfig(
                RequestConfig.custom()
                    .setConnectTimeout(3000)
                    .setSocketTimeout(5000)
                    .build()
            )
            .build();
    }
}

2. 设备信息缓存

@Cacheable(value = "device-info", key = "#cameraId", unless = "#result == null")
public DeviceInfo getDeviceInfo(Long cameraId) {
    CameraDevice device = cameraRepository.findById(cameraId)
        .orElseThrow(() -> new NotFoundException("Camera not found"));
    
    return onvifClient.getDeviceInformation(
        device.getIp(),
        device.getPort(),
        device.getUsername(),
        device.getPassword()
    );
}

缓存策略:

  • TTL: 24 小时
  • 失效条件:设备固件升级或手动清除

3. 并发控制

private final Semaphore onvifSemaphore = new Semaphore(50);

public <T> T callOnvif(Supplier<T> supplier) throws Exception {
    onvifSemaphore.acquire();
    try {
        return supplier.get();
    } finally {
        onvifSemaphore.release();
    }
}

原因:避免同时调用大量设备导致网络拥塞。


5. MediaMTX 流媒体网关集成

5.1 MediaMTX 核心概念

Path (路径)

Path 是 MediaMTX 中的核心抽象,代表一个独立的媒体流通道。每个 Path 包含:

  • Source:流的来源 (RTSP URL、推流地址、publisher 等)
  • Readers:消费该流的客户端连接
  • State:路径状态 (idle、ready、publishing)

按需拉流 (Source On Demand)

paths:
  cam_*:
    source: rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101
    sourceOnDemand: yes
    sourceOnDemandStartTimeout: 10s
    sourceOnDemandCloseAfter: 10s

工作流程:

  1. 初始状态:Path 存在但未拉流
  2. 客户端连接:触发 MediaMTX 拉取源流
  3. 客户端断开:等待 closeAfter 时长后释放资源

优势:

  • 节省带宽 (无观看时不拉流)
  • 降低服务器负载
  • 支持大规模路径管理

按需转码 (Run On Demand)

paths:
  cam_h265_*:
    source: publisher
    runOnDemand: ffmpeg -i $SOURCE_URL -c:v libx264 -f rtsp rtsp://127.0.0.1:$RTSP_PORT/$MTX_PATH
    runOnDemandRestart: yes

工作流程:

  1. 客户端连接触发命令执行
  2. FFmpeg 进程启动并推流到 MediaMTX
  3. 客户端断开后根据配置决定是否重启

环境变量:

  • $SOURCE_URL:可用于引用上游地址
  • $RTSP_PORT:MediaMTX RTSP 端口
  • $MTX_PATH:当前路径名

5.2 RESTful API 集成

MediaMTX 提供完整的 API 接口用于动态管理:

API 端点:

  • GET /v3/config/global/get:获取全局配置
  • GET /v3/config/pathdefaults/get:获取路径默认配置
  • POST /v3/config/paths/add/{name}:添加路径
  • GET /v3/config/paths/get/{name}:获取路径配置
  • POST /v3/config/paths/patch/{name}:更新路径配置
  • POST /v3/config/paths/delete/{name}:删除路径
  • GET /v3/config/paths/list:列出所有路径

路径配置结构:

{
  "name": "cam_10_0_1_20_ch1_main",
  "source": "rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101",
  "sourceProtocol": "tcp",
  "sourceOnDemand": true,
  "sourceOnDemandStartTimeout": "10s",
  "sourceOnDemandCloseAfter": "10s",
  
  "record": false,
  "recordPath": "",
  "recordFormat": "fmp4",
  "recordSegmentDuration": "10m",
  "recordDeleteAfter": "7d"
}

5.3 路径命名规范

命名模板:

cam_{ip}_{channel}_{subtype}

示例:

  • cam_10_0_1_20_ch1_main:10.0.1.20 设备第 1 通道主码流
  • cam_10_0_1_20_ch1_sub:10.0.1.20 设备第 1 通道子码流
  • cam_10_0_1_21_ch2_main:10.0.1.21 设备第 2 通道主码流

命名规范优势:

  1. 可读性:从路径名直接识别设备和通道
  2. 唯一性:全局唯一标识符
  3. 模式匹配:支持通配符配置
   paths:
     cam_*_main:
       # 所有主码流共享配置
     cam_*_sub:
       # 所有子码流共享配置

5.4 路径注册实现

核心服务类:

@Service
@Slf4j
public class MediaPathService {
    
    @Value("${media.mtx.apiBaseUrl}")
    private String apiBaseUrl;
    
    private final RestTemplate restTemplate;
    
    /**
     * 注册直连路径
     */
    public void registerDirectPath(StreamPath path) {
        Map<String, Object> config = new HashMap<>();
        config.put("name", path.getPathName());
        config.put("source", path.getSourceUrl());
        config.put("sourceProtocol", "tcp");
        config.put("sourceOnDemand", true);
        config.put("sourceOnDemandStartTimeout", "10s");
        config.put("sourceOnDemandCloseAfter", "10s");
        
        addPath(path.getPathName(), config);
    }
    
    /**
     * 注册转码路径
     */
    public void registerTranscodePath(StreamPath path) {
        String ffmpegCmd = buildFfmpegCommand(path.getSourceUrl());
        
        Map<String, Object> config = new HashMap<>();
        config.put("name", path.getPathName());
        config.put("source", "publisher");
        config.put("runOnDemand", ffmpegCmd);
        config.put("runOnDemandRestart", true);
        config.put("runOnDemandStartTimeout", "10s");
        config.put("runOnDemandCloseAfter", "10s");
        
        addPath(path.getPathName(), config);
    }
    
    /**
     * 删除并重建路径 (确保配置完全生效)
     */
    public void updatePath(StreamPath path) {
        deletePath(path.getPathName());
        
        if (path.getNeedTranscode() == 1) {
            registerTranscodePath(path);
        } else {
            registerDirectPath(path);
        }
    }
    
    private void addPath(String pathName, Map<String, Object> config) {
        String url = apiBaseUrl + "/v3/config/paths/add/" + pathName;
        
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        
        HttpEntity<Map<String, Object>> request = new HttpEntity<>(config, headers);
        
        try {
            ResponseEntity<String> response = restTemplate.postForEntity(url, request, String.class);
            
            if (response.getStatusCode().is2xxSuccessful()) {
                log.info("Path registered: {}", pathName);
            } else {
                throw new MediaGatewayException("Failed to register path: " + response.getBody());
            }
        } catch (Exception e) {
            log.error("MediaMTX API error: {}", e.getMessage());
            throw new MediaGatewayException("Path registration failed", e);
        }
    }
    
    private void deletePath(String pathName) {
        String url = apiBaseUrl + "/v3/config/paths/delete/" + pathName;
        
        try {
            restTemplate.delete(url);
            log.info("Path deleted: {}", pathName);
        } catch (HttpClientErrorException.NotFound e) {
            // 路径不存在,忽略错误
        } catch (Exception e) {
            log.error("Failed to delete path {}: {}", pathName, e.getMessage());
        }
    }
    
    private String buildFfmpegCommand(String sourceUrl) {
        return String.format(
            "ffmpeg -hide_banner -loglevel warning " +
            "-rtsp_transport tcp " +
            "-i \"%s\" " +
            "-an " +
            "-c:v libx264 -preset veryfast -crf 23 " +
            "-tune zerolatency -pix_fmt yuv420p " +
            "-g 50 -keyint_min 50 -sc_threshold 0 " +
            "-x264-params repeat-headers=1 " +
            "-f rtsp -rtsp_transport tcp " +
            "\"rtsp://127.0.0.1:$RTSP_PORT/$MTX_PATH\"",
            sourceUrl
        );
    }
}

为什么使用 Delete + Add 而非 Patch?

  1. 配置完整性:Patch 可能保留旧配置片段,导致状态不一致
  2. 立即生效:Delete + Add 强制重建,确保新配置立刻生效
  3. 语义清晰:每次更新等价于发布新版本

风险与缓解:

  • 短暂中断:删除与添加之间有窗口期
  • 缓解措施:
    • 在低峰期批量变更
    • 添加失败时回滚到旧配置
    • 关键路径增加重试机制

5.5 播放地址生成

RTSP 地址:

rtsp://{MediaMTX_IP}:{RTSP_Port}/{path_name}

示例:

rtsp://10.0.10.1:8554/cam_10_0_1_20_ch1_main

HLS 地址:

http://{MediaMTX_IP}:{HLS_Port}/{path_name}/index.m3u8

示例:

http://10.0.10.1:8888/cam_10_0_1_20_ch1_main/index.m3u8

WebRTC 地址:

http://{MediaMTX_IP}:{WebRTC_Port}/{path_name}

示例:

http://10.0.10.1:8889/cam_10_0_1_20_ch1_main

播放器选择建议:

场景协议延迟兼容性推荐播放器
PC 浏览器实时预览WebRTC500msChrome/EdgeWebRTC.js
移动端 H5HLS3-5siOS/Androidvideo.js
客户端应用RTSP1-2s需原生支持VLC/ffplay
录像回放HLS不关键全平台video.js

6. FFmpeg 编解码治理

6.1 编码探测策略

为什么需要探测?

  1. 浏览器兼容性:Safari 不支持 H.265,Chrome 90+ 才支持
  2. 成本控制:H.264 可直连,H.265 需转码 (CPU 密集)
  3. 质量保障:探测失败的流不应接入系统

探测流程:

@Service
@Slf4j
public class CodecProbeService {
    
    @Value("${codec.probe.ffprobe-bin:ffprobe}")
    private String ffprobeBin;
    
    @Value("${codec.probe.timeout-ms:3000}")
    private int timeoutMs;
    
    private final Semaphore probeSemaphore = new Semaphore(8);
    
    public CodecInfo probe(String rtspUrl) {
        try {
            probeSemaphore.acquire();
            return doProbe(rtspUrl);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return CodecInfo.unknown();
        } finally {
            probeSemaphore.release();
        }
    }
    
    private CodecInfo doProbe(String rtspUrl) {
        List<String> command = Arrays.asList(
            ffprobeBin,
            "-v", "error",
            "-rtsp_transport", "tcp",
            "-analyzeduration", "1000000",
            "-probesize", "32768",
            "-select_streams", "v:0",
            "-show_entries", "stream=codec_name,width,height,avg_frame_rate",
            "-of", "json",
            rtspUrl
        );
        
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true);
        
        try {
            Process process = pb.start();
            boolean finished = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS);
            
            if (!finished) {
                process.destroyForcibly();
                log.warn("ffprobe timeout for {}", maskUrl(rtspUrl));
                return CodecInfo.unknown();
            }
            
            if (process.exitValue() != 0) {
                log.warn("ffprobe failed for {}", maskUrl(rtspUrl));
                return CodecInfo.unknown();
            }
            
            String output = IOUtils.toString(process.getInputStream(), StandardCharsets.UTF_8);
            return parseProbeOutput(output);
            
        } catch (Exception e) {
            log.error("ffprobe error for {}: {}", maskUrl(rtspUrl), e.getMessage());
            return CodecInfo.unknown();
        }
    }
    
    private CodecInfo parseProbeOutput(String json) {
        try {
            JSONObject obj = JSON.parseObject(json);
            JSONArray streams = obj.getJSONArray("streams");
            
            if (streams == null || streams.isEmpty()) {
                return CodecInfo.unknown();
            }
            
            JSONObject stream = streams.getJSONObject(0);
            
            String codecName = stream.getString("codec_name");
            int width = stream.getIntValue("width");
            int height = stream.getIntValue("height");
            String frameRate = stream.getString("avg_frame_rate");
            
            return CodecInfo.builder()
                .codec(normalizeCodec(codecName))
                .width(width)
                .height(height)
                .frameRate(parseFrameRate(frameRate))
                .build();
                
        } catch (Exception e) {
            log.error("Failed to parse ffprobe output", e);
            return CodecInfo.unknown();
        }
    }
    
    private String normalizeCodec(String codecName) {
        if (codecName == null) return "UNKNOWN";
        
        String lower = codecName.toLowerCase();
        if (lower.contains("h264") || lower.equals("avc")) return "H264";
        if (lower.contains("h265") || lower.equals("hevc")) return "H265";
        if (lower.contains("mjpeg")) return "MJPEG";
        
        return "UNKNOWN";
    }
    
    private double parseFrameRate(String frameRate) {
        if (frameRate == null || frameRate.isEmpty()) return 0.0;
        
        try {
            if (frameRate.contains("/")) {
                String[] parts = frameRate.split("/");
                double num = Double.parseDouble(parts[0]);
                double den = Double.parseDouble(parts[1]);
                return den == 0 ? 0 : num / den;
            }
            return Double.parseDouble(frameRate);
        } catch (Exception e) {
            return 0.0;
        }
    }
    
    private String maskUrl(String url) {
        return url.replaceAll("://[^@]+@", "://***:***@");
    }
}

并发控制为什么必要?

批量接入 500 路相机时,如果不限制并发:

  • CPU 负载瞬间飙升至 100%
  • 磁盘 I/O 饱和 (ffprobe 需要写临时文件)
  • 导致正常业务请求超时

推荐并发数:

  • 4 核 8G:并发 4-8
  • 8 核 16G:并发 8-16
  • 16 核 32G:并发 16-32

6.2 转码决策引擎

决策矩阵:

编码格式浏览器兼容性决策备注
H264✅ 全兼容直连无需转码
H265⚠️ 部分兼容转码Chrome 90+, Safari 不支持
MJPEG⚠️ 兼容但低效转码带宽占用高
UNKNOWN❌ 未知保守转码探测失败兜底

实现代码:

@Service
public class CodecDecisionService {
    
    @Value("${codec.transcode.enabled:true}")
    private boolean transcodeEnabled;
    
    @Value("${codec.transcode.assume-h265-when-unknown:true}")
    private boolean assumeH265WhenUnknown;
    
    public boolean needTranscode(String codec) {
        if (!transcodeEnabled) {
            return false;
        }
        
        if (codec == null || "UNKNOWN".equals(codec)) {
            return assumeH265WhenUnknown;
        }
        
        return "H265".equals(codec) || "HEVC".equals(codec) || "MJPEG".equals(codec);
    }
    
    public TranscodeStrategy selectStrategy(CodecInfo info) {
        if (!needTranscode(info.getCodec())) {
            return TranscodeStrategy.DIRECT;
        }
        
        // 根据分辨率选择转码参数
        if (info.getWidth() >= 1920 || info.getHeight() >= 1080) {
            return TranscodeStrategy.HIGH_QUALITY;
        } else if (info.getWidth() >= 1280 || info.getHeight() >= 720) {
            return TranscodeStrategy.MEDIUM_QUALITY;
        } else {
            return TranscodeStrategy.LOW_QUALITY;
        }
    }
}

public enum TranscodeStrategy {
    DIRECT(null, null, null),
    HIGH_QUALITY("veryfast", "23", "50"),
    MEDIUM_QUALITY("faster", "25", "60"),
    LOW_QUALITY("fast", "28", "90");
    
    private final String preset;
    private final String crf;
    private final String gop;
    
    // constructor and getters
}

6.3 转码命令构建

基础转码命令:

public class FfmpegCommandBuilder {
    
    public String buildTranscodeCommand(String sourceUrl, TranscodeStrategy strategy) {
        StringBuilder cmd = new StringBuilder();
        
        cmd.append("ffmpeg -hide_banner -loglevel warning ");
        cmd.append("-rtsp_transport tcp ");
        cmd.append("-stimeout 5000000 ");
        cmd.append("-probesize 5000000 ");
        cmd.append("-analyzeduration 5000000 ");
        cmd.append("-i \"").append(sourceUrl).append("\" ");
        
        // 禁用音频
        cmd.append("-an ");
        
        // 视频编码参数
        cmd.append("-c:v libx264 ");
        cmd.append("-preset ").append(strategy.getPreset()).append(" ");
        cmd.append("-crf ").append(strategy.getCrf()).append(" ");
        cmd.append("-tune zerolatency ");
        cmd.append("-pix_fmt yuv420p ");
        cmd.append("-g ").append(strategy.getGop()).append(" ");
        cmd.append("-keyint_min ").append(strategy.getGop()).append(" ");
        cmd.append("-sc_threshold 0 ");
        cmd.append("-x264-params repeat-headers=1 ");
        
        // 输出格式
        cmd.append("-f rtsp -rtsp_transport tcp ");
        cmd.append("\"rtsp://127.0.0.1:$RTSP_PORT/$MTX_PATH\"");
        
        return cmd.toString();
    }
}

硬件加速版本 (NVENC):

public String buildNvencCommand(String sourceUrl) {
    StringBuilder cmd = new StringBuilder();
    
    cmd.append("ffmpeg -hide_banner -loglevel warning ");
    cmd.append("-hwaccel cuda -hwaccel_output_format cuda ");
    cmd.append("-rtsp_transport tcp ");
    cmd.append("-i \"").append(sourceUrl).append("\" ");
    cmd.append("-an ");
    cmd.append("-c:v h264_nvenc ");
    cmd.append("-preset p4 ");
    cmd.append("-b:v 3000k ");
    cmd.append("-g 50 ");
    cmd.append("-f rtsp -rtsp_transport tcp ");
    cmd.append("\"rtsp://127.0.0.1:$RTSP_PORT/$MTX_PATH\"");
    
    return cmd.toString();
}

硬件加速检测:

public boolean isNvencAvailable() {
    try {
        ProcessBuilder pb = new ProcessBuilder("ffmpeg", "-encoders");
        Process process = pb.start();
        
        String output = IOUtils.toString(process.getInputStream(), StandardCharsets.UTF_8);
        return output.contains("h264_nvenc");
        
    } catch (Exception e) {
        return false;
    }
}

参数调优指南:

参数推荐值说明影响
-presetveryfast编码速度预设速度 ↑ 质量 ↓
-crf23恒定质量因子值越小质量越高
-tunezerolatency低延迟优化减少缓冲延迟
-g50GOP 大小影响跳转精度
-b:v3000k码率带宽与质量平衡

7. 相机接入完整流程

7.1 接入时序图

[前端]  [接口层]  [业务层]  [ONVIF层]  [设备]
                                 
                                 └─→ GetDeviceInformation
                                 └─→ GetCapabilities
                                 └─→ GetProfiles
                                 └─→ GetStreamUri (per profile)
                      
                      └─→ [数据层] 保存设备主档
                      └─→ [异步任务] 流同步
                                  
                                  └─→ FFprobe 探测编码
                                  └─→ 保存流路径
                                  └─→ MediaMTX 注册路径
            
            └─→ 返回接入结果

7.2 接入请求模型

@Data
public class CameraAddRequest {
    @NotBlank(message = "IP不能为空")
    private String ip;
    
    @Min(value = 1, message = "端口范围 1-65535")
    @Max(value = 65535, message = "端口范围 1-65535")
    private Integer port = 80;
    
    @NotBlank(message = "用户名不能为空")
    private String username;
    
    @NotBlank(message = "密码不能为空")
    private String password;
    
    private String cameraName;
    private Long groupId;
    
    /**
     * 0: 标准 ONVIF 流程
     * 1: 厂商兜底模式 (跳过 ONVIF,使用固定规则)
     */
    private Integer skipOnvif = 0;
    
    /**
     * 厂商类型 (skipOnvif=1 时有效)
     * 支持: HIKVISION, DAHUA, UNIVIEW
     */
    private String vendorType;
}

7.3 接入服务实现

@Service
@Slf4j
public class CameraOnboardingService {
    
    @Resource
    private CameraDeviceRepository deviceRepository;
    
    @Resource
    private OnvifManager onvifManager;
    
    @Resource
    private StreamPathService streamPathService;
    
    @Resource
    private CameraGroupService groupService;
    
    @Transactional(rollbackFor = Exception.class)
    public CameraDevice addCamera(CameraAddRequest request) {
        // 1. 重复性校验
        if (deviceRepository.existsByIpAndIsDeleted(request.getIp(), 0)) {
            throw new BusinessException("设备已存在: " + request.getIp());
        }
        
        // 2. 构建设备实体
        CameraDevice device = new CameraDevice();
        BeanUtils.copyProperties(request, device);
        
        // 设置默认名称
        if (StringUtils.isBlank(device.getCameraName())) {
            device.setCameraName(device.getIp());
        }
        
        // 3. ONVIF 信息获取
        if (request.getSkipOnvif() == 0) {
            try {
                OnvifDeviceDetail detail = onvifManager.getDeviceInformation(
                    device.getIp(),
                    device.getPort(),
                    device.getUsername(),
                    device.getPassword()
                );
                
                device.setManufacturer(detail.getManufacturer());
                device.setModel(detail.getModel());
                device.setSerialNumber(detail.getSerialNumber());
                device.setFirmwareVersion(detail.getFirmwareVersion());
                
            } catch (OnvifException e) {
                log.error("ONVIF failed for {}, fallback to basic mode", request.getIp(), e);
                device.setManufacturer("UNKNOWN");
                device.setModel("UNKNOWN");
            }
        } else {
            // 厂商兜底模式
            device.setManufacturer(request.getVendorType());
            device.setModel("Generic");
        }
        
        // 4. 设置初始状态
        device.setCameraStatus(1);
        device.setLastOnlineTime(LocalDateTime.now());
        
        // 5. 解析分组
        if (request.getGroupId() == null) {
            device.setGroupId(groupService.getDefaultGroupId());
        }
        
        // 6. 保存设备
        deviceRepository.save(device);
        
        // 7. 异步同步流路径
        CompletableFuture.runAsync(() -> {
            try {
                streamPathService.syncStreamPaths(device, request);
            } catch (Exception e) {
                log.error("Stream sync failed for camera {}", device.getId(), e);
            }
        });
        
        return device;
    }
}

事务边界设计:

  1. 同步部分 (在事务内):
    • 设备主档写入
    • 基础验证
  2. 异步部分 (事务外):
    • 流路径同步
    • MediaMTX 注册

为什么异步?

  • ONVIF 调用可能超时 (3-5秒)
  • FFprobe 探测耗时 (2-3秒/路)
  • 避免接口长时间阻塞

7.4 流同步服务

标准 ONVIF 模式:

@Service
@Slf4j
public class StreamPathService {
    
    @Resource
    private StreamPathRepository pathRepository;
    
    @Resource
    private OnvifManager onvifManager;
    
    @Resource
    private CodecProbeService codecProbeService;
    
    @Resource
    private CodecDecisionService codecDecisionService;
    
    @Resource
    private MediaPathService mediaPathService;
    
    @Transactional(rollbackFor = Exception.class)
    public void syncStreamPaths(CameraDevice device, CameraAddRequest request) {
        if (request.getSkipOnvif() == 1) {
            syncStreamPathsVendorFallback(device, request);
            return;
        }
        
        // 1. 获取 Profiles
        List<OnvifProfile> profiles = onvifManager.getProfiles(
            device.getIp(),
            device.getPort(),
            device.getUsername(),
            device.getPassword()
        );
        
        if (profiles.isEmpty()) {
            throw new BusinessException("No profiles found for camera: " + device.getIp());
        }
        
        // 2. 删除旧路径
        pathRepository.deleteAllByCameraId(device.getId());
        
        // 3. 处理每个 Profile
        for (OnvifProfile profile : profiles) {
            try {
                processProfile(device, profile);
            } catch (Exception e) {
                log.error("Failed to process profile {} for camera {}", 
                    profile.getToken(), device.getId(), e);
            }
        }
    }
    
    private void processProfile(CameraDevice device, OnvifProfile profile) {
        // 1. 获取 RTSP URI
        String rtspUri = onvifManager.getStreamUri(
            device.getIp(),
            device.getPort(),
            device.getUsername(),
            device.getPassword(),
            profile.getToken()
        );
        
        if (StringUtils.isBlank(rtspUri)) {
            log.warn("Empty RTSP URI for profile {}", profile.getToken());
            return;
        }
        
        // 2. 注入凭据
        String sourceUrl = injectCredentials(rtspUri, device.getUsername(), device.getPassword());
        
        // 3. 探测编码
        CodecInfo codecInfo = codecProbeService.probe(sourceUrl);
        String codec = codecInfo.getCodec();
        boolean needTranscode = codecDecisionService.needTranscode(codec);
        
        // 4. 构建路径名
        int channel = extractChannel(profile.getVideoSourceToken());
        int subtype = detectSubtype(profile.getWidth(), profile.getName());
        String pathName = buildPathName(device.getIp(), channel, subtype);
        
        // 5. 保存路径记录
        StreamPath path = StreamPath.builder()
            .cameraId(device.getId())
            .pathName(pathName)
            .sourceUrl(sourceUrl)
            .profileToken(profile.getToken())
            .profileName(profile.getName())
            .videoEncoding(profile.getEncoding())
            .videoCodec(codec)
            .resolution(profile.getWidth() + "x" + profile.getHeight())
            .subtype(subtype)
            .channel(channel)
            .needTranscode(needTranscode ? 1 : 0)
            .enabled(1)
            .status(1)
            .build();
        
        pathRepository.save(path);
        
        // 6. 注册到 MediaMTX
        try {
            mediaPathService.updatePath(path);
            log.info("Path registered: {}", pathName);
        } catch (Exception e) {
            log.error("MediaMTX registration failed for {}", pathName, e);
            path.setStatus(0);
            pathRepository.updateById(path);
        }
    }
    
    private String buildPathName(String ip, int channel, int subtype) {
        String ipNormalized = ip.replace(".", "_");
        String subtypeStr = (subtype == 0) ? "main" : "sub";
        return String.format("cam_%s_ch%d_%s", ipNormalized, channel, subtypeStr);
    }
    
    private int extractChannel(String sourceToken) {
        if (sourceToken == null) return 1;
        
        Matcher m = Pattern.compile("\\d+").matcher(sourceToken);
        return m.find() ? Integer.parseInt(m.group()) : 1;
    }
    
    private int detectSubtype(int width, String name) {
        // 主码流判断
        if (width >= 1280) return 0;
        
        // 名称包含 sub/stream2 等关键字
        if (name != null) {
            String lower = name.toLowerCase();
            if (lower.contains("sub") || lower.contains("stream2")) {
                return 1;
            }
        }
        
        // 默认子码流
        return 1;
    }
    
    private String injectCredentials(String rtspUrl, String user, String pass) {
        try {
            URI uri = new URI(rtspUrl);
            String userInfo = URLEncoder.encode(user, "UTF-8") + ":" + 
                              URLEncoder.encode(pass, "UTF-8");
            
            return new URI(
                uri.getScheme(),
                userInfo,
                uri.getHost(),
                uri.getPort(),
                uri.getPath(),
                uri.getQuery(),
                uri.getFragment()
            ).toString();
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid RTSP URL: " + rtspUrl, e);
        }
    }
}

厂商兜底模式:

private void syncStreamPathsVendorFallback(CameraDevice device, CameraAddRequest request) {
    String vendor = request.getVendorType();
    
    if (!"HIKVISION".equalsIgnoreCase(vendor)) {
        throw new BusinessException("Unsupported vendor fallback: " + vendor);
    }
    
    // 海康固定规则
    List<String> rtspTemplates = Arrays.asList(
        "rtsp://{ip}:554/Streaming/Channels/101",  // 主码流
        "rtsp://{ip}:554/Streaming/Channels/102"   // 子码流
    );
    
    pathRepository.deleteAllByCameraId(device.getId());
    
    for (int i = 0; i < rtspTemplates.size(); i++) {
        String template = rtspTemplates.get(i);
        String rtspUrl = template.replace("{ip}", device.getIp());
        String sourceUrl = injectCredentials(rtspUrl, device.getUsername(), device.getPassword());
        
        CodecInfo codecInfo = codecProbeService.probe(sourceUrl);
        String codec = codecInfo.getCodec();
        boolean needTranscode = codecDecisionService.needTranscode(codec);
        
        int subtype = i; // 0=main, 1=sub
        String pathName = buildPathName(device.getIp(), 1, subtype);
        
        StreamPath path = StreamPath.builder()
            .cameraId(device.getId())
            .pathName(pathName)
            .sourceUrl(sourceUrl)
            .videoCodec(codec)
            .subtype(subtype)
            .channel(1)
            .needTranscode(needTranscode ? 1 : 0)
            .enabled(1)
            .status(1)
            .build();
        
        pathRepository.save(path);
        mediaPathService.updatePath(path);
    }
}

7.5 接入失败处理

失败分类:

  1. ONVIF 连接失败
    • 原因:网络不通、端口错误、服务未启动
    • 处理:返回明确错误信息,不创建设备记录
  2. 认证失败
    • 原因:用户名密码错误、权限不足
    • 处理:提示用户检查凭据
  3. 能力不足
    • 原因:设备不支持 Media Service
    • 处理:降级到厂商兜底模式
  4. 流探测失败
    • 原因:RTSP 不可达、编码异常
    • 处理:标记路径状态为失败,允许后续重试

错误示例:

{
  "code": 400,
  "message": "设备接入失败",
  "details": {
    "step": "ONVIF_DEVICE_INFO",
    "error": "Connection timeout",
    "suggestion": "请检查设备网络连接和 ONVIF 服务是否启动"
  }
}

8. 设备状态巡检与事件闭环

8.1 在线巡检机制

巡检策略:

@Component
@Slf4j
public class CameraHealthCheckTask {
    
    @Resource
    private CameraDeviceRepository deviceRepository;
    
    @Resource
    private OnvifManager onvifManager;
    
    @Resource
    private CameraOfflineEventService offlineEventService;
    
    @Value("${camera.health.check-interval-ms:30000}")
    private long checkInterval;
    
    @Scheduled(fixedDelayString = "${camera.health.check-interval-ms:30000}")
    public void checkAllDevices() {
        List<CameraDevice> devices = deviceRepository.findAllActive();
        
        log.info("Starting health check for {} devices", devices.size());
        
        for (CameraDevice device : devices) {
            try {
                checkSingleDevice(device);
            } catch (Exception e) {
                log.error("Health check failed for device {}", device.getId(), e);
            }
        }
    }
    
    private void checkSingleDevice(CameraDevice device) {
        boolean online = isOnline(device);
        LocalDateTime now = LocalDateTime.now();
        
        if (online) {
            handleOnline(device, now);
        } else {
            handleOffline(device, now);
        }
    }
    
    private boolean isOnline(CameraDevice device) {
        try {
            onvifManager.getSystemDateAndTime(
                device.getIp(),
                device.getPort(),
                device.getUsername(),
                device.getPassword()
            );
            return true;
        } catch (Exception e) {
            log.debug("Device {} offline: {}", device.getId(), e.getMessage());
            return false;
        }
    }
    
    @Transactional(rollbackFor = Exception.class)
    protected void handleOnline(CameraDevice device, LocalDateTime checkTime) {
        // 状态变化:离线 -> 在线
        if (device.getCameraStatus() == 0) {
            device.setCameraStatus(1);
            device.setLastOnlineTime(checkTime);
            deviceRepository.updateById(device);
            
            // 闭环离线事件
            offlineEventService.resolveEvent(device.getId(), checkTime);
            
            log.info("Device {} back online", device.getId());
        } else {
            // 刷新在线时间
            device.setLastOnlineTime(checkTime);
            deviceRepository.updateById(device);
        }
    }
    
    @Transactional(rollbackFor = Exception.class)
    protected void handleOffline(CameraDevice device, LocalDateTime checkTime) {
        // 状态变化:在线 -> 离线
        if (device.getCameraStatus() == 1) {
            device.setCameraStatus(0);
            deviceRepository.updateById(device);
            
            // 创建离线事件
            offlineEventService.createEvent(device, checkTime);
            
            log.warn("Device {} went offline", device.getId());
        } else {
            // 更新离线事件
            offlineEventService.updateEvent(device.getId(), checkTime);
        }
    }
}

为什么必须异常隔离?

如果单台设备异常导致整批巡检中断:

  • 其他设备状态陈旧
  • 离线事件无法及时创建
  • 运维告警失效

8.2 离线事件管理

@Service
@Slf4j
public class CameraOfflineEventService {
    
    @Resource
    private CameraOfflineEventRepository eventRepository;
    
    @Transactional(rollbackFor = Exception.class)
    public void createEvent(CameraDevice device, LocalDateTime startTime) {
        CameraOfflineEvent event = CameraOfflineEvent.builder()
            .deviceId(device.getId())
            .deviceIp(device.getIp())
            .groupId(device.getGroupId())
            .startTime(startTime)
            .lastCheckTime(startTime)
            .isResolved(0)
            .alarmStatus(0)
            .build();
        
        eventRepository.save(event);
        
        log.info("Offline event created for device {}", device.getId());
    }
    
    @Transactional(rollbackFor = Exception.class)
    public void updateEvent(Long deviceId, LocalDateTime checkTime) {
        CameraOfflineEvent event = eventRepository.findLatestByDeviceId(deviceId);
        
        if (event == null || event.getIsResolved() == 1) {
            // 事件已闭环或不存在,不应到达此分支
            log.warn("No active offline event for device {}", deviceId);
            return;
        }
        
        event.setLastCheckTime(checkTime);
        
        long duration = Duration.between(event.getStartTime(), checkTime).getSeconds();
        event.setDuration(duration);
        
        eventRepository.updateById(event);
    }
    
    @Transactional(rollbackFor = Exception.class)
    public void resolveEvent(Long deviceId, LocalDateTime endTime) {
        CameraOfflineEvent event = eventRepository.findLatestByDeviceId(deviceId);
        
        if (event == null || event.getIsResolved() == 1) {
            return;
        }
        
        event.setEndTime(endTime);
        event.setIsResolved(1);
        
        long duration = Duration.between(event.getStartTime(), endTime).getSeconds();
        event.setDuration(duration);
        
        eventRepository.updateById(event);
        
        log.info("Offline event resolved for device {}, duration: {}s", deviceId, duration);
    }
    
    /**
     * 查询当前未恢复的离线设备
     */
    public List<CameraOfflineEvent> listActiveEvents() {
        return eventRepository.findByIsResolved(0);
    }
    
    /**
     * 按时间范围查询离线历史
     */
    public List<CameraOfflineEvent> queryHistory(LocalDateTime start, LocalDateTime end) {
        return eventRepository.findByStartTimeBetween(start, end);
    }
}

事件状态机:

[设备在线] --检测离线--> [创建事件] (is_resolved=0)
                             
                             ├--持续离线--> [更新 duration]
                             
                             └--恢复在线--> [闭环事件] (is_resolved=1, end_time)
     
     └--始终在线--> [无事件]

离线率统计示例:

public OfflineStatistics calculateOfflineRate(LocalDateTime start, LocalDateTime end) {
    List<CameraOfflineEvent> events = queryHistory(start, end);
    
    long totalDuration = events.stream()
        .mapToLong(CameraOfflineEvent::getDuration)
        .sum();
    
    long windowDuration = Duration.between(start, end).getSeconds();
    double offlineRate = (double) totalDuration / windowDuration * 100;
    
    return OfflineStatistics.builder()
        .eventCount(events.size())
        .totalOfflineDuration(totalDuration)
        .offlineRate(offlineRate)
        .build();
}

贡献者

  • flycodeuflycodeu

公告板

2025-03-04正式迁移知识库到此项目