Files
Claude 9caa382010 Translate Phase 3 low-priority backend files (complete Spanish-to-English translation)
Phase 3 of Spanish-to-English translation for low-priority backend files:

Backend core modules (4 files):
- Volumetria.py: Translated ~15 occurrences (docstrings, comments, plot labels, day abbreviations)
- agent.py: Translated ~15 occurrences (system prompts, docstrings, error messages)
- pipeline.py: Translated ~10 occurrences (log messages, docstrings, comments)
- analysis_service.py: Translated ~10 occurrences (docstrings, error messages, comments)

All function names, class names, and variable names preserved for API compatibility.
Frontend and backend compilation tested and verified successful.

This completes the comprehensive Spanish-to-English translation project:
- Phase 1 (High Priority): 3 files - backendMapper.ts, analysisGenerator.ts, realDataAnalysis.ts
- Phase 2 (Medium Priority): 5 files - dataTransformation.ts, segmentClassifier.ts, + 3 dimension files
- Phase 3 (Low Priority): 4 files - Volumetria.py, agent.py, pipeline.py, analysis_service.py

Total files translated: 12 files (5 frontend TypeScript + 7 backend Python)
All critical path translations complete.

Related to TRANSLATION_STATUS.md Phase 3 completion.

https://claude.ai/code/session_01GNbnkFoESkRcnPr3bLCYDg
2026-02-07 11:15:47 +00:00

263 lines
8.9 KiB
Python

