Source code for ignite_simple.gen_sweep.sweeper

"""Handles performing the model parameter sweep."""

import typing
import ignite_simple.gen_sweep.param_selectors as ps
import ignite_simple.hyperparams as hyperparams
import ignite_simple.dispatcher as disp
import ignite_simple.utils as mutils
import psutil
import os
import pickle
import json
import importlib
import numpy as np
from sortedcontainers import SortedKeyList
from functools import partial
import csv
import logging

logger = logging.getLogger(__name__)

def _post_hparam_sweep(listeners: typing.Iterable[ps.SweepListener],
                       params: tuple, mm_folder: str):
    filen = os.path.join(mm_folder, 'hparams', 'final.json')
    if not os.path.exists(filen):
        logger = logging.getLogger(__name__)
        logger.error('Expected file %s to exist, corresponding with hparams'
                     + ' for params %s, but it does not', params)
        raise FileNotFoundError(filen)

    with open(filen, 'r') as infile:
        final = json.load(infile)

    lr_min = final['lr_start']
    lr_max = final['lr_end']
    bs = final['batch_size']

    for lst in listeners:
        lst: ps.SweepListener
        lst.on_hparams_found(params, lr_min, lr_max, bs)


def _post_trials(listeners: typing.Iterable[ps.SweepListener],
                 params: tuple, mm_folder: str, start_at_trial_ind: int):
    filen = os.path.join(mm_folder, 'results.npz')
    if not os.path.exists(filen):
        logger = logging.getLogger(__name__)
        logger.error('Expected file %s to exist, corresponding with results'
                     + ' for params %s, but it does not', params)
        raise FileNotFoundError(filen)

    with np.load(filen) as infile:
        l_train = infile['final_loss_train'][start_at_trial_ind:]
        p_train = infile['final_perf_train'][start_at_trial_ind:]
        l_val = infile['final_loss_val'][start_at_trial_ind:]
        p_val = infile['final_perf_val'][start_at_trial_ind:]

    for lst in listeners:
        lst: ps.SweepListener
        lst.on_trials_completed(params, p_train, l_train, p_val, l_val)

