from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd

try:
    import matplotlib.pyplot as plt
    from matplotlib.axes import Axes
except ImportError:
    # Plotting is optional: every KPI method only needs numpy/pandas, so the
    # module stays importable in environments without matplotlib. The plot_*
    # methods raise ImportError when they are called in that situation.
    plt = None  # type: ignore[assignment]
    Axes = None  # type: ignore[assignment,misc]

# Columns the transactional dataset must provide.
REQUIRED_COLUMNS_ECON: List[str] = [
    "interaction_id",
    "datetime_start",
    "queue_skill",
    "channel",
    "duration_talk",
    "hold_time",
    "wrap_up_time",
]

# Every KPI is computed per (skill, channel) pair.
_GROUP_KEYS: List[str] = ["queue_skill", "channel"]

# Share of paid agent time that is actually productive; the remaining 30%
# accounts for non-productive time (breaks, training, etc.).
_EFFECTIVE_PRODUCTIVITY: float = 0.70

# Rough share of interactions assumed to run above the median handle time in
# the inefficiency model.
_INEFFICIENCY_AFFECTED_FRACTION: float = 0.4


def _require_matplotlib() -> None:
    """Raise ImportError when matplotlib could not be imported."""
    if plt is None:
        raise ImportError(
            "matplotlib es necesario para generar los gráficos de EconomyCostMetrics"
        )


@dataclass
class EconomyConfig:
    """
    Manual parameters for the Economy & Costs dimension.

    - labor_cost_per_hour: total (fully loaded) cost per agent hour.
    - overhead_rate: variable overhead as a fraction of labor
      (e.g. 0.1 = 10% on top of labor).
    - tech_costs_annual: annual technology cost (licences, infra, ...).
    - automation_cpi: cost per automated interaction (e.g. 0.15 EUR).
    - automation_volume_share: share of the volume that can be automated (0-1).
    - automation_success_rate: success rate of the automation (0-1).
    - customer_segments: optional mapping skill -> segment
      ("high"/"medium"/"low") reserved for future per-segment ROI insights.
    """

    labor_cost_per_hour: float
    overhead_rate: float = 0.0
    tech_costs_annual: float = 0.0
    automation_cpi: Optional[float] = None
    automation_volume_share: float = 0.0
    automation_success_rate: float = 0.0
    customer_segments: Optional[Dict[str, str]] = None


