"""
Step 7: Prepare data for dual Qwen inference.
"""
from typing import List, Optional
from pathlib import Path
import json

from pipeline.models.base import PipelineStep
from pipeline.common_defs import Chunk, CASE_NAME, SUBPOENA_CRITERIA


class InferencePreparation(PipelineStep):
    """Prepare inference requests for Qwen models."""

    def __init__(self, few_shot_file: Optional[str] = None,
                 output_dir: str = './pipeline_output'):
        """
        Args:
            few_shot_file: Optional path to a few-shot examples file.
                Stored for later use; not read in this step.
            output_dir: Directory where the requests file is written.
        """
        super().__init__(output_dir)
        self.few_shot_file = few_shot_file

    def execute(self, chunks: List[Chunk]) -> str:
        """
        Prepare inference requests for dual Qwen models.

        Args:
            chunks: List of filtered chunks

        Returns:
            Path to inference requests file
        """
        self.logger.info("Preparing data for dual Qwen inference...")
        # Plain strings here: these messages have no interpolated values.
        self.logger.info("  Primary: Qwen 3 235B (state-of-the-art)")
        self.logger.info("  Secondary: Qwen 2.5 72B (proven accuracy)")

        # One inference request per chunk.
        requests = [self._create_request(chunk) for chunk in chunks]

        # Save requests
        filepath = self._save_requests(requests)

        self.logger.info(f"Created {len(requests):,} inference requests")
        self.logger.info(f"Saved to: {filepath}")

        return str(filepath)

    def _create_request(self, chunk: Chunk) -> dict:
        """Create an inference request dict for a single chunk."""
        # Render each message as one "Line N [sender]: text" line;
        # join at C speed instead of repeated string concatenation.
        messages_text = "".join(
            f"Line {msg.line_number} [{msg.sender}]: {msg.message}\n"
            for msg in chunk.messages
        )

        # Full prompt sent to the model; the model is asked to reply in JSON.
        prompt = f"""
Review and classify the following messages.

MESSAGES TO REVIEW (Lines {chunk.start_line}-{chunk.end_line}):
{messages_text}
Provide your response as valid JSON following the specified format.
"""

        return {
            "chunk_id": chunk.chunk_id,
            "start_line": chunk.start_line,
            "end_line": chunk.end_line,
            "prompt": prompt,
            "num_messages": len(chunk.messages),
        }

    def _save_requests(self, requests: List[dict]) -> Path:
        """Save inference requests to JSONL and return the file path."""
        filepath = self.output_dir / "inference_requests.jsonl"
        # Explicit encoding so output is stable across platforms.
        with open(filepath, 'w', encoding='utf-8') as f:
            for req in requests:
                f.write(json.dumps(req) + '\n')
        return filepath


if __name__ == "__main__":
    # Example usage.
    # NOTE: json and Chunk are already imported at module level, so only
    # Message needs importing here (the original re-imported both).
    from pipeline.common_defs import Message

    with open('pipeline_output/semantic_filtered_chunks.json', 'r',
              encoding='utf-8') as f:
        data = json.load(f)

    # Reconstruct chunks (simplified)
    chunks = []
    for item in data['filtered_chunks'][:10]:  # First 10 for testing
        chunk = Chunk(
            chunk_id=item['chunk_id'],
            start_line=item['start_line'],
            end_line=item['end_line'],
            messages=[Message(item['start_line'], "", "Sender", "Sample", "")],
            combined_text="",
            timestamp_start="",
            timestamp_end=""
        )
        chunks.append(chunk)

    prep = InferencePreparation()
    requests_file = prep.execute(chunks)
    print(f"Requests file: {requests_file}")