Phase 2 of Spanish-to-English translation for medium-priority files: Frontend utils (2 files): - dataTransformation.ts: Translated ~72 occurrences (comments, docs, console logs) - segmentClassifier.ts: Translated ~20 occurrences (JSDoc, inline comments, UI strings) Backend dimensions (3 files): - OperationalPerformance.py: Translated ~117 lines (docstrings, comments) - SatisfactionExperience.py: Translated ~33 lines (docstrings, comments) - EconomyCost.py: Translated ~79 lines (docstrings, comments) All function names and variable names preserved for API compatibility. Frontend and backend compilation tested and verified successful. Related to TRANSLATION_STATUS.md Phase 2 objectives. https://claude.ai/code/session_01GNbnkFoESkRcnPr3bLCYDg
491 lines
17 KiB
Python
491 lines
17 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Dict, List, Optional, Any
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
import matplotlib.pyplot as plt
|
|
from matplotlib.axes import Axes
|
|
|
|
|
|
# Columns the transactional dataset must contain for EconomyCostMetrics;
# checked in _validate_columns, which raises ValueError when any is absent.
REQUIRED_COLUMNS_ECON: List[str] = [
    "interaction_id",
    "datetime_start",
    "queue_skill",
    "channel",
    "duration_talk",
    "hold_time",
    "wrap_up_time",
]
|
|
|
|
|
|
@dataclass
class EconomyConfig:
    """
    Manual parameters for the Economy and Cost dimension.

    - labor_cost_per_hour: total cost/hour of an agent (fully loaded).
    - overhead_rate: % variable overhead (e.g. 0.1 = 10% over labor).
    - tech_costs_annual: annual technology cost (licenses, infrastructure, ...).
    - automation_cpi: cost per automated interaction (e.g. 0.15€).
    - automation_volume_share: % of automatable volume (0-1).
    - automation_success_rate: % automation success (0-1).

    - customer_segments: optional mapping skill -> segment ("high"/"medium"/"low") for future ROI insights by segment.
    """

    # Fully-loaded hourly agent cost; required for any €-based KPI output.
    labor_cost_per_hour: float
    # Variable overhead as a fraction of labor cost (0.1 == 10%).
    overhead_rate: float = 0.0
    # Fixed annual technology spend, added in cost_breakdown.
    tech_costs_annual: float = 0.0
    # Cost per automated interaction; None disables potential_savings.
    automation_cpi: Optional[float] = None
    # Fraction (0-1) of total volume considered automatable.
    automation_volume_share: float = 0.0
    # Fraction (0-1) of automated interactions expected to succeed.
    automation_success_rate: float = 0.0
    # Optional skill -> segment mapping; not consumed by any KPI shown here.
    customer_segments: Optional[Dict[str, str]] = None
|
|
|
|
|
|
@dataclass
class EconomyCostMetrics:
    """
    DIMENSION 4: ECONOMY and COSTS

    Purpose:
    - Quantify the current COST (CPI, annual cost).
    - Estimate the impact of overhead and technology.
    - Calculate an initial estimate of "inefficiency cost" and potential savings.

    Requires:
    - Columns from the transactional dataset (see REQUIRED_COLUMNS_ECON).

    Optional inputs via EconomyConfig:
    - labor_cost_per_hour (required for any € calculation).
    - overhead_rate, tech_costs_annual, automation_*.
    - customer_segments (for ROI insights by segment).
    """

    # Transactional dataset; validated and normalized in __post_init__.
    df: pd.DataFrame
    # Manual cost parameters; when missing (or labor_cost_per_hour is None)
    # every €-based KPI returns an empty DataFrame / empty dict.
    config: Optional[EconomyConfig] = None
