""" Step 0b: Semantic text normalization analysis using embeddings and LLM. Identifies unclear terms, unknown acronyms, and ambiguous words. """ from typing import List, Dict, Set, Tuple from collections import Counter import pandas as pd import numpy as np import re from sentence_transformers import SentenceTransformer from sklearn.metrics.pairwise import cosine_similarity from pipeline.models.base import PipelineStep from pipeline.utils.text_utils import normalize_text class SemanticNormalizationAnalyzer(PipelineStep): """ Analyze text using semantic methods to identify: 1. Unclear/ambiguous terms (low semantic coherence) 2. Unknown acronyms (uppercase patterns not in dictionary) 3. Domain-specific jargon 4. Abbreviations needing expansion """ def __init__( self, min_frequency: int = 3, coherence_threshold: float = 0.4, output_dir: str = "./pipeline_output", ): super().__init__(output_dir) self.min_frequency = min_frequency self.coherence_threshold = coherence_threshold self.logger.info("Loading embedding model: all-mpnet-base-v2...") self.embedding_model = SentenceTransformer("all-mpnet-base-v2") # Known medical/legal terms (high coherence expected) self.known_terms = { "doctor", "hospital", "treatment", "patient", "medical", "surgery", "appointment", "medication", "diagnosis", "procedure", "discrimination", "complaint", "lawsuit", "legal", "attorney", } # Known acronyms (to exclude from unknown list) self.known_acronyms = { "msk", "er", "icu", "ob", "gyn", "pcp", "np", "pa", "rn", "emr", "ehr", "hipaa", "lgbtq", "lgbt", "usa", "nyc", } def execute(self, df: pd.DataFrame) -> Dict[str, List[Dict]]: """ Analyze text to identify unclear terms and unknown acronyms. Args: df: DataFrame with messages Returns: Dictionary with unclear terms, unknown acronyms, and suggestions """ self.logger.info("=" * 80) self.logger.info("SEMANTIC TEXT NORMALIZATION ANALYSIS") self.logger.info("=" * 80) self.logger.info(f"Analyzing {len(df):,} messages") # Extract words with metadata self.logger.info("\nExtracting words and computing frequencies...") word_data = self._extract_word_data(df) self.logger.info(f"Found {len(word_data):,} unique words") # Identify unknown acronyms self.logger.info("\nIdentifying unknown acronyms...") unknown_acronyms = self._identify_unknown_acronyms(word_data) self.logger.info(f"Found {len(unknown_acronyms)} unknown acronyms") # Identify unclear terms using semantic coherence self.logger.info("\nAnalyzing semantic coherence for unclear terms...") unclear_terms = self._identify_unclear_terms(word_data, df) self.logger.info(f"Found {len(unclear_terms)} unclear terms") # Identify abbreviations self.logger.info("\nIdentifying abbreviations...") abbreviations = self._identify_abbreviations(word_data) self.logger.info(f"Found {len(abbreviations)} abbreviations") # Identify domain-specific jargon self.logger.info("\nIdentifying domain-specific jargon...") jargon = self._identify_jargon(word_data) self.logger.info(f"Found {len(jargon)} jargon terms") # Compile results results = { "unknown_acronyms": unknown_acronyms, "unclear_terms": unclear_terms, "abbreviations": abbreviations, "jargon": jargon, } # Save results self._save_normalization_analysis(results) return results def _extract_word_data(self, df: pd.DataFrame) -> Dict[str, Dict]: """Extract words with frequency and context""" word_data = {} for message in df["message"].fillna(""): text = str(message) # Extract words with original casing words = re.findall(r"\b[a-zA-Z][a-zA-Z0-9]*\b", text) for word in words: word_lower = 
    def _extract_word_data(self, df: pd.DataFrame) -> Dict[str, Dict]:
        """Extract words with frequency and context"""
        word_data = {}

        for message in df["message"].fillna(""):
            text = str(message)

            # Extract words with original casing
            words = re.findall(r"\b[a-zA-Z][a-zA-Z0-9]*\b", text)

            for word in words:
                word_lower = word.lower()

                if word_lower not in word_data:
                    word_data[word_lower] = {
                        "word": word_lower,
                        "frequency": 0,
                        "original_forms": set(),
                        "contexts": [],
                    }

                word_data[word_lower]["frequency"] += 1
                word_data[word_lower]["original_forms"].add(word)

                # Store up to 5 sample contexts per word
                if len(word_data[word_lower]["contexts"]) < 5:
                    # Take a ~50-character window on each side of the word's
                    # first occurrence in this message
                    word_index = text.lower().find(word_lower)
                    if word_index != -1:
                        start = max(0, word_index - 50)
                        end = min(len(text), word_index + len(word_lower) + 50)
                        context = text[start:end]
                        word_data[word_lower]["contexts"].append(context)

        # Filter by minimum frequency
        word_data = {
            w: data
            for w, data in word_data.items()
            if data["frequency"] >= self.min_frequency
        }

        return word_data

    def _identify_unknown_acronyms(self, word_data: Dict) -> List[Dict]:
        """Identify potential unknown acronyms"""
        unknown_acronyms = []

        for word, data in word_data.items():
            # Acronym heuristic: 2-6 characters, seen in all caps at least
            # once, not already whitelisted, and not suspiciously frequent
            is_acronym = (
                2 <= len(word) <= 6
                and any(form.isupper() for form in data["original_forms"])
                and word not in self.known_acronyms
                and data["frequency"] < 1500
                and not word.isdigit()
            )

            if is_acronym:
                unknown_acronyms.append(
                    {
                        "acronym": word.upper(),
                        "frequency": data["frequency"],
                        "contexts": data["contexts"][:3],
                        "confidence": "high" if data["frequency"] >= 10 else "medium",
                    }
                )

        # Sort by frequency
        unknown_acronyms.sort(key=lambda x: x["frequency"], reverse=True)

        return unknown_acronyms

    def _identify_unclear_terms(self, word_data: Dict, df: pd.DataFrame) -> List[Dict]:
        """Identify unclear terms using semantic coherence"""
        unclear_terms = []

        # Sample words for analysis (focus on medium frequency)
        candidate_words = [
            w
            for w, data in word_data.items()
            if 5 <= data["frequency"] <= 200
            and len(w) >= 4
            and w not in self.known_terms
        ]

        if not candidate_words:
            return unclear_terms

        self.logger.info(f"  Analyzing {len(candidate_words)} candidate words...")

        # Compute embeddings for candidate words
        word_embeddings = self.embedding_model.encode(
            candidate_words, show_progress_bar=True, batch_size=32
        )

        # Compute embeddings for known terms
        known_embeddings = self.embedding_model.encode(
            list(self.known_terms), show_progress_bar=False
        )

        # Calculate semantic coherence (similarity to known terms)
        similarities = cosine_similarity(word_embeddings, known_embeddings)
        max_similarities = similarities.max(axis=1)

        # Identify words with low coherence
        for i, word in enumerate(candidate_words):
            coherence = float(max_similarities[i])

            if coherence < self.coherence_threshold:
                unclear_terms.append(
                    {
                        "term": word,
                        "frequency": word_data[word]["frequency"],
                        "coherence_score": coherence,
                        "contexts": word_data[word]["contexts"][:3],
                        "reason": "low_semantic_coherence",
                    }
                )

        # Sort by coherence (lowest first)
        unclear_terms.sort(key=lambda x: x["coherence_score"])

        return unclear_terms[:200]  # Top 200 most unclear
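    # Rough sketch of the coherence scoring above (numbers are made up):
    # cosine_similarity returns an (n_candidates, n_known_terms) matrix and
    # .max(axis=1) keeps each candidate's best match against the known
    # vocabulary. For candidates ["colonoscopy", "xyzzy"] scored against
    # known terms such as "procedure" and "surgery":
    #
    #   similarities ~ [[0.61, 0.48, ...]    -> max 0.61: kept (>= 0.4)
    #                   [0.12, 0.08, ...]]   -> max 0.12: flagged as unclear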
    def _identify_abbreviations(self, word_data: Dict) -> List[Dict]:
        """Identify potential abbreviations"""
        abbreviations = []

        # Common abbreviation patterns. Note: the word-extraction regex in
        # _extract_word_data never captures periods, so the period-based
        # checks below only fire if extraction is changed to preserve them.
        abbrev_patterns = [
            (r"^[a-z]{2,4}$", "short_word"),  # 2-4 letter words
            (r"^[a-z]+\.$", "period_ending"),  # Words ending in period
            (r"^[a-z]\d+$", "letter_number"),  # Letter + number
        ]

        for word, data in word_data.items():
            for pattern, pattern_type in abbrev_patterns:
                if re.match(pattern, word):
                    # Check if it has a period in any original form
                    has_period = any("." in form for form in data["original_forms"])

                    if (has_period or pattern_type == "short_word") and data[
                        "frequency"
                    ] < 1500:
                        abbreviations.append(
                            {
                                "abbreviation": word,
                                "frequency": data["frequency"],
                                "pattern_type": pattern_type,
                                "contexts": data["contexts"][:2],
                            }
                        )
                    break

        # Sort by frequency
        abbreviations.sort(key=lambda x: x["frequency"], reverse=True)

        return abbreviations[:100]  # Top 100

    def _identify_jargon(self, word_data: Dict) -> List[Dict]:
        """Identify domain-specific jargon"""
        jargon = []

        # Jargon indicators (suffix heuristics per domain)
        jargon_indicators = {
            "medical": ["ology", "itis", "ectomy", "oscopy", "therapy"],
            "legal": ["tion", "ment", "ance", "ence"],
            "technical": ["tech", "system", "process", "protocol"],
        }

        for word, data in word_data.items():
            if len(word) < 6:
                continue

            # Check for jargon patterns
            for domain, suffixes in jargon_indicators.items():
                if any(word.endswith(suffix) for suffix in suffixes):
                    if word not in self.known_terms:
                        jargon.append(
                            {
                                "term": word,
                                "frequency": data["frequency"],
                                "domain": domain,
                                "contexts": data["contexts"][:2],
                            }
                        )
                    break

        # Sort by frequency
        jargon.sort(key=lambda x: x["frequency"], reverse=True)

        return jargon[:100]  # Top 100
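    # The step writes two artifacts. save_results() is inherited from
    # PipelineStep (defined in pipeline.models.base; the assumption here is
    # that it serializes the payload as JSON under self.output_dir):
    #   semantic_normalization_analysis.json - machine-readable results
    #   semantic_normalization_analysis.txt  - human-readable report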
    def _save_normalization_analysis(self, results: Dict):
        """Save normalization analysis results"""
        # Save JSON
        json_results = {
            "method": "semantic_analysis",
            "statistics": {
                "unknown_acronyms": len(results["unknown_acronyms"]),
                "unclear_terms": len(results["unclear_terms"]),
                "abbreviations": len(results["abbreviations"]),
                "jargon": len(results["jargon"]),
            },
            "results": results,
        }
        self.save_results(json_results, "semantic_normalization_analysis.json")

        # Save human-readable text
        text_output = []
        text_output.append("SEMANTIC TEXT NORMALIZATION ANALYSIS")
        text_output.append("=" * 80)
        text_output.append("")
        text_output.append(
            "This analysis identifies terms that may need clarification or expansion."
        )
        text_output.append("")

        # Unknown acronyms
        text_output.append("=" * 80)
        text_output.append("UNKNOWN ACRONYMS (Need Investigation)")
        text_output.append("=" * 80)
        text_output.append("")
        if results["unknown_acronyms"]:
            text_output.append(
                f"{'Acronym':<15} {'Frequency':<12} {'Confidence':<12} {'Sample Context'}"
            )
            text_output.append("-" * 80)
            for item in results["unknown_acronyms"][:20]:
                context = item["contexts"][0][:50] if item["contexts"] else "N/A"
                text_output.append(
                    f"{item['acronym']:<15} {item['frequency']:<12} "
                    f"{item['confidence']:<12} {context}..."
                )
        else:
            text_output.append("No unknown acronyms found.")
        text_output.append("")

        # Unclear terms
        text_output.append("=" * 80)
        text_output.append("UNCLEAR TERMS (Low Semantic Coherence)")
        text_output.append("=" * 80)
        text_output.append("")
        text_output.append(
            "These terms have low semantic similarity to known medical/legal terms."
        )
        text_output.append(
            "They may be typos, slang, or domain-specific terms needing clarification."
        )
        text_output.append("")
        if results["unclear_terms"]:
            text_output.append(
                f"{'Term':<20} {'Frequency':<12} {'Coherence':<12} {'Sample Context'}"
            )
            text_output.append("-" * 80)
            for item in results["unclear_terms"][:20]:
                context = item["contexts"][0][:40] if item["contexts"] else "N/A"
                text_output.append(
                    f"{item['term']:<20} {item['frequency']:<12} "
                    f"{item['coherence_score']:<12.3f} {context}..."
                )
        else:
            text_output.append("No unclear terms found.")
        text_output.append("")

        # Abbreviations
        text_output.append("=" * 80)
        text_output.append("ABBREVIATIONS (May Need Expansion)")
        text_output.append("=" * 80)
        text_output.append("")
        if results["abbreviations"]:
            text_output.append(
                f"{'Abbreviation':<20} {'Frequency':<12} {'Pattern':<15} {'Context'}"
            )
            text_output.append("-" * 80)
            for item in results["abbreviations"][:15]:
                context = item["contexts"][0][:40] if item["contexts"] else "N/A"
                text_output.append(
                    f"{item['abbreviation']:<20} {item['frequency']:<12} "
                    f"{item['pattern_type']:<15} {context}..."
                )
        else:
            text_output.append("No abbreviations found.")
        text_output.append("")

        # Jargon
        text_output.append("=" * 80)
        text_output.append("DOMAIN-SPECIFIC JARGON")
        text_output.append("=" * 80)
        text_output.append("")
        if results["jargon"]:
            text_output.append(f"{'Term':<25} {'Frequency':<12} {'Domain':<15}")
            text_output.append("-" * 80)
            for item in results["jargon"][:15]:
                text_output.append(
                    f"{item['term']:<25} {item['frequency']:<12} {item['domain']:<15}"
                )
        else:
            text_output.append("No jargon found.")
        text_output.append("")

        text_output.append("=" * 80)
        text_output.append("RECOMMENDATIONS")
        text_output.append("=" * 80)
        text_output.append("")
        text_output.append(
            "1. Investigate unknown acronyms - may be critical case-specific terms"
        )
        text_output.append("2. Review unclear terms - may be typos or need context")
        text_output.append("3. Expand abbreviations in TEXT_EXPANSIONS dictionary")
        text_output.append("4. Add jargon terms to KEY_TOPICS if relevant to case")

        filepath = self.output_dir / "semantic_normalization_analysis.txt"
        with open(filepath, "w") as f:
            f.write("\n".join(text_output))

        self.logger.info(f"\nSaved analysis to: {filepath}")


if __name__ == "__main__":
    df = pd.read_csv("../_sources/signal_messages.csv")

    analyzer = SemanticNormalizationAnalyzer(min_frequency=1, coherence_threshold=0.4)
    results = analyzer.execute(df)

    print("\nSemantic normalization analysis complete:")
    print(f"  Unknown acronyms: {len(results['unknown_acronyms'])}")
    print(f"  Unclear terms: {len(results['unclear_terms'])}")
    print(f"  Abbreviations: {len(results['abbreviations'])}")
    print(f"  Jargon: {len(results['jargon'])}")
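# Minimal ad-hoc sketch for trying this step without the Signal CSV export
# (hypothetical messages; "message" is the only column execute() reads):
#
#   df = pd.DataFrame(
#       {"message": ["Follow-up with the PCP about the referral", "Saw Dr. X"]}
#   )
#   results = SemanticNormalizationAnalyzer(min_frequency=1).execute(df)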