# Source code for DashAI.back.job.explorer_job

import logging
import os
import pathlib

from beartype.typing import Any, Dict, Type
from kink import inject
from sqlalchemy import exc
from sqlalchemy.orm import Session

from DashAI.back.dataloaders.classes.dashai_dataset import load_dataset
from DashAI.back.dependencies.database.models import Dataset, Exploration, Explorer
from DashAI.back.dependencies.registry import ComponentRegistry
from DashAI.back.exploration.base_explorer import BaseExplorer
from DashAI.back.job.base_job import BaseJob, JobError

# Configure the root logger at DEBUG level. This runs as a side effect of
# importing this module.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger named after this module; used by ExplorerJob to record
# the exceptions it converts into JobError.
log = logging.getLogger(__name__)


class ExplorerJob(BaseJob):
    """ExplorerJob class to launch explorations."""

    def set_status_as_delivered(self) -> None:
        """Set the status of the explorer as delivered.

        The explorer id and the database session are read from
        ``self.kwargs["explorer_id"]`` and ``self.kwargs["db"]``.

        Raises
        ------
        JobError
            If no explorer with the given id exists, or if the status change
            can not be committed to the database.
        """
        explorer_id: int = self.kwargs["explorer_id"]
        db: Session = self.kwargs["db"]

        explorer: Explorer = db.query(Explorer).get(explorer_id)
        if explorer is None:
            raise JobError(f"Explorer with id {explorer_id} not found.")

        try:
            explorer.set_status_as_delivered()
            db.commit()
        except exc.SQLAlchemyError as e:
            log.exception(e)
            raise JobError(
                "Error while setting the status of the explorer as delivered."
            ) from e

    @staticmethod
    def _mark_as_error(explorer_info: Explorer, db: Session) -> None:
        """Persist the error status of the explorer without masking the cause.

        Parameters
        ----------
        explorer_info : Explorer
            Database record of the explorer that failed.
        db : Session
            Session used to commit the status change.
        """
        try:
            explorer_info.set_status_as_error()
            db.commit()
        except exc.SQLAlchemyError as status_error:
            # Failing to record the status must not hide the error that
            # triggered it: log it, reset the session and let the caller
            # raise its own JobError.
            log.exception(status_error)
            db.rollback()

    @inject
    def run(
        self,
        component_registry: ComponentRegistry = lambda di: di["component_registry"],
        config: Dict[str, Any] = lambda di: di["config"],
    ) -> None:
        """Run the exploration described by the explorer record.

        The explorer id and the database session are read from
        ``self.kwargs["explorer_id"]`` and ``self.kwargs["db"]``. The method
        marks the explorer as started, loads its exploration and dataset
        records, loads the dataset from ``<file_path>/dataset``, instantiates
        the explorer component registered under the explorer's
        ``exploration_type``, prepares the dataset, launches the exploration
        and saves the result under
        ``config["EXPLORATIONS_PATH"]/<exploration id>``. On success the saved
        path is stored in ``exploration_path`` and the explorer is marked as
        finished. On any failure after the explorer was marked as started, it
        is marked as errored before the exception is raised.

        Parameters
        ----------
        component_registry : ComponentRegistry
            Registry used to look up the explorer class. The default is a
            lambda resolving ``di["component_registry"]``, presumably filled
            in by the ``@inject`` decorator.
        config : Dict[str, Any]
            Configuration mapping; only ``"EXPLORATIONS_PATH"`` is read here.
            The default is a lambda resolving ``di["config"]``.

        Raises
        ------
        JobError
            If any step of the process fails. The original exception, when
            there is one, is chained as the cause.
        """
        explorer_id: int = self.kwargs["explorer_id"]
        db: Session = self.kwargs["db"]

        # Load the explorer information
        try:
            explorer_info: Explorer = db.query(Explorer).get(explorer_id)
            if explorer_info is None:
                raise JobError(f"Explorer with id {explorer_id} not found.")
            explorer_info.set_status_as_started()
            db.commit()
        except exc.SQLAlchemyError as e:
            log.exception(e)
            raise JobError("Error while loading the explorer info.") from e

        # Load the exploration information
        try:
            exploration_id = explorer_info.exploration_id
            exploration_info: Exploration = db.query(Exploration).get(exploration_id)
        except exc.SQLAlchemyError as e:
            log.exception(e)
            self._mark_as_error(explorer_info, db)
            raise JobError("Error while loading the exploration info.") from e
        if exploration_info is None:
            # A missing record is a failure too: do not leave the explorer
            # stuck in the "started" status.
            self._mark_as_error(explorer_info, db)
            raise JobError(f"Exploration with id {exploration_id} not found.")

        # Load the dataset information
        try:
            dataset_id = exploration_info.dataset_id
            dataset_info: Dataset = db.query(Dataset).get(dataset_id)
        except exc.SQLAlchemyError as e:
            log.exception(e)
            self._mark_as_error(explorer_info, db)
            raise JobError("Error while loading the dataset info.") from e
        if dataset_info is None:
            self._mark_as_error(explorer_info, db)
            raise JobError(f"Dataset with id {dataset_id} not found.")

        # Load the dataset
        try:
            loaded_dataset = load_dataset(f"{dataset_info.file_path}/dataset")
        except Exception as e:
            log.exception(e)
            self._mark_as_error(explorer_info, db)
            raise JobError(
                f"Can not load dataset from path {dataset_info.file_path}",
            ) from e

        # obtain the explorer component from the registry
        try:
            explorer_component_class: Type[BaseExplorer] = component_registry[
                explorer_info.exploration_type
            ]["class"]
        except KeyError as e:
            log.exception(e)
            self._mark_as_error(explorer_info, db)
            raise JobError(
                f"Explorer {explorer_info.exploration_type} not found in the registry."
            ) from e

        # Instance the explorer (the explorer handles its validation)
        try:
            explorer_instance = explorer_component_class(**explorer_info.parameters)
            # Explicit check instead of ``assert`` so that the validation is
            # not stripped when Python runs with ``-O``.
            if not isinstance(explorer_instance, BaseExplorer):
                raise TypeError(
                    f"{type(explorer_instance).__name__} is not a BaseExplorer."
                )
        except Exception as e:
            log.exception(e)
            self._mark_as_error(explorer_info, db)
            raise JobError(
                f"Error instancing the explorer {explorer_info.exploration_type}."
            ) from e

        # prepare the dataset
        try:
            prepared_dataset = explorer_instance.prepare_dataset(
                loaded_dataset, explorer_info.columns
            )
        except Exception as e:
            log.exception(e)
            self._mark_as_error(explorer_info, db)
            raise JobError(
                "Error preparing the dataset for the exploration "
                f"{explorer_info.exploration_type}."
            ) from e

        # Launch the exploration
        try:
            result = explorer_instance.launch_exploration(
                prepared_dataset, explorer_info
            )
        except Exception as e:
            log.exception(e)
            self._mark_as_error(explorer_info, db)
            raise JobError(
                f"Error launching the exploration {explorer_info.exploration_type}."
            ) from e

        # Save the result
        try:
            # save in the exploration folder
            save_path = pathlib.Path(config["EXPLORATIONS_PATH"]) / str(
                exploration_info.id
            )
            # exist_ok avoids the check-then-create race of a separate
            # ``exists()`` test.
            save_path.mkdir(parents=True, exist_ok=True)

            save_path = explorer_instance.save_exploration(
                exploration_info, explorer_info, save_path, result
            )
            if isinstance(save_path, str):
                save_path = pathlib.Path(save_path)
            if not isinstance(save_path, pathlib.Path):
                raise JobError(
                    f"Error while saving the exploration"
                    f" {explorer_info.exploration_type}"
                    f", save path is not a pathlib.Path."
                )

            # Update the explorer info
            explorer_info.exploration_path = save_path.as_posix()
            explorer_info.set_status_as_finished()
            db.commit()
        except Exception as e:
            log.exception(e)
            self._mark_as_error(explorer_info, db)
            raise JobError(
                f"Error while saving the exploration {explorer_info.exploration_type}."
            ) from e