feat: Add Law 10/2025 compliance analysis tab

- Add new Law10Tab with compliance analysis for Spanish Law 10/2025
- Sections: LAW-01 (Response Speed), LAW-02 (Resolution Quality), LAW-07 (Time Coverage)
- Add Data Maturity Summary showing available/estimable/missing data
- Add Validation Questionnaire for manual data input
- Add Dimension Connections linking to other analysis tabs
- Fix KPI consistency: use correct field names (abandonment_rate, aht_seconds)
- Fix cache directory path for Windows compatibility
- Update economic calculations to use actual economicModel data

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
sujucu70
2026-01-22 21:58:26 +01:00
parent 62454c6b6a
commit 88d7e4c10d
20 changed files with 5554 additions and 1285 deletions

View File

@@ -8,6 +8,7 @@ from __future__ import annotations
import json
import os
import shutil
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
@@ -23,12 +24,38 @@ router = APIRouter(
tags=["cache"],
)
# Directory for cache files
CACHE_DIR = Path(os.getenv("CACHE_DIR", "/data/cache"))
# Directory for cache files - use platform-appropriate default
def _get_default_cache_dir() -> Path:
"""Get a platform-appropriate default cache directory."""
env_cache_dir = os.getenv("CACHE_DIR")
if env_cache_dir:
return Path(env_cache_dir)
# On Windows, check if C:/data/cache exists (legacy location)
# Otherwise use a local .cache directory relative to the backend
# On Unix/Docker, use /data/cache
if sys.platform == "win32":
# Check legacy location first (for backwards compatibility)
legacy_cache = Path("C:/data/cache")
if legacy_cache.exists():
return legacy_cache
# Fallback to local .cache directory in the backend folder
backend_dir = Path(__file__).parent.parent.parent
return backend_dir / ".cache"
else:
return Path("/data/cache")
CACHE_DIR = _get_default_cache_dir()
CACHED_FILE = CACHE_DIR / "cached_data.csv"
METADATA_FILE = CACHE_DIR / "metadata.json"
DRILLDOWN_FILE = CACHE_DIR / "drilldown_data.json"
# Log cache directory on module load
import logging
logger = logging.getLogger(__name__)
logger.info(f"[Cache] Using cache directory: {CACHE_DIR}")
logger.info(f"[Cache] Drilldown file path: {DRILLDOWN_FILE}")
class CacheMetadata(BaseModel):
fileName: str
@@ -158,7 +185,11 @@ def get_cached_drilldown(current_user: str = Depends(get_current_user)):
Get the cached drilldownData JSON.
Returns the pre-calculated drilldown data for fast cache usage.
"""
logger.info(f"[Cache] GET /drilldown - checking file: {DRILLDOWN_FILE}")
logger.info(f"[Cache] File exists: {DRILLDOWN_FILE.exists()}")
if not DRILLDOWN_FILE.exists():
logger.warning(f"[Cache] Drilldown file not found at: {DRILLDOWN_FILE}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No cached drilldown data found"
@@ -167,8 +198,10 @@ def get_cached_drilldown(current_user: str = Depends(get_current_user)):
try:
with open(DRILLDOWN_FILE, "r", encoding="utf-8") as f:
drilldown_data = json.load(f)
logger.info(f"[Cache] Loaded drilldown with {len(drilldown_data)} skills")
return JSONResponse(content={"success": True, "drilldownData": drilldown_data})
except Exception as e:
logger.error(f"[Cache] Error reading drilldown: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error reading drilldown data: {str(e)}"
@@ -185,16 +218,21 @@ async def save_cached_drilldown(
Called by frontend after calculating drilldown from uploaded file.
Receives JSON as form field.
"""
logger.info(f"[Cache] POST /drilldown - saving to: {DRILLDOWN_FILE}")
logger.info(f"[Cache] Cache directory: {CACHE_DIR}")
ensure_cache_dir()
logger.info(f"[Cache] Cache dir exists after ensure: {CACHE_DIR.exists()}")
try:
# Parse and validate JSON
drilldown_data = json.loads(drilldown_json)
logger.info(f"[Cache] Parsed drilldown JSON with {len(drilldown_data)} skills")
# Save to file
with open(DRILLDOWN_FILE, "w", encoding="utf-8") as f:
json.dump(drilldown_data, f)
logger.info(f"[Cache] Drilldown saved successfully, file exists: {DRILLDOWN_FILE.exists()}")
return JSONResponse(content={
"success": True,
"message": f"Cached drilldown data with {len(drilldown_data)} skills"

View File

@@ -19,7 +19,9 @@ app = FastAPI()
origins = [
"http://localhost:3000",
"http://localhost:3001",
"http://127.0.0.1:3000",
"http://127.0.0.1:3001",
]
app.add_middleware(

View File

@@ -20,6 +20,7 @@
"metrics": [
"aht_distribution",
"talk_hold_acw_p50_by_skill",
"metrics_by_skill",
"fcr_rate",
"escalation_rate",
"abandonment_rate",

View File

@@ -99,6 +99,15 @@ class EconomyCostMetrics:
+ df["wrap_up_time"].fillna(0)
) # segundos
# Filtrar por record_status para cálculos de AHT/CPI
# Solo incluir registros VALID (excluir NOISE, ZOMBIE, ABANDON)
if "record_status" in df.columns:
df["record_status"] = df["record_status"].astype(str).str.strip().str.upper()
df["_is_valid_for_cost"] = df["record_status"] == "VALID"
else:
# Legacy data sin record_status: incluir todo
df["_is_valid_for_cost"] = True
self.df = df
@property
@@ -115,12 +124,19 @@ class EconomyCostMetrics:
"""
CPI (Coste Por Interacción) por skill/canal.
CPI = Labor_cost_per_interaction + Overhead_variable
CPI = (Labor_cost_per_interaction + Overhead_variable) / EFFECTIVE_PRODUCTIVITY
- Labor_cost_per_interaction = (labor_cost_per_hour * AHT_hours)
- Overhead_variable = overhead_rate * Labor_cost_per_interaction
- EFFECTIVE_PRODUCTIVITY = 0.70 (70% - accounts for non-productive time)
Excluye registros abandonados del cálculo de costes para consistencia
con el path del frontend (fresh CSV).
Si no hay config de costes -> devuelve DataFrame vacío.
Incluye queue_skill y channel como columnas (no solo índice) para que
el frontend pueda hacer lookup por nombre de skill.
"""
if not self._has_cost_config():
return pd.DataFrame()
@@ -132,8 +148,22 @@ class EconomyCostMetrics:
if df.empty:
return pd.DataFrame()
# AHT por skill/canal (en segundos)
grouped = df.groupby(["queue_skill", "channel"])["handle_time"].mean()
# Filter out abandonments for cost calculation (consistency with frontend)
if "is_abandoned" in df.columns:
df_cost = df[df["is_abandoned"] != True]
else:
df_cost = df
# Filtrar por record_status: solo VALID para cálculo de AHT
# Excluye NOISE, ZOMBIE, ABANDON
if "_is_valid_for_cost" in df_cost.columns:
df_cost = df_cost[df_cost["_is_valid_for_cost"] == True]
if df_cost.empty:
return pd.DataFrame()
# AHT por skill/canal (en segundos) - solo registros VALID
grouped = df_cost.groupby(["queue_skill", "channel"])["handle_time"].mean()
if grouped.empty:
return pd.DataFrame()
@@ -141,9 +171,14 @@ class EconomyCostMetrics:
aht_sec = grouped
aht_hours = aht_sec / 3600.0
# Apply productivity factor (70% effectiveness)
# This accounts for non-productive agent time (breaks, training, etc.)
EFFECTIVE_PRODUCTIVITY = 0.70
labor_cost = cfg.labor_cost_per_hour * aht_hours
overhead = labor_cost * cfg.overhead_rate
cpi = labor_cost + overhead
raw_cpi = labor_cost + overhead
cpi = raw_cpi / EFFECTIVE_PRODUCTIVITY
out = pd.DataFrame(
{
@@ -154,7 +189,8 @@ class EconomyCostMetrics:
}
)
return out.sort_index()
# Reset index to include queue_skill and channel as columns for frontend lookup
return out.sort_index().reset_index()
# ------------------------------------------------------------------ #
# KPI 2: coste anual por skill/canal
@@ -180,7 +216,9 @@ class EconomyCostMetrics:
.rename("volume")
)
joined = cpi_table.join(volume, how="left").fillna({"volume": 0})
# Set index on cpi_table to match volume's MultiIndex for join
cpi_indexed = cpi_table.set_index(["queue_skill", "channel"])
joined = cpi_indexed.join(volume, how="left").fillna({"volume": 0})
joined["annual_cost"] = (joined["cpi_total"] * joined["volume"]).round(2)
return joined
@@ -216,7 +254,9 @@ class EconomyCostMetrics:
.rename("volume")
)
joined = cpi_table.join(volume, how="left").fillna({"volume": 0})
# Set index on cpi_table to match volume's MultiIndex for join
cpi_indexed = cpi_table.set_index(["queue_skill", "channel"])
joined = cpi_indexed.join(volume, how="left").fillna({"volume": 0})
# Costes anuales de labor y overhead
annual_labor = (joined["labor_cost"] * joined["volume"]).sum()
@@ -252,7 +292,7 @@ class EconomyCostMetrics:
- Ineff_seconds = Delta * volume * 0.4
- Ineff_cost = LaborCPI_per_second * Ineff_seconds
⚠️ Es un modelo aproximado para cuantificar "orden de magnitud".
NOTA: Es un modelo aproximado para cuantificar "orden de magnitud".
"""
if not self._has_cost_config():
return pd.DataFrame()
@@ -261,6 +301,12 @@ class EconomyCostMetrics:
assert cfg is not None
df = self.df.copy()
# Filtrar por record_status: solo VALID para cálculo de AHT
# Excluye NOISE, ZOMBIE, ABANDON
if "_is_valid_for_cost" in df.columns:
df = df[df["_is_valid_for_cost"] == True]
grouped = df.groupby(["queue_skill", "channel"])
stats = grouped["handle_time"].agg(
@@ -273,10 +319,14 @@ class EconomyCostMetrics:
return pd.DataFrame()
# CPI para obtener coste/segundo de labor
cpi_table = self.cpi_by_skill_channel()
if cpi_table.empty:
# cpi_by_skill_channel now returns with reset_index, so we need to set index for join
cpi_table_raw = self.cpi_by_skill_channel()
if cpi_table_raw.empty:
return pd.DataFrame()
# Set queue_skill+channel as index for the join
cpi_table = cpi_table_raw.set_index(["queue_skill", "channel"])
merged = stats.join(cpi_table[["labor_cost"]], how="left")
merged = merged.fillna(0.0)
@@ -297,7 +347,8 @@ class EconomyCostMetrics:
merged["ineff_seconds"] = ineff_seconds.round(2)
merged["ineff_cost"] = ineff_cost
return merged[["aht_p50", "aht_p90", "volume", "ineff_seconds", "ineff_cost"]]
# Reset index to include queue_skill and channel as columns for frontend lookup
return merged[["aht_p50", "aht_p90", "volume", "ineff_seconds", "ineff_cost"]].reset_index()
# ------------------------------------------------------------------ #
# KPI 5: ahorro potencial anual por automatización
@@ -419,7 +470,9 @@ class EconomyCostMetrics:
.rename("volume")
)
joined = cpi_table.join(volume, how="left").fillna({"volume": 0})
# Set index on cpi_table to match volume's MultiIndex for join
cpi_indexed = cpi_table.set_index(["queue_skill", "channel"])
joined = cpi_indexed.join(volume, how="left").fillna({"volume": 0})
# CPI medio ponderado por canal
per_channel = (

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List
from typing import Any, Dict, List
import numpy as np
import pandas as pd
@@ -87,14 +87,26 @@ class OperationalPerformanceMetrics:
)
# v3.0: Filtrar NOISE y ZOMBIE para cálculos de variabilidad
# record_status: 'valid', 'noise', 'zombie', 'abandon'
# Para AHT/CV solo usamos 'valid' (o sin status = legacy data)
# record_status: 'VALID', 'NOISE', 'ZOMBIE', 'ABANDON'
# Para AHT/CV solo usamos 'VALID' (excluye noise, zombie, abandon)
if "record_status" in df.columns:
df["record_status"] = df["record_status"].astype(str).str.strip().str.upper()
# Crear máscara para registros válidos (para cálculos de CV/variabilidad)
df["_is_valid_for_cv"] = df["record_status"].isin(["VALID", "NAN", ""]) | df["record_status"].isna()
# Crear máscara para registros válidos: SOLO "VALID"
# Excluye explícitamente NOISE, ZOMBIE, ABANDON y cualquier otro valor
df["_is_valid_for_cv"] = df["record_status"] == "VALID"
# Log record_status breakdown for debugging
status_counts = df["record_status"].value_counts()
valid_count = int(df["_is_valid_for_cv"].sum())
print(f"[OperationalPerformance] Record status breakdown:")
print(f" Total rows: {len(df)}")
for status, count in status_counts.items():
print(f" - {status}: {count}")
print(f" VALID rows for AHT calculation: {valid_count}")
else:
# Legacy data sin record_status: incluir todo
df["_is_valid_for_cv"] = True
print(f"[OperationalPerformance] No record_status column - using all {len(df)} rows")
# Normalización básica
df["queue_skill"] = df["queue_skill"].astype(str).str.strip()
@@ -156,6 +168,9 @@ class OperationalPerformanceMetrics:
def talk_hold_acw_p50_by_skill(self) -> pd.DataFrame:
"""
P50 de talk_time, hold_time y wrap_up_time por skill.
Incluye queue_skill como columna (no solo índice) para que
el frontend pueda hacer lookup por nombre de skill.
"""
df = self.df
@@ -173,7 +188,8 @@ class OperationalPerformanceMetrics:
"acw_p50": grouped["wrap_up_time"].apply(lambda s: perc(s, 50)),
}
)
return result.round(2).sort_index()
# Reset index to include queue_skill as column for frontend lookup
return result.round(2).sort_index().reset_index()
# ------------------------------------------------------------------ #
# FCR, escalación, abandono, reincidencia, repetición canal
@@ -290,13 +306,17 @@ class OperationalPerformanceMetrics:
def recurrence_rate_7d(self) -> float:
"""
% de clientes que vuelven a contactar en < 7 días.
% de clientes que vuelven a contactar en < 7 días para el MISMO skill.
Se basa en customer_id (o caller_id si no hay customer_id).
Se basa en customer_id (o caller_id si no hay customer_id) + queue_skill.
Calcula:
- Para cada cliente, ordena por datetime_start
- Si hay dos contactos consecutivos separados < 7 días, cuenta como "recurrente"
- Para cada combinación cliente + skill, ordena por datetime_start
- Si hay dos contactos consecutivos separados < 7 días (mismo cliente, mismo skill),
cuenta como "recurrente"
- Tasa = nº clientes recurrentes / nº total de clientes
NOTA: Solo cuenta como recurrencia si el cliente llama por el MISMO skill.
Un cliente que llama a "Ventas" y luego a "Soporte" NO es recurrente.
"""
df = self.df.dropna(subset=["datetime_start"]).copy()
@@ -313,16 +333,17 @@ class OperationalPerformanceMetrics:
if df.empty:
return float("nan")
# Ordenar por cliente + fecha
df = df.sort_values(["customer_id", "datetime_start"])
# Ordenar por cliente + skill + fecha
df = df.sort_values(["customer_id", "queue_skill", "datetime_start"])
# Diferencia de tiempo entre contactos consecutivos por cliente
df["delta"] = df.groupby("customer_id")["datetime_start"].diff()
# Diferencia de tiempo entre contactos consecutivos por cliente Y skill
# Esto asegura que solo contamos recontactos del mismo cliente para el mismo skill
df["delta"] = df.groupby(["customer_id", "queue_skill"])["datetime_start"].diff()
# Marcamos los contactos que ocurren a menos de 7 días del anterior
# Marcamos los contactos que ocurren a menos de 7 días del anterior (mismo skill)
recurrence_mask = df["delta"] < pd.Timedelta(days=7)
# Nº de clientes que tienen al menos un contacto recurrente
# Nº de clientes que tienen al menos un contacto recurrente (para cualquier skill)
recurrent_customers = df.loc[recurrence_mask, "customer_id"].nunique()
total_customers = df["customer_id"].nunique()
@@ -568,3 +589,128 @@ class OperationalPerformanceMetrics:
ax.grid(axis="y", alpha=0.3)
return ax
# ------------------------------------------------------------------ #
# Métricas por skill (para consistencia frontend cached/fresh)
# ------------------------------------------------------------------ #
def metrics_by_skill(self) -> List[Dict[str, Any]]:
"""
Calcula métricas operacionales por skill:
- transfer_rate: % de interacciones con transfer_flag == True
- abandonment_rate: % de interacciones abandonadas
- fcr_tecnico: 100 - transfer_rate (sin transferencia)
- fcr_real: % sin transferencia Y sin recontacto 7d (si hay datos)
- volume: número de interacciones
Devuelve una lista de dicts, uno por skill, para que el frontend
tenga acceso a las métricas reales por skill (no estimadas).
"""
df = self.df
if df.empty:
return []
results = []
# Detectar columna de abandono
abandon_col = None
for col_name in ["is_abandoned", "abandoned_flag", "abandoned"]:
if col_name in df.columns:
abandon_col = col_name
break
# Detectar columna de repeat_call_7d para FCR real
repeat_col = None
for col_name in ["repeat_call_7d", "repeat_7d", "is_repeat_7d"]:
if col_name in df.columns:
repeat_col = col_name
break
for skill, group in df.groupby("queue_skill"):
total = len(group)
if total == 0:
continue
# Transfer rate
if "transfer_flag" in group.columns:
transfer_count = group["transfer_flag"].sum()
transfer_rate = float(round(transfer_count / total * 100, 2))
else:
transfer_rate = 0.0
# FCR Técnico = 100 - transfer_rate
fcr_tecnico = float(round(100.0 - transfer_rate, 2))
# Abandonment rate
abandonment_rate = 0.0
if abandon_col:
col = group[abandon_col]
if col.dtype == "O":
abandon_mask = (
col.astype(str)
.str.strip()
.str.lower()
.isin(["true", "t", "1", "yes", "y", "si", ""])
)
else:
abandon_mask = pd.to_numeric(col, errors="coerce").fillna(0) > 0
abandoned = int(abandon_mask.sum())
abandonment_rate = float(round(abandoned / total * 100, 2))
# FCR Real (sin transferencia Y sin recontacto 7d)
fcr_real = fcr_tecnico # default to fcr_tecnico if no repeat data
if repeat_col and "transfer_flag" in group.columns:
repeat_data = group[repeat_col]
if repeat_data.dtype == "O":
repeat_mask = (
repeat_data.astype(str)
.str.strip()
.str.lower()
.isin(["true", "t", "1", "yes", "y", "si", ""])
)
else:
repeat_mask = pd.to_numeric(repeat_data, errors="coerce").fillna(0) > 0
# FCR Real: no transfer AND no repeat
fcr_real_mask = (~group["transfer_flag"]) & (~repeat_mask)
fcr_real_count = fcr_real_mask.sum()
fcr_real = float(round(fcr_real_count / total * 100, 2))
# AHT Mean (promedio de handle_time sobre registros válidos)
# Filtramos solo registros 'valid' (excluye noise/zombie) para consistencia
if "_is_valid_for_cv" in group.columns:
valid_records = group[group["_is_valid_for_cv"]]
else:
valid_records = group
if len(valid_records) > 0 and "handle_time" in valid_records.columns:
aht_mean = float(round(valid_records["handle_time"].mean(), 2))
else:
aht_mean = 0.0
# AHT Total (promedio de handle_time sobre TODOS los registros)
# Incluye NOISE, ZOMBIE, ABANDON - solo para información/comparación
if len(group) > 0 and "handle_time" in group.columns:
aht_total = float(round(group["handle_time"].mean(), 2))
else:
aht_total = 0.0
# Hold Time Mean (promedio de hold_time sobre registros válidos)
# Consistente con fresh path que usa MEAN, no P50
if len(valid_records) > 0 and "hold_time" in valid_records.columns:
hold_time_mean = float(round(valid_records["hold_time"].mean(), 2))
else:
hold_time_mean = 0.0
results.append({
"skill": str(skill),
"volume": int(total),
"transfer_rate": transfer_rate,
"abandonment_rate": abandonment_rate,
"fcr_tecnico": fcr_tecnico,
"fcr_real": fcr_real,
"aht_mean": aht_mean,
"aht_total": aht_total,
"hold_time_mean": hold_time_mean,
})
return results