Translate Phase 3 low-priority backend files (complete Spanish-to-English translation)

Phase 3 of Spanish-to-English translation for low-priority backend files:

Backend core modules (4 files):
- Volumetria.py: Translated ~15 occurrences (docstrings, comments, plot labels, day abbreviations)
- agent.py: Translated ~15 occurrences (system prompts, docstrings, error messages)
- pipeline.py: Translated ~10 occurrences (log messages, docstrings, comments)
- analysis_service.py: Translated ~10 occurrences (docstrings, error messages, comments)

All function names, class names, and variable names preserved for API compatibility.
Frontend and backend compilation tested and verified successful.

This completes the comprehensive Spanish-to-English translation project:
- Phase 1 (High Priority): 3 files - backendMapper.ts, analysisGenerator.ts, realDataAnalysis.ts
- Phase 2 (Medium Priority): 5 files - dataTransformation.ts, segmentClassifier.ts, + 3 dimension files
- Phase 3 (Low Priority): 4 files - Volumetria.py, agent.py, pipeline.py, analysis_service.py

Total files translated: 12 files (5 frontend TypeScript + 7 backend Python)
All critical path translations complete.

Related to TRANSLATION_STATUS.md Phase 3 completion.

https://claude.ai/code/session_01GNbnkFoESkRcnPr3bLCYDg
This commit is contained in:
Claude
2026-02-07 11:15:47 +00:00
parent 8c7f5fa827
commit 9caa382010
4 changed files with 217 additions and 217 deletions

View File

