Files
BeyondCXAnalytics-Demo/backend/beyond_metrics/dimensions/EconomyCost.py
Claude 8c7f5fa827 Translate Phase 2 medium-priority files (frontend utils + backend dimensions)
Phase 2 of Spanish-to-English translation for medium-priority files:

Frontend utils (2 files):
- dataTransformation.ts: Translated ~72 occurrences (comments, docs, console logs)
- segmentClassifier.ts: Translated ~20 occurrences (JSDoc, inline comments, UI strings)

Backend dimensions (3 files):
- OperationalPerformance.py: Translated ~117 lines (docstrings, comments)
- SatisfactionExperience.py: Translated ~33 lines (docstrings, comments)
- EconomyCost.py: Translated ~79 lines (docstrings, comments)

All function names and variable names preserved for API compatibility.
Frontend and backend compilation tested and verified successful.

Related to TRANSLATION_STATUS.md Phase 2 objectives.

https://claude.ai/code/session_01GNbnkFoESkRcnPr3bLCYDg
2026-02-07 11:03:00 +00:00

491 lines
17 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.axes import Axes
# Columns that the input DataFrame must provide; EconomyCostMetrics raises a
# ValueError at construction time when any of them is absent.
REQUIRED_COLUMNS_ECON: List[str] = [
    # Identification / grouping
    "interaction_id",
    "datetime_start",
    "queue_skill",
    "channel",
    # Time components that are summed into the handle time
    "duration_talk",
    "hold_time",
    "wrap_up_time",
]
@dataclass
class EconomyConfig:
    """
    User-supplied parameters for the Economy and Cost dimension.

    Only ``labor_cost_per_hour`` is mandatory; every other field has a default.

    Attributes:
        labor_cost_per_hour: fully loaded cost of one agent hour.
        overhead_rate: variable overhead as a fraction of labor cost
            (e.g. 0.1 = 10% on top of labor).
        tech_costs_annual: yearly technology cost (licenses, infrastructure, ...).
        automation_cpi: cost of one automated interaction (e.g. 0.15€);
            None when no automation scenario is configured.
        automation_volume_share: fraction (0-1) of the volume that is automatable.
        automation_success_rate: fraction (0-1) of automation attempts that succeed.
        customer_segments: optional mapping skill -> segment
            ("high"/"medium"/"low"), intended for future ROI insights by segment.
    """

    labor_cost_per_hour: float

    # Cost structure on top of labor
    overhead_rate: float = 0.0
    tech_costs_annual: float = 0.0

    # Automation scenario
    automation_cpi: Optional[float] = None
    automation_volume_share: float = 0.0
    automation_success_rate: float = 0.0

    # Segmentation
    customer_segments: Optional[Dict[str, str]] = None
