- """
- Main pipeline orchestrator - runs all steps in sequence.
- """
- import sys
- from pathlib import Path
- from typing import Optional
- import logging
- # Add pipeline to path
- sys.path.insert(0, str(Path(__file__).parent.parent))
- from pipeline.steps.step1_load_data import DataLoader
- from pipeline.steps.step2_create_chunks import ChunkCreator
- from pipeline.steps.step3_keyword_filter import KeywordFilter
- from pipeline.steps.step4_semantic_filter import SemanticFilter
- from pipeline.steps.step5_random_sampling import RandomSampler
- from pipeline.steps.step6_labeling_template import LabelingTemplateGenerator
- from pipeline.steps.step7_inference_prep import InferencePreparation
- from pipeline.steps.step8_merge_results import ResultsMerger


class DiscoveryPipeline:
    """Main pipeline orchestrator"""

    def __init__(self, csv_path: str, output_dir: str = './pipeline_output'):
        self.csv_path = csv_path
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Setup logging
        self.logger = self._setup_logger()

        # Initialize steps
        self.data_loader = DataLoader(csv_path, output_dir)
        self.chunk_creator = ChunkCreator(chunk_size=20, overlap=5, output_dir=output_dir)
        self.keyword_filter = KeywordFilter(output_dir)
        self.semantic_filter = SemanticFilter(
            threshold1=0.25,
            threshold2=0.25,
            merge_strategy='union',
            output_dir=output_dir
        )
        self.random_sampler = RandomSampler(n_samples=20, seed=42, output_dir=output_dir)
        self.template_generator = LabelingTemplateGenerator(output_dir)
        self.inference_prep = InferencePreparation(output_dir=output_dir)
        self.results_merger = ResultsMerger(merge_strategy='union', output_dir=output_dir)

    def _setup_logger(self) -> logging.Logger:
        """Setup main pipeline logger"""
        logger = logging.getLogger('DiscoveryPipeline')
        # Logger level must be DEBUG so DEBUG records reach the file handler;
        # the console handler below still filters to INFO.
        logger.setLevel(logging.DEBUG)

        if not logger.handlers:
            # Console handler
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)

            # File handler
            file_handler = logging.FileHandler(self.output_dir / 'pipeline.log')
            file_handler.setLevel(logging.DEBUG)

            # Formatter
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            console_handler.setFormatter(formatter)
            file_handler.setFormatter(formatter)

            logger.addHandler(console_handler)
            logger.addHandler(file_handler)

        return logger

    def run_preprocessing(self):
        """Run preprocessing steps (1-7)"""
        self.logger.info("=" * 80)
        self.logger.info("STARTING PREPROCESSING PIPELINE")
        self.logger.info("=" * 80)

        # Step 1: Load data
        self.logger.info("\nStep 1: Loading data...")
        df = self.data_loader.execute()

        # Step 2: Create chunks
        self.logger.info("\nStep 2: Creating chunks...")
        chunks = self.chunk_creator.execute(df)

        # Step 3: Keyword filter
        self.logger.info("\nStep 3: Applying keyword filter...")
        keyword_filtered = self.keyword_filter.execute(chunks)

        # Step 4: Semantic filter
        self.logger.info("\nStep 4: Applying semantic filter...")
        semantic_filtered = self.semantic_filter.execute(keyword_filtered)

        # Step 5: Random sampling
        self.logger.info("\nStep 5: Random sampling...")
        samples = self.random_sampler.execute(semantic_filtered)

        # Step 6: Generate labeling template
        self.logger.info("\nStep 6: Generating labeling template...")
        template_path = self.template_generator.execute(samples)

        # Step 7: Prepare inference requests
        self.logger.info("\nStep 7: Preparing inference requests...")
        requests_path = self.inference_prep.execute(semantic_filtered)

        self.logger.info("\n" + "=" * 80)
        self.logger.info("PREPROCESSING COMPLETE")
        self.logger.info("=" * 80)
        self.logger.info(f"\nTotal messages: {len(df):,}")
        self.logger.info(f"Total chunks: {len(chunks):,}")
        self.logger.info(f"After keyword filter: {len(keyword_filtered):,}")
        self.logger.info(f"After semantic filter: {len(semantic_filtered):,}")
        self.logger.info(f"Samples for attorney: {len(samples)}")
        self.logger.info("\nNext steps:")
        self.logger.info(f"1. Attorney completes labeling template: {template_path}")
        self.logger.info("2. Deploy Qwen 3 235B and Qwen 2.5 72B models")
        self.logger.info(f"3. Run inference using: {requests_path}")
        self.logger.info("4. Run merge_results() with inference outputs")

        return {
            'df': df,
            'chunks': chunks,
            'keyword_filtered': keyword_filtered,
            'semantic_filtered': semantic_filtered,
            'samples': samples,
            'template_path': template_path,
            'requests_path': requests_path
        }

    def merge_results(self, qwen3_results_file: str, qwen25_results_file: str):
        """Merge results from dual-model inference (Step 8)"""
        self.logger.info("=" * 80)
        self.logger.info("MERGING INFERENCE RESULTS")
        self.logger.info("=" * 80)

        merged = self.results_merger.execute(qwen3_results_file, qwen25_results_file)

        self.logger.info("\n" + "=" * 80)
        self.logger.info("MERGE COMPLETE")
        self.logger.info("=" * 80)
        self.logger.info(f"\nMerged {len(merged)} results")
        self.logger.info(f"Results saved to: {self.output_dir / 'merged_results.json'}")

        return merged


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='Legal Discovery Pipeline')
    parser.add_argument('csv_path', help='Path to Signal messages CSV')
    parser.add_argument('--output-dir', default='./pipeline_output',
                        help='Output directory')
    parser.add_argument('--step', choices=['preprocess', 'merge'],
                        default='preprocess',
                        help='Pipeline step to run')
    parser.add_argument('--qwen3-results', help='Qwen 3 results file (for merge)')
    parser.add_argument('--qwen25-results', help='Qwen 2.5 results file (for merge)')

    args = parser.parse_args()

    pipeline = DiscoveryPipeline(args.csv_path, args.output_dir)

    if args.step == 'preprocess':
        results = pipeline.run_preprocessing()
    elif args.step == 'merge':
        if not args.qwen3_results or not args.qwen25_results:
            print("Error: --qwen3-results and --qwen25-results required for merge step")
            sys.exit(1)
        results = pipeline.merge_results(args.qwen3_results, args.qwen25_results)
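
# Example invocations (illustrative only; assumes this file is saved as
# run_pipeline.py and that the CSV and inference-output paths shown exist --
# the filenames are placeholders, not part of the pipeline itself):
#
#   python run_pipeline.py signal_messages.csv --output-dir ./pipeline_output
#   python run_pipeline.py signal_messages.csv --step merge \
#       --qwen3-results qwen3_results.json --qwen25-results qwen25_results.json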