"""
CXInsights - Pipeline Tests

Tests for the end-to-end pipeline and exports.
"""

import json
import tempfile
from datetime import datetime
from pathlib import Path

import pytest

from src.pipeline.models import (
    PipelineConfig,
    PipelineManifest,
    PipelineStage,
    StageManifest,
    StageStatus,
)


class TestStageManifest:
    """Tests for StageManifest."""

    def test_create_stage_manifest(self):
        """A manifest built with only a stage starts pending with zero items."""
        manifest = StageManifest(stage=PipelineStage.TRANSCRIPTION)

        assert manifest.stage == PipelineStage.TRANSCRIPTION
        assert manifest.status == StageStatus.PENDING
        assert manifest.total_items == 0

    def test_success_rate(self):
        """Test success rate calculation."""
        manifest = StageManifest(
            stage=PipelineStage.INFERENCE,
            total_items=100,
            processed_items=90,
            failed_items=10,
        )

        # NOTE(review): with 90 processed and 10 failed out of 100, an expected
        # value of 0.8 implies success_rate is (processed - failed) / total.
        # Confirm against StageManifest.success_rate; if it is
        # processed / total, this expectation should be 0.9.
        # approx() keeps the check independent of how the ratio is computed.
        assert manifest.success_rate == pytest.approx(0.8)

    def test_success_rate_zero_items(self):
        """Test success rate with zero items."""
        manifest = StageManifest(stage=PipelineStage.INFERENCE)

        assert manifest.success_rate == pytest.approx(0.0)

    def test_duration(self):
        """Test duration calculation."""
        start = datetime(2024, 1, 1, 10, 0, 0)
        end = datetime(2024, 1, 1, 10, 5, 30)
        manifest = StageManifest(
            stage=PipelineStage.INFERENCE,
            started_at=start,
            completed_at=end,
        )

        assert manifest.duration_sec == pytest.approx(330.0)  # 5 min 30 sec

    def test_to_dict(self):
        """Test serialization."""
        manifest = StageManifest(
            stage=PipelineStage.TRANSCRIPTION,
            status=StageStatus.COMPLETED,
            total_items=10,
            processed_items=10,
        )

        data = manifest.to_dict()

        assert data["stage"] == "transcription"
        assert data["status"] == "completed"
        assert data["total_items"] == 10

    def test_from_dict(self):
        """Test deserialization."""
        data = {
            "stage": "inference",
            "status": "running",
            "started_at": "2024-01-01T10:00:00",
            "completed_at": None,
            "total_items": 50,
            "processed_items": 25,
            "failed_items": 0,
            "skipped_items": 0,
            "errors": [],
            "output_dir": None,
            "metadata": {},
        }

        manifest = StageManifest.from_dict(data)

        assert manifest.stage == PipelineStage.INFERENCE
        assert manifest.status == StageStatus.RUNNING
        assert manifest.total_items == 50


