| """ |
| Module pour les IA de prédiction linéaire |
| """ |
|
|
| import numpy as np |
| from typing import List, Tuple, Optional |
| from .neuron import SitiNEUR |
|
|
|
|
| class LinearAI: |
| """ |
| IA de prédiction linéaire avec réseau de neurones |
| |
| Args: |
| input_size: Nombre de features en entrée |
| output_size: Nombre de sorties (1 pour régression simple) |
| hidden_layers: Liste des tailles des couches cachées |
| |
| Example: |
| >>> from sitiai import create |
| >>> ai = create.ai('linear', input_size=3, output_size=1, hidden_layers=[10, 5]) |
| >>> ai.train(X_train, y_train, epochs=100) |
| >>> predictions = ai.predict(X_test) |
| """ |
| |
| def __init__(self, input_size: int, output_size: int = 1, hidden_layers: Optional[List[int]] = None): |
| self.input_size = input_size |
| self.output_size = output_size |
| self.hidden_layers = hidden_layers or [10] |
| |
| |
| self.layers: List[SitiNEUR] = [] |
| |
| |
| layer_sizes = [input_size] + self.hidden_layers + [output_size] |
| |
| for i in range(len(layer_sizes) - 1): |
| |
| activation = 'relu' if i < len(layer_sizes) - 2 else 'linear' |
| layer = SitiNEUR( |
| input_size=layer_sizes[i], |
| output_size=layer_sizes[i + 1], |
| activation=activation |
| ) |
| self.layers.append(layer) |
| |
| self.is_trained = False |
| self.loss_history = [] |
| |
| |
| self.x_mean = None |
| self.x_std = None |
| self.y_mean = None |
| self.y_std = None |
| |
| def _normalize_X(self, X: np.ndarray, fit: bool = False) -> np.ndarray: |
| """Normalise les entrées""" |
| if fit: |
| self.x_mean = np.mean(X, axis=0) |
| self.x_std = np.std(X, axis=0) + 1e-8 |
| |
| return (X - self.x_mean) / self.x_std |
| |
| def _normalize_y(self, y: np.ndarray, fit: bool = False) -> np.ndarray: |
| """Normalise les sorties""" |
| if fit: |
| self.y_mean = np.mean(y, axis=0) |
| self.y_std = np.std(y, axis=0) + 1e-8 |
| |
| return (y - self.y_mean) / self.y_std |
| |
| def _denormalize_y(self, y: np.ndarray) -> np.ndarray: |
| """Dénormalise les prédictions""" |
| if self.y_mean is not None and self.y_std is not None: |
| return y * self.y_std + self.y_mean |
| return y |
| |
| def forward(self, x: np.ndarray) -> np.ndarray: |
| """ |
| Propagation avant à travers tout le réseau |
| |
| Args: |
| x: Données d'entrée |
| |
| Returns: |
| Prédictions |
| """ |
| output = x |
| for layer in self.layers: |
| output = layer.forward(output) |
| return output |
| |
| def train(self, X: np.ndarray, y: np.ndarray, epochs: int = 100, |
| learning_rate: float = 0.01, batch_size: Optional[int] = None, |
| verbose: bool = True): |
| """ |
| Entraîne le modèle sur les données |
| |
| Args: |
| X: Données d'entrée (shape: [n_samples, input_size]) |
| y: Cibles (shape: [n_samples, output_size]) |
| epochs: Nombre d'époques d'entraînement |
| learning_rate: Taux d'apprentissage |
| batch_size: Taille des mini-batches (None = batch complet) |
| verbose: Afficher les logs d'entraînement |
| """ |
| |
| X = np.array(X, dtype=np.float64) |
| y = np.array(y, dtype=np.float64) |
| |
| |
| if y.ndim == 1: |
| y = y.reshape(-1, 1) |
| |
| |
| if np.any(np.isnan(X)) or np.any(np.isnan(y)): |
| raise ValueError("Les données contiennent des valeurs NaN") |
| |
| |
| X_norm = self._normalize_X(X, fit=True) |
| y_norm = self._normalize_y(y, fit=True) |
| |
| n_samples = X_norm.shape[0] |
| batch_size = batch_size or min(32, n_samples) |
| |
| self.loss_history = [] |
| |
| for epoch in range(epochs): |
| |
| indices = np.random.permutation(n_samples) |
| X_shuffled = X_norm[indices] |
| y_shuffled = y_norm[indices] |
| |
| epoch_loss = 0 |
| n_batches = 0 |
| |
| |
| for i in range(0, n_samples, batch_size): |
| batch_X = X_shuffled[i:i + batch_size] |
| batch_y = y_shuffled[i:i + batch_size] |
| |
| |
| predictions = self.forward(batch_X) |
| |
| |
| if np.any(np.isnan(predictions)): |
| if verbose: |
| print(f"⚠️ NaN détecté à l'époque {epoch + 1}, arrêt de l'entraînement") |
| break |
| |
| |
| loss = np.mean((predictions - batch_y) ** 2) |
| epoch_loss += loss |
| n_batches += 1 |
| |
| |
| grad = 2 * (predictions - batch_y) / batch_y.shape[0] |
| |
| for layer in reversed(self.layers): |
| grad = layer.backward(grad, learning_rate) |
| |
| if n_batches == 0: |
| break |
| |
| avg_loss = epoch_loss / n_batches |
| self.loss_history.append(avg_loss) |
| |
| if verbose and (epoch + 1) % 10 == 0: |
| print(f"Époque {epoch + 1}/{epochs}, Perte: {avg_loss:.6f}") |
| |
| self.is_trained = True |
| |
| if verbose: |
| print(f"\n✓ Entraînement terminé! Perte finale: {self.loss_history[-1]:.6f}") |
| |
| def predict(self, X: np.ndarray) -> np.ndarray: |
| """ |
| Fait des prédictions sur de nouvelles données |
| |
| Args: |
| X: Données d'entrée |
| |
| Returns: |
| Prédictions |
| """ |
| if not self.is_trained: |
| print("⚠️ Attention: Le modèle n'est pas entraîné.") |
| |
| X = np.array(X, dtype=np.float64) |
| |
| |
| if self.x_mean is not None: |
| X_norm = self._normalize_X(X, fit=False) |
| predictions_norm = self.forward(X_norm) |
| return self._denormalize_y(predictions_norm) |
| |
| return self.forward(X) |
| |
| def evaluate(self, X: np.ndarray, y: np.ndarray) -> Tuple[float, float]: |
| """ |
| Évalue le modèle sur des données de test |
| |
| Args: |
| X: Données d'entrée |
| y: Vraies valeurs |
| |
| Returns: |
| (mse, r2_score) |
| """ |
| predictions = self.predict(X) |
| y = np.array(y, dtype=np.float32) |
| |
| if y.ndim == 1: |
| y = y.reshape(-1, 1) |
| |
| |
| mse = np.mean((predictions - y) ** 2) |
| |
| |
| ss_res = np.sum((y - predictions) ** 2) |
| ss_tot = np.sum((y - np.mean(y)) ** 2) |
| r2 = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0 |
| |
| return mse, r2 |
| |
| def save_weights(self, filepath: str): |
| """ |
| Sauvegarde les poids du modèle dans un fichier |
| |
| Args: |
| filepath: Chemin du fichier de sauvegarde (.npz) |
| |
| Example: |
| >>> ai.save_weights('my_model.npz') |
| """ |
| if not filepath.endswith('.npz'): |
| filepath += '.npz' |
| |
| weights_data = {} |
| |
| |
| for i, layer in enumerate(self.layers): |
| layer_weights = layer.get_weights() |
| weights_data[f'layer_{i}_weights'] = layer_weights['weights'] |
| weights_data[f'layer_{i}_bias'] = layer_weights['bias'] |
| |
| |
| if self.x_mean is not None: |
| weights_data['x_mean'] = self.x_mean |
| weights_data['x_std'] = self.x_std |
| weights_data['y_mean'] = self.y_mean |
| weights_data['y_std'] = self.y_std |
| |
| |
| weights_data['input_size'] = np.array([self.input_size]) |
| weights_data['output_size'] = np.array([self.output_size]) |
| weights_data['hidden_layers'] = np.array(self.hidden_layers) |
| weights_data['is_trained'] = np.array([self.is_trained]) |
| |
| np.savez(filepath, **weights_data) |
| print(f"✓ Modèle sauvegardé dans '{filepath}'") |
| |
| def load_weights(self, filepath: str): |
| """ |
| Charge les poids du modèle depuis un fichier |
| |
| Args: |
| filepath: Chemin du fichier de sauvegarde (.npz) |
| |
| Example: |
| >>> ai = sitiai.create.ai('linear', input_size=3, output_size=1) |
| >>> ai.load_weights('my_model.npz') |
| """ |
| if not filepath.endswith('.npz'): |
| filepath += '.npz' |
| |
| data = np.load(filepath, allow_pickle=True) |
| |
| |
| for i, layer in enumerate(self.layers): |
| weights_dict = { |
| 'weights': data[f'layer_{i}_weights'], |
| 'bias': data[f'layer_{i}_bias'] |
| } |
| layer.set_weights(weights_dict) |
| |
| |
| if 'x_mean' in data: |
| self.x_mean = data['x_mean'] |
| self.x_std = data['x_std'] |
| self.y_mean = data['y_mean'] |
| self.y_std = data['y_std'] |
| |
| |
| if 'is_trained' in data: |
| self.is_trained = bool(data['is_trained'][0]) |
| |
| print(f"✓ Modèle chargé depuis '{filepath}'") |
| |
| def __repr__(self): |
| status = "entraîné" if self.is_trained else "non entraîné" |
| return f"LinearAI(architecture={[self.input_size] + self.hidden_layers + [self.output_size]}, status='{status}')" |
|
|