# step0b2_llm_keyword_identification.py
  1. """
  2. Step 0a (Alternative): LLM-based keyword identification.
  3. """
  4. from typing import List, Dict
  5. import pandas as pd
  6. import requests
  7. from pipeline.models.base import PipelineStep
  8. from pipeline.common_defs import SUBPOENA_CRITERIA
  9. from json_repair import loads
  10. from tqdm import tqdm
  11. from pipeline.utils.text_utils import normalize_text
  12. from concurrent.futures import ThreadPoolExecutor, as_completed
  13. class LLMKeywordIdentifier(PipelineStep):
  14. """Identify keywords using LLM analysis"""
  15. def __init__(
  16. self,
  17. llm_url: str = "http://localhost:8000",
  18. sample_size: int = 1000,
  19. model: str = "Qwen/Qwen2.5-14B-Instruct-GPTQ-Int8",
  20. output_dir: str = "./pipeline_output",
  21. ):
  22. super().__init__(output_dir)
  23. self.llm_url = llm_url
  24. self.sample_size = sample_size
  25. self.model = model
  26. def execute(self, df: pd.DataFrame) -> Dict[int, List[str]]:
  27. """Use LLM to identify relevant keywords"""
  28. self.logger.info("LLM-BASED KEYWORD IDENTIFICATION")
  29. sample_df = df.sample(n=min(self.sample_size, len(df)), random_state=42)
  30. keywords_by_criterion = {}
  31. for num, desc in SUBPOENA_CRITERIA.items():
  32. self.logger.info(f"getting keywords for {num}...")
  33. keywords = self._identify_keywords_for_criterion(sample_df, num, desc)
  34. keywords_by_criterion[num] = keywords
  35. self.logger.info(f"Criterion {num}: {len(keywords)} keywords")
  36. self._save_llm_keywords(keywords_by_criterion)
  37. return keywords_by_criterion
  38. def _identify_keywords_for_criterion(self, df, num, desc):
  39. """Use LLM to identify keywords - OPTIMIZED"""
  40. all_keywords = []
  41. chunk_size = 200
  42. chunks = [df.iloc[i : i + chunk_size] for i in range(0, len(df), chunk_size)]
  43. def process_chunk(chunk_data):
  44. """Process a single chunk"""
  45. chunk_idx, chunk = chunk_data
  46. # Normalize text before processing
  47. messages_sample = "\n".join(
  48. normalize_text(msg) for msg in chunk["message"].fillna("").tolist()
  49. )
  50. prompt = f"Identify keywords for: {desc}\n\nMessages:\n{messages_sample}\n\nJSON:"
  51. try:
  52. response = requests.post(
  53. f"{self.llm_url}/v1/chat/completions",
  54. # headers={
  55. # "Authorization": "Bearer "
  56. # },
  57. json={
  58. "model": self.model,
  59. "messages": [{"role": "user", "content": prompt}],
  60. "max_tokens": 2048,
  61. "temperature": 0.05,
  62. },
  63. timeout=120,
  64. )
  65. if response.status_code == 200:
  66. text = response.json()["choices"][0]["message"]["content"]
  67. parsed = loads(text)
  68. if isinstance(parsed, dict):
  69. return parsed.get("keywords", [])
  70. else:
  71. self.logger.warning(
  72. f"LLM returned status {response.status_code} for chunk {chunk_idx}"
  73. )
  74. return []
  75. except Exception as e:
  76. self.logger.error(f"Error processing chunk {chunk_idx}: {e}")
  77. return []
  78. # Process chunks in parallel
  79. max_workers = 4 # Adjust based on your LLM server capacity
  80. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  81. # Submit all tasks
  82. future_to_chunk = {
  83. executor.submit(process_chunk, (i, chunk)): i
  84. for i, chunk in enumerate(chunks)
  85. }
  86. # Process results as they complete with progress bar
  87. with tqdm(desc="chunks", total=len(chunks)) as pbar:
  88. for future in as_completed(future_to_chunk):
  89. keywords = future.result()
  90. if isinstance(keywords, list):
  91. all_keywords.extend(keywords)
  92. pbar.update(1)
  93. # Remove duplicates while preserving order (more efficient)
  94. seen = set()
  95. unique_keywords = [
  96. kw for kw in all_keywords if kw not in seen and not seen.add(kw)
  97. ]
  98. return unique_keywords
  99. def _save_llm_keywords(self, keywords_by_criterion):
  100. """Save LLM keywords"""
  101. results = {"method": "llm_analysis", "criteria": {str(n): {"keywords": k} for n, k in keywords_by_criterion.items()}}
  102. self.save_results(results, "llm_keywords.json")
  103. self.logger.info("Saved LLM keywords")