- """
- Step 0b: Semantic text normalization analysis using embeddings and LLM.
- Identifies unclear terms, unknown acronyms, and ambiguous words.
- """
- from typing import List, Dict, Set, Tuple
- from collections import Counter
- import pandas as pd
- import numpy as np
- import re
- from sentence_transformers import SentenceTransformer
- from sklearn.metrics.pairwise import cosine_similarity
- from pipeline.models.base import PipelineStep
- from pipeline.utils.text_utils import normalize_text


class SemanticNormalizationAnalyzer(PipelineStep):
    """
    Analyze text using semantic methods to identify:
    1. Unclear/ambiguous terms (low semantic coherence)
    2. Unknown acronyms (uppercase patterns not in dictionary)
    3. Domain-specific jargon
    4. Abbreviations needing expansion
    """

    def __init__(
        self,
        min_frequency: int = 3,
        coherence_threshold: float = 0.4,
        output_dir: str = "./pipeline_output",
    ):
        super().__init__(output_dir)
        self.min_frequency = min_frequency
        self.coherence_threshold = coherence_threshold
        self.logger.info("Loading embedding model: all-mpnet-base-v2...")
        self.embedding_model = SentenceTransformer("all-mpnet-base-v2")
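        # all-mpnet-base-v2 encodes text into 768-dimensional vectors; the
        # model weights are downloaded (a few hundred MB) on first use.
        # Loading once here, rather than per call, keeps execute() fast on
        # repeat runs.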

        # Known medical/legal terms (high coherence expected)
        self.known_terms = {
            "doctor", "hospital", "treatment", "patient", "medical",
            "surgery", "appointment", "medication", "diagnosis", "procedure",
            "discrimination", "complaint", "lawsuit", "legal", "attorney",
        }
        # Known acronyms (excluded from the unknown-acronym list)
        self.known_acronyms = {
            "msk", "er", "icu", "ob", "gyn", "pcp", "np", "pa", "rn",
            "emr", "ehr", "hipaa", "lgbtq", "lgbt", "usa", "nyc",
        }

    def execute(self, df: pd.DataFrame) -> Dict[str, List[Dict]]:
        """
        Analyze text to identify unclear terms and unknown acronyms.

        Args:
            df: DataFrame with messages

        Returns:
            Dictionary with unknown acronyms, unclear terms, abbreviations,
            and jargon
        """
        self.logger.info("=" * 80)
        self.logger.info("SEMANTIC TEXT NORMALIZATION ANALYSIS")
        self.logger.info("=" * 80)
        self.logger.info(f"Analyzing {len(df):,} messages")

        # Extract words with frequencies and sample contexts
        self.logger.info("\nExtracting words and computing frequencies...")
        word_data = self._extract_word_data(df)
        self.logger.info(f"Found {len(word_data):,} unique words")

        # Identify unknown acronyms
        self.logger.info("\nIdentifying unknown acronyms...")
        unknown_acronyms = self._identify_unknown_acronyms(word_data)
        self.logger.info(f"Found {len(unknown_acronyms)} unknown acronyms")

        # Identify unclear terms using semantic coherence
        self.logger.info("\nAnalyzing semantic coherence for unclear terms...")
        unclear_terms = self._identify_unclear_terms(word_data)
        self.logger.info(f"Found {len(unclear_terms)} unclear terms")

        # Identify abbreviations
        self.logger.info("\nIdentifying abbreviations...")
        abbreviations = self._identify_abbreviations(word_data)
        self.logger.info(f"Found {len(abbreviations)} abbreviations")

        # Identify domain-specific jargon
        self.logger.info("\nIdentifying domain-specific jargon...")
        jargon = self._identify_jargon(word_data)
        self.logger.info(f"Found {len(jargon)} jargon terms")

        # Compile and persist results
        results = {
            "unknown_acronyms": unknown_acronyms,
            "unclear_terms": unclear_terms,
            "abbreviations": abbreviations,
            "jargon": jargon,
        }
        self._save_normalization_analysis(results)
        return results

    def _extract_word_data(self, df: pd.DataFrame) -> Dict[str, Dict]:
        """Extract words with frequencies and sample contexts"""
        word_data = {}
        for message in df["message"].fillna(""):
            text = str(message)
            # Extract words, preserving original casing
            words = re.findall(r"\b[a-zA-Z][a-zA-Z0-9]*\b", text)
            for word in words:
                word_lower = word.lower()
                if word_lower not in word_data:
                    word_data[word_lower] = {
                        "word": word_lower,
                        "frequency": 0,
                        "original_forms": set(),
                        "contexts": [],
                    }
                word_data[word_lower]["frequency"] += 1
                word_data[word_lower]["original_forms"].add(word)
                # Store up to 5 sample contexts per word: ~50 characters on
                # each side of the word's first occurrence in this message
                if len(word_data[word_lower]["contexts"]) < 5:
                    word_index = text.lower().find(word_lower)
                    if word_index != -1:
                        start = max(0, word_index - 50)
                        end = min(len(text), word_index + len(word_lower) + 50)
                        word_data[word_lower]["contexts"].append(text[start:end])

        # Keep only words seen at least min_frequency times
        return {
            w: data
            for w, data in word_data.items()
            if data["frequency"] >= self.min_frequency
        }
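
    # Illustrative word_data entry (hypothetical values):
    #   word_data["appt"] == {
    #       "word": "appt",
    #       "frequency": 12,
    #       "original_forms": {"appt", "Appt"},
    #       "contexts": ["...reschedule my appt with the clinic..."],
    #   }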

    def _identify_unknown_acronyms(self, word_data: Dict) -> List[Dict]:
        """Identify potential unknown acronyms"""
        unknown_acronyms = []
        for word, data in word_data.items():
            # Acronym heuristic: 2-6 letters, seen fully uppercase at least
            # once, not already known, and below a frequency cutoff (very
            # frequent tokens are usually ordinary words). The extraction
            # regex guarantees a leading letter, so no digit check is needed.
            is_acronym = (
                2 <= len(word) <= 6
                and any(form.isupper() for form in data["original_forms"])
                and word not in self.known_acronyms
                and data["frequency"] < 1500
            )
            if is_acronym:
                unknown_acronyms.append(
                    {
                        "acronym": word.upper(),
                        "frequency": data["frequency"],
                        "contexts": data["contexts"][:3],
                        "confidence": "high" if data["frequency"] >= 10 else "medium",
                    }
                )

        # Sort by frequency, most frequent first
        unknown_acronyms.sort(key=lambda x: x["frequency"], reverse=True)
        return unknown_acronyms
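
    # Illustrative output entry (hypothetical): an unexplained "IME" seen
    # 14 times would surface as
    #   {"acronym": "IME", "frequency": 14, "contexts": [...],
    #    "confidence": "high"}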

    def _identify_unclear_terms(self, word_data: Dict) -> List[Dict]:
        """Identify unclear terms using semantic coherence"""
        unclear_terms = []

        # Sample words for analysis (focus on medium frequency)
        candidate_words = [
            w
            for w, data in word_data.items()
            if 5 <= data["frequency"] <= 200
            and len(w) >= 4
            and w not in self.known_terms
        ]
        if not candidate_words:
            return unclear_terms
        self.logger.info(f"  Analyzing {len(candidate_words)} candidate words...")

        # Embed the candidate words and the known anchor terms
        word_embeddings = self.embedding_model.encode(
            candidate_words, show_progress_bar=True, batch_size=32
        )
        known_embeddings = self.embedding_model.encode(
            list(self.known_terms), show_progress_bar=False
        )

        # Semantic coherence = highest cosine similarity to any known term
        similarities = cosine_similarity(word_embeddings, known_embeddings)
        max_similarities = similarities.max(axis=1)

        # Flag words whose best match is still below the threshold
        for i, word in enumerate(candidate_words):
            coherence = float(max_similarities[i])
            if coherence < self.coherence_threshold:
                unclear_terms.append(
                    {
                        "term": word,
                        "frequency": word_data[word]["frequency"],
                        "coherence_score": coherence,
                        "contexts": word_data[word]["contexts"][:3],
                        "reason": "low_semantic_coherence",
                    }
                )

        # Sort by coherence, lowest (most unclear) first
        unclear_terms.sort(key=lambda x: x["coherence_score"])
        return unclear_terms[:200]  # Top 200 most unclear
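
    # Coherence, concretely: for a candidate embedding c and known-term
    # embeddings k_1..k_n, coherence(c) = max_i cos(c, k_i). With the
    # default threshold of 0.4, a clinical word like "radiology" would
    # likely clear the bar via "medical" or "hospital", while opaque
    # in-group shorthand may not. (Illustrative; actual scores depend on
    # the model.)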

    def _identify_abbreviations(self, word_data: Dict) -> List[Dict]:
        """Identify potential abbreviations"""
        abbreviations = []

        # Note: the tokenizer in _extract_word_data strips punctuation, so
        # period-terminated forms like "appt." never reach this method; we
        # match on token shape alone. Expect noise from ordinary short
        # words; the frequency cutoff only removes the most common ones.
        abbrev_patterns = [
            (r"^[a-z]{2,4}$", "short_word"),  # 2-4 letter words
            (r"^[a-z]\d+$", "letter_number"),  # Letter + digits, e.g. "b12"
        ]
        for word, data in word_data.items():
            for pattern, pattern_type in abbrev_patterns:
                if re.match(pattern, word) and data["frequency"] < 1500:
                    abbreviations.append(
                        {
                            "abbreviation": word,
                            "frequency": data["frequency"],
                            "pattern_type": pattern_type,
                            "contexts": data["contexts"][:2],
                        }
                    )
                    break

        # Sort by frequency, most frequent first
        abbreviations.sort(key=lambda x: x["frequency"], reverse=True)
        return abbreviations[:100]  # Top 100

    def _identify_jargon(self, word_data: Dict) -> List[Dict]:
        """Identify domain-specific jargon"""
        jargon = []

        # Suffix-based indicators. These are deliberately broad, so expect
        # false positives (e.g. "-tion" matches many ordinary English words)
        jargon_indicators = {
            "medical": ["ology", "itis", "ectomy", "oscopy", "therapy"],
            "legal": ["tion", "ment", "ance", "ence"],
            "technical": ["tech", "system", "process", "protocol"],
        }
        for word, data in word_data.items():
            if len(word) < 6:
                continue
            # Assign the first matching domain
            for domain, suffixes in jargon_indicators.items():
                if any(word.endswith(suffix) for suffix in suffixes):
                    if word not in self.known_terms:
                        jargon.append(
                            {
                                "term": word,
                                "frequency": data["frequency"],
                                "domain": domain,
                                "contexts": data["contexts"][:2],
                            }
                        )
                    break

        # Sort by frequency, most frequent first
        jargon.sort(key=lambda x: x["frequency"], reverse=True)
        return jargon[:100]  # Top 100

    def _save_normalization_analysis(self, results: Dict):
        """Save normalization analysis results as JSON and readable text"""
        # Save JSON
        json_results = {
            "method": "semantic_analysis",
            "statistics": {
                "unknown_acronyms": len(results["unknown_acronyms"]),
                "unclear_terms": len(results["unclear_terms"]),
                "abbreviations": len(results["abbreviations"]),
                "jargon": len(results["jargon"]),
            },
            "results": results,
        }
        self.save_results(json_results, "semantic_normalization_analysis.json")

        # Save human-readable text report
        text_output = []
        text_output.append("SEMANTIC TEXT NORMALIZATION ANALYSIS")
        text_output.append("=" * 80)
        text_output.append("")
        text_output.append(
            "This analysis identifies terms that may need clarification or expansion."
        )
        text_output.append("")

        # Unknown acronyms
        text_output.append("=" * 80)
        text_output.append("UNKNOWN ACRONYMS (Need Investigation)")
        text_output.append("=" * 80)
        text_output.append("")
        if results["unknown_acronyms"]:
            text_output.append(
                f"{'Acronym':<15} {'Frequency':<12} {'Confidence':<12} {'Sample Context'}"
            )
            text_output.append("-" * 80)
            for item in results["unknown_acronyms"][:20]:
                context = item["contexts"][0][:50] if item["contexts"] else "N/A"
                text_output.append(
                    f"{item['acronym']:<15} {item['frequency']:<12} "
                    f"{item['confidence']:<12} {context}..."
                )
        else:
            text_output.append("No unknown acronyms found.")
        text_output.append("")

        # Unclear terms
        text_output.append("=" * 80)
        text_output.append("UNCLEAR TERMS (Low Semantic Coherence)")
        text_output.append("=" * 80)
        text_output.append("")
        text_output.append(
            "These terms have low semantic similarity to known medical/legal terms."
        )
        text_output.append(
            "They may be typos, slang, or domain-specific terms needing clarification."
        )
        text_output.append("")
        if results["unclear_terms"]:
            text_output.append(
                f"{'Term':<20} {'Frequency':<12} {'Coherence':<12} {'Sample Context'}"
            )
            text_output.append("-" * 80)
            for item in results["unclear_terms"][:20]:
                context = item["contexts"][0][:40] if item["contexts"] else "N/A"
                text_output.append(
                    f"{item['term']:<20} {item['frequency']:<12} "
                    f"{item['coherence_score']:<12.3f} {context}..."
                )
        else:
            text_output.append("No unclear terms found.")
        text_output.append("")

        # Abbreviations
        text_output.append("=" * 80)
        text_output.append("ABBREVIATIONS (May Need Expansion)")
        text_output.append("=" * 80)
        text_output.append("")
        if results["abbreviations"]:
            text_output.append(
                f"{'Abbreviation':<20} {'Frequency':<12} {'Pattern':<15} {'Context'}"
            )
            text_output.append("-" * 80)
            for item in results["abbreviations"][:15]:
                context = item["contexts"][0][:40] if item["contexts"] else "N/A"
                text_output.append(
                    f"{item['abbreviation']:<20} {item['frequency']:<12} "
                    f"{item['pattern_type']:<15} {context}..."
                )
        else:
            text_output.append("No abbreviations found.")
        text_output.append("")

        # Jargon
        text_output.append("=" * 80)
        text_output.append("DOMAIN-SPECIFIC JARGON")
        text_output.append("=" * 80)
        text_output.append("")
        if results["jargon"]:
            text_output.append(f"{'Term':<25} {'Frequency':<12} {'Domain':<15}")
            text_output.append("-" * 80)
            for item in results["jargon"][:15]:
                text_output.append(
                    f"{item['term']:<25} {item['frequency']:<12} {item['domain']:<15}"
                )
        else:
            text_output.append("No jargon found.")
        text_output.append("")

        # Recommendations
        text_output.append("=" * 80)
        text_output.append("RECOMMENDATIONS")
        text_output.append("=" * 80)
        text_output.append("")
        text_output.append(
            "1. Investigate unknown acronyms - they may be critical case-specific terms"
        )
        text_output.append("2. Review unclear terms - they may be typos or need context")
        text_output.append("3. Expand abbreviations in the TEXT_EXPANSIONS dictionary")
        text_output.append("4. Add jargon terms to KEY_TOPICS if relevant to the case")

        filepath = self.output_dir / "semantic_normalization_analysis.txt"
        with open(filepath, "w", encoding="utf-8") as f:
            f.write("\n".join(text_output))
        self.logger.info(f"\nSaved analysis to: {filepath}")


if __name__ == "__main__":
    df = pd.read_csv("../_sources/signal_messages.csv")
    analyzer = SemanticNormalizationAnalyzer(min_frequency=1, coherence_threshold=0.4)
    results = analyzer.execute(df)
    print("\nSemantic normalization analysis complete:")
    print(f"  Unknown acronyms: {len(results['unknown_acronyms'])}")
    print(f"  Unclear terms: {len(results['unclear_terms'])}")
    print(f"  Abbreviations: {len(results['abbreviations'])}")
    print(f"  Jargon: {len(results['jargon'])}")