"""
Step 2: Create overlapping chunks from messages.
"""

from typing import Any, Dict, List

import pandas as pd

from pipeline.models.base import PipelineStep
from pipeline.common_defs import Chunk, Message


# Optional text columns copied onto each ``Message``. Missing columns and
# missing values (NaN / None / NaT) become ''.
_TEXT_FIELDS = ('timestamp', 'sender', 'message', 'message_normalized')
# Keyword arguments passed to ``Message`` for every row, in column order.
_MESSAGE_FIELDS = ('line_number',) + _TEXT_FIELDS


def _to_text(value: Any) -> str:
    """Return ``str(value)``, or '' for None and pandas missing values.

    ``pd.read_csv`` turns empty cells into NaN. Without this, ``str(nan)``
    would put the literal text 'nan' into messages and timestamps.
    """
    if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
        return ''
    return str(value)


def _column_as_text(df: pd.DataFrame, name: str) -> List[str]:
    """Return column *name* of *df* as strings; all '' if the column is absent."""
    if name not in df.columns:
        return [''] * len(df)
    return [_to_text(value) for value in df[name].tolist()]


def _validate_window(chunk_size: int, overlap: int) -> None:
    """Raise ValueError unless 0 <= overlap < chunk_size.

    With these bounds, consecutive windows always advance by at least one row
    and never skip rows.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if not 0 <= overlap < chunk_size:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < chunk_size "
            f"(got overlap={overlap}, chunk_size={chunk_size})"
        )


class ChunkCreator(PipelineStep):
    """Create overlapping chunks from messages.

    Each chunk holds up to ``chunk_size`` consecutive messages. Consecutive
    chunks share ``overlap`` messages.
    """

    def __init__(self, chunk_size: int = 20, overlap: int = 5,
                 output_dir: str = './pipeline_output'):
        """
        Args:
            chunk_size: Maximum number of messages per chunk (must be > 0).
            overlap: Number of messages shared by consecutive chunks
                (must satisfy 0 <= overlap < chunk_size).
            output_dir: Forwarded to ``PipelineStep.__init__``.

        Raises:
            ValueError: If ``chunk_size`` / ``overlap`` are out of range.
        """
        # Validate before initialising the base class so a bad configuration
        # fails immediately rather than at execute() time.
        _validate_window(chunk_size, overlap)
        super().__init__(output_dir)
        self.chunk_size = chunk_size
        self.overlap = overlap

    def execute(self, df: pd.DataFrame) -> List[Chunk]:
        """
        Create overlapping chunks from DataFrame.

        Rows are taken in positional order. A new window starts every
        ``chunk_size - overlap`` rows. Chunking stops after the first window
        that reaches the final row, so no chunk is a strict subset of the
        chunk before it.

        Args:
            df: DataFrame with message data. ``line_number`` is required.
                ``timestamp``, ``sender``, ``message`` and
                ``message_normalized`` are optional; missing columns or
                values become ''.

        Returns:
            List of Chunk objects. A per-chunk summary is also handed to
            ``self.save_results`` under the name 'chunks.json'.

        Raises:
            ValueError: If ``chunk_size`` / ``overlap`` are out of range, or a
                ``line_number`` value cannot be converted to int.
            KeyError: If ``df`` is non-empty and lacks a ``line_number`` column.
        """
        # Re-check here because the attributes may have been reassigned
        # after construction.
        _validate_window(self.chunk_size, self.overlap)
        self.logger.info(f"Creating chunks (size={self.chunk_size}, overlap={self.overlap})...")

        rows = self._extract_rows(df)
        total = len(rows)
        step = self.chunk_size - self.overlap

        chunks = []
        for start in range(0, total, step):
            window = rows[start:start + self.chunk_size]
            chunks.append(self._build_chunk(len(chunks), window))
            if start + self.chunk_size >= total:
                # This window already ends at the last row; any later window
                # would only repeat a tail of it.
                break

        self.logger.info(f"Created {len(chunks):,} chunks")

        # Save chunks
        self._save_chunks(chunks)

        return chunks

    @staticmethod
    def _extract_rows(df: pd.DataFrame) -> List[Dict[str, Any]]:
        """Convert every row of *df* into keyword arguments for ``Message``.

        Rows are converted once up front, using column-wise ``tolist`` instead
        of ``iterrows``. Rows shared by overlapping chunks are therefore not
        converted repeatedly.
        """
        if len(df) == 0:
            return []
        line_numbers = [int(value) for value in df['line_number'].tolist()]
        text_columns = [_column_as_text(df, name) for name in _TEXT_FIELDS]
        return [
            dict(zip(_MESSAGE_FIELDS, values))
            for values in zip(line_numbers, *text_columns)
        ]

    @staticmethod
    def _build_chunk(chunk_id: int, window: List[Dict[str, Any]]) -> Chunk:
        """Build one ``Chunk`` from a non-empty window of row dicts."""
        first, last = window[0], window[-1]
        return Chunk(
            chunk_id=chunk_id,
            start_line=first['line_number'],
            end_line=last['line_number'],
            # Fresh Message objects per chunk, so overlapping chunks never
            # share instances.
            messages=[Message(**fields) for fields in window],
            combined_text=' '.join(fields['message_normalized'] for fields in window),
            timestamp_start=first['timestamp'],
            timestamp_end=last['timestamp'],
        )

    def _save_chunks(self, chunks: List[Chunk]):
        """Save a per-chunk summary to 'chunks.json' via ``save_results``.

        The individual Message objects are not stored, only their count.
        """
        chunks_data = [
            {
                'chunk_id': chunk.chunk_id,
                'start_line': chunk.start_line,
                'end_line': chunk.end_line,
                'combined_text': chunk.combined_text,
                'timestamp_start': chunk.timestamp_start,
                'timestamp_end': chunk.timestamp_end,
                'num_messages': len(chunk.messages),
            }
            for chunk in chunks
        ]
        self.save_results(chunks_data, 'chunks.json')


if __name__ == "__main__":
    # Example usage
    df = pd.read_csv('pipeline_output/preprocessed_messages.csv')
    creator = ChunkCreator(chunk_size=20, overlap=5)
    chunks = creator.execute(df)
    print(f"Created {len(chunks)} chunks")