Files
BeyondCXAnalytics_AE/backend/beyond_metrics/pipeline.py
2025-12-29 18:12:32 +01:00

292 lines
9.4 KiB
Python

from __future__ import annotations

import logging
import os
from dataclasses import dataclass
from datetime import datetime
from importlib import import_module
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, cast

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.axes import Axes
from matplotlib.figure import Figure

from .io import (
    DataSource,
    ResultsSink,
)
# Module-level logger, named after this module's dotted import path.
LOGGER: logging.Logger = logging.getLogger(name=__name__)
def setup_basic_logging(level: str = "INFO") -> None:
    """
    Configure root logging with a timestamped format (handy for scripts).

    Args:
        level: level name, matched case-insensitively (e.g. "debug"). Names
            that are not attributes of the ``logging`` module fall back to INFO.

    Note:
        ``logging.basicConfig`` does nothing when the root logger already has
        handlers, so calling this after logging is configured has no effect.
    """
    resolved_level = getattr(logging, level.upper(), logging.INFO)
    log_format = "%(asctime)s %(levelname)s [%(name)s] %(message)s"
    logging.basicConfig(level=resolved_level, format=log_format)
def _import_class(path: str) -> type:
    """
    Resolve a dotted path such as "beyond_metrics.dimensions.VolumetriaMetrics".

    Everything before the last dot is imported as a module; the name after it
    is looked up on that module. The result is not checked to be a class.

    Raises:
        ValueError: if ``path`` contains no dot.
        ImportError: if the module part cannot be imported.
        AttributeError: if the module has no attribute with that name.
    """
    LOGGER.debug("Importando clase %s", path)
    module_path, attr_name = path.rsplit(".", 1)
    return getattr(import_module(module_path), attr_name)
