""" CXInsights Dashboard - UI Components Visualization components following Beyond Brand Guidelines. """ import streamlit as st import plotly.express as px import plotly.graph_objects as go import pandas as pd import numpy as np from collections import defaultdict from config import COLORS, CHART_COLORS, get_plotly_layout, format_evidence_quote from data_loader import ( aggregate_drivers, get_fcr_distribution, get_churn_distribution, get_agent_classification_distribution, calculate_kpis, ) # ============================================================================= # KPI CARDS # ============================================================================= def render_kpi_cards(summary: dict, analyses: list[dict]): """Render KPI cards in Beyond style.""" kpis = calculate_kpis(summary, analyses) col1, col2, col3, col4 = st.columns(4) with col1: st.metric( label="Total Calls Analyzed", value=f"{kpis['total_calls']:,}", delta=f"{kpis['success_rate']:.0f}% success rate", ) with col2: st.metric( label="Poor CX Detected", value=f"{kpis['poor_cx_rate']:.1f}%", delta=f"{kpis['total_poor_cx_drivers']} drivers found", delta_color="inverse", ) with col3: st.metric( label="FCR Rate", value=f"{kpis['fcr_rate']:.1f}%", delta="First call resolution", ) with col4: st.metric( label="Churn Risk", value=f"{kpis['churn_risk_rate']:.1f}%", delta="At risk customers", delta_color="inverse", ) # ============================================================================= # OUTCOME CHARTS # ============================================================================= def render_outcome_chart(summary: dict, height: int = 350): """ Render outcome distribution as horizontal bar chart. McKinsey style: conclusions in title, values on bars. """ outcomes = summary.get("outcomes", {}) if not outcomes: st.info("No outcome data available.") return # Prepare data - sort by count descending df = pd.DataFrame([ {"Outcome": k, "Count": v} for k, v in sorted(outcomes.items(), key=lambda x: -x[1]) ]) total = df["Count"].sum() df["Percentage"] = (df["Count"] / total * 100).round(1) # Determine dominant outcome for title top_outcome = df.iloc[0]["Outcome"] if len(df) > 0 else "N/A" top_pct = df.iloc[0]["Percentage"] if len(df) > 0 else 0 # Create horizontal bar chart (Beyond style) fig = go.Figure() fig.add_trace(go.Bar( y=df["Outcome"], x=df["Count"], orientation="h", marker_color=COLORS["blue"], text=[f"{c:,} ({p}%)" for c, p in zip(df["Count"], df["Percentage"])], textposition="outside", textfont={"size": 14, "color": COLORS["black"]}, )) layout = get_plotly_layout( title=f"{top_outcome} represents {top_pct:.0f}% of call outcomes", height=height, ) layout["xaxis"]["title"] = "Number of Calls" layout["yaxis"]["title"] = "" layout["yaxis"]["categoryorder"] = "total ascending" layout["bargap"] = 0.3 fig.update_layout(**layout) st.plotly_chart(fig, use_container_width=True) st.caption(f"Source: CXInsights Analysis - Batch {summary.get('batch_id', 'N/A')}") # ============================================================================= # DRIVER ANALYSIS # ============================================================================= def render_driver_analysis(summary: dict, driver_type: str = "poor_cx", limit: int = 5): """ Render driver analysis with horizontal bars. driver_type: 'poor_cx' or 'lost_sales' """ data = summary.get(driver_type, {}) drivers = data.get("top_drivers", []) if not drivers: st.info(f"No {driver_type.replace('_', ' ')} drivers detected.") return # Limit drivers if specified if limit: drivers = drivers[:limit] df = pd.DataFrame(drivers) # Calculate call percentage total_calls = summary.get("summary", {}).get("total_calls", 1) df["Call %"] = (df["occurrences"] / total_calls * 100).round(1) # Create horizontal bar chart fig = go.Figure() fig.add_trace(go.Bar( y=df["driver_code"], x=df["occurrences"], orientation="h", marker_color=COLORS["blue"], text=[f"{o} calls ({p}%)" for o, p in zip(df["occurrences"], df["Call %"])], textposition="outside", textfont={"size": 12, "color": COLORS["black"]}, )) # Title with insight top_driver = df.iloc[0]["driver_code"] if len(df) > 0 else "N/A" top_pct = df.iloc[0]["Call %"] if len(df) > 0 else 0 layout = get_plotly_layout( title=f"{top_driver} detected in {top_pct:.0f}% of calls", height=max(250, len(drivers) * 50), ) layout["yaxis"]["categoryorder"] = "total ascending" layout["bargap"] = 0.3 layout["xaxis"]["title"] = "Occurrences" fig.update_layout(**layout) st.plotly_chart(fig, use_container_width=True) # Confidence indicator if len(df) > 0: avg_conf = df["avg_confidence"].mean() st.caption(f"Average confidence: {avg_conf:.0%} | Source: LLM Analysis") def render_driver_detail(analyses: list[dict], driver_type: str = "poor_cx_drivers"): """Render detailed driver analysis with evidence.""" drivers = aggregate_drivers(analyses, driver_type) if not drivers: st.info("No drivers found.") return # Sort by count sorted_drivers = sorted(drivers.items(), key=lambda x: -x[1]["count"]) for code, data in sorted_drivers: with st.expander( f"**{code}** — {data['count']} instances in {data['call_count']} calls " f"(Avg. confidence: {data['avg_confidence']:.0%})" ): for instance in data["instances"][:5]: # Show top 5 st.markdown(f"**Call:** `{instance['call_id']}`") # Reasoning if instance.get("reasoning"): st.markdown(f"**Why:** {instance['reasoning']}") # Origin if instance.get("origin"): st.markdown(f"**Origin:** {instance['origin']}") # Corrective action if instance.get("corrective_action"): st.success(f"**Action:** {instance['corrective_action']}") # Evidence evidence = instance.get("evidence_spans", []) if evidence: for e in evidence: st.markdown( format_evidence_quote( e.get("text", ""), e.get("speaker", "unknown"), ), unsafe_allow_html=True, ) st.markdown("---") # ============================================================================= # FCR ANALYSIS # ============================================================================= def render_fcr_analysis(analyses: list[dict], compact: bool = True): """ Render FCR (First Call Resolution) analysis following blueprint beyondCx_FCR_v1. Blueprint defines 4 categories combining FCR status + Churn risk: - Primera Llamada Sin Riesgo de Fuga - Primera Llamada Con Riesgo de Fuga - Rellamada Sin Riesgo de Fuga - Rellamada Con Riesgo de Fuga """ if not analyses: st.info("No FCR data available.") return # Calculate 4 blueprint categories categories = { "Primera Llamada\nSin Riesgo": 0, "Primera Llamada\nCon Riesgo": 0, "Rellamada\nSin Riesgo": 0, "Rellamada\nCon Riesgo": 0, "Desconocido": 0, } # Churn risk mapping: HIGH/AT_RISK = Con Riesgo, others = Sin Riesgo high_risk_values = ["HIGH", "AT_RISK"] for analysis in analyses: fcr_status = analysis.get("fcr_status", "UNKNOWN") churn_risk = analysis.get("churn_risk", "UNKNOWN") has_churn_risk = churn_risk in high_risk_values if fcr_status == "FIRST_CALL": if has_churn_risk: categories["Primera Llamada\nCon Riesgo"] += 1 else: categories["Primera Llamada\nSin Riesgo"] += 1 elif fcr_status in ["REPEAT_CALL", "CALLBACK"]: if has_churn_risk: categories["Rellamada\nCon Riesgo"] += 1 else: categories["Rellamada\nSin Riesgo"] += 1 else: categories["Desconocido"] += 1 total = sum(categories.values()) # Remove empty categories categories = {k: v for k, v in categories.items() if v > 0} if compact: # Compact donut chart labels = list(categories.keys()) values = list(categories.values()) # Color mapping per blueprint colors = [] for label in labels: if "Sin Riesgo" in label and "Primera" in label: colors.append("#81C784") # Green - Best case elif "Sin Riesgo" in label: colors.append("#FFB74D") # Orange - Repeat but no churn risk elif "Con Riesgo" in label and "Primera" in label: colors.append("#FFB74D") # Orange - First call but churn risk elif "Con Riesgo" in label: colors.append("#E57373") # Red - Worst case else: colors.append(COLORS["grey"]) fig = go.Figure(data=[go.Pie( labels=labels, values=values, hole=0.4, marker_colors=colors, textinfo="label+percent", textfont={"size": 10}, )]) # FCR rate = Primera Llamada / Total first_call_total = categories.get("Primera Llamada\nSin Riesgo", 0) + categories.get("Primera Llamada\nCon Riesgo", 0) fcr_rate = (first_call_total / total * 100) if total > 0 else 0 layout = get_plotly_layout( title=f"FCR: {fcr_rate:.0f}% Primera Llamada", height=300, ) layout["showlegend"] = False fig.update_layout(**layout) st.plotly_chart(fig, use_container_width=True) else: # Full view with matrix and details st.markdown("#### FCR Categories (Blueprint)") # Show matrix view col1, col2 = st.columns(2) with col1: # Primera Llamada metrics primera_sin = categories.get("Primera Llamada\nSin Riesgo", 0) primera_con = categories.get("Primera Llamada\nCon Riesgo", 0) primera_total = primera_sin + primera_con st.markdown( f"""

