| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- """
- Base classes for pipeline components.
- """
- from abc import ABC, abstractmethod
- from pathlib import Path
- from typing import Any, Dict, List
- import logging
class PipelineStep(ABC):
    """Abstract base class for pipeline steps.

    Each step owns an output directory (created on construction) and a
    logger named after the concrete subclass. Subclasses must implement
    :meth:`execute`; :meth:`save_results` and :meth:`load_results` provide
    simple persistence relative to the output directory.
    """

    def __init__(self, output_dir: str = './pipeline_output'):
        """Create the output directory and set up the step's logger.

        Args:
            output_dir: Directory where this step stores its results.
                Missing intermediate directories are created as well.
        """
        self.output_dir = Path(output_dir)
        # parents=True so nested paths such as "out/run1/stepA" work even
        # when the intermediate directories do not exist yet.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.logger = self._setup_logger()

    def _setup_logger(self) -> logging.Logger:
        """Return an INFO-level logger named after the concrete class.

        A stream handler is attached only if the logger has none yet, so
        creating several instances of the same step does not duplicate
        log output.
        """
        logger = logging.getLogger(self.__class__.__name__)
        logger.setLevel(logging.INFO)

        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)

        return logger

    @abstractmethod
    def execute(self, *args, **kwargs) -> Any:
        """Execute this pipeline step."""

    def save_results(self, data: Any, filename: str) -> Path:
        """Save results to a file inside the output directory.

        ``dict`` and ``list`` values are written as indented JSON; any
        other value is written as ``str(data)``.

        Args:
            data: The value to persist.
            filename: File name (or relative path) under ``output_dir``.

        Returns:
            The path of the file that was written.
        """
        filepath = self.output_dir / filename
        # The target directory may be missing if ``filename`` contains a
        # subdirectory or the output directory was removed after __init__.
        filepath.parent.mkdir(parents=True, exist_ok=True)

        # Explicit UTF-8 keeps the files portable across platforms whose
        # default locale encoding differs.
        with open(filepath, 'w', encoding='utf-8') as f:
            if isinstance(data, (dict, list)):
                import json
                json.dump(data, f, indent=2)
            else:
                f.write(str(data))

        self.logger.info("Saved results to: %s", filepath)
        return filepath

    def load_results(self, filename: str) -> Any:
        """Load results from a file inside the output directory.

        Files with a ``.json`` suffix are parsed as JSON; everything else
        is returned as text.

        Args:
            filename: File name (or relative path) under ``output_dir``.

        Returns:
            The parsed JSON value for ``.json`` files, otherwise the file
            content as a string.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        filepath = self.output_dir / filename

        if not filepath.exists():
            raise FileNotFoundError(f"File not found: {filepath}")

        with open(filepath, 'r', encoding='utf-8') as f:
            if filepath.suffix == '.json':
                import json
                return json.load(f)
            return f.read()
|