def _serialize_for_json(obj: Any) -> Any:
"""
Convierte objetos típicos de numpy/pandas en tipos JSON-friendly.
"""
if obj is None or isinstance(obj, (str, int, float, bool)):
return obj
if isinstance(obj, (np.integer, np.floating)):
return float(obj)
if isinstance(obj, pd.DataFrame):
return obj.to_dict(orient="records")
if isinstance(obj, pd.Series):
return obj.to_list()
if isinstance(obj, (list, tuple)):
return [_serialize_for_json(x) for x in obj]
if isinstance(obj, dict):
return {str(k): _serialize_for_json(v) for k, v in obj.items()}
return str(obj)
# Post-run hook signature: receives the full results dict, the run's base
# output path and the results sink; its return value is ignored.
PostRunCallback = Callable[
    [Dict[str, Any], str, ResultsSink],
    None,
]
@dataclass
class BeyondMetricsPipeline:
    """
    Main BeyondMetrics pipeline.

    - Reads a CSV through a DataSource (local, S3, Google Drive, etc.).
    - Runs the dimensions declared in a configuration mapping.
    - Serializes numeric/tabular results into JSON-friendly values.
    - Saves the figures returned by metrics whose name starts with 'plot_'.

    Fields:
        datasource: object whose ``read_csv(path)`` provides the input table.
        sink: object used to persist outputs (``write_json`` / ``write_figure``).
        dimensions_config: mapping of dimension name -> dimension config. Each
            config is a mapping with ``class`` (dotted import path), optional
            ``enabled`` (defaults to True) and ``metrics`` (list of method names).
        dimension_params: optional per-dimension keyword arguments passed to the
            dimension constructor after the input table.
        post_run: optional callbacks invoked as ``cb(results, run_base, sink)``
            after all dimensions have run.
    """

    datasource: DataSource
    sink: ResultsSink
    dimensions_config: Mapping[str, Any]
    dimension_params: Optional[Mapping[str, Mapping[str, Any]]] = None
    post_run: Optional[List[PostRunCallback]] = None

    # Metrics of the 'volumetry' dimension whose pd.Series result (by skill /
    # channel) is emitted as {"labels": [...], "values": [...]} so the index
    # labels survive JSON serialization. Unannotated, so not a dataclass field.
    _VOLUMETRY_LABELED_METRICS = frozenset(
        {
            "volume_by_channel",
            "volume_by_skill",
            "channel_distribution_pct",
            "skill_distribution_pct",
        }
    )

    def run(
        self,
        input_path: str,
        run_dir: str,
        *,
        write_results_json: bool = True,
    ) -> Dict[str, Any]:
        """
        Run every enabled dimension over the input CSV.

        Args:
            input_path: path handed to ``datasource.read_csv``.
            run_dir: base output path for this run; trailing "/" are stripped.
            write_results_json: if True, write all results to
                "<run_dir>/results.json" through the sink.

        Returns:
            Dict mapping each executed dimension name to a dict of
            ``{metric_name: result}`` (see ``_execute_metric``).

        Raises:
            ValueError: if the dimensions config is invalid (checked before the
                CSV is read) or a configured metric does not exist.
            Exceptions raised by the datasource, dimensions or sink propagate;
            exceptions raised by post-run callbacks are logged and swallowed.
        """
        LOGGER.info("Inicio de ejecución de BeyondMetricsPipeline")

        # 1) Validate the configuration before touching the data, so a broken
        #    config fails fast instead of after a potentially expensive read.
        planned = self._plan_dimensions()

        # 2) Read the input data.
        LOGGER.info("Leyendo CSV de entrada: %s", input_path)
        df = self.datasource.read_csv(input_path)
        LOGGER.info("CSV leído con %d filas y %d columnas", df.shape[0], df.shape[1])

        # 3) Base output path for this run.
        run_base = run_dir.rstrip("/")
        LOGGER.info("Ruta base de esta ejecución: %s", run_base)

        # 4) Run the dimensions.
        all_results: Dict[str, Any] = {}
        for dim_name, class_path, metrics in planned:
            cls = _import_class(class_path)

            extra_kwargs: Mapping[str, Any] = {}
            if self.dimension_params is not None:
                extra_kwargs = self.dimension_params.get(dim_name, {}) or {}

            # Dimensions receive the input table as first constructor argument.
            instance = cls(df, **extra_kwargs)

            dim_results: Dict[str, Any] = {}
            for metric_name in metrics:
                LOGGER.info(" - Ejecutando métrica '%s.%s'", dim_name, metric_name)
                dim_results[metric_name] = self._execute_metric(
                    instance, metric_name, run_base, dim_name
                )
            all_results[dim_name] = dim_results

        # 5) Save the results JSON (optional).
        if write_results_json:
            results_json_path = f"{run_base}/results.json"
            LOGGER.info("Guardando resultados en JSON: %s", results_json_path)
            self.sink.write_json(results_json_path, all_results)

        # 6) Post-run callbacks (scorers, agents, ...). Best-effort: a failing
        #    callback is logged and does not abort the run or later callbacks.
        if self.post_run:
            LOGGER.info("Ejecutando %d callbacks post-run...", len(self.post_run))
            for cb in self.post_run:
                try:
                    LOGGER.info("Ejecutando post-run callback: %s", cb)
                    cb(all_results, run_base, self.sink)
                except Exception:
                    LOGGER.exception("Error ejecutando post-run callback %s", cb)

        LOGGER.info("Ejecución completada correctamente.")
        return all_results

    def _plan_dimensions(self) -> List[Tuple[str, str, List[str]]]:
        """
        Validate ``dimensions_config`` and list the dimensions to execute.

        Returns:
            ``(dim_name, class_path, metrics)`` for every enabled dimension with
            at least one metric, in configuration order.

        Raises:
            ValueError: if the config or a dimension entry is not a mapping, an
                enabled dimension lacks ``class``, or ``metrics`` is a string.
        """
        dimensions_cfg = self.dimensions_config
        # Accept any Mapping (the field's declared type), not only dict.
        if not isinstance(dimensions_cfg, Mapping):
            raise ValueError("El bloque 'dimensions' debe ser un dict.")

        planned: List[Tuple[str, str, List[str]]] = []
        for dim_name, dim_cfg in dimensions_cfg.items():
            if not isinstance(dim_cfg, Mapping):
                raise ValueError(f"Config inválida para dimensión '{dim_name}' (debe ser dict).")
            if not dim_cfg.get("enabled", True):
                LOGGER.info("Dimensión '%s' desactivada; se omite.", dim_name)
                continue

            class_path = dim_cfg.get("class")
            if not class_path:
                raise ValueError(f"Falta 'class' en la dimensión '{dim_name}'.")

            metrics = dim_cfg.get("metrics", [])
            # A bare string would otherwise be iterated character by character.
            if isinstance(metrics, str):
                raise ValueError(
                    f"'metrics' de la dimensión '{dim_name}' debe ser una lista, no un string."
                )
            if not metrics:
                LOGGER.info("Dimensión '%s' sin métricas configuradas; se omite.", dim_name)
                continue

            planned.append((dim_name, class_path, list(metrics)))
        return planned

    def _execute_metric(
        self,
        instance: Any,
        metric_name: str,
        run_base: str,
        dim_name: str,
    ) -> Any:
        """
        Execute one metric method of a dimension instance.

        - Names starting with 'plot_' must return a matplotlib Axes: its figure
          is written as "<run_base>/<dim_name>_<metric_name>.png" through the
          sink, closed, and ``{"type": "image", "path": ...}`` is returned.
        - For the categorical 'volumetry' metrics (by skill / channel) returning
          a pd.Series, ``{"labels": [...], "values": [...]}`` is returned so the
          frontend knows what each number refers to.
        - Anything else is passed through ``_serialize_for_json``.

        Raises:
            ValueError: if the instance has no callable attribute ``metric_name``.
            TypeError: if a 'plot_' metric does not return an Axes.
            RuntimeError: if the Axes has no figure.
        """
        method = getattr(instance, metric_name, None)
        if not callable(method):
            raise ValueError(
                f"La métrica '{metric_name}' no existe en {type(instance).__name__}"
            )

        # Plot metrics.
        if metric_name.startswith("plot_"):
            ax = method()
            if not isinstance(ax, Axes):
                raise TypeError(
                    f"La métrica '{metric_name}' de '{type(instance).__name__}' "
                    f"debería devolver un matplotlib.axes.Axes"
                )
            fig = ax.get_figure()
            if fig is None:
                raise RuntimeError(
                    "Axes.get_figure() devolvió None, lo cual no debería pasar."
                )
            fig = cast(Figure, fig)

            img_path = f"{run_base}/{dim_name}_{metric_name}.png"
            LOGGER.debug("Guardando figura en %s", img_path)
            try:
                self.sink.write_figure(img_path, fig)
            finally:
                # Close even if the sink fails: pyplot-managed figures stay
                # alive (and consume memory) until explicitly closed.
                plt.close(fig)
            return {
                "type": "image",
                "path": img_path,
            }

        # Numeric / tabular metrics.
        value = method()

        if (
            dim_name == "volumetry"
            and metric_name in self._VOLUMETRY_LABELED_METRICS
            and isinstance(value, pd.Series)
        ):
            return {
                "labels": [str(idx) for idx in value.index.tolist()],
                # astype(float).tolist() already yields plain Python floats.
                "values": value.astype(float).tolist(),
            }

        return _serialize_for_json(value)
