Merge pull request #5 from sujucu70/claude/check-agent-readiness-status-Exnpc

Translate Phase 3 low-priority backend files (complete Spanish-to-Eng…
This commit is contained in:
sujucu70
2026-02-07 12:18:17 +01:00
committed by GitHub
4 changed files with 217 additions and 217 deletions

View File

@@ -17,11 +17,11 @@ from typing import Any, Mapping, Optional, Dict
def _build_economy_config(economy_data: Optional[Mapping[str, Any]]) -> EconomyConfig: def _build_economy_config(economy_data: Optional[Mapping[str, Any]]) -> EconomyConfig:
""" """
Construye EconomyConfig validando tipos y evitando que el type checker Builds EconomyConfig validating types and preventing the type checker
mezcle floats y dicts en un solo diccionario. from mixing floats and dicts in a single dictionary.
""" """
# Valores por defecto # Default values
default_customer_segments: Dict[str, str] = { default_customer_segments: Dict[str, str] = {
"VIP": "high", "VIP": "high",
"Premium": "high", "Premium": "high",
@@ -45,9 +45,9 @@ def _build_economy_config(economy_data: Optional[Mapping[str, Any]]) -> EconomyC
value = economy_data.get(field, default) value = economy_data.get(field, default)
if isinstance(value, (int, float)): if isinstance(value, (int, float)):
return float(value) return float(value)
raise ValueError(f"El campo '{field}' debe ser numérico (float). Valor recibido: {value!r}") raise ValueError(f"The field '{field}' must be numeric (float). Received value: {value!r}")
# Campos escalares # Scalar fields
labor_cost_per_hour = _get_float("labor_cost_per_hour", 20.0) labor_cost_per_hour = _get_float("labor_cost_per_hour", 20.0)
overhead_rate = _get_float("overhead_rate", 0.10) overhead_rate = _get_float("overhead_rate", 0.10)
tech_costs_annual = _get_float("tech_costs_annual", 5000.0) tech_costs_annual = _get_float("tech_costs_annual", 5000.0)
@@ -55,16 +55,16 @@ def _build_economy_config(economy_data: Optional[Mapping[str, Any]]) -> EconomyC
automation_volume_share = _get_float("automation_volume_share", 0.5) automation_volume_share = _get_float("automation_volume_share", 0.5)
automation_success_rate = _get_float("automation_success_rate", 0.6) automation_success_rate = _get_float("automation_success_rate", 0.6)
# customer_segments puede venir o no; si viene, validarlo # customer_segments may or may not be present; if present, validate it
customer_segments: Dict[str, str] = dict(default_customer_segments) customer_segments: Dict[str, str] = dict(default_customer_segments)
if "customer_segments" in economy_data and economy_data["customer_segments"] is not None: if "customer_segments" in economy_data and economy_data["customer_segments"] is not None:
cs = economy_data["customer_segments"] cs = economy_data["customer_segments"]
if not isinstance(cs, Mapping): if not isinstance(cs, Mapping):
raise ValueError("customer_segments debe ser un diccionario {segment: level}") raise ValueError("customer_segments must be a dictionary {segment: level}")
for k, v in cs.items(): for k, v in cs.items():
if not isinstance(v, str): if not isinstance(v, str):
raise ValueError( raise ValueError(
f"El valor de customer_segments['{k}'] debe ser str. Valor recibido: {v!r}" f"The value of customer_segments['{k}'] must be str. Received value: {v!r}"
) )
customer_segments[str(k)] = v customer_segments[str(k)] = v
@@ -86,31 +86,31 @@ def run_analysis(
company_folder: Optional[str] = None, company_folder: Optional[str] = None,
) -> tuple[Path, Optional[Path]]: ) -> tuple[Path, Optional[Path]]:
""" """
Ejecuta el pipeline sobre un CSV y devuelve: Executes the pipeline on a CSV and returns:
- (results_dir, None) si return_type == "path" - (results_dir, None) if return_type == "path"
- (results_dir, zip_path) si return_type == "zip" - (results_dir, zip_path) if return_type == "zip"
input_path puede ser absoluto o relativo, pero los resultados input_path can be absolute or relative, but results
se escribirán SIEMPRE en la carpeta del CSV, dentro de una will ALWAYS be written to the CSV's folder, inside a
subcarpeta con nombre = timestamp (y opcionalmente prefijada subfolder named timestamp (and optionally prefixed
por company_folder). by company_folder).
""" """
input_path = input_path.resolve() input_path = input_path.resolve()
if not input_path.exists(): if not input_path.exists():
raise FileNotFoundError(f"El CSV no existe: {input_path}") raise FileNotFoundError(f"CSV does not exist: {input_path}")
if not input_path.is_file(): if not input_path.is_file():
raise ValueError(f"La ruta no apunta a un fichero CSV: {input_path}") raise ValueError(f"Path does not point to a CSV file: {input_path}")
# Carpeta donde está el CSV # Folder where the CSV is located
csv_dir = input_path.parent csv_dir = input_path.parent
# DataSource y ResultsSink apuntan a ESA carpeta # DataSource and ResultsSink point to THAT folder
datasource = LocalDataSource(base_dir=str(csv_dir)) datasource = LocalDataSource(base_dir=str(csv_dir))
sink = LocalResultsSink(base_dir=str(csv_dir)) sink = LocalResultsSink(base_dir=str(csv_dir))
# Config de economía # Economy config
economy_cfg = _build_economy_config(economy_data) economy_cfg = _build_economy_config(economy_data)
dimension_params: Dict[str, Mapping[str, Any]] = { dimension_params: Dict[str, Mapping[str, Any]] = {
@@ -119,13 +119,13 @@ def run_analysis(
} }
} }
# Callback de scoring # Scoring callback
def agentic_post_run(results: Dict[str, Any], run_base: str, sink_: ResultsSink) -> None: def agentic_post_run(results: Dict[str, Any], run_base: str, sink_: ResultsSink) -> None:
scorer = AgenticScorer() scorer = AgenticScorer()
try: try:
agentic = scorer.compute_and_return(results) agentic = scorer.compute_and_return(results)
except Exception as e: except Exception as e:
# No rompemos toda la ejecución si el scorer falla # Don't break the entire execution if the scorer fails
agentic = { agentic = {
"error": f"{type(e).__name__}: {e}", "error": f"{type(e).__name__}: {e}",
} }
@@ -139,45 +139,45 @@ def run_analysis(
post_run=[agentic_post_run], post_run=[agentic_post_run],
) )
# Timestamp de ejecución (nombre de la carpeta de resultados) # Execution timestamp (results folder name)
timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S") timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
# Ruta lógica de resultados (RELATIVA al base_dir del sink) # Logical results path (RELATIVE to sink's base_dir)
if company_folder: if company_folder:
# Ej: "Cliente_X/20251208-153045" # E.g. "Cliente_X/20251208-153045"
run_dir_rel = f"{company_folder.rstrip('/')}/{timestamp}" run_dir_rel = f"{company_folder.rstrip('/')}/{timestamp}"
else: else:
# Ej: "20251208-153045" # E.g. "20251208-153045"
run_dir_rel = timestamp run_dir_rel = timestamp
# Ejecutar pipeline: el CSV se pasa relativo a csv_dir # Execute pipeline: CSV is passed relative to csv_dir
pipeline.run( pipeline.run(
input_path=input_path.name, input_path=input_path.name,
run_dir=run_dir_rel, run_dir=run_dir_rel,
) )
# Carpeta real con los resultados # Actual folder with results
results_dir = csv_dir / run_dir_rel results_dir = csv_dir / run_dir_rel
if return_type == "path": if return_type == "path":
return results_dir, None return results_dir, None
# --- ZIP de resultados ------------------------------------------------- # --- ZIP results -------------------------------------------------------
# Creamos el ZIP en la MISMA carpeta del CSV, con nombre basado en run_dir # Create the ZIP in the SAME folder as the CSV, with name based on run_dir
zip_name = f"{run_dir_rel.replace('/', '_')}.zip" zip_name = f"{run_dir_rel.replace('/', '_')}.zip"
zip_path = csv_dir / zip_name zip_path = csv_dir / zip_name
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf: with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
for file in results_dir.rglob("*"): for file in results_dir.rglob("*"):
if file.is_file(): if file.is_file():
# Lo guardamos relativo a la carpeta de resultados # Store it relative to the results folder
arcname = file.relative_to(results_dir.parent) arcname = file.relative_to(results_dir.parent)
zipf.write(file, arcname) zipf.write(file, arcname)
return results_dir, zip_path return results_dir, zip_path
from typing import Any, Mapping, Dict # asegúrate de tener estos imports arriba from typing import Any, Mapping, Dict # ensure these imports are at the top
def run_analysis_collect_json( def run_analysis_collect_json(
@@ -187,33 +187,33 @@ def run_analysis_collect_json(
company_folder: Optional[str] = None, company_folder: Optional[str] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Ejecuta el pipeline y devuelve un único JSON con todos los resultados. Executes the pipeline and returns a single JSON with all results.
A diferencia de run_analysis: Unlike run_analysis:
- NO escribe results.json - Does NOT write results.json
- NO escribe agentic_readiness.json - Does NOT write agentic_readiness.json
- agentic_readiness se incrusta en el dict de resultados - agentic_readiness is embedded in the results dict
El parámetro `analysis` permite elegir el nivel de análisis: The `analysis` parameter allows choosing the analysis level:
- "basic" -> beyond_metrics/configs/basic.json - "basic" -> beyond_metrics/configs/basic.json
- "premium" -> beyond_metrics/configs/beyond_metrics_config.json - "premium" -> beyond_metrics/configs/beyond_metrics_config.json
""" """
# Normalizamos y validamos la ruta del CSV # Normalize and validate the CSV path
input_path = input_path.resolve() input_path = input_path.resolve()
if not input_path.exists(): if not input_path.exists():
raise FileNotFoundError(f"El CSV no existe: {input_path}") raise FileNotFoundError(f"CSV does not exist: {input_path}")
if not input_path.is_file(): if not input_path.is_file():
raise ValueError(f"La ruta no apunta a un fichero CSV: {input_path}") raise ValueError(f"Path does not point to a CSV file: {input_path}")
# Carpeta donde está el CSV # Folder where the CSV is located
csv_dir = input_path.parent csv_dir = input_path.parent
# DataSource y ResultsSink apuntan a ESA carpeta # DataSource and ResultsSink point to THAT folder
datasource = LocalDataSource(base_dir=str(csv_dir)) datasource = LocalDataSource(base_dir=str(csv_dir))
sink = LocalResultsSink(base_dir=str(csv_dir)) sink = LocalResultsSink(base_dir=str(csv_dir))
# Config de economía # Economy config
economy_cfg = _build_economy_config(economy_data) economy_cfg = _build_economy_config(economy_data)
dimension_params: Dict[str, Mapping[str, Any]] = { dimension_params: Dict[str, Mapping[str, Any]] = {
@@ -222,13 +222,13 @@ def run_analysis_collect_json(
} }
} }
# Elegimos el fichero de configuración de dimensiones según `analysis` # Choose the dimensions config file based on `analysis`
if analysis == "basic": if analysis == "basic":
dimensions_config_path = "beyond_metrics/configs/basic.json" dimensions_config_path = "beyond_metrics/configs/basic.json"
else: else:
dimensions_config_path = "beyond_metrics/configs/beyond_metrics_config.json" dimensions_config_path = "beyond_metrics/configs/beyond_metrics_config.json"
# Callback post-run: añadir agentic_readiness al JSON final (sin escribir ficheros) # Post-run callback: add agentic_readiness to the final JSON (without writing files)
def agentic_post_run(results: Dict[str, Any], run_base: str, sink_: ResultsSink) -> None: def agentic_post_run(results: Dict[str, Any], run_base: str, sink_: ResultsSink) -> None:
scorer = AgenticScorer() scorer = AgenticScorer()
try: try:
@@ -245,14 +245,14 @@ def run_analysis_collect_json(
post_run=[agentic_post_run], post_run=[agentic_post_run],
) )
# Timestamp de ejecución (para separar posibles artefactos como plots) # Execution timestamp (to separate possible artifacts like plots)
timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S") timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
if company_folder: if company_folder:
run_dir_rel = f"{company_folder.rstrip('/')}/{timestamp}" run_dir_rel = f"{company_folder.rstrip('/')}/{timestamp}"
else: else:
run_dir_rel = timestamp run_dir_rel = timestamp
# Ejecutar pipeline sin escribir results.json # Execute pipeline without writing results.json
results = pipeline.run( results = pipeline.run(
input_path=input_path.name, input_path=input_path.name,
run_dir=run_dir_rel, run_dir=run_dir_rel,

View File

@@ -14,25 +14,25 @@ from openai import OpenAI
DEFAULT_SYSTEM_PROMPT = ( DEFAULT_SYSTEM_PROMPT = (
"Eres un consultor experto en contact centers. " "You are an expert contact center consultant. "
"Vas a recibir resultados analíticos de un sistema de métricas " "You will receive analytical results from a metrics system "
"(BeyondMetrics) en formato JSON. Tu tarea es generar un informe claro, " "(BeyondMetrics) in JSON format. Your task is to generate a clear, "
"accionable y orientado a negocio, destacando los principales hallazgos, " "actionable, business-oriented report, highlighting the main findings, "
"riesgos y oportunidades de mejora." "risks, and opportunities for improvement."
) )
@dataclass @dataclass
class ReportAgentConfig: class ReportAgentConfig:
""" """
Configuración básica del agente de informes. Basic configuration for the report agent.
openai_api_key: openai_api_key:
Se puede pasar explícitamente o leer de la variable de entorno OPENAI_API_KEY. Can be passed explicitly or read from the OPENAI_API_KEY environment variable.
model: model:
Modelo de ChatGPT a utilizar, p.ej. 'gpt-4.1-mini' o similar. ChatGPT model to use, e.g. 'gpt-4.1-mini' or similar.
system_prompt: system_prompt:
Prompt de sistema para controlar el estilo del informe. System prompt to control the report style.
""" """
openai_api_key: Optional[str] = None openai_api_key: Optional[str] = None
@@ -42,15 +42,15 @@ class ReportAgentConfig:
class BeyondMetricsReportAgent: class BeyondMetricsReportAgent:
""" """
Agente muy sencillo que: Simple agent that:
1) Lee el JSON de resultados de una ejecución de BeyondMetrics. 1) Reads the JSON results from a BeyondMetrics execution.
2) Construye un prompt con esos resultados. 2) Builds a prompt with those results.
3) Llama a ChatGPT para generar un informe en texto. 3) Calls ChatGPT to generate a text report.
4) Guarda el informe en un PDF en disco, EMBEBIENDO las imágenes PNG 4) Saves the report to a PDF on disk, EMBEDDING the PNG images
generadas por el pipeline como anexos. generated by the pipeline as attachments.
MVP: centrado en texto + figuras incrustadas. MVP: focused on text + embedded figures.
""" """
def __init__(self, config: Optional[ReportAgentConfig] = None) -> None: def __init__(self, config: Optional[ReportAgentConfig] = None) -> None:
@@ -59,16 +59,16 @@ class BeyondMetricsReportAgent:
api_key = self.config.openai_api_key or os.getenv("OPENAI_API_KEY") api_key = self.config.openai_api_key or os.getenv("OPENAI_API_KEY")
if not api_key: if not api_key:
raise RuntimeError( raise RuntimeError(
"Falta la API key de OpenAI. " "Missing OpenAI API key. "
"Pásala en ReportAgentConfig(openai_api_key=...) o " "Pass it in ReportAgentConfig(openai_api_key=...) or "
"define la variable de entorno OPENAI_API_KEY." "define the OPENAI_API_KEY environment variable."
) )
# Cliente de la nueva API de OpenAI # New OpenAI API client
self._client = OpenAI(api_key=api_key) self._client = OpenAI(api_key=api_key)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# API pública principal # Main public API
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def generate_pdf_report( def generate_pdf_report(
self, self,
@@ -77,48 +77,48 @@ class BeyondMetricsReportAgent:
extra_user_prompt: str = "", extra_user_prompt: str = "",
) -> str: ) -> str:
""" """
Genera un informe en PDF a partir de una carpeta de resultados. Generates a PDF report from a results folder.
Parámetros: Parameters:
- run_base: - run_base:
Carpeta base de la ejecución. Debe contener al menos 'results.json' Base folder for the execution. Must contain at least 'results.json'
y, opcionalmente, imágenes PNG generadas por el pipeline. and, optionally, PNG images generated by the pipeline.
- output_pdf_path: - output_pdf_path:
Ruta completa del PDF de salida. Si es None, se crea Full path for the output PDF. If None, creates
'beyondmetrics_report.pdf' dentro de run_base. 'beyondmetrics_report.pdf' inside run_base.
- extra_user_prompt: - extra_user_prompt:
Texto adicional para afinar la petición al agente Additional text to refine the agent's request
(p.ej. "enfatiza eficiencia y SLA", etc.) (e.g. "emphasize efficiency and SLA", etc.)
Devuelve: Returns:
- La ruta del PDF generado. - The path to the generated PDF.
""" """
run_dir = Path(run_base) run_dir = Path(run_base)
results_json = run_dir / "results.json" results_json = run_dir / "results.json"
if not results_json.exists(): if not results_json.exists():
raise FileNotFoundError( raise FileNotFoundError(
f"No se ha encontrado {results_json}. " f"{results_json} not found. "
"Asegúrate de ejecutar primero el pipeline." "Make sure to run the pipeline first."
) )
# 1) Leer JSON de resultados # 1) Read results JSON
with results_json.open("r", encoding="utf-8") as f: with results_json.open("r", encoding="utf-8") as f:
results_data: Dict[str, Any] = json.load(f) results_data: Dict[str, Any] = json.load(f)
# 2) Buscar imágenes generadas # 2) Find generated images
image_files = sorted(p for p in run_dir.glob("*.png")) image_files = sorted(p for p in run_dir.glob("*.png"))
# 3) Construir prompt de usuario # 3) Build user prompt
user_prompt = self._build_user_prompt( user_prompt = self._build_user_prompt(
results=results_data, results=results_data,
image_files=[p.name for p in image_files], image_files=[p.name for p in image_files],
extra_user_prompt=extra_user_prompt, extra_user_prompt=extra_user_prompt,
) )
# 4) Llamar a ChatGPT para obtener el texto del informe # 4) Call ChatGPT to get the report text
report_text = self._call_chatgpt(user_prompt) report_text = self._call_chatgpt(user_prompt)
# 5) Crear PDF con texto + imágenes embebidas # 5) Create PDF with text + embedded images
if output_pdf_path is None: if output_pdf_path is None:
output_pdf_path = str(run_dir / "beyondmetrics_report.pdf") output_pdf_path = str(run_dir / "beyondmetrics_report.pdf")
@@ -127,7 +127,7 @@ class BeyondMetricsReportAgent:
return output_pdf_path return output_pdf_path
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Construcción del prompt # Prompt construction
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def _build_user_prompt( def _build_user_prompt(
self, self,
@@ -136,34 +136,34 @@ class BeyondMetricsReportAgent:
extra_user_prompt: str = "", extra_user_prompt: str = "",
) -> str: ) -> str:
""" """
Construye el mensaje de usuario que se enviará al modelo. Builds the user message to be sent to the model.
Para un MVP, serializamos el JSON de resultados entero. For an MVP, we serialize the entire results JSON.
Más adelante se puede resumir si el JSON crece demasiado. Later, this can be summarized if the JSON grows too large.
""" """
results_str = json.dumps(results, indent=2, ensure_ascii=False) results_str = json.dumps(results, indent=2, ensure_ascii=False)
images_section = ( images_section = (
"Imágenes generadas en la ejecución:\n" "Images generated in the execution:\n"
+ "\n".join(f"- {name}" for name in image_files) + "\n".join(f"- {name}" for name in image_files)
if image_files if image_files
else "No se han generado imágenes en esta ejecución." else "No images were generated in this execution."
) )
extra = ( extra = (
f"\n\nInstrucciones adicionales del usuario:\n{extra_user_prompt}" f"\n\nAdditional user instructions:\n{extra_user_prompt}"
if extra_user_prompt if extra_user_prompt
else "" else ""
) )
prompt = ( prompt = (
"A continuación te proporciono los resultados de una ejecución de BeyondMetrics " "Below I provide you with the results of a BeyondMetrics execution "
"en formato JSON. Debes elaborar un INFORME EJECUTIVO para un cliente de " "in JSON format. You must produce an EXECUTIVE REPORT for a contact "
"contact center. El informe debe incluir:\n" "center client. The report should include:\n"
"- Resumen ejecutivo en lenguaje de negocio.\n" "- Executive summary in business language.\n"
"- Principales hallazgos por dimensión.\n" "- Main findings by dimension.\n"
"- Riesgos o problemas detectados.\n" "- Detected risks or issues.\n"
"- Recomendaciones accionables.\n\n" "- Actionable recommendations.\n\n"
"Resultados (JSON):\n" "Results (JSON):\n"
f"{results_str}\n\n" f"{results_str}\n\n"
f"{images_section}" f"{images_section}"
f"{extra}" f"{extra}"
@@ -172,12 +172,12 @@ class BeyondMetricsReportAgent:
return prompt return prompt
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Llamada a ChatGPT (nueva API) # ChatGPT call (new API)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def _call_chatgpt(self, user_prompt: str) -> str: def _call_chatgpt(self, user_prompt: str) -> str:
""" """
Llama al modelo de ChatGPT y devuelve el contenido del mensaje de respuesta. Calls the ChatGPT model and returns the content of the response message.
Implementado con la nueva API de OpenAI. Implemented with the new OpenAI API.
""" """
resp = self._client.chat.completions.create( resp = self._client.chat.completions.create(
model=self.config.model, model=self.config.model,
@@ -190,11 +190,11 @@ class BeyondMetricsReportAgent:
content = resp.choices[0].message.content content = resp.choices[0].message.content
if not isinstance(content, str): if not isinstance(content, str):
raise RuntimeError("La respuesta del modelo no contiene texto.") raise RuntimeError("The model response does not contain text.")
return content return content
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Escritura de PDF (texto + imágenes) # PDF writing (text + images)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def _write_pdf( def _write_pdf(
self, self,
@@ -203,11 +203,11 @@ class BeyondMetricsReportAgent:
image_paths: Sequence[Path], image_paths: Sequence[Path],
) -> None: ) -> None:
""" """
Crea un PDF A4 con: Creates an A4 PDF with:
1) Texto del informe (páginas iniciales). 1) Report text (initial pages).
2) Una sección de anexos donde se incrustan las imágenes PNG 2) An appendix section where the PNG images generated by the
generadas por el pipeline, escaladas para encajar en la página. pipeline are embedded, scaled to fit the page.
""" """
output_path = str(output_path) output_path = str(output_path)
c = canvas.Canvas(output_path, pagesize=A4) c = canvas.Canvas(output_path, pagesize=A4)
@@ -220,7 +220,7 @@ class BeyondMetricsReportAgent:
c.setFont("Helvetica", 11) c.setFont("Helvetica", 11)
# --- Escribir texto principal --- # --- Write main text ---
def _wrap_line(line: str, max_chars: int = 100) -> list[str]: def _wrap_line(line: str, max_chars: int = 100) -> list[str]:
parts: list[str] = [] parts: list[str] = []
current: list[str] = [] current: list[str] = []
@@ -248,37 +248,37 @@ class BeyondMetricsReportAgent:
c.drawString(margin_x, y, line) c.drawString(margin_x, y, line)
y -= line_height y -= line_height
# --- Anexar imágenes como figuras --- # --- Append images as figures ---
if image_paths: if image_paths:
# Nueva página para las figuras # New page for figures
c.showPage() c.showPage()
c.setFont("Helvetica-Bold", 14) c.setFont("Helvetica-Bold", 14)
c.drawString(margin_x, height - margin_y, "Anexo: Figuras") c.drawString(margin_x, height - margin_y, "Appendix: Figures")
c.setFont("Helvetica", 11) c.setFont("Helvetica", 11)
current_y = height - margin_y - 2 * line_height current_y = height - margin_y - 2 * line_height
for img_path in image_paths: for img_path in image_paths:
# Si no cabe la imagen en la página, pasamos a la siguiente # If the image doesn't fit on the page, move to the next one
available_height = current_y - margin_y available_height = current_y - margin_y
if available_height < 100: # espacio mínimo if available_height < 100: # minimum space
c.showPage() c.showPage()
c.setFont("Helvetica-Bold", 14) c.setFont("Helvetica-Bold", 14)
c.drawString(margin_x, height - margin_y, "Anexo: Figuras (cont.)") c.drawString(margin_x, height - margin_y, "Appendix: Figures (cont.)")
c.setFont("Helvetica", 11) c.setFont("Helvetica", 11)
current_y = height - margin_y - 2 * line_height current_y = height - margin_y - 2 * line_height
available_height = current_y - margin_y available_height = current_y - margin_y
# Título de la figura # Figure title
title = f"Figura: {img_path.name}" title = f"Figure: {img_path.name}"
c.drawString(margin_x, current_y, title) c.drawString(margin_x, current_y, title)
current_y -= line_height current_y -= line_height
# Cargar imagen y escalarla # Load and scale image
try: try:
img = ImageReader(str(img_path)) img = ImageReader(str(img_path))
iw, ih = img.getSize() iw, ih = img.getSize()
# Escala para encajar en ancho y alto disponibles # Scale to fit available width and height
max_img_height = available_height - 2 * line_height max_img_height = available_height - 2 * line_height
scale = min(max_width / iw, max_img_height / ih) scale = min(max_width / iw, max_img_height / ih)
if scale <= 0: if scale <= 0:
@@ -302,8 +302,8 @@ class BeyondMetricsReportAgent:
current_y = y_img - 2 * line_height current_y = y_img - 2 * line_height
except Exception as e: except Exception as e:
# Si falla la carga, lo indicamos en el PDF # If loading fails, indicate it in the PDF
err_msg = f"No se pudo cargar la imagen {img_path.name}: {e}" err_msg = f"Could not load image {img_path.name}: {e}"
c.drawString(margin_x, current_y, err_msg) c.drawString(margin_x, current_y, err_msg)
current_y -= 2 * line_height current_y -= 2 * line_height

