Source code for DashAI.back.job.explorer_job

import logging
import os
import pathlib

from beartype.typing import Type
from kink import inject
from sqlalchemy import exc
from sqlalchemy.orm import sessionmaker

from DashAI.back.dataloaders.classes.dashai_dataset import load_dataset
from DashAI.back.dependencies.database.models import (
    Explorer,
    Notebook,
)
from DashAI.back.exploration.base_explorer import BaseExplorer
from DashAI.back.job.base_job import BaseJob, JobError

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)


[docs] class ExplorerJob(BaseJob): """ExplorerJob class to launch explorations.""" @inject def set_status_as_delivered( self, session_factory: sessionmaker = lambda di: di["session_factory"] ) -> None: """Set the status of the explorer as delivered.""" explorer_id: int = self.kwargs["explorer_id"] with session_factory() as db: explorer: Explorer = db.query(Explorer).get(explorer_id) if explorer is None: raise JobError(f"Explorer with id {explorer_id} not found.") try: explorer.set_status_as_delivered() db.commit() except exc.SQLAlchemyError as e: log.exception(e) raise JobError( "Error while setting the status of the explorer as delivered." ) from e @inject def set_status_as_error( self, session_factory: sessionmaker = lambda di: di["session_factory"] ) -> None: """Set the status of the explorer as error.""" explorer_id: int = self.kwargs.get("explorer_id") if explorer_id is None: return with session_factory() as db: try: explorer: Explorer = db.query(Explorer).get(explorer_id) if explorer: explorer.set_status_as_error() db.commit() except exc.SQLAlchemyError as e: log.exception(e) @inject def get_job_name(self) -> str: """Get a descriptive name for the job.""" explorer_id = self.kwargs.get("explorer_id") if not explorer_id: return "Exploration" from kink import di session_factory = di["session_factory"] try: with session_factory() as db: explorer: Explorer = db.query(Explorer).get(explorer_id) if explorer and explorer.name: return f"Explore: {explorer.name}" if explorer and explorer.exploration_type: return f"Explore: {explorer.exploration_type}" except Exception: pass return f"Exploration ({explorer_id})" @inject def run( self, ) -> None: from kink import di component_registry = di["component_registry"] session_factory = di["session_factory"] config = di["config"] explorer_id: int = self.kwargs["explorer_id"] with session_factory() as db: # Load the explorer information try: explorer_info: Explorer = db.query(Explorer).get(explorer_id) if explorer_info is None: raise JobError(f"Explorer with id {explorer_id} not found.") explorer_info.set_status_as_started() explorer_info.huey_id = self.kwargs.get("huey_id", None) db.commit() except exc.SQLAlchemyError as e: log.exception(e) raise JobError("Error while loading the explorer info.") from e # Load the notebook information try: notebook_info: Notebook = db.query(Notebook).get( explorer_info.notebook_id ) if notebook_info is None: raise JobError( f"Notebook with id {explorer_info.notebook_id} not found." ) except exc.SQLAlchemyError as e: log.exception(e) explorer_info.set_status_as_error() db.commit() raise JobError("Error while loading the notebook info.") from e # Load the dataset from the notebook try: loaded_dataset = load_dataset(f"{notebook_info.file_path}/dataset") except Exception as e: log.exception(e) explorer_info.set_status_as_error() db.commit() raise JobError( f"Can not load dataset from path {notebook_info.file_path}", ) from e # obtain the explorer component from the registry try: explorer_component_class: Type[BaseExplorer] = component_registry[ explorer_info.exploration_type ]["class"] except KeyError as e: log.exception(e) explorer_info.set_status_as_error() db.commit() raise JobError( ( f"Explorer {explorer_info.exploration_type} " "not found in the registry." ) ) from e # Instance the explorer (the explorer handles its validation) try: explorer_instance = explorer_component_class(**explorer_info.parameters) assert isinstance(explorer_instance, BaseExplorer) except Exception as e: log.exception(e) explorer_info.set_status_as_error() db.commit() raise JobError( f"Error instancing the explorer {explorer_info.exploration_type}." ) from e # prepare the dataset try: prepared_dataset = explorer_instance.prepare_dataset( loaded_dataset, explorer_info.columns ) except Exception as e: log.exception(e) explorer_info.set_status_as_error() db.commit() raise JobError( ( "Error preparing the dataset for the exploration " f"{explorer_info.exploration_type}." ) ) from e # Launch the exploration try: result = explorer_instance.launch_exploration( prepared_dataset, explorer_info ) except Exception as e: log.exception(e) explorer_info.set_status_as_error() db.commit() raise JobError( f"Error launching the exploration {explorer_info.exploration_type}." ) from e # Save the result try: # save in the notebook folder save_path = pathlib.Path( os.path.join( config["NOTEBOOK_PATH"], (f"{notebook_info.id}"), ) ) if not save_path.exists(): save_path.mkdir(parents=True) save_path = explorer_instance.save_notebook( notebook_info, explorer_info, save_path, result ) if isinstance(save_path, str): save_path = pathlib.Path(save_path) if not isinstance(save_path, pathlib.Path): raise JobError( ( f"Error while saving the exploration" f" {explorer_info.exploration_type}" f", save path is not a pathlib.Path." ) ) # Update the explorer info explorer_info.exploration_path = save_path.as_posix() explorer_info.set_status_as_finished() db.commit() except Exception as e: log.exception(e) explorer_info.set_status_as_error() db.commit() raise JobError( ( f"Error while saving the exploration " f"{explorer_info.exploration_type}." ) ) from e