from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional

import numpy as np
import pandas as pd

if TYPE_CHECKING:
    # matplotlib is imported lazily inside the plot methods so the analytics
    # part of this module keeps working in headless / matplotlib-free
    # environments; the Axes name is only needed for static type checking.
    from matplotlib.axes import Axes

# Transactional columns required for any cost computation.
REQUIRED_COLUMNS_ECON: List[str] = [
    "interaction_id",
    "datetime_start",
    "queue_skill",
    "channel",
    "duration_talk",
    "hold_time",
    "wrap_up_time",
]


@dataclass
class EconomyConfig:
    """
    Manual parameters for the Economy and Cost dimension.

    - labor_cost_per_hour: total cost/hour of an agent (fully loaded).
    - overhead_rate: % variable overhead (e.g. 0.1 = 10% over labor).
    - tech_costs_annual: annual technology cost (licenses, infrastructure, ...).
    - automation_cpi: cost per automated interaction (e.g. 0.15€).
    - automation_volume_share: % of automatable volume (0-1).
    - automation_success_rate: % automation success (0-1).
    - customer_segments: optional mapping skill -> segment ("high"/"medium"/"low")
      for future ROI insights by segment.
    - effective_productivity: fraction of paid agent time that is productive
      (0-1). Defaults to 0.70, the value previously hard-coded in
      cpi_by_skill_channel.
    """

    labor_cost_per_hour: float
    overhead_rate: float = 0.0
    tech_costs_annual: float = 0.0
    automation_cpi: Optional[float] = None
    automation_volume_share: float = 0.0
    automation_success_rate: float = 0.0
    customer_segments: Optional[Dict[str, str]] = None
    effective_productivity: float = 0.70


