"""
Step 0a: Semantic keyword identification using embeddings.

Identifies keywords semantically related to subpoena criteria.
"""

from typing import List, Dict, Set, Tuple, Iterator
from collections import Counter

import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

from pipeline.models.base import PipelineStep
from pipeline.common_defs import SUBPOENA_CRITERIA
from pipeline.utils.text_utils import normalize_text

# Sentence-transformer model used for both word and criterion embeddings.
EMBEDDING_MODEL_NAME = "all-mpnet-base-v2"


class SemanticKeywordIdentifier(PipelineStep):
    """
    Identify keywords semantically related to subpoena criteria.

    Every distinct word in the corpus is embedded with a sentence-transformer
    model and compared, by cosine similarity, against an embedding of each
    criterion description. Words scoring at or above ``similarity_threshold``
    are kept, ranked by descending similarity, capped at
    ``max_keywords_per_criterion``, and annotated with their corpus frequency.
    Selection uses embedding similarity rather than frequency.
    """

    def __init__(
        self,
        similarity_threshold: float = 0.25,
        max_keywords_per_criterion: int = 80,
        min_word_length: int = 3,
        output_dir: str = "./pipeline_output",
    ):
        """
        Args:
            similarity_threshold: Minimum cosine similarity for a word to be
                kept as a keyword for a criterion.
            max_keywords_per_criterion: Maximum keywords retained per criterion.
            min_word_length: Tokens shorter than this are ignored.
            output_dir: Forwarded to ``PipelineStep``.
        """
        super().__init__(output_dir)
        self.similarity_threshold = similarity_threshold
        self.max_keywords_per_criterion = max_keywords_per_criterion
        self.min_word_length = min_word_length
        # Loaded eagerly so that construction fails fast if the model is
        # unavailable; _load_embedding_model() only reloads it if cleared.
        self.embedding_model = None
        self._load_embedding_model()

    def _load_embedding_model(self) -> None:
        """Load the sentence-transformer model if it is not already loaded."""
        if getattr(self, "embedding_model", None) is None:
            self.logger.info(f"Loading embedding model: {EMBEDDING_MODEL_NAME}...")
            self.embedding_model = SentenceTransformer(EMBEDDING_MODEL_NAME)

    def execute(self, df: pd.DataFrame) -> Dict[int, List[Dict]]:
        """
        Identify keywords semantically related to each subpoena criterion.

        Args:
            df: Messages to analyse. It must contain a ``message`` column, and
                missing values are treated as empty strings.

        Returns:
            A mapping from criterion key to a list of
            ``{"word", "similarity", "frequency"}`` dicts, ordered by
            descending similarity. The result is also saved to
            ``semantic_keywords.json``.
        """
        self.logger.info("SEMANTIC KEYWORD IDENTIFICATION")
        self.logger.info(f"Analyzing {len(df):,} messages")
        self._load_embedding_model()

        # Tokenize the corpus once: the vocabulary is the counter's key set,
        # which is identical to what _extract_unique_words() would return.
        word_freq = self._compute_word_frequencies(df)
        unique_words = sorted(word_freq)
        self.logger.info(f"Found {len(unique_words):,} unique words")

        criteria_descriptions = self._create_criteria_descriptions()

        word_embeddings = self._compute_word_embeddings(unique_words)
        criteria_embeddings = self._compute_criteria_embeddings(criteria_descriptions)

        keywords_by_criterion = self._find_similar_keywords(
            unique_words, word_embeddings, criteria_descriptions, criteria_embeddings
        )
        keywords_by_criterion = self._add_frequency_info(keywords_by_criterion, word_freq)

        self._save_semantic_keywords(keywords_by_criterion, criteria_descriptions)
        return keywords_by_criterion

    def _iter_tokens(self, df: pd.DataFrame) -> Iterator[str]:
        """Yield every normalized, purely alphabetic token of at least
        ``min_word_length`` characters from ``df["message"]``, in order."""
        min_len = self.min_word_length
        for message in df["message"].fillna(""):
            for token in normalize_text(str(message)).split():
                if len(token) >= min_len and token.isalpha():
                    yield token

    def _extract_unique_words(self, df: pd.DataFrame) -> List[str]:
        """Return the sorted vocabulary of qualifying tokens in the messages."""
        return sorted(set(self._iter_tokens(df)))

    def _create_criteria_descriptions(self) -> Dict[int, str]:
        """Return the criterion descriptions.

        The result is a copy, so callers cannot mutate the shared
        ``SUBPOENA_CRITERIA`` constant.
        """
        return dict(SUBPOENA_CRITERIA)

    def _compute_word_embeddings(self, words: List[str]) -> np.ndarray:
        """Embed each word; returns one row per word (empty array for no words)."""
        self.logger.info(f"Computing embeddings for {len(words):,} words...")
        if not words:
            # Skip the model: encoding an empty batch does not yield a 2-D array.
            return np.empty((0, 0), dtype=np.float32)
        return self.embedding_model.encode(words, show_progress_bar=True, batch_size=32)

    def _compute_criteria_embeddings(
        self, criteria_descriptions: Dict[int, str]
    ) -> Dict[int, np.ndarray]:
        """Embed every criterion description in a single batch, keyed by criterion."""
        if not criteria_descriptions:
            return {}
        keys = list(criteria_descriptions)
        vectors = self.embedding_model.encode([criteria_descriptions[k] for k in keys])
        return dict(zip(keys, vectors))

    def _find_similar_keywords(
        self,
        words: List[str],
        word_embeddings: np.ndarray,
        criteria_descriptions: Dict[int, str],
        criteria_embeddings: Dict[int, np.ndarray],
    ) -> Dict[int, List[Dict]]:
        """
        For each criterion, select the words most similar to its embedding.

        Words with cosine similarity >= ``similarity_threshold`` are ranked by
        descending similarity. Ties keep the input (alphabetical) order. The
        result is truncated to ``max_keywords_per_criterion``. ``frequency``
        is initialised to 0 and filled in later by ``_add_frequency_info``.
        """
        keywords_by_criterion: Dict[int, List[Dict]] = {}
        has_words = len(words) > 0
        for num, criterion_emb in criteria_embeddings.items():
            keywords: List[Dict] = []
            if has_words:
                similarities = cosine_similarity(
                    word_embeddings, np.asarray(criterion_emb).reshape(1, -1)
                ).ravel()
                candidates = np.flatnonzero(similarities >= self.similarity_threshold)
                # A stable sort makes the tie order deterministic.
                order = np.argsort(-similarities[candidates], kind="stable")
                top = candidates[order][: self.max_keywords_per_criterion]
                keywords = [
                    {"word": words[idx], "similarity": float(similarities[idx]), "frequency": 0}
                    for idx in top
                ]
            keywords_by_criterion[num] = keywords
            self.logger.info(f"Criterion {num}: {len(keywords)} keywords")
        return keywords_by_criterion

    def _compute_word_frequencies(self, df: pd.DataFrame) -> Counter:
        """Count occurrences of each qualifying token across all messages."""
        return Counter(self._iter_tokens(df))

    def _add_frequency_info(
        self, keywords_by_criterion: Dict[int, List[Dict]], word_freq: Counter
    ) -> Dict[int, List[Dict]]:
        """Set each keyword's ``frequency`` in place and return the same mapping."""
        for keywords in keywords_by_criterion.values():
            for kw in keywords:
                kw["frequency"] = word_freq.get(kw["word"], 0)
        return keywords_by_criterion

    def _save_semantic_keywords(
        self,
        keywords_by_criterion: Dict[int, List[Dict]],
        criteria_descriptions: Dict[int, str],
    ) -> None:
        """Write keywords, criterion descriptions and run parameters to
        ``semantic_keywords.json``. Criterion keys are stringified for JSON."""
        results = {
            "method": "semantic_similarity",
            "model": EMBEDDING_MODEL_NAME,
            "similarity_threshold": self.similarity_threshold,
            "max_keywords_per_criterion": self.max_keywords_per_criterion,
            "criteria": {
                str(num): {
                    "description": criteria_descriptions.get(num),
                    "keywords": keywords,
                }
                for num, keywords in keywords_by_criterion.items()
            },
        }
        self.save_results(results, "semantic_keywords.json")
        self.logger.info("Saved semantic keywords")