Python + AI in Practice: Building an Intelligent Image Recognition System from Scratch (Part 3)

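Summary: Tutorial source: https://yyvgt.cn/category/jiuwenhua.html. This article builds a complete intelligent product classification system and covers the full AI engineering pipeline: data processing, EfficientNet model training (with AMP, EMA, and early stopping), ONNX/TensorRT optimization, FastAPI deployment, Celery asynchronous tasks, Prometheus monitoring, A/B testing, and active learning.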

Part 5: Model Training

5.1 Trainer

# src/train/trainer.py
"""
Model trainer

Wraps the training loop, validation loop, and checkpoint saving.
"""

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import CosineAnnealingLR, ReduceLROnPlateau
from tqdm import tqdm
from pathlib import Path
from typing import Dict, Optional, Callable
import json
import time

from src.train.metrics import MetricsTracker
from src.utils.logger import logger
from src.utils.metrics import PrometheusMetrics


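class ModelTrainer:
    """
    Model trainer

    Features:
    1. Automatic mixed precision (AMP) for faster training
    2. Gradient accumulation (simulates a larger batch size)
    3. Model EMA (exponential moving average)
    4. Early stopping
    5. Automatic saving of the best model
    """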

    def __init__(
        self,
        model: nn.Module,
        train_loader: DataLoader,
        val_loader: DataLoader,
        criterion: nn.Module,
        optimizer: optim.Optimizer,
        scheduler=None,
        device: str = 'cuda',
        config: Optional[Dict] = None
    ):
        self.model = model.to(device)
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.criterion = criterion
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.device = device
        self.config = config or {}

        # Training configuration
        self.epochs = self.config.get('epochs', 100)
        self.gradient_accumulation_steps = self.config.get('gradient_accumulation_steps', 1)
        # AMP is only enabled on CUDA devices (including e.g. 'cuda:0')
        self.use_amp = self.config.get('use_amp', False) and str(device).startswith('cuda')
        self.use_ema = self.config.get('use_ema', False)
        self.ema_decay = self.config.get('ema_decay', 0.999)

        # Early stopping configuration
        self.patience = self.config.get('patience', 10)
        self.min_delta = self.config.get('min_delta', 1e-4)

        # Metrics tracking
        self.metrics_tracker = MetricsTracker()

        # Automatic mixed precision
        if self.use_amp:
            self.scaler = torch.cuda.amp.GradScaler()

        # EMA model
        if self.use_ema:
            self.ema_model = self._create_ema_model()

        # Best model state
        self.best_val_loss = float('inf')
        self.best_val_acc = 0.0
        self.epochs_no_improve = 0

        # Training history
        self.history = {
            'train_loss': [],
            'train_acc': [],
            'val_loss': [],
            'val_acc': [],
            'lr': []
        }

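    def _create_ema_model(self):
        """Create the EMA model as a frozen copy of the current model"""
        import copy

        # deepcopy avoids assuming the model exposes a `config` attribute
        ema_model = copy.deepcopy(self.model)
        ema_model.to(self.device)
        ema_model.eval()
        for param in ema_model.parameters():
            param.requires_grad_(False)
        return ema_model

    def _update_ema(self):
        """Update EMA parameters: ema = decay * ema + (1 - decay) * param"""
        with torch.no_grad():
            for ema_param, param in zip(self.ema_model.parameters(), self.model.parameters()):
                ema_param.mul_(self.ema_decay).add_(param.detach(), alpha=1 - self.ema_decay)
            # Buffers (e.g. BatchNorm running statistics) are copied, not averaged
            for ema_buf, buf in zip(self.ema_model.buffers(), self.model.buffers()):
                ema_buf.copy_(buf)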

    def train_epoch(self) -> Dict[str, float]:
        """Train for one epoch"""
        self.model.train()
        total_loss = 0.0
        correct = 0
        total = 0

        pbar = tqdm(self.train_loader, desc="Training")
        for batch_idx, (images, labels) in enumerate(pbar):
            images = images.to(self.device)
            labels = labels.to(self.device)

            # Forward pass
            if self.use_amp:
                with torch.cuda.amp.autocast():
                    outputs = self.model(images)
                    loss = self.criterion(outputs, labels)
            else:
                outputs = self.model(images)
                loss = self.criterion(outputs, labels)

            # Gradient accumulation: scale the loss so accumulated gradients average out
            loss = loss / self.gradient_accumulation_steps

            # Backward pass
            if self.use_amp:
                self.scaler.scale(loss).backward()
            else:
                loss.backward()

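            # Update parameters; also step on the final batch so leftover
            # accumulated gradients are not carried into the next epoch
            is_last_batch = (batch_idx + 1) == len(self.train_loader)
            if (batch_idx + 1) % self.gradient_accumulation_steps == 0 or is_last_batch: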
                if self.use_amp:
                    self.scaler.step(self.optimizer)
                    self.scaler.update()
                else:
                    self.optimizer.step()

                self.optimizer.zero_grad()

                # Update EMA
                if self.use_ema:
                    self._update_ema()

            # Statistics (undo the accumulation scaling for reporting)
            total_loss += loss.item() * self.gradient_accumulation_steps
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

            # Update the progress bar
            pbar.set_postfix({
                'loss': total_loss / (batch_idx + 1),
                'acc': 100. * correct / total
            })

        return {
            'loss': total_loss / len(self.train_loader),
            'acc': 100. * correct / total
        }

    def validate(self) -> Dict[str, float]:
        """Run validation"""
        self.model.eval()
        total_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for images, labels in tqdm(self.val_loader, desc="Validation"):
                images = images.to(self.device)
                labels = labels.to(self.device)

                outputs = self.model(images)
                loss = self.criterion(outputs, labels)

                total_loss += loss.item()
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()

        return {
            'loss': total_loss / len(self.val_loader),
            'acc': 100. * correct / total
        }

    def save_checkpoint(self, epoch: int, is_best: bool = False):
        """Save a checkpoint"""
        checkpoint = {
            'epoch': epoch,
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'history': self.history,
            'config': self.config
        }

        if self.use_ema:
            checkpoint['ema_model_state_dict'] = self.ema_model.state_dict()

        # Make sure the output directory exists before writing
        checkpoint_dir = Path('checkpoints')
        checkpoint_dir.mkdir(parents=True, exist_ok=True)

        # Periodic save
        if epoch % self.config.get('save_interval', 10) == 0:
            torch.save(checkpoint, checkpoint_dir / f'epoch_{epoch}.pth')

        # Save the best model
        if is_best:
            torch.save(checkpoint, checkpoint_dir / 'best_model.pth')
            logger.info(f"Best model saved at epoch {epoch}")

    def train(self):
        """Full training loop"""
        logger.info("Starting training...")
        start_time = time.time()

        for epoch in range(1, self.epochs + 1):
            epoch_start = time.time()

            # Train
            train_metrics = self.train_epoch()

            # Validate
            val_metrics = self.validate()

            # Update the learning rate
            if self.scheduler:
                if isinstance(self.scheduler, ReduceLROnPlateau):
                    self.scheduler.step(val_metrics['loss'])
                else:
                    self.scheduler.step()

            # Record history
            current_lr = self.optimizer.param_groups[0]['lr']
            self.history['train_loss'].append(train_metrics['loss'])
            self.history['train_acc'].append(train_metrics['acc'])
            self.history['val_loss'].append(val_metrics['loss'])
            self.history['val_acc'].append(val_metrics['acc'])
            self.history['lr'].append(current_lr)

            # Check for a new best (an improvement must exceed min_delta)
            is_best = val_metrics['acc'] > self.best_val_acc + self.min_delta
            if is_best:
                self.best_val_acc = val_metrics['acc']
                self.epochs_no_improve = 0
            else:
                self.epochs_no_improve += 1

            # Save a checkpoint
            self.save_checkpoint(epoch, is_best)

            # Early stopping
            if self.epochs_no_improve >= self.patience:
                logger.info(f"Early stopping at epoch {epoch}")
                break

            # Log progress
            epoch_time = time.time() - epoch_start
            logger.info(
                f"Epoch {epoch}/{self.epochs} | "
                f"Train Loss: {train_metrics['loss']:.4f} | "
                f"Train Acc: {train_metrics['acc']:.2f}% | "
                f"Val Loss: {val_metrics['loss']:.4f} | "
                f"Val Acc: {val_metrics['acc']:.2f}% | "
                f"LR: {current_lr:.2e} | "
                f"Time: {epoch_time:.1f}s"
            )

        total_time = time.time() - start_time
        logger.info(f"Training completed in {total_time/60:.1f} minutes")
        logger.info(f"Best validation accuracy: {self.best_val_acc:.2f}%")

        return self.history

    def get_learning_rate(self):
        """Return the current learning rate"""
        return self.optimizer.param_groups[0]['lr']
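
The early-stopping rule used by the trainer (a patience counter plus a minimum improvement threshold) can be isolated into a small helper. The following is a minimal sketch, not part of the original code: the class name `EarlyStopping` and the sample accuracy values are hypothetical, and the metric is assumed to be one where higher is better, such as validation accuracy.

```python
class EarlyStopping:
    """Tracks a metric to maximize and signals when training should stop."""

    def __init__(self, patience: int = 10, min_delta: float = 1e-4):
        self.patience = patience
        self.min_delta = min_delta
        self.best = float("-inf")
        self.epochs_no_improve = 0

    def step(self, value: float) -> bool:
        """Record one epoch's metric; return True if training should stop."""
        if value > self.best + self.min_delta:
            # A real improvement: remember it and reset the counter
            self.best = value
            self.epochs_no_improve = 0
        else:
            self.epochs_no_improve += 1
        return self.epochs_no_improve >= self.patience


# Hypothetical validation accuracies, one per epoch
stopper = EarlyStopping(patience=2, min_delta=0.1)
val_accs = [70.0, 72.5, 72.55, 72.4, 80.0]
stopped_at = None
for epoch, acc in enumerate(val_accs, start=1):
    if stopper.step(acc):
        stopped_at = epoch
        break
```

With these values the gain from 72.5 to 72.55 is below `min_delta`, so it counts as no improvement and training stops before the last epoch is reached. This illustrates the usual trade-off: a small patience saves compute but can stop before a late improvement.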

5.2 Training Script

# scripts/train.py
"""
Model training script

Usage:
    python scripts/train.py --config config/model_config.yaml
"""

import argparse
import yaml
from pathlib import Path
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import CosineAnnealingLR

from src.data.dataset import ProductDataset
from src.models.efficientnet import EfficientNetClassifier
from src.train.trainer import ModelTrainer
from src.utils.logger import logger


def parse_args():
    parser = argparse.ArgumentParser(description='Train product classifier')
    parser.add_argument('--config', type=str, default='config/model_config.yaml',
                       help='Path to config file')
    parser.add_argument('--resume', type=str, default=None,
                       help='Path to checkpoint to resume from')
    return parser.parse_args()


def load_config(config_path: str) -> dict:
    with open(config_path, 'r') as f:
        return yaml.safe_load(f)


def main():
    args = parse_args()
    config = load_config(args.config)

    # Select the device
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    logger.info(f"Using device: {device}")

    # Data loading
    train_dataset = ProductDataset(
        data_dir=Path(config['data']['data_dir']),
        mode='train',
        image_size=config['data']['image_size']
    )
    val_dataset = ProductDataset(
        data_dir=Path(config['data']['data_dir']),
        mode='val',
        image_size=config['data']['image_size']
    )

    train_loader = DataLoader(
        train_dataset,
        batch_size=config['training']['batch_size'],
        shuffle=True,
        num_workers=config['data']['num_workers'],
        pin_memory=True
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=config['training']['batch_size'],
        shuffle=False,
        num_workers=config['data']['num_workers'],
        pin_memory=True
    )

    # Model
    model = EfficientNetClassifier(
        num_classes=train_dataset.num_classes,
        model_name=config['model']['name'],
        pretrained=config['model']['pretrained'],
        dropout_rate=config['model']['dropout_rate']
    )

    # Loss function (optionally with class weights)
    if config['training'].get('use_class_weights', False):
        class_weights = train_dataset.get_class_weights().to(device)
        criterion = nn.CrossEntropyLoss(weight=class_weights)
    else:
        criterion = nn.CrossEntropyLoss()

    # Optimizer
    optimizer = optim.AdamW(
        model.parameters(),
        lr=config['training']['learning_rate'],
        weight_decay=config['training']['weight_decay']
    )

    # Learning rate scheduler
    scheduler = CosineAnnealingLR(
        optimizer,
        T_max=config['training']['epochs'],
        eta_min=config['training']['min_lr']
    )

    # Trainer
    trainer = ModelTrainer(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        criterion=criterion,
        optimizer=optimizer,
        scheduler=scheduler,
        device=device,
        config=config['training']
    )

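    # Resume training
    if args.resume:
        checkpoint = torch.load(args.resume, map_location=device)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        if trainer.use_ema and 'ema_model_state_dict' in checkpoint:
            trainer.ema_model.load_state_dict(checkpoint['ema_model_state_dict'])
        trainer.history = checkpoint.get('history', trainer.history)
        # Note: trainer.train() always counts from epoch 1 and the scheduler
        # state is not checkpointed, so only weights, optimizer state, and
        # history are restored here.
        logger.info(f"Resumed from checkpoint saved at epoch {checkpoint['epoch']}")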

    # Start training
    history = trainer.train()

    # Save the class mapping
    train_dataset.save_class_mapping(Path('models/labels.json'))

    logger.info("Training completed!")


if __name__ == '__main__':
    main()
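
To see what the `CosineAnnealingLR` scheduler configured above does to the learning rate, the closed-form cosine schedule can be evaluated directly. This is a small sketch under stated assumptions: the function name `cosine_lr` is hypothetical, and the values `1e-3`, `1e-6`, and `100` merely stand in for `learning_rate`, `min_lr`, and `epochs` from the config file.

```python
import math


def cosine_lr(epoch: int, base_lr: float, min_lr: float, t_max: int) -> float:
    """Closed-form cosine annealing: decays from base_lr to min_lr over t_max epochs."""
    return min_lr + 0.5 * (base_lr - min_lr) * (1 + math.cos(math.pi * epoch / t_max))


# Assumed example values, not taken from the original config
base_lr, min_lr, t_max = 1e-3, 1e-6, 100
schedule = [cosine_lr(e, base_lr, min_lr, t_max) for e in range(t_max + 1)]

print(f"start={schedule[0]:.2e} mid={schedule[50]:.2e} end={schedule[-1]:.2e}")
```

The rate starts at `base_lr`, passes the midpoint between the two bounds halfway through, and reaches `min_lr` at `t_max`. If early stopping ends training before `T_max` epochs, the schedule never reaches `eta_min`.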

Part 6: Model Optimization and Deployment

6.1 ONNX Export

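# scripts/export_onnx.py
"""
Export the model to ONNX

ONNX (Open Neural Network Exchange) is an open exchange format for deep learning models.
It lets models move between frameworks and can be served with ONNX Runtime for faster inference.
"""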

import torch
import onnx
import onnxruntime
import numpy as np
from pathlib import Path

from src.models.efficientnet import EfficientNetClassifier


def export_to_onnx(
    checkpoint_path: str,
    output_path: str,
    input_size: int = 224,
    opset_version: int = 14
):
    """
    Export a PyTorch model to ONNX format

    Args:
        checkpoint_path: Path to the PyTorch checkpoint
        output_path: Output path for the ONNX model
        input_size: Input image size
        opset_version: ONNX opset version
    """
    # Load the model
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    checkpoint = torch.load(checkpoint_path, map_location=device)
    num_classes = checkpoint['model_state_dict']['backbone.classifier.1.weight'].shape[0]

    model = EfficientNetClassifier(num_classes=num_classes)
    model.load_state_dict(checkpoint['model_state_dict'])
    # Move the model to the same device as the dummy input before tracing
    model.to(device)
    model.eval()

    # Create a dummy input
    dummy_input = torch.randn(1, 3, input_size, input_size).to(device)

    # Export to ONNX
    torch.onnx.export(
        model,
        dummy_input,
        output_path,
        export_params=True,
        opset_version=opset_version,
        do_constant_folding=True,
        input_names=['input'],
        output_names=['output'],
        dynamic_axes={
            'input': {0: 'batch_size'},
            'output': {0: 'batch_size'}
        }
    )

    print(f"ONNX model exported to {output_path}")

    # Validate the ONNX model
    onnx_model = onnx.load(output_path)
    onnx.checker.check_model(onnx_model)
    print("ONNX model validation passed")

    # Benchmark inference speed
    test_onnx_inference(output_path, dummy_input.cpu().numpy())


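def test_onnx_inference(onnx_path: str, input_data: np.ndarray):
    """
    Benchmark ONNX Runtime inference

    ONNX Runtime is often faster than native PyTorch inference; the actual
    speedup depends on the model, hardware, and execution provider.
    """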
    import time

    # Create the ONNX Runtime session
    sess_options = onnxruntime.SessionOptions()
    sess_options.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL

    session = onnxruntime.InferenceSession(onnx_path, sess_options)

    # Warm-up
    for _ in range(10):
        session.run(['output'], {'input': input_data})

    # Timing run
    n_iterations = 100
    start = time.time()
    for _ in range(n_iterations):
        session.run(['output'], {'input': input_data})
    elapsed = time.time() - start

    print(f"ONNX Runtime inference: {elapsed / n_iterations * 1000:.2f} ms per image")


def export_to_tensorrt(onnx_path: str, output_path: str, fp16: bool = False):
    """
    Export a TensorRT engine

    TensorRT is NVIDIA's high-performance inference optimizer
    and can further accelerate GPU inference.
    """
    import tensorrt as trt

    logger = trt.Logger(trt.Logger.INFO)
    builder = trt.Builder(logger)
    network = builder.create_network(
        1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
    )

    parser = trt.OnnxParser(network, logger)

    with open(onnx_path, 'rb') as f:
        if not parser.parse(f.read()):
            for error in range(parser.num_errors):
                print(parser.get_error(error))
            raise RuntimeError("Failed to parse ONNX file")

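    config = builder.create_builder_config()
    # 1 GB workspace; newer TensorRT releases use memory pool limits
    config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, 1 << 30)

    if fp16 and builder.platform_has_fast_fp16:
        config.set_flag(trt.BuilderFlag.FP16)

    # A dynamic batch axis needs an optimization profile (1-32 is an assumed range)
    inp = network.get_input(0)
    dims = tuple(inp.shape)[1:]
    profile = builder.create_optimization_profile()
    profile.set_shape(inp.name, (1, *dims), (1, *dims), (32, *dims))
    config.add_optimization_profile(profile)

    serialized_engine = builder.build_serialized_network(network, config)
    if serialized_engine is None:
        raise RuntimeError("Failed to build TensorRT engine")

    with open(output_path, 'wb') as f:
        f.write(serialized_engine)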

    print(f"TensorRT engine saved to {output_path}")


if __name__ == '__main__':
    export_to_onnx(
        checkpoint_path='checkpoints/best_model.pth',
        output_path='models/best_model.onnx'
    )
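
The warm-up-then-time pattern in `test_onnx_inference` reports only a mean latency. A slightly more informative variant also reports percentiles, since tail latency often matters more for a serving system. The following is a minimal sketch using only the standard library; the `benchmark` helper and its return keys are hypothetical names, and the workload is a stand-in for a real `session.run(...)` call.

```python
import math
import time
from typing import Callable, Dict, List


def benchmark(fn: Callable[[], object], warmup: int = 10, iterations: int = 100) -> Dict[str, float]:
    """Time repeated calls to fn and report mean, p50, and p99 latency in milliseconds."""
    # Warm-up calls are excluded so one-time setup costs do not skew the result
    for _ in range(warmup):
        fn()

    latencies_ms: List[float] = []
    for _ in range(iterations):
        start = time.perf_counter()
        fn()
        latencies_ms.append((time.perf_counter() - start) * 1000)
    latencies_ms.sort()

    def percentile(p: float) -> float:
        # Nearest-rank percentile on the sorted samples
        rank = max(1, math.ceil(p / 100 * len(latencies_ms)))
        return latencies_ms[rank - 1]

    return {
        'mean_ms': sum(latencies_ms) / len(latencies_ms),
        'p50_ms': percentile(50),
        'p99_ms': percentile(99),
    }


# Stand-in workload; in the export script this would wrap session.run(...)
stats = benchmark(lambda: sum(range(1000)), warmup=5, iterations=50)
print(stats)
```

In the export script, the same helper could wrap the ONNX Runtime call, for example `benchmark(lambda: session.run(['output'], {'input': input_data}))`.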

6.2 Inference Service

# src/inference/predictor.py
"""
Inference predictor

Wraps model loading and inference, with support for several backends:
- Native PyTorch
- ONNX Runtime
- TensorRT
"""

import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image
from torchvision import transforms
from pathlib import Path
import json
from typing import List, Dict, Tuple, Union
import time

from src.utils.logger import logger


class Predictor:
    """
    Inference predictor

    Supports multiple inference backends and, in 'auto' mode, picks one
    based on the model file extension.
    """

    def __init__(
        self,
        model_path: Union[str, Path],
        labels_path: Union[str, Path],
        device: str = 'cuda',
        backend: str = 'auto'  # auto/pytorch/onnx/tensorrt
    ):
        """
        Initialize the predictor

        Args:
            model_path: Path to the model file (.pth/.onnx/.engine)
            labels_path: Path to the label mapping file
            device: Device (cuda/cpu)
            backend: Inference backend
        """
        self.device = device
        self.labels = self._load_labels(labels_path)
        self.idx_to_label = {v: k for k, v in self.labels.items()}

        # Select the backend
        if backend == 'auto':
            backend = self._select_backend(model_path)

        # Load the model
        if backend == 'pytorch':
            self.model = self._load_pytorch(model_path)
        elif backend == 'onnx':
            self.model = self._load_onnx(model_path)
        elif backend == 'tensorrt':
            self.model = self._load_tensorrt(model_path)
        else:
            raise ValueError(f"Unsupported backend: {backend}")

        self.backend = backend
        logger.info(f"Predictor initialized with backend: {backend}")

    def _load_labels(self, labels_path: Union[str, Path]) -> Dict[str, int]:
        """Load the label mapping"""
        with open(labels_path, 'r') as f:
            return json.load(f)

    def _select_backend(self, model_path: Union[str, Path]) -> str:
        """Select the inference backend from the model file extension"""
        model_path = str(model_path)

        if model_path.endswith('.engine'):
            return 'tensorrt'
        elif model_path.endswith('.onnx'):
            return 'onnx'
        else:
            return 'pytorch'

    def _load_pytorch(self, model_path: str):
        """Load a PyTorch model"""
        from src.models.efficientnet import EfficientNetClassifier

        checkpoint = torch.load(model_path, map_location=self.device)
        num_classes = len(self.labels)

        model = EfficientNetClassifier(num_classes=num_classes)
        model.load_state_dict(checkpoint['model_state_dict'])
        model.eval()
        model.to(self.device)

        return model

    def _load_onnx(self, model_path: str):
        """Load an ONNX model"""
        import onnxruntime as ort

        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

        providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] if self.device == 'cuda' else ['CPUExecutionProvider']

        session = ort.InferenceSession(model_path, sess_options, providers=providers)
        return session

    def _load_tensorrt(self, model_path: str):
        """Load a TensorRT engine"""
        import tensorrt as trt

        # Named trt_logger to avoid shadowing the module-level logger
        trt_logger = trt.Logger(trt.Logger.INFO)
        runtime = trt.Runtime(trt_logger)

        with open(model_path, 'rb') as f:
            engine = runtime.deserialize_cuda_engine(f.read())

        context = engine.create_execution_context()

        return {'engine': engine, 'context': context}

    def _preprocess(self, image: Image.Image) -> torch.Tensor:
        """Image preprocessing"""
        transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            )
        ])

        return transform(image).unsqueeze(0)

    def predict(
        self,
        image: Union[str, Path, Image.Image],
        top_k: int = 5
    ) -> List[Dict[str, Union[str, float]]]:
        """
        Predict a single image

        Args:
            image: Image path or PIL Image object
            top_k: Number of top predictions to return

        Returns:
            List of predictions, each with a label and a confidence
        """
        # Load the image
        if isinstance(image, (str, Path)):
            image = Image.open(image).convert('RGB')

        # Preprocess
        input_tensor = self._preprocess(image)

        # Inference
        start_time = time.time()

        if self.backend == 'pytorch':
            input_tensor = input_tensor.to(self.device)
            with torch.no_grad():
                output = self.model(input_tensor)
                probs = F.softmax(output, dim=1).cpu().numpy()[0]

        elif self.backend == 'onnx':
            input_numpy = input_tensor.numpy()
            output = self.model.run(['output'], {'input': input_numpy})[0]
            probs = self._softmax(output[0])

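        elif self.backend == 'tensorrt':
            # TensorRT inference. execute_v2 expects device pointers, so the
            # buffers are allocated as CUDA tensors here. This assumes binding
            # order [input, output] and a batch size the engine accepts.
            input_gpu = input_tensor.contiguous().cuda()
            output_gpu = torch.empty(
                (1, len(self.labels)), dtype=torch.float32, device='cuda'
            )
            self.model['context'].execute_v2([input_gpu.data_ptr(), output_gpu.data_ptr()])
            probs = self._softmax(output_gpu.cpu().numpy()[0])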

        inference_time = (time.time() - start_time) * 1000

        # Get the top-k results
        top_indices = np.argsort(probs)[::-1][:top_k]
        results = []
        for idx in top_indices:
            results.append({
                'label': self.idx_to_label[idx],
                'confidence': float(probs[idx])
            })

        logger.debug(f"Inference time: {inference_time:.2f}ms")

        return results

    def predict_batch(
        self,
        images: List[Union[str, Path, Image.Image]],
        top_k: int = 5
    ) -> List[List[Dict[str, Union[str, float]]]]:
        """
        Predict a batch of images

        More efficient than calling predict() in a loop for each image.
        """
        # Load and preprocess all images
        batch_tensors = []
        for img in images:
            if isinstance(img, (str, Path)):
                img = Image.open(img).convert('RGB')
            batch_tensors.append(self._preprocess(img))

        batch_input = torch.cat(batch_tensors, dim=0)

        # Batch inference
        start_time = time.time()

        if self.backend == 'pytorch':
            batch_input = batch_input.to(self.device)
            with torch.no_grad():
                outputs = self.model(batch_input)
                probs_batch = F.softmax(outputs, dim=1).cpu().numpy()

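        elif self.backend == 'onnx':
            input_numpy = batch_input.numpy()
            outputs = self.model.run(['output'], {'input': input_numpy})[0]
            probs_batch = self._softmax(outputs, axis=1)

        else:
            # No batched path is implemented for the remaining backends
            raise NotImplementedError(f"predict_batch does not support backend: {self.backend}")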

        inference_time = (time.time() - start_time) * 1000
        logger.info(f"Batch inference ({len(images)} images): {inference_time:.2f}ms")

        # Parse the results
        results = []
        for probs in probs_batch:
            top_indices = np.argsort(probs)[::-1][:top_k]
            img_results = []
            for idx in top_indices:
                img_results.append({
                    'label': self.idx_to_label[idx],
                    'confidence': float(probs[idx])
                })
            results.append(img_results)

        return results

    @staticmethod
    def _softmax(x, axis=-1):
        """Numerically stable softmax"""
        exp_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
        return exp_x / np.sum(exp_x, axis=axis, keepdims=True)
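
The post-processing shared by every backend (softmax over logits, then top-k selection mapped back to labels) can be illustrated without NumPy. This is a minimal standard-library sketch; the label names and logit values are made up for illustration, and the function names are hypothetical.

```python
import math
from typing import Dict, List, Union


def softmax(logits: List[float]) -> List[float]:
    """Numerically stable softmax: subtract the max before exponentiating."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def top_k(
    probs: List[float],
    idx_to_label: Dict[int, str],
    k: int = 5,
) -> List[Dict[str, Union[str, float]]]:
    """Return the k most probable classes as label/confidence pairs."""
    order = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)[:k]
    return [{'label': idx_to_label[i], 'confidence': probs[i]} for i in order]


# Hypothetical label mapping and logits
idx_to_label = {0: 'shoes', 1: 'bags', 2: 'watches'}
probs = softmax([2.0, 1.0, 0.1])
results = top_k(probs, idx_to_label, k=2)
print(results)
```

Subtracting the maximum logit leaves the result unchanged mathematically but prevents overflow in `exp` for large logits, which is why `_softmax` does the same.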

Part 7: API Service

7.1 FastAPI Application

# src/api/app.py
"""
FastAPI application entry point

Exposes a RESTful API for image classification
"""

from fastapi import FastAPI, File, UploadFile, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
from typing import List, Optional
import asyncio
from pathlib import Path

from src.inference.predictor import Predictor
from src.api.schemas import PredictRequest, PredictResponse, HealthResponse
from src.utils.logger import logger
from src.utils.metrics import setup_metrics


# Global predictor instance
predictor = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan management
    """
    # Load the model at startup
    global predictor
    logger.info("Loading model...")
    predictor = Predictor(
        model_path='models/best_model.onnx',
        labels_path='models/labels.json',
        device='cuda',
        backend='onnx'
    )
    logger.info("Model loaded successfully")

    yield

    # Clean up on shutdown
    logger.info("Shutting down...")


# Create the FastAPI application
app = FastAPI(
    title="AI Product Classifier API",
    description="Intelligent product classification API",
    version="1.0.0",
    lifespan=lifespan
)

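# CORS configuration
# Note: a wildcard origin combined with credentials is overly permissive;
# list explicit origins before enabling credentials in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,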
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register the Prometheus /metrics endpoint (setup_metrics is imported above)
setup_metrics(app)

# Health check
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check endpoint"""
    return HealthResponse(
        status="healthy",
        model_loaded=predictor is not None
    )


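# Single-image classification
@app.post("/predict", response_model=PredictResponse)
async def predict(
    file: UploadFile = File(..., description="Product image")
):
    """
    Single-image classification

    Upload a product image and get the top-5 predictions.
    """
    # Validate the file type (content_type may be missing)
    if not (file.content_type or '').startswith('image/'):
        raise HTTPException(400, "File must be an image")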

    try:
        # Read the image
        image = await file.read()
        from PIL import Image
        import io
        img = Image.open(io.BytesIO(image)).convert('RGB')

        # Inference
        results = predictor.predict(img, top_k=5)

        return PredictResponse(
            success=True,
            predictions=results
        )

    except Exception as e:
        logger.error(f"Prediction failed: {e}")
        raise HTTPException(500, f"Prediction failed: {str(e)}")


# Batch image classification
@app.post("/predict/batch")
async def predict_batch(
    files: List[UploadFile] = File(..., description="List of product images")
):
    """
    Batch image classification

    Upload multiple product images and get predictions for all of them.
    """
    if len(files) > 10:
        raise HTTPException(400, "Maximum 10 images per batch")

    try:
        # Read all images; non-image uploads are skipped
        images = []
        for file in files:
            if not (file.content_type or '').startswith('image/'):
                continue
            image_data = await file.read()
            from PIL import Image
            import io
            img = Image.open(io.BytesIO(image_data)).convert('RGB')
            images.append(img)

        if not images:
            raise HTTPException(400, "No valid images in request")

        # Batch inference
        results = predictor.predict_batch(images, top_k=5)

        return {
            "success": True,
            "results": results
        }

    except HTTPException:
        # Let deliberate 4xx responses pass through unchanged
        raise
    except Exception as e:
        logger.error(f"Batch prediction failed: {e}")
        raise HTTPException(500, f"Prediction failed: {str(e)}")


if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host='0.0.0.0', port=8000)
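
Both endpoints trust the client-supplied `content_type`, which is easy to spoof. One possible hardening step is to check the leading bytes of the upload against well-known image signatures before handing it to PIL. The sketch below is not part of the original service; the helper name `sniff_image_type` is hypothetical, and it only covers JPEG, PNG, GIF, and WebP.

```python
from typing import Optional

# Leading byte signatures ("magic numbers") of common image formats
_SIGNATURES = {
    b"\xff\xd8\xff": "image/jpeg",
    b"\x89PNG\r\n\x1a\n": "image/png",
    b"GIF87a": "image/gif",
    b"GIF89a": "image/gif",
}


def sniff_image_type(data: bytes) -> Optional[str]:
    """Guess the image MIME type from the file header, or return None if unknown."""
    for signature, mime in _SIGNATURES.items():
        if data.startswith(signature):
            return mime
    # WebP files are RIFF containers: b"RIFF" + 4 size bytes + b"WEBP"
    if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
        return "image/webp"
    return None


png_header = b"\x89PNG\r\n\x1a\n" + b"\x00" * 8
print(sniff_image_type(png_header))
print(sniff_image_type(b"not an image"))
```

In the endpoint, such a check would run on the bytes returned by `await file.read()`, and a `None` result could be mapped to a 400 response.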

Part 8: Asynchronous Task Queue

8.1 Celery Configuration

# src/tasks/celery_app.py
"""
Celery asynchronous task configuration

Used for long-running batch inference jobs
"""

from celery import Celery
from celery.schedules import crontab

from config.settings import REDIS_URL

# Create the Celery app
celery_app = Celery(
    'ai_classifier',
    broker=REDIS_URL,
    backend=REDIS_URL,
    include=['src.tasks.inference_tasks']
)

# Configuration
celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Shanghai',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=30 * 60,  # 30 minutes
    task_soft_time_limit=25 * 60,
    worker_prefetch_multiplier=1,
    worker_max_tasks_per_child=100,
)


@celery_app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')


# src/tasks/inference_tasks.py
"""
Asynchronous inference tasks

Handles batch image classification requests
"""

import asyncio
from celery import shared_task
from src.inference.predictor import Predictor
from src.utils.logger import logger

# Global predictor (one per worker process)
_predictor = None


def get_predictor():
    """Return the predictor instance (lazily loaded)"""
    global _predictor
    if _predictor is None:
        _predictor = Predictor(
            model_path='models/best_model.onnx',
            labels_path='models/labels.json',
            device='cuda',
            backend='onnx'
        )
    return _predictor


@shared_task(bind=True, name='inference.batch_predict')
def batch_predict(self, image_urls: list):
    """
    Batch image classification task

    Args:
        image_urls: List of image URLs

    Returns:
        List of classification results
    """
    import requests
    from PIL import Image
    import io

    logger.info(f"Processing {len(image_urls)} images")

    # Download images
    images = []
    for url in image_urls:
        try:
            response = requests.get(url, timeout=10)
            # Treat HTTP error statuses as download failures
            response.raise_for_status()
            img = Image.open(io.BytesIO(response.content)).convert('RGB')
            images.append(img)
        except Exception as e:
            logger.error(f"Failed to download image: {url}, error={e}")
            images.append(None)

    # Filter out invalid images
    valid_images = [img for img in images if img is not None]
    valid_indices = [i for i, img in enumerate(images) if img is not None]

    if not valid_images:
        return [{"error": "No valid images"} for _ in images]

    # Inference
    predictor = get_predictor()
    results = predictor.predict_batch(valid_images, top_k=5)

    # Reassemble results (preserving the original order)
    full_results = []
    result_idx = 0
    for i in range(len(images)):
        if i in valid_indices:
            full_results.append(results[result_idx])
            result_idx += 1
        else:
            full_results.append({"error": "Image download failed"})

    logger.info(f"Batch prediction completed: {len(valid_images)} images")

    return full_results
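
The reassembly step at the end of `batch_predict` (predictions exist only for images that downloaded, yet the response must line up with the input URLs) can be written compactly with an iterator. This is a minimal sketch of the same idea, not the original implementation; the function name `reassemble` and the sample values are hypothetical.

```python
from typing import Any, List, Optional


def reassemble(inputs: List[Optional[Any]], valid_results: List[Any], error: Any) -> List[Any]:
    """Place each result at the position of its input; failed inputs (None) get `error`.

    Assumes valid_results holds exactly one entry per non-None input, in order.
    """
    remaining = iter(valid_results)
    return [next(remaining) if item is not None else error for item in inputs]


# The second download failed, so only two predictions came back
images = ['img_a', None, 'img_c']
predictions = [[{'label': 'shoes'}], [{'label': 'bags'}]]
failure = {'error': 'Image download failed'}

full_results = reassemble(images, predictions, failure)
print(full_results)
```

Because the iterator is consumed in input order, no index list or membership test is needed, which keeps the step linear in the number of inputs.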

Part 9: Monitoring and A/B Testing

9.1 Prometheus Monitoring

# src/utils/metrics.py
"""
Prometheus monitoring metrics
"""

from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from fastapi import Response
import time
from functools import wraps


# Inference metrics
inference_requests_total = Counter(
    'inference_requests_total',
    'Total number of inference requests',
    ['status']  # success/error
)

inference_duration_seconds = Histogram(
    'inference_duration_seconds',
    'Inference duration in seconds',
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5]
)

# Model performance metrics
model_accuracy = Gauge(
    'model_accuracy',
    'Model accuracy on validation set',
    ['model_version']
)

model_latency_p99 = Gauge(
    'model_latency_p99',
    'Model P99 latency in seconds',
    ['model_version']
)

# Business metrics
active_users = Gauge(
    'active_users',
    'Number of active users'
)

daily_predictions = Counter(
    'daily_predictions',
    'Number of predictions per day'
)


def monitor_inference(func):
    """Inference monitoring decorator"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        try:
            result = func(*args, **kwargs)
            inference_requests_total.labels(status='success').inc()
            return result
        except Exception as e:
            inference_requests_total.labels(status='error').inc()
            raise
        finally:
            duration = time.time() - start
            inference_duration_seconds.observe(duration)

    return wrapper


def setup_metrics(app):
    """Register the metrics endpoint"""

    @app.get('/metrics')
    async def metrics():
        return Response(
            content=generate_latest(),
            media_type=CONTENT_TYPE_LATEST
        )
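
Prometheus histograms such as `inference_duration_seconds` store cumulative bucket counts: each bucket counts every observation less than or equal to its upper bound, and an implicit `+Inf` bucket counts all of them. The standard-library sketch below reproduces that bookkeeping for the bucket boundaries defined above; the function name and the sample durations are made up for illustration.

```python
from typing import Dict, List

# Same upper bounds (in seconds) as the histogram definition above
BUCKETS = [0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5]


def cumulative_bucket_counts(observations: List[float], buckets: List[float] = BUCKETS) -> Dict[str, int]:
    """Count observations <= each upper bound, plus the catch-all +Inf bucket."""
    counts = {str(upper): sum(1 for v in observations if v <= upper) for upper in buckets}
    counts['+Inf'] = len(observations)
    return counts


# Hypothetical inference durations in seconds
durations = [0.008, 0.03, 0.03, 0.2, 7.0]
counts = cumulative_bucket_counts(durations)
for upper, count in counts.items():
    print(f"le={upper}: {count}")
```

The 7-second observation only appears in `+Inf`, which shows why the largest finite bucket should sit above the slowest latency worth distinguishing: anything beyond it is indistinguishable in quantile estimates.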

9.2 A/B Testing Framework

# src/ab_testing/ab_framework.py
"""
A/B testing framework

Used to compare the online performance of different model versions
"""

import random
from typing import Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import redis


@dataclass
class ExperimentConfig:
    """Experiment configuration"""
    name: str
    variants: Dict[str, float]  # variant_name -> traffic_percentage
    default_variant: str


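class ABTestFramework:
    """
    A/B testing framework

    Features:
    1. Traffic allocation based on user ID (the same user always sees the same variant)
    2. Experiment metric collection
    3. Statistical significance calculation
    """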

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.experiments = {}

    def register_experiment(self, config: ExperimentConfig):
        """Register an experiment"""
        self.experiments[config.name] = config

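    def get_variant(self, experiment_name: str, user_id: str) -> Optional[str]:
        """
        Return the variant this user should see.

        Uses a deterministic hash so the same user always gets the same variant.
        """
        import hashlib

        config = self.experiments.get(experiment_name)
        if not config:
            return None

        # Built-in hash() is salted per process for strings, so use a stable
        # digest to keep assignments consistent across restarts and workers
        digest = hashlib.md5(f"{experiment_name}:{user_id}".encode()).hexdigest()
        hash_value = int(digest, 16) % 100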
        cumulative = 0

        for variant, percentage in config.variants.items():
            cumulative += percentage * 100
            if hash_value < cumulative:
                return variant

        return config.default_variant

    def track_metric(
        self,
        experiment_name: str,
        variant: str,
        metric_name: str,
        value: float = 1.0
    ):
        """Record one observation of an experiment metric."""
        key = f"ab_test:{experiment_name}:{variant}:{metric_name}"
        self.redis.hincrbyfloat(key, 'sum', value)
        self.redis.hincrby(key, 'count', 1)

        # Record a timestamp for each observation
        self.redis.lpush(f"{key}:timestamps", datetime.now().isoformat())

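    def get_results(self, experiment_name: str) -> Dict:
        """Return aggregated metrics for every variant of an experiment."""
        config = self.experiments[experiment_name]
        results = {}

        for variant in config.variants.keys():
            variant_results = {}
            keys = self.redis.keys(f"ab_test:{experiment_name}:{variant}:*")

            for key in keys:
                # Assumes the client returns bytes (decode_responses=False)
                key_str = key.decode()
                # Skip the "<metric>:timestamps" lists written by track_metric;
                # HGETALL on a list key raises a WRONGTYPE error
                if key_str.endswith(':timestamps'):
                    continue

                metric_name = key_str.split(':')[-1]
                data = self.redis.hgetall(key)
                total = float(data.get(b'sum', 0))
                count = int(data.get(b'count', 0))

                variant_results[metric_name] = {
                    'sum': total,
                    'count': count,
                    'avg': total / count if count else 0.0
                }

            results[variant] = variant_results

        return results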
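
`get_results` returns per-variant sums and counts, but it does not test whether a difference between variants is statistically meaningful. For a 0/1 metric such as click-through or correct-prediction rate, one common choice is a two-proportion z-test. The sketch below is an assumption about how the aggregates could be used, not part of the original framework; the function name `two_proportion_z_test` and the sample numbers are hypothetical.

```python
import math
from typing import Tuple


def two_proportion_z_test(success_a: float, count_a: int,
                          success_b: float, count_b: int) -> Tuple[float, float]:
    """Return (z, two-sided p-value) for the difference between two rates.

    success_* is the 'sum' and count_* the 'count' of a 0/1 metric.
    """
    if count_a == 0 or count_b == 0:
        raise ValueError("both variants need at least one observation")

    rate_a = success_a / count_a
    rate_b = success_b / count_b
    # Pooled rate under the null hypothesis that both variants are equal
    pooled = (success_a + success_b) / (count_a + count_b)
    std_err = math.sqrt(pooled * (1 - pooled) * (1 / count_a + 1 / count_b))
    if std_err == 0:
        # All outcomes are 0 or all are 1: no evidence of a difference
        return 0.0, 1.0

    z = (rate_b - rate_a) / std_err
    # Two-sided p-value from the standard normal distribution
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return z, p_value


# Made-up aggregates in the shape returned by get_results
results = {
    'control':   {'clicks': {'sum': 120.0, 'count': 1000}},
    'treatment': {'clicks': {'sum': 150.0, 'count': 1000}},
}
a = results['control']['clicks']
b = results['treatment']['clicks']
z, p = two_proportion_z_test(a['sum'], a['count'], b['sum'], b['count'])
print(f"z = {z:.2f}, p = {p:.3f}")
```

For a 0/1 metric the stored `sum` is the number of successes, so the values can be passed in directly. For continuous metrics such as latency, sum and count alone are not enough, because a t-test also needs the variance.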

Part 10: Continual Learning

10.1 Active Learning

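# src/active_learning/active_learner.py
"""
Active learning module

Automatically selects the samples the model is least certain about for manual labeling,
improving model performance at minimal labeling cost.
"""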

import numpy as np
from typing import List, Tuple
from sklearn.cluster import KMeans


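class ActiveLearner:
    """
    Active learner

    Strategies:
    1. Uncertainty sampling: pick the samples whose top predicted probability is lowest
    2. Diversity sampling: pick samples spread across the feature space (one per K-Means cluster)
    3. Hybrid: combine uncertainty and diversity
    """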

    def __init__(self, strategy: str = 'uncertainty'):
        self.strategy = strategy

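    def select_samples_uncertainty(
        self,
        probabilities: np.ndarray,
        n_samples: int
    ) -> List[int]:
        """
        Uncertainty sampling (least confidence)

        Selects the samples whose highest class probability is lowest.
        In the binary case these are the predictions closest to 0.5.
        """
        # Uncertainty = 1 - max class probability
        uncertainties = 1 - np.max(probabilities, axis=1)
        return np.argsort(uncertainties)[-n_samples:].tolist()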

    def select_samples_diversity(
        self,
        features: np.ndarray,
        n_samples: int
    ) -> List[int]:
        """
        Diversity sampling

        Runs K-Means and selects the sample nearest to each cluster center.
        """
        kmeans = KMeans(n_clusters=n_samples, random_state=42)
        kmeans.fit(features)

        # Find the sample nearest to each cluster center
        selected_indices = []
        for center in kmeans.cluster_centers_:
            distances = np.linalg.norm(features - center, axis=1)
            # Cast to a plain int so the result matches List[int]
            selected_indices.append(int(np.argmin(distances)))

        return selected_indices

    def select_samples_hybrid(
        self,
        probabilities: np.ndarray,
        features: np.ndarray,
        n_samples: int,
        uncertainty_weight: float = 0.5
    ) -> List[int]:
        """
        Hybrid strategy

        Combines normalized uncertainty and diversity scores.
        """
        def normalize(scores: np.ndarray) -> np.ndarray:
            # Min-max scale to [0, 1]; return zeros when all scores are equal
            span = scores.max() - scores.min()
            if span == 0:
                return np.zeros_like(scores, dtype=float)
            return (scores - scores.min()) / span

        # Uncertainty score
        uncertainties = 1 - np.max(probabilities, axis=1)
        uncertainty_scores = normalize(uncertainties)

        # Diversity score: total distance from each sample to every other sample
        # (O(n^2) in the number of samples)
        n_data = len(features)
        diversity_scores = np.zeros(n_data)

        for i in range(n_data):
            diversity_scores[i] = np.linalg.norm(features - features[i], axis=1).sum()

        diversity_scores = normalize(diversity_scores)

        # Combined score
        combined_scores = uncertainty_weight * uncertainty_scores + (1 - uncertainty_weight) * diversity_scores

        return np.argsort(combined_scores)[-n_samples:].tolist()

    def select_samples(
        self,
        probabilities: np.ndarray,
        features: np.ndarray,
        n_samples: int
    ) -> List[int]:
        """Select the samples to send for labeling."""
        if self.strategy == 'uncertainty':
            return self.select_samples_uncertainty(probabilities, n_samples)
        elif self.strategy == 'diversity':
            return self.select_samples_diversity(features, n_samples)
        elif self.strategy == 'hybrid':
            return self.select_samples_hybrid(probabilities, features, n_samples)
        else:
            raise ValueError(f"Unknown strategy: {self.strategy}")
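
As a worked example of the least-confidence score used by `select_samples_uncertainty`, the sketch below ranks four made-up three-class predictions in plain Python. The probabilities and the helper name `least_confidence_top_n` are illustrative assumptions; the method above computes the same score with NumPy.

```python
from typing import List, Sequence


def least_confidence_top_n(probabilities: Sequence[Sequence[float]],
                           n_samples: int) -> List[int]:
    """Return indices of the n_samples rows with the lowest top probability."""
    # Uncertainty = 1 - max class probability, as in select_samples_uncertainty
    uncertainties = [1 - max(row) for row in probabilities]
    # Sort indices from most to least uncertain
    ranked = sorted(range(len(uncertainties)),
                    key=lambda i: uncertainties[i], reverse=True)
    return ranked[:n_samples]


# Made-up softmax outputs for four samples over three classes
probs = [
    [0.90, 0.05, 0.05],  # confident          -> uncertainty 0.10
    [0.40, 0.35, 0.25],  # nearly uniform     -> uncertainty 0.60
    [0.55, 0.40, 0.05],  # two close classes  -> uncertainty 0.45
    [0.98, 0.01, 0.01],  # very confident     -> uncertainty 0.02
]

print(least_confidence_top_n(probs, 2))  # [1, 2]
```

The NumPy version selects the same two samples but returns them in ascending order of uncertainty, i.e. `[2, 1]`.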

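By building a complete intelligent product classification system, this article walks through each stage of a Python + AI project:

Data preparation: dataset download, cleaning, and splitting

Data processing: data augmentation, dataset classes, batch loading

Model design: EfficientNet, multi-task learning, model ensembling

Model training: trainer, learning-rate scheduling, EMA, early stopping

Model optimization: ONNX export, TensorRT acceleration, quantization

Model deployment: FastAPI service, asynchronous task queue

Monitoring and operations: Prometheus metrics, A/B testing

Continual learning: active learning, online updates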
Source:
https://yyvgt.cn/category/jiushige.html