View File

@@ -20,15 +20,15 @@ REQUIRED_COLUMNS_VOLUMETRIA: List[str] = [
@dataclass @dataclass
class VolumetriaMetrics: class VolumetriaMetrics:
""" """
Métricas de volumetría basadas en el nuevo esquema de datos. Volumetry metrics based on the new data schema.
Columnas mínimas requeridas: Minimum required columns:
- interaction_id - interaction_id
- datetime_start - datetime_start
- queue_skill - queue_skill
- channel - channel
Otras columnas pueden existir pero no son necesarias para estas métricas. Other columns may exist but are not required for these metrics.
""" """
df: pd.DataFrame df: pd.DataFrame
@@ -38,41 +38,41 @@ class VolumetriaMetrics:
self._prepare_data() self._prepare_data()
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
# Helpers internos # Internal helpers
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
def _validate_columns(self) -> None: def _validate_columns(self) -> None:
missing = [c for c in REQUIRED_COLUMNS_VOLUMETRIA if c not in self.df.columns] missing = [c for c in REQUIRED_COLUMNS_VOLUMETRIA if c not in self.df.columns]
if missing: if missing:
raise ValueError( raise ValueError(
f"Faltan columnas obligatorias para VolumetriaMetrics: {missing}" f"Missing required columns for VolumetriaMetrics: {missing}"
) )
def _prepare_data(self) -> None: def _prepare_data(self) -> None:
df = self.df.copy() df = self.df.copy()
# Asegurar tipo datetime # Ensure datetime type
df["datetime_start"] = pd.to_datetime(df["datetime_start"], errors="coerce") df["datetime_start"] = pd.to_datetime(df["datetime_start"], errors="coerce")
# Normalizar strings # Normalize strings
df["queue_skill"] = df["queue_skill"].astype(str).str.strip() df["queue_skill"] = df["queue_skill"].astype(str).str.strip()
df["channel"] = df["channel"].astype(str).str.strip() df["channel"] = df["channel"].astype(str).str.strip()
# Guardamos el df preparado # Store the prepared dataframe
self.df = df self.df = df
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
# Propiedades útiles # Useful properties
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
@property @property
def is_empty(self) -> bool: def is_empty(self) -> bool:
return self.df.empty return self.df.empty
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
# Métricas numéricas / tabulares # Numeric / tabular metrics
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
def volume_by_channel(self) -> pd.Series: def volume_by_channel(self) -> pd.Series:
""" """
Nº de interacciones por canal. Number of interactions by channel.
""" """
return self.df.groupby("channel")["interaction_id"].nunique().sort_values( return self.df.groupby("channel")["interaction_id"].nunique().sort_values(
ascending=False ascending=False
@@ -80,7 +80,7 @@ class VolumetriaMetrics:
def volume_by_skill(self) -> pd.Series: def volume_by_skill(self) -> pd.Series:
""" """
Nº de interacciones por skill / cola. Number of interactions by skill / queue.
""" """
return self.df.groupby("queue_skill")["interaction_id"].nunique().sort_values( return self.df.groupby("queue_skill")["interaction_id"].nunique().sort_values(
ascending=False ascending=False
@@ -88,7 +88,7 @@ class VolumetriaMetrics:
def channel_distribution_pct(self) -> pd.Series: def channel_distribution_pct(self) -> pd.Series:
""" """
Distribución porcentual del volumen por canal. Percentage distribution of volume by channel.
""" """
counts = self.volume_by_channel() counts = self.volume_by_channel()
total = counts.sum() total = counts.sum()
@@ -98,7 +98,7 @@ class VolumetriaMetrics:
def skill_distribution_pct(self) -> pd.Series: def skill_distribution_pct(self) -> pd.Series:
""" """
Distribución porcentual del volumen por skill. Percentage distribution of volume by skill.
""" """
counts = self.volume_by_skill() counts = self.volume_by_skill()
total = counts.sum() total = counts.sum()
@@ -108,12 +108,12 @@ class VolumetriaMetrics:
def heatmap_24x7(self) -> pd.DataFrame: def heatmap_24x7(self) -> pd.DataFrame:
""" """
Matriz [día_semana x hora] con nº de interacciones. Matrix [day_of_week x hour] with number of interactions.
dayofweek: 0=Lunes ... 6=Domingo dayofweek: 0=Monday ... 6=Sunday
""" """
df = self.df.dropna(subset=["datetime_start"]).copy() df = self.df.dropna(subset=["datetime_start"]).copy()
if df.empty: if df.empty:
# Devolvemos un df vacío pero con índice/columnas esperadas # Return an empty dataframe with expected index/columns
idx = range(7) idx = range(7)
cols = range(24) cols = range(24)
return pd.DataFrame(0, index=idx, columns=cols) return pd.DataFrame(0, index=idx, columns=cols)
@@ -137,8 +137,8 @@ class VolumetriaMetrics:
def monthly_seasonality_cv(self) -> float: def monthly_seasonality_cv(self) -> float:
""" """
Coeficiente de variación del volumen mensual. Coefficient of variation of monthly volume.
CV = std / mean (en %). CV = std / mean (in %).
""" """
df = self.df.dropna(subset=["datetime_start"]).copy() df = self.df.dropna(subset=["datetime_start"]).copy()
if df.empty: if df.empty:
@@ -161,9 +161,9 @@ class VolumetriaMetrics:
def peak_offpeak_ratio(self) -> float: def peak_offpeak_ratio(self) -> float:
""" """
Ratio de volumen entre horas pico y valle. Volume ratio between peak and off-peak hours.
Definimos pico como horas 10:0019:59, resto valle. We define peak as hours 10:0019:59, rest as off-peak.
""" """
df = self.df.dropna(subset=["datetime_start"]).copy() df = self.df.dropna(subset=["datetime_start"]).copy()
if df.empty: if df.empty:
@@ -184,7 +184,7 @@ class VolumetriaMetrics:
def concentration_top20_skills_pct(self) -> float: def concentration_top20_skills_pct(self) -> float:
""" """
% del volumen concentrado en el top 20% de skills (por nº de interacciones). % of volume concentrated in the top 20% of skills (by number of interactions).
""" """
counts = ( counts = (
self.df.groupby("queue_skill")["interaction_id"].nunique().sort_values( self.df.groupby("queue_skill")["interaction_id"].nunique().sort_values(
@@ -210,8 +210,8 @@ class VolumetriaMetrics:
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
def plot_heatmap_24x7(self) -> Axes: def plot_heatmap_24x7(self) -> Axes:
""" """
Heatmap de volumen por día de la semana (0-6) y hora (0-23). Heatmap of volume by day of week (0-6) and hour (0-23).
Devuelve Axes para que el pipeline pueda guardar la figura. Returns Axes so the pipeline can save the figure.
""" """
data = self.heatmap_24x7() data = self.heatmap_24x7()
@@ -222,45 +222,45 @@ class VolumetriaMetrics:
ax.set_xticklabels([str(h) for h in range(24)]) ax.set_xticklabels([str(h) for h in range(24)])
ax.set_yticks(range(7)) ax.set_yticks(range(7))
ax.set_yticklabels(["L", "M", "X", "J", "V", "S", "D"]) ax.set_yticklabels(["M", "T", "W", "T", "F", "S", "S"])
ax.set_xlabel("Hora del día") ax.set_xlabel("Hour of day")
ax.set_ylabel("Día de la semana") ax.set_ylabel("Day of week")
ax.set_title("Volumen por día de la semana y hora") ax.set_title("Volume by day of week and hour")
plt.colorbar(im, ax=ax, label=" interacciones") plt.colorbar(im, ax=ax, label="# interactions")
return ax return ax
def plot_channel_distribution(self) -> Axes: def plot_channel_distribution(self) -> Axes:
""" """
Distribución de volumen por canal. Volume distribution by channel.
""" """
series = self.volume_by_channel() series = self.volume_by_channel()
fig, ax = plt.subplots(figsize=(6, 4)) fig, ax = plt.subplots(figsize=(6, 4))
series.plot(kind="bar", ax=ax) series.plot(kind="bar", ax=ax)
ax.set_xlabel("Canal") ax.set_xlabel("Channel")
ax.set_ylabel(" interacciones") ax.set_ylabel("# interactions")
ax.set_title("Volumen por canal") ax.set_title("Volume by channel")
ax.grid(axis="y", alpha=0.3) ax.grid(axis="y", alpha=0.3)
return ax return ax
def plot_skill_pareto(self) -> Axes: def plot_skill_pareto(self) -> Axes:
""" """
Pareto simple de volumen por skill (solo barras de volumen). Simple Pareto chart of volume by skill (volume bars only).
""" """
series = self.volume_by_skill() series = self.volume_by_skill()
fig, ax = plt.subplots(figsize=(10, 4)) fig, ax = plt.subplots(figsize=(10, 4))
series.plot(kind="bar", ax=ax) series.plot(kind="bar", ax=ax)
ax.set_xlabel("Skill / Cola") ax.set_xlabel("Skill / Queue")
ax.set_ylabel(" interacciones") ax.set_ylabel("# interactions")
ax.set_title("Pareto de volumen por skill") ax.set_title("Pareto chart of volume by skill")
ax.grid(axis="y", alpha=0.3) ax.grid(axis="y", alpha=0.3)
plt.xticks(rotation=45, ha="right") plt.xticks(rotation=45, ha="right")

View File

@@ -23,7 +23,7 @@ LOGGER = logging.getLogger(__name__)
def setup_basic_logging(level: str = "INFO") -> None: def setup_basic_logging(level: str = "INFO") -> None:
""" """
Configuración básica de logging, por si se necesita desde scripts. Basic logging configuration, if needed from scripts.
""" """
logging.basicConfig( logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO), level=getattr(logging, level.upper(), logging.INFO),
@@ -33,10 +33,10 @@ def setup_basic_logging(level: str = "INFO") -> None:
def _import_class(path: str) -> type: def _import_class(path: str) -> type:
""" """
Import dinámico de una clase a partir de un string tipo: Dynamic import of a class from a string like:
"beyond_metrics.dimensions.VolumetriaMetrics" "beyond_metrics.dimensions.VolumetriaMetrics"
""" """
LOGGER.debug("Importando clase %s", path) LOGGER.debug("Importing class %s", path)
module_name, class_name = path.rsplit(".", 1) module_name, class_name = path.rsplit(".", 1)
module = import_module(module_name) module = import_module(module_name)
cls = getattr(module, class_name) cls = getattr(module, class_name)
@@ -45,7 +45,7 @@ def _import_class(path: str) -> type:
def _serialize_for_json(obj: Any) -> Any: def _serialize_for_json(obj: Any) -> Any:
""" """
Convierte objetos típicos de numpy/pandas en tipos JSON-friendly. Converts typical numpy/pandas objects to JSON-friendly types.
""" """
if obj is None or isinstance(obj, (str, int, float, bool)): if obj is None or isinstance(obj, (str, int, float, bool)):
return obj return obj
@@ -73,12 +73,12 @@ PostRunCallback = Callable[[Dict[str, Any], str, ResultsSink], None]
@dataclass @dataclass
class BeyondMetricsPipeline: class BeyondMetricsPipeline:
""" """
Pipeline principal de BeyondMetrics. Main BeyondMetrics pipeline.
- Lee un CSV desde un DataSource (local, S3, Google Drive, etc.). - Reads a CSV from a DataSource (local, S3, Google Drive, etc.).
- Ejecuta dimensiones configuradas en un dict de configuración. - Executes dimensions configured in a config dict.
- Serializa resultados numéricos/tabulares a JSON. - Serializes numeric/tabular results to JSON.
- Guarda las imágenes de los métodos que comienzan por 'plot_'. - Saves images from methods starting with 'plot_'.
""" """
datasource: DataSource datasource: DataSource
@@ -95,39 +95,39 @@ class BeyondMetricsPipeline:
write_results_json: bool = True, write_results_json: bool = True,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
LOGGER.info("Inicio de ejecución de BeyondMetricsPipeline") LOGGER.info("Starting BeyondMetricsPipeline execution")
LOGGER.info("Leyendo CSV de entrada: %s", input_path) LOGGER.info("Reading input CSV: %s", input_path)
# 1) Leer datos # 1) Read data
df = self.datasource.read_csv(input_path) df = self.datasource.read_csv(input_path)
LOGGER.info("CSV leído con %d filas y %d columnas", df.shape[0], df.shape[1]) LOGGER.info("CSV read with %d rows and %d columns", df.shape[0], df.shape[1])
# 2) Determinar carpeta/base de salida para esta ejecución # 2) Determine output folder/base for this execution
run_base = run_dir.rstrip("/") run_base = run_dir.rstrip("/")
LOGGER.info("Ruta base de esta ejecución: %s", run_base) LOGGER.info("Base path for this execution: %s", run_base)
# 3) Ejecutar dimensiones # 3) Execute dimensions
dimensions_cfg = self.dimensions_config dimensions_cfg = self.dimensions_config
if not isinstance(dimensions_cfg, dict): if not isinstance(dimensions_cfg, dict):
raise ValueError("El bloque 'dimensions' debe ser un dict.") raise ValueError("The 'dimensions' block must be a dict.")
all_results: Dict[str, Any] = {} all_results: Dict[str, Any] = {}
for dim_name, dim_cfg in dimensions_cfg.items(): for dim_name, dim_cfg in dimensions_cfg.items():
if not isinstance(dim_cfg, dict): if not isinstance(dim_cfg, dict):
raise ValueError(f"Config inválida para dimensión '{dim_name}' (debe ser dict).") raise ValueError(f"Invalid config for dimension '{dim_name}' (must be dict).")
if not dim_cfg.get("enabled", True): if not dim_cfg.get("enabled", True):
LOGGER.info("Dimensión '%s' desactivada; se omite.", dim_name) LOGGER.info("Dimension '%s' disabled; skipping.", dim_name)
continue continue
class_path = dim_cfg.get("class") class_path = dim_cfg.get("class")
if not class_path: if not class_path:
raise ValueError(f"Falta 'class' en la dimensión '{dim_name}'.") raise ValueError(f"Missing 'class' in dimension '{dim_name}'.")
metrics: List[str] = dim_cfg.get("metrics", []) metrics: List[str] = dim_cfg.get("metrics", [])
if not metrics: if not metrics:
LOGGER.info("Dimensión '%s' sin métricas configuradas; se omite.", dim_name) LOGGER.info("Dimension '%s' has no configured metrics; skipping.", dim_name)
continue continue
cls = _import_class(class_path) cls = _import_class(class_path)
@@ -136,35 +136,35 @@ class BeyondMetricsPipeline:
if self.dimension_params is not None: if self.dimension_params is not None:
extra_kwargs = self.dimension_params.get(dim_name, {}) or {} extra_kwargs = self.dimension_params.get(dim_name, {}) or {}
# Las dimensiones reciben df en el constructor # Dimensions receive df in the constructor
instance = cls(df, **extra_kwargs) instance = cls(df, **extra_kwargs)
dim_results: Dict[str, Any] = {} dim_results: Dict[str, Any] = {}
for metric_name in metrics: for metric_name in metrics:
LOGGER.info(" - Ejecutando métrica '%s.%s'", dim_name, metric_name) LOGGER.info(" - Executing metric '%s.%s'", dim_name, metric_name)
result = self._execute_metric(instance, metric_name, run_base, dim_name) result = self._execute_metric(instance, metric_name, run_base, dim_name)
dim_results[metric_name] = result dim_results[metric_name] = result
all_results[dim_name] = dim_results all_results[dim_name] = dim_results
# 4) Guardar JSON de resultados (opcional) # 4) Save results JSON (optional)
if write_results_json: if write_results_json:
results_json_path = f"{run_base}/results.json" results_json_path = f"{run_base}/results.json"
LOGGER.info("Guardando resultados en JSON: %s", results_json_path) LOGGER.info("Saving results to JSON: %s", results_json_path)
self.sink.write_json(results_json_path, all_results) self.sink.write_json(results_json_path, all_results)
# 5) Ejecutar callbacks post-run (scorers, agentes, etc.) # 5) Execute post-run callbacks (scorers, agents, etc.)
if self.post_run: if self.post_run:
LOGGER.info("Ejecutando %d callbacks post-run...", len(self.post_run)) LOGGER.info("Executing %d post-run callbacks...", len(self.post_run))
for cb in self.post_run: for cb in self.post_run:
try: try:
LOGGER.info("Ejecutando post-run callback: %s", cb) LOGGER.info("Executing post-run callback: %s", cb)
cb(all_results, run_base, self.sink) cb(all_results, run_base, self.sink)
except Exception: except Exception:
LOGGER.exception("Error ejecutando post-run callback %s", cb) LOGGER.exception("Error executing post-run callback %s", cb)
LOGGER.info("Ejecución completada correctamente.") LOGGER.info("Execution completed successfully.")
return all_results return all_results
@@ -176,42 +176,42 @@ class BeyondMetricsPipeline:
dim_name: str, dim_name: str,
) -> Any: ) -> Any:
""" """
Ejecuta una métrica: Executes a metric:
- Si empieza por 'plot_' -> se asume que devuelve Axes: - If it starts with 'plot_' -> assumed to return Axes:
- se guarda la figura como PNG - the figure is saved as PNG
- se devuelve {"type": "image", "path": "..."} - returns {"type": "image", "path": "..."}
- Si no, se serializa el valor a JSON. - Otherwise, the value is serialized to JSON.
Además, para métricas categóricas (por skill/canal) de la dimensión Additionally, for categorical metrics (by skill/channel) from the
'volumetry', devolvemos explícitamente etiquetas y valores para que 'volumetry' dimension, we explicitly return labels and values so
el frontend pueda saber a qué pertenece cada número. the frontend can know what each number belongs to.
""" """
method = getattr(instance, metric_name, None) method = getattr(instance, metric_name, None)
if method is None or not callable(method): if method is None or not callable(method):
raise ValueError( raise ValueError(
f"La métrica '{metric_name}' no existe en {type(instance).__name__}" f"Metric '{metric_name}' does not exist in {type(instance).__name__}"
) )
# Caso plots # Plot case
if metric_name.startswith("plot_"): if metric_name.startswith("plot_"):
ax = method() ax = method()
if not isinstance(ax, Axes): if not isinstance(ax, Axes):
raise TypeError( raise TypeError(
f"La métrica '{metric_name}' de '{type(instance).__name__}' " f"Metric '{metric_name}' of '{type(instance).__name__}' "
f"debería devolver un matplotlib.axes.Axes" f"should return a matplotlib.axes.Axes"
) )
fig = ax.get_figure() fig = ax.get_figure()
if fig is None: if fig is None:
raise RuntimeError( raise RuntimeError(
"Axes.get_figure() devolvió None, lo cual no debería pasar." "Axes.get_figure() returned None, which should not happen."
) )
fig = cast(Figure, fig) fig = cast(Figure, fig)
filename = f"{dim_name}_{metric_name}.png" filename = f"{dim_name}_{metric_name}.png"
img_path = f"{run_base}/{filename}" img_path = f"{run_base}/{filename}"
LOGGER.debug("Guardando figura en %s", img_path) LOGGER.debug("Saving figure to %s", img_path)
self.sink.write_figure(img_path, fig) self.sink.write_figure(img_path, fig)
plt.close(fig) plt.close(fig)
@@ -220,12 +220,12 @@ class BeyondMetricsPipeline:
"path": img_path, "path": img_path,
} }
# Caso numérico/tabular # Numeric/tabular case
value = method() value = method()
# Caso especial: series categóricas de volumetría (por skill / canal) # Special case: categorical series from volumetry (by skill / channel)
# Devolvemos {"labels": [...], "values": [...]} para mantener la # Return {"labels": [...], "values": [...]} to maintain
# información de etiquetas en el JSON. # label information in the JSON.
if ( if (
dim_name == "volumetry" dim_name == "volumetry"
and isinstance(value, pd.Series) and isinstance(value, pd.Series)
@@ -238,7 +238,7 @@ class BeyondMetricsPipeline:
} }
): ):
labels = [str(idx) for idx in value.index.tolist()] labels = [str(idx) for idx in value.index.tolist()]
# Aseguramos que todos los valores sean numéricos JSON-friendly # Ensure all values are JSON-friendly numeric
values = [float(v) for v in value.astype(float).tolist()] values = [float(v) for v in value.astype(float).tolist()]
return { return {
"labels": labels, "labels": labels,
@@ -251,7 +251,7 @@ class BeyondMetricsPipeline:
def load_dimensions_config(path: str) -> Dict[str, Any]: def load_dimensions_config(path: str) -> Dict[str, Any]:
""" """
Carga un JSON de configuración que contiene solo el bloque 'dimensions'. Loads a JSON configuration file containing only the 'dimensions' block.
""" """
import json import json
from pathlib import Path from pathlib import Path
@@ -261,7 +261,7 @@ def load_dimensions_config(path: str) -> Dict[str, Any]:
dimensions = cfg.get("dimensions") dimensions = cfg.get("dimensions")
if dimensions is None: if dimensions is None:
raise ValueError("El fichero de configuración debe contener un bloque 'dimensions'.") raise ValueError("The configuration file must contain a 'dimensions' block.")
return dimensions return dimensions
@@ -274,12 +274,12 @@ def build_pipeline(
post_run: Optional[List[PostRunCallback]] = None, post_run: Optional[List[PostRunCallback]] = None,
) -> BeyondMetricsPipeline: ) -> BeyondMetricsPipeline:
""" """
Crea un BeyondMetricsPipeline a partir de: Creates a BeyondMetricsPipeline from:
- ruta al JSON con dimensiones/métricas - path to JSON with dimensions/metrics
- un DataSource ya construido (local/S3/Drive) - an already constructed DataSource (local/S3/Drive)
- un ResultsSink ya construido (local/S3/Drive) - an already constructed ResultsSink (local/S3/Drive)
- una lista opcional de callbacks post_run que se ejecutan al final - an optional list of post_run callbacks that execute at the end
(útil para scorers, agentes de IA, etc.) (useful for scorers, AI agents, etc.)
""" """
dims_cfg = load_dimensions_config(dimensions_config_path) dims_cfg = load_dimensions_config(dimensions_config_path)
return BeyondMetricsPipeline( return BeyondMetricsPipeline(