| | """Data loading, cleaning, and filtering helpers for the BI dashboard.""" |
| |
|
| | from __future__ import annotations |
| |
|
| | from dataclasses import dataclass |
| | from io import BytesIO |
| | from pathlib import Path |
| | from typing import Dict, Iterable, List, Mapping, Optional, Tuple |
| |
|
| | import pandas as pd |
| |
|
| | from utils import ( |
| | ColumnTypes, |
| | PREVIEW_ROWS, |
| | coerce_datetime_columns, |
| | ensure_unique_columns, |
| | infer_column_types, |
| | is_supported_file, |
| | ) |
| |
|
# Directory shipped next to this module that holds the bundled sample datasets.
SAMPLE_DATA_DIR = Path(__file__).resolve().parent / "data"
# Human-readable blurbs shown in the sample-dataset dropdown, keyed by file name.
# Files present on disk but missing here get a generic fallback description
# (see sample_dataset_options).
SAMPLE_DESCRIPTIONS = {
    "train.csv": "Weekly Walmart sales with markdowns and holidays (training set).",
    "test.csv": "Companion test set without weekly sales labels.",
    "features.csv": "Store-level features such as markdowns, CPI, unemployment.",
    "stores.csv": "Store metadata including type and size.",
}
| |
|
| |
|
@dataclass(frozen=True)
class DatasetBundle:
    """Container storing the dataset and metadata required by the UI."""

    # Fully loaded tabular data (after de-duplicating columns and coercing datetimes).
    dataframe: pd.DataFrame
    # Partition of the columns into numeric / categorical / datetime groups.
    column_types: ColumnTypes
    # Display name of the originating file (basename only, no directory part).
    source_name: str
| |
|
| |
|
def load_dataset(file_obj) -> DatasetBundle:
    """Build a :class:`DatasetBundle` from a file uploaded through the UI.

    Parameters
    ----------
    file_obj:
        File-like object produced by the Gradio upload widget.

    Returns
    -------
    DatasetBundle
        Loaded dataset alongside inferred column metadata.

    Raises
    ------
    ValueError
        If the file cannot be read or uses an unsupported format.
    """
    if file_obj is None:
        raise ValueError("Please upload a CSV or Excel file.")

    file_name = getattr(file_obj, "name", None)
    original_name = getattr(file_obj, "orig_name", file_name)

    if not original_name or not is_supported_file(original_name):
        raise ValueError("Unsupported file type. Please upload a CSV or Excel file.")

    on_disk = Path(str(file_name)) if file_name else None

    try:
        # Prefer reading from disk when the widget persisted a temp file;
        # otherwise fall back to the in-memory buffer.
        if on_disk is not None and on_disk.exists():
            frame: Optional[pd.DataFrame] = _read_from_path(on_disk, original_name)
        else:
            frame = _read_from_buffer(file_obj, original_name)
    except Exception as exc:  # surface any parser failure as a user-facing error
        raise ValueError(f"Unable to load dataset: {exc}") from exc

    if frame is None:
        raise ValueError("Failed to load dataset. The file may be empty or corrupted.")

    frame = ensure_unique_columns(frame)
    frame, coerced_datetime = coerce_datetime_columns(frame)
    inferred = infer_column_types(frame)

    # Fold the columns discovered during datetime coercion into the inferred set.
    merged_datetime = tuple(sorted(set(inferred.datetime) | set(coerced_datetime)))

    return DatasetBundle(
        dataframe=frame,
        column_types=ColumnTypes(
            numeric=inferred.numeric,
            categorical=inferred.categorical,
            datetime=merged_datetime,
        ),
        source_name=Path(original_name).name,
    )
| |
|
| |
|
| | def _read_from_path(path: Path, original_name: str) -> pd.DataFrame: |
| | """Read a dataset from disk.""" |
| | suffix = path.suffix.lower() |
| | if suffix == ".csv": |
| | return pd.read_csv(path) |
| | if suffix in {".xlsx", ".xls"}: |
| | return pd.read_excel(path) |
| | raise ValueError(f"Unsupported file extension in {original_name}.") |
| |
|
| |
|
| | def _read_from_buffer(file_obj, original_name: str) -> pd.DataFrame: |
| | """Read a dataset from an in-memory buffer.""" |
| | bytes_data = getattr(file_obj, "read", lambda: b"")() |
| | if not bytes_data: |
| | raise ValueError(f"The uploaded file '{original_name}' is empty.") |
| |
|
| | buffer = BytesIO(bytes_data) |
| | lowered = original_name.lower() |
| | if lowered.endswith(".csv"): |
| | return pd.read_csv(buffer) |
| | if lowered.endswith((".xlsx", ".xls")): |
| | return pd.read_excel(buffer) |
| |
|
| | raise ValueError("Only CSV and Excel files are supported.") |
| |
|
| |
|
def dataset_overview(df: pd.DataFrame) -> Dict[str, object]:
    """Summarise row/column counts, memory footprint, and per-column dtypes."""
    n_rows, n_cols = df.shape
    mem_mb = df.memory_usage(deep=True).sum() / (1024 ** 2)
    dtype_table = pd.DataFrame({"Column": df.columns, "Type": df.dtypes.astype(str)})
    return {
        "info": {
            "Rows": int(n_rows),
            "Columns": int(n_cols),
            "Memory Usage (MB)": round(mem_mb, 2),
        },
        "dtypes": dtype_table,
    }