Primera Llamada

{primera_total}
Sin Riesgo: {primera_sin} | Con Riesgo: {primera_con}
""", unsafe_allow_html=True, ) with col2: # Rellamada metrics rellamada_sin = categories.get("Rellamada\nSin Riesgo", 0) rellamada_con = categories.get("Rellamada\nCon Riesgo", 0) rellamada_total = rellamada_sin + rellamada_con st.markdown( f"""

Rellamada

{rellamada_total}
Sin Riesgo: {rellamada_sin} | Con Riesgo: {rellamada_con}
""", unsafe_allow_html=True, ) # Bar chart with 4 categories labels = list(categories.keys()) values = list(categories.values()) percentages = [(v / total * 100) for v in values] colors = [] for label in labels: if "Sin Riesgo" in label and "Primera" in label: colors.append("#81C784") elif "Sin Riesgo" in label: colors.append("#FFB74D") elif "Con Riesgo" in label and "Primera" in label: colors.append("#FFB74D") elif "Con Riesgo" in label: colors.append("#E57373") else: colors.append(COLORS["grey"]) fig = go.Figure(go.Bar( x=labels, y=values, marker_color=colors, text=[f"{v} ({p:.0f}%)" for v, p in zip(values, percentages)], textposition="outside", )) fig.update_layout( title=dict( text="Distribución FCR según Blueprint", font=dict(size=14, color=COLORS["black"]), ), xaxis_title="Categoría", yaxis_title="Llamadas", height=350, margin=dict(l=10, r=10, t=50, b=10), paper_bgcolor=COLORS["white"], plot_bgcolor=COLORS["white"], ) st.plotly_chart(fig, use_container_width=True) # Key insight if rellamada_con > 0: st.error( f"**Alerta:** {rellamada_con} llamadas son rellamadas con riesgo de fuga. " f"Estas requieren atención inmediata para retención." ) if primera_con > primera_sin: st.warning( f"**Atención:** Más primeras llamadas con riesgo ({primera_con}) que sin riesgo ({primera_sin}). " f"Revisar proceso de resolución." ) # ============================================================================= # CHURN RISK ANALYSIS # ============================================================================= def render_churn_risk_analysis(analyses: list[dict], compact: bool = True): """ Render churn risk distribution following blueprint beyondCx_Close_The_Loop. Blueprint defines binary categories: - Sin Riesgo de Fuga (LOW, MEDIUM) - En Riesgo de Fuga (AT_RISK, HIGH) """ if not analyses: st.info("No churn risk data available.") return # Calculate blueprint binary categories sin_riesgo = 0 en_riesgo = 0 desconocido = 0 # Detailed breakdown detailed = {"LOW": 0, "MEDIUM": 0, "AT_RISK": 0, "HIGH": 0, "UNKNOWN": 0} for analysis in analyses: risk = analysis.get("churn_risk", "UNKNOWN") detailed[risk] = detailed.get(risk, 0) + 1 if risk in ["HIGH", "AT_RISK"]: en_riesgo += 1 elif risk in ["LOW", "MEDIUM"]: sin_riesgo += 1 else: desconocido += 1 total = sin_riesgo + en_riesgo + desconocido if compact: # Compact donut with blueprint categories categories = {} if sin_riesgo > 0: categories["Sin Riesgo\nde Fuga"] = sin_riesgo if en_riesgo > 0: categories["En Riesgo\nde Fuga"] = en_riesgo if desconocido > 0: categories["Desconocido"] = desconocido labels = list(categories.keys()) values = list(categories.values()) colors = [] for label in labels: if "Sin Riesgo" in label: colors.append("#81C784") # Green elif "En Riesgo" in label: colors.append("#E57373") # Red else: colors.append(COLORS["grey"]) fig = go.Figure(data=[go.Pie( labels=labels, values=values, hole=0.4, marker_colors=colors, textinfo="label+percent", textfont={"size": 10}, )]) en_riesgo_pct = (en_riesgo / total * 100) if total > 0 else 0 layout = get_plotly_layout( title=f"{en_riesgo_pct:.0f}% En Riesgo de Fuga", height=300, ) layout["showlegend"] = False fig.update_layout(**layout) st.plotly_chart(fig, use_container_width=True) else: # Full view with blueprint categories + detail st.markdown("#### Riesgo de Fuga (Blueprint)") col1, col2 = st.columns(2) with col1: st.markdown( f"""

