import asyncio
import logging
import os
import subprocess
import tempfile
from typing import Tuple

import librosa
import numpy as np
from fastapi import HTTPException, UploadFile

from app.config import settings

logger = logging.getLogger(__name__)


class AudioProcessor:
    """Audio preprocessing helpers: upload validation, WAV conversion, metadata.

    All methods are static; the module-level ``audio_processor`` instance
    exists only as a convenient import handle.
    """

    @staticmethod
    async def validate_file(file: UploadFile) -> Tuple[bytes, str]:
        """Validate an upload's size and extension.

        Args:
            file: The uploaded file to check.

        Returns:
            A ``(contents, ext)`` tuple: the raw bytes of the upload and its
            lower-cased extension without the leading dot.

        Raises:
            HTTPException: 413 if the upload exceeds
                ``settings.MAX_FILE_SIZE_MB``; 415 if the filename is missing
                or its extension is not in ``settings.SUPPORTED_FORMATS``.
        """
        # Read at most one byte past the limit: enough to detect an oversized
        # upload without pulling an arbitrarily large body into memory.
        max_bytes = int(settings.MAX_FILE_SIZE_MB * 1024 * 1024)
        contents = await file.read(max_bytes + 1)
        if len(contents) > max_bytes:
            raise HTTPException(
                status_code=413,
                detail=f"File too large. Max {settings.MAX_FILE_SIZE_MB}MB",
            )

        # ``filename`` may be None (e.g. a multipart part without a filename);
        # treat that as "no recognisable format" rather than crashing.
        filename = file.filename or ""
        ext = filename.rsplit(".", 1)[-1].lower()
        if not filename or ext not in settings.SUPPORTED_FORMATS:
            raise HTTPException(
                status_code=415,
                detail=f"Unsupported format. Supported: {settings.SUPPORTED_FORMATS}",
            )

        return contents, ext

    @staticmethod
    async def convert_to_wav(input_bytes: bytes, input_ext: str) -> bytes:
        """Convert audio to WAV (``settings.TARGET_SAMPLE_RATE``, mono, 16-bit PCM).

        The conversion shells out to ``ffmpeg`` and touches the filesystem,
        both of which block; the work is therefore run on the event loop's
        default executor so other requests keep being served meanwhile.

        Args:
            input_bytes: Raw bytes of the source audio.
            input_ext: Extension of the source audio (no leading dot); used as
                the temp-file suffix handed to ffmpeg.

        Returns:
            The bytes of the converted WAV file.

        Raises:
            HTTPException: 422 if ffmpeg exits non-zero; 408 if it does not
                finish within 30 seconds.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            AudioProcessor._convert_to_wav_blocking,
            input_bytes,
            input_ext,
        )

    @staticmethod
    def _convert_to_wav_blocking(input_bytes: bytes, input_ext: str) -> bytes:
        """Synchronous body of ``convert_to_wav``; see that method for the contract."""
        input_path = None
        output_path = None
        try:
            # delete=False because ffmpeg must reopen the files by path after
            # we close our handles; cleanup happens in ``finally``.
            with tempfile.NamedTemporaryFile(
                delete=False, suffix=f".{input_ext}"
            ) as f_in:
                input_path = f_in.name
                f_in.write(input_bytes)

            with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f_out:
                output_path = f_out.name

            cmd = [
                "ffmpeg",
                "-i", input_path,
                "-ar", str(settings.TARGET_SAMPLE_RATE),
                "-ac", "1",
                "-acodec", "pcm_s16le",
                "-y",  # Overwrite the (empty) output file created above
                output_path,
            ]

            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=30,
            )

            if result.returncode != 0:
                logger.error("FFmpeg error: %s", result.stderr)
                raise HTTPException(
                    status_code=422,
                    detail="Audio conversion failed",
                )

            with open(output_path, "rb") as f:
                return f.read()

        except subprocess.TimeoutExpired as exc:
            raise HTTPException(
                status_code=408,
                detail="Audio conversion timeout",
            ) from exc
        finally:
            # Remove whichever temp files were actually created, even when
            # creation of the second one (or the write) failed part-way.
            for path in (input_path, output_path):
                if path is None:
                    continue
                try:
                    os.unlink(path)
                except FileNotFoundError:
                    pass

    @staticmethod
    def get_audio_info(audio_bytes: bytes) -> dict:
        """Return basic metadata for WAV audio.

        Args:
            audio_bytes: Raw bytes of the audio file (written to a ``.wav``
                temp file and decoded with ``librosa.load``).

        Returns:
            A dict with keys ``duration_seconds`` (rounded to 2 decimals),
            ``sample_rate`` (native rate, no resampling), ``channels`` and
            ``samples`` (frames per channel).
        """
        path = None
        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
                path = tmp.name
                tmp.write(audio_bytes)

            # mono=False keeps the file's channel layout; with the default
            # mono=True the signal is always down-mixed and the channel count
            # could never be anything but 1.
            y, sr = librosa.load(path, sr=None, mono=False)

            # librosa returns shape (n,) for mono and (channels, n) otherwise.
            channels = 1 if y.ndim == 1 else y.shape[0]
            samples = y.shape[-1]
            duration = samples / sr

            return {
                "duration_seconds": round(duration, 2),
                "sample_rate": sr,
                "channels": channels,
                "samples": samples,
            }
        finally:
            if path is not None:
                os.unlink(path)


audio_processor = AudioProcessor()