| |
|
| |
|
def dataset_preview(df: pd.DataFrame, rows: int = PREVIEW_ROWS) -> Dict[str, pd.DataFrame]:
    """Return the first and last ``rows`` rows of ``df`` for quick inspection."""
    return {"head": df.head(rows), "tail": df.tail(rows)}
| |
|
| |
|
def numeric_summary(df: pd.DataFrame) -> pd.DataFrame:
    """Per-column descriptive statistics for the numeric part of ``df``.

    Returns an empty frame when there are no numeric columns.
    """
    numbers = df.select_dtypes(include=["number"])
    if numbers.empty:
        return pd.DataFrame()

    stats = {
        "count": numbers.count(),
        "mean": numbers.mean(),
        "median": numbers.median(),
        "std": numbers.std(),
        "min": numbers.min(),
        "25%": numbers.quantile(0.25),
        "75%": numbers.quantile(0.75),
        "max": numbers.max(),
    }
    table = pd.DataFrame(stats).round(3)
    table.index.name = "column"
    return table
| |
|
| |
|
def categorical_summary(df: pd.DataFrame, top_values: int = 5) -> pd.DataFrame:
    """Summarise non-numeric, non-datetime columns: cardinality, mode, top values."""
    non_numeric = df.select_dtypes(exclude=["number", "datetime64[ns]", "datetime64[ns, UTC]"])
    if non_numeric.empty:
        return pd.DataFrame()

    records: List[Dict[str, object]] = []
    for name in non_numeric.columns:
        col = non_numeric[name]
        modes = col.mode(dropna=True)
        top_counts = col.value_counts(dropna=True).head(top_values)
        records.append(
            {
                "column": name,
                "unique_values": int(col.nunique(dropna=True)),
                "mode": None if modes.empty else modes.iloc[0],
                "mode_count": 0 if top_counts.empty else int(top_counts.iloc[0]),
                f"top_{top_values}": ", ".join(
                    f"{value} ({count})" for value, count in top_counts.items()
                ),
            }
        )

    return pd.DataFrame(records)
| |
|
| |
|
def missing_value_report(df: pd.DataFrame) -> pd.DataFrame:
    """Count and percentage of missing values per column, worst offenders first."""
    na_counts = df.isna().sum()
    if not na_counts.sum():
        # Empty frame with the expected schema keeps downstream rendering simple.
        return pd.DataFrame(columns=["column", "missing_count", "missing_pct"])

    report = pd.DataFrame(
        {
            "column": na_counts.index,
            "missing_count": na_counts.values,
            "missing_pct": (na_counts / len(df) * 100).values,
        }
    )
    report = report.sort_values(by="missing_pct", ascending=False)
    return report.reset_index(drop=True).round({"missing_pct": 2})
| |
|
| |
|
def correlation_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """Pairwise correlations of the numeric columns, rounded to 3 decimals.

    Returns an empty frame when fewer than two numeric columns exist.
    """
    numeric_only = df.select_dtypes(include=["number"])
    if numeric_only.shape[1] < 2:
        return pd.DataFrame()
    return numeric_only.corr().round(3)
