""" Inference runner for dual Qwen models. """ import json import requests from typing import List, Dict from pathlib import Path import logging from tqdm import tqdm class InferenceRunner: """Run inference on dual Qwen models""" def __init__(self, qwen3_url: str = "http://localhost:8000", qwen25_url: str = "http://localhost:8001", output_dir: str = "./pipeline_output"): self.qwen3_url = qwen3_url self.qwen25_url = qwen25_url self.output_dir = Path(output_dir) self.logger = logging.getLogger("InferenceRunner") self.logger.setLevel(logging.INFO) if not self.logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") handler.setFormatter(formatter) self.logger.addHandler(handler) def load_requests(self, requests_file: str) -> List[Dict]: """Load inference requests from JSONL file""" requests_data = [] with open(requests_file, "r") as f: for line in f: requests_data.append(json.loads(line)) self.logger.info(f"Loaded {len(requests_data)} inference requests") return requests_data def run_inference(self, requests_file: str, temperature: float = 0.1, max_tokens: int = 500): """Run inference on both models""" self.logger.info("=" * 80) self.logger.info("RUNNING DUAL QWEN INFERENCE") self.logger.info("=" * 80) requests_data = self.load_requests(requests_file) self.logger.info("\nRunning Qwen 3 235B inference...") qwen3_results = self._run_model_inference( requests_data, self.qwen3_url, "Qwen3-235B", temperature, max_tokens ) qwen3_file = self.output_dir / "qwen3_results.jsonl" self._save_results(qwen3_results, qwen3_file) self.logger.info("\nRunning Qwen 2.5 72B inference...") qwen25_results = self._run_model_inference( requests_data, self.qwen25_url, "Qwen2.5-72B", temperature, max_tokens ) qwen25_file = self.output_dir / "qwen25_results.jsonl" self._save_results(qwen25_results, qwen25_file) self.logger.info("\n" + "=" * 80) self.logger.info("INFERENCE COMPLETE") self.logger.info("=" * 80) return str(qwen3_file), str(qwen25_file) def _run_model_inference(self, requests_data: List[Dict], model_url: str, model_name: str, temperature: float, max_tokens: int) -> List[Dict]: """Run inference on a single model""" results = [] for req in tqdm(requests_data, desc=f"{model_name} inference"): try: response = requests.post( f"{model_url}/v1/completions", json={ "prompt": req["prompt"], "max_tokens": max_tokens, "temperature": temperature }, timeout=60 ) if response.status_code == 200: result = self._parse_response(response.json(), req, model_name) results.append(result) else: results.append(self._create_error_result(req, model_name)) except Exception as e: self.logger.error(f"Exception for chunk {req['chunk_id']}: {e}") results.append(self._create_error_result(req, model_name)) return results def _parse_response(self, response: Dict, request: Dict, model_name: str) -> Dict: """Parse model response""" try: text = response["choices"][0]["text"] parsed = json.loads(text) return { "chunk_id": request["chunk_id"], "responsive_line_numbers": parsed.get("responsive_line_numbers", []), "reasoning": parsed.get("reasoning", ""), "confidence": parsed.get("confidence", "medium"), "model_name": model_name } except Exception: return self._create_error_result(request, model_name) def _create_error_result(self, request: Dict, model_name: str) -> Dict: """Create error result""" return { "chunk_id": request["chunk_id"], "responsive_line_numbers": [], "reasoning": "Error during inference", "confidence": "low", "model_name": model_name, "error": True } def 
_save_results(self, results: List[Dict], filepath: Path): """Save results to JSONL""" with open(filepath, "w") as f: for result in results: f.write(json.dumps(result) + "\n") self.logger.info(f"Saved {len(results)} results to {filepath}") if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Run dual Qwen inference") parser.add_argument("requests_file", help="Path to inference requests JSONL") parser.add_argument("--qwen3-url", default="http://localhost:8000") parser.add_argument("--qwen25-url", default="http://localhost:8001") parser.add_argument("--output-dir", default="./pipeline_output") args = parser.parse_args() runner = InferenceRunner(args.qwen3_url, args.qwen25_url, args.output_dir) runner.run_inference(args.requests_file)
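
# A minimal usage sketch, assuming two OpenAI-compatible completion servers
# (e.g. vLLM) are already serving on ports 8000 and 8001. The file name
# "inference_runner.py" and the example request line below are illustrative
# assumptions, not taken from the pipeline itself:
#
#   $ python inference_runner.py requests.jsonl --output-dir ./pipeline_output
#
# Each line of requests.jsonl must carry at least the two fields this runner
# actually reads ("chunk_id" and "prompt"); any other keys are ignored:
#
#   {"chunk_id": "chunk_0001", "prompt": "..."}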