def load_dimensions_config(path: str) -> Dict[str, Any]:
    """
    Load the 'dimensions' block from a JSON configuration file.

    Other top-level keys in the file are ignored.

    Args:
        path: path of a UTF-8 encoded JSON file whose top level is an object.

    Returns:
        The value stored under the top-level 'dimensions' key.

    Raises:
        ValueError: if the top level is not a JSON object or has no
            'dimensions' key; malformed JSON raises ``json.JSONDecodeError``
            (a ValueError subclass).
        OSError: if the file cannot be opened.
    """
    import json
    from pathlib import Path

    with Path(path).open("r", encoding="utf-8") as f:
        cfg = json.load(f)

    # A top-level list/scalar would otherwise fail with an opaque AttributeError.
    if not isinstance(cfg, dict):
        raise ValueError(
            "El fichero de configuración debe contener un objeto JSON en el nivel superior."
        )

    dimensions = cfg.get("dimensions")
    if dimensions is None:
        raise ValueError("El fichero de configuración debe contener un bloque 'dimensions'.")
    return dimensions
def build_pipeline(
    dimensions_config_path: str,
    datasource: DataSource,
    sink: ResultsSink,
    dimension_params: Optional[Mapping[str, Mapping[str, Any]]] = None,
    post_run: Optional[List[PostRunCallback]] = None,
) -> BeyondMetricsPipeline:
    """
    Build a BeyondMetricsPipeline ready to run.

    Args:
        dimensions_config_path: JSON file whose 'dimensions' block declares the
            dimensions/metrics to run (loaded via ``load_dimensions_config``).
        datasource: already-built DataSource (local/S3/Drive).
        sink: already-built ResultsSink (local/S3/Drive).
        dimension_params: optional per-dimension constructor keyword arguments.
        post_run: optional callbacks executed at the end of each run
            (useful for scorers, AI agents, etc.).
    """
    return BeyondMetricsPipeline(
        datasource=datasource,
        sink=sink,
        dimensions_config=load_dimensions_config(dimensions_config_path),
        dimension_params=dimension_params,
        post_run=post_run,
    )