step0b1_semantic_keyword_identification.py

  1. """
  2. Step 0a: Semantic keyword identification using embeddings.
  3. Identifies keywords semantically related to subpoena criteria.
  4. """
  5. from typing import List, Dict, Set, Tuple
  6. from collections import Counter
  7. import pandas as pd
  8. import numpy as np
  9. from sentence_transformers import SentenceTransformer
  10. from sklearn.metrics.pairwise import cosine_similarity
  11. from pipeline.models.base import PipelineStep
  12. from pipeline.common_defs import SUBPOENA_CRITERIA
  13. from pipeline.utils.text_utils import normalize_text
  14. from tqdm import tqdm


class SemanticKeywordIdentifier(PipelineStep):
    """
    Identify keywords semantically related to subpoena criteria.
    Uses embedding similarity rather than frequency counts.
    """

    def __init__(
        self,
        similarity_threshold: float = 0.25,
        max_keywords_per_criterion: int = 100,
        min_word_length: int = 3,
        output_dir: str = "./pipeline_output",
    ):
        super().__init__(output_dir)
        self.similarity_threshold = similarity_threshold
        self.max_keywords_per_criterion = max_keywords_per_criterion
        self.min_word_length = min_word_length
        # Load the model eagerly so a download/load failure surfaces at
        # construction time rather than mid-pipeline.
        self.logger.info("Loading embedding model: all-mpnet-base-v2...")
        self.embedding_model = SentenceTransformer("all-mpnet-base-v2")
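
    # Note: all-mpnet-base-v2 produces 768-dimensional sentence embeddings.
    # The default threshold of 0.25 is fairly permissive for cosine
    # similarity over these embeddings and will likely need tuning per
    # corpus; raising it yields fewer, more on-topic keywords.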

    def _load_embedding_model(self):
        """Load the sentence-transformer model if it is not already loaded."""
        if self.embedding_model is None:
            self.logger.info("Loading embedding model: all-mpnet-base-v2...")
            self.embedding_model = SentenceTransformer("all-mpnet-base-v2")

    def execute(self, df: pd.DataFrame) -> Dict[int, List[Dict]]:
        """Identify keywords semantically related to subpoena criteria."""
        self.logger.info("SEMANTIC KEYWORD IDENTIFICATION")
        self.logger.info(f"Analyzing {len(df):,} messages")
        self._load_embedding_model()

        # Extract unique words
        unique_words = self._extract_unique_words(df)
        self.logger.info(f"Found {len(unique_words):,} unique words")

        # Sanity check: long tokens starting with "medical" (e.g. a fused
        # "medicalrecords") would indicate a normalization bug upstream
        suspicious = [
            w for w in unique_words if w.startswith("medical") and len(w) > 10
        ]
        if suspicious:
            self.logger.error(f"SUSPICIOUS WORDS IN EXTRACTION: {suspicious}")

        # Create criteria descriptions
        criteria_descriptions = self._create_criteria_descriptions()

        # Compute embeddings for words and criteria
        word_embeddings = self._compute_word_embeddings(unique_words)
        criteria_embeddings = self._compute_criteria_embeddings(criteria_descriptions)

        # Find keywords similar to each criterion
        keywords_by_criterion = self._find_similar_keywords(
            unique_words, word_embeddings, criteria_descriptions, criteria_embeddings
        )

        # Add corpus frequency info to each keyword
        word_freq = self._compute_word_frequencies(df)
        keywords_by_criterion = self._add_frequency_info(keywords_by_criterion, word_freq)

        # Save results
        self._save_semantic_keywords(keywords_by_criterion, criteria_descriptions)
        return keywords_by_criterion

    def _extract_unique_words(self, df: pd.DataFrame) -> List[str]:
        """
        Extract unique words, chunked so progress can be reported.
        A balance between vectorized speed and user feedback.
        """
        self.logger.info("Extracting unique words")
        words = set()
        min_len = self.min_word_length

        # Process in chunks for progress tracking
        chunk_size = 10_000
        total_chunks = (len(df) + chunk_size - 1) // chunk_size
        with tqdm(total=total_chunks, desc="Processing chunks") as pbar:
            for i in range(0, len(df), chunk_size):
                chunk = df["message"].iloc[i : i + chunk_size]
                # Vectorized within the chunk: normalize, tokenize, then keep
                # alphabetic tokens of at least min_len characters. dropna()
                # guards against NaN rows produced by exploding empty messages.
                normalized = chunk.fillna("").astype(str).apply(normalize_text)
                tokens = normalized.str.split().explode().dropna()
                chunk_words = tokens[
                    (tokens.str.len() >= min_len) & (tokens.str.isalpha())
                ].unique()
                words.update(chunk_words)
                pbar.update(1)
        self.logger.info(f"Found {len(words):,} unique words")
        return sorted(words)

    def _create_criteria_descriptions(self) -> Dict[int, str]:
        """Return the detailed description for each criterion."""
        return SUBPOENA_CRITERIA
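
    # For illustration, SUBPOENA_CRITERIA (from pipeline.common_defs, not
    # shown here) is assumed to map criterion numbers to free-text
    # descriptions that get embedded, e.g.
    #   {1: "Messages discussing medical records or treatment ...",
    #    2: "Messages discussing billing or payment ...", ...}
    # The example texts are hypothetical; this step only relies on the
    # Dict[int, str] shape.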

    def _compute_word_embeddings(self, words: List[str]) -> np.ndarray:
        """Compute embeddings for the candidate words."""
        self.logger.info(f"Computing embeddings for {len(words):,} words...")
        return self.embedding_model.encode(words, show_progress_bar=True, batch_size=32)

    def _compute_criteria_embeddings(
        self, criteria_descriptions: Dict[int, str]
    ) -> Dict[int, np.ndarray]:
        """Compute one embedding per criterion description."""
        embeddings = {}
        for num, desc in criteria_descriptions.items():
            embeddings[num] = self.embedding_model.encode([desc])[0]
        return embeddings
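
    # Design note: descriptions are encoded one at a time for clarity. A
    # single batched encode() call over all descriptions would also work,
    # but with only a handful of criteria the difference is negligible.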

    def _find_similar_keywords(
        self, words, word_embeddings, criteria_descriptions, criteria_embeddings
    ):
        """Find the keywords most similar to each criterion embedding."""
        keywords_by_criterion = {}
        for num, emb in criteria_embeddings.items():
            # Cosine similarity of every word embedding against this criterion
            similarities = cosine_similarity(word_embeddings, emb.reshape(1, -1)).flatten()
            # Keep words above the threshold, ranked by descending similarity
            # and capped at max_keywords_per_criterion
            similar_indices = np.where(similarities >= self.similarity_threshold)[0]
            similar_indices = similar_indices[np.argsort(-similarities[similar_indices])]
            similar_indices = similar_indices[: self.max_keywords_per_criterion]
            keywords_by_criterion[num] = [
                {"word": words[idx], "similarity": float(similarities[idx]), "frequency": 0}
                for idx in similar_indices
            ]
            self.logger.info(f"Criterion {num}: {len(keywords_by_criterion[num])} keywords")
        return keywords_by_criterion
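
    # The returned structure, for illustration (values hypothetical):
    #   {1: [{"word": "diagnosis", "similarity": 0.41, "frequency": 0}, ...],
    #    2: [...], ...}
    # "frequency" is a placeholder here; _add_frequency_info fills it in.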

    def _compute_word_frequencies(self, df: pd.DataFrame) -> Counter:
        """Count occurrences of each qualifying token across all messages."""
        word_freq = Counter()
        for message in df["message"].fillna(""):
            normalized = normalize_text(str(message))
            tokens = [
                t for t in normalized.split()
                if len(t) >= self.min_word_length and t.isalpha()
            ]
            word_freq.update(tokens)
        return word_freq

    def _add_frequency_info(self, keywords_by_criterion, word_freq):
        """Attach corpus frequency to each keyword record."""
        for keywords in keywords_by_criterion.values():
            for kw in keywords:
                kw["frequency"] = word_freq.get(kw["word"], 0)
        return keywords_by_criterion

    def _save_semantic_keywords(self, keywords_by_criterion, criteria_descriptions):
        """Save results as JSON via the PipelineStep helper."""
        results = {
            "method": "semantic_similarity",
            "criteria": {
                str(num): {
                    "description": criteria_descriptions[num],
                    "keywords": keywords,
                }
                for num, keywords in keywords_by_criterion.items()
            },
        }
        self.save_results(results, "semantic_keywords.json")
        self.logger.info("Saved semantic keywords")
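

# A minimal usage sketch, assuming the messages are stored in a CSV with the
# "message" column this step already expects. The path is illustrative only;
# the step is normally invoked by the surrounding pipeline instead.
if __name__ == "__main__":
    import sys

    messages = pd.read_csv(sys.argv[1] if len(sys.argv) > 1 else "messages.csv")
    step = SemanticKeywordIdentifier(similarity_threshold=0.25)
    results = step.execute(messages)
    for criterion, kws in results.items():
        print(criterion, [kw["word"] for kw in kws[:5]])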