从 0 到 1 构建企业级视频智能分析平台:完整工程实践(一)

从 0 到 1 构建企业级视频智能分析平台:完整工程实践
一份可直接落地的全栈技术方案 覆盖相机接入、流媒体编排、存储治理、智能分析全链路 包含架构设计、核心代码、数据模型、运维体系与性能基准
目录
第一部分:平台基础与相机接入
第二部分:录像存储系统
第三部分:智能分析平台
第四部分:运维与演进
第一部分:平台基础与相机接入
1. 问题定义与架构设计
1.1 业务背景与核心挑战
在构建视频智能分析平台的过程中,我们面临着多个维度的工程挑战:
设备层异构性
- 不同厂商的 ONVIF 协议实现质量参差不齐,部分设备仅支持核心功能子集
- 编码格式混乱:H.264、H.265、MJPEG 共存,浏览器兼容性差异显著
- 网络拓扑复杂:设备位于内网,播放端在外网,NAT 穿透成为常态
状态管理复杂性
- 控制平面可达但媒体平面不可达的"假在线"现象
- 设备离线后缺少事件追踪,无法形成运维闭环
- 流地址变化频繁,缺少统一的路径抽象
扩展性瓶颈
- 录像、算法等下游服务直接依赖设备原始 RTSP,耦合过深
- 缺少统一的媒体出口,后续功能扩展需要大量重复工作
- 编码转换策略不清晰,成本与质量难以平衡
1.2 设计目标
基于上述问题,我们设定了分阶段的建设目标:
第一阶段:建立可扩展底座
- 统一接入层:设备能力发现、凭据校验、元信息持久化
- 统一流模型:自动生成主/子码流路径,建立路径到设备的映射关系
- 统一媒体出口:通过 MediaMTX 提供可控、可观测的播放入口
- 编码治理:先探测、后决策,按需转码,控制成本
- 状态闭环:在线巡检 + 离线事件追踪,形成完整生命周期管理
第二阶段:录像与存储
- 录像策略模型化,支持多种录制模式
- 存储卷治理,容量可观测、可切换
- 文件索引构建,回放检索高效化
第三阶段:智能分析
- 一个任务绑定一台相机,支持多算法场景并行
- 算法节点能力建模与调度
- 实时结果处理与告警分发
1.3 总体架构
我们采用分层解耦的架构设计:
┌─────────────────────────────────────────────────────────┐
│ API Gateway │
│ (RESTful API + WebSocket) │
└────────────────┬────────────────────────────────────────┘
│
┌────────────────┴────────────────────────────────────────┐
│ Application Layer │
│ ┌──────────────┬──────────────┬─────────────────────┐ │
│ │ 相机接入服务 │ 录像编排服务 │ 算法任务编排服务 │ │
│ └──────────────┴──────────────┴─────────────────────┘ │
└────────────────┬────────────────────────────────────────┘
│
┌────────────────┴────────────────────────────────────────┐
│ Infrastructure Layer │
│ ┌──────────────┬──────────────┬─────────────────────┐ │
│ │ ONVIF 适配层 │ MediaMTX网关 │ FFmpeg编码引擎 │ │
│ └──────────────┴──────────────┴─────────────────────┘ │
│ ┌──────────────┬──────────────┬─────────────────────┐ │
│ │ MySQL │ 存储卷管理 │ 算法节点池 │ │
│ └──────────────┴──────────────┴─────────────────────┘ │
└─────────────────────────────────────────────────────────┘核心设计原则
- 控制面与媒体面解耦
- ONVIF 负责设备能力发现和控制指令
- MediaMTX 负责媒体流路径管理和分发
- 两者通过数据库中的路径映射关联
- 路径抽象统一化
- 所有下游服务(录像/算法/播放)只依赖平台分配的路径
- 设备地址变化时,只需更新路径配置,不影响消费端
- 异步化与容错
- 接入流程中的流同步采用异步执行
- 媒体注册失败支持延迟重试
- 关键操作支持事务回滚
2. 技术选型与核心组件
2.1 ONVIF 协议详解
什么是 ONVIF
ONVIF (Open Network Video Interface Forum) 是一个开放的网络视频设备接口标准,旨在实现不同厂商的 IP 摄像机、录像机和视频管理软件之间的互操作性。ONVIF 基于 Web Services 技术栈,核心协议包括:
- SOAP (Simple Object Access Protocol):消息封装协议
- WSDL (Web Services Description Language):服务描述语言
- WS-Security:安全认证机制
- WS-Discovery:设备发现协议
ONVIF 服务架构
ONVIF 将功能划分为多个服务端点:
- Device Service (
/onvif/device_service)- 设备基础信息 (厂商、型号、序列号、固件版本)
- 系统时间同步
- 能力集发现 (Capabilities)
- 网络配置
- Media Service (
/onvif/media_service)- Profile 管理 (流配置)
- 获取 RTSP URI
- 获取快照 URI
- 视频编码器配置
- PTZ Service (
/onvif/ptz_service)- 云台控制
- 预置位管理
- Events Service (
/onvif/events_service)- 事件订阅
- 运动检测通知
为什么选择 ONVIF
- 标准化程度高:主流厂商均支持,减少适配成本
- 功能完整性:覆盖设备管理到媒体控制的全生命周期
- 可扩展性强:通过 Profile 机制支持不同能力级别
- 生态成熟:丰富的开源库和工具链
ONVIF 的局限性与应对
- 厂商实现差异
- 问题:部分厂商仅实现 Profile S (Streaming),缺少 PTZ 等扩展能力
- 应对:设计时采用能力发现机制,降级处理缺失功能
- 性能开销
- 问题:SOAP 协议 XML 解析开销较大
- 应对:缓存设备信息,减少重复调用;关键路径使用连接池
- 时间同步敏感
- 问题:WS-Security 要求客户端与设备时间误差小于 5 秒
- 应对:定期校准服务器 NTP;捕获时间窗口错误并重试
2.2 MediaMTX 流媒体网关
什么是 MediaMTX
MediaMTX (前身为 rtsp-simple-server) 是一个现代化的实时流媒体服务器,支持多种协议的统一接入和分发:
- 输入协议:RTSP、RTMP、HLS、WebRTC、SRT
- 输出协议:RTSP、HLS、WebRTC
- 核心特性:
- 按需拉流 (sourceOnDemand)
- 按需转码 (runOnDemand)
- RESTful API 动态配置
- 低延迟 WebRTC 推流
MediaMTX 的技术优势
- 轻量级架构
- 单一 Go 二进制,资源占用低
- 无外部依赖,部署简单
- 协议转换能力
- 统一的内部流模型
- 自动协议适配,降低客户端复杂度
- 动态路径管理
- 支持运行时添加/删除路径
- 版本化配置接口
MediaMTX 在本架构中的作用
- 统一媒体出口
- 所有 RTSP 源流接入 MediaMTX
- 对外提供标准化的播放地址
- 按需资源调度
sourceOnDemand:无客户端时不拉流,节省带宽runOnDemand:触发 FFmpeg 转码进程
- 录像与预览集成
- 原生支持切片录像
- 实时预览与录像共享同一路径
MediaMTX 路径配置示例
paths:
cam_10_0_1_20_ch1_main:
source: rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101
sourceOnDemand: yes
sourceOnDemandStartTimeout: 10s
sourceOnDemandCloseAfter: 10s
# 录像配置
record: yes
recordPath: /recordings/%path/%Y-%m-%d/%H-%M-%S
recordFormat: fmp4
recordSegmentDuration: 10m
recordDeleteAfter: 7d2.3 FFmpeg 编解码工具链
什么是 FFmpeg
FFmpeg 是一个完整的跨平台音视频处理工具集,包含:
- ffmpeg:转码工具
- ffprobe:流分析工具
- ffplay:播放器
- libav*:编解码库
核心概念
- 容器 (Container)
- 文件格式,如 MP4、FLV、TS
- 存储音视频流的封装层
- 编解码器 (Codec)
- 视频:H.264、H.265 (HEVC)、VP9、AV1
- 音频:AAC、MP3、Opus
- 滤镜 (Filter)
- 视频处理:缩放、裁剪、叠加
- 音频处理:混音、变速
常用命令详解
1. ffprobe 流探测
ffprobe -v error \
-rtsp_transport tcp \
-analyzeduration 1000000 \
-probesize 32768 \
-select_streams v:0 \
-show_entries stream=codec_name,width,height,avg_frame_rate \
-of json \
"rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101"参数解释:
-v error:仅输出错误信息-rtsp_transport tcp:强制使用 TCP (避免 UDP 丢包)-analyzeduration:分析时长 (微秒)-probesize:探测数据大小 (字节)-select_streams v:0:仅选择第一个视频流-show_entries:指定输出字段-of json:输出格式为 JSON
输出示例:
{
"streams": [{
"codec_name": "h264",
"width": 1920,
"height": 1080,
"avg_frame_rate": "25/1"
}]
}2. H.265 转 H.264
ffmpeg -hide_banner -loglevel warning \
-rtsp_transport tcp \
-stimeout 5000000 \
-i "rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101" \
-an \
-c:v libx264 \
-preset veryfast \
-crf 23 \
-tune zerolatency \
-pix_fmt yuv420p \
-g 50 \
-keyint_min 50 \
-sc_threshold 0 \
-x264-params repeat-headers=1 \
-f rtsp \
-rtsp_transport tcp \
"rtsp://127.0.0.1:8554/output"关键参数说明:
-an:禁用音频 (减少处理开销)-c:v libx264:使用 x264 编码器-preset veryfast:编码速度预设 (速度优先)-crf 23:恒定质量因子 (18-28,值越小质量越高)-tune zerolatency:优化低延迟场景-pix_fmt yuv420p:像素格式 (浏览器兼容)-g 50:GOP 大小 (关键帧间隔)-sc_threshold 0:禁用场景切换检测-x264-params repeat-headers=1:每个关键帧重复 SPS/PPS
3. 硬件加速转码 (NVIDIA)
ffmpeg -hwaccel cuda -hwaccel_output_format cuda \
-i input.mp4 \
-c:v h264_nvenc \
-preset p4 \
-b:v 3000k \
output.mp4参数说明:
-hwaccel cuda:启用 CUDA 硬件加速-hwaccel_output_format cuda:保持 GPU 内存格式-c:v h264_nvenc:使用 NVENC 编码器-preset p4:性能预设 (p1 最快, p7 最慢)
FFmpeg 在本平台中的应用
- 编码探测
- 接入时探测设备编码格式
- 决策是否需要转码
- 实时转码
- H.265 -> H.264 兼容性转换
- 分辨率/码率自适应
- 录像导出
- 按时间范围裁剪
- 多切片拼接
3. 数据模型设计
数据模型是整个系统的基础,直接决定了后续查询效率、扩展性和运维复杂度。我们遵循以下设计原则:
- 职责单一:每张表只负责一类业务实体
- 冗余可控:适度冗余提升查询性能,但避免数据不一致
- 索引精准:基于实际查询模式建立索引
- 字段语义化:使用明确的枚举值,避免魔法数字
3.1 设备主档表
DROP TABLE IF EXISTS `camera_device`;
CREATE TABLE `camera_device` (
`id` BIGINT NOT NULL COMMENT '主键ID',
`camera_name` VARCHAR(100) DEFAULT NULL COMMENT '设备名称',
`group_id` BIGINT DEFAULT NULL COMMENT '分组ID',
-- ONVIF 连接信息
`ip` VARCHAR(50) NOT NULL COMMENT '设备IP',
`port` INT NOT NULL DEFAULT 80 COMMENT 'ONVIF端口',
`username` VARCHAR(50) NOT NULL COMMENT 'ONVIF用户名',
`password` VARCHAR(255) NOT NULL COMMENT 'ONVIF密码(AES加密)',
-- 设备元信息
`serial_number` VARCHAR(100) DEFAULT NULL COMMENT '序列号',
`manufacturer` VARCHAR(100) DEFAULT NULL COMMENT '厂商',
`model` VARCHAR(100) DEFAULT NULL COMMENT '型号',
`firmware_version` VARCHAR(100) DEFAULT NULL COMMENT '固件版本',
-- 状态字段
`camera_status` INT DEFAULT 0 COMMENT '1在线 0离线',
`last_online_time` DATETIME DEFAULT NULL COMMENT '最后在线时间',
-- 审计字段
`create_user` BIGINT DEFAULT NULL,
`create_dept` BIGINT DEFAULT NULL,
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_user` BIGINT DEFAULT NULL,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`status` INT DEFAULT 1 COMMENT '1启用 0停用',
`is_deleted` INT DEFAULT 0 COMMENT '1已删除 0正常',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_ip_is_deleted` (`ip`, `is_deleted`),
KEY `idx_group_id` (`group_id`),
KEY `idx_camera_status` (`camera_status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='相机设备主档表';设计要点:
- 唯一性约束
uk_ip_is_deleted:同一 IP 的有效设备全局唯一- 支持软删除后重新接入
- 状态索引
idx_camera_status:支持在线/离线设备快速筛选- 用于首页统计看板
- 安全性
password字段应使用 AES 加密存储- 日志输出时需脱敏处理
3.2 分组表
DROP TABLE IF EXISTS `camera_group`;
CREATE TABLE `camera_group` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`group_name` VARCHAR(100) NOT NULL COMMENT '分组名称',
`parent_id` BIGINT DEFAULT NULL COMMENT '父分组ID',
`group_type` TINYINT DEFAULT 0 COMMENT '0普通分组 1地理分组 2功能分组',
`sort_order` INT DEFAULT 0 COMMENT '排序',
`icon` VARCHAR(50) DEFAULT NULL COMMENT '图标',
`description` VARCHAR(500) DEFAULT NULL COMMENT '描述',
`create_user` BIGINT DEFAULT NULL,
`create_dept` BIGINT DEFAULT NULL,
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_user` BIGINT DEFAULT NULL,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`status` INT DEFAULT 1,
`is_deleted` TINYINT DEFAULT 0,
PRIMARY KEY (`id`),
KEY `idx_parent_id` (`parent_id`),
KEY `idx_group_type` (`group_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='相机分组表';设计要点:
- 树形结构
- 支持多级分组
- 通过
parent_id构建层级关系
- 类型扩展
group_type预留不同分组语义- 便于后续按地理位置、功能区域等维度组织
3.3 流路径表
DROP TABLE IF EXISTS `camera_stream_path`;
CREATE TABLE `camera_stream_path` (
`id` BIGINT NOT NULL COMMENT '主键ID',
`camera_id` BIGINT NOT NULL COMMENT '设备ID',
`path_name` VARCHAR(255) NOT NULL COMMENT '媒体路径主键',
`source_url` VARCHAR(1000) DEFAULT NULL COMMENT 'RTSP源地址',
-- ONVIF Profile 信息
`profile_token` VARCHAR(64) DEFAULT NULL COMMENT 'ONVIF ProfileToken',
`profile_name` VARCHAR(100) DEFAULT NULL COMMENT 'Profile名称',
`video_encoding` VARCHAR(20) DEFAULT NULL COMMENT '编码格式(原始)',
`video_codec` VARCHAR(16) DEFAULT NULL COMMENT '标准化编码(H264/H265/HEVC)',
`resolution` VARCHAR(20) DEFAULT NULL COMMENT '分辨率',
`subtype` INT NOT NULL DEFAULT 0 COMMENT '0主码流 1子码流',
`channel` INT DEFAULT NULL COMMENT '通道号',
`need_transcode` TINYINT(1) NOT NULL DEFAULT 0 COMMENT '是否转码',
-- 状态字段
`enabled` TINYINT NOT NULL DEFAULT 1 COMMENT '1启用 0停用',
`status` INT NOT NULL DEFAULT 1 COMMENT '1可用 0失败',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_path_name` (`path_name`),
KEY `idx_camera_id` (`camera_id`),
KEY `idx_codec_transcode` (`video_codec`, `need_transcode`),
KEY `idx_channel_subtype` (`camera_id`, `channel`, `subtype`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='流路径表';设计要点:
- 路径唯一性
uk_path_name:全局唯一的媒体入口标识- 所有下游服务依赖此路径
- 编码决策字段
video_codec:标准化编码格式need_transcode:转码决策结果- 索引支持"按编码类型统计转码比例"
- 流类型约束
(camera_id, channel, subtype):保证同设备同通道同类型只有一个生效路径
3.4 离线事件表
DROP TABLE IF EXISTS `camera_offline_event`;
CREATE TABLE `camera_offline_event` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`device_id` BIGINT NOT NULL COMMENT '设备ID',
`device_ip` VARCHAR(50) DEFAULT NULL COMMENT '设备IP(冗余)',
`group_id` BIGINT DEFAULT NULL COMMENT '分组ID(冗余)',
-- 事件时间线
`start_time` DATETIME NOT NULL COMMENT '离线开始时间',
`last_check_time` DATETIME NOT NULL COMMENT '最后检测时间',
`end_time` DATETIME DEFAULT NULL COMMENT '恢复在线时间',
`duration` BIGINT DEFAULT 0 COMMENT '离线时长(秒)',
-- 事件状态
`is_resolved` TINYINT DEFAULT 0 COMMENT '1已恢复 0未恢复',
`alarm_status` TINYINT DEFAULT 0 COMMENT '1已告警 0未告警',
`remark` VARCHAR(255) DEFAULT NULL COMMENT '备注',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `idx_device_status` (`device_id`, `is_resolved`),
KEY `idx_time` (`start_time`, `end_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备离线事件表';设计要点:
事件闭环
is_resolved:区分进行中和已结束事件duration:离线时长统计
冗余字段
device_ip/group_id:避免关联查询- 用于告警消息快速组装
时间索引
- 支持按时间范围查询历史离线记录
- 用于生成离线率报表
4. ONVIF 协议深度解析与实现
4.1 ONVIF 调用最小可用集
在实际生产环境中,并非所有 ONVIF 接口都需要实现。我们根据业务需求,定义了最小可用集:
| 接口 | 服务 | 用途 | 调用时机 |
|---|---|---|---|
GetDeviceInformation | Device | 获取厂商/型号/序列号/固件版本 | 设备接入时 |
GetCapabilities | Device | 获取各服务端点地址 | 设备接入时 |
GetProfiles | Media | 获取流配置列表 | 流同步时 |
GetStreamUri | Media | 获取 RTSP 播放地址 | 流同步时 |
GetSnapshotUri | Media | 获取快照地址 | 流同步时 |
GetSystemDateAndTime | Device | 获取设备时间 | 在线心跳 |
为什么是这 6 个接口?
- 接入必需:
GetDeviceInformation和GetCapabilities是设备身份识别和能力发现的基础 - 流管理核心:
GetProfiles、GetStreamUri、GetSnapshotUri构成完整的媒体流管理 - 状态监控:
GetSystemDateAndTime作为心跳接口,开销小且稳定
4.2 设备侧配置要求
很多接入失败的根因不是代码问题,而是设备侧未正确配置。建议在接入前逐项确认:
1. 服务开启
- ✅ ONVIF 服务已启用
- ✅ RTSP 服务已启用
- ❌ 常见错误:只开启了 Web 管理,未开启 ONVIF
2. 账号权限
建议创建专用 ONVIF 账号,而非复用管理员账号
最小权限集:
- 读取设备信息
- 读取媒体配置
- 读取流 URI
不需要:
- PTZ 控制权限
- 参数修改权限3. 时间同步
# NTP 配置示例 (设备端)
NTP Server: ntp.aliyun.com
Time Zone: GMT+8
Sync Interval: 3600s原因:WS-Security 要求客户端与设备时间误差 < 5 秒
4. 网络配置
防火墙规则:
允许入站: TCP 80 (ONVIF), TCP 554 (RTSP)
允许出站: TCP 任意 (用于 RTSP 数据回传)
跨网段访问:
确保路由可达
确认 NAT 规则 (若设备在内网)4.3 WS-Security 认证实现
ONVIF 使用 UsernameToken 认证方式,核心步骤:
1. 生成 Nonce
byte[] nonce = new byte[16];
SecureRandom.getInstanceStrong().nextBytes(nonce);
String nonceBase64 = Base64.getEncoder().encodeToString(nonce);2. 获取 UTC 时间
String created = ZonedDateTime.now(ZoneOffset.UTC)
.format(DateTimeFormatter.ISO_INSTANT);
// 示例: 2026-02-13T14:30:00.000Z3. 计算 PasswordDigest
byte[] digest = MessageDigest.getInstance("SHA-1").digest(
(nonce + created + password).getBytes(StandardCharsets.UTF_8)
);
String passwordDigest = Base64.getEncoder().encodeToString(digest);完整 SOAP 请求封装
public class OnvifSoapClient {
public Document sendSoapRequest(String url, String username, String password, String body)
throws Exception {
String envelope = buildSoapEnvelope(username, password, body);
HttpResponse response = HttpRequest.post(url)
.header("Content-Type", "application/soap+xml; charset=utf-8")
.timeout(5000)
.body(envelope)
.execute();
if (!response.isOk()) {
throw new OnvifException("HTTP " + response.getStatus());
}
Document doc = XmlUtil.readXML(response.body());
validateSoapFault(doc);
return doc;
}
private String buildSoapEnvelope(String username, String password, String body) {
String created = utcNow();
byte[] nonce = generateNonce();
String nonceBase64 = Base64.getEncoder().encodeToString(nonce);
String digest = calculateDigest(nonce, created, password);
return "<?xml version=\"1.0\" encoding=\"utf-8\"?>" +
"<s:Envelope xmlns:s=\"http://www.w3.org/2003/05/soap-envelope\">" +
"<s:Header>" +
"<Security s:mustUnderstand=\"1\" " +
"xmlns=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd\">" +
"<UsernameToken>" +
"<Username>" + username + "</Username>" +
"<Password Type=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordDigest\">" +
digest + "</Password>" +
"<Nonce EncodingType=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary\">" +
nonceBase64 + "</Nonce>" +
"<Created xmlns=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd\">" +
created + "</Created>" +
"</UsernameToken>" +
"</Security>" +
"</s:Header>" +
"<s:Body>" + body + "</s:Body>" +
"</s:Envelope>";
}
private void validateSoapFault(Document doc) throws OnvifFaultException {
NodeList faults = doc.getElementsByTagNameNS("*", "Fault");
if (faults.getLength() > 0) {
Element fault = (Element) faults.item(0);
String code = getTextContent(fault, "Code", "Value");
String reason = getTextContent(fault, "Reason", "Text");
throw new OnvifFaultException(code, reason);
}
}
}为什么必须验证 SOAP Fault?
大量设备会返回 HTTP 200 + SOAP Fault,如果不解析 Fault,会导致:
- 接入误成功,产生脏数据
- 在线误判,影响运维决策
- 错误告警,消耗人工排查成本
常见 SOAP Fault 示例
<s:Envelope>
<s:Body>
<s:Fault>
<s:Code>
<s:Value>s:Sender</s:Value>
<s:Subcode>
<s:Value>ter:InvalidArgVal</s:Value>
</s:Subcode>
</s:Code>
<s:Reason>
<s:Text xml:lang="en">Invalid ProfileToken</s:Text>
</s:Reason>
</s:Fault>
</s:Body>
</s:Envelope>4.4 核心接口调用示例
1. GetDeviceInformation
public class OnvifDeviceService {
public DeviceInfo getDeviceInformation(String ip, int port, String user, String pass)
throws OnvifException {
String url = String.format("http://%s:%d/onvif/device_service", ip, port);
String body = "<tds:GetDeviceInformation xmlns:tds=\"http://www.onvif.org/ver10/device/wsdl\" />";
Document doc = soapClient.sendSoapRequest(url, user, pass, body);
return DeviceInfo.builder()
.manufacturer(getText(doc, "Manufacturer"))
.model(getText(doc, "Model"))
.firmwareVersion(getText(doc, "FirmwareVersion"))
.serialNumber(getText(doc, "SerialNumber"))
.hardwareId(getText(doc, "HardwareId"))
.build();
}
}响应示例:
<GetDeviceInformationResponse>
<Manufacturer>Hikvision</Manufacturer>
<Model>DS-2CD2142FWD-I</Model>
<FirmwareVersion>V5.5.0</FirmwareVersion>
<SerialNumber>DS-2CD2142FWD-I20180101AAWRB12345678</SerialNumber>
<HardwareId>88</HardwareId>
</GetDeviceInformationResponse>2. GetCapabilities
public Capabilities getCapabilities(String ip, int port, String user, String pass)
throws OnvifException {
String url = String.format("http://%s:%d/onvif/device_service", ip, port);
String body =
"<tds:GetCapabilities xmlns:tds=\"http://www.onvif.org/ver10/device/wsdl\">" +
"<tds:Category>Media</tds:Category>" +
"</tds:GetCapabilities>";
Document doc = soapClient.sendSoapRequest(url, user, pass, body);
String mediaXAddr = getText(doc, "Media", "XAddr");
if (mediaXAddr == null || mediaXAddr.isEmpty()) {
// 降级处理:使用 Device Service 地址
mediaXAddr = url;
}
return Capabilities.builder()
.mediaXAddr(mediaXAddr)
.build();
}为什么需要 GetCapabilities?
不同设备的服务端点地址可能不同:
- 标准地址:
http://10.0.1.20/onvif/media_service - 厂商定制:
http://10.0.1.20/onvif/device_service - 带端口:
http://10.0.1.20:8080/onvif/Media
通过 GetCapabilities 动态获取,避免硬编码。
3. GetProfiles
public List<OnvifProfile> getProfiles(String mediaXAddr, String user, String pass)
throws OnvifException {
String body = "<trt:GetProfiles xmlns:trt=\"http://www.onvif.org/ver10/media/wsdl\" />";
Document doc = soapClient.sendSoapRequest(mediaXAddr, user, pass, body);
List<OnvifProfile> profiles = new ArrayList<>();
NodeList nodes = doc.getElementsByTagNameNS("*", "Profiles");
for (int i = 0; i < nodes.getLength(); i++) {
Element elem = (Element) nodes.item(i);
OnvifProfile profile = OnvifProfile.builder()
.token(elem.getAttribute("token"))
.name(getText(elem, "Name"))
.videoSourceToken(getText(elem, "VideoSourceConfiguration", "SourceToken"))
.videoEncodingToken(getText(elem, "VideoEncoderConfiguration", "token"))
.encoding(getText(elem, "VideoEncoderConfiguration", "Encoding"))
.width(getInt(elem, "VideoEncoderConfiguration", "Resolution", "Width"))
.height(getInt(elem, "VideoEncoderConfiguration", "Resolution", "Height"))
.build();
profiles.add(profile);
}
return profiles;
}Profile 解析要点:
- Token 提取
token属性是后续获取流地址的关键
- 通道号提取
// 从 SourceToken 提取通道号
// 例如: VideoSource_1 -> 1
private int extractChannel(String sourceToken) {
if (sourceToken == null) return 1;
Matcher m = Pattern.compile("\\d+").matcher(sourceToken);
return m.find() ? Integer.parseInt(m.group()) : 1;
}- 主/子码流判断
private int detectSubtype(int width, int height, String name) {
if (width >= 1280 || height >= 720) return 0; // 主码流
if (name != null && name.toLowerCase().contains("sub")) return 1;
return 1; // 默认子码流
}4. GetStreamUri
public String getStreamUri(String mediaXAddr, String user, String pass, String profileToken)
throws OnvifException {
String body =
"<trt:GetStreamUri xmlns:trt=\"http://www.onvif.org/ver10/media/wsdl\" " +
"xmlns:tt=\"http://www.onvif.org/ver10/schema\">" +
"<trt:StreamSetup>" +
"<tt:Stream>RTP-Unicast</tt:Stream>" +
"<tt:Transport><tt:Protocol>RTSP</tt:Protocol></tt:Transport>" +
"</trt:StreamSetup>" +
"<trt:ProfileToken>" + profileToken + "</trt:ProfileToken>" +
"</trt:GetStreamUri>";
Document doc = soapClient.sendSoapRequest(mediaXAddr, user, pass, body);
String uri = getText(doc, "Uri");
// 注入凭据
return injectCredentials(uri, user, pass);
}
private String injectCredentials(String rtspUrl, String user, String pass) {
try {
URI uri = new URI(rtspUrl);
String userInfo = user + ":" + pass;
return new URI(
uri.getScheme(),
userInfo,
uri.getHost(),
uri.getPort(),
uri.getPath(),
uri.getQuery(),
uri.getFragment()
).toString();
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid RTSP URL: " + rtspUrl, e);
}
}为什么要注入凭据?
大部分设备返回的 RTSP URI 不包含认证信息:
rtsp://10.0.1.20:554/Streaming/Channels/101需要转换为:
rtsp://admin:password@10.0.1.20:554/Streaming/Channels/1015. GetSystemDateAndTime (心跳)
public void checkOnline(String ip, int port, String user, String pass) throws OnvifException {
String url = String.format("http://%s:%d/onvif/device_service", ip, port);
String body = "<tds:GetSystemDateAndTime xmlns:tds=\"http://www.onvif.org/ver10/device/wsdl\" />";
// 超时设置为 3 秒
Document doc = soapClient.sendSoapRequest(url, user, pass, body, 3000);
// 能成功调用即表示在线,无需解析返回值
}为什么选择 GetSystemDateAndTime 作为心跳?
- 轻量级:不涉及媒体资源,响应快
- 稳定性高:几乎所有设备都实现
- 无副作用:只读操作,不会改变设备状态
4.5 厂商差异处理
问题 1: Media XAddr 为空
部分低端设备 GetCapabilities 返回空地址。
解决方案:
String mediaXAddr = capabilities.getMediaXAddr();
if (StringUtils.isBlank(mediaXAddr)) {
// 降级使用 Device Service 地址
mediaXAddr = String.format("http://%s:%d/onvif/device_service", ip, port);
log.warn("Media XAddr empty, fallback to device service for camera {}", cameraId);
}问题 2: ProfileToken 格式不一致
- 海康:
Profile_1 - 大华:
MediaProfile000 - 宇视:
profile_1_h264
解决方案:
// 不依赖 token 格式,按索引或分辨率选择
OnvifProfile mainStream = profiles.stream()
.filter(p -> p.getWidth() >= 1280)
.findFirst()
.orElse(profiles.get(0));问题 3: 时间窗口错误
错误示例:
SOAP Fault: The security token could not be authenticated or authorized原因:服务器时间与设备时间误差 > 5 秒
解决方案:
@Scheduled(cron = "0 */30 * * * ?") // 每 30 分钟执行
public void syncServerTime() {
try {
ProcessBuilder pb = new ProcessBuilder("ntpdate", "ntp.aliyun.com");
Process process = pb.start();
process.waitFor(10, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("NTP sync failed", e);
}
}4.6 ONVIF 调用性能优化
1. 连接池复用
@Configuration
public class OnvifHttpConfig {
@Bean
public CloseableHttpClient httpClient() {
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
cm.setMaxTotal(200);
cm.setDefaultMaxPerRoute(50);
return HttpClients.custom()
.setConnectionManager(cm)
.setDefaultRequestConfig(
RequestConfig.custom()
.setConnectTimeout(3000)
.setSocketTimeout(5000)
.build()
)
.build();
}
}2. 设备信息缓存
@Cacheable(value = "device-info", key = "#cameraId", unless = "#result == null")
public DeviceInfo getDeviceInfo(Long cameraId) {
CameraDevice device = cameraRepository.findById(cameraId)
.orElseThrow(() -> new NotFoundException("Camera not found"));
return onvifClient.getDeviceInformation(
device.getIp(),
device.getPort(),
device.getUsername(),
device.getPassword()
);
}缓存策略:
- TTL: 24 小时
- 失效条件:设备固件升级或手动清除
3. 并发控制
private final Semaphore onvifSemaphore = new Semaphore(50);
public <T> T callOnvif(Supplier<T> supplier) throws Exception {
onvifSemaphore.acquire();
try {
return supplier.get();
} finally {
onvifSemaphore.release();
}
}原因:避免同时调用大量设备导致网络拥塞。
5. MediaMTX 流媒体网关集成
5.1 MediaMTX 核心概念
Path (路径)
Path 是 MediaMTX 中的核心抽象,代表一个独立的媒体流通道。每个 Path 包含:
- Source:流的来源 (RTSP URL、推流地址、publisher 等)
- Readers:消费该流的客户端连接
- State:路径状态 (idle、ready、publishing)
按需拉流 (Source On Demand)
paths:
cam_*:
source: rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101
sourceOnDemand: yes
sourceOnDemandStartTimeout: 10s
sourceOnDemandCloseAfter: 10s工作流程:
- 初始状态:Path 存在但未拉流
- 客户端连接:触发 MediaMTX 拉取源流
- 客户端断开:等待
closeAfter时长后释放资源
优势:
- 节省带宽 (无观看时不拉流)
- 降低服务器负载
- 支持大规模路径管理
按需转码 (Run On Demand)
paths:
cam_h265_*:
source: publisher
runOnDemand: ffmpeg -i $SOURCE_URL -c:v libx264 -f rtsp rtsp://127.0.0.1:$RTSP_PORT/$MTX_PATH
runOnDemandRestart: yes工作流程:
- 客户端连接触发命令执行
- FFmpeg 进程启动并推流到 MediaMTX
- 客户端断开后根据配置决定是否重启
环境变量:
$SOURCE_URL:可用于引用上游地址$RTSP_PORT:MediaMTX RTSP 端口$MTX_PATH:当前路径名
5.2 RESTful API 集成
MediaMTX 提供完整的 API 接口用于动态管理:
API 端点:
GET /v3/config/global/get:获取全局配置GET /v3/config/pathdefaults/get:获取路径默认配置POST /v3/config/paths/add/{name}:添加路径GET /v3/config/paths/get/{name}:获取路径配置POST /v3/config/paths/patch/{name}:更新路径配置POST /v3/config/paths/delete/{name}:删除路径GET /v3/config/paths/list:列出所有路径
路径配置结构:
{
"name": "cam_10_0_1_20_ch1_main",
"source": "rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101",
"sourceProtocol": "tcp",
"sourceOnDemand": true,
"sourceOnDemandStartTimeout": "10s",
"sourceOnDemandCloseAfter": "10s",
"record": false,
"recordPath": "",
"recordFormat": "fmp4",
"recordSegmentDuration": "10m",
"recordDeleteAfter": "7d"
}5.3 路径命名规范
命名模板:
cam_{ip}_{channel}_{subtype}示例:
cam_10_0_1_20_ch1_main:10.0.1.20 设备第 1 通道主码流cam_10_0_1_20_ch1_sub:10.0.1.20 设备第 1 通道子码流cam_10_0_1_21_ch2_main:10.0.1.21 设备第 2 通道主码流
命名规范优势:
- 可读性:从路径名直接识别设备和通道
- 唯一性:全局唯一标识符
- 模式匹配:支持通配符配置
paths:
cam_*_main:
# 所有主码流共享配置
cam_*_sub:
# 所有子码流共享配置5.4 路径注册实现
核心服务类:
@Service
@Slf4j
public class MediaPathService {
@Value("${media.mtx.apiBaseUrl}")
private String apiBaseUrl;
private final RestTemplate restTemplate;
/**
* 注册直连路径
*/
public void registerDirectPath(StreamPath path) {
Map<String, Object> config = new HashMap<>();
config.put("name", path.getPathName());
config.put("source", path.getSourceUrl());
config.put("sourceProtocol", "tcp");
config.put("sourceOnDemand", true);
config.put("sourceOnDemandStartTimeout", "10s");
config.put("sourceOnDemandCloseAfter", "10s");
addPath(path.getPathName(), config);
}
/**
* 注册转码路径
*/
public void registerTranscodePath(StreamPath path) {
String ffmpegCmd = buildFfmpegCommand(path.getSourceUrl());
Map<String, Object> config = new HashMap<>();
config.put("name", path.getPathName());
config.put("source", "publisher");
config.put("runOnDemand", ffmpegCmd);
config.put("runOnDemandRestart", true);
config.put("runOnDemandStartTimeout", "10s");
config.put("runOnDemandCloseAfter", "10s");
addPath(path.getPathName(), config);
}
/**
* 删除并重建路径 (确保配置完全生效)
*/
public void updatePath(StreamPath path) {
deletePath(path.getPathName());
if (path.getNeedTranscode() == 1) {
registerTranscodePath(path);
} else {
registerDirectPath(path);
}
}
private void addPath(String pathName, Map<String, Object> config) {
String url = apiBaseUrl + "/v3/config/paths/add/" + pathName;
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Map<String, Object>> request = new HttpEntity<>(config, headers);
try {
ResponseEntity<String> response = restTemplate.postForEntity(url, request, String.class);
if (response.getStatusCode().is2xxSuccessful()) {
log.info("Path registered: {}", pathName);
} else {
throw new MediaGatewayException("Failed to register path: " + response.getBody());
}
} catch (Exception e) {
log.error("MediaMTX API error: {}", e.getMessage());
throw new MediaGatewayException("Path registration failed", e);
}
}
private void deletePath(String pathName) {
String url = apiBaseUrl + "/v3/config/paths/delete/" + pathName;
try {
restTemplate.delete(url);
log.info("Path deleted: {}", pathName);
} catch (HttpClientErrorException.NotFound e) {
// 路径不存在,忽略错误
} catch (Exception e) {
log.error("Failed to delete path {}: {}", pathName, e.getMessage());
}
}
private String buildFfmpegCommand(String sourceUrl) {
return String.format(
"ffmpeg -hide_banner -loglevel warning " +
"-rtsp_transport tcp " +
"-i \"%s\" " +
"-an " +
"-c:v libx264 -preset veryfast -crf 23 " +
"-tune zerolatency -pix_fmt yuv420p " +
"-g 50 -keyint_min 50 -sc_threshold 0 " +
"-x264-params repeat-headers=1 " +
"-f rtsp -rtsp_transport tcp " +
"\"rtsp://127.0.0.1:$RTSP_PORT/$MTX_PATH\"",
sourceUrl
);
}
}为什么使用 Delete + Add 而非 Patch?
- 配置完整性:Patch 可能保留旧配置片段,导致状态不一致
- 立即生效:Delete + Add 强制重建,确保新配置立刻生效
- 语义清晰:每次更新等价于发布新版本
风险与缓解:
- 短暂中断:删除与添加之间有窗口期
- 缓解措施:
- 在低峰期批量变更
- 添加失败时回滚到旧配置
- 关键路径增加重试机制
5.5 播放地址生成
RTSP 地址:
rtsp://{MediaMTX_IP}:{RTSP_Port}/{path_name}示例:
rtsp://10.0.10.1:8554/cam_10_0_1_20_ch1_mainHLS 地址:
http://{MediaMTX_IP}:{HLS_Port}/{path_name}/index.m3u8示例:
http://10.0.10.1:8888/cam_10_0_1_20_ch1_main/index.m3u8WebRTC 地址:
http://{MediaMTX_IP}:{WebRTC_Port}/{path_name}示例:
http://10.0.10.1:8889/cam_10_0_1_20_ch1_main播放器选择建议:
| 场景 | 协议 | 延迟 | 兼容性 | 推荐播放器 |
|---|---|---|---|---|
| PC 浏览器实时预览 | WebRTC | 500ms | Chrome/Edge | WebRTC.js |
| 移动端 H5 | HLS | 3-5s | iOS/Android | video.js |
| 客户端应用 | RTSP | 1-2s | 需原生支持 | VLC/ffplay |
| 录像回放 | HLS | 不关键 | 全平台 | video.js |
6. FFmpeg 编解码治理
6.1 编码探测策略
为什么需要探测?
- 浏览器兼容性:Safari 不支持 H.265,Chrome 90+ 才支持
- 成本控制:H.264 可直连,H.265 需转码 (CPU 密集)
- 质量保障:探测失败的流不应接入系统
探测流程:
@Service
@Slf4j
public class CodecProbeService {
@Value("${codec.probe.ffprobe-bin:ffprobe}")
private String ffprobeBin;
@Value("${codec.probe.timeout-ms:3000}")
private int timeoutMs;
private final Semaphore probeSemaphore = new Semaphore(8);
public CodecInfo probe(String rtspUrl) {
try {
probeSemaphore.acquire();
return doProbe(rtspUrl);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CodecInfo.unknown();
} finally {
probeSemaphore.release();
}
}
private CodecInfo doProbe(String rtspUrl) {
List<String> command = Arrays.asList(
ffprobeBin,
"-v", "error",
"-rtsp_transport", "tcp",
"-analyzeduration", "1000000",
"-probesize", "32768",
"-select_streams", "v:0",
"-show_entries", "stream=codec_name,width,height,avg_frame_rate",
"-of", "json",
rtspUrl
);
ProcessBuilder pb = new ProcessBuilder(command);
pb.redirectErrorStream(true);
try {
Process process = pb.start();
boolean finished = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS);
if (!finished) {
process.destroyForcibly();
log.warn("ffprobe timeout for {}", maskUrl(rtspUrl));
return CodecInfo.unknown();
}
if (process.exitValue() != 0) {
log.warn("ffprobe failed for {}", maskUrl(rtspUrl));
return CodecInfo.unknown();
}
String output = IOUtils.toString(process.getInputStream(), StandardCharsets.UTF_8);
return parseProbeOutput(output);
} catch (Exception e) {
log.error("ffprobe error for {}: {}", maskUrl(rtspUrl), e.getMessage());
return CodecInfo.unknown();
}
}
private CodecInfo parseProbeOutput(String json) {
try {
JSONObject obj = JSON.parseObject(json);
JSONArray streams = obj.getJSONArray("streams");
if (streams == null || streams.isEmpty()) {
return CodecInfo.unknown();
}
JSONObject stream = streams.getJSONObject(0);
String codecName = stream.getString("codec_name");
int width = stream.getIntValue("width");
int height = stream.getIntValue("height");
String frameRate = stream.getString("avg_frame_rate");
return CodecInfo.builder()
.codec(normalizeCodec(codecName))
.width(width)
.height(height)
.frameRate(parseFrameRate(frameRate))
.build();
} catch (Exception e) {
log.error("Failed to parse ffprobe output", e);
return CodecInfo.unknown();
}
}
private String normalizeCodec(String codecName) {
if (codecName == null) return "UNKNOWN";
String lower = codecName.toLowerCase();
if (lower.contains("h264") || lower.equals("avc")) return "H264";
if (lower.contains("h265") || lower.equals("hevc")) return "H265";
if (lower.contains("mjpeg")) return "MJPEG";
return "UNKNOWN";
}
private double parseFrameRate(String frameRate) {
if (frameRate == null || frameRate.isEmpty()) return 0.0;
try {
if (frameRate.contains("/")) {
String[] parts = frameRate.split("/");
double num = Double.parseDouble(parts[0]);
double den = Double.parseDouble(parts[1]);
return den == 0 ? 0 : num / den;
}
return Double.parseDouble(frameRate);
} catch (Exception e) {
return 0.0;
}
}
private String maskUrl(String url) {
return url.replaceAll("://[^@]+@", "://***:***@");
}
}并发控制为什么必要?
批量接入 500 路相机时,如果不限制并发:
- CPU 负载瞬间飙升至 100%
- 磁盘 I/O 饱和 (ffprobe 需要写临时文件)
- 导致正常业务请求超时
推荐并发数:
- 4 核 8G:并发 4-8
- 8 核 16G:并发 8-16
- 16 核 32G:并发 16-32
6.2 转码决策引擎
决策矩阵:
| 编码格式 | 浏览器兼容性 | 决策 | 备注 |
|---|---|---|---|
| H264 | ✅ 全兼容 | 直连 | 无需转码 |
| H265 | ⚠️ 部分兼容 | 转码 | Chrome 90+, Safari 不支持 |
| MJPEG | ⚠️ 兼容但低效 | 转码 | 带宽占用高 |
| UNKNOWN | ❌ 未知 | 保守转码 | 探测失败兜底 |
实现代码:
@Service
public class CodecDecisionService {
@Value("${codec.transcode.enabled:true}")
private boolean transcodeEnabled;
@Value("${codec.transcode.assume-h265-when-unknown:true}")
private boolean assumeH265WhenUnknown;
public boolean needTranscode(String codec) {
if (!transcodeEnabled) {
return false;
}
if (codec == null || "UNKNOWN".equals(codec)) {
return assumeH265WhenUnknown;
}
return "H265".equals(codec) || "HEVC".equals(codec) || "MJPEG".equals(codec);
}
public TranscodeStrategy selectStrategy(CodecInfo info) {
if (!needTranscode(info.getCodec())) {
return TranscodeStrategy.DIRECT;
}
// 根据分辨率选择转码参数
if (info.getWidth() >= 1920 || info.getHeight() >= 1080) {
return TranscodeStrategy.HIGH_QUALITY;
} else if (info.getWidth() >= 1280 || info.getHeight() >= 720) {
return TranscodeStrategy.MEDIUM_QUALITY;
} else {
return TranscodeStrategy.LOW_QUALITY;
}
}
}
public enum TranscodeStrategy {
DIRECT(null, null, null),
HIGH_QUALITY("veryfast", "23", "50"),
MEDIUM_QUALITY("faster", "25", "60"),
LOW_QUALITY("fast", "28", "90");
private final String preset;
private final String crf;
private final String gop;
// constructor and getters
}6.3 转码命令构建
基础转码命令:
public class FfmpegCommandBuilder {
public String buildTranscodeCommand(String sourceUrl, TranscodeStrategy strategy) {
StringBuilder cmd = new StringBuilder();
cmd.append("ffmpeg -hide_banner -loglevel warning ");
cmd.append("-rtsp_transport tcp ");
cmd.append("-stimeout 5000000 ");
cmd.append("-probesize 5000000 ");
cmd.append("-analyzeduration 5000000 ");
cmd.append("-i \"").append(sourceUrl).append("\" ");
// 禁用音频
cmd.append("-an ");
// 视频编码参数
cmd.append("-c:v libx264 ");
cmd.append("-preset ").append(strategy.getPreset()).append(" ");
cmd.append("-crf ").append(strategy.getCrf()).append(" ");
cmd.append("-tune zerolatency ");
cmd.append("-pix_fmt yuv420p ");
cmd.append("-g ").append(strategy.getGop()).append(" ");
cmd.append("-keyint_min ").append(strategy.getGop()).append(" ");
cmd.append("-sc_threshold 0 ");
cmd.append("-x264-params repeat-headers=1 ");
// 输出格式
cmd.append("-f rtsp -rtsp_transport tcp ");
cmd.append("\"rtsp://127.0.0.1:$RTSP_PORT/$MTX_PATH\"");
return cmd.toString();
}
}硬件加速版本 (NVENC):
public String buildNvencCommand(String sourceUrl) {
StringBuilder cmd = new StringBuilder();
cmd.append("ffmpeg -hide_banner -loglevel warning ");
cmd.append("-hwaccel cuda -hwaccel_output_format cuda ");
cmd.append("-rtsp_transport tcp ");
cmd.append("-i \"").append(sourceUrl).append("\" ");
cmd.append("-an ");
cmd.append("-c:v h264_nvenc ");
cmd.append("-preset p4 ");
cmd.append("-b:v 3000k ");
cmd.append("-g 50 ");
cmd.append("-f rtsp -rtsp_transport tcp ");
cmd.append("\"rtsp://127.0.0.1:$RTSP_PORT/$MTX_PATH\"");
return cmd.toString();
}硬件加速检测:
public boolean isNvencAvailable() {
try {
ProcessBuilder pb = new ProcessBuilder("ffmpeg", "-encoders");
Process process = pb.start();
String output = IOUtils.toString(process.getInputStream(), StandardCharsets.UTF_8);
return output.contains("h264_nvenc");
} catch (Exception e) {
return false;
}
}参数调优指南:
| 参数 | 推荐值 | 说明 | 影响 |
|---|---|---|---|
-preset | veryfast | 编码速度预设 | 速度 ↑ 质量 ↓ |
-crf | 23 | 恒定质量因子 | 值越小质量越高 |
-tune | zerolatency | 低延迟优化 | 减少缓冲延迟 |
-g | 50 | GOP 大小 | 影响跳转精度 |
-b:v | 3000k | 码率 | 带宽与质量平衡 |
7. 相机接入完整流程
7.1 接入时序图
[前端] → [接口层] → [业务层] → [ONVIF层] → [设备]
│ │ │ │
│ │ │ └─→ GetDeviceInformation
│ │ │ └─→ GetCapabilities
│ │ │ └─→ GetProfiles
│ │ │ └─→ GetStreamUri (per profile)
│ │ │
│ │ └─→ [数据层] 保存设备主档
│ │ └─→ [异步任务] 流同步
│ │ │
│ │ └─→ FFprobe 探测编码
│ │ └─→ 保存流路径
│ │ └─→ MediaMTX 注册路径
│ │
│ └─→ 返回接入结果7.2 接入请求模型
@Data
public class CameraAddRequest {
@NotBlank(message = "IP不能为空")
private String ip;
@Min(value = 1, message = "端口范围 1-65535")
@Max(value = 65535, message = "端口范围 1-65535")
private Integer port = 80;
@NotBlank(message = "用户名不能为空")
private String username;
@NotBlank(message = "密码不能为空")
private String password;
private String cameraName;
private Long groupId;
/**
* 0: 标准 ONVIF 流程
* 1: 厂商兜底模式 (跳过 ONVIF,使用固定规则)
*/
private Integer skipOnvif = 0;
/**
* 厂商类型 (skipOnvif=1 时有效)
* 支持: HIKVISION, DAHUA, UNIVIEW
*/
private String vendorType;
}7.3 接入服务实现
@Service
@Slf4j
public class CameraOnboardingService {
@Resource
private CameraDeviceRepository deviceRepository;
@Resource
private OnvifManager onvifManager;
@Resource
private StreamPathService streamPathService;
@Resource
private CameraGroupService groupService;
@Transactional(rollbackFor = Exception.class)
public CameraDevice addCamera(CameraAddRequest request) {
// 1. 重复性校验
if (deviceRepository.existsByIpAndIsDeleted(request.getIp(), 0)) {
throw new BusinessException("设备已存在: " + request.getIp());
}
// 2. 构建设备实体
CameraDevice device = new CameraDevice();
BeanUtils.copyProperties(request, device);
// 设置默认名称
if (StringUtils.isBlank(device.getCameraName())) {
device.setCameraName(device.getIp());
}
// 3. ONVIF 信息获取
if (request.getSkipOnvif() == 0) {
try {
OnvifDeviceDetail detail = onvifManager.getDeviceInformation(
device.getIp(),
device.getPort(),
device.getUsername(),
device.getPassword()
);
device.setManufacturer(detail.getManufacturer());
device.setModel(detail.getModel());
device.setSerialNumber(detail.getSerialNumber());
device.setFirmwareVersion(detail.getFirmwareVersion());
} catch (OnvifException e) {
log.error("ONVIF failed for {}, fallback to basic mode", request.getIp(), e);
device.setManufacturer("UNKNOWN");
device.setModel("UNKNOWN");
}
} else {
// 厂商兜底模式
device.setManufacturer(request.getVendorType());
device.setModel("Generic");
}
// 4. 设置初始状态
device.setCameraStatus(1);
device.setLastOnlineTime(LocalDateTime.now());
// 5. 解析分组
if (request.getGroupId() == null) {
device.setGroupId(groupService.getDefaultGroupId());
}
// 6. 保存设备
deviceRepository.save(device);
// 7. 异步同步流路径
CompletableFuture.runAsync(() -> {
try {
streamPathService.syncStreamPaths(device, request);
} catch (Exception e) {
log.error("Stream sync failed for camera {}", device.getId(), e);
}
});
return device;
}
}事务边界设计:
- 同步部分 (在事务内):
- 设备主档写入
- 基础验证
- 异步部分 (事务外):
- 流路径同步
- MediaMTX 注册
为什么异步?
- ONVIF 调用可能超时 (3-5秒)
- FFprobe 探测耗时 (2-3秒/路)
- 避免接口长时间阻塞
7.4 流同步服务
标准 ONVIF 模式:
@Service
@Slf4j
public class StreamPathService {
@Resource
private StreamPathRepository pathRepository;
@Resource
private OnvifManager onvifManager;
@Resource
private CodecProbeService codecProbeService;
@Resource
private CodecDecisionService codecDecisionService;
@Resource
private MediaPathService mediaPathService;
@Transactional(rollbackFor = Exception.class)
public void syncStreamPaths(CameraDevice device, CameraAddRequest request) {
if (request.getSkipOnvif() == 1) {
syncStreamPathsVendorFallback(device, request);
return;
}
// 1. 获取 Profiles
List<OnvifProfile> profiles = onvifManager.getProfiles(
device.getIp(),
device.getPort(),
device.getUsername(),
device.getPassword()
);
if (profiles.isEmpty()) {
throw new BusinessException("No profiles found for camera: " + device.getIp());
}
// 2. 删除旧路径
pathRepository.deleteAllByCameraId(device.getId());
// 3. 处理每个 Profile
for (OnvifProfile profile : profiles) {
try {
processProfile(device, profile);
} catch (Exception e) {
log.error("Failed to process profile {} for camera {}",
profile.getToken(), device.getId(), e);
}
}
}
private void processProfile(CameraDevice device, OnvifProfile profile) {
// 1. 获取 RTSP URI
String rtspUri = onvifManager.getStreamUri(
device.getIp(),
device.getPort(),
device.getUsername(),
device.getPassword(),
profile.getToken()
);
if (StringUtils.isBlank(rtspUri)) {
log.warn("Empty RTSP URI for profile {}", profile.getToken());
return;
}
// 2. 注入凭据
String sourceUrl = injectCredentials(rtspUri, device.getUsername(), device.getPassword());
// 3. 探测编码
CodecInfo codecInfo = codecProbeService.probe(sourceUrl);
String codec = codecInfo.getCodec();
boolean needTranscode = codecDecisionService.needTranscode(codec);
// 4. 构建路径名
int channel = extractChannel(profile.getVideoSourceToken());
int subtype = detectSubtype(profile.getWidth(), profile.getName());
String pathName = buildPathName(device.getIp(), channel, subtype);
// 5. 保存路径记录
StreamPath path = StreamPath.builder()
.cameraId(device.getId())
.pathName(pathName)
.sourceUrl(sourceUrl)
.profileToken(profile.getToken())
.profileName(profile.getName())
.videoEncoding(profile.getEncoding())
.videoCodec(codec)
.resolution(profile.getWidth() + "x" + profile.getHeight())
.subtype(subtype)
.channel(channel)
.needTranscode(needTranscode ? 1 : 0)
.enabled(1)
.status(1)
.build();
pathRepository.save(path);
// 6. 注册到 MediaMTX
try {
mediaPathService.updatePath(path);
log.info("Path registered: {}", pathName);
} catch (Exception e) {
log.error("MediaMTX registration failed for {}", pathName, e);
path.setStatus(0);
pathRepository.updateById(path);
}
}
private String buildPathName(String ip, int channel, int subtype) {
String ipNormalized = ip.replace(".", "_");
String subtypeStr = (subtype == 0) ? "main" : "sub";
return String.format("cam_%s_ch%d_%s", ipNormalized, channel, subtypeStr);
}
private int extractChannel(String sourceToken) {
if (sourceToken == null) return 1;
Matcher m = Pattern.compile("\\d+").matcher(sourceToken);
return m.find() ? Integer.parseInt(m.group()) : 1;
}
private int detectSubtype(int width, String name) {
// 主码流判断
if (width >= 1280) return 0;
// 名称包含 sub/stream2 等关键字
if (name != null) {
String lower = name.toLowerCase();
if (lower.contains("sub") || lower.contains("stream2")) {
return 1;
}
}
// 默认子码流
return 1;
}
private String injectCredentials(String rtspUrl, String user, String pass) {
try {
URI uri = new URI(rtspUrl);
String userInfo = URLEncoder.encode(user, "UTF-8") + ":" +
URLEncoder.encode(pass, "UTF-8");
return new URI(
uri.getScheme(),
userInfo,
uri.getHost(),
uri.getPort(),
uri.getPath(),
uri.getQuery(),
uri.getFragment()
).toString();
} catch (Exception e) {
throw new IllegalArgumentException("Invalid RTSP URL: " + rtspUrl, e);
}
}
}厂商兜底模式:
private void syncStreamPathsVendorFallback(CameraDevice device, CameraAddRequest request) {
String vendor = request.getVendorType();
if (!"HIKVISION".equalsIgnoreCase(vendor)) {
throw new BusinessException("Unsupported vendor fallback: " + vendor);
}
// 海康固定规则
List<String> rtspTemplates = Arrays.asList(
"rtsp://{ip}:554/Streaming/Channels/101", // 主码流
"rtsp://{ip}:554/Streaming/Channels/102" // 子码流
);
pathRepository.deleteAllByCameraId(device.getId());
for (int i = 0; i < rtspTemplates.size(); i++) {
String template = rtspTemplates.get(i);
String rtspUrl = template.replace("{ip}", device.getIp());
String sourceUrl = injectCredentials(rtspUrl, device.getUsername(), device.getPassword());
CodecInfo codecInfo = codecProbeService.probe(sourceUrl);
String codec = codecInfo.getCodec();
boolean needTranscode = codecDecisionService.needTranscode(codec);
int subtype = i; // 0=main, 1=sub
String pathName = buildPathName(device.getIp(), 1, subtype);
StreamPath path = StreamPath.builder()
.cameraId(device.getId())
.pathName(pathName)
.sourceUrl(sourceUrl)
.videoCodec(codec)
.subtype(subtype)
.channel(1)
.needTranscode(needTranscode ? 1 : 0)
.enabled(1)
.status(1)
.build();
pathRepository.save(path);
mediaPathService.updatePath(path);
}
}7.5 接入失败处理
失败分类:
- ONVIF 连接失败
- 原因:网络不通、端口错误、服务未启动
- 处理:返回明确错误信息,不创建设备记录
- 认证失败
- 原因:用户名密码错误、权限不足
- 处理:提示用户检查凭据
- 能力不足
- 原因:设备不支持 Media Service
- 处理:降级到厂商兜底模式
- 流探测失败
- 原因:RTSP 不可达、编码异常
- 处理:标记路径状态为失败,允许后续重试
错误示例:
{
"code": 400,
"message": "设备接入失败",
"details": {
"step": "ONVIF_DEVICE_INFO",
"error": "Connection timeout",
"suggestion": "请检查设备网络连接和 ONVIF 服务是否启动"
}
}8. 设备状态巡检与事件闭环
8.1 在线巡检机制
巡检策略:
@Component
@Slf4j
public class CameraHealthCheckTask {
@Resource
private CameraDeviceRepository deviceRepository;
@Resource
private OnvifManager onvifManager;
@Resource
private CameraOfflineEventService offlineEventService;
@Value("${camera.health.check-interval-ms:30000}")
private long checkInterval;
@Scheduled(fixedDelayString = "${camera.health.check-interval-ms:30000}")
public void checkAllDevices() {
List<CameraDevice> devices = deviceRepository.findAllActive();
log.info("Starting health check for {} devices", devices.size());
for (CameraDevice device : devices) {
try {
checkSingleDevice(device);
} catch (Exception e) {
log.error("Health check failed for device {}", device.getId(), e);
}
}
}
private void checkSingleDevice(CameraDevice device) {
boolean online = isOnline(device);
LocalDateTime now = LocalDateTime.now();
if (online) {
handleOnline(device, now);
} else {
handleOffline(device, now);
}
}
private boolean isOnline(CameraDevice device) {
try {
onvifManager.getSystemDateAndTime(
device.getIp(),
device.getPort(),
device.getUsername(),
device.getPassword()
);
return true;
} catch (Exception e) {
log.debug("Device {} offline: {}", device.getId(), e.getMessage());
return false;
}
}
@Transactional(rollbackFor = Exception.class)
protected void handleOnline(CameraDevice device, LocalDateTime checkTime) {
// 状态变化:离线 -> 在线
if (device.getCameraStatus() == 0) {
device.setCameraStatus(1);
device.setLastOnlineTime(checkTime);
deviceRepository.updateById(device);
// 闭环离线事件
offlineEventService.resolveEvent(device.getId(), checkTime);
log.info("Device {} back online", device.getId());
} else {
// 刷新在线时间
device.setLastOnlineTime(checkTime);
deviceRepository.updateById(device);
}
}
@Transactional(rollbackFor = Exception.class)
protected void handleOffline(CameraDevice device, LocalDateTime checkTime) {
// 状态变化:在线 -> 离线
if (device.getCameraStatus() == 1) {
device.setCameraStatus(0);
deviceRepository.updateById(device);
// 创建离线事件
offlineEventService.createEvent(device, checkTime);
log.warn("Device {} went offline", device.getId());
} else {
// 更新离线事件
offlineEventService.updateEvent(device.getId(), checkTime);
}
}
}为什么必须异常隔离?
如果单台设备异常导致整批巡检中断:
- 其他设备状态陈旧
- 离线事件无法及时创建
- 运维告警失效
8.2 离线事件管理
@Service
@Slf4j
public class CameraOfflineEventService {
@Resource
private CameraOfflineEventRepository eventRepository;
@Transactional(rollbackFor = Exception.class)
public void createEvent(CameraDevice device, LocalDateTime startTime) {
CameraOfflineEvent event = CameraOfflineEvent.builder()
.deviceId(device.getId())
.deviceIp(device.getIp())
.groupId(device.getGroupId())
.startTime(startTime)
.lastCheckTime(startTime)
.isResolved(0)
.alarmStatus(0)
.build();
eventRepository.save(event);
log.info("Offline event created for device {}", device.getId());
}
@Transactional(rollbackFor = Exception.class)
public void updateEvent(Long deviceId, LocalDateTime checkTime) {
CameraOfflineEvent event = eventRepository.findLatestByDeviceId(deviceId);
if (event == null || event.getIsResolved() == 1) {
// 事件已闭环或不存在,不应到达此分支
log.warn("No active offline event for device {}", deviceId);
return;
}
event.setLastCheckTime(checkTime);
long duration = Duration.between(event.getStartTime(), checkTime).getSeconds();
event.setDuration(duration);
eventRepository.updateById(event);
}
@Transactional(rollbackFor = Exception.class)
public void resolveEvent(Long deviceId, LocalDateTime endTime) {
CameraOfflineEvent event = eventRepository.findLatestByDeviceId(deviceId);
if (event == null || event.getIsResolved() == 1) {
return;
}
event.setEndTime(endTime);
event.setIsResolved(1);
long duration = Duration.between(event.getStartTime(), endTime).getSeconds();
event.setDuration(duration);
eventRepository.updateById(event);
log.info("Offline event resolved for device {}, duration: {}s", deviceId, duration);
}
/**
* 查询当前未恢复的离线设备
*/
public List<CameraOfflineEvent> listActiveEvents() {
return eventRepository.findByIsResolved(0);
}
/**
* 按时间范围查询离线历史
*/
public List<CameraOfflineEvent> queryHistory(LocalDateTime start, LocalDateTime end) {
return eventRepository.findByStartTimeBetween(start, end);
}
}事件状态机:
[设备在线] --检测离线--> [创建事件] (is_resolved=0)
│ │
│ ├--持续离线--> [更新 duration]
│ │
│ └--恢复在线--> [闭环事件] (is_resolved=1, end_time)
│
└--始终在线--> [无事件]离线率统计示例:
public OfflineStatistics calculateOfflineRate(LocalDateTime start, LocalDateTime end) {
List<CameraOfflineEvent> events = queryHistory(start, end);
long totalDuration = events.stream()
.mapToLong(CameraOfflineEvent::getDuration)
.sum();
long windowDuration = Duration.between(start, end).getSeconds();
double offlineRate = (double) totalDuration / windowDuration * 100;
return OfflineStatistics.builder()
.eventCount(events.size())
.totalOfflineDuration(totalDuration)
.offlineRate(offlineRate)
.build();
}贡献者
flycodeu
版权所有
版权归属:flycodeu
