Source code for ignite_simple.model_manager

"""This module is meant to be responsible for all of the training performed
on an identical model, dataset, and loss. Specifically, it decides on the
folder structure, collates results for analysis, and handles archiving
old data.
"""
import typing
import psutil
import os
import datetime
import logging
import logging.config
import ignite_simple  # pylint: disable=unused-import
import ignite_simple.hyperparams as hparams
import ignite_simple.analarams as aparams
import ignite_simple.utils as utils
import ignite_simple.tuner as tuner
import ignite_simple.trainer as trainer
from ignite_simple.analysis import analyze
from ignite.engine import Events
from ignite.contrib.handlers import CustomPeriodicEvent
from ignite_simple.range_finder import autosmooth
import json
import numpy as np
import multiprocessing as mp
import time
import torch
import shutil
import traceback

def _snap_perf(partial_epoch, ind, losses_train, losses_val, perfs_train,
               perfs_val, num_for_metric, tnr, state):
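    """Evaluates loss and performance on num_for_metric randomly selected
    points from the train and validation sets and records the results at
    index ind[0] of the given arrays.
    """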
    state.evaluator.run(utils.create_partial_loader(
        state.train_set, num_for_metric))

    metrics = state.evaluator.state.metrics
    losses_train[ind[0]] = float(metrics['loss'])
    perfs_train[ind[0]] = float(metrics['perf'])

    state.evaluator.run(utils.create_partial_loader(
        state.val_set, num_for_metric))

    metrics = state.evaluator.state.metrics
    losses_val[ind[0]] = float(metrics['loss'])
    perfs_val[ind[0]] = float(metrics['perf'])


def _update_partial_epoch(partial_epoch, epochs, ind, tnr, state):
    num_in_batch = len(tnr.state.batch[0])
    epoch_size = len(state.train_set)

    try:
        epochs[ind[0]] = (float(num_in_batch) * tnr.state.iteration) / float(epoch_size)
    except IndexError:
        print(f'epoch={tnr.state.epoch}, iteration={tnr.state.iteration}')
        raise

def _increment_ind(ind, tnr, state):
    ind[0] += 1

def _reset_partial_epoch(partial_epoch, tnr, state):
    partial_epoch[0] = 0

def _save_result(filen, loss_train, loss_val, perf_train, perf_val, tnr,
                 state):
    with open(filen, 'w') as outfile:
        json.dump({
            'epochs': int(tnr.state.epoch),
            'loss_train': float(loss_train[0]),
            'loss_val': float(loss_val[0]),
            'perf_train': float(perf_train[0]),
            'perf_val': float(perf_val[0])
        }, outfile)

def _savez_compressed(filen, tnr, state, **kwargs):
    np.savez_compressed(filen, **kwargs)

def _save_model(filen, tnr, state):
    torch.save(state.unstripped_model, filen)

def _init_cpe(cpe, tnr):
    cpe.attach(tnr)

def _trial(model_loader, dataset_loader, loss_loader, trial_folder,
           with_throughtime, accuracy_style, lr_start, lr_end, batch_size,
           cycle_time_epochs, num_epochs):
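    """Runs a single training trial with the given (already tuned)
    hyperparameters, writing model_init.pt, model.pt, result.json and, if
    with_throughtime is set, throughtime.npz into trial_folder.
    """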
    if os.path.exists('logging-worker.conf'):
        logging.config.fileConfig('logging-worker.conf')

    logger = logging.getLogger(__name__)
    logger.debug('Starting trial: accuracy_style=%s, lr_start=%s, lr_end=%s, '
                 + 'batch_size=%s, cycle_time_epochs=%s, num_epochs=%s',
                 accuracy_style, lr_start, lr_end, batch_size,
                 cycle_time_epochs, num_epochs)

    os.makedirs(trial_folder)

    train_set, val_set = utils.invoke(dataset_loader)
    len_train_set = len(train_set)
    len_val_set = len(val_set)
    del train_set
    del val_set

    final_loss_train = np.zeros(1)
    final_loss_val = np.zeros(1)
    final_perf_train = np.zeros(1)
    final_perf_val = np.zeros(1)
    final_ind = [0]
    final_partial_epoch = [1]
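
    # Handlers are specified as (event, (module, attrname, args, kwargs))
    # tuples so they can be reconstructed inside worker processes; the
    # trailing (tnr, state) parameters on the helper callbacks above are
    # filled in by ignite_simple.trainer when the event fires.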

    handlers = [
        (
            Events.STARTED,
            (
                __name__, '_save_model',
                (
                    os.path.join(trial_folder, 'model_init.pt'),
                ),
                dict()
            )
        ),
        (
            Events.COMPLETED,
            (
                __name__, '_snap_perf',
                (
                    final_partial_epoch,
                    final_ind,
                    final_loss_train,
                    final_loss_val,
                    final_perf_train,
                    final_perf_val,
                    min(len_train_set, len_val_set)
                ),
                dict()
            ),
        ),
        (
            Events.COMPLETED,
            (
                __name__, '_save_result',
                (
                    os.path.join(trial_folder, 'result.json'),
                    final_loss_train, final_loss_val, final_perf_train,
                    final_perf_val
                ),
                dict()
            )
        ),
        (
            Events.COMPLETED,
            (
                __name__, '_save_model',
                (
                    os.path.join(trial_folder, 'model.pt'),
                ),
                dict()
            )
        ),
    ]

    initializer = None

    if with_throughtime:
        iters_per = int(len_train_set / batch_size)
        iters_total = iters_per * num_epochs

        samples_per = min(10, iters_per)
        iters_per_sample = iters_per // samples_per

        if iters_per_sample < 1:
            iters_per_sample = 1
            samples_per = iters_per

        cpe = CustomPeriodicEvent(n_iterations=iters_per_sample)
        snap_event = getattr(cpe.Events, f'ITERATIONS_{iters_per_sample}_COMPLETED')  # pylint: disable=no-member
        initializer = (
            __name__, '_init_cpe', (cpe,), dict()
        )

        samples_total = iters_total // iters_per_sample

        num_for_metric = min(1024, len_val_set, len_train_set)
        settings = np.array([num_for_metric])
        epochs = np.zeros(samples_total)
        loss_train = np.zeros(samples_total)
        loss_val = np.zeros(samples_total)
        perf_train = np.zeros(samples_total)
        perf_val = np.zeros(samples_total)
        ind = [0]
        partial_epoch = [0]
        handlers.extend([
            (
                snap_event,
                (
                    __name__, '_update_partial_epoch',
                    (
                        partial_epoch, epochs, ind
                    ),
                    dict()
                ),
            ),
            (
                snap_event,
                (
                    __name__, '_snap_perf',
                    (
                        partial_epoch,
                        ind,
                        loss_train,
                        loss_val,
                        perf_train,
                        perf_val,
                        num_for_metric,
                    ),
                    dict()
                ),
            ),
            (
                snap_event,
                (
                    __name__, '_increment_ind',
                    (
                        ind,
                    ),
                    dict()
                ),
            ),
            (
                Events.EPOCH_COMPLETED,
                (
                    __name__, '_reset_partial_epoch',
                    (
                        partial_epoch,
                    ),
                    dict()
                )
            ),
            (
                Events.COMPLETED,
                (
                    __name__, '_savez_compressed',
                    (
                        os.path.join(trial_folder, 'throughtime.npz'),
                    ),
                    {
                        'settings': settings,
                        'epochs': epochs,
                        'losses_train': loss_train,
                        'losses_val': loss_val,
                        'perfs_train': perf_train,
                        'perfs_val': perf_val
                    }
                )
            )
        ])

    tnr_settings = trainer.TrainSettings(
        accuracy_style, model_loader, loss_loader,
        (
            'ignite_simple.utils', 'task_loader',
            (dataset_loader, batch_size, True, True),
            dict()
        ),
        handlers,
        initializer,
        lr_start, lr_end, cycle_time_epochs, num_epochs
    )

    try:
        trainer.train(tnr_settings)
    except:
        traceback.print_exc()
        logger.exception('Exception encountered while training during sweep')
        raise

    logger.debug('Finished trial')

def train(model_loader: typing.Tuple[str, str, tuple, dict],
          dataset_loader: typing.Tuple[str, str, tuple, dict],
          loss_loader: typing.Tuple[str, str, tuple, dict],
          folder: str,
          hyperparameters: typing.Union[str, hparams.HyperparameterSettings],
          analysis: typing.Union[str, aparams.AnalysisSettings],
          allow_later_analysis_up_to: typing.Union[
              str, aparams.AnalysisSettings],
          accuracy_style: str,
          trials: int,
          is_continuation: bool,
          history_folder: str,
          cores: typing.Union[str, int],
          trials_strict: bool = False) -> None:
    """Trains the given model on the given dataset with the given loss by
    finding and saving or loading the hyperparameters with the given
    settings, performing the given analysis while gathering sufficient data
    to later analyze up to the specified amount.

    The folder structure is as follows:

    .. code:: none

        folder/
            hparams/
                the result from tuner.tune
            trials/
                i/  (where i=0,1,...)
                    result.json
                        Note that for classification tasks perf is accuracy,
                        and in other tasks it is inverse loss. Perf is always
                        available and higher is better, whereas for loss
                        lower is better.

                        {'loss_train': float, 'loss_val': float,
                         'perf_train': float, 'perf_val': float}
                    model_init.pt
                        the initial random initialization of the model,
                        saved with torch.save
                    model.pt
                        the model after training, saved with torch.save
                    throughtime.npz
                        this is only stored if training_metric_imgs data is
                        collected.

                        settings: shape (1,), where the values are:

                            - number of points randomly selected from the
                              corresponding dataset to calculate metrics

                        epochs: the partial epoch number for the samples
                        losses_train: the loss at the corresponding epoch,
                            same shape as epochs, for the training dataset
                        losses_val: the loss at the corresponding epoch,
                            same shape as epochs, for the validation dataset
                        perfs_train: the performance at the corresponding
                            epoch, same shape as epochs, for the training
                            dataset. In classification tasks this is
                            fractional accuracy; in other tasks it is
                            inverse loss.
                        perfs_val: the performance at the corresponding
                            epoch, same shape as epochs, for the validation
                            dataset. In classification tasks this is
                            fractional accuracy; in other tasks it is
                            inverse loss.
            results.npz
                the trials/i/result.json files concatenated together for
                easier loading

                final_loss_train: shape (trials,)
                final_loss_val: shape (trials,)
                final_perf_train: shape (trials,)
                final_perf_val: shape (trials,)
            throughtimes.npz
                the trials/i/throughtime.npz files, if available, stacked
                for easier loading

                settings: shape (1,), the number of points used for
                    gathering metrics at each sample
                epochs: shape (samples,), the epoch that corresponds to each
                    sample during training. This is a float, since we may
                    sample multiple times per epoch.
                losses_train: shape (trials, samples)
                losses_train_smoothed: shape (trials, samples)
                losses_val: shape (trials, samples)
                losses_val_smoothed: shape (trials, samples)
                perfs_train: shape (trials, samples)
                perfs_train_smoothed: shape (trials, samples)
                perfs_val: shape (trials, samples)
                perfs_val_smoothed: shape (trials, samples)

    :param model_loader: (module, attrname, args, kwargs) defines where the
        callable which returns a torch.nn.Module can be found, and what
        arguments to pass to the callable to get the module. The callable
        should return a random initialization of the model.
    :param dataset_loader: (module, attrname, args, kwargs) defines where the
        callable which returns (train_set, val_set) can be found, and what
        arguments to pass to the callable to get the datasets.
    :param loss_loader: (module, attrname, args, kwargs) defines where the
        callable which returns the torch.nn.Module that computes a scalar
        value which ought to be minimized can be found, and what arguments to
        pass the callable for the loss.
    :param folder: the folder where the output should be stored
    :param hyperparameters: the hyperparameter settings or the name of a
        preset (one of `fastest`, `fast`, `slow`, and `slowest`)
    :param analysis: the analysis settings or the name of a preset (typically
        one of `none`, `text`, `images`, `animations`, `videos`); for a
        complete list see `ignite_simple.analarams`. It is always equivalent
        to set this value to `none` and then call analysis.reanalyze with the
        desired analysis.
    :param allow_later_analysis_up_to: also analysis settings or the name of
        a preset, except this analysis is not produced; instead we ensure
        that sufficient data is collected to perform this analysis if desired
        later. It must be at least as high as analysis.
    :param accuracy_style: one of `classification`, `multiclass`, and
        `inv-loss`, describing how performance is measured. `classification`
        assumes one-hot labels for the output, `multiclass` assumes
        potentially multiple ones in the labels, and `inv-loss` uses 1/loss
        as the performance metric instead.
    :param trials: the number of trials which should be performed with the
        found settings
    :param is_continuation: if True and the folder already exists, it is
        assumed to be the result of this function called with the same
        parameters except possibly trials, and the result will be the sum of
        the existing trials plus the new trials to perform. If False and the
        folder already exists, it is moved into history_folder under a name
        derived from the current timestamp.
    :param history_folder: where to store old folders if they are found when
        is_continuation is False
    :param cores: either an integer number of physical cores available for
        training, or the string 'all' to auto-detect and use all physical
        cores
    :param trials_strict: if False, all available resources are used to
        compute trials so that this completes in approximately the minimum
        amount of time required to produce the requested number of trials,
        which may result in more than the specified number of trials being
        run. If True, exactly `trials` runs are performed regardless of the
        amount of available computational resources (i.e., available cores
        may be unused).
    """
    model_loader = utils.fix_imports(model_loader)
    dataset_loader = utils.fix_imports(dataset_loader)
    loss_loader = utils.fix_imports(loss_loader)

    hyperparameters = hparams.get_settings(hyperparameters)
    skip_analysis = analysis == 'none'
    analysis = aparams.get_settings(analysis)
    allow_later_analysis_up_to = aparams.get_settings(
        allow_later_analysis_up_to)

    if cores == 'all':
        cores = psutil.cpu_count(logical=False)

    logger = logging.getLogger(__name__)

    logger.debug('Starting trial with model args %s, %s, dataset args %s %s',
                 model_loader[2], model_loader[3], dataset_loader[2],
                 dataset_loader[3])

    if not is_continuation and os.path.exists(folder):
        tstr = str(datetime.datetime.now()).replace(' ', '_').replace(':', '-')
        fname = tstr

        os.makedirs(history_folder, exist_ok=True)

        ctr = 1
        while os.path.exists(os.path.join(history_folder, fname)):
            ctr += 1
            fname = f'{tstr}_({ctr})'

        logger.info('Archiving %s to %s...', folder,
                    os.path.join(history_folder, fname))
        os.rename(folder, os.path.join(history_folder, fname))

    continuing = is_continuation and os.path.exists(folder)

    if not continuing:
        logger.info('Tuning learning rate and batch size...')
        try:
            tuner.tune(model_loader, dataset_loader, loss_loader, 'inv-loss',
                       os.path.join(folder, 'hparams'), cores,
                       hyperparameters, allow_later_analysis_up_to)
        except:
            logger.exception('An error occurred while tuning')
            raise

    with open(os.path.join(folder, 'hparams', 'final.json')) as infile:
        hparam_final = json.load(infile)

    lr_start = float(hparam_final['lr_start'])
    lr_end = float(hparam_final['lr_end'])
    batch_size = int(hparam_final['batch_size'])
    cycle_time_epochs = int(hparam_final['cycle_size_epochs'])
    epochs = int(hparam_final['epochs'])

    trial_offset = 0
    if continuing:
        while os.path.exists(
                os.path.join(folder, 'trials', str(trial_offset))):
            trial_offset += 1
        logger.info('Detected %s existing trials...', trial_offset)
        if trial_offset == 0:
            continuing = False

    if trials == 0 and trials_strict:
        logger.info('No trials -> skipping collating and analyzing')
        return

    with_throughtime = allow_later_analysis_up_to.training_metric_imgs

    if cores > 1:
        logger.info('Performing at least %s trials across %s cores...',
                    trials, cores)

        processes = []
        num_trials = 0
        last_print_trial = 0
        while num_trials < trials or (not trials_strict
                                      and num_trials < cores):
            if len(processes) >= cores:
                if last_print_trial < num_trials:
                    logger.info('Started up to trial %s...', num_trials)
                    last_print_trial = num_trials
                time.sleep(0.1)
                for i in range(len(processes) - 1, -1, -1):
                    if not processes[i].is_alive():
                        processes.pop(i)
                continue
            proc = mp.Process(
                target=_trial,
                args=(
                    model_loader, dataset_loader, loss_loader,
                    os.path.join(folder, 'trials',
                                 str(trial_offset + num_trials)),
                    with_throughtime, accuracy_style, lr_start, lr_end,
                    batch_size, cycle_time_epochs, epochs
                )
            )
            proc.start()
            processes.append(proc)
            num_trials += 1

        logger.info('Waiting for %s trials to complete...', len(processes))
        for proc in processes:
            proc.join()
    else:
        logger.info('Performing %s trials (single core)...', trials)
        for num_trials in range(trials):
            _trial(model_loader, dataset_loader, loss_loader,
                   os.path.join(folder, 'trials',
                                str(trial_offset + num_trials)),
                   with_throughtime, accuracy_style, lr_start, lr_end,
                   batch_size, cycle_time_epochs, epochs)
            logger.info('Finished trial %s/%s', num_trials + 1, trials)

    logger.info('Collating data...')

    # This is a bit messy and a bit repetitive, but trying to break it out
    # into functions would require some very arduous function signatures
    # that are all but certainly never going to be reused
    res_file = os.path.join(folder, 'results.npz')
    tt_file = os.path.join(folder, 'throughtimes.npz')

    to_collate = dict()
    skip_collate = {'settings', 'epochs'}
    skipped_sample = dict()

    if continuing:
        with np.load(res_file) as infile:
            for key, val in infile.items():
                if key in skip_collate:
                    skipped_sample[key] = val
                else:
                    to_collate[key] = [val]
        os.remove(res_file)

        if with_throughtime:
            with np.load(tt_file) as infile:
                for key, val in infile.items():
                    if key in skip_collate:
                        skipped_sample[key] = val
                    else:
                        to_collate[key] = [val]
            os.remove(tt_file)

    for trial in range(trial_offset, trial_offset + num_trials):
        trial_folder = os.path.join(folder, 'trials', str(trial))
        with open(os.path.join(trial_folder, 'result.json'), 'r') as infile:
            inparsed = json.load(infile)

        for key, val in inparsed.items():
            key = f'final_{key}'
            if key in skip_collate:
                if key not in skipped_sample:
                    skipped_sample[key] = val
            else:
                to_ap = np.stack([np.array(val)])
                if key not in to_collate:
                    to_collate[key] = [to_ap]
                else:
                    to_collate[key].append(to_ap)

        if with_throughtime:
            with np.load(os.path.join(trial_folder,
                                      'throughtime.npz')) as infile:
                for key, val in infile.items():
                    if key in skip_collate:
                        if key not in skipped_sample:
                            skipped_sample[key] = val
                    else:
                        to_ap = np.stack([np.array(val)])
                        if key not in to_collate:
                            to_collate[key] = [to_ap]
                        else:
                            to_collate[key].append(to_ap)

    for key, val in tuple(to_collate.items()):
        to_collate[key] = np.concatenate(val, axis=0)

    np.savez_compressed(
        res_file,
        final_loss_train=to_collate['final_loss_train'],
        final_loss_val=to_collate['final_loss_val'],
        final_perf_train=to_collate['final_perf_train'],
        final_perf_val=to_collate['final_perf_val']
    )

    if with_throughtime:
        np.savez_compressed(
            tt_file,
            settings=skipped_sample['settings'],
            epochs=skipped_sample['epochs'],
            losses_train=to_collate['losses_train'],
            losses_train_smoothed=autosmooth(to_collate['losses_train']),
            losses_val=to_collate['losses_val'],
            losses_val_smoothed=autosmooth(to_collate['losses_val']),
            perfs_train=to_collate['perfs_train'],
            perfs_train_smoothed=autosmooth(to_collate['perfs_train']),
            perfs_val=to_collate['perfs_val'],
            perfs_val_smoothed=autosmooth(to_collate['perfs_val']),
        )

    if continuing and os.path.exists(os.path.join(folder, 'analysis')):
        logger.info('Cleaning old analysis folder...')
        shutil.rmtree(os.path.join(folder, 'analysis'))

    if not skip_analysis:
        analyze(dataset_loader, loss_loader, folder, analysis,
                accuracy_style, cores)
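

# Illustrative usage (a sketch only; the loader locations below are
# hypothetical, while the preset names come from the docstring above):
#
#     train(
#         model_loader=('mypackage.models', 'build_model', tuple(), dict()),
#         dataset_loader=('mypackage.data', 'load_datasets', tuple(), dict()),
#         loss_loader=('torch.nn', 'CrossEntropyLoss', tuple(), dict()),
#         folder='out/current',
#         hyperparameters='fast',
#         analysis='images',
#         allow_later_analysis_up_to='videos',
#         accuracy_style='classification',
#         trials=3,
#         is_continuation=False,
#         history_folder='out/history',
#         cores='all')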