import argparse
import os
import sys
import traceback

import optuna

import train

# SETTINGS
RANDOM_SEED = 42
SAMPLER = optuna.samplers.TPESampler
PRUNER = optuna.pruners.HyperbandPruner
MIN_NUMBER_EPOCHS = 1
MAX_NUMBER_EPOCHS = 5
__INITIAL_TRIAL = {
    'loss': 'mse',
    'optimizer': 'adam',
    'pow_batch_size': 6,  # batch_size: 64
    'lr': 1.e-5
}


# Return the SLURM task id, or 'NO_ID' when running outside a SLURM allocation.
def get_task_id() -> str:
    if 'SLURM_PROCID' in os.environ:
        return os.environ['SLURM_PROCID']
    else:
        return 'NO_ID'


# Configure the Optuna storage, sampler, and pruner.
def create_optuna_conf(journal_file_path: str) -> tuple:
    storage = optuna.storages.JournalStorage(optuna.storages.JournalFileStorage(journal_file_path))
    sampler = SAMPLER(seed=RANDOM_SEED, multivariate=True)
    pruner = PRUNER(min_resource=MIN_NUMBER_EPOCHS, max_resource=MAX_NUMBER_EPOCHS)
    return storage, sampler, pruner


# Create the Optuna HPO study.
# Usage example: python -c "from optuna_bootstrap import create_study ; create_study()" 'optuna_journal.log' 'study_name'
def create_study() -> None:
    journal_file_path = sys.argv[1]
    study_name = sys.argv[2]
    storage, sampler, pruner = create_optuna_conf(journal_file_path)
    print(f"> creating optuna journal file '{journal_file_path}'")
    study = optuna.study.create_study(study_name=study_name, storage=storage, load_if_exists=False,
                                      sampler=sampler, direction='minimize', pruner=pruner)
    # Enqueue an initial trial based on the best hyperparameter values already found.
    study.enqueue_trial(__INITIAL_TRIAL)
    print('> Done')


# Define the search space of each hyperparameter to be optimized and fetch the sampled values for the given trial.
def get_training_options(trial: optuna.trial.Trial) -> dict:
    options = dict()
    options['loss'] = trial.suggest_categorical(name='loss', choices=['mae', 'mse'])
    options['optimizer'] = trial.suggest_categorical(name='optimizer', choices=['sgd', 'adam', 'adamw'])
    # Sample the batch size on a log2 scale: 2**4 = 16 up to 2**10 = 1024.
    options['batch_size'] = int(pow(2, trial.suggest_int(name='pow_batch_size', low=4, high=10, step=1)))
    options['epochs'] = MAX_NUMBER_EPOCHS
    options['lr'] = trial.suggest_float(name='lr', low=1e-6, high=1., log=True)
    return options


# Define the HPO objective: run one training with the trial's hyperparameters and return the metric to minimize.
def objective(trial: optuna.trial.Trial) -> float:
    training_options = get_training_options(trial)
    print(f"> task #{get_task_id()} starting trial #{trial.number} with the following settings:\n{training_options}")
    metric_value = train.train(argparse.Namespace(**training_options), trial)
    return metric_value


# Run the optimization.
# Running several instances of this script at the same time parallelizes the optimization:
# each instance runs its own trials (unique sets of hyperparameter values) against the shared journal file.
# Usage example: python optuna_bootstrap.py 'optuna_journal.log' 'study_name' 3600
# Command line arguments:
# - optuna journal file path
# - study name
# - study duration (in seconds)
def main() -> int:
    journal_file_path = sys.argv[1]
    study_name = sys.argv[2]
    total_optimization_time = int(sys.argv[3])  # Unit: seconds.
    storage, sampler, pruner = create_optuna_conf(journal_file_path)
    study = optuna.study.load_study(study_name=study_name, storage=storage, sampler=sampler, pruner=pruner)
    try:
        study.optimize(func=objective, timeout=total_optimization_time)
        print(f"> task #{get_task_id()} ends")
        exit_code = train.SUCCESS_CODE
    except Exception as e:
        print(f"> [ERROR] task #{get_task_id()}: {str(e)}")
        traceback.print_exception(type(e), e, e.__traceback__)
        exit_code = train.FAILED_CODE
    return exit_code


if __name__ == '__main__':
    main_exit_code = main()
    sys.exit(main_exit_code)
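
# ---------------------------------------------------------------------------
# For reference, a hypothetical sketch of the companion `train` module this
# script imports (not the actual implementation). It only illustrates the
# contract assumed above: `train.train(options, trial)` takes an
# argparse.Namespace of hyperparameters plus the Optuna trial, reports a
# per-epoch metric so the HyperbandPruner can stop weak trials early, and
# returns the value to minimize; SUCCESS_CODE and FAILED_CODE are the exit
# codes used by main(). The function body and the dummy loss are assumptions
# for illustration only, kept commented out so they do not shadow the
# imported `train` module.
#
# import argparse
# import optuna
#
# SUCCESS_CODE = 0
# FAILED_CODE = 1
#
# def train(options: argparse.Namespace, trial: optuna.trial.Trial) -> float:
#     best_val_loss = float('inf')
#     for epoch in range(options.epochs):
#         # Placeholder: run one real training epoch + validation here.
#         val_loss = options.lr * (options.epochs - epoch)
#         best_val_loss = min(best_val_loss, val_loss)
#         # Report to Optuna so the pruner can act between epochs.
#         trial.report(val_loss, step=epoch)
#         if trial.should_prune():
#             raise optuna.TrialPruned()
#     return best_val_loss
# ---------------------------------------------------------------------------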