""" Step 0b (Alternative): LLM-based text normalization analysis. Uses deployed LLM to identify unclear terms and unknown acronyms. """ from typing import List, Dict import pandas as pd import requests from collections import Counter import re from pipeline.models.base import PipelineStep from json_repair import loads class LLMNormalizationAnalyzer(PipelineStep): """ Use LLM to analyze text and identify unclear terms and unknown acronyms. """ def __init__( self, llm_url: str = "http://localhost:8000", sample_size: int = 500, output_dir: str = "./pipeline_output", model: str = "", ): super().__init__(output_dir) self.llm_url = llm_url self.sample_size = sample_size self.model = model def execute(self, df: pd.DataFrame) -> Dict[str, List[Dict]]: """ Use LLM to identify unclear terms and unknown acronyms. Args: df: DataFrame with messages Returns: Dictionary with identified terms and acronyms (deduplicated) """ self.logger.info("=" * 80) self.logger.info("LLM-BASED TEXT NORMALIZATION ANALYSIS") self.logger.info("=" * 80) self.logger.info(f"Using LLM at: {self.llm_url}") # Extract frequent words and acronyms word_freq, acronym_freq = self._extract_terms(df) # Track results with deduplication acronym_dict = {} # key: acronym, value: dict with metadata term_dict = {} # key: term, value: dict with metadata expansion_dict = {} # key: acronym, value: dict with metadata sample_df = df.sample(n=min(self.sample_size, len(df)), random_state=42) # Process in chunks for i in range(0, len(sample_df), 100): chunk = sample_df.iloc[i : i + 100] messages_sample = "\n".join(chunk["message"].fillna("").tolist()) self.logger.info( f"Analyzing chunk {i//100 + 1} of {(len(sample_df)-1)//100 + 1}..." ) # Get unknown acronyms unknown_acronyms = self._identify_acronyms_with_llm( messages_sample, list(acronym_freq.keys())[:50] ) for item in unknown_acronyms: acronym = item.get("acronym", "").lower() if acronym and acronym not in acronym_dict: acronym_dict[acronym] = item # Get unclear terms unclear_terms = self._identify_unclear_terms_with_llm( messages_sample, list(word_freq.keys())[:100] ) for item in unclear_terms: term = item.get("term", "").lower() if term and term not in term_dict: term_dict[term] = item # Get expansion suggestions (use acronyms found in this chunk) if unknown_acronyms: expansions = self._get_expansion_suggestions_with_llm( messages_sample, unknown_acronyms ) for item in expansions: if isinstance(item, dict): acronym = item.get("acronym", "").lower() if acronym and acronym not in expansion_dict: expansion_dict[acronym] = item # Convert dictionaries back to lists results = { "unknown_acronyms": list(acronym_dict.values()), "unclear_terms": list(term_dict.values()), "suggested_expansions": list(expansion_dict.values()), } self.logger.info( f"Found {len(results['unknown_acronyms'])} unique unknown acronyms" ) self.logger.info(f"Found {len(results['unclear_terms'])} unique unclear terms") self.logger.info( f"Found {len(results['suggested_expansions'])} unique expansions" ) self._save_llm_analysis(results) return results def _extract_terms(self, df: pd.DataFrame) -> tuple: """Extract words and potential acronyms""" word_freq = Counter() acronym_freq = Counter() for message in df["message"].fillna(""): text = str(message) # Extract words words = re.findall(r"\b[a-z]+\b", text.lower()) word_freq.update(words) # Extract potential acronyms (2-6 uppercase letters) acronyms = re.findall(r"\b[A-Z]{2,6}\b", text) acronym_freq.update([a.lower() for a in acronyms]) return word_freq, acronym_freq def _identify_acronyms_with_llm( self, messages_sample: str, acronym_candidates: List[str] ) -> List[Dict]: """Use LLM to identify unknown acronyms""" self.logger.info("identifying acronyms...") prompt = f"""You are analyzing messages. ACRONYMS FOUND: {', '.join(acronym_candidates[:100])} SAMPLE MESSAGES: {messages_sample[:2000]} Task: Identify which acronyms are UNKNOWN or UNCLEAR (not standard medical/legal acronyms). For each unknown acronym, try to infer its meaning from context. Respond with JSON: {{ "unknown_acronyms": [ {{"acronym": "XYZ", "possible_meaning": "...", "confidence": "high/medium/low"}}, ... ] }}""" try: # print( # json.dumps( # { # "model": self.model, # "messages": [{"role": "user", "content": prompt}], # "max_tokens": 2048, # "temperature": 0.3, # }, # indent=2, # ) # ) response = requests.post( f"{self.llm_url}/v1/chat/completions", json={ "model": self.model, "messages": [{"role": "user", "content": prompt}], "max_tokens": 2048, "temperature": 0.3, }, timeout=120, ) if response.status_code == 200: text = response.json()["choices"][0]["message"]["content"] parsed = loads(text) if isinstance(parsed, dict): return parsed.get("unknown_acronyms", []) else: raise RuntimeError("LLM Error") except Exception as e: # raise e self.logger.error(f"LLM error: {e}") return [] def _identify_unclear_terms_with_llm( self, messages_sample: str, word_candidates: List[str] ) -> List[Dict]: """Use LLM to identify unclear terms""" self.logger.info("identifying unclear terms...") prompt = f"""You are analyzing messages. FREQUENT WORDS: {', '.join(word_candidates[:100])} SAMPLE MESSAGES: {messages_sample[:2000]} Task: Identify words that are UNCLEAR, AMBIGUOUS, or may be TYPOS/SLANG. Focus on words that: - Have unclear meaning in context - May be misspellings - Are slang or informal terms - Need clarification for legal purposes Respond with JSON: {{ "unclear_terms": [ {{"term": "word", "reason": "...", "suggested_clarification": "..."}}, ... ] }}""" try: response = requests.post( f"{self.llm_url}/v1/chat/completions", json={ "model": self.model, "messages": [{"role": "user", "content": prompt}], "max_tokens": 2048, "temperature": 0.3, }, timeout=120, ) if response.status_code == 200: text = response.json()["choices"][0]["message"]["content"] parsed = loads(text) if isinstance(parsed, dict): return parsed.get("unclear_terms", []) else: raise RuntimeError("LLM Error") except Exception as e: # raise e self.logger.error(f"LLM error: {e}") return [] def _get_expansion_suggestions_with_llm( self, messages_sample: str, acronyms: List[Dict] ) -> List[Dict]: """Get expansion suggestions for acronyms""" self.logger.info("getting expansion suggestions...") if not acronyms: return [] acronym_list = ", ".join([a["acronym"] for a in acronyms[:100]]) prompt = f"""Based on these medical/legal messages, suggest expansions for these acronyms: ACRONYMS: {acronym_list} SAMPLE MESSAGES: {messages_sample[:2000]} Respond with JSON: {{ "expansions": [ {{"acronym": "ABC", "expansion": "full form", "confidence": "high/medium/low"}}, ... ] }}""" try: # print( # json.dumps( # { # "model": self.model, # "messages": [{"role": "user", "content": prompt}], # "max_tokens": 2048, # "temperature": 0.3, # }, # indent=2, # ) # ) response = requests.post( f"{self.llm_url}/v1/chat/completions", json={ "model": self.model, "messages": [{"role": "user", "content": prompt}], "max_tokens": 2048, "temperature": 0.3, }, timeout=120, ) if response.status_code == 200: text = response.json()["choices"][0]["message"]["content"] parsed = loads(text) if isinstance(parsed, dict): return parsed.get("expansions", []) else: raise RuntimeError("LLM Error") except Exception as e: # raise e self.logger.error(f"LLM error: {e}") return [] def _save_llm_analysis(self, results: Dict): """Save LLM analysis results""" self.save_results(results, "llm_normalization_analysis.json") # Save text text_output = [] text_output.append("LLM-BASED TEXT NORMALIZATION ANALYSIS") text_output.append("=" * 80) text_output.append("") text_output.append("UNKNOWN ACRONYMS:") text_output.append("-" * 80) for item in results["unknown_acronyms"]: text_output.append( f" {item['acronym']}: {item.get('possible_meaning', 'Unknown')}" ) text_output.append("") text_output.append("UNCLEAR TERMS:") text_output.append("-" * 80) for item in results["unclear_terms"]: text_output.append(f" {item['term']}: {item.get('reason', 'Unclear')}") text_output.append("") text_output.append("SUGGESTED EXPANSIONS:") text_output.append("-" * 80) for item in results["suggested_expansions"]: text_output.append(f" {item['acronym']} -> {item['expansion']}") filepath = self.output_dir / "llm_normalization_analysis.txt" with open(filepath, "w") as f: f.write("\n".join(text_output)) self.logger.info(f"Saved analysis to: {filepath}") if __name__ == "__main__": import pandas as pd df = pd.read_csv("../_sources/signal_messages.csv") analyzer = LLMNormalizationAnalyzer( llm_url="http://eos.dgtlu.net:11434", sample_size=14000, model="hf.co/bartowski/Qwen2.5-14B-Instruct-GGUF:Q4_K_S", ) results = analyzer.execute(df) print(f"Found {len(results['unknown_acronyms'])} unknown acronyms") print(f"Found {len(results['unclear_terms'])} unclear terms")