| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- """
- Step 8: Merge results from dual Qwen models.
- """
- from typing import List, Dict
- import json
- from pipeline.models.base import PipelineStep
- from pipeline.common_defs import InferenceResult, MergedResult, ConfidenceLevel
class ResultsMerger(PipelineStep):
    """Merge per-chunk inference results from Qwen 3 and Qwen 2.5.

    Each input file is read as JSON Lines (one JSON object per line). The
    two result lists are paired chunk by chunk, their responsive line
    numbers are combined according to ``merge_strategy``, and a confidence
    level is derived from whether the two models flagged the same lines.

    ``self.logger`` and ``self.save_results`` are used throughout and are
    expected to come from the ``PipelineStep`` base class (not visible in
    this file).
    """

    def __init__(self, merge_strategy: str = 'union',
                 output_dir: str = './pipeline_output'):
        """
        Create a merger.

        Args:
            merge_strategy: ``'union'`` keeps lines flagged by either model;
                ``'intersection'`` keeps only lines flagged by both. Any
                other value (e.g. ``'weighted'``) currently behaves like
                ``'union'``.
            output_dir: Forwarded unchanged to ``PipelineStep.__init__``.
        """
        super().__init__(output_dir)
        self.merge_strategy = merge_strategy

    def execute(self, qwen3_results_file: str,
                qwen25_results_file: str) -> List[MergedResult]:
        """
        Merge results from both models.

        Args:
            qwen3_results_file: Path to Qwen 3 results (JSON Lines)
            qwen25_results_file: Path to Qwen 2.5 results (JSON Lines)

        Returns:
            List of merged results, one per chunk that could be paired
            across the two inputs.
        """
        self.logger.info("Merging results from dual Qwen models...")
        self.logger.info(f"Strategy: {self.merge_strategy}")

        # Load results
        qwen3_results = self._load_results(qwen3_results_file)
        qwen25_results = self._load_results(qwen25_results_file)

        if len(qwen3_results) != len(qwen25_results):
            self.logger.warning(
                f"Result count mismatch: Qwen3={len(qwen3_results)}, "
                f"Qwen2.5={len(qwen25_results)}"
            )

        # Merge results
        merged = [
            self._merge_single_result(q3, q25)
            for q3, q25 in self._pair_results(qwen3_results, qwen25_results)
        ]

        self.logger.info(f"Merged {len(merged)} results")

        # Analyze agreement
        self._analyze_agreement(merged)

        # Save merged results
        self._save_merged_results(merged)

        return merged

    def _pair_results(self, qwen3_results: List[InferenceResult],
                      qwen25_results: List[InferenceResult]) -> List[tuple]:
        """Pair up the two models' results so each pair describes one chunk.

        When the lists already line up by position (every positional pair
        shares a ``chunk_id``), positional pairing is used and any surplus
        tail of the longer list is dropped. Otherwise results are paired by
        ``chunk_id`` so that lines from different chunks are never merged
        together; chunks present in only one list are skipped and reported.
        """
        positional = list(zip(qwen3_results, qwen25_results))
        if all(q3.chunk_id == q25.chunk_id for q3, q25 in positional):
            return positional

        self.logger.warning(
            "Results are not aligned by position; pairing by chunk_id instead"
        )

        # First occurrence wins if a chunk_id appears more than once.
        q25_by_id: Dict = {}
        for result in qwen25_results:
            q25_by_id.setdefault(result.chunk_id, result)

        pairs = [
            (q3, q25_by_id[q3.chunk_id])
            for q3 in qwen3_results
            if q3.chunk_id in q25_by_id
        ]

        q3_ids = {q3.chunk_id for q3 in qwen3_results}
        only_q3 = sum(1 for cid in q3_ids if cid not in q25_by_id)
        only_q25 = sum(1 for cid in q25_by_id if cid not in q3_ids)
        if only_q3 or only_q25:
            self.logger.warning(
                f"Skipped unpaired chunks: {only_q3} only in Qwen3, "
                f"{only_q25} only in Qwen2.5"
            )

        return pairs

    def _load_results(self, filepath: str) -> List[InferenceResult]:
        """Load inference results from a JSON Lines file.

        Every non-blank line must be a JSON object containing ``chunk_id``.
        Missing optional keys fall back to: no responsive lines, empty
        reasoning, ``'medium'`` confidence and ``'unknown'`` model name.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If a non-blank line is not valid JSON.
            KeyError: If a record has no ``chunk_id``.
        """
        results = []

        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open(filepath, 'r', encoding='utf-8') as f:
            for line in f:
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                if not line.strip():
                    continue
                data = json.loads(line)
                results.append(InferenceResult(
                    chunk_id=data['chunk_id'],
                    responsive_line_numbers=data.get('responsive_line_numbers', []),
                    reasoning=data.get('reasoning', ''),
                    confidence=ConfidenceLevel(data.get('confidence', 'medium')),
                    model_name=data.get('model_name', 'unknown')
                ))

        return results

    def _merge_single_result(self, qwen3: InferenceResult,
                             qwen25: InferenceResult) -> MergedResult:
        """Merge results from both models for a single chunk.

        The merged record carries ``qwen3.chunk_id``, the combined line
        numbers (sorted), each model's own line numbers (de-duplicated and
        sorted), and whether the two models flagged exactly the same lines.
        """
        q3_lines = set(qwen3.responsive_line_numbers)
        q25_lines = set(qwen25.responsive_line_numbers)

        # Apply merge strategy
        if self.merge_strategy == 'intersection':
            merged_lines = q3_lines & q25_lines
        else:
            # 'union', 'weighted', and any unrecognised strategy all take the
            # union; no weighting is implemented here.
            merged_lines = q3_lines | q25_lines

        # Determine confidence based on agreement
        agreement = q3_lines == q25_lines
        confidence = self._determine_confidence(q3_lines, q25_lines, agreement)

        return MergedResult(
            chunk_id=qwen3.chunk_id,
            responsive_line_numbers=sorted(merged_lines),
            confidence=confidence,
            qwen3_lines=sorted(q3_lines),
            qwen25_lines=sorted(q25_lines),
            agreement=agreement
        )

    def _determine_confidence(self, q3_lines: set, q25_lines: set,
                              agreement: bool) -> ConfidenceLevel:
        """Determine confidence level based on model agreement.

        Returns HIGH when ``agreement`` is true, MEDIUM when the models
        disagree and at least one flagged a line, LOW otherwise.
        """
        if agreement:
            return ConfidenceLevel.HIGH
        elif q3_lines or q25_lines:
            return ConfidenceLevel.MEDIUM
        else:
            # NOTE(review): unreachable when ``agreement`` is computed as
            # ``q3_lines == q25_lines`` (two empty sets are equal, so that
            # case returns HIGH above). Confirm whether "neither model
            # flagged anything" was meant to be LOW.
            return ConfidenceLevel.LOW

    def _analyze_agreement(self, merged: List[MergedResult]):
        """Log how many merged results fall into each confidence level."""
        total = len(merged)
        if total == 0:
            # Nothing to report; also avoids dividing by zero below.
            self.logger.info("Agreement Analysis: no merged results to analyze")
            return

        high_conf = sum(1 for m in merged if m.confidence == ConfidenceLevel.HIGH)
        medium_conf = sum(1 for m in merged if m.confidence == ConfidenceLevel.MEDIUM)
        low_conf = sum(1 for m in merged if m.confidence == ConfidenceLevel.LOW)

        self.logger.info("Agreement Analysis:")
        self.logger.info(f" High confidence (both agree): {high_conf} ({high_conf/total*100:.1f}%)")
        self.logger.info(f" Medium confidence (one flags): {medium_conf} ({medium_conf/total*100:.1f}%)")
        self.logger.info(f" Low confidence (neither flags): {low_conf} ({low_conf/total*100:.1f}%)")

    def _save_merged_results(self, merged: List[MergedResult]):
        """Serialise merged results to plain dicts and hand them to
        ``save_results`` under the name ``merged_results.json``."""
        results_data = [
            {
                'chunk_id': m.chunk_id,
                'responsive_line_numbers': m.responsive_line_numbers,
                'confidence': m.confidence.value,
                'qwen3_lines': m.qwen3_lines,
                'qwen25_lines': m.qwen25_lines,
                'agreement': m.agreement,
                'num_responsive': len(m.responsive_line_numbers)
            }
            for m in merged
        ]

        self.save_results(results_data, 'merged_results.json')
if __name__ == "__main__":
    # Smoke check only: build a merger with the union strategy and report
    # that construction succeeded. No result files are read here.
    demo_merger = ResultsMerger(merge_strategy='union')
    print("Results merger created")
    # A real run needs the two per-model JSON Lines result files, e.g.:
    # merged = demo_merger.execute('qwen3_results.jsonl', 'qwen25_results.jsonl')
|