
Building an Enterprise Video Intelligence Analytics Platform from 0 to 1: A Complete Engineering Practice (Part 3)


MediaMTX, FFmpeg, WebSocket

2026-02-13

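Building an Enterprise Video Intelligence Analytics Platform from 0 to 1: A Complete Engineering Practice

A full-stack technical solution that can be put into production directly. It covers the whole chain of camera onboarding, streaming orchestration, storage governance, and intelligent analysis, and includes architecture design, core code, data models, the operations system, and performance benchmarks.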


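Table of Contents

Part 1: Platform Foundations and Camera Onboarding

  1. Problem Definition and Architecture Design
  2. Technology Selection and Core Components
  3. Data Model Design
  4. ONVIF Protocol Deep Dive and Implementation
  5. MediaMTX Streaming Gateway Integration
  6. FFmpeg Codec Governance
  7. End-to-End Camera Onboarding Flow
  8. Device Status Inspection and Event Closed Loop

Part 2: Recording Storage System

  9. Recording System Architecture Design
  10. Storage Volume Governance and Capacity Management
  11. Recording Policy Orchestration
  12. File Indexing and Playback Retrieval

Part 3: Intelligent Analysis Platform

  13. Algorithm Task Model Design
  14. Algorithm Node Management and Scheduling
  15. WebSocket Protocol Specification
  16. Result Processing and Alarm Push
  17. Preview Pipeline Implementation

Part 4: Operations and Evolution

  18. Configuration Management
  19. Observability
  20. Typical Problems and Solutions
  21. FAQ and Best Practices
  22. Performance Benchmarks and Load Testing
  23. Architecture Evolution Roadmap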

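Part 3: Intelligent Analysis Platform

13. Algorithm Task Model Design

13.1 Core Challenges of the Task Model

Problem 1: The limits of one algorithm per task

Real-world scenario:

  • A factory gate needs to detect personnel intrusion, illegal vehicle parking, and missing safety helmets at the same time
  • Traditional approach: create 3 tasks and pull the stream 3 times, which wastes bandwidth and compute

Problem 2: State complexity of multi-scene tasks

  • Different scenes of the same task may be assigned to different algorithm nodes
  • Task-level state and scene-level state must be maintained together
  • A failure in some scenes should not affect the other scenes

13.2 Task Table Design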

DROP TABLE IF EXISTS `ai_task`;
CREATE TABLE `ai_task` (
  `id`                    BIGINT NOT NULL AUTO_INCREMENT,
  `task_name`             VARCHAR(255) DEFAULT NULL COMMENT 'Task name',
  `camera_id`             BIGINT DEFAULT NULL COMMENT 'Camera ID',
  `group_id`              BIGINT DEFAULT NULL COMMENT 'Group ID',
  `camera_name`           VARCHAR(255) DEFAULT NULL COMMENT 'Camera name (denormalized)',
  `stream_path_id`        BIGINT DEFAULT NULL COMMENT 'Stream path ID',
  
  -- Stream information
  `rtsp_url`              VARCHAR(1024) DEFAULT NULL COMMENT 'RTSP source URL',
  `stream_path_name`      VARCHAR(255) DEFAULT NULL COMMENT 'Stream path name',
  `stream_type`           TINYINT DEFAULT 0 COMMENT '0 = main stream, 1 = sub stream',
  
  -- Algorithm configuration
  `scene_codes`           JSON DEFAULT NULL COMMENT 'Array of scene codes',
  `region_json`           TEXT COMMENT 'Detection region',
  `algorithm_params`      TEXT COMMENT 'Algorithm parameters',
  
  -- Dispatch configuration
  `dispatch_mode`         VARCHAR(20) DEFAULT 'MIXED' COMMENT 'MIXED/AUTO_SINGLE/AUTO_SPLIT/FIXED',
  `prefer_node_id`        BIGINT DEFAULT NULL COMMENT 'Pinned node ID (FIXED mode)',
  
  -- Detection parameters
  `conf_threshold`        FLOAT DEFAULT 0.6 COMMENT 'Confidence threshold',
  `alarm_enabled`         TINYINT DEFAULT 1 COMMENT 'Whether alarms are enabled',
  `level`                 INT DEFAULT 1 COMMENT 'Alarm level',
  
  -- Video processing parameters
  `input_fps`             INT DEFAULT 15 COMMENT 'Input frame rate',
  `output_fps`            INT DEFAULT 15 COMMENT 'Output frame rate',
  `output_width`          INT DEFAULT 1280 COMMENT 'Output width',
  `output_height`         INT DEFAULT 720 COMMENT 'Output height',
  `output_bitrate`        VARCHAR(20) DEFAULT '3000k' COMMENT 'Output bitrate',
  `process_quality`       VARCHAR(20) DEFAULT 'MEDIUM' COMMENT 'LOW/MEDIUM/HIGH',
  
  -- Snapshot configuration
  `snapshot_policy`       VARCHAR(16) DEFAULT 'ALARM' COMMENT 'OFF/ALL/ALARM',
  `snapshot_quality`      INT DEFAULT 80 COMMENT 'Snapshot quality (1-100)',
  
  -- Push configuration
  `push_enabled`          TINYINT DEFAULT 1 COMMENT 'Whether push is enabled',
  `push_mode`             VARCHAR(32) DEFAULT 'LEVEL_DEFAULT' COMMENT 'Push mode',
  `push_threshold_count`  INT DEFAULT NULL COMMENT 'Push threshold count',
  `push_window_seconds`   INT DEFAULT NULL COMMENT 'Push window in seconds',
  `push_cooldown_seconds` INT DEFAULT 0 COMMENT 'Push cooldown in seconds',
  `push_route_key`        VARCHAR(64) DEFAULT NULL COMMENT 'Push routing key',
  
  -- Runtime status
  `status`                TINYINT DEFAULT 0 COMMENT '0 = stopped, 1 = running, 2 = error',
  `error_count`           INT DEFAULT 0 COMMENT 'Error count',
  `error_message`         VARCHAR(1024) DEFAULT NULL COMMENT 'Error message',
  `last_run_time`         DATETIME DEFAULT NULL COMMENT 'Last run time',
  `last_keepalive_time`   DATETIME DEFAULT NULL COMMENT 'Last keepalive time',
  
  `create_time`           DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time`           DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  
  PRIMARY KEY (`id`),
  KEY `idx_camera_status` (`camera_id`, `status`),
  KEY `idx_group_id` (`group_id`),
  KEY `idx_dispatch_mode` (`dispatch_mode`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Algorithm task table';

JSON field example:

{
  "scene_codes": ["REGION_INTRUSION", "HELMET_DETECTION", "VEHICLE_PARKING"]
}
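
The `push_threshold_count`, `push_window_seconds`, and `push_cooldown_seconds` columns of `ai_task` suggest a "N alarms within a window, then cool down" push rule, but their exact semantics are not spelled out in this section. The sketch below is one possible reading, not the platform's actual implementation: the class name `PushGate` and its method are hypothetical, and it assumes a push fires once the number of alarms inside the window reaches the threshold, after which further pushes are suppressed for the cooldown period.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical gate that decides whether an alarm should trigger a push. */
class PushGate {
    private final int thresholdCount; // assumed meaning of push_threshold_count
    private final long windowMs;      // assumed meaning of push_window_seconds
    private final long cooldownMs;    // assumed meaning of push_cooldown_seconds
    private final Deque<Long> recentAlarms = new ArrayDeque<>();
    private Long lastPushMs = null;

    PushGate(int thresholdCount, int windowSeconds, int cooldownSeconds) {
        this.thresholdCount = thresholdCount;
        this.windowMs = windowSeconds * 1000L;
        this.cooldownMs = cooldownSeconds * 1000L;
    }

    /** Records an alarm at nowMs and returns true if it should be pushed. */
    synchronized boolean shouldPush(long nowMs) {
        recentAlarms.addLast(nowMs);
        // Forget alarms that have fallen out of the sliding window.
        while (!recentAlarms.isEmpty() && nowMs - recentAlarms.peekFirst() > windowMs) {
            recentAlarms.removeFirst();
        }
        if (recentAlarms.size() < thresholdCount) {
            return false; // not enough alarms in the window yet
        }
        if (lastPushMs != null && nowMs - lastPushMs < cooldownMs) {
            return false; // still cooling down after the previous push
        }
        lastPushMs = nowMs;
        return true;
    }
}
```

With a threshold of 3, a 10-second window, and a 30-second cooldown, three alarms one second apart produce a single push on the third alarm, and later alarms stay silent until both the cooldown has elapsed and the window has filled up again.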

13.3 Scene Category Table

DROP TABLE IF EXISTS `ai_scene_category`;
CREATE TABLE `ai_scene_category` (
  `id`             BIGINT NOT NULL AUTO_INCREMENT,
  `category_name`  VARCHAR(64) NOT NULL COMMENT 'Category name',
  `category_code`  VARCHAR(64) NOT NULL COMMENT 'Category code',
  `icon`           VARCHAR(128) DEFAULT NULL COMMENT 'Icon',
  `sort_order`     INT DEFAULT 0 COMMENT 'Sort order',
  `status`         TINYINT DEFAULT 1 COMMENT 'Status',
  `create_time`    DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time`    DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_category_code` (`category_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Scene category table';

Sample data:

INSERT INTO ai_scene_category (category_name, category_code, sort_order) VALUES
('Safety Management', 'SAFETY', 1),
('Behavior Recognition', 'BEHAVIOR', 2),
('Vehicle Management', 'VEHICLE', 3),
('Environment Monitoring', 'ENVIRONMENT', 4);

13.4 Scene Type Table

DROP TABLE IF EXISTS `ai_scene_type`;
CREATE TABLE `ai_scene_type` (
  `id`              BIGINT NOT NULL AUTO_INCREMENT,
  `category_id`     BIGINT NOT NULL COMMENT 'Category ID',
  `scene_name`      VARCHAR(100) NOT NULL COMMENT 'Scene name',
  `scene_code`      VARCHAR(64) NOT NULL COMMENT 'Scene code',
  `algorithm_type`  VARCHAR(32) DEFAULT NULL COMMENT 'Algorithm type',
  `model_path`      VARCHAR(255) DEFAULT NULL COMMENT 'Model path',
  `default_params`  JSON DEFAULT NULL COMMENT 'Default parameters',
  `description`     VARCHAR(500) DEFAULT NULL COMMENT 'Description',
  `icon`            VARCHAR(128) DEFAULT NULL COMMENT 'Icon',
  `status`          TINYINT DEFAULT 1 COMMENT 'Status',
  `create_time`     DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time`     DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_scene_code` (`scene_code`),
  KEY `idx_category_status` (`category_id`, `status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Scene type table';

Sample data:

INSERT INTO ai_scene_type (category_id, scene_name, scene_code, algorithm_type, default_params) VALUES
(1, 'Region Intrusion Detection', 'REGION_INTRUSION', 'object_detection', 
 '{"problem_labels":["person","vehicle"],"conf_threshold":0.6}'),
(1, 'Safety Helmet Detection', 'HELMET_DETECTION', 'classification',
 '{"problem_labels":["no_helmet"],"conf_threshold":0.7}'),
(3, 'Illegal Parking Detection', 'VEHICLE_PARKING', 'object_detection',
 '{"problem_labels":["car","truck"],"conf_threshold":0.6}');
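
Each scene type carries `default_params`, while a task can also carry its own `conf_threshold` and `algorithm_params`. How the two are combined is not described here; a minimal sketch, assuming task-level values simply override scene defaults key by key, could look like this. `SceneParamMerger` is a hypothetical helper, and the parameters are shown as already-parsed maps to keep the example free of JSON libraries.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: overlays task-level overrides on scene default_params. */
class SceneParamMerger {
    static Map<String, Object> merge(Map<String, Object> sceneDefaults,
                                     Map<String, Object> taskOverrides) {
        Map<String, Object> effective = new LinkedHashMap<>();
        if (sceneDefaults != null) {
            effective.putAll(sceneDefaults);
        }
        if (taskOverrides != null) {
            // A null override means "not set on the task", so the default is kept.
            taskOverrides.forEach((key, value) -> {
                if (value != null) {
                    effective.put(key, value);
                }
            });
        }
        return effective;
    }
}
```

For HELMET_DETECTION, merging the default `conf_threshold` of 0.7 with a task override of 0.8 would keep `problem_labels` from the scene type and use 0.8 as the effective threshold.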

13.5 Task Request Model

@Data
public class AiTaskUpsertRequest {
    private Long id;
    private String taskName;
    
    // Camera binding
    private Long cameraId;
    private Long streamPathId;
    private String rtspUrl;
    private Integer streamType; // 0 = main stream, 1 = sub stream
    
    // Scene configuration (core)
    private List<String> sceneCodes;
    
    // Detection region
    private String regionJson;
    private String algorithmParams;
    
    // Dispatch configuration
    private String dispatchMode; // MIXED/AUTO_SINGLE/AUTO_SPLIT/FIXED
    private Long preferNodeId;
    
    // Detection parameters
    private Float confThreshold;
    private Integer alarmEnabled;
    private Integer level;
    
    // Video processing
    private Integer inputFps;
    private Integer outputFps;
    private Integer outputWidth;
    private Integer outputHeight;
    private String outputBitrate;
    private String processQuality;
    
    // Snapshot configuration
    private String snapshotPolicy;
    private Integer snapshotQuality;
    
    // Push configuration
    private Integer pushEnabled;
    private String pushMode;
    private Integer pushThresholdCount;
    private Integer pushWindowSeconds;
    private Integer pushCooldownSeconds;
    private String pushRouteKey;
}

13.6 Task Creation Service

@Service
@Slf4j
public class AiTaskService {
    
    @Resource
    private AiTaskRepository taskRepository;
    
    @Resource
    private CameraDeviceRepository cameraRepository;
    
    @Resource
    private StreamPathRepository streamPathRepository;
    
    @Transactional(rollbackFor = Exception.class)
    public AiTask createTask(AiTaskUpsertRequest request) {
        // 1. Normalize parameters
        normalizeRequest(request);
        
        // 2. Look up the camera
        CameraDevice camera = cameraRepository.findById(request.getCameraId())
            .orElseThrow(() -> new NotFoundException("Camera not found"));
        
        // 3. Resolve the stream path
        StreamPath streamPath = resolveStreamPath(request);
        
        // 4. Build the task entity
        AiTask task = new AiTask();
        BeanUtils.copyProperties(request, task);
        
        task.setCameraName(camera.getCameraName());
        task.setGroupId(camera.getGroupId());
        task.setStreamPathName(streamPath.getPathName());
        task.setRtspUrl(streamPath.getSourceUrl()); // the resolved source URL wins over the request value
        task.setStatus(0); // initially stopped
        
        // 5. Save
        taskRepository.save(task);
        
        log.info("Task created: id={}, camera={}, scenes={}", 
            task.getId(), task.getCameraId(), task.getSceneCodes());
        
        return task;
    }
    
    private void normalizeRequest(AiTaskUpsertRequest request) {
        // Drop blank scene codes and remove duplicates
        if (request.getSceneCodes() != null) {
            request.setSceneCodes(
                request.getSceneCodes().stream()
                    .filter(StringUtils::isNotBlank)
                    .distinct()
                    .collect(Collectors.toList())
            );
        }
        
        // Apply defaults
        if (request.getInputFps() == null) request.setInputFps(15);
        if (request.getOutputFps() == null) request.setOutputFps(request.getInputFps());
        if (request.getOutputWidth() == null) request.setOutputWidth(1280);
        if (request.getOutputHeight() == null) request.setOutputHeight(720);
        if (request.getOutputBitrate() == null) request.setOutputBitrate("3000k");
        if (request.getProcessQuality() == null) request.setProcessQuality("MEDIUM");
        if (request.getDispatchMode() == null) request.setDispatchMode("MIXED");
        if (request.getConfThreshold() == null) request.setConfThreshold(0.6f);
        if (request.getAlarmEnabled() == null) request.setAlarmEnabled(1);
        if (request.getSnapshotPolicy() == null) request.setSnapshotPolicy("ALARM");
        if (request.getSnapshotQuality() == null) request.setSnapshotQuality(80);
    }
    
    private StreamPath resolveStreamPath(AiTaskUpsertRequest request) {
        // An explicit stream path ID takes precedence
        if (request.getStreamPathId() != null) {
            return streamPathRepository.findById(request.getStreamPathId())
                .orElseThrow(() -> new NotFoundException("Stream path not found"));
        }
        
        // Otherwise look it up by stream type (anything other than 1 means main stream)
        Integer subtype = (request.getStreamType() != null && request.getStreamType() == 1) ? 1 : 0;
        
        return streamPathRepository.findByCameraIdAndSubtype(request.getCameraId(), subtype)
            .orElseThrow(() -> new NotFoundException("No stream path found for camera"));
    }
}

14. Algorithm Node Management and Scheduling

14.1 Algorithm Node Table

DROP TABLE IF EXISTS `ai_engine_node`;
CREATE TABLE `ai_engine_node` (
  `id`                   BIGINT NOT NULL AUTO_INCREMENT,
  `node_id`              VARCHAR(100) NOT NULL COMMENT 'Unique node identifier',
  `node_name`            VARCHAR(100) DEFAULT NULL COMMENT 'Node name',
  `ip`                   VARCHAR(64) NOT NULL COMMENT 'IP address',
  `port`                 INT DEFAULT NULL COMMENT 'Port',
  `status`               TINYINT DEFAULT 0 COMMENT '1 = online, 0 = offline, 2 = error',
  `last_heartbeat_time`  DATETIME DEFAULT NULL COMMENT 'Last heartbeat time',
  
  -- Capabilities
  `scene_name_json`      JSON DEFAULT NULL COMMENT 'Supported scene names',
  `scene_code_json`      JSON DEFAULT NULL COMMENT 'Supported scene codes',
  
  -- Load
  `load_info`            JSON DEFAULT NULL COMMENT 'Load information',
  `max_tasks`            INT DEFAULT 50 COMMENT 'Maximum number of tasks',
  
  `create_time`          DATETIME DEFAULT CURRENT_TIMESTAMP,
  `update_time`          DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_node_id` (`node_id`),
  KEY `idx_status_heartbeat` (`status`, `last_heartbeat_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Algorithm node table';

JSON field example:

{
  "scene_code_json": ["REGION_INTRUSION", "HELMET_DETECTION", "FIRE_DETECTION"],
  "load_info": {
    "cpu": 0.42,
    "gpu": 0.68,
    "memory": 0.55,
    "tasks": 12
  }
}

14.2 Task Runtime Table

DROP TABLE IF EXISTS `ai_task_runtime`;
CREATE TABLE `ai_task_runtime` (
  `task_id`           BIGINT NOT NULL COMMENT 'Task ID',
  `node_id`           VARCHAR(100) NOT NULL COMMENT 'Primary node ID',
  `state`             VARCHAR(20) NOT NULL COMMENT 'RUNNING/STOPPED/ERROR',
  `start_time`        DATETIME DEFAULT NULL COMMENT 'Start time',
  `last_keepalive`    DATETIME DEFAULT NULL COMMENT 'Last keepalive',
  `restart_count`     INT DEFAULT 0 COMMENT 'Restart count',
  `preview_path`      VARCHAR(255) DEFAULT NULL COMMENT 'Preview path',
  `preview_play_url`  VARCHAR(512) DEFAULT NULL COMMENT 'Preview playback URL',
  `update_time`       DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`task_id`),
  KEY `idx_node_state` (`node_id`, `state`),
  KEY `idx_keepalive` (`last_keepalive`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Task runtime table';

14.3 Scene Runtime Table

DROP TABLE IF EXISTS `ai_task_scene_runtime`;
CREATE TABLE `ai_task_scene_runtime` (
  `task_id`         BIGINT NOT NULL COMMENT 'Task ID',
  `scene_code`      VARCHAR(64) NOT NULL COMMENT 'Scene code (same width as ai_scene_type.scene_code)',
  `node_id`         VARCHAR(100) DEFAULT NULL COMMENT 'Executing node ID',
  `state`           VARCHAR(20) NOT NULL COMMENT 'RUNNING/STOPPED/ERROR',
  `start_time`      DATETIME DEFAULT NULL COMMENT 'Start time',
  `last_keepalive`  DATETIME DEFAULT NULL COMMENT 'Last keepalive',
  `restart_count`   INT DEFAULT 0 COMMENT 'Restart count',
  `error_message`   VARCHAR(500) DEFAULT NULL COMMENT 'Error message',
  `update_time`     DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`task_id`, `scene_code`),
  KEY `idx_node_state` (`node_id`, `state`),
  KEY `idx_keepalive` (`last_keepalive`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Scene runtime table';

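Why are two layers of runtime state needed?

  1. Task-level runtime (ai_task_runtime)

    • Records the overall state of the task
    • Binds the primary node (used for preview)
    • Tracks the task-wide keepalive
  2. Scene-level runtime (ai_task_scene_runtime)

    • Allows scenes to be split across different nodes
    • Monitors the state of each scene independently
    • Pinpoints which scene on which node has failed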
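
The two layers have to be reconciled somewhere: the task-level `state` must be derived from the scene-level rows. The rule is not given in this section, so the following is only a sketch under an assumed policy that matches the requirement "a failure in some scenes should not affect the others": the task counts as RUNNING while at least one scene is running, ERROR when nothing is running and at least one scene has failed, and STOPPED otherwise. `TaskStateAggregator` is a hypothetical name.

```java
import java.util.Collection;

/** Hypothetical roll-up of scene-level states into one task-level state. */
class TaskStateAggregator {
    static String aggregate(Collection<String> sceneStates) {
        boolean anyRunning = false;
        boolean anyError = false;
        for (String state : sceneStates) {
            if ("RUNNING".equals(state)) {
                anyRunning = true;
            } else if ("ERROR".equals(state)) {
                anyError = true;
            }
        }
        if (anyRunning) {
            return "RUNNING"; // partial failure keeps the task alive
        }
        return anyError ? "ERROR" : "STOPPED";
    }
}
```

Under this policy, a task whose REGION_INTRUSION scene is RUNNING and whose HELMET_DETECTION scene is in ERROR still reports RUNNING at the task level, and the failed scene remains visible in `ai_task_scene_runtime`.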

14.4 Dispatch Strategy Implementation

@Service
@Slf4j
public class TaskDispatchService {
    
    @Resource
    private EngineNodeService nodeService;
    
    /**
     * Build the dispatch plan: which node runs which scenes.
     */
    public DispatchPlan buildDispatchPlan(AiTask task, List<String> sceneCodes) {
        String mode = task.getDispatchMode();
        if (mode == null) mode = "MIXED";
        
        Map<String, List<String>> nodeToScenes;
        
        switch (mode) {
            case "FIXED":
                nodeToScenes = dispatchFixed(task, sceneCodes);
                break;
            case "AUTO_SINGLE":
                nodeToScenes = dispatchAutoSingle(sceneCodes);
                break;
            case "AUTO_SPLIT":
                nodeToScenes = dispatchAutoSplit(sceneCodes);
                break;
            case "MIXED":
            default:
                nodeToScenes = dispatchMixed(sceneCodes);
                break;
        }
        
        String primaryNodeId = selectPrimaryNode(nodeToScenes);
        
        return new DispatchPlan(nodeToScenes, primaryNodeId);
    }
    
    /**
     * FIXED: run all scenes on the pinned node.
     */
    private Map<String, List<String>> dispatchFixed(AiTask task, List<String> sceneCodes) {
        if (task.getPreferNodeId() == null) {
            throw new BusinessException("FIXED mode requires preferNodeId");
        }
        
        String nodeId = nodeService.getNodeIdById(task.getPreferNodeId());
        if (nodeId == null) {
            throw new BusinessException("The pinned node does not exist");
        }
        
        if (!nodeService.supportsAllSceneCodes(nodeId, sceneCodes)) {
            throw new BusinessException("The pinned node does not support all scenes");
        }
        
        Map<String, List<String>> result = new HashMap<>();
        result.put(nodeId, new ArrayList<>(sceneCodes));
        return result;
    }
    
    /**
     * AUTO_SINGLE: automatically pick one node that runs all scenes.
     */
    private Map<String, List<String>> dispatchAutoSingle(List<String> sceneCodes) {
        String nodeId = nodeService.selectBestNodeForSceneCodes(sceneCodes);
        
        if (nodeId == null) {
            throw new BusinessException("No node supports all scenes");
        }
        
        Map<String, List<String>> result = new HashMap<>();
        result.put(nodeId, new ArrayList<>(sceneCodes));
        return result;
    }
    
    /**
     * AUTO_SPLIT: assign each scene to its own best node.
     */
    private Map<String, List<String>> dispatchAutoSplit(List<String> sceneCodes) {
        Map<String, List<String>> nodeToScenes = new LinkedHashMap<>();
        
        for (String sceneCode : sceneCodes) {
            String nodeId = nodeService.selectBestNodeForSceneCode(sceneCode);
            
            if (nodeId == null) {
                throw new BusinessException("No node supports scene: " + sceneCode);
            }
            
            nodeToScenes.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(sceneCode);
        }
        
        return nodeToScenes;
    }
    
    /**
     * MIXED: prefer a single node, fall back to splitting if none qualifies.
     */
    private Map<String, List<String>> dispatchMixed(List<String> sceneCodes) {
        String singleNodeId = nodeService.selectBestNodeForSceneCodes(sceneCodes);
        
        if (singleNodeId != null) {
            Map<String, List<String>> result = new HashMap<>();
            result.put(singleNodeId, new ArrayList<>(sceneCodes));
            return result;
        }
        
        log.info("No single node available, fallback to split mode");
        return dispatchAutoSplit(sceneCodes);
    }
    
    /**
     * Select the primary node (used for preview).
     */
    private String selectPrimaryNode(Map<String, List<String>> nodeToScenes) {
        // The node that runs the most scenes becomes the primary node
        return nodeToScenes.entrySet().stream()
            .max(Comparator.comparingInt(e -> e.getValue().size()))
            .map(Map.Entry::getKey)
            .orElse(null);
    }
}

@Data
@AllArgsConstructor
class DispatchPlan {
    private Map<String, List<String>> nodeToScenes;
    private String primaryNodeId;
}
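
To make the MIXED strategy easier to follow without the Spring services and repositories, here is a self-contained sketch that runs the same decision against an in-memory node list. `DispatchSketch`, its nested `Node` type, and the `activeScenes` field are illustrative stand-ins for `EngineNodeService` and the runtime tables, not code from the platform.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

class DispatchSketch {
    /** Hypothetical in-memory view of a node: capabilities plus current load. */
    static final class Node {
        final String nodeId;
        final Set<String> sceneCodes;
        final int activeScenes;
        final int maxTasks;

        Node(String nodeId, Set<String> sceneCodes, int activeScenes, int maxTasks) {
            this.nodeId = nodeId;
            this.sceneCodes = sceneCodes;
            this.activeScenes = activeScenes;
            this.maxTasks = maxTasks;
        }
    }

    /** Least-loaded node that is not full and supports every required scene. */
    static Optional<String> pickNode(List<Node> nodes, Collection<String> required) {
        return nodes.stream()
            .filter(n -> n.activeScenes < n.maxTasks)
            .filter(n -> n.sceneCodes.containsAll(required))
            .min(Comparator.comparingInt(n -> n.activeScenes))
            .map(n -> n.nodeId);
    }

    /** MIXED: try one node for all scenes, otherwise place each scene separately. */
    static Map<String, List<String>> dispatchMixed(List<Node> nodes, List<String> sceneCodes) {
        Map<String, List<String>> plan = new LinkedHashMap<>();
        Optional<String> single = pickNode(nodes, sceneCodes);
        if (single.isPresent()) {
            plan.put(single.get(), new ArrayList<>(sceneCodes));
            return plan;
        }
        for (String code : sceneCodes) {
            String nodeId = pickNode(nodes, List.of(code))
                .orElseThrow(() -> new IllegalStateException("No node supports scene: " + code));
            plan.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(code);
        }
        return plan;
    }
}
```

One design point the sketch makes visible: the split path picks a node per scene from a load snapshot that is not updated inside the loop, so several scenes can land on the same "least loaded" node. That mirrors `dispatchAutoSplit` above and is usually acceptable for small scene lists.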

14.5 Node Capability Matching

@Service
@Slf4j
public class EngineNodeService {
    
    @Resource
    private EngineNodeRepository nodeRepository;
    
    @Resource
    private TaskSceneRuntimeRepository sceneRuntimeRepository;
    
    /**
     * Select the best node that supports all given scenes.
     */
    public String selectBestNodeForSceneCodes(List<String> sceneCodes) {
        List<EngineNode> candidates = getCandidateNodes(sceneCodes);
        
        if (candidates.isEmpty()) {
            return null;
        }
        
        // Pick the candidate with the lowest load
        return candidates.stream()
            .min(Comparator.comparingInt(this::getActiveTaskCount))
            .map(EngineNode::getNodeId)
            .orElse(null);
    }
    
    /**
     * Select the best node for a single scene.
     */
    public String selectBestNodeForSceneCode(String sceneCode) {
        return selectBestNodeForSceneCodes(Collections.singletonList(sceneCode));
    }
    
    /**
     * Candidate nodes: online, capable of all scenes, and not overloaded.
     */
    public List<EngineNode> getCandidateNodes(List<String> sceneCodes) {
        List<EngineNode> allNodes = nodeRepository.findByStatus(1);
        
        // Check capabilities on the already-loaded entity to avoid one extra query per node
        return allNodes.stream()
            .filter(node -> supportsAll(node, sceneCodes))
            .filter(node -> !isOverloaded(node))
            .collect(Collectors.toList());
    }
    
    /**
     * Whether the node supports all given scenes.
     */
    public boolean supportsAllSceneCodes(String nodeId, List<String> sceneCodes) {
        EngineNode node = nodeRepository.findByNodeId(nodeId);
        return node != null && supportsAll(node, sceneCodes);
    }
    
    private boolean supportsAll(EngineNode node, List<String> sceneCodes) {
        List<String> supportedCodes = parseSceneCodes(node.getSceneCodeJson());
        return new HashSet<>(supportedCodes).containsAll(sceneCodes);
    }
    
    /**
     * Load measure: number of RUNNING scene runtimes currently bound to the node.
     */
    private int getActiveTaskCount(EngineNode node) {
        return sceneRuntimeRepository.countByNodeIdAndState(node.getNodeId(), "RUNNING");
    }
    
    /**
     * Whether the node has reached its max_tasks limit.
     */
    private boolean isOverloaded(EngineNode node) {
        int activeCount = getActiveTaskCount(node);
        return activeCount >= node.getMaxTasks();
    }
    
    private List<String> parseSceneCodes(String json) {
        if (json == null || json.isEmpty()) {
            return Collections.emptyList();
        }
        
        try {
            return JSON.parseArray(json, String.class);
        } catch (Exception e) {
            log.error("Failed to parse scene codes: {}", json, e);
            return Collections.emptyList();
        }
    }
}

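15. WebSocket Protocol Specification

15.1 Protocol Design Principles

  1. Explicit message types: every message carries a type field
  2. Bidirectional communication: platform -> algorithm node and algorithm node -> platform
  3. Idempotency: receiving the same message more than once has no additional side effects
  4. Fault tolerance: an unknown message type does not break the connection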
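
Principles 3 and 4 are easiest to see in a small handler. The sketch below is an assumption about how a receiver might apply them, not the platform's code: `EngineMessageRouter` is a hypothetical class, messages are shown as already-parsed maps, and the returned labels exist only so the outcome can be observed. A repeated START_TASK for a task that is already running changes nothing, and an unknown or missing type is ignored instead of raising an error.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical receiver illustrating idempotent and tolerant handling. */
class EngineMessageRouter {
    private final Set<Long> runningTasks = ConcurrentHashMap.newKeySet();

    /** Returns a short label describing what the message caused. */
    String handle(Map<String, Object> msg) {
        Object type = msg.get("type");
        Long taskId = taskIdOf(msg);
        if ("START_TASK".equals(type) && taskId != null) {
            // Set.add is false when the task is already running: nothing is restarted.
            return runningTasks.add(taskId) ? "STARTED" : "ALREADY_RUNNING";
        }
        if ("STOP_TASK".equals(type) && taskId != null) {
            return runningTasks.remove(taskId) ? "STOPPED" : "NOT_RUNNING";
        }
        // Unknown or malformed messages are ignored; the connection stays open.
        return "IGNORED";
    }

    /** Reads data.task_id, returning null when it is absent or not numeric. */
    private static Long taskIdOf(Map<String, Object> msg) {
        Object data = msg.get("data");
        if (!(data instanceof Map)) {
            return null;
        }
        Object id = ((Map<?, ?>) data).get("task_id");
        return (id instanceof Number) ? ((Number) id).longValue() : null;
    }
}
```

Keeping the "already running" check on the receiving side means the sender can safely retry a control message after a timeout or reconnect without tracking whether the first attempt arrived.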

15.2 Node Registration Message

Algorithm node -> platform:

{
  "type": "NODE_REGISTER",
  "node_id": "gpu-node-01",
  "node_name": "GPU-Server-01",
  "ip": "10.0.10.21",
  "port": 18080,
  "max_tasks": 50,
  "algorithms": ["REGION_INTRUSION", "HELMET_DETECTION", "FIRE_DETECTION"],
  "load": {
    "cpu": 0.21,
    "gpu": 0.64,
    "memory": 0.55,
    "tasks": 6
  }
}

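Platform-side handling:

@ServerEndpoint("/ws/ai-engine")
@Component
@Slf4j
public class AiEngineSocket {
    
    // The WebSocket container creates one endpoint instance per connection,
    // outside Spring's control, so the shared service is kept in a static field
    // and injected once through the setter on the Spring-managed instance.
    private static EngineNodeService nodeService;
    
    @Autowired
    public void setNodeService(EngineNodeService nodeService) {
        AiEngineSocket.nodeService = nodeService;
    }
    
    private String sessionNodeId;
    
    @OnMessage
    public void onMessage(String message, Session session) {
        try {
            JSONObject msg = JSON.parseObject(message);
            String type = msg.getString("type");
            if (type == null) {
                // switch on a null String would throw; treat it like an unknown type
                log.warn("Message without type field ignored");
                return;
            }
            
            switch (type) {
                case "NODE_REGISTER":
                    handleRegister(msg, session);
                    break;
                case "NODE_HEARTBEAT":
                    handleHeartbeat(msg);
                    break;
                case "TASK_KEEPALIVE":
                    handleTaskKeepalive(msg);
                    break;
                case "AI_RESULT":
                    handleAiResult(msg);
                    break;
                default:
                    log.warn("Unknown message type: {}", type);
            }
        } catch (Exception e) {
            // Never let a bad message tear down the connection
            log.error("Message processing failed", e);
        }
    }
    
    private void handleRegister(JSONObject msg, Session session) {
        String nodeId = msg.getString("node_id");
        
        // Bind the node to this session
        this.sessionNodeId = nodeId;
        session.getUserProperties().put("nodeId", nodeId);
        
        // Create or update the node record
        nodeService.upsertNode(msg);
        
        log.info("Node registered: {}", nodeId);
    }
    
    @OnClose
    public void onClose(Session session) {
        if (sessionNodeId != null) {
            nodeService.markOffline(sessionNodeId);
            log.info("Node disconnected: {}", sessionNodeId);
        }
    }
}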
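
`onClose` only fires when the close is actually observed, so a node that loses power or network can stay marked online. The `last_heartbeat_time` column and the `idx_status_heartbeat` index on `ai_engine_node` point to a periodic timeout sweep; this section does not show one, so the following is a hedged sketch of the core check only. `HeartbeatSweeper`, the in-memory map, and the timeout value are all assumptions made for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical check for nodes whose heartbeat is older than a timeout. */
class HeartbeatSweeper {
    static List<String> findStaleNodes(Map<String, Instant> lastHeartbeatByNode,
                                       Instant now, Duration timeout) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, Instant> entry : lastHeartbeatByNode.entrySet()) {
            Instant last = entry.getValue();
            // A node that never sent a heartbeat is treated as stale as well.
            if (last == null || Duration.between(last, now).compareTo(timeout) > 0) {
                stale.add(entry.getKey());
            }
        }
        return stale;
    }
}
```

A scheduled job could call this every few seconds and pass each returned ID to `nodeService.markOffline`, using the same path that `onClose` takes.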

15.3 Task Control Messages

Platform -> algorithm node (start task):

{
  "type": "START_TASK",
  "data": {
    "task_id": 1001,
    "camera_id": 3001,
    "scene_codes": ["REGION_INTRUSION", "HELMET_DETECTION"],
    "stream_key": "3001:0",
    "rtsp": "rtsp://admin:pass@10.0.1.20:554/Streaming/Channels/101",
    "confThreshold": 0.6,
    "region": [[0.1,0.1],[0.9,0.1],[0.9,0.9],[0.1,0.9]],
    "params": {
      "input_fps": 15,
      "in_w": 1280,
      "in_h": 720,
      "result_fps": 8,
      "quality": "MEDIUM",
      "alarm_enabled": 1,
      "snapshot_policy": "ALARM",
      "snapshot_quality": 80
    }
  }
}

Platform -> algorithm node (stop task):

{
  "type": "STOP_TASK",
  "data": {
    "task_id": 1001
  }
}

15.4 Result Report Messages

Algorithm node -> platform (single-scene result):

{
  "type": "AI_RESULT",
  "task_id": 1001,
  "camera_id": 3001,
  "scene_code": "REGION_INTRUSION",
  "ts": 1739426400000,
  "frame_width": 1920,
  "frame_height": 1080,
  "alarm_event": true,
  "confidence": 0.91,
  "objects": [
    {
      "label": "person",
      "score": 0.91,
      "bbox": [0.1, 0.2, 0.3, 0.4]
    }
  ],
  "snapshot_path": "/static/snapshots/task_1001/2026-02-13-14-30-00.jpg"
}

Algorithm node -> platform (multi-scene result):

{
  "type": "AI_RESULT",
  "task_id": 1001,
  "camera_id": 3001,
  "ts": 1739426400000,
  "frame_width": 1920,
  "frame_height": 1080,
  "snapshot_path": "/static/snapshots/task_1001/2026-02-13-14-30-00.jpg",
  "results": [
    {
      "scene_code": "REGION_INTRUSION",
      "alarm_event": true,
      "confidence": 0.91,
      "objects": [...]
    },
    {
      "scene_code": "HELMET_DETECTION",
      "alarm_event": false,
      "confidence": 0.87,
      "objects": [...]
    }
  ]
}

15.5 Preview Control Messages

Platform -> algorithm node (start preview):

{
  "type": "START_PREVIEW",
  "data": {
    "task_id": 1001,
    "preview_path": "ai_task_1001",
    "fps": 15,
    "draw": true,
    "encoder": "h264_nvenc",
    "bitrate": "3000k",
    "preview_mode": "RESULT_DRIVEN",
    "width": 1280,
    "height": 720
  }
}

Platform -> algorithm node (stop preview):

{
  "type": "STOP_PREVIEW",
  "data": {
    "task_id": 1001
  }
}

16. Result Processing and Alarm Push

16.1 Scene Latest-State Table

DROP TABLE IF EXISTS `ai_task_scene_state`;
CREATE TABLE `ai_task_scene_state` (
  `task_id`             BIGINT NOT NULL COMMENT 'Task ID',
  `camera_id`           BIGINT NOT NULL COMMENT 'Camera ID',
  `scene_code`          VARCHAR(64) NOT NULL COMMENT 'Scene code (same width as ai_scene_type.scene_code)',
  `scene_type_id`       BIGINT DEFAULT NULL COMMENT 'Scene type ID',
  
  -- Latest result
  `last_ts`             BIGINT NOT NULL COMMENT 'Latest timestamp',
  `last_time`           DATETIME NOT NULL COMMENT 'Latest time',
  `event_type`          VARCHAR(20) DEFAULT NULL COMMENT 'Event type',
  `event_desc`          VARCHAR(255) DEFAULT NULL COMMENT 'Event description',
  `alarm_event`         TINYINT DEFAULT 0 COMMENT 'Whether this is an alarm',
  `total_count`         INT DEFAULT 0 COMMENT 'Total detections',
  
  -- Statistics
  `label_counts_json`   JSON DEFAULT NULL COMMENT 'Counts per label',
  `problem_counts_json` JSON DEFAULT NULL COMMENT 'Counts per problem label',
  `metrics_json`        JSON DEFAULT NULL COMMENT 'Metrics',
  `last_snapshot_path`  VARCHAR(500) DEFAULT NULL COMMENT 'Latest snapshot',
  
  `update_time`         DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  
  PRIMARY KEY (`task_id`, `scene_code`),
  KEY `idx_camera_alarm` (`camera_id`, `alarm_event`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Scene latest-state table';

JSON field example:

{
  "label_counts_json": {
    "person": 3,
    "vehicle": 1
  },
  "problem_counts_json": {
    "no_helmet": 2
  },
  "metrics_json": {
    "avg_confidence": 0.87,
    "max_confidence": 0.94
  }
}
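
A value such as `label_counts_json` can be derived from the `objects` array of an AI_RESULT message. The sketch below shows one way to do it, assuming each object carries a `label` and a `score` as in the result examples of section 15.4. `LabelCounter` and the `minScore` cut-off are hypothetical, and the objects are shown as parsed maps so the example needs no JSON library.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical helper: counts detected objects per label above a score cut-off. */
class LabelCounter {
    static Map<String, Long> countLabels(List<Map<String, Object>> objects, double minScore) {
        return objects.stream()
            // Skip entries without a usable label or score instead of failing.
            .filter(o -> o.get("label") instanceof String)
            .filter(o -> o.get("score") instanceof Number
                && ((Number) o.get("score")).doubleValue() >= minScore)
            // TreeMap keeps labels sorted, which gives stable JSON output.
            .collect(Collectors.groupingBy(
                o -> (String) o.get("label"), TreeMap::new, Collectors.counting()));
    }
}
```

The same grouping restricted to a scene's `problem_labels` would produce a value shaped like `problem_counts_json`.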

16.2 Detection Result Table

DROP TABLE IF EXISTS `ai_detection_result`;
CREATE TABLE `ai_detection_result` (
  `id`                 BIGINT NOT NULL AUTO_INCREMENT,
  `task_id`            BIGINT NOT NULL COMMENT 'Task ID',
  `camera_id`          BIGINT NOT NULL COMMENT 'Camera ID',
  `group_id`           BIGINT DEFAULT NULL COMMENT 'Group ID',
  `scene_type_id`      BIGINT DEFAULT NULL COMMENT 'Scene type ID',
  `scene_code`         VARCHAR(64) DEFAULT NULL COMMENT 'Scene code (same width as ai_scene_type.scene_code)',
  `node_id`            VARCHAR(100) DEFAULT NULL COMMENT 'Node ID',
  
  -- Detection time
  `detect_time`        DATETIME NOT NULL COMMENT 'Detection time',
  `detect_timestamp`   BIGINT NOT NULL COMMENT 'Detection timestamp',
  
  -- Alarm information
  `is_alarm`           TINYINT DEFAULT 0 COMMENT 'Whether this is an alarm',
  `alarm_level`        TINYINT DEFAULT 0 COMMENT 'Alarm level',
  `confidence`         FLOAT DEFAULT NULL COMMENT 'Confidence',
  `detect_count`       INT DEFAULT 0 COMMENT 'Number of detections',
  
  -- Detection result
  `objects_json`       JSON DEFAULT NULL COMMENT 'Detected objects',
  `event_type`         VARCHAR(20) DEFAULT NULL COMMENT 'Event type',
  `event_desc`         VARCHAR(255) DEFAULT NULL COMMENT 'Event description',
  `total_count`        INT DEFAULT NULL COMMENT 'Total count',
  `label_counts_json`  JSON DEFAULT NULL COMMENT 'Counts per label',
  `problem_counts_json` JSON DEFAULT NULL COMMENT 'Counts per problem label',
  `metrics_json`       JSON DEFAULT NULL COMMENT 'Metrics',
  
  -- Snapshot
  `snapshot_path`      VARCHAR(500) DEFAULT NULL COMMENT 'Snapshot path',
  
  -- Push status
  `is_pushed`          TINYINT DEFAULT 0 COMMENT 'Whether it has been pushed',
  `push_time`          DATETIME DEFAULT NULL COMMENT 'Push time',
  
  `create_time`        DATETIME DEFAULT CURRENT_TIMESTAMP,
  
  PRIMARY KEY (`id`),
  KEY `idx_task_time` (`task_id`, `detect_time`),
  KEY `idx_alarm_scene_group_time` (`is_alarm`, `scene_code`, `group_id`, `detect_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Detection result table';

16.3 结果处理流水线

@Service
@Slf4j
public class AiResultProcessorService {
    
    @Resource
    private TaskSceneStateService sceneStateService;
    
    @Resource
    private DetectionResultService detectionResultService;
    
    @Resource
    private AlarmPushService alarmPushService;
    
    @Resource
    private WebNotifySocket webNotifySocket;
    
    private final BlockingQueue<JSONObject> resultQueue = 
        new LinkedBlockingQueue<>(20000);
    
    private final ExecutorService processExecutor;
    
    // 抽样控制
    private final ConcurrentHashMap<String, Long> sampleCache = new ConcurrentHashMap<>();
    
    @Value("${ai.result.sample-interval-ms:1000}")
    private long sampleIntervalMs;
    
    public AiResultProcessorService() {
        int coreThreads = Runtime.getRuntime().availableProcessors();
        this.processExecutor = new ThreadPoolExecutor(
            coreThreads,
            coreThreads * 2,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            new ThreadFactoryBuilder().setNameFormat("result-processor-%d").build(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
        
        // 启动处理线程
        for (int i = 0; i < coreThreads; i++) {
            processExecutor.submit(this::processLoop);
        }
    }
    
    /**
     * 提交结果到队列
     */
    public void submit(JSONObject resultMsg) {
        boolean alarm = resultMsg.getBooleanValue("alarm_event");
        
        boolean offered = resultQueue.offer(resultMsg);
        
        if (!offered) {
            if (!alarm) {
                // 非告警直接丢弃
                log.warn("Result queue full, dropping non-alarm event");
                return;
            }
            
            // 告警消息强插
            int tries = 0;
            while (!offered && tries++ < 10) {
                resultQueue.poll(); // 淘汰队头
                offered = resultQueue.offer(resultMsg);
            }
            
            if (!offered) {
                log.error("Failed to enqueue alarm event after retries");
            }
        }
    }
    
    /**
     * 处理循环
     */
    private void processLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                JSONObject msg = resultQueue.poll(1, TimeUnit.SECONDS);
                if (msg != null) {
                    processResult(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                log.error("Result processing error", e);
            }
        }
    }
    
    /**
     * 处理单条结果
     */
    private void processResult(JSONObject msg) {
        Long taskId = msg.getLong("task_id");
        Long cameraId = msg.getLong("camera_id");
        Long ts = msg.getLong("ts");
        String snapshotPath = msg.getString("snapshot_path");
        
        // 兼容单场景和多场景
        JSONArray resultsArray = msg.getJSONArray("results");
        
        if (resultsArray != null && !resultsArray.isEmpty()) {
            // 多场景结果
            for (int i = 0; i < resultsArray.size(); i++) {
                JSONObject sceneResult = resultsArray.getJSONObject(i);
                processSceneResult(taskId, cameraId, ts, snapshotPath, sceneResult);
            }
        } else {
            // 单场景结果
            processSceneResult(taskId, cameraId, ts, snapshotPath, msg);
        }
    }
    
    private void processSceneResult(Long taskId, Long cameraId, Long ts, 
                                     String snapshotPath, JSONObject sceneResult) {
        
        String sceneCode = sceneResult.getString("scene_code");
        Boolean alarmEvent = sceneResult.getBoolean("alarm_event");
        
        // 1. 更新最新态 (总是执行)
        sceneStateService.updateState(taskId, cameraId, sceneCode, ts, sceneResult);
        
        // 2. 判断是否需要持久化
        if (StringUtils.isBlank(snapshotPath)) {
            // 无快照,仅更新状态
            return;
        }
        
        if (!Boolean.TRUE.equals(alarmEvent)) {
            // 非告警,抽样
            if (!shouldSample(taskId, sceneCode, ts)) {
                return;
            }
        }
        
        // 3. 持久化结果
        DetectionResult result = buildDetectionResult(
            taskId, cameraId, sceneCode, ts, snapshotPath, sceneResult
        );
        
        detectionResultService.save(result);
        
        // 4. 告警推送
        if (Boolean.TRUE.equals(alarmEvent)) {
            alarmPushService.onAlarmDetected(result, sceneCode);
        }
    }
    
    /**
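     * Sampling check: admits at most one non-alarm result per task and scene
     * in each sampleIntervalMs window.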
     */
    private boolean shouldSample(Long taskId, String sceneCode, Long ts) {
        String key = taskId + ":" + sceneCode;
        Long lastTs = sampleCache.get(key);
        
        if (lastTs == null || (ts - lastTs) >= sampleIntervalMs) {
            sampleCache.put(key, ts);
            return true;
        }
        
        return false;
    }
    
    private DetectionResult buildDetectionResult(Long taskId, Long cameraId, 
                                                  String sceneCode, Long ts,
                                                  String snapshotPath,
                                                  JSONObject sceneResult) {
        
        DetectionResult result = new DetectionResult();
        result.setTaskId(taskId);
        result.setCameraId(cameraId);
        result.setSceneCode(sceneCode);
        result.setDetectTimestamp(ts);
        result.setDetectTime(LocalDateTime.ofInstant(
            Instant.ofEpochMilli(ts),
            ZoneId.systemDefault()
        ));
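        // getBoolean() returns null when the field is absent; avoid unboxing null
        result.setIsAlarm(Boolean.TRUE.equals(sceneResult.getBoolean("alarm_event")) ? 1 : 0);
        result.setConfidence(sceneResult.getFloat("confidence"));
        // "objects" may be missing; fall back to an empty array instead of throwing an NPE
        JSONArray objects = sceneResult.getJSONArray("objects");
        result.setObjectsJson(objects != null ? objects.toJSONString() : "[]");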
        result.setSnapshotPath(snapshotPath);
        result.setEventType(sceneResult.getString("event_type"));
        result.setEventDesc(sceneResult.getString("event_desc"));
        result.setTotalCount(sceneResult.getInteger("total_count"));
        
        return result;
    }
}

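Processing flow:

[Algorithm result] → [Fast path: push to frontend over WebSocket] → [Real-time bounding boxes]
        ↓
   [Slow path]
        ↓
[Sampling decision]
        ↓
 [Has snapshot?] ──NO→ [Update latest state only]
        ↓
       YES
        ↓
    [Alarm?] ──NO→ [Persist if sampled]
        ↓
       YES
        ↓
    [Persist] → [Alarm push]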
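
The sampling step on the slow path reads and then writes the last-sampled timestamp. When several result workers run concurrently (the configuration in section 18.1 allows 4 to 16), that check-then-put can admit two results for the same task and scene within one interval. The following is a minimal sketch of an atomic variant, assuming the cache is a `ConcurrentHashMap`; the class name `ResultSampler` and the scene codes used with it are hypothetical and do not appear in the original code.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: admits at most one non-alarm result per key per interval. */
class ResultSampler {
    private final ConcurrentHashMap<String, Long> lastSampled = new ConcurrentHashMap<>();
    private final long intervalMs;

    ResultSampler(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Returns true if the result with timestamp ts (milliseconds) should be persisted. */
    boolean shouldSample(long taskId, String sceneCode, long ts) {
        String key = taskId + ":" + sceneCode;
        boolean[] admitted = {false};
        // compute() runs atomically per key, so two workers cannot both
        // admit a result for the same task and scene within one interval.
        lastSampled.compute(key, (k, last) -> {
            if (last == null || ts - last >= intervalMs) {
                admitted[0] = true;
                return ts;
            }
            return last;
        });
        return admitted[0];
    }
}
```

With a 1000 ms interval, a result at `ts = 0` is admitted, one at `ts = 500` for the same key is skipped, and one at `ts = 1000` is admitted again. Different scenes of the same task are sampled independently.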

16.4 Alarm Push Service

@Service
@Slf4j
public class AlarmPushService {
    
    @Resource
    private AiTaskRepository taskRepository;
    
    @Resource
    private AlarmPushConfigService configService;
    
    @Resource
    private AlarmPushLogRepository pushLogRepository;
    
    private final BlockingQueue<PushTask> pushQueue = new LinkedBlockingQueue<>(20000);
    
    private final ExecutorService pushExecutor;
    
    // Push buckets (used for aggregated pushes), keyed by taskId:sceneCode
    private final ConcurrentHashMap<String, PushBucket> buckets = new ConcurrentHashMap<>();
    
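    private final ScheduledExecutorService flushScheduler =
        Executors.newSingleThreadScheduledExecutor();
    
    public AlarmPushService() {
        this.pushExecutor = Executors.newFixedThreadPool(4,
            new ThreadFactoryBuilder().setNameFormat("alarm-push-%d").build());
    }
    
    // Start the workers only after dependency injection has completed; started from
    // the constructor, they could run before the @Resource fields are set.
    @PostConstruct
    public void start() {
        // Start the push worker threads
        for (int i = 0; i < 4; i++) {
            pushExecutor.submit(this::pushLoop);
        }
        
        // Start the bucket flush timer
        flushScheduler.scheduleAtFixedRate(this::flushBuckets, 1, 1, TimeUnit.SECONDS);
    }
    
    @PreDestroy
    public void stop() {
        flushScheduler.shutdownNow();
        pushExecutor.shutdownNow(); // interrupts pushLoop, which then exits
    }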
    
    public void onAlarmDetected(DetectionResult result, String sceneCode) {
        AiTask task = taskRepository.findById(result.getTaskId()).orElse(null);
        if (task == null) return;
        
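        // Resolve the push strategy
        PushStrategy strategy = resolvePushStrategy(task);
        
        if ("IMMEDIATE".equals(strategy.getMode())) {
            // Push immediately; offer() returns false when the queue is full
            if (!pushQueue.offer(new PushTask(result, task, sceneCode))) {
                log.warn("Push queue full, alarm dropped: task={}, scene={}", task.getId(), sceneCode);
            }
        } else {
            // Aggregate into a bucket
            addToBucket(result, task, sceneCode, strategy);
        }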
    }
    
    private void addToBucket(DetectionResult result, AiTask task, 
                             String sceneCode, PushStrategy strategy) {
        
        String bucketKey = task.getId() + ":" + sceneCode;
        
        PushBucket bucket = buckets.computeIfAbsent(bucketKey, 
            k -> new PushBucket(task, sceneCode, strategy));
        
        bucket.add(result);
    }
    
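    private void flushBuckets() {
        long now = System.currentTimeMillis();
        
        buckets.values().forEach(bucket -> {
            try {
                if (bucket.shouldFlush(now)) {
                    List<DetectionResult> results = bucket.flush();
                    if (!results.isEmpty()) {
                        // PushBucket fields are private; use the Lombok-generated getters
                        pushQueue.offer(new PushTask(results, bucket.getTask(), bucket.getSceneCode()));
                    }
                }
            } catch (Exception e) {
                // An uncaught exception would cancel all further scheduled runs
                log.error("Bucket flush error", e);
            }
        });
    }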
    
    private void pushLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                PushTask task = pushQueue.poll(1, TimeUnit.SECONDS);
                if (task != null) {
                    executePush(task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                log.error("Push execution error", e);
            }
        }
    }
    
    private void executePush(PushTask task) {
        try {
            // Build the push content
            String content = buildPushContent(task);
            String routeKey = task.getTask().getPushRouteKey();
            
            // Call the Feishu / WeCom (or similar) API
            PushResponse response = sendToPushService(routeKey, content);
            
            // Record the push log (named pushLog so it does not shadow the Slf4j logger)
            AlarmPushLog pushLog = new AlarmPushLog();
            pushLog.setConfigId(0L);
            pushLog.setDetectionResultIds(JSON.toJSONString(
                task.getResults().stream().map(DetectionResult::getId).collect(Collectors.toList())
            ));
            pushLog.setPushChannel("feishu");
            pushLog.setPushTarget(routeKey);
            pushLog.setPushContent(content);
            pushLog.setPushStatus(response.isSuccess() ? 1 : 0);
            pushLog.setResponseCode(response.getCode());
            pushLog.setResponseMessage(response.getMessage());
            pushLog.setPushTime(LocalDateTime.now());
            
            pushLogRepository.save(pushLog);
            
            // Update the push status of the results.
            // Note: this only changes the in-memory objects; they still have to be
            // saved through the result repository for the status to be stored.
            if (response.isSuccess()) {
                task.getResults().forEach(r -> {
                    r.setIsPushed(1);
                    r.setPushTime(LocalDateTime.now());
                });
            }
            
        } catch (Exception e) {
            log.error("Push failed for task {}", task.getTask().getId(), e);
        }
    }
    
    private String buildPushContent(PushTask task) {
        StringBuilder content = new StringBuilder();
        content.append("[Alarm Notification]\n");
        content.append("Camera: ").append(task.getTask().getCameraName()).append("\n");
        content.append("Scene: ").append(task.getSceneCode()).append("\n");
        content.append("Time: ").append(LocalDateTime.now().format(
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
        )).append("\n");
        content.append("Detections: ").append(task.getResults().size()).append("\n");
        
        return content.toString();
    }
    
    private PushResponse sendToPushService(String routeKey, String content) {
        // The actual push logic goes here.
        // Simplified for this example: always reports success.
        return new PushResponse(true, 200, "success");
    }
    
    private PushStrategy resolvePushStrategy(AiTask task) {
        // Simplified implementation: fall back to defaults for unset fields
        return new PushStrategy(
            task.getPushMode() != null ? task.getPushMode() : "IMMEDIATE",
            task.getPushThresholdCount() != null ? task.getPushThresholdCount() : 1,
            task.getPushWindowSeconds() != null ? task.getPushWindowSeconds() : 60,
            task.getPushCooldownSeconds() != null ? task.getPushCooldownSeconds() : 0
        );
    }
}

@Data
@AllArgsConstructor
class PushTask {
    private List<DetectionResult> results;
    private AiTask task;
    private String sceneCode;
    
    public PushTask(DetectionResult result, AiTask task, String sceneCode) {
        this.results = Collections.singletonList(result);
        this.task = task;
        this.sceneCode = sceneCode;
    }
}

@Data
class PushBucket {
    private final AiTask task;
    private final String sceneCode;
    private final PushStrategy strategy;
    // Guarded by "this": add() is called from result workers,
    // shouldFlush() and flush() from the flush timer
    private final List<DetectionResult> results = new ArrayList<>();
    private long firstAddTime;
    private long lastFlushTime;
    
    public PushBucket(AiTask task, String sceneCode, PushStrategy strategy) {
        this.task = task;
        this.sceneCode = sceneCode;
        this.strategy = strategy;
        this.firstAddTime = System.currentTimeMillis();
        this.lastFlushTime = 0L; // the cooldown applies only after a first flush
    }
    
    public synchronized void add(DetectionResult result) {
        results.add(result);
        if (firstAddTime == 0) {
            firstAddTime = System.currentTimeMillis();
        }
    }
    
    public synchronized boolean shouldFlush(long now) {
        if (results.isEmpty()) return false;
        
        // Cooldown check
        if (now - lastFlushTime < strategy.getCooldownSeconds() * 1000L) {
            return false;
        }
        
        String mode = strategy.getMode();
        
        if ("COUNT".equals(mode)) {
            return results.size() >= strategy.getThresholdCount();
        }
        
        if ("WINDOW".equals(mode)) {
            return (now - firstAddTime) >= strategy.getWindowSeconds() * 1000L;
        }
        
        if ("COUNT_OR_WINDOW".equals(mode)) {
            return results.size() >= strategy.getThresholdCount() ||
                   (now - firstAddTime) >= strategy.getWindowSeconds() * 1000L;
        }
        
        return false;
    }
    
    // Copy and clear under the same lock as add(); otherwise a result
    // added between the copy and the clear would be lost
    public synchronized List<DetectionResult> flush() {
        List<DetectionResult> flushed = new ArrayList<>(results);
        results.clear();
        firstAddTime = 0;
        lastFlushTime = System.currentTimeMillis();
        return flushed;
    }
}

@Data
@AllArgsConstructor
class PushStrategy {
    private String mode;
    private int thresholdCount;
    private int windowSeconds;
    private int cooldownSeconds;
}

@Data
@AllArgsConstructor
class PushResponse {
    private boolean success;
    private int code;
    private String message;
}

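Push modes:

| Mode | Trigger | Typical use |
|------|---------|-------------|
| IMMEDIATE | Every alarm is pushed at once | High-priority alarms |
| COUNT | Pushed after N alarms have accumulated | Reducing push frequency |
| WINDOW | Pushed when the time window expires | Periodic summaries |
| COUNT_OR_WINDOW | Pushed when either condition is met | Balancing timeliness and aggregation |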
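
To make the four modes concrete, the sketch below restates the trigger conditions as a side-effect-free function. It is an illustration only: the class `PushModeDecision` and its parameter names are hypothetical, and the cooldown check that the bucket applies before these conditions is left out.

```java
/** Hypothetical, side-effect-free restatement of the trigger conditions of the four push modes. */
class PushModeDecision {
    static boolean shouldPush(String mode, int pendingCount, int thresholdCount,
                              long elapsedMs, long windowMs) {
        if (pendingCount <= 0) {
            return false; // nothing to push
        }
        switch (mode) {
            case "IMMEDIATE":
                return true;
            case "COUNT":
                return pendingCount >= thresholdCount;
            case "WINDOW":
                return elapsedMs >= windowMs;
            case "COUNT_OR_WINDOW":
                return pendingCount >= thresholdCount || elapsedMs >= windowMs;
            default:
                return false; // unknown mode: never push
        }
    }
}
```

For example, with a threshold of 5 and a 60-second window, three pending alarms after 10 seconds trigger nothing in `COUNT` or `WINDOW` mode, while three pending alarms after 61 seconds trigger a push in both `WINDOW` and `COUNT_OR_WINDOW` mode.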

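17. Preview Pipeline Implementation

17.1 Preview Design Principles

  1. A running task ≠ a preview stream being pushed
    • A task can run without pushing a preview stream
    • The preview stream can be started and stopped on demand
  2. Preview resources are managed independently
    • A preview occupies encoder resources
    • It needs its own lifecycle control
  3. Preview addresses are generated dynamically
    • The address must be updated after a node switch
    • It is stored in the runtime-state table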
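
These principles can be illustrated with a small in-memory sketch, under the assumption that the play URL has the form `http://{nodeIp}:{port}/ai_task_{taskId}`. The class `PreviewAddressBook` is hypothetical and stands in for the runtime-state table; it keeps no task state at all, which is the point of the first principle.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the preview columns of the runtime-state table. */
class PreviewAddressBook {
    private final Map<Long, String> playUrls = new ConcurrentHashMap<>();
    private final int port;

    PreviewAddressBook(int port) {
        this.port = port;
    }

    /** Starts the preview for a task, or re-points it after the task moved to another node. */
    String start(long taskId, String nodeIp) {
        String url = "http://" + nodeIp + ":" + port + "/ai_task_" + taskId;
        playUrls.put(taskId, url);
        return url;
    }

    /** Stops the preview only; the task itself is not touched. */
    void stop(long taskId) {
        playUrls.remove(taskId);
    }

    Optional<String> playUrl(long taskId) {
        return Optional.ofNullable(playUrls.get(taskId));
    }
}
```

Calling `start` again with a different node IP replaces the stored address, which mirrors the requirement that the address be regenerated after a node switch.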

17.2 Preview Control Service

@Service
@Slf4j
public class PreviewControlService {
    
    @Resource
    private TaskRuntimeRepository runtimeRepository;
    
    @Resource
    private EngineNodeRepository nodeRepository;
    
    @Resource
    private AiEngineSocket engineSocket;
    
    /**
     * Starts the preview.
     */
    @Transactional(rollbackFor = Exception.class)
    public void startPreview(StartPreviewRequest request) {
        // 1. Look up the runtime state
        TaskRuntime runtime = runtimeRepository.findById(request.getTaskId())
            .orElseThrow(() -> new NotFoundException("Task is not running"));
        
        if (!"RUNNING".equals(runtime.getState())) {
            throw new BusinessException("Task is not running; preview cannot be started");
        }
        
        // 2. Build the preview path
        String previewPath = request.getPreviewPath();
        if (StringUtils.isBlank(previewPath)) {
            previewPath = "ai_task_" + request.getTaskId();
        }
        
        // 3. Resolve the node IP
        EngineNode node = nodeRepository.findByNodeId(runtime.getNodeId());
        if (node == null) {
            throw new BusinessException("Node not found");
        }
        
        // 4. Build the play URL
        String playUrl = String.format("http://%s:8889/%s", node.getIp(), previewPath);
        
        // 5. Send the preview command
        JSONObject cmd = new JSONObject();
        cmd.put("type", "START_PREVIEW");
        
        JSONObject data = new JSONObject();
        data.put("task_id", request.getTaskId());
        data.put("preview_path", previewPath);
        data.put("fps", request.getFps() != null ? request.getFps() : 15);
        data.put("bitrate", request.getBitrate() != null ? request.getBitrate() : "3000k");
        data.put("width", request.getWidth() != null ? request.getWidth() : 1280);
        data.put("height", request.getHeight() != null ? request.getHeight() : 720);
        data.put("draw", request.getDraw() != null ? request.getDraw() : true);
        data.put("encoder", "h264_nvenc");
        data.put("preview_mode", "RESULT_DRIVEN");
        
        cmd.put("data", data);
        
        engineSocket.sendCommand(runtime.getNodeId(), cmd);
        
        // 6. Update the runtime state
        runtime.setPreviewPath(previewPath);
        runtime.setPreviewPlayUrl(playUrl);
        runtimeRepository.updateById(runtime);
        
        log.info("Preview started: task={}, path={}, url={}", 
            request.getTaskId(), previewPath, playUrl);
    }
    
    /**
     * Stops the preview.
     */
    @Transactional(rollbackFor = Exception.class)
    public void stopPreview(Long taskId) {
        TaskRuntime runtime = runtimeRepository.findById(taskId)
            .orElseThrow(() -> new NotFoundException("Task runtime state not found"));
        
        if (StringUtils.isBlank(runtime.getPreviewPath())) {
            log.warn("Preview not started for task {}", taskId);
            return;
        }
        
        // Send the stop command
        JSONObject cmd = new JSONObject();
        cmd.put("type", "STOP_PREVIEW");
        
        JSONObject data = new JSONObject();
        data.put("task_id", taskId);
        cmd.put("data", data);
        
        engineSocket.sendCommand(runtime.getNodeId(), cmd);
        
        // Clear the preview fields of the runtime state
        runtime.setPreviewPath(null);
        runtime.setPreviewPlayUrl(null);
        runtimeRepository.updateById(runtime);
        
        log.info("Preview stopped: task={}", taskId);
    }
    
    /**
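     * Starts the preview automatically with default parameters.
     * Failures are logged and never propagated to the caller.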
     */
    public void autoStartPreview(Long taskId, String nodeId, String previewPath) {
        try {
            StartPreviewRequest request = new StartPreviewRequest();
            request.setTaskId(taskId);
            request.setPreviewPath(previewPath);
            request.setFps(15);
            request.setBitrate("3000k");
            request.setDraw(true);
            
            startPreview(request);
        } catch (Exception e) {
            log.error("Auto start preview failed for task {}", taskId, e);
        }
    }
}

@Data
class StartPreviewRequest {
    private Long taskId;
    private String previewPath;
    private Integer fps;
    private String bitrate;
    private Integer width;
    private Integer height;
    private Boolean draw;
}

17.3 Linking Task Start/Stop with Preview

@Service
@Slf4j
public class AiTaskService {
    
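    // taskRepository is used below but was never declared in the original listing
    @Resource
    private AiTaskRepository taskRepository;
    
    @Resource
    private TaskDispatchService dispatchService;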
    
    @Resource
    private TaskRuntimeService runtimeService;
    
    @Resource
    private TaskSceneRuntimeService sceneRuntimeService;
    
    @Resource
    private PreviewControlService previewService;
    
    @Resource
    private AiEngineSocket engineSocket;
    
    @Transactional(rollbackFor = Exception.class)
    public void startTask(Long taskId) {
        AiTask task = taskRepository.findById(taskId)
            .orElseThrow(() -> new NotFoundException("Task not found"));
        
        // 1. Guard against duplicate starts
        TaskRuntime runtime = runtimeService.findById(taskId);
        if (runtime != null && isAliveRunning(runtime)) {
            log.warn("Task {} already running", taskId);
            return;
        }
        
        // 2. Parse the scene codes
        List<String> sceneCodes = parseSceneCodes(task.getSceneCodes());
        if (sceneCodes.isEmpty()) {
            throw new BusinessException("Task has no scenes configured");
        }
        
        // 3. Build the dispatch plan
        DispatchPlan plan = dispatchService.buildDispatchPlan(task, sceneCodes);
        
        // 4. Create the runtime state
        LocalDateTime now = LocalDateTime.now();
        int restartCount = (runtime != null) ? runtime.getRestartCount() + 1 : 0;
        
        runtimeService.upsertRunning(taskId, plan.getPrimaryNodeId(), now, restartCount);
        
        // 5. Start the task on every planned node
        Set<String> startedNodes = new LinkedHashSet<>();
        
        try {
            for (Map.Entry<String, List<String>> entry : plan.getNodeToScenes().entrySet()) {
                String nodeId = entry.getKey();
                List<String> scopedScenes = entry.getValue();
                
                // Send the start command
                JSONObject cmd = buildStartCommand(task, scopedScenes);
                engineSocket.sendCommand(nodeId, cmd);
                startedNodes.add(nodeId);
                
                // Update the scene runtime state
                for (String sceneCode : scopedScenes) {
                    sceneRuntimeService.upsertRunning(taskId, sceneCode, nodeId, now, restartCount);
                }
            }
            
            // 6. Update the task table
            task.setStatus(1);
            task.setLastRunTime(now);
            taskRepository.updateById(task);
            
            // 7. Start the preview automatically
            String previewPath = "ai_task_" + taskId;
            previewService.autoStartPreview(taskId, plan.getPrimaryNodeId(), previewPath);
            
            log.info("Task started: id={}, nodes={}", taskId, plan.getNodeToScenes().keySet());
            
        } catch (Exception e) {
            log.error("Task start failed: {}", taskId, e);
            
            // Compensate: stop the nodes that were already started
            for (String nodeId : startedNodes) {
                safeStopTask(nodeId, taskId);
            }
            
            // Mark as failed.
            // Caution: rethrowing below rolls back this transaction, including these
            // writes. To keep the error state, write it in a separate transaction
            // (for example a method with Propagation.REQUIRES_NEW on another bean).
            runtimeService.markError(taskId, now);
            sceneRuntimeService.markErrorByTask(taskId, now, "START_TASK failed: " + e.getMessage());
            
            task.setStatus(2);
            task.setErrorMessage(e.getMessage());
            taskRepository.updateById(task);
            
            throw e;
        }
    }
    
    @Transactional(rollbackFor = Exception.class)
    public void stopTask(Long taskId) {
        AiTask task = taskRepository.findById(taskId)
            .orElseThrow(() -> new NotFoundException("Task not found"));
        
        // 1. Stop the preview
        try {
            previewService.stopPreview(taskId);
        } catch (Exception e) {
            log.error("Stop preview failed for task {}", taskId, e);
        }
        
        // 2. Look up the nodes the task is running on
        List<String> nodeIds = sceneRuntimeService.findNodeIdsByTaskId(taskId);
        
        // 3. Send the stop command to each node
        for (String nodeId : nodeIds) {
            try {
                JSONObject cmd = new JSONObject();
                cmd.put("type", "STOP_TASK");
                
                JSONObject data = new JSONObject();
                data.put("task_id", taskId);
                cmd.put("data", data);
                
                engineSocket.sendCommand(nodeId, cmd);
            } catch (Exception e) {
                log.error("Stop task failed on node {}", nodeId, e);
            }
        }
        
        // 4. Update the runtime state
        runtimeService.markStopped(taskId, LocalDateTime.now());
        sceneRuntimeService.markStoppedByTask(taskId, LocalDateTime.now());
        
        // 5. Update the task table
        task.setStatus(0);
        taskRepository.updateById(task);
        
        log.info("Task stopped: id={}", taskId);
    }
    
    private boolean isAliveRunning(TaskRuntime runtime) {
        if (!"RUNNING".equals(runtime.getState())) {
            return false;
        }
        
        if (runtime.getLastKeepalive() == null) {
            return false;
        }
        
        long elapsedSeconds = Duration.between(
            runtime.getLastKeepalive(),
            LocalDateTime.now()
        ).getSeconds();
        
        return elapsedSeconds < 30; // 30-second keepalive timeout
    }
    
    private JSONObject buildStartCommand(AiTask task, List<String> sceneCodes) {
        JSONObject cmd = new JSONObject();
        cmd.put("type", "START_TASK");
        
        JSONObject data = new JSONObject();
        data.put("task_id", task.getId());
        data.put("camera_id", task.getCameraId());
        data.put("scene_codes", sceneCodes);
        data.put("stream_key", task.getCameraId() + ":" + task.getStreamType());
        data.put("rtsp", task.getRtspUrl());
        data.put("confThreshold", task.getConfThreshold());
        
        // Detection region
        if (StringUtils.isNotBlank(task.getRegionJson())) {
            data.put("region", JSON.parseArray(task.getRegionJson()));
        }
        
        // Algorithm parameters
        JSONObject params = new JSONObject();
        params.put("input_fps", task.getInputFps());
        params.put("in_w", task.getOutputWidth());
        params.put("in_h", task.getOutputHeight());
        params.put("result_fps", task.getOutputFps());
        params.put("quality", task.getProcessQuality());
        params.put("alarm_enabled", task.getAlarmEnabled());
        params.put("snapshot_policy", task.getSnapshotPolicy());
        params.put("snapshot_quality", task.getSnapshotQuality());
        
        data.put("params", params);
        cmd.put("data", data);
        
        return cmd;
    }
    
    private void safeStopTask(String nodeId, Long taskId) {
        try {
            JSONObject cmd = new JSONObject();
            cmd.put("type", "STOP_TASK");
            
            JSONObject data = new JSONObject();
            data.put("task_id", taskId);
            cmd.put("data", data);
            
            engineSocket.sendCommand(nodeId, cmd);
        } catch (Exception e) {
            log.error("Safe stop failed: node={}, task={}", nodeId, taskId, e);
        }
    }
    
    private List<String> parseSceneCodes(String json) {
        if (StringUtils.isBlank(json)) {
            return Collections.emptyList();
        }
        
        try {
            return JSON.parseArray(json, String.class);
        } catch (Exception e) {
            log.error("Failed to parse scene codes: {}", json, e);
            return Collections.emptyList();
        }
    }
}
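
The start flow above follows a start-then-compensate pattern: nodes are started one by one, and if any start fails, every node that already accepted the command is stopped before the error is rethrown. A minimal, framework-free sketch of that pattern follows. The class `MultiNodeStarter` and its callback parameters are hypothetical; in the service they correspond to sending `START_TASK` and `STOP_TASK` over the engine socket.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical helper showing the start-then-compensate pattern in isolation. */
class MultiNodeStarter {
    /**
     * Calls start for each node in order. If one call fails, calls stop for every
     * node that was already started, then rethrows the original exception.
     */
    static void startAll(List<String> nodeIds, Consumer<String> start, Consumer<String> stop) {
        Set<String> started = new LinkedHashSet<>();
        try {
            for (String nodeId : nodeIds) {
                start.accept(nodeId);
                started.add(nodeId);
            }
        } catch (RuntimeException e) {
            for (String nodeId : started) {
                try {
                    stop.accept(nodeId);
                } catch (RuntimeException ignored) {
                    // Best-effort compensation: a failed stop must not mask the original error
                }
            }
            throw e;
        }
    }
}
```

If the third of three nodes rejects the start, the first two receive a stop and the caller still sees the original exception. The node that failed is not stopped, because it never entered the started set.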

Part IV: Operations and Evolution

18. Configuration Management

18.1 Application Configuration Template

# application.yml
server:
  port: 8080

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/video_analysis?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
    username: root
    password: ${DB_PASSWORD}
    driver-class-name: com.mysql.cj.jdbc.Driver
    hikari:
      maximum-pool-size: 20
      minimum-idle: 5
      connection-timeout: 30000

# Camera settings
camera:
  onboarding:
    async-enabled: true
    sync-timeout-ms: 15000
    duplicate-check:
      by-ip-port: true
  
  onvif:
    default-port: 80
    connect-timeout-ms: 3000
    read-timeout-ms: 5000
    heartbeat-timeout-ms: 3000
    soap:
      strict-fault-check: true
      allow-capability-fallback: true
  
  stream:
    naming: "cam_{ip}_ch{channel}_{subtype}"
    subtype-rule:
      main-min-width: 1280
    retry:
      register-max-attempts: 3
      register-backoff-ms: 1000
  
  health:
    check-interval-ms: 30000
    initial-delay-ms: 10000
    offline-alarm-threshold-sec: 180

# Codec settings
codec:
  probe:
    ffprobe-bin: ffprobe
    timeout-ms: 3000
    max-concurrency: 8
  
  transcode:
    enabled: true
    assume-h265-when-unknown: true
    ffmpeg-bin: ffmpeg
    rtsp-transport: tcp
    stimeout-us: 5000000
    probesize: 5000000
    analyzeduration: 5000000
    x264:
      preset: veryfast
      crf: 23
      gop: 50
    enable-audio: false

# MediaMTX settings
media:
  mtx:
    api-base-url: http://127.0.0.1:9997
    rtsp-port: 8554
    webrtc-port: 8889
    api:
      path-add: /v3/config/paths/add/
      path-get: /v3/config/paths/get/
      path-list: /v3/config/paths/list
      path-delete: /v3/config/paths/delete/

# Recording settings
recording:
  default:
    retention-days: 7
    segment-duration: 10m
    record-format: fmp4
  
  index:
    fixed-delay-ms: 30000
    scan-depth: 4
    lookback-hours: 24
  
  storage:
    refresh-interval-ms: 15000
    auto-switch-enabled: true
    migrate-batch-size: 20

# AI task settings
ai:
  task:
    keepalive-timeout-seconds: 30
    scene-keepalive-timeout-seconds: 30
    dedup-running-seconds: 10
  
  result:
    queue:
      size: 20000
    worker:
      min: 4
      max: 16
    sample-interval-ms: 1000
    batch:
      enabled: false
      size: 200
      flush-ms: 500
  
  preview:
    auto-start: true
    default-fps: 15
    default-width: 1280
    default-height: 720
    default-bitrate: 3000k
    default-encoder: h264_nvenc
  
  push:
    queue-size: 20000
    flush-ms: 1000
    routes-file: push_routes.json
    level-policy-file: level_policy.json

# WebSocket settings
ws:
  ai-engine:
    endpoint: /ws/ai-engine
  web-notify:
    endpoint: /ws/web-notify

18.2 MediaMTX Configuration Template

# mediamtx.yml
###############################################
# Global settings

# API
api: yes
apiAddress: :9997

# Metrics
metrics: yes
metricsAddress: :9998

###############################################
# RTSP server

rtsp: yes
rtspAddress: :8554
protocols: [tcp]
encryption: "no"

###############################################
# WebRTC server

webrtc: yes
webrtcAddress: :8889
webrtcICEServers:
  - urls: [stun:stun.l.google.com:19302]

###############################################
# HLS server

hls: yes
hlsAddress: :8888
hlsAlwaysRemux: no
hlsVariant: lowLatency
hlsSegmentCount: 7
hlsSegmentDuration: 1s
hlsPartDuration: 200ms

###############################################
# Recording

record: no
recordPath: /recordings/%path/%Y-%m-%d/%H-%M-%S
recordFormat: fmp4
recordPartDuration: 1s
recordSegmentDuration: 10m
recordDeleteAfter: 7d

###############################################
# Path defaults

paths:
  all:
    sourceOnDemand: yes
    sourceOnDemandStartTimeout: 10s
    sourceOnDemandCloseAfter: 10s
    runOnDemandStartTimeout: 10s
    runOnDemandCloseAfter: 10s

18.3 Docker Compose Deployment Template

version: "3.8"

services:
  mysql:
    image: mysql:8.0
    container_name: video-mysql
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD}
      MYSQL_DATABASE: video_analysis
      TZ: Asia/Shanghai
    ports:
      - "3306:3306"
    volumes:
      - ./data/mysql:/var/lib/mysql
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    command: --default-authentication-plugin=mysql_native_password

  mediamtx:
    image: bluenviron/mediamtx:latest
    container_name: video-mediamtx
    restart: always
    ports:
      - "8554:8554"   # RTSP
      - "8888:8888"   # HLS
      - "8889:8889"   # WebRTC
      - "9997:9997"   # API
      - "9998:9998"   # Metrics
    volumes:
      - ./mediamtx.yml:/mediamtx.yml
      - ./data/recordings:/recordings
    environment:
      TZ: Asia/Shanghai

  app:
    image: video-analysis-platform:latest
    container_name: video-app
    restart: always
    depends_on:
      - mysql
      - mediamtx
    ports:
      - "8080:8080"
    environment:
      SPRING_DATASOURCE_URL: jdbc:mysql://mysql:3306/video_analysis?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
      SPRING_DATASOURCE_USERNAME: root
      SPRING_DATASOURCE_PASSWORD: ${MYSQL_ROOT_PASSWORD}
      MEDIA_MTX_API_BASE_URL: http://mediamtx:9997
      TZ: Asia/Shanghai
    volumes:
      - ./data/snapshots:/app/snapshots
      - ./logs:/app/logs

  nginx:
    image: nginx:alpine
    container_name: video-nginx
    restart: always
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./data/recordings:/usr/share/nginx/html/recordings
      - ./data/snapshots:/usr/share/nginx/html/snapshots
    depends_on:
      - app

19. Observability

19.1 Core Metric Definitions

Camera onboarding layer:

camera_onboarding_total               # total onboarding attempts
camera_onboarding_success_total       # successful onboardings
camera_onboarding_duration_seconds    # onboarding duration
camera_online_count                   # devices online
camera_offline_count                  # devices offline
camera_offline_event_duration_seconds # offline duration

Streaming layer:

stream_path_total                     # total paths
stream_path_register_success_rate     # path registration success rate
stream_transcode_count                # streams being transcoded
media_gateway_request_duration_seconds # API call duration

Recording layer:

recording_plan_active_count           # active recording plans
recording_file_index_lag_seconds      # index lag
storage_volume_usage_percent          # storage usage
storage_volume_full_count             # volumes that are full

AI layer:

ai_task_running_count                 # running tasks
ai_task_error_count                   # tasks in error state
ai_engine_online_nodes                # engine nodes online
ai_result_queue_depth                 # result queue depth
ai_result_drop_total                  # dropped results
ai_alarm_push_success_rate            # alarm push success rate

19.2 Exposing Prometheus Metrics

@Component
public class MetricsCollector {
    
    // The repository type name is assumed; the original listing uses
    // cameraRepository without declaring it
    private final CameraRepository cameraRepository;
    private final Counter cameraOnboardingTotal;
    private final Counter cameraOnboardingSuccess;
    private final Timer cameraOnboardingDuration;
    private final Gauge cameraOnlineCount;
    private final Gauge cameraOfflineCount;
    
    public MetricsCollector(MeterRegistry registry, CameraRepository cameraRepository) {
        this.cameraRepository = cameraRepository;
        
        this.cameraOnboardingTotal = Counter.builder("camera_onboarding_total")
            .description("Total camera onboarding attempts")
            .register(registry);
        
        this.cameraOnboardingSuccess = Counter.builder("camera_onboarding_success_total")
            .description("Successful camera onboardings")
            .register(registry);
        
        // Micrometer has no Histogram meter; a Timer records durations
        // and can publish histogram buckets
        this.cameraOnboardingDuration = Timer.builder("camera_onboarding_duration_seconds")
            .description("Camera onboarding duration")
            .publishPercentileHistogram()
            .register(registry);
        
        // Gauges are sampled on every scrape, so each scrape runs these count queries
        this.cameraOnlineCount = Gauge.builder("camera_online_count", this::getOnlineCount)
            .description("Online camera count")
            .register(registry);
        
        this.cameraOfflineCount = Gauge.builder("camera_offline_count", this::getOfflineCount)
            .description("Offline camera count")
            .register(registry);
    }
    
    public void recordOnboardingAttempt() {
        cameraOnboardingTotal.increment();
    }
    
    public void recordOnboardingSuccess(long durationMs) {
        cameraOnboardingSuccess.increment();
        cameraOnboardingDuration.record(durationMs, TimeUnit.MILLISECONDS);
    }
    
    private double getOnlineCount() {
        return cameraRepository.countByStatus(1);
    }
    
    private double getOfflineCount() {
        return cameraRepository.countByStatus(0);
    }
}

19.3 Alert Rule Templates

# prometheus-alerts.yml
groups:
  - name: camera_alerts
    interval: 30s
    rules:
      - alert: CameraOnboardingFailureRateHigh
        expr: |
          (
            rate(camera_onboarding_total[5m]) - 
            rate(camera_onboarding_success_total[5m])
          ) / rate(camera_onboarding_total[5m]) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Camera onboarding failure rate too high"
          description: "Onboarding failure rate above 5% over the last 5 minutes"
      
      - alert: CameraOfflineCountHigh
        expr: camera_offline_count > 10
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "Too many offline devices"
          description: "Offline devices: {{ $value }}"
  
  - name: storage_alerts
    interval: 30s
    rules:
      - alert: StorageVolumeAlmostFull
        expr: storage_volume_usage_percent > 90
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "Storage volume almost full"
          description: "Volume {{ $labels.volume_id }} usage: {{ $value }}%"
      
      - alert: RecordingIndexLagHigh
        expr: recording_file_index_lag_seconds > 120
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Recording index lag too high"
          description: "Index lag: {{ $value }} s"
  
  - name: ai_alerts
    interval: 30s
    rules:
      - alert: AiResultQueueFull
        expr: ai_result_queue_depth > 18000
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Result queue almost full"
          description: "Queue depth: {{ $value }}"
      
      - alert: AlarmPushFailureRateHigh
        expr: ai_alarm_push_success_rate < 0.95
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Alarm push success rate too low"
          description: "Success rate: {{ $value }}"
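
The first rule computes a failure ratio as (total rate - success rate) / total rate and fires when it exceeds 0.05. The sketch below reproduces that arithmetic so the threshold can be checked by hand. The class `AlertMath` is hypothetical. Returning 0 for an idle window is a choice made here; in PromQL a zero denominator yields NaN, which does not satisfy the comparison, so the rule likewise stays silent when there is no traffic.

```java
/** Hypothetical helper reproducing the arithmetic of the onboarding failure-rate rule. */
class AlertMath {
    /** Failure ratio: (total - success) / total, both given as per-second rates. */
    static double failureRatio(double totalRate, double successRate) {
        if (totalRate <= 0.0) {
            return 0.0; // no attempts in the window: treated as healthy
        }
        return (totalRate - successRate) / totalRate;
    }

    static boolean shouldFire(double totalRate, double successRate, double threshold) {
        return failureRatio(totalRate, successRate) > threshold;
    }
}
```

At 2.0 attempts per second with 1.8 successes per second the ratio is about 0.10 and the condition holds. At 1.96 successes per second it is about 0.02 and the condition does not hold.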

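19.4 Logging Conventions

Log levels:

  • ERROR: system failure that requires human intervention
  • WARN: degraded operation, or a retry that eventually succeeded
  • INFO: key business events (successful onboarding, task start, etc.)
  • DEBUG: detailed debugging information

Log format:

[time] [level] [thread] [class] - [business tag] message

Example: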

2026-02-13 14:30:00.123 INFO  [main] c.e.CameraOnboardingService - [camera:1001] Device onboarded successfully: ip=10.0.1.20, manufacturer=Hikvision
2026-02-13 14:30:05.456 WARN  [task-1] c.e.MediaPathService - [path:cam_10_0_1_20_ch1_main] MediaMTX registration failed, retrying (1/3)
2026-02-13 14:30:10.789 ERROR [task-2] c.e.OnvifManager - [camera:1002] ONVIF connection timeout: ip=10.0.1.21, timeout=3000ms

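20. Common Problems and Solutions

20.1 Device Onboarding Layer

Problem 1: ONVIF connection timeout

Symptom:

OnvifException: Connection timeout after 3000ms

Troubleshooting steps:

  1. ping the device IP to confirm network connectivity
  2. Run telnet ip 80 to confirm the port is open
  3. Open http://ip/onvif/device_service in a browser to verify the service responds
  4. Check the firewall rules

Solutions:

  • Increase the timeout to 5-10 seconds
  • Configure network routing so the server can reach the device subnet
  • Enable the ONVIF service in the device's web interface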
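
Step 2 of the checklist (the telnet port test) can also be automated from the platform side. The sketch below uses only the JDK; the class name `PortProbe` is hypothetical, and it checks TCP reachability only, not whether the ONVIF service itself answers.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical pre-check: is the device's TCP port reachable at all? */
final class PortProbe {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers connection refused, no route to host, and connect timeout
            return false;
        }
    }
}
```

Running this before the SOAP call makes it easier to tell a network problem (port unreachable) from an ONVIF problem (port open, service not answering).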

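Problem 2: WS-Security authentication failure

Symptom:

SOAP Fault: The security token could not be authenticated

Cause: the clock difference between the server and the device exceeds 5 seconds (the UsernameToken digest carries a Created timestamp that the device validates against its own clock)

Solution:

# Sync the server clock via NTP
ntpdate ntp.aliyun.com

# Configure NTP on the device
Device web interface -> System Configuration -> Time Settings -> NTP server: ntp.aliyun.com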
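
Clock skew can be detected before authentication fails. The sketch below assumes the device time has already been obtained, for example through the ONVIF GetSystemDateAndTime operation; the class `ClockSkewCheck` is hypothetical, and the 5-second tolerance is simply the figure quoted above (actual device tolerances vary).

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical check comparing server and device clocks. */
final class ClockSkewCheck {
    /** Tolerance taken from the "> 5 seconds" cause described in the text. */
    static final Duration MAX_SKEW = Duration.ofSeconds(5);

    /** Absolute difference between the two clocks. */
    static Duration skew(Instant serverTime, Instant deviceTime) {
        return Duration.between(serverTime, deviceTime).abs();
    }

    /** True when the device clock is close enough for timestamped tokens. */
    static boolean withinTolerance(Instant serverTime, Instant deviceTime) {
        return skew(serverTime, deviceTime).compareTo(MAX_SKEW) <= 0;
    }
}
```

When the check fails, logging the measured skew alongside the camera ID turns an opaque SOAP Fault into an actionable message.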

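Problem 3: Device online but no video

Symptom: camera_status=1, but pulling the RTSP stream fails

Cause: the control plane is reachable but the media plane is not

Solution:

// Add a media availability check (requires java.util.concurrent.TimeUnit)
public boolean checkMediaAvailable(String rtspUrl) {
    Process p = null;
    try {
        ProcessBuilder pb = new ProcessBuilder(
            "ffmpeg", "-i", rtspUrl,
            "-t", "1", "-f", "null", "-"
        );
        // Discard ffmpeg's output so a full pipe buffer cannot block the process
        pb.redirectErrorStream(true);
        pb.redirectOutput(ProcessBuilder.Redirect.DISCARD);
        p = pb.start();
        // Available only if ffmpeg exits with code 0 within 5 seconds
        return p.waitFor(5, TimeUnit.SECONDS) && p.exitValue() == 0;
    } catch (Exception e) {
        return false;
    } finally {
        // Kill ffmpeg if it is still running, e.g. after the timeout
        if (p != null && p.isAlive()) {
            p.destroyForcibly();
        }
    }
}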

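20.2 Recording Layer

Problem 4: The plan shows as enabled but no files are produced

Troubleshooting steps:

  1. Check that recording_plan.status is 1
  2. Check the MediaMTX path configuration: GET /v3/config/paths/get/{pathName}
  3. Check the storage volume state: storage_volume.status and is_full
  4. Check that the source RTSP stream can be pulled

Solution:

// Add a health check
@Scheduled(fixedDelay = 60000)
public void checkRecordingHealth() {
    List<RecordingPlan> plans = planRepository.findByStatus(1);
    
    for (RecordingPlan plan : plans) {
        // Check whether any new file appeared in the last 5 minutes.
        // Note: the window should be longer than the segment duration,
        // otherwise healthy plans may be re-applied needlessly.
        long recentFileCount = fileRepository.countByStreamNameAndTimeSince(
            plan.getStreamName(),
            LocalDateTime.now().minusMinutes(5)
        );
        
        if (recentFileCount == 0) {
            log.warn("No recent files for plan {}, reapplying config", plan.getId());
            planService.applyPlan(plan.toRequest());
        }
    }
}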

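Problem 5: Gaps in playback queries

Symptom: the physical file exists, but the database query does not return it

Cause: the indexing job scans only a recent time window and misses files that were written late for historical periods

Solution:

// Add a full rescan job
@Scheduled(cron = "0 0 2 * * ?") // every day at 02:00
public void fullScan() {
    List<StorageVolume> volumes = volumeRepository.findByStatus(1);
    
    for (StorageVolume volume : volumes) {
        scanVolumeFullRange(volume);
    }
}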
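
The body of `scanVolumeFullRange` is not shown in the article. One possible core for it, sketched under assumptions, is a comparison between the files on disk and the set of already indexed paths. The class `IndexGapScanner`, the `.mp4` extension filter, and the idea of passing indexed paths as an in-memory set are all illustrative choices, not the article's implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical gap finder: files on disk that the index does not know about. */
final class IndexGapScanner {
    /** Returns recording files under root that are missing from indexedPaths. */
    static List<Path> findUnindexed(Path root, Set<Path> indexedPaths) {
        try (Stream<Path> files = Files.walk(root)) {
            return files
                .filter(Files::isRegularFile)
                // Assumption: recordings use the .mp4 extension
                .filter(p -> p.getFileName().toString().endsWith(".mp4"))
                .filter(p -> !indexedPaths.contains(p))
                .sorted()
                .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For large volumes the indexed set would not fit comfortably in memory, so a real job would more likely walk one date directory at a time and query the database per batch.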

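20.3 Algorithm Layer

Problem 6: The task shows as running, but the algorithm side has no load

Troubleshooting steps:

  1. Check whether ai_task_runtime.state is really RUNNING
  2. Check whether the node's WebSocket session is online
  3. Check the algorithm-side logs for a received START_TASK command
  4. Check whether TASK_KEEPALIVE heartbeats are being reported

Solution:

// Add a watchdog job
@Scheduled(fixedDelay = 10000)
public void watchdogTask() {
    List<TaskRuntime> runtimes = runtimeRepository.findByState("RUNNING");
    
    for (TaskRuntime runtime : runtimes) {
        // Assumes lastKeepalive is initialized when the task enters RUNNING;
        // a null value would throw here
        long elapsed = Duration.between(
            runtime.getLastKeepalive(),
            LocalDateTime.now()
        ).getSeconds();
        
        if (elapsed > 30) {
            log.warn("Task {} keepalive timeout, marking ERROR", runtime.getTaskId());
            runtime.setState("ERROR");
            runtimeRepository.updateById(runtime);
            
            // Trigger the node-disconnected handling
            onNodeDisconnected(runtime.getNodeId());
        }
    }
}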

Problem 7: Result queue backlog

Symptom: ai_result_queue_depth keeps growing and memory alerts fire

Cause: results are consumed more slowly than they are produced

Solution:

// 1. Enable sampling of non-alarm results
@Value("${ai.result.sample-interval-ms:1000}")
private long sampleIntervalMs;

// 2. Enable batched database writes
@Value("${ai.result.batch.enabled:false}")
private boolean batchEnabled;

@Value("${ai.result.batch.size:200}")
private int batchSize;

// 3. Add processing threads
int coreThreads = Runtime.getRuntime().availableProcessors();
this.processExecutor = new ThreadPoolExecutor(
    coreThreads * 2,  // more concurrency
    coreThreads * 4,
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000)
);
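
The first measure, sampling of non-alarm results, is only configured above, not shown. A minimal sketch of how `sampleIntervalMs` might be applied follows; the class `ResultSampler` and the per-task granularity are assumptions, and the current time is passed in explicitly so the logic stays easy to test.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sampler: alarms always pass, other results are rate-limited per task. */
final class ResultSampler {
    private final long sampleIntervalMs;
    private final Map<Long, Long> lastAcceptedAt = new ConcurrentHashMap<>();

    ResultSampler(long sampleIntervalMs) {
        this.sampleIntervalMs = sampleIntervalMs;
    }

    /** Returns true if the result should be processed, false if it can be dropped. */
    boolean shouldProcess(long taskId, boolean isAlarm, long nowMs) {
        if (isAlarm) {
            return true; // never drop alarm results
        }
        boolean[] accepted = {false};
        // compute() makes the check-and-update atomic for each task ID
        lastAcceptedAt.compute(taskId, (id, last) -> {
            if (last == null || nowMs - last >= sampleIntervalMs) {
                accepted[0] = true;
                return nowMs;
            }
            return last;
        });
        return accepted[0];
    }
}
```

With the default of 1000 ms, a task producing 15 results per second would keep roughly one non-alarm result per second, which directly reduces the load on the queue and the database.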

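21. FAQ and Best Practices

21.1 Camera Onboarding

Q: Is ONVIF support mandatory?

A: No, but it is strongly recommended. ONVIF provides a standardized way to manage devices and obtain streams. Devices without ONVIF can be onboarded through the vendor fallback mode, but their RTSP templates must be maintained by hand.

Q: How should vendor differences be handled?

A: Handle them in layers:

  1. Protocol layer: maximize compatibility (Fault parsing, capability degradation)
  2. Business layer: provide a vendor fallback switch
  3. Configuration layer: maintain a library of vendor RTSP templates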
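
A vendor RTSP template library can be as simple as a map of URL patterns with placeholders. The sketch below is illustrative only: the class `RtspTemplates`, the placeholder names, and the two URL patterns are assumptions that should be verified against each vendor's documentation before use.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical template library for the vendor fallback mode. */
final class RtspTemplates {
    // Illustrative patterns; confirm against the vendor's documentation
    private static final Map<String, String> TEMPLATES = Map.of(
        "hikvision", "rtsp://{user}:{pass}@{ip}:554/Streaming/Channels/{channel}0{stream}",
        "dahua", "rtsp://{user}:{pass}@{ip}:554/cam/realmonitor?channel={channel}&subtype={stream}"
    );

    /** Fills the vendor's template with the given placeholder values. */
    static String build(String vendor, Map<String, String> params) {
        String template = TEMPLATES.get(vendor.toLowerCase(Locale.ROOT));
        if (template == null) {
            throw new IllegalArgumentException("No RTSP template for vendor: " + vendor);
        }
        String url = template;
        for (Map.Entry<String, String> e : params.entrySet()) {
            url = url.replace("{" + e.getKey() + "}", e.getValue());
        }
        return url;
    }
}
```

Passwords containing characters such as `@` or `:` would need URL-encoding before substitution, which this sketch leaves out.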

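Q: How to choose between the main stream and the sub stream?

A: Suggested strategy:

  • Recording: main stream
  • Live preview: sub stream
  • Algorithm analysis: choose according to the accuracy requirement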

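21.2 Streaming Media

Q: Why recommend MediaMTX over ZLMediaKit?

A: Both are excellent open-source options. The reasons for choosing MediaMTX:

  1. Single binary, simple to deploy
  2. Modern RESTful API design
  3. Native WebRTC support
  4. Active community and thorough documentation

Q: Does H.265 always have to be transcoded?

A: No. Suggested strategy:

  • Live preview in the browser: transcode to H.264
  • Recording storage: keep H.265 (saves space)
  • Algorithm analysis: depends on the algorithm's input requirements

Q: How much latency does transcoding add?

A: Roughly:

  • Software transcoding: 1-3 seconds
  • Hardware transcoding (NVENC): 500 ms to 1 second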

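21.3 Recording

Q: How to choose the segment duration?

A: Weigh the trade-offs:

  • 1-2 minutes: many files, high I/O pressure
  • 5-10 minutes: recommended, the best overall balance
  • 30-60 minutes: few files, but coarse seeking during playback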
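
The file-count side of this trade-off is simple arithmetic. The sketch below (class name `SegmentMath` is hypothetical) computes how many segment files a deployment produces per day; the 500-stream figure in the usage note is borrowed from the recording benchmark later in this article.

```java
/** Hypothetical helper for estimating segment file counts. */
final class SegmentMath {
    /** Files produced per day by the given number of streams at one segment length. */
    static long filesPerDay(int streams, int segmentMinutes) {
        if (segmentMinutes <= 0) {
            throw new IllegalArgumentException("segmentMinutes must be positive");
        }
        // Ceiling division: a partial last segment still produces a file
        long segmentsPerStream = (24L * 60 + segmentMinutes - 1) / segmentMinutes;
        return segmentsPerStream * streams;
    }
}
```

For 500 streams, 1-minute segments give 720,000 files per day, 10-minute segments give 72,000, and 60-minute segments give 12,000. Every file is also one index row, so the shortest setting costs ten times the metadata of the recommended one.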

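Q: FMP4 or TS format?

A: FMP4 advantages:

  • Natively supported by modern browsers
  • Clear metadata structure
  • Friendly to post-processing

TS advantages:

  • Broader compatibility (older devices and browsers)
  • Better fault tolerance (partial corruption does not prevent playback)

Q: How to keep a full disk from interrupting recording?

A: Layered protection:

  1. Watermark mechanism that switches volumes automatically
  2. Scheduled cleanup of expired files
  3. Disk usage alerts
  4. Degradation strategy (disable recording on non-critical paths)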
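
The first layer, watermark-based volume switching, can be sketched as a pure selection function. Everything here is an assumption made for illustration: the class `VolumeSelector`, the `Volume` fields, and the policy of switching to the least-used volume once the current one crosses the high watermark.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical volume picker driven by a high watermark. */
final class VolumeSelector {
    static final class Volume {
        final long id;
        final long usedBytes;
        final long totalBytes;

        Volume(long id, long usedBytes, long totalBytes) {
            this.id = id;
            this.usedBytes = usedBytes;
            this.totalBytes = totalBytes;
        }

        double usagePercent() {
            return 100.0 * usedBytes / totalBytes;
        }
    }

    /**
     * Keeps the current volume while it is below the watermark; otherwise picks
     * the least-used candidate below the watermark, or empty if none qualifies.
     */
    static Optional<Volume> select(Volume current, List<Volume> candidates,
                                   double highWatermarkPercent) {
        if (current != null && current.usagePercent() < highWatermarkPercent) {
            return Optional.of(current);
        }
        return candidates.stream()
            .filter(v -> v.usagePercent() < highWatermarkPercent)
            .min(Comparator.comparingDouble(Volume::usagePercent));
    }
}
```

An empty result is the signal to fall through to the later layers: clean up expired files, raise an alert, and degrade non-critical recording.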

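21.4 Algorithms

Q: How many scenes can one task be configured with?

A: There is no theoretical limit; in practice:

  • Single-node mode: 3-5
  • Split mode: 5-10

Too many scenes lead to:

  • Higher inference overhead
  • More complex result processing
  • Excessive alarm frequency

Q: How are failed tasks retried?

A: Tiered retry:

  1. Transient failure (network jitter): restart automatically, counter +1
  2. Persistent failure (device offline): mark ERROR and wait for the device to recover
  3. Configuration error: do not retry; manual repair is required

Q: How to control the alarm push frequency?

A: Use push modes:

  • High priority: IMMEDIATE
  • Medium priority: COUNT=5, WINDOW=600s
  • Low priority: WINDOW=3600s

Combine these with a cooldown period to avoid alarm storms.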
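
The three push modes can be sketched as a small throttle. The semantics used here are an interpretation, not the platform's confirmed behavior: COUNT is read as "push once N alarms have arrived within the window", and WINDOW as "push at most once per window". The class `AlarmPushThrottle` is hypothetical, and the cooldown period is left out for brevity.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical per-rule throttle for alarm pushes. */
final class AlarmPushThrottle {
    enum Mode { IMMEDIATE, COUNT, WINDOW }

    private final Mode mode;
    private final int count;
    private final long windowMs;
    private final Deque<Long> recent = new ArrayDeque<>();
    private boolean pushedBefore = false;
    private long lastPushAt = 0L;

    AlarmPushThrottle(Mode mode, int count, long windowSeconds) {
        this.mode = mode;
        this.count = count;
        this.windowMs = windowSeconds * 1000L;
    }

    /** Returns true when the alarm arriving at nowMs should be pushed. */
    synchronized boolean onAlarm(long nowMs) {
        switch (mode) {
            case IMMEDIATE:
                return true;
            case COUNT:
                recent.addLast(nowMs);
                // Forget alarms that have fallen out of the window
                while (!recent.isEmpty() && nowMs - recent.peekFirst() >= windowMs) {
                    recent.removeFirst();
                }
                if (recent.size() >= count) {
                    recent.clear(); // start counting again after a push
                    return true;
                }
                return false;
            default: // WINDOW
                if (!pushedBefore || nowMs - lastPushAt >= windowMs) {
                    pushedBefore = true;
                    lastPushAt = nowMs;
                    return true;
                }
                return false;
        }
    }
}
```

With COUNT=5 and WINDOW=600s, four alarms in ten minutes stay silent and the fifth triggers a push, which filters out isolated false positives at medium priority.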


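22. Performance Benchmarks and Load Testing

22.1 Onboarding Performance Benchmark

Test environment:

  • CPU/Memory: 8 cores / 16 GB
  • Network: Gigabit LAN
  • Device: Hikvision DS-2CD2142FWD-I

Test results:

| Concurrency | Onboarding success rate | P50 latency | P95 latency | P99 latency |
|-------------|-------------------------|-------------|-------------|-------------|
| 10          | 100%                    | 850ms       | 1200ms      | 1500ms      |
| 50          | 99.2%                   | 1100ms      | 1800ms      | 2400ms      |
| 100         | 97.5%                   | 1450ms      | 2500ms      | 3200ms      |
| 200         | 94.8%                   | 2100ms      | 3800ms      | 4800ms      |

Bottleneck analysis:

  • Network I/O is the main bottleneck
  • ONVIF SOAP parsing takes 15-20% of CPU
  • FFprobe probing takes 30-40% of CPU

Optimization suggestions:

  • Make onboarding tasks asynchronous
  • Limit FFprobe concurrency (semaphore)
  • Cache device information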
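
The FFprobe concurrency limit can be sketched with a `java.util.concurrent.Semaphore`. The wrapper class `ProbeLimiter` and its fallback behavior are assumptions; the probe itself is passed in as a `Supplier`, so the sketch does not depend on how FFprobe is actually invoked.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical limiter that caps how many probes run at the same time. */
final class ProbeLimiter {
    private final Semaphore permits;

    ProbeLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    /** Runs the probe if a permit is obtained within waitMs; otherwise returns fallback. */
    <T> T run(Supplier<T> probe, long waitMs, T fallback) {
        boolean acquired;
        try {
            acquired = permits.tryAcquire(waitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        }
        if (!acquired) {
            return fallback;
        }
        try {
            return probe.get();
        } finally {
            permits.release(); // always return the permit, even if the probe throws
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

Since probing accounts for 30-40% of CPU in the benchmark above, a cap near the number of CPU cores is a reasonable starting point, with the exact value tuned under load.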

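22.2 Recording Performance Benchmark

Test environment:

  • CPU/Memory: 16 cores / 32 GB
  • Storage: NVMe SSD
  • Streams: 500 concurrent recordings

Test results:

| Bitrate | Segment duration | Disk write | CPU usage | Memory usage |
|---------|------------------|------------|-----------|--------------|
| 2 Mbps  | 10 min           | 125 MB/s   | 18%       | 4.2 GB       |
| 4 Mbps  | 10 min           | 250 MB/s   | 22%       | 5.8 GB       |
| 6 Mbps  | 10 min           | 375 MB/s   | 28%       | 7.5 GB       |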
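
The disk write figures follow directly from the stream count and bitrate (streams × Mbps ÷ 8). The sketch below reproduces that arithmetic and extends it to daily storage; the class `RecordingThroughput` is hypothetical, and decimal units are assumed (1 MB = 10^6 bytes, 1 TB = 10^12 bytes).

```java
/** Hypothetical capacity-planning helper for recording workloads. */
final class RecordingThroughput {
    /** Aggregate write rate in MB/s for the given stream count and per-stream bitrate. */
    static double writeMBps(int streams, double bitrateMbps) {
        return streams * bitrateMbps / 8.0; // 8 bits per byte
    }

    /** Storage consumed per day in TB at a constant bitrate. */
    static double terabytesPerDay(int streams, double bitrateMbps) {
        return writeMBps(streams, bitrateMbps) * 86_400 / 1_000_000.0;
    }
}
```

At 500 streams and 2 Mbps this gives 125 MB/s, or about 10.8 TB per day, so retention in days multiplied by that figure gives a first estimate of the volume capacity needed.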

Indexing performance:

  • Scan speed: 10,000 files/second
  • Index latency: P95 < 30 seconds

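22.3 Algorithm Performance Benchmark

Test environment:

  • GPU: NVIDIA RTX 3080
  • Algorithm: YOLOv8m
  • Input: 1280x720 @ 15fps

Single-node capacity:

| Scenes per task | Concurrent tasks | GPU usage | Inference latency |
|-----------------|------------------|-----------|-------------------|
| 1               | 50               | 68%       | 45ms              |
| 2               | 35               | 72%       | 52ms              |
| 3               | 25               | 78%       | 61ms              |

Result processing:

  • Queue throughput: 20,000 results/second
  • Database write latency: P95 < 200ms
  • Alarm push: P95 < 500ms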

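23. Architecture Evolution Roadmap

23.1 Current Architecture (v1.0)

Characteristics:

  • Monolithic application
  • Single MySQL instance
  • Local file storage
  • Single-node MediaMTX

Suitable for:

  • Devices < 1000
  • Concurrent streams < 200
  • Algorithm tasks < 100

23.2 Mid-term Evolution (v2.0)

Goal: support 5,000 devices and 1,000 concurrent streams

Improvements:

  1. Service decomposition
     Monolith → onboarding service + recording service + algorithm service
  2. Storage optimization
     Local storage → object storage (MinIO/OSS)
     Tiered storage: hot data on SSD + cold data on HDD
  3. MediaMTX cluster
     Single node → multiple nodes + LVS load balancing
     Hash-based sharding by path
  4. Database read/write splitting
     Primary: writes
     Replicas: queries + statistics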
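
Hash-based sharding by path means every service must map a given stream path to the same MediaMTX node. A minimal sketch using CRC32 modulo the node count follows; the class `PathSharding` and the node naming are assumptions, and the article does not specify which hash function to use.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.CRC32;

/** Hypothetical path-to-node mapping for a MediaMTX cluster. */
final class PathSharding {
    /** Maps a stream path name to one node; the same path always yields the same node. */
    static String nodeFor(String pathName, List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("nodes must not be empty");
        }
        CRC32 crc = new CRC32();
        crc.update(pathName.getBytes(StandardCharsets.UTF_8));
        // getValue() is a non-negative long, so the modulo is a valid index
        return nodes.get((int) (crc.getValue() % nodes.size()));
    }
}
```

Plain modulo hashing remaps most paths whenever a node is added or removed. If the cluster is expected to resize often, consistent hashing keeps that churn much smaller.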

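23.3 Long-term Evolution (v3.0)

Goal: support 100,000+ devices and tens of thousands of concurrent streams

Improvements:

  1. Microservices
     Split by business domain:
     - Device management service
     - Streaming media service
     - Recording service
     - Algorithm service
     - Alarm service

     Inter-service communication: gRPC + MQ
  2. Distributed storage
     Recordings: Ceph cluster
     Metadata: TiDB
     Cache: Redis cluster
  3. Edge computing
     Central nodes + edge nodes
     Edge: real-time analysis + local recording
     Center: long-term storage + global scheduling
  4. Cloud-native transformation
     Containerization: Kubernetes
     Service mesh: Istio
     Observability: Prometheus + Grafana + Jaeger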

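Summary

This article builds a complete enterprise-grade video intelligence analysis platform from scratch, covering:

  1. Camera onboarding: in-depth ONVIF practice, stream path governance, encoding decisions
  2. Recording storage: storage volume management, file indexing, playback retrieval
  3. Intelligent analysis: multi-scene task model, node scheduling, result processing, alarm push
  4. Operations: configuration management, observability, troubleshooting, performance optimization

Core design principles:

  • Layered decoupling: control plane/media plane, policy/execution, real-time/batch
  • Unified abstractions: path model, scene model, state model
  • Fault tolerance first: asynchronous processing, retry, degradation, rollback
  • Observability: logs, metrics, traces, alerts

Technology stack summary:

| Layer           | Technology                 |
|-----------------|----------------------------|
| Protocol        | ONVIF (SOAP/WS-Security)   |
| Streaming media | MediaMTX                   |
| Codec           | FFmpeg/FFprobe             |
| Storage         | MySQL + file system        |
| Communication   | WebSocket + RESTful        |
| Monitoring      | Prometheus + Grafana       |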

Contributors

  • flycodeu