Sin Riesgo de Fuga

{sin_riesgo}
LOW: {detailed['LOW']} | MEDIUM: {detailed['MEDIUM']}
""", unsafe_allow_html=True, ) with col2: st.markdown( f"""

En Riesgo de Fuga

{en_riesgo}
AT_RISK: {detailed['AT_RISK']} | HIGH: {detailed['HIGH']}
""", unsafe_allow_html=True, ) # Detailed breakdown bar chart labels = ["Sin Riesgo\nde Fuga", "En Riesgo\nde Fuga"] if desconocido > 0: labels.append("Desconocido") values = [sin_riesgo, en_riesgo] if desconocido > 0: values.append(desconocido) percentages = [(v / total * 100) for v in values] colors = ["#81C784", "#E57373"] if desconocido > 0: colors.append(COLORS["grey"]) fig = go.Figure(go.Bar( x=labels, y=values, marker_color=colors, text=[f"{v} ({p:.0f}%)" for v, p in zip(values, percentages)], textposition="outside", )) fig.update_layout( title=dict( text="Distribución Riesgo de Fuga", font=dict(size=14, color=COLORS["black"]), ), xaxis_title="Categoría", yaxis_title="Clientes", height=300, margin=dict(l=10, r=10, t=50, b=10), paper_bgcolor=COLORS["white"], plot_bgcolor=COLORS["white"], ) st.plotly_chart(fig, use_container_width=True) # Key insight if en_riesgo > sin_riesgo: st.error( f"**Alerta:** Más clientes en riesgo de fuga ({en_riesgo}) que sin riesgo ({sin_riesgo}). " f"Requiere atención inmediata del equipo de retención." ) # ============================================================================= # AGENT PERFORMANCE # ============================================================================= def render_agent_performance(analyses: list[dict]): """ Render agent performance analysis following blueprint beyondCx_Close_The_Loop. Blueprint "Desarrollar el Talento Interno" defines: - Talento Para Replicar (positive skills to replicate) - Oportunidades de Mejora (areas for improvement) """ st.markdown("### Clasificación de Agentes") distribution = get_agent_classification_distribution(analyses) if not distribution: st.info("No hay datos de clasificación de agentes.") return # Map to blueprint categories: Buen Comercial vs Necesita Mejora buen_comercial = 0 necesita_mejora = 0 for classification, count in distribution.items(): if classification in ["EXCELLENT", "GOOD", "SATISFACTORY"]: buen_comercial += count elif classification in ["NEEDS_IMPROVEMENT", "POOR"]: necesita_mejora += count total = buen_comercial + necesita_mejora # Show blueprint binary categories col1, col2 = st.columns(2) with col1: st.markdown( f"""

Buen Desempeño

{buen_comercial}
{(buen_comercial/total*100):.0f}% de las llamadas
""", unsafe_allow_html=True, ) with col2: st.markdown( f"""

Necesita Mejora

