from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from importlib import import_module
from typing import Any, Dict, List, Mapping, Optional, cast, Callable

import logging
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.axes import Axes
from matplotlib.figure import Figure

from .io import (
    DataSource,
    ResultsSink,
)

LOGGER = logging.getLogger(__name__)

# Metrics of the 'volumetry' dimension whose Series result is emitted as
# {"labels": [...], "values": [...]} instead of a bare list of values.
_LABELLED_VOLUMETRY_METRICS = frozenset(
    {
        "volume_by_channel",
        "volume_by_skill",
        "channel_distribution_pct",
        "skill_distribution_pct",
    }
)


def setup_basic_logging(level: str = "INFO") -> None:
    """
    Basic logging configuration, if needed from scripts.

    `level` is looked up case-insensitively on the `logging` module; an
    unknown name falls back to `logging.INFO`.
    """
    logging.basicConfig(
        level=getattr(logging, level.upper(), logging.INFO),
        format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
    )


def _import_class(path: str) -> type:
    """
    Dynamic import of a class from a string like:
    "beyond_metrics.dimensions.VolumetriaMetrics"

    Raises:
        ValueError: if `path` contains no '.' separating module and class.
        ImportError / AttributeError: propagated from the import machinery
            when the module or the attribute cannot be resolved.
    """
    LOGGER.debug("Importing class %s", path)
    module_name, sep, class_name = path.rpartition(".")
    if not sep:
        # Without a dot there is no module part to import; fail with a
        # message that names the offending value instead of an unpack error.
        raise ValueError(
            f"Invalid class path '{path}': expected 'package.module.ClassName'."
        )
    module = import_module(module_name)
    cls = getattr(module, class_name)
    return cls


def _serialize_for_json(obj: Any) -> Any:
    """
    Converts typical numpy/pandas objects to JSON-friendly types.

    Conversion rules, applied recursively to containers:
    - None / str / int / float / bool are returned unchanged.
    - numpy booleans -> bool, numpy integers -> int, numpy floats -> float.
    - numpy arrays -> (nested) lists.
    - DataFrame -> list of row dicts; Series -> list of values.
    - list / tuple -> list; dict -> dict with keys converted via str().
    - Anything else -> str(obj).
    """
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj

    if isinstance(obj, np.bool_):
        return bool(obj)

    if isinstance(obj, np.integer):
        # Keep integers as integers: going through float would lose
        # precision for large values and turn counts into floats.
        return int(obj)

    if isinstance(obj, np.floating):
        return float(obj)

    if isinstance(obj, np.ndarray):
        return _serialize_for_json(obj.tolist())

    if isinstance(obj, pd.DataFrame):
        # Recurse so that cell values (timestamps, numpy scalars, ...) are
        # converted too, not just the outer container.
        return _serialize_for_json(obj.to_dict(orient="records"))

    if isinstance(obj, pd.Series):
        return _serialize_for_json(obj.to_list())

    if isinstance(obj, (list, tuple)):
        return [_serialize_for_json(x) for x in obj]

    if isinstance(obj, dict):
        return {str(k): _serialize_for_json(v) for k, v in obj.items()}

    return str(obj)


# Signature of a post-run hook: (all_results, run_base, sink) -> None.
PostRunCallback = Callable[[Dict[str, Any], str, ResultsSink], None]


