```python
- """
- Step 2: Create overlapping chunks from messages.
- """
- import pandas as pd
- from typing import List
- from pipeline.models.base import PipelineStep
- from pipeline.common_defs import Chunk, Message
- class ChunkCreator(PipelineStep):
- """Create overlapping chunks from messages"""
-
- def __init__(self, chunk_size: int = 20, overlap: int = 5,
- output_dir: str = './pipeline_output'):
- super().__init__(output_dir)
- self.chunk_size = chunk_size
- self.overlap = overlap
-
    def execute(self, df: pd.DataFrame) -> List[Chunk]:
        """
        Create overlapping chunks from a DataFrame.

        Args:
            df: DataFrame with message data

        Returns:
            List of Chunk objects
        """
        self.logger.info(f"Creating chunks (size={self.chunk_size}, overlap={self.overlap})...")

        chunks = []
        total = len(df)
        step = self.chunk_size - self.overlap

        for i in range(0, total, step):
            chunk_df = df.iloc[i:i + self.chunk_size]
            if chunk_df.empty:
                break

            # Build the list of Message objects for this window
            messages = []
            for _, row in chunk_df.iterrows():
                msg = Message(
                    line_number=int(row['line_number']),
                    timestamp=str(row.get('timestamp', '')),
                    sender=str(row.get('sender', '')),
                    message=str(row.get('message', '')),
                    message_normalized=str(row.get('message_normalized', ''))
                )
                messages.append(msg)

            # Create the chunk; cast to str so join() tolerates non-string values
            chunk = Chunk(
                chunk_id=len(chunks),
                start_line=int(chunk_df['line_number'].iloc[0]),
                end_line=int(chunk_df['line_number'].iloc[-1]),
                messages=messages,
                combined_text=' '.join(chunk_df['message_normalized'].fillna('').astype(str)),
                timestamp_start=str(chunk_df['timestamp'].iloc[0]),
                timestamp_end=str(chunk_df['timestamp'].iloc[-1])
            )
            chunks.append(chunk)

        self.logger.info(f"Created {len(chunks):,} chunks")

        # Save chunks for inspection and downstream steps
        self._save_chunks(chunks)

        return chunks

    def _save_chunks(self, chunks: List[Chunk]):
        """Save chunks to JSON"""
        chunks_data = []
        for chunk in chunks:
            chunk_dict = {
                'chunk_id': chunk.chunk_id,
                'start_line': chunk.start_line,
                'end_line': chunk.end_line,
                'combined_text': chunk.combined_text,
                'timestamp_start': chunk.timestamp_start,
                'timestamp_end': chunk.timestamp_end,
                'num_messages': len(chunk.messages)
            }
            chunks_data.append(chunk_dict)

        self.save_results(chunks_data, 'chunks.json')

if __name__ == "__main__":
    # Example usage
    df = pd.read_csv('pipeline_output/preprocessed_messages.csv')

    creator = ChunkCreator(chunk_size=20, overlap=5)
    chunks = creator.execute(df)
    print(f"Created {len(chunks)} chunks")
```
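ChunkCreator imports `Chunk`, `Message`, and `PipelineStep` from elsewhere in the repository, and those definitions are not shown in this step. If you want to experiment with the chunking logic on its own, the stand-ins below are a minimal sketch inferred only from the attributes and methods the step actually uses; the real classes in `pipeline.common_defs` and `pipeline.models.base` may look different.

```python
# Hypothetical stand-ins for pipeline.common_defs and pipeline.models.base.
# Inferred from what ChunkCreator uses above; the real definitions may differ.
import json
import logging
import os
from dataclasses import dataclass
from typing import List


@dataclass
class Message:
    line_number: int
    timestamp: str
    sender: str
    message: str
    message_normalized: str


@dataclass
class Chunk:
    chunk_id: int
    start_line: int
    end_line: int
    messages: List[Message]
    combined_text: str
    timestamp_start: str
    timestamp_end: str


class PipelineStep:
    """Minimal base: an output directory, a logger, and a JSON-saving helper."""

    def __init__(self, output_dir: str):
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)
        self.logger = logging.getLogger(self.__class__.__name__)

    def save_results(self, data, filename: str) -> None:
        path = os.path.join(self.output_dir, filename)
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
```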
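The overlap arithmetic is easiest to see with concrete numbers. With the defaults (`chunk_size=20`, `overlap=5`) the loop in `execute()` advances 15 rows at a time, so consecutive chunks share 5 rows:

```python
# Chunk boundaries produced by execute() for a hypothetical 50-row DataFrame.
chunk_size, overlap = 20, 5
step = chunk_size - overlap              # 15: rows between chunk starts
total = 50
for i in range(0, total, step):
    print(f"chunk rows {i}..{min(i + chunk_size, total) - 1}")
# chunk rows 0..19, 15..34, 30..49, 45..49 (the final chunk is a short tail)
```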