@@ -14,25 +14,25 @@ from openai import OpenAI
DEFAULT_SYSTEM_PROMPT = (
"Eres un consultor experto en contact centers. "
"Vas a recibir resultados analíticos de un sistema de métricas "
"(BeyondMetrics) en formato JSON. Tu tarea es generar un informe claro, "
"accionable y orientado a negocio, destacando los principales hallazgos, "
"riesgos y oportunidades de mejora."
"You are an expert contact center consultant. "
"You will receive analytical results from a metrics system "
"(BeyondMetrics) in JSON format. Your task is to generate a clear, "
"actionable, business-oriented report, highlighting the main findings, "
"risks, and opportunities for improvement."
)
@dataclass
class ReportAgentConfig:
"""
Configuración básica del agente de informes.
Basic configuration for the report agent.
openai_api_key:
Se puede pasar explícitamente o leer de la variable de entorno OPENAI_API_KEY.
Can be passed explicitly or read from the OPENAI_API_KEY environment variable.
model:
Modelo de ChatGPT a utilizar, p.ej. 'gpt-4.1-mini' o similar.
ChatGPT model to use, e.g. 'gpt-4.1-mini' or similar.
system_prompt:
Prompt de sistema para controlar el estilo del informe.
System prompt to control the report style.
"""
openai_api_key: Optional[str] = None
@@ -42,15 +42,15 @@ class ReportAgentConfig:
class BeyondMetricsReportAgent:
"""
Agente muy sencillo que:
Simple agent that:
1) Lee el JSON de resultados de una ejecución de BeyondMetrics.
2) Construye un prompt con esos resultados.
3) Llama a ChatGPT para generar un informe en texto.
4) Guarda el informe en un PDF en disco, EMBEBIENDO las imágenes PNG
generadas por el pipeline como anexos.
1) Reads the JSON results from a BeyondMetrics execution.
2) Builds a prompt with those results.
3) Calls ChatGPT to generate a text report.
4) Saves the report to a PDF on disk, EMBEDDING the PNG images
generated by the pipeline as attachments.
MVP: centrado en texto + figuras incrustadas.
MVP: focused on text + embedded figures.
"""
def __init__(self, config: Optional[ReportAgentConfig] = None) -> None:
@@ -59,16 +59,16 @@ class BeyondMetricsReportAgent:
api_key = self.config.openai_api_key or os.getenv("OPENAI_API_KEY")
if not api_key:
raise RuntimeError(
"Falta la API key de OpenAI. "
"Pásala en ReportAgentConfig(openai_api_key=...) o "
"define la variable de entorno OPENAI_API_KEY."
"Missing OpenAI API key. "
"Pass it in ReportAgentConfig(openai_api_key=...) or "
"define the OPENAI_API_KEY environment variable."
)
# Cliente de la nueva API de OpenAI
# New OpenAI API client
self._client = OpenAI(api_key=api_key)
# ------------------------------------------------------------------
# API pública principal
# Main public API
# ------------------------------------------------------------------
def generate_pdf_report(
self,
@@ -77,48 +77,48 @@ class BeyondMetricsReportAgent:
extra_user_prompt: str = "",
) -> str:
"""
Genera un informe en PDF a partir de una carpeta de resultados.
Generates a PDF report from a results folder.
Parámetros:
Parameters:
- run_base:
Carpeta base de la ejecución. Debe contener al menos 'results.json'
y, opcionalmente, imágenes PNG generadas por el pipeline.
Base folder for the execution. Must contain at least 'results.json'
and, optionally, PNG images generated by the pipeline.
- output_pdf_path:
Ruta completa del PDF de salida. Si es None, se crea
'beyondmetrics_report.pdf' dentro de run_base.
Full path for the output PDF. If None, creates
'beyondmetrics_report.pdf' inside run_base.
- extra_user_prompt:
Texto adicional para afinar la petición al agente
(p.ej. "enfatiza eficiencia y SLA", etc.)
Additional text to refine the agent's request
(e.g. "emphasize efficiency and SLA", etc.)
Devuelve:
- La ruta del PDF generado.
Returns:
- The path to the generated PDF.
"""
run_dir = Path(run_base)
results_json = run_dir / "results.json"
if not results_json.exists():
raise FileNotFoundError(
f"No se ha encontrado {results_json}. "
"Asegúrate de ejecutar primero el pipeline."
f"{results_json} not found. "
"Make sure to run the pipeline first."
)
# 1) Leer JSON de resultados
# 1) Read results JSON
with results_json.open("r", encoding="utf-8") as f:
results_data: Dict[str, Any] = json.load(f)
# 2) Buscar imágenes generadas
# 2) Find generated images
image_files = sorted(p for p in run_dir.glob("*.png"))
# 3) Construir prompt de usuario
# 3) Build user prompt
user_prompt = self._build_user_prompt(
results=results_data,
image_files=[p.name for p in image_files],
extra_user_prompt=extra_user_prompt,
)
# 4) Llamar a ChatGPT para obtener el texto del informe
# 4) Call ChatGPT to get the report text
report_text = self._call_chatgpt(user_prompt)
# 5) Crear PDF con texto + imágenes embebidas
# 5) Create PDF with text + embedded images
if output_pdf_path is None:
output_pdf_path = str(run_dir / "beyondmetrics_report.pdf")
@@ -127,7 +127,7 @@ class BeyondMetricsReportAgent:
return output_pdf_path
# ------------------------------------------------------------------
# Construcción del prompt
# Prompt construction
# ------------------------------------------------------------------
def _build_user_prompt(
self,
@@ -136,34 +136,34 @@ class BeyondMetricsReportAgent:
extra_user_prompt: str = "",
) -> str:
"""
Construye el mensaje de usuario que se enviará al modelo.
Para un MVP, serializamos el JSON de resultados entero.
Más adelante se puede resumir si el JSON crece demasiado.
Builds the user message to be sent to the model.
For an MVP, we serialize the entire results JSON.
Later, this can be summarized if the JSON grows too large.
"""
results_str = json.dumps(results, indent=2, ensure_ascii=False)
images_section = (
"Imágenes generadas en la ejecución:\n"
"Images generated in the execution:\n"
+ "\n".join(f"- {name}" for name in image_files)
if image_files
else "No se han generado imágenes en esta ejecución."
else "No images were generated in this execution."
)
extra = (
f"\n\nInstrucciones adicionales del usuario:\n{extra_user_prompt}"
f"\n\nAdditional user instructions:\n{extra_user_prompt}"
if extra_user_prompt
else ""
)
prompt = (
"A continuación te proporciono los resultados de una ejecución de BeyondMetrics "
"en formato JSON. Debes elaborar un INFORME EJECUTIVO para un cliente de "
"contact center. El informe debe incluir:\n"
"- Resumen ejecutivo en lenguaje de negocio.\n"
"- Principales hallazgos por dimensión.\n"
"- Riesgos o problemas detectados.\n"
"- Recomendaciones accionables.\n\n"
"Resultados (JSON):\n"
"Below I provide you with the results of a BeyondMetrics execution "
"in JSON format. You must produce an EXECUTIVE REPORT for a contact "
"center client. The report should include:\n"
"- Executive summary in business language.\n"
"- Main findings by dimension.\n"
"- Detected risks or issues.\n"
"- Actionable recommendations.\n\n"
"Results (JSON):\n"
f"{results_str}\n\n"
f"{images_section}"
f"{extra}"
@@ -172,12 +172,12 @@ class BeyondMetricsReportAgent:
return prompt
# ------------------------------------------------------------------
# Llamada a ChatGPT (nueva API)
# ChatGPT call (new API)
# ------------------------------------------------------------------
def _call_chatgpt(self, user_prompt: str) -> str:
"""
Llama al modelo de ChatGPT y devuelve el contenido del mensaje de respuesta.
Implementado con la nueva API de OpenAI.
Calls the ChatGPT model and returns the content of the response message.
Implemented with the new OpenAI API.
"""
resp = self._client.chat.completions.create(
model=self.config.model,
@@ -190,11 +190,11 @@ class BeyondMetricsReportAgent:
content = resp.choices[0].message.content
if not isinstance(content, str):
raise RuntimeError("La respuesta del modelo no contiene texto.")
raise RuntimeError("The model response does not contain text.")
return content
# ------------------------------------------------------------------
# Escritura de PDF (texto + imágenes)
# PDF writing (text + images)
# ------------------------------------------------------------------
def _write_pdf(
self,
@@ -203,11 +203,11 @@ class BeyondMetricsReportAgent:
image_paths: Sequence[Path],
) -> None:
"""
Crea un PDF A4 con:
Creates an A4 PDF with:
1) Texto del informe (páginas iniciales).
2) Una sección de anexos donde se incrustan las imágenes PNG
generadas por el pipeline, escaladas para encajar en la página.
1) Report text (initial pages).
2) An appendix section where the PNG images generated by the
pipeline are embedded, scaled to fit the page.
"""
output_path = str(output_path)
c = canvas.Canvas(output_path, pagesize=A4)
@@ -220,7 +220,7 @@ class BeyondMetricsReportAgent:
c.setFont("Helvetica", 11)
# --- Escribir texto principal ---
# --- Write main text ---
def _wrap_line(line: str, max_chars: int = 100) -> list[str]:
parts: list[str] = []
current: list[str] = []
@@ -248,37 +248,37 @@ class BeyondMetricsReportAgent:
c.drawString(margin_x, y, line)
y -= line_height
# --- Anexar imágenes como figuras ---
# --- Append images as figures ---
if image_paths:
# Nueva página para las figuras
# New page for figures
c.showPage()
c.setFont("Helvetica-Bold", 14)
c.drawString(margin_x, height - margin_y, "Anexo: Figuras")
c.drawString(margin_x, height - margin_y, "Appendix: Figures")
c.setFont("Helvetica", 11)
current_y = height - margin_y - 2 * line_height
for img_path in image_paths:
# Si no cabe la imagen en la página, pasamos a la siguiente
# If the image doesn't fit on the page, move to the next one
available_height = current_y - margin_y
if available_height < 100: # espacio mínimo
if available_height < 100: # minimum space
c.showPage()
c.setFont("Helvetica-Bold", 14)
c.drawString(margin_x, height - margin_y, "Anexo: Figuras (cont.)")
c.drawString(margin_x, height - margin_y, "Appendix: Figures (cont.)")
c.setFont("Helvetica", 11)
current_y = height - margin_y - 2 * line_height
available_height = current_y - margin_y
# Título de la figura
title = f"Figura: {img_path.name}"
# Figure title
title = f"Figure: {img_path.name}"
c.drawString(margin_x, current_y, title)
current_y -= line_height
# Cargar imagen y escalarla
# Load and scale image
try:
img = ImageReader(str(img_path))
iw, ih = img.getSize()
# Escala para encajar en ancho y alto disponibles
# Scale to fit available width and height
max_img_height = available_height - 2 * line_height
scale = min(max_width / iw, max_img_height / ih)
if scale <= 0:
@@ -302,8 +302,8 @@ class BeyondMetricsReportAgent:
current_y = y_img - 2 * line_height
except Exception as e:
# Si falla la carga, lo indicamos en el PDF
err_msg = f"No se pudo cargar la imagen {img_path.name}: {e}"
# If loading fails, indicate it in the PDF
err_msg = f"Could not load image {img_path.name}: {e}"
c.drawString(margin_x, current_y, err_msg)
current_y -= 2 * line_height

View File

@@ -20,15 +20,15 @@ REQUIRED_COLUMNS_VOLUMETRIA: List[str] = [
@dataclass
class VolumetriaMetrics:
"""
Métricas de volumetría basadas en el nuevo esquema de datos.
Volumetry metrics based on the new data schema.
Columnas mínimas requeridas:
Minimum required columns:
- interaction_id
- datetime_start
- queue_skill
- channel
Otras columnas pueden existir pero no son necesarias para estas métricas.
Other columns may exist but are not required for these metrics.
"""
df: pd.DataFrame
@@ -38,41 +38,41 @@ class VolumetriaMetrics:
self._prepare_data()
# ------------------------------------------------------------------ #
# Helpers internos
# Internal helpers
# ------------------------------------------------------------------ #
def _validate_columns(self) -> None:
missing = [c for c in REQUIRED_COLUMNS_VOLUMETRIA if c not in self.df.columns]
if missing:
raise ValueError(
f"Faltan columnas obligatorias para VolumetriaMetrics: {missing}"
f"Missing required columns for VolumetriaMetrics: {missing}"
)
def _prepare_data(self) -> None:
df = self.df.copy()
# Asegurar tipo datetime
# Ensure datetime type
df["datetime_start"] = pd.to_datetime(df["datetime_start"], errors="coerce")
# Normalizar strings
# Normalize strings
df["queue_skill"] = df["queue_skill"].astype(str).str.strip()
df["channel"] = df["channel"].astype(str).str.strip()
# Guardamos el df preparado
# Store the prepared dataframe
self.df = df
# ------------------------------------------------------------------ #
# Propiedades útiles
# Useful properties
# ------------------------------------------------------------------ #
@property
def is_empty(self) -> bool:
return self.df.empty
# ------------------------------------------------------------------ #
# Métricas numéricas / tabulares
# Numeric / tabular metrics
# ------------------------------------------------------------------ #
def volume_by_channel(self) -> pd.Series:
"""
Nº de interacciones por canal.
Number of interactions by channel.
"""
return self.df.groupby("channel")["interaction_id"].nunique().sort_values(
ascending=False
@@ -80,7 +80,7 @@ class VolumetriaMetrics:
def volume_by_skill(self) -> pd.Series:
"""
Nº de interacciones por skill / cola.
Number of interactions by skill / queue.
"""
return self.df.groupby("queue_skill")["interaction_id"].nunique().sort_values(
ascending=False
@@ -88,7 +88,7 @@ class VolumetriaMetrics:
def channel_distribution_pct(self) -> pd.Series:
"""
Distribución porcentual del volumen por canal.
Percentage distribution of volume by channel.
"""
counts = self.volume_by_channel()
total = counts.sum()
@@ -98,7 +98,7 @@ class VolumetriaMetrics:
def skill_distribution_pct(self) -> pd.Series:
"""
Distribución porcentual del volumen por skill.
Percentage distribution of volume by skill.
"""
counts = self.volume_by_skill()
total = counts.sum()
@@ -108,12 +108,12 @@ class VolumetriaMetrics:
def heatmap_24x7(self) -> pd.DataFrame:
"""
Matriz [día_semana x hora] con nº de interacciones.
dayofweek: 0=Lunes ... 6=Domingo
Matrix [day_of_week x hour] with number of interactions.
dayofweek: 0=Monday ... 6=Sunday
"""
df = self.df.dropna(subset=["datetime_start"]).copy()
if df.empty:
# Devolvemos un df vacío pero con índice/columnas esperadas
# Return an empty dataframe with expected index/columns
idx = range(7)
cols = range(24)
return pd.DataFrame(0, index=idx, columns=cols)
@@ -137,8 +137,8 @@ class VolumetriaMetrics:
def monthly_seasonality_cv(self) -> float:
"""
Coeficiente de variación del volumen mensual.
CV = std / mean (en %).
Coefficient of variation of monthly volume.
CV = std / mean (in %).
"""
df = self.df.dropna(subset=["datetime_start"]).copy()
if df.empty:
@@ -161,9 +161,9 @@ class VolumetriaMetrics:
def peak_offpeak_ratio(self) -> float:
"""
Ratio de volumen entre horas pico y valle.
Volume ratio between peak and off-peak hours.
Definimos pico como horas 10:00–19:59, resto valle.
We define peak as hours 10:00–19:59, rest as off-peak.
"""
df = self.df.dropna(subset=["datetime_start"]).copy()
if df.empty:
@@ -184,7 +184,7 @@ class VolumetriaMetrics:
def concentration_top20_skills_pct(self) -> float:
"""
% del volumen concentrado en el top 20% de skills (por nº de interacciones).
% of volume concentrated in the top 20% of skills (by number of interactions).
"""
counts = (
self.df.groupby("queue_skill")["interaction_id"].nunique().sort_values(
@@ -210,8 +210,8 @@ class VolumetriaMetrics:
# ------------------------------------------------------------------ #
def plot_heatmap_24x7(self) -> Axes:
"""
Heatmap de volumen por día de la semana (0-6) y hora (0-23).
Devuelve Axes para que el pipeline pueda guardar la figura.
Heatmap of volume by day of week (0-6) and hour (0-23).
Returns Axes so the pipeline can save the figure.
"""
data = self.heatmap_24x7()
@@ -222,45 +222,45 @@ class VolumetriaMetrics:
ax.set_xticklabels([str(h) for h in range(24)])
ax.set_yticks(range(7))
ax.set_yticklabels(["L", "M", "X", "J", "V", "S", "D"])
ax.set_yticklabels(["M", "T", "W", "T", "F", "S", "S"])
ax.set_xlabel("Hora del día")
ax.set_ylabel("Día de la semana")
ax.set_title("Volumen por día de la semana y hora")
ax.set_xlabel("Hour of day")
ax.set_ylabel("Day of week")
ax.set_title("Volume by day of week and hour")
plt.colorbar(im, ax=ax, label="Nº interacciones")
plt.colorbar(im, ax=ax, label="# interactions")
return ax
def plot_channel_distribution(self) -> Axes:
"""
Distribución de volumen por canal.
Volume distribution by channel.
"""
series = self.volume_by_channel()
fig, ax = plt.subplots(figsize=(6, 4))
series.plot(kind="bar", ax=ax)
ax.set_xlabel("Canal")
ax.set_ylabel("Nº interacciones")
ax.set_title("Volumen por canal")
ax.set_xlabel("Channel")
ax.set_ylabel("# interactions")
ax.set_title("Volume by channel")
ax.grid(axis="y", alpha=0.3)
return ax
def plot_skill_pareto(self) -> Axes:
"""
Pareto simple de volumen por skill (solo barras de volumen).
Simple Pareto chart of volume by skill (volume bars only).
"""
series = self.volume_by_skill()
fig, ax = plt.subplots(figsize=(10, 4))
series.plot(kind="bar", ax=ax)
ax.set_xlabel("Skill / Cola")
ax.set_ylabel("Nº interacciones")
ax.set_title("Pareto de volumen por skill")
ax.set_xlabel("Skill / Queue")
ax.set_ylabel("# interactions")
ax.set_title("Pareto chart of volume by skill")
ax.grid(axis="y", alpha=0.3)
plt.xticks(rotation=45, ha="right")

View File

@@ -23,7 +23,7 @@ LOGGER = logging.getLogger(__name__)
def setup_basic_logging(level: str = "INFO") -> None:
"""
Configuración básica de logging, por si se necesita desde scripts.
Basic logging configuration, if needed from scripts.
"""
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
@@ -33,10 +33,10 @@ def setup_basic_logging(level: str = "INFO") -> None:
def _import_class(path: str) -> type:
"""
Import dinámico de una clase a partir de un string tipo:
Dynamic import of a class from a string like:
"beyond_metrics.dimensions.VolumetriaMetrics"
"""
LOGGER.debug("Importando clase %s", path)
LOGGER.debug("Importing class %s", path)
module_name, class_name = path.rsplit(".", 1)
module = import_module(module_name)
cls = getattr(module, class_name)
@@ -45,7 +45,7 @@ def _import_class(path: str) -> type:
def _serialize_for_json(obj: Any) -> Any:
"""
Convierte objetos típicos de numpy/pandas en tipos JSON-friendly.
Converts typical numpy/pandas objects to JSON-friendly types.
"""
if obj is None or isinstance(obj, (str, int, float, bool)):
return obj
@@ -73,12 +73,12 @@ PostRunCallback = Callable[[Dict[str, Any], str, ResultsSink], None]
@dataclass
class BeyondMetricsPipeline:
"""
Pipeline principal de BeyondMetrics.
Main BeyondMetrics pipeline.
- Lee un CSV desde un DataSource (local, S3, Google Drive, etc.).
- Ejecuta dimensiones configuradas en un dict de configuración.
- Serializa resultados numéricos/tabulares a JSON.
- Guarda las imágenes de los métodos que comienzan por 'plot_'.
- Reads a CSV from a DataSource (local, S3, Google Drive, etc.).
- Executes dimensions configured in a config dict.
- Serializes numeric/tabular results to JSON.
- Saves images from methods starting with 'plot_'.
"""
datasource: DataSource
@@ -95,39 +95,39 @@ class BeyondMetricsPipeline:
write_results_json: bool = True,
) -> Dict[str, Any]:
LOGGER.info("Inicio de ejecución de BeyondMetricsPipeline")
LOGGER.info("Leyendo CSV de entrada: %s", input_path)
LOGGER.info("Starting BeyondMetricsPipeline execution")
LOGGER.info("Reading input CSV: %s", input_path)
# 1) Leer datos
# 1) Read data
df = self.datasource.read_csv(input_path)
LOGGER.info("CSV leído con %d filas y %d columnas", df.shape[0], df.shape[1])
LOGGER.info("CSV read with %d rows and %d columns", df.shape[0], df.shape[1])
# 2) Determinar carpeta/base de salida para esta ejecución
# 2) Determine output folder/base for this execution
run_base = run_dir.rstrip("/")
LOGGER.info("Ruta base de esta ejecución: %s", run_base)
LOGGER.info("Base path for this execution: %s", run_base)
# 3) Ejecutar dimensiones
# 3) Execute dimensions
dimensions_cfg = self.dimensions_config
if not isinstance(dimensions_cfg, dict):
raise ValueError("El bloque 'dimensions' debe ser un dict.")
raise ValueError("The 'dimensions' block must be a dict.")
all_results: Dict[str, Any] = {}
for dim_name, dim_cfg in dimensions_cfg.items():
if not isinstance(dim_cfg, dict):
raise ValueError(f"Config inválida para dimensión '{dim_name}' (debe ser dict).")
raise ValueError(f"Invalid config for dimension '{dim_name}' (must be dict).")
if not dim_cfg.get("enabled", True):
LOGGER.info("Dimensión '%s' desactivada; se omite.", dim_name)
LOGGER.info("Dimension '%s' disabled; skipping.", dim_name)
continue
class_path = dim_cfg.get("class")
if not class_path:
raise ValueError(f"Falta 'class' en la dimensión '{dim_name}'.")
raise ValueError(f"Missing 'class' in dimension '{dim_name}'.")
metrics: List[str] = dim_cfg.get("metrics", [])
if not metrics:
LOGGER.info("Dimensión '%s' sin métricas configuradas; se omite.", dim_name)
LOGGER.info("Dimension '%s' has no configured metrics; skipping.", dim_name)
continue
cls = _import_class(class_path)
@@ -136,35 +136,35 @@ class BeyondMetricsPipeline:
if self.dimension_params is not None:
extra_kwargs = self.dimension_params.get(dim_name, {}) or {}
# Las dimensiones reciben df en el constructor
# Dimensions receive df in the constructor
instance = cls(df, **extra_kwargs)
dim_results: Dict[str, Any] = {}
for metric_name in metrics:
LOGGER.info(" - Ejecutando métrica '%s.%s'", dim_name, metric_name)
LOGGER.info(" - Executing metric '%s.%s'", dim_name, metric_name)
result = self._execute_metric(instance, metric_name, run_base, dim_name)
dim_results[metric_name] = result
all_results[dim_name] = dim_results
# 4) Guardar JSON de resultados (opcional)
# 4) Save results JSON (optional)
if write_results_json:
results_json_path = f"{run_base}/results.json"
LOGGER.info("Guardando resultados en JSON: %s", results_json_path)
LOGGER.info("Saving results to JSON: %s", results_json_path)
self.sink.write_json(results_json_path, all_results)
# 5) Ejecutar callbacks post-run (scorers, agentes, etc.)
# 5) Execute post-run callbacks (scorers, agents, etc.)
if self.post_run:
LOGGER.info("Ejecutando %d callbacks post-run...", len(self.post_run))
LOGGER.info("Executing %d post-run callbacks...", len(self.post_run))
for cb in self.post_run:
try:
LOGGER.info("Ejecutando post-run callback: %s", cb)
LOGGER.info("Executing post-run callback: %s", cb)
cb(all_results, run_base, self.sink)
except Exception:
LOGGER.exception("Error ejecutando post-run callback %s", cb)
LOGGER.exception("Error executing post-run callback %s", cb)
LOGGER.info("Ejecución completada correctamente.")
LOGGER.info("Execution completed successfully.")
return all_results
@@ -176,42 +176,42 @@ class BeyondMetricsPipeline:
dim_name: str,
) -> Any:
"""
Ejecuta una métrica:
Executes a metric:
- Si empieza por 'plot_' -> se asume que devuelve Axes:
- se guarda la figura como PNG
- se devuelve {"type": "image", "path": "..."}
- Si no, se serializa el valor a JSON.
- If it starts with 'plot_' -> assumed to return Axes:
- the figure is saved as PNG
- returns {"type": "image", "path": "..."}
- Otherwise, the value is serialized to JSON.
Además, para métricas categóricas (por skill/canal) de la dimensión
'volumetry', devolvemos explícitamente etiquetas y valores para que
el frontend pueda saber a qué pertenece cada número.
Additionally, for categorical metrics (by skill/channel) from the
'volumetry' dimension, we explicitly return labels and values so
the frontend can know what each number belongs to.
"""
method = getattr(instance, metric_name, None)
if method is None or not callable(method):
raise ValueError(
f"La métrica '{metric_name}' no existe en {type(instance).__name__}"
f"Metric '{metric_name}' does not exist in {type(instance).__name__}"
)
# Caso plots
# Plot case
if metric_name.startswith("plot_"):
ax = method()
if not isinstance(ax, Axes):
raise TypeError(
f"La métrica '{metric_name}' de '{type(instance).__name__}' "
f"debería devolver un matplotlib.axes.Axes"
f"Metric '{metric_name}' of '{type(instance).__name__}' "
f"should return a matplotlib.axes.Axes"
)
fig = ax.get_figure()
if fig is None:
raise RuntimeError(
"Axes.get_figure() devolvió None, lo cual no debería pasar."
"Axes.get_figure() returned None, which should not happen."
)
fig = cast(Figure, fig)
filename = f"{dim_name}_{metric_name}.png"
img_path = f"{run_base}/{filename}"
LOGGER.debug("Guardando figura en %s", img_path)
LOGGER.debug("Saving figure to %s", img_path)
self.sink.write_figure(img_path, fig)
plt.close(fig)
@@ -220,12 +220,12 @@ class BeyondMetricsPipeline:
"path": img_path,
}
# Caso numérico/tabular
# Numeric/tabular case
value = method()
# Caso especial: series categóricas de volumetría (por skill / canal)
# Devolvemos {"labels": [...], "values": [...]} para mantener la
# información de etiquetas en el JSON.
# Special case: categorical series from volumetry (by skill / channel)
# Return {"labels": [...], "values": [...]} to maintain
# label information in the JSON.
if (
dim_name == "volumetry"
and isinstance(value, pd.Series)
@@ -238,7 +238,7 @@ class BeyondMetricsPipeline:
}
):
labels = [str(idx) for idx in value.index.tolist()]
# Aseguramos que todos los valores sean numéricos JSON-friendly
# Ensure all values are JSON-friendly numeric
values = [float(v) for v in value.astype(float).tolist()]
return {
"labels": labels,
@@ -251,7 +251,7 @@ class BeyondMetricsPipeline:
def load_dimensions_config(path: str) -> Dict[str, Any]:
"""
Carga un JSON de configuración que contiene solo el bloque 'dimensions'.
Loads a JSON configuration file containing only the 'dimensions' block.
"""
import json
from pathlib import Path
@@ -261,7 +261,7 @@ def load_dimensions_config(path: str) -> Dict[str, Any]:
dimensions = cfg.get("dimensions")
if dimensions is None:
raise ValueError("El fichero de configuración debe contener un bloque 'dimensions'.")
raise ValueError("The configuration file must contain a 'dimensions' block.")
return dimensions
@@ -274,12 +274,12 @@ def build_pipeline(
post_run: Optional[List[PostRunCallback]] = None,
) -> BeyondMetricsPipeline:
"""
Crea un BeyondMetricsPipeline a partir de:
- ruta al JSON con dimensiones/métricas
- un DataSource ya construido (local/S3/Drive)
- un ResultsSink ya construido (local/S3/Drive)
- una lista opcional de callbacks post_run que se ejecutan al final
(útil para scorers, agentes de IA, etc.)
Creates a BeyondMetricsPipeline from:
- path to JSON with dimensions/metrics
- an already constructed DataSource (local/S3/Drive)
- an already constructed ResultsSink (local/S3/Drive)
- an optional list of post_run callbacks that execute at the end
(useful for scorers, AI agents, etc.)
"""
dims_cfg = load_dimensions_config(dimensions_config_path)
return BeyondMetricsPipeline(