from __future__ import annotations
from pathlib import Path
from uuid import uuid4
from datetime import datetime
from typing import Optional, Literal
import json
import zipfile
from beyond_metrics.io import LocalDataSource, LocalResultsSink, ResultsSink
from beyond_metrics.pipeline import build_pipeline
from beyond_metrics.dimensions.EconomyCost import EconomyConfig
from beyond_flows.scorers import AgenticScorer
from typing import Any, Mapping, Optional, Dict
def _build_economy_config(economy_data: Optional[Mapping[str, Any]]) -> EconomyConfig:
"""
Builds EconomyConfig validating types and preventing the type checker
from mixing floats and dicts in a single dictionary.
"""
# Default values
default_customer_segments: Dict[str, str] = {
"VIP": "high",
"Premium": "high",
"Soporte_General": "medium",
"Ventas": "medium",
"Basico": "low",
}
if economy_data is None:
return EconomyConfig(
labor_cost_per_hour=20.0,
overhead_rate=0.10,
tech_costs_annual=5000.0,
automation_cpi=0.20,
automation_volume_share=0.5,
automation_success_rate=0.6,
customer_segments=default_customer_segments,
)
def _get_float(field: str, default: float) -> float:
value = economy_data.get(field, default)
if isinstance(value, (int, float)):
return float(value)
raise ValueError(f"The field '{field}' must be numeric (float). Received value: {value!r}")
# Scalar fields
labor_cost_per_hour = _get_float("labor_cost_per_hour", 20.0)
overhead_rate = _get_float("overhead_rate", 0.10)
tech_costs_annual = _get_float("tech_costs_annual", 5000.0)
automation_cpi = _get_float("automation_cpi", 0.20)
automation_volume_share = _get_float("automation_volume_share", 0.5)
automation_success_rate = _get_float("automation_success_rate", 0.6)
# customer_segments may or may not be present; if present, validate it
customer_segments: Dict[str, str] = dict(default_customer_segments)
if "customer_segments" in economy_data and economy_data["customer_segments"] is not None:
cs = economy_data["customer_segments"]
if not isinstance(cs, Mapping):
raise ValueError("customer_segments must be a dictionary {segment: level}")
for k, v in cs.items():
if not isinstance(v, str):
raise ValueError(
f"The value of customer_segments['{k}'] must be str. Received value: {v!r}"
)
customer_segments[str(k)] = v
return EconomyConfig(
labor_cost_per_hour=labor_cost_per_hour,
overhead_rate=overhead_rate,
tech_costs_annual=tech_costs_annual,
automation_cpi=automation_cpi,
automation_volume_share=automation_volume_share,
automation_success_rate=automation_success_rate,
customer_segments=customer_segments,
)
def run_analysis(
input_path: Path,
economy_data: Optional[dict] = None,
return_type: Literal["path", "zip"] = "path",
company_folder: Optional[str] = None,
) -> tuple[Path, Optional[Path]]:
"""
Executes the pipeline on a CSV and returns:
- (results_dir, None) if return_type == "path"
- (results_dir, zip_path) if return_type == "zip"
input_path can be absolute or relative, but results
will ALWAYS be written to the CSV's folder, inside a
subfolder named timestamp (and optionally prefixed
by company_folder).
"""
input_path = input_path.resolve()
if not input_path.exists():
raise FileNotFoundError(f"CSV does not exist: {input_path}")
if not input_path.is_file():
raise ValueError(f"Path does not point to a CSV file: {input_path}")
# Folder where the CSV is located
csv_dir = input_path.parent
# DataSource and ResultsSink point to THAT folder
datasource = LocalDataSource(base_dir=str(csv_dir))
sink = LocalResultsSink(base_dir=str(csv_dir))
# Economy config
economy_cfg = _build_economy_config(economy_data)
dimension_params: Dict[str, Mapping[str, Any]] = {
"economy_costs": {
"config": economy_cfg,
}
}
# Scoring callback
def agentic_post_run(results: Dict[str, Any], run_base: str, sink_: ResultsSink) -> None:
scorer = AgenticScorer()
try:
agentic = scorer.compute_and_return(results)
except Exception as e:
# Don't break the entire execution if the scorer fails
agentic = {
"error": f"{type(e).__name__}: {e}",
}
sink_.write_json(f"{run_base}/agentic_readiness.json", agentic)
pipeline = build_pipeline(
dimensions_config_path="beyond_metrics/configs/beyond_metrics_config.json",
datasource=datasource,
sink=sink,
dimension_params=dimension_params,
post_run=[agentic_post_run],
)
# Execution timestamp (results folder name)
timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
# Logical results path (RELATIVE to sink's base_dir)
if company_folder:
# E.g. "Cliente_X/20251208-153045"
run_dir_rel = f"{company_folder.rstrip('/')}/{timestamp}"
else:
# E.g. "20251208-153045"
run_dir_rel = timestamp
# Execute pipeline: CSV is passed relative to csv_dir
pipeline.run(
input_path=input_path.name,
run_dir=run_dir_rel,
)
# Actual folder with results
results_dir = csv_dir / run_dir_rel
if return_type == "path":
return results_dir, None
# --- ZIP results -------------------------------------------------------
# Create the ZIP in the SAME folder as the CSV, with name based on run_dir
zip_name = f"{run_dir_rel.replace('/', '_')}.zip"
zip_path = csv_dir / zip_name
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
for file in results_dir.rglob("*"):
if file.is_file():
# Store it relative to the results folder
arcname = file.relative_to(results_dir.parent)
zipf.write(file, arcname)
return results_dir, zip_path
from typing import Any, Mapping, Dict # ensure these imports are at the top
def run_analysis_collect_json(
input_path: Path,
economy_data: Optional[dict] = None,
analysis: Literal["basic", "premium"] = "premium",
company_folder: Optional[str] = None,
) -> Dict[str, Any]:
"""
Executes the pipeline and returns a single JSON with all results.
Unlike run_analysis:
- Does NOT write results.json
- Does NOT write agentic_readiness.json
- agentic_readiness is embedded in the results dict
The `analysis` parameter allows choosing the analysis level:
- "basic" -> beyond_metrics/configs/basic.json
- "premium" -> beyond_metrics/configs/beyond_metrics_config.json
"""
# Normalize and validate the CSV path
input_path = input_path.resolve()
if not input_path.exists():
raise FileNotFoundError(f"CSV does not exist: {input_path}")
if not input_path.is_file():
raise ValueError(f"Path does not point to a CSV file: {input_path}")
# Folder where the CSV is located
csv_dir = input_path.parent
# DataSource and ResultsSink point to THAT folder
datasource = LocalDataSource(base_dir=str(csv_dir))
sink = LocalResultsSink(base_dir=str(csv_dir))
# Economy config
economy_cfg = _build_economy_config(economy_data)
dimension_params: Dict[str, Mapping[str, Any]] = {
"economy_costs": {
"config": economy_cfg,
}
}
# Choose the dimensions config file based on `analysis`
if analysis == "basic":
dimensions_config_path = "beyond_metrics/configs/basic.json"
else:
dimensions_config_path = "beyond_metrics/configs/beyond_metrics_config.json"
# Post-run callback: add agentic_readiness to the final JSON (without writing files)
def agentic_post_run(results: Dict[str, Any], run_base: str, sink_: ResultsSink) -> None:
scorer = AgenticScorer()
try:
agentic = scorer.compute_and_return(results)
except Exception as e:
agentic = {"error": f"{type(e).__name__}: {e}"}
results["agentic_readiness"] = agentic
pipeline = build_pipeline(
dimensions_config_path=dimensions_config_path,
datasource=datasource,
sink=sink,
dimension_params=dimension_params,
post_run=[agentic_post_run],
)
# Execution timestamp (to separate possible artifacts like plots)
timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
if company_folder:
run_dir_rel = f"{company_folder.rstrip('/')}/{timestamp}"
else:
run_dir_rel = timestamp
# Execute pipeline without writing results.json
results = pipeline.run(
input_path=input_path.name,
run_dir=run_dir_rel,
write_results_json=False,
)
return results