@dataclass
class BeyondMetricsPipeline:
    """
    Main BeyondMetrics pipeline.

    - Reads a CSV from a DataSource (local, S3, Google Drive, etc.).
    - Executes dimensions configured in a config dict.
    - Serializes numeric/tabular results to JSON.
    - Saves images from methods starting with 'plot_'.
    """

    # Where the input CSV is read from.
    datasource: DataSource
    # Where results.json and figures are written to.
    sink: ResultsSink
    # Mapping: dimension name -> {"class": ..., "metrics": [...], "enabled": ...}.
    dimensions_config: Mapping[str, Any]
    # Optional extra constructor kwargs, keyed by dimension name.
    dimension_params: Optional[Mapping[str, Mapping[str, Any]]] = None
    # Optional callbacks invoked after all dimensions have run.
    post_run: Optional[List[PostRunCallback]] = None

    def run(
        self,
        input_path: str,
        run_dir: str,
        *,
        write_results_json: bool = True,
    ) -> Dict[str, Any]:
        """
        Executes every enabled dimension and returns the collected results.

        Args:
            input_path: path handed to `datasource.read_csv`.
            run_dir: base output location; trailing '/' characters are stripped.
            write_results_json: when True, writes `<run_dir>/results.json`
                through the sink.

        Returns:
            Dict of the form {dimension_name: {metric_name: result}}.

        Raises:
            ValueError: if the dimensions config (or one dimension's config)
                is not a mapping, or a dimension has no 'class' entry.

        Exceptions raised by post-run callbacks are logged and do not abort
        the run.
        """
        LOGGER.info("Starting BeyondMetricsPipeline execution")
        LOGGER.info("Reading input CSV: %s", input_path)

        # 1) Read data
        df = self.datasource.read_csv(input_path)
        LOGGER.info("CSV read with %d rows and %d columns", df.shape[0], df.shape[1])

        # 2) Determine output folder/base for this execution
        run_base = run_dir.rstrip("/")
        LOGGER.info("Base path for this execution: %s", run_base)

        # 3) Execute dimensions
        dimensions_cfg = self.dimensions_config
        # The field is annotated as Mapping, so accept any Mapping rather
        # than only concrete dict instances.
        if not isinstance(dimensions_cfg, Mapping):
            raise ValueError("The 'dimensions' block must be a dict.")

        all_results: Dict[str, Any] = {}

        for dim_name, dim_cfg in dimensions_cfg.items():
            if not isinstance(dim_cfg, Mapping):
                raise ValueError(f"Invalid config for dimension '{dim_name}' (must be dict).")

            if not dim_cfg.get("enabled", True):
                LOGGER.info("Dimension '%s' disabled; skipping.", dim_name)
                continue

            class_path = dim_cfg.get("class")
            if not class_path:
                raise ValueError(f"Missing 'class' in dimension '{dim_name}'.")

            metrics: List[str] = dim_cfg.get("metrics", [])
            if not metrics:
                LOGGER.info("Dimension '%s' has no configured metrics; skipping.", dim_name)
                continue

            cls = _import_class(class_path)

            extra_kwargs = {}
            if self.dimension_params is not None:
                extra_kwargs = self.dimension_params.get(dim_name, {}) or {}

            # Dimensions receive df in the constructor
            instance = cls(df, **extra_kwargs)

            dim_results: Dict[str, Any] = {}
            for metric_name in metrics:
                LOGGER.info("  - Executing metric '%s.%s'", dim_name, metric_name)
                result = self._execute_metric(instance, metric_name, run_base, dim_name)
                dim_results[metric_name] = result

            all_results[dim_name] = dim_results

        # 4) Save results JSON (optional)
        if write_results_json:
            results_json_path = f"{run_base}/results.json"
            LOGGER.info("Saving results to JSON: %s", results_json_path)
            self.sink.write_json(results_json_path, all_results)

        # 5) Execute post-run callbacks (scorers, agents, etc.)
        if self.post_run:
            LOGGER.info("Executing %d post-run callbacks...", len(self.post_run))
            for cb in self.post_run:
                try:
                    LOGGER.info("Executing post-run callback: %s", cb)
                    cb(all_results, run_base, self.sink)
                except Exception:
                    # Best-effort: a failing callback must not discard the
                    # results that were already computed.
                    LOGGER.exception("Error executing post-run callback %s", cb)

        LOGGER.info("Execution completed successfully.")
        return all_results

    def _execute_metric(
        self,
        instance: Any,
        metric_name: str,
        run_base: str,
        dim_name: str,
    ) -> Any:
        """
        Executes a metric:

        - If it starts with 'plot_' -> assumed to return Axes:
            - the figure is saved as PNG
            - returns {"type": "image", "path": "..."}
        - Otherwise, the value is serialized to JSON.

        Additionally, for categorical metrics (by skill/channel) from the
        'volumetry' dimension, we explicitly return labels and values so the
        frontend can know what each number belongs to.

        Raises:
            ValueError: if `instance` has no callable attribute `metric_name`.
            TypeError: if a 'plot_' metric does not return a matplotlib Axes.
            RuntimeError: if the returned Axes has no figure.
        """
        method = getattr(instance, metric_name, None)
        if method is None or not callable(method):
            raise ValueError(
                f"Metric '{metric_name}' does not exist in {type(instance).__name__}"
            )

        # Plot case
        if metric_name.startswith("plot_"):
            ax = method()
            if not isinstance(ax, Axes):
                raise TypeError(
                    f"Metric '{metric_name}' of '{type(instance).__name__}' "
                    f"should return a matplotlib.axes.Axes"
                )
            fig = ax.get_figure()
            if fig is None:
                raise RuntimeError(
                    "Axes.get_figure() returned None, which should not happen."
                )
            fig = cast(Figure, fig)

            filename = f"{dim_name}_{metric_name}.png"
            img_path = f"{run_base}/{filename}"

            LOGGER.debug("Saving figure to %s", img_path)
            try:
                self.sink.write_figure(img_path, fig)
            finally:
                # Always release the figure, even if the sink fails, so that
                # pyplot does not keep accumulating open figures.
                plt.close(fig)

            return {
                "type": "image",
                "path": img_path,
            }

        # Numeric/tabular case
        value = method()

        # Special case: categorical series from volumetry (by skill / channel)
        # Return {"labels": [...], "values": [...]} to maintain
        # label information in the JSON.
        if (
            dim_name == "volumetry"
            and isinstance(value, pd.Series)
            and metric_name in _LABELLED_VOLUMETRY_METRICS
        ):
            labels = [str(idx) for idx in value.index.tolist()]
            # Ensure all values are JSON-friendly numeric
            values = [float(v) for v in value.astype(float).tolist()]
            return {
                "labels": labels,
                "values": values,
            }

        return _serialize_for_json(value)