{necesita_mejora}
{(necesita_mejora/total*100):.0f}% de las llamadas
""", unsafe_allow_html=True, ) # Detailed breakdown classification_order = ["EXCELLENT", "GOOD", "SATISFACTORY", "NEEDS_IMPROVEMENT", "POOR", "UNKNOWN"] df = pd.DataFrame([ {"Classification": k, "Count": distribution.get(k, 0)} for k in classification_order if k in distribution ]) total_all = df["Count"].sum() df["Percentage"] = (df["Count"] / total_all * 100).round(1) # Color mapping color_map = { "EXCELLENT": "#81C784", "GOOD": "#81C784", "SATISFACTORY": COLORS["blue"], "NEEDS_IMPROVEMENT": "#FFB74D", "POOR": "#E57373", "UNKNOWN": COLORS["light_grey"], } fig = go.Figure() fig.add_trace(go.Bar( x=df["Classification"], y=df["Count"], marker_color=[color_map.get(c, COLORS["grey"]) for c in df["Classification"]], text=[f"{cnt} ({pct}%)" for cnt, pct in zip(df["Count"], df["Percentage"])], textposition="outside", )) layout = get_plotly_layout( title="Distribución Detallada de Clasificación", height=300, ) fig.update_layout(**layout) st.plotly_chart(fig, use_container_width=True) st.markdown("---") # Skills breakdown - Blueprint terminology col1, col2 = st.columns(2) with col1: st.markdown("### Talento Para Replicar") st.caption("Buenas prácticas identificadas para replicar en otros agentes") render_skills_list(analyses, "agent_positive_skills", positive=True) with col2: st.markdown("### Oportunidades de Mejora") st.caption("Áreas de mejora identificadas con recomendaciones de coaching") render_skills_list(analyses, "agent_improvement_areas", positive=False) def render_skills_list(analyses: list[dict], skill_key: str, positive: bool = True): """Render aggregated skills list.""" skills = {} for analysis in analyses: for skill in analysis.get(skill_key, []): code = skill.get("skill_code", "UNKNOWN") if code not in skills: skills[code] = {"count": 0, "descriptions": [], "recommendations": []} skills[code]["count"] += 1 if skill.get("description"): skills[code]["descriptions"].append(skill["description"]) rec = skill.get("coaching_recommendation") or skill.get("replicable_practice") if rec: skills[code]["recommendations"].append(rec) if not skills: st.info("No skills data available.") return # Sort by count sorted_skills = sorted(skills.items(), key=lambda x: -x[1]["count"]) for code, data in sorted_skills[:5]: icon = "✓" if positive else "!" color = COLORS["blue"] if positive else COLORS["grey"] st.markdown( f"{icon} " f"**{code}** — {data['count']} instances", unsafe_allow_html=True, ) if data["recommendations"]: # Show most common recommendation rec = data["recommendations"][0] if positive: st.caption(f"Best practice: {rec}") else: st.caption(f"Recommendation: {rec}") # ============================================================================= # CORRELATION HEATMAP # ============================================================================= def render_driver_correlation_heatmap(analyses: list[dict], driver_type: str = "poor_cx_drivers"): """ Render a correlation heatmap showing co-occurrence of drivers. Helps identify patterns like "LONG_WAIT always appears with LOW_EMPATHY". """ # Build co-occurrence matrix driver_sets_per_call = [] all_drivers = set() for analysis in analyses: drivers_in_call = set() for d in analysis.get(driver_type, []): code = d.get("driver_code", "") if code: drivers_in_call.add(code) all_drivers.add(code) if drivers_in_call: driver_sets_per_call.append(drivers_in_call) if len(all_drivers) < 2: st.info("Not enough driver variety to show correlations. Need at least 2 different drivers.") return # Sort drivers by frequency driver_counts = defaultdict(int) for driver_set in driver_sets_per_call: for d in driver_set: driver_counts[d] += 1 sorted_drivers = sorted(all_drivers, key=lambda x: -driver_counts[x]) # Limit to top 10 drivers for readability if len(sorted_drivers) > 10: sorted_drivers = sorted_drivers[:10] st.caption(f"Showing top 10 drivers by frequency") n = len(sorted_drivers) driver_index = {d: i for i, d in enumerate(sorted_drivers)} # Build co-occurrence matrix cooccurrence = np.zeros((n, n)) for driver_set in driver_sets_per_call: relevant_drivers = [d for d in driver_set if d in driver_index] for d1 in relevant_drivers: for d2 in relevant_drivers: cooccurrence[driver_index[d1]][driver_index[d2]] += 1 # Normalize to get correlation-like values (Jaccard similarity) correlation = np.zeros((n, n)) for i in range(n): for j in range(n): if i == j: correlation[i][j] = 1.0 else: # Jaccard similarity: intersection / union intersection = cooccurrence[i][j] union = cooccurrence[i][i] + cooccurrence[j][j] - intersection if union > 0: correlation[i][j] = intersection / union else: correlation[i][j] = 0 # Create DataFrame for heatmap df_corr = pd.DataFrame( correlation, index=sorted_drivers, columns=sorted_drivers ) # Create heatmap with Plotly fig = go.Figure(data=go.Heatmap( z=correlation, x=sorted_drivers, y=sorted_drivers, colorscale=[ [0, COLORS["white"]], [0.25, COLORS["light_grey"]], [0.5, COLORS["grey"]], [0.75, "#8BA3E8"], # Light blue [1, COLORS["blue"]], ], text=np.round(correlation, 2), texttemplate="%{text:.2f}", textfont={"size": 11}, hovertemplate="%{x}%{y}
Co-occurrence: %{z:.2f}", showscale=True, colorbar=dict( title=dict(text="Correlation", font=dict(size=12)), tickfont=dict(size=10), ), )) # Find strongest correlations for title max_corr = 0 max_pair = ("", "") for i in range(n): for j in range(i + 1, n): if correlation[i][j] > max_corr: max_corr = correlation[i][j] max_pair = (sorted_drivers[i], sorted_drivers[j]) title = f"Driver Co-occurrence Matrix" if max_corr > 0.3: title = f"{max_pair[0]} and {max_pair[1]} show strongest correlation ({max_corr:.0%})" fig.update_layout( title=dict( text=title, font=dict(size=16, color=COLORS["black"]), x=0, xanchor="left", ), xaxis=dict( tickangle=45, tickfont=dict(size=10), side="bottom", ), yaxis=dict( tickfont=dict(size=10), autorange="reversed", ), height=max(400, 50 * n), margin=dict(l=120, r=40, t=60, b=120), paper_bgcolor=COLORS["white"], plot_bgcolor=COLORS["white"], ) st.plotly_chart(fig, use_container_width=True) # Show insights st.markdown("#### Key Patterns Identified") # Find top correlations (excluding diagonal) correlations_list = [] for i in range(n): for j in range(i + 1, n): if correlation[i][j] > 0.2: # Threshold for significant correlation correlations_list.append({ "driver1": sorted_drivers[i], "driver2": sorted_drivers[j], "correlation": correlation[i][j], "co_occurrences": int(cooccurrence[i][j]), }) if correlations_list: # Sort by correlation correlations_list.sort(key=lambda x: -x["correlation"]) for corr in correlations_list[:5]: strength = "strong" if corr["correlation"] > 0.5 else "moderate" st.markdown( f"- **{corr['driver1']}** ↔ **{corr['driver2']}**: " f"{corr['correlation']:.0%} correlation " f"({corr['co_occurrences']} co-occurrences) — *{strength}*" ) else: st.info("No significant correlations found (threshold: 20%)") st.caption("Correlation based on Jaccard similarity of driver co-occurrence within calls.") def render_driver_outcome_heatmap(analyses: list[dict]): """ Render heatmap showing which drivers are associated with which outcomes. """ # Build driver-outcome matrix driver_outcome_counts = defaultdict(lambda: defaultdict(int)) all_drivers = set() all_outcomes = set() for analysis in analyses: outcome = analysis.get("outcome", "UNKNOWN") all_outcomes.add(outcome) for d in analysis.get("poor_cx_drivers", []): code = d.get("driver_code", "") if code: all_drivers.add(code) driver_outcome_counts[code][outcome] += 1 if not all_drivers or not all_outcomes: st.info("Not enough data to show driver-outcome relationships.") return # Sort by frequency sorted_drivers = sorted(all_drivers, key=lambda x: -sum(driver_outcome_counts[x].values()))[:10] sorted_outcomes = sorted(all_outcomes, key=lambda x: -sum( driver_outcome_counts[d][x] for d in all_drivers )) # Build matrix matrix = [] for driver in sorted_drivers: row = [driver_outcome_counts[driver][outcome] for outcome in sorted_outcomes] matrix.append(row) matrix = np.array(matrix) # Normalize by row (driver) to show distribution row_sums = matrix.sum(axis=1, keepdims=True) matrix_normalized = np.divide(matrix, row_sums, where=row_sums != 0) fig = go.Figure(data=go.Heatmap( z=matrix_normalized, x=sorted_outcomes, y=sorted_drivers, colorscale=[ [0, COLORS["white"]], [0.5, "#8BA3E8"], [1, COLORS["blue"]], ], text=matrix, # Show raw counts texttemplate="%{text}", textfont={"size": 11}, hovertemplate="%{y}%{x}
Count: %{text}
Rate: %{z:.0%}", showscale=True, colorbar=dict( title=dict(text="Rate", font=dict(size=12)), ), )) fig.update_layout( title=dict( text="Driver Distribution by Outcome", font=dict(size=16, color=COLORS["black"]), x=0, xanchor="left", ), xaxis=dict( tickangle=45, tickfont=dict(size=10), title="Outcome", ), yaxis=dict( tickfont=dict(size=10), title="Driver", ), height=max(350, 40 * len(sorted_drivers)), margin=dict(l=150, r=40, t=60, b=100), paper_bgcolor=COLORS["white"], plot_bgcolor=COLORS["white"], ) st.plotly_chart(fig, use_container_width=True) st.caption("Numbers show raw counts. Colors show percentage distribution per driver.") # ============================================================================= # CALL EXPLORER # ============================================================================= def render_call_explorer(analyses: list[dict]): """Render detailed call explorer.""" st.markdown("### Call Analysis Explorer") if not analyses: st.info("No analyses available.") return # Filters col1, col2, col3 = st.columns(3) with col1: outcomes = list(set(a.get("outcome", "UNKNOWN") for a in analyses)) selected_outcomes = st.multiselect( "Filter by Outcome", outcomes, default=outcomes, ) with col2: fcr_statuses = list(set(a.get("fcr_status", "UNKNOWN") for a in analyses)) selected_fcr = st.multiselect( "Filter by FCR Status", fcr_statuses, default=fcr_statuses, ) with col3: churn_risks = list(set(a.get("churn_risk", "UNKNOWN") for a in analyses)) selected_churn = st.multiselect( "Filter by Churn Risk", churn_risks, default=churn_risks, ) # Apply filters filtered = [ a for a in analyses if a.get("outcome", "UNKNOWN") in selected_outcomes and a.get("fcr_status", "UNKNOWN") in selected_fcr and a.get("churn_risk", "UNKNOWN") in selected_churn ] st.markdown(f"**Showing {len(filtered)} of {len(analyses)} calls**") # Summary table if filtered: df = pd.DataFrame([ { "Call ID": a["call_id"], "Outcome": a.get("outcome", "N/A"), "FCR": a.get("fcr_status", "N/A"), "Churn Risk": a.get("churn_risk", "N/A"), "Agent": a.get("agent_classification", "N/A"), "Poor CX": len(a.get("poor_cx_drivers", [])), "Duration (s)": a.get("observed", {}).get("audio_duration_sec", "N/A"), } for a in filtered ]) st.dataframe(df, use_container_width=True, hide_index=True) st.markdown("---") # Individual call detail st.markdown("### Call Detail View") call_ids = [a["call_id"] for a in filtered] if call_ids: selected_call = st.selectbox("Select call to explore", call_ids) # Find the analysis analysis = next((a for a in filtered if a["call_id"] == selected_call), None) if analysis: render_call_detail(analysis) def render_call_detail(analysis: dict): """Render detailed view of a single call analysis.""" # Header metrics col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Outcome", analysis.get("outcome", "N/A")) with col2: st.metric("FCR Status", analysis.get("fcr_status", "N/A")) with col3: st.metric("Churn Risk", analysis.get("churn_risk", "N/A")) with col4: st.metric("Agent Rating", analysis.get("agent_classification", "N/A")) st.markdown("---") # Tabs for different sections tab1, tab2, tab3, tab4 = st.tabs([ "Poor CX Drivers", "FCR Failure Drivers", "Churn Risk Drivers", "Agent Assessment", ]) with tab1: drivers = analysis.get("poor_cx_drivers", []) if drivers: for d in drivers: render_driver_card(d) else: st.success("No poor CX drivers detected.") with tab2: drivers = analysis.get("fcr_failure_drivers", []) if drivers: for d in drivers: render_driver_card(d) else: st.success("No FCR failure drivers detected.") with tab3: drivers = analysis.get("churn_risk_drivers", []) if drivers: for d in drivers: render_driver_card(d) else: st.success("No churn risk drivers detected.") with tab4: st.markdown("**Positive Skills:**") for skill in analysis.get("agent_positive_skills", []): st.markdown(f"✓ **{skill.get('skill_code', 'N/A')}** ({skill.get('confidence', 0):.0%})") if skill.get("replicable_practice"): st.caption(f"Best practice: {skill['replicable_practice']}") st.markdown("**Areas for Improvement:**") for skill in analysis.get("agent_improvement_areas", []): st.markdown(f"! **{skill.get('skill_code', 'N/A')}** ({skill.get('confidence', 0):.0%})") if skill.get("coaching_recommendation"): st.caption(f"Recommendation: {skill['coaching_recommendation']}") def render_driver_card(driver: dict): """Render a single driver card with evidence.""" confidence = driver.get("confidence", 0) code = driver.get("driver_code", "UNKNOWN") st.markdown( f"
" f"{code}" f"" f"Confidence: {confidence:.0%}
", unsafe_allow_html=True, ) col1, col2 = st.columns(2) with col1: if driver.get("reasoning"): st.markdown(f"**Why:** {driver['reasoning']}") if driver.get("origin"): st.markdown(f"**Origin:** `{driver['origin']}`") with col2: if driver.get("corrective_action"): st.success(f"**Action:** {driver['corrective_action']}") # Evidence evidence = driver.get("evidence_spans", []) if evidence: st.markdown("**Evidence from transcript:**") for e in evidence: st.markdown( format_evidence_quote(e.get("text", ""), e.get("speaker", "unknown")), unsafe_allow_html=True, ) st.markdown("") # ============================================================================= # RCA SANKEY DIAGRAM # ============================================================================= def render_rca_sankey(analyses: list[dict]): """ Render Root Cause Analysis as a Sankey diagram. Shows flow: Driver → Outcome → Churn Risk """ if not analyses: st.info("No analysis data available for RCA visualization.") return # Collect flow data flows = defaultdict(int) # (source, target) -> count for analysis in analyses: outcome = analysis.get("outcome", "UNKNOWN") churn_risk = analysis.get("churn_risk", "UNKNOWN") drivers = analysis.get("poor_cx_drivers", []) if drivers: for d in drivers: driver_code = d.get("driver_code", "UNKNOWN") # Flow 1: Driver → Outcome flows[(driver_code, f"[O] {outcome}")] += 1 # Flow 2: Outcome → Churn Risk flows[(f"[O] {outcome}", f"[R] {churn_risk}")] += 1 else: # Calls without drivers still flow to outcome and churn flows[("No CX Issues", f"[O] {outcome}")] += 1 flows[(f"[O] {outcome}", f"[R] {churn_risk}")] += 1 if not flows: st.info("Not enough data to generate RCA Sankey diagram.") return # Build node list all_nodes = set() for (source, target) in flows.keys(): all_nodes.add(source) all_nodes.add(target) # Sort nodes by category for better visual layout drivers = sorted([n for n in all_nodes if not n.startswith("[O]") and not n.startswith("[R]")]) outcomes = sorted([n for n in all_nodes if n.startswith("[O]")]) churn_levels = sorted([n for n in all_nodes if n.startswith("[R]")]) # Order: Drivers first, then Outcomes, then Churn Risk node_list = drivers + outcomes + churn_levels node_indices = {node: i for i, node in enumerate(node_list)} # Build links sources = [] targets = [] values = [] for (source, target), count in flows.items(): sources.append(node_indices[source]) targets.append(node_indices[target]) values.append(count) # Node colors based on category node_colors = [] for node in node_list: if node.startswith("[O]"): node_colors.append(COLORS["blue"]) # Blue for outcomes elif node.startswith("[R]"): # Churn risk colors if "HIGH" in node or "AT_RISK" in node: node_colors.append("#E57373") # Red elif "MEDIUM" in node: node_colors.append("#FFB74D") # Orange else: node_colors.append("#81C784") # Green elif node == "No CX Issues": node_colors.append("#81C784") # Green for no issues else: node_colors.append("#E57373") # Red for drivers # Link colors (lighter versions) link_colors = [] for source_idx in sources: base_color = node_colors[source_idx] # Make semi-transparent link_colors.append(base_color.replace("#", "rgba(") + ", 0.4)" if "#" in base_color else "rgba(150,150,150,0.4)") # Convert hex to rgba for link colors link_colors_rgba = [] for source_idx in sources: base = node_colors[source_idx] if base.startswith("#"): r = int(base[1:3], 16) g = int(base[3:5], 16) b = int(base[5:7], 16) link_colors_rgba.append(f"rgba({r},{g},{b},0.4)") else: link_colors_rgba.append("rgba(150,150,150,0.4)") # Clean labels for display (remove prefixes) display_labels = [] for node in node_list: if node.startswith("[O] "): display_labels.append(node[4:]) # Remove "[O] " elif node.startswith("[R] "): display_labels.append(f"Risk: {node[4:]}") # Change "[R] " to "Risk: " else: display_labels.append(node) # Create Sankey diagram fig = go.Figure(data=[go.Sankey( node=dict( pad=25, thickness=20, line=dict(color="white", width=1), label=display_labels, color=node_colors, hovertemplate="%{label}
Total: %{value}", ), link=dict( source=sources, target=targets, value=values, color=link_colors_rgba, hovertemplate="%{source.label}%{target.label}
Count: %{value}", ), textfont=dict(size=12, color=COLORS["black"], family="Arial Black"), )]) # Find main flow for title - clean up prefix max_flow = max(flows.items(), key=lambda x: x[1]) src_clean = max_flow[0][0].replace("[O] ", "").replace("[R] ", "") tgt_clean = max_flow[0][1].replace("[O] ", "").replace("[R] ", "") main_path = f"{src_clean} → {tgt_clean}" fig.update_layout( title=dict( text=f"Root Cause Analysis Flow — Top pattern: {main_path} ({max_flow[1]} calls)", font=dict(size=14, color=COLORS["black"]), x=0, xanchor="left", ), font=dict(size=12, color=COLORS["black"]), height=500, margin=dict(l=10, r=10, t=50, b=10), paper_bgcolor=COLORS["white"], ) st.plotly_chart(fig, use_container_width=True) # Legend with colored boxes st.markdown( f"""
Poor CX Driver No Issues Outcome Medium Risk High Risk
""", unsafe_allow_html=True, ) # Insights st.markdown("---") st.markdown("#### Key Insights") # Calculate top risky flows risky_flows = [ (src, tgt, val) for (src, tgt), val in flows.items() if "HIGH" in tgt or "AT_RISK" in tgt ] risky_flows.sort(key=lambda x: -x[2]) if risky_flows: st.warning(f"**{len(risky_flows)} paths lead to elevated churn risk:**") for src, tgt, val in risky_flows[:5]: # Clean up display src_disp = src.replace("[O] ", "").replace("[R] ", "") tgt_disp = tgt.replace("[O] ", "Risk: ").replace("[R] ", "Risk: ") st.markdown(f"- **{src_disp}** → {tgt_disp}: **{val}** calls") else: st.success("No significant paths to high churn risk detected.") # ============================================================================= # OUTCOME DEEP DIVE ANALYSIS # ============================================================================= def render_outcome_deep_dive(analyses: list[dict], selected_outcome: str): """ Render deep dive analysis for a specific outcome. Shows: root causes, driver correlation, call duration comparison. """ if not analyses or not selected_outcome: st.info("Select an outcome to analyze.") return # Filter calls for this outcome outcome_calls = [a for a in analyses if a.get("outcome") == selected_outcome] other_calls = [a for a in analyses if a.get("outcome") != selected_outcome] if not outcome_calls: st.warning(f"No calls found with outcome: {selected_outcome}") return st.markdown(f"### Why {selected_outcome}?") st.markdown(f"Deep analysis of **{len(outcome_calls)}** calls with this outcome.") # --------------------------------------------------------------------- # 1. ROOT CAUSES - Most frequent drivers leading to this outcome # --------------------------------------------------------------------- st.markdown("---") st.markdown("#### Root Causes") st.markdown("Poor CX drivers most frequently associated with this outcome:") # Aggregate drivers for this outcome driver_counts = defaultdict(lambda: {"count": 0, "confidence_sum": 0, "examples": []}) for analysis in outcome_calls: for driver in analysis.get("poor_cx_drivers", []): code = driver.get("driver_code", "UNKNOWN") driver_counts[code]["count"] += 1 driver_counts[code]["confidence_sum"] += driver.get("confidence", 0) if len(driver_counts[code]["examples"]) < 2: driver_counts[code]["examples"].append({ "reasoning": driver.get("reasoning", ""), "action": driver.get("corrective_action", ""), }) if driver_counts: # Sort by count sorted_drivers = sorted(driver_counts.items(), key=lambda x: -x[1]["count"]) # Create bar chart driver_names = [d[0] for d in sorted_drivers[:8]] driver_vals = [d[1]["count"] for d in sorted_drivers[:8]] fig = go.Figure(go.Bar( x=driver_vals, y=driver_names, orientation="h", marker_color="#E57373", text=driver_vals, textposition="outside", )) fig.update_layout( title=dict( text=f"Top drivers in {selected_outcome} calls", font=dict(size=14, color=COLORS["black"]), ), xaxis_title="Occurrences", yaxis=dict(autorange="reversed"), height=max(250, 40 * len(driver_names)), margin=dict(l=10, r=10, t=40, b=40), paper_bgcolor=COLORS["white"], plot_bgcolor=COLORS["white"], ) st.plotly_chart(fig, use_container_width=True) # Show top driver details if sorted_drivers: top_driver = sorted_drivers[0] st.info( f"**Primary root cause:** `{top_driver[0]}` appears in " f"**{top_driver[1]['count']}** of {len(outcome_calls)} calls " f"({top_driver[1]['count']/len(outcome_calls)*100:.0f}%)" ) # Show example reasoning and actions if top_driver[1]["examples"]: with st.expander(f"Details: {top_driver[0]}"): for ex in top_driver[1]["examples"]: if ex["reasoning"]: st.markdown(f"**Why:** {ex['reasoning']}") if ex["action"]: st.success(f"**Recommended action:** {ex['action']}") else: st.success(f"No Poor CX drivers detected in {selected_outcome} calls.") # --------------------------------------------------------------------- # 2. CORRELATION - Compare driver rates vs other outcomes # --------------------------------------------------------------------- st.markdown("---") st.markdown("#### Driver Correlation") st.markdown(f"How driver rates in `{selected_outcome}` compare to other outcomes:") # Calculate driver rates for this outcome vs others def get_driver_rate(calls_list): if not calls_list: return {} rates = defaultdict(int) for a in calls_list: for d in a.get("poor_cx_drivers", []): rates[d.get("driver_code", "")] += 1 return {k: v / len(calls_list) for k, v in rates.items()} outcome_rates = get_driver_rate(outcome_calls) other_rates = get_driver_rate(other_calls) if outcome_rates and other_calls: # Find drivers that are significantly more common in this outcome all_drivers = set(outcome_rates.keys()) | set(other_rates.keys()) comparison_data = [] for driver in all_drivers: rate_this = outcome_rates.get(driver, 0) rate_other = other_rates.get(driver, 0) diff = rate_this - rate_other comparison_data.append({ "Driver": driver, f"Rate in {selected_outcome}": f"{rate_this*100:.0f}%", "Rate in Other Outcomes": f"{rate_other*100:.0f}%", "Difference": diff, "Diff_Display": f"+{diff*100:.0f}%" if diff > 0 else f"{diff*100:.0f}%", }) # Sort by difference comparison_data.sort(key=lambda x: -x["Difference"]) # Show as table df_comparison = pd.DataFrame(comparison_data[:6]) df_display = df_comparison[["Driver", f"Rate in {selected_outcome}", "Rate in Other Outcomes", "Diff_Display"]] df_display = df_display.rename(columns={"Diff_Display": "Difference"}) st.dataframe(df_display, use_container_width=True, hide_index=True) # Highlight key insight if comparison_data and comparison_data[0]["Difference"] > 0.1: top = comparison_data[0] st.warning( f"**Key insight:** `{top['Driver']}` is **{top['Difference']*100:.0f}%** more likely " f"in {selected_outcome} calls than in other outcomes." ) elif not other_calls: st.info("Not enough data from other outcomes for comparison.") # --------------------------------------------------------------------- # 3. CALL DURATION - Are these calls longer? # --------------------------------------------------------------------- st.markdown("---") st.markdown("#### Call Duration Analysis") st.markdown(f"Are `{selected_outcome}` calls longer than average?") # Extract durations def get_duration(analysis): observed = analysis.get("observed", {}) return observed.get("audio_duration_sec", 0) outcome_durations = [get_duration(a) for a in outcome_calls if get_duration(a) > 0] other_durations = [get_duration(a) for a in other_calls if get_duration(a) > 0] if outcome_durations: avg_outcome = sum(outcome_durations) / len(outcome_durations) avg_other = sum(other_durations) / len(other_durations) if other_durations else 0 avg_all = sum(outcome_durations + other_durations) / len(outcome_durations + other_durations) col1, col2, col3 = st.columns(3) with col1: st.metric( label=f"Avg Duration: {selected_outcome}", value=f"{avg_outcome/60:.1f} min", delta=f"{(avg_outcome - avg_all)/60:+.1f} min vs avg" if avg_all > 0 else None, delta_color="inverse" if avg_outcome > avg_all else "normal", ) with col2: st.metric( label="Avg Duration: Other Outcomes", value=f"{avg_other/60:.1f} min" if avg_other > 0 else "N/A", ) with col3: diff_pct = ((avg_outcome - avg_other) / avg_other * 100) if avg_other > 0 else 0 st.metric( label="Duration Difference", value=f"{diff_pct:+.0f}%", delta="longer" if diff_pct > 0 else "shorter", delta_color="inverse" if diff_pct > 10 else "normal", ) # Duration distribution chart if other_durations: fig = go.Figure() fig.add_trace(go.Box( y=[d/60 for d in outcome_durations], name=selected_outcome, marker_color="#E57373", boxmean=True, )) fig.add_trace(go.Box( y=[d/60 for d in other_durations], name="Other Outcomes", marker_color=COLORS["blue"], boxmean=True, )) fig.update_layout( title=dict( text="Call Duration Distribution (minutes)", font=dict(size=14, color=COLORS["black"]), ), yaxis_title="Duration (min)", showlegend=False, height=300, margin=dict(l=10, r=10, t=40, b=10), paper_bgcolor=COLORS["white"], plot_bgcolor=COLORS["white"], ) st.plotly_chart(fig, use_container_width=True) # Insight if diff_pct > 15: st.warning( f"**Insight:** {selected_outcome} calls are **{diff_pct:.0f}% longer** than average. " f"This may indicate complexity in handling these issues or inefficiency in the process." ) elif diff_pct < -15: st.info( f"**Insight:** {selected_outcome} calls are **{abs(diff_pct):.0f}% shorter** than average. " f"Quick resolution or early abandonment may be factors." ) else: st.info("No duration data available for analysis.") # --------------------------------------------------------------------- # 4. RECOMMENDATIONS # --------------------------------------------------------------------- st.markdown("---") st.markdown("#### Recommendations") # Collect unique corrective actions actions = set() for analysis in outcome_calls: for driver in analysis.get("poor_cx_drivers", []): if driver.get("corrective_action"): actions.add(driver["corrective_action"]) if actions: st.markdown(f"Based on root cause analysis, prioritize these actions to reduce `{selected_outcome}`:") for i, action in enumerate(list(actions)[:5], 1): st.markdown(f"{i}. {action}") else: st.success("No specific corrective actions identified.")