class TestPipelineManifest:
    """Tests for PipelineManifest."""

    def test_create_manifest(self):
        """A new manifest is pending and holds one entry per PipelineStage."""
        manifest = PipelineManifest(batch_id="test_batch")

        assert manifest.batch_id == "test_batch"
        assert manifest.status == StageStatus.PENDING
        assert len(manifest.stages) == len(PipelineStage)

    def test_mark_stage_started(self):
        """Test marking stage as started."""
        manifest = PipelineManifest(batch_id="test")

        manifest.mark_stage_started(PipelineStage.TRANSCRIPTION, total_items=100)

        stage = manifest.stages[PipelineStage.TRANSCRIPTION]
        assert stage.status == StageStatus.RUNNING
        assert stage.total_items == 100
        assert stage.started_at is not None
        assert manifest.current_stage == PipelineStage.TRANSCRIPTION

    def test_mark_stage_completed(self):
        """Test marking stage as completed."""
        manifest = PipelineManifest(batch_id="test")
        manifest.mark_stage_started(PipelineStage.TRANSCRIPTION, 100)

        manifest.mark_stage_completed(
            PipelineStage.TRANSCRIPTION,
            processed=95,
            failed=5,
            metadata={"key": "value"},
        )

        stage = manifest.stages[PipelineStage.TRANSCRIPTION]
        assert stage.status == StageStatus.COMPLETED
        assert stage.processed_items == 95
        assert stage.failed_items == 5
        assert stage.metadata["key"] == "value"

    def test_mark_stage_failed(self):
        """Failing a stage records the error and fails the whole manifest."""
        manifest = PipelineManifest(batch_id="test")
        manifest.mark_stage_started(PipelineStage.INFERENCE, 50)

        manifest.mark_stage_failed(PipelineStage.INFERENCE, "API error")

        stage = manifest.stages[PipelineStage.INFERENCE]
        assert stage.status == StageStatus.FAILED
        assert len(stage.errors) == 1
        assert "API error" in stage.errors[0]["error"]
        assert manifest.status == StageStatus.FAILED

    def test_can_resume_from(self):
        """Test resume capability check."""
        manifest = PipelineManifest(batch_id="test")

        # Mark first two stages as complete
        manifest.stages[PipelineStage.TRANSCRIPTION].status = StageStatus.COMPLETED
        manifest.stages[PipelineStage.FEATURE_EXTRACTION].status = (
            StageStatus.COMPLETED
        )

        # Can resume from compression
        assert manifest.can_resume_from(PipelineStage.COMPRESSION) is True

        # Cannot resume from inference (compression not done)
        assert manifest.can_resume_from(PipelineStage.INFERENCE) is False

    def test_get_resume_stage(self):
        """Test getting resume stage."""
        manifest = PipelineManifest(batch_id="test")

        # All pending - resume from first
        assert manifest.get_resume_stage() == PipelineStage.TRANSCRIPTION

        # Some complete
        manifest.stages[PipelineStage.TRANSCRIPTION].status = StageStatus.COMPLETED
        manifest.stages[PipelineStage.FEATURE_EXTRACTION].status = (
            StageStatus.COMPLETED
        )

        assert manifest.get_resume_stage() == PipelineStage.COMPRESSION

    def test_is_complete(self):
        """Test completion check."""
        manifest = PipelineManifest(batch_id="test")
        assert manifest.is_complete is False

        for stage in PipelineStage:
            manifest.stages[stage].status = StageStatus.COMPLETED

        assert manifest.is_complete is True

    def test_save_and_load(self):
        """Test manifest persistence via a save/load round trip."""
        manifest = PipelineManifest(
            batch_id="persist_test",
            total_audio_files=100,
        )
        manifest.mark_stage_started(PipelineStage.TRANSCRIPTION, 100)
        manifest.mark_stage_completed(PipelineStage.TRANSCRIPTION, 100)

        with tempfile.TemporaryDirectory() as tmp:
            path = Path(tmp) / "manifest.json"
            manifest.save(path)

            loaded = PipelineManifest.load(path)

            assert loaded.batch_id == "persist_test"
            assert loaded.total_audio_files == 100
            assert (
                loaded.stages[PipelineStage.TRANSCRIPTION].status
                == StageStatus.COMPLETED
            )


class TestPipelineConfig:
    """Tests for PipelineConfig."""

    def test_default_config(self):
        """Test default configuration."""
        config = PipelineConfig()

        assert config.inference_model == "gpt-4o-mini"
        assert config.use_compression is True
        assert "json" in config.export_formats
        assert "excel" in config.export_formats

    def test_custom_config(self):
        """Test custom configuration."""
        config = PipelineConfig(
            inference_model="gpt-4o",
            use_compression=False,
            export_formats=["json", "pdf"],
        )

        assert config.inference_model == "gpt-4o"
        assert config.use_compression is False
        assert "pdf" in config.export_formats

    def test_to_dict(self):
        """Test config serialization."""
        config = PipelineConfig()

        data = config.to_dict()

        assert "inference_model" in data
        assert "export_formats" in data
        assert isinstance(data["export_formats"], list)


class TestPipelineStages:
    """Tests for pipeline stage enum."""

    def test_all_stages_defined(self):
        """Test that all expected stages exist."""
        expected = [
            "transcription",
            "feature_extraction",
            "compression",
            "inference",
            "aggregation",
            "export",
        ]

        for stage_name in expected:
            assert PipelineStage(stage_name) is not None

    def test_stage_order(self):
        """Test that stages are in correct order."""
        stages = list(PipelineStage)

        assert stages[0] == PipelineStage.TRANSCRIPTION
        assert stages[-1] == PipelineStage.EXPORT


