- """
- Step 0a (Alternative): LLM-based keyword identification.
- """
- from typing import List, Dict
- import pandas as pd
- import requests
- from pipeline.models.base import PipelineStep
- from pipeline.common_defs import SUBPOENA_CRITERIA
- from json_repair import loads
- from tqdm import tqdm
- from pipeline.utils.text_utils import normalize_text
- from concurrent.futures import ThreadPoolExecutor, as_completed


class LLMKeywordIdentifier(PipelineStep):
    """Identify keywords using LLM analysis."""

    def __init__(
        self,
        llm_url: str = "http://localhost:8000",
        sample_size: int = 1000,
        model: str = "Qwen/Qwen2.5-14B-Instruct-GPTQ-Int8",
        output_dir: str = "./pipeline_output",
    ):
        super().__init__(output_dir)
        self.llm_url = llm_url
        self.sample_size = sample_size
        self.model = model
    def execute(self, df: pd.DataFrame) -> Dict[int, List[str]]:
        """Use the LLM to identify relevant keywords for each subpoena criterion."""
        self.logger.info("LLM-BASED KEYWORD IDENTIFICATION")
        sample_df = df.sample(n=min(self.sample_size, len(df)), random_state=42)

        keywords_by_criterion = {}
        for num, desc in SUBPOENA_CRITERIA.items():
            self.logger.info(f"Getting keywords for criterion {num}...")
            keywords = self._identify_keywords_for_criterion(sample_df, num, desc)
            keywords_by_criterion[num] = keywords
            self.logger.info(f"Criterion {num}: {len(keywords)} keywords")

        self._save_llm_keywords(keywords_by_criterion)
        return keywords_by_criterion
    def _identify_keywords_for_criterion(
        self, df: pd.DataFrame, num: int, desc: str
    ) -> List[str]:
        """Query the LLM over chunks of sampled messages and collect keywords."""
        all_keywords = []
        chunk_size = 200
        chunks = [df.iloc[i : i + chunk_size] for i in range(0, len(df), chunk_size)]

        def process_chunk(chunk_data):
            """Process a single chunk: build the prompt, call the LLM, parse keywords."""
            chunk_idx, chunk = chunk_data
            # Normalize text before building the prompt
            messages_sample = "\n".join(
                normalize_text(msg) for msg in chunk["message"].fillna("").tolist()
            )
            prompt = f"Identify keywords for: {desc}\n\nMessages:\n{messages_sample}\n\nJSON:"
            try:
                response = requests.post(
                    f"{self.llm_url}/v1/chat/completions",
                    # headers={"Authorization": "Bearer "},
                    json={
                        "model": self.model,
                        "messages": [{"role": "user", "content": prompt}],
                        "max_tokens": 2048,
                        "temperature": 0.05,
                    },
                    timeout=120,
                )
                if response.status_code == 200:
                    text = response.json()["choices"][0]["message"]["content"]
                    parsed = loads(text)
                    if isinstance(parsed, dict):
                        return parsed.get("keywords", [])
                else:
                    self.logger.warning(
                        f"LLM returned status {response.status_code} for chunk {chunk_idx}"
                    )
                return []
            except Exception as e:
                self.logger.error(f"Error processing chunk {chunk_idx}: {e}")
                return []

        # Process chunks in parallel
        max_workers = 4  # Adjust based on the LLM server's capacity
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all chunks up front
            future_to_chunk = {
                executor.submit(process_chunk, (i, chunk)): i
                for i, chunk in enumerate(chunks)
            }
            # Collect results as they complete, with a progress bar
            with tqdm(desc="chunks", total=len(chunks)) as pbar:
                for future in as_completed(future_to_chunk):
                    keywords = future.result()
                    if isinstance(keywords, list):
                        all_keywords.extend(keywords)
                    pbar.update(1)

        # Remove duplicates while preserving order (set.add returns None, so the
        # `not seen.add(kw)` clause is always True and simply marks kw as seen)
        seen = set()
        unique_keywords = [
            kw for kw in all_keywords if kw not in seen and not seen.add(kw)
        ]
        return unique_keywords
    def _save_llm_keywords(self, keywords_by_criterion):
        """Save LLM keywords to the output directory."""
        results = {
            "method": "llm_analysis",
            "criteria": {
                str(n): {"keywords": k} for n, k in keywords_by_criterion.items()
            },
        }
        self.save_results(results, "llm_keywords.json")
        self.logger.info("Saved LLM keywords")