@dataclass
class EconomyCostMetrics:
    """
    DIMENSION 4: ECONOMY and COSTS

    Purpose:
    - Quantify the current COST (CPI, annual cost).
    - Estimate the impact of overhead and technology.
    - Compute a first estimate of the "inefficiency cost" and the potential
      savings.

    Requires:
    - The columns of the transactional dataset (see REQUIRED_COLUMNS_ECON).

    Optional inputs via EconomyConfig:
    - labor_cost_per_hour (mandatory for any monetary calculation).
    - overhead_rate, tech_costs_annual, automation_*.
    - customer_segments (for per-segment ROI insights).

    Raises:
        ValueError: on construction, when a required column is missing.
    """

    df: pd.DataFrame
    config: Optional[EconomyConfig] = None

    def __post_init__(self) -> None:
        self._validate_columns()
        self._prepare_data()

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #
    def _validate_columns(self) -> None:
        """Raise ValueError listing every required column that is missing."""
        missing = [c for c in REQUIRED_COLUMNS_ECON if c not in self.df.columns]
        if missing:
            raise ValueError(
                f"Faltan columnas obligatorias para EconomyCostMetrics: {missing}"
            )

    def _prepare_data(self) -> None:
        """Normalise dtypes and derive ``handle_time`` / ``_is_valid_for_cost``.

        Works on a copy, so the DataFrame passed by the caller is not mutated;
        ``self.df`` is rebound to the prepared copy.
        """
        df = self.df.copy()

        df["datetime_start"] = pd.to_datetime(df["datetime_start"], errors="coerce")

        for col in ["duration_talk", "hold_time", "wrap_up_time"]:
            df[col] = pd.to_numeric(df[col], errors="coerce")

        df["queue_skill"] = df["queue_skill"].astype(str).str.strip()
        df["channel"] = df["channel"].astype(str).str.strip()

        # Handle time = talk + hold + wrap, in seconds. Unparseable or missing
        # components count as 0.
        df["handle_time"] = (
            df["duration_talk"].fillna(0)
            + df["hold_time"].fillna(0)
            + df["wrap_up_time"].fillna(0)
        )

        # Only VALID records feed the AHT/CPI calculations
        # (excludes NOISE, ZOMBIE, ABANDON).
        if "record_status" in df.columns:
            df["record_status"] = df["record_status"].astype(str).str.strip().str.upper()
            df["_is_valid_for_cost"] = df["record_status"] == "VALID"
        else:
            # Legacy data without record_status: include everything.
            df["_is_valid_for_cost"] = True

        self.df = df

    @property
    def is_empty(self) -> bool:
        """True when the prepared dataset has no rows."""
        return self.df.empty

    def _has_cost_config(self) -> bool:
        """True when a config with a labor cost per hour is available."""
        return self.config is not None and self.config.labor_cost_per_hour is not None

    def _valid_cost_rows(self, *, exclude_abandoned: bool) -> pd.DataFrame:
        """Return the rows usable for AHT/cost computations.

        Keeps only rows flagged ``_is_valid_for_cost``; when
        ``exclude_abandoned`` is set and an ``is_abandoned`` column exists,
        rows whose flag equals True are dropped as well.
        """
        df = self.df
        if exclude_abandoned and "is_abandoned" in df.columns:
            # Element-wise comparison on purpose: the column may hold
            # NaN/object values, which must be kept.
            df = df[df["is_abandoned"] != True]  # noqa: E712
        if "_is_valid_for_cost" in df.columns:
            df = df[df["_is_valid_for_cost"] == True]  # noqa: E712
        return df

    def _cpi_with_volume(self) -> pd.DataFrame:
        """Return the CPI table indexed by (queue_skill, channel) plus ``volume``.

        ``volume`` is the number of distinct ``interaction_id`` values per
        skill/channel over the whole dataset (not only the VALID rows).
        Returns an empty DataFrame when the CPI table is empty.
        """
        cpi_table = self.cpi_by_skill_channel()
        if cpi_table.empty:
            return pd.DataFrame()

        volume = (
            self.df.groupby(_GROUP_KEYS)["interaction_id"].nunique().rename("volume")
        )
        # Index the CPI table like ``volume`` (MultiIndex) so the join aligns.
        cpi_indexed = cpi_table.set_index(_GROUP_KEYS)
        return cpi_indexed.join(volume, how="left").fillna({"volume": 0})

    # ------------------------------------------------------------------ #
    # KPI 1: CPI per channel/skill
    # ------------------------------------------------------------------ #
    def cpi_by_skill_channel(self) -> pd.DataFrame:
        """
        CPI (Cost Per Interaction) per skill/channel.

        CPI = (Labor_cost_per_interaction + Overhead_variable) / EFFECTIVE_PRODUCTIVITY

        - Labor_cost_per_interaction = labor_cost_per_hour * AHT_hours
        - Overhead_variable = overhead_rate * Labor_cost_per_interaction
        - EFFECTIVE_PRODUCTIVITY = 0.70 (accounts for non-productive time)

        Abandoned records and records that are not VALID are excluded from
        the AHT, for consistency with the frontend path (fresh CSV).

        Returns:
            DataFrame with columns queue_skill, channel, aht_seconds,
            labor_cost, overhead_cost and cpi_total, sorted by skill/channel.
            queue_skill and channel are plain columns (not the index) so the
            frontend can look rows up by skill name. An empty DataFrame is
            returned when there is no cost config or no usable row.
        """
        if not self._has_cost_config():
            return pd.DataFrame()
        cfg = self.config
        assert cfg is not None  # for the type checker

        df_cost = self._valid_cost_rows(exclude_abandoned=True)
        if df_cost.empty:
            return pd.DataFrame()

        # AHT per skill/channel, in seconds.
        aht_sec = df_cost.groupby(_GROUP_KEYS)["handle_time"].mean()
        if aht_sec.empty:
            return pd.DataFrame()

        aht_hours = aht_sec / 3600.0
        labor_cost = cfg.labor_cost_per_hour * aht_hours
        overhead = labor_cost * cfg.overhead_rate
        cpi = (labor_cost + overhead) / _EFFECTIVE_PRODUCTIVITY

        out = pd.DataFrame(
            {
                "aht_seconds": aht_sec.round(2),
                "labor_cost": labor_cost.round(4),
                "overhead_cost": overhead.round(4),
                "cpi_total": cpi.round(4),
            }
        )
        return out.sort_index().reset_index()

    # ------------------------------------------------------------------ #
    # KPI 2: annual cost per skill/channel
    # ------------------------------------------------------------------ #
    def annual_cost_by_skill_channel(self) -> pd.DataFrame:
        """
        Annual cost per skill/channel.

        annual_cost = CPI * volume (number of interactions in the sample).

        Note: for simplicity the dataset is assumed to cover one year. To
        annualise a shorter sample (e.g. one month) a scaling factor would
        have to be added to EconomyConfig.

        Returns:
            The CPI table indexed by (queue_skill, channel) with the extra
            columns ``volume`` and ``annual_cost``; an empty DataFrame when
            the CPI table is empty.
        """
        joined = self._cpi_with_volume()
        if joined.empty:
            return pd.DataFrame()

        joined["annual_cost"] = (joined["cpi_total"] * joined["volume"]).round(2)
        return joined

    # ------------------------------------------------------------------ #
    # KPI 3: cost breakdown (labor / tech / overhead)
    # ------------------------------------------------------------------ #
    def cost_breakdown(self) -> Dict[str, float]:
        """
        Percentage breakdown of costs: labor, overhead, tech.

        labor_total    = sum(labor_cost_per_interaction * volume)
        overhead_total = sum(overhead_cost_per_interaction * volume)
        tech_total     = tech_costs_annual (if provided)

        Returns:
            Dict with the ``*_pct`` shares of the total and the ``*_annual``
            amounts (plus ``total_annual``). An empty dict when the cost
            config is missing, no CPI can be computed, or the total is not
            positive.
        """
        if not self._has_cost_config():
            return {}
        cfg = self.config
        assert cfg is not None

        joined = self._cpi_with_volume()
        if joined.empty:
            return {}

        annual_labor = (joined["labor_cost"] * joined["volume"]).sum()
        annual_overhead = (joined["overhead_cost"] * joined["volume"]).sum()
        annual_tech = cfg.tech_costs_annual

        total = annual_labor + annual_overhead + annual_tech
        if total <= 0:
            return {}

        return {
            "labor_pct": round(annual_labor / total * 100, 2),
            "overhead_pct": round(annual_overhead / total * 100, 2),
            "tech_pct": round(annual_tech / total * 100, 2),
            "labor_annual": round(annual_labor, 2),
            "overhead_annual": round(annual_overhead, 2),
            "tech_annual": round(annual_tech, 2),
            "total_annual": round(total, 2),
        }

    # ------------------------------------------------------------------ #
    # KPI 4: inefficiency cost (EUR from variability/escalation)
    # ------------------------------------------------------------------ #
    def inefficiency_cost_by_skill_channel(self) -> pd.DataFrame:
        """
        Very simplified estimate of the inefficiency cost.

        For each skill/channel:

        - AHT_p50, AHT_p90 (seconds).
        - Delta = max(0, AHT_p90 - AHT_p50).
        - ~40% of the interactions are assumed to be above the median.
        - Ineff_seconds = Delta * volume * 0.4
        - Ineff_cost = labor cost per second * Ineff_seconds

        NOTE: this is an approximate model meant to quantify the order of
        magnitude. The percentiles use every VALID row (abandoned rows
        included), whereas the labor cost comes from cpi_by_skill_channel,
        which also excludes abandoned rows.

        Returns:
            DataFrame with columns queue_skill, channel, aht_p50, aht_p90,
            volume, ineff_seconds and ineff_cost; an empty DataFrame when
            there is no cost config or no usable row.
        """
        if not self._has_cost_config():
            return pd.DataFrame()

        df = self._valid_cost_rows(exclude_abandoned=False)
        if df.empty:
            return pd.DataFrame()

        cpi_table = self.cpi_by_skill_channel()
        if cpi_table.empty:
            return pd.DataFrame()

        handle_time = df.groupby(_GROUP_KEYS)["handle_time"]
        stats = pd.DataFrame(
            {
                "aht_p50": handle_time.quantile(0.5),
                "aht_p90": handle_time.quantile(0.9),
                "volume": handle_time.count(),
                "aht_mean": handle_time.mean(),
            }
        )
        if stats.empty:
            return pd.DataFrame()

        # Pairs without a CPI row (e.g. only abandoned interactions) get a
        # labor cost of 0 and therefore an inefficiency cost of 0.
        labor = cpi_table.set_index(_GROUP_KEYS)[["labor_cost"]]
        merged = stats.join(labor, how="left").fillna(0.0)

        delta = (merged["aht_p90"] - merged["aht_p50"]).clip(lower=0.0)
        ineff_seconds = delta * merged["volume"] * _INEFFICIENCY_AFFECTED_FRACTION

        # labor_cost is the cost of one interaction at the mean AHT, so
        # labor_cost / mean AHT approximates the labor cost per second.
        # A mean AHT of 0 would divide by zero; such pairs cost 0.
        cost_per_second = (
            merged["labor_cost"] / merged["aht_mean"].replace(0, np.nan)
        ).fillna(0.0)

        merged["ineff_seconds"] = ineff_seconds.round(2)
        merged["ineff_cost"] = (ineff_seconds * cost_per_second).round(2)

        # queue_skill and channel become plain columns for frontend lookup.
        return merged[
            ["aht_p50", "aht_p90", "volume", "ineff_seconds", "ineff_cost"]
        ].reset_index()

    # ------------------------------------------------------------------ #
    # KPI 5: potential annual savings from automation
    # ------------------------------------------------------------------ #
    def potential_savings(self) -> Dict[str, Any]:
        """
        Potential annual savings from automation.

        Savings = (CPI_human - CPI_automated) * Automatable_volume * Success_rate

        Where:
        - CPI_human = volume-weighted mean of cpi_total.
        - CPI_automated = config.automation_cpi
        - Automatable_volume = volume_total * automation_volume_share
        - Success_rate = automation_success_rate

        The CPI difference is floored at 0, so automation that is more
        expensive than the human CPI yields savings of 0.

        Returns:
            Dict with cpi_humano, cpi_automatizado, volume_total,
            volume_automatizable, effective_volume and annual_savings; an
            empty dict when a required config parameter is missing or not
            positive, or when there is no volume.
        """
        if not self._has_cost_config():
            return {}
        cfg = self.config
        assert cfg is not None

        if (
            cfg.automation_cpi is None
            or cfg.automation_volume_share <= 0
            or cfg.automation_success_rate <= 0
        ):
            return {}

        cpi_table = self._cpi_with_volume()
        if cpi_table.empty:
            return {}

        total_volume = cpi_table["volume"].sum()
        if total_volume <= 0:
            return {}

        # Volume-weighted mean human CPI.
        weighted_cpi = (cpi_table["cpi_total"] * cpi_table["volume"]).sum() / total_volume

        volume_automatizable = total_volume * cfg.automation_volume_share
        effective_volume = volume_automatizable * cfg.automation_success_rate

        delta_cpi = max(0.0, weighted_cpi - cfg.automation_cpi)
        annual_savings = delta_cpi * effective_volume

        return {
            "cpi_humano": round(weighted_cpi, 4),
            "cpi_automatizado": round(cfg.automation_cpi, 4),
            "volume_total": float(total_volume),
            "volume_automatizable": float(volume_automatizable),
            "effective_volume": float(effective_volume),
            "annual_savings": round(annual_savings, 2),
        }

    # ------------------------------------------------------------------ #
    # PLOTS
    # ------------------------------------------------------------------ #
    def plot_cost_waterfall(self) -> Axes:
        """
        Waterfall of annual costs (labor + overhead + tech).

        Each bar starts where the previous one ended, so the top of the last
        bar is the total annual cost. When no breakdown is available a
        placeholder message is drawn instead.

        Returns:
            The matplotlib Axes holding the chart.

        Raises:
            ImportError: when matplotlib is not installed.
        """
        _require_matplotlib()

        breakdown = self.cost_breakdown()
        if not breakdown:
            fig, ax = plt.subplots()
            ax.text(0.5, 0.5, "Sin configuración de costes", ha="center", va="center")
            ax.set_axis_off()
            return ax

        labels = ["Labor", "Overhead", "Tech"]
        values = [
            breakdown["labor_annual"],
            breakdown["overhead_annual"],
            breakdown["tech_annual"],
        ]

        # Cumulative offsets: bar i floats on top of the sum of bars 0..i-1.
        bottoms = np.concatenate(([0.0], np.cumsum(values)[:-1]))

        fig, ax = plt.subplots(figsize=(8, 4))
        x = np.arange(len(labels))
        ax.bar(x, values, bottom=bottoms)

        ax.set_xticks(x)
        ax.set_xticklabels(labels)
        ax.set_ylabel("€ anuales")
        ax.set_title("Desglose anual de costes")

        # Label every bar with its own amount, placed at the top of the bar.
        for idx, (base, v) in enumerate(zip(bottoms, values)):
            ax.text(idx, base + v, f"{v:,.0f}", ha="center", va="bottom")

        ax.grid(axis="y", alpha=0.3)
        return ax

    def plot_cpi_by_channel(self) -> Axes:
        """
        Bar chart of the volume-weighted mean CPI per channel.

        When no CPI can be computed a placeholder message is drawn instead.

        Returns:
            The matplotlib Axes holding the chart.

        Raises:
            ImportError: when matplotlib is not installed.
        """
        _require_matplotlib()

        joined = self._cpi_with_volume()
        if joined.empty:
            fig, ax = plt.subplots()
            ax.text(0.5, 0.5, "Sin configuración de costes", ha="center", va="center")
            ax.set_axis_off()
            return ax

        # Volume-weighted mean CPI per channel. The denominator is floored at
        # 1 so a channel without volume yields 0 instead of dividing by zero.
        per_pair = joined.reset_index()
        per_pair["weighted_cpi"] = per_pair["cpi_total"] * per_pair["volume"]
        totals = per_pair.groupby("channel")[["weighted_cpi", "volume"]].sum()
        per_channel = (
            (totals["weighted_cpi"] / totals["volume"].clip(lower=1))
            .rename("cpi_mean")
            .round(4)
        )

        fig, ax = plt.subplots(figsize=(6, 4))
        per_channel.plot(kind="bar", ax=ax)

        ax.set_xlabel("Canal")
        ax.set_ylabel("CPI medio (€)")
        ax.set_title("Coste por interacción (CPI) por canal")
        ax.grid(axis="y", alpha=0.3)

        return ax