Dashboard Features: - 8 navigation sections: Overview, Outcomes, Poor CX, FCR, Churn, Agent, Call Explorer, Export - Beyond Brand Identity styling (colors #6D84E3, Outfit font) - RCA Sankey diagram (Driver → Outcome → Churn Risk flow) - Correlation heatmaps (driver co-occurrence, driver-outcome) - Outcome Deep Dive (root causes, correlation, duration analysis) - Export functionality (Excel, HTML, JSON) Blueprint Compliance: - FCR: 4 categories (Primera Llamada/Rellamada × Sin/Con Riesgo de Fuga) - Churn: Binary view (Sin Riesgo de Fuga / En Riesgo de Fuga) - Agent: Talento Para Replicar / Oportunidades de Mejora - Fixed FCR rate calculation (only FIRST_CALL counts as success) Technical: - Streamlit + Plotly for interactive visualizations - Light theme configuration (.streamlit/config.toml) - Fixed Plotly colorbar titlefont deprecation Documentation: - Updated PROJECT_CONTEXT.md, TODO.md, CHANGELOG.md - Added 4 new technical decisions (TD-014 to TD-017) - Created TROUBLESHOOTING.md with 10 common issues Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
236 lines
7.4 KiB
Python
236 lines
7.4 KiB
Python
"""
|
|
CXInsights Dashboard - Data Loader
|
|
Handles loading and processing of batch analysis data.
|
|
"""
|
|
|
|
import json
from collections import Counter
from pathlib import Path
from typing import Optional

import streamlit as st
|
|
|
|
|
|
@st.cache_data(ttl=60)
def get_available_batches(data_dir: Path) -> list[str]:
    """
    List the batch IDs available under the output directory.

    A directory counts as a valid batch only when it is not hidden and
    contains an ``exports/summary.json`` file.

    Args:
        data_dir: Path to data/output directory

    Returns:
        Batch IDs ordered by directory mtime, oldest first — the newest
        batch ends up last, matching the selectbox default.
    """
    if not data_dir.exists():
        return []

    found = [
        entry.name
        for entry in data_dir.iterdir()
        if entry.is_dir()
        and not entry.name.startswith(".")
        and (entry / "exports" / "summary.json").exists()
    ]

    # Oldest first, newest last (the UI picks the last entry by default).
    return sorted(found, key=lambda name: (data_dir / name).stat().st_mtime)
|
|
|
|
|
|
@st.cache_data(ttl=60)
def load_batch_data(batch_path: Path) -> Optional[dict]:
    """
    Load all data for a batch.

    Args:
        batch_path: Path to batch directory

    Returns:
        Dictionary with ``summary``, ``analyses`` and ``batch_id`` keys,
        or None if the batch has no summary or loading failed.
    """
    try:
        summary_path = batch_path / "exports" / "summary.json"
        if not summary_path.exists():
            return None

        with open(summary_path, "r", encoding="utf-8") as f:
            summary = json.load(f)

        analyses: list[dict] = []
        analyses_dir = batch_path / "analyses"

        if analyses_dir.exists():
            # Prefer the nested batch_id directory structure ...
            for subdir in analyses_dir.iterdir():
                if subdir.is_dir():
                    analyses.extend(_load_json_dir(subdir))
            # ... and fall back to a flat layout when nothing was found.
            if not analyses:
                analyses = _load_json_dir(analyses_dir)

        return {
            "summary": summary,
            "analyses": analyses,
            # batch_id from the summary wins; directory name is the fallback.
            "batch_id": summary.get("batch_id", batch_path.name),
        }

    except Exception as e:
        st.error(f"Error loading batch data: {e}")
        return None


def _load_json_dir(directory: Path) -> list[dict]:
    """Load every parseable ``*.json`` file directly inside *directory*.

    Unreadable or malformed files are skipped silently — the batch load
    is deliberately best-effort so one corrupt analysis does not hide
    the rest.
    """
    loaded: list[dict] = []
    for json_file in directory.glob("*.json"):
        try:
            with open(json_file, "r", encoding="utf-8") as f:
                loaded.append(json.load(f))
        except (OSError, json.JSONDecodeError):
            continue
    return loaded
|
|
|
|
|
|
def load_transcript(batch_path: Path, call_id: str) -> Optional[dict]:
    """
    Load transcript for a specific call.

    Args:
        batch_path: Path to batch directory
        call_id: Call ID to load

    Returns:
        Transcript dictionary, or None when the file is missing,
        unreadable, or not valid JSON.
    """
    transcript_path = batch_path / "transcripts" / f"{call_id}.json"
    # EAFP: open directly instead of exists()+open() — avoids the race
    # between check and use, and the narrowed except no longer hides
    # unrelated programming errors the way the old blanket Exception did.
    try:
        with open(transcript_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, json.JSONDecodeError):
        return None
|
|
|
|
|
|
def aggregate_drivers(analyses: list[dict], driver_type: str) -> dict:
    """
    Aggregate one driver category across every analysis in a batch.

    Args:
        analyses: List of analysis dictionaries
        driver_type: One of 'poor_cx_drivers', 'lost_sales_drivers',
            'fcr_failure_drivers', 'churn_risk_drivers'

    Returns:
        Dictionary mapping driver_code ->
        {count, call_count, avg_confidence, instances}
    """
    # Parallel accumulators keyed by driver code; insertion order of the
    # final result matches first appearance, as before.
    counts: dict = {}
    call_sets: dict = {}
    confidence_sums: dict = {}
    instances: dict = {}

    for record in analyses:
        source_call = record.get("call_id", "unknown")
        for entry in record.get(driver_type, []):
            code = entry.get("driver_code", "UNKNOWN")
            counts[code] = counts.get(code, 0) + 1
            call_sets.setdefault(code, set()).add(source_call)
            confidence_sums[code] = confidence_sums.get(code, 0) + entry.get("confidence", 0)
            # Keep the full driver payload, tagged with its call of origin.
            instances.setdefault(code, []).append({"call_id": source_call, **entry})

    return {
        code: {
            "count": counts[code],
            "call_count": len(call_sets[code]),
            "avg_confidence": confidence_sums[code] / counts[code] if counts[code] > 0 else 0,
            "instances": instances[code],
        }
        for code in counts
    }
|
|
|
|
|
|
def get_fcr_distribution(analyses: list[dict]) -> dict:
    """Count analyses per FCR status (missing status counts as "UNKNOWN").

    Returns:
        Mapping of fcr_status -> number of analyses with that status.
    """
    # Counter replaces the hand-rolled get()+1 loop; cast back to a plain
    # dict so callers keep receiving the exact type they did before.
    return dict(Counter(a.get("fcr_status", "UNKNOWN") for a in analyses))
|
|
|
|
|
|
def get_churn_distribution(analyses: list[dict]) -> dict:
    """Count analyses per churn risk level (missing risk counts as "UNKNOWN").

    Returns:
        Mapping of churn_risk -> number of analyses with that risk level.
    """
    # Counter replaces the hand-rolled get()+1 loop; cast back to a plain
    # dict so callers keep receiving the exact type they did before.
    return dict(Counter(a.get("churn_risk", "UNKNOWN") for a in analyses))
|
|
|
|
|
|
def get_agent_classification_distribution(analyses: list[dict]) -> dict:
    """Count analyses per agent classification (missing value counts as "UNKNOWN").

    Returns:
        Mapping of agent_classification -> number of analyses.
    """
    # Counter replaces the hand-rolled get()+1 loop; cast back to a plain
    # dict so callers keep receiving the exact type they did before.
    return dict(Counter(a.get("agent_classification", "UNKNOWN") for a in analyses))
|
|
|
|
|
|
def calculate_kpis(summary: dict, analyses: list[dict]) -> dict:
    """
    Calculate KPIs for the dashboard.

    Args:
        summary: Batch summary dictionary (contents of exports/summary.json).
        analyses: Per-call analysis dictionaries.

    Returns:
        Dictionary with KPI values
    """
    stats = summary.get("summary", {})
    total = stats.get("total_calls", 0)
    successful = stats.get("successful_analyses", 0)

    def pct(part) -> float:
        # Every rate is expressed against total calls; 0 for an empty batch.
        return (part / total * 100) if total > 0 else 0

    # Calls flagged with at least one poor-CX driver.
    poor_cx_calls = sum(1 for a in analyses if a.get("poor_cx_drivers", []))

    # FCR rate - Per blueprint: Primera Llamada = FCR success
    fcr_success = get_fcr_distribution(analyses).get("FIRST_CALL", 0)

    # Churn: HIGH and AT_RISK together form the at-risk bucket.
    churn_counts = get_churn_distribution(analyses)
    at_risk = churn_counts.get("HIGH", 0) + churn_counts.get("AT_RISK", 0)

    # Agents classified as needing coaching.
    agent_counts = get_agent_classification_distribution(analyses)
    weak_agents = agent_counts.get("NEEDS_IMPROVEMENT", 0) + agent_counts.get("POOR", 0)

    return {
        "total_calls": total,
        "success_rate": pct(successful),
        "poor_cx_rate": pct(poor_cx_calls),
        "fcr_rate": pct(fcr_success),
        "churn_risk_rate": pct(at_risk),
        "improvement_rate": pct(weak_agents),
        "total_poor_cx_drivers": summary.get("poor_cx", {}).get("total_drivers_found", 0),
        "total_lost_sales_drivers": summary.get("lost_sales", {}).get("total_drivers_found", 0),
    }
|