Expand Up @@ -25,7 +25,7 @@ import pandas as pd from smac.runhistory.runhistory import RunHistory from smac.runhistory.runhistory importDataOrigin, RunHistory from smac.stats.stats import Stats from smac.tae import StatusType Expand Down Expand Up @@ -172,11 +172,10 @@ def __init__( self.search_space: Optional[ConfigurationSpace] = None self._metric: Optional[autoPyTorchMetric] = None self._logger: Optional[PicklableClientLogger] = None self.run_history:Optional[ RunHistory] =None self.run_history: RunHistory =RunHistory() self.trajectory: Optional[List] = None self.dataset_name: Optional[str] = None self.cv_models_: Dict = {} self.num_run: int = 1 self.experiment_task_name: str = 'runSearch' # By default try to use the TCP logging port or get a new port Expand Down Expand Up @@ -492,6 +491,9 @@ def _do_dummy_prediction(self) -> None: assert self._metric is not None assert self._logger is not None # For dummy estimator, we always expect the num_run to be 1 num_run = 1 self._logger.info("Starting to create dummy predictions.") memory_limit = self._memory_limit Expand All @@ -511,14 +513,14 @@ def _do_dummy_prediction(self) -> None: logger_port=self._logger_port, cost_for_crash=get_cost_of_crash(self._metric), abort_on_first_run_crash=False, initial_num_run=self. num_run, initial_num_run=num_run, stats=stats, memory_limit=memory_limit, disable_file_output=True if len(self._disable_file_output) > 0 else False, all_supported_metrics=self._all_supported_metrics ) status, cost, runtime, additional_info = ta.run(self. num_run, cutoff=self._total_walltime_limit) status, cost, runtime, additional_info = ta.run(num_run, cutoff=self._total_walltime_limit) if status == StatusType.SUCCESS: self._logger.info("Finished creating dummy predictions.") else: Expand Down Expand Up @@ -560,20 +562,12 @@ def _do_traditional_prediction(self, time_left: int) -> None: This method currently only supports classification. Args: num_run: (int) An identifier to indicate the current machine learning algorithm being processed time_left: (int) Hard limit on how many machine learning algorithms can be fit. Depending on how fast a traditional machine learning algorithm trains, it will allow multiple models to be fitted. func_eval_time_limit_secs: (int) Maximum training time each algorithm is allowed to take, during training Returns: num_run: (int) The incremented identifier index. This depends on how many machine learning models were fitted. """ # Mypy Checkings -- Traditional prediction is only called for search Expand All @@ -582,16 +576,19 @@ def _do_traditional_prediction(self, time_left: int) -> None: assert self._logger is not None assert self._dask_client is not None self.num_run += 1 self._logger.info("Starting to create traditional classifier predictions.") # Initialise run history for the traditional classifiers run_history = RunHistory() memory_limit = self._memory_limit if memory_limit is not None: memory_limit = int(math.ceil(memory_limit)) available_classifiers = get_available_classifiers() dask_futures = [] total_number_classifiers = len(available_classifiers) + self.num_run for n_r, classifier in enumerate(available_classifiers, start=self.num_run ): total_number_classifiers = len(available_classifiers) for n_r, classifier in enumerate(available_classifiers): # Only launch a task if there is time start_time = time.time() Expand All @@ -610,7 +607,7 @@ def _do_traditional_prediction(self, time_left: int) -> None: logger_port=self._logger_port, cost_for_crash=get_cost_of_crash(self._metric), abort_on_first_run_crash=False, initial_num_run=n_r , initial_num_run=self._backend.get_next_num_run() , stats=stats, memory_limit=memory_limit, disable_file_output=True if len(self._disable_file_output) > 0 else False, Expand All @@ -624,9 +621,6 @@ def _do_traditional_prediction(self, time_left: int) -> None: ) ]) # Increment the launched job index self.num_run = n_r # When managing time, we need to take into account the allocated time resources, # which are dependent on the number of cores. 'dask_futures' is a proxy to the number # of workers /n_jobs that we have, in that if there are 4 cores allocated, we can run at most Expand All @@ -653,6 +647,11 @@ def _do_traditional_prediction(self, time_left: int) -> None: if status == StatusType.SUCCESS: self._logger.info( f"Fitting {cls} took {runtime}s, performance:{cost}/{additional_info}") configuration = additional_info['pipeline_configuration'] origin = additional_info['configuration_origin'] run_history.add(config=configuration, cost=cost, time=runtime, status=status, seed=self.seed, origin=origin) else: if additional_info.get('exitcode') == -6: self._logger.error( Expand All @@ -679,6 +678,13 @@ def _do_traditional_prediction(self, time_left: int) -> None: "Please consider increasing the run time to further improve performance.") break self._logger.debug("Run history traditional: {}".format(run_history)) # add run history of traditional to api run history self.run_history.update(run_history, DataOrigin.EXTERNAL_SAME_INSTANCES) run_history.save_json(os.path.join(self._backend.internals_directory, 'traditional_run_history.json'), save_external=True) return def _run_dummy_predictions(self) -> None: dummy_task_name = 'runDummy' self._stopwatch.start_task(dummy_task_name) Expand Down Expand Up @@ -727,13 +733,17 @@ def _run_ensemble(self, dataset: BaseDataset, optimize_metric: str, dataset_name=dataset.dataset_name, output_type=STRING_TO_OUTPUT_TYPES[dataset.output_type], task_type=STRING_TO_TASK_TYPES[self.task_type], metrics=[self._metric], opt_metric=optimize_metric, metrics=[self._metric], opt_metric=optimize_metric, ensemble_size=self.ensemble_size, ensemble_nbest=self.ensemble_nbest, max_models_on_disc=self.max_models_on_disc, seed=self.seed, max_iterations=None, read_at_most=sys.maxsize, ensemble_memory_limit=self._memory_limit, seed=self.seed, max_iterations=None, random_state=self.seed,read_at_most=sys.maxsize, precision=precision, random_state=self.seed, precision=precision, logger_port=self._logger_port ) self._stopwatch.stop_task(ensemble_task_name) Expand All @@ -756,8 +766,9 @@ def _start_smac(self, proc_smac: AutoMLSMBO) -> None: assert self._logger is not None try: self. run_history, self.trajectory, budget_type = \ run_history, self.trajectory, budget_type = \ proc_smac.run_smbo() self.run_history.update(run_history, DataOrigin.INTERNAL) trajectory_filename = os.path.join( self._backend.get_smac_output_directory_for_run(self.seed), 'trajectory.json') Expand Down Expand Up @@ -802,8 +813,10 @@ def _run_smac(self, dataset: BaseDataset, proc_ensemble: EnsembleBuilderManager, func_eval_time_limit_secs=self._func_eval_time_limit_secs, dask_client=self._dask_client, memory_limit=self._memory_limit, n_jobs=self.n_jobs, watcher=self._stopwatch, metric=self._metric, seed=self.seed, n_jobs=self.n_jobs, watcher=self._stopwatch, metric=self._metric, seed=self.seed, include=self.include_components, exclude=self.exclude_components, disable_file_output=self._disable_file_output, Expand All @@ -813,7 +826,7 @@ def _run_smac(self, dataset: BaseDataset, proc_ensemble: EnsembleBuilderManager, pipeline_config={**self.pipeline_options, **budget_config}, ensemble_callback=proc_ensemble, logger_port=self._logger_port, start_num_run=self.num_run , start_num_run=self._backend.get_next_num_run(peek=True) , search_space_updates=self.search_space_updates ) Expand Down Expand Up @@ -930,18 +943,23 @@ def _finish_experiment(self, proc_ensemble: EnsembleBuilderManager, self._logger.info("Starting to clean up the logger") self._clean_logger() def _search(self, optimize_metric: str, dataset: BaseDataset, budget_type: Optional[str] = None, budget: Optional[float] = None, total_walltime_limit: int = 100, func_eval_time_limit_secs: Optional[int] = None, enable_traditional_pipeline: bool = True, memory_limit: Optional[int] = 4096, smac_scenario_args: Optional[Dict[str, Any]] = None, get_smac_object_callback: Optional[Callable] = None, all_supported_metrics: bool = True, precision: int = 32, disable_file_output: List = [], load_models: bool = True) -> 'BaseTask': def _search( self, optimize_metric: str, dataset: BaseDataset, budget_type: Optional[str] = None, budget: Optional[float] = None, total_walltime_limit: int = 100, func_eval_time_limit_secs: Optional[int] = None, enable_traditional_pipeline: bool = True, memory_limit: Optional[int] = 4096, smac_scenario_args: Optional[Dict[str, Any]] = None, get_smac_object_callback: Optional[Callable] = None, all_supported_metrics: bool = True, precision: int = 32, disable_file_output: List = [], load_models: bool = True ) -> 'BaseTask': """ Search for the best pipeline configuration for the given dataset. Expand Down Expand Up @@ -1033,7 +1051,6 @@ def _search(self, optimize_metric: str, total_walltime_limit=total_walltime_limit) self._adapt_time_resource_allocation() self.num_run = 1 self._run_dummy_predictions() if enable_traditional_pipeline: Expand Down Expand Up @@ -1098,7 +1115,7 @@ def refit( 'train_indices': dataset.splits[split_id][0], 'val_indices': dataset.splits[split_id][1], 'split_id': split_id, 'num_run':0 'num_run':self._backend.get_next_num_run(), }) X.update({**self.pipeline_options, **budget_config}) if self.models_ is None or len(self.models_) == 0 or self.ensemble_ is None: Expand Down Expand Up @@ -1175,7 +1192,7 @@ def fit(self, 'train_indices': dataset.splits[split_id][0], 'val_indices': dataset.splits[split_id][1], 'split_id': split_id, 'num_run':0 'num_run':self._backend.get_next_num_run(), }) X.update({**self.pipeline_options, **budget_config}) Expand Down