import os import cv2 import numpy as np import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import Dataset, DataLoader from sklearn.model_selection import train_test_split from sklearn.metrics import classification_report, accuracy_score, confusion_matrix import time from model import ViolenceConv3D # --- Configuration --- # Adjusted for standalone folder usage # We point to the parent directory's dataset to avoid copying errors # In a real GitHub repo, users should place 'Dataset' in the root or update this path. BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # Points to parent of Conv3D_Model DATASET_DIR = os.path.join(BASE_DIR, "Dataset") MODEL_SAVE_PATH = "best_model.pth" # Hyperparameters IMG_SIZE = 112 SEQ_LEN = 16 BATCH_SIZE = 50 EPOCHS = 80 LEARNING_RATE = 1e-4 PATIENCE = 5 # Early stopping patience # --- 1. Data Augmentation --- def augment_video_frames(frames): """ Apply augmentation to a sequence of frames. All frames in the sequence must receive the same transformation parameters. """ augmented_frames = [] # Decisions for augmentation do_flip = np.random.random() > 0.5 do_rotate = np.random.random() > 0.5 angle = np.random.randint(-15, 15) if do_rotate else 0 # Color jitter parameters (Brightness/Contrast) brightness = np.random.uniform(0.8, 1.2) contrast = np.random.uniform(0.8, 1.2) for frame in frames: new_frame = frame.copy() # Horizontal Flip if do_flip: new_frame = cv2.flip(new_frame, 1) # Rotation if do_rotate: (h, w) = new_frame.shape[:2] center = (w // 2, h // 2) M = cv2.getRotationMatrix2D(center, angle, 1.0) new_frame = cv2.warpAffine(new_frame, M, (w, h)) # Color Jitter (Brightness/Contrast) new_frame = cv2.convertScaleAbs(new_frame, alpha=contrast, beta=(brightness-1)*50) augmented_frames.append(new_frame) return np.array(augmented_frames) # --- Dataset Class --- class ViolenceDataset(Dataset): def __init__(self, video_paths, labels, transform=None, augment=False): self.video_paths = video_paths self.labels = labels self.augment = augment def __len__(self): return len(self.video_paths) def __getitem__(self, idx): path = self.video_paths[idx] label = self.labels[idx] try: frames = self._load_video(path) except Exception as e: # Fallback for corrupted video print(f"Error loading {path}: {e}") frames = np.zeros((SEQ_LEN, IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8) if self.augment: frames = augment_video_frames(frames) # Normalize and Channel First (C, D, H, W) frames = torch.tensor(frames, dtype=torch.float32) frames = frames / 255.0 # Normalize 0-1 frames = frames.permute(3, 0, 1, 2) # C, D, H, W return frames, label def _load_video(self, path): cap = cv2.VideoCapture(path) frames = [] try: while True: ret, frame = cap.read() if not ret: break frame = cv2.resize(frame, (IMG_SIZE, IMG_SIZE)) frames.append(frame) finally: cap.release() # Handle frame count adjustments if len(frames) == 0: return np.zeros((SEQ_LEN, IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8) if len(frames) < SEQ_LEN: while len(frames) < SEQ_LEN: frames.append(frames[-1]) elif len(frames) > SEQ_LEN: indices = np.linspace(0, len(frames)-1, SEQ_LEN, dtype=int) frames = [frames[i] for i in indices] return np.array(frames) # --- 2. Data Splitting --- def prepare_data(): violence_dir = os.path.join(DATASET_DIR, 'violence') no_violence_dir = os.path.join(DATASET_DIR, 'no-violence') if not os.path.exists(violence_dir) or not os.path.exists(no_violence_dir): raise FileNotFoundError(f"Dataset directories not found. Expected {violence_dir} and {no_violence_dir}") violence_files = [os.path.join(violence_dir, f) for f in os.listdir(violence_dir) if f.endswith('.avi') or f.endswith('.mp4')] no_violence_files = [os.path.join(no_violence_dir, f) for f in os.listdir(no_violence_dir) if f.endswith('.avi') or f.endswith('.mp4')] print(f"Found {len(violence_files)} Violence videos") print(f"Found {len(no_violence_files)} No-Violence videos") X = violence_files + no_violence_files y = [1] * len(violence_files) + [0] * len(no_violence_files) # Split (70% Train, 15% Val, 15% Test) X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.30, random_state=42, stratify=y) X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.50, random_state=42, stratify=y_temp) print(f"\nDataset Split Stats:") print(f"Train: {len(X_train)} samples") print(f"Val: {len(X_val)} samples") print(f"Test: {len(X_test)} samples") return (X_train, y_train), (X_val, y_val), (X_test, y_test) # --- Early Stopping --- class EarlyStopping: def __init__(self, patience=5, verbose=False, path='checkpoint.pth'): self.patience = patience self.verbose = verbose self.counter = 0 self.best_score = None self.early_stop = False self.val_loss_min = np.inf self.path = path def __call__(self, val_loss, model): score = -val_loss if self.best_score is None: self.best_score = score self.save_checkpoint(val_loss, model) elif score < self.best_score: self.counter += 1 if self.verbose: print(f'EarlyStopping counter: {self.counter} out of {self.patience}') if self.counter >= self.patience: self.early_stop = True else: self.best_score = score self.save_checkpoint(val_loss, model) self.counter = 0 def save_checkpoint(self, val_loss, model): if self.verbose: print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...') torch.save(model, self.path) self.val_loss_min = val_loss # --- Main Execution --- if __name__ == "__main__": start_time = time.time() device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"Using device: {device}") # Prepare Data try: (X_train, y_train), (X_val, y_val), (X_test, y_test) = prepare_data() except Exception as e: print(f"Data preparation failed: {e}") exit(1) train_dataset = ViolenceDataset(X_train, y_train, augment=True) val_dataset = ViolenceDataset(X_val, y_val, augment=False) test_dataset = ViolenceDataset(X_test, y_test, augment=False) train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0) val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0) test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0) # Model Setup model = ViolenceConv3D().to(device) criterion = nn.CrossEntropyLoss() optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE) early_stopping = EarlyStopping(patience=PATIENCE, verbose=True, path=MODEL_SAVE_PATH) # Training Loop print("\nStarting Training...") for epoch in range(EPOCHS): model.train() train_loss = 0.0 correct = 0 total = 0 for batch_idx, (inputs, labels) in enumerate(train_loader): inputs, labels = inputs.to(device), labels.to(device) optimizer.zero_grad() outputs = model(inputs) loss = criterion(outputs, labels) loss.backward() optimizer.step() train_loss += loss.item() _, predicted = torch.max(outputs.data, 1) total += labels.size(0) correct += (predicted == labels).sum().item() if batch_idx % 10 == 0: print(f"Epoch {epoch+1} Batch {batch_idx}/{len(train_loader)} Loss: {loss.item():.4f}", end='\r') train_acc = 100 * correct / total avg_train_loss = train_loss / len(train_loader) # Validation Phase model.eval() val_loss = 0.0 correct_val = 0 total_val = 0 with torch.no_grad(): for inputs, labels in val_loader: inputs, labels = inputs.to(device), labels.to(device) outputs = model(inputs) loss = criterion(outputs, labels) val_loss += loss.item() _, predicted = torch.max(outputs.data, 1) total_val += labels.size(0) correct_val += (predicted == labels).sum().item() val_acc = 100 * correct_val / total_val avg_val_loss = val_loss / len(val_loader) print(f'\nEpoch [{epoch+1}/{EPOCHS}] ' f'Train Loss: {avg_train_loss:.4f} Acc: {train_acc:.2f}% ' f'Val Loss: {avg_val_loss:.4f} Acc: {val_acc:.2f}%') early_stopping(avg_val_loss, model) if early_stopping.early_stop: print("Early stopping triggered") break # Evaluation print("\nLoading best model for overall evaluation...") if os.path.exists(MODEL_SAVE_PATH): model = torch.load(MODEL_SAVE_PATH) else: print("Warning: Model file not found, using last epoch model.") model.eval() all_preds = [] all_labels = [] print("Evaluating on Test set...") with torch.no_grad(): for inputs, labels in test_loader: inputs, labels = inputs.to(device), labels.to(device) outputs = model(inputs) _, predicted = torch.max(outputs.data, 1) all_preds.extend(predicted.cpu().numpy()) all_labels.extend(labels.cpu().numpy()) print("\n=== Overall Evaluation Report ===") print(classification_report(all_labels, all_preds, target_names=['No Violence', 'Violence'])) print("Confusion Matrix:") cm = confusion_matrix(all_labels, all_preds) print(cm) acc = accuracy_score(all_labels, all_preds) print(f"\nFinal Test Accuracy: {acc*100:.2f}%") elapsed = time.time() - start_time print(f"\nTotal execution time: {elapsed/60:.2f} minutes")