从 0 到 1 构建企业级视频智能分析平台:完整工程实践

一份可直接落地的全栈技术方案 覆盖相机接入、流媒体编排、存储治理、智能分析全链路 包含架构设计、核心代码、数据模型、运维体系与性能基准


目录

第一部分:平台基础与相机接入

  1. 问题定义与架构设计
  2. 技术选型与核心组件
  3. 数据模型设计
  4. ONVIF 协议深度解析与实现
  5. MediaMTX 流媒体网关集成
  6. FFmpeg 编解码治理
  7. 相机接入完整流程
  8. 设备状态巡检与事件闭环

第二部分:录像存储系统

  1. 录像系统架构设计
  2. 存储卷治理与容量管理
  3. 录像策略编排
  4. 文件索引与回放检索

第三部分:智能分析平台

  1. 算法任务模型设计
  2. 算法节点管理与调度
  3. WebSocket 协议规范
  4. 结果处理与告警推送
  5. 预览链路实现

第四部分:运维与演进

  1. 配置管理
  2. 可观测性体系
  3. 典型问题与解决方案
  4. FAQ 与最佳实践
  5. 性能基准与压测
  6. 架构演进路线

第一部分:平台基础与相机接入

1. 问题定义与架构设计

1.1 业务背景与核心挑战

在构建视频智能分析平台的过程中,我们面临着多个维度的工程挑战:

设备层异构性

  • 不同厂商的 ONVIF 协议实现质量参差不齐,部分设备仅支持核心功能子集
  • 编码格式混乱:H.264、H.265、MJPEG 共存,浏览器兼容性差异显著
  • 网络拓扑复杂:设备位于内网,播放端在外网,NAT 穿透成为常态

状态管理复杂性

  • 控制平面可达但媒体平面不可达的”假在线”现象
  • 设备离线后缺少事件追踪,无法形成运维闭环
  • 流地址变化频繁,缺少统一的路径抽象

扩展性瓶颈

  • 录像、算法等下游服务直接依赖设备原始 RTSP,耦合过深
  • 缺少统一的媒体出口,后续功能扩展需要大量重复工作
  • 编码转换策略不清晰,成本与质量难以平衡

1.2 设计目标

基于上述问题,我们设定了分阶段的建设目标:

第一阶段:建立可扩展底座

  1. 统一接入层:设备能力发现、凭据校验、元信息持久化
  2. 统一流模型:自动生成主/子码流路径,建立路径到设备的映射关系
  3. 统一媒体出口:通过 MediaMTX 提供可控、可观测的播放入口
  4. 编码治理:先探测、后决策,按需转码,控制成本
  5. 状态闭环:在线巡检 + 离线事件追踪,形成完整生命周期管理

第二阶段:录像与存储

  1. 录像策略模型化,支持多种录制模式
  2. 存储卷治理,容量可观测、可切换
  3. 文件索引构建,回放检索高效化

第三阶段:智能分析

  1. 一个任务绑定一台相机,支持多算法场景并行
  2. 算法节点能力建模与调度
  3. 实时结果处理与告警分发

1.3 总体架构

我们采用分层解耦的架构设计:

┌─────────────────────────────────────────────────────────┐
│ API Gateway │
│ (RESTful API + WebSocket) │
└────────────────┬────────────────────────────────────────┘
┌────────────────┴────────────────────────────────────────┐
│ Application Layer │
│ ┌──────────────┬──────────────┬─────────────────────┐ │
│ │ 相机接入服务 │ 录像编排服务 │ 算法任务编排服务 │ │
│ └──────────────┴──────────────┴─────────────────────┘ │
└────────────────┬────────────────────────────────────────┘
┌────────────────┴────────────────────────────────────────┐
│ Infrastructure Layer │
│ ┌──────────────┬──────────────┬─────────────────────┐ │
│ │ ONVIF 适配层 │ MediaMTX网关 │ FFmpeg编码引擎 │ │
│ └──────────────┴──────────────┴─────────────────────┘ │
│ ┌──────────────┬──────────────┬─────────────────────┐ │
│ │ MySQL │ 存储卷管理 │ 算法节点池 │ │
│ └──────────────┴──────────────┴─────────────────────┘ │
└─────────────────────────────────────────────────────────┘

核心设计原则

  1. 控制面与媒体面解耦
    • ONVIF 负责设备能力发现和控制指令
    • MediaMTX 负责媒体流路径管理和分发
    • 两者通过数据库中的路径映射关联
  2. 路径抽象统一化
    • 所有下游服务(录像/算法/播放)只依赖平台分配的路径
    • 设备地址变化时,只需更新路径配置,不影响消费端
  3. 异步化与容错
    • 接入流程中的流同步采用异步执行
    • 媒体注册失败支持延迟重试
    • 关键操作支持事务回滚

2. 技术选型与核心组件

2.1 ONVIF 协议详解

什么是 ONVIF

ONVIF (Open Network Video Interface Forum) 是一个开放的网络视频设备接口标准,旨在实现不同厂商的 IP 摄像机、录像机和视频管理软件之间的互操作性。ONVIF 基于 Web Services 技术栈,核心协议包括:

  • SOAP (Simple Object Access Protocol):消息封装协议
  • WSDL (Web Services Description Language):服务描述语言
  • WS-Security:安全认证机制
  • WS-Discovery:设备发现协议

ONVIF 服务架构

ONVIF 将功能划分为多个服务端点:

  1. Device Service (/onvif/device_service)
    • 设备基础信息 (厂商、型号、序列号、固件版本)
    • 系统时间同步
    • 能力集发现 (Capabilities)
    • 网络配置
  2. Media Service (/onvif/media_service)
    • Profile 管理 (流配置)
    • 获取 RTSP URI
    • 获取快照 URI
    • 视频编码器配置
  3. PTZ Service (/onvif/ptz_service)
    • 云台控制
    • 预置位管理
  4. Events Service (/onvif/events_service)
    • 事件订阅
    • 运动检测通知

为什么选择 ONVIF

  1. 标准化程度高:主流厂商均支持,减少适配成本
  2. 功能完整性:覆盖设备管理到媒体控制的全生命周期
  3. 可扩展性强:通过 Profile 机制支持不同能力级别
  4. 生态成熟:丰富的开源库和工具链

ONVIF 的局限性与应对

  1. 厂商实现差异
    • 问题:部分厂商仅实现 Profile S (Streaming),缺少 PTZ 等扩展能力
    • 应对:设计时采用能力发现机制,降级处理缺失功能
  2. 性能开销
    • 问题:SOAP 协议 XML 解析开销较大
    • 应对:缓存设备信息,减少重复调用;关键路径使用连接池
  3. 时间同步敏感
    • 问题:WS-Security 要求客户端与设备时间误差小于 5 秒
    • 应对:定期校准服务器 NTP;捕获时间窗口错误并重试

2.2 MediaMTX 流媒体网关

什么是 MediaMTX

MediaMTX (前身为 rtsp-simple-server) 是一个现代化的实时流媒体服务器,支持多种协议的统一接入和分发:

  • 输入协议:RTSP、RTMP、HLS、WebRTC、SRT
  • 输出协议:RTSP、HLS、WebRTC
  • 核心特性:
    • 按需拉流 (sourceOnDemand)
    • 按需转码 (runOnDemand)
    • RESTful API 动态配置
    • 低延迟 WebRTC 推流

MediaMTX 的技术优势

  1. 轻量级架构
    • 单一 Go 二进制,资源占用低
    • 无外部依赖,部署简单
  2. 协议转换能力
    • 统一的内部流模型
    • 自动协议适配,降低客户端复杂度
  3. 动态路径管理
    • 支持运行时添加/删除路径
    • 版本化配置接口

MediaMTX 在本架构中的作用

  1. 统一媒体出口
    • 所有 RTSP 源流接入 MediaMTX
    • 对外提供标准化的播放地址
  2. 按需资源调度
    • sourceOnDemand:无客户端时不拉流,节省带宽
    • runOnDemand:触发 FFmpeg 转码进程
  3. 录像与预览集成
    • 原生支持切片录像
    • 实时预览与录像共享同一路径

MediaMTX 路径配置示例

