from __future__ import annotations

import math
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Sequence

import numpy as np
import pandas as pd

try:
    # Plotting is optional: computing metrics must not require a plotting backend.
    import matplotlib.pyplot as plt
    from matplotlib.axes import Axes
except ImportError:  # pragma: no cover - only reached when matplotlib is absent
    plt = None  # type: ignore[assignment]
    Axes = Any  # type: ignore[assignment,misc]


REQUIRED_COLUMNS_OP: List[str] = [
    "interaction_id",
    "datetime_start",
    "queue_skill",
    "channel",
    "duration_talk",
    "hold_time",
    "wrap_up_time",
    "agent_id",
    "transfer_flag",
]

# Lower-cased textual values interpreted as "true" in flag columns.
_TRUTHY_STRINGS: List[str] = ["true", "t", "1", "yes", "y", "si", "sí"]

# Candidate column names, in priority order.
_ABANDON_COLUMNS: List[str] = ["is_abandoned", "abandoned_flag", "abandoned"]
_REPEAT_7D_COLUMNS: List[str] = ["repeat_call_7d", "repeat_7d", "is_repeat_7d"]


def _to_bool_mask(col: pd.Series) -> pd.Series:
    """Normalize a flag column to a plain boolean mask.

    Boolean/numeric columns are true where the value is > 0 (missing -> False).
    Any other dtype is compared, as stripped lower-case text, against
    ``_TRUTHY_STRINGS`` (so ``None``/``"no"``/``"0"`` are False).
    """
    if pd.api.types.is_bool_dtype(col) or pd.api.types.is_numeric_dtype(col):
        return col.gt(0).fillna(False).astype(bool)
    text = col.astype(str).str.strip().str.lower()
    return text.isin(_TRUTHY_STRINGS)


def _first_present(df: pd.DataFrame, candidates: Sequence[str]) -> Optional[str]:
    """Return the first name in *candidates* that is a column of *df*, else None."""
    return next((name for name in candidates if name in df.columns), None)


def _percentage(count: Any, total: int) -> float:
    """Return ``count / total`` as a percentage rounded to 2 decimals."""
    return float(round(count / total * 100, 2))


def _mean_or_zero(frame: pd.DataFrame, column: str) -> float:
    """Mean of *column* rounded to 2 decimals; 0.0 if *frame* is empty or lacks it."""
    if len(frame) > 0 and column in frame.columns:
        return float(round(frame[column].mean(), 2))
    return 0.0


def _require_pyplot() -> Any:
    """Return ``matplotlib.pyplot`` or raise ImportError if it is not installed."""
    if plt is None:
        raise ImportError(
            "matplotlib is required for the plotting methods of "
            "OperationalPerformanceMetrics"
        )
    return plt


def _message_axes(message: str) -> Axes:
    """Create an axes with the frame hidden that only shows *message*, centered."""
    _, ax = _require_pyplot().subplots()
    ax.text(0.5, 0.5, message, ha="center", va="center")
    ax.set_axis_off()
    return ax


