"""
Step 8: Merge results from dual Qwen models.
"""

from typing import List, Dict, Tuple
import json

from pipeline.models.base import PipelineStep
from pipeline.common_defs import InferenceResult, MergedResult, ConfidenceLevel


class ResultsMerger(PipelineStep):
    """Merge results from Qwen 3 and Qwen 2.5"""

    # Strategies _merge_single_result recognises. Any other value falls back
    # to 'union'; execute() logs a warning when that happens.
    _KNOWN_STRATEGIES = ('union', 'intersection', 'weighted')

    def __init__(self, merge_strategy: str = 'union', output_dir: str = './pipeline_output'):
        super().__init__(output_dir)
        self.merge_strategy = merge_strategy

    def execute(self, qwen3_results_file: str, qwen25_results_file: str) -> List[MergedResult]:
        """
        Merge results from both models.

        Results are paired by ``chunk_id`` rather than by file position, so the
        two files may list chunks in different orders. Chunks that appear in
        only one file are skipped and reported in a warning.

        Args:
            qwen3_results_file: Path to Qwen 3 results (one JSON object per line)
            qwen25_results_file: Path to Qwen 2.5 results (one JSON object per line)

        Returns:
            List of merged results, in the order chunks appear in the Qwen 3 file
        """
        self.logger.info("Merging results from dual Qwen models...")
        self.logger.info(f"Strategy: {self.merge_strategy}")
        if self.merge_strategy not in self._KNOWN_STRATEGIES:
            self.logger.warning(
                f"Unknown merge strategy {self.merge_strategy!r}; falling back to 'union'"
            )

        # Load results
        qwen3_results = self._load_results(qwen3_results_file)
        qwen25_results = self._load_results(qwen25_results_file)

        if len(qwen3_results) != len(qwen25_results):
            self.logger.warning(
                f"Result count mismatch: Qwen3={len(qwen3_results)}, "
                f"Qwen2.5={len(qwen25_results)}"
            )

        # Merge results chunk-by-chunk (paired on chunk_id, not position)
        merged = [
            self._merge_single_result(q3, q25)
            for q3, q25 in self._pair_by_chunk_id(qwen3_results, qwen25_results)
        ]

        self.logger.info(f"Merged {len(merged)} results")

        # Analyze agreement
        self._analyze_agreement(merged)

        # Save merged results
        self._save_merged_results(merged)

        return merged

    def _load_results(self, filepath: str) -> List[InferenceResult]:
        """
        Load inference results from a JSON Lines file.

        Each non-blank line must be a JSON object containing ``chunk_id``; the
        other fields fall back to defaults when absent. Blank lines are skipped.
        """
        results = []
        with open(filepath, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    # Tolerate blank lines instead of failing in json.loads('').
                    continue
                data = json.loads(line)
                results.append(InferenceResult(
                    chunk_id=data['chunk_id'],
                    responsive_line_numbers=data.get('responsive_line_numbers', []),
                    reasoning=data.get('reasoning', ''),
                    confidence=ConfidenceLevel(data.get('confidence', 'medium')),
                    model_name=data.get('model_name', 'unknown'),
                ))
        return results

    def _pair_by_chunk_id(
        self,
        qwen3_results: List[InferenceResult],
        qwen25_results: List[InferenceResult],
    ) -> List[Tuple[InferenceResult, InferenceResult]]:
        """Pair each Qwen 3 result with the Qwen 2.5 result sharing its chunk_id."""
        # If a chunk_id repeats in the Qwen 2.5 file, the last occurrence wins.
        qwen25_by_id: Dict[object, InferenceResult] = {r.chunk_id: r for r in qwen25_results}
        if len(qwen25_by_id) != len(qwen25_results):
            self.logger.warning(
                f"{len(qwen25_results) - len(qwen25_by_id)} duplicate chunk_id(s) in "
                f"Qwen2.5 results; keeping the last occurrence of each"
            )

        pairs = []
        qwen3_ids = set()
        missing_from_qwen25 = []
        for q3 in qwen3_results:
            qwen3_ids.add(q3.chunk_id)
            q25 = qwen25_by_id.get(q3.chunk_id)
            if q25 is None:
                missing_from_qwen25.append(q3.chunk_id)
            else:
                pairs.append((q3, q25))

        missing_from_qwen3 = [cid for cid in qwen25_by_id if cid not in qwen3_ids]

        # Report (a sample of) unmatched chunks so dropped data is visible.
        if missing_from_qwen25:
            self.logger.warning(
                f"Skipping {len(missing_from_qwen25)} chunk(s) with no Qwen2.5 result: "
                f"{missing_from_qwen25[:10]}"
            )
        if missing_from_qwen3:
            self.logger.warning(
                f"Skipping {len(missing_from_qwen3)} chunk(s) with no Qwen3 result: "
                f"{missing_from_qwen3[:10]}"
            )
        return pairs

    def _merge_single_result(self, qwen3: InferenceResult, qwen25: InferenceResult) -> MergedResult:
        """
        Merge results from both models for a single chunk.

        The merged line set is the intersection of both models' lines for the
        'intersection' strategy and the union for every other strategy.
        """
        q3_lines = set(qwen3.responsive_line_numbers)
        q25_lines = set(qwen25.responsive_line_numbers)

        # Apply merge strategy
        if self.merge_strategy == 'intersection':
            merged_lines = q3_lines & q25_lines
        else:
            # 'union', 'weighted' and unknown strategies all take the union.
            # NOTE(review): 'weighted' was meant to adjust confidence, but no
            # weighting is implemented anywhere — TODO.
            merged_lines = q3_lines | q25_lines

        # Determine confidence based on agreement
        agreement = q3_lines == q25_lines
        confidence = self._determine_confidence(q3_lines, q25_lines, agreement)

        return MergedResult(
            chunk_id=qwen3.chunk_id,
            responsive_line_numbers=sorted(merged_lines),
            confidence=confidence,
            qwen3_lines=sorted(q3_lines),
            qwen25_lines=sorted(q25_lines),
            agreement=agreement,
        )

    def _determine_confidence(self, q3_lines: set, q25_lines: set, agreement: bool) -> ConfidenceLevel:
        """
        Determine confidence level based on model agreement.

        - LOW:    neither model flagged any line
        - HIGH:   both models flagged exactly the same (non-empty) set of lines
        - MEDIUM: the models disagree (at least one of them flagged something)
        """
        if not q3_lines and not q25_lines:
            # Must be checked before `agreement`: two empty sets compare equal,
            # which otherwise reports HIGH and makes LOW unreachable.
            return ConfidenceLevel.LOW
        if agreement:
            return ConfidenceLevel.HIGH
        return ConfidenceLevel.MEDIUM

    def _analyze_agreement(self, merged: List[MergedResult]):
        """Log how the merged results are distributed across confidence levels."""
        total = len(merged)
        if total == 0:
            # Avoid ZeroDivisionError when nothing was merged.
            self.logger.info("Agreement Analysis: no merged results to analyze")
            return

        high_conf = sum(1 for m in merged if m.confidence == ConfidenceLevel.HIGH)
        medium_conf = sum(1 for m in merged if m.confidence == ConfidenceLevel.MEDIUM)
        low_conf = sum(1 for m in merged if m.confidence == ConfidenceLevel.LOW)

        self.logger.info("Agreement Analysis:")
        self.logger.info(f"  High confidence (both agree): {high_conf} ({high_conf/total*100:.1f}%)")
        self.logger.info(f"  Medium confidence (models disagree): {medium_conf} ({medium_conf/total*100:.1f}%)")
        self.logger.info(f"  Low confidence (neither flags): {low_conf} ({low_conf/total*100:.1f}%)")

    def _save_merged_results(self, merged: List[MergedResult]):
        """Convert merged results to plain dicts and save them as merged_results.json."""
        results_data = [
            {
                'chunk_id': m.chunk_id,
                'responsive_line_numbers': m.responsive_line_numbers,
                'confidence': m.confidence.value,
                'qwen3_lines': m.qwen3_lines,
                'qwen25_lines': m.qwen25_lines,
                'agreement': m.agreement,
                'num_responsive': len(m.responsive_line_numbers),
            }
            for m in merged
        ]
        self.save_results(results_data, 'merged_results.json')


if __name__ == "__main__":
    # Example usage - would need actual inference results
    merger = ResultsMerger(merge_strategy='union')
    print("Results merger created")
    # merged = merger.execute('qwen3_results.jsonl', 'qwen25_results.jsonl')