""" Step 0b (Alternative): LLM-based text normalization analysis. Uses deployed LLM to identify unclear terms and unknown acronyms. """ from typing import List, Dict import pandas as pd import json import requests from collections import Counter import re from pipeline.models.base import PipelineStep class LLMNormalizationAnalyzer(PipelineStep): """ Use LLM to analyze text and identify unclear terms and unknown acronyms. """ def __init__( self, llm_url: str = "http://localhost:8000", sample_size: int = 500, output_dir: str = "./pipeline_output", model: str = "", ): super().__init__(output_dir) self.llm_url = llm_url self.sample_size = sample_size self.model = model def execute(self, df: pd.DataFrame) -> Dict[str, List[Dict]]: """ Use LLM to identify unclear terms and unknown acronyms. Args: df: DataFrame with messages Returns: Dictionary with identified terms and acronyms """ self.logger.info("=" * 80) self.logger.info("LLM-BASED TEXT NORMALIZATION ANALYSIS") self.logger.info("=" * 80) self.logger.info(f"Using LLM at: {self.llm_url}") # Extract frequent words and acronyms word_freq, acronym_freq = self._extract_terms(df) # Sample messages for LLM analysis sample_df = df.sample(n=min(self.sample_size, len(df)), random_state=42) all_unknown_acronyms = [] all_unclear_terms = [] all_expansions = [] for i in range(0, len(df), 100): chunk = df.iloc[i : i + 100] messages_sample = "\n".join(chunk["message"].fillna("").tolist()) # Analyze with LLM self.logger.info("\\nAnalyzing with LLM...") # Get unknown acronyms unknown_acronyms = self._identify_acronyms_with_llm( messages_sample, list(acronym_freq.keys())[:50] ) all_unknown_acronyms = list(set(all_unknown_acronyms + unknown_acronyms)) # Get unclear terms unclear_terms = self._identify_unclear_terms_with_llm( messages_sample, list(word_freq.keys())[:100] ) all_unclear_terms = list(set(all_unclear_terms + unclear_terms)) # Get expansion suggestions expansions = self._get_expansion_suggestions_with_llm( messages_sample, unknown_acronyms ) all_expansions = list(set(all_expansions + expansions)) results = { "unknown_acronyms": all_unknown_acronyms, "unclear_terms": all_unclear_terms, "suggested_expansions": all_expansions, } self._save_llm_analysis(results) return results def _extract_terms(self, df: pd.DataFrame) -> tuple: """Extract words and potential acronyms""" word_freq = Counter() acronym_freq = Counter() for message in df["message"].fillna(""): text = str(message) # Extract words words = re.findall(r"\\b[a-z]+\\b", text.lower()) word_freq.update(words) # Extract potential acronyms (2-6 uppercase letters) acronyms = re.findall(r"\\b[A-Z]{2,6}\\b", text) acronym_freq.update([a.lower() for a in acronyms]) return word_freq, acronym_freq def _identify_acronyms_with_llm( self, messages_sample: str, acronym_candidates: List[str] ) -> List[Dict]: """Use LLM to identify unknown acronyms""" prompt = f"""You are analyzing messages. ACRONYMS FOUND: {', '.join(acronym_candidates[:30])} SAMPLE MESSAGES: {messages_sample[:2000]} Task: Identify which acronyms are UNKNOWN or UNCLEAR (not standard medical/legal acronyms). For each unknown acronym, try to infer its meaning from context. Respond with JSON: {{ "unknown_acronyms": [ {{"acronym": "XYZ", "possible_meaning": "...", "confidence": "high/medium/low"}}, ... ] }}""" try: response = requests.post( f"{self.llm_url}/v1/chat/completions", json={"prompt": prompt, "max_tokens": 1000, "temperature": 0.3}, timeout=120, ) if response.status_code == 200: text = response.json()["choices"][0]["text"] parsed = json.loads(text) return parsed.get("unknown_acronyms", []) except Exception as e: self.logger.error(f"LLM error: {e}") return [] def _identify_unclear_terms_with_llm( self, messages_sample: str, word_candidates: List[str] ) -> List[Dict]: """Use LLM to identify unclear terms""" prompt = f"""You are analyzing messages. FREQUENT WORDS: {', '.join(word_candidates[:50])} SAMPLE MESSAGES: {messages_sample[:2000]} Task: Identify words that are UNCLEAR, AMBIGUOUS, or may be TYPOS/SLANG. Focus on words that: - Have unclear meaning in context - May be misspellings - Are slang or informal terms - Need clarification for legal purposes Respond with JSON: {{ "unclear_terms": [ {{"term": "word", "reason": "...", "suggested_clarification": "..."}}, ... ] }}""" try: response = requests.post( f"{self.llm_url}/v1/chat/completions", json={"prompt": prompt, "max_tokens": 1000, "temperature": 0.3}, timeout=120, ) if response.status_code == 200: text = response.json()["choices"][0]["text"] parsed = json.loads(text) return parsed.get("unclear_terms", []) except Exception as e: self.logger.error(f"LLM error: {e}") return [] def _get_expansion_suggestions_with_llm( self, messages_sample: str, acronyms: List[Dict] ) -> List[Dict]: """Get expansion suggestions for acronyms""" if not acronyms: return [] acronym_list = ", ".join([a["acronym"] for a in acronyms[:10]]) prompt = f"""Based on these medical/legal messages, suggest expansions for these acronyms: ACRONYMS: {acronym_list} SAMPLE MESSAGES: {messages_sample[:2000]} Respond with JSON: {{ "expansions": [ {{"acronym": "ABC", "expansion": "full form", "confidence": "high/medium/low"}}, ... ] }}""" try: response = requests.post( f"{self.llm_url}/v1/chat/completions", json={"prompt": prompt, "max_tokens": 800, "temperature": 0.3}, timeout=120, ) if response.status_code == 200: text = response.json()["choices"][0]["text"] parsed = json.loads(text) return parsed.get("expansions", []) except Exception as e: self.logger.error(f"LLM error: {e}") return [] def _save_llm_analysis(self, results: Dict): """Save LLM analysis results""" self.save_results(results, "llm_normalization_analysis.json") # Save text text_output = [] text_output.append("LLM-BASED TEXT NORMALIZATION ANALYSIS") text_output.append("=" * 80) text_output.append("") text_output.append("UNKNOWN ACRONYMS:") text_output.append("-" * 80) for item in results["unknown_acronyms"]: text_output.append( f" {item['acronym']}: {item.get('possible_meaning', 'Unknown')}" ) text_output.append("") text_output.append("UNCLEAR TERMS:") text_output.append("-" * 80) for item in results["unclear_terms"]: text_output.append(f" {item['term']}: {item.get('reason', 'Unclear')}") text_output.append("") text_output.append("SUGGESTED EXPANSIONS:") text_output.append("-" * 80) for item in results["suggested_expansions"]: text_output.append(f" {item['acronym']} -> {item['expansion']}") filepath = self.output_dir / "llm_normalization_analysis.txt" with open(filepath, "w") as f: f.write("\\n".join(text_output)) self.logger.info(f"Saved analysis to: {filepath}") if __name__ == "__main__": import pandas as pd df = pd.read_csv("../_sources/signal_messages.csv") analyzer = LLMNormalizationAnalyzer( llm_url="http://localhost:8000", sample_size=500 ) results = analyzer.execute(df) print(f"\\nFound {len(results['unknown_acronyms'])} unknown acronyms") print(f"Found {len(results['unclear_terms'])} unclear terms")