# step8_merge_results.py (5.7 KB)
  1. """
  2. Step 8: Merge results from dual Qwen models.
  3. """
  4. from typing import List, Dict
  5. import json
  6. from pipeline.models.base import PipelineStep
  7. from pipeline.common_defs import InferenceResult, MergedResult, ConfidenceLevel
  8. class ResultsMerger(PipelineStep):
  9. """Merge results from Qwen 3 and Qwen 2.5"""
  10. def __init__(self, merge_strategy: str = 'union',
  11. output_dir: str = './pipeline_output'):
  12. super().__init__(output_dir)
  13. self.merge_strategy = merge_strategy
  14. def execute(self, qwen3_results_file: str,
  15. qwen25_results_file: str) -> List[MergedResult]:
  16. """
  17. Merge results from both models.
  18. Args:
  19. qwen3_results_file: Path to Qwen 3 results
  20. qwen25_results_file: Path to Qwen 2.5 results
  21. Returns:
  22. List of merged results
  23. """
  24. self.logger.info("Merging results from dual Qwen models...")
  25. self.logger.info(f"Strategy: {self.merge_strategy}")
  26. # Load results
  27. qwen3_results = self._load_results(qwen3_results_file)
  28. qwen25_results = self._load_results(qwen25_results_file)
  29. if len(qwen3_results) != len(qwen25_results):
  30. self.logger.warning(
  31. f"Result count mismatch: Qwen3={len(qwen3_results)}, "
  32. f"Qwen2.5={len(qwen25_results)}"
  33. )
  34. # Merge results
  35. merged = []
  36. for q3, q25 in zip(qwen3_results, qwen25_results):
  37. merged_result = self._merge_single_result(q3, q25)
  38. merged.append(merged_result)
  39. self.logger.info(f"Merged {len(merged)} results")
  40. # Analyze agreement
  41. self._analyze_agreement(merged)
  42. # Save merged results
  43. self._save_merged_results(merged)
  44. return merged
  45. def _load_results(self, filepath: str) -> List[InferenceResult]:
  46. """Load inference results from file"""
  47. results = []
  48. with open(filepath, 'r') as f:
  49. for line in f:
  50. data = json.loads(line)
  51. result = InferenceResult(
  52. chunk_id=data['chunk_id'],
  53. responsive_line_numbers=data.get('responsive_line_numbers', []),
  54. reasoning=data.get('reasoning', ''),
  55. confidence=ConfidenceLevel(data.get('confidence', 'medium')),
  56. model_name=data.get('model_name', 'unknown')
  57. )
  58. results.append(result)
  59. return results
  60. def _merge_single_result(self, qwen3: InferenceResult,
  61. qwen25: InferenceResult) -> MergedResult:
  62. """Merge results from both models for a single chunk"""
  63. q3_lines = set(qwen3.responsive_line_numbers)
  64. q25_lines = set(qwen25.responsive_line_numbers)
  65. # Apply merge strategy
  66. if self.merge_strategy == 'union':
  67. merged_lines = list(q3_lines | q25_lines)
  68. elif self.merge_strategy == 'intersection':
  69. merged_lines = list(q3_lines & q25_lines)
  70. else: # weighted or other
  71. # For weighted, use union but adjust confidence
  72. merged_lines = list(q3_lines | q25_lines)
  73. # Determine confidence based on agreement
  74. agreement = q3_lines == q25_lines
  75. confidence = self._determine_confidence(q3_lines, q25_lines, agreement)
  76. return MergedResult(
  77. chunk_id=qwen3.chunk_id,
  78. responsive_line_numbers=sorted(merged_lines),
  79. confidence=confidence,
  80. qwen3_lines=sorted(list(q3_lines)),
  81. qwen25_lines=sorted(list(q25_lines)),
  82. agreement=agreement
  83. )
  84. def _determine_confidence(self, q3_lines: set, q25_lines: set,
  85. agreement: bool) -> ConfidenceLevel:
  86. """Determine confidence level based on model agreement"""
  87. if agreement:
  88. return ConfidenceLevel.HIGH
  89. elif q3_lines or q25_lines:
  90. return ConfidenceLevel.MEDIUM
  91. else:
  92. return ConfidenceLevel.LOW
  93. def _analyze_agreement(self, merged: List[MergedResult]):
  94. """Analyze agreement statistics"""
  95. total = len(merged)
  96. high_conf = sum(1 for m in merged if m.confidence == ConfidenceLevel.HIGH)
  97. medium_conf = sum(1 for m in merged if m.confidence == ConfidenceLevel.MEDIUM)
  98. low_conf = sum(1 for m in merged if m.confidence == ConfidenceLevel.LOW)
  99. self.logger.info("Agreement Analysis:")
  100. self.logger.info(f" High confidence (both agree): {high_conf} ({high_conf/total*100:.1f}%)")
  101. self.logger.info(f" Medium confidence (one flags): {medium_conf} ({medium_conf/total*100:.1f}%)")
  102. self.logger.info(f" Low confidence (neither flags): {low_conf} ({low_conf/total*100:.1f}%)")
  103. def _save_merged_results(self, merged: List[MergedResult]):
  104. """Save merged results"""
  105. results_data = []
  106. for m in merged:
  107. result_dict = {
  108. 'chunk_id': m.chunk_id,
  109. 'responsive_line_numbers': m.responsive_line_numbers,
  110. 'confidence': m.confidence.value,
  111. 'qwen3_lines': m.qwen3_lines,
  112. 'qwen25_lines': m.qwen25_lines,
  113. 'agreement': m.agreement,
  114. 'num_responsive': len(m.responsive_line_numbers)
  115. }
  116. results_data.append(result_dict)
  117. self.save_results(results_data, 'merged_results.json')
  118. if __name__ == "__main__":
  119. # Example usage - would need actual inference results
  120. merger = ResultsMerger(merge_strategy='union')
  121. print("Results merger created")
  122. # merged = merger.execute('qwen3_results.jsonl', 'qwen25_results.jsonl')