step01a_llm_normalization.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. """
  2. Step 0b (Alternative): LLM-based text normalization analysis.
  3. Uses deployed LLM to identify unclear terms and unknown acronyms.
  4. """
  5. from typing import List, Dict
  6. import pandas as pd
  7. import json
  8. import requests
  9. from collections import Counter
  10. import re
  11. from pipeline.models.base import PipelineStep
  12. class LLMNormalizationAnalyzer(PipelineStep):
  13. """
  14. Use LLM to analyze text and identify unclear terms and unknown acronyms.
  15. """
  16. def __init__(
  17. self,
  18. llm_url: str = "http://localhost:8000",
  19. sample_size: int = 500,
  20. output_dir: str = "./pipeline_output",
  21. model: str = "",
  22. ):
  23. super().__init__(output_dir)
  24. self.llm_url = llm_url
  25. self.sample_size = sample_size
  26. self.model = model
  27. def execute(self, df: pd.DataFrame) -> Dict[str, List[Dict]]:
  28. """
  29. Use LLM to identify unclear terms and unknown acronyms.
  30. Args:
  31. df: DataFrame with messages
  32. Returns:
  33. Dictionary with identified terms and acronyms
  34. """
  35. self.logger.info("=" * 80)
  36. self.logger.info("LLM-BASED TEXT NORMALIZATION ANALYSIS")
  37. self.logger.info("=" * 80)
  38. self.logger.info(f"Using LLM at: {self.llm_url}")
  39. # Extract frequent words and acronyms
  40. word_freq, acronym_freq = self._extract_terms(df)
  41. # Sample messages for LLM analysis
  42. sample_df = df.sample(n=min(self.sample_size, len(df)), random_state=42)
  43. all_unknown_acronyms = []
  44. all_unclear_terms = []
  45. all_expansions = []
  46. for i in range(0, len(df), 100):
  47. chunk = df.iloc[i : i + 100]
  48. messages_sample = "\n".join(chunk["message"].fillna("").tolist())
  49. # Analyze with LLM
  50. self.logger.info("\\nAnalyzing with LLM...")
  51. # Get unknown acronyms
  52. unknown_acronyms = self._identify_acronyms_with_llm(
  53. messages_sample, list(acronym_freq.keys())[:50]
  54. )
  55. all_unknown_acronyms = list(set(all_unknown_acronyms + unknown_acronyms))
  56. # Get unclear terms
  57. unclear_terms = self._identify_unclear_terms_with_llm(
  58. messages_sample, list(word_freq.keys())[:100]
  59. )
  60. all_unclear_terms = list(set(all_unclear_terms + unclear_terms))
  61. # Get expansion suggestions
  62. expansions = self._get_expansion_suggestions_with_llm(
  63. messages_sample, unknown_acronyms
  64. )
  65. all_expansions = list(set(all_expansions + expansions))
  66. results = {
  67. "unknown_acronyms": all_unknown_acronyms,
  68. "unclear_terms": all_unclear_terms,
  69. "suggested_expansions": all_expansions,
  70. }
  71. self._save_llm_analysis(results)
  72. return results
  73. def _extract_terms(self, df: pd.DataFrame) -> tuple:
  74. """Extract words and potential acronyms"""
  75. word_freq = Counter()
  76. acronym_freq = Counter()
  77. for message in df["message"].fillna(""):
  78. text = str(message)
  79. # Extract words
  80. words = re.findall(r"\\b[a-z]+\\b", text.lower())
  81. word_freq.update(words)
  82. # Extract potential acronyms (2-6 uppercase letters)
  83. acronyms = re.findall(r"\\b[A-Z]{2,6}\\b", text)
  84. acronym_freq.update([a.lower() for a in acronyms])
  85. return word_freq, acronym_freq
  86. def _identify_acronyms_with_llm(
  87. self, messages_sample: str, acronym_candidates: List[str]
  88. ) -> List[Dict]:
  89. """Use LLM to identify unknown acronyms"""
  90. prompt = f"""You are analyzing messages.
  91. ACRONYMS FOUND: {', '.join(acronym_candidates[:30])}
  92. SAMPLE MESSAGES:
  93. {messages_sample[:2000]}
  94. Task: Identify which acronyms are UNKNOWN or UNCLEAR (not standard medical/legal acronyms).
  95. For each unknown acronym, try to infer its meaning from context.
  96. Respond with JSON:
  97. {{
  98. "unknown_acronyms": [
  99. {{"acronym": "XYZ", "possible_meaning": "...", "confidence": "high/medium/low"}},
  100. ...
  101. ]
  102. }}"""
  103. try:
  104. response = requests.post(
  105. f"{self.llm_url}/v1/chat/completions",
  106. json={"prompt": prompt, "max_tokens": 1000, "temperature": 0.3},
  107. timeout=120,
  108. )
  109. if response.status_code == 200:
  110. text = response.json()["choices"][0]["text"]
  111. parsed = json.loads(text)
  112. return parsed.get("unknown_acronyms", [])
  113. except Exception as e:
  114. self.logger.error(f"LLM error: {e}")
  115. return []
  116. def _identify_unclear_terms_with_llm(
  117. self, messages_sample: str, word_candidates: List[str]
  118. ) -> List[Dict]:
  119. """Use LLM to identify unclear terms"""
  120. prompt = f"""You are analyzing messages.
  121. FREQUENT WORDS: {', '.join(word_candidates[:50])}
  122. SAMPLE MESSAGES:
  123. {messages_sample[:2000]}
  124. Task: Identify words that are UNCLEAR, AMBIGUOUS, or may be TYPOS/SLANG.
  125. Focus on words that:
  126. - Have unclear meaning in context
  127. - May be misspellings
  128. - Are slang or informal terms
  129. - Need clarification for legal purposes
  130. Respond with JSON:
  131. {{
  132. "unclear_terms": [
  133. {{"term": "word", "reason": "...", "suggested_clarification": "..."}},
  134. ...
  135. ]
  136. }}"""
  137. try:
  138. response = requests.post(
  139. f"{self.llm_url}/v1/chat/completions",
  140. json={"prompt": prompt, "max_tokens": 1000, "temperature": 0.3},
  141. timeout=120,
  142. )
  143. if response.status_code == 200:
  144. text = response.json()["choices"][0]["text"]
  145. parsed = json.loads(text)
  146. return parsed.get("unclear_terms", [])
  147. except Exception as e:
  148. self.logger.error(f"LLM error: {e}")
  149. return []
  150. def _get_expansion_suggestions_with_llm(
  151. self, messages_sample: str, acronyms: List[Dict]
  152. ) -> List[Dict]:
  153. """Get expansion suggestions for acronyms"""
  154. if not acronyms:
  155. return []
  156. acronym_list = ", ".join([a["acronym"] for a in acronyms[:10]])
  157. prompt = f"""Based on these medical/legal messages, suggest expansions for these acronyms:
  158. ACRONYMS: {acronym_list}
  159. SAMPLE MESSAGES:
  160. {messages_sample[:2000]}
  161. Respond with JSON:
  162. {{
  163. "expansions": [
  164. {{"acronym": "ABC", "expansion": "full form", "confidence": "high/medium/low"}},
  165. ...
  166. ]
  167. }}"""
  168. try:
  169. response = requests.post(
  170. f"{self.llm_url}/v1/chat/completions",
  171. json={"prompt": prompt, "max_tokens": 800, "temperature": 0.3},
  172. timeout=120,
  173. )
  174. if response.status_code == 200:
  175. text = response.json()["choices"][0]["text"]
  176. parsed = json.loads(text)
  177. return parsed.get("expansions", [])
  178. except Exception as e:
  179. self.logger.error(f"LLM error: {e}")
  180. return []
  181. def _save_llm_analysis(self, results: Dict):
  182. """Save LLM analysis results"""
  183. self.save_results(results, "llm_normalization_analysis.json")
  184. # Save text
  185. text_output = []
  186. text_output.append("LLM-BASED TEXT NORMALIZATION ANALYSIS")
  187. text_output.append("=" * 80)
  188. text_output.append("")
  189. text_output.append("UNKNOWN ACRONYMS:")
  190. text_output.append("-" * 80)
  191. for item in results["unknown_acronyms"]:
  192. text_output.append(
  193. f" {item['acronym']}: {item.get('possible_meaning', 'Unknown')}"
  194. )
  195. text_output.append("")
  196. text_output.append("UNCLEAR TERMS:")
  197. text_output.append("-" * 80)
  198. for item in results["unclear_terms"]:
  199. text_output.append(f" {item['term']}: {item.get('reason', 'Unclear')}")
  200. text_output.append("")
  201. text_output.append("SUGGESTED EXPANSIONS:")
  202. text_output.append("-" * 80)
  203. for item in results["suggested_expansions"]:
  204. text_output.append(f" {item['acronym']} -> {item['expansion']}")
  205. filepath = self.output_dir / "llm_normalization_analysis.txt"
  206. with open(filepath, "w") as f:
  207. f.write("\\n".join(text_output))
  208. self.logger.info(f"Saved analysis to: {filepath}")
  209. if __name__ == "__main__":
  210. import pandas as pd
  211. df = pd.read_csv("../_sources/signal_messages.csv")
  212. analyzer = LLMNormalizationAnalyzer(
  213. llm_url="http://localhost:8000", sample_size=500
  214. )
  215. results = analyzer.execute(df)
  216. print(f"\\nFound {len(results['unknown_acronyms'])} unknown acronyms")
  217. print(f"Found {len(results['unclear_terms'])} unclear terms")