step1_load_data.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. """
  2. Step 1: Load and preprocess Signal CSV data.
  3. """
  4. import pandas as pd
  5. from typing import List
  6. from pipeline.models.base import PipelineStep
  7. from pipeline.common_defs import Message
  8. from pipeline.utils.text_utils import normalize_text
  9. class DataLoader(PipelineStep):
  10. """Load and preprocess Signal chat CSV"""
  11. def __init__(self, csv_path: str, output_dir: str = './pipeline_output'):
  12. super().__init__(output_dir)
  13. self.csv_path = csv_path
  14. def execute(self) -> pd.DataFrame:
  15. """
  16. Load CSV and preprocess messages.
  17. Returns:
  18. DataFrame with preprocessed messages
  19. """
  20. self.logger.info(f"Loading Signal chat CSV: {self.csv_path}")
  21. # Load CSV
  22. df = pd.read_csv(self.csv_path)
  23. df.columns = df.columns.str.lower().str.strip()
  24. # Add line numbers
  25. df['line_number'] = range(1, len(df) + 1)
  26. # Fill missing messages
  27. df['message'] = df['message'].fillna('')
  28. # Normalize text
  29. self.logger.info("Normalizing text...")
  30. df['message_normalized'] = df['message'].apply(normalize_text)
  31. self.logger.info(f"Loaded {len(df):,} messages")
  32. # Save preprocessed data
  33. output_file = 'preprocessed_messages.csv'
  34. df.to_csv(self.output_dir / output_file, index=False)
  35. self.logger.info(f"Saved preprocessed data to: {output_file}")
  36. return df
  37. def create_message_objects(self, df: pd.DataFrame) -> List[Message]:
  38. """
  39. Convert DataFrame rows to Message objects.
  40. Args:
  41. df: DataFrame with message data
  42. Returns:
  43. List of Message objects
  44. """
  45. messages = []
  46. for _, row in df.iterrows():
  47. msg = Message(
  48. line_number=int(row['line_number']),
  49. timestamp=str(row.get('timestamp', '')),
  50. sender=str(row.get('sender', '')),
  51. message=str(row.get('message', '')),
  52. message_normalized=str(row.get('message_normalized', ''))
  53. )
  54. messages.append(msg)
  55. return messages
  56. if __name__ == "__main__":
  57. # Example usage
  58. loader = DataLoader("../_sources/signal_messages.csv")
  59. df = loader.execute()
  60. print(f"Loaded {len(df)} messages")