Files
BeyondCXAnalytics-Demo/backend/beyond_metrics/pipeline.py
Claude 9caa382010 Translate Phase 3 low-priority backend files (complete Spanish-to-English translation)
Phase 3 of Spanish-to-English translation for low-priority backend files:

Backend core modules (4 files):
- Volumetria.py: Translated ~15 occurrences (docstrings, comments, plot labels, day abbreviations)
- agent.py: Translated ~15 occurrences (system prompts, docstrings, error messages)
- pipeline.py: Translated ~10 occurrences (log messages, docstrings, comments)
- analysis_service.py: Translated ~10 occurrences (docstrings, error messages, comments)

All function names, class names, and variable names preserved for API compatibility.
Frontend and backend compilation tested and verified successful.

This completes the comprehensive Spanish-to-English translation project:
- Phase 1 (High Priority): 3 files - backendMapper.ts, analysisGenerator.ts, realDataAnalysis.ts
- Phase 2 (Medium Priority): 5 files - dataTransformation.ts, segmentClassifier.ts, + 3 dimension files
- Phase 3 (Low Priority): 4 files - Volumetria.py, agent.py, pipeline.py, analysis_service.py

Total files translated: 12 files (5 frontend TypeScript + 7 backend Python)
All critical path translations complete.

Related to TRANSLATION_STATUS.md Phase 3 completion.

https://claude.ai/code/session_01GNbnkFoESkRcnPr3bLCYDg
2026-02-07 11:15:47 +00:00

292 lines
9.2 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from importlib import import_module
from typing import Any, Dict, List, Mapping, Optional, cast, Callable
import logging
import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from .io import (
DataSource,
ResultsSink,
)
LOGGER = logging.getLogger(__name__)


def setup_basic_logging(level: str = "INFO") -> None:
    """Configure root logging with a timestamped format.

    Intended for scripts that have no logging setup of their own.
    Unknown level names silently fall back to INFO.
    """
    resolved_level = getattr(logging, level.upper(), logging.INFO)
    logging.basicConfig(
        format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
        level=resolved_level,
    )
