import copy
import json
import logging
import math
import os
import shlex
import subprocess
import tempfile
from typing import Sequence, Union

from .config import ModelGeneratorConfig
from .config import UGEConfig
from .model_generator_interface import ModelGeneratorInterface, validate_model_generator_interface_input
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# File mode passed to os.chmod for the generated bash scripts: read + execute for owner, group and
# others (r-xr-xr-x); no write bit is set.
ALL_EXEC_PERMISSIONS = 0o555
"""
This file contains all the functionality needed to train models on a Univa Grid Engine (UGE) HPC cluster.
"""
class UGEModelGenerator(ModelGeneratorInterface):
    """
    Class which generates models utilizing a Univa Grid Engine (UGE).

    For every model to be generated, :meth:`run` writes a small Python driver script, a bash wrapper that sets
    up the node environment, and submits the wrapper to a UGE queue with ``qsub``.
    """

    def __init__(self, configs: Union[ModelGeneratorConfig, Sequence[ModelGeneratorConfig]],
                 uge_config: UGEConfig,
                 working_directory: str = os.path.join(os.path.expanduser('~'), "uge_model_generator"),
                 validate_uge_dirs: bool = True):
        """
        Initializes a UGE Model Generator
        :param configs: a ModelGeneratorConfig or a Sequence of ModelGeneratorConfig objects which
            define the models to be created
        :param uge_config: configuration object which specifies how the parallelization should be farmed across
            the UGE.
        :param working_directory: the directory where the scripts to be run will be stored, along with any objects
            that are persisted to carry out the parallelization. Defaults to ``~/uge_model_generator``; the home
            directory is resolved with os.path.expanduser so that importing this module does not fail when the
            HOME environment variable is unset.
            NOTE: this should be a directory that is replicated across the cluster (for example, sometimes /tmp is
            configured to not replicate across the cluster)
        :param validate_uge_dirs: if True, the directory will be validated to ensure it is not inside the OS temp
            directory, for the reason that typically /tmp is not replicated across the cluster.
        """
        super().__init__(configs)
        self.uge_config = uge_config
        if self.uge_config.multi_model_same_gpu:
            # one job per config; each job generates all of that config's models
            self.configs_expanded = self.configs
        else:
            # one job per model
            self.configs_expanded = self.expand_modelgen_configs_to_process()
        self.working_directory = working_directory
        self.validate_uge_dirs = validate_uge_dirs
        self.validate()

    @staticmethod
    def _single_entry(values, model_idx: int):
        """
        Narrow a per-model sequence down to the entry belonging to one model.
        :param values: None, a single string, or an indexable collection with one entry per model
        :param model_idx: index of the model whose entry should be kept
        :return: ``[values[model_idx]]`` when values can be indexed at model_idx, otherwise values unchanged
        """
        if values is None or isinstance(values, str):
            return values
        try:
            return [values[model_idx]]
        except (IndexError, KeyError, TypeError):
            # not indexable at that position: leave the configuration exactly as it was
            return values

    def expand_modelgen_configs_to_process(self) -> Sequence[ModelGeneratorConfig]:
        """
        Converts a sequence of ModelGeneratorConfig objects into another sequence of ModelGeneratorConfig
        objects such that each element in the sequence only creates one model.
        For example:
            Input: cfgs = [cfg1->num_models=1, cfg2->num_models=2]. len(cfgs)=2
            Output: cfgs = [cfg1->num_models=1, cfg2->num_models=1, cfg2->num_models=1]. len(cfgs)=3
        This is useful so that we can fully distribute all the models that need to be generated.

        When a config carries per-model ``filenames`` or ``run_ids`` collections, each copy keeps only the entry
        for its own model. run() indexes those collections with the model index inside the (now single-model)
        config, which is always 0, so without this narrowing every copy would reuse the first entry.

        ########################################
        NOTE: This will lead to multiple configs pointing to the same data on disk. I'm not sure if
        this is a problem for PyTorch or not, but this is something to investigate if unexpected results arise.
        ########################################
        :return: expanded config configuration
        """
        configs_expanded = []
        for cfg in self.configs:
            filenames = getattr(cfg, 'filenames', None)
            run_ids = getattr(cfg, 'run_ids', None)
            for model_idx in range(cfg.num_models):
                # make a copy of the object so that there are no memory conflicts among multiple jobs running
                cfg_copy = copy.deepcopy(cfg)
                cfg_copy.num_models = 1
                own_filenames = self._single_entry(filenames, model_idx)
                if own_filenames is not filenames:
                    cfg_copy.filenames = own_filenames
                own_run_ids = self._single_entry(run_ids, model_idx)
                if own_run_ids is not run_ids:
                    cfg_copy.run_ids = own_run_ids
                configs_expanded.append(cfg_copy)
        return configs_expanded

    def get_queue_numjobs_assignment(self) -> Sequence:
        """
        Determine the number of jobs to give to each queue based on UGEConfig.

        Without a queue_distribution the jobs are split evenly (rounded up) over the queues in order. With a
        queue_distribution, entry ii is treated as the fraction of the TOTAL number of jobs that queue ii should
        receive; shares are rounded up to whole jobs and clamped to the jobs still unassigned. Any jobs left
        over after all queues were visited are added to the last assigned queue so that no job is dropped.
        :return: a list of tuples, with each tuple containing the queue in index-0, and the (integer) number of
            jobs assigned to that queue in index-1
        """
        queues = self.uge_config.queues
        num_jobs_to_process = len(self.configs_expanded)
        num_jobs_to_assign = num_jobs_to_process
        queue_numjobs_assignment = []

        if self.uge_config.queue_distribution is None:
            num_jobs = math.ceil(num_jobs_to_process / len(queues))
            for q in queues:
                num_jobs = min(num_jobs, num_jobs_to_assign)
                queue_numjobs_assignment.append((q, num_jobs))
                num_jobs_to_assign -= num_jobs
                if num_jobs_to_assign <= 0:
                    break
            return queue_numjobs_assignment

        for q, desired_dist_value in zip(queues, self.uge_config.queue_distribution):
            # the small epsilon keeps float noise (e.g. 10 * 0.3 == 3.0000000000000004) from adding a job
            share = math.ceil(num_jobs_to_process * desired_dist_value - 1e-9)
            num_jobs = max(0, min(share, num_jobs_to_assign))
            queue_numjobs_assignment.append((q, num_jobs))
            num_jobs_to_assign -= num_jobs
            if num_jobs_to_assign <= 0:
                break

        if num_jobs_to_assign > 0 and queue_numjobs_assignment:
            last_queue, last_num_jobs = queue_numjobs_assignment[-1]
            logger.warning("queue_distribution left %d job(s) unassigned; adding them to the last queue",
                           num_jobs_to_assign)
            queue_numjobs_assignment[-1] = (last_queue, last_num_jobs + num_jobs_to_assign)
        return queue_numjobs_assignment

    @staticmethod
    def _gen_py_script(pyscript_fname: str, pyscript_log_fname: str, modelgen_cfg_persist_fname: str,
                       persist_metadata_fname: str,
                       run_id: str = None, filename: str = None) -> None:
        """
        Generate the Python script which will be run on a cluster node to generate the model(s) of one job.
        All values are written into the script as Python literals (repr), so string run-ids / filenames and
        paths containing quotes or backslashes produce a valid script.
        :param pyscript_fname: name of the file which will have the Python script
        :param pyscript_log_fname: log filename where all Python program output will be captured
        :param modelgen_cfg_persist_fname: filename of where the configuration data will be persisted for
            distributing the job onto cluster nodes
        :param persist_metadata_fname: filename of where the configuration metadata will be persisted for
            distributing the job onto cluster nodes
        :param run_id: any specified run-id which will be passed to the Runner
        :param filename: any specified filename which will be passed to the runner
        :return: None
        """
        with open(pyscript_fname, 'w') as f:
            f.write('''\
#!/usr/bin/env python
import json
import logging.config
import trojai.modelgen.config as tpmc
import trojai.modelgen.runner as tpmr

# setup logger
logging.config.dictConfig({
    'version': 1,
    'formatters': {
        'detailed': {
            'format': '[%%(asctime)s] %%(levelname)s in %%(module)s: %%(message)s',
        },
    },
    'handlers': {
        'file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': %r,
            'maxBytes': 10 * 1024 * 1024,
            'backupCount': 5,
            'formatter': 'detailed',
            'level': 'INFO',
        },
    },
    'loggers': {
        'trojai': {
            'handlers': ['file'],
        },
        'trojai_private': {
            'handlers': ['file'],
        },
    },
    'root': {
        'level': 'INFO',
    },
})

modelgen_cfg = tpmc.ModelGeneratorConfig.load(%r)
with open(%r, 'r') as f:
    persist_metadata = json.load(f)
run_cfg = tpmc.modelgen_cfg_to_runner_cfg(modelgen_cfg, run_id=%r, filename=%r)
runner = tpmr.Runner(run_cfg, persist_metadata=persist_metadata, progress_bar_disable=True)
runner.run()
''' % (pyscript_log_fname, modelgen_cfg_persist_fname, persist_metadata_fname, run_id, filename))

    @staticmethod
    def _gen_bash_script(bashscript_fname: str, pyscript_fname: str) -> None:
        """
        Generates the bash script, which sets up the correct environment for each node and calls the generated
        Python script to generate the models. The module and conda locations are hard-coded in the script.
        :param bashscript_fname: the filename of the bash script to be generated
        :param pyscript_fname: the filename of the python script which will be called in the bash script
        :return: None
        """
        # quoted so that a path containing spaces or shell metacharacters is passed to python3 as one argument
        pyscript_abs_path = shlex.quote(os.path.abspath(pyscript_fname))
        with open(bashscript_fname, 'w') as f:
            f.write('''\
#!/bin/bash
source /etc/profile.d/modules.sh
module load cuda91
# setup conda environment
. /cm/shared/apps/anaconda3/etc/profile.d/conda.sh
conda activate trojai
python3 %s
''' % (pyscript_abs_path,))
        os.chmod(bashscript_fname, ALL_EXEC_PERMISSIONS)  # give everyone read & execute permissions

    @staticmethod
    def _gen_bash_command(bashscript_fname: str, uge_log_fname: str, queue_name: str,
                          gpu_node: bool = False, sync_mode: bool = False) -> str:
        """
        Creates a UGE command to submit the job to the cluster for processing. The queue name and both
        filenames are shell-quoted; values without special characters appear unchanged in the command.
        :param bashscript_fname: the bash script to run by the cluster
        :param uge_log_fname: a filename of any log messages captured by the UGE
        :param queue_name: the queue to submit the job to
        :param gpu_node: (bool) indicates whether the queue the job is being submitted to has GPU nodes
        :param sync_mode: if True, then the shell will be captured by this process. Currently unsupported!
        :return: (str) bash command
        """
        # UGE job submit command example
        # qsub -q gpu-k40.q -l gpu=1 -V -v PATH -cwd -S /bin/bash -j y -sync y -o /home/karrak1/trojai/qsub.log
        # test_torch_cuda.sh
        cmd_parts = ['qsub -q', shlex.quote(queue_name)]
        if gpu_node:
            cmd_parts.append('-l gpu=1')
        cmd_parts.append('-V -v PATH -cwd -S /bin/bash -j y')
        if sync_mode:
            cmd_parts.append('-sync y')
        cmd_parts.append('-o %s' % (shlex.quote(uge_log_fname),))  # logging
        cmd_parts.append(shlex.quote(bashscript_fname))
        return ' '.join(cmd_parts)

    @staticmethod
    def _select_filename_and_run_id(modelgen_cfg, model_idx: int):
        """
        Pick the output filename or run-id for one model of a configuration.
        :param modelgen_cfg: configuration whose ``filenames`` / ``run_ids`` attributes are consulted
        :param model_idx: index of the model within the configuration
        :return: tuple (filename, run_id); ``filenames`` takes precedence, and at most one of the two is not None
        """
        filenames = modelgen_cfg.filenames
        if filenames is not None:
            if isinstance(filenames, str):
                return filenames, None
            return filenames[model_idx], None
        if modelgen_cfg.run_ids is not None:
            return None, modelgen_cfg.run_ids[model_idx]
        return None, None

    def run(self, mock=False) -> None:
        """
        Runs the actual UGE job: for every model of every scheduled job, creates a dedicated working
        sub-directory (<working_directory>/<queue name>/<job index>/<model index>), persists the configuration
        and experiment metadata there, generates the Python and bash scripts and submits the bash script.
        :param mock: if True, then it generates all the necessary scripts but doesn't execute the UGE command
        :return: None
        :raises OSError: if a job working directory cannot be created (including when it already exists)
        :raises subprocess.CalledProcessError: if the submit command exits with a non-zero status
        """
        modelgen_cfgs_processed_idx = 0
        for queue, num_jobs in self.get_queue_numjobs_assignment():
            for job_idx in range(num_jobs):
                modelgen_cfg_to_schedule = self.configs_expanded[modelgen_cfgs_processed_idx]
                for model_idx in range(modelgen_cfg_to_schedule.num_models):
                    # setup working directory for this job
                    subjob_working_dir = os.path.join(self.working_directory, queue.queue_name,
                                                      str(job_idx), str(model_idx))
                    try:
                        os.makedirs(subjob_working_dir)
                    except OSError as e:
                        logger.exception(e)
                        raise  # keep the original exception type and traceback

                    filename, run_id = self._select_filename_and_run_id(modelgen_cfg_to_schedule, model_idx)

                    # save a serialized version of the modelgen_config object to the working directory
                    persist_cfg_fname = os.path.join(subjob_working_dir, 'persist_config')
                    modelgen_cfg_to_schedule.save(persist_cfg_fname)

                    # save the experiment information as a persist_metadata to track results after
                    persist_metadata_fname = os.path.join(subjob_working_dir, 'persist_metadata.json')
                    with open(persist_metadata_fname, 'w') as f:
                        json.dump(modelgen_cfg_to_schedule.experiment_cfg, f)

                    # setup filenames for the python & bash scripts which will be generated to distribute
                    # the processing
                    pyscript_fname = os.path.join(subjob_working_dir, "generate_model.py")
                    bashscript_fname = os.path.join(subjob_working_dir,
                                                    "generate_model_" + queue.queue_name +
                                                    "_" + str(job_idx) + "_" + str(model_idx) + ".sh")
                    pyscript_log_fname = os.path.join(subjob_working_dir, "generate_model.py.log")
                    uge_log_fname = os.path.join(subjob_working_dir, "log.txt")

                    # create the python script that will run the actual model
                    UGEModelGenerator._gen_py_script(pyscript_fname, pyscript_log_fname,
                                                     persist_cfg_fname, persist_metadata_fname,
                                                     run_id, filename)
                    # create the bash script wrapper that will be passed to UGE
                    UGEModelGenerator._gen_bash_script(bashscript_fname, pyscript_fname)
                    # create the command that will be called to submit the job
                    bash_cmd = UGEModelGenerator._gen_bash_command(bashscript_fname, uge_log_fname,
                                                                   queue.queue_name,
                                                                   queue.gpu_enabled, queue.sync_mode)

                    # submit the job
                    logger.info("submitting job with command: %s", bash_cmd)
                    if not mock:
                        try:
                            # the command is a single shell string; its variable parts were shell-quoted
                            # when the command was built
                            subprocess.run(bash_cmd, shell=True, check=True)
                        except subprocess.CalledProcessError as e:
                            logger.exception(e)
                            # re-raise as-is: CalledProcessError cannot be rebuilt from the exception alone
                            raise
                modelgen_cfgs_processed_idx += 1

    @staticmethod
    def _is_within_directory(path: str, directory: str) -> bool:
        """
        Check whether path is directory itself or located underneath it, comparing whole path components
        (so "/tmp_data" is not considered to be inside "/tmp").
        :param path: the path to test
        :param directory: the candidate parent directory
        :return: True if path equals directory or lies below it
        """
        abs_directory = os.path.normcase(os.path.abspath(directory))
        abs_path = os.path.normcase(os.path.abspath(path))
        try:
            return os.path.commonpath([abs_path, abs_directory]) == abs_directory
        except ValueError:
            # commonpath raises when the paths cannot share a prefix (e.g. different drives on Windows)
            return False

    def validate(self) -> None:
        """
        Validate the input configuration and make sure the working directory exists.
        :raises TypeError: if uge_config, validate_uge_dirs or working_directory have the wrong type
        :raises ValueError: if validate_uge_dirs is True and the working directory, or a model / stats save
            directory of any config, is located inside the OS temp directory
        """
        validate_model_generator_interface_input(self.configs)

        if not isinstance(self.uge_config, UGEConfig):
            msg = "uge_config must be of type UGEConfig"
            logger.error(msg)
            raise TypeError(msg)
        # checked before the flag is used to decide which directory checks to run
        if not isinstance(self.validate_uge_dirs, bool):
            msg = "validate_uge_dirs must be a boolean!"
            logger.error(msg)
            raise TypeError(msg)
        if not isinstance(self.working_directory, str):
            msg = "working_directory must be a path to a directory that the UGEModelGenerator can use to submit jobs"
            logger.error(msg)
            raise TypeError(msg)

        os_temp_dir = tempfile.gettempdir()
        # check if the working directory is in /tmp, which is not replicated across the cluster and thus should
        # not be used
        if self.validate_uge_dirs and self._is_within_directory(self.working_directory, os_temp_dir):
            msg = os_temp_dir + " should not be used for the working directory because OS temp directories " \
                                "are typically not propagated throughout the cluster!"
            logger.error(msg)
            raise ValueError(msg)

        try:
            # an already existing directory is fine; any other failure is logged but not fatal here
            os.makedirs(self.working_directory, exist_ok=True)
        except OSError as e:
            logger.exception(e)

        if self.validate_uge_dirs:
            for cfg in self.configs_expanded:
                for purpose, save_dir in (("models", cfg.model_save_dir), ("stats", cfg.stats_save_dir)):
                    if isinstance(save_dir, str) and self._is_within_directory(save_dir, os_temp_dir):
                        msg = os_temp_dir + " should not be used as the directory for saving " + purpose + \
                            " because OS temp directories are typically not propagated throughout the cluster!"
                        logger.error(msg)
                        raise ValueError(msg)