from __future__ import annotations from pathlib import Path from uuid import uuid4 from datetime import datetime from typing import Optional, Literal import json import zipfile from beyond_metrics.io import LocalDataSource, LocalResultsSink, ResultsSink from beyond_metrics.pipeline import build_pipeline from beyond_metrics.dimensions.EconomyCost import EconomyConfig from beyond_flows.scorers import AgenticScorer from typing import Any, Mapping, Optional, Dict def _build_economy_config(economy_data: Optional[Mapping[str, Any]]) -> EconomyConfig: """ Builds EconomyConfig validating types and preventing the type checker from mixing floats and dicts in a single dictionary. """ # Default values default_customer_segments: Dict[str, str] = { "VIP": "high", "Premium": "high", "Soporte_General": "medium", "Ventas": "medium", "Basico": "low", } if economy_data is None: return EconomyConfig( labor_cost_per_hour=20.0, overhead_rate=0.10, tech_costs_annual=5000.0, automation_cpi=0.20, automation_volume_share=0.5, automation_success_rate=0.6, customer_segments=default_customer_segments, ) def _get_float(field: str, default: float) -> float: value = economy_data.get(field, default) if isinstance(value, (int, float)): return float(value) raise ValueError(f"The field '{field}' must be numeric (float). Received value: {value!r}") # Scalar fields labor_cost_per_hour = _get_float("labor_cost_per_hour", 20.0) overhead_rate = _get_float("overhead_rate", 0.10) tech_costs_annual = _get_float("tech_costs_annual", 5000.0) automation_cpi = _get_float("automation_cpi", 0.20) automation_volume_share = _get_float("automation_volume_share", 0.5) automation_success_rate = _get_float("automation_success_rate", 0.6) # customer_segments may or may not be present; if present, validate it customer_segments: Dict[str, str] = dict(default_customer_segments) if "customer_segments" in economy_data and economy_data["customer_segments"] is not None: cs = economy_data["customer_segments"] if not isinstance(cs, Mapping): raise ValueError("customer_segments must be a dictionary {segment: level}") for k, v in cs.items(): if not isinstance(v, str): raise ValueError( f"The value of customer_segments['{k}'] must be str. Received value: {v!r}" ) customer_segments[str(k)] = v return EconomyConfig( labor_cost_per_hour=labor_cost_per_hour, overhead_rate=overhead_rate, tech_costs_annual=tech_costs_annual, automation_cpi=automation_cpi, automation_volume_share=automation_volume_share, automation_success_rate=automation_success_rate, customer_segments=customer_segments, ) def run_analysis( input_path: Path, economy_data: Optional[dict] = None, return_type: Literal["path", "zip"] = "path", company_folder: Optional[str] = None, ) -> tuple[Path, Optional[Path]]: """ Executes the pipeline on a CSV and returns: - (results_dir, None) if return_type == "path" - (results_dir, zip_path) if return_type == "zip" input_path can be absolute or relative, but results will ALWAYS be written to the CSV's folder, inside a subfolder named timestamp (and optionally prefixed by company_folder). """ input_path = input_path.resolve() if not input_path.exists(): raise FileNotFoundError(f"CSV does not exist: {input_path}") if not input_path.is_file(): raise ValueError(f"Path does not point to a CSV file: {input_path}") # Folder where the CSV is located csv_dir = input_path.parent # DataSource and ResultsSink point to THAT folder datasource = LocalDataSource(base_dir=str(csv_dir)) sink = LocalResultsSink(base_dir=str(csv_dir)) # Economy config economy_cfg = _build_economy_config(economy_data) dimension_params: Dict[str, Mapping[str, Any]] = { "economy_costs": { "config": economy_cfg, } } # Scoring callback def agentic_post_run(results: Dict[str, Any], run_base: str, sink_: ResultsSink) -> None: scorer = AgenticScorer() try: agentic = scorer.compute_and_return(results) except Exception as e: # Don't break the entire execution if the scorer fails agentic = { "error": f"{type(e).__name__}: {e}", } sink_.write_json(f"{run_base}/agentic_readiness.json", agentic) pipeline = build_pipeline( dimensions_config_path="beyond_metrics/configs/beyond_metrics_config.json", datasource=datasource, sink=sink, dimension_params=dimension_params, post_run=[agentic_post_run], ) # Execution timestamp (results folder name) timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S") # Logical results path (RELATIVE to sink's base_dir) if company_folder: # E.g. "Cliente_X/20251208-153045" run_dir_rel = f"{company_folder.rstrip('/')}/{timestamp}" else: # E.g. "20251208-153045" run_dir_rel = timestamp # Execute pipeline: CSV is passed relative to csv_dir pipeline.run( input_path=input_path.name, run_dir=run_dir_rel, ) # Actual folder with results results_dir = csv_dir / run_dir_rel if return_type == "path": return results_dir, None # --- ZIP results ------------------------------------------------------- # Create the ZIP in the SAME folder as the CSV, with name based on run_dir zip_name = f"{run_dir_rel.replace('/', '_')}.zip" zip_path = csv_dir / zip_name with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf: for file in results_dir.rglob("*"): if file.is_file(): # Store it relative to the results folder arcname = file.relative_to(results_dir.parent) zipf.write(file, arcname) return results_dir, zip_path from typing import Any, Mapping, Dict # ensure these imports are at the top def run_analysis_collect_json( input_path: Path, economy_data: Optional[dict] = None, analysis: Literal["basic", "premium"] = "premium", company_folder: Optional[str] = None, ) -> Dict[str, Any]: """ Executes the pipeline and returns a single JSON with all results. Unlike run_analysis: - Does NOT write results.json - Does NOT write agentic_readiness.json - agentic_readiness is embedded in the results dict The `analysis` parameter allows choosing the analysis level: - "basic" -> beyond_metrics/configs/basic.json - "premium" -> beyond_metrics/configs/beyond_metrics_config.json """ # Normalize and validate the CSV path input_path = input_path.resolve() if not input_path.exists(): raise FileNotFoundError(f"CSV does not exist: {input_path}") if not input_path.is_file(): raise ValueError(f"Path does not point to a CSV file: {input_path}") # Folder where the CSV is located csv_dir = input_path.parent # DataSource and ResultsSink point to THAT folder datasource = LocalDataSource(base_dir=str(csv_dir)) sink = LocalResultsSink(base_dir=str(csv_dir)) # Economy config economy_cfg = _build_economy_config(economy_data) dimension_params: Dict[str, Mapping[str, Any]] = { "economy_costs": { "config": economy_cfg, } } # Choose the dimensions config file based on `analysis` if analysis == "basic": dimensions_config_path = "beyond_metrics/configs/basic.json" else: dimensions_config_path = "beyond_metrics/configs/beyond_metrics_config.json" # Post-run callback: add agentic_readiness to the final JSON (without writing files) def agentic_post_run(results: Dict[str, Any], run_base: str, sink_: ResultsSink) -> None: scorer = AgenticScorer() try: agentic = scorer.compute_and_return(results) except Exception as e: agentic = {"error": f"{type(e).__name__}: {e}"} results["agentic_readiness"] = agentic pipeline = build_pipeline( dimensions_config_path=dimensions_config_path, datasource=datasource, sink=sink, dimension_params=dimension_params, post_run=[agentic_post_run], ) # Execution timestamp (to separate possible artifacts like plots) timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S") if company_folder: run_dir_rel = f"{company_folder.rstrip('/')}/{timestamp}" else: run_dir_rel = timestamp # Execute pipeline without writing results.json results = pipeline.run( input_path=input_path.name, run_dir=run_dir_rel, write_results_json=False, ) return results