|
|
|
|
    def __post_init__(self) -> None:
        # Fail fast on missing columns, then coerce dtypes and derive the
        # helper columns (handle_time, _is_valid_for_cost) the KPIs rely on.
        self._validate_columns()
        self._prepare_data()
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Internal helpers
|
|
# ------------------------------------------------------------------ #
|
|
def _validate_columns(self) -> None:
|
|
missing = [c for c in REQUIRED_COLUMNS_ECON if c not in self.df.columns]
|
|
if missing:
|
|
raise ValueError(
|
|
f"Missing required columns for EconomyCostMetrics: {missing}"
|
|
)
|
|
|
|
def _prepare_data(self) -> None:
|
|
df = self.df.copy()
|
|
|
|
df["datetime_start"] = pd.to_datetime(df["datetime_start"], errors="coerce")
|
|
|
|
for col in ["duration_talk", "hold_time", "wrap_up_time"]:
|
|
df[col] = pd.to_numeric(df[col], errors="coerce")
|
|
|
|
df["queue_skill"] = df["queue_skill"].astype(str).str.strip()
|
|
df["channel"] = df["channel"].astype(str).str.strip()
|
|
|
|
# Handle time = talk + hold + wrap
|
|
df["handle_time"] = (
|
|
df["duration_talk"].fillna(0)
|
|
+ df["hold_time"].fillna(0)
|
|
+ df["wrap_up_time"].fillna(0)
|
|
) # seconds
|
|
|
|
# Filter by record_status for AHT/CPI calculations
|
|
# Only include VALID records (exclude NOISE, ZOMBIE, ABANDON)
|
|
if "record_status" in df.columns:
|
|
df["record_status"] = df["record_status"].astype(str).str.strip().str.upper()
|
|
df["_is_valid_for_cost"] = df["record_status"] == "VALID"
|
|
else:
|
|
# Legacy data without record_status: include all
|
|
df["_is_valid_for_cost"] = True
|
|
|
|
self.df = df
|
|
|
|
    @property
    def is_empty(self) -> bool:
        """True when the prepared DataFrame is empty (no rows or no columns)."""
        return self.df.empty
