"""Main pipeline orchestrator - runs all steps in sequence."""

import sys
from pathlib import Path
from typing import Optional
import logging

# Add pipeline to path so `pipeline.steps.*` resolves when run as a script.
sys.path.insert(0, str(Path(__file__).parent.parent))

from pipeline.steps.step1_load_data import DataLoader
from pipeline.steps.step2_create_chunks import ChunkCreator
from pipeline.steps.step3_keyword_filter import KeywordFilter
from pipeline.steps.step4_semantic_filter import SemanticFilter
from pipeline.steps.step5_random_sampling import RandomSampler
from pipeline.steps.step6_labeling_template import LabelingTemplateGenerator
from pipeline.steps.step7_inference_prep import InferencePreparation
from pipeline.steps.step8_merge_results import ResultsMerger


class DiscoveryPipeline:
    """Main pipeline orchestrator.

    Wires together the eight pipeline steps and exposes two entry points:
    ``run_preprocessing`` (steps 1-7, producing a labeling template and
    inference request file) and ``merge_results`` (step 8, combining the
    two models' inference outputs).
    """

    def __init__(self, csv_path: str, output_dir: str = './pipeline_output'):
        """Initialize all pipeline steps.

        Args:
            csv_path: Path to the Signal messages CSV file.
            output_dir: Directory where every step writes its artifacts
                (created if missing, including intermediate parents).
        """
        self.csv_path = csv_path
        self.output_dir = Path(output_dir)
        # parents=True: a nested --output-dir (e.g. ./runs/2024/out) must not
        # raise FileNotFoundError when intermediate directories are missing.
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Setup logging. Must happen after mkdir: the file handler opens
        # a log file inside output_dir.
        self.logger = self._setup_logger()

        # Initialize steps. Each step object owns its own configuration;
        # thresholds/sizes here are the pipeline defaults.
        self.data_loader = DataLoader(csv_path, output_dir)
        self.chunk_creator = ChunkCreator(chunk_size=20, overlap=5, output_dir=output_dir)
        self.keyword_filter = KeywordFilter(output_dir)
        self.semantic_filter = SemanticFilter(
            threshold1=0.25,
            threshold2=0.25,
            merge_strategy='union',
            output_dir=output_dir
        )
        # Fixed seed keeps the attorney sample reproducible across runs.
        self.random_sampler = RandomSampler(n_samples=20, seed=42, output_dir=output_dir)
        self.template_generator = LabelingTemplateGenerator(output_dir)
        self.inference_prep = InferencePreparation(output_dir=output_dir)
        self.results_merger = ResultsMerger(merge_strategy='union', output_dir=output_dir)

    def _setup_logger(self) -> logging.Logger:
        """Setup main pipeline logger.

        Returns a logger that writes INFO+ to the console and DEBUG+ to
        ``<output_dir>/pipeline.log``. Handlers are only attached once so
        repeated construction does not duplicate log lines.
        """
        logger = logging.getLogger('DiscoveryPipeline')
        logger.setLevel(logging.INFO)

        if not logger.handlers:
            # Console handler
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)

            # File handler — more verbose than the console.
            file_handler = logging.FileHandler(self.output_dir / 'pipeline.log')
            file_handler.setLevel(logging.DEBUG)

            # Formatter
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            console_handler.setFormatter(formatter)
            file_handler.setFormatter(formatter)

            logger.addHandler(console_handler)
            logger.addHandler(file_handler)

        return logger

    def run_preprocessing(self):
        """Run preprocessing steps (1-7).

        Executes load -> chunk -> keyword filter -> semantic filter ->
        random sampling -> labeling template -> inference prep, logging a
        summary at the end.

        Returns:
            dict with the intermediate artifacts of every step:
            ``df``, ``chunks``, ``keyword_filtered``, ``semantic_filtered``,
            ``samples``, ``template_path``, ``requests_path``.
        """
        self.logger.info("=" * 80)
        self.logger.info("STARTING PREPROCESSING PIPELINE")
        self.logger.info("=" * 80)

        # Step 1: Load data
        self.logger.info("\nStep 1: Loading data...")
        df = self.data_loader.execute()

        # Step 2: Create chunks
        self.logger.info("\nStep 2: Creating chunks...")
        chunks = self.chunk_creator.execute(df)

        # Step 3: Keyword filter
        self.logger.info("\nStep 3: Applying keyword filter...")
        keyword_filtered = self.keyword_filter.execute(chunks)

        # Step 4: Semantic filter
        self.logger.info("\nStep 4: Applying semantic filter...")
        semantic_filtered = self.semantic_filter.execute(keyword_filtered)

        # Step 5: Random sampling (samples feed the attorney labeling template)
        self.logger.info("\nStep 5: Random sampling...")
        samples = self.random_sampler.execute(semantic_filtered)

        # Step 6: Generate labeling template
        self.logger.info("\nStep 6: Generating labeling template...")
        template_path = self.template_generator.execute(samples)

        # Step 7: Prepare inference requests — note this runs on the full
        # semantic-filtered set, not just the sampled subset.
        self.logger.info("\nStep 7: Preparing inference requests...")
        requests_path = self.inference_prep.execute(semantic_filtered)

        self.logger.info("\n" + "=" * 80)
        self.logger.info("PREPROCESSING COMPLETE")
        self.logger.info("=" * 80)
        self.logger.info(f"\nTotal messages: {len(df):,}")
        self.logger.info(f"Total chunks: {len(chunks):,}")
        self.logger.info(f"After keyword filter: {len(keyword_filtered):,}")
        self.logger.info(f"After semantic filter: {len(semantic_filtered):,}")
        self.logger.info(f"Samples for attorney: {len(samples)}")
        self.logger.info(f"\nNext steps:")
        self.logger.info(f"1. Attorney completes labeling template: {template_path}")
        self.logger.info(f"2. Deploy Qwen 3 235B and Qwen 2.5 72B models")
        self.logger.info(f"3. Run inference using: {requests_path}")
        self.logger.info(f"4. Run merge_results() with inference outputs")

        return {
            'df': df,
            'chunks': chunks,
            'keyword_filtered': keyword_filtered,
            'semantic_filtered': semantic_filtered,
            'samples': samples,
            'template_path': template_path,
            'requests_path': requests_path
        }

    def merge_results(self, qwen3_results_file: str, qwen25_results_file: str):
        """Merge results from dual model inference (Step 8).

        Args:
            qwen3_results_file: Path to the Qwen 3 inference results file.
            qwen25_results_file: Path to the Qwen 2.5 inference results file.

        Returns:
            The merged results as produced by ``ResultsMerger.execute``.
        """
        self.logger.info("=" * 80)
        self.logger.info("MERGING INFERENCE RESULTS")
        self.logger.info("=" * 80)

        merged = self.results_merger.execute(qwen3_results_file, qwen25_results_file)

        self.logger.info("\n" + "=" * 80)
        self.logger.info("MERGE COMPLETE")
        self.logger.info("=" * 80)
        self.logger.info(f"\nMerged {len(merged)} results")
        self.logger.info(f"Results saved to: {self.output_dir / 'merged_results.json'}")

        return merged


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='Legal Discovery Pipeline')
    parser.add_argument('csv_path', help='Path to Signal messages CSV')
    parser.add_argument('--output-dir', default='./pipeline_output', help='Output directory')
    parser.add_argument('--step', choices=['preprocess', 'merge'], default='preprocess',
                        help='Pipeline step to run')
    parser.add_argument('--qwen3-results', help='Qwen 3 results file (for merge)')
    parser.add_argument('--qwen25-results', help='Qwen 2.5 results file (for merge)')

    args = parser.parse_args()

    pipeline = DiscoveryPipeline(args.csv_path, args.output_dir)

    if args.step == 'preprocess':
        results = pipeline.run_preprocessing()
    elif args.step == 'merge':
        # Both result files are required for the merge step.
        if not args.qwen3_results or not args.qwen25_results:
            print("Error: --qwen3-results and --qwen25-results required for merge step")
            sys.exit(1)
        results = pipeline.merge_results(args.qwen3_results, args.qwen25_results)