"""Step 0a (Alternative): LLM-based keyword identification."""

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, Iterable, List, Optional

import pandas as pd
import requests
from json_repair import loads
from tqdm import tqdm

from pipeline.common_defs import SUBPOENA_CRITERIA
from pipeline.models.base import PipelineStep
from pipeline.utils.text_utils import normalize_text


class LLMKeywordIdentifier(PipelineStep):
    """Identify keywords for each subpoena criterion using an LLM.

    A reproducible random sample of messages is split into fixed-size chunks.
    Each chunk is sent, together with the criterion description, to an
    OpenAI-compatible ``/v1/chat/completions`` endpoint. The keywords returned
    for all chunks are merged and de-duplicated per criterion.
    """

    def __init__(
        self,
        llm_url: str = "http://localhost:8000",
        sample_size: int = 1000,
        model: str = "Qwen/Qwen2.5-14B-Instruct-GPTQ-Int8",
        output_dir: str = "./pipeline_output",
        chunk_size: int = 200,
        max_workers: int = 4,
        api_key: Optional[str] = None,
    ):
        """Configure the LLM endpoint and sampling/chunking parameters.

        Args:
            llm_url: Base URL of the LLM server (``/v1/chat/completions`` is
                appended to it).
            sample_size: Maximum number of messages sampled from the input.
            model: Model name sent in every request.
            output_dir: Output directory, passed to ``PipelineStep``.
            chunk_size: Number of messages included in a single LLM request.
            max_workers: Number of concurrent requests; tune to the capacity
                of the LLM server.
            api_key: Optional bearer token. When set, it is sent as an
                ``Authorization: Bearer <token>`` header.

        Raises:
            ValueError: If ``chunk_size`` or ``max_workers`` is less than 1.
        """
        super().__init__(output_dir)
        if chunk_size < 1:
            raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")
        if max_workers < 1:
            raise ValueError(f"max_workers must be >= 1, got {max_workers}")
        self.llm_url = llm_url
        self.sample_size = sample_size
        self.model = model
        self.chunk_size = chunk_size
        self.max_workers = max_workers
        self.api_key = api_key

    def execute(self, df: pd.DataFrame) -> Dict[int, List[str]]:
        """Use the LLM to identify keywords for every criterion.

        Args:
            df: Input messages; expected to contain a ``message`` column.

        Returns:
            Mapping of criterion number to its de-duplicated keyword list.
            The mapping is also saved as ``llm_keywords.json``.
        """
        self.logger.info("LLM-BASED KEYWORD IDENTIFICATION")

        # A fixed random_state keeps the sample (and therefore the prompts)
        # reproducible between runs.
        sample_df = df.sample(n=min(self.sample_size, len(df)), random_state=42)

        keywords_by_criterion: Dict[int, List[str]] = {}
        for num, desc in SUBPOENA_CRITERIA.items():
            self.logger.info(f"getting keywords for {num}...")
            keywords = self._identify_keywords_for_criterion(sample_df, num, desc)
            keywords_by_criterion[num] = keywords
            self.logger.info(f"Criterion {num}: {len(keywords)} keywords")

        self._save_llm_keywords(keywords_by_criterion)
        return keywords_by_criterion

    def _identify_keywords_for_criterion(
        self, df: pd.DataFrame, num: int, desc: str
    ) -> List[str]:
        """Query the LLM over ``df`` in parallel chunks and merge the keywords.

        Requests run concurrently. Results are merged in chunk order, not
        completion order, so the returned ordering does not depend on which
        request happens to finish first.

        Args:
            df: Messages to analyse (expects a ``message`` column).
            num: Criterion number. It is not used in the request itself.
            desc: Criterion description inserted into the prompt.

        Returns:
            Keywords in first-seen order. Each keyword is stripped; empty
            strings, non-string items and duplicates are removed.
        """
        chunks = [
            df.iloc[start : start + self.chunk_size]
            for start in range(0, len(df), self.chunk_size)
        ]
        if not chunks:
            return []

        # One slot per chunk, so results can be merged in submission order.
        results: List[List[Any]] = [[] for _ in chunks]
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_idx = {
                executor.submit(self._process_chunk, idx, chunk, desc): idx
                for idx, chunk in enumerate(chunks)
            }
            # Progress is reported as requests complete, in any order.
            with tqdm(desc="chunks", total=len(chunks)) as pbar:
                for future in as_completed(future_to_idx):
                    results[future_to_idx[future]] = future.result()
                    pbar.update(1)

        return self._dedupe_keywords(kw for chunk_kws in results for kw in chunk_kws)

    def _process_chunk(
        self, chunk_idx: int, chunk: pd.DataFrame, desc: str
    ) -> List[Any]:
        """Send one chunk of messages to the LLM and return the raw keyword list.

        Request and response failures (non-200 status, transport errors,
        unexpected response JSON) are logged and yield an empty list. That way
        one bad chunk does not abort the whole criterion.

        Prompt construction happens outside the ``try``. Schema problems in
        ``chunk``, such as a missing ``message`` column, therefore still
        propagate to the caller instead of being logged once per chunk.
        """
        # Normalize text before building the prompt.
        messages_sample = "\n".join(
            normalize_text(msg) for msg in chunk["message"].fillna("").tolist()
        )
        # NOTE(review): the prompt does not state the expected JSON schema
        # (e.g. {"keywords": [...]}); _extract_keywords tolerates a bare list too.
        prompt = f"Identify keywords for: {desc}\n\nMessages:\n{messages_sample}\n\nJSON:"
        headers = {"Authorization": f"Bearer {self.api_key}"} if self.api_key else None

        try:
            response = requests.post(
                f"{self.llm_url}/v1/chat/completions",
                headers=headers,
                json={
                    "model": self.model,
                    "messages": [{"role": "user", "content": prompt}],
                    "max_tokens": 2048,
                    "temperature": 0.05,
                },
                timeout=120,
            )
            if response.status_code != 200:
                self.logger.warning(
                    f"LLM returned status {response.status_code} for chunk {chunk_idx}"
                )
                return []
            text = response.json()["choices"][0]["message"]["content"]
            # json_repair tolerates slightly malformed JSON from the model.
            return self._extract_keywords(loads(text), chunk_idx)
        except Exception as e:  # worker boundary: log and skip this chunk
            self.logger.error(f"Error processing chunk {chunk_idx}: {e}")
            return []

    def _extract_keywords(self, parsed: Any, chunk_idx: int) -> List[Any]:
        """Pull the keyword list out of a parsed LLM response.

        Accepts either ``{"keywords": [...]}`` or a bare JSON list. Any other
        shape is logged and treated as "no keywords".
        """
        if isinstance(parsed, dict):
            keywords = parsed.get("keywords", [])
        elif isinstance(parsed, list):
            keywords = parsed
        else:
            self.logger.warning(
                f"Unexpected LLM response type {type(parsed).__name__} "
                f"for chunk {chunk_idx}"
            )
            return []

        if not isinstance(keywords, list):
            self.logger.warning(
                f"LLM 'keywords' is {type(keywords).__name__}, not a list, "
                f"for chunk {chunk_idx}"
            )
            return []
        return keywords

    @staticmethod
    def _dedupe_keywords(keywords: Iterable[Any]) -> List[str]:
        """Return stripped, non-empty string keywords, de-duplicated in first-seen order.

        Non-string items (e.g. nested objects in a malformed response) are
        dropped. They may be unhashable and would otherwise break the set lookup.
        """
        seen = set()
        unique: List[str] = []
        for kw in keywords:
            if not isinstance(kw, str):
                continue
            kw = kw.strip()
            if kw and kw not in seen:
                seen.add(kw)
                unique.append(kw)
        return unique

    def _save_llm_keywords(self, keywords_by_criterion: Dict[int, List[str]]) -> None:
        """Save the keywords per criterion as ``llm_keywords.json`` via ``save_results``."""
        results = {
            "method": "llm_analysis",
            # Criterion numbers are stringified, presumably because the result is
            # serialized as JSON, whose object keys must be strings.
            "criteria": {
                str(num): {"keywords": kws}
                for num, kws in keywords_by_criterion.items()
            },
        }
        self.save_results(results, "llm_keywords.json")
        self.logger.info("Saved LLM keywords")