"""
Step 0a (Alternative): LLM-based keyword identification.
"""

import json
import re
from typing import Dict, List

import pandas as pd
import requests

from pipeline.common_defs import SUBPOENA_CRITERIA
from pipeline.models.base import PipelineStep

MODEL = "hf.co/bartowski/Qwen2.5-14B-Instruct-GGUF:Q4_K_S"

# Matches a ```json ... ``` (or bare ```) fenced block; chat models frequently
# wrap a JSON answer in a Markdown code fence, which json.loads cannot parse.
_CODE_FENCE_RE = re.compile(r"```(?:json)?\s*(.*?)```", re.DOTALL | re.IGNORECASE)


class LLMKeywordIdentifier(PipelineStep):
    """Identify keywords for each subpoena criterion using LLM analysis.

    A random sample of messages is split into chunks. Each chunk is sent to a
    chat-completions endpoint at ``{llm_url}/v1/chat/completions``, asking for
    a JSON object of the form ``{"keywords": [...]}``. Keywords from all chunks
    are merged and de-duplicated per criterion.
    """

    def __init__(
        self,
        llm_url: str = "http://localhost:8000",
        sample_size: int = 1000,
        output_dir: str = "./pipeline_output",
        chunk_size: int = 100,
    ):
        """
        Args:
            llm_url: Base URL of the LLM server; ``/v1/chat/completions`` is appended.
            sample_size: Maximum number of messages sampled from the input frame.
            output_dir: Output directory, forwarded to ``PipelineStep``.
            chunk_size: Number of messages sent to the LLM per request.

        Raises:
            ValueError: if ``chunk_size`` is not a positive integer.
        """
        super().__init__(output_dir)
        if chunk_size < 1:
            raise ValueError("chunk_size must be a positive integer")
        self.llm_url = llm_url
        self.sample_size = sample_size
        self.chunk_size = chunk_size

    def execute(self, df: pd.DataFrame) -> Dict[int, List[str]]:
        """Use the LLM to identify relevant keywords for every criterion.

        Args:
            df: Input data; must contain a ``message`` column.

        Returns:
            Mapping of each ``SUBPOENA_CRITERIA`` key to its de-duplicated
            keyword list (empty if every chunk failed).
        """
        self.logger.info("LLM-BASED KEYWORD IDENTIFICATION")

        # Fixed random_state keeps the sample (and therefore the prompts) reproducible.
        sample_df = df.sample(n=min(self.sample_size, len(df)), random_state=42)

        keywords_by_criterion = {}
        for num, desc in SUBPOENA_CRITERIA.items():
            keywords = self._identify_keywords_for_criterion(sample_df, num, desc)
            keywords_by_criterion[num] = keywords
            self.logger.info(f"Criterion {num}: {len(keywords)} keywords")

        self._save_llm_keywords(keywords_by_criterion)
        return keywords_by_criterion

    def _identify_keywords_for_criterion(self, df, num, desc):
        """Collect keywords for one criterion across all message chunks of ``df``.

        A chunk that fails (network error, non-2xx status, unusable or
        unparseable reply) is logged and skipped, so one bad response does
        not abort the whole run.

        Args:
            df: Sampled messages; must contain a ``message`` column.
            num: Criterion number (used only for logging).
            desc: Criterion description inserted into the prompt.

        Returns:
            Keywords in first-seen order with exact duplicates removed.
        """
        # astype(str) guards against non-string cells (numbers, etc.) breaking join.
        messages = df["message"].fillna("").astype(str).tolist()

        all_keywords: List[str] = []
        starts = range(0, len(messages), self.chunk_size)
        for chunk_no, start in enumerate(starts, start=1):
            chunk = messages[start : start + self.chunk_size]
            prompt = self._build_prompt(desc, chunk)
            try:
                text = self._query_llm(prompt)
                all_keywords.extend(self._parse_keywords(text))
            except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as e:
                self.logger.error(f"Criterion {num}: error processing chunk {chunk_no}: {e}")

        # dict preserves insertion order, so this de-duplicates while keeping order.
        return list(dict.fromkeys(all_keywords))

    @staticmethod
    def _build_prompt(desc, messages):
        """Build the keyword-extraction prompt for one chunk of messages."""
        messages_sample = "\n".join(messages)
        return (
            f"Identify 30-50 keywords for: {desc}\n\n"
            f"Messages:\n{messages_sample}\n\n"
            'Respond with only a JSON object of the form {"keywords": ["keyword1", "keyword2"]}.\n\n'
            "JSON:"
        )

    def _query_llm(self, prompt):
        """Send ``prompt`` as a single user message and return the reply text.

        Raises:
            requests.RequestException: on network failure or a non-2xx status.
            ValueError: if the response body is not JSON or carries no text.
            KeyError, IndexError: if the response lacks a ``choices`` entry.
        """
        response = requests.post(
            f"{self.llm_url.rstrip('/')}/v1/chat/completions",
            json={
                "model": MODEL,
                # The chat-completions endpoint takes a "messages" list, not "prompt".
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": 1000,
            },
            timeout=120,
        )
        response.raise_for_status()
        choice = response.json()["choices"][0]
        # Chat endpoints return message.content; fall back to the legacy "text" field.
        text = (choice.get("message") or {}).get("content") or choice.get("text")
        if not isinstance(text, str):
            raise ValueError("LLM response contained no text content")
        return text

    @staticmethod
    def _parse_keywords(text):
        """Extract keyword strings from the model's reply.

        Accepts ``{"keywords": [...]}`` or a bare JSON list, optionally wrapped
        in a Markdown code fence. Non-string and blank entries are dropped and
        surrounding whitespace is stripped.

        Raises:
            ValueError: if the (unfenced) reply is not valid JSON.
        """
        fenced = _CODE_FENCE_RE.search(text)
        payload = fenced.group(1) if fenced else text
        parsed = json.loads(payload.strip())

        if isinstance(parsed, dict):
            keywords = parsed.get("keywords", [])
        elif isinstance(parsed, list):
            keywords = parsed
        else:
            keywords = []

        # A string here would otherwise be extended character-by-character.
        if not isinstance(keywords, list):
            return []
        return [kw.strip() for kw in keywords if isinstance(kw, str) and kw.strip()]

    def _save_llm_keywords(self, keywords_by_criterion):
        """Save the keywords as ``llm_keywords.json`` via ``save_results``.

        Criterion numbers are stringified for the JSON output.
        """
        results = {
            "method": "llm_analysis",
            "criteria": {str(n): {"keywords": k} for n, k in keywords_by_criterion.items()},
        }
        self.save_results(results, "llm_keywords.json")
        self.logger.info("Saved LLM keywords")