@dataclass
class EconomyCostMetrics:
    """
    DIMENSION 4: ECONOMY and COSTS

    Purpose:
        - Quantify the current COST (CPI, annual cost).
        - Estimate the impact of overhead and technology.
        - Calculate an initial estimate of "inefficiency cost" and potential savings.

    Requires:
        - Columns from the transactional dataset (see REQUIRED_COLUMNS_ECON).

    Optional inputs via EconomyConfig:
        - labor_cost_per_hour (required for any € calculation).
        - overhead_rate, tech_costs_annual, automation_*.
        - customer_segments (for ROI insights by segment).

    Construction validates the columns and replaces ``df`` with a normalised
    copy (parsed dates, numeric durations, ``handle_time`` and
    ``_is_valid_for_cost`` helper columns); the caller's frame is not mutated.
    """

    df: pd.DataFrame
    config: Optional[EconomyConfig] = None

    # Un-annotated on purpose: these are class-level constants, not dataclass
    # fields, so the generated __init__ signature stays (df, config).
    _GROUP_KEYS = ["queue_skill", "channel"]
    # Share of agent time treated as productive; CPI is divided by this factor
    # to account for non-productive time (breaks, training, etc.).
    _EFFECTIVE_PRODUCTIVITY = 0.70
    # Fraction of interactions the inefficiency model treats as affected.
    _INEFFICIENCY_AFFECTED_FRACTION = 0.4

    def __post_init__(self) -> None:
        """Validate the input columns, then build the normalised working frame."""
        self._validate_columns()
        self._prepare_data()

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #
    def _validate_columns(self) -> None:
        """
        Check that every column in REQUIRED_COLUMNS_ECON is present.

        Raises:
            ValueError: listing the missing column names.
        """
        missing = [c for c in REQUIRED_COLUMNS_ECON if c not in self.df.columns]
        if missing:
            raise ValueError(
                f"Missing required columns for EconomyCostMetrics: {missing}"
            )

    def _prepare_data(self) -> None:
        """
        Replace ``self.df`` with a cleaned copy.

        - ``datetime_start`` is parsed (unparseable values become NaT).
        - The three duration columns are coerced to numbers (invalid -> NaN).
        - ``queue_skill`` / ``channel`` are cast to str and stripped.
        - ``handle_time`` = talk + hold + wrap-up, with NaN counted as 0.
        - ``_is_valid_for_cost`` marks the rows usable for AHT/CPI.
        """
        df = self.df.copy()

        df["datetime_start"] = pd.to_datetime(df["datetime_start"], errors="coerce")
        for col in ("duration_talk", "hold_time", "wrap_up_time"):
            df[col] = pd.to_numeric(df[col], errors="coerce")

        df["queue_skill"] = df["queue_skill"].astype(str).str.strip()
        df["channel"] = df["channel"].astype(str).str.strip()

        # Handle time = talk + hold + wrap (seconds)
        df["handle_time"] = (
            df["duration_talk"].fillna(0)
            + df["hold_time"].fillna(0)
            + df["wrap_up_time"].fillna(0)
        )

        # Only VALID records feed AHT/CPI (excludes NOISE, ZOMBIE, ABANDON).
        if "record_status" in df.columns:
            df["record_status"] = df["record_status"].astype(str).str.strip().str.upper()
            df["_is_valid_for_cost"] = df["record_status"] == "VALID"
        else:
            # Legacy data without record_status: include all
            df["_is_valid_for_cost"] = True

        self.df = df

    @property
    def is_empty(self) -> bool:
        """True when the working DataFrame has no rows."""
        return self.df.empty

    def _has_cost_config(self) -> bool:
        """Return True when a config with a labor cost per hour is available."""
        return self.config is not None and self.config.labor_cost_per_hour is not None

    def _cost_config(self) -> Optional[EconomyConfig]:
        """Return the config when it is usable for cost calculations, else None."""
        return self.config if self._has_cost_config() else None

    def _cost_records(self) -> pd.DataFrame:
        """
        Return the rows that take part in AHT-based cost calculations.

        Abandoned rows (when an ``is_abandoned`` column exists) and rows not
        flagged ``_is_valid_for_cost`` are dropped. Both the CPI and the
        inefficiency estimate use this one filter so that the AHT behind
        ``labor_cost`` and the AHT used to derive a cost per second come from
        the same population.
        """
        records = self.df
        if "is_abandoned" in records.columns:
            # Element-wise comparison on purpose: keeps rows whose flag is
            # False or missing.
            records = records[records["is_abandoned"] != True]  # noqa: E712
        if "_is_valid_for_cost" in records.columns:
            records = records[records["_is_valid_for_cost"] == True]  # noqa: E712
        return records

    def _cpi_with_volume(self) -> pd.DataFrame:
        """
        Return the CPI table indexed by (queue_skill, channel) plus ``volume``.

        ``volume`` is the number of distinct ``interaction_id`` values per
        skill/channel counted over ALL rows of ``self.df`` (not only the rows
        used for AHT). Returns an empty DataFrame when no CPI is available.
        """
        cpi_table = self.cpi_by_skill_channel()
        if cpi_table.empty:
            return pd.DataFrame()

        volume = (
            self.df.groupby(self._GROUP_KEYS)["interaction_id"]
            .nunique()
            .rename("volume")
        )
        # cpi_by_skill_channel() returns the keys as columns; restore them as
        # the index so the join aligns with volume's MultiIndex.
        indexed = cpi_table.set_index(self._GROUP_KEYS)
        return indexed.join(volume, how="left").fillna({"volume": 0})

    @staticmethod
    def _placeholder_axes(message: str) -> Axes:
        """Return axes showing only a centred message (used when there is no data)."""
        _, ax = plt.subplots()
        ax.text(0.5, 0.5, message, ha="center", va="center")
        ax.set_axis_off()
        return ax

    # ------------------------------------------------------------------ #
    # KPI 1: CPI by channel/skill
    # ------------------------------------------------------------------ #
    def cpi_by_skill_channel(self) -> pd.DataFrame:
        """
        CPI (Cost Per Interaction) by skill/channel.

        CPI = (Labor_cost_per_interaction + Overhead_variable) / EFFECTIVE_PRODUCTIVITY
          - Labor_cost_per_interaction = labor_cost_per_hour * AHT_hours
          - Overhead_variable = overhead_rate * Labor_cost_per_interaction
          - EFFECTIVE_PRODUCTIVITY = 0.70 (accounts for non-productive time)

        Abandoned and non-VALID records are excluded from the AHT.

        Returns:
            DataFrame with columns queue_skill, channel, aht_seconds,
            labor_cost, overhead_cost, cpi_total (keys are regular columns so
            the frontend can look up by skill name), sorted by skill/channel.
            ``labor_cost`` and ``overhead_cost`` are NOT productivity-adjusted;
            only ``cpi_total`` is. Empty DataFrame when there is no cost
            config or no usable rows.
        """
        cfg = self._cost_config()
        if cfg is None or self.df.empty:
            return pd.DataFrame()

        records = self._cost_records()
        if records.empty:
            return pd.DataFrame()

        # AHT by skill/channel (in seconds)
        aht_sec = records.groupby(self._GROUP_KEYS)["handle_time"].mean()
        if aht_sec.empty:
            return pd.DataFrame()

        labor_cost = cfg.labor_cost_per_hour * (aht_sec / 3600.0)
        overhead = labor_cost * cfg.overhead_rate
        cpi = (labor_cost + overhead) / self._EFFECTIVE_PRODUCTIVITY

        out = pd.DataFrame(
            {
                "aht_seconds": aht_sec.round(2),
                "labor_cost": labor_cost.round(4),
                "overhead_cost": overhead.round(4),
                "cpi_total": cpi.round(4),
            }
        )
        return out.sort_index().reset_index()

    # ------------------------------------------------------------------ #
    # KPI 2: annual cost by skill/channel
    # ------------------------------------------------------------------ #
    def annual_cost_by_skill_channel(self) -> pd.DataFrame:
        """
        Annual cost by skill/channel.

        annual_cost = cpi_total * volume (number of interactions in the sample).

        Note: for simplicity the dataset is assumed to cover an annual period;
        no scaling factor is applied.

        Returns:
            The CPI columns plus ``volume`` and ``annual_cost``, indexed by
            (queue_skill, channel). Empty DataFrame when no CPI is available.
        """
        joined = self._cpi_with_volume()
        if joined.empty:
            return pd.DataFrame()

        joined["annual_cost"] = (joined["cpi_total"] * joined["volume"]).round(2)
        return joined

    # ------------------------------------------------------------------ #
    # KPI 3: cost breakdown (labor / tech / overhead)
    # ------------------------------------------------------------------ #
    def cost_breakdown(self) -> Dict[str, float]:
        """
        Cost breakdown: labor, overhead, tech.

        labor_total    = sum(labor_cost * volume)
        overhead_total = sum(overhead_cost * volume)
        tech_total     = tech_costs_annual

        Labor and overhead use the per-interaction amounts before the
        productivity adjustment, so ``total_annual`` is not the sum of
        ``annual_cost`` from annual_cost_by_skill_channel().

        Returns:
            Dict with labor_pct / overhead_pct / tech_pct (percent of total)
            and labor_annual / overhead_annual / tech_annual / total_annual.
            {} when the cost configuration is missing, no CPI is available or
            the total is not positive.
        """
        cfg = self._cost_config()
        if cfg is None:
            return {}

        joined = self._cpi_with_volume()
        if joined.empty:
            return {}

        annual_labor = float((joined["labor_cost"] * joined["volume"]).sum())
        annual_overhead = float((joined["overhead_cost"] * joined["volume"]).sum())
        annual_tech = cfg.tech_costs_annual

        total = annual_labor + annual_overhead + annual_tech
        if total <= 0:
            return {}

        return {
            "labor_pct": round(annual_labor / total * 100, 2),
            "overhead_pct": round(annual_overhead / total * 100, 2),
            "tech_pct": round(annual_tech / total * 100, 2),
            "labor_annual": round(annual_labor, 2),
            "overhead_annual": round(annual_overhead, 2),
            "tech_annual": round(annual_tech, 2),
            "total_annual": round(total, 2),
        }

    # ------------------------------------------------------------------ #
    # KPI 4: inefficiency cost (€ by variability/escalation)
    # ------------------------------------------------------------------ #
    def inefficiency_cost_by_skill_channel(self) -> pd.DataFrame:
        """
        Very simplified estimate of inefficiency cost.

        For each skill/channel:
          - AHT_p50, AHT_p90 (seconds).
          - Delta = max(0, AHT_p90 - AHT_p50).
          - Ineff_seconds = Delta * volume * 0.4
          - Ineff_cost = labor cost per second * Ineff_seconds

        The cost per second is approximated as labor_cost / mean AHT. The
        percentiles, volume and mean are computed on the same records as the
        CPI (abandoned and non-VALID rows excluded), so that mean is the one
        ``labor_cost`` was derived from.

        NOTE: This is an approximate model to quantify "order of magnitude".

        Returns:
            DataFrame with columns queue_skill, channel, aht_p50, aht_p90,
            volume, ineff_seconds, ineff_cost. Empty DataFrame when there is
            no cost config or no usable rows.
        """
        if self._cost_config() is None:
            return pd.DataFrame()

        records = self._cost_records()
        if records.empty:
            return pd.DataFrame()

        cpi_table = self.cpi_by_skill_channel()
        if cpi_table.empty:
            return pd.DataFrame()

        handle_time = records.groupby(self._GROUP_KEYS)["handle_time"]
        stats = pd.DataFrame(
            {
                "aht_p50": handle_time.quantile(0.5),
                "aht_p90": handle_time.quantile(0.9),
                "volume": handle_time.count(),
                "aht_mean": handle_time.mean(),
            }
        )
        if stats.empty:
            return pd.DataFrame()

        labor = cpi_table.set_index(self._GROUP_KEYS)["labor_cost"]
        merged = stats.join(labor, how="left").fillna(0.0)

        delta = (merged["aht_p90"] - merged["aht_p50"]).clip(lower=0.0)
        ineff_seconds = delta * merged["volume"] * self._INEFFICIENCY_AFFECTED_FRACTION

        # labor_cost is the cost of one interaction at the mean AHT, so
        # labor_cost / mean AHT approximates cost per second. A zero mean is
        # mapped to NaN first so the division yields 0 instead of inf.
        cost_per_second = (
            merged["labor_cost"] / merged["aht_mean"].replace(0, np.nan)
        ).fillna(0.0)

        merged["ineff_seconds"] = ineff_seconds.round(2)
        merged["ineff_cost"] = (ineff_seconds * cost_per_second).round(2)

        # Keys become regular columns for frontend lookup.
        return merged[
            ["aht_p50", "aht_p90", "volume", "ineff_seconds", "ineff_cost"]
        ].reset_index()

    # ------------------------------------------------------------------ #
    # KPI 5: potential annual savings from automation
    # ------------------------------------------------------------------ #
    def potential_savings(self) -> Dict[str, Any]:
        """
        Potential annual savings from automation.

        Savings = max(0, Human_CPI - Automated_CPI) * Automatable_volume * Success_rate

        Where:
          - Human_CPI = volume-weighted average of cpi_total.
          - Automated_CPI = config.automation_cpi
          - Automatable_volume = volume_total * automation_volume_share
          - Success_rate = automation_success_rate

        Returns:
            Dict with cpi_humano, cpi_automatizado, volume_total,
            volume_automatizable, effective_volume, annual_savings.
            {} when the cost config or any automation parameter is missing /
            non-positive, or when there is no volume.
        """
        cfg = self._cost_config()
        if cfg is None:
            return {}

        if (
            cfg.automation_cpi is None
            or cfg.automation_volume_share <= 0
            or cfg.automation_success_rate <= 0
        ):
            return {}

        cost_table = self.annual_cost_by_skill_channel()
        if cost_table.empty:
            return {}

        total_volume = cost_table["volume"].sum()
        if total_volume <= 0:
            return {}

        # Weighted average human CPI
        weighted_cpi = (
            (cost_table["cpi_total"] * cost_table["volume"]).sum() / total_volume
        )

        volume_automatizable = total_volume * cfg.automation_volume_share
        effective_volume = volume_automatizable * cfg.automation_success_rate

        # Automation more expensive than humans yields zero savings, never negative.
        delta_cpi = max(0.0, weighted_cpi - cfg.automation_cpi)
        annual_savings = delta_cpi * effective_volume

        return {
            "cpi_humano": round(weighted_cpi, 4),
            "cpi_automatizado": round(cfg.automation_cpi, 4),
            "volume_total": float(total_volume),
            "volume_automatizable": float(volume_automatizable),
            "effective_volume": float(effective_volume),
            "annual_savings": round(annual_savings, 2),
        }

    # ------------------------------------------------------------------ #
    # PLOTS
    # ------------------------------------------------------------------ #
    def plot_cost_waterfall(self) -> Axes:
        """
        Bar chart of the annual cost components (labor, overhead, tech).

        Each component is drawn as its own bar starting at zero and labelled
        with its value; the bars are not stacked cumulatively. When no cost
        breakdown is available a placeholder message is shown instead.
        """
        breakdown = self.cost_breakdown()
        if not breakdown:
            return self._placeholder_axes("No cost configuration")

        labels = ["Labor", "Overhead", "Tech"]
        values = [
            breakdown["labor_annual"],
            breakdown["overhead_annual"],
            breakdown["tech_annual"],
        ]

        _, ax = plt.subplots(figsize=(8, 4))
        x = np.arange(len(labels))
        ax.bar(x, values)
        ax.set_xticks(x)
        ax.set_xticklabels(labels)
        ax.set_ylabel("€ annual")
        ax.set_title("Annual cost breakdown")

        for idx, v in enumerate(values):
            ax.text(idx, v, f"{v:,.0f}", ha="center", va="bottom")

        ax.grid(axis="y", alpha=0.3)
        return ax

    def plot_cpi_by_channel(self) -> Axes:
        """
        Bar chart of the volume-weighted average CPI by channel.

        When no CPI is available a placeholder message is shown instead.
        """
        joined = self._cpi_with_volume()
        if joined.empty:
            return self._placeholder_axes("No cost configuration")

        # Weighted average CPI by channel = sum(cpi * volume) / sum(volume),
        # with the denominator floored at 1 to avoid dividing by zero.
        per_group = joined.reset_index()
        per_group["_weighted_cpi"] = per_group["cpi_total"] * per_group["volume"]
        sums = per_group.groupby("channel")[["_weighted_cpi", "volume"]].sum()
        per_channel = (
            (sums["_weighted_cpi"] / sums["volume"].clip(lower=1))
            .rename("cpi_mean")
            .round(4)
        )

        _, ax = plt.subplots(figsize=(6, 4))
        per_channel.plot(kind="bar", ax=ax)
        ax.set_xlabel("Channel")
        ax.set_ylabel("Average CPI (€)")
        ax.set_title("Cost per interaction (CPI) by channel")
        ax.grid(axis="y", alpha=0.3)
        return ax