class TestExports:
    """Tests for export functions."""

    @pytest.fixture
    def sample_aggregation(self):
        """Create sample aggregation for export tests."""
        # Imported inside the fixture, as in the original layout, so the
        # aggregation models are only loaded by tests that request it.
        from src.aggregation.models import (
            BatchAggregation,
            DriverFrequency,
            DriverSeverity,
            ImpactLevel,
            RCATree,
        )

        return BatchAggregation(
            batch_id="export_test",
            total_calls_processed=100,
            successful_analyses=95,
            failed_analyses=5,
            lost_sales_frequencies=[
                DriverFrequency(
                    driver_code="PRICE_TOO_HIGH",
                    category="lost_sales",
                    total_occurrences=30,
                    calls_affected=25,
                    total_calls_in_batch=100,
                    occurrence_rate=0.30,
                    call_rate=0.25,
                    avg_confidence=0.85,
                    min_confidence=0.7,
                    max_confidence=0.95,
                ),
            ],
            poor_cx_frequencies=[
                DriverFrequency(
                    driver_code="LONG_HOLD",
                    category="poor_cx",
                    total_occurrences=20,
                    calls_affected=20,
                    total_calls_in_batch=100,
                    occurrence_rate=0.20,
                    call_rate=0.20,
                    avg_confidence=0.9,
                    min_confidence=0.8,
                    max_confidence=0.95,
                ),
            ],
            lost_sales_severities=[
                DriverSeverity(
                    driver_code="PRICE_TOO_HIGH",
                    category="lost_sales",
                    base_severity=0.8,
                    frequency_factor=0.5,
                    confidence_factor=0.85,
                    co_occurrence_factor=0.2,
                    severity_score=65.0,
                    impact_level=ImpactLevel.HIGH,
                ),
            ],
            poor_cx_severities=[
                DriverSeverity(
                    driver_code="LONG_HOLD",
                    category="poor_cx",
                    base_severity=0.7,
                    frequency_factor=0.4,
                    confidence_factor=0.9,
                    co_occurrence_factor=0.1,
                    severity_score=55.0,
                    impact_level=ImpactLevel.HIGH,
                ),
            ],
            rca_tree=RCATree(
                batch_id="export_test",
                total_calls=100,
                calls_with_lost_sales=25,
                calls_with_poor_cx=20,
                calls_with_both=5,
                top_lost_sales_drivers=["PRICE_TOO_HIGH"],
                top_poor_cx_drivers=["LONG_HOLD"],
            ),
        )

    @pytest.fixture
    def sample_analyses(self):
        """Create sample analyses for export tests."""
        from src.models.call_analysis import (
            CallAnalysis,
            CallOutcome,
            ObservedFeatures,
            ProcessingStatus,
            Traceability,
        )

        return [
            CallAnalysis(
                call_id="CALL001",
                batch_id="export_test",
                status=ProcessingStatus.SUCCESS,
                observed=ObservedFeatures(audio_duration_sec=60),
                outcome=CallOutcome.SALE_LOST,
                lost_sales_drivers=[],
                poor_cx_drivers=[],
                traceability=Traceability(
                    schema_version="1.0",
                    prompt_version="v1.0",
                    model_id="gpt-4o-mini",
                ),
            ),
        ]

    def test_json_export(self, sample_aggregation, sample_analyses):
        """Test JSON export writes summary.json with the expected sections."""
        from src.exports.json_export import export_to_json

        with tempfile.TemporaryDirectory() as tmp:
            output_dir = Path(tmp)

            result = export_to_json(
                "test_batch",
                sample_aggregation,
                sample_analyses,
                output_dir,
            )

            assert result.exists()
            assert result.name == "summary.json"

            # Verify content. Read explicitly as UTF-8 so the check does not
            # depend on the platform's default locale encoding.
            with open(result, encoding="utf-8") as f:
                data = json.load(f)

            assert data["batch_id"] == "test_batch"
            assert "summary" in data
            assert "lost_sales" in data
            assert "poor_cx" in data

    def test_pdf_export_html_fallback(self, sample_aggregation):
        """Test PDF export falls back to HTML."""
        from src.exports.pdf_export import export_to_pdf

        with tempfile.TemporaryDirectory() as tmp:
            output_dir = Path(tmp)

            result = export_to_pdf("test_batch", sample_aggregation, output_dir)

            assert result.exists()
            # Should be HTML if weasyprint not installed
            assert result.suffix in (".pdf", ".html")