- """
- Step 7: Prepare data for dual Qwen inference.
- """
- from typing import List, Optional
- from pathlib import Path
- import json
- from pipeline.models.base import PipelineStep
- from pipeline.common_defs import Chunk, CASE_NAME, SUBPOENA_CRITERIA


class InferencePreparation(PipelineStep):
    """Prepare inference requests for Qwen models."""

    def __init__(self, few_shot_file: Optional[str] = None,
                 output_dir: str = './pipeline_output'):
        super().__init__(output_dir)
        self.few_shot_file = few_shot_file

    def execute(self, chunks: List[Chunk]) -> str:
        """
        Prepare inference requests for dual Qwen models.

        Args:
            chunks: List of filtered chunks

        Returns:
            Path to inference requests file
        """
        self.logger.info("Preparing data for dual Qwen inference...")
        self.logger.info("  Primary: Qwen 3 235B (state-of-the-art)")
        self.logger.info("  Secondary: Qwen 2.5 72B (proven accuracy)")

        # Create one inference request per chunk
        requests = []
        for chunk in chunks:
            request = self._create_request(chunk)
            requests.append(request)

        # Save requests to JSONL
        filepath = self._save_requests(requests)
        self.logger.info(f"Created {len(requests):,} inference requests")
        self.logger.info(f"Saved to: {filepath}")
        return str(filepath)

    def _create_request(self, chunk: Chunk) -> dict:
        """Create inference request for a chunk."""
        # Format the chunk's messages, one per line
        messages_text = ""
        for msg in chunk.messages:
            messages_text += f"Line {msg.line_number} [{msg.sender}]: {msg.message}\n"

        # Create the full prompt
        prompt = f"""
Review and classify the following messages.

MESSAGES TO REVIEW (Lines {chunk.start_line}-{chunk.end_line}):
{messages_text}
Provide your response as valid JSON following the specified format.
"""
        return {
            "chunk_id": chunk.chunk_id,
            "start_line": chunk.start_line,
            "end_line": chunk.end_line,
            "prompt": prompt,
            "num_messages": len(chunk.messages),
        }

    def _save_requests(self, requests: List[dict]) -> Path:
        """Save inference requests to JSONL."""
        filepath = self.output_dir / "inference_requests.jsonl"
        with open(filepath, 'w') as f:
            for req in requests:
                f.write(json.dumps(req) + '\n')
        return filepath
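

# Illustrative only, not part of the original pipeline: a minimal reader
# for the JSONL this step emits, showing the round-trip of the request
# schema defined in _create_request. A downstream step would iterate these
# requests and fan each one out to both Qwen models.
def iter_requests(filepath: str):
    """Yield one request dict per line of an inference_requests.jsonl file."""
    with open(filepath, 'r') as f:
        for line in f:
            yield json.loads(line)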


if __name__ == "__main__":
    # Example usage
    from pipeline.common_defs import Message

    with open('pipeline_output/semantic_filtered_chunks.json', 'r') as f:
        data = json.load(f)

    # Reconstruct chunks (simplified)
    chunks = []
    for item in data['filtered_chunks'][:10]:  # First 10 for testing
        chunk = Chunk(
            chunk_id=item['chunk_id'],
            start_line=item['start_line'],
            end_line=item['end_line'],
            messages=[Message(item['start_line'], "", "Sender", "Sample", "")],
            combined_text="",
            timestamp_start="",
            timestamp_end=""
        )
        chunks.append(chunk)

    prep = InferencePreparation()
    requests_file = prep.execute(chunks)
    print(f"Requests file: {requests_file}")