paths:
cam_10_0_1_20_ch1_main:
source: rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101
sourceOnDemand: yes
sourceOnDemandStartTimeout: 10s
sourceOnDemandCloseAfter: 10s
# 录像配置
record: yes
recordPath: /recordings/%path/%Y-%m-%d/%H-%M-%S
recordFormat: fmp4
recordSegmentDuration: 10m
recordDeleteAfter: 7d

2.3 FFmpeg 编解码工具链

什么是 FFmpeg

FFmpeg 是一个完整的跨平台音视频处理工具集,包含:

  • ffmpeg:转码工具
  • ffprobe:流分析工具
  • ffplay:播放器
  • libav*:编解码库

核心概念

  1. 容器 (Container)
    • 文件格式,如 MP4、FLV、TS
    • 存储音视频流的封装层
  2. 编解码器 (Codec)
    • 视频:H.264、H.265 (HEVC)、VP9、AV1
    • 音频:AAC、MP3、Opus
  3. 滤镜 (Filter)
    • 视频处理:缩放、裁剪、叠加
    • 音频处理:混音、变速

常用命令详解

1. ffprobe 流探测

Terminal window
ffprobe -v error \
-rtsp_transport tcp \
-analyzeduration 1000000 \
-probesize 32768 \
-select_streams v:0 \
-show_entries stream=codec_name,width,height,avg_frame_rate \
-of json \
"rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101"

参数解释:

  • -v error:仅输出错误信息
  • -rtsp_transport tcp:强制使用 TCP (避免 UDP 丢包)
  • -analyzeduration:分析时长 (微秒)
  • -probesize:探测数据大小 (字节)
  • -select_streams v:0:仅选择第一个视频流
  • -show_entries:指定输出字段
  • -of json:输出格式为 JSON

输出示例:

{
"streams": [{
"codec_name": "h264",
"width": 1920,
"height": 1080,
"avg_frame_rate": "25/1"
}]
}

2. H.265 转 H.264

Terminal window
ffmpeg -hide_banner -loglevel warning \
-rtsp_transport tcp \
-stimeout 5000000 \
-i "rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101" \
-an \
-c:v libx264 \
-preset veryfast \
-crf 23 \
-tune zerolatency \
-pix_fmt yuv420p \
-g 50 \
-keyint_min 50 \
-sc_threshold 0 \
-x264-params repeat-headers=1 \
-f rtsp \
-rtsp_transport tcp \
"rtsp://127.0.0.1:8554/output"

关键参数说明:

  • -an:禁用音频 (减少处理开销)
  • -c:v libx264:使用 x264 编码器
  • -preset veryfast:编码速度预设 (速度优先)
  • -crf 23:恒定质量因子 (18-28,值越小质量越高)
  • -tune zerolatency:优化低延迟场景
  • -pix_fmt yuv420p:像素格式 (浏览器兼容)
  • -g 50:GOP 大小 (关键帧间隔)
  • -sc_threshold 0:禁用场景切换检测
  • -x264-params repeat-headers=1:每个关键帧重复 SPS/PPS

3. 硬件加速转码 (NVIDIA)

Terminal window
ffmpeg -hwaccel cuda -hwaccel_output_format cuda \
-i input.mp4 \
-c:v h264_nvenc \
-preset p4 \
-b:v 3000k \
output.mp4

参数说明:

  • -hwaccel cuda:启用 CUDA 硬件加速
  • -hwaccel_output_format cuda:保持 GPU 内存格式
  • -c:v h264_nvenc:使用 NVENC 编码器
  • -preset p4:性能预设 (p1 最快, p7 最慢)

FFmpeg 在本平台中的应用

  1. 编码探测
    • 接入时探测设备编码格式
    • 决策是否需要转码
  2. 实时转码
    • H.265 -> H.264 兼容性转换
    • 分辨率/码率自适应
  3. 录像导出
    • 按时间范围裁剪
    • 多切片拼接

3. 数据模型设计

数据模型是整个系统的基础,直接决定了后续查询效率、扩展性和运维复杂度。我们遵循以下设计原则:

  1. 职责单一:每张表只负责一类业务实体
  2. 冗余可控:适度冗余提升查询性能,但避免数据不一致
  3. 索引精准:基于实际查询模式建立索引
  4. 字段语义化:使用明确的枚举值,避免魔法数字

3.1 设备主档表

