# Economy & Cost metrics (Dimension 4): CPI, annual cost, cost breakdown,
# inefficiency cost and automation-savings estimates, plus summary plots.
from __future__ import annotations

# Standard library
from dataclasses import dataclass
from typing import Dict, List, Optional, Any

# Third-party
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.axes import Axes


# Columns the transactional dataset must provide before any economy/cost
# KPI can be computed (validated in EconomyCostMetrics._validate_columns).
REQUIRED_COLUMNS_ECON: List[str] = [
    "interaction_id",
    "datetime_start",
    "queue_skill",
    "channel",
    "duration_talk",
    "hold_time",
    "wrap_up_time",
]


@dataclass
class EconomyConfig:
    """
    Manual parameters for the Economy & Costs dimension.

    - labor_cost_per_hour: fully loaded total cost/hour of an agent.
    - overhead_rate: variable overhead fraction (e.g. 0.1 = 10% on top of labor).
    - tech_costs_annual: annual technology cost (licenses, infra, ...).
    - automation_cpi: cost per automated interaction (e.g. 0.15 EUR).
    - automation_volume_share: automatable share of the volume (0-1).
    - automation_success_rate: automation success rate (0-1).

    - customer_segments: optional mapping skill -> segment ("high"/"medium"/"low")
      for future ROI-by-segment insights.
    """

    # Required: any euro-denominated KPI needs this.
    labor_cost_per_hour: float
    overhead_rate: float = 0.0
    tech_costs_annual: float = 0.0
    # None disables the automation-savings KPI (see potential_savings).
    automation_cpi: Optional[float] = None
    automation_volume_share: float = 0.0
    automation_success_rate: float = 0.0
    customer_segments: Optional[Dict[str, str]] = None


