""" CXInsights Dashboard - Data Loader Handles loading and processing of batch analysis data. """ import json from pathlib import Path from typing import Optional import streamlit as st @st.cache_data(ttl=60) def get_available_batches(data_dir: Path) -> list[str]: """ Get list of available batch IDs. Args: data_dir: Path to data/output directory Returns: List of batch IDs sorted by modification time (newest last) """ if not data_dir.exists(): return [] batches = [] for item in data_dir.iterdir(): if item.is_dir() and not item.name.startswith("."): # Check if it has a summary.json (valid batch) summary_path = item / "exports" / "summary.json" if summary_path.exists(): batches.append(item.name) # Sort by modification time (newest last for selectbox default) batches.sort(key=lambda x: (data_dir / x).stat().st_mtime) return batches @st.cache_data(ttl=60) def load_batch_data(batch_path: Path) -> Optional[dict]: """ Load all data for a batch. Args: batch_path: Path to batch directory Returns: Dictionary with summary and analyses, or None if failed """ try: # Load summary summary_path = batch_path / "exports" / "summary.json" if not summary_path.exists(): return None with open(summary_path, "r", encoding="utf-8") as f: summary = json.load(f) # Load individual analyses analyses = [] analyses_dir = batch_path / "analyses" # Handle nested batch_id directory structure if analyses_dir.exists(): for subdir in analyses_dir.iterdir(): if subdir.is_dir(): for json_file in subdir.glob("*.json"): try: with open(json_file, "r", encoding="utf-8") as f: analysis = json.load(f) analyses.append(analysis) except Exception: continue # Also check for flat structure if not analyses and analyses_dir.exists(): for json_file in analyses_dir.glob("*.json"): try: with open(json_file, "r", encoding="utf-8") as f: analysis = json.load(f) analyses.append(analysis) except Exception: continue return { "summary": summary, "analyses": analyses, "batch_id": summary.get("batch_id", batch_path.name), } except Exception as e: st.error(f"Error loading batch data: {e}") return None def load_transcript(batch_path: Path, call_id: str) -> Optional[dict]: """ Load transcript for a specific call. Args: batch_path: Path to batch directory call_id: Call ID to load Returns: Transcript dictionary or None """ try: transcript_path = batch_path / "transcripts" / f"{call_id}.json" if transcript_path.exists(): with open(transcript_path, "r", encoding="utf-8") as f: return json.load(f) return None except Exception: return None def aggregate_drivers(analyses: list[dict], driver_type: str) -> dict: """ Aggregate drivers across all analyses. Args: analyses: List of analysis dictionaries driver_type: One of 'poor_cx_drivers', 'lost_sales_drivers', 'fcr_failure_drivers', 'churn_risk_drivers' Returns: Dictionary with driver_code -> {count, calls, avg_confidence, instances} """ drivers = {} for analysis in analyses: call_id = analysis.get("call_id", "unknown") driver_list = analysis.get(driver_type, []) for driver in driver_list: code = driver.get("driver_code", "UNKNOWN") if code not in drivers: drivers[code] = { "count": 0, "calls": set(), "total_confidence": 0, "instances": [], } drivers[code]["count"] += 1 drivers[code]["calls"].add(call_id) drivers[code]["total_confidence"] += driver.get("confidence", 0) drivers[code]["instances"].append({ "call_id": call_id, **driver, }) # Calculate averages and convert sets to counts result = {} for code, data in drivers.items(): result[code] = { "count": data["count"], "call_count": len(data["calls"]), "avg_confidence": data["total_confidence"] / data["count"] if data["count"] > 0 else 0, "instances": data["instances"], } return result def get_fcr_distribution(analyses: list[dict]) -> dict: """Get FCR status distribution.""" distribution = {} for analysis in analyses: status = analysis.get("fcr_status", "UNKNOWN") distribution[status] = distribution.get(status, 0) + 1 return distribution def get_churn_distribution(analyses: list[dict]) -> dict: """Get churn risk distribution.""" distribution = {} for analysis in analyses: risk = analysis.get("churn_risk", "UNKNOWN") distribution[risk] = distribution.get(risk, 0) + 1 return distribution def get_agent_classification_distribution(analyses: list[dict]) -> dict: """Get agent classification distribution.""" distribution = {} for analysis in analyses: classification = analysis.get("agent_classification", "UNKNOWN") distribution[classification] = distribution.get(classification, 0) + 1 return distribution def calculate_kpis(summary: dict, analyses: list[dict]) -> dict: """ Calculate KPIs for the dashboard. Returns: Dictionary with KPI values """ total = summary.get("summary", {}).get("total_calls", 0) successful = summary.get("summary", {}).get("successful_analyses", 0) # Poor CX rate calls_with_poor_cx = sum( 1 for a in analyses if len(a.get("poor_cx_drivers", [])) > 0 ) poor_cx_rate = (calls_with_poor_cx / total * 100) if total > 0 else 0 # FCR rate - Per blueprint: Primera Llamada = FCR success fcr_dist = get_fcr_distribution(analyses) fcr_success = fcr_dist.get("FIRST_CALL", 0) # Only FIRST_CALL counts as FCR success fcr_rate = (fcr_success / total * 100) if total > 0 else 0 # Churn risk churn_dist = get_churn_distribution(analyses) high_risk = churn_dist.get("HIGH", 0) + churn_dist.get("AT_RISK", 0) churn_risk_rate = (high_risk / total * 100) if total > 0 else 0 # Agent performance agent_dist = get_agent_classification_distribution(analyses) needs_improvement = agent_dist.get("NEEDS_IMPROVEMENT", 0) + agent_dist.get("POOR", 0) improvement_rate = (needs_improvement / total * 100) if total > 0 else 0 return { "total_calls": total, "success_rate": (successful / total * 100) if total > 0 else 0, "poor_cx_rate": poor_cx_rate, "fcr_rate": fcr_rate, "churn_risk_rate": churn_risk_rate, "improvement_rate": improvement_rate, "total_poor_cx_drivers": summary.get("poor_cx", {}).get("total_drivers_found", 0), "total_lost_sales_drivers": summary.get("lost_sales", {}).get("total_drivers_found", 0), }