Files
BeyondCXAnalytics-Demo/backend/beyond_metrics/dimensions/OperationalPerformance.py
Claude 8c7f5fa827 Translate Phase 2 medium-priority files (frontend utils + backend dimensions)
Phase 2 of Spanish-to-English translation for medium-priority files:

Frontend utils (2 files):
- dataTransformation.ts: Translated ~72 occurrences (comments, docs, console logs)
- segmentClassifier.ts: Translated ~20 occurrences (JSDoc, inline comments, UI strings)

Backend dimensions (3 files):
- OperationalPerformance.py: Translated ~117 lines (docstrings, comments)
- SatisfactionExperience.py: Translated ~33 lines (docstrings, comments)
- EconomyCost.py: Translated ~79 lines (docstrings, comments)

All function names and variable names preserved for API compatibility.
Frontend and backend compilation was tested and verified to succeed.

Related to TRANSLATION_STATUS.md Phase 2 objectives.

https://claude.ai/code/session_01GNbnkFoESkRcnPr3bLCYDg
2026-02-07 11:03:00 +00:00

712 lines
25 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.axes import Axes
import math
# Input-schema contract: columns that must exist in the DataFrame passed to
# OperationalPerformanceMetrics (enforced by _validate_columns at init time).
REQUIRED_COLUMNS_OP: List[str] = [
    "interaction_id",
    "datetime_start",
    "queue_skill",
    "channel",
    "duration_talk",
    "hold_time",
    "wrap_up_time",
    "agent_id",
    "transfer_flag",
]
@dataclass
class OperationalPerformanceMetrics:
    """
    Dimension: OPERATIONAL PERFORMANCE AND SERVICE
    Purpose: measure the balance between speed (efficiency) and resolution quality, plus service variability.
    Requires at minimum:
    - interaction_id
    - datetime_start
    - queue_skill
    - channel
    - duration_talk (seconds)
    - hold_time (seconds)
    - wrap_up_time (seconds)
    - agent_id
    - transfer_flag (bool/int)
    Optional columns:
    - is_resolved (bool/int) -> for FCR
    - abandoned_flag (bool/int) -> for abandonment rate
    - customer_id / caller_id -> for recurrence and channel repetition
    - logged_time (seconds) -> for occupancy_rate
    """
    # Raw input frame; replaced in __post_init__ by a normalized copy
    # (parsed dates, numeric durations, derived handle_time, etc.).
    df: pd.DataFrame
    # Benchmarks / normalization parameters (you can adjust them)
    AHT_GOOD: float = 300.0  # 5 min
    AHT_BAD: float = 900.0  # 15 min
    VAR_RATIO_GOOD: float = 1.2  # P90/P50 ~1.2 very stable
    VAR_RATIO_BAD: float = 3.0  # P90/P50 >=3 very unstable
    def __post_init__(self) -> None:
        # Validate the required schema first, then normalize/derive columns in place.
        self._validate_columns()
        self._prepare_data()
