""" Step 1: Load and preprocess Signal CSV data. """ import pandas as pd from typing import List from pipeline.models.base import PipelineStep from pipeline.common_defs import Message from pipeline.utils.text_utils import normalize_text class DataLoader(PipelineStep): """Load and preprocess Signal chat CSV""" def __init__(self, csv_path: str, output_dir: str = './pipeline_output'): super().__init__(output_dir) self.csv_path = csv_path def execute(self) -> pd.DataFrame: """ Load CSV and preprocess messages. Returns: DataFrame with preprocessed messages """ self.logger.info(f"Loading Signal chat CSV: {self.csv_path}") # Load CSV df = pd.read_csv(self.csv_path) df.columns = df.columns.str.lower().str.strip() # Add line numbers df['line_number'] = range(1, len(df) + 1) # Fill missing messages df['message'] = df['message'].fillna('') # Normalize text self.logger.info("Normalizing text...") df['message_normalized'] = df['message'].apply(normalize_text) self.logger.info(f"Loaded {len(df):,} messages") # Save preprocessed data output_file = 'preprocessed_messages.csv' df.to_csv(self.output_dir / output_file, index=False) self.logger.info(f"Saved preprocessed data to: {output_file}") return df def create_message_objects(self, df: pd.DataFrame) -> List[Message]: """ Convert DataFrame rows to Message objects. Args: df: DataFrame with message data Returns: List of Message objects """ messages = [] for _, row in df.iterrows(): msg = Message( line_number=int(row['line_number']), timestamp=str(row.get('timestamp', '')), sender=str(row.get('sender', '')), message=str(row.get('message', '')), message_normalized=str(row.get('message_normalized', '')) ) messages.append(msg) return messages if __name__ == "__main__": # Example usage loader = DataLoader("../_sources/signal_messages.csv") df = loader.execute() print(f"Loaded {len(df)} messages")