adri 1 month ago
Parent
Commit
7515e09655

+ 29 - 6
pipeline/steps/step0b1_semantic_keyword_identification.py

@@ -12,6 +12,7 @@ from sklearn.metrics.pairwise import cosine_similarity
 from pipeline.models.base import PipelineStep
 from pipeline.common_defs import SUBPOENA_CRITERIA
 from pipeline.utils.text_utils import normalize_text
+from tqdm import tqdm
 
 class SemanticKeywordIdentifier(PipelineStep):
     """
@@ -22,7 +23,7 @@ class SemanticKeywordIdentifier(PipelineStep):
     def __init__(
         self,
         similarity_threshold: float = 0.25,
-        max_keywords_per_criterion: int = 80,
+        max_keywords_per_criterion: int = 100,
         min_word_length: int = 3,
         output_dir: str = "./pipeline_output",
     ):
@@ -78,13 +79,35 @@ class SemanticKeywordIdentifier(PipelineStep):
 
         return keywords_by_criterion
 
+    # Chunked, vectorized extraction with a progress bar across chunks
     def _extract_unique_words(self, df: pd.DataFrame) -> List[str]:
-        """Extract unique words from messages"""
+        """
+        Extract unique words - optimized but with progress bar
+        Good balance between speed and user feedback
+        """
+        self.logger.info("extracting unique words")
         words = set()
-        for message in df["message"].fillna(""):
-            normalized = normalize_text(str(message))
-            tokens = [t for t in normalized.split() if len(t) >= self.min_word_length and t.isalpha()]
-            words.update(tokens)
+        min_len = self.min_word_length
+
+        # Process in chunks for progress tracking
+        chunk_size = 10000
+        total_chunks = (len(df) + chunk_size - 1) // chunk_size
+
+        with tqdm(total=total_chunks, desc="Processing chunks") as pbar:
+            for i in range(0, len(df), chunk_size):
+                chunk = df["message"].iloc[i : i + chunk_size]
+
+                # Vectorized processing within chunk
+                normalized = chunk.fillna("").astype(str).apply(normalize_text)
+                tokens = normalized.str.split().explode()
+                chunk_words = tokens[
+                    (tokens.str.len() >= min_len) & (tokens.str.isalpha())
+                ].unique()
+
+                words.update(chunk_words)
+                pbar.update(1)
+
+        self.logger.info(f"Found {len(words):,} unique words")
         return sorted(list(words))
 
     def _create_criteria_descriptions(self) -> Dict[int, str]:

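The chunked extraction above trades a pure-pandas one-liner for bounded memory and visible progress. A minimal standalone sketch of the same pattern, assuming a DataFrame with a "message" column and using str.lower as a stand-in for the pipeline's normalize_text:

    import pandas as pd
    from tqdm import tqdm

    def unique_words(df: pd.DataFrame, min_len: int = 3, chunk_size: int = 10_000) -> list:
        words = set()
        for i in tqdm(range(0, len(df), chunk_size), desc="Processing chunks"):
            # str.lower stands in for normalize_text here
            chunk = df["message"].iloc[i:i + chunk_size].fillna("").astype(str).str.lower()
            tokens = chunk.str.split().explode().dropna()
            mask = (tokens.str.len() >= min_len) & tokens.str.isalpha()
            words.update(tokens[mask].unique())
        return sorted(words)

    # unique_words(pd.DataFrame({"message": ["Call Dr. Lee at 9", "call back later"]}))
    # -> ['back', 'call', 'later', 'lee']
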
+ 66 - 23
pipeline/steps/step0b2_llm_keyword_identification.py

@@ -4,21 +4,28 @@ Step 0a (Alternative): LLM-based keyword identification.
 
 from typing import List, Dict
 import pandas as pd
-import json
 import requests
 from pipeline.models.base import PipelineStep
 from pipeline.common_defs import SUBPOENA_CRITERIA
-
-MODEL = "hf.co/bartowski/Qwen2.5-14B-Instruct-GGUF:Q4_K_S"
+from json_repair import loads
+from tqdm import tqdm
+from pipeline.utils.text_utils import normalize_text
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 class LLMKeywordIdentifier(PipelineStep):
     """Identify keywords using LLM analysis"""
 
-    def __init__(self, llm_url: str = "http://localhost:8000",
-                 sample_size: int = 1000, output_dir: str = "./pipeline_output"):
+    def __init__(
+        self,
+        llm_url: str = "https://scripting-logo-pad-flat.trycloudflare.com",
+        sample_size: int = 1000,
+        model: str = "Qwen/Qwen2.5-14B-Instruct",
+        output_dir: str = "./pipeline_output",
+    ):
         super().__init__(output_dir)
         self.llm_url = llm_url
         self.sample_size = sample_size
+        self.model = model
 
     def execute(self, df: pd.DataFrame) -> Dict[int, List[str]]:
         """Use LLM to identify relevant keywords"""
@@ -26,6 +33,7 @@ class LLMKeywordIdentifier(PipelineStep):
         sample_df = df.sample(n=min(self.sample_size, len(df)), random_state=42)
         keywords_by_criterion = {}
         for num, desc in SUBPOENA_CRITERIA.items():
+            self.logger.info(f"getting keywords for {num}...")
             keywords = self._identify_keywords_for_criterion(sample_df, num, desc)
             keywords_by_criterion[num] = keywords
             self.logger.info(f"Criterion {num}: {len(keywords)} keywords")
@@ -33,36 +41,71 @@ class LLMKeywordIdentifier(PipelineStep):
         return keywords_by_criterion
 
     def _identify_keywords_for_criterion(self, df, num, desc):
-        """Use LLM to identify keywords"""
+        """Use LLM to identify keywords - OPTIMIZED"""
         all_keywords = []
+        chunk_size = 200
+        chunks = [df.iloc[i : i + chunk_size] for i in range(0, len(df), chunk_size)]
+
+        def process_chunk(chunk_data):
+            """Process a single chunk"""
+            chunk_idx, chunk = chunk_data
+            # Normalize text before processing
+            messages_sample = "\n".join(
+                normalize_text(msg) for msg in chunk["message"].fillna("").tolist()
+            )
 
-        # Loop through df in chunks of 100 rows
-        for i in range(0, len(df), 100):
-            chunk = df.iloc[i : i + 100]
-            messages_sample = "\n".join(chunk["message"].fillna("").tolist())
-            prompt = f"Identify 30-50 keywords for: {desc}\n\nMessages:\n{messages_sample}\n\nJSON:"
+            prompt = f"Identify keywords for: {desc}\n\nMessages:\n{messages_sample}\n\nJSON:"
 
             try:
                 response = requests.post(
                     f"{self.llm_url}/v1/chat/completions",
-                    json={"prompt": prompt, "max_tokens": 1000, "model": MODEL},
+                    # headers={
+                    #     "Authorization": "Bearer "
+                    # },
+                    json={
+                        "model": self.model,
+                        "messages": [{"role": "user", "content": prompt}],
+                        "max_tokens": 2048,
+                        "temperature": 0.05,
+                    },
                     timeout=120,
                 )
                 if response.status_code == 200:
-                    text = response.json()["choices"][0]["text"]
-                    parsed = json.loads(text)
-                    keywords = parsed.get("keywords", [])
-                    all_keywords.extend(keywords)
+                    text = response.json()["choices"][0]["message"]["content"]
+                    parsed = loads(text)
+                    if isinstance(parsed, dict):
+                        return parsed.get("keywords", [])
+                    # Response parsed but was not a JSON object: treat as no keywords
+                    return []
+                else:
+                    self.logger.warning(
+                        f"LLM returned status {response.status_code} for chunk {chunk_idx}"
+                    )
+                    return []
             except Exception as e:
-                self.logger.error(f"Error processing chunk {i//100 + 1}: {e}")
+                self.logger.error(f"Error processing chunk {chunk_idx}: {e}")
+                return []
+
+        # Process chunks in parallel
+        max_workers = 4  # Adjust based on your LLM server capacity
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            # Submit all tasks
+            future_to_chunk = {
+                executor.submit(process_chunk, (i, chunk)): i
+                for i, chunk in enumerate(chunks)
+            }
+
+            # Process results as they complete with progress bar
+            with tqdm(desc="chunks", total=len(chunks)) as pbar:
+                for future in as_completed(future_to_chunk):
+                    keywords = future.result()
+                    if isinstance(keywords, list):
+                        all_keywords.extend(keywords)
+                    pbar.update(1)
 
-        # Remove duplicates while preserving order
+        # Remove duplicates while preserving order (seen.add() returns None, so the filter keeps first occurrences)
         seen = set()
-        unique_keywords = []
-        for keyword in all_keywords:
-            if keyword not in seen:
-                seen.add(keyword)
-                unique_keywords.append(keyword)
+        unique_keywords = [
+            kw for kw in all_keywords if kw not in seen and not seen.add(kw)
+        ]
 
         return unique_keywords
 

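A hypothetical invocation of the parallelized identifier, assuming an OpenAI-compatible chat-completions server on localhost and the CSV path used elsewhere in the repo (both are placeholders, not the production endpoint):

    import pandas as pd
    from pipeline.steps.step0b2_llm_keyword_identification import LLMKeywordIdentifier

    df = pd.read_csv("../_sources/signal_messages.csv")

    lki = LLMKeywordIdentifier(
        llm_url="http://localhost:8000",    # any /v1/chat/completions-compatible server
        sample_size=1000,                   # rows sampled before chunking into requests
        model="Qwen/Qwen2.5-14B-Instruct",
    )
    llm_keywords = lki.execute(df)          # {criterion_number: [keyword, ...]}

max_workers is hard-coded to 4 in the step; per the comment in the diff, it should be tuned to what the serving backend can handle concurrently.
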
+ 30 - 12
pipeline/steps/step0b_keyword_identification.py

@@ -14,20 +14,38 @@ from pipeline.utils.combine_keywords import combine_keywords, analyze_overlap
 if __name__ == "__main__":
     df = pd.read_csv("../_sources/signal_messages.csv")
     ski = SemanticKeywordIdentifier()
-    semantic_keywords = ski.execute(df=df)
+    # semantic_keywords = ski.execute(df=df)
 
-    lki = LLMKeywordIdentifier(llm_url="http://eos.dgtlu.net:11434", sample_size=14000)
-    llm_keywords = lki.execute(df=df)
+    # lki = LLMKeywordIdentifier(llm_url="http://localhost:8000", sample_size=14000)
+    # llm_keywords = lki.execute(df=df)
 
-    combined = combine_keywords(
-        semantic_results=semantic_keywords, llm_results=llm_keywords
-    )
     out_dir = ski.output_dir
-    with open(f"{out_dir}/combined_keywords.json") as out_file:
-        out_file.write(json.dumps(combined))
 
-    overlap = analyze_overlap(
-        semantic_results=semantic_keywords, llm_results=llm_keywords
+    # with open(f"{out_dir}/semantic_keywords.json") as file:
+    #     semantic_keywords = json.load(file)
+
+    # with open(f"{out_dir}/llm_keywords.json") as file:
+    #     llm_keywords = json.load(file)
+
+    # combined = combine_keywords(
+    #     semantic_results=semantic_keywords, llm_results=llm_keywords
+    # )
+
+    # with open(f"{out_dir}/combined_keywords.json", "w") as out_file:
+    #     out_file.write(json.dumps(combined))
+
+    # overlap = analyze_overlap(
+    #     semantic_results=semantic_keywords, llm_results=llm_keywords
+    # )
+    # with open(f"{out_dir}/keyword_overlap.json", "w") as out_file:
+    #     out_file.write(json.dumps(combined))
+
+    with open(f"{out_dir}/combined_keywords.json", "r") as file:
+        dict_all = json.load(file)
+
+    merged_list = list(
+        dict.fromkeys(item for sublist in dict_all.values() for item in sublist)
     )
-    with open(f"{out_dir}/keyword_overlap.json") as out_file:
-        out_file.write(json.dumps(combined))
+
+    with open(f"{out_dir}/final_keyword_list.json", "w") as out_file:
+        json.dump(merged_list, out_file)

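The final merge flattens the per-criterion lists from combined_keywords.json into one deduplicated list, relying on dict.fromkeys preserving insertion order. The same idiom on toy data:

    # Toy stand-in for the contents of combined_keywords.json
    dict_all = {"1": ["doctor", "surgery"], "2": ["surgery", "insurance"]}

    merged_list = list(
        dict.fromkeys(item for sublist in dict_all.values() for item in sublist)
    )
    print(merged_list)  # ['doctor', 'surgery', 'insurance']
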
+ 0 - 246
pipeline/steps/step0b_normalization_analysis.py

@@ -1,246 +0,0 @@
-"""
-Step 0b: Analyze text patterns and suggest normalizations.
-"""
-
-from typing import List, Dict, Set, Tuple
-from collections import Counter
-import re
-import pandas as pd
-from pipeline.models.base import PipelineStep
-
-
-class NormalizationAnalyzer(PipelineStep):
-    """Analyze text patterns and suggest normalizations"""
-
-    def __init__(self, output_dir: str = "./pipeline_output"):
-        super().__init__(output_dir)
-
-    def execute(self, df: pd.DataFrame) -> Dict[str, Dict[str, str]]:
-        """
-        Analyze text and suggest normalizations.
-
-        Args:
-            df: DataFrame with messages
-
-        Returns:
-            Dictionary of suggested normalizations
-        """
-        self.logger.info("Analyzing text patterns for normalization...")
-
-        # Find abbreviations
-        abbreviations = self._find_abbreviations(df)
-
-        # Find acronyms
-        acronyms = self._find_acronyms(df)
-
-        # Find common misspellings
-        misspellings = self._find_misspellings(df)
-
-        # Find date/time patterns
-        datetime_patterns = self._find_datetime_patterns(df)
-
-        # Combine suggestions
-        suggestions = {
-            "abbreviations": abbreviations,
-            "acronyms": acronyms,
-            "misspellings": misspellings,
-            "datetime_patterns": datetime_patterns,
-        }
-
-        # Save results
-        self._save_normalization_suggestions(suggestions)
-
-        return suggestions
-
-    def _find_abbreviations(self, df: pd.DataFrame) -> Dict[str, str]:
-        """Find common abbreviations"""
-        self.logger.info("Finding abbreviations...")
-
-        # Common medical/legal abbreviations
-        known_abbrevs = {
-            "dr.": "doctor",
-            "dr ": "doctor ",
-            "appt": "appointment",
-            "hosp": "hospital",
-            "med": "medical",
-            "meds": "medications",
-            "rx": "prescription",
-            "pt": "patient",
-            "pts": "patients",
-            "pron": "pronoun",
-            "prns": "pronouns",
-            "info": "information",
-            "dept": "department",
-            "rep": "representative",
-            "admin": "administration",
-            "surg": "surgery",
-            "proc": "procedure",
-        }
-
-        # Find abbreviations in text
-        found_abbrevs = {}
-        pattern = r"\b[a-z]{2,5}\.?\b"
-
-        for message in df["message"].fillna(""):
-            text = str(message).lower()
-            matches = re.findall(pattern, text)
-
-            for match in matches:
-                if match in known_abbrevs:
-                    found_abbrevs[match] = known_abbrevs[match]
-
-        self.logger.info(f"Found {len(found_abbrevs)} abbreviations")
-        return found_abbrevs
-
-    def _find_acronyms(self, df: pd.DataFrame) -> Dict[str, str]:
-        """Find common acronyms"""
-        self.logger.info("Finding acronyms...")
-
-        known_acronyms = {
-            "msk": "memorial sloan kettering",
-            "er": "emergency room",
-            "icu": "intensive care unit",
-            "ob": "obstetrics",
-            "gyn": "gynecology",
-            "obgyn": "obstetrics gynecology",
-            "pcp": "primary care physician",
-            "np": "nurse practitioner",
-            "pa": "physician assistant",
-            "rn": "registered nurse",
-            "lpn": "licensed practical nurse",
-            "emr": "electronic medical record",
-            "ehr": "electronic health record",
-            "hipaa": "health insurance portability accountability act",
-            "lgbtq": "lesbian gay bisexual transgender queer",
-            "lgbt": "lesbian gay bisexual transgender",
-        }
-
-        found_acronyms = {}
-        pattern = r"\b[A-Z]{2,6}\b"
-
-        for message in df["message"].fillna(""):
-            text = str(message)
-            matches = re.findall(pattern, text)
-
-            for match in matches:
-                match_lower = match.lower()
-                if match_lower in known_acronyms:
-                    found_acronyms[match_lower] = known_acronyms[match_lower]
-
-        self.logger.info(f"Found {len(found_acronyms)} acronyms")
-        return found_acronyms
-
-    def _find_misspellings(self, df: pd.DataFrame) -> Dict[str, str]:
-        """Find common misspellings"""
-        self.logger.info("Finding common misspellings...")
-
-        # Common misspellings in medical/legal context
-        known_misspellings = {
-            "recieve": "receive",
-            "occured": "occurred",
-            "seperate": "separate",
-            "definately": "definitely",
-            "accomodate": "accommodate",
-            "untill": "until",
-            "thier": "their",
-            "recieved": "received",
-        }
-
-        found_misspellings = {}
-
-        for message in df["message"].fillna(""):
-            text = str(message).lower()
-            words = text.split()
-
-            for word in words:
-                clean_word = re.sub(r"[^a-z]", "", word)
-                if clean_word in known_misspellings:
-                    found_misspellings[clean_word] = known_misspellings[clean_word]
-
-        self.logger.info(f"Found {len(found_misspellings)} misspellings")
-        return found_misspellings
-
-    def _find_datetime_patterns(self, df: pd.DataFrame) -> Dict[str, str]:
-        """Find date/time patterns"""
-        self.logger.info("Finding date/time patterns...")
-
-        patterns = {}
-
-        # Common date patterns
-        date_patterns = [
-            (r"\d{1,2}/\d{1,2}/\d{2,4}", "date_slash"),
-            (r"\d{1,2}-\d{1,2}-\d{2,4}", "date_dash"),
-            (
-                r"\b(jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]*\s+\d{1,2}",
-                "date_month_day",
-            ),
-            (
-                r"\d{1,2}\s+(jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)",
-                "date_day_month",
-            ),
-        ]
-
-        for message in df["message"].fillna(""):
-            text = str(message).lower()
-
-            for pattern, pattern_name in date_patterns:
-                if re.search(pattern, text):
-                    patterns[pattern_name] = pattern
-
-        self.logger.info(f"Found {len(patterns)} date/time patterns")
-        return patterns
-
-    def _save_normalization_suggestions(self, suggestions: Dict):
-        """Save normalization suggestions"""
-        self.save_results(suggestions, "normalization_suggestions.json")
-
-        # Create readable text file
-        text_output = []
-        text_output.append("TEXT NORMALIZATION SUGGESTIONS")
-        text_output.append("=" * 80)
-        text_output.append("")
-
-        text_output.append("ABBREVIATIONS TO EXPAND:")
-        text_output.append("-" * 80)
-        for abbrev, expansion in sorted(suggestions["abbreviations"].items()):
-            text_output.append(f"  {abbrev:20} -> {expansion}")
-        text_output.append("")
-
-        text_output.append("ACRONYMS TO EXPAND:")
-        text_output.append("-" * 80)
-        for acronym, expansion in sorted(suggestions["acronyms"].items()):
-            text_output.append(f"  {acronym:20} -> {expansion}")
-        text_output.append("")
-
-        if suggestions["misspellings"]:
-            text_output.append("MISSPELLINGS TO CORRECT:")
-            text_output.append("-" * 80)
-            for misspell, correct in sorted(suggestions["misspellings"].items()):
-                text_output.append(f"  {misspell:20} -> {correct}")
-            text_output.append("")
-
-        text_output.append("DATE/TIME PATTERNS FOUND:")
-        text_output.append("-" * 80)
-        for pattern_name, pattern in suggestions["datetime_patterns"].items():
-            text_output.append(f"  {pattern_name}: {pattern}")
-
-        filepath = self.output_dir / "normalization_suggestions.txt"
-        with open(filepath, "w") as f:
-            f.write("\n".join(text_output))
-
-        self.logger.info(f"Saved normalization suggestions to: {filepath}")
-
-
-if __name__ == "__main__":
-    import pandas as pd
-
-    df = pd.read_csv("../_sources/signal_messages.csv")
-
-    analyzer = NormalizationAnalyzer()
-    suggestions = analyzer.execute(df)
-
-    print("\nNormalization suggestions:")
-    print(f"  Abbreviations: {len(suggestions['abbreviations'])}")
-    print(f"  Acronyms: {len(suggestions['acronyms'])}")
-    print(f"  Misspellings: {len(suggestions['misspellings'])}")
-    print(f"  Date patterns: {len(suggestions['datetime_patterns'])}")

+ 1 - 1
pipeline/steps/step1_load_data.py

@@ -72,6 +72,6 @@ class DataLoader(PipelineStep):
 
 if __name__ == "__main__":
     # Example usage
-    loader = DataLoader('signal_messages.csv')
+    loader = DataLoader("../_sources/signal_messages.csv")
     df = loader.execute()
     print(f"Loaded {len(df)} messages")

+ 6 - 2
pipeline/utils/combine_keywords.py

@@ -10,7 +10,9 @@ def combine_keywords(semantic_results, llm_results):
     for criterion_num_str in semantic_results["criteria"].keys():
         criterion_num = int(criterion_num_str)
         semantic_kws = set(kw["word"] for kw in semantic_results["criteria"][criterion_num_str]["keywords"])
-        llm_kws = set(llm_results["criteria"][criterion_num_str]["keywords"])
+        llm_kws = set(
+            kw.lower() for kw in llm_results["criteria"][criterion_num_str]["keywords"]
+        )
         combined[criterion_num] = sorted(list(semantic_kws | llm_kws))
     return combined
 
@@ -20,6 +22,8 @@ def analyze_overlap(semantic_results, llm_results):
     for criterion_num_str in semantic_results["criteria"].keys():
         criterion_num = int(criterion_num_str)
         semantic_kws = set(kw["word"] for kw in semantic_results["criteria"][criterion_num_str]["keywords"])
-        llm_kws = set(llm_results["criteria"][criterion_num_str]["keywords"])
+        llm_kws = set(
+            kw.lower() for kw in llm_results["criteria"][criterion_num_str]["keywords"]
+        )
         overlap = semantic_kws & llm_kws
         print(f"Criterion {criterion_num}: {len(overlap)} overlap, {len(semantic_kws | llm_kws)} total")

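The lowercasing matters because the two sources use different shapes and casing: semantic results wrap each keyword in a dict with a "word" field, while LLM results are plain (possibly capitalized) strings. A toy call under those assumed shapes:

    from pipeline.utils.combine_keywords import combine_keywords

    semantic_results = {"criteria": {"1": {"keywords": [{"word": "surgery"}]}}}
    llm_results = {"criteria": {"1": {"keywords": ["Surgery", "insurance"]}}}

    print(combine_keywords(semantic_results, llm_results))
    # {1: ['insurance', 'surgery']}
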
+ 25 - 14
pipeline/utils/text_utils.py

@@ -7,37 +7,48 @@ from typing import List
 import pandas as pd
 from pipeline.common_defs import TEXT_EXPANSIONS, ACRONYMS
 
+# Build a single combined pattern for all acronyms (case-sensitive)
+_ACRONYM_PATTERN = re.compile(
+    r"\b(" + "|".join(re.escape(k) for k in ACRONYMS.keys()) + r")\b"
+)
+
+# Build a single combined pattern for all expansions (keys are lowercase; applied after the text is lowercased)
+_EXPANSION_PATTERN = re.compile(
+    r"\b(" + "|".join(re.escape(k) for k in TEXT_EXPANSIONS.keys()) + r")\b"
+)
+
+
 def normalize_text(text: str) -> str:
     """
-    Normalize text with abbreviation expansion.
-    
+    Normalize text with acronym and abbreviation expansion using precompiled patterns.
+
     Args:
         text: Input text to normalize
-        
+
     Returns:
         Normalized text
     """
-    if pd.isna(text) or text == '':
+    if pd.isna(text) or text == "":
         return ""
 
     text = str(text)
 
-    # Apply expansions for acronyms
-    for abbr, full in ACRONYMS.items():
-        # Use \b for word boundaries to only match complete words
-        pattern = r"\b" + re.escape(abbr) + r"\b"
-        text = re.sub(pattern, full, text)
+    # Apply acronym expansions using single pattern
+    def replace_acronym(match):
+        return ACRONYMS[match.group(0)]
 
+    text = _ACRONYM_PATTERN.sub(replace_acronym, text)
     text = text.lower()
 
-    # Apply expansions
-    for abbr, full in TEXT_EXPANSIONS.items():
-        # Use \b for word boundaries to only match complete words
-        pattern = r"\b" + re.escape(abbr) + r"\b"
-        text = re.sub(pattern, full, text)
+    # Apply text expansions using single pattern
+    def replace_expansion(match):
+        return TEXT_EXPANSIONS[match.group(0)]
+
+    text = _EXPANSION_PATTERN.sub(replace_expansion, text)
 
     return text
 
+
 def extract_keywords(text: str, keywords: List[str]) -> List[str]:
     """
     Extract matching keywords from text.
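The precompiled alternation replaces one re.sub pass per dictionary key with a single scan of the text. A minimal sketch of the technique on a toy mapping (not the real ACRONYMS table from pipeline.common_defs):

    import re

    ACRONYMS = {"ER": "emergency room", "ICU": "intensive care unit"}  # toy stand-in

    _PATTERN = re.compile(r"\b(" + "|".join(re.escape(k) for k in ACRONYMS) + r")\b")

    def expand(text: str) -> str:
        # One scan; the callback looks up whichever key matched
        return _PATTERN.sub(lambda m: ACRONYMS[m.group(0)], text)

    print(expand("Moved from the ER to the ICU."))
    # -> Moved from the emergency room to the intensive care unit.

Because the trailing \b sits outside the group, overlapping keys (e.g. "ob" and "obgyn") still resolve correctly: the regex backtracks to the longer alternative when the shorter one is not followed by a word boundary.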