# step0a2_llm_normatlization.py
  1. """
  2. Step 0b (Alternative): LLM-based text normalization analysis.
  3. Uses deployed LLM to identify unclear terms and unknown acronyms.
  4. """
  5. from typing import List, Dict
  6. import pandas as pd
  7. import requests
  8. from collections import Counter
  9. import re
  10. from pipeline.models.base import PipelineStep
  11. from json_repair import loads
  12. class LLMNormalizationAnalyzer(PipelineStep):
  13. """
  14. Use LLM to analyze text and identify unclear terms and unknown acronyms.
  15. """
  16. def __init__(
  17. self,
  18. llm_url: str = "http://localhost:8000",
  19. sample_size: int = 500,
  20. output_dir: str = "./pipeline_output",
  21. model: str = "",
  22. ):
  23. super().__init__(output_dir)
  24. self.llm_url = llm_url
  25. self.sample_size = sample_size
  26. self.model = model
  27. def execute(self, df: pd.DataFrame) -> Dict[str, List[Dict]]:
  28. """
  29. Use LLM to identify unclear terms and unknown acronyms.
  30. Args:
  31. df: DataFrame with messages
  32. Returns:
  33. Dictionary with identified terms and acronyms (deduplicated)
  34. """
  35. self.logger.info("=" * 80)
  36. self.logger.info("LLM-BASED TEXT NORMALIZATION ANALYSIS")
  37. self.logger.info("=" * 80)
  38. self.logger.info(f"Using LLM at: {self.llm_url}")
  39. # Extract frequent words and acronyms
  40. word_freq, acronym_freq = self._extract_terms(df)
  41. # Track results with deduplication
  42. acronym_dict = {} # key: acronym, value: dict with metadata
  43. term_dict = {} # key: term, value: dict with metadata
  44. expansion_dict = {} # key: acronym, value: dict with metadata
  45. sample_df = df.sample(n=min(self.sample_size, len(df)), random_state=42)
  46. # Process in chunks
  47. for i in range(0, len(sample_df), 100):
  48. chunk = sample_df.iloc[i : i + 100]
  49. messages_sample = "\n".join(chunk["message"].fillna("").tolist())
  50. self.logger.info(
  51. f"Analyzing chunk {i//100 + 1} of {(len(sample_df)-1)//100 + 1}..."
  52. )
  53. # Get unknown acronyms
  54. unknown_acronyms = self._identify_acronyms_with_llm(
  55. messages_sample, list(acronym_freq.keys())[:50]
  56. )
  57. for item in unknown_acronyms:
  58. acronym = item.get("acronym", "").lower()
  59. if acronym and acronym not in acronym_dict:
  60. acronym_dict[acronym] = item
  61. # Get unclear terms
  62. unclear_terms = self._identify_unclear_terms_with_llm(
  63. messages_sample, list(word_freq.keys())[:100]
  64. )
  65. for item in unclear_terms:
  66. term = item.get("term", "").lower()
  67. if term and term not in term_dict:
  68. term_dict[term] = item
  69. # Get expansion suggestions (use acronyms found in this chunk)
  70. if unknown_acronyms:
  71. expansions = self._get_expansion_suggestions_with_llm(
  72. messages_sample, unknown_acronyms
  73. )
  74. for item in expansions:
  75. if isinstance(item, dict):
  76. acronym = item.get("acronym", "").lower()
  77. if acronym and acronym not in expansion_dict:
  78. expansion_dict[acronym] = item
  79. # Convert dictionaries back to lists
  80. results = {
  81. "unknown_acronyms": list(acronym_dict.values()),
  82. "unclear_terms": list(term_dict.values()),
  83. "suggested_expansions": list(expansion_dict.values()),
  84. }
  85. self.logger.info(
  86. f"Found {len(results['unknown_acronyms'])} unique unknown acronyms"
  87. )
  88. self.logger.info(f"Found {len(results['unclear_terms'])} unique unclear terms")
  89. self.logger.info(
  90. f"Found {len(results['suggested_expansions'])} unique expansions"
  91. )
  92. self._save_llm_analysis(results)
  93. return results
  94. def _extract_terms(self, df: pd.DataFrame) -> tuple:
  95. """Extract words and potential acronyms"""
  96. word_freq = Counter()
  97. acronym_freq = Counter()
  98. for message in df["message"].fillna(""):
  99. text = str(message)
  100. # Extract words
  101. words = re.findall(r"\b[a-z]+\b", text.lower())
  102. word_freq.update(words)
  103. # Extract potential acronyms (2-6 uppercase letters)
  104. acronyms = re.findall(r"\b[A-Z]{2,6}\b", text)
  105. acronym_freq.update([a.lower() for a in acronyms])
  106. return word_freq, acronym_freq
  107. def _identify_acronyms_with_llm(
  108. self, messages_sample: str, acronym_candidates: List[str]
  109. ) -> List[Dict]:
  110. """Use LLM to identify unknown acronyms"""
  111. self.logger.info("identifying acronyms...")
  112. prompt = f"""You are analyzing messages.
  113. ACRONYMS FOUND: {', '.join(acronym_candidates[:100])}
  114. SAMPLE MESSAGES:
  115. {messages_sample[:2000]}
  116. Task: Identify which acronyms are UNKNOWN or UNCLEAR (not standard medical/legal acronyms).
  117. For each unknown acronym, try to infer its meaning from context.
  118. Respond with JSON:
  119. {{
  120. "unknown_acronyms": [
  121. {{"acronym": "XYZ", "possible_meaning": "...", "confidence": "high/medium/low"}},
  122. ...
  123. ]
  124. }}"""
  125. try:
  126. # print(
  127. # json.dumps(
  128. # {
  129. # "model": self.model,
  130. # "messages": [{"role": "user", "content": prompt}],
  131. # "max_tokens": 2048,
  132. # "temperature": 0.3,
  133. # },
  134. # indent=2,
  135. # )
  136. # )
  137. response = requests.post(
  138. f"{self.llm_url}/v1/chat/completions",
  139. json={
  140. "model": self.model,
  141. "messages": [{"role": "user", "content": prompt}],
  142. "max_tokens": 2048,
  143. "temperature": 0.3,
  144. },
  145. timeout=120,
  146. )
  147. if response.status_code == 200:
  148. text = response.json()["choices"][0]["message"]["content"]
  149. parsed = loads(text)
  150. if isinstance(parsed, dict):
  151. return parsed.get("unknown_acronyms", [])
  152. else:
  153. raise RuntimeError("LLM Error")
  154. except Exception as e:
  155. # raise e
  156. self.logger.error(f"LLM error: {e}")
  157. return []
  158. def _identify_unclear_terms_with_llm(
  159. self, messages_sample: str, word_candidates: List[str]
  160. ) -> List[Dict]:
  161. """Use LLM to identify unclear terms"""
  162. self.logger.info("identifying unclear terms...")
  163. prompt = f"""You are analyzing messages.
  164. FREQUENT WORDS: {', '.join(word_candidates[:100])}
  165. SAMPLE MESSAGES:
  166. {messages_sample[:2000]}
  167. Task: Identify words that are UNCLEAR, AMBIGUOUS, or may be TYPOS/SLANG.
  168. Focus on words that:
  169. - Have unclear meaning in context
  170. - May be misspellings
  171. - Are slang or informal terms
  172. - Need clarification for legal purposes
  173. Respond with JSON:
  174. {{
  175. "unclear_terms": [
  176. {{"term": "word", "reason": "...", "suggested_clarification": "..."}},
  177. ...
  178. ]
  179. }}"""
  180. try:
  181. response = requests.post(
  182. f"{self.llm_url}/v1/chat/completions",
  183. json={
  184. "model": self.model,
  185. "messages": [{"role": "user", "content": prompt}],
  186. "max_tokens": 2048,
  187. "temperature": 0.3,
  188. },
  189. timeout=120,
  190. )
  191. if response.status_code == 200:
  192. text = response.json()["choices"][0]["message"]["content"]
  193. parsed = loads(text)
  194. if isinstance(parsed, dict):
  195. return parsed.get("unclear_terms", [])
  196. else:
  197. raise RuntimeError("LLM Error")
  198. except Exception as e:
  199. # raise e
  200. self.logger.error(f"LLM error: {e}")
  201. return []
  202. def _get_expansion_suggestions_with_llm(
  203. self, messages_sample: str, acronyms: List[Dict]
  204. ) -> List[Dict]:
  205. """Get expansion suggestions for acronyms"""
  206. self.logger.info("getting expansion suggestions...")
  207. if not acronyms:
  208. return []
  209. acronym_list = ", ".join([a["acronym"] for a in acronyms[:100]])
  210. prompt = f"""Based on these medical/legal messages, suggest expansions for these acronyms:
  211. ACRONYMS: {acronym_list}
  212. SAMPLE MESSAGES:
  213. {messages_sample[:2000]}
  214. Respond with JSON:
  215. {{
  216. "expansions": [
  217. {{"acronym": "ABC", "expansion": "full form", "confidence": "high/medium/low"}},
  218. ...
  219. ]
  220. }}"""
  221. try:
  222. # print(
  223. # json.dumps(
  224. # {
  225. # "model": self.model,
  226. # "messages": [{"role": "user", "content": prompt}],
  227. # "max_tokens": 2048,
  228. # "temperature": 0.3,
  229. # },
  230. # indent=2,
  231. # )
  232. # )
  233. response = requests.post(
  234. f"{self.llm_url}/v1/chat/completions",
  235. json={
  236. "model": self.model,
  237. "messages": [{"role": "user", "content": prompt}],
  238. "max_tokens": 2048,
  239. "temperature": 0.3,
  240. },
  241. timeout=120,
  242. )
  243. if response.status_code == 200:
  244. text = response.json()["choices"][0]["message"]["content"]
  245. parsed = loads(text)
  246. if isinstance(parsed, dict):
  247. return parsed.get("expansions", [])
  248. else:
  249. raise RuntimeError("LLM Error")
  250. except Exception as e:
  251. # raise e
  252. self.logger.error(f"LLM error: {e}")
  253. return []
  254. def _save_llm_analysis(self, results: Dict):
  255. """Save LLM analysis results"""
  256. self.save_results(results, "llm_normalization_analysis.json")
  257. # Save text
  258. text_output = []
  259. text_output.append("LLM-BASED TEXT NORMALIZATION ANALYSIS")
  260. text_output.append("=" * 80)
  261. text_output.append("")
  262. text_output.append("UNKNOWN ACRONYMS:")
  263. text_output.append("-" * 80)
  264. for item in results["unknown_acronyms"]:
  265. text_output.append(
  266. f" {item['acronym']}: {item.get('possible_meaning', 'Unknown')}"
  267. )
  268. text_output.append("")
  269. text_output.append("UNCLEAR TERMS:")
  270. text_output.append("-" * 80)
  271. for item in results["unclear_terms"]:
  272. text_output.append(f" {item['term']}: {item.get('reason', 'Unclear')}")
  273. text_output.append("")
  274. text_output.append("SUGGESTED EXPANSIONS:")
  275. text_output.append("-" * 80)
  276. for item in results["suggested_expansions"]:
  277. text_output.append(f" {item['acronym']} -> {item['expansion']}")
  278. filepath = self.output_dir / "llm_normalization_analysis.txt"
  279. with open(filepath, "w") as f:
  280. f.write("\n".join(text_output))
  281. self.logger.info(f"Saved analysis to: {filepath}")
  282. if __name__ == "__main__":
  283. import pandas as pd
  284. df = pd.read_csv("../_sources/signal_messages.csv")
  285. analyzer = LLMNormalizationAnalyzer(
  286. llm_url="http://eos.dgtlu.net:11434",
  287. sample_size=14000,
  288. model="hf.co/bartowski/Qwen2.5-14B-Instruct-GGUF:Q4_K_S",
  289. )
  290. results = analyzer.execute(df)
  291. print(f"Found {len(results['unknown_acronyms'])} unknown acronyms")
  292. print(f"Found {len(results['unclear_terms'])} unclear terms")