| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- """
- Step 1: Load and preprocess Signal CSV data.
- """
- import pandas as pd
- from typing import List
- from pipeline.models.base import PipelineStep
- from pipeline.common_defs import Message
- from pipeline.utils.text_utils import normalize_text
class DataLoader(PipelineStep):
    """Load and preprocess a Signal chat CSV.

    The step reads the CSV at ``csv_path``, normalizes the column names,
    numbers the rows, adds a normalized copy of every message, and writes
    the result to ``preprocessed_messages.csv`` under ``self.output_dir``.

    ``self.logger`` and ``self.output_dir`` are not set in this class; they
    are presumably provided by ``PipelineStep`` (``output_dir`` must support
    the ``/`` operator, e.g. a ``pathlib.Path``) — confirm against the base.
    """

    def __init__(self, csv_path: str, output_dir: str = './pipeline_output'):
        """Remember the CSV location and hand ``output_dir`` to the base step.

        Args:
            csv_path: Path of the Signal chat CSV to load.
            output_dir: Directory passed to ``PipelineStep.__init__``.
        """
        super().__init__(output_dir)
        self.csv_path = csv_path

    def execute(self) -> pd.DataFrame:
        """Load the CSV, preprocess the messages and save the result.

        Column names are lower-cased and stripped, a 1-based ``line_number``
        column is added, missing messages become empty strings, and
        ``message_normalized`` holds ``normalize_text`` applied to each
        message.

        Returns:
            DataFrame with preprocessed messages.

        Raises:
            KeyError: If the CSV has no ``message`` column (after the column
                names have been lower-cased and stripped).
        """
        self.logger.info(f"Loading Signal chat CSV: {self.csv_path}")

        # Load CSV; header names are normalized so lookups are
        # case- and whitespace-insensitive.
        df = pd.read_csv(self.csv_path)
        df.columns = df.columns.str.lower().str.strip()

        # Fail with an explanatory error instead of a bare KeyError: 'message'.
        # The exception type is unchanged for callers that already catch it.
        if 'message' not in df.columns:
            raise KeyError(
                f"CSV {self.csv_path!r} has no 'message' column; "
                f"found columns: {list(df.columns)}"
            )

        # Add line numbers (1-based, in file order)
        df['line_number'] = range(1, len(df) + 1)

        # Fill missing messages so normalization never receives NaN
        df['message'] = df['message'].fillna('')

        # Normalize text
        self.logger.info("Normalizing text...")
        df['message_normalized'] = df['message'].apply(normalize_text)

        self.logger.info(f"Loaded {len(df):,} messages")

        # Save preprocessed data
        output_file = 'preprocessed_messages.csv'
        df.to_csv(self.output_dir / output_file, index=False)
        self.logger.info(f"Saved preprocessed data to: {output_file}")

        return df

    @staticmethod
    def _as_text(value) -> str:
        """Return ``str(value)``, mapping missing cells (NaN/None/NaT) to ''."""
        # is_scalar guards pd.isna, which returns an array for list-likes.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return ''
        return str(value)

    def create_message_objects(self, df: pd.DataFrame) -> List[Message]:
        """Convert DataFrame rows to Message objects.

        ``timestamp``, ``sender``, ``message`` and ``message_normalized`` are
        optional columns: an absent column yields empty strings. Missing
        cells (NaN/None/NaT) also yield empty strings rather than the text
        ``'nan'``; such cells appear, for example, when a frame is read back
        from a CSV in which the field was blank.

        Args:
            df: DataFrame with message data; must contain ``line_number``
                unless it has no rows.

        Returns:
            List of Message objects, one per row, in row order.

        Raises:
            KeyError: If ``df`` has rows but no ``line_number`` column.
        """
        row_count = len(df)
        if row_count == 0:
            # Nothing to convert; no column is required for an empty frame.
            return []

        def column(name: str) -> list:
            """Return the column's values, or '' per row if it is absent."""
            if name in df.columns:
                return df[name].tolist()
            return [''] * row_count

        as_text = self._as_text
        # Walk the columns in lockstep instead of using iterrows(), which
        # builds a Series for every row.
        return [
            Message(
                line_number=int(line_number),
                timestamp=as_text(timestamp),
                sender=as_text(sender),
                message=as_text(message),
                message_normalized=as_text(normalized),
            )
            for line_number, timestamp, sender, message, normalized in zip(
                df['line_number'].tolist(),
                column('timestamp'),
                column('sender'),
                column('message'),
                column('message_normalized'),
            )
        ]
if __name__ == "__main__":
    # Example usage: run the loading step against the sample Signal export
    # and report how many rows came back.
    example_loader = DataLoader("../_sources/signal_messages.csv")
    messages_df = example_loader.execute()
    message_count = len(messages_df)
    print(f"Loaded {message_count} messages")
|