def _import_class(path: str) -> type:
"""
Dynamic import of a class from a string like:
"beyond_metrics.dimensions.VolumetriaMetrics"
"""
LOGGER.debug("Importing class %s", path)
module_name, class_name = path.rsplit(".", 1)
module = import_module(module_name)
cls = getattr(module, class_name)
return cls
def _serialize_for_json(obj: Any) -> Any:
"""
Converts typical numpy/pandas objects to JSON-friendly types.
"""
if obj is None or isinstance(obj, (str, int, float, bool)):
return obj
if isinstance(obj, (np.integer, np.floating)):
return float(obj)
if isinstance(obj, pd.DataFrame):
return obj.to_dict(orient="records")
if isinstance(obj, pd.Series):
return obj.to_list()
if isinstance(obj, (list, tuple)):
return [_serialize_for_json(x) for x in obj]
if isinstance(obj, dict):
return {str(k): _serialize_for_json(v) for k, v in obj.items()}
return str(obj)
PostRunCallback = Callable[[Dict[str, Any], str, ResultsSink], None]
@dataclass
class BeyondMetricsPipeline:
    """
    Main BeyondMetrics pipeline.

    - Reads a CSV from a DataSource (local, S3, Google Drive, etc.).
    - Executes the dimensions configured in a config dict.
    - Serializes numeric/tabular results to JSON.
    - Saves images from methods starting with 'plot_'.
    """

    # Source the input CSV is read from (local, S3, Drive, ...).
    datasource: DataSource
    # Sink that receives the results JSON and PNG figures.
    sink: ResultsSink
    # Mapping: dimension name -> {"class": ..., "metrics": [...], "enabled": ...}.
    dimensions_config: Mapping[str, Any]
    # Optional extra constructor kwargs, keyed by dimension name.
    dimension_params: Optional[Mapping[str, Mapping[str, Any]]] = None
    # Callbacks executed after all metrics finish (scorers, agents, ...).
    post_run: Optional[List[PostRunCallback]] = None

    def run(
        self,
        input_path: str,
        run_dir: str,
        *,
        write_results_json: bool = True,
    ) -> Dict[str, Any]:
        """
        Execute all enabled dimensions/metrics over the CSV at `input_path`.

        Args:
            input_path: path understood by the configured DataSource.
            run_dir: base output folder for this execution (trailing '/' stripped).
            write_results_json: when True, write `results.json` under run_dir.

        Returns:
            Nested dict: {dimension_name: {metric_name: result}}.

        Raises:
            ValueError: if the dimensions config (or one of its entries)
                is malformed.
        """
        LOGGER.info("Starting BeyondMetricsPipeline execution")
        LOGGER.info("Reading input CSV: %s", input_path)
        # 1) Read data
        df = self.datasource.read_csv(input_path)
        LOGGER.info("CSV read with %d rows and %d columns", df.shape[0], df.shape[1])
        # 2) Determine output folder/base for this execution
        run_base = run_dir.rstrip("/")
        LOGGER.info("Base path for this execution: %s", run_base)
        # 3) Execute dimensions
        dimensions_cfg = self.dimensions_config
        if not isinstance(dimensions_cfg, dict):
            raise ValueError("The 'dimensions' block must be a dict.")
        all_results: Dict[str, Any] = {}
        for dim_name, dim_cfg in dimensions_cfg.items():
            if not isinstance(dim_cfg, dict):
                raise ValueError(f"Invalid config for dimension '{dim_name}' (must be dict).")
            if not dim_cfg.get("enabled", True):
                LOGGER.info("Dimension '%s' disabled; skipping.", dim_name)
                continue
            class_path = dim_cfg.get("class")
            if not class_path:
                raise ValueError(f"Missing 'class' in dimension '{dim_name}'.")
            metrics: List[str] = dim_cfg.get("metrics", [])
            if not metrics:
                LOGGER.info("Dimension '%s' has no configured metrics; skipping.", dim_name)
                continue
            cls = _import_class(class_path)
            extra_kwargs = {}
            if self.dimension_params is not None:
                extra_kwargs = self.dimension_params.get(dim_name, {}) or {}
            # Dimensions receive df in the constructor
            instance = cls(df, **extra_kwargs)
            dim_results: Dict[str, Any] = {}
            for metric_name in metrics:
                LOGGER.info(" - Executing metric '%s.%s'", dim_name, metric_name)
                result = self._execute_metric(instance, metric_name, run_base, dim_name)
                dim_results[metric_name] = result
            all_results[dim_name] = dim_results
        # 4) Save results JSON (optional)
        if write_results_json:
            results_json_path = f"{run_base}/results.json"
            LOGGER.info("Saving results to JSON: %s", results_json_path)
            self.sink.write_json(results_json_path, all_results)
        # 5) Execute post-run callbacks (scorers, agents, etc.)
        if self.post_run:
            LOGGER.info("Executing %d post-run callbacks...", len(self.post_run))
            for cb in self.post_run:
                try:
                    LOGGER.info("Executing post-run callback: %s", cb)
                    cb(all_results, run_base, self.sink)
                except Exception:
                    # Best-effort: a failing callback must not abort the run.
                    LOGGER.exception("Error executing post-run callback %s", cb)
        LOGGER.info("Execution completed successfully.")
        return all_results

    def _execute_metric(
        self,
        instance: Any,
        metric_name: str,
        run_base: str,
        dim_name: str,
    ) -> Any:
        """
        Execute a single metric method on a dimension instance.

        - If the name starts with 'plot_' the method must return a
          matplotlib Axes; the figure is saved as PNG and
          {"type": "image", "path": "..."} is returned.
        - Otherwise, the returned value is serialized to JSON-friendly types.

        Additionally, for categorical metrics (by skill/channel) from the
        'volumetry' dimension, we explicitly return labels and values so
        the frontend can know what each number belongs to.

        Raises:
            ValueError: if the metric does not exist on the instance.
            TypeError: if a 'plot_' metric does not return an Axes.
            RuntimeError: if the Axes has no associated Figure.
        """
        method = getattr(instance, metric_name, None)
        if method is None or not callable(method):
            raise ValueError(
                f"Metric '{metric_name}' does not exist in {type(instance).__name__}"
            )
        # Plot case
        if metric_name.startswith("plot_"):
            ax = method()
            if not isinstance(ax, Axes):
                raise TypeError(
                    f"Metric '{metric_name}' of '{type(instance).__name__}' "
                    f"should return a matplotlib.axes.Axes"
                )
            fig = ax.get_figure()
            if fig is None:
                raise RuntimeError(
                    "Axes.get_figure() returned None, which should not happen."
                )
            fig = cast(Figure, fig)
            filename = f"{dim_name}_{metric_name}.png"
            # FIX: `filename` was previously never interpolated — the path was
            # built with a literal placeholder, so every plot was written to
            # the same bogus file and the returned "path" was wrong.
            img_path = f"{run_base}/{filename}"
            LOGGER.debug("Saving figure to %s", img_path)
            self.sink.write_figure(img_path, fig)
            # Close the figure so repeated runs do not accumulate matplotlib state.
            plt.close(fig)
            return {
                "type": "image",
                "path": img_path,
            }
        # Numeric/tabular case
        value = method()
        # Special case: categorical series from volumetry (by skill / channel)
        # Return {"labels": [...], "values": [...]} to maintain
        # label information in the JSON.
        if (
            dim_name == "volumetry"
            and isinstance(value, pd.Series)
            and metric_name
            in {
                "volume_by_channel",
                "volume_by_skill",
                "channel_distribution_pct",
                "skill_distribution_pct",
            }
        ):
            labels = [str(idx) for idx in value.index.tolist()]
            # Ensure all values are JSON-friendly numeric
            values = [float(v) for v in value.astype(float).tolist()]
            return {
                "labels": labels,
                "values": values,
            }
        return _serialize_for_json(value)
def load_dimensions_config(path: str) -> Dict[str, Any]:
    """Read a JSON config file and return its 'dimensions' block.

    Raises:
        ValueError: if the file contains no 'dimensions' key.
    """
    import json
    from pathlib import Path

    raw_cfg = json.loads(Path(path).read_text(encoding="utf-8"))
    dimensions = raw_cfg.get("dimensions")
    if dimensions is None:
        raise ValueError("The configuration file must contain a 'dimensions' block.")
    return dimensions
def build_pipeline(
    dimensions_config_path: str,
    datasource: DataSource,
    sink: ResultsSink,
    dimension_params: Optional[Mapping[str, Mapping[str, Any]]] = None,
    post_run: Optional[List[PostRunCallback]] = None,
) -> BeyondMetricsPipeline:
    """Assemble a BeyondMetricsPipeline ready to run.

    Args:
        dimensions_config_path: path to the JSON file whose 'dimensions'
            block describes which dimensions/metrics to execute.
        datasource: already-constructed DataSource (local/S3/Drive).
        sink: already-constructed ResultsSink (local/S3/Drive).
        dimension_params: optional per-dimension constructor kwargs.
        post_run: optional callbacks executed once the run completes
            (useful for scorers, AI agents, etc.).
    """
    return BeyondMetricsPipeline(
        datasource=datasource,
        sink=sink,
        dimensions_config=load_dimensions_config(dimensions_config_path),
        dimension_params=dimension_params,
        post_run=post_run,
    )