@dataclass
class OperationalPerformanceMetrics:
    """
    Dimension: OPERATIONAL PERFORMANCE AND SERVICE

    Purpose: measure the balance between speed (efficiency) and resolution
    quality, plus service variability.

    Requires at minimum:
      - interaction_id
      - datetime_start
      - queue_skill
      - channel
      - duration_talk (seconds)
      - hold_time (seconds)
      - wrap_up_time (seconds)
      - agent_id
      - transfer_flag (bool/int/textual flag)

    Optional columns:
      - record_status ('VALID', 'NOISE', 'ZOMBIE', 'ABANDON') -> AHT filtering
      - fcr_real_flag (bool/int/text) -> for FCR
      - is_resolved (bool/int) -> normalized to bool (not read by fcr_rate)
      - is_abandoned / abandoned_flag / abandoned -> for abandonment rate
      - repeat_call_7d / repeat_7d / is_repeat_7d -> for per-skill real FCR
      - customer_id / caller_id -> for recurrence and channel repetition
      - logged_time (seconds) -> for occupancy_rate

    On construction the input frame is validated and replaced by a normalized
    copy (the caller's DataFrame is not mutated).

    Raises:
        ValueError: if any column of REQUIRED_COLUMNS_OP is missing.
    """

    df: pd.DataFrame

    # Benchmarks / normalization parameters (you can adjust them)
    AHT_GOOD: float = 300.0  # 5 min
    AHT_BAD: float = 900.0  # 15 min
    VAR_RATIO_GOOD: float = 1.2  # P90/P50 ~1.2: very stable
    VAR_RATIO_BAD: float = 3.0  # P90/P50 >= 3: very unstable

    def __post_init__(self) -> None:
        self._validate_columns()
        self._prepare_data()

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #
    def _validate_columns(self) -> None:
        """Raise ValueError listing every required column absent from ``self.df``."""
        missing = [c for c in REQUIRED_COLUMNS_OP if c not in self.df.columns]
        if missing:
            raise ValueError(
                f"Missing required columns for OperationalPerformanceMetrics: {missing}"
            )

    def _prepare_data(self) -> None:
        """Build the normalized working copy stored back into ``self.df``.

        Adds the derived columns ``handle_time``, ``_is_valid_for_cv``,
        ``customer_id`` and ``logged_time`` and coerces types of the inputs.
        """
        df = self.df.copy()

        # Types
        df["datetime_start"] = pd.to_datetime(df["datetime_start"], errors="coerce")
        for col in ["duration_talk", "hold_time", "wrap_up_time"]:
            df[col] = pd.to_numeric(df[col], errors="coerce")

        # Handle Time: missing components count as 0 seconds
        df["handle_time"] = (
            df["duration_talk"].fillna(0)
            + df["hold_time"].fillna(0)
            + df["wrap_up_time"].fillna(0)
        )

        # v3.0: Filter NOISE and ZOMBIE for variability calculations
        # record_status: 'VALID', 'NOISE', 'ZOMBIE', 'ABANDON'
        # For AHT/CV we only use 'VALID' (excludes noise, zombie, abandon)
        if "record_status" in df.columns:
            df["record_status"] = df["record_status"].astype(str).str.strip().str.upper()
            # ONLY "VALID" passes; any other value is excluded
            df["_is_valid_for_cv"] = df["record_status"] == "VALID"

            # Log record_status breakdown for debugging
            status_counts = df["record_status"].value_counts()
            valid_count = int(df["_is_valid_for_cv"].sum())
            print("[OperationalPerformance] Record status breakdown:")
            print(f" Total rows: {len(df)}")
            for status, count in status_counts.items():
                print(f" - {status}: {count}")
            print(f" VALID rows for AHT calculation: {valid_count}")
        else:
            # Legacy data without record_status: include all
            df["_is_valid_for_cv"] = True
            print(
                f"[OperationalPerformance] No record_status column - using all {len(df)} rows"
            )

        # Basic normalization
        df["queue_skill"] = df["queue_skill"].astype(str).str.strip()
        df["channel"] = df["channel"].astype(str).str.strip()
        df["agent_id"] = df["agent_id"].astype(str).str.strip()

        # Flags converted to bool when they exist. A tolerant conversion is
        # used so that NaN or textual flags ("yes", "true") do not crash.
        for flag_col in ["is_resolved", "abandoned_flag", "transfer_flag"]:
            if flag_col in df.columns:
                df[flag_col] = _to_bool_mask(df[flag_col])

        # customer_id: we use customer_id if it exists, otherwise caller_id.
        # Missing identifiers stay missing (NaN) instead of becoming the text
        # "nan"/"None", which would merge all anonymous contacts into one
        # fake customer.
        id_source = _first_present(df, ["customer_id", "caller_id"])
        if id_source is not None:
            raw_ids = df[id_source]
            df["customer_id"] = raw_ids.astype(str).where(raw_ids.notna())
        else:
            df["customer_id"] = None

        # logged_time optional: always a float column, NaN if it does not exist
        df["logged_time"] = pd.to_numeric(df.get("logged_time", np.nan), errors="coerce")

        self.df = df

    @property
    def is_empty(self) -> bool:
        """True when the prepared DataFrame has no rows."""
        return self.df.empty

    # ------------------------------------------------------------------ #
    # AHT and variability
    # ------------------------------------------------------------------ #
    def aht_distribution(self) -> Dict[str, float]:
        """
        Returns P10, P50, P90 of AHT and the P90/P50 ratio as a measure of
        variability.

        v3.0: only rows flagged ``_is_valid_for_cv`` are used, i.e. rows with
        record_status == 'VALID', or every row for legacy data without status.

        Returns an empty dict when there is no valid row. The ratio is NaN
        when P50 is not positive.
        """
        valid_mask = self.df["_is_valid_for_cv"].astype(bool)
        ht = self.df.loc[valid_mask, "handle_time"].dropna().astype(float)
        if ht.empty:
            return {}

        p10, p50, p90 = (float(np.percentile(ht, q)) for q in (10, 50, 90))
        ratio = float(p90 / p50) if p50 > 0 else float("nan")

        return {
            "p10": round(p10, 2),
            "p50": round(p50, 2),
            "p90": round(p90, 2),
            "p90_p50_ratio": round(ratio, 3),
        }

    def talk_hold_acw_p50_by_skill(self) -> pd.DataFrame:
        """
        P50 of talk_time, hold_time and wrap_up_time by skill.

        Includes queue_skill as a column (not just index) so that the frontend
        can lookup by skill name. Missing values are ignored; a skill with no
        value for a component gets NaN.
        """
        components = {
            "duration_talk": "talk_p50",
            "hold_time": "hold_p50",
            "wrap_up_time": "acw_p50",
        }
        # The median is the 50th percentile with linear interpolation.
        medians = self.df.groupby("queue_skill")[list(components)].median()
        result = medians.rename(columns=components)
        # Reset index to include queue_skill as column for frontend lookup
        return result.round(2).sort_index().reset_index()

    # ------------------------------------------------------------------ #
    # FCR, escalation, abandonment, recurrence, channel repetition
    # ------------------------------------------------------------------ #
    def fcr_rate(self) -> float:
        """
        FCR (First Contact Resolution), as a percentage clamped to 0-100.

        Priority 1: use fcr_real_flag if the column exists
        Priority 2: calculate as 100 - escalation_rate

        Returns NaN when there are no rows.
        """
        df = self.df
        total = len(df)
        if total == 0:
            return float("nan")

        if "fcr_real_flag" in df.columns:
            fcr = int(_to_bool_mask(df["fcr_real_flag"]).sum()) / total * 100.0
        else:
            esc = self.escalation_rate()
            if math.isnan(esc):
                return float("nan")
            fcr = 100.0 - esc

        return float(max(0.0, min(100.0, round(fcr, 2))))

    def escalation_rate(self) -> float:
        """
        % of interactions that require escalation (transfer_flag == True).
        Returns NaN when there are no rows.
        """
        total = len(self.df)
        if total == 0:
            return float("nan")
        return _percentage(self.df["transfer_flag"].sum(), total)

    def abandonment_rate(self) -> float:
        """
        % of abandoned interactions.

        Searches in order: is_abandoned, abandoned_flag, abandoned.
        If no column exists (or there are no rows), returns NaN.
        """
        df = self.df
        total = len(df)
        if total == 0:
            return float("nan")

        abandon_col = _first_present(df, _ABANDON_COLUMNS)
        if abandon_col is None:
            return float("nan")

        return _percentage(int(_to_bool_mask(df[abandon_col]).sum()), total)

    def high_hold_time_rate(self, threshold_seconds: float = 60.0) -> float:
        """
        % of interactions with hold_time > threshold (default 60s).

        Complexity proxy: if the agent had to put the customer on hold for more
        than the threshold, they probably had to consult/investigate.
        Missing hold times count as 0. Returns NaN when there are no rows.
        """
        total = len(self.df)
        if total == 0:
            return float("nan")

        over_threshold = self.df["hold_time"].fillna(0) > threshold_seconds
        return _percentage(over_threshold.sum(), total)

    def recurrence_rate_7d(self) -> float:
        """
        % of customers who contact again in < 7 days for the SAME skill.

        Based on the normalized customer_id (customer_id, or caller_id when
        there is no customer_id) + queue_skill:
          - contacts are ordered by datetime_start within each
            customer + skill combination
          - two consecutive contacts separated by < 7 days mark the customer
            as "recurrent"
          - rate = number of recurrent customers / total number of customers

        Rows without a timestamp or without a customer identifier are ignored.
        NOTE: a customer who calls "Sales" and then "Support" is NOT recurrent.
        Returns NaN when no identifiable customer remains.
        """
        if "customer_id" not in self.df.columns:
            return float("nan")

        df = self.df.dropna(subset=["datetime_start", "customer_id"])
        if df.empty:
            return float("nan")

        df = df.sort_values(["customer_id", "queue_skill", "datetime_start"])

        # Gap to the previous contact of the same customer AND skill
        # (NaT for the first contact, which never compares as < 7 days).
        gap = df.groupby(["customer_id", "queue_skill"])["datetime_start"].diff()
        is_recontact = gap < pd.Timedelta(days=7)

        recurrent_customers = df.loc[is_recontact, "customer_id"].nunique()
        total_customers = df["customer_id"].nunique()
        if total_customers == 0:
            return float("nan")

        return float(round(recurrent_customers / total_customers * 100.0, 2))

    def repeat_channel_rate(self) -> float:
        """
        % of recurrences (<7 days) in which the customer uses the SAME channel.

        A recurrence is a contact followed, within 7 days, by another contact
        of the same customer (any skill). Rows without a timestamp or without
        a customer identifier are ignored.
        If there is no identifiable customer or no recurrence, returns NaN.
        """
        if "customer_id" not in self.df.columns:
            return float("nan")

        df = self.df.dropna(subset=["datetime_start", "customer_id"])
        if df.empty:
            return float("nan")

        df = df.sort_values(["customer_id", "datetime_start"])
        by_customer = df.groupby("customer_id", sort=False)

        # Next contact of the same customer (NaT / NaN for their last contact).
        gap_to_next = by_customer["datetime_start"].shift(-1) - df["datetime_start"]
        next_channel = by_customer["channel"].shift(-1)

        is_recurrent = gap_to_next < pd.Timedelta(days=7)
        if not is_recurrent.any():
            return float("nan")

        same_channel_recurrent = (is_recurrent & (df["channel"] == next_channel)).sum()
        return float(round(same_channel_recurrent / is_recurrent.sum() * 100, 2))

    # ------------------------------------------------------------------ #
    # Occupancy
    # ------------------------------------------------------------------ #
    def occupancy_rate(self) -> float:
        """
        Occupancy rate:

            occupancy = sum(handle_time) / sum(logged_time) * 100.

        Requires 'logged_time' column. If it does not exist or sums to 0,
        returns NaN.
        """
        df = self.df
        if "logged_time" not in df.columns:
            return float("nan")

        total_logged = df["logged_time"].fillna(0).sum()
        if total_logged == 0:
            return float("nan")

        total_handle = df["handle_time"].fillna(0).sum()
        return float(round(total_handle / total_logged * 100, 2))

    # ------------------------------------------------------------------ #
    # Performance score 0-10
    # ------------------------------------------------------------------ #
    def performance_score(self) -> Dict[str, float]:
        """
        Calculates a 0-10 score combining:
          - AHT (lower is better)
          - FCR (higher is better)
          - Variability (P90/P50, lower is better)
          - Other factors (occupancy / escalation)

        Formula:
            score = 0.4 * (10 - aht_norm)
                  + 0.3 * fcr_norm
                  + 0.2 * (10 - var_norm)
                  + 0.1 * other_score

        Where *_norm are values on a 0-10 scale.

        Returns ``{"score": nan}`` when there is no valid AHT data; otherwise
        the score plus each normalized component.
        """
        dist = self.aht_distribution()
        if not dist:
            return {"score": float("nan")}

        # AHT normalized: 0 (better) to 10 (worse)
        aht_norm = self._scale_to_0_10(dist["p50"], self.AHT_GOOD, self.AHT_BAD)

        # FCR normalized: 0-10 directly from % (0-100); unknown FCR scores 0
        fcr_pct = self.fcr_rate()
        fcr_norm = fcr_pct / 10.0 if not np.isnan(fcr_pct) else 0.0

        # Variability normalized: 0 (good ratio) to 10 (bad ratio)
        var_norm = self._scale_to_0_10(
            dist["p90_p50_ratio"], self.VAR_RATIO_GOOD, self.VAR_RATIO_BAD
        )

        # Other factors: combine occupancy (ideal ~80%) and escalation (ideal low)
        other_score = self._compute_other_factors_score(
            self.occupancy_rate(), self.escalation_rate()
        )

        score = (
            0.4 * (10.0 - aht_norm)
            + 0.3 * fcr_norm
            + 0.2 * (10.0 - var_norm)
            + 0.1 * other_score
        )
        # Clamp 0-10
        score = max(0.0, min(10.0, score))

        return {
            "score": round(score, 2),
            "aht_norm": round(aht_norm, 2),
            "fcr_norm": round(fcr_norm, 2),
            "var_norm": round(var_norm, 2),
            "other_score": round(other_score, 2),
        }

    def _scale_to_0_10(self, value: float, good: float, bad: float) -> float:
        """
        Linearly scales a value:
          - good -> 0
          - bad  -> 10
        With saturation outside the range. Works whether "good" is the lower
        or the upper bound. NaN values and a degenerate range (good == bad)
        map to the neutral 5.0.
        """
        if np.isnan(value) or good == bad:
            return 5.0  # neutral

        if good < bad:
            # Lower is better
            if value <= good:
                return 0.0
            if value >= bad:
                return 10.0
            return 10.0 * (value - good) / (bad - good)

        # Higher is better
        if value >= good:
            return 0.0
        if value <= bad:
            return 10.0
        return 10.0 * (good - value) / (good - bad)

    def _compute_other_factors_score(self, occ_pct: float, esc_pct: float) -> float:
        """
        Other factors (0-10), simple average of:
          - occupancy score: 10 at exactly 80% occupancy, losing 2 points per
            5 percentage points of deviation (0 at >= 25 points away)
          - escalation score: 10 at 0% escalation, linearly down to 0 at >= 40%

        A NaN input contributes the neutral 5.0 for its component.
        """
        # Occupancy: penalty grows linearly with the distance to 80%
        if np.isnan(occ_pct):
            occ_penalty = 5.0
        else:
            deviation = abs(occ_pct - 80.0)
            occ_penalty = min(10.0, deviation / 5.0 * 2.0)  # each 5 points add 2, max 10
        occ_score = max(0.0, 10.0 - occ_penalty)

        # Escalation: 0% -> 10 points, >= 40% -> 0
        if np.isnan(esc_pct):
            esc_score = 5.0
        elif esc_pct <= 0:
            esc_score = 10.0
        elif esc_pct >= 40:
            esc_score = 0.0
        else:
            esc_score = 10.0 * (1.0 - esc_pct / 40.0)

        # Simple average of both
        return (occ_score + esc_score) / 2.0

    # ------------------------------------------------------------------ #
    # Plots
    # ------------------------------------------------------------------ #
    def plot_aht_boxplot_by_skill(self) -> Axes:
        """
        Boxplot of AHT (handle_time) by skill, outliers hidden.

        Returns the matplotlib Axes; when there is no data the axes only shows
        an explanatory message.

        Raises:
            ImportError: if matplotlib is not installed.
        """
        pyplot = _require_pyplot()

        df = self.df
        if df.empty or "handle_time" not in df.columns:
            return _message_axes("No AHT data")

        df = df.dropna(subset=["handle_time"])
        if df.empty:
            return _message_axes("AHT not available")

        fig, ax = pyplot.subplots(figsize=(8, 4))
        df.boxplot(column="handle_time", by="queue_skill", ax=ax, showfliers=False)

        ax.set_xlabel("Skill / Queue")
        ax.set_ylabel("AHT (seconds)")
        ax.set_title("AHT distribution by skill")
        # pandas adds a "Boxplot grouped by ..." figure title; clear it
        fig.suptitle("")
        pyplot.setp(ax.get_xticklabels(), rotation=45, ha="right")
        ax.grid(axis="y", alpha=0.3)

        return ax

    def plot_resolution_funnel_by_skill(self) -> Axes:
        """
        Funnel / stacked bars of Talk + Hold + ACW by skill (P50).

        Allows viewing the time balance by skill. Returns the matplotlib Axes;
        when there is no data the axes only shows an explanatory message.

        Raises:
            ImportError: if matplotlib is not installed.
        """
        pyplot = _require_pyplot()

        p50 = self.talk_hold_acw_p50_by_skill()
        if p50.empty:
            return _message_axes("No data for funnel")

        fig, ax = pyplot.subplots(figsize=(10, 4))

        # queue_skill is a regular column here (the index is positional), so
        # tick labels must come from the column, not from the index.
        skills = p50["queue_skill"]
        talk = p50["talk_p50"]
        hold = p50["hold_p50"]
        acw = p50["acw_p50"]

        x = np.arange(len(skills))

        ax.bar(x, talk, label="Talk P50")
        ax.bar(x, hold, bottom=talk, label="Hold P50")
        ax.bar(x, acw, bottom=talk + hold, label="ACW P50")

        ax.set_xticks(x)
        ax.set_xticklabels(skills, rotation=45, ha="right")
        ax.set_ylabel("Seconds")
        ax.set_title("Resolution funnel (P50) by skill")
        ax.legend()
        ax.grid(axis="y", alpha=0.3)

        return ax

    # ------------------------------------------------------------------ #
    # Metrics by skill (for frontend cached/fresh consistency)
    # ------------------------------------------------------------------ #
    def metrics_by_skill(self) -> List[Dict[str, Any]]:
        """
        Calculates operational metrics by skill:
          - transfer_rate: % of interactions with transfer_flag == True
          - abandonment_rate: % of abandoned interactions (0.0 without column)
          - fcr_tecnico: 100 - transfer_rate (without transfer)
          - fcr_real: % without transfer AND without 7d re-contact; equals
            fcr_tecnico when there is no repeat column
          - aht_mean / hold_time_mean: means over valid records only
            (0.0 when the skill has no valid record)
          - aht_total: mean handle_time over ALL records (informational)
          - volume: number of interactions

        Returns a list of dicts, one per skill, so that the frontend has access
        to real metrics by skill (not estimated). Empty data -> empty list.
        """
        df = self.df
        if df.empty:
            return []

        abandon_col = _first_present(df, _ABANDON_COLUMNS)
        repeat_col = _first_present(df, _REPEAT_7D_COLUMNS)
        has_transfer = "transfer_flag" in df.columns
        has_validity = "_is_valid_for_cv" in df.columns

        results: List[Dict[str, Any]] = []
        for skill, group in df.groupby("queue_skill"):
            total = len(group)
            if total == 0:
                continue

            # Transfer rate and technical FCR (= 100 - transfer_rate)
            if has_transfer:
                transfer_rate = _percentage(group["transfer_flag"].sum(), total)
            else:
                transfer_rate = 0.0
            fcr_tecnico = float(round(100.0 - transfer_rate, 2))

            # Abandonment rate
            abandonment_rate = 0.0
            if abandon_col:
                abandoned = int(_to_bool_mask(group[abandon_col]).sum())
                abandonment_rate = _percentage(abandoned, total)

            # Real FCR: no transfer AND no 7d repeat; defaults to fcr_tecnico
            fcr_real = fcr_tecnico
            if repeat_col and has_transfer:
                resolved = ~group["transfer_flag"] & ~_to_bool_mask(group[repeat_col])
                fcr_real = _percentage(resolved.sum(), total)

            # Valid records only (excludes noise/zombie/abandon) for the means
            if has_validity:
                valid_records = group[group["_is_valid_for_cv"].astype(bool)]
            else:
                valid_records = group

            results.append(
                {
                    "skill": str(skill),
                    "volume": int(total),
                    "transfer_rate": transfer_rate,
                    "abandonment_rate": abandonment_rate,
                    "fcr_tecnico": fcr_tecnico,
                    "fcr_real": fcr_real,
                    "aht_mean": _mean_or_zero(valid_records, "handle_time"),
                    # Includes NOISE, ZOMBIE, ABANDON - for comparison only
                    "aht_total": _mean_or_zero(group, "handle_time"),
                    # MEAN (not P50), consistent with the fresh path
                    "hold_time_mean": _mean_or_zero(valid_records, "hold_time"),
                }
            )

        return results