DROP TABLE IF EXISTS `camera_device`;
CREATE TABLE `camera_device` (
`id` BIGINT NOT NULL COMMENT '主键ID',
`camera_name` VARCHAR(100) DEFAULT NULL COMMENT '设备名称',
`group_id` BIGINT DEFAULT NULL COMMENT '分组ID',
-- ONVIF 连接信息
`ip` VARCHAR(50) NOT NULL COMMENT '设备IP',
`port` INT NOT NULL DEFAULT 80 COMMENT 'ONVIF端口',
`username` VARCHAR(50) NOT NULL COMMENT 'ONVIF用户名',
`password` VARCHAR(255) NOT NULL COMMENT 'ONVIF密码(AES加密)',
-- 设备元信息
`serial_number` VARCHAR(100) DEFAULT NULL COMMENT '序列号',
`manufacturer` VARCHAR(100) DEFAULT NULL COMMENT '厂商',
`model` VARCHAR(100) DEFAULT NULL COMMENT '型号',
`firmware_version` VARCHAR(100) DEFAULT NULL COMMENT '固件版本',
-- 状态字段
`camera_status` INT DEFAULT 0 COMMENT '1在线 0离线',
`last_online_time` DATETIME DEFAULT NULL COMMENT '最后在线时间',
-- 审计字段
`create_user` BIGINT DEFAULT NULL,
`create_dept` BIGINT DEFAULT NULL,
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_user` BIGINT DEFAULT NULL,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`status` INT DEFAULT 1 COMMENT '1启用 0停用',
`is_deleted` INT DEFAULT 0 COMMENT '1已删除 0正常',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_ip_is_deleted` (`ip`, `is_deleted`),
KEY `idx_group_id` (`group_id`),
KEY `idx_camera_status` (`camera_status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='相机设备主档表';

设计要点:

  1. 唯一性约束
    • uk_ip_is_deleted:同一 IP 的有效设备全局唯一
    • 支持软删除后重新接入
  2. 状态索引
    • idx_camera_status:支持在线/离线设备快速筛选
    • 用于首页统计看板
  3. 安全性
    • password 字段应使用 AES 加密存储
    • 日志输出时需脱敏处理

3.2 分组表

DROP TABLE IF EXISTS `camera_group`;
CREATE TABLE `camera_group` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`group_name` VARCHAR(100) NOT NULL COMMENT '分组名称',
`parent_id` BIGINT DEFAULT NULL COMMENT '父分组ID',
`group_type` TINYINT DEFAULT 0 COMMENT '0普通分组 1地理分组 2功能分组',
`sort_order` INT DEFAULT 0 COMMENT '排序',
`icon` VARCHAR(50) DEFAULT NULL COMMENT '图标',
`description` VARCHAR(500) DEFAULT NULL COMMENT '描述',
`create_user` BIGINT DEFAULT NULL,
`create_dept` BIGINT DEFAULT NULL,
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_user` BIGINT DEFAULT NULL,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`status` INT DEFAULT 1,
`is_deleted` TINYINT DEFAULT 0,
PRIMARY KEY (`id`),
KEY `idx_parent_id` (`parent_id`),
KEY `idx_group_type` (`group_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='相机分组表';

设计要点:

  1. 树形结构
    • 支持多级分组
    • 通过 parent_id 构建层级关系
  2. 类型扩展
    • group_type 预留不同分组语义
    • 便于后续按地理位置、功能区域等维度组织

3.3 流路径表

DROP TABLE IF EXISTS `camera_stream_path`;
CREATE TABLE `camera_stream_path` (
`id` BIGINT NOT NULL COMMENT '主键ID',
`camera_id` BIGINT NOT NULL COMMENT '设备ID',
`path_name` VARCHAR(255) NOT NULL COMMENT '媒体路径主键',
`source_url` VARCHAR(1000) DEFAULT NULL COMMENT 'RTSP源地址',
-- ONVIF Profile 信息
`profile_token` VARCHAR(64) DEFAULT NULL COMMENT 'ONVIF ProfileToken',
`profile_name` VARCHAR(100) DEFAULT NULL COMMENT 'Profile名称',
`video_encoding` VARCHAR(20) DEFAULT NULL COMMENT '编码格式(原始)',
`video_codec` VARCHAR(16) DEFAULT NULL COMMENT '标准化编码(H264/H265/HEVC)',
`resolution` VARCHAR(20) DEFAULT NULL COMMENT '分辨率',
`subtype` INT NOT NULL DEFAULT 0 COMMENT '0主码流 1子码流',
`channel` INT DEFAULT NULL COMMENT '通道号',
`need_transcode` TINYINT(1) NOT NULL DEFAULT 0 COMMENT '是否转码',
-- 状态字段
`enabled` TINYINT NOT NULL DEFAULT 1 COMMENT '1启用 0停用',
`status` INT NOT NULL DEFAULT 1 COMMENT '1可用 0失败',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_path_name` (`path_name`),
KEY `idx_camera_id` (`camera_id`),
KEY `idx_codec_transcode` (`video_codec`, `need_transcode`),
KEY `idx_channel_subtype` (`camera_id`, `channel`, `subtype`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='流路径表';

设计要点:

  1. 路径唯一性
    • uk_path_name:全局唯一的媒体入口标识
    • 所有下游服务依赖此路径
  2. 编码决策字段
    • video_codec:标准化编码格式
    • need_transcode:转码决策结果
    • 索引支持”按编码类型统计转码比例”
  3. 流类型约束
    • (camera_id, channel, subtype):保证同设备同通道同类型只有一个生效路径

3.4 离线事件表

DROP TABLE IF EXISTS `camera_offline_event`;
CREATE TABLE `camera_offline_event` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`device_id` BIGINT NOT NULL COMMENT '设备ID',
`device_ip` VARCHAR(50) DEFAULT NULL COMMENT '设备IP(冗余)',
`group_id` BIGINT DEFAULT NULL COMMENT '分组ID(冗余)',
-- 事件时间线
`start_time` DATETIME NOT NULL COMMENT '离线开始时间',
`last_check_time` DATETIME NOT NULL COMMENT '最后检测时间',
`end_time` DATETIME DEFAULT NULL COMMENT '恢复在线时间',
`duration` BIGINT DEFAULT 0 COMMENT '离线时长(秒)',
-- 事件状态
`is_resolved` TINYINT DEFAULT 0 COMMENT '1已恢复 0未恢复',
`alarm_status` TINYINT DEFAULT 0 COMMENT '1已告警 0未告警',
`remark` VARCHAR(255) DEFAULT NULL COMMENT '备注',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `idx_device_status` (`device_id`, `is_resolved`),
KEY `idx_time` (`start_time`, `end_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备离线事件表';

设计要点:

  1. 事件闭环

    • is_resolved:区分进行中和已结束事件
    • duration:离线时长统计
  2. 冗余字段

    • device_ip/group_id:避免关联查询
    • 用于告警消息快速组装
  3. 时间索引

    • 支持按时间范围查询历史离线记录
    • 用于生成离线率报表

4. ONVIF 协议深度解析与实现

4.1 ONVIF 调用最小可用集

在实际生产环境中,并非所有 ONVIF 接口都需要实现。我们根据业务需求,定义了最小可用集:

接口服务用途调用时机
GetDeviceInformationDevice获取厂商/型号/序列号/固件版本设备接入时
GetCapabilitiesDevice获取各服务端点地址设备接入时
GetProfilesMedia获取流配置列表流同步时
GetStreamUriMedia获取 RTSP 播放地址流同步时
GetSnapshotUriMedia获取快照地址流同步时
GetSystemDateAndTimeDevice获取设备时间在线心跳

为什么是这 6 个接口?

  1. 接入必需:GetDeviceInformationGetCapabilities 是设备身份识别和能力发现的基础
  2. 流管理核心:GetProfilesGetStreamUriGetSnapshotUri 构成完整的媒体流管理
  3. 状态监控:GetSystemDateAndTime 作为心跳接口,开销小且稳定

4.2 设备侧配置要求

很多接入失败的根因不是代码问题,而是设备侧未正确配置。建议在接入前逐项确认:

1. 服务开启

  • ✅ ONVIF 服务已启用
  • ✅ RTSP 服务已启用
  • ❌ 常见错误:只开启了 Web 管理,未开启 ONVIF

2. 账号权限

建议创建专用 ONVIF 账号,而非复用管理员账号
最小权限集:
- 读取设备信息
- 读取媒体配置
- 读取流 URI
不需要:
- PTZ 控制权限
- 参数修改权限

3. 时间同步

Terminal window
# NTP 配置示例 (设备端)
NTP Server: ntp.aliyun.com
Time Zone: GMT+8
Sync Interval: 3600s

原因:WS-Security 要求客户端与设备时间误差 < 5 秒

4. 网络配置

防火墙规则:
允许入站: TCP 80 (ONVIF), TCP 554 (RTSP)
允许出站: TCP 任意 (用于 RTSP 数据回传)
跨网段访问:
确保路由可达
确认 NAT 规则 (若设备在内网)

4.3 WS-Security 认证实现

ONVIF 使用 UsernameToken 认证方式,核心步骤:

1. 生成 Nonce

byte[] nonce = new byte[16];
SecureRandom.getInstanceStrong().nextBytes(nonce);
String nonceBase64 = Base64.getEncoder().encodeToString(nonce);

2. 获取 UTC 时间

00.000Z
String created = ZonedDateTime.now(ZoneOffset.UTC)
.format(DateTimeFormatter.ISO_INSTANT);

3. 计算 PasswordDigest

byte[] digest = MessageDigest.getInstance("SHA-1").digest(
(nonce + created + password).getBytes(StandardCharsets.UTF_8)
);
String passwordDigest = Base64.getEncoder().encodeToString(digest);

完整 SOAP 请求封装

public class OnvifSoapClient {
public Document sendSoapRequest(String url, String username, String password, String body)
throws Exception {
String envelope = buildSoapEnvelope(username, password, body);
HttpResponse response = HttpRequest.post(url)
.header("Content-Type", "application/soap+xml; charset=utf-8")
.timeout(5000)
.body(envelope)
.execute();
if (!response.isOk()) {
throw new OnvifException("HTTP " + response.getStatus());
}
Document doc = XmlUtil.readXML(response.body());
validateSoapFault(doc);
return doc;
}
private String buildSoapEnvelope(String username, String password, String body) {
String created = utcNow();
byte[] nonce = generateNonce();
String nonceBase64 = Base64.getEncoder().encodeToString(nonce);
String digest = calculateDigest(nonce, created, password);
return "<?xml version=\"1.0\" encoding=\"utf-8\"?>" +
"<s:Envelope xmlns:s=\"http://www.w3.org/2003/05/soap-envelope\">" +
"<s:Header>" +
"<Security s:mustUnderstand=\"1\" " +
"xmlns=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd\">" +
"<UsernameToken>" +
"<Username>" + username + "</Username>" +
"<Password Type=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordDigest\">" +
digest + "</Password>" +
"<Nonce EncodingType=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary\">" +
nonceBase64 + "</Nonce>" +
"<Created xmlns=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd\">" +
created + "</Created>" +
"</UsernameToken>" +
"</Security>" +
"</s:Header>" +
"<s:Body>" + body + "</s:Body>" +
"</s:Envelope>";
}
private void validateSoapFault(Document doc) throws OnvifFaultException {
NodeList faults = doc.getElementsByTagNameNS("*", "Fault");
if (faults.getLength() > 0) {
Element fault = (Element) faults.item(0);
String code = getTextContent(fault, "Code", "Value");
String reason = getTextContent(fault, "Reason", "Text");
throw new OnvifFaultException(code, reason);
}
}
}

为什么必须验证 SOAP Fault?

大量设备会返回 HTTP 200 + SOAP Fault,如果不解析 Fault,会导致:

  1. 接入误成功,产生脏数据
  2. 在线误判,影响运维决策
  3. 错误告警,消耗人工排查成本

常见 SOAP Fault 示例

<s:Envelope>
<s:Body>
<s:Fault>
<s:Code>
<s:Value>s:Sender</s:Value>
<s:Subcode>
<s:Value>ter:InvalidArgVal</s:Value>
</s:Subcode>
</s:Code>
<s:Reason>
<s:Text xml:lang="en">Invalid ProfileToken</s:Text>
</s:Reason>
</s:Fault>
</s:Body>
</s:Envelope>

4.4 核心接口调用示例

1. GetDeviceInformation

public class OnvifDeviceService {
public DeviceInfo getDeviceInformation(String ip, int port, String user, String pass)
throws OnvifException {
String url = String.format("http://%s:%d/onvif/device_service", ip, port);
String body = "<tds:GetDeviceInformation xmlns:tds=\"http://www.onvif.org/ver10/device/wsdl\" />";
Document doc = soapClient.sendSoapRequest(url, user, pass, body);
return DeviceInfo.builder()
.manufacturer(getText(doc, "Manufacturer"))
.model(getText(doc, "Model"))
.firmwareVersion(getText(doc, "FirmwareVersion"))
.serialNumber(getText(doc, "SerialNumber"))
.hardwareId(getText(doc, "HardwareId"))
.build();
}
}

响应示例:

<GetDeviceInformationResponse>
<Manufacturer>Hikvision</Manufacturer>
<Model>DS-2CD2142FWD-I</Model>
<FirmwareVersion>V5.5.0</FirmwareVersion>
<SerialNumber>DS-2CD2142FWD-I20180101AAWRB12345678</SerialNumber>
<HardwareId>88</HardwareId>
</GetDeviceInformationResponse>

2. GetCapabilities

public Capabilities getCapabilities(String ip, int port, String user, String pass)
throws OnvifException {
String url = String.format("http://%s:%d/onvif/device_service", ip, port);
String body =
"<tds:GetCapabilities xmlns:tds=\"http://www.onvif.org/ver10/device/wsdl\">" +
"<tds:Category>Media</tds:Category>" +
"</tds:GetCapabilities>";
Document doc = soapClient.sendSoapRequest(url, user, pass, body);
String mediaXAddr = getText(doc, "Media", "XAddr");
if (mediaXAddr == null || mediaXAddr.isEmpty()) {
// 降级处理:使用 Device Service 地址
mediaXAddr = url;
}
return Capabilities.builder()
.mediaXAddr(mediaXAddr)
.build();
}

为什么需要 GetCapabilities?

不同设备的服务端点地址可能不同:

  • 标准地址:http://10.0.1.20/onvif/media_service
  • 厂商定制:http://10.0.1.20/onvif/device_service
  • 带端口:http://10.0.1.20:8080/onvif/Media

通过 GetCapabilities 动态获取,避免硬编码。

3. GetProfiles

public List<OnvifProfile> getProfiles(String mediaXAddr, String user, String pass)
throws OnvifException {
String body = "<trt:GetProfiles xmlns:trt=\"http://www.onvif.org/ver10/media/wsdl\" />";
Document doc = soapClient.sendSoapRequest(mediaXAddr, user, pass, body);
List<OnvifProfile> profiles = new ArrayList<>();
NodeList nodes = doc.getElementsByTagNameNS("*", "Profiles");
for (int i = 0; i < nodes.getLength(); i++) {
Element elem = (Element) nodes.item(i);
OnvifProfile profile = OnvifProfile.builder()
.token(elem.getAttribute("token"))
.name(getText(elem, "Name"))
.videoSourceToken(getText(elem, "VideoSourceConfiguration", "SourceToken"))
.videoEncodingToken(getText(elem, "VideoEncoderConfiguration", "token"))
.encoding(getText(elem, "VideoEncoderConfiguration", "Encoding"))
.width(getInt(elem, "VideoEncoderConfiguration", "Resolution", "Width"))
.height(getInt(elem, "VideoEncoderConfiguration", "Resolution", "Height"))
.build();
profiles.add(profile);
}
return profiles;
}

Profile 解析要点:

  1. Token 提取
    • token 属性是后续获取流地址的关键
  2. 通道号提取
// 从 SourceToken 提取通道号
// 例如: VideoSource_1 -> 1
private int extractChannel(String sourceToken) {
if (sourceToken == null) return 1;
Matcher m = Pattern.compile("\\d+").matcher(sourceToken);
return m.find() ? Integer.parseInt(m.group()) : 1;
}
  1. 主/子码流判断
private int detectSubtype(int width, int height, String name) {
if (width >= 1280 || height >= 720) return 0; // 主码流
if (name != null && name.toLowerCase().contains("sub")) return 1;
return 1; // 默认子码流
}

4. GetStreamUri

public String getStreamUri(String mediaXAddr, String user, String pass, String profileToken)
throws OnvifException {
String body =
"<trt:GetStreamUri xmlns:trt=\"http://www.onvif.org/ver10/media/wsdl\" " +
"xmlns:tt=\"http://www.onvif.org/ver10/schema\">" +
"<trt:StreamSetup>" +
"<tt:Stream>RTP-Unicast</tt:Stream>" +
"<tt:Transport><tt:Protocol>RTSP</tt:Protocol></tt:Transport>" +
"</trt:StreamSetup>" +
"<trt:ProfileToken>" + profileToken + "</trt:ProfileToken>" +
"</trt:GetStreamUri>";
Document doc = soapClient.sendSoapRequest(mediaXAddr, user, pass, body);
String uri = getText(doc, "Uri");
// 注入凭据
return injectCredentials(uri, user, pass);
}
private String injectCredentials(String rtspUrl, String user, String pass) {
try {
URI uri = new URI(rtspUrl);
String userInfo = user + ":" + pass;
return new URI(
uri.getScheme(),
userInfo,
uri.getHost(),
uri.getPort(),
uri.getPath(),
uri.getQuery(),
uri.getFragment()
).toString();
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid RTSP URL: " + rtspUrl, e);
}
}

为什么要注入凭据?

大部分设备返回的 RTSP URI 不包含认证信息:

rtsp://10.0.1.20:554/Streaming/Channels/101

需要转换为:

rtsp://admin:password@10.0.1.20:554/Streaming/Channels/101

5. GetSystemDateAndTime (心跳)

public void checkOnline(String ip, int port, String user, String pass) throws OnvifException {
String url = String.format("http://%s:%d/onvif/device_service", ip, port);
String body = "<tds:GetSystemDateAndTime xmlns:tds=\"http://www.onvif.org/ver10/device/wsdl\" />";
// 超时设置为 3 秒
Document doc = soapClient.sendSoapRequest(url, user, pass, body, 3000);
// 能成功调用即表示在线,无需解析返回值
}

为什么选择 GetSystemDateAndTime 作为心跳?

  1. 轻量级:不涉及媒体资源,响应快
  2. 稳定性高:几乎所有设备都实现
  3. 无副作用:只读操作,不会改变设备状态

4.5 厂商差异处理

问题 1: Media XAddr 为空

部分低端设备 GetCapabilities 返回空地址。

解决方案:

String mediaXAddr = capabilities.getMediaXAddr();
if (StringUtils.isBlank(mediaXAddr)) {
// 降级使用 Device Service 地址
mediaXAddr = String.format("http://%s:%d/onvif/device_service", ip, port);
log.warn("Media XAddr empty, fallback to device service for camera {}", cameraId);
}

问题 2: ProfileToken 格式不一致

  • 海康:Profile_1
  • 大华:MediaProfile000
  • 宇视:profile_1_h264

解决方案:

// 不依赖 token 格式,按索引或分辨率选择
OnvifProfile mainStream = profiles.stream()
.filter(p -> p.getWidth() >= 1280)
.findFirst()
.orElse(profiles.get(0));

问题 3: 时间窗口错误

错误示例:

SOAP Fault: The security token could not be authenticated or authorized

原因:服务器时间与设备时间误差 > 5 秒

解决方案:

@Scheduled(cron = "0 */30 * * * ?") // 每 30 分钟执行
public void syncServerTime() {
try {
ProcessBuilder pb = new ProcessBuilder("ntpdate", "ntp.aliyun.com");
Process process = pb.start();
process.waitFor(10, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("NTP sync failed", e);
}
}

4.6 ONVIF 调用性能优化

1. 连接池复用

@Configuration
public class OnvifHttpConfig {
@Bean
public CloseableHttpClient httpClient() {
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
cm.setMaxTotal(200);
cm.setDefaultMaxPerRoute(50);
return HttpClients.custom()
.setConnectionManager(cm)
.setDefaultRequestConfig(
RequestConfig.custom()
.setConnectTimeout(3000)
.setSocketTimeout(5000)
.build()
)
.build();
}
}

2. 设备信息缓存

@Cacheable(value = "device-info", key = "#cameraId", unless = "#result == null")
public DeviceInfo getDeviceInfo(Long cameraId) {
CameraDevice device = cameraRepository.findById(cameraId)
.orElseThrow(() -> new NotFoundException("Camera not found"));
return onvifClient.getDeviceInformation(
device.getIp(),
device.getPort(),
device.getUsername(),
device.getPassword()
);
}

缓存策略:

  • TTL: 24 小时
  • 失效条件:设备固件升级或手动清除

3. 并发控制

private final Semaphore onvifSemaphore = new Semaphore(50);
public <T> T callOnvif(Supplier<T> supplier) throws Exception {
onvifSemaphore.acquire();
try {
return supplier.get();
} finally {
onvifSemaphore.release();
}
}

原因:避免同时调用大量设备导致网络拥塞。


5. MediaMTX 流媒体网关集成

5.1 MediaMTX 核心概念

Path (路径)

Path 是 MediaMTX 中的核心抽象,代表一个独立的媒体流通道。每个 Path 包含:

  • Source:流的来源 (RTSP URL、推流地址、publisher 等)
  • Readers:消费该流的客户端连接
  • State:路径状态 (idle、ready、publishing)

按需拉流 (Source On Demand)

paths:
cam_*:
source: rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101
sourceOnDemand: yes
sourceOnDemandStartTimeout: 10s
sourceOnDemandCloseAfter: 10s

工作流程:

  1. 初始状态:Path 存在但未拉流
  2. 客户端连接:触发 MediaMTX 拉取源流
  3. 客户端断开:等待 closeAfter 时长后释放资源

优势:

  • 节省带宽 (无观看时不拉流)
  • 降低服务器负载
  • 支持大规模路径管理

按需转码 (Run On Demand)

paths:
cam_h265_*:
source: publisher
runOnDemand: ffmpeg -i $SOURCE_URL -c:v libx264 -f rtsp rtsp://127.0.0.1:$RTSP_PORT/$MTX_PATH
runOnDemandRestart: yes

工作流程:

  1. 客户端连接触发命令执行
  2. FFmpeg 进程启动并推流到 MediaMTX
  3. 客户端断开后根据配置决定是否重启

环境变量:

  • $SOURCE_URL:可用于引用上游地址
  • $RTSP_PORT:MediaMTX RTSP 端口
  • $MTX_PATH:当前路径名

5.2 RESTful API 集成

MediaMTX 提供完整的 API 接口用于动态管理:

API 端点:

  • GET /v3/config/global/get:获取全局配置
  • GET /v3/config/pathdefaults/get:获取路径默认配置
  • POST /v3/config/paths/add/{name}:添加路径
  • GET /v3/config/paths/get/{name}:获取路径配置
  • POST /v3/config/paths/patch/{name}:更新路径配置
  • POST /v3/config/paths/delete/{name}:删除路径
  • GET /v3/config/paths/list:列出所有路径

路径配置结构:

{
"name": "cam_10_0_1_20_ch1_main",
"source": "rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101",
"sourceProtocol": "tcp",
"sourceOnDemand": true,
"sourceOnDemandStartTimeout": "10s",
"sourceOnDemandCloseAfter": "10s",
"record": false,
"recordPath": "",
"recordFormat": "fmp4",
"recordSegmentDuration": "10m",
"recordDeleteAfter": "7d"
}

5.3 路径命名规范

命名模板:

cam_{ip}_{channel}_{subtype}

示例:

  • cam_10_0_1_20_ch1_main:10.0.1.20 设备第 1 通道主码流
  • cam_10_0_1_20_ch1_sub:10.0.1.20 设备第 1 通道子码流
  • cam_10_0_1_21_ch2_main:10.0.1.21 设备第 2 通道主码流

命名规范优势:

  1. 可读性:从路径名直接识别设备和通道
  2. 唯一性:全局唯一标识符
  3. 模式匹配:支持通配符配置
paths:
cam_*_main:
# 所有主码流共享配置
cam_*_sub:
# 所有子码流共享配置

5.4 路径注册实现

核心服务类:

@Service
@Slf4j
public class MediaPathService {
@Value("${media.mtx.apiBaseUrl}")
private String apiBaseUrl;
private final RestTemplate restTemplate;
/**
* 注册直连路径
*/
public void registerDirectPath(StreamPath path) {
Map<String, Object> config = new HashMap<>();
config.put("name", path.getPathName());
config.put("source", path.getSourceUrl());
config.put("sourceProtocol", "tcp");
config.put("sourceOnDemand", true);
config.put("sourceOnDemandStartTimeout", "10s");
config.put("sourceOnDemandCloseAfter", "10s");
addPath(path.getPathName(), config);
}
/**
* 注册转码路径
*/
public void registerTranscodePath(StreamPath path) {
String ffmpegCmd = buildFfmpegCommand(path.getSourceUrl());
Map<String, Object> config = new HashMap<>();
config.put("name", path.getPathName());
config.put("source", "publisher");
config.put("runOnDemand", ffmpegCmd);
config.put("runOnDemandRestart", true);
config.put("runOnDemandStartTimeout", "10s");
config.put("runOnDemandCloseAfter", "10s");
addPath(path.getPathName(), config);
}
/**
* 删除并重建路径 (确保配置完全生效)
*/
public void updatePath(StreamPath path) {
deletePath(path.getPathName());
if (path.getNeedTranscode() == 1) {
registerTranscodePath(path);
} else {
registerDirectPath(path);
}
}
private void addPath(String pathName, Map<String, Object> config) {
String url = apiBaseUrl + "/v3/config/paths/add/" + pathName;
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Map<String, Object>> request = new HttpEntity<>(config, headers);
try {
ResponseEntity<String> response = restTemplate.postForEntity(url, request, String.class);
if (response.getStatusCode().is2xxSuccessful()) {
log.info("Path registered: {}", pathName);
} else {
throw new MediaGatewayException("Failed to register path: " + response.getBody());
}
} catch (Exception e) {
log.error("MediaMTX API error: {}", e.getMessage());
throw new MediaGatewayException("Path registration failed", e);
}
}
private void deletePath(String pathName) {
String url = apiBaseUrl + "/v3/config/paths/delete/" + pathName;
try {
restTemplate.delete(url);
log.info("Path deleted: {}", pathName);
} catch (HttpClientErrorException.NotFound e) {
// 路径不存在,忽略错误
} catch (Exception e) {
log.error("Failed to delete path {}: {}", pathName, e.getMessage());
}
}
private String buildFfmpegCommand(String sourceUrl) {
return String.format(
"ffmpeg -hide_banner -loglevel warning " +
"-rtsp_transport tcp " +
"-i \"%s\" " +
"-an " +
"-c:v libx264 -preset veryfast -crf 23 " +
"-tune zerolatency -pix_fmt yuv420p " +
"-g 50 -keyint_min 50 -sc_threshold 0 " +
"-x264-params repeat-headers=1 " +
"-f rtsp -rtsp_transport tcp " +
"\"rtsp://127.0.0.1:$RTSP_PORT/$MTX_PATH\"",
sourceUrl
);
}
}

为什么使用 Delete + Add 而非 Patch?

  1. 配置完整性:Patch 可能保留旧配置片段,导致状态不一致
  2. 立即生效:Delete + Add 强制重建,确保新配置立刻生效
  3. 语义清晰:每次更新等价于发布新版本

风险与缓解:

  • 短暂中断:删除与添加之间有窗口期
  • 缓解措施:
    • 在低峰期批量变更
    • 添加失败时回滚到旧配置
    • 关键路径增加重试机制

5.5 播放地址生成

RTSP 地址:

rtsp://{MediaMTX_IP}:{RTSP_Port}/{path_name}

示例:

rtsp://10.0.10.1:8554/cam_10_0_1_20_ch1_main

HLS 地址:

http://{MediaMTX_IP}:{HLS_Port}/{path_name}/index.m3u8

示例:

http://10.0.10.1:8888/cam_10_0_1_20_ch1_main/index.m3u8

WebRTC 地址:

http://{MediaMTX_IP}:{WebRTC_Port}/{path_name}

示例:

http://10.0.10.1:8889/cam_10_0_1_20_ch1_main

播放器选择建议:

场景协议延迟兼容性推荐播放器
PC 浏览器实时预览WebRTC500msChrome/EdgeWebRTC.js
移动端 H5HLS3-5siOS/Androidvideo.js
客户端应用RTSP1-2s需原生支持VLC/ffplay
录像回放HLS不关键全平台video.js

6. FFmpeg 编解码治理

6.1 编码探测策略

为什么需要探测?

  1. 浏览器兼容性:Safari 不支持 H.265,Chrome 90+ 才支持
  2. 成本控制:H.264 可直连,H.265 需转码 (CPU 密集)
  3. 质量保障:探测失败的流不应接入系统

探测流程:

@Service
@Slf4j
public class CodecProbeService {
@Value("${codec.probe.ffprobe-bin:ffprobe}")
private String ffprobeBin;
@Value("${codec.probe.timeout-ms:3000}")
private int timeoutMs;
private final Semaphore probeSemaphore = new Semaphore(8);
public CodecInfo probe(String rtspUrl) {
try {
probeSemaphore.acquire();
return doProbe(rtspUrl);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CodecInfo.unknown();
} finally {
probeSemaphore.release();
}
}
private CodecInfo doProbe(String rtspUrl) {
List<String> command = Arrays.asList(
ffprobeBin,
"-v", "error",
"-rtsp_transport", "tcp",
"-analyzeduration", "1000000",
"-probesize", "32768",
"-select_streams", "v:0",
"-show_entries", "stream=codec_name,width,height,avg_frame_rate",
"-of", "json",
rtspUrl
);
ProcessBuilder pb = new ProcessBuilder(command);
pb.redirectErrorStream(true);
try {
Process process = pb.start();
boolean finished = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS);
if (!finished) {
process.destroyForcibly();
log.warn("ffprobe timeout for {}", maskUrl(rtspUrl));
return CodecInfo.unknown();
}
if (process.exitValue() != 0) {
log.warn("ffprobe failed for {}", maskUrl(rtspUrl));
return CodecInfo.unknown();
}
String output = IOUtils.toString(process.getInputStream(), StandardCharsets.UTF_8);
return parseProbeOutput(output);
} catch (Exception e) {
log.error("ffprobe error for {}: {}", maskUrl(rtspUrl), e.getMessage());
return CodecInfo.unknown();
}
}
private CodecInfo parseProbeOutput(String json) {
try {
JSONObject obj = JSON.parseObject(json);
JSONArray streams = obj.getJSONArray("streams");
if (streams == null || streams.isEmpty()) {
return CodecInfo.unknown();
}
JSONObject stream = streams.getJSONObject(0);
String codecName = stream.getString("codec_name");
int width = stream.getIntValue("width");
int height = stream.getIntValue("height");
String frameRate = stream.getString("avg_frame_rate");
return CodecInfo.builder()
.codec(normalizeCodec(codecName))
.width(width)
.height(height)
.frameRate(parseFrameRate(frameRate))
.build();
} catch (Exception e) {
log.error("Failed to parse ffprobe output", e);
return CodecInfo.unknown();
}
}
private String normalizeCodec(String codecName) {
if (codecName == null) return "UNKNOWN";
String lower = codecName.toLowerCase();
if (lower.contains("h264") || lower.equals("avc")) return "H264";
if (lower.contains("h265") || lower.equals("hevc")) return "H265";
if (lower.contains("mjpeg")) return "MJPEG";
return "UNKNOWN";
}
private double parseFrameRate(String frameRate) {
if (frameRate == null || frameRate.isEmpty()) return 0.0;
try {
if (frameRate.contains("/")) {
String[] parts = frameRate.split("/");
double num = Double.parseDouble(parts[0]);
double den = Double.parseDouble(parts[1]);
return den == 0 ? 0 : num / den;
}
return Double.parseDouble(frameRate);
} catch (Exception e) {
return 0.0;
}
}
private String maskUrl(String url) {
return url.replaceAll("://[^@]+@", "://***:***@");
}
}

并发控制为什么必要?

批量接入 500 路相机时,如果不限制并发:

  • CPU 负载瞬间飙升至 100%
  • 磁盘 I/O 饱和 (ffprobe 需要写临时文件)
  • 导致正常业务请求超时

推荐并发数:

  • 4 核 8G:并发 4-8
  • 8 核 16G:并发 8-16
  • 16 核 32G:并发 16-32

6.2 转码决策引擎

决策矩阵:

编码格式浏览器兼容性决策备注
H264✅ 全兼容直连无需转码
H265⚠️ 部分兼容转码Chrome 90+, Safari 不支持
MJPEG⚠️ 兼容但低效转码带宽占用高
UNKNOWN❌ 未知保守转码探测失败兜底

实现代码:

@Service
public class CodecDecisionService {
@Value("${codec.transcode.enabled:true}")
private boolean transcodeEnabled;
@Value("${codec.transcode.assume-h265-when-unknown:true}")
private boolean assumeH265WhenUnknown;
public boolean needTranscode(String codec) {
if (!transcodeEnabled) {
return false;
}
if (codec == null || "UNKNOWN".equals(codec)) {
return assumeH265WhenUnknown;
}
return "H265".equals(codec) || "HEVC".equals(codec) || "MJPEG".equals(codec);
}
public TranscodeStrategy selectStrategy(CodecInfo info) {
if (!needTranscode(info.getCodec())) {
return TranscodeStrategy.DIRECT;
}
// 根据分辨率选择转码参数
if (info.getWidth() >= 1920 || info.getHeight() >= 1080) {
return TranscodeStrategy.HIGH_QUALITY;
} else if (info.getWidth() >= 1280 || info.getHeight() >= 720) {
return TranscodeStrategy.MEDIUM_QUALITY;
} else {
return TranscodeStrategy.LOW_QUALITY;
}
}
}
public enum TranscodeStrategy {
DIRECT(null, null, null),
HIGH_QUALITY("veryfast", "23", "50"),
MEDIUM_QUALITY("faster", "25", "60"),
LOW_QUALITY("fast", "28", "90");
private final String preset;
private final String crf;
private final String gop;
// constructor and getters
}

6.3 转码命令构建

基础转码命令:

public class FfmpegCommandBuilder {
public String buildTranscodeCommand(String sourceUrl, TranscodeStrategy strategy) {
StringBuilder cmd = new StringBuilder();
cmd.append("ffmpeg -hide_banner -loglevel warning ");
cmd.append("-rtsp_transport tcp ");
cmd.append("-stimeout 5000000 ");
cmd.append("-probesize 5000000 ");
cmd.append("-analyzeduration 5000000 ");
cmd.append("-i \"").append(sourceUrl).append("\" ");
// 禁用音频
cmd.append("-an ");
// 视频编码参数
cmd.append("-c:v libx264 ");
cmd.append("-preset ").append(strategy.getPreset()).append(" ");
cmd.append("-crf ").append(strategy.getCrf()).append(" ");
cmd.append("-tune zerolatency ");
cmd.append("-pix_fmt yuv420p ");
cmd.append("-g ").append(strategy.getGop()).append(" ");
cmd.append("-keyint_min ").append(strategy.getGop()).append(" ");
cmd.append("-sc_threshold 0 ");
cmd.append("-x264-params repeat-headers=1 ");
// 输出格式
cmd.append("-f rtsp -rtsp_transport tcp ");
cmd.append("\"rtsp://127.0.0.1:$RTSP_PORT/$MTX_PATH\"");
return cmd.toString();
}
}

硬件加速版本 (NVENC):

public String buildNvencCommand(String sourceUrl) {
StringBuilder cmd = new StringBuilder();
cmd.append("ffmpeg -hide_banner -loglevel warning ");
cmd.append("-hwaccel cuda -hwaccel_output_format cuda ");
cmd.append("-rtsp_transport tcp ");
cmd.append("-i \"").append(sourceUrl).append("\" ");
cmd.append("-an ");
cmd.append("-c:v h264_nvenc ");
cmd.append("-preset p4 ");
cmd.append("-b:v 3000k ");
cmd.append("-g 50 ");
cmd.append("-f rtsp -rtsp_transport tcp ");
cmd.append("\"rtsp://127.0.0.1:$RTSP_PORT/$MTX_PATH\"");
return cmd.toString();
}

硬件加速检测:

public boolean isNvencAvailable() {
try {
ProcessBuilder pb = new ProcessBuilder("ffmpeg", "-encoders");
Process process = pb.start();
String output = IOUtils.toString(process.getInputStream(), StandardCharsets.UTF_8);
return output.contains("h264_nvenc");
} catch (Exception e) {
return false;
}
}

参数调优指南:

参数推荐值说明影响
-presetveryfast编码速度预设速度 ↑ 质量 ↓
-crf23恒定质量因子值越小质量越高
-tunezerolatency低延迟优化减少缓冲延迟
-g50GOP 大小影响跳转精度
-b:v3000k码率带宽与质量平衡

7. 相机接入完整流程

7.1 接入时序图

[前端] → [接口层] → [业务层] → [ONVIF层] → [设备]
│ │ │ │
│ │ │ └─→ GetDeviceInformation
│ │ │ └─→ GetCapabilities
│ │ │ └─→ GetProfiles
│ │ │ └─→ GetStreamUri (per profile)
│ │ │
│ │ └─→ [数据层] 保存设备主档
│ │ └─→ [异步任务] 流同步
│ │ │
│ │ └─→ FFprobe 探测编码
│ │ └─→ 保存流路径
│ │ └─→ MediaMTX 注册路径
│ │
│ └─→ 返回接入结果

7.2 接入请求模型

@Data
public class CameraAddRequest {
@NotBlank(message = "IP不能为空")
private String ip;
@Min(value = 1, message = "端口范围 1-65535")
@Max(value = 65535, message = "端口范围 1-65535")
private Integer port = 80;
@NotBlank(message = "用户名不能为空")
private String username;
@NotBlank(message = "密码不能为空")
private String password;
private String cameraName;
private Long groupId;
/**
* 0: 标准 ONVIF 流程
* 1: 厂商兜底模式 (跳过 ONVIF,使用固定规则)
*/
private Integer skipOnvif = 0;
/**
* 厂商类型 (skipOnvif=1 时有效)
* 支持: HIKVISION, DAHUA, UNIVIEW
*/
private String vendorType;
}

7.3 接入服务实现

@Service
@Slf4j
public class CameraOnboardingService {
@Resource
private CameraDeviceRepository deviceRepository;
@Resource
private OnvifManager onvifManager;
@Resource
private StreamPathService streamPathService;
@Resource
private CameraGroupService groupService;
@Transactional(rollbackFor = Exception.class)
public CameraDevice addCamera(CameraAddRequest request) {
// 1. 重复性校验
if (deviceRepository.existsByIpAndIsDeleted(request.getIp(), 0)) {
throw new BusinessException("设备已存在: " + request.getIp());
}
// 2. 构建设备实体
CameraDevice device = new CameraDevice();
BeanUtils.copyProperties(request, device);
// 设置默认名称
if (StringUtils.isBlank(device.getCameraName())) {
device.setCameraName(device.getIp());
}
// 3. ONVIF 信息获取
if (request.getSkipOnvif() == 0) {
try {
OnvifDeviceDetail detail = onvifManager.getDeviceInformation(
device.getIp(),
device.getPort(),
device.getUsername(),
device.getPassword()
);
device.setManufacturer(detail.getManufacturer());
device.setModel(detail.getModel());
device.setSerialNumber(detail.getSerialNumber());
device.setFirmwareVersion(detail.getFirmwareVersion());
} catch (OnvifException e) {
log.error("ONVIF failed for {}, fallback to basic mode", request.getIp(), e);
device.setManufacturer("UNKNOWN");
device.setModel("UNKNOWN");
}
} else {
// 厂商兜底模式
device.setManufacturer(request.getVendorType());
device.setModel("Generic");
}
// 4. 设置初始状态
device.setCameraStatus(1);
device.setLastOnlineTime(LocalDateTime.now());
// 5. 解析分组
if (request.getGroupId() == null) {
device.setGroupId(groupService.getDefaultGroupId());
}
// 6. 保存设备
deviceRepository.save(device);
// 7. 异步同步流路径
CompletableFuture.runAsync(() -> {
try {
streamPathService.syncStreamPaths(device, request);
} catch (Exception e) {
log.error("Stream sync failed for camera {}", device.getId(), e);
}
});
return device;
}
}

事务边界设计:

  1. 同步部分 (在事务内):
    • 设备主档写入
    • 基础验证
  2. 异步部分 (事务外):
    • 流路径同步
    • MediaMTX 注册

为什么异步?

  • ONVIF 调用可能超时 (3-5秒)
  • FFprobe 探测耗时 (2-3秒/路)
  • 避免接口长时间阻塞

7.4 流同步服务

标准 ONVIF 模式:

@Service
@Slf4j
public class StreamPathService {
@Resource
private StreamPathRepository pathRepository;
@Resource
private OnvifManager onvifManager;
@Resource
private CodecProbeService codecProbeService;
@Resource
private CodecDecisionService codecDecisionService;
@Resource
private MediaPathService mediaPathService;
@Transactional(rollbackFor = Exception.class)
public void syncStreamPaths(CameraDevice device, CameraAddRequest request) {
if (request.getSkipOnvif() == 1) {
syncStreamPathsVendorFallback(device, request);
return;
}
// 1. 获取 Profiles
List<OnvifProfile> profiles = onvifManager.getProfiles(
device.getIp(),
device.getPort(),
device.getUsername(),
device.getPassword()
);
if (profiles.isEmpty()) {
throw new BusinessException("No profiles found for camera: " + device.getIp());
}
// 2. 删除旧路径
pathRepository.deleteAllByCameraId(device.getId());
// 3. 处理每个 Profile
for (OnvifProfile profile : profiles) {
try {
processProfile(device, profile);
} catch (Exception e) {
log.error("Failed to process profile {} for camera {}",
profile.getToken(), device.getId(), e);
}
}
}
private void processProfile(CameraDevice device, OnvifProfile profile) {
// 1. 获取 RTSP URI
String rtspUri = onvifManager.getStreamUri(
device.getIp(),
device.getPort(),
device.getUsername(),
device.getPassword(),
profile.getToken()
);
if (StringUtils.isBlank(rtspUri)) {
log.warn("Empty RTSP URI for profile {}", profile.getToken());
return;
}
// 2. 注入凭据
String sourceUrl = injectCredentials(rtspUri, device.getUsername(), device.getPassword());
// 3. 探测编码
CodecInfo codecInfo = codecProbeService.probe(sourceUrl);
String codec = codecInfo.getCodec();
boolean needTranscode = codecDecisionService.needTranscode(codec);
// 4. 构建路径名
int channel = extractChannel(profile.getVideoSourceToken());
int subtype = detectSubtype(profile.getWidth(), profile.getName());
String pathName = buildPathName(device.getIp(), channel, subtype);
// 5. 保存路径记录
StreamPath path = StreamPath.builder()
.cameraId(device.getId())
.pathName(pathName)
.sourceUrl(sourceUrl)
.profileToken(profile.getToken())
.profileName(profile.getName())
.videoEncoding(profile.getEncoding())
.videoCodec(codec)
.resolution(profile.getWidth() + "x" + profile.getHeight())
.subtype(subtype)
.channel(channel)
.needTranscode(needTranscode ? 1 : 0)
.enabled(1)
.status(1)
.build();
pathRepository.save(path);
// 6. 注册到 MediaMTX
try {
mediaPathService.updatePath(path);
log.info("Path registered: {}", pathName);
} catch (Exception e) {
log.error("MediaMTX registration failed for {}", pathName, e);
path.setStatus(0);
pathRepository.updateById(path);
}
}
private String buildPathName(String ip, int channel, int subtype) {
String ipNormalized = ip.replace(".", "_");
String subtypeStr = (subtype == 0) ? "main" : "sub";
return String.format("cam_%s_ch%d_%s", ipNormalized, channel, subtypeStr);
}
private int extractChannel(String sourceToken) {
if (sourceToken == null) return 1;
Matcher m = Pattern.compile("\\d+").matcher(sourceToken);
return m.find() ? Integer.parseInt(m.group()) : 1;
}
private int detectSubtype(int width, String name) {
// 主码流判断
if (width >= 1280) return 0;
// 名称包含 sub/stream2 等关键字
if (name != null) {
String lower = name.toLowerCase();
if (lower.contains("sub") || lower.contains("stream2")) {
return 1;
}
}
// 默认子码流
return 1;
}
private String injectCredentials(String rtspUrl, String user, String pass) {
try {
URI uri = new URI(rtspUrl);
String userInfo = URLEncoder.encode(user, "UTF-8") + ":" +
URLEncoder.encode(pass, "UTF-8");
return new URI(
uri.getScheme(),
userInfo,
uri.getHost(),
uri.getPort(),
uri.getPath(),
uri.getQuery(),
uri.getFragment()
).toString();
} catch (Exception e) {
throw new IllegalArgumentException("Invalid RTSP URL: " + rtspUrl, e);
}
}
}

厂商兜底模式:

private void syncStreamPathsVendorFallback(CameraDevice device, CameraAddRequest request) {
String vendor = request.getVendorType();
if (!"HIKVISION".equalsIgnoreCase(vendor)) {
throw new BusinessException("Unsupported vendor fallback: " + vendor);
}
// 海康固定规则
List<String> rtspTemplates = Arrays.asList(
"rtsp://{ip}:554/Streaming/Channels/101", // 主码流
"rtsp://{ip}:554/Streaming/Channels/102" // 子码流
);
pathRepository.deleteAllByCameraId(device.getId());
for (int i = 0; i < rtspTemplates.size(); i++) {
String template = rtspTemplates.get(i);
String rtspUrl = template.replace("{ip}", device.getIp());
String sourceUrl = injectCredentials(rtspUrl, device.getUsername(), device.getPassword());
CodecInfo codecInfo = codecProbeService.probe(sourceUrl);
String codec = codecInfo.getCodec();
boolean needTranscode = codecDecisionService.needTranscode(codec);
int subtype = i; // 0=main, 1=sub
String pathName = buildPathName(device.getIp(), 1, subtype);
StreamPath path = StreamPath.builder()
.cameraId(device.getId())
.pathName(pathName)
.sourceUrl(sourceUrl)
.videoCodec(codec)
.subtype(subtype)
.channel(1)
.needTranscode(needTranscode ? 1 : 0)
.enabled(1)
.status(1)
.build();
pathRepository.save(path);
mediaPathService.updatePath(path);
}
}

7.5 接入失败处理

失败分类:

  1. ONVIF 连接失败
    • 原因:网络不通、端口错误、服务未启动
    • 处理:返回明确错误信息,不创建设备记录
  2. 认证失败
    • 原因:用户名密码错误、权限不足
    • 处理:提示用户检查凭据
  3. 能力不足
    • 原因:设备不支持 Media Service
    • 处理:降级到厂商兜底模式
  4. 流探测失败
    • 原因:RTSP 不可达、编码异常
    • 处理:标记路径状态为失败,允许后续重试

错误示例:

{
"code": 400,
"message": "设备接入失败",
"details": {
"step": "ONVIF_DEVICE_INFO",
"error": "Connection timeout",
"suggestion": "请检查设备网络连接和 ONVIF 服务是否启动"
}
}

8. 设备状态巡检与事件闭环

8.1 在线巡检机制

巡检策略:

@Component
@Slf4j
public class CameraHealthCheckTask {
@Resource
private CameraDeviceRepository deviceRepository;
@Resource
private OnvifManager onvifManager;
@Resource
private CameraOfflineEventService offlineEventService;
@Value("${camera.health.check-interval-ms:30000}")
private long checkInterval;
@Scheduled(fixedDelayString = "${camera.health.check-interval-ms:30000}")
public void checkAllDevices() {
List<CameraDevice> devices = deviceRepository.findAllActive();
log.info("Starting health check for {} devices", devices.size());
for (CameraDevice device : devices) {
try {
checkSingleDevice(device);
} catch (Exception e) {
log.error("Health check failed for device {}", device.getId(), e);
}
}
}
private void checkSingleDevice(CameraDevice device) {
boolean online = isOnline(device);
LocalDateTime now = LocalDateTime.now();
if (online) {
handleOnline(device, now);
} else {
handleOffline(device, now);
}
}
private boolean isOnline(CameraDevice device) {
try {
onvifManager.getSystemDateAndTime(
device.getIp(),
device.getPort(),
device.getUsername(),
device.getPassword()
);
return true;
} catch (Exception e) {
log.debug("Device {} offline: {}", device.getId(), e.getMessage());
return false;
}
}
@Transactional(rollbackFor = Exception.class)
protected void handleOnline(CameraDevice device, LocalDateTime checkTime) {
// 状态变化:离线 -> 在线
if (device.getCameraStatus() == 0) {
device.setCameraStatus(1);
device.setLastOnlineTime(checkTime);
deviceRepository.updateById(device);
// 闭环离线事件
offlineEventService.resolveEvent(device.getId(), checkTime);
log.info("Device {} back online", device.getId());
} else {
// 刷新在线时间
device.setLastOnlineTime(checkTime);
deviceRepository.updateById(device);
}
}
@Transactional(rollbackFor = Exception.class)
protected void handleOffline(CameraDevice device, LocalDateTime checkTime) {
// 状态变化:在线 -> 离线
if (device.getCameraStatus() == 1) {
device.setCameraStatus(0);
deviceRepository.updateById(device);
// 创建离线事件
offlineEventService.createEvent(device, checkTime);
log.warn("Device {} went offline", device.getId());
} else {
// 更新离线事件
offlineEventService.updateEvent(device.getId(), checkTime);
}
}
}

为什么必须异常隔离?

如果单台设备异常导致整批巡检中断:

  • 其他设备状态陈旧
  • 离线事件无法及时创建
  • 运维告警失效

8.2 离线事件管理

@Service
@Slf4j
public class CameraOfflineEventService {
@Resource
private CameraOfflineEventRepository eventRepository;
@Transactional(rollbackFor = Exception.class)
public void createEvent(CameraDevice device, LocalDateTime startTime) {
CameraOfflineEvent event = CameraOfflineEvent.builder()
.deviceId(device.getId())
.deviceIp(device.getIp())
.groupId(device.getGroupId())
.startTime(startTime)
.lastCheckTime(startTime)
.isResolved(0)
.alarmStatus(0)
.build();
eventRepository.save(event);
log.info("Offline event created for device {}", device.getId());
}
@Transactional(rollbackFor = Exception.class)
public void updateEvent(Long deviceId, LocalDateTime checkTime) {
CameraOfflineEvent event = eventRepository.findLatestByDeviceId(deviceId);
if (event == null || event.getIsResolved() == 1) {
// 事件已闭环或不存在,不应到达此分支
log.warn("No active offline event for device {}", deviceId);
return;
}
event.setLastCheckTime(checkTime);
long duration = Duration.between(event.getStartTime(), checkTime).getSeconds();
event.setDuration(duration);
eventRepository.updateById(event);
}
@Transactional(rollbackFor = Exception.class)
public void resolveEvent(Long deviceId, LocalDateTime endTime) {
CameraOfflineEvent event = eventRepository.findLatestByDeviceId(deviceId);
if (event == null || event.getIsResolved() == 1) {
return;
}
event.setEndTime(endTime);
event.setIsResolved(1);
long duration = Duration.between(event.getStartTime(), endTime).getSeconds();
event.setDuration(duration);
eventRepository.updateById(event);
log.info("Offline event resolved for device {}, duration: {}s", deviceId, duration);
}
/**
* 查询当前未恢复的离线设备
*/
public List<CameraOfflineEvent> listActiveEvents() {
return eventRepository.findByIsResolved(0);
}
/**
* 按时间范围查询离线历史
*/
public List<CameraOfflineEvent> queryHistory(LocalDateTime start, LocalDateTime end) {
return eventRepository.findByStartTimeBetween(start, end);
}
}

事件状态机:

[设备在线] --检测离线--> [创建事件] (is_resolved=0)
│ │
│ ├--持续离线--> [更新 duration]
│ │
│ └--恢复在线--> [闭环事件] (is_resolved=1, end_time)
└--始终在线--> [无事件]

离线率统计示例:

public OfflineStatistics calculateOfflineRate(LocalDateTime start, LocalDateTime end) {
List<CameraOfflineEvent> events = queryHistory(start, end);
long totalDuration = events.stream()
.mapToLong(CameraOfflineEvent::getDuration)
.sum();
long windowDuration = Duration.between(start, end).getSeconds();
double offlineRate = (double) totalDuration / windowDuration * 100;
return OfflineStatistics.builder()
.eventCount(events.size())
.totalOfflineDuration(totalDuration)
.offlineRate(offlineRate)
.build();
}