- """
- Inference runner for dual Qwen models.
- """
- import json
- import requests
- from typing import List, Dict
- from pathlib import Path
- import logging
- from tqdm import tqdm
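
# Third-party dependencies beyond the standard library: requests and tqdm
# (pip install requests tqdm).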


class InferenceRunner:
    """Run inference on dual Qwen models"""

    def __init__(self, qwen3_url: str = "http://localhost:8000",
                 qwen25_url: str = "http://localhost:8001",
                 output_dir: str = "./pipeline_output"):
        self.qwen3_url = qwen3_url
        self.qwen25_url = qwen25_url
        self.output_dir = Path(output_dir)
        # Make sure the output directory exists before results are written.
        self.output_dir.mkdir(parents=True, exist_ok=True)

        self.logger = logging.getLogger("InferenceRunner")
        self.logger.setLevel(logging.INFO)

        # Attach a handler only once; re-instantiating the runner would
        # otherwise duplicate every log line.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

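    # Each request line is expected to be a JSON object carrying at least
    # "chunk_id" and "prompt" -- the only fields this runner reads. An
    # illustrative line (field values are hypothetical):
    #   {"chunk_id": "chunk_0001", "prompt": "..."}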
    def load_requests(self, requests_file: str) -> List[Dict]:
        """Load inference requests from JSONL file"""
        requests_data = []

        with open(requests_file, "r") as f:
            for line in f:
                # Tolerate blank lines rather than crashing on json.loads("").
                if line.strip():
                    requests_data.append(json.loads(line))

        self.logger.info(f"Loaded {len(requests_data)} inference requests")
        return requests_data

    def run_inference(self, requests_file: str,
                      temperature: float = 0.1,
                      max_tokens: int = 500):
        """Run inference on both models"""
        self.logger.info("=" * 80)
        self.logger.info("RUNNING DUAL QWEN INFERENCE")
        self.logger.info("=" * 80)

        requests_data = self.load_requests(requests_file)

        self.logger.info("\nRunning Qwen 3 235B inference...")
        qwen3_results = self._run_model_inference(
            requests_data, self.qwen3_url, "Qwen3-235B", temperature, max_tokens
        )

        qwen3_file = self.output_dir / "qwen3_results.jsonl"
        self._save_results(qwen3_results, qwen3_file)

        self.logger.info("\nRunning Qwen 2.5 72B inference...")
        qwen25_results = self._run_model_inference(
            requests_data, self.qwen25_url, "Qwen2.5-72B", temperature, max_tokens
        )

        qwen25_file = self.output_dir / "qwen25_results.jsonl"
        self._save_results(qwen25_results, qwen25_file)

        self.logger.info("\n" + "=" * 80)
        self.logger.info("INFERENCE COMPLETE")
        self.logger.info("=" * 80)

        return str(qwen3_file), str(qwen25_file)

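    # Assumes each URL points at an OpenAI-compatible /v1/completions endpoint
    # (as exposed by servers such as vLLM). The payload omits the "model"
    # field, so each server is assumed to host exactly one model.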
    def _run_model_inference(self, requests_data: List[Dict],
                             model_url: str, model_name: str,
                             temperature: float, max_tokens: int) -> List[Dict]:
        """Run inference on a single model"""
        results = []

        for req in tqdm(requests_data, desc=f"{model_name} inference"):
            try:
                response = requests.post(
                    f"{model_url}/v1/completions",
                    json={
                        "prompt": req["prompt"],
                        "max_tokens": max_tokens,
                        "temperature": temperature
                    },
                    timeout=60
                )

                if response.status_code == 200:
                    result = self._parse_response(response.json(), req, model_name)
                    results.append(result)
                else:
                    self.logger.error(
                        f"HTTP {response.status_code} for chunk {req['chunk_id']}"
                    )
                    results.append(self._create_error_result(req, model_name))

            except Exception as e:
                self.logger.error(f"Exception for chunk {req['chunk_id']}: {e}")
                results.append(self._create_error_result(req, model_name))

        return results

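    # The prompts are assumed to instruct the model to emit a bare JSON object;
    # an illustrative completion (values are hypothetical):
    #   {"responsive_line_numbers": [3, 7], "reasoning": "...", "confidence": "high"}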
    def _parse_response(self, response: Dict, request: Dict, model_name: str) -> Dict:
        """Parse a model response, falling back to an error result on malformed JSON."""
        try:
            text = response["choices"][0]["text"]
            parsed = json.loads(text)

            return {
                "chunk_id": request["chunk_id"],
                "responsive_line_numbers": parsed.get("responsive_line_numbers", []),
                "reasoning": parsed.get("reasoning", ""),
                "confidence": parsed.get("confidence", "medium"),
                "model_name": model_name
            }
        except Exception:
            return self._create_error_result(request, model_name)

    def _create_error_result(self, request: Dict, model_name: str) -> Dict:
        """Create error result"""
        return {
            "chunk_id": request["chunk_id"],
            "responsive_line_numbers": [],
            "reasoning": "Error during inference",
            "confidence": "low",
            "model_name": model_name,
            "error": True
        }

    def _save_results(self, results: List[Dict], filepath: Path):
        """Save results to JSONL"""
        with open(filepath, "w") as f:
            for result in results:
                f.write(json.dumps(result) + "\n")

        self.logger.info(f"Saved {len(results)} results to {filepath}")


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Run dual Qwen inference")
    parser.add_argument("requests_file", help="Path to inference requests JSONL")
    parser.add_argument("--qwen3-url", default="http://localhost:8000")
    parser.add_argument("--qwen25-url", default="http://localhost:8001")
    parser.add_argument("--output-dir", default="./pipeline_output")

    args = parser.parse_args()

    runner = InferenceRunner(args.qwen3_url, args.qwen25_url, args.output_dir)
    runner.run_inference(args.requests_file)
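
# Example invocation (the script file name and request path are illustrative):
#   python inference_runner.py pipeline_output/inference_requests.jsonl \
#       --qwen3-url http://localhost:8000 --qwen25-url http://localhost:8001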