| |
|
| |
|
def filter_dataframe(
    df: pd.DataFrame,
    numeric_filters: Mapping[str, Tuple[Optional[float], Optional[float]]],
    categorical_filters: Mapping[str, Iterable[str]],
    date_filters: Mapping[str, Tuple[Optional[str], Optional[str]]],
) -> pd.DataFrame:
    """Filter the dataset according to the provided filter definitions.

    Parameters
    ----------
    df:
        Source dataset; never mutated.
    numeric_filters:
        Column -> ``(lower, upper)`` inclusive bounds; either bound may be None.
    categorical_filters:
        Column -> allowed values; an empty selection means "no filter".
    date_filters:
        Column -> ``(start, end)`` date strings; either bound may be None.

    Returns
    -------
    pd.DataFrame
        Rows satisfying every active filter. Rows whose value is NaN/NaT in a
        bounded column are excluded (pandas comparisons are False for NaN).
    """
    filtered = df.copy()

    for column, bounds in numeric_filters.items():
        if column not in filtered.columns or bounds is None:
            continue
        lower, upper = bounds
        # Build one combined mask and subset once: the previous per-bound
        # subsetting applied a mask captured from the pre-filter frame, which
        # forced pandas to realign the boolean Series against the shrunken
        # index (emitting a "Boolean Series key will be reindexed" warning).
        mask = pd.Series(True, index=filtered.index)
        series = filtered[column]
        if lower is not None:
            mask &= series >= lower
        if upper is not None:
            mask &= series <= upper
        filtered = filtered[mask]

    for column, values in categorical_filters.items():
        if column not in filtered.columns:
            continue
        values = list(values)
        if not values:
            continue  # empty selection means "keep everything"
        filtered = filtered[filtered[column].isin(values)]

    for column, bounds in date_filters.items():
        if column not in filtered.columns or bounds is None:
            continue
        start, end = bounds
        # Coerce on the fly so string-typed date columns still filter; invalid
        # values become NaT and are excluded by the comparisons below.
        series = pd.to_datetime(filtered[column], errors="coerce")
        mask = pd.Series(True, index=filtered.index)
        if start:
            mask &= series >= pd.to_datetime(start)
        if end:
            mask &= series <= pd.to_datetime(end)
        filtered = filtered[mask]

    return filtered
| |
|
| |
|
def filter_metadata(df: pd.DataFrame, column_types: ColumnTypes, categorical_limit: int = 200) -> Dict[str, object]:
    """Pre-compute useful metadata for rendering filter controls."""
    numeric_meta: Dict[str, object] = {}
    for name in column_types.numeric:
        values = df[name].dropna()
        if not values.empty:
            numeric_meta[name] = {"min": float(values.min()), "max": float(values.max())}

    categorical_meta: Dict[str, object] = {}
    for name in column_types.categorical:
        # Cap the choice list so huge-cardinality columns don't flood the UI.
        choices = df[name].dropna().astype(str).unique().tolist()
        categorical_meta[name] = choices[:categorical_limit]

    datetime_meta: Dict[str, object] = {}
    for name in column_types.datetime:
        stamps = pd.to_datetime(df[name], errors="coerce").dropna()
        if not stamps.empty:
            datetime_meta[name] = {"min": stamps.min().date(), "max": stamps.max().date()}

    return {"numeric": numeric_meta, "categorical": categorical_meta, "datetime": datetime_meta}
| |
|
| |
|
def sample_dataset_options() -> Dict[str, str]:
    """Map each bundled dataset file name to its human-readable description."""
    if not SAMPLE_DATA_DIR.exists():
        return {}

    supported = {".csv", ".xlsx", ".xls"}
    return {
        entry.name: SAMPLE_DESCRIPTIONS.get(
            entry.name, f"Sample dataset sourced from '{entry.name}'."
        )
        for entry in sorted(SAMPLE_DATA_DIR.iterdir())
        if entry.is_file() and entry.suffix.lower() in supported
    }
| |
|
| |
|
def load_sample_dataset(selection: str) -> DatasetBundle:
    """Load one of the datasets bundled in the local ``data/`` directory.

    Raises
    ------
    ValueError
        If no selection was made or the file is missing on disk.
    """
    if not selection:
        raise ValueError("Please select a sample dataset from the dropdown.")

    sample_path = SAMPLE_DATA_DIR / selection
    if not sample_path.exists():
        raise ValueError(
            f"Sample dataset '{selection}' was not found in the 'data/' directory. "
            "Ensure the file exists and try again."
        )

    frame = _read_from_path(sample_path, selection)
    frame = ensure_unique_columns(frame)
    frame, coerced_datetime = coerce_datetime_columns(frame)
    inferred = infer_column_types(frame)

    # Fold the columns discovered during datetime coercion into the inferred set.
    merged_datetime = tuple(sorted(set(inferred.datetime) | set(coerced_datetime)))

    return DatasetBundle(
        dataframe=frame,
        column_types=ColumnTypes(
            numeric=inferred.numeric,
            categorical=inferred.categorical,
            datetime=merged_datetime,
        ),
        source_name=selection,
    )
| |
|