@dataclass
class EconomyCostMetrics:
    """
    DIMENSION 4: ECONOMY and COSTS

    Purpose:
    - Quantify the current COST (CPI, annual cost).
    - Estimate the impact of overhead and technology.
    - Calculate an initial estimate of "inefficiency cost" and potential
      savings.

    Requires:
    - Columns from the transactional dataset (see REQUIRED_COLUMNS_ECON).

    Optional inputs via EconomyConfig:
    - labor_cost_per_hour (required for any € calculation).
    - overhead_rate, tech_costs_annual, automation_*.
    - customer_segments (for ROI insights by segment).
    """

    df: pd.DataFrame
    config: Optional[EconomyConfig] = None

    def __post_init__(self) -> None:
        self._validate_columns()
        self._prepare_data()

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #
    def _validate_columns(self) -> None:
        """Raise ValueError if any required transactional column is missing."""
        missing = [c for c in REQUIRED_COLUMNS_ECON if c not in self.df.columns]
        if missing:
            raise ValueError(
                f"Missing required columns for EconomyCostMetrics: {missing}"
            )

    def _prepare_data(self) -> None:
        """Normalize dtypes and derive handle_time plus cost-validity flags."""
        df = self.df.copy()
        df["datetime_start"] = pd.to_datetime(df["datetime_start"], errors="coerce")
        for col in ["duration_talk", "hold_time", "wrap_up_time"]:
            df[col] = pd.to_numeric(df[col], errors="coerce")
        df["queue_skill"] = df["queue_skill"].astype(str).str.strip()
        df["channel"] = df["channel"].astype(str).str.strip()

        # Handle time = talk + hold + wrap (seconds); NaN components count as 0
        df["handle_time"] = (
            df["duration_talk"].fillna(0)
            + df["hold_time"].fillna(0)
            + df["wrap_up_time"].fillna(0)
        )

        # Filter flag used by AHT/CPI calculations:
        # only VALID records count (excludes NOISE, ZOMBIE, ABANDON).
        if "record_status" in df.columns:
            df["record_status"] = (
                df["record_status"].astype(str).str.strip().str.upper()
            )
            df["_is_valid_for_cost"] = df["record_status"] == "VALID"
        else:
            # Legacy data without record_status: include all
            df["_is_valid_for_cost"] = True

        self.df = df

    @property
    def is_empty(self) -> bool:
        """True when the prepared dataset has no rows."""
        return self.df.empty

    def _has_cost_config(self) -> bool:
        """True when a config with a labor cost is available (€ math possible)."""
        return self.config is not None and self.config.labor_cost_per_hour is not None

    def _cpi_with_volume(self) -> pd.DataFrame:
        """
        CPI table indexed by (queue_skill, channel), joined with volume.

        Volume counts ALL interactions (abandons included) so annualised
        figures reflect total traffic, while the CPI itself is computed on
        valid records only. Returns an empty DataFrame when there is no cost
        config or no data. (Shared by annual cost, breakdown and plotting.)
        """
        cpi_table = self.cpi_by_skill_channel()
        if cpi_table.empty:
            return pd.DataFrame()
        volume = (
            self.df.groupby(["queue_skill", "channel"])["interaction_id"]
            .nunique()
            .rename("volume")
        )
        return (
            cpi_table.set_index(["queue_skill", "channel"])
            .join(volume, how="left")
            .fillna({"volume": 0})
        )

    # ------------------------------------------------------------------ #
    # KPI 1: CPI by channel/skill
    # ------------------------------------------------------------------ #
    def cpi_by_skill_channel(self) -> pd.DataFrame:
        """
        CPI (Cost Per Interaction) by skill/channel.

        CPI = (Labor_cost_per_interaction + Overhead_variable) / effective_productivity

        - Labor_cost_per_interaction = labor_cost_per_hour * AHT_hours
        - Overhead_variable = overhead_rate * Labor_cost_per_interaction
        - effective_productivity (EconomyConfig, default 0.70) accounts for
          non-productive agent time (breaks, training, etc.).

        Excludes abandoned records from cost calculation for consistency with
        the frontend path (fresh CSV).

        If there is no cost config -> returns empty DataFrame.
        Includes queue_skill and channel as columns (not just index) so that
        the frontend can lookup by skill name.
        """
        if not self._has_cost_config():
            return pd.DataFrame()
        cfg = self.config
        assert cfg is not None  # for the type checker

        df = self.df.copy()
        if df.empty:
            return pd.DataFrame()

        # Filter out abandonments for cost calculation (consistency with frontend)
        if "is_abandoned" in df.columns:
            df_cost = df[df["is_abandoned"] != True]
        else:
            df_cost = df

        # Filter by record_status: only VALID rows enter the AHT calculation
        # (excludes NOISE, ZOMBIE, ABANDON)
        if "_is_valid_for_cost" in df_cost.columns:
            df_cost = df_cost[df_cost["_is_valid_for_cost"] == True]
        if df_cost.empty:
            return pd.DataFrame()

        # AHT by skill/channel (in seconds) - only VALID records
        aht_sec = df_cost.groupby(["queue_skill", "channel"])["handle_time"].mean()
        if aht_sec.empty:
            return pd.DataFrame()
        aht_hours = aht_sec / 3600.0

        # Productivity factor: was hard-coded to 0.70, now configurable with
        # the same default so existing callers see identical numbers.
        productivity = cfg.effective_productivity

        labor_cost = cfg.labor_cost_per_hour * aht_hours
        overhead = labor_cost * cfg.overhead_rate
        cpi = (labor_cost + overhead) / productivity

        out = pd.DataFrame(
            {
                "aht_seconds": aht_sec.round(2),
                "labor_cost": labor_cost.round(4),
                "overhead_cost": overhead.round(4),
                "cpi_total": cpi.round(4),
            }
        )
        # Reset index so queue_skill/channel are columns for frontend lookup
        return out.sort_index().reset_index()

    # ------------------------------------------------------------------ #
    # KPI 2: annual cost by skill/channel
    # ------------------------------------------------------------------ #
    def annual_cost_by_skill_channel(self) -> pd.DataFrame:
        """
        Annual cost by skill/channel.

        cost_annual = CPI * volume (number of interactions in the sample).

        Note: for simplicity we assume the dataset reflects an annual period.
        If in the future you want to annualize (e.g. dataset = 1 month) you
        can add a scaling factor in EconomyConfig.

        Returns a DataFrame indexed by (queue_skill, channel).
        """
        joined = self._cpi_with_volume()
        if joined.empty:
            return pd.DataFrame()
        joined["annual_cost"] = (joined["cpi_total"] * joined["volume"]).round(2)
        return joined

    # ------------------------------------------------------------------ #
    # KPI 3: cost breakdown (labor / tech / overhead)
    # ------------------------------------------------------------------ #
    def cost_breakdown(self) -> Dict[str, float]:
        """
        Cost breakdown %: labor, overhead, tech.

        labor_total = sum(labor_cost_per_interaction * volume)
        overhead_total = labor_total * overhead_rate
        tech_total = tech_costs_annual (if provided)

        Returns percentages of the total plus the absolute annual figures.
        If cost configuration is missing (or total cost is zero) -> returns {}.
        """
        if not self._has_cost_config():
            return {}
        cfg = self.config
        assert cfg is not None

        joined = self._cpi_with_volume()
        if joined.empty:
            return {}

        # Annual labor and overhead costs
        annual_labor = (joined["labor_cost"] * joined["volume"]).sum()
        annual_overhead = (joined["overhead_cost"] * joined["volume"]).sum()
        annual_tech = cfg.tech_costs_annual

        total = annual_labor + annual_overhead + annual_tech
        if total <= 0:
            return {}

        return {
            "labor_pct": round(annual_labor / total * 100, 2),
            "overhead_pct": round(annual_overhead / total * 100, 2),
            "tech_pct": round(annual_tech / total * 100, 2),
            "labor_annual": round(annual_labor, 2),
            "overhead_annual": round(annual_overhead, 2),
            "tech_annual": round(annual_tech, 2),
            "total_annual": round(total, 2),
        }

    # ------------------------------------------------------------------ #
    # KPI 4: inefficiency cost (€ by variability/escalation)
    # ------------------------------------------------------------------ #
    def inefficiency_cost_by_skill_channel(self) -> pd.DataFrame:
        """
        Very simplified estimate of inefficiency cost.

        For each skill/channel:
        - AHT_p50, AHT_p90 (seconds).
        - Delta = max(0, AHT_p90 - AHT_p50).
        - Assumes that ~40% of interactions are above the median.
        - Ineff_seconds = Delta * volume * 0.4
        - Ineff_cost = LaborCPI_per_second * Ineff_seconds

        NOTE: approximate model meant to quantify an order of magnitude.
        """
        if not self._has_cost_config():
            return pd.DataFrame()

        df = self.df.copy()
        # Only VALID records for AHT percentiles (excludes NOISE/ZOMBIE/ABANDON)
        if "_is_valid_for_cost" in df.columns:
            df = df[df["_is_valid_for_cost"] == True]

        grouped = df.groupby(["queue_skill", "channel"])
        stats = grouped["handle_time"].agg(
            aht_p50=lambda s: float(np.percentile(s.dropna(), 50)),
            aht_p90=lambda s: float(np.percentile(s.dropna(), 90)),
            volume="count",
        )
        if stats.empty:
            return pd.DataFrame()

        # Labor cost per interaction, needed to derive a cost/second of labor.
        # cpi_by_skill_channel returns with reset_index, so re-index for join.
        cpi_table = self.cpi_by_skill_channel()
        if cpi_table.empty:
            return pd.DataFrame()
        cpi_indexed = cpi_table.set_index(["queue_skill", "channel"])

        merged = stats.join(cpi_indexed[["labor_cost"]], how="left").fillna(0.0)

        delta = (merged["aht_p90"] - merged["aht_p50"]).clip(lower=0.0)
        affected_fraction = 0.4  # approximation: share of calls above the median
        ineff_seconds = delta * merged["volume"] * affected_fraction

        # labor_cost = cost per interaction with average AHT;
        # approximate cost/second as labor_cost / average_AHT
        merged["aht_mean"] = grouped["handle_time"].mean()
        cost_per_second = (
            merged["labor_cost"] / merged["aht_mean"].replace(0, np.nan)
        ).fillna(0.0)

        merged["ineff_seconds"] = ineff_seconds.round(2)
        merged["ineff_cost"] = (ineff_seconds * cost_per_second).round(2)

        # Reset index so queue_skill/channel are columns for frontend lookup
        return merged[
            ["aht_p50", "aht_p90", "volume", "ineff_seconds", "ineff_cost"]
        ].reset_index()

    # ------------------------------------------------------------------ #
    # KPI 5: potential annual savings from automation
    # ------------------------------------------------------------------ #
    def potential_savings(self) -> Dict[str, Any]:
        """
        Potential annual savings based on:

        Savings = (Human_CPI - Automated_CPI) * Automatable_volume * Success_rate

        Where:
        - Human_CPI = volume-weighted average of cpi_total.
        - Automated_CPI = config.automation_cpi
        - Automatable_volume = volume_total * automation_volume_share
        - Success_rate = automation_success_rate

        If config parameters are missing -> returns {}.
        """
        if not self._has_cost_config():
            return {}
        cfg = self.config
        assert cfg is not None

        if (
            cfg.automation_cpi is None
            or cfg.automation_volume_share <= 0
            or cfg.automation_success_rate <= 0
        ):
            return {}

        cost_table = self.annual_cost_by_skill_channel()
        if cost_table.empty:
            return {}

        total_volume = cost_table["volume"].sum()
        if total_volume <= 0:
            return {}

        # Volume-weighted average human CPI
        weighted_cpi = (
            (cost_table["cpi_total"] * cost_table["volume"]).sum() / total_volume
        )

        volume_automatizable = total_volume * cfg.automation_volume_share
        effective_volume = volume_automatizable * cfg.automation_success_rate
        delta_cpi = max(0.0, weighted_cpi - cfg.automation_cpi)
        annual_savings = delta_cpi * effective_volume

        return {
            "cpi_humano": round(weighted_cpi, 4),
            "cpi_automatizado": round(cfg.automation_cpi, 4),
            "volume_total": float(total_volume),
            "volume_automatizable": float(volume_automatizable),
            "effective_volume": float(effective_volume),
            "annual_savings": round(annual_savings, 2),
        }

    # ------------------------------------------------------------------ #
    # PLOTS
    # ------------------------------------------------------------------ #
    def plot_cost_waterfall(self) -> "Axes":
        """
        Waterfall of annual costs (labor + overhead + tech).

        Each bar starts where the previous one ended (cumulative baseline),
        so the top of the last bar equals the total annual cost. Returns a
        placeholder axis when no cost configuration is available.
        """
        import matplotlib.pyplot as plt  # lazy: keep module importable headless

        breakdown = self.cost_breakdown()
        if not breakdown:
            fig, ax = plt.subplots()
            ax.text(0.5, 0.5, "No cost configuration", ha="center", va="center")
            ax.set_axis_off()
            return ax

        labels = ["Labor", "Overhead", "Tech"]
        values = [
            breakdown["labor_annual"],
            breakdown["overhead_annual"],
            breakdown["tech_annual"],
        ]

        # Cumulative baseline for the waterfall effect. (Previously these
        # offsets were computed but never passed to ax.bar, so the chart was
        # a plain bar chart despite the name.)
        bottoms = np.concatenate(([0.0], np.cumsum(values)[:-1]))

        fig, ax = plt.subplots(figsize=(8, 4))
        x = np.arange(len(labels))
        ax.bar(x, values, bottom=bottoms)
        ax.set_xticks(x)
        ax.set_xticklabels(labels)
        ax.set_ylabel("€ annual")
        ax.set_title("Annual cost breakdown")
        for idx, (base, v) in enumerate(zip(bottoms, values)):
            # Label each segment at its top edge
            ax.text(idx, base + v, f"{v:,.0f}", ha="center", va="bottom")
        ax.grid(axis="y", alpha=0.3)
        return ax

    def plot_cpi_by_channel(self) -> "Axes":
        """
        Bar chart of the volume-weighted average CPI by channel.

        Returns a placeholder axis when no cost configuration is available.
        """
        import matplotlib.pyplot as plt  # lazy: keep module importable headless

        joined = self._cpi_with_volume()
        if joined.empty:
            fig, ax = plt.subplots()
            ax.text(0.5, 0.5, "No cost configuration", ha="center", va="center")
            ax.set_axis_off()
            return ax

        flat = joined.reset_index()
        # Vectorized weighted mean per channel (replaces the deprecated
        # groupby.apply(lambda g: ...) pattern; clip(lower=1) mirrors the old
        # max(volume_sum, 1) guard against division by zero).
        weighted_sum = (
            (flat["cpi_total"] * flat["volume"]).groupby(flat["channel"]).sum()
        )
        volume_sum = flat.groupby("channel")["volume"].sum().clip(lower=1)
        per_channel = (weighted_sum / volume_sum).rename("cpi_mean").round(4)

        fig, ax = plt.subplots(figsize=(6, 4))
        per_channel.plot(kind="bar", ax=ax)
        ax.set_xlabel("Channel")
        ax.set_ylabel("Average CPI (€)")
        ax.set_title("Cost per interaction (CPI) by channel")
        ax.grid(axis="y", alpha=0.3)
        return ax