def load_dimensions_config(path: str) -> Dict[str, Any]:
    """
    Loads a JSON configuration file and returns its 'dimensions' block.

    Raises:
        ValueError: if the file's top-level JSON value is not an object or
            it has no 'dimensions' entry.
    """
    import json
    from pathlib import Path

    with Path(path).open("r", encoding="utf-8") as f:
        cfg = json.load(f)

    # A top-level list/scalar has no .get(); report it as a config error
    # rather than letting an AttributeError escape.
    dimensions = cfg.get("dimensions") if isinstance(cfg, dict) else None
    if dimensions is None:
        raise ValueError("The configuration file must contain a 'dimensions' block.")

    return dimensions


def build_pipeline(
    dimensions_config_path: str,
    datasource: DataSource,
    sink: ResultsSink,
    dimension_params: Optional[Mapping[str, Mapping[str, Any]]] = None,
    post_run: Optional[List[PostRunCallback]] = None,
) -> BeyondMetricsPipeline:
    """
    Creates a BeyondMetricsPipeline from:
    - path to JSON with dimensions/metrics
    - an already constructed DataSource (local/S3/Drive)
    - an already constructed ResultsSink (local/S3/Drive)
    - an optional mapping of per-dimension constructor kwargs
    - an optional list of post_run callbacks that execute at the end
      (useful for scorers, AI agents, etc.)
    """
    dims_cfg = load_dimensions_config(dimensions_config_path)

    return BeyondMetricsPipeline(
        datasource=datasource,
        sink=sink,
        dimensions_config=dims_cfg,
        dimension_params=dimension_params,
        post_run=post_run,
    )