@dataclass
class EconomyCostMetrics:
    """
    DIMENSION 4: ECONOMY and COSTS

    Purpose:
    - Quantify the current COST (CPI, annual cost).
    - Estimate the impact of overhead and technology.
    - Compute a first estimate of "inefficiency cost" and potential savings.

    Requires:
    - Columns from the transactional dataset (see REQUIRED_COLUMNS_ECON).

    Optional inputs via EconomyConfig:
    - labor_cost_per_hour (required for any euro computation).
    - overhead_rate, tech_costs_annual, automation_*.
    - customer_segments (for ROI-by-segment insights).
    """

    df: pd.DataFrame
    config: Optional[EconomyConfig] = None

    def __post_init__(self) -> None:
        self._validate_columns()
        self._prepare_data()

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #
    def _validate_columns(self) -> None:
        """Raise ValueError when any mandatory column is missing."""
        missing = [c for c in REQUIRED_COLUMNS_ECON if c not in self.df.columns]
        if missing:
            raise ValueError(
                f"Faltan columnas obligatorias para EconomyCostMetrics: {missing}"
            )

    def _prepare_data(self) -> None:
        """Normalize dtypes and derive handle_time / cost-validity flag in place."""
        df = self.df.copy()

        df["datetime_start"] = pd.to_datetime(df["datetime_start"], errors="coerce")

        for col in ["duration_talk", "hold_time", "wrap_up_time"]:
            df[col] = pd.to_numeric(df[col], errors="coerce")

        df["queue_skill"] = df["queue_skill"].astype(str).str.strip()
        df["channel"] = df["channel"].astype(str).str.strip()

        # Handle time = talk + hold + wrap (seconds)
        df["handle_time"] = (
            df["duration_talk"].fillna(0)
            + df["hold_time"].fillna(0)
            + df["wrap_up_time"].fillna(0)
        )

        # AHT/CPI computations only count VALID records
        # (excludes NOISE, ZOMBIE, ABANDON).
        if "record_status" in df.columns:
            df["record_status"] = df["record_status"].astype(str).str.strip().str.upper()
            df["_is_valid_for_cost"] = df["record_status"] == "VALID"
        else:
            # Legacy data without record_status: include everything.
            df["_is_valid_for_cost"] = True

        self.df = df

    @property
    def is_empty(self) -> bool:
        """True when the underlying dataset has no rows."""
        return self.df.empty

    def _has_cost_config(self) -> bool:
        """True when a config with a usable labor cost per hour is present."""
        return self.config is not None and self.config.labor_cost_per_hour is not None

    def _volume_by_skill_channel(self) -> pd.Series:
        """Unique interaction count per (queue_skill, channel), named 'volume'."""
        return (
            self.df.groupby(["queue_skill", "channel"])["interaction_id"]
            .nunique()
            .rename("volume")
        )

    def _cpi_joined_with_volume(self) -> pd.DataFrame:
        """
        CPI table indexed by (queue_skill, channel) joined with 'volume'.

        Returns an empty DataFrame when there is no cost config or no data.
        Extracted because this join was duplicated in three KPI/plot methods.
        """
        cpi_table = self.cpi_by_skill_channel()
        if cpi_table.empty:
            return pd.DataFrame()
        cpi_indexed = cpi_table.set_index(["queue_skill", "channel"])
        return cpi_indexed.join(self._volume_by_skill_channel(), how="left").fillna(
            {"volume": 0}
        )

    # ------------------------------------------------------------------ #
    # KPI 1: CPI per skill/channel
    # ------------------------------------------------------------------ #
    def cpi_by_skill_channel(self) -> pd.DataFrame:
        """
        CPI (Cost Per Interaction) per skill/channel.

        CPI = (labor_cost_per_interaction + variable_overhead) / EFFECTIVE_PRODUCTIVITY

        - labor_cost_per_interaction = labor_cost_per_hour * AHT_hours
        - variable_overhead = overhead_rate * labor_cost_per_interaction
        - EFFECTIVE_PRODUCTIVITY = 0.70 (accounts for non-productive time)

        Abandoned records are excluded from the cost calculation for
        consistency with the frontend path (fresh CSV).

        Without cost config -> returns an empty DataFrame.

        Includes queue_skill and channel as columns (not only the index) so
        the frontend can look rows up by skill name.
        """
        if not self._has_cost_config():
            return pd.DataFrame()

        cfg = self.config
        assert cfg is not None  # for the type checker

        df = self.df.copy()
        if df.empty:
            return pd.DataFrame()

        # Filter out abandonments for cost calculation (consistency with
        # frontend). NOTE: .ne(True) keeps NaN rows, matching `!= True`.
        if "is_abandoned" in df.columns:
            df_cost = df[df["is_abandoned"].ne(True)]
        else:
            df_cost = df

        # Only VALID records enter the AHT computation
        # (excludes NOISE, ZOMBIE, ABANDON).
        if "_is_valid_for_cost" in df_cost.columns:
            df_cost = df_cost[df_cost["_is_valid_for_cost"]]

        if df_cost.empty:
            return pd.DataFrame()

        # AHT per skill/channel (seconds) - VALID records only
        aht_sec = df_cost.groupby(["queue_skill", "channel"])["handle_time"].mean()
        if aht_sec.empty:
            return pd.DataFrame()

        aht_hours = aht_sec / 3600.0

        # 70% effectiveness: accounts for non-productive agent time
        # (breaks, training, etc.).
        EFFECTIVE_PRODUCTIVITY = 0.70

        labor_cost = cfg.labor_cost_per_hour * aht_hours
        overhead = labor_cost * cfg.overhead_rate
        cpi = (labor_cost + overhead) / EFFECTIVE_PRODUCTIVITY

        out = pd.DataFrame(
            {
                "aht_seconds": aht_sec.round(2),
                "labor_cost": labor_cost.round(4),
                "overhead_cost": overhead.round(4),
                "cpi_total": cpi.round(4),
            }
        )

        # Reset index so queue_skill/channel are plain columns for the frontend.
        return out.sort_index().reset_index()

    # ------------------------------------------------------------------ #
    # KPI 2: annual cost per skill/channel
    # ------------------------------------------------------------------ #
    def annual_cost_by_skill_channel(self) -> pd.DataFrame:
        """
        Annual cost per skill/channel.

        cost_annual = CPI * volume (interactions in the sample).

        Note: for simplicity we assume the dataset reflects an annual period.
        If you later need to annualize (e.g. dataset = 1 month), a scaling
        factor can be added to EconomyConfig.
        """
        joined = self._cpi_joined_with_volume()
        if joined.empty:
            return pd.DataFrame()

        joined["annual_cost"] = (joined["cpi_total"] * joined["volume"]).round(2)
        return joined

    # ------------------------------------------------------------------ #
    # KPI 3: cost breakdown (labor / tech / overhead)
    # ------------------------------------------------------------------ #
    def cost_breakdown(self) -> Dict[str, float]:
        """
        Percentage breakdown of costs: labor, overhead, tech.

        labor_total = sum(labor_cost_per_interaction * volume)
        overhead_total = labor_total * overhead_rate
        tech_total = tech_costs_annual (when provided)

        Returns percentages over the total.
        Missing cost configuration -> returns {}.
        """
        if not self._has_cost_config():
            return {}

        cfg = self.config
        assert cfg is not None

        joined = self._cpi_joined_with_volume()
        if joined.empty:
            return {}

        # Annual labor and overhead costs
        annual_labor = (joined["labor_cost"] * joined["volume"]).sum()
        annual_overhead = (joined["overhead_cost"] * joined["volume"]).sum()
        annual_tech = cfg.tech_costs_annual

        total = annual_labor + annual_overhead + annual_tech
        if total <= 0:
            return {}

        return {
            "labor_pct": round(annual_labor / total * 100, 2),
            "overhead_pct": round(annual_overhead / total * 100, 2),
            "tech_pct": round(annual_tech / total * 100, 2),
            "labor_annual": round(annual_labor, 2),
            "overhead_annual": round(annual_overhead, 2),
            "tech_annual": round(annual_tech, 2),
            "total_annual": round(total, 2),
        }

    # ------------------------------------------------------------------ #
    # KPI 4: inefficiency cost (EUR from variability/escalation)
    # ------------------------------------------------------------------ #
    def inefficiency_cost_by_skill_channel(self) -> pd.DataFrame:
        """
        Very simplified estimate of inefficiency cost.

        For each skill/channel:

        - AHT_p50, AHT_p90 (seconds).
        - Delta = max(0, AHT_p90 - AHT_p50).
        - ~40% of interactions are assumed to sit above the median.
        - Ineff_seconds = Delta * volume * 0.4
        - Ineff_cost = labor_cost_per_second * Ineff_seconds

        NOTE: an approximate order-of-magnitude model.
        """
        if not self._has_cost_config():
            return pd.DataFrame()

        cfg = self.config
        assert cfg is not None

        df = self.df.copy()

        # Only VALID records enter the AHT stats
        # (excludes NOISE, ZOMBIE, ABANDON).
        if "_is_valid_for_cost" in df.columns:
            df = df[df["_is_valid_for_cost"]]

        grouped = df.groupby(["queue_skill", "channel"])

        def _pctl(series: pd.Series, q: float) -> float:
            # Guard: np.percentile raises on an empty array, which happens
            # when a group's handle_time values are all NaN.
            vals = series.dropna()
            return float(np.percentile(vals, q)) if len(vals) else float("nan")

        stats = grouped["handle_time"].agg(
            aht_p50=lambda s: _pctl(s, 50),
            aht_p90=lambda s: _pctl(s, 90),
            volume="count",
        )
        if stats.empty:
            return pd.DataFrame()

        # CPI gives us the labor cost per interaction;
        # cpi_by_skill_channel returns a reset index, so re-index for the join.
        cpi_table_raw = self.cpi_by_skill_channel()
        if cpi_table_raw.empty:
            return pd.DataFrame()
        cpi_table = cpi_table_raw.set_index(["queue_skill", "channel"])

        merged = stats.join(cpi_table[["labor_cost"]], how="left").fillna(0.0)

        delta = (merged["aht_p90"] - merged["aht_p50"]).clip(lower=0.0)
        affected_fraction = 0.4  # approximation
        ineff_seconds = delta * merged["volume"] * affected_fraction

        # labor_cost is the cost per interaction at mean AHT;
        # approximate cost/second as labor_cost / mean_AHT.
        merged["aht_mean"] = grouped["handle_time"].mean()
        cost_per_second = (
            merged["labor_cost"] / merged["aht_mean"].replace(0, np.nan)
        ).fillna(0.0)

        merged["ineff_seconds"] = ineff_seconds.round(2)
        merged["ineff_cost"] = (ineff_seconds * cost_per_second).round(2)

        # Reset index so queue_skill/channel are plain columns for the frontend.
        return merged[
            ["aht_p50", "aht_p90", "volume", "ineff_seconds", "ineff_cost"]
        ].reset_index()

    # ------------------------------------------------------------------ #
    # KPI 5: potential annual savings from automation
    # ------------------------------------------------------------------ #
    def potential_savings(self) -> Dict[str, Any]:
        """
        Potential annual savings based on:

        savings = (human_CPI - automated_CPI) * automatable_volume * success_rate

        Where:
        - human_CPI = volume-weighted mean of cpi_total.
        - automated_CPI = config.automation_cpi
        - automatable_volume = total_volume * automation_volume_share
        - success_rate = automation_success_rate

        Missing config parameters -> returns {}.
        """
        if not self._has_cost_config():
            return {}

        cfg = self.config
        assert cfg is not None

        if (
            cfg.automation_cpi is None
            or cfg.automation_volume_share <= 0
            or cfg.automation_success_rate <= 0
        ):
            return {}

        cost_table = self.annual_cost_by_skill_channel()
        if cost_table.empty:
            return {}

        total_volume = cost_table["volume"].sum()
        if total_volume <= 0:
            return {}

        # Volume-weighted human CPI
        weighted_cpi = (
            (cost_table["cpi_total"] * cost_table["volume"]).sum() / total_volume
        )

        volume_automatizable = total_volume * cfg.automation_volume_share
        effective_volume = volume_automatizable * cfg.automation_success_rate

        delta_cpi = max(0.0, weighted_cpi - cfg.automation_cpi)
        annual_savings = delta_cpi * effective_volume

        return {
            "cpi_humano": round(weighted_cpi, 4),
            "cpi_automatizado": round(cfg.automation_cpi, 4),
            "volume_total": float(total_volume),
            "volume_automatizable": float(volume_automatizable),
            "effective_volume": float(effective_volume),
            "annual_savings": round(annual_savings, 2),
        }

    # ------------------------------------------------------------------ #
    # PLOTS
    # ------------------------------------------------------------------ #
    def plot_cost_waterfall(self) -> Axes:
        """
        Waterfall of annual costs (labor + overhead + tech).

        Each bar starts where the previous one ended, so the top of the
        last bar equals the total annual cost.
        """
        breakdown = self.cost_breakdown()
        if not breakdown:
            fig, ax = plt.subplots()
            ax.text(0.5, 0.5, "Sin configuración de costes", ha="center", va="center")
            ax.set_axis_off()
            return ax

        labels = ["Labor", "Overhead", "Tech"]
        values = [
            breakdown["labor_annual"],
            breakdown["overhead_annual"],
            breakdown["tech_annual"],
        ]

        fig, ax = plt.subplots(figsize=(8, 4))

        # Cumulative bottoms give the waterfall effect. FIX: these bottoms
        # were previously computed but never passed to ax.bar, so the chart
        # was a plain bar chart despite the docstring.
        bottoms: List[float] = []
        running = 0.0
        for v in values:
            bottoms.append(running)
            running += v

        x = np.arange(len(labels))
        ax.bar(x, values, bottom=bottoms)

        ax.set_xticks(x)
        ax.set_xticklabels(labels)
        ax.set_ylabel("€ anuales")
        ax.set_title("Desglose anual de costes")

        # Annotate each segment at its top edge.
        for idx, (v, b) in enumerate(zip(values, bottoms)):
            ax.text(idx, b + v, f"{v:,.0f}", ha="center", va="bottom")

        ax.grid(axis="y", alpha=0.3)

        return ax

    def plot_cpi_by_channel(self) -> Axes:
        """
        Bar chart of the volume-weighted mean CPI per channel.
        """
        joined = self._cpi_joined_with_volume()
        if joined.empty:
            fig, ax = plt.subplots()
            ax.text(0.5, 0.5, "Sin configuración de costes", ha="center", va="center")
            ax.set_axis_off()
            return ax

        # Volume-weighted mean CPI per channel, vectorized instead of
        # groupby.apply (same result, no per-group Python lambda).
        flat = joined.reset_index()
        flat["_weighted_cpi"] = flat["cpi_total"] * flat["volume"]
        sums = flat.groupby("channel")[["_weighted_cpi", "volume"]].sum()
        per_channel = (
            (sums["_weighted_cpi"] / sums["volume"].clip(lower=1))
            .rename("cpi_mean")
            .round(4)
        )

        fig, ax = plt.subplots(figsize=(6, 4))
        per_channel.plot(kind="bar", ax=ax)

        ax.set_xlabel("Canal")
        ax.set_ylabel("CPI medio (€)")
        ax.set_title("Coste por interacción (CPI) por canal")
        ax.grid(axis="y", alpha=0.3)

        return ax