- """
- Step 0a: Semantic keyword identification using embeddings.
- Identifies keywords semantically related to subpoena criteria.
- """
from collections import Counter
from typing import Dict, List

import numpy as np
import pandas as pd
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

from pipeline.common_defs import SUBPOENA_CRITERIA
from pipeline.models.base import PipelineStep
from pipeline.utils.text_utils import normalize_text


class SemanticKeywordIdentifier(PipelineStep):
    """
    Identify keywords semantically related to subpoena criteria.

    Ranks candidate words by embedding similarity rather than raw frequency,
    so rare but on-topic terms are still surfaced.
    """

    def __init__(
        self,
        similarity_threshold: float = 0.25,
        max_keywords_per_criterion: int = 80,
        min_word_length: int = 3,
        output_dir: str = "./pipeline_output",
    ):
        super().__init__(output_dir)
        self.similarity_threshold = similarity_threshold
        self.max_keywords_per_criterion = max_keywords_per_criterion
        self.min_word_length = min_word_length
        # Loaded lazily so constructing the step stays cheap; the model is
        # only pulled into memory when execute() runs.
        self.embedding_model = None

    def _load_embedding_model(self):
        """Load the sentence-transformer model on first use."""
        if self.embedding_model is None:
            self.logger.info("Loading embedding model: all-mpnet-base-v2...")
            self.embedding_model = SentenceTransformer("all-mpnet-base-v2")

    def execute(self, df: pd.DataFrame) -> Dict[int, List[Dict]]:
        """Identify keywords semantically related to subpoena criteria."""
        self.logger.info("SEMANTIC KEYWORD IDENTIFICATION")
        self.logger.info(f"Analyzing {len(df):,} messages")
        self._load_embedding_model()

        # Build the candidate vocabulary from the message corpus.
        unique_words = self._extract_unique_words(df)
        self.logger.info(f"Found {len(unique_words):,} unique words")

        # Embed both sides, then rank words by cosine similarity per criterion.
        criteria_descriptions = self._create_criteria_descriptions()
        word_embeddings = self._compute_word_embeddings(unique_words)
        criteria_embeddings = self._compute_criteria_embeddings(criteria_descriptions)
        keywords_by_criterion = self._find_similar_keywords(
            unique_words, word_embeddings, criteria_embeddings
        )

        # Attach corpus frequencies so downstream steps can weight keywords.
        word_freq = self._compute_word_frequencies(df)
        keywords_by_criterion = self._add_frequency_info(keywords_by_criterion, word_freq)

        self._save_semantic_keywords(keywords_by_criterion, criteria_descriptions)
        return keywords_by_criterion

    def _tokenize(self, message: str) -> List[str]:
        """Normalize a message and keep alphabetic tokens of sufficient length."""
        normalized = normalize_text(str(message))
        return [t for t in normalized.split() if len(t) >= self.min_word_length and t.isalpha()]

    def _extract_unique_words(self, df: pd.DataFrame) -> List[str]:
        """Extract the unique words appearing across all messages."""
        words = set()
        for message in df["message"].fillna(""):
            words.update(self._tokenize(message))
        return sorted(words)

    def _create_criteria_descriptions(self) -> Dict[int, str]:
        """Return the per-criterion descriptions defined in pipeline.common_defs."""
        return SUBPOENA_CRITERIA

    def _compute_word_embeddings(self, words: List[str]) -> np.ndarray:
        """Batch-encode every candidate word with the sentence transformer."""
        self.logger.info(f"Computing embeddings for {len(words):,} words...")
        return self.embedding_model.encode(words, show_progress_bar=True, batch_size=32)

    def _compute_criteria_embeddings(
        self, criteria_descriptions: Dict[int, str]
    ) -> Dict[int, np.ndarray]:
        """Compute one embedding per criterion description."""
        return {
            num: self.embedding_model.encode([desc])[0]
            for num, desc in criteria_descriptions.items()
        }

    def _find_similar_keywords(
        self,
        words: List[str],
        word_embeddings: np.ndarray,
        criteria_embeddings: Dict[int, np.ndarray],
    ) -> Dict[int, List[Dict]]:
        """Rank words by cosine similarity against each criterion embedding."""
        keywords_by_criterion = {}
        for num, emb in criteria_embeddings.items():
            similarities = cosine_similarity(word_embeddings, emb.reshape(1, -1)).flatten()
            # Keep words above the threshold, sorted by descending similarity,
            # capped at max_keywords_per_criterion.
            similar_indices = np.where(similarities >= self.similarity_threshold)[0]
            similar_indices = similar_indices[np.argsort(-similarities[similar_indices])]
            similar_indices = similar_indices[: self.max_keywords_per_criterion]
            keywords_by_criterion[num] = [
                {"word": words[idx], "similarity": float(similarities[idx]), "frequency": 0}
                for idx in similar_indices
            ]
            self.logger.info(f"Criterion {num}: {len(keywords_by_criterion[num])} keywords")
        return keywords_by_criterion

    def _compute_word_frequencies(self, df: pd.DataFrame) -> Counter:
        """Count how often each token appears across all messages."""
        word_freq: Counter = Counter()
        for message in df["message"].fillna(""):
            word_freq.update(self._tokenize(message))
        return word_freq

    def _add_frequency_info(
        self, keywords_by_criterion: Dict[int, List[Dict]], word_freq: Counter
    ) -> Dict[int, List[Dict]]:
        """Fill in the corpus frequency for each selected keyword."""
        for keywords in keywords_by_criterion.values():
            for kw in keywords:
                kw["frequency"] = word_freq.get(kw["word"], 0)
        return keywords_by_criterion

    def _save_semantic_keywords(self, keywords_by_criterion, criteria_descriptions):
        """Persist the keyword lists alongside their criterion descriptions."""
        results = {
            "method": "semantic_similarity",
            "criteria": {
                str(num): {"description": criteria_descriptions[num], "keywords": kws}
                for num, kws in keywords_by_criterion.items()
            },
        }
        self.save_results(results, "semantic_keywords.json")
        self.logger.info("Saved semantic keywords")