[docs]class ParamsTask: """Describes a task to call model_manager.train for a particular set of model parameters. This isn't actually a disp.Task, although it can be converted to one. :ivar tuple params: the parameters. this is the instance variable used for equality checks. :ivar int trials: the number of trials to perform. """ def __init__(self, params: tuple, trials: int) -> None: self.params = tuple(params) self.trials = trials def copy(self) -> 'ParamsTask': return ParamsTask(self.params, self.trials)
[docs] def as_task(self, module_name: str, hparams: hyperparams.HyperparameterSettings, folder: str, iden: int, trials: int, sweep_cores: int, listeners: typing.Iterable[ps.SweepListener], ntrials_disp: int) -> disp.Task: """Creates the task object which performs the specified number of trials with this tasks parameters, assuming that the directory structure is as follows: folder/points/ <id>/ <result from model manager> This function assumes that we always perform the hparam sweep in a separate call to the trial sweep for the purposes of handling the listeners. """ md = importlib.import_module(module_name) acc_style = getattr(md, 'accuracy_style') mm_folder = os.path.join(folder, 'points', str(iden)) if trials <= 0: callback = partial(_post_hparam_sweep, listeners, self.params, mm_folder) else: callback = partial(_post_trials, listeners, self.params, mm_folder, ntrials_disp) return disp.Task( 'ignite_simple.model_manager', 'train', ( (module_name, 'model', self.params, dict()), (module_name, 'dataset', tuple(), dict()), (module_name, 'loss', tuple(), dict()), mm_folder, hparams, 'none', 'none', acc_style, max(trials, 0), True, os.path.join(folder, 'points', 'history', str(iden)), sweep_cores if trials <= 0 else trials, True ), dict(), sweep_cores if trials <= 0 else trials, callback )
[docs]class ParamsTaskQueue(disp.TaskQueue, ps.SweepListener): """This task queue acts like a greedy task queue, except it only works for tasks which amount to calling model_manager.train for a particular set of models. The process for selecting tasks is as follows: - If we have a hparameter sweep queued and we have enough cores to do one, do that. - Perform as many trials as possible. Note we can only perform trials on parameters we already know the hyperparameters for :ivar int total_cores: the number of physical cores that are available :ivar int sweep_cores: the number of cores to use for sweeping (not greater than the number of total cores) :ivar str module: the module which we are getting the model/dataset/etc :ivar HyperparameterSettings hparams: strategy for hyperparameters :ivar str folder: the folder containing the points folder :ivar list[SweepListener] listeners: the sweep listeners. contains self. :ivar set[tuple] in_progress: parameters which are currently in progress. :ivar list[ParamsTask] sweeps: the sweeps that need to be performed in an arbitrary order :ivar dict[tuple, int] params_to_sweeps_ind: a lookup that goes from the parameters of tasks to the index in sweeps if a sweep is still necessary. :ivar SortedList[ParamsTask] trials: the trials that need to be performed, in ascending order of the number of trials. :ivar dict[tuple, int] params_to_id: dictionary which converts a given set of parameters to the corresponding unique identifier for that set of parameters. :ivar dict[tuple, int] params_to_ntrials: dictionary which converts a given set of parameters to the corresponding number of trials that have been dispatched for those parameters :ivar int next_id: the next id that should be given out to a set of parameters and then incremented. :ivar dict[tuple, ParamsTask] params_to_task: a lookup that goes from params lists to param tasks, where this only contains tasks which have not yet been dispatched, and does not include tasks which are in sweeps :ivar int _len: number of actual tasks currently in queue :ivar bool expecting_more_trials: True to prevent saying we are out of trials, False otherwise """ def __init__(self, total_cores: int, sweep_cores: int, module: str, hparams: hyperparams.HyperparameterSettings, folder: str, listeners: typing.List[ps.SweepListener]): self.total_cores = total_cores self.sweep_cores = sweep_cores self.module = module self.hparams = hparams self.folder = folder self.listeners = list(listeners) self.listeners.append(self) self.in_progress = set() self.sweeps = list() self.params_to_sweeps_ind = dict() self.trials = SortedKeyList(key=lambda tsk: tsk.trials) self.params_to_id = dict() self.params_to_ntrials = dict() self.next_id = 0 self.params_to_task = dict() self._len = 0 self.expecting_more_trials = False
[docs] def add_task_by_params(self, params: tuple, trials: int) -> None: """Adds a task to this queue based on the parameters which should be swept and the number of trials to perform. Regardless of the value for trials, this will ensure that the hyperparameters for the given model parameters have been found. """ sweep_id = self.params_to_id.get(params) if sweep_id is None: sweep_id = self.next_id self.next_id += 1 self.sweeps.append(ParamsTask(params, 0)) self.params_to_sweeps_ind[params] = len(self.sweeps) - 1 self.params_to_id[params] = sweep_id self.params_to_ntrials[params] = 0 self._len += 1 if trials <= 0: return tsk = self.params_to_task.get(params) if tsk is not None: self.trials.remove(tsk) tsk.trials += trials self.trials.add(tsk) return tsk = ParamsTask(params, trials) self.params_to_task[params] = tsk self.trials.add(tsk) self._len += 1
[docs] def set_total_cores(self, total_cores): self.total_cores = total_cores
[docs] def on_hparams_found(self, values, lr_min, lr_max, batch_size): logger.debug('Found hyperparameters for %s: lr=(%s, %s), bs=%s', values, lr_min, lr_max, batch_size) self.in_progress.remove(values)
[docs] def on_trials_completed(self, values, perfs_train, losses_train, perfs_val, losses_val): logger.info('Completed some trials for %s - mean train/val perf = %s / %s', values, perfs_train.mean(), perfs_val.mean()) logger.debug('%s - perf: %s, loss: %s, val - perf: %s, loss: %s', values, perfs_train, losses_train, perfs_val, losses_val) self.in_progress.remove(values) self.params_to_ntrials[values] += len(losses_train)
def _get_next_task(self, cores): # Pseudocode: # if we have enough cores to sweep and a sweep available then # pop from the end of sweeps (so only one index changes) # remove from params_to_sweeps_ind # add to in_progress # return # # pop the item with the most number of trials from trials, ignoring # ones which are already in progress or haven't been sweep yet # # if we cannot finish this then # build the disp.Task which does the right # of trials # update the remaining number of trials for this trial # add to in_progress # return the built disp.Task # build the disp.Task which finishes the queued trials for this set # remove from params_to_task # add to in_progress # return built disp.Task if cores <= 0: return None if cores >= self.sweep_cores and self.sweeps: swp: ParamsTask = self.sweeps.pop() del self.params_to_sweeps_ind[swp.params] self._len -= 1 self.in_progress.add(swp.params) return swp.as_task( self.module, self.hparams, self.folder, self.params_to_id[swp.params], 0, self.sweep_cores, self.listeners, self.params_to_ntrials[swp.params] ) if not self.trials: return None pop_ind = len(self.trials) - 1 while True: trl = self.trials[pop_ind] if (trl.params not in self.in_progress and trl.params not in self.params_to_sweeps_ind): break pop_ind -= 1 if pop_ind < 0: return None trl = self.trials.pop(pop_ind) if trl.trials > cores: trl.trials -= cores self.trials.add(trl) self.in_progress.add(trl.params) return trl.as_task( self.module, self.hparams, self.folder, self.params_to_id[trl.params], cores, self.sweep_cores, self.listeners, self.params_to_ntrials[trl.params] ) del self.params_to_task[trl.params] self._len -= 1 self.in_progress.add(trl.params) return trl.as_task( self.module, self.hparams, self.folder, self.params_to_id[trl.params], trl.trials, self.sweep_cores, self.listeners, self.params_to_ntrials[trl.params] )
[docs] def get_next_task(self, cores): res = self._get_next_task(cores) if res is not None: logger.debug('starting task %s', str(res)) return res
[docs] def have_more_tasks(self): return (self.expecting_more_trials or self._len > 0)
def __len__(self) -> int: return self._len
[docs]class Sweeper: """The instance of the class which performs all the individual operations required for sweeping over an arbitrary number of architectural parameters. Everything within this class occurs on the main thread. :ivar module_name: the module which contains the model, dataset, loss, and accuracy style. The model function has N parameters, where N is the number of parameters the param selector gives us. :ivar ParamSelector param_selector: the object which selects which combination of parameters to test :ivar ParamsTaskQueue tasks: the tasks that we know we are ready to perform. These are being immediately dispatched to other threads according to the number of cores required. :ivar int sweep_cores: the number of actual physical cores that we assign to sweeping. Note that this differs from the argument to sweep. :ivar str folder: the path to the folder we are saving things in. we actually store stuff at folder/points for the most part. :ivar HyperparameterSettings hparams: the hyperparameter settings used for hyperparameter tuning. """ def __init__(self, module_name: str, param_selector: ps.ParamSelector, tasks: ParamsTaskQueue, sweep_cores: int, folder: str, hparams: hyperparams.HyperparameterSettings): self.module_name = module_name self.param_selector = param_selector self.tasks = tasks self.sweep_cores = sweep_cores self.folder = folder self.hparams = hparams
[docs] def add_tasks(self, limit=100) -> None: """Adds as many tasks to the TaskQueue as the parameter selector will provide, without causing the tasks list to exceed the given limit. This will also update tasks.expecting_more_tasks. This must ensure that we don't enqueue multiple tasks which have the same hyperparameters, however we can update tasks to add more trials. """ self.tasks.expecting_more_trials = True while (self.param_selector.has_more_trials() and len(self.tasks) < limit): res = self.param_selector.get_next_trials() if res is None: return prms, trls = res self.tasks.add_task_by_params(prms, trls) self.tasks.expecting_more_trials = False
[docs] def store_params_lookup(self) -> None: """Stores the parameters that corresponded to each of the arbitrarily named folders in folder/points. This only works when we just swept and hence the details are still in memory. Produces the following files: folder/points/ params_lookup.pt Contains a pickled tuple of two dictionaries. The first dictionary goes form parameter tuples to corresponding ids (names of folders). The second goes from ids to parameter tuples. params.csv A human-readable variant of params_lookup. A CSV file where the rows are <id>,<params>. Each individual parameter is given its own column. The first row is used for descriptions and should be skipped when parsing. """ params_to_id = self.tasks.params_to_id ids_to_param = dict((v, k) for k, v in params_to_id.items()) pickled_file = os.path.join(self.folder, 'points', 'params_lookup.pt') with open(pickled_file, 'wb') as outfile: pickle.dump((params_to_id, ids_to_param), outfile) csv_filen = os.path.join(self.folder, 'points', 'params.csv') with open(csv_filen, 'w') as csv_file: csv_writer = csv.writer(csv_file) csv_writer.writerow(['Identifier / Folder Name', 'Parameters (one per column)']) for iden, params in ids_to_param.items(): row = [iden] row.extend(params) row = [str(s) for s in row] csv_writer.writerow(row)
[docs] def collate_files(self) -> None: """Collates the information found in each of the sweeps into locations which are more readily accessible. This only requires that the folder is set correctly and store_params_lookup has been called. Produces the following files: folder/points/ funcres.pt Contains a pickled list of tuples of the following form: (params, min lr, max lr, bs, perfs_train, losses_train, perfs_val, losses_val) """ pickled_file = os.path.join(self.folder, 'points', 'params_lookup.pt') with open(pickled_file, 'rb') as infile: params_to_id, ids_to_param = pickle.load(infile) res = [] for iden, params in ids_to_param.items(): pfolder = os.path.join(self.folder, 'points', str(iden)) hpfile = os.path.join(pfolder, 'hparams', 'final.json') with open(hpfile, 'r') as infile: hps = json.load(infile) lr_min = hps['lr_start'] lr_max = hps['lr_end'] bs = hps['batch_size'] resfile = os.path.join(pfolder, 'results.npz') with np.load(resfile) as infile: l_train = infile['final_loss_train'] p_train = infile['final_perf_train'] l_val = infile['final_loss_val'] p_val = infile['final_perf_val'] res.append((params, lr_min, lr_max, bs, p_train, l_train, p_val, l_val)) fres_file = os.path.join(self.folder, 'points', 'funcres.pt') with open(fres_file, 'wb') as outfile: pickle.dump(res, outfile)
[docs]def sweep(module_name: str, param_selector: ps.ParamSelector, sweep_cores: int, hparams: typing.Union[str, hyperparams.HyperparameterSettings], folder: str) -> list: """Performs the given architectural parameter search on the problem defined in the given module. :param str module_name: the path to the module which contains the model / dataset / loss / accuracy style, where the model accepts the parameters which are being swept (one argument per parameter). :param ParamSelector param_selector: what decides what parameters to be sent. Note that this is structured so it can utilize partial sweeps to inform further search, although the best performance requires that it can give more than one trial at a time. :param int sweep_cores: the number of cores that are used for sweeping parameters. This is important both for being able to replicate the results of the sweep and in terms of performance. If this number exceeds the number of physical cores, it will be simulated in a way that correctly utilizes the available resources. A higher number of sweep cores gives more accurate and consistent hyperparameters, which is essential for meaningful comparisons between architectures. 4 is a reasonable starting place. :param HyperparameterSettings hparams: either the hyperparameter settings to use or a name of a preset ('fastest', 'fast', 'slow', 'slowest') that is used for sweeping hyperparameters. This value will be modified to ensure we are simulating sweep_cores correctly if necessary. :param str folder: where to save the output to. the main output that one will typically want to use is folder/points/ funcres.pt: contains a list of tuples where each tuple is of the form (params, lr_min, lr_max, bs, perf_train, loss_train, perf_val, loss_val). The performance and loss are expressed as an array with one element per trial. params_lookup.pt: contains a tuple of two dictionaries, where the first is params_to_ids and goes from a tuple of parameters to the corresponding name of the folder in folder/points which is the result of the model_manager train, and the second has the keys/values swapped (ids to params). This returns the unpickled content in folder/points/funcres.pt, which is a list of tuples of the form .. code:: python (params: tuple, lr_min: float, lr_max: float, bs: int, perf_train: np.ndarray # (shape=(trials,)), loss_train: np.ndarray # (shape=(trials,)), perf_val: np.ndarray # (shape=(trials,)), loss_val: np.ndarray # (shape=(trials,)) ) As is typical, loss is non-negative and lower is better. Performance is between 0 and 1 and higher is better. """ module_name = mutils.fix_imports((module_name,))[0] n_physical_cores = psutil.cpu_count(False) hparams = hyperparams.get_settings(hparams) if sweep_cores > n_physical_cores: hparams.lr_min_inits = max(hparams.lr_min_inits, sweep_cores) hparams.batch_rn_min_inits = max( hparams.batch_rn_min_inits, sweep_cores) hparams.batch_pt_min_inits = max( hparams.batch_pt_min_inits, (sweep_cores // hparams.batch_pts) ) sweep_cores = n_physical_cores tasks = ParamsTaskQueue( n_physical_cores, sweep_cores, module_name, hparams, folder, [param_selector] ) sweeper = Sweeper(module_name, param_selector, tasks, sweep_cores, folder, hparams) sweeper.add_tasks() disp.dispatch(tasks, n_physical_cores, ('ignite_simple.model_manager',), sweeper.add_tasks) sweeper.store_params_lookup() sweeper.collate_files() with open(os.path.join(folder, 'points', 'funcres.pt'), 'rb') as infile: res = pickle.load(infile) return res