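Building an Enterprise Video Intelligence Platform from 0 to 1: A Complete Engineering Practice
A full-stack technical blueprint that can be put into production directly. It covers the whole chain of camera access, streaming orchestration, storage governance, and intelligent analysis, and includes architecture design, core code, data models, the operations system, and performance benchmarks.
Table of Contents
Part 1: Platform Foundations and Camera Access
Part 2: Recording Storage System
Part 3: Intelligent Analysis Platform
Part 4: Operations and Evolution
Part 3: Intelligent Analysis Platform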
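13. Algorithm Task Model Design
13.1 Core Challenges of the Task Model
Problem 1: The limits of one algorithm per task
A real scenario:
- A plant gate needs three detections at once: personnel intrusion, illegal vehicle parking, and missing safety helmets
- The conventional approach creates 3 tasks and pulls the stream 3 times, wasting bandwidth and compute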
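The alternative to three separate tasks is a single stream pull whose frames are offered to every scene attached to the task. The following is only a minimal sketch of that idea: `SceneDetector`, `LabelDetector`, and `SharedStreamTask` are hypothetical names that do not appear in the platform code, and a plain `String` stands in for a decoded frame.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical detector contract: one implementation per scene code.
interface SceneDetector {
    String sceneCode();
    boolean detect(String frame); // true when the frame should raise an alarm
}

// Toy detector for illustration: alarms when the frame text contains a label.
class LabelDetector implements SceneDetector {
    private final String sceneCode;
    private final String label;

    LabelDetector(String sceneCode, String label) {
        this.sceneCode = sceneCode;
        this.label = label;
    }

    @Override
    public String sceneCode() { return sceneCode; }

    @Override
    public boolean detect(String frame) { return frame.contains(label); }
}

// One task owns one stream pull; each decoded frame is offered to every scene.
class SharedStreamTask {
    private final List<SceneDetector> detectors = new ArrayList<>();
    private int framesPulled = 0;

    void addScene(SceneDetector detector) { detectors.add(detector); }

    // Called once per decoded frame, no matter how many scenes are attached.
    Map<String, Boolean> onFrame(String frame) {
        framesPulled++;
        Map<String, Boolean> alarmByScene = new LinkedHashMap<>();
        for (SceneDetector detector : detectors) {
            alarmByScene.put(detector.sceneCode(), detector.detect(frame));
        }
        return alarmByScene;
    }

    int framesPulled() { return framesPulled; }
}
```

Adding a third scene to `SharedStreamTask` adds one more detector call per frame, but the number of pulled frames stays the same.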
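Problem 2: State complexity of multi-scene tasks
- Different scenes of the same task may be assigned to different algorithm nodes
- Task-level state and scene-level state must be maintained at the same time
- A failure in some scenes should not affect the others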
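One way to reason about the two levels is to derive the task-level state from the scene-level states, so that a partial failure is visible without stopping the healthy scenes. The sketch below is an illustration under assumptions: `TaskStateAggregator` is a hypothetical helper, and `DEGRADED` is an extra label introduced here for "some scenes failed, others still run"; it is not one of the platform's own status values.

```java
import java.util.Collection;

// Scene-level states, using the RUNNING/STOPPED/ERROR values kept per scene.
enum SceneState { RUNNING, STOPPED, ERROR }

final class TaskStateAggregator {
    private TaskStateAggregator() {}

    // Derives a task-level label from the states of its scenes.
    // DEGRADED is hypothetical: it marks a partial failure.
    static String aggregate(Collection<SceneState> sceneStates) {
        boolean anyRunning = sceneStates.contains(SceneState.RUNNING);
        boolean anyError = sceneStates.contains(SceneState.ERROR);
        if (anyRunning && anyError) return "DEGRADED";
        if (anyRunning) return "RUNNING";
        if (anyError) return "ERROR";
        return "STOPPED"; // empty collection or all scenes stopped
    }
}
```

With this rule, a task whose helmet scene has failed while its intrusion scene keeps running is reported as degraded rather than failed.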
13.2 Task Table Design
DROP TABLE IF EXISTS `ai_task`;
CREATE TABLE `ai_task` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `task_name` VARCHAR(255) DEFAULT NULL COMMENT 'Task name',
  `camera_id` BIGINT DEFAULT NULL COMMENT 'Camera ID',
  `group_id` BIGINT DEFAULT NULL COMMENT 'Group ID',
  `camera_name` VARCHAR(255) DEFAULT NULL COMMENT 'Camera name (denormalized)',
  `stream_path_id` BIGINT DEFAULT NULL COMMENT 'Stream path ID',

  -- Stream info
  `rtsp_url` VARCHAR(1024) DEFAULT NULL COMMENT 'RTSP source URL',
  `stream_path_name` VARCHAR(255) DEFAULT NULL COMMENT 'Stream path name',
  `stream_type` TINYINT DEFAULT 0 COMMENT '0 = main stream, 1 = sub stream',

  -- Algorithm config
  `scene_codes` JSON DEFAULT NULL COMMENT 'Array of scene codes',
  `region_json` TEXT COMMENT 'Detection region',
  `algorithm_params` TEXT COMMENT 'Algorithm parameters',

  -- Dispatch config
  `dispatch_mode` VARCHAR(20) DEFAULT 'MIXED' COMMENT 'MIXED/AUTO_SINGLE/AUTO_SPLIT/FIXED',
  `prefer_node_id` BIGINT DEFAULT NULL COMMENT 'Pinned node ID (FIXED mode)',

  -- Detection parameters
  `conf_threshold` FLOAT DEFAULT 0.6 COMMENT 'Confidence threshold',
  `alarm_enabled` TINYINT DEFAULT 1 COMMENT 'Whether alarms are enabled',
  `level` INT DEFAULT 1 COMMENT 'Alarm level',

  -- Video processing parameters
  `input_fps` INT DEFAULT 15 COMMENT 'Input frame rate',
  `output_fps` INT DEFAULT 15 COMMENT 'Output frame rate',
  `output_width` INT DEFAULT 1280 COMMENT 'Output width',
  `output_height` INT DEFAULT 720 COMMENT 'Output height',
  `output_bitrate` VARCHAR(20) DEFAULT '3000k' COMMENT 'Output bitrate',
  `process_quality` VARCHAR(20) DEFAULT 'MEDIUM' COMMENT 'LOW/MEDIUM/HIGH',

  -- Snapshot config
  `snapshot_policy` VARCHAR(16) DEFAULT 'ALARM' COMMENT 'OFF/ALL/ALARM',
  `snapshot_quality` INT DEFAULT 80 COMMENT 'Snapshot quality (1-100)',

  -- Push config
  `push_enabled` TINYINT DEFAULT 1 COMMENT 'Whether push is enabled',
  `push_mode` VARCHAR(32) DEFAULT 'LEVEL_DEFAULT' COMMENT 'Push mode',
  `push_threshold_count` INT DEFAULT NULL COMMENT 'Push threshold count',
  `push_window_seconds` INT DEFAULT NULL COMMENT 'Push window in seconds',
  `push_cooldown_seconds` INT DEFAULT 0 COMMENT 'Push cooldown in seconds',
  `push_route_key` VARCHAR(64) DEFAULT NULL COMMENT 'Push route key',

  -- Runtime status
  `status` TINYINT DEFAULT 0 COMMENT '0 = stopped, 1 = running, 2 = error',
  `error_count` INT DEFAULT 0 COMMENT 'Error count',
  `error_message` VARCHAR(1024) DEFAULT NULL COMMENT 'Error message',
  `last_run_time` DATETIME DEFAULT NULL COMMENT 'Last run time',
  `last_keepalive_time` DATETIME DEFAULT NULL COMMENT 'Last keepalive time',

  `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

  PRIMARY KEY (`id`),
  KEY `idx_camera_status` (`camera_id`, `status`),
  KEY `idx_group_id` (`group_id`),
  KEY `idx_dispatch_mode` (`dispatch_mode`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Algorithm task table';
JSON field example:
{
  "scene_codes": ["REGION_INTRUSION", "HELMET_DETECTION", "VEHICLE_PARKING"]
}
13.3 Scene Category Table
DROP TABLE IF EXISTS `ai_scene_category`;
CREATE TABLE `ai_scene_category` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `category_name` VARCHAR(64) NOT NULL COMMENT 'Category name',
  `category_code` VARCHAR(64) NOT NULL COMMENT 'Category code',
  `icon` VARCHAR(128) DEFAULT NULL COMMENT 'Icon',
  `sort_order` INT DEFAULT 0 COMMENT 'Sort order',
  `status` TINYINT DEFAULT 1 COMMENT 'Status',
  `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_category_code` (`category_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Scene category table';
Sample data:
INSERT INTO ai_scene_category (category_name, category_code, sort_order) VALUES
  ('Safety management', 'SAFETY', 1),
  ('Behavior recognition', 'BEHAVIOR', 2),
  ('Vehicle management', 'VEHICLE', 3),
  ('Environment monitoring', 'ENVIRONMENT', 4);
13.4 Scene Type Table
DROP TABLE IF EXISTS `ai_scene_type`;
CREATE TABLE `ai_scene_type` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `category_id` BIGINT NOT NULL COMMENT 'Category ID',
  `scene_name` VARCHAR(100) NOT NULL COMMENT 'Scene name',
  `scene_code` VARCHAR(64) NOT NULL COMMENT 'Scene code',
  `algorithm_type` VARCHAR(32) DEFAULT NULL COMMENT 'Algorithm type',
  `model_path` VARCHAR(255) DEFAULT NULL COMMENT 'Model path',
  `default_params` JSON DEFAULT NULL COMMENT 'Default parameters',
  `description` VARCHAR(500) DEFAULT NULL COMMENT 'Description',
  `icon` VARCHAR(128) DEFAULT NULL COMMENT 'Icon',
  `status` TINYINT DEFAULT 1 COMMENT 'Status',
  `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_scene_code` (`scene_code`),
  KEY `idx_category_status` (`category_id`, `status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Scene type table';
Sample data:
INSERT INTO ai_scene_type (category_id, scene_name, scene_code, algorithm_type, default_params) VALUES
  (1, 'Region intrusion detection', 'REGION_INTRUSION', 'object_detection', '{"problem_labels":["person","vehicle"],"conf_threshold":0.6}'),
  (1, 'Safety helmet detection', 'HELMET_DETECTION', 'classification', '{"problem_labels":["no_helmet"],"conf_threshold":0.7}'),
  (3, 'Illegal vehicle parking detection', 'VEHICLE_PARKING', 'object_detection', '{"problem_labels":["car","truck"],"conf_threshold":0.6}');
13.5 Task Request Model
import java.util.List;
import lombok.Data;

@Data
public class AiTaskUpsertRequest {
    private Long id;
    private String taskName;

    // Camera binding
    private Long cameraId;
    private Long streamPathId;
    private String rtspUrl;
    private Integer streamType; // 0 = main stream, 1 = sub stream

    // Scene configuration (the core of the model)
    private List<String> sceneCodes;

    // Detection region
    private String regionJson;
    private String algorithmParams;

    // Dispatch config
    private String dispatchMode; // MIXED/AUTO_SINGLE/AUTO_SPLIT/FIXED
    private Long preferNodeId;

    // Detection parameters
    private Float confThreshold;
    private Integer alarmEnabled;
    private Integer level;

    // Video processing
    private Integer inputFps;
    private Integer outputFps;
    private Integer outputWidth;
    private Integer outputHeight;
    private String outputBitrate;
    private String processQuality;

    // Snapshot config
    private String snapshotPolicy;
    private Integer snapshotQuality;

    // Push config
    private Integer pushEnabled;
    private String pushMode;
    private Integer pushThresholdCount;
    private Integer pushWindowSeconds;
    private Integer pushCooldownSeconds;
    private String pushRouteKey;
}
13.6 Task Creation Service
@Service
@Slf4j
public class AiTaskService {

    @Resource
    private AiTaskRepository taskRepository;

    @Resource
    private CameraDeviceRepository cameraRepository;

    @Resource
    private StreamPathRepository streamPathRepository;

    @Transactional(rollbackFor = Exception.class)
    public AiTask createTask(AiTaskUpsertRequest request) {
        // 1. Normalize parameters
        normalizeRequest(request);

        // 2. Look up the camera
        CameraDevice camera = cameraRepository.findById(request.getCameraId())
                .orElseThrow(() -> new NotFoundException("Camera not found"));

        // 3. Resolve the stream path
        StreamPath streamPath = resolveStreamPath(request);

        // 4. Build the task entity
        AiTask task = new AiTask();
        BeanUtils.copyProperties(request, task);

        task.setCameraName(camera.getCameraName());
        task.setGroupId(camera.getGroupId());
        task.setStreamPathName(streamPath.getPathName());
        task.setRtspUrl(streamPath.getSourceUrl());
        task.setStatus(0); // a new task starts in the stopped state

        // 5. Save
        taskRepository.save(task);

        log.info("Task created: id={}, camera={}, scenes={}",
                task.getId(), task.getCameraId(), task.getSceneCodes());

        return task;
    }

    private void normalizeRequest(AiTaskUpsertRequest request) {
        // Drop blank scene codes and remove duplicates
        if (request.getSceneCodes() != null) {
            request.setSceneCodes(
                    request.getSceneCodes().stream()
                            .filter(StringUtils::isNotBlank)
                            .distinct()
                            .collect(Collectors.toList())
            );
        }

        // Apply defaults (kept in line with the column defaults of ai_task)
        if (request.getInputFps() == null) request.setInputFps(15);
        if (request.getOutputFps() == null) request.setOutputFps(request.getInputFps());
        if (request.getOutputWidth() == null) request.setOutputWidth(1280);
        if (request.getOutputHeight() == null) request.setOutputHeight(720);
        if (request.getOutputBitrate() == null) request.setOutputBitrate("3000k");
        if (request.getProcessQuality() == null) request.setProcessQuality("MEDIUM");
        if (request.getDispatchMode() == null) request.setDispatchMode("MIXED");
        if (request.getConfThreshold() == null) request.setConfThreshold(0.6f);
        if (request.getAlarmEnabled() == null) request.setAlarmEnabled(1);
        if (request.getSnapshotPolicy() == null) request.setSnapshotPolicy("ALARM");
        if (request.getSnapshotQuality() == null) request.setSnapshotQuality(80);
    }

    private StreamPath resolveStreamPath(AiTaskUpsertRequest request) {
        // An explicit stream path ID takes precedence
        if (request.getStreamPathId() != null) {
            return streamPathRepository.findById(request.getStreamPathId())
                    .orElseThrow(() -> new NotFoundException("Stream path not found"));
        }

        // Otherwise look it up by stream type (anything other than 1 falls back to the main stream)
        Integer subtype = (request.getStreamType() != null && request.getStreamType() == 1) ? 1 : 0;

        return streamPathRepository.findByCameraIdAndSubtype(request.getCameraId(), subtype)
                .orElseThrow(() -> new NotFoundException("No stream path found for the camera"));
    }
}
14. Algorithm Node Management and Dispatch
14.1 Algorithm Node Table
DROP TABLE IF EXISTS `ai_engine_node`;
CREATE TABLE `ai_engine_node` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `node_id` VARCHAR(100) NOT NULL COMMENT 'Unique node identifier',
  `node_name` VARCHAR(100) DEFAULT NULL COMMENT 'Node name',
  `ip` VARCHAR(64) NOT NULL COMMENT 'IP address',
  `port` INT DEFAULT NULL COMMENT 'Port',
  `status` TINYINT DEFAULT 0 COMMENT '1 = online, 0 = offline, 2 = error',
  `last_heartbeat_time` DATETIME DEFAULT NULL COMMENT 'Last heartbeat time',

  -- Capabilities
  `scene_name_json` JSON DEFAULT NULL COMMENT 'Supported scene names',
  `scene_code_json` JSON DEFAULT NULL COMMENT 'Supported scene codes',

  -- Load
  `load_info` JSON DEFAULT NULL COMMENT 'Load information',
  `max_tasks` INT DEFAULT 50 COMMENT 'Maximum number of tasks',

  `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_node_id` (`node_id`),
  KEY `idx_status_heartbeat` (`status`, `last_heartbeat_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Algorithm node table';
JSON field example:
{
  "scene_code_json": ["REGION_INTRUSION", "HELMET_DETECTION", "FIRE_DETECTION"],
  "load_info": {
    "cpu": 0.42,
    "gpu": 0.68,
    "memory": 0.55,
    "tasks": 12
  }
}
14.2 Task Runtime Table
DROP TABLE IF EXISTS `ai_task_runtime`;
CREATE TABLE `ai_task_runtime` (
  `task_id` BIGINT NOT NULL COMMENT 'Task ID',
  `node_id` VARCHAR(100) NOT NULL COMMENT 'Primary node ID',
  `state` VARCHAR(20) NOT NULL COMMENT 'RUNNING/STOPPED/ERROR',
  `start_time` DATETIME DEFAULT NULL COMMENT 'Start time',
  `last_keepalive` DATETIME DEFAULT NULL COMMENT 'Last keepalive',
  `restart_count` INT DEFAULT 0 COMMENT 'Restart count',
  `preview_path` VARCHAR(255) DEFAULT NULL COMMENT 'Preview path',
  `preview_play_url` VARCHAR(512) DEFAULT NULL COMMENT 'Preview playback URL',
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`task_id`),
  KEY `idx_node_state` (`node_id`, `state`),
  KEY `idx_keepalive` (`last_keepalive`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Task runtime table';
14.3 Scene Runtime Table
DROP TABLE IF EXISTS `ai_task_scene_runtime`;
CREATE TABLE `ai_task_scene_runtime` (
  `task_id` BIGINT NOT NULL COMMENT 'Task ID',
  `scene_code` VARCHAR(50) NOT NULL COMMENT 'Scene code',
  `node_id` VARCHAR(100) DEFAULT NULL COMMENT 'Executing node ID',
  `state` VARCHAR(20) NOT NULL COMMENT 'RUNNING/STOPPED/ERROR',
  `start_time` DATETIME DEFAULT NULL COMMENT 'Start time',
  `last_keepalive` DATETIME DEFAULT NULL COMMENT 'Last keepalive',
  `restart_count` INT DEFAULT 0 COMMENT 'Restart count',
  `error_message` VARCHAR(500) DEFAULT NULL COMMENT 'Error message',
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`task_id`, `scene_code`),
  KEY `idx_node_state` (`node_id`, `state`),
  KEY `idx_keepalive` (`last_keepalive`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Scene runtime table';
Why are two layers of runtime state needed?
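- Task-level runtime state (`ai_task_runtime`)
  - Records the overall state of the task
  - Binds the primary node (used for preview)
  - Tracks the task-wide heartbeat
- Scene-level runtime state (`ai_task_scene_runtime`)
  - Allows scenes to be split across different nodes
  - Monitors the state of each scene independently
  - Pinpoints faults precisely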
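To make the scene-level layer concrete, the sketch below expands a node-to-scenes assignment into one scene runtime entry per (task, scene) pair. It is an illustration only: `SceneRuntimeRow` and `RuntimeRowBuilder` are hypothetical in-memory stand-ins, not the platform's entities or repositories, and every row is assumed to start in the `RUNNING` state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for one ai_task_scene_runtime row.
final class SceneRuntimeRow {
    final long taskId;
    final String sceneCode;
    final String nodeId;
    final String state;

    SceneRuntimeRow(long taskId, String sceneCode, String nodeId, String state) {
        this.taskId = taskId;
        this.sceneCode = sceneCode;
        this.nodeId = nodeId;
        this.state = state;
    }
}

final class RuntimeRowBuilder {
    private RuntimeRowBuilder() {}

    // One row per (task, scene); the node comes from the node -> scenes assignment.
    static List<SceneRuntimeRow> sceneRows(long taskId, Map<String, List<String>> nodeToScenes) {
        List<SceneRuntimeRow> rows = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : nodeToScenes.entrySet()) {
            for (String sceneCode : entry.getValue()) {
                rows.add(new SceneRuntimeRow(taskId, sceneCode, entry.getKey(), "RUNNING"));
            }
        }
        return rows;
    }
}
```

A task split across two nodes therefore yields a single task-level row, which points at the primary node, plus one scene-level row per scene, each carrying the node that actually executes it.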
14.4 Dispatch Strategy Implementation
@Service
@Slf4j
public class TaskDispatchService {

    @Resource
    private EngineNodeService nodeService;

    /**
     * Build the dispatch plan
     */
    public DispatchPlan buildDispatchPlan(AiTask task, List<String> sceneCodes) {
        String mode = task.getDispatchMode();
        if (mode == null) mode = "MIXED";

        Map<String, List<String>> nodeToScenes;

        switch (mode) {
            case "FIXED":
                nodeToScenes = dispatchFixed(task, sceneCodes);
                break;
            case "AUTO_SINGLE":
                nodeToScenes = dispatchAutoSingle(sceneCodes);
                break;
            case "AUTO_SPLIT":
                nodeToScenes = dispatchAutoSplit(sceneCodes);
                break;
            case "MIXED":
            default:
                nodeToScenes = dispatchMixed(sceneCodes);
                break;
        }

        String primaryNodeId = selectPrimaryNode(nodeToScenes);

        return new DispatchPlan(nodeToScenes, primaryNodeId);
    }

    /**
     * FIXED: run all scenes on the specified node
     */
    private Map<String, List<String>> dispatchFixed(AiTask task, List<String> sceneCodes) {
        if (task.getPreferNodeId() == null) {
            throw new BusinessException("FIXED mode requires preferNodeId");
        }

        String nodeId = nodeService.getNodeIdById(task.getPreferNodeId());
        if (nodeId == null) {
            throw new BusinessException("The specified node does not exist");
        }

        if (!nodeService.supportsAllSceneCodes(nodeId, sceneCodes)) {
            throw new BusinessException("The specified node does not support all scenes");
        }

        Map<String, List<String>> result = new HashMap<>();
        result.put(nodeId, new ArrayList<>(sceneCodes));
        return result;
    }

    /**
     * AUTO_SINGLE: automatically pick one node to run all scenes
     */
    private Map<String, List<String>> dispatchAutoSingle(List<String> sceneCodes) {
        String nodeId = nodeService.selectBestNodeForSceneCodes(sceneCodes);

        if (nodeId == null) {
            throw new BusinessException("No node supports all scenes");
        }

        Map<String, List<String>> result = new HashMap<>();
        result.put(nodeId, new ArrayList<>(sceneCodes));
        return result;
    }

    /**
     * AUTO_SPLIT: split scenes across nodes, choosing a node per scene
     */
    private Map<String, List<String>> dispatchAutoSplit(List<String> sceneCodes) {
        Map<String, List<String>> nodeToScenes = new LinkedHashMap<>();

        for (String sceneCode : sceneCodes) {
            String nodeId = nodeService.selectBestNodeForSceneCode(sceneCode);

            if (nodeId == null) {
                throw new BusinessException("No node supports scene: " + sceneCode);
            }

            nodeToScenes.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(sceneCode);
        }

        return nodeToScenes;
    }

    /**
     * MIXED: prefer a single node, fall back to splitting if none qualifies
     */
    private Map<String, List<String>> dispatchMixed(List<String> sceneCodes) {
        String singleNodeId = nodeService.selectBestNodeForSceneCodes(sceneCodes);

        if (singleNodeId != null) {
            Map<String, List<String>> result = new HashMap<>();
            result.put(singleNodeId, new ArrayList<>(sceneCodes));
            return result;
        }

        log.info("No single node available, fallback to split mode");
        return dispatchAutoSplit(sceneCodes);
    }

    /**
     * Select the primary node (used for preview)
     */
    private String selectPrimaryNode(Map<String, List<String>> nodeToScenes) {
        // The node that runs the most scenes becomes the primary node
        return nodeToScenes.entrySet().stream()
                .max(Comparator.comparingInt(e -> e.getValue().size()))
                .map(Map.Entry::getKey)
                .orElse(null);
    }
}

@Data
@AllArgsConstructor
class DispatchPlan {
    private Map<String, List<String>> nodeToScenes;
    private String primaryNodeId;
}
14.5 Node Capability Matching
@Service
@Slf4j
public class EngineNodeService {

    @Resource
    private EngineNodeRepository nodeRepository;

    @Resource
    private TaskSceneRuntimeRepository sceneRuntimeRepository;

    /**
     * Select the best node (must support all scenes)
     */
    public String selectBestNodeForSceneCodes(List<String> sceneCodes) {
        List<EngineNode> candidates = getCandidateNodes(sceneCodes);

        if (candidates.isEmpty()) {
            return null;
        }

        // Pick the candidate with the lowest load
        return candidates.stream()
                .min(Comparator.comparingInt(this::getActiveTaskCount))
                .map(EngineNode::getNodeId)
                .orElse(null);
    }

    /**
     * Select the best node (for a single scene)
     */
    public String selectBestNodeForSceneCode(String sceneCode) {
        return selectBestNodeForSceneCodes(Collections.singletonList(sceneCode));
    }

    /**
     * Get candidate nodes: online, capable of all scenes, and not overloaded
     */
    public List<EngineNode> getCandidateNodes(List<String> sceneCodes) {
        List<EngineNode> allNodes = nodeRepository.findByStatus(1);

        return allNodes.stream()
                // Check the already loaded entity instead of querying each node again
                .filter(node -> supportsAllSceneCodes(node, sceneCodes))
                .filter(node -> !isOverloaded(node))
                .collect(Collectors.toList());
    }

    /**
     * Whether the node supports all of the given scenes
     */
    public boolean supportsAllSceneCodes(String nodeId, List<String> sceneCodes) {
        return supportsAllSceneCodes(nodeRepository.findByNodeId(nodeId), sceneCodes);
    }

    private boolean supportsAllSceneCodes(EngineNode node, List<String> sceneCodes) {
        if (node == null) return false;

        List<String> supportedCodes = parseSceneCodes(node.getSceneCodeJson());
        return new HashSet<>(supportedCodes).containsAll(sceneCodes);
    }

    /**
     * Current load of the node, measured as the number of RUNNING scene runtime rows
     */
    private int getActiveTaskCount(EngineNode node) {
        return sceneRuntimeRepository.countByNodeIdAndState(node.getNodeId(), "RUNNING");
    }

    /**
     * Whether the node is overloaded
     */
    private boolean isOverloaded(EngineNode node) {
        int activeCount = getActiveTaskCount(node);
        return activeCount >= node.getMaxTasks();
    }

    private List<String> parseSceneCodes(String json) {
        if (json == null || json.isEmpty()) {
            return Collections.emptyList();
        }

        try {
            return JSON.parseArray(json, String.class);
        } catch (Exception e) {
            log.error("Failed to parse scene codes: {}", json, e);
            return Collections.emptyList();
        }
    }
}
15. WebSocket Protocol Specification
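15.1 Protocol Design Principles
- Explicit message types: every message carries a `type` field
- Bidirectional communication: platform -> algorithm node, and algorithm node -> platform
- Idempotency: receiving the same message more than once has no side effects
- Fault tolerance: an unknown message type does not break the connection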
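The idempotency and fault-tolerance principles can be illustrated from the node's side of the connection. The sketch below is hypothetical: `NodeCommandHandler` and its outcome strings are not part of the protocol, and a real node would start or stop inference pipelines rather than update a set. It shows only that a repeated `START_TASK` is a no-op and that an unknown `type` is ignored instead of raising an error.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical node-side command handler keyed by task ID.
class NodeCommandHandler {
    private final Set<Long> runningTasks = ConcurrentHashMap.newKeySet();

    // Returns a short outcome label so callers can see what happened.
    String handle(String type, long taskId) {
        if (type == null) {
            return "IGNORED"; // a message without a type is tolerated, not fatal
        }
        switch (type) {
            case "START_TASK":
                // add() returns false if the task is already running: the duplicate is a no-op
                return runningTasks.add(taskId) ? "STARTED" : "ALREADY_RUNNING";
            case "STOP_TASK":
                // remove() returns false if the task was not running: stopping twice is harmless
                return runningTasks.remove(taskId) ? "STOPPED" : "NOT_RUNNING";
            default:
                // Unknown message types are ignored and the connection stays open
                return "IGNORED";
        }
    }
}
```

Because the handler keys its state on the task ID, the platform can safely resend a command after a reconnect without tracking whether the first copy was delivered.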
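15.2 Node Registration Message
Algorithm node -> platform:
{
  "type": "NODE_REGISTER",
  "node_id": "gpu-node-01",
  "node_name": "GPU-Server-01",
  "ip": "10.0.10.21",
  "port": 18080,
  "max_tasks": 50,
  "algorithms": ["REGION_INTRUSION", "HELMET_DETECTION", "FIRE_DETECTION"],
  "load": {
    "cpu": 0.21,
    "gpu": 0.64,
    "memory": 0.55,
    "tasks": 6
  }
}
Platform-side handling:
@ServerEndpoint("/ws/ai-engine")
@Component
@Slf4j
public class AiEngineSocket {

    // Endpoint instances are normally created per connection by the WebSocket container,
    // so the service is kept in a static field.
    private static EngineNodeService nodeService;

    private String sessionNodeId;

    // Assumed wiring: the original excerpt does not show how the static field is populated.
    // Setter injection on the Spring-managed instance is one common way to do it.
    @Autowired
    public void setNodeService(EngineNodeService nodeService) {
        AiEngineSocket.nodeService = nodeService;
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        try {
            JSONObject msg = JSON.parseObject(message);
            String type = msg.getString("type");
            if (type == null) {
                // switch on a null String would throw; treat it like an unknown type
                log.warn("Message without type field ignored");
                return;
            }

            // handleHeartbeat, handleTaskKeepalive and handleAiResult are not shown in this excerpt
            switch (type) {
                case "NODE_REGISTER":
                    handleRegister(msg, session);
                    break;
                case "NODE_HEARTBEAT":
                    handleHeartbeat(msg);
                    break;
                case "TASK_KEEPALIVE":
                    handleTaskKeepalive(msg);
                    break;
                case "AI_RESULT":
                    handleAiResult(msg);
                    break;
                default:
                    log.warn("Unknown message type: {}", type);
            }
        } catch (Exception e) {
            log.error("Message processing failed", e);
        }
    }

    private void handleRegister(JSONObject msg, Session session) {
        String nodeId = msg.getString("node_id");

        // Bind the session to the node
        this.sessionNodeId = nodeId;
        session.getUserProperties().put("nodeId", nodeId);

        // Update the node status
        nodeService.upsertNode(msg);

        log.info("Node registered: {}", nodeId);
    }

    @OnClose
    public void onClose(Session session) {
        if (sessionNodeId != null) {
            nodeService.markOffline(sessionNodeId);
            log.info("Node disconnected: {}", sessionNodeId);
        }
    }
}
15.3 Task Control Messages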
Platform -> algorithm node (start task):
{
  "type": "START_TASK",
  "data": {
    "task_id": 1001,
    "camera_id": 3001,
    "scene_codes": ["REGION_INTRUSION", "HELMET_DETECTION"],
    "stream_key": "3001:0",
    "rtsp": "rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101",
    "confThreshold": 0.6,
    "region": [[0.1,0.1],[0.9,0.1],[0.9,0.9],[0.1,0.9]],
    "params": {
      "input_fps": 15,
      "in_w": 1280,
      "in_h": 720,
      "result_fps": 8,
      "quality": "MEDIUM",
      "alarm_enabled": 1,
      "snapshot_policy": "ALARM",
      "snapshot_quality": 80
    }
  }
}
Platform -> algorithm node (stop task):
{
  "type": "STOP_TASK",
  "data": {
    "task_id": 1001
  }
}
15.4 Result Report Messages
Algorithm node -> platform (single-scene result):
{
  "type": "AI_RESULT",
  "task_id": 1001,
  "camera_id": 3001,
  "scene_code": "REGION_INTRUSION",
  "ts": 1739426400000,
  "frame_width": 1920,
  "frame_height": 1080,
  "alarm_event": true,
  "confidence": 0.91,
  "objects": [
    { "label": "person", "score": 0.91, "bbox": [0.1, 0.2, 0.3, 0.4] }
  ],
  "snapshot_path": "/static/snapshots/task_1001/2026-02-13-14-30-00.jpg"
}
Algorithm node -> platform (multi-scene result):
{
  "type": "AI_RESULT",
  "task_id": 1001,
  "camera_id": 3001,
  "ts": 1739426400000,
  "frame_width": 1920,
  "frame_height": 1080,
  "snapshot_path": "/static/snapshots/task_1001/2026-02-13-14-30-00.jpg",
  "results": [
    {
      "scene_code": "REGION_INTRUSION",
      "alarm_event": true,
      "confidence": 0.91,
      "objects": [...]
    },
    {
      "scene_code": "HELMET_DETECTION",
      "alarm_event": false,
      "confidence": 0.87,
      "objects": [...]
    }
  ]
}
15.5 Preview Control Messages
Platform -> algorithm node (start preview):
{
  "type": "START_PREVIEW",
  "data": {
    "task_id": 1001,
    "preview_path": "ai_task_1001",
    "fps": 15,
    "draw": true,
    "encoder": "h264_nvenc",
    "bitrate": "3000k",
    "preview_mode": "RESULT_DRIVEN",
    "width": 1280,
    "height": 720
  }
}
Platform -> algorithm node (stop preview):
{
  "type": "STOP_PREVIEW",
  "data": {
    "task_id": 1001
  }
}
16. Result Processing and Alarm Push
16.1 Scene Latest-State Table
DROP TABLE IF EXISTS `ai_task_scene_state`;
CREATE TABLE `ai_task_scene_state` (
  `task_id` BIGINT NOT NULL COMMENT 'Task ID',
  `camera_id` BIGINT NOT NULL COMMENT 'Camera ID',
  `scene_code` VARCHAR(50) NOT NULL COMMENT 'Scene code',
  `scene_type_id` BIGINT DEFAULT NULL COMMENT 'Scene type ID',

  -- Latest result
  `last_ts` BIGINT NOT NULL COMMENT 'Latest timestamp',
  `last_time` DATETIME NOT NULL COMMENT 'Latest time',
  `event_type` VARCHAR(20) DEFAULT NULL COMMENT 'Event type',
  `event_desc` VARCHAR(255) DEFAULT NULL COMMENT 'Event description',
  `alarm_event` TINYINT DEFAULT 0 COMMENT 'Whether it is an alarm',
  `total_count` INT DEFAULT 0 COMMENT 'Total detections',

  -- Statistics
  `label_counts_json` JSON DEFAULT NULL COMMENT 'Counts per label',
  `problem_counts_json` JSON DEFAULT NULL COMMENT 'Counts per problem',
  `metrics_json` JSON DEFAULT NULL COMMENT 'Metrics',
  `last_snapshot_path` VARCHAR(500) DEFAULT NULL COMMENT 'Latest snapshot',

  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

  PRIMARY KEY (`task_id`, `scene_code`),
  KEY `idx_camera_alarm` (`camera_id`, `alarm_event`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Scene latest-state table';
JSON field example:
{
  "label_counts_json": {
    "person": 3,
    "vehicle": 1
  },
  "problem_counts_json": {
    "no_helmet": 2
  },
  "metrics_json": {
    "avg_confidence": 0.87,
    "max_confidence": 0.94
  }
}
16.2 Detection Result Table
DROP TABLE IF EXISTS `ai_detection_result`;
CREATE TABLE `ai_detection_result` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `task_id` BIGINT NOT NULL COMMENT 'Task ID',
  `camera_id` BIGINT NOT NULL COMMENT 'Camera ID',
  `group_id` BIGINT DEFAULT NULL COMMENT 'Group ID',
  `scene_type_id` BIGINT DEFAULT NULL COMMENT 'Scene type ID',
  `scene_code` VARCHAR(50) DEFAULT NULL COMMENT 'Scene code',
  `node_id` VARCHAR(100) DEFAULT NULL COMMENT 'Node ID',

  -- Detection time
  `detect_time` DATETIME NOT NULL COMMENT 'Detection time',
  `detect_timestamp` BIGINT NOT NULL COMMENT 'Detection timestamp',

  -- Alarm info
  `is_alarm` TINYINT DEFAULT 0 COMMENT 'Whether it is an alarm',
  `alarm_level` TINYINT DEFAULT 0 COMMENT 'Alarm level',
  `confidence` FLOAT DEFAULT NULL COMMENT 'Confidence',
  `detect_count` INT DEFAULT 0 COMMENT 'Detection count',

  -- Detection result
  `objects_json` JSON DEFAULT NULL COMMENT 'Detected objects',
  `event_type` VARCHAR(20) DEFAULT NULL COMMENT 'Event type',
  `event_desc` VARCHAR(255) DEFAULT NULL COMMENT 'Event description',
  `total_count` INT DEFAULT NULL COMMENT 'Total count',
  `label_counts_json` JSON DEFAULT NULL COMMENT 'Counts per label',
  `problem_counts_json` JSON DEFAULT NULL COMMENT 'Counts per problem',
  `metrics_json` JSON DEFAULT NULL COMMENT 'Metrics',

  -- Snapshot
  `snapshot_path` VARCHAR(500) DEFAULT NULL COMMENT 'Snapshot path',

  -- Push status
  `is_pushed` TINYINT DEFAULT 0 COMMENT 'Whether it has been pushed',
  `push_time` DATETIME DEFAULT NULL COMMENT 'Push time',

  `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,

  PRIMARY KEY (`id`),
  KEY `idx_task_time` (`task_id`, `detect_time`),
  KEY `idx_alarm_scene_group_time` (`is_alarm`, `scene_code`, `group_id`, `detect_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Detection result table';
16.3 Result Processing Pipeline
@Service
@Slf4j
public class AiResultProcessorService {

    @Resource
    private TaskSceneStateService sceneStateService;

    @Resource
    private DetectionResultService detectionResultService;

    @Resource
    private AlarmPushService alarmPushService;

    @Resource
    private WebNotifySocket webNotifySocket;

    private final BlockingQueue<JSONObject> resultQueue = new LinkedBlockingQueue<>(20000);

    private final ExecutorService processExecutor;

    // Sampling control: last persisted timestamp per "taskId:sceneCode"
    private final ConcurrentHashMap<String, Long> sampleCache = new ConcurrentHashMap<>();

    @Value("${ai.result.sample-interval-ms:1000}")
    private long sampleIntervalMs;

    public AiResultProcessorService() {
        int coreThreads = Runtime.getRuntime().availableProcessors();
        this.processExecutor = new ThreadPoolExecutor(
                coreThreads,
                coreThreads * 2,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadFactoryBuilder().setNameFormat("result-processor-%d").build(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // Start the worker loops
        for (int i = 0; i < coreThreads; i++) {
            processExecutor.submit(this::processLoop);
        }
    }

    /**
     * Submit a result to the queue
     */
    public void submit(JSONObject resultMsg) {
        // A multi-scene message carries alarm_event inside "results", so check both levels
        boolean alarm = containsAlarm(resultMsg);

        boolean offered = resultQueue.offer(resultMsg);

        if (!offered) {
            if (!alarm) {
                // Non-alarm results are dropped when the queue is full
                log.warn("Result queue full, dropping non-alarm event");
                return;
            }

            // Alarm messages are forced in by evicting the oldest entries
            // (an evicted entry may itself be an alarm)
            int tries = 0;
            while (!offered && tries++ < 10) {
                resultQueue.poll(); // evict the queue head
                offered = resultQueue.offer(resultMsg);
            }

            if (!offered) {
                log.error("Failed to enqueue alarm event after retries");
            }
        }
    }

    /**
     * True if the message itself, or any entry of its "results" array, is an alarm
     */
    private boolean containsAlarm(JSONObject msg) {
        if (msg.getBooleanValue("alarm_event")) {
            return true;
        }
        JSONArray results = msg.getJSONArray("results");
        if (results == null) {
            return false;
        }
        for (int i = 0; i < results.size(); i++) {
            if (results.getJSONObject(i).getBooleanValue("alarm_event")) {
                return true;
            }
        }
        return false;
    }

    /**
     * Processing loop
     */
    private void processLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                JSONObject msg = resultQueue.poll(1, TimeUnit.SECONDS);
                if (msg != null) {
                    processResult(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                log.error("Result processing error", e);
            }
        }
    }

    /**
     * Process a single result message
     */
    private void processResult(JSONObject msg) {
        Long taskId = msg.getLong("task_id");
        Long cameraId = msg.getLong("camera_id");
        Long ts = msg.getLong("ts");
        String snapshotPath = msg.getString("snapshot_path");

        // Support both single-scene and multi-scene messages
        JSONArray resultsArray = msg.getJSONArray("results");

        if (resultsArray != null && !resultsArray.isEmpty()) {
            // Multi-scene result
            for (int i = 0; i < resultsArray.size(); i++) {
                JSONObject sceneResult = resultsArray.getJSONObject(i);
                processSceneResult(taskId, cameraId, ts, snapshotPath, sceneResult);
            }
        } else {
            // Single-scene result
            processSceneResult(taskId, cameraId, ts, snapshotPath, msg);
        }
    }

    private void processSceneResult(Long taskId, Long cameraId, Long ts,
                                    String snapshotPath, JSONObject sceneResult) {

        String sceneCode = sceneResult.getString("scene_code");
        Boolean alarmEvent = sceneResult.getBoolean("alarm_event");

        // 1. Update the latest state (always)
        sceneStateService.updateState(taskId, cameraId, sceneCode, ts, sceneResult);

        // 2. Decide whether the result needs to be persisted
        if (StringUtils.isBlank(snapshotPath)) {
            // No snapshot: only the latest state is updated
            return;
        }

        if (!Boolean.TRUE.equals(alarmEvent)) {
            // Non-alarm results are sampled
            if (!shouldSample(taskId, sceneCode, ts)) {
                return;
            }
        }

        // 3. Persist the result
        DetectionResult result = buildDetectionResult(
                taskId, cameraId, sceneCode, ts, snapshotPath, sceneResult
        );

        detectionResultService.save(result);

        // 4. Alarm push
        if (Boolean.TRUE.equals(alarmEvent)) {
            alarmPushService.onAlarmDetected(result, sceneCode);
        }
    }

    /**
     * Sampling check: keep at most one non-alarm result per scene per interval
     */
    private boolean shouldSample(Long taskId, String sceneCode, Long ts) {
        String key = taskId + ":" + sceneCode;
        Long lastTs = sampleCache.get(key);

        if (lastTs == null || (ts - lastTs) >= sampleIntervalMs) {
            sampleCache.put(key, ts);
            return true;
        }

        return false;
    }

    private DetectionResult buildDetectionResult(Long taskId, Long cameraId, String sceneCode,
                                                 Long ts, String snapshotPath, JSONObject sceneResult) {

        DetectionResult result = new DetectionResult();
        result.setTaskId(taskId);
        result.setCameraId(cameraId);
        result.setSceneCode(sceneCode);
        result.setDetectTimestamp(ts);
        result.setDetectTime(LocalDateTime.ofInstant(
                Instant.ofEpochMilli(ts), ZoneId.systemDefault()
        ));
        // Null-safe: a missing alarm_event or objects field must not throw
        result.setIsAlarm(Boolean.TRUE.equals(sceneResult.getBoolean("alarm_event")) ? 1 : 0);
        result.setConfidence(sceneResult.getFloat("confidence"));
        JSONArray objects = sceneResult.getJSONArray("objects");
        result.setObjectsJson(objects != null ? objects.toJSONString() : null);
        result.setSnapshotPath(snapshotPath);
        result.setEventType(sceneResult.getString("event_type"));
        result.setEventDesc(sceneResult.getString("event_desc"));
        result.setTotalCount(sceneResult.getInteger("total_count"));

        return result;
    }
}
Processing flow diagram:
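[Algorithm result] → [Fast path: WebSocket push to frontend] → [Real-time box overlay]
        ↓
   [Slow path]
        ↓
 [Sampling check]
        ↓
 [Has snapshot?] ──NO→ [Update latest state only]
        │ YES
        ↓
    [Alarm?] ──NO→ [Sampled persistence]
        │ YES
        ↓
   [Persist] → [Alarm push]
16.4 Alarm Push Service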
@Service@Slf4jpublic class AlarmPushService {
@Resource private AiTaskRepository taskRepository;
@Resource private AlarmPushConfigService configService;
@Resource private AlarmPushLogRepository pushLogRepository;
private final BlockingQueue<PushTask> pushQueue = new LinkedBlockingQueue<>(20000);
private final ExecutorService pushExecutor;
// 推送桶 (用于聚合推送) private final ConcurrentHashMap<String, PushBucket> buckets = new ConcurrentHashMap<>();
public AlarmPushService() { this.pushExecutor = Executors.newFixedThreadPool(4, new ThreadFactoryBuilder().setNameFormat("alarm-push-%d").build());
// 启动推送线程 for (int i = 0; i < 4; i++) { pushExecutor.submit(this::pushLoop); }
// 启动桶刷新线程 Executors.newSingleThreadScheduledExecutor() .scheduleAtFixedRate(this::flushBuckets, 1, 1, TimeUnit.SECONDS); }
public void onAlarmDetected(DetectionResult result, String sceneCode) { AiTask task = taskRepository.findById(result.getTaskId()).orElse(null); if (task == null) return;
// 获取推送策略 PushStrategy strategy = resolvePushStrategy(task);
if ("IMMEDIATE".equals(strategy.getMode())) { // 立即推送 pushQueue.offer(new PushTask(result, task, sceneCode)); } else { // 聚合推送 addToBucket(result, task, sceneCode, strategy); } }
private void addToBucket(DetectionResult result, AiTask task, String sceneCode, PushStrategy strategy) {
String bucketKey = task.getId() + ":" + sceneCode;
PushBucket bucket = buckets.computeIfAbsent(bucketKey, k -> new PushBucket(task, sceneCode, strategy));
bucket.add(result); }
private void flushBuckets() { long now = System.currentTimeMillis();
buckets.values().forEach(bucket -> { if (bucket.shouldFlush(now)) { List<DetectionResult> results = bucket.flush(); if (!results.isEmpty()) { pushQueue.offer(new PushTask(results, bucket.task, bucket.sceneCode)); } } }); }
private void pushLoop() { while (!Thread.currentThread().isInterrupted()) { try { PushTask task = pushQueue.poll(1, TimeUnit.SECONDS); if (task != null) { executePush(task); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } catch (Exception e) { log.error("Push execution error", e); } } }
private void executePush(PushTask task) { try { // 构建推送内容 String content = buildPushContent(task);
// 调用飞书/企业微信等 API PushResponse response = sendToPushService(task.task.getPushRouteKey(), content);
// 记录日志 AlarmPushLog log = new AlarmPushLog(); log.setConfigId(0L); log.setDetectionResultIds(JSON.toJSONString( task.results.stream().map(DetectionResult::getId).collect(Collectors.toList()) )); log.setPushChannel("feishu"); log.setPushTarget(task.task.getPushRouteKey()); log.setPushContent(content); log.setPushStatus(response.isSuccess() ? 1 : 0); log.setResponseCode(response.getCode()); log.setResponseMessage(response.getMessage()); log.setPushTime(LocalDateTime.now());
pushLogRepository.save(log);
// 更新结果推送状态 if (response.isSuccess()) { task.results.forEach(r -> { r.setIsPushed(1); r.setPushTime(LocalDateTime.now()); }); }
} catch (Exception e) { log.error("Push failed for task {}", task.task.getId(), e); } }
private String buildPushContent(PushTask task) { StringBuilder content = new StringBuilder(); content.append("【告警通知】\n"); content.append("相机: ").append(task.task.getCameraName()).append("\n"); content.append("场景: ").append(task.sceneCode).append("\n"); content.append("时间: ").append(LocalDateTime.now().format( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss") )).append("\n"); content.append("检测数量: ").append(task.results.size()).append("\n");
return content.toString(); }
private PushResponse sendToPushService(String routeKey, String content) { // 实际推送逻辑 // 这里简化处理 return new PushResponse(true, 200, "success"); }
private PushStrategy resolvePushStrategy(AiTask task) { // 简化实现 return new PushStrategy( task.getPushMode() != null ? task.getPushMode() : "IMMEDIATE", task.getPushThresholdCount() != null ? task.getPushThresholdCount() : 1, task.getPushWindowSeconds() != null ? task.getPushWindowSeconds() : 60, task.getPushCooldownSeconds() != null ? task.getPushCooldownSeconds() : 0 ); }}
@Data@AllArgsConstructorclass PushTask { private List<DetectionResult> results; private AiTask task; private String sceneCode;
public PushTask(DetectionResult result, AiTask task, String sceneCode) { this.results = Collections.singletonList(result); this.task = task; this.sceneCode = sceneCode; }}
@Data
class PushBucket {
    private final AiTask task;
    private final String sceneCode;
    private final PushStrategy strategy;
    private final List<DetectionResult> results = new CopyOnWriteArrayList<>();
    private long firstAddTime;
    private long lastFlushTime;

    public PushBucket(AiTask task, String sceneCode, PushStrategy strategy) {
        this.task = task;
        this.sceneCode = sceneCode;
        this.strategy = strategy;
        this.firstAddTime = System.currentTimeMillis();
        // Nothing has been flushed yet, so the cooldown must not delay the first push
        this.lastFlushTime = 0L;
    }

    public void add(DetectionResult result) {
        results.add(result);
        // Restart the window when the first result arrives after a flush
        if (firstAddTime == 0) {
            firstAddTime = System.currentTimeMillis();
        }
    }

    public boolean shouldFlush(long now) {
        if (results.isEmpty()) return false;

        // Cooldown check
        if (now - lastFlushTime < strategy.getCooldownSeconds() * 1000L) {
            return false;
        }

        String mode = strategy.getMode();

        if ("COUNT".equals(mode)) {
            return results.size() >= strategy.getThresholdCount();
        }

        if ("WINDOW".equals(mode)) {
            return (now - firstAddTime) >= strategy.getWindowSeconds() * 1000L;
        }

        if ("COUNT_OR_WINDOW".equals(mode)) {
            return results.size() >= strategy.getThresholdCount()
                || (now - firstAddTime) >= strategy.getWindowSeconds() * 1000L;
        }

        return false;
    }

    // Callers must serialize add() and flush(); a result added between the copy and the clear would be lost
    public List<DetectionResult> flush() {
        List<DetectionResult> flushed = new ArrayList<>(results);
        results.clear();
        firstAddTime = 0;
        lastFlushTime = System.currentTimeMillis();
        return flushed;
    }
}
@Data
@AllArgsConstructor
class PushStrategy {
    private String mode;
    private int thresholdCount;
    private int windowSeconds;
    private int cooldownSeconds;
}

@Data
@AllArgsConstructor
class PushResponse {
    private boolean success;
    private int code;
    private String message;
}

Push modes:
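| Mode | Trigger condition | Typical use |
|---|---|---|
| IMMEDIATE | Push on every alarm | High-priority alarms |
| COUNT | Push after N accumulated alarms | Reducing push frequency |
| WINDOW | Push when the time window expires | Periodic summaries |
| COUNT_OR_WINDOW | Push when either condition is met | Balancing timeliness and aggregation |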
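The four modes can be condensed into a single decision function. The sketch below is illustrative only: `PushModeSketch` and its parameters are hypothetical names, time is passed in explicitly so the behaviour is easy to reason about, and treating IMMEDIATE as "always flush once the cooldown has passed" is an assumption based on the table rather than on the service code.

```java
/** Hypothetical, clock-injected restatement of the push-mode table; not the production class. */
class PushModeSketch {

    /**
     * Decides whether a bucket should be flushed.
     * All times are epoch milliseconds; lastFlushAt is 0 when nothing has been flushed yet.
     */
    static boolean shouldFlush(String mode, int pending, int thresholdCount,
                               long windowMs, long cooldownMs,
                               long firstAddAt, long lastFlushAt, long now) {
        if (pending == 0) {
            return false;                       // nothing to push
        }
        if (now - lastFlushAt < cooldownMs) {
            return false;                       // still inside the cooldown period
        }
        boolean countHit = pending >= thresholdCount;
        boolean windowHit = now - firstAddAt >= windowMs;
        switch (mode) {
            case "IMMEDIATE":       return true;    // assumption: every alarm is pushed at once
            case "COUNT":           return countHit;
            case "WINDOW":          return windowHit;
            case "COUNT_OR_WINDOW": return countHit || windowHit;
            default:                return false;   // unknown modes never flush
        }
    }
}
```

For example, with a threshold of 5 and a 60-second window, COUNT_OR_WINDOW flushes a single pending alarm once 60 seconds have elapsed, while COUNT keeps waiting for four more.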
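17. Preview Pipeline Implementation
17.1 Preview Design Principles
- A running task does not imply a preview stream
  - A task can run without pushing a preview stream
  - Preview streams are started and stopped on demand
- Preview resources are managed independently
  - Preview consumes encoder resources
  - It needs its own lifecycle control
- Preview URLs are generated dynamically
  - The URL must be updated after a node switch
  - It is stored in the runtime table
17.2 Preview Control Service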
@Service
@Slf4j
public class PreviewControlService {

    @Resource
    private TaskRuntimeRepository runtimeRepository;

    @Resource
    private EngineNodeRepository nodeRepository;

    @Resource
    private AiEngineSocket engineSocket;

    /**
     * Start preview
     */
    @Transactional(rollbackFor = Exception.class)
    public void startPreview(StartPreviewRequest request) {
        // 1. Load the runtime record
        TaskRuntime runtime = runtimeRepository.findById(request.getTaskId())
            .orElseThrow(() -> new NotFoundException("Task is not running"));

        if (!"RUNNING".equals(runtime.getState())) {
            throw new BusinessException("Task is not running; preview cannot be started");
        }

        // 2. Build the preview path
        String previewPath = request.getPreviewPath();
        if (StringUtils.isBlank(previewPath)) {
            previewPath = "ai_task_" + request.getTaskId();
        }

        // 3. Look up the node IP
        EngineNode node = nodeRepository.findByNodeId(runtime.getNodeId());
        if (node == null) {
            throw new BusinessException("Node does not exist");
        }

        // 4. Build the playback URL (8889 is the WebRTC port used in this article's configuration)
        String playUrl = String.format("http://%s:8889/%s", node.getIp(), previewPath);

        // 5. Send the preview command
        JSONObject cmd = new JSONObject();
        cmd.put("type", "START_PREVIEW");

        JSONObject data = new JSONObject();
        data.put("task_id", request.getTaskId());
        data.put("preview_path", previewPath);
        data.put("fps", request.getFps() != null ? request.getFps() : 15);
        data.put("bitrate", request.getBitrate() != null ? request.getBitrate() : "3000k");
        data.put("width", request.getWidth() != null ? request.getWidth() : 1280);
        data.put("height", request.getHeight() != null ? request.getHeight() : 720);
        data.put("draw", request.getDraw() != null ? request.getDraw() : true);
        data.put("encoder", "h264_nvenc");
        data.put("preview_mode", "RESULT_DRIVEN");

        cmd.put("data", data);

        engineSocket.sendCommand(runtime.getNodeId(), cmd);

        // 6. Update the runtime record
        runtime.setPreviewPath(previewPath);
        runtime.setPreviewPlayUrl(playUrl);
        runtimeRepository.updateById(runtime);

        log.info("Preview started: task={}, path={}, url={}",
            request.getTaskId(), previewPath, playUrl);
    }

    /**
     * Stop preview
     */
    @Transactional(rollbackFor = Exception.class)
    public void stopPreview(Long taskId) {
        TaskRuntime runtime = runtimeRepository.findById(taskId)
            .orElseThrow(() -> new NotFoundException("Task runtime record does not exist"));

        if (StringUtils.isBlank(runtime.getPreviewPath())) {
            log.warn("Preview not started for task {}", taskId);
            return;
        }

        // Send the stop command
        JSONObject cmd = new JSONObject();
        cmd.put("type", "STOP_PREVIEW");

        JSONObject data = new JSONObject();
        data.put("task_id", taskId);
        cmd.put("data", data);

        engineSocket.sendCommand(runtime.getNodeId(), cmd);

        // Clear the preview fields in the runtime record
        runtime.setPreviewPath(null);
        runtime.setPreviewPlayUrl(null);
        runtimeRepository.updateById(runtime);

        log.info("Preview stopped: task={}", taskId);
    }

    /**
     * Start preview automatically; failures are logged and never propagated to the caller
     */
    public void autoStartPreview(Long taskId, String nodeId, String previewPath) {
        try {
            StartPreviewRequest request = new StartPreviewRequest();
            request.setTaskId(taskId);
            request.setPreviewPath(previewPath);
            request.setFps(15);
            request.setBitrate("3000k");
            request.setDraw(true);

            startPreview(request);
        } catch (Exception e) {
            log.error("Auto start preview failed for task {}", taskId, e);
        }
    }
}

@Data
class StartPreviewRequest {
    private Long taskId;
    private String previewPath;
    private Integer fps;
    private String bitrate;
    private Integer width;
    private Integer height;
    private Boolean draw;
}

17.3 Coordinating Task Start/Stop with Preview
@Service
@Slf4j
public class AiTaskService {

    // The original code uses taskRepository without declaring it; the type name here is assumed
    @Resource
    private AiTaskRepository taskRepository;

    @Resource
    private TaskDispatchService dispatchService;

    @Resource
    private TaskRuntimeService runtimeService;

    @Resource
    private TaskSceneRuntimeService sceneRuntimeService;

    @Resource
    private PreviewControlService previewService;

    @Resource
    private AiEngineSocket engineSocket;

    @Transactional(rollbackFor = Exception.class)
    public void startTask(Long taskId) {
        AiTask task = taskRepository.findById(taskId)
            .orElseThrow(() -> new NotFoundException("Task does not exist"));

        // 1. Guard against duplicate starts
        TaskRuntime runtime = runtimeService.findById(taskId);
        if (runtime != null && isAliveRunning(runtime)) {
            log.warn("Task {} already running", taskId);
            return;
        }

        // 2. Parse scene codes
        List<String> sceneCodes = parseSceneCodes(task.getSceneCodes());
        if (sceneCodes.isEmpty()) {
            throw new BusinessException("Task has no scenes configured");
        }

        // 3. Build the dispatch plan
        DispatchPlan plan = dispatchService.buildDispatchPlan(task, sceneCodes);

        // 4. Create the runtime record
        LocalDateTime now = LocalDateTime.now();
        int restartCount = (runtime != null) ? runtime.getRestartCount() + 1 : 0;

        runtimeService.upsertRunning(taskId, plan.getPrimaryNodeId(), now, restartCount);

        // 5. Start the task on each node
        Set<String> startedNodes = new LinkedHashSet<>();

        try {
            for (Map.Entry<String, List<String>> entry : plan.getNodeToScenes().entrySet()) {
                String nodeId = entry.getKey();
                List<String> scopedScenes = entry.getValue();

                // Send the start command
                JSONObject cmd = buildStartCommand(task, scopedScenes);
                engineSocket.sendCommand(nodeId, cmd);
                startedNodes.add(nodeId);

                // Update the per-scene runtime records
                for (String sceneCode : scopedScenes) {
                    sceneRuntimeService.upsertRunning(taskId, sceneCode, nodeId, now, restartCount);
                }
            }

            // 6. Update the task table
            task.setStatus(1);
            task.setLastRunTime(now);
            taskRepository.updateById(task);

            // 7. Start preview automatically
            String previewPath = "ai_task_" + taskId;
            previewService.autoStartPreview(taskId, plan.getPrimaryNodeId(), previewPath);

            log.info("Task started: id={}, nodes={}", taskId, plan.getNodeToScenes().keySet());

        } catch (Exception e) {
            log.error("Task start failed: {}", taskId, e);

            // Roll back: stop the nodes that were already started
            for (String nodeId : startedNodes) {
                safeStopTask(nodeId, taskId);
            }

            // Mark as failed.
            // Note: rethrowing below rolls this transaction back, so these marks only persist
            // if the called services write in their own transaction (e.g. REQUIRES_NEW).
            runtimeService.markError(taskId, now);
            sceneRuntimeService.markErrorByTask(taskId, now, "START_TASK failed: " + e.getMessage());

            task.setStatus(2);
            task.setErrorMessage(e.getMessage());
            taskRepository.updateById(task);

            throw e;
        }
    }

    @Transactional(rollbackFor = Exception.class)
    public void stopTask(Long taskId) {
        AiTask task = taskRepository.findById(taskId)
            .orElseThrow(() -> new NotFoundException("Task does not exist"));

        // 1. Stop preview
        try {
            previewService.stopPreview(taskId);
        } catch (Exception e) {
            log.error("Stop preview failed for task {}", taskId, e);
        }

        // 2. Find the nodes running this task
        List<String> nodeIds = sceneRuntimeService.findNodeIdsByTaskId(taskId);

        // 3. Send the stop command to each node
        for (String nodeId : nodeIds) {
            safeStopTask(nodeId, taskId);
        }

        // 4. Update the runtime records
        runtimeService.markStopped(taskId, LocalDateTime.now());
        sceneRuntimeService.markStoppedByTask(taskId, LocalDateTime.now());

        // 5. Update the task table
        task.setStatus(0);
        taskRepository.updateById(task);

        log.info("Task stopped: id={}", taskId);
    }

    private boolean isAliveRunning(TaskRuntime runtime) {
        if (!"RUNNING".equals(runtime.getState())) {
            return false;
        }

        if (runtime.getLastKeepalive() == null) {
            return false;
        }

        long elapsedSeconds = Duration.between(
            runtime.getLastKeepalive(),
            LocalDateTime.now()
        ).getSeconds();

        return elapsedSeconds < 30; // 30-second keepalive timeout
    }

    private JSONObject buildStartCommand(AiTask task, List<String> sceneCodes) {
        JSONObject cmd = new JSONObject();
        cmd.put("type", "START_TASK");

        JSONObject data = new JSONObject();
        data.put("task_id", task.getId());
        data.put("camera_id", task.getCameraId());
        data.put("scene_codes", sceneCodes);
        data.put("stream_key", task.getCameraId() + ":" + task.getStreamType());
        data.put("rtsp", task.getRtspUrl());
        data.put("confThreshold", task.getConfThreshold());

        // Detection region
        if (StringUtils.isNotBlank(task.getRegionJson())) {
            data.put("region", JSON.parseArray(task.getRegionJson()));
        }

        // Processing parameters
        JSONObject params = new JSONObject();
        params.put("input_fps", task.getInputFps());
        params.put("in_w", task.getOutputWidth());
        params.put("in_h", task.getOutputHeight());
        params.put("result_fps", task.getOutputFps());
        params.put("quality", task.getProcessQuality());
        params.put("alarm_enabled", task.getAlarmEnabled());
        params.put("snapshot_policy", task.getSnapshotPolicy());
        params.put("snapshot_quality", task.getSnapshotQuality());

        data.put("params", params);
        cmd.put("data", data);

        return cmd;
    }

    // Best-effort STOP_TASK: failures are logged so one bad node does not block the others
    private void safeStopTask(String nodeId, Long taskId) {
        try {
            JSONObject cmd = new JSONObject();
            cmd.put("type", "STOP_TASK");

            JSONObject data = new JSONObject();
            data.put("task_id", taskId);
            cmd.put("data", data);

            engineSocket.sendCommand(nodeId, cmd);
        } catch (Exception e) {
            log.error("Safe stop failed: node={}, task={}", nodeId, taskId, e);
        }
    }

    private List<String> parseSceneCodes(String json) {
        if (StringUtils.isBlank(json)) {
            return Collections.emptyList();
        }

        try {
            return JSON.parseArray(json, String.class);
        } catch (Exception e) {
            log.error("Failed to parse scene codes: {}", json, e);
            return Collections.emptyList();
        }
    }
}

Part 4: Operations and Evolution
18. Configuration Management
18.1 Application Configuration Template
server:
  port: 8080

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/video_analysis?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
    username: root
    password: ${DB_PASSWORD}
    driver-class-name: com.mysql.cj.jdbc.Driver
    hikari:
      maximum-pool-size: 20
      minimum-idle: 5
      connection-timeout: 30000

# Camera configuration
camera:
  onboarding:
    async-enabled: true
    sync-timeout-ms: 15000
    duplicate-check:
      by-ip-port: true

  onvif:
    default-port: 80
    connect-timeout-ms: 3000
    read-timeout-ms: 5000
    heartbeat-timeout-ms: 3000
    soap:
      strict-fault-check: true
      allow-capability-fallback: true

  stream:
    naming: "cam_{ip}_ch{channel}_{subtype}"
    subtype-rule:
      main-min-width: 1280
    retry:
      register-max-attempts: 3
      register-backoff-ms: 1000

  health:
    check-interval-ms: 30000
    initial-delay-ms: 10000
    offline-alarm-threshold-sec: 180

# Codec configuration
codec:
  probe:
    ffprobe-bin: ffprobe
    timeout-ms: 3000
    max-concurrency: 8

  transcode:
    enabled: true
    assume-h265-when-unknown: true
    ffmpeg-bin: ffmpeg
    rtsp-transport: tcp
    stimeout-us: 5000000
    probesize: 5000000
    analyzeduration: 5000000
    x264:
      preset: veryfast
      crf: 23
      gop: 50
    enable-audio: false

# MediaMTX configuration
media:
  mtx:
    api-base-url: http://127.0.0.1:9997
    rtsp-port: 8554
    webrtc-port: 8889
    api:
      path-add: /v3/config/paths/add/
      path-get: /v3/config/paths/get/
      path-list: /v3/config/paths/list
      path-delete: /v3/config/paths/delete/

# Recording configuration
recording:
  default:
    retention-days: 7
    segment-duration: 10m
    record-format: fmp4

  index:
    fixed-delay-ms: 30000
    scan-depth: 4
    lookback-hours: 24

  storage:
    refresh-interval-ms: 15000
    auto-switch-enabled: true
    migrate-batch-size: 20

# AI task configuration
ai:
  task:
    keepalive-timeout-seconds: 30
    scene-keepalive-timeout-seconds: 30
    dedup-running-seconds: 10

  result:
    queue:
      size: 20000
    worker:
      min: 4
      max: 16
    sample-interval-ms: 1000
    batch:
      enabled: false
      size: 200
      flush-ms: 500

  preview:
    auto-start: true
    default-fps: 15
    default-width: 1280
    default-height: 720
    default-bitrate: 3000k
    default-encoder: h264_nvenc

  push:
    queue-size: 20000
    flush-ms: 1000
    routes-file: push_routes.json
    level-policy-file: level_policy.json

# WebSocket configuration
ws:
  ai-engine:
    endpoint: /ws/ai-engine
  web-notify:
    endpoint: /ws/web-notify

18.2 MediaMTX Configuration Template
###############################################
# Global settings

# API
api: yes
apiAddress: :9997

# Metrics
metrics: yes
metricsAddress: :9998

###############################################
# RTSP server

rtsp: yes
rtspAddress: :8554
protocols: [tcp]
encryption: "no"

###############################################
# WebRTC server

webrtc: yes
webrtcAddress: :8889
webrtcICEServers:
  - urls: [stun:stun.l.google.com:19302]

###############################################
# HLS server

hls: yes
hlsAddress: :8888
hlsAlwaysRemux: no
hlsVariant: lowLatency
hlsSegmentCount: 7
hlsSegmentDuration: 1s
hlsPartDuration: 200ms

###############################################
# Recording

record: no
recordPath: /recordings/%path/%Y-%m-%d/%H-%M-%S
recordFormat: fmp4
recordPartDuration: 1s
recordSegmentDuration: 10m
recordDeleteAfter: 7d

###############################################
# Path defaults

paths:
  all:
    sourceOnDemand: yes
    sourceOnDemandStartTimeout: 10s
    sourceOnDemandCloseAfter: 10s
    runOnDemandStartTimeout: 10s
    runOnDemandCloseAfter: 10s

18.3 Docker Compose Deployment Template
version: "3.8"

services:
  mysql:
    image: mysql:8.0
    container_name: video-mysql
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD}
      MYSQL_DATABASE: video_analysis
      TZ: Asia/Shanghai
    ports:
      - "3306:3306"
    volumes:
      - ./data/mysql:/var/lib/mysql
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    command: --default-authentication-plugin=mysql_native_password

  mediamtx:
    image: bluenviron/mediamtx:latest
    container_name: video-mediamtx
    restart: always
    ports:
      - "8554:8554"   # RTSP
      - "8888:8888"   # HLS
      - "8889:8889"   # WebRTC
      - "9997:9997"   # API
      - "9998:9998"   # Metrics
    volumes:
      - ./mediamtx.yml:/mediamtx.yml
      - ./data/recordings:/recordings
    environment:
      TZ: Asia/Shanghai

  app:
    image: video-analysis-platform:latest
    container_name: video-app
    restart: always
    depends_on:
      - mysql
      - mediamtx
    ports:
      - "8080:8080"
    environment:
      SPRING_DATASOURCE_URL: jdbc:mysql://mysql:3306/video_analysis?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
      SPRING_DATASOURCE_USERNAME: root
      SPRING_DATASOURCE_PASSWORD: ${MYSQL_ROOT_PASSWORD}
      MEDIA_MTX_API_BASE_URL: http://mediamtx:9997
      TZ: Asia/Shanghai
    volumes:
      - ./data/snapshots:/app/snapshots
      - ./logs:/app/logs

  nginx:
    image: nginx:alpine
    container_name: video-nginx
    restart: always
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./data/recordings:/usr/share/nginx/html/recordings
      - ./data/snapshots:/usr/share/nginx/html/snapshots
    depends_on:
      - app

19. Observability
19.1 Core Metric Definitions
Camera onboarding layer:
camera_onboarding_total                  # Total onboarding attempts
camera_onboarding_success_total          # Successful onboardings
camera_onboarding_duration_seconds       # Onboarding duration
camera_online_count                      # Online devices
camera_offline_count                     # Offline devices
camera_offline_event_duration_seconds    # Offline duration

Streaming layer:
stream_path_total                        # Total paths
stream_path_register_success_rate        # Path registration success rate
stream_transcode_count                   # Streams being transcoded
media_gateway_request_duration_seconds   # API call latency

Recording layer:
recording_plan_active_count              # Active recording plans
recording_file_index_lag_seconds         # Index lag
storage_volume_usage_percent             # Storage usage
storage_volume_full_count                # Full volumes

Algorithm layer:
ai_task_running_count                    # Running tasks
ai_task_error_count                      # Tasks in error state
ai_engine_online_nodes                   # Online engine nodes
ai_result_queue_depth                    # Result queue depth
ai_result_drop_total                     # Dropped results
ai_alarm_push_success_rate               # Alarm push success rate

19.2 Exposing Prometheus Metrics
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Component;

@Component
public class MetricsCollector {

    private final Counter cameraOnboardingTotal;
    private final Counter cameraOnboardingSuccess;
    // Micrometer has no Histogram meter; a Timer records durations and exports them in seconds
    private final Timer cameraOnboardingDuration;
    private final Gauge cameraOnlineCount;
    private final Gauge cameraOfflineCount;
    // The original code uses cameraRepository without declaring it; the type name here is assumed
    private final CameraRepository cameraRepository;

    public MetricsCollector(MeterRegistry registry, CameraRepository cameraRepository) {
        this.cameraRepository = cameraRepository;

        this.cameraOnboardingTotal = Counter.builder("camera_onboarding_total")
            .description("Total camera onboarding attempts")
            .register(registry);

        this.cameraOnboardingSuccess = Counter.builder("camera_onboarding_success_total")
            .description("Successful camera onboardings")
            .register(registry);

        this.cameraOnboardingDuration = Timer.builder("camera_onboarding_duration_seconds")
            .description("Camera onboarding duration")
            .register(registry);

        this.cameraOnlineCount = Gauge.builder("camera_online_count", this::getOnlineCount)
            .description("Online camera count")
            .register(registry);

        this.cameraOfflineCount = Gauge.builder("camera_offline_count", this::getOfflineCount)
            .description("Offline camera count")
            .register(registry);
    }

    public void recordOnboardingAttempt() {
        cameraOnboardingTotal.increment();
    }

    public void recordOnboardingSuccess(long durationMs) {
        cameraOnboardingSuccess.increment();
        cameraOnboardingDuration.record(durationMs, TimeUnit.MILLISECONDS);
    }

    private double getOnlineCount() {
        return cameraRepository.countByStatus(1);
    }

    private double getOfflineCount() {
        return cameraRepository.countByStatus(0);
    }
}

19.3 Alert Rule Template
groups:
  - name: camera_alerts
    interval: 30s
    rules:
      - alert: CameraOnboardingFailureRateHigh
        expr: |
          (
            rate(camera_onboarding_total[5m])
            - rate(camera_onboarding_success_total[5m])
          ) / rate(camera_onboarding_total[5m]) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Camera onboarding failure rate is too high"
          description: "Onboarding failure rate exceeded 5% over the last 5 minutes"

      - alert: CameraOfflineCountHigh
        expr: camera_offline_count > 10
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "Too many offline devices"
          description: "Current offline device count: {{ $value }}"

  - name: storage_alerts
    interval: 30s
    rules:
      - alert: StorageVolumeAlmostFull
        expr: storage_volume_usage_percent > 90
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "Storage volume is almost full"
          description: "Volume {{ $labels.volume_id }} usage: {{ $value }}%"

      - alert: RecordingIndexLagHigh
        expr: recording_file_index_lag_seconds > 120
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Recording index lag is too high"
          description: "Index lag: {{ $value }} seconds"

  - name: ai_alerts
    interval: 30s
    rules:
      - alert: AiResultQueueFull
        expr: ai_result_queue_depth > 18000
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Result queue is almost full"
          description: "Queue depth: {{ $value }}"

      - alert: AlarmPushFailureRateHigh
        expr: ai_alarm_push_success_rate < 0.95
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Alarm push success rate is too low"
          description: "Success rate: {{ $value }}"

19.4 Logging Conventions
Log level usage:
- ERROR: system failure that requires manual intervention
- WARN: degraded operation, or a failure that was recovered by retrying
- INFO: key business events (camera onboarded, task started, and so on)
- DEBUG: detailed debugging information
Log format:
[time] [level] [thread] [class] - [business key] message

Examples:
2026-02-13 14:30:00.123 INFO [main] c.e.CameraOnboardingService - [camera:1001] Device onboarded successfully: ip=10.0.1.20, manufacturer=Hikvision
2026-02-13 14:30:05.456 WARN [task-1] c.e.MediaPathService - [path:cam_10_0_1_20_ch1_main] MediaMTX registration failed, retrying (1/3)
2026-02-13 14:30:10.789 ERROR [task-2] c.e.OnvifManager - [camera:1002] ONVIF connection timeout: ip=10.0.1.21, timeout=3000ms

20. Common Problems and Solutions
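20.1 Device Onboarding Layer
Problem 1: ONVIF connection timeout
Symptom:
OnvifException: Connection timeout after 3000ms

Troubleshooting steps:
- ping the device IP to confirm network connectivity
- telnet ip 80 to confirm the port is open
- Open http://ip/onvif/device_service in a browser to verify the service
- Check firewall rules
Solutions:
- Increase the timeout to 5-10 seconds
- Fix network routing between the server and the device
- Enable the ONVIF service in the device web interface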
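The telnet step above can also be automated inside the platform before an ONVIF call is attempted. The following is a minimal sketch using only the JDK; `PortProbe` and `isPortOpen` are hypothetical names, not part of the code shown in this article.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections within a timeout. */
class PortProbe {

    static boolean isPortOpen(String host, int port, int timeoutMs) {
        // try-with-resources closes the socket whether or not the connect succeeds
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers connection refused, unreachable host, and connect timeout
            return false;
        }
    }
}
```

A call such as `PortProbe.isPortOpen("10.0.1.21", 80, 3000)` separates "port unreachable" from "ONVIF service not responding", which narrows down whether the fix belongs to the network or to the device configuration.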
Problem 2: WS-Security authentication failure
Symptom:
SOAP Fault: The security token could not be authenticated

Cause: the clocks of the server and the device differ by more than 5 seconds
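Clock skew matters because the WS-Security UsernameToken digest covers the `Created` timestamp: the digest is Base64(SHA-1(nonce + created + password)), and a device may reject a token whose timestamp is too far from its own clock. The sketch below shows the digest computation and a skew check. `WsSecuritySketch` and its methods are hypothetical names, and the 5-second tolerance in the usage note is taken from the cause stated above, not from any particular device.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.Base64;

/** Hypothetical helper illustrating the UsernameToken digest and a clock-skew check. */
class WsSecuritySketch {

    /** PasswordDigest = Base64(SHA-1(nonce bytes + created + password)). */
    static String passwordDigest(byte[] nonce, String created, String password) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            sha1.update(nonce);                                         // raw nonce bytes, not the Base64 text
            sha1.update(created.getBytes(StandardCharsets.UTF_8));      // e.g. 2026-02-13T06:30:00Z
            sha1.update(password.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(sha1.digest());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }

    /** True when the two clocks differ by no more than the tolerance. */
    static boolean withinTolerance(Instant serverTime, Instant deviceTime, Duration tolerance) {
        return Duration.between(serverTime, deviceTime).abs().compareTo(tolerance) <= 0;
    }
}
```

In practice the device time would come from the ONVIF GetSystemDateAndTime call, which can be compared against the server clock with `withinTolerance(serverNow, deviceNow, Duration.ofSeconds(5))` before authenticated requests are sent.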
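Solution:
# Sync the server clock via NTP
ntpdate ntp.aliyun.com

# Configure NTP on the device
Device web interface -> System Configuration -> Time Settings -> NTP server: ntp.aliyun.com

Problem 3: Device is online but there is no video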
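Symptom: camera_status=1, but pulling the RTSP stream fails
Cause: the control plane is reachable but the media plane is not
Solution:
// Add a media availability check
public boolean checkMediaAvailable(String rtspUrl) {
    Process p = null;
    try {
        ProcessBuilder pb = new ProcessBuilder(
            "ffmpeg", "-i", rtspUrl, "-t", "1", "-f", "null", "-"
        );
        // Discard ffmpeg output so a full pipe buffer cannot stall the process
        pb.redirectErrorStream(true);
        pb.redirectOutput(ProcessBuilder.Redirect.DISCARD);
        p = pb.start();
        return p.waitFor(5, TimeUnit.SECONDS) && p.exitValue() == 0;
    } catch (Exception e) {
        return false;
    } finally {
        // Kill ffmpeg if it is still running after the 5-second timeout
        if (p != null && p.isAlive()) {
            p.destroyForcibly();
        }
    }
}

20.2 Recording Layer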
Problem 4: The plan shows as enabled but no files are written
Troubleshooting steps:
- Check whether recording_plan.status is 1
- Check the MediaMTX path configuration: GET /v3/config/paths/get/{pathName}
- Check the storage volume state: storage_volume.status and is_full
- Check whether the source RTSP stream can be pulled
Solution:
// Add a recording health check
@Scheduled(fixedDelay = 60000)
public void checkRecordingHealth() {
    List<RecordingPlan> plans = planRepository.findByStatus(1);

    for (RecordingPlan plan : plans) {
        // Check whether any new file appeared in the last 5 minutes
        long recentFileCount = fileRepository.countByStreamNameAndTimeSince(
            plan.getStreamName(),
            LocalDateTime.now().minusMinutes(5)
        );

        if (recentFileCount == 0) {
            log.warn("No recent files for plan {}, reapplying config", plan.getId());
            planService.applyPlan(plan.toRequest());
        }
    }
}

Problem 5: Gaps in playback queries
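Symptom: the physical file exists but the database query does not return it
Cause: the index job only scans a recent window, so files written late into older time ranges are missed
Solution:
// Add a full rescan job
@Scheduled(cron = "0 0 2 * * ?") // every day at 02:00
public void fullScan() {
    List<StorageVolume> volumes = volumeRepository.findByStatus(1);

    for (StorageVolume volume : volumes) {
        scanVolumeFullRange(volume);
    }
}

20.3 Algorithm Layer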
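Problem 6: The task shows as running but the algorithm node has no load
Troubleshooting steps:
- Check whether ai_task_runtime.state is really RUNNING
- Check whether the node's WebSocket session is online
- Check the algorithm node logs to see whether the START_TASK command was received
- Check whether TASK_KEEPALIVE heartbeats are being reported
Solution:
// Add a watchdog task
@Scheduled(fixedDelay = 10000)
public void watchdogTask() {
    List<TaskRuntime> runtimes = runtimeRepository.findByState("RUNNING");

    for (TaskRuntime runtime : runtimes) {
        // A missing keepalive counts as timed out, consistent with isAliveRunning()
        LocalDateTime last = runtime.getLastKeepalive();
        long elapsed = (last == null)
            ? Long.MAX_VALUE
            : Duration.between(last, LocalDateTime.now()).getSeconds();

        if (elapsed > 30) {
            log.warn("Task {} keepalive timeout, marking ERROR", runtime.getTaskId());
            runtime.setState("ERROR");
            runtimeRepository.updateById(runtime);

            // Trigger the node-disconnected handling
            onNodeDisconnected(runtime.getNodeId());
        }
    }
}

Problem 7: Result queue backlog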
Symptom: ai_result_queue_depth keeps growing and memory alerts fire
Cause: results are produced faster than they are processed
Solution:
// 1. Enable sampling for non-alarm results
@Value("${ai.result.sample-interval-ms:1000}")
private long sampleIntervalMs;

// 2. Enable batched inserts
@Value("${ai.result.batch.enabled:false}")
private boolean batchEnabled;

@Value("${ai.result.batch.size:200}")
private int batchSize;

// 3. Add processing threads
int coreThreads = Runtime.getRuntime().availableProcessors();
this.processExecutor = new ThreadPoolExecutor(
    coreThreads * 2, // more concurrency
    coreThreads * 4, // only reached once the bounded queue below is full
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000)
);

21. FAQ and Best Practices
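21.1 Camera Onboarding
Q: Is ONVIF support mandatory?
A: No, but it is strongly recommended. ONVIF provides a standardized way to manage devices and obtain streams. Devices without ONVIF can be onboarded through the vendor fallback mode, but their RTSP templates must then be maintained by hand.
Q: How should vendor differences be handled?
A: In layers:
- Protocol layer: be as tolerant as possible (Fault parsing, capability fallback)
- Business layer: provide a per-vendor fallback switch
- Configuration layer: maintain a library of vendor RTSP templates
Q: How do I choose between the main stream and the sub stream?
A: Recommended strategy:
- Recording: main stream
- Live preview: sub stream
- Algorithm analysis: choose according to the accuracy required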
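21.2 Streaming
Q: Why recommend MediaMTX over ZLMediaKit?
A: Both are excellent open-source options. The reasons for choosing MediaMTX here:
- Single binary, simple to deploy
- Modern RESTful API design
- Native WebRTC support
- Active community and thorough documentation
Q: Does H.265 always have to be transcoded?
A: No. Recommended strategy:
- Live preview in the browser: transcode to H.264
- Recording storage: keep H.265 (saves space)
- Algorithm analysis: depends on the input the algorithm requires
Q: How much latency does transcoding add?
A:
- Software transcoding: 1-3 seconds
- Hardware transcoding (NVENC): 500 ms to 1 second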
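21.3 Recording
Q: How long should a segment be?
A: It is a trade-off:
- 1-2 minutes: many files, high I/O pressure
- 5-10 minutes: recommended, the best overall balance
- 30-60 minutes: few files, but coarse seeking during playback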
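The file-count side of this trade-off is simple arithmetic. The sketch below assumes continuous 24-hour recording and uses a hypothetical fleet of 500 cameras (the same scale as the recording benchmark later in this article); `SegmentMath` is an illustrative name.

```java
/** Hypothetical helper: number of segment files produced per day. */
class SegmentMath {

    static final int MINUTES_PER_DAY = 24 * 60;

    /** Files written per day by `cameras` continuously recording streams. */
    static long filesPerDay(int cameras, int segmentMinutes) {
        // Round up so a trailing partial segment still counts as a file
        long segmentsPerCamera = (MINUTES_PER_DAY + segmentMinutes - 1) / segmentMinutes;
        return cameras * segmentsPerCamera;
    }
}
```

With 500 cameras, 1-minute segments produce 720,000 files per day, 10-minute segments 72,000, and 60-minute segments 12,000. That factor of ten between 1 and 10 minutes is what the index scanner and the file system have to absorb.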
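Q: fMP4 or TS?
A: Advantages of fMP4:
- Natively supported by modern browsers
- Clear metadata structure
- Friendly to post-processing
Advantages of TS:
- Broader compatibility (older devices and browsers)
- More fault tolerant (partial corruption does not prevent playback)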
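Q: How do I keep a full disk from interrupting recording?
A: Use several layers of protection:
- A watermark mechanism that switches volumes automatically
- Scheduled cleanup of expired files
- Disk usage alerts
- A degradation strategy (turn off recording on non-critical paths)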
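The watermark mechanism in the first bullet can be sketched as follows. This is an illustration under assumptions, not the platform's storage code: `VolumeSelector`, `Volume`, and the "least-used volume wins" rule are all hypothetical, and the 90% threshold in the usage note mirrors the StorageVolumeAlmostFull alert rather than a documented setting.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of watermark-based volume switching. */
class VolumeSelector {

    static final class Volume {
        final String id;
        final long totalBytes;
        final long usedBytes;

        Volume(String id, long totalBytes, long usedBytes) {
            this.id = id;
            this.totalBytes = totalBytes;
            this.usedBytes = usedBytes;
        }

        double usagePercent() {
            // Treat a zero-capacity volume as full so it is never selected
            return totalBytes == 0 ? 100.0 : usedBytes * 100.0 / totalBytes;
        }
    }

    /**
     * Keeps the current volume while it is below the high watermark;
     * otherwise switches to the least-used volume that is still below it.
     * Returns empty when every volume is above the watermark.
     */
    static Optional<Volume> select(Volume current, List<Volume> all, double highWatermarkPercent) {
        if (current != null && current.usagePercent() < highWatermarkPercent) {
            return Optional.of(current);
        }
        return all.stream()
            .filter(v -> v.usagePercent() < highWatermarkPercent)
            .min(Comparator.comparingDouble(Volume::usagePercent));
    }
}
```

Calling `select(current, volumes, 90.0)` on each storage refresh keeps recordings on one volume for as long as possible, which limits how many directories the index job has to scan. An empty result is the signal to fall back to cleanup or degradation.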
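21.4 Algorithms
Q: How many scenes can one task be configured with?
A: There is no theoretical limit. In practice:
- Single-node mode: 3-5
- Split mode: 5-10
Too many scenes lead to:
- Higher inference overhead
- More complex result handling
- Excessive alarm frequency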
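Q: How should failed tasks be retried?
A: Retry by failure class:
- Transient failure (network jitter): restart automatically and increment the counter
- Persistent failure (device offline): mark as ERROR and wait for the device to recover
- Configuration error: do not retry; manual repair is required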
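The three classes above map naturally onto a small decision function. The sketch below is one possible shape, not the article's implementation: `RetryPolicySketch`, the `maxRestarts` cap that escalates repeated transient failures, and the exponential backoff are all assumptions added for illustration.

```java
/** Hypothetical sketch of graded retry; names and thresholds are illustrative. */
class RetryPolicySketch {

    enum FailureKind { TRANSIENT, PERSISTENT, CONFIG }

    enum Action { RESTART, WAIT_FOR_RECOVERY, MANUAL_FIX }

    /** Chooses what to do after a failure, given how often the task was already restarted. */
    static Action decide(FailureKind kind, int restartCount, int maxRestarts) {
        switch (kind) {
            case TRANSIENT:
                // Assumption: too many restarts in a row are treated like a persistent failure
                return restartCount < maxRestarts ? Action.RESTART : Action.WAIT_FOR_RECOVERY;
            case PERSISTENT:
                return Action.WAIT_FOR_RECOVERY;
            default:
                return Action.MANUAL_FIX;
        }
    }

    /** Exponential backoff: baseMs * 2^restartCount, capped at capMs. */
    static long backoffMs(int restartCount, long baseMs, long capMs) {
        long delay = baseMs << Math.min(restartCount, 20); // bound the shift to avoid overflow
        return Math.min(delay, capMs);
    }
}
```

With a 1-second base and a 60-second cap, the delays before successive restarts are 1 s, 2 s, 4 s, 8 s and so on, up to 60 s, which keeps a flapping camera from hammering the engine node with START_TASK commands.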
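Q: How do I control alarm push frequency?
A: Use the push modes:
- High priority: IMMEDIATE
- Medium priority: COUNT=5, WINDOW=600s
- Low priority: WINDOW=3600s
Combine them with a cooldown period to avoid alarm storms.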
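22. Performance Benchmarks and Load Testing
22.1 Onboarding Benchmark
Test environment:
- Server: 8 CPU cores, 16 GB RAM
- Network: gigabit LAN
- Device: Hikvision DS-2CD2142FWD-I
Results:
| Concurrency | Onboarding success rate | P50 latency | P95 latency | P99 latency |
|---|---|---|---|---|
| 10 | 100% | 850ms | 1200ms | 1500ms |
| 50 | 99.2% | 1100ms | 1800ms | 2400ms |
| 100 | 97.5% | 1450ms | 2500ms | 3200ms |
| 200 | 94.8% | 2100ms | 3800ms | 4800ms |
Bottleneck analysis:
- Network I/O is the main bottleneck
- ONVIF SOAP parsing uses 15-20% CPU
- FFprobe probing uses 30-40% CPU
Optimization suggestions:
- Make onboarding tasks asynchronous
- Limit FFprobe concurrency (semaphore)
- Cache device information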
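The semaphore suggestion can be sketched with `java.util.concurrent.Semaphore`. `BoundedProbe` is a hypothetical wrapper; the limit would typically come from a setting such as codec.probe.max-concurrency in the application configuration, and the wait timeout is an assumption.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical wrapper that caps how many probes run at the same time. */
class BoundedProbe {

    private final Semaphore permits;

    BoundedProbe(int maxConcurrency) {
        this.permits = new Semaphore(maxConcurrency);
    }

    /** Runs the probe once a slot is free, or fails if none frees up within waitMs. */
    <T> T run(Callable<T> probe, long waitMs) throws Exception {
        if (!permits.tryAcquire(waitMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("no probe slot available within " + waitMs + " ms");
        }
        try {
            return probe.call();
        } finally {
            permits.release(); // always return the slot, even when the probe throws
        }
    }
}
```

Wrapping each FFprobe invocation in `run(...)` keeps a burst of 200 concurrent onboardings from spawning 200 FFprobe processes at once; excess requests wait for a slot instead of competing for CPU.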
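22.2 Recording Benchmark
Test environment:
- Server: 16 CPU cores, 32 GB RAM
- Storage: NVMe SSD
- Streams: 500 concurrent recordings
Results:
| Bitrate per stream | Segment duration | Disk write rate | CPU usage | Memory usage |
|---|---|---|---|---|
| 2Mbps | 10m | 125MB/s | 18% | 4.2GB |
| 4Mbps | 10m | 250MB/s | 22% | 5.8GB |
| 6Mbps | 10m | 375MB/s | 28% | 7.5GB |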
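The disk write column follows directly from the aggregate bitrate: 500 streams at 2 Mbps is 1000 Mbps, or 125 MB/s. The same arithmetic gives a rough capacity estimate. The sketch below uses decimal units (1 MB = 10^6 bytes, 1 TB = 10^12 bytes) and ignores container overhead; `RecordingCapacity` is an illustrative name.

```java
/** Hypothetical helper for back-of-the-envelope recording capacity planning. */
class RecordingCapacity {

    /** Aggregate write rate in MB/s for `streams` streams at `mbpsPerStream` megabits per second each. */
    static double writeRateMBps(int streams, double mbpsPerStream) {
        return streams * mbpsPerStream / 8.0; // 8 bits per byte
    }

    /** Storage in TB needed to retain `days` days of continuous recording at the given write rate. */
    static double retentionTB(double writeRateMBps, int days) {
        double secondsPerDay = 86_400.0;
        return writeRateMBps * secondsPerDay * days / 1_000_000.0; // MB -> TB
    }
}
```

At 125 MB/s, the default 7-day retention works out to about 75.6 TB, and the 6 Mbps row to roughly three times that, which is worth checking against the provisioned volumes before raising bitrates.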
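Index performance:
- Scan rate: 10,000 files/second
- Index lag: P95 < 30 seconds
22.3 Algorithm Benchmark
Test environment:
- GPU: NVIDIA RTX 3080
- Algorithm: YOLOv8m
- Input: 1280x720 @ 15fps
Single-node capacity:
| Scenes per task | Concurrent tasks | GPU usage | Inference latency |
|---|---|---|---|
| 1 | 50 | 68% | 45ms |
| 2 | 35 | 72% | 52ms |
| 3 | 25 | 78% | 61ms |
Result processing:
- Queue throughput: 20,000 results/second
- Database insert latency: P95 < 200ms
- Alarm push latency: P95 < 500ms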
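23. Architecture Roadmap
23.1 Current Architecture (v1.0)
Characteristics:
- Monolithic application
- Single MySQL instance
- Local file storage
- Single-node MediaMTX
Suitable for:
- Fewer than 1,000 devices
- Fewer than 200 concurrent streams
- Fewer than 100 algorithm tasks
23.2 Mid-Term Evolution (v2.0)
Goal: support 5,000 devices and 1,000 concurrent streams
Improvements:
- Service split
  Monolith → onboarding service + recording service + algorithm service
- Storage optimization
  Local storage → object storage (MinIO/OSS)
  Tiered storage: hot data on SSD + cold data on HDD
- MediaMTX cluster
  Single node → multiple nodes + LVS load balancing
  Path-hash sharding
- Database read/write splitting
  Primary: writes
  Replicas: queries + statistics
23.3 Long-Term Evolution (v3.0)
Goal: support 100,000+ devices and tens of thousands of concurrent streams
Improvements:
- Microservices
  Split by business domain:
  - Device management service
  - Streaming service
  - Recording service
  - Algorithm service
  - Alarm service
  Inter-service communication: gRPC + MQ
- Distributed storage
  Recordings: Ceph cluster
  Metadata: TiDB
  Cache: Redis cluster
- Edge computing
  Central node + edge nodes
  Edge: real-time analysis + local recording
  Center: long-term storage + global scheduling
- Cloud-native migration
  Containerization: Kubernetes
  Service mesh: Istio
  Observability: Prometheus + Grafana + Jaeger
Summary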
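This article built a complete enterprise video analytics platform from scratch, covering:
- Camera onboarding: ONVIF in depth, stream path governance, codec decisions
- Recording storage: storage volume management, file indexing, playback retrieval
- Intelligent analysis: multi-scene task model, node scheduling, result processing, alarm push
- Operations: configuration management, observability, troubleshooting, performance tuning
Core design principles:
- Layered decoupling: control plane vs. media plane, policy vs. execution, real-time vs. batch
- Unified abstractions: path model, scene model, state model
- Fault tolerance first: asynchrony, retry, degradation, rollback
- Observability: logs, metrics, traces, alerts
Technology stack summary:
| Layer | Technology |
|---|---|
| Protocol | ONVIF (SOAP/WS-Security) |
| Streaming | MediaMTX |
| Codec | FFmpeg/FFprobe |
| Storage | MySQL + file system |
| Communication | WebSocket + RESTful |
| Monitoring | Prometheus + Grafana |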