Building an Enterprise-Grade Video Intelligence Analytics Platform from 0 to 1: A Complete Engineering Practice (Part 3)

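A full-stack technical blueprint ready for production use. It covers the whole chain of camera access, streaming orchestration, storage governance and intelligent analysis. It includes architecture design, core code, data models, the operations system and performance benchmarks.
Contents
Part 1: Platform Foundations and Camera Access
Part 2: Recording Storage System
Part 3: Intelligent Analysis Platform
Part 4: Operations and Evolution
Part 3: Intelligent Analysis Platform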
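13. Algorithm Task Model Design
13.1 Core challenges of the task model
Problem 1: the limits of "one task, one algorithm"
A real-world scenario:
- At a factory gate, personnel intrusion, illegal vehicle parking and missing hard hats must all be detected at the same time
- Traditional approach: create 3 tasks and pull the stream 3 times, wasting bandwidth and compute
Problem 2: state complexity of multi-scene tasks
- Different scenes of the same task may be assigned to different algorithm nodes
- Task-level and scene-level state must both be maintained
- A failure in some scenes must not affect the other scenes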
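Both problems point to the same structure. Pull the stream once per task, hand every frame to each configured scene, and keep each failure confined to the scene that produced it.

The sketch below illustrates that fan-out under stated assumptions:
- A detector is modelled as a hypothetical `Function<byte[], Boolean>`, where true means "alarm".
- The ALARM/OK/ERROR labels are illustrative only.

It is not the algorithm engine's real interface.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical fan-out: one decoded frame is shared by every scene of a task. */
final class SceneFanOut {

    /**
     * Runs each scene detector on the same frame, in insertion order.
     * A detector that throws is recorded as ERROR for its own scene only;
     * the remaining scenes still run and report ALARM or OK.
     */
    static Map<String, String> runScenes(byte[] frame,
                                         Map<String, Function<byte[], Boolean>> detectors) {
        Map<String, String> sceneStates = new LinkedHashMap<>();
        for (Map.Entry<String, Function<byte[], Boolean>> entry : detectors.entrySet()) {
            try {
                boolean alarm = Boolean.TRUE.equals(entry.getValue().apply(frame));
                sceneStates.put(entry.getKey(), alarm ? "ALARM" : "OK");
            } catch (RuntimeException e) {
                // Isolate the failure: only this scene is marked as failed
                sceneStates.put(entry.getKey(), "ERROR");
            }
        }
        return sceneStates;
    }
}
```

Suppose the `HELMET_DETECTION` detector crashes. The returned map then holds:
- ALARM for `REGION_INTRUSION`
- ERROR for `HELMET_DETECTION`
- OK for `VEHICLE_PARKING`

One decode serves three scenes, and the failure stays in its own entry.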
13.2 Task table design
```sql
DROP TABLE IF EXISTS `ai_task`;
CREATE TABLE `ai_task` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `task_name` VARCHAR(255) DEFAULT NULL COMMENT 'Task name',
  `camera_id` BIGINT DEFAULT NULL COMMENT 'Camera ID',
  `group_id` BIGINT DEFAULT NULL COMMENT 'Group ID',
  `camera_name` VARCHAR(255) DEFAULT NULL COMMENT 'Camera name (denormalized)',
  `stream_path_id` BIGINT DEFAULT NULL COMMENT 'Stream path ID',
  -- Stream information
  `rtsp_url` VARCHAR(1024) DEFAULT NULL COMMENT 'RTSP source URL',
  `stream_path_name` VARCHAR(255) DEFAULT NULL COMMENT 'Stream path name',
  `stream_type` TINYINT DEFAULT 0 COMMENT '0 main stream, 1 sub stream',
  -- Algorithm configuration
  `scene_codes` JSON DEFAULT NULL COMMENT 'Array of scene codes',
  `region_json` TEXT COMMENT 'Detection region',
  `algorithm_params` TEXT COMMENT 'Algorithm parameters',
  -- Dispatch configuration
  `dispatch_mode` VARCHAR(20) DEFAULT 'MIXED' COMMENT 'MIXED/AUTO_SINGLE/AUTO_SPLIT/FIXED',
  `prefer_node_id` BIGINT DEFAULT NULL COMMENT 'Designated node ID (FIXED mode)',
  -- Detection parameters
  `conf_threshold` FLOAT DEFAULT 0.6 COMMENT 'Confidence threshold',
  `alarm_enabled` TINYINT DEFAULT 1 COMMENT 'Alarm enabled',
  `level` INT DEFAULT 1 COMMENT 'Alarm level',
  -- Video processing parameters
  `input_fps` INT DEFAULT 15 COMMENT 'Input frame rate',
  `output_fps` INT DEFAULT 15 COMMENT 'Output frame rate',
  `output_width` INT DEFAULT 1280 COMMENT 'Output width',
  `output_height` INT DEFAULT 720 COMMENT 'Output height',
  `output_bitrate` VARCHAR(20) DEFAULT '3000k' COMMENT 'Output bitrate',
  `process_quality` VARCHAR(20) DEFAULT 'MEDIUM' COMMENT 'LOW/MEDIUM/HIGH',
  -- Snapshot configuration
  `snapshot_policy` VARCHAR(16) DEFAULT 'ALARM' COMMENT 'OFF/ALL/ALARM',
  `snapshot_quality` INT DEFAULT 80 COMMENT 'Snapshot quality (1-100)',
  -- Push configuration
  `push_enabled` TINYINT DEFAULT 1 COMMENT 'Push enabled',
  `push_mode` VARCHAR(32) DEFAULT 'LEVEL_DEFAULT' COMMENT 'Push mode',
  `push_threshold_count` INT DEFAULT NULL COMMENT 'Push threshold count',
  `push_window_seconds` INT DEFAULT NULL COMMENT 'Push window (seconds)',
  `push_cooldown_seconds` INT DEFAULT 0 COMMENT 'Push cooldown (seconds)',
  `push_route_key` VARCHAR(64) DEFAULT NULL COMMENT 'Push routing key',
  -- Runtime status
  `status` TINYINT DEFAULT 0 COMMENT '0 stopped, 1 running, 2 error',
  `error_count` INT DEFAULT 0 COMMENT 'Error count',
  `error_message` VARCHAR(1024) DEFAULT NULL COMMENT 'Error message',
  `last_run_time` DATETIME DEFAULT NULL COMMENT 'Last run time',
  `last_keepalive_time` DATETIME DEFAULT NULL COMMENT 'Last keepalive time',
  `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `idx_camera_status` (`camera_id`, `status`),
  KEY `idx_group_id` (`group_id`),
  KEY `idx_dispatch_mode` (`dispatch_mode`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Algorithm task table';
```
JSON field example:
```json
{
  "scene_codes": ["REGION_INTRUSION", "HELMET_DETECTION", "VEHICLE_PARKING"]
}
```
13.3 Scene category table
```sql
DROP TABLE IF EXISTS `ai_scene_category`;
CREATE TABLE `ai_scene_category` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `category_name` VARCHAR(64) NOT NULL COMMENT 'Category name',
  `category_code` VARCHAR(64) NOT NULL COMMENT 'Category code',
  `icon` VARCHAR(128) DEFAULT NULL COMMENT 'Icon',
  `sort_order` INT DEFAULT 0 COMMENT 'Sort order',
  `status` TINYINT DEFAULT 1 COMMENT 'Status',
  `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_category_code` (`category_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Scene category table';
```
Sample data:
```sql
INSERT INTO ai_scene_category (category_name, category_code, sort_order) VALUES
('Safety Management', 'SAFETY', 1),
('Behavior Recognition', 'BEHAVIOR', 2),
('Vehicle Management', 'VEHICLE', 3),
('Environmental Monitoring', 'ENVIRONMENT', 4);
```
13.4 Scene type table
```sql
DROP TABLE IF EXISTS `ai_scene_type`;
CREATE TABLE `ai_scene_type` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `category_id` BIGINT NOT NULL COMMENT 'Category ID',
  `scene_name` VARCHAR(100) NOT NULL COMMENT 'Scene name',
  `scene_code` VARCHAR(64) NOT NULL COMMENT 'Scene code',
  `algorithm_type` VARCHAR(32) DEFAULT NULL COMMENT 'Algorithm type',
  `model_path` VARCHAR(255) DEFAULT NULL COMMENT 'Model path',
  `default_params` JSON DEFAULT NULL COMMENT 'Default parameters',
  `description` VARCHAR(500) DEFAULT NULL COMMENT 'Description',
  `icon` VARCHAR(128) DEFAULT NULL COMMENT 'Icon',
  `status` TINYINT DEFAULT 1 COMMENT 'Status',
  `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_scene_code` (`scene_code`),
  KEY `idx_category_status` (`category_id`, `status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Scene type table';
```
Sample data:
```sql
INSERT INTO ai_scene_type (category_id, scene_name, scene_code, algorithm_type, default_params) VALUES
(1, 'Region Intrusion Detection', 'REGION_INTRUSION', 'object_detection',
 '{"problem_labels":["person","vehicle"],"conf_threshold":0.6}'),
(1, 'Hard Hat Detection', 'HELMET_DETECTION', 'classification',
 '{"problem_labels":["no_helmet"],"conf_threshold":0.7}'),
(3, 'Illegal Parking Detection', 'VEHICLE_PARKING', 'object_detection',
 '{"problem_labels":["car","truck"],"conf_threshold":0.6}');
```
13.5 Task request model
```java
import java.util.List;

import lombok.Data;

@Data
public class AiTaskUpsertRequest {
    private Long id;
    private String taskName;

    // Camera binding
    private Long cameraId;
    private Long streamPathId;
    private String rtspUrl;
    private Integer streamType;        // 0 main stream, 1 sub stream

    // Scene configuration (core)
    private List<String> sceneCodes;

    // Detection region and algorithm parameters
    private String regionJson;
    private String algorithmParams;

    // Dispatch configuration
    private String dispatchMode;       // MIXED/AUTO_SINGLE/AUTO_SPLIT/FIXED
    private Long preferNodeId;         // required in FIXED mode

    // Detection parameters
    private Float confThreshold;
    private Integer alarmEnabled;
    private Integer level;

    // Video processing
    private Integer inputFps;
    private Integer outputFps;
    private Integer outputWidth;
    private Integer outputHeight;
    private String outputBitrate;
    private String processQuality;     // LOW/MEDIUM/HIGH

    // Snapshot configuration
    private String snapshotPolicy;     // OFF/ALL/ALARM
    private Integer snapshotQuality;   // 1-100

    // Push configuration
    private Integer pushEnabled;
    private String pushMode;
    private Integer pushThresholdCount;
    private Integer pushWindowSeconds;
    private Integer pushCooldownSeconds;
    private String pushRouteKey;
}
```
13.6 Task creation service
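`createTask` below fills in defaults through `normalizeRequest`, but nothing rejects out-of-range values before they reach the database. Here is a minimal validation sketch. It is derived from:
- the value ranges documented in the table and DTO comments above;
- the FIXED-mode rule enforced later by the dispatcher.

`TaskDraft` is a hypothetical stand-in for the request DTO, so the example runs without Lombok. The `(0, 1]` range for `confThreshold` is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in holding the AiTaskUpsertRequest fields checked below. */
final class TaskDraft {
    Long cameraId;
    List<String> sceneCodes;
    String dispatchMode;
    Long preferNodeId;
    Integer streamType;
    String snapshotPolicy;
    Integer snapshotQuality;
    String processQuality;
    Float confThreshold;
}

final class TaskDraftValidator {
    private static final Set<String> DISPATCH_MODES = Set.of("MIXED", "AUTO_SINGLE", "AUTO_SPLIT", "FIXED");
    private static final Set<String> SNAPSHOT_POLICIES = Set.of("OFF", "ALL", "ALARM");
    private static final Set<String> QUALITIES = Set.of("LOW", "MEDIUM", "HIGH");

    /**
     * Returns every problem found; an empty list means the draft is acceptable.
     * Null optional fields are skipped because defaults are filled in afterwards.
     */
    static List<String> validate(TaskDraft d) {
        List<String> errors = new ArrayList<>();
        if (d.cameraId == null) {
            errors.add("cameraId is required");
        }
        if (d.sceneCodes == null || d.sceneCodes.isEmpty()) {
            errors.add("at least one scene code is required");
        }
        if (d.dispatchMode != null && !DISPATCH_MODES.contains(d.dispatchMode)) {
            errors.add("unknown dispatchMode: " + d.dispatchMode);
        }
        if ("FIXED".equals(d.dispatchMode) && d.preferNodeId == null) {
            errors.add("FIXED mode requires preferNodeId");
        }
        if (d.streamType != null && d.streamType != 0 && d.streamType != 1) {
            errors.add("streamType must be 0 (main) or 1 (sub)");
        }
        if (d.snapshotPolicy != null && !SNAPSHOT_POLICIES.contains(d.snapshotPolicy)) {
            errors.add("unknown snapshotPolicy: " + d.snapshotPolicy);
        }
        if (d.snapshotQuality != null && (d.snapshotQuality < 1 || d.snapshotQuality > 100)) {
            errors.add("snapshotQuality must be within 1-100");
        }
        if (d.processQuality != null && !QUALITIES.contains(d.processQuality)) {
            errors.add("unknown processQuality: " + d.processQuality);
        }
        // Assumed range: a confidence threshold is a probability in (0, 1]
        if (d.confThreshold != null && (d.confThreshold <= 0f || d.confThreshold > 1f)) {
            errors.add("confThreshold must be within (0, 1]");
        }
        return errors;
    }
}
```

Collecting every problem instead of failing on the first lets the UI show all invalid fields at once.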
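```java
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.stream.Collectors;
// Project types (AiTask, CameraDevice, StreamPath, repositories, exceptions) are omitted

@Service
@Slf4j
public class AiTaskService {
    @Resource
    private AiTaskRepository taskRepository;
    @Resource
    private CameraDeviceRepository cameraRepository;
    @Resource
    private StreamPathRepository streamPathRepository;

    @Transactional(rollbackFor = Exception.class)
    public AiTask createTask(AiTaskUpsertRequest request) {
        // 1. Normalize and validate parameters
        normalizeRequest(request);
        if (request.getCameraId() == null) {
            throw new BusinessException("cameraId is required");
        }
        if (request.getSceneCodes() == null || request.getSceneCodes().isEmpty()) {
            throw new BusinessException("At least one scene code is required");
        }
        // 2. Look up the camera
        CameraDevice camera = cameraRepository.findById(request.getCameraId())
                .orElseThrow(() -> new NotFoundException("Camera not found"));
        // 3. Resolve the stream path
        StreamPath streamPath = resolveStreamPath(request);
        // 4. Build the task entity
        AiTask task = new AiTask();
        BeanUtils.copyProperties(request, task);
        task.setCameraName(camera.getCameraName());
        task.setGroupId(camera.getGroupId());
        task.setStreamPathId(streamPath.getId());   // also recorded when the path was resolved by stream type
        task.setStreamPathName(streamPath.getPathName());
        task.setRtspUrl(streamPath.getSourceUrl());
        task.setStatus(0);                          // initially stopped
        // 5. Save
        taskRepository.save(task);
        log.info("Task created: id={}, camera={}, scenes={}",
                task.getId(), task.getCameraId(), task.getSceneCodes());
        return task;
    }

    private void normalizeRequest(AiTaskUpsertRequest request) {
        // Drop blank scene codes and remove duplicates
        if (request.getSceneCodes() != null) {
            request.setSceneCodes(
                    request.getSceneCodes().stream()
                            .filter(StringUtils::isNotBlank)
                            .distinct()
                            .collect(Collectors.toList())
            );
        }
        // Defaults. If the repository writes every column (as JPA does by default), a null field
        // is stored as NULL and the column DEFAULT never applies, so the DDL defaults are mirrored here
        if (request.getStreamType() == null) request.setStreamType(0);
        if (request.getInputFps() == null) request.setInputFps(15);
        if (request.getOutputFps() == null) request.setOutputFps(request.getInputFps());
        if (request.getOutputWidth() == null) request.setOutputWidth(1280);
        if (request.getOutputHeight() == null) request.setOutputHeight(720);
        if (request.getOutputBitrate() == null) request.setOutputBitrate("3000k");
        if (request.getProcessQuality() == null) request.setProcessQuality("MEDIUM");
        if (request.getDispatchMode() == null) request.setDispatchMode("MIXED");
        if (request.getConfThreshold() == null) request.setConfThreshold(0.6f);
        if (request.getAlarmEnabled() == null) request.setAlarmEnabled(1);
        if (request.getLevel() == null) request.setLevel(1);
        if (request.getSnapshotPolicy() == null) request.setSnapshotPolicy("ALARM");
        if (request.getSnapshotQuality() == null) request.setSnapshotQuality(80);
        if (request.getPushEnabled() == null) request.setPushEnabled(1);
        if (request.getPushCooldownSeconds() == null) request.setPushCooldownSeconds(0);
    }

    private StreamPath resolveStreamPath(AiTaskUpsertRequest request) {
        if (request.getStreamPathId() != null) {
            return streamPathRepository.findById(request.getStreamPathId())
                    .orElseThrow(() -> new NotFoundException("Stream path not found"));
        }
        // Otherwise look it up by stream type: 1 = sub stream, anything else = main stream
        Integer subtype = Integer.valueOf(1).equals(request.getStreamType()) ? 1 : 0;
        return streamPathRepository.findByCameraIdAndSubtype(request.getCameraId(), subtype)
                .orElseThrow(() -> new NotFoundException("No stream path found for the camera"));
    }
}
```
14. Algorithm Node Management and Scheduling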
14.1 Algorithm node table
```sql
DROP TABLE IF EXISTS `ai_engine_node`;
CREATE TABLE `ai_engine_node` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `node_id` VARCHAR(100) NOT NULL COMMENT 'Unique node identifier',
  `node_name` VARCHAR(100) DEFAULT NULL COMMENT 'Node name',
  `ip` VARCHAR(64) NOT NULL COMMENT 'IP address',
  `port` INT DEFAULT NULL COMMENT 'Port',
  `status` TINYINT DEFAULT 0 COMMENT '1 online, 0 offline, 2 error',
  `last_heartbeat_time` DATETIME DEFAULT NULL COMMENT 'Last heartbeat time',
  -- Capability information
  `scene_name_json` JSON DEFAULT NULL COMMENT 'Supported scene names',
  `scene_code_json` JSON DEFAULT NULL COMMENT 'Supported scene codes',
  -- Load information
  `load_info` JSON DEFAULT NULL COMMENT 'Load information',
  `max_tasks` INT DEFAULT 50 COMMENT 'Maximum number of tasks',
  `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_node_id` (`node_id`),
  KEY `idx_status_heartbeat` (`status`, `last_heartbeat_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Algorithm node table';
```
JSON field example:
```json
{
  "scene_code_json": ["REGION_INTRUSION", "HELMET_DETECTION", "FIRE_DETECTION"],
  "load_info": {
    "cpu": 0.42,
    "gpu": 0.68,
    "memory": 0.55,
    "tasks": 12
  }
}
```
14.2 Task runtime table
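Liveness throughout this design is timestamp based. It relies on `ai_engine_node.last_heartbeat_time` and on the `last_keepalive` columns (indexed by `idx_keepalive`) in the runtime tables of 14.2 and 14.3.

Below is a minimal staleness check. It assumes the timeout is a deployment setting; 30 s is only an example. A periodic sweep could apply the same rule in SQL, and `idx_status_heartbeat` can serve that query:

```sql
UPDATE ai_engine_node SET status = 0 WHERE status = 1 AND last_heartbeat_time < ?
```

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Sketch of a timestamp-based liveness check; the timeout value is a deployment choice. */
final class KeepaliveCheck {

    /** True when no keepalive was ever recorded or the last one is older than the timeout. */
    static boolean isStale(LocalDateTime lastKeepalive, LocalDateTime now, Duration timeout) {
        if (lastKeepalive == null) {
            return true;
        }
        return Duration.between(lastKeepalive, now).compareTo(timeout) > 0;
    }
}
```

With a 30 s timeout, a keepalive 10 s old is fresh and one 45 s old is stale. A keepalive exactly 30 s old still counts as fresh.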
```sql
DROP TABLE IF EXISTS `ai_task_runtime`;
CREATE TABLE `ai_task_runtime` (
  `task_id` BIGINT NOT NULL COMMENT 'Task ID',
  `node_id` VARCHAR(100) NOT NULL COMMENT 'Primary node ID',
  `state` VARCHAR(20) NOT NULL COMMENT 'RUNNING/STOPPED/ERROR',
  `start_time` DATETIME DEFAULT NULL COMMENT 'Start time',
  `last_keepalive` DATETIME DEFAULT NULL COMMENT 'Last keepalive',
  `restart_count` INT DEFAULT 0 COMMENT 'Restart count',
  `preview_path` VARCHAR(255) DEFAULT NULL COMMENT 'Preview path',
  `preview_play_url` VARCHAR(512) DEFAULT NULL COMMENT 'Preview playback URL',
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`task_id`),
  KEY `idx_node_state` (`node_id`, `state`),
  KEY `idx_keepalive` (`last_keepalive`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Task runtime table';
```
14.3 Scene runtime table
```sql
DROP TABLE IF EXISTS `ai_task_scene_runtime`;
CREATE TABLE `ai_task_scene_runtime` (
  `task_id` BIGINT NOT NULL COMMENT 'Task ID',
  `scene_code` VARCHAR(64) NOT NULL COMMENT 'Scene code',
  `node_id` VARCHAR(100) DEFAULT NULL COMMENT 'Executing node ID',
  `state` VARCHAR(20) NOT NULL COMMENT 'RUNNING/STOPPED/ERROR',
  `start_time` DATETIME DEFAULT NULL COMMENT 'Start time',
  `last_keepalive` DATETIME DEFAULT NULL COMMENT 'Last keepalive',
  `restart_count` INT DEFAULT 0 COMMENT 'Restart count',
  `error_message` VARCHAR(500) DEFAULT NULL COMMENT 'Error message',
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`task_id`, `scene_code`),
  KEY `idx_node_state` (`node_id`, `state`),
  KEY `idx_keepalive` (`last_keepalive`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Scene runtime table';
```
Why are two layers of runtime state needed?
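Task-level runtime state (`ai_task_runtime`):
- Records the overall state of the task
- Binds the primary node (used for preview)
- Tracks the task-wide keepalive

Scene-level runtime state (`ai_task_scene_runtime`):
- Allows the scenes of one task to be split across different nodes
- Monitors the state of each scene independently
- Pinpoints failures precisely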
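The article does not spell out how the task-level `state` is derived from the scene rows. One possible rule, stated here as an assumption:
- **RUNNING:** at least one scene is running. A partially failed task counts as "degraded" rather than failed, which matches the requirement that one scene's failure must not stop the others.
- **ERROR:** nothing is running and at least one scene failed.
- **STOPPED:** every other case.

```java
import java.util.Collection;

/** Assumed aggregation of scene-level states into the task-level state. */
final class TaskStateAggregator {

    static String aggregate(Collection<String> sceneStates) {
        boolean anyRunning = false;
        boolean anyError = false;
        for (String state : sceneStates) {
            if ("RUNNING".equals(state)) {
                anyRunning = true;
            } else if ("ERROR".equals(state)) {
                anyError = true;
            }
        }
        if (anyRunning) {
            return "RUNNING";
        }
        return anyError ? "ERROR" : "STOPPED";
    }

    /** Running, but with at least one failed scene: worth surfacing in the UI. */
    static boolean isDegraded(Collection<String> sceneStates) {
        return sceneStates.contains("RUNNING") && sceneStates.contains("ERROR");
    }
}
```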
14.4 Dispatch strategy implementation
```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
// Project types (AiTask, BusinessException) are omitted

@Service
@Slf4j
public class TaskDispatchService {
    @Resource
    private EngineNodeService nodeService;

    /**
     * Build the dispatch plan: which node runs which scenes
     */
    public DispatchPlan buildDispatchPlan(AiTask task, List<String> sceneCodes) {
        if (sceneCodes == null || sceneCodes.isEmpty()) {
            // An empty list would match every node, since any node "supports all" of nothing
            throw new BusinessException("No scene codes to dispatch");
        }
        String mode = task.getDispatchMode() != null ? task.getDispatchMode() : "MIXED";
        Map<String, List<String>> nodeToScenes;
        switch (mode) {
            case "FIXED":
                nodeToScenes = dispatchFixed(task, sceneCodes);
                break;
            case "AUTO_SINGLE":
                nodeToScenes = dispatchAutoSingle(sceneCodes);
                break;
            case "AUTO_SPLIT":
                nodeToScenes = dispatchAutoSplit(sceneCodes);
                break;
            case "MIXED":
            default:
                nodeToScenes = dispatchMixed(sceneCodes);
                break;
        }
        String primaryNodeId = selectPrimaryNode(nodeToScenes);
        return new DispatchPlan(nodeToScenes, primaryNodeId);
    }

    /**
     * FIXED: run all scenes on the designated node
     */
    private Map<String, List<String>> dispatchFixed(AiTask task, List<String> sceneCodes) {
        if (task.getPreferNodeId() == null) {
            throw new BusinessException("FIXED mode requires preferNodeId");
        }
        String nodeId = nodeService.getNodeIdById(task.getPreferNodeId());
        if (nodeId == null) {
            throw new BusinessException("Designated node does not exist");
        }
        if (!nodeService.supportsAllSceneCodes(nodeId, sceneCodes)) {
            throw new BusinessException("Designated node does not support all scenes");
        }
        return singleNodePlan(nodeId, sceneCodes);
    }

    /**
     * AUTO_SINGLE: automatically pick one node that runs all scenes
     */
    private Map<String, List<String>> dispatchAutoSingle(List<String> sceneCodes) {
        String nodeId = nodeService.selectBestNodeForSceneCodes(sceneCodes);
        if (nodeId == null) {
            throw new BusinessException("No node supports all scenes");
        }
        return singleNodePlan(nodeId, sceneCodes);
    }

    /**
     * AUTO_SPLIT: assign each scene to its own best node.
     * Every node in the resulting plan pulls the camera stream separately.
     */
    private Map<String, List<String>> dispatchAutoSplit(List<String> sceneCodes) {
        Map<String, List<String>> nodeToScenes = new LinkedHashMap<>();
        for (String sceneCode : sceneCodes) {
            String nodeId = nodeService.selectBestNodeForSceneCode(sceneCode);
            if (nodeId == null) {
                throw new BusinessException("No node supports scene: " + sceneCode);
            }
            nodeToScenes.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(sceneCode);
        }
        return nodeToScenes;
    }

    /**
     * MIXED: prefer a single node, fall back to splitting
     */
    private Map<String, List<String>> dispatchMixed(List<String> sceneCodes) {
        String singleNodeId = nodeService.selectBestNodeForSceneCodes(sceneCodes);
        if (singleNodeId != null) {
            return singleNodePlan(singleNodeId, sceneCodes);
        }
        log.info("No single node supports all scenes, falling back to split mode");
        return dispatchAutoSplit(sceneCodes);
    }

    private Map<String, List<String>> singleNodePlan(String nodeId, List<String> sceneCodes) {
        Map<String, List<String>> plan = new LinkedHashMap<>();
        plan.put(nodeId, new ArrayList<>(sceneCodes));
        return plan;
    }

    /**
     * Select the primary node (used for preview): the node running the most scenes
     */
    private String selectPrimaryNode(Map<String, List<String>> nodeToScenes) {
        return nodeToScenes.entrySet().stream()
                .max(Comparator.comparingInt(e -> e.getValue().size()))
                .map(Map.Entry::getKey)
                .orElse(null);
    }
}

@Data
@AllArgsConstructor
class DispatchPlan {
    private Map<String, List<String>> nodeToScenes;  // nodeId -> scene codes run on that node
    private String primaryNodeId;                     // node serving the preview stream
}
```
14.5 Node capability matching
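The matching rules of this section and the MIXED strategy from 14.4 can be exercised without a database. The sketch below assumes a hypothetical `NodeInfo` snapshot holding:
- an online flag;
- the supported scene codes;
- `max_tasks`;
- a running count loaded once per dispatch.

Tie-breaking may differ from the repository-backed service.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory snapshot of one ai_engine_node row plus its running count. */
final class NodeInfo {
    final String nodeId;
    final boolean online;
    final Set<String> sceneCodes;
    final int maxTasks;
    final int running;

    NodeInfo(String nodeId, boolean online, Set<String> sceneCodes, int maxTasks, int running) {
        this.nodeId = nodeId;
        this.online = online;
        this.sceneCodes = sceneCodes;
        this.maxTasks = maxTasks;
        this.running = running;
    }
}

final class InMemoryDispatcher {

    /** Least-loaded node that is online, below maxTasks and supports every scene; null if none. */
    static String bestNodeFor(List<NodeInfo> nodes, Collection<String> scenes) {
        NodeInfo best = null;
        for (NodeInfo n : nodes) {
            boolean eligible = n.online && n.running < n.maxTasks && n.sceneCodes.containsAll(scenes);
            if (eligible && (best == null || n.running < best.running)) {
                best = n;
            }
        }
        return best == null ? null : best.nodeId;
    }

    /** MIXED: one node for all scenes if possible, otherwise split scene by scene. */
    static Map<String, List<String>> dispatchMixed(List<NodeInfo> nodes, List<String> scenes) {
        Map<String, List<String>> plan = new LinkedHashMap<>();
        String single = bestNodeFor(nodes, scenes);
        if (single != null) {
            plan.put(single, new ArrayList<>(scenes));
            return plan;
        }
        for (String scene : scenes) {
            String nodeId = bestNodeFor(nodes, Collections.singletonList(scene));
            if (nodeId == null) {
                throw new IllegalStateException("no node supports scene " + scene);
            }
            plan.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(scene);
        }
        return plan;
    }

    /** Primary (preview) node: the one running the most scenes; the first one wins ties. */
    static String primaryNode(Map<String, List<String>> plan) {
        String primary = null;
        int most = -1;
        for (Map.Entry<String, List<String>> e : plan.entrySet()) {
            if (e.getValue().size() > most) {
                most = e.getValue().size();
                primary = e.getKey();
            }
        }
        return primary;
    }
}
```

Consider three nodes:
- `gpu-node-01`: REGION_INTRUSION and HELMET_DETECTION, 10 running
- `gpu-node-02`: REGION_INTRUSION and VEHICLE_PARKING, 3 running
- an offline node that supports everything

A request for all three scenes cannot go to a single node, so MIXED falls back to splitting:
- `gpu-node-02` (the lower load) takes REGION_INTRUSION and VEHICLE_PARKING.
- `gpu-node-01` takes HELMET_DETECTION.
- `gpu-node-02` becomes the primary node.

As in the service, running counts are not incremented while scenes are being assigned. A burst of dispatches can therefore pile onto the same node until the runtime rows are written.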
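```java
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
// Project types (EngineNode, repositories) are omitted

@Service
@Slf4j
public class EngineNodeService {
    private static final int DEFAULT_MAX_TASKS = 50; // mirrors the max_tasks column default

    @Resource
    private EngineNodeRepository nodeRepository;
    @Resource
    private TaskSceneRuntimeRepository sceneRuntimeRepository;

    /**
     * Select the best node that supports all given scenes (lowest load first)
     */
    public String selectBestNodeForSceneCodes(List<String> sceneCodes) {
        // Load each candidate's running count once; querying inside the comparator
        // would hit the database on every comparison
        Map<String, Integer> activeCounts = new HashMap<>();
        List<EngineNode> candidates = collectCandidates(sceneCodes, activeCounts);
        return candidates.stream()
                .min(Comparator.comparingInt(node -> activeCounts.get(node.getNodeId())))
                .map(EngineNode::getNodeId)
                .orElse(null);
    }

    /**
     * Select the best node for a single scene
     */
    public String selectBestNodeForSceneCode(String sceneCode) {
        return selectBestNodeForSceneCodes(Collections.singletonList(sceneCode));
    }

    /**
     * Candidate nodes: online, not overloaded, and supporting every requested scene
     */
    public List<EngineNode> getCandidateNodes(List<String> sceneCodes) {
        return collectCandidates(sceneCodes, new HashMap<>());
    }

    private List<EngineNode> collectCandidates(List<String> sceneCodes, Map<String, Integer> activeCounts) {
        List<EngineNode> result = new ArrayList<>();
        // status = 1 is only the last known state; combine it with a heartbeat timeout in production
        for (EngineNode node : nodeRepository.findByStatus(1)) {
            if (!supportsAll(node, sceneCodes)) {
                continue;
            }
            int active = getActiveTaskCount(node);
            if (active >= maxTasksOf(node)) {
                continue; // overloaded
            }
            activeCounts.put(node.getNodeId(), active);
            result.add(node);
        }
        return result;
    }

    /**
     * Whether the node supports all given scenes
     */
    public boolean supportsAllSceneCodes(String nodeId, List<String> sceneCodes) {
        EngineNode node = nodeRepository.findByNodeId(nodeId);
        return node != null && supportsAll(node, sceneCodes);
    }

    private boolean supportsAll(EngineNode node, List<String> sceneCodes) {
        return new HashSet<>(parseSceneCodes(node.getSceneCodeJson())).containsAll(sceneCodes);
    }

    /**
     * Current load of a node. This counts RUNNING rows in ai_task_scene_runtime,
     * i.e. scene instances, so max_tasks effectively caps scenes per node
     */
    private int getActiveTaskCount(EngineNode node) {
        return sceneRuntimeRepository.countByNodeIdAndState(node.getNodeId(), "RUNNING");
    }

    private int maxTasksOf(EngineNode node) {
        return node.getMaxTasks() != null ? node.getMaxTasks() : DEFAULT_MAX_TASKS;
    }

    private List<String> parseSceneCodes(String json) {
        if (json == null || json.isEmpty()) {
            return Collections.emptyList();
        }
        try {
            List<String> codes = JSON.parseArray(json, String.class);
            return codes != null ? codes : Collections.emptyList();
        } catch (Exception e) {
            log.error("Failed to parse scene codes: {}", json, e);
            return Collections.emptyList();
        }
    }
}
```
15. WebSocket Protocol Specification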
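15.1 Protocol design principles
- Explicit message types: every message carries a `type` field
- Bidirectional communication: platform -> algorithm node and algorithm node -> platform
- Idempotency: receiving the same message more than once has no additional side effects
- Fault tolerance: an unknown message type does not break the connection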
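Idempotency is easiest to achieve when each command converges on a target state instead of toggling one. Below is a minimal node-side sketch. It assumes the handler only tracks which task IDs are running; the real start and stop work is omitted. Under this scheme:
- A duplicate START_TASK is a no-op.
- STOP_TASK on a stopped task is a no-op.
- An unknown `type` returns false instead of throwing, so the connection stays up.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical node-side command handler illustrating idempotent, fault-tolerant handling. */
final class IdempotentCommandHandler {
    private final Set<Long> runningTasks = new HashSet<>();
    private int startsPerformed = 0; // real (non-duplicate) starts, kept for illustration

    /** Returns false for unknown or missing types instead of throwing. */
    synchronized boolean handle(String type, long taskId) {
        if (type == null) {
            return false;
        }
        switch (type) {
            case "START_TASK":
                if (runningTasks.add(taskId)) { // duplicate START leaves the state unchanged
                    startsPerformed++;
                }
                return true;
            case "STOP_TASK":
                runningTasks.remove(taskId);    // stopping a stopped task is harmless
                return true;
            default:
                return false;                   // ignore, keep the connection open
        }
    }

    synchronized boolean isRunning(long taskId) {
        return runningTasks.contains(taskId);
    }

    synchronized int startsPerformed() {
        return startsPerformed;
    }
}
```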
15.2 Node registration message
Algorithm node -> platform:
```json
{
  "type": "NODE_REGISTER",
  "node_id": "gpu-node-01",
  "node_name": "GPU-Server-01",
  "ip": "10.0.10.21",
  "port": 18080,
  "max_tasks": 50,
  "algorithms": ["REGION_INTRUSION", "HELMET_DETECTION", "FIRE_DETECTION"],
  "load": {
    "cpu": 0.21,
    "gpu": 0.64,
    "memory": 0.55,
    "tasks": 6
  }
}
```
Platform-side handling:
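```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

// On Spring Boot's embedded container this endpoint is registered by a ServerEndpointExporter bean
@ServerEndpoint("/ws/ai-engine")
@Component
@Slf4j
public class AiEngineSocket {
    // The container creates one endpoint instance per connection outside Spring's control,
    // so the shared service is held in a static field populated by the setter below
    private static EngineNodeService nodeService;

    // node_id bound to this connection (one instance per connection, so an instance field is safe)
    private String sessionNodeId;

    @Autowired
    public void setNodeService(EngineNodeService service) {
        AiEngineSocket.nodeService = service;
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        try {
            JSONObject msg = JSON.parseObject(message);
            String type = msg != null ? msg.getString("type") : null;
            if (type == null) {
                log.warn("Message without type ignored");
                return;
            }
            switch (type) {
                case "NODE_REGISTER":
                    handleRegister(msg, session);
                    break;
                case "NODE_HEARTBEAT":
                    handleHeartbeat(msg);
                    break;
                case "TASK_KEEPALIVE":
                    handleTaskKeepalive(msg);
                    break;
                case "AI_RESULT":
                    handleAiResult(msg);
                    break;
                default:
                    // Fault tolerance: unknown types are logged and the connection stays open
                    log.warn("Unknown message type: {}", type);
            }
        } catch (Exception e) {
            log.error("Message processing failed", e);
        }
    }

    // handleHeartbeat, handleTaskKeepalive and handleAiResult are omitted from this excerpt

    private void handleRegister(JSONObject msg, Session session) {
        String nodeId = msg.getString("node_id");
        // Bind the connection to the node
        this.sessionNodeId = nodeId;
        session.getUserProperties().put("nodeId", nodeId);
        // Create or update the node record and mark it online
        nodeService.upsertNode(msg);
        log.info("Node registered: {}", nodeId);
    }

    @OnClose
    public void onClose(Session session) {
        if (sessionNodeId != null) {
            // If the node already reconnected on a new session, this would mark it offline;
            // check that the closing session is still the node's current one first
            nodeService.markOffline(sessionNodeId);
            log.info("Node disconnected: {}", sessionNodeId);
        }
    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("WebSocket error on node {}", sessionNodeId, error);
    }
}
```
15.3 Task control messages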
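Sending START_TASK or STOP_TASK requires finding the live connection of the target node. Below is a sketch of a process-wide registry. It makes two assumptions:
- `S` stands in for `javax.websocket.Session`; it is kept generic so the example runs without a container.
- The actual write is passed in as a callback.

`unregister` removes a mapping only if it still points at the closing session. This covers a race that the `onClose` handler above does not: without the check, a node that has already reconnected would be evicted (and marked offline) by its old connection's late close.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical registry from node_id to the node's current connection. */
final class NodeSessionRegistry<S> {
    private final Map<String, S> sessions = new ConcurrentHashMap<>();

    /** A reconnect simply replaces the previous session for the same node_id. */
    void register(String nodeId, S session) {
        sessions.put(nodeId, session);
    }

    /** Removes the mapping only if it still points at this session (atomic compare-and-remove). */
    boolean unregister(String nodeId, S session) {
        return sessions.remove(nodeId, session);
    }

    /** Hands the node's current session to the sender; false if the node has no live session. */
    boolean sendTo(String nodeId, Consumer<S> sender) {
        S session = sessions.get(nodeId);
        if (session == null) {
            return false;
        }
        sender.accept(session);
        return true;
    }
}
```

In the endpoint, `onClose` would call `unregister(sessionNodeId, session)` and mark the node offline only when it returns true.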
Platform -> algorithm node (start task):
```json
{
  "type": "START_TASK",
  "data": {
    "task_id": 1001,
    "camera_id": 3001,
    "scene_codes": ["REGION_INTRUSION", "HELMET_DETECTION"],
    "stream_key": "3001:0",
    "rtsp": "rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101",
    "confThreshold": 0.6,
    "region": [[0.1,0.1],[0.9,0.1],[0.9,0.9],[0.1,0.9]],
    "params": {
      "input_fps": 15,
      "in_w": 1280,
      "in_h": 720,
      "result_fps": 8,
      "quality": "MEDIUM",
      "alarm_enabled": 1,
      "snapshot_policy": "ALARM",
      "snapshot_quality": 80
    }
  }
}
```
Platform -> algorithm node (stop task):
```json
{
  "type": "STOP_TASK",
  "data": {
    "task_id": 1001
  }
}
```
15.4 Result report messages
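The two result shapes below differ in where the per-scene fields live. In the multi-scene form, `alarm_event`, `confidence` and `objects` sit inside `results`, so a top-level lookup of `alarm_event` finds nothing.

Here is a sketch that normalizes both shapes into a list of scene results. It models parsed JSON as `Map<String, Object>`, which is an assumption; a JSON library would produce something equivalent.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Normalizes single-scene and multi-scene AI_RESULT messages (parsed JSON as maps). */
final class ResultShapes {

    /** A non-empty "results" array means multi-scene; otherwise the message is the scene result. */
    @SuppressWarnings("unchecked")
    static List<Map<String, Object>> sceneResults(Map<String, Object> msg) {
        Object results = msg.get("results");
        if (results instanceof List && !((List<?>) results).isEmpty()) {
            return (List<Map<String, Object>>) results;
        }
        return Collections.singletonList(msg);
    }

    /** True if any scene in the message raised an alarm (e.g. to prioritize it in a queue). */
    static boolean hasAlarm(Map<String, Object> msg) {
        for (Map<String, Object> sceneResult : sceneResults(msg)) {
            if (Boolean.TRUE.equals(sceneResult.get("alarm_event"))) {
                return true;
            }
        }
        return false;
    }
}
```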
Algorithm node -> platform (single-scene result):
```json
{
  "type": "AI_RESULT",
  "task_id": 1001,
  "camera_id": 3001,
  "scene_code": "REGION_INTRUSION",
  "ts": 1770964200000,
  "frame_width": 1920,
  "frame_height": 1080,
  "alarm_event": true,
  "confidence": 0.91,
  "objects": [
    {
      "label": "person",
      "score": 0.91,
      "bbox": [0.1, 0.2, 0.3, 0.4]
    }
  ],
  "snapshot_path": "/static/snapshots/task_1001/2026-02-13-14-30-00.jpg"
}
```
Algorithm node -> platform (multi-scene result):
```json
{
  "type": "AI_RESULT",
  "task_id": 1001,
  "camera_id": 3001,
  "ts": 1770964200000,
  "frame_width": 1920,
  "frame_height": 1080,
  "snapshot_path": "/static/snapshots/task_1001/2026-02-13-14-30-00.jpg",
  "results": [
    {
      "scene_code": "REGION_INTRUSION",
      "alarm_event": true,
      "confidence": 0.91,
      "objects": [...]
    },
    {
      "scene_code": "HELMET_DETECTION",
      "alarm_event": false,
      "confidence": 0.87,
      "objects": [...]
    }
  ]
}
```
15.5 Preview control messages
Platform -> algorithm node (start preview):
```json
{
  "type": "START_PREVIEW",
  "data": {
    "task_id": 1001,
    "preview_path": "ai_task_1001",
    "fps": 15,
    "draw": true,
    "encoder": "h264_nvenc",
    "bitrate": "3000k",
    "preview_mode": "RESULT_DRIVEN",
    "width": 1280,
    "height": 720
  }
}
```
Platform -> algorithm node (stop preview):
```json
{
  "type": "STOP_PREVIEW",
  "data": {
    "task_id": 1001
  }
}
```
16. Result Processing and Alarm Push
16.1 Scene latest-state table
```sql
DROP TABLE IF EXISTS `ai_task_scene_state`;
CREATE TABLE `ai_task_scene_state` (
  `task_id` BIGINT NOT NULL COMMENT 'Task ID',
  `camera_id` BIGINT NOT NULL COMMENT 'Camera ID',
  `scene_code` VARCHAR(64) NOT NULL COMMENT 'Scene code',
  `scene_type_id` BIGINT DEFAULT NULL COMMENT 'Scene type ID',
  -- Latest result
  `last_ts` BIGINT NOT NULL COMMENT 'Latest timestamp (ms)',
  `last_time` DATETIME NOT NULL COMMENT 'Latest time',
  `event_type` VARCHAR(20) DEFAULT NULL COMMENT 'Event type',
  `event_desc` VARCHAR(255) DEFAULT NULL COMMENT 'Event description',
  `alarm_event` TINYINT DEFAULT 0 COMMENT 'Alarm flag',
  `total_count` INT DEFAULT 0 COMMENT 'Total detections',
  -- Statistics
  `label_counts_json` JSON DEFAULT NULL COMMENT 'Label counts',
  `problem_counts_json` JSON DEFAULT NULL COMMENT 'Problem counts',
  `metrics_json` JSON DEFAULT NULL COMMENT 'Metrics',
  `last_snapshot_path` VARCHAR(500) DEFAULT NULL COMMENT 'Latest snapshot',
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`task_id`, `scene_code`),
  KEY `idx_camera_alarm` (`camera_id`, `alarm_event`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Scene latest-state table';
```
JSON field example:
```json
{
  "label_counts_json": {
    "person": 3,
    "vehicle": 1
  },
  "problem_counts_json": {
    "no_helmet": 2
  },
  "metrics_json": {
    "avg_confidence": 0.87,
    "max_confidence": 0.94
  }
}
```
16.2 Detection result table
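Both the latest-state table above and the detection result table below store `label_counts_json`, `problem_counts_json` and `metrics_json`. The sketch below derives all three from a scene's `objects` array, under two assumptions:
- `Obj` is a hypothetical reduced object holding only the label and score.
- Problem counts are filtered by the scene type's `problem_labels` (from `default_params`). This is consistent with the sample data, but the article does not state it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch: derive the *_counts_json and metrics_json values from detected objects. */
final class DetectionStats {

    /** Hypothetical detected object: only the label and score fields of an AI_RESULT object. */
    static final class Obj {
        final String label;
        final double score;

        Obj(String label, double score) {
            this.label = label;
            this.score = score;
        }
    }

    /** label -> count, the shape of label_counts_json. */
    static Map<String, Integer> labelCounts(List<Obj> objects) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Obj o : objects) {
            counts.merge(o.label, 1, Integer::sum);
        }
        return counts;
    }

    /** Keeps only labels the scene treats as problems (assumed to be its problem_labels). */
    static Map<String, Integer> problemCounts(Map<String, Integer> labelCounts, Set<String> problemLabels) {
        Map<String, Integer> problems = new LinkedHashMap<>();
        labelCounts.forEach((label, n) -> {
            if (problemLabels.contains(label)) {
                problems.put(label, n);
            }
        });
        return problems;
    }

    /** avg_confidence and max_confidence, as in metrics_json; empty when nothing was detected. */
    static Map<String, Double> metrics(List<Obj> objects) {
        Map<String, Double> m = new LinkedHashMap<>();
        if (objects.isEmpty()) {
            return m;
        }
        double sum = 0;
        double max = Double.NEGATIVE_INFINITY;
        for (Obj o : objects) {
            sum += o.score;
            max = Math.max(max, o.score);
        }
        m.put("avg_confidence", sum / objects.size());
        m.put("max_confidence", max);
        return m;
    }
}
```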
```sql
DROP TABLE IF EXISTS `ai_detection_result`;
CREATE TABLE `ai_detection_result` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `task_id` BIGINT NOT NULL COMMENT 'Task ID',
  `camera_id` BIGINT NOT NULL COMMENT 'Camera ID',
  `group_id` BIGINT DEFAULT NULL COMMENT 'Group ID',
  `scene_type_id` BIGINT DEFAULT NULL COMMENT 'Scene type ID',
  `scene_code` VARCHAR(64) DEFAULT NULL COMMENT 'Scene code',
  `node_id` VARCHAR(100) DEFAULT NULL COMMENT 'Node ID',
  -- Detection time
  `detect_time` DATETIME NOT NULL COMMENT 'Detection time',
  `detect_timestamp` BIGINT NOT NULL COMMENT 'Detection timestamp (ms)',
  -- Alarm information
  `is_alarm` TINYINT DEFAULT 0 COMMENT 'Alarm flag',
  `alarm_level` TINYINT DEFAULT 0 COMMENT 'Alarm level',
  `confidence` FLOAT DEFAULT NULL COMMENT 'Confidence',
  `detect_count` INT DEFAULT 0 COMMENT 'Number of detected objects',
  -- Detection results
  `objects_json` JSON DEFAULT NULL COMMENT 'Detected objects',
  `event_type` VARCHAR(20) DEFAULT NULL COMMENT 'Event type',
  `event_desc` VARCHAR(255) DEFAULT NULL COMMENT 'Event description',
  `total_count` INT DEFAULT NULL COMMENT 'Total count',
  `label_counts_json` JSON DEFAULT NULL COMMENT 'Label counts',
  `problem_counts_json` JSON DEFAULT NULL COMMENT 'Problem counts',
  `metrics_json` JSON DEFAULT NULL COMMENT 'Metrics',
  -- Snapshot
  `snapshot_path` VARCHAR(500) DEFAULT NULL COMMENT 'Snapshot path',
  -- Push status
  `is_pushed` TINYINT DEFAULT 0 COMMENT 'Pushed flag',
  `push_time` DATETIME DEFAULT NULL COMMENT 'Push time',
  `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `idx_task_time` (`task_id`, `detect_time`),
  KEY `idx_alarm_scene_group_time` (`is_alarm`, `scene_code`, `group_id`, `detect_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Detection result table';
```
16.3 Result processing pipeline
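The pipeline below persists every alarm but samples non-alarm results per `taskId:sceneCode`. With several worker threads, a separate get-then-put can admit two results for the same key within one interval. Doing the check inside `ConcurrentHashMap.compute` makes it atomic per key. Here is a minimal standalone sketch of that throttle; the class and method names are illustrative.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Per-key throttle: admits at most one non-alarm result per task:scene per interval. */
final class ResultSampler {
    private final ConcurrentHashMap<String, Long> lastAccepted = new ConcurrentHashMap<>();
    private final long intervalMs;

    ResultSampler(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    boolean shouldSample(long taskId, String sceneCode, long ts) {
        AtomicBoolean accepted = new AtomicBoolean(false);
        // compute() runs the check-and-update atomically for this key
        lastAccepted.compute(taskId + ":" + sceneCode, (key, last) -> {
            if (last == null || ts - last >= intervalMs) {
                accepted.set(true);
                return ts;     // remember the newly admitted result
            }
            return last;       // keep the previous admission time
        });
        return accepted.get();
    }
}
```

With a 1000 ms interval, results at 0 ms and 1000 ms are admitted and a result at 500 ms is dropped. Other task:scene keys are throttled independently.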
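```java
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
// Project types (DetectionResult, TaskSceneStateService, DetectionResultService,
// AlarmPushService, WebNotifySocket) are omitted

@Service
@Slf4j
public class AiResultProcessorService {
    @Resource
    private TaskSceneStateService sceneStateService;
    @Resource
    private DetectionResultService detectionResultService;
    @Resource
    private AlarmPushService alarmPushService;
    // Serves the fast path (pushing live boxes to the front end); that call is not part of this excerpt
    @Resource
    private WebNotifySocket webNotifySocket;

    private final BlockingQueue<JSONObject> resultQueue = new LinkedBlockingQueue<>(20000);

    // Each worker runs an endless poll loop, so a fixed pool with one thread per worker is enough
    private final int workerCount = Runtime.getRuntime().availableProcessors();
    private final ExecutorService processExecutor = Executors.newFixedThreadPool(
            workerCount, new ThreadFactoryBuilder().setNameFormat("result-processor-%d").build());

    // Sampling state: "taskId:sceneCode" -> ts of the last persisted non-alarm result
    private final ConcurrentHashMap<String, Long> sampleCache = new ConcurrentHashMap<>();

    @Value("${ai.result.sample-interval-ms:1000}")
    private long sampleIntervalMs;

    /**
     * Start the workers only after dependency injection has finished
     */
    @PostConstruct
    public void start() {
        for (int i = 0; i < workerCount; i++) {
            processExecutor.submit(this::processLoop);
        }
    }

    @PreDestroy
    public void stop() {
        processExecutor.shutdownNow(); // interrupts the poll loops
    }

    /**
     * Enqueue a result message. When the queue is full, non-alarm results are dropped
     * and alarm results evict the oldest queued messages to make room.
     */
    public void submit(JSONObject resultMsg) {
        if (resultQueue.offer(resultMsg)) {
            return;
        }
        if (!containsAlarm(resultMsg)) {
            log.warn("Result queue full, dropping non-alarm result");
            return;
        }
        // Force the alarm in; note that the evicted head may itself be an alarm
        boolean offered = false;
        int tries = 0;
        while (!offered && tries++ < 10) {
            resultQueue.poll();
            offered = resultQueue.offer(resultMsg);
        }
        if (!offered) {
            log.error("Failed to enqueue alarm result after retries");
        }
    }

    /**
     * Multi-scene messages carry alarm_event inside results[], not at the top level
     */
    private boolean containsAlarm(JSONObject msg) {
        JSONArray results = msg.getJSONArray("results");
        if (results == null || results.isEmpty()) {
            return msg.getBooleanValue("alarm_event");
        }
        for (int i = 0; i < results.size(); i++) {
            if (results.getJSONObject(i).getBooleanValue("alarm_event")) {
                return true;
            }
        }
        return false;
    }

    /**
     * Worker loop
     */
    private void processLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                JSONObject msg = resultQueue.poll(1, TimeUnit.SECONDS);
                if (msg != null) {
                    processResult(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                log.error("Result processing error", e);
            }
        }
    }

    /**
     * Process one result message (single-scene or multi-scene)
     */
    private void processResult(JSONObject msg) {
        Long taskId = msg.getLong("task_id");
        Long cameraId = msg.getLong("camera_id");
        Long ts = msg.getLong("ts");
        if (taskId == null || ts == null) {
            log.warn("Result without task_id or ts ignored");
            return;
        }
        String snapshotPath = msg.getString("snapshot_path");
        JSONArray resultsArray = msg.getJSONArray("results");
        if (resultsArray != null && !resultsArray.isEmpty()) {
            // Multi-scene result: frame-level fields at the top, one entry per scene
            for (int i = 0; i < resultsArray.size(); i++) {
                processSceneResult(taskId, cameraId, ts, snapshotPath, resultsArray.getJSONObject(i));
            }
        } else {
            // Single-scene result: the message itself is the scene result
            processSceneResult(taskId, cameraId, ts, snapshotPath, msg);
        }
    }

    private void processSceneResult(Long taskId, Long cameraId, long ts,
                                    String snapshotPath, JSONObject sceneResult) {
        String sceneCode = sceneResult.getString("scene_code");
        boolean alarmEvent = sceneResult.getBooleanValue("alarm_event");

        // 1. Always update the latest state. Workers may handle results of the same task
        //    out of order, so updateState should ignore results older than last_ts
        sceneStateService.updateState(taskId, cameraId, sceneCode, ts, sceneResult);

        // 2. Decide whether to persist: without a snapshot only the latest state is kept
        if (StringUtils.isBlank(snapshotPath)) {
            return;
        }
        // Non-alarm results are sampled; alarms are always persisted
        if (!alarmEvent && !shouldSample(taskId, sceneCode, ts)) {
            return;
        }

        // 3. Persist the result
        DetectionResult result = buildDetectionResult(taskId, cameraId, sceneCode, ts, snapshotPath, sceneResult);
        detectionResultService.save(result);

        // 4. Alarm push
        if (alarmEvent) {
            alarmPushService.onAlarmDetected(result, sceneCode);
        }
    }

    /**
     * Sampling check: atomically admits at most one non-alarm result per task:scene per interval
     */
    private boolean shouldSample(Long taskId, String sceneCode, long ts) {
        AtomicBoolean accepted = new AtomicBoolean(false);
        sampleCache.compute(taskId + ":" + sceneCode, (key, lastTs) -> {
            if (lastTs == null || ts - lastTs >= sampleIntervalMs) {
                accepted.set(true);
                return ts;
            }
            return lastTs;
        });
        return accepted.get();
    }

    private DetectionResult buildDetectionResult(Long taskId, Long cameraId, String sceneCode, long ts,
                                                 String snapshotPath, JSONObject sceneResult) {
        DetectionResult result = new DetectionResult();
        result.setTaskId(taskId);
        result.setCameraId(cameraId);
        result.setSceneCode(sceneCode);
        result.setDetectTimestamp(ts);
        result.setDetectTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault()));
        result.setIsAlarm(sceneResult.getBooleanValue("alarm_event") ? 1 : 0);
        result.setConfidence(sceneResult.getFloat("confidence"));
        // "objects" may be absent, e.g. when nothing was detected
        JSONArray objects = sceneResult.getJSONArray("objects");
        result.setObjectsJson(objects != null ? objects.toJSONString() : "[]");
        result.setDetectCount(objects != null ? objects.size() : 0);
        result.setSnapshotPath(snapshotPath);
        result.setEventType(sceneResult.getString("event_type"));
        result.setEventDesc(sceneResult.getString("event_desc"));
        result.setTotalCount(sceneResult.getInteger("total_count"));
        // group_id, node_id and alarm_level are not part of AI_RESULT; fill them from the task/runtime if needed
        return result;
    }
}
```
Processing flow: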
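```
[AI result] → [Fast path: WebSocket push to front end] → [Live box overlay]
     ↓
[Slow path]
     ↓
[Update latest state (always)]
     ↓
[Snapshot present?] ──NO→ [Stop: latest state only]
     │
    YES
     ↓
[Alarm?] ──NO→ [Sampling check] ──kept→ [Persist]
     │                          └─dropped→ [Discard]
    YES
     ↓
[Persist] → [Alarm push]
```
16.4 Alarm push service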
@Service
@Slf4j
public class AlarmPushService {
@Resource
private AiTaskRepository taskRepository;
@Resource
private AlarmPushConfigService configService;
@Resource
private AlarmPushLogRepository pushLogRepository;
private final BlockingQueue<PushTask> pushQueue = new LinkedBlockingQueue<>(20000);
private final ExecutorService pushExecutor;
// 推送桶 (用于聚合推送)
private final ConcurrentHashMap<String, PushBucket> buckets = new ConcurrentHashMap<>();
public AlarmPushService() {
this.pushExecutor = Executors.newFixedThreadPool(4,
new ThreadFactoryBuilder().setNameFormat("alarm-push-%d").build());
// 启动推送线程
for (int i = 0; i < 4; i++) {
pushExecutor.submit(this::pushLoop);
}
// 启动桶刷新线程
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(this::flushBuckets, 1, 1, TimeUnit.SECONDS);
}
public void onAlarmDetected(DetectionResult result, String sceneCode) {
AiTask task = taskRepository.findById(result.getTaskId()).orElse(null);
if (task == null) return;
// 获取推送策略
PushStrategy strategy = resolvePushStrategy(task);
if ("IMMEDIATE".equals(strategy.getMode())) {
// 立即推送
pushQueue.offer(new PushTask(result, task, sceneCode));
} else {
// 聚合推送
addToBucket(result, task, sceneCode, strategy);
}
}
private void addToBucket(DetectionResult result, AiTask task,
String sceneCode, PushStrategy strategy) {
String bucketKey = task.getId() + ":" + sceneCode;
PushBucket bucket = buckets.computeIfAbsent(bucketKey,
k -> new PushBucket(task, sceneCode, strategy));
bucket.add(result);
}
private void flushBuckets() {
long now = System.currentTimeMillis();
buckets.values().forEach(bucket -> {
if (bucket.shouldFlush(now)) {
List<DetectionResult> results = bucket.flush();
if (!results.isEmpty()) {
pushQueue.offer(new PushTask(results, bucket.task, bucket.sceneCode));
}
}
});
}
private void pushLoop() {
while (!Thread.currentThread().isInterrupted()) {
try {
PushTask task = pushQueue.poll(1, TimeUnit.SECONDS);
if (task != null) {
executePush(task);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("Push execution error", e);
}
}
}
private void executePush(PushTask task) {
try {
// 构建推送内容
String content = buildPushContent(task);
// 调用飞书/企业微信等 API
PushResponse response = sendToPushService(task.task.getPushRouteKey(), content);
// 记录日志
AlarmPushLog log = new AlarmPushLog();
log.setConfigId(0L);
log.setDetectionResultIds(JSON.toJSONString(
task.results.stream().map(DetectionResult::getId).collect(Collectors.toList())
));
log.setPushChannel("feishu");
log.setPushTarget(task.task.getPushRouteKey());
log.setPushContent(content);
log.setPushStatus(response.isSuccess() ? 1 : 0);
log.setResponseCode(response.getCode());
log.setResponseMessage(response.getMessage());
log.setPushTime(LocalDateTime.now());
pushLogRepository.save(log);
// 更新结果推送状态
if (response.isSuccess()) {
task.results.forEach(r -> {
r.setIsPushed(1);
r.setPushTime(LocalDateTime.now());
});
}
} catch (Exception e) {
log.error("Push failed for task {}", task.task.getId(), e);
}
}
private String buildPushContent(PushTask task) {
StringBuilder content = new StringBuilder();
content.append("【告警通知】\n");
content.append("相机: ").append(task.task.getCameraName()).append("\n");
content.append("场景: ").append(task.sceneCode).append("\n");
content.append("时间: ").append(LocalDateTime.now().format(
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
)).append("\n");
content.append("检测数量: ").append(task.results.size()).append("\n");
return content.toString();
}
private PushResponse sendToPushService(String routeKey, String content) {
// 实际推送逻辑
// 这里简化处理
return new PushResponse(true, 200, "success");
}
private PushStrategy resolvePushStrategy(AiTask task) {
// 简化实现
return new PushStrategy(
task.getPushMode() != null ? task.getPushMode() : "IMMEDIATE",
task.getPushThresholdCount() != null ? task.getPushThresholdCount() : 1,
task.getPushWindowSeconds() != null ? task.getPushWindowSeconds() : 60,
task.getPushCooldownSeconds() != null ? task.getPushCooldownSeconds() : 0
);
}
}
@Data
@AllArgsConstructor
class PushTask {
private List<DetectionResult> results;
private AiTask task;
private String sceneCode;
public PushTask(DetectionResult result, AiTask task, String sceneCode) {
this.results = Collections.singletonList(result);
this.task = task;
this.sceneCode = sceneCode;
}
}
@Data
class PushBucket {
private final AiTask task;
private final String sceneCode;
private final PushStrategy strategy;
// Guarded by this object's monitor: add/shouldFlush/flush are synchronized,
// so no result can slip in between the copy and the clear in flush()
private final List<DetectionResult> results = new ArrayList<>();
private long firstAddTime;   // 0 = nothing buffered
private long lastFlushTime;  // 0 = never flushed, so the first batch is not held back by the cooldown
public PushBucket(AiTask task, String sceneCode, PushStrategy strategy) {
this.task = task;
this.sceneCode = sceneCode;
this.strategy = strategy;
}
public synchronized void add(DetectionResult result) {
results.add(result);
if (firstAddTime == 0) {
firstAddTime = System.currentTimeMillis();
}
}
public synchronized boolean shouldFlush(long now) {
if (results.isEmpty()) return false;
// Cooldown check
if (lastFlushTime > 0 && now - lastFlushTime < strategy.getCooldownSeconds() * 1000L) {
return false;
}
String mode = strategy.getMode();
if ("IMMEDIATE".equals(mode)) {
return true;
}
if ("COUNT".equals(mode)) {
return results.size() >= strategy.getThresholdCount();
}
if ("WINDOW".equals(mode)) {
return (now - firstAddTime) >= strategy.getWindowSeconds() * 1000L;
}
if ("COUNT_OR_WINDOW".equals(mode)) {
return results.size() >= strategy.getThresholdCount() ||
(now - firstAddTime) >= strategy.getWindowSeconds() * 1000L;
}
return false;
}
public synchronized List<DetectionResult> flush() {
List<DetectionResult> flushed = new ArrayList<>(results);
results.clear();
firstAddTime = 0;
lastFlushTime = System.currentTimeMillis();
return flushed;
}
}
@Data
@AllArgsConstructor
class PushStrategy {
private String mode;
private int thresholdCount;
private int windowSeconds;
private int cooldownSeconds;
}
@Data
@AllArgsConstructor
class PushResponse {
private boolean success;
private int code;
private String message;
}
Push modes:
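| Mode | Trigger | Typical use |
|---|---|---|
| IMMEDIATE | Push on every alarm | High-priority alarms |
| COUNT | Push after N accumulated alarms | Reducing push frequency |
| WINDOW | Push when the time window expires | Periodic summaries |
| COUNT_OR_WINDOW | Push when either condition is met | Balancing timeliness and aggregation |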
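To make the table concrete, the sketch below replays a list of alarm arrival times through each mode and counts the pushes. It is a simplified, single-threaded model (the class `PushModeSimulator` is hypothetical) that follows the `PushBucket` rules above: the cooldown is checked first, then the mode's trigger. Unlike the real service, which also checks buckets on a timer (`ai.push.flush-ms`), it only evaluates when an alarm arrives.

```java
import java.util.Arrays;

/**
 * Hypothetical helper: replays alarm arrival times through the push modes and counts pushes.
 * Only evaluates on alarm arrival; a timer-driven flush is not modeled.
 */
public class PushModeSimulator {

    public static int countPushes(long[] alarmTimesMs, String mode,
                                  int thresholdCount, long windowMs, long cooldownMs) {
        int pushes = 0;
        int pending = 0;             // alarms buffered since the last push
        long firstAddMs = -1;        // arrival time of the oldest buffered alarm, -1 if none
        long lastFlushMs = 0;
        boolean flushedOnce = false; // the first push is never held back by the cooldown

        for (long now : alarmTimesMs) {
            pending++;
            if (firstAddMs < 0) {
                firstAddMs = now;
            }
            boolean inCooldown = flushedOnce && now - lastFlushMs < cooldownMs;
            boolean trigger;
            switch (mode) {
                case "IMMEDIATE":
                    trigger = true;
                    break;
                case "COUNT":
                    trigger = pending >= thresholdCount;
                    break;
                case "WINDOW":
                    trigger = now - firstAddMs >= windowMs;
                    break;
                case "COUNT_OR_WINDOW":
                    trigger = pending >= thresholdCount || now - firstAddMs >= windowMs;
                    break;
                default:
                    throw new IllegalArgumentException("Unknown push mode: " + mode);
            }
            if (trigger && !inCooldown) {
                pushes++;            // one push carries every pending alarm
                pending = 0;
                firstAddMs = -1;
                lastFlushMs = now;
                flushedOnce = true;
            }
        }
        return pushes;
    }

    public static void main(String[] args) {
        long[] alarms = new long[20];            // 20 alarms, one per second
        for (int i = 0; i < alarms.length; i++) {
            alarms[i] = i * 1000L;
        }
        for (String mode : Arrays.asList("IMMEDIATE", "COUNT", "WINDOW", "COUNT_OR_WINDOW")) {
            System.out.println(mode + " -> " + countPushes(alarms, mode, 5, 8_000, 0));
        }
    }
}
```

With 20 alarms one second apart, a threshold of 5 and an 8 s window, the modes send 20, 4, 2 and 4 pushes respectively; in WINDOW mode the last two alarms stay buffered until a later check. Adding a 5 s cooldown to IMMEDIATE cuts the 20 pushes down to 4.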
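17. Preview Pipeline Implementation
17.1 Preview Design Principles
- A running task does not imply a preview stream
  - A task can run without pushing a preview stream
  - Preview streams are started and stopped on demand
- Preview resources are managed independently
  - Preview consumes encoder resources
  - It needs its own lifecycle control
- Preview URLs are generated dynamically
  - The URL must be updated after the task moves to another node
  - It is stored in the runtime table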
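The last group of principles (URLs generated at runtime and refreshed after a node switch) can be modeled in a few lines. This is a sketch under assumptions: `PreviewUrlRegistry` is a hypothetical in-memory stand-in for the preview columns of the runtime table, the ports are the MediaMTX WebRTC (8889) and HLS (8888) ports used elsewhere in this document, and the HLS playlist form `/<path>/index.m3u8` follows MediaMTX's usual layout.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical in-memory model of the preview part of the runtime table.
 * No entry means "preview off", even if the task itself is running.
 */
public class PreviewUrlRegistry {

    private static final class PreviewEntry {
        final String nodeIp;
        final String previewPath;

        PreviewEntry(String nodeIp, String previewPath) {
            this.nodeIp = nodeIp;
            this.previewPath = previewPath;
        }
    }

    private final int webrtcPort;   // MediaMTX webrtcAddress port (8889 in this document)
    private final int hlsPort;      // MediaMTX hlsAddress port (8888 in this document)
    private final Map<Long, PreviewEntry> entries = new ConcurrentHashMap<>();

    public PreviewUrlRegistry(int webrtcPort, int hlsPort) {
        this.webrtcPort = webrtcPort;
        this.hlsPort = hlsPort;
    }

    public void start(long taskId, String nodeIp, String previewPath) {
        entries.put(taskId, new PreviewEntry(nodeIp, previewPath));
    }

    public void stop(long taskId) {
        entries.remove(taskId);
    }

    /** The task moved to another node: keep the path, swap the host. No-op if preview is off. */
    public void onNodeSwitch(long taskId, String newNodeIp) {
        entries.computeIfPresent(taskId, (id, e) -> new PreviewEntry(newNodeIp, e.previewPath));
    }

    public Optional<String> webrtcUrl(long taskId) {
        return Optional.ofNullable(entries.get(taskId))
                .map(e -> String.format("http://%s:%d/%s", e.nodeIp, webrtcPort, e.previewPath));
    }

    public Optional<String> hlsUrl(long taskId) {
        return Optional.ofNullable(entries.get(taskId))
                .map(e -> String.format("http://%s:%d/%s/index.m3u8", e.nodeIp, hlsPort, e.previewPath));
    }
}
```

Keeping the path stable and deriving the URL from the current node means a node switch only touches one field, and a switch for a task whose preview is off does not silently start one.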
17.2 Preview Control Service
@Service
@Slf4j
public class PreviewControlService {
@Resource
private TaskRuntimeRepository runtimeRepository;
@Resource
private EngineNodeRepository nodeRepository;
@Resource
private AiEngineSocket engineSocket;
/**
* Start preview
*/
@Transactional(rollbackFor = Exception.class)
public void startPreview(StartPreviewRequest request) {
// 1. Load the runtime state
TaskRuntime runtime = runtimeRepository.findById(request.getTaskId())
.orElseThrow(() -> new NotFoundException("Task is not running"));
if (!"RUNNING".equals(runtime.getState())) {
throw new BusinessException("Task is not running; cannot start preview");
}
// 2. Build the preview path (defaults to ai_task_{taskId})
String previewPath = request.getPreviewPath();
if (StringUtils.isBlank(previewPath)) {
previewPath = "ai_task_" + request.getTaskId();
}
// 3. Resolve the node IP
EngineNode node = nodeRepository.findByNodeId(runtime.getNodeId());
if (node == null) {
throw new BusinessException("Node not found");
}
// 4. Build the play URL (8889 is the MediaMTX WebRTC port, media.mtx.webrtc-port in the configuration)
String playUrl = String.format("http://%s:8889/%s", node.getIp(), previewPath);
// 5. Send the preview command
JSONObject cmd = new JSONObject();
cmd.put("type", "START_PREVIEW");
JSONObject data = new JSONObject();
data.put("task_id", request.getTaskId());
data.put("preview_path", previewPath);
data.put("fps", request.getFps() != null ? request.getFps() : 15);
data.put("bitrate", request.getBitrate() != null ? request.getBitrate() : "3000k");
data.put("width", request.getWidth() != null ? request.getWidth() : 1280);
data.put("height", request.getHeight() != null ? request.getHeight() : 720);
data.put("draw", request.getDraw() != null ? request.getDraw() : true);
data.put("encoder", "h264_nvenc");
data.put("preview_mode", "RESULT_DRIVEN");
cmd.put("data", data);
engineSocket.sendCommand(runtime.getNodeId(), cmd);
// 6. Record the preview path and play URL in the runtime state
runtime.setPreviewPath(previewPath);
runtime.setPreviewPlayUrl(playUrl);
runtimeRepository.updateById(runtime);
log.info("Preview started: task={}, path={}, url={}",
request.getTaskId(), previewPath, playUrl);
}
/**
* Stop preview
*/
@Transactional(rollbackFor = Exception.class)
public void stopPreview(Long taskId) {
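TaskRuntime runtime = runtimeRepository.findById(taskId).orElse(null);
// Return quietly instead of throwing: an exception escaping this @Transactional method would mark
// the caller's transaction (e.g. stopTask) rollback-only even though the caller catches it
if (runtime == null || StringUtils.isBlank(runtime.getPreviewPath())) {
log.warn("Preview not started for task {}", taskId);
return;
}
// Send the stop command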
JSONObject cmd = new JSONObject();
cmd.put("type", "STOP_PREVIEW");
JSONObject data = new JSONObject();
data.put("task_id", taskId);
cmd.put("data", data);
engineSocket.sendCommand(runtime.getNodeId(), cmd);
// Clear the preview fields in the runtime state
runtime.setPreviewPath(null);
runtime.setPreviewPlayUrl(null);
runtimeRepository.updateById(runtime);
log.info("Preview stopped: task={}", taskId);
}
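/**
* Auto-start preview; failures are logged and never propagated to the caller.
* nodeId is currently unused because startPreview resolves the node from the runtime state.
* This is a self-invocation, so startPreview's @Transactional proxy is bypassed.
*/
public void autoStartPreview(Long taskId, String nodeId, String previewPath) {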
try {
StartPreviewRequest request = new StartPreviewRequest();
request.setTaskId(taskId);
request.setPreviewPath(previewPath);
request.setFps(15);
request.setBitrate("3000k");
request.setDraw(true);
startPreview(request);
} catch (Exception e) {
log.error("Auto start preview failed for task {}", taskId, e);
}
}
}
@Data
class StartPreviewRequest {
private Long taskId;
private String previewPath;
private Integer fps;
private String bitrate;
private Integer width;
private Integer height;
private Boolean draw;
}
17.3 Task Start/Stop and Preview Integration
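The core of `startTask` below is a start-with-compensation loop: start commands go out node by node, and if any send fails, every node that already accepted the task is told to stop. A self-contained sketch of only that pattern, assuming a hypothetical `NodeClient` interface in place of `AiEngineSocket` and made-up scene codes:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CompensatingStarter {

    /** Hypothetical stand-in for the command channel (AiEngineSocket) to engine nodes. */
    public interface NodeClient {
        void start(String nodeId, long taskId, List<String> scenes);

        void stop(String nodeId, long taskId);
    }

    /**
     * Starts the task on every node in the plan. If any start fails, the nodes that already
     * accepted the task are stopped and the original exception is rethrown.
     */
    public static Set<String> startAll(NodeClient client, long taskId,
                                       Map<String, List<String>> nodeToScenes) {
        Set<String> started = new LinkedHashSet<>();
        try {
            for (Map.Entry<String, List<String>> e : nodeToScenes.entrySet()) {
                client.start(e.getKey(), taskId, e.getValue());
                started.add(e.getKey());
            }
            return started;
        } catch (RuntimeException ex) {
            for (String nodeId : started) {
                try {
                    client.stop(nodeId, taskId);
                } catch (RuntimeException stopEx) {
                    ex.addSuppressed(stopEx);   // keep the start failure as the primary error
                }
            }
            throw ex;
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> plan = new LinkedHashMap<>();
        plan.put("node-a", Arrays.asList("PERSON_INTRUSION"));
        plan.put("node-b", Arrays.asList("HELMET"));
        NodeClient client = new NodeClient() {
            public void start(String nodeId, long taskId, List<String> scenes) {
                if (nodeId.equals("node-b")) {
                    throw new IllegalStateException(nodeId + " offline");
                }
                System.out.println("started " + nodeId + " " + scenes);
            }

            public void stop(String nodeId, long taskId) {
                System.out.println("compensating stop on " + nodeId);
            }
        };
        try {
            startAll(client, 42L, plan);
        } catch (IllegalStateException e) {
            System.out.println("start failed: " + e.getMessage());
        }
    }
}
```

A failing stop during compensation is attached as a suppressed exception rather than replacing the original error, so logs still show why the start failed.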
@Service
@Slf4j
public class AiTaskService {
@Resource
private TaskDispatchService dispatchService;
@Resource
private TaskRuntimeService runtimeService;
@Resource
private TaskSceneRuntimeService sceneRuntimeService;
@Resource
private PreviewControlService previewService;
@Resource
private AiEngineSocket engineSocket;
// Task table repository (type name assumed)
@Resource
private AiTaskRepository taskRepository;
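// Deliberately not @Transactional: this method sends remote commands and compensates by hand.
// Inside a transaction, the rethrow in the catch block would also roll back the ERROR state written there.
public void startTask(Long taskId) {
AiTask task = taskRepository.findById(taskId)
.orElseThrow(() -> new NotFoundException("Task not found"));
// 1. Guard against duplicate starts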
TaskRuntime runtime = runtimeService.findById(taskId);
if (runtime != null && isAliveRunning(runtime)) {
log.warn("Task {} already running", taskId);
return;
}
// 2. Parse scene codes
List<String> sceneCodes = parseSceneCodes(task.getSceneCodes());
if (sceneCodes.isEmpty()) {
throw new BusinessException("No scenes configured for the task");
}
// 3. Build the dispatch plan
DispatchPlan plan = dispatchService.buildDispatchPlan(task, sceneCodes);
// 4. Create the runtime state
LocalDateTime now = LocalDateTime.now();
int restartCount = (runtime != null) ? runtime.getRestartCount() + 1 : 0;
runtimeService.upsertRunning(taskId, plan.getPrimaryNodeId(), now, restartCount);
// 5. Start the task on each node in the plan
Set<String> startedNodes = new LinkedHashSet<>();
try {
for (Map.Entry<String, List<String>> entry : plan.getNodeToScenes().entrySet()) {
String nodeId = entry.getKey();
List<String> scopedScenes = entry.getValue();
// Send the start command
JSONObject cmd = buildStartCommand(task, scopedScenes);
engineSocket.sendCommand(nodeId, cmd);
startedNodes.add(nodeId);
// Update the per-scene runtime state
for (String sceneCode : scopedScenes) {
sceneRuntimeService.upsertRunning(taskId, sceneCode, nodeId, now, restartCount);
}
}
// 6. Update the task table
task.setStatus(1);
task.setLastRunTime(now);
taskRepository.updateById(task);
// 7. Auto-start the preview
String previewPath = "ai_task_" + taskId;
previewService.autoStartPreview(taskId, plan.getPrimaryNodeId(), previewPath);
log.info("Task started: id={}, nodes={}", taskId, plan.getNodeToScenes().keySet());
} catch (Exception e) {
log.error("Task start failed: {}", taskId, e);
// Compensation: stop the nodes that already started
for (String nodeId : startedNodes) {
safeStopTask(nodeId, taskId);
}
// Mark the task as failed
runtimeService.markError(taskId, now);
sceneRuntimeService.markErrorByTask(taskId, now, "START_TASK failed: " + e.getMessage());
task.setStatus(2);
task.setErrorMessage(e.getMessage());
taskRepository.updateById(task);
throw e;
}
}
@Transactional(rollbackFor = Exception.class)
public void stopTask(Long taskId) {
AiTask task = taskRepository.findById(taskId)
.orElseThrow(() -> new NotFoundException("Task not found"));
// 1. Stop the preview (best effort)
try {
previewService.stopPreview(taskId);
} catch (Exception e) {
log.error("Stop preview failed for task {}", taskId, e);
}
// 2. Find the nodes running this task's scenes
List<String> nodeIds = sceneRuntimeService.findNodeIdsByTaskId(taskId);
// 3. Send stop commands (best effort per node)
for (String nodeId : nodeIds) {
try {
JSONObject cmd = new JSONObject();
cmd.put("type", "STOP_TASK");
JSONObject data = new JSONObject();
data.put("task_id", taskId);
cmd.put("data", data);
engineSocket.sendCommand(nodeId, cmd);
} catch (Exception e) {
log.error("Stop task failed on node {}", nodeId, e);
}
}
// 4. Update the runtime state (one timestamp for task and scene rows)
LocalDateTime now = LocalDateTime.now();
runtimeService.markStopped(taskId, now);
sceneRuntimeService.markStoppedByTask(taskId, now);
// 5. Update the task table
task.setStatus(0);
taskRepository.updateById(task);
log.info("Task stopped: id={}", taskId);
}
private boolean isAliveRunning(TaskRuntime runtime) {
if (!"RUNNING".equals(runtime.getState())) {
return false;
}
if (runtime.getLastKeepalive() == null) {
return false;
}
long elapsedSeconds = Duration.between(
runtime.getLastKeepalive(),
LocalDateTime.now()
).getSeconds();
return elapsedSeconds < 30; // 30 s timeout, matching ai.task.keepalive-timeout-seconds
}
private JSONObject buildStartCommand(AiTask task, List<String> sceneCodes) {
JSONObject cmd = new JSONObject();
cmd.put("type", "START_TASK");
JSONObject data = new JSONObject();
data.put("task_id", task.getId());
data.put("camera_id", task.getCameraId());
data.put("scene_codes", sceneCodes);
data.put("stream_key", task.getCameraId() + ":" + task.getStreamType());
data.put("rtsp", task.getRtspUrl());
data.put("confThreshold", task.getConfThreshold());
// Detection region
if (StringUtils.isNotBlank(task.getRegionJson())) {
data.put("region", JSON.parseArray(task.getRegionJson()));
}
// Video processing parameters
JSONObject params = new JSONObject();
params.put("input_fps", task.getInputFps());
params.put("in_w", task.getOutputWidth());
params.put("in_h", task.getOutputHeight());
params.put("result_fps", task.getOutputFps());
params.put("quality", task.getProcessQuality());
params.put("alarm_enabled", task.getAlarmEnabled());
params.put("snapshot_policy", task.getSnapshotPolicy());
params.put("snapshot_quality", task.getSnapshotQuality());
data.put("params", params);
cmd.put("data", data);
return cmd;
}
private void safeStopTask(String nodeId, Long taskId) {
try {
JSONObject cmd = new JSONObject();
cmd.put("type", "STOP_TASK");
JSONObject data = new JSONObject();
data.put("task_id", taskId);
cmd.put("data", data);
engineSocket.sendCommand(nodeId, cmd);
} catch (Exception e) {
log.error("Safe stop failed: node={}, task={}", nodeId, taskId, e);
}
}
private List<String> parseSceneCodes(String json) {
if (StringUtils.isBlank(json)) {
return Collections.emptyList();
}
try {
return JSON.parseArray(json, String.class);
} catch (Exception e) {
log.error("Failed to parse scene codes: {}", json, e);
return Collections.emptyList();
}
}
}
Part 4: Operations and Evolution
18. Configuration Management
18.1 Application Configuration Template
# application.yml
server:
  port: 8080
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/video_analysis?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
    username: root
    password: ${DB_PASSWORD}
    driver-class-name: com.mysql.cj.jdbc.Driver
    hikari:
      maximum-pool-size: 20
      minimum-idle: 5
      connection-timeout: 30000
# Camera settings
camera:
  onboarding:
    async-enabled: true
    sync-timeout-ms: 15000
    duplicate-check:
      by-ip-port: true
  onvif:
    default-port: 80
    connect-timeout-ms: 3000
    read-timeout-ms: 5000
    heartbeat-timeout-ms: 3000
    soap:
      strict-fault-check: true
      allow-capability-fallback: true
  stream:
    naming: "cam_{ip}_ch{channel}_{subtype}"
    subtype-rule:
      main-min-width: 1280
  retry:
    register-max-attempts: 3
    register-backoff-ms: 1000
  health:
    check-interval-ms: 30000
    initial-delay-ms: 10000
    offline-alarm-threshold-sec: 180
# Codec settings
codec:
  probe:
    ffprobe-bin: ffprobe
    timeout-ms: 3000
    max-concurrency: 8
  transcode:
    enabled: true
    assume-h265-when-unknown: true
    ffmpeg-bin: ffmpeg
    rtsp-transport: tcp
    stimeout-us: 5000000
    probesize: 5000000
    analyzeduration: 5000000
    x264:
      preset: veryfast
      crf: 23
      gop: 50
    enable-audio: false
# MediaMTX settings
media:
  mtx:
    api-base-url: http://127.0.0.1:9997
    rtsp-port: 8554
    webrtc-port: 8889
    api:
      path-add: /v3/config/paths/add/
      path-get: /v3/config/paths/get/
      path-list: /v3/config/paths/list
      path-delete: /v3/config/paths/delete/
# Recording settings
recording:
  default:
    retention-days: 7
    segment-duration: 10m
    record-format: fmp4
  index:
    fixed-delay-ms: 30000
    scan-depth: 4
    lookback-hours: 24
  storage:
    refresh-interval-ms: 15000
    auto-switch-enabled: true
    migrate-batch-size: 20
# Analytics task settings
ai:
  task:
    keepalive-timeout-seconds: 30
    scene-keepalive-timeout-seconds: 30
    dedup-running-seconds: 10
  result:
    queue:
      size: 20000
    worker:
      min: 4
      max: 16
    sample-interval-ms: 1000
    batch:
      enabled: false
      size: 200
      flush-ms: 500
  preview:
    auto-start: true
    default-fps: 15
    default-width: 1280
    default-height: 720
    default-bitrate: 3000k
    default-encoder: h264_nvenc
  push:
    queue-size: 20000
    flush-ms: 1000
    routes-file: push_routes.json
    level-policy-file: level_policy.json
# WebSocket settings
ws:
  ai-engine:
    endpoint: /ws/ai-engine
  web-notify:
    endpoint: /ws/web-notify
18.2 MediaMTX Configuration Template
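Two settings decide what MediaMTX paths and recording files are called: `camera.stream.naming` in `application.yml` and `recordPath` in the MediaMTX file below. The sketch expands both and parses a segment's start time back out of its file name, which is what a file indexer needs. Assumptions: dots in the IP become underscores (this matches the path `cam_10_0_1_20_ch1_main` in the log examples of section 19.4), and MediaMTX appends `.mp4` to fMP4 segments; `PathNaming` is a hypothetical helper, not part of MediaMTX.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper for the naming rules configured in application.yml and mediamtx.yml. */
public class PathNaming {

    private static final DateTimeFormatter FILE_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH-mm-ss");

    /** Expands camera.stream.naming, e.g. "cam_{ip}_ch{channel}_{subtype}"; dots in the IP become underscores. */
    public static String streamPath(String template, String ip, int channel, String subtype) {
        return template
                .replace("{ip}", ip.replace('.', '_'))
                .replace("{channel}", Integer.toString(channel))
                .replace("{subtype}", subtype);
    }

    /** Expands the %path and %Y/%m/%d/%H/%M/%S placeholders used by recordPath. */
    public static String recordFile(String recordPath, String path, LocalDateTime start, String extension) {
        return recordPath
                .replace("%path", path)
                .replace("%Y", String.format("%04d", start.getYear()))
                .replace("%m", String.format("%02d", start.getMonthValue()))
                .replace("%d", String.format("%02d", start.getDayOfMonth()))
                .replace("%H", String.format("%02d", start.getHour()))
                .replace("%M", String.format("%02d", start.getMinute()))
                .replace("%S", String.format("%02d", start.getSecond()))
                + extension;
    }

    /** Recovers the segment start time from ".../<yyyy-MM-dd>/<HH-mm-ss>.<ext>", as an indexer would. */
    public static LocalDateTime parseStart(String file) {
        String[] parts = file.split("/");
        String day = parts[parts.length - 2];
        String time = parts[parts.length - 1].replaceFirst("\\.[^.]+$", "");
        return LocalDateTime.parse(day + "T" + time, FILE_TIME);
    }
}
```

For the camera at 10.0.1.20, channel 1, main stream, a segment starting at 2026-02-13 14:30:00 lands at `/recordings/cam_10_0_1_20_ch1_main/2026-02-13/14-30-00.mp4`, three directory levels below the recording root.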
# mediamtx.yml
###############################################
# Global settings
# API
api: yes
apiAddress: :9997
# Metrics
metrics: yes
metricsAddress: :9998
###############################################
# RTSP server
rtsp: yes
rtspAddress: :8554
protocols: [tcp]
encryption: "no"
###############################################
# WebRTC server
webrtc: yes
webrtcAddress: :8889
webrtcICEServers2:
  - url: stun:stun.l.google.com:19302
###############################################
# HLS server
hls: yes
hlsAddress: :8888
hlsAlwaysRemux: no
hlsVariant: lowLatency
hlsSegmentCount: 7
hlsSegmentDuration: 1s
hlsPartDuration: 200ms
###############################################
# Recording
record: no
recordPath: /recordings/%path/%Y-%m-%d/%H-%M-%S
recordFormat: fmp4
recordPartDuration: 1s
recordSegmentDuration: 10m
recordDeleteAfter: 7d
###############################################
# Path defaults
paths:
  all:
    sourceOnDemand: yes
    sourceOnDemandStartTimeout: 10s
    sourceOnDemandCloseAfter: 10s
    runOnDemandStartTimeout: 10s
    runOnDemandCloseAfter: 10s
18.3 Docker Compose Deployment Template
version: "3.8"
services:
  mysql:
    image: mysql:8.0
    container_name: video-mysql
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD}
      MYSQL_DATABASE: video_analysis
      TZ: Asia/Shanghai
    ports:
      - "3306:3306"
    volumes:
      - ./data/mysql:/var/lib/mysql
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    command: --default-authentication-plugin=mysql_native_password
  mediamtx:
    image: bluenviron/mediamtx:latest
    container_name: video-mediamtx
    restart: always
    ports:
      - "8554:8554" # RTSP
      - "8888:8888" # HLS
      - "8889:8889" # WebRTC
      - "9997:9997" # API
      - "9998:9998" # Metrics
    volumes:
      - ./mediamtx.yml:/mediamtx.yml
      - ./data/recordings:/recordings
    environment:
      TZ: Asia/Shanghai
  app:
    image: video-analysis-platform:latest
    container_name: video-app
    restart: always
    depends_on:
      - mysql
      - mediamtx
    ports:
      - "8080:8080"
    environment:
      SPRING_DATASOURCE_URL: jdbc:mysql://mysql:3306/video_analysis?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
      SPRING_DATASOURCE_USERNAME: root
      SPRING_DATASOURCE_PASSWORD: ${MYSQL_ROOT_PASSWORD}
      MEDIA_MTX_API_BASE_URL: http://mediamtx:9997
      TZ: Asia/Shanghai
    volumes:
      - ./data/snapshots:/app/snapshots
      - ./logs:/app/logs
  nginx:
    image: nginx:alpine
    container_name: video-nginx
    restart: always
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./data/recordings:/usr/share/nginx/html/recordings
      - ./data/snapshots:/usr/share/nginx/html/snapshots
    depends_on:
      - app
19. Observability
19.1 Core Metric Definitions
Camera onboarding layer:
camera_onboarding_total                 # onboarding attempts
camera_onboarding_success_total         # successful onboardings
camera_onboarding_duration_seconds      # onboarding duration
camera_online_count                     # devices online
camera_offline_count                    # devices offline
camera_offline_event_duration_seconds   # offline duration
Streaming layer:
stream_path_total                       # total stream paths
stream_path_register_success_rate       # path registration success rate
stream_transcode_count                  # streams being transcoded
media_gateway_request_duration_seconds  # media gateway API call latency
Recording layer:
recording_plan_active_count             # active recording plans
recording_file_index_lag_seconds        # index lag
storage_volume_usage_percent            # storage usage
storage_volume_full_count               # full volumes
Analytics layer:
ai_task_running_count                   # running tasks
ai_task_error_count                     # tasks in error
ai_engine_online_nodes                  # online engine nodes
ai_result_queue_depth                   # result queue depth
ai_result_drop_total                    # dropped results
ai_alarm_push_success_rate              # alarm push success rate
19.2 Prometheus Metrics Exposure
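import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class MetricsCollector {
// Micrometer's Prometheus naming convention exports "camera.onboarding" as camera_onboarding_total
// and the Timer "camera.onboarding.duration" as camera_onboarding_duration_seconds_{count,sum,bucket}
private final Counter cameraOnboardingTotal;
private final Counter cameraOnboardingSuccess;
private final Timer cameraOnboardingDuration;
private final CameraRepository cameraRepository;  // repository type name assumed
public MetricsCollector(MeterRegistry registry, CameraRepository cameraRepository) {
this.cameraRepository = cameraRepository;
this.cameraOnboardingTotal = Counter.builder("camera.onboarding")
.description("Total camera onboarding attempts")
.register(registry);
this.cameraOnboardingSuccess = Counter.builder("camera.onboarding.success")
.description("Successful camera onboardings")
.register(registry);
// Micrometer has no Histogram meter; a Timer with a percentile histogram fills that role
this.cameraOnboardingDuration = Timer.builder("camera.onboarding.duration")
.description("Camera onboarding duration")
.publishPercentileHistogram()
.register(registry);
// Gauges are sampled on every scrape, so these count queries run once per scrape interval
Gauge.builder("camera.online.count", this, MetricsCollector::getOnlineCount)
.description("Online camera count")
.register(registry);
Gauge.builder("camera.offline.count", this, MetricsCollector::getOfflineCount)
.description("Offline camera count")
.register(registry);
}
public void recordOnboardingAttempt() {
cameraOnboardingTotal.increment();
}
public void recordOnboardingSuccess(long durationMs) {
cameraOnboardingSuccess.increment();
cameraOnboardingDuration.record(durationMs, TimeUnit.MILLISECONDS);
}
private double getOnlineCount() {
return cameraRepository.countByStatus(1);
}
private double getOfflineCount() {
return cameraRepository.countByStatus(0);
}
}
19.3 Alert Rule Templates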
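The first rule below divides the failure rate by the attempt rate. The same ratio can be computed from two samples of the counters taken one window apart, which is handy for unit-testing thresholds. Note the zero-attempt case: in PromQL `0/0` is NaN and `NaN > 0.05` is false, so the alert stays silent; the sketch returns 0 to match. `OnboardingFailureRate` is a hypothetical helper, and it ignores counter resets, which `rate()` handles.

```java
/**
 * Hypothetical helper: the CameraOnboardingFailureRateHigh ratio computed from two counter
 * samples taken one window apart. Counter resets are not handled (PromQL's rate() does that).
 */
public class OnboardingFailureRate {

    public static double failureRatio(long totalBefore, long totalAfter,
                                      long successBefore, long successAfter) {
        long attempts = totalAfter - totalBefore;
        long successes = successAfter - successBefore;
        if (attempts <= 0) {
            // PromQL yields NaN here and NaN > 0.05 is false, so the alert does not fire
            return 0.0;
        }
        return (double) (attempts - successes) / attempts;
    }

    /** Same threshold as the rule: strictly greater than 5%. */
    public static boolean shouldAlert(double ratio) {
        return ratio > 0.05;
    }

    public static void main(String[] args) {
        double r = failureRatio(1_000, 1_200, 980, 1_170);
        System.out.printf("failure ratio %.3f, alert=%b%n", r, shouldAlert(r));
    }
}
```

In the example, 10 failures out of 200 attempts is exactly 5%, which does not fire because the rule uses a strict `>`.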
# prometheus-alerts.yml
groups:
  - name: camera_alerts
    interval: 30s
    rules:
      - alert: CameraOnboardingFailureRateHigh
        expr: |
          (
            rate(camera_onboarding_total[5m]) -
            rate(camera_onboarding_success_total[5m])
          ) / rate(camera_onboarding_total[5m]) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Camera onboarding failure rate is high"
          description: "Onboarding failure rate over the last 5 minutes exceeds 5%"
      - alert: CameraOfflineCountHigh
        expr: camera_offline_count > 10
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "Too many offline devices"
          description: "Offline devices: {{ $value }}"
  - name: storage_alerts
    interval: 30s
    rules:
      - alert: StorageVolumeAlmostFull
        expr: storage_volume_usage_percent > 90
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "Storage volume almost full"
          description: "Volume {{ $labels.volume_id }} usage: {{ $value }}%"
      - alert: RecordingIndexLagHigh
        expr: recording_file_index_lag_seconds > 120
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Recording index lag is high"
          description: "Index lag: {{ $value }} s"
  - name: ai_alerts
    interval: 30s
    rules:
      - alert: AiResultQueueFull
        # 90% of ai.result.queue.size (20000)
        expr: ai_result_queue_depth > 18000
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Result queue almost full"
          description: "Queue depth: {{ $value }}"
      - alert: AlarmPushFailureRateHigh
        expr: ai_alarm_push_success_rate < 0.95
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Alarm push success rate is low"
          description: "Success rate: {{ $value }}"
19.4 Logging Conventions
Log levels:
- ERROR: system failures that need human intervention
- WARN: degraded operation, or failures recovered by retrying
- INFO: key business events (device onboarded, task started, etc.)
- DEBUG: detailed debugging information
Log format:
[time] [level] [thread] [logger] - [business key] message
Examples:
2026-02-13 14:30:00.123 INFO [main] c.e.CameraOnboardingService - [camera:1001] Device onboarded successfully: ip=10.0.1.20, manufacturer=Hikvision
2026-02-13 14:30:05.456 WARN [task-1] c.e.MediaPathService - [path:cam_10_0_1_20_ch1_main] MediaMTX registration failed, retrying (1/3)
2026-02-13 14:30:10.789 ERROR [task-2] c.e.OnvifManager - [camera:1002] ONVIF connection timeout: ip=10.0.1.21, timeout=3000ms
20. Typical Problems and Solutions
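20.1 Device Onboarding Layer
Problem 1: ONVIF connection timeout
Symptom:
OnvifException: Connection timeout after 3000ms
Troubleshooting steps:
- ping the device IP to confirm network connectivity
- telnet ip 80 to confirm the port is open
- Open http://ip/onvif/device_service in a browser to verify the service responds
- Check firewall rules
Solution:
- Raise the timeout to 5-10 seconds
- Fix the network routing
- Enable the ONVIF service in the device's web UI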
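The `telnet ip 80` step can be scripted as a plain TCP connect with its own timeout, so a batch of devices can be checked without shelling out. A sketch using only `java.net.Socket`; `PortProbe` is a hypothetical name, and a successful connect only proves the port is open, not that the ONVIF service behind it answers correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: scripted equivalent of "telnet <ip> 80". */
public class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, no route to host, timeout, or unresolvable host name
            return false;
        }
    }

    public static void main(String[] args) {
        String ip = args.length > 0 ? args[0] : "127.0.0.1";
        System.out.println(ip + ":80 open = " + isPortOpen(ip, 80, 3000));
    }
}
```

If the port is open but ONVIF calls still time out, the problem is more likely the service itself or a read timeout than routing or firewalls.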
Problem 2: WS-Security authentication failure
Symptom:
SOAP Fault: The security token could not be authenticated
Cause: the clocks of the server and the device differ by more than 5 seconds
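Why the clock matters: ONVIF devices commonly authenticate with the WS-Security UsernameToken profile, where `PasswordDigest = Base64(SHA-1(nonce + created + password))` and `created` is the client's UTC timestamp. The device recomputes the digest and, on many models, also rejects a `created` value too far from its own clock, so a correct password fails once the clocks drift. A sketch of the client side (class name hypothetical; the exact skew tolerance depends on the device firmware):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Base64;

/** Hypothetical helper: WS-Security UsernameToken password digest as used by ONVIF clients. */
public class UsernameTokenDigest {

    /** PasswordDigest = Base64(SHA-1(nonce bytes + created + password)). */
    public static String digest(byte[] nonce, String created, String password) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            sha1.update(nonce);
            sha1.update(created.getBytes(StandardCharsets.UTF_8));
            sha1.update(password.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(sha1.digest());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }

    /** UTC timestamp such as 2026-02-13T06:30:00Z, taken from the local clock (hence the skew problem). */
    public static String createdNow() {
        return Instant.now().truncatedTo(ChronoUnit.SECONDS).toString();
    }

    public static byte[] randomNonce() {
        byte[] nonce = new byte[16];
        new SecureRandom().nextBytes(nonce);
        return nonce;
    }

    public static void main(String[] args) {
        byte[] nonce = randomNonce();
        String created = createdNow();
        // The nonce travels Base64-encoded in the SOAP header; the digest uses its raw bytes
        System.out.println("Nonce:   " + Base64.getEncoder().encodeToString(nonce));
        System.out.println("Created: " + created);
        System.out.println("Digest:  " + digest(nonce, created, "password"));
    }
}
```

Because `created` comes from the server's clock, fixing the password never helps here; only aligning both clocks (below) does.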
Solution:
# Sync the server clock via NTP
ntpdate ntp.aliyun.com
# Configure NTP on the device:
# device web UI -> System -> Time Settings -> NTP server: ntp.aliyun.com
Problem 3: Online but no video
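Symptom: camera_status=1, but pulling the RTSP stream fails
Cause: the control plane is reachable but the media plane is not
Solution:
// Add a media-plane availability check: pull about 1 s of the stream and discard it
public boolean checkMediaAvailable(String rtspUrl) {
Process p = null;
try {
ProcessBuilder pb = new ProcessBuilder(
"ffmpeg", "-v", "error", "-rtsp_transport", "tcp",
"-i", rtspUrl, "-t", "1", "-f", "null", "-"
);
// Discard output: an undrained stderr pipe can fill up and block ffmpeg until the timeout
pb.redirectErrorStream(true);
pb.redirectOutput(ProcessBuilder.Redirect.DISCARD);
p = pb.start();
return p.waitFor(5, TimeUnit.SECONDS) && p.exitValue() == 0;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} catch (Exception e) {
return false;
} finally {
// Do not leak ffmpeg processes that hang past the timeout
if (p != null && p.isAlive()) {
p.destroyForcibly();
}
}
}
20.2 Recording Layer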
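Problem 4: The plan shows as enabled, but no files are produced
Troubleshooting steps:
- Check that recording_plan.status is 1
- Check the MediaMTX path configuration: GET /v3/config/paths/get/{pathName}
- Check the storage volume state: storage_volume.status and is_full
- Check that the source RTSP stream can be pulled
Solution: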
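// Add a health check
@Scheduled(fixedDelay = 60000)
public void checkRecordingHealth() {
List<RecordingPlan> plans = planRepository.findByStatus(1);
// Assuming the count filters on segment start time, the lookback must exceed one segment (10m)
// plus the index delay (30s); a 5-minute window would flag healthy streams in mid-segment
LocalDateTime since = LocalDateTime.now().minusMinutes(15);
for (RecordingPlan plan : plans) {
// Check whether any new file appeared within the lookback window
long recentFileCount = fileRepository.countByStreamNameAndTimeSince(
plan.getStreamName(),
since
);
if (recentFileCount == 0) {
log.warn("No recent files for plan {}, reapplying config", plan.getId());
planService.applyPlan(plan.toRequest());
}
}
}
Problem 5: Gaps in playback queries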
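Symptom: the files exist on disk, but the database returns nothing for them
Cause: the index job only scans a recent window and misses files written late for earlier periods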
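Closing the gaps means comparing what is on disk with what the index knows over the full range, not just the recent window. A sketch of that diff with `java.nio.file.Files.walk`, bounded by `recording.index.scan-depth` (4 in the configuration); `FullRangeScanner`, the `.mp4` filter, and the set of indexed paths are assumptions standing in for the real volume scan and file table.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: lists recording files on disk that the index does not know about. */
public class FullRangeScanner {

    /** Returns unindexed .mp4 files under root (up to maxDepth levels), sorted for stable output. */
    public static List<Path> findUnindexed(Path root, int maxDepth, Set<Path> indexed) {
        try (Stream<Path> files = Files.walk(root, maxDepth)) {
            return files
                    .filter(Files::isRegularFile)
                    .filter(p -> p.toString().endsWith(".mp4"))   // fMP4 segments (extension assumed)
                    .map(Path::normalize)
                    .filter(p -> !indexed.contains(p))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The returned list is what a nightly full-range job would feed into the normal indexing path; paths in `indexed` must be normalized the same way for the set lookup to match.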
Solution:
// Add a full-range rescan job
@Scheduled(cron = "0 0 2 * * ?") // every day at 02:00
public void fullScan() {
List<StorageVolume> volumes = volumeRepository.findByStatus(1);
for (StorageVolume volume : volumes) {
scanVolumeFullRange(volume);
}
}
20.3 Analytics Layer
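Problem 6: The task shows as running, but the analytics engine has no load
Troubleshooting steps:
- Check that ai_task_runtime.state really is RUNNING
- Check that the node's WebSocket session is online
- Check the engine logs for the START_TASK command
- Check the TASK_KEEPALIVE heartbeat reports
Solution: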
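// Add a watchdog job
@Value("${ai.task.keepalive-timeout-seconds:30}")
private long keepaliveTimeoutSeconds;
@Scheduled(fixedDelay = 10000)
public void watchdogTask() {
List<TaskRuntime> runtimes = runtimeRepository.findByState("RUNNING");
LocalDateTime now = LocalDateTime.now();
Set<String> staleNodes = new LinkedHashSet<>();
for (TaskRuntime runtime : runtimes) {
// A missing keepalive counts as expired, as in isAliveRunning (and avoids an NPE in Duration.between)
boolean expired = runtime.getLastKeepalive() == null
|| Duration.between(runtime.getLastKeepalive(), now).getSeconds() > keepaliveTimeoutSeconds;
if (expired) {
log.warn("Task {} keepalive timeout, marking ERROR", runtime.getTaskId());
runtime.setState("ERROR");
runtimeRepository.updateById(runtime);
staleNodes.add(runtime.getNodeId());
}
}
// Trigger node-disconnect handling once per node rather than once per task
staleNodes.forEach(this::onNodeDisconnected);
}
Problem 7: Result queue backlog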
Symptom: ai_result_queue_depth keeps growing and memory alarms fire
Cause: results are processed more slowly than they are produced
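The first remedy below, sampling non-alarm results, can be read as: alarm results always pass, while plain detection results are kept at most once per `ai.result.sample-interval-ms` for each task and scene. A minimal thread-safe sketch under that reading of the setting; `ResultSampler` and the `taskId:sceneCode` key format are assumptions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical sampler for non-alarm results: alarms always pass; other results are kept
 * at most once per interval for each (task, scene) pair.
 */
public class ResultSampler {

    private final long intervalMs;
    // Last accepted timestamp per "taskId:sceneCode"
    private final ConcurrentMap<String, Long> lastKeptMs = new ConcurrentHashMap<>();

    public ResultSampler(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Returns true if the result should continue down the pipeline. */
    public boolean accept(long taskId, String sceneCode, boolean isAlarm, long nowMs) {
        if (isAlarm) {
            return true;   // never sample away an alarm
        }
        boolean[] keep = {false};
        // compute() runs atomically per key, so two workers cannot both claim the same slot
        lastKeptMs.compute(taskId + ":" + sceneCode, (key, last) -> {
            if (last == null || nowMs - last >= intervalMs) {
                keep[0] = true;
                return nowMs;
            }
            return last;
        });
        return keep[0];
    }

    /** Drops state for a stopped task so the map does not grow without bound. */
    public void forgetTask(long taskId) {
        lastKeptMs.keySet().removeIf(k -> k.startsWith(taskId + ":"));
    }
}
```

With a 1000 ms interval and 15 fps input, each scene contributes at most one non-alarm result per second instead of fifteen, while alarm throughput is untouched.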
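Solution:
// 1. Sample non-alarm results
@Value("${ai.result.sample-interval-ms:1000}")
private long sampleIntervalMs;
// 2. Batch database writes (set ai.result.batch.enabled=true)
@Value("${ai.result.batch.enabled:false}")
private boolean batchEnabled;
@Value("${ai.result.batch.size:200}")
private int batchSize;
// 3. Add processing threads
@PostConstruct
public void initExecutor() {
int coreThreads = Runtime.getRuntime().availableProcessors();
// Threads beyond the core size are only created once the 1000-slot queue is full; after that,
// the default AbortPolicy throws RejectedExecutionException, which callers should count
// as a dropped result (ai_result_drop_total)
this.processExecutor = new ThreadPoolExecutor(
coreThreads * 2, // more concurrent workers
coreThreads * 4,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000)
);
}
21. FAQ and Best Practices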
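21.1 Camera Onboarding
Q: Is ONVIF support mandatory?
A: No, but it is strongly recommended. ONVIF provides a standard way to manage devices and obtain streams. Devices without ONVIF can be onboarded through the vendor fallback mode, but their RTSP templates must then be maintained by hand.
Q: How are vendor differences handled?
A: In layers:
- Protocol layer: maximize compatibility (SOAP Fault parsing, capability fallback)
- Business layer: provide per-vendor fallback switches
- Configuration layer: maintain a library of vendor RTSP templates
Q: How do I choose between the main stream and the sub stream?
A: Recommended:
- Recording: main stream
- Live preview: sub stream
- Analytics: depends on the required accuracy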
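21.2 Streaming
Q: Why MediaMTX rather than ZLMediaKit?
A: Both are solid open-source options. MediaMTX was chosen because it offers:
- A single binary and simple deployment
- A modern RESTful API
- Native WebRTC support
- An active community and thorough documentation
Q: Does H.265 always need to be transcoded?
A: No. Recommended:
- Live preview in the browser: transcode to H.264
- Recording: keep H.265 (saves space)
- Analytics: depends on the algorithm's input requirements
Q: How much latency does transcoding add?
A:
- Software transcoding: 1-3 s
- Hardware transcoding (NVENC): 500 ms-1 s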
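21.3 Recording
Q: How long should segments be?
A: Trade-offs:
- 1-2 minutes: many files, high I/O pressure
- 5-10 minutes: recommended, the best overall balance
- 30-60 minutes: few files, but coarse seeking during playback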
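Most of this trade-off is file count. A quick calculation for 500 streams (the stream count used in the recording benchmark of section 22.2) and the 7-day default retention; `SegmentMath` is a hypothetical helper.

```java
/** Hypothetical helper: how many segment files a recording fleet produces. */
public class SegmentMath {

    /** Segments one stream writes per day; exact when segmentMinutes divides 1440 evenly. */
    public static long segmentsPerStreamPerDay(int segmentMinutes) {
        return 1440L / segmentMinutes;
    }

    public static long filesPerDay(int streams, int segmentMinutes) {
        return streams * segmentsPerStreamPerDay(segmentMinutes);
    }

    public static void main(String[] args) {
        int streams = 500;
        int retentionDays = 7;   // recording.default.retention-days
        for (int minutes : new int[] {1, 2, 5, 10, 30, 60}) {
            long perDay = filesPerDay(streams, minutes);
            System.out.printf("%2d min: %,d files/day, %,d files retained%n",
                    minutes, perDay, perDay * retentionDays);
        }
    }
}
```

At 10-minute segments, 500 streams produce 72,000 files per day, about 504,000 across the retention window; 1-minute segments multiply both figures by ten, which is where the index and I/O pressure comes from.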
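Q: fMP4 or TS?
A: Advantages of fMP4:
- Natively supported by modern browsers
- Clean metadata structure
- Easy post-processing
Advantages of TS:
- Broader compatibility (older devices and browsers)
- Better fault tolerance (partial corruption does not stop playback)
Q: How do I keep a full disk from interrupting recording?
A: Use several layers of protection:
- Watermark-based automatic volume switching
- Scheduled cleanup of expired files
- Disk usage alarms
- A degradation strategy (stop recording non-critical paths)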
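21.4 Analytics
Q: How many scenes can one task have?
A: There is no hard limit; in practice:
- Single-node mode: 3-5
- Split mode: 5-10
Too many scenes lead to:
- Higher inference overhead
- More complex result handling
- Excessive alarm frequency
Q: How are failed tasks retried?
A: By failure class:
- Transient failures (network jitter): restart automatically and increment the restart counter
- Persistent failures (device offline): mark ERROR and wait for the device to recover
- Configuration errors: do not retry; fix manually
Q: How do I control the alarm push frequency?
A: Use the push modes:
- High priority: IMMEDIATE
- Medium priority: COUNT_OR_WINDOW with COUNT=5, WINDOW=600s
- Low priority: WINDOW=3600s
Combine them with a cooldown period to avoid alarm storms.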
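22. Performance Benchmarks and Load Testing
22.1 Onboarding Benchmark
Test environment:
- CPU: 8 cores, 16 GB RAM
- Network: gigabit LAN
- Device: Hikvision DS-2CD2142FWD-I
Results:
| Concurrency | Success rate | P50 latency | P95 latency | P99 latency |
|---|---|---|---|---|
| 10 | 100% | 850ms | 1200ms | 1500ms |
| 50 | 99.2% | 1100ms | 1800ms | 2400ms |
| 100 | 97.5% | 1450ms | 2500ms | 3200ms |
| 200 | 94.8% | 2100ms | 3800ms | 4800ms |
Bottleneck analysis:
- Network I/O is the main bottleneck
- ONVIF SOAP parsing uses 15-20% of CPU
- FFprobe probing uses 30-40% of CPU
Recommended optimizations:
- Make onboarding tasks asynchronous
- Limit FFprobe concurrency (with a semaphore)
- Cache device information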
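`codec.probe.max-concurrency` (8) maps naturally onto a semaphore's permit count. A sketch with `java.util.concurrent.Semaphore`: `ProbeLimiter` is a hypothetical name, the probe is passed in as a `Callable`, and the permit wait timeout is an assumed, separate setting (`codec.probe.timeout-ms` more plausibly bounds the ffprobe run itself).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper that caps how many ffprobe runs execute at once. */
public class ProbeLimiter {

    private final Semaphore permits;
    private final long acquireTimeoutMs;

    public ProbeLimiter(int maxConcurrency, long acquireTimeoutMs) {
        // Fair semaphore: waiting onboarding jobs get permits in arrival order
        this.permits = new Semaphore(maxConcurrency, true);
        this.acquireTimeoutMs = acquireTimeoutMs;
    }

    /**
     * Runs the probe once a permit is free. If none frees up in time, fails fast so the
     * onboarding job can be retried later instead of piling up blocked threads.
     */
    public <T> T run(Callable<T> probe) throws Exception {
        if (!permits.tryAcquire(acquireTimeoutMs, TimeUnit.MILLISECONDS)) {
            throw new IllegalStateException("probe limit reached, retry later");
        }
        try {
            return probe.call();
        } finally {
            permits.release();   // always return the permit, even if the probe throws
        }
    }

    public int availablePermits() {
        return permits.availablePermits();
    }
}
```

Capping probes bounds the 30-40% CPU share measured above no matter how many onboarding requests arrive at once; the extra requests wait briefly or are retried.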
22.2 Recording Benchmark
Test environment:
- CPU: 16 cores, 32 GB RAM
- Storage: NVMe SSD
- Streams: 500 concurrent recordings
Results:
| Bitrate per stream | Segment duration | Disk write | CPU usage | Memory usage |
|---|---|---|---|---|
| 2Mbps | 10m | 125MB/s | 18% | 4.2GB |
| 4Mbps | 10m | 250MB/s | 22% | 5.8GB |
| 6Mbps | 10m | 375MB/s | 28% | 7.5GB |
Index performance:
- Scan speed: 10,000 files/s
- Index lag: P95 < 30 s
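The disk-write column follows directly from streams × bitrate ÷ 8, and the same arithmetic sizes the volumes for the retention period. A small hypothetical helper that reproduces the table and adds the footprint implied by `recording.default.retention-days` (7):

```java
/** Hypothetical helper: recording throughput and capacity arithmetic (decimal units). */
public class StorageMath {

    /** Aggregate write rate in MB/s for `streams` streams at `mbpsPerStream` Mbit/s each. */
    public static double writeMBps(int streams, double mbpsPerStream) {
        return streams * mbpsPerStream / 8.0;   // 8 bits per byte
    }

    /** Space needed to keep `days` of recordings, in TB, ignoring filesystem overhead. */
    public static double retentionTB(int streams, double mbpsPerStream, int days) {
        return writeMBps(streams, mbpsPerStream) * 86_400 * days / 1_000_000.0;
    }

    public static void main(String[] args) {
        for (double mbps : new double[] {2, 4, 6}) {
            System.out.printf("%.0f Mbps x 500: %.0f MB/s, %.1f TB for 7 days%n",
                    mbps, writeMBps(500, mbps), retentionTB(500, mbps, 7));
        }
    }
}
```

At 2 Mbps this gives the 125 MB/s of the first row and roughly 75.6 TB for seven days; at 6 Mbps it is about 226.8 TB, before filesystem overhead.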
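22.3 Analytics Benchmark
Test environment:
- GPU: NVIDIA RTX 3080
- Model: YOLOv8m
- Input: 1280x720 @ 15fps
Single-node capacity:
| Scenes per task | Concurrent tasks | GPU usage | Inference latency |
|---|---|---|---|
| 1 | 50 | 68% | 45ms |
| 2 | 35 | 72% | 52ms |
| 3 | 25 | 78% | 61ms |
Result processing:
- Queue throughput: 20,000 records/s
- Database write latency: P95 < 200 ms
- Alarm push latency: P95 < 500 ms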
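23. Architecture Evolution Roadmap
23.1 Current Architecture (v1.0)
Characteristics:
- Monolithic application
- Single MySQL instance
- Local file storage
- Single MediaMTX node
Suitable for:
- Fewer than 1,000 devices
- Fewer than 200 concurrent streams
- Fewer than 100 analytics tasks
23.2 Mid-term Evolution (v2.0)
Goal: support 5,000 devices and 1,000 concurrent streams
Improvements:
- Service split
  Monolith → onboarding service + recording service + analytics service
- Storage optimization
  Local storage → object storage (MinIO/OSS)
  Tiered storage: hot data on SSD + cold data on HDD
- MediaMTX cluster
  Single node → multiple nodes + LVS load balancing
  Path-hash sharding
- Database read/write splitting
  Primary: writes
  Replicas: queries + statistics
23.3 Long-term Evolution (v3.0)
Goal: support 100,000+ devices and tens of thousands of concurrent streams
Improvements:
- Microservices
  Split by business domain:
  - Device management service
  - Streaming service
  - Recording service
  - Analytics service
  - Alarm service
  Inter-service communication: gRPC + MQ
- Distributed storage
  Recordings: Ceph cluster
  Metadata: TiDB
  Cache: Redis cluster
- Edge computing
  Central node + edge nodes
  Edge: real-time analytics + local recording
  Center: long-term storage + global scheduling
- Cloud-native transformation
  Containers: Kubernetes
  Service mesh: Istio
  Observability: Prometheus + Grafana + Jaeger
Summary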
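This article built a complete enterprise video intelligence analysis platform from scratch, covering:
- Camera onboarding: in-depth ONVIF practice, stream path governance, codec decisions
- Recording storage: storage volume management, file indexing, playback retrieval
- Intelligent analytics: multi-scene task model, node scheduling, result processing, alarm push
- Operations: configuration management, observability, troubleshooting, performance optimization
Core design principles:
- Layered decoupling: control plane/media plane, policy/execution, real-time/batch
- Unified abstractions: path model, scene model, state model
- Fault tolerance first: asynchrony, retries, degradation, rollback
- Observability: logs, metrics, traces, alerts
Technology stack summary:
| Layer | Technology |
|---|---|
| Protocol | ONVIF (SOAP/WS-Security) |
| Streaming | MediaMTX |
| Codec | FFmpeg/FFprobe |
| Storage | MySQL + file system |
| Communication | WebSocket + RESTful |
| Monitoring | Prometheus + Grafana |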
Contributors
flycodeu
Copyright
Copyright holder: flycodeu
