""" Base classes for pipeline components. """ from abc import ABC, abstractmethod from pathlib import Path from typing import Any, Dict, List import logging class PipelineStep(ABC): """Abstract base class for pipeline steps""" def __init__(self, output_dir: str = './pipeline_output'): self.output_dir = Path(output_dir) self.output_dir.mkdir(exist_ok=True) self.logger = self._setup_logger() def _setup_logger(self) -> logging.Logger: """Setup logger for this step""" logger = logging.getLogger(self.__class__.__name__) logger.setLevel(logging.INFO) if not logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) handler.setFormatter(formatter) logger.addHandler(handler) return logger @abstractmethod def execute(self, *args, **kwargs) -> Any: """Execute this pipeline step""" pass def save_results(self, data: Any, filename: str): """Save results to file""" filepath = self.output_dir / filename if isinstance(data, (dict, list)): import json with open(filepath, 'w') as f: json.dump(data, f, indent=2) else: with open(filepath, 'w') as f: f.write(str(data)) self.logger.info(f"Saved results to: {filepath}") return filepath def load_results(self, filename: str) -> Any: """Load results from file""" filepath = self.output_dir / filename if not filepath.exists(): raise FileNotFoundError(f"File not found: {filepath}") if filepath.suffix == '.json': import json with open(filepath, 'r') as f: return json.load(f) else: with open(filepath, 'r') as f: return f.read()