# step2_create_chunks.py
  1. """
  2. Step 2: Create overlapping chunks from messages.
  3. """
  4. import pandas as pd
  5. from typing import List
  6. from pipeline.models.base import PipelineStep
  7. from pipeline.common_defs import Chunk, Message
  8. class ChunkCreator(PipelineStep):
  9. """Create overlapping chunks from messages"""
  10. def __init__(self, chunk_size: int = 20, overlap: int = 5,
  11. output_dir: str = './pipeline_output'):
  12. super().__init__(output_dir)
  13. self.chunk_size = chunk_size
  14. self.overlap = overlap
  15. def execute(self, df: pd.DataFrame) -> List[Chunk]:
  16. """
  17. Create overlapping chunks from DataFrame.
  18. Args:
  19. df: DataFrame with message data
  20. Returns:
  21. List of Chunk objects
  22. """
  23. self.logger.info(f"Creating chunks (size={self.chunk_size}, overlap={self.overlap})...")
  24. chunks = []
  25. total = len(df)
  26. step = self.chunk_size - self.overlap
  27. for i in range(0, total, step):
  28. chunk_df = df.iloc[i:i+self.chunk_size]
  29. if len(chunk_df) == 0:
  30. break
  31. # Create messages list
  32. messages = []
  33. for _, row in chunk_df.iterrows():
  34. msg = Message(
  35. line_number=int(row['line_number']),
  36. timestamp=str(row.get('timestamp', '')),
  37. sender=str(row.get('sender', '')),
  38. message=str(row.get('message', '')),
  39. message_normalized=str(row.get('message_normalized', ''))
  40. )
  41. messages.append(msg)
  42. # Create chunk
  43. chunk = Chunk(
  44. chunk_id=len(chunks),
  45. start_line=int(chunk_df['line_number'].iloc[0]),
  46. end_line=int(chunk_df['line_number'].iloc[-1]),
  47. messages=messages,
  48. combined_text=' '.join(chunk_df['message_normalized'].fillna('')),
  49. timestamp_start=str(chunk_df['timestamp'].iloc[0]),
  50. timestamp_end=str(chunk_df['timestamp'].iloc[-1])
  51. )
  52. chunks.append(chunk)
  53. self.logger.info(f"Created {len(chunks):,} chunks")
  54. # Save chunks
  55. self._save_chunks(chunks)
  56. return chunks
  57. def _save_chunks(self, chunks: List[Chunk]):
  58. """Save chunks to JSON"""
  59. chunks_data = []
  60. for chunk in chunks:
  61. chunk_dict = {
  62. 'chunk_id': chunk.chunk_id,
  63. 'start_line': chunk.start_line,
  64. 'end_line': chunk.end_line,
  65. 'combined_text': chunk.combined_text,
  66. 'timestamp_start': chunk.timestamp_start,
  67. 'timestamp_end': chunk.timestamp_end,
  68. 'num_messages': len(chunk.messages)
  69. }
  70. chunks_data.append(chunk_dict)
  71. self.save_results(chunks_data, 'chunks.json')
  72. if __name__ == "__main__":
  73. # Example usage
  74. import pandas as pd
  75. df = pd.read_csv('pipeline_output/preprocessed_messages.csv')
  76. creator = ChunkCreator(chunk_size=20, overlap=5)
  77. chunks = creator.execute(df)
  78. print(f"Created {len(chunks)} chunks")