|
|
|
|
def _has_cost_config(self) -> bool:
|
|
return self.config is not None and self.config.labor_cost_per_hour is not None
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# KPI 1: CPI by channel/skill
|
|
# ------------------------------------------------------------------ #
|
|
def cpi_by_skill_channel(self) -> pd.DataFrame:
|
|
"""
|
|
CPI (Cost Per Interaction) by skill/channel.
|
|
|
|
CPI = (Labor_cost_per_interaction + Overhead_variable) / EFFECTIVE_PRODUCTIVITY
|
|
|
|
- Labor_cost_per_interaction = (labor_cost_per_hour * AHT_hours)
|
|
- Overhead_variable = overhead_rate * Labor_cost_per_interaction
|
|
- EFFECTIVE_PRODUCTIVITY = 0.70 (70% - accounts for non-productive time)
|
|
|
|
Excludes abandoned records from cost calculation for consistency with the frontend path (fresh CSV).
|
|
|
|
If there is no cost config -> returns empty DataFrame.
|
|
|
|
Includes queue_skill and channel as columns (not just index) so that the frontend can lookup by skill name.
|
|
"""
|
|
if not self._has_cost_config():
|
|
return pd.DataFrame()
|
|
|
|
cfg = self.config
|
|
assert cfg is not None # for the type checker
|
|
|
|
df = self.df.copy()
|
|
if df.empty:
|
|
return pd.DataFrame()
|
|
|
|
# Filter out abandonments for cost calculation (consistency with frontend)
|
|
if "is_abandoned" in df.columns:
|
|
df_cost = df[df["is_abandoned"] != True]
|
|
else:
|
|
df_cost = df
|
|
|
|
# Filter by record_status: only VALID for AHT calculation
|
|
# Excludes NOISE, ZOMBIE, ABANDON
|
|
if "_is_valid_for_cost" in df_cost.columns:
|
|
df_cost = df_cost[df_cost["_is_valid_for_cost"] == True]
|
|
|
|
if df_cost.empty:
|
|
return pd.DataFrame()
|
|
|
|
# AHT by skill/channel (in seconds) - only VALID records
|
|
grouped = df_cost.groupby(["queue_skill", "channel"])["handle_time"].mean()
|
|
|
|
if grouped.empty:
|
|
return pd.DataFrame()
|
|
|
|
aht_sec = grouped
|
|
aht_hours = aht_sec / 3600.0
|
|
|
|
# Apply productivity factor (70% effectiveness)
|
|
# This accounts for non-productive agent time (breaks, training, etc.)
|
|
EFFECTIVE_PRODUCTIVITY = 0.70
|
|
|
|
labor_cost = cfg.labor_cost_per_hour * aht_hours
|
|
overhead = labor_cost * cfg.overhead_rate
|
|
raw_cpi = labor_cost + overhead
|
|
cpi = raw_cpi / EFFECTIVE_PRODUCTIVITY
|
|
|
|
out = pd.DataFrame(
|
|
{
|
|
"aht_seconds": aht_sec.round(2),
|
|
"labor_cost": labor_cost.round(4),
|
|
"overhead_cost": overhead.round(4),
|
|
"cpi_total": cpi.round(4),
|
|
}
|
|
)
|
|
|
|
# Reset index to include queue_skill and channel as columns for frontend lookup
|
|
return out.sort_index().reset_index()
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# KPI 2: annual cost by skill/channel
|
|
# ------------------------------------------------------------------ #
|
|
def annual_cost_by_skill_channel(self) -> pd.DataFrame:
|
|
"""
|
|
Annual cost by skill/channel.
|
|
|
|
cost_annual = CPI * volume (number of interactions in the sample).
|
|
|
|
Note: for simplicity we assume the dataset reflects an annual period.
|
|
If in the future you want to annualize (e.g. dataset = 1 month) you can add a scaling factor in EconomyConfig.
|
|
"""
|
|
cpi_table = self.cpi_by_skill_channel()
|
|
if cpi_table.empty:
|
|
return pd.DataFrame()
|
|
|
|
df = self.df.copy()
|
|
volume = (
|
|
df.groupby(["queue_skill", "channel"])["interaction_id"]
|
|
.nunique()
|
|
.rename("volume")
|
|
)
|
|
|
|
# Set index on cpi_table to match volume's MultiIndex for join
|
|
cpi_indexed = cpi_table.set_index(["queue_skill", "channel"])
|
|
joined = cpi_indexed.join(volume, how="left").fillna({"volume": 0})
|
|
joined["annual_cost"] = (joined["cpi_total"] * joined["volume"]).round(2)
|
|
|
|
return joined
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# KPI 3: cost breakdown (labor / tech / overhead)
|
|
# ------------------------------------------------------------------ #
|
|
def cost_breakdown(self) -> Dict[str, float]:
|
|
"""
|
|
Cost breakdown %: labor, overhead, tech.
|
|
|
|
labor_total = sum(labor_cost_per_interaction)
|
|
overhead_total = labor_total * overhead_rate
|
|
tech_total = tech_costs_annual (if provided)
|
|
|
|
Returns percentages of the total.
|
|
If cost configuration is missing -> returns {}.
|
|
"""
|
|
if not self._has_cost_config():
|
|
return {}
|
|
|
|
cfg = self.config
|
|
assert cfg is not None
|
|
|
|
cpi_table = self.cpi_by_skill_channel()
|
|
if cpi_table.empty:
|
|
return {}
|
|
|
|
df = self.df.copy()
|
|
volume = (
|
|
df.groupby(["queue_skill", "channel"])["interaction_id"]
|
|
.nunique()
|
|
.rename("volume")
|
|
)
|
|
|
|
# Set index on cpi_table to match volume's MultiIndex for join
|
|
cpi_indexed = cpi_table.set_index(["queue_skill", "channel"])
|
|
joined = cpi_indexed.join(volume, how="left").fillna({"volume": 0})
|
|
|
|
# Annual labor and overhead costs
|
|
annual_labor = (joined["labor_cost"] * joined["volume"]).sum()
|
|
annual_overhead = (joined["overhead_cost"] * joined["volume"]).sum()
|
|
annual_tech = cfg.tech_costs_annual
|
|
|
|
total = annual_labor + annual_overhead + annual_tech
|
|
if total <= 0:
|
|
return {}
|
|
|
|
return {
|
|
"labor_pct": round(annual_labor / total * 100, 2),
|
|
"overhead_pct": round(annual_overhead / total * 100, 2),
|
|
"tech_pct": round(annual_tech / total * 100, 2),
|
|
"labor_annual": round(annual_labor, 2),
|
|
"overhead_annual": round(annual_overhead, 2),
|
|
"tech_annual": round(annual_tech, 2),
|
|
"total_annual": round(total, 2),
|
|
}
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# KPI 4: inefficiency cost (€ by variability/escalation)
|
|
# ------------------------------------------------------------------ #
|
|
def inefficiency_cost_by_skill_channel(self) -> pd.DataFrame:
|
|
"""
|
|
Very simplified estimate of inefficiency cost:
|
|
|
|
For each skill/channel:
|
|
|
|
- AHT_p50, AHT_p90 (seconds).
|
|
- Delta = max(0, AHT_p90 - AHT_p50).
|
|
- Assumes that ~40% of interactions are above the median.
|
|
- Ineff_seconds = Delta * volume * 0.4
|
|
- Ineff_cost = LaborCPI_per_second * Ineff_seconds
|
|
|
|
NOTE: This is an approximate model to quantify "order of magnitude".
|
|
"""
|
|
if not self._has_cost_config():
|
|
return pd.DataFrame()
|
|
|
|
cfg = self.config
|
|
assert cfg is not None
|
|
|
|
df = self.df.copy()
|
|
|
|
# Filter by record_status: only VALID for AHT calculation
|
|
# Excludes NOISE, ZOMBIE, ABANDON
|
|
if "_is_valid_for_cost" in df.columns:
|
|
df = df[df["_is_valid_for_cost"] == True]
|
|
|
|
grouped = df.groupby(["queue_skill", "channel"])
|
|
|
|
stats = grouped["handle_time"].agg(
|
|
aht_p50=lambda s: float(np.percentile(s.dropna(), 50)),
|
|
aht_p90=lambda s: float(np.percentile(s.dropna(), 90)),
|
|
volume="count",
|
|
)
|
|
|
|
if stats.empty:
|
|
return pd.DataFrame()
|
|
|
|
# CPI to get cost/second of labor
|
|
# cpi_by_skill_channel now returns with reset_index, so we need to set index for join
|
|
cpi_table_raw = self.cpi_by_skill_channel()
|
|
if cpi_table_raw.empty:
|
|
return pd.DataFrame()
|
|
|
|
# Set queue_skill+channel as index for the join
|
|
cpi_table = cpi_table_raw.set_index(["queue_skill", "channel"])
|
|
|
|
merged = stats.join(cpi_table[["labor_cost"]], how="left")
|
|
merged = merged.fillna(0.0)
|
|
|
|
delta = (merged["aht_p90"] - merged["aht_p50"]).clip(lower=0.0)
|
|
affected_fraction = 0.4 # approximation
|
|
ineff_seconds = delta * merged["volume"] * affected_fraction
|
|
|
|
# labor_cost = cost per interaction with average AHT;
|
|
# approximate cost/second as labor_cost / average_AHT
|
|
aht_mean = grouped["handle_time"].mean()
|
|
merged["aht_mean"] = aht_mean
|
|
|
|
cost_per_second = merged["labor_cost"] / merged["aht_mean"].replace(0, np.nan)
|
|
cost_per_second = cost_per_second.fillna(0.0)
|
|
|
|
ineff_cost = (ineff_seconds * cost_per_second).round(2)
|
|
|
|
merged["ineff_seconds"] = ineff_seconds.round(2)
|
|
merged["ineff_cost"] = ineff_cost
|
|
|
|
# Reset index to include queue_skill and channel as columns for frontend lookup
|
|
return merged[["aht_p50", "aht_p90", "volume", "ineff_seconds", "ineff_cost"]].reset_index()
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# KPI 5: potential annual savings from automation
|
|
# ------------------------------------------------------------------ #
|
|
def potential_savings(self) -> Dict[str, Any]:
|
|
"""
|
|
Potential annual savings based on:
|
|
|
|
Savings = (Human_CPI - Automated_CPI) * Automatable_volume * Success_rate
|
|
|
|
Where:
|
|
- Human_CPI = weighted average of cpi_total.
|
|
- Automated_CPI = config.automation_cpi
|
|
- Automatable_volume = volume_total * automation_volume_share
|
|
- Success_rate = automation_success_rate
|
|
|
|
If config parameters are missing -> returns {}.
|
|
"""
|
|
if not self._has_cost_config():
|
|
return {}
|
|
|
|
cfg = self.config
|
|
assert cfg is not None
|
|
|
|
if cfg.automation_cpi is None or cfg.automation_volume_share <= 0 or cfg.automation_success_rate <= 0:
|
|
return {}
|
|
|
|
cpi_table = self.annual_cost_by_skill_channel()
|
|
if cpi_table.empty:
|
|
return {}
|
|
|
|
total_volume = cpi_table["volume"].sum()
|
|
if total_volume <= 0:
|
|
return {}
|
|
|
|
# Weighted average human CPI
|
|
weighted_cpi = (
|
|
(cpi_table["cpi_total"] * cpi_table["volume"]).sum() / total_volume
|
|
)
|
|
|
|
volume_automatizable = total_volume * cfg.automation_volume_share
|
|
effective_volume = volume_automatizable * cfg.automation_success_rate
|
|
|
|
delta_cpi = max(0.0, weighted_cpi - cfg.automation_cpi)
|
|
annual_savings = delta_cpi * effective_volume
|
|
|
|
return {
|
|
"cpi_humano": round(weighted_cpi, 4),
|
|
"cpi_automatizado": round(cfg.automation_cpi, 4),
|
|
"volume_total": float(total_volume),
|
|
"volume_automatizable": float(volume_automatizable),
|
|
"effective_volume": float(effective_volume),
|
|
"annual_savings": round(annual_savings, 2),
|
|
}
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# PLOTS
|
|
# ------------------------------------------------------------------ #
|
|
def plot_cost_waterfall(self) -> Axes:
|
|
"""
|
|
Waterfall of annual costs (labor + tech + overhead).
|
|
"""
|
|
breakdown = self.cost_breakdown()
|
|
if not breakdown:
|
|
fig, ax = plt.subplots()
|
|
ax.text(0.5, 0.5, "No cost configuration", ha="center", va="center")
|
|
ax.set_axis_off()
|
|
return ax
|
|
|
|
labels = ["Labor", "Overhead", "Tech"]
|
|
values = [
|
|
breakdown["labor_annual"],
|
|
breakdown["overhead_annual"],
|
|
breakdown["tech_annual"],
|
|
]
|
|
|
|
fig, ax = plt.subplots(figsize=(8, 4))
|
|
|
|
running = 0.0
|
|
positions = []
|
|
bottoms = []
|
|
|
|
for v in values:
|
|
positions.append(running)
|
|
bottoms.append(running)
|
|
running += v
|
|
|
|
# waterfall style bars
|
|
x = np.arange(len(labels))
|
|
ax.bar(x, values)
|
|
|
|
ax.set_xticks(x)
|
|
ax.set_xticklabels(labels)
|
|
ax.set_ylabel("€ annual")
|
|
ax.set_title("Annual cost breakdown")
|
|
|
|
for idx, v in enumerate(values):
|
|
ax.text(idx, v, f"{v:,.0f}", ha="center", va="bottom")
|
|
|
|
ax.grid(axis="y", alpha=0.3)
|
|
|
|
return ax
|
|
|
|
def plot_cpi_by_channel(self) -> Axes:
|
|
"""
|
|
Bar chart of average CPI by channel.
|
|
"""
|
|
cpi_table = self.cpi_by_skill_channel()
|
|
if cpi_table.empty:
|
|
fig, ax = plt.subplots()
|
|
ax.text(0.5, 0.5, "No cost configuration", ha="center", va="center")
|
|
ax.set_axis_off()
|
|
return ax
|
|
|
|
df = self.df.copy()
|
|
volume = (
|
|
df.groupby(["queue_skill", "channel"])["interaction_id"]
|
|
.nunique()
|
|
.rename("volume")
|
|
)
|
|
|
|
# Set index on cpi_table to match volume's MultiIndex for join
|
|
cpi_indexed = cpi_table.set_index(["queue_skill", "channel"])
|
|
joined = cpi_indexed.join(volume, how="left").fillna({"volume": 0})
|
|
|
|
# Weighted average CPI by channel
|
|
per_channel = (
|
|
joined.reset_index()
|
|
.groupby("channel")
|
|
.apply(lambda g: (g["cpi_total"] * g["volume"]).sum() / max(g["volume"].sum(), 1))
|
|
.rename("cpi_mean")
|
|
.round(4)
|
|
)
|
|
|
|
fig, ax = plt.subplots(figsize=(6, 4))
|
|
per_channel.plot(kind="bar", ax=ax)
|
|
|
|
ax.set_xlabel("Channel")
|
|
ax.set_ylabel("Average CPI (€)")
|
|
ax.set_title("Cost per interaction (CPI) by channel")
|
|
ax.grid(axis="y", alpha=0.3)
|
|
|
|
return ax
|