# ------------------------------------------------------------------ #
# Internal helpers
# ------------------------------------------------------------------ #
def _validate_columns(self) -> None:
missing = [c for c in REQUIRED_COLUMNS_OP if c not in self.df.columns]
if missing:
raise ValueError(
f"Missing required columns for OperationalPerformanceMetrics: {missing}"
)
    def _prepare_data(self) -> None:
        """
        Normalize self.df in place: parse types, derive handle_time, flag rows
        usable for variability stats, and unify optional identifier columns.
        Called exactly once from __post_init__ after schema validation.
        """
        df = self.df.copy()
        # Types: coerce unparseable values to NaT/NaN rather than raising
        df["datetime_start"] = pd.to_datetime(df["datetime_start"], errors="coerce")
        for col in ["duration_talk", "hold_time", "wrap_up_time"]:
            df[col] = pd.to_numeric(df[col], errors="coerce")
        # Handle Time = talk + hold + wrap-up; missing components count as 0
        df["handle_time"] = (
            df["duration_talk"].fillna(0)
            + df["hold_time"].fillna(0)
            + df["wrap_up_time"].fillna(0)
        )
        # v3.0: Filter NOISE and ZOMBIE for variability calculations
        # record_status: 'VALID', 'NOISE', 'ZOMBIE', 'ABANDON'
        # For AHT/CV we only use 'VALID' (excludes noise, zombie, abandon)
        if "record_status" in df.columns:
            # Case/whitespace-insensitive comparison via upper-cased status
            df["record_status"] = df["record_status"].astype(str).str.strip().str.upper()
            # Create mask for valid records: ONLY "VALID"
            # Explicitly excludes NOISE, ZOMBIE, ABANDON and any other value
            df["_is_valid_for_cv"] = df["record_status"] == "VALID"
            # Log record_status breakdown for debugging
            status_counts = df["record_status"].value_counts()
            valid_count = int(df["_is_valid_for_cv"].sum())
            print(f"[OperationalPerformance] Record status breakdown:")
            print(f" Total rows: {len(df)}")
            for status, count in status_counts.items():
                print(f" - {status}: {count}")
            print(f" VALID rows for AHT calculation: {valid_count}")
        else:
            # Legacy data without record_status: include all rows
            df["_is_valid_for_cv"] = True
            print(f"[OperationalPerformance] No record_status column - using all {len(df)} rows")
        # Basic normalization of categorical/id columns
        df["queue_skill"] = df["queue_skill"].astype(str).str.strip()
        df["channel"] = df["channel"].astype(str).str.strip()
        df["agent_id"] = df["agent_id"].astype(str).str.strip()
        # Optional flags converted to bool when they exist
        # NOTE(review): astype(int) raises on non-numeric strings — this
        # assumes flags arrive as 0/1 or bool; confirm upstream guarantees.
        for flag_col in ["is_resolved", "abandoned_flag", "transfer_flag"]:
            if flag_col in df.columns:
                df[flag_col] = df[flag_col].astype(int).astype(bool)
        # customer_id: we use customer_id if it exists, otherwise caller_id
        if "customer_id" in df.columns:
            df["customer_id"] = df["customer_id"].astype(str)
        elif "caller_id" in df.columns:
            df["customer_id"] = df["caller_id"].astype(str)
        else:
            # No identifier at all: column exists but holds None everywhere
            df["customer_id"] = None
        # logged_time optional
        # Normalize logged_time: will always be a float series with NaN if it does not exist
        df["logged_time"] = pd.to_numeric(df.get("logged_time", np.nan), errors="coerce")
        self.df = df
@property
def is_empty(self) -> bool:
return self.df.empty
# ------------------------------------------------------------------ #
# AHT and variability
# ------------------------------------------------------------------ #
def aht_distribution(self) -> Dict[str, float]:
"""
Returns P10, P50, P90 of AHT and the P90/P50 ratio as a measure of variability.
v3.0: Filters NOISE and ZOMBIE for variability calculation.
Only uses records with record_status='valid' or without status (legacy).
"""
# Filter only valid records for variability calculation
df_valid = self.df[self.df["_is_valid_for_cv"] == True]
ht = df_valid["handle_time"].dropna().astype(float)
if ht.empty:
return {}
p10 = float(np.percentile(ht, 10))
p50 = float(np.percentile(ht, 50))
p90 = float(np.percentile(ht, 90))
ratio = float(p90 / p50) if p50 > 0 else float("nan")
return {
"p10": round(p10, 2),
"p50": round(p50, 2),
"p90": round(p90, 2),
"p90_p50_ratio": round(ratio, 3),
}
def talk_hold_acw_p50_by_skill(self) -> pd.DataFrame:
"""
P50 of talk_time, hold_time and wrap_up_time by skill.
Includes queue_skill as a column (not just index) so that the frontend can lookup by skill name.
"""
df = self.df
def perc(s: pd.Series, q: float) -> float:
s = s.dropna().astype(float)
if s.empty:
return float("nan")
return float(np.percentile(s, q))
grouped = df.groupby("queue_skill")
result = pd.DataFrame(
{
"talk_p50": grouped["duration_talk"].apply(lambda s: perc(s, 50)),
"hold_p50": grouped["hold_time"].apply(lambda s: perc(s, 50)),
"acw_p50": grouped["wrap_up_time"].apply(lambda s: perc(s, 50)),
}
)
# Reset index to include queue_skill as column for frontend lookup
return result.round(2).sort_index().reset_index()
# ------------------------------------------------------------------ #
# FCR, escalation, abandonment, recurrence, channel repetition
# ------------------------------------------------------------------ #
def fcr_rate(self) -> float:
"""
FCR (First Contact Resolution).
Priority 1: Use fcr_real_flag from CSV if it exists
Priority 2: Calculate as 100 - escalation_rate
"""
df = self.df
total = len(df)
if total == 0:
return float("nan")
# Priority 1: Use fcr_real_flag if it exists
if "fcr_real_flag" in df.columns:
col = df["fcr_real_flag"]
# Normalize to boolean
if col.dtype == "O":
fcr_mask = (
col.astype(str)
.str.strip()
.str.lower()
.isin(["true", "t", "1", "yes", "y", "si", ""])
)
else:
fcr_mask = pd.to_numeric(col, errors="coerce").fillna(0) > 0
fcr_count = int(fcr_mask.sum())
fcr = (fcr_count / total) * 100.0
return float(max(0.0, min(100.0, round(fcr, 2))))
# Priority 2: Fallback to 100 - escalation_rate
try:
esc = self.escalation_rate()
except Exception:
esc = float("nan")
if esc is not None and not math.isnan(esc):
fcr = 100.0 - esc
return float(max(0.0, min(100.0, round(fcr, 2))))
return float("nan")
def escalation_rate(self) -> float:
"""
% of interactions that require escalation (transfer_flag == True).
"""
df = self.df
total = len(df)
if total == 0:
return float("nan")
escalated = df["transfer_flag"].sum()
return float(round(escalated / total * 100, 2))
def abandonment_rate(self) -> float:
"""
% of abandoned interactions.
Searches in order: is_abandoned, abandoned_flag, abandoned
If no column exists, returns NaN.
"""
df = self.df
total = len(df)
if total == 0:
return float("nan")
# Search for abandonment column in priority order
abandon_col = None
for col_name in ["is_abandoned", "abandoned_flag", "abandoned"]:
if col_name in df.columns:
abandon_col = col_name
break
if abandon_col is None:
return float("nan")
col = df[abandon_col]
# Normalize to boolean
if col.dtype == "O":
abandon_mask = (
col.astype(str)
.str.strip()
.str.lower()
.isin(["true", "t", "1", "yes", "y", "si", ""])
)
else:
abandon_mask = pd.to_numeric(col, errors="coerce").fillna(0) > 0
abandoned = int(abandon_mask.sum())
return float(round(abandoned / total * 100, 2))
def high_hold_time_rate(self, threshold_seconds: float = 60.0) -> float:
"""
% of interactions with hold_time > threshold (default 60s).
Complexity proxy: if the agent had to put the customer on hold for more than 60 seconds, they probably had to consult/investigate.
"""
df = self.df
total = len(df)
if total == 0:
return float("nan")
hold_times = df["hold_time"].fillna(0)
high_hold_count = (hold_times > threshold_seconds).sum()
return float(round(high_hold_count / total * 100, 2))
def recurrence_rate_7d(self) -> float:
"""
% of customers who contact again in < 7 days for the SAME skill.
Based on customer_id (or caller_id if no customer_id) + queue_skill.
Calculates:
- For each client + skill combination, sorts by datetime_start
- If there are two consecutive contacts separated by < 7 days (same client, same skill), counts as "recurrent"
- Rate = number of recurrent clients / total number of clients
NOTE: Only counts as recurrence if the client calls for the SAME skill.
A client who calls "Sales" and then "Support" is NOT recurrent.
"""
df = self.df.dropna(subset=["datetime_start"]).copy()
# Normalize client identifier
if "customer_id" not in df.columns:
if "caller_id" in df.columns:
df["customer_id"] = df["caller_id"]
else:
# No client identifier -> cannot calculate
return float("nan")
df = df.dropna(subset=["customer_id"])
if df.empty:
return float("nan")
# Sort by client + skill + date
df = df.sort_values(["customer_id", "queue_skill", "datetime_start"])
# Time difference between consecutive contacts by client AND skill
# This ensures we only count re-contacts from the same client for the same skill
df["delta"] = df.groupby(["customer_id", "queue_skill"])["datetime_start"].diff()
# Mark contacts that occur less than 7 days from the previous one (same skill)
recurrence_mask = df["delta"] < pd.Timedelta(days=7)
# Number of clients who have at least one recurrent contact (for any skill)
recurrent_customers = df.loc[recurrence_mask, "customer_id"].nunique()
total_customers = df["customer_id"].nunique()
if total_customers == 0:
return float("nan")
rate = recurrent_customers / total_customers * 100.0
return float(round(rate, 2))
def repeat_channel_rate(self) -> float:
"""
% of recurrences (<7 days) in which the client uses the SAME channel.
If there is no customer_id/caller_id or only one contact per client, returns NaN.
"""
df = self.df.dropna(subset=["datetime_start"]).copy()
if df["customer_id"].isna().all():
return float("nan")
df = df.sort_values(["customer_id", "datetime_start"])
df["next_customer"] = df["customer_id"].shift(-1)
df["next_datetime"] = df["datetime_start"].shift(-1)
df["next_channel"] = df["channel"].shift(-1)
same_customer = df["customer_id"] == df["next_customer"]
within_7d = (df["next_datetime"] - df["datetime_start"]) < pd.Timedelta(days=7)
recurrent_mask = same_customer & within_7d
if not recurrent_mask.any():
return float("nan")
same_channel = df["channel"] == df["next_channel"]
same_channel_recurrent = (recurrent_mask & same_channel).sum()
total_recurrent = recurrent_mask.sum()
return float(round(same_channel_recurrent / total_recurrent * 100, 2))
# ------------------------------------------------------------------ #
# Occupancy
# ------------------------------------------------------------------ #
def occupancy_rate(self) -> float:
"""
Occupancy rate:
occupancy = sum(handle_time) / sum(logged_time) * 100.
Requires 'logged_time' column. If it does not exist or is all 0, returns NaN.
"""
df = self.df
if "logged_time" not in df.columns:
return float("nan")
logged = df["logged_time"].fillna(0)
handle = df["handle_time"].fillna(0)
total_logged = logged.sum()
if total_logged == 0:
return float("nan")
occ = handle.sum() / total_logged
return float(round(occ * 100, 2))
# ------------------------------------------------------------------ #
# Performance score 0-10
# ------------------------------------------------------------------ #
def performance_score(self) -> Dict[str, float]:
"""
Calculates a 0-10 score combining:
- AHT (lower is better)
- FCR (higher is better)
- Variability (P90/P50, lower is better)
- Other factors (occupancy / escalation)
Formula:
score = 0.4 * (10 - AHT_norm) +
0.3 * FCR_norm +
0.2 * (10 - Var_norm) +
0.1 * Otros_score
Where *_norm are values on a 0-10 scale.
"""
dist = self.aht_distribution()
if not dist:
return {"score": float("nan")}
p50 = dist["p50"]
ratio = dist["p90_p50_ratio"]
# AHT_normalized: 0 (better) to 10 (worse)
aht_norm = self._scale_to_0_10(p50, self.AHT_GOOD, self.AHT_BAD)
# FCR_normalized: 0-10 directly from % (0-100)
fcr_pct = self.fcr_rate()
fcr_norm = fcr_pct / 10.0 if not np.isnan(fcr_pct) else 0.0
# Variability_normalized: 0 (good ratio) to 10 (bad ratio)
var_norm = self._scale_to_0_10(ratio, self.VAR_RATIO_GOOD, self.VAR_RATIO_BAD)
# Other factors: combine occupancy (ideal ~80%) and escalation (ideal low)
occ = self.occupancy_rate()
esc = self.escalation_rate()
other_score = self._compute_other_factors_score(occ, esc)
score = (
0.4 * (10.0 - aht_norm)
+ 0.3 * fcr_norm
+ 0.2 * (10.0 - var_norm)
+ 0.1 * other_score
)
# Clamp 0-10
score = max(0.0, min(10.0, score))
return {
"score": round(score, 2),
"aht_norm": round(aht_norm, 2),
"fcr_norm": round(fcr_norm, 2),
"var_norm": round(var_norm, 2),
"other_score": round(other_score, 2),
}
def _scale_to_0_10(self, value: float, good: float, bad: float) -> float:
"""
Linearly scales a value:
- good -> 0
- bad -> 10
With saturation outside range.
"""
if np.isnan(value):
return 5.0 # neutral
if good == bad:
return 5.0
if good < bad:
# Lower is better
if value <= good:
return 0.0
if value >= bad:
return 10.0
return 10.0 * (value - good) / (bad - good)
else:
# Higher is better
if value >= good:
return 0.0
if value <= bad:
return 10.0
return 10.0 * (good - value) / (good - bad)
def _compute_other_factors_score(self, occ_pct: float, esc_pct: float) -> float:
"""
Other factors (0-10) based on:
- ideal occupancy around 80%
- ideal escalation rate low (<10%)
"""
# Occupancy: 0 penalty if between 75-85, penalized outside
if np.isnan(occ_pct):
occ_penalty = 5.0
else:
deviation = abs(occ_pct - 80.0)
occ_penalty = min(10.0, deviation / 5.0 * 2.0) # each 5 points add 2, max 10
occ_score = max(0.0, 10.0 - occ_penalty)
# Escalation: 0-10 where 0% -> 10 points, >=40% -> 0
if np.isnan(esc_pct):
esc_score = 5.0
else:
if esc_pct <= 0:
esc_score = 10.0
elif esc_pct >= 40:
esc_score = 0.0
else:
esc_score = 10.0 * (1.0 - esc_pct / 40.0)
# Simple average of both
return (occ_score + esc_score) / 2.0
# ------------------------------------------------------------------ #
# Plots
# ------------------------------------------------------------------ #
def plot_aht_boxplot_by_skill(self) -> Axes:
"""
Boxplot of AHT by skill (P10-P50-P90 visual).
"""
df = self.df.copy()
if df.empty or "handle_time" not in df.columns:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "No AHT data", ha="center", va="center")
ax.set_axis_off()
return ax
df = df.dropna(subset=["handle_time"])
if df.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "AHT not available", ha="center", va="center")
ax.set_axis_off()
return ax
fig, ax = plt.subplots(figsize=(8, 4))
df.boxplot(column="handle_time", by="queue_skill", ax=ax, showfliers=False)
ax.set_xlabel("Skill / Queue")
ax.set_ylabel("AHT (seconds)")
ax.set_title("AHT distribution by skill")
plt.suptitle("")
plt.xticks(rotation=45, ha="right")
ax.grid(axis="y", alpha=0.3)
return ax
def plot_resolution_funnel_by_skill(self) -> Axes:
"""
Funnel / stacked bars of Talk + Hold + ACW by skill (P50).
Allows viewing the time balance by skill.
"""
p50 = self.talk_hold_acw_p50_by_skill()
if p50.empty:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "No data for funnel", ha="center", va="center")
ax.set_axis_off()
return ax
fig, ax = plt.subplots(figsize=(10, 4))
skills = p50.index
talk = p50["talk_p50"]
hold = p50["hold_p50"]
acw = p50["acw_p50"]
x = np.arange(len(skills))
ax.bar(x, talk, label="Talk P50")
ax.bar(x, hold, bottom=talk, label="Hold P50")
ax.bar(x, acw, bottom=talk + hold, label="ACW P50")
ax.set_xticks(x)
ax.set_xticklabels(skills, rotation=45, ha="right")
ax.set_ylabel("Seconds")
ax.set_title("Resolution funnel (P50) by skill")
ax.legend()
ax.grid(axis="y", alpha=0.3)
return ax
# ------------------------------------------------------------------ #
# Metrics by skill (for frontend cached/fresh consistency)
# ------------------------------------------------------------------ #
def metrics_by_skill(self) -> List[Dict[str, Any]]:
"""
Calculates operational metrics by skill:
- transfer_rate: % of interactions with transfer_flag == True
- abandonment_rate: % of abandoned interactions
- fcr_tecnico: 100 - transfer_rate (without transfer)
- fcr_real: % without transfer AND without 7d re-contact (if there is data)
- volume: number of interactions
Returns a list of dicts, one per skill, so that the frontend has access to real metrics by skill (not estimated).
"""
df = self.df
if df.empty:
return []
results = []
# Detect abandonment column
abandon_col = None
for col_name in ["is_abandoned", "abandoned_flag", "abandoned"]:
if col_name in df.columns:
abandon_col = col_name
break
# Detect repeat_call_7d column for real FCR
repeat_col = None
for col_name in ["repeat_call_7d", "repeat_7d", "is_repeat_7d"]:
if col_name in df.columns:
repeat_col = col_name
break
for skill, group in df.groupby("queue_skill"):
total = len(group)
if total == 0:
continue
# Transfer rate
if "transfer_flag" in group.columns:
transfer_count = group["transfer_flag"].sum()
transfer_rate = float(round(transfer_count / total * 100, 2))
else:
transfer_rate = 0.0
# Technical FCR = 100 - transfer_rate
fcr_tecnico = float(round(100.0 - transfer_rate, 2))
# Abandonment rate
abandonment_rate = 0.0
if abandon_col:
col = group[abandon_col]
if col.dtype == "O":
abandon_mask = (
col.astype(str)
.str.strip()
.str.lower()
.isin(["true", "t", "1", "yes", "y", "si", ""])
)
else:
abandon_mask = pd.to_numeric(col, errors="coerce").fillna(0) > 0
abandoned = int(abandon_mask.sum())
abandonment_rate = float(round(abandoned / total * 100, 2))
# Real FCR (without transfer AND without 7d re-contact)
fcr_real = fcr_tecnico # default to fcr_tecnico if no repeat data
if repeat_col and "transfer_flag" in group.columns:
repeat_data = group[repeat_col]
if repeat_data.dtype == "O":
repeat_mask = (
repeat_data.astype(str)
.str.strip()
.str.lower()
.isin(["true", "t", "1", "yes", "y", "si", ""])
)
else:
repeat_mask = pd.to_numeric(repeat_data, errors="coerce").fillna(0) > 0
# Real FCR: no transfer AND no repeat
fcr_real_mask = (~group["transfer_flag"]) & (~repeat_mask)
fcr_real_count = fcr_real_mask.sum()
fcr_real = float(round(fcr_real_count / total * 100, 2))
# AHT Mean (average of handle_time over valid records)
# Filter only 'valid' records (excludes noise/zombie) for consistency
if "_is_valid_for_cv" in group.columns:
valid_records = group[group["_is_valid_for_cv"]]
else:
valid_records = group
if len(valid_records) > 0 and "handle_time" in valid_records.columns:
aht_mean = float(round(valid_records["handle_time"].mean(), 2))
else:
aht_mean = 0.0
# AHT Total (average of handle_time over ALL records)
# Includes NOISE, ZOMBIE, ABANDON - for information/comparison only
if len(group) > 0 and "handle_time" in group.columns:
aht_total = float(round(group["handle_time"].mean(), 2))
else:
aht_total = 0.0
# Hold Time Mean (average of hold_time over valid records)
# Consistent with fresh path that uses MEAN, not P50
if len(valid_records) > 0 and "hold_time" in valid_records.columns:
hold_time_mean = float(round(valid_records["hold_time"].mean(), 2))
else:
hold_time_mean = 0.0
results.append({
"skill": str(skill),
"volume": int(total),
"transfer_rate": transfer_rate,
"abandonment_rate": abandonment_rate,
"fcr_tecnico": fcr_tecnico,
"fcr_real": fcr_real,
"aht_mean": aht_mean,
"aht_total": aht_total,
"hold_time_mean": hold_time_mean,
})
return results