- """
- Step 0b (Alternative): LLM-based text normalization analysis.
- Uses deployed LLM to identify unclear terms and unknown acronyms.
- """
- from typing import List, Dict
- import pandas as pd
- import requests
- from collections import Counter
- import re
- from pipeline.models.base import PipelineStep
- from json_repair import loads
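
# For reference, LLMNormalizationAnalyzer.execute() returns a dictionary of
# deduplicated findings. Illustrative shape only -- the field values below are
# made up, not real output:
#
#   {
#       "unknown_acronyms": [
#           {"acronym": "xyz", "possible_meaning": "...", "confidence": "medium"}
#       ],
#       "unclear_terms": [
#           {"term": "word", "reason": "...", "suggested_clarification": "..."}
#       ],
#       "suggested_expansions": [
#           {"acronym": "xyz", "expansion": "full form", "confidence": "high"}
#       ],
#   }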


class LLMNormalizationAnalyzer(PipelineStep):
    """
    Use an LLM to analyze text and identify unclear terms and unknown acronyms.
    """

    def __init__(
        self,
        llm_url: str = "http://localhost:8000",
        sample_size: int = 500,
        output_dir: str = "./pipeline_output",
        model: str = "",
    ):
        super().__init__(output_dir)
        self.llm_url = llm_url
        self.sample_size = sample_size
        self.model = model

    def execute(self, df: pd.DataFrame) -> Dict[str, List[Dict]]:
        """
        Use the LLM to identify unclear terms and unknown acronyms.

        Args:
            df: DataFrame with messages

        Returns:
            Dictionary with identified terms and acronyms (deduplicated)
        """
        self.logger.info("=" * 80)
        self.logger.info("LLM-BASED TEXT NORMALIZATION ANALYSIS")
        self.logger.info("=" * 80)
        self.logger.info(f"Using LLM at: {self.llm_url}")

        # Extract frequent words and acronyms
        word_freq, acronym_freq = self._extract_terms(df)

        # Track results with deduplication
        acronym_dict = {}    # key: acronym, value: dict with metadata
        term_dict = {}       # key: term, value: dict with metadata
        expansion_dict = {}  # key: acronym, value: dict with metadata

        sample_df = df.sample(n=min(self.sample_size, len(df)), random_state=42)

        # Process the sample in chunks of 100 messages
        for i in range(0, len(sample_df), 100):
            chunk = sample_df.iloc[i : i + 100]
            messages_sample = "\n".join(chunk["message"].fillna("").tolist())
            self.logger.info(
                f"Analyzing chunk {i // 100 + 1} of {(len(sample_df) - 1) // 100 + 1}..."
            )

            # Get unknown acronyms (pass the most frequent candidates; a plain
            # Counter keys() view is insertion-ordered, not frequency-ordered)
            unknown_acronyms = self._identify_acronyms_with_llm(
                messages_sample, [a for a, _ in acronym_freq.most_common(50)]
            )
            for item in unknown_acronyms:
                acronym = item.get("acronym", "").lower()
                if acronym and acronym not in acronym_dict:
                    acronym_dict[acronym] = item

            # Get unclear terms
            unclear_terms = self._identify_unclear_terms_with_llm(
                messages_sample, [w for w, _ in word_freq.most_common(100)]
            )
            for item in unclear_terms:
                term = item.get("term", "").lower()
                if term and term not in term_dict:
                    term_dict[term] = item

            # Get expansion suggestions (use acronyms found in this chunk)
            if unknown_acronyms:
                expansions = self._get_expansion_suggestions_with_llm(
                    messages_sample, unknown_acronyms
                )
                for item in expansions:
                    if isinstance(item, dict):
                        acronym = item.get("acronym", "").lower()
                        if acronym and acronym not in expansion_dict:
                            expansion_dict[acronym] = item

        # Convert dictionaries back to lists
        results = {
            "unknown_acronyms": list(acronym_dict.values()),
            "unclear_terms": list(term_dict.values()),
            "suggested_expansions": list(expansion_dict.values()),
        }

        self.logger.info(
            f"Found {len(results['unknown_acronyms'])} unique unknown acronyms"
        )
        self.logger.info(f"Found {len(results['unclear_terms'])} unique unclear terms")
        self.logger.info(
            f"Found {len(results['suggested_expansions'])} unique expansions"
        )

        self._save_llm_analysis(results)
        return results

    def _extract_terms(self, df: pd.DataFrame) -> tuple:
        """Count word and potential-acronym frequencies across all messages."""
        word_freq = Counter()
        acronym_freq = Counter()
        for message in df["message"].fillna(""):
            text = str(message)
            # Extract lowercase words
            words = re.findall(r"\b[a-z]+\b", text.lower())
            word_freq.update(words)
            # Extract potential acronyms (2-6 uppercase letters)
            acronyms = re.findall(r"\b[A-Z]{2,6}\b", text)
            acronym_freq.update([a.lower() for a in acronyms])
        return word_freq, acronym_freq
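
    # Illustrative behaviour of _extract_terms (the example message is made up):
    # for "Pt seen in ER, f/u with PCP re: MRI", word_freq counts
    # pt, seen, in, er, f, u, with, pcp, re, mri, while acronym_freq counts
    # er, pcp, mri (runs of 2-6 uppercase letters, lowercased before counting).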

    def _identify_acronyms_with_llm(
        self, messages_sample: str, acronym_candidates: List[str]
    ) -> List[Dict]:
        """Use the LLM to flag acronyms that are unknown or unclear."""
        self.logger.info("Identifying acronyms...")
        prompt = f"""You are analyzing messages.

ACRONYMS FOUND: {', '.join(acronym_candidates[:100])}

SAMPLE MESSAGES:
{messages_sample[:2000]}

Task: Identify which acronyms are UNKNOWN or UNCLEAR (not standard medical/legal acronyms).
For each unknown acronym, try to infer its meaning from context.

Respond with JSON:
{{
    "unknown_acronyms": [
        {{"acronym": "XYZ", "possible_meaning": "...", "confidence": "high/medium/low"}},
        ...
    ]
}}"""
        try:
            response = requests.post(
                f"{self.llm_url}/v1/chat/completions",
                json={
                    "model": self.model,
                    "messages": [{"role": "user", "content": prompt}],
                    "max_tokens": 2048,
                    "temperature": 0.3,
                },
                timeout=120,
            )
            if response.status_code == 200:
                text = response.json()["choices"][0]["message"]["content"]
                parsed = loads(text)
                if isinstance(parsed, dict):
                    return parsed.get("unknown_acronyms", [])
                return []
            raise RuntimeError(f"LLM request failed with status {response.status_code}")
        except Exception as e:
            self.logger.error(f"LLM error: {e}")
            return []
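
    # The request above targets an OpenAI-compatible /v1/chat/completions
    # endpoint and expects a response shaped roughly like
    #   {"choices": [{"message": {"content": "<model JSON text>"}}]}
    # (illustrative). json_repair.loads() is used instead of json.loads() so
    # that slightly malformed JSON emitted by the model can still be parsed.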

    def _identify_unclear_terms_with_llm(
        self, messages_sample: str, word_candidates: List[str]
    ) -> List[Dict]:
        """Use the LLM to flag words that are unclear, ambiguous, or likely typos/slang."""
        self.logger.info("Identifying unclear terms...")
        prompt = f"""You are analyzing messages.

FREQUENT WORDS: {', '.join(word_candidates[:100])}

SAMPLE MESSAGES:
{messages_sample[:2000]}

Task: Identify words that are UNCLEAR, AMBIGUOUS, or may be TYPOS/SLANG.
Focus on words that:
- Have unclear meaning in context
- May be misspellings
- Are slang or informal terms
- Need clarification for legal purposes

Respond with JSON:
{{
    "unclear_terms": [
        {{"term": "word", "reason": "...", "suggested_clarification": "..."}},
        ...
    ]
}}"""
        try:
            response = requests.post(
                f"{self.llm_url}/v1/chat/completions",
                json={
                    "model": self.model,
                    "messages": [{"role": "user", "content": prompt}],
                    "max_tokens": 2048,
                    "temperature": 0.3,
                },
                timeout=120,
            )
            if response.status_code == 200:
                text = response.json()["choices"][0]["message"]["content"]
                parsed = loads(text)
                if isinstance(parsed, dict):
                    return parsed.get("unclear_terms", [])
                return []
            raise RuntimeError(f"LLM request failed with status {response.status_code}")
        except Exception as e:
            self.logger.error(f"LLM error: {e}")
            return []

    def _get_expansion_suggestions_with_llm(
        self, messages_sample: str, acronyms: List[Dict]
    ) -> List[Dict]:
        """Ask the LLM for expansion suggestions for the acronyms found so far."""
        self.logger.info("Getting expansion suggestions...")
        if not acronyms:
            return []
        acronym_list = ", ".join(
            a.get("acronym", "")
            for a in acronyms[:100]
            if isinstance(a, dict) and a.get("acronym")
        )
        prompt = f"""Based on these medical/legal messages, suggest expansions for these acronyms:

ACRONYMS: {acronym_list}

SAMPLE MESSAGES:
{messages_sample[:2000]}

Respond with JSON:
{{
    "expansions": [
        {{"acronym": "ABC", "expansion": "full form", "confidence": "high/medium/low"}},
        ...
    ]
}}"""
        try:
            response = requests.post(
                f"{self.llm_url}/v1/chat/completions",
                json={
                    "model": self.model,
                    "messages": [{"role": "user", "content": prompt}],
                    "max_tokens": 2048,
                    "temperature": 0.3,
                },
                timeout=120,
            )
            if response.status_code == 200:
                text = response.json()["choices"][0]["message"]["content"]
                parsed = loads(text)
                if isinstance(parsed, dict):
                    return parsed.get("expansions", [])
                return []
            raise RuntimeError(f"LLM request failed with status {response.status_code}")
        except Exception as e:
            self.logger.error(f"LLM error: {e}")
            return []

    def _save_llm_analysis(self, results: Dict):
        """Save LLM analysis results as JSON and as a plain-text report."""
        self.save_results(results, "llm_normalization_analysis.json")

        # Build the plain-text report
        text_output = []
        text_output.append("LLM-BASED TEXT NORMALIZATION ANALYSIS")
        text_output.append("=" * 80)
        text_output.append("")

        text_output.append("UNKNOWN ACRONYMS:")
        text_output.append("-" * 80)
        for item in results["unknown_acronyms"]:
            text_output.append(
                f"  {item['acronym']}: {item.get('possible_meaning', 'Unknown')}"
            )
        text_output.append("")

        text_output.append("UNCLEAR TERMS:")
        text_output.append("-" * 80)
        for item in results["unclear_terms"]:
            text_output.append(f"  {item['term']}: {item.get('reason', 'Unclear')}")
        text_output.append("")

        text_output.append("SUGGESTED EXPANSIONS:")
        text_output.append("-" * 80)
        for item in results["suggested_expansions"]:
            text_output.append(
                f"  {item['acronym']} -> {item.get('expansion', 'Unknown')}"
            )

        filepath = self.output_dir / "llm_normalization_analysis.txt"
        with open(filepath, "w", encoding="utf-8") as f:
            f.write("\n".join(text_output))
        self.logger.info(f"Saved analysis to: {filepath}")
- if __name__ == "__main__":
- import pandas as pd
- df = pd.read_csv("../_sources/signal_messages.csv")
- analyzer = LLMNormalizationAnalyzer(
- llm_url="http://eos.dgtlu.net:11434",
- sample_size=14000,
- model="hf.co/bartowski/Qwen2.5-14B-Instruct-GGUF:Q4_K_S",
- )
- results = analyzer.execute(df)
- print(f"Found {len(results['unknown_acronyms'])} unknown acronyms")
- print(f"Found {len(results['unclear_terms'])} unclear terms")
|