- """
- Step 0a (Alternative): LLM-based keyword identification.
- """
- from typing import List, Dict
- import pandas as pd
- import json
- import requests
- from pipeline.models.base import PipelineStep
- from pipeline.common_defs import SUBPOENA_CRITERIA
- MODEL = "hf.co/bartowski/Qwen2.5-14B-Instruct-GGUF:Q4_K_S"
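# NOTE (assumption): this "hf.co/<repo>:<quant>" string is an Ollama-style GGUF
# model reference; the server behind llm_url must expose the model under this
# exact name, so adjust MODEL to whatever your backend reports.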


class LLMKeywordIdentifier(PipelineStep):
    """Identify keywords for each subpoena criterion using LLM analysis."""

    def __init__(self, llm_url: str = "http://localhost:8000",
                 sample_size: int = 1000, output_dir: str = "./pipeline_output"):
        super().__init__(output_dir)
        self.llm_url = llm_url
        self.sample_size = sample_size

    def execute(self, df: pd.DataFrame) -> Dict[int, List[str]]:
        """Use the LLM to identify relevant keywords for every criterion."""
        self.logger.info("LLM-BASED KEYWORD IDENTIFICATION")
        # Fixed seed so repeated runs sample the same messages.
        sample_df = df.sample(n=min(self.sample_size, len(df)), random_state=42)
        keywords_by_criterion = {}
        for num, desc in SUBPOENA_CRITERIA.items():
            keywords = self._identify_keywords_for_criterion(sample_df, num, desc)
            keywords_by_criterion[num] = keywords
            self.logger.info(f"Criterion {num}: {len(keywords)} keywords")
        self._save_llm_keywords(keywords_by_criterion)
        return keywords_by_criterion

    def _identify_keywords_for_criterion(self, df: pd.DataFrame,
                                         num: int, desc: str) -> List[str]:
        """Ask the LLM for candidate keywords, 100 messages at a time."""
        all_keywords = []
        # Loop through the sample in chunks of 100 rows so each request stays
        # within the model's context window.
        for i in range(0, len(df), 100):
            chunk = df.iloc[i : i + 100]
            messages_sample = "\n".join(chunk["message"].fillna("").tolist())
            prompt = (
                f"Identify 30-50 keywords for: {desc}\n\n"
                f"Messages:\n{messages_sample}\n\n"
                'Respond with JSON only, e.g. {"keywords": ["kw1", "kw2"]}'
            )
            try:
                response = requests.post(
                    f"{self.llm_url}/v1/chat/completions",
                    json={
                        "model": MODEL,
                        "messages": [{"role": "user", "content": prompt}],
                        "max_tokens": 1000,
                    },
                    timeout=120,
                )
                if response.status_code == 200:
                    # Chat completions return text under message.content, not
                    # the legacy completions-endpoint "text" field.
                    text = response.json()["choices"][0]["message"]["content"]
                    parsed = json.loads(text)
                    all_keywords.extend(parsed.get("keywords", []))
                else:
                    self.logger.error(
                        f"LLM request failed: HTTP {response.status_code}"
                    )
            except Exception as e:
                self.logger.error(f"Error processing chunk {i // 100 + 1}: {e}")

        # Remove duplicates while preserving order of first appearance.
        seen = set()
        unique_keywords = []
        for keyword in all_keywords:
            if keyword not in seen:
                seen.add(keyword)
                unique_keywords.append(keyword)
        return unique_keywords
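
    # Optional hardening sketch (not part of the original step): chat models
    # often wrap JSON answers in ```json fences, which breaks json.loads.
    # Running the response text through this helper first before parsing
    # makes the JSON handling sturdier.
    @staticmethod
    def _strip_json_fences(text: str) -> str:
        text = text.strip()
        if text.startswith("```"):
            # Drop the opening fence line (with its optional "json" tag).
            text = text.split("\n", 1)[1] if "\n" in text else ""
            if text.rstrip().endswith("```"):
                # Drop the closing fence.
                text = text.rstrip()[:-3]
        return text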

    def _save_llm_keywords(self, keywords_by_criterion: Dict[int, List[str]]) -> None:
        """Persist the identified keywords as JSON via save_results."""
        results = {
            "method": "llm_analysis",
            "criteria": {
                str(num): {"keywords": kws}
                for num, kws in keywords_by_criterion.items()
            },
        }
        self.save_results(results, "llm_keywords.json")
        self.logger.info("Saved LLM keywords")