# base.py
"""
Base classes for pipeline components.
"""
import json
import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List
  8. class PipelineStep(ABC):
  9. """Abstract base class for pipeline steps"""
  10. def __init__(self, output_dir: str = './pipeline_output'):
  11. self.output_dir = Path(output_dir)
  12. self.output_dir.mkdir(exist_ok=True)
  13. self.logger = self._setup_logger()
  14. def _setup_logger(self) -> logging.Logger:
  15. """Setup logger for this step"""
  16. logger = logging.getLogger(self.__class__.__name__)
  17. logger.setLevel(logging.INFO)
  18. if not logger.handlers:
  19. handler = logging.StreamHandler()
  20. formatter = logging.Formatter(
  21. '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  22. )
  23. handler.setFormatter(formatter)
  24. logger.addHandler(handler)
  25. return logger
  26. @abstractmethod
  27. def execute(self, *args, **kwargs) -> Any:
  28. """Execute this pipeline step"""
  29. pass
  30. def save_results(self, data: Any, filename: str):
  31. """Save results to file"""
  32. filepath = self.output_dir / filename
  33. if isinstance(data, (dict, list)):
  34. import json
  35. with open(filepath, 'w') as f:
  36. json.dump(data, f, indent=2)
  37. else:
  38. with open(filepath, 'w') as f:
  39. f.write(str(data))
  40. self.logger.info(f"Saved results to: {filepath}")
  41. return filepath
  42. def load_results(self, filename: str) -> Any:
  43. """Load results from file"""
  44. filepath = self.output_dir / filename
  45. if not filepath.exists():
  46. raise FileNotFoundError(f"File not found: {filepath}")
  47. if filepath.suffix == '.json':
  48. import json
  49. with open(filepath, 'r') as f:
  50. return json.load(f)
  51. else:
  52. with open(filepath, 'r') as f:
  53. return f.read()