# coding: utf-8
# flake8: noqa
"""
Error handlers for errors originating from the Submission systems.
"""
import re
from abc import ABCMeta, abstractmethod
__author__ = "Michiel van Setten"
__copyright__ = " "
__version__ = "0.9"
__maintainer__ = "Michiel van Setten"
__email__ = "mjvansetten@gmail.com"
__date__ = "May 2014"
__all_errors__ = ['SubmitError', 'FullQueueError', 'DiskError', 'TimeCancelError', 'MemoryCancelError',
'NodeFailureError']
class CorrectorProtocolScheduler(metaclass=ABCMeta):
"""
    Abstract class defining the protocol / interface for scheduler-side correction operators. The client code
    (qadapters / submission script generator methods / ...) should implement these methods.
"""
@property
    @abstractmethod
def name(self):
return str()
@abstractmethod
def exclude_nodes(self, nodes):
"""
Method to exclude certain nodes from being used in the calculation. It is called when a calculation seemed to
have been crashed due to a hardware failure at the nodes specified.
nodes: list of node numbers that were found to cause problems
returns True if the memory could be increased False otherwise
"""
@abstractmethod
def increase_mem(self):
"""
Method to increase then memory in the calculation. It is called when a calculation seemed to have been crashed
due to a insufficient memory.
returns True if the memory could be increased False otherwise
"""
@abstractmethod
def increase_time(self):
"""
Method to increase te time for the calculation. It is called when a calculation seemed to
have been crashed due to a time limit.
returns True if the memory could be increased False otherwise
"""
@abstractmethod
def increase_cpus(self):
"""
Method to increse the number of cpus being used in the calculation. It is called when a calculation seemed to
have been crashed due to time or memory limits being broken.
returns True if the memory could be increased False otherwise
"""
class CorrectorProtocolApplication(metaclass=ABCMeta):
"""
    Abstract class defining the protocol / interface for application-side correction operators. The client code
    (qadapters / submission script generator methods / ...) should implement these methods.
"""
@property
    @abstractmethod
def name(self):
return str()
@abstractmethod
def decrease_mem(self):
"""
Method to increase then memory in the calculation. It is called when a calculation seemed to have been crashed
due to a insufficient memory.
returns True if the memory could be increased False otherwise
"""
@abstractmethod
def speed_up(self):
"""
Method to speed_up the calculation. It is called when a calculation seemed to time limits being broken.
returns True if the memory could be increased False otherwise
"""
class AbstractError(metaclass=ABCMeta):
"""
Error base class
"""
def __init__(self, errmsg, meta_data):
self.errmsg = errmsg
self.meta_data = meta_data if meta_data is not None else {}
def __str__(self):
_message = '%s %s\n' \
' error message : %s \n' \
' meta data : %s' % (self.name, self.__doc__, self.errmsg, str(self.meta_data))
return _message
@property
def name(self):
return self.__class__.__name__
@property
def scheduler_adapter_solutions(self):
"""
        To be implemented by concrete errors. Returns a list of tuples defining corrections: the first element of
        each tuple should be one of the methods of CorrectorProtocolScheduler, the second element should contain
        the arguments.
"""
return []
@property
def application_adapter_solutions(self):
"""
        To be implemented by concrete errors. Returns a list of tuples defining corrections: the first element of
        each tuple should be one of the methods of CorrectorProtocolApplication, the second element should contain
        the arguments.
"""
return []
def last_resort_solution(self):
"""
what to do if every thing else fails...
"""
print('non of the defined solutions for %s returned success...' % self.name)
return
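# A hedged sketch of how client code might consume the solutions protocol: each solution is a tuple whose first
# element is a (plain, unbound) protocol method and whose optional second element holds the call arguments. The
# `corrector` below is assumed to implement CorrectorProtocolScheduler; this dispatcher is illustrative, not part
# of the public API.
def apply_scheduler_solutions(error, corrector):
    """Try the scheduler-side corrections suggested by `error` on `corrector`; stop at the first success."""
    for solution in error.scheduler_adapter_solutions:
        method = solution[0]
        args = solution[1] if len(solution) > 1 else []
        # resolve the protocol method on the concrete corrector by name and call it
        if getattr(corrector, method.__name__)(*args):
            return True
    error.last_resort_solution()
    return False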
class SubmitError(AbstractError):
"""
Errors occurring at submission. The limits on the cluster may have changed.
"""
class FullQueueError(AbstractError):
"""
    Errors occurring at submission. Too many jobs in the queue / too many total cpus / ...
"""
class DiskError(AbstractError):
"""
Errors involving problems writing to disk.
"""
class TimeCancelError(AbstractError):
"""
Error due to exceeding the time limit for the job.
    .limit will return a list of limits that were broken, or None if it could not be determined.
"""
@property
def limit(self):
return self.meta_data.get('broken_limit')
@property
def scheduler_adapter_solutions(self):
return [(CorrectorProtocolScheduler.increase_time,)]
@property
def application_adapter_solutions(self):
return [(CorrectorProtocolApplication.speed_up,)]
class MemoryCancelError(AbstractError):
"""
Error due to exceeding the memory limit for the job.
    .limit will return a list of limits that were broken, or None if it could not be determined.
"""
@property
def limit(self):
return self.meta_data.get('broken_limit')
@property
def scheduler_adapter_solutions(self):
return [(CorrectorProtocolScheduler.increase_mem,)]
@property
def application_adapter_solutions(self):
return [(CorrectorProtocolApplication.decrease_mem,)]
class MasterProcessMemoryCancelError(AbstractError):
"""
Error due to exceeding the memory limit for the job on the master node.
"""
class SlaveProcessMemoryCancelError(AbstractError):
"""
Error due to exceeding the memory limit for the job on a node different from the master.
"""
class NodeFailureError(AbstractError):
"""
    Error due to the hardware failure of a specific node.
    .nodes will return a list of problematic nodes, or None if it could not be determined.
"""
@property
def nodes(self):
return self.meta_data.get('nodes')
@property
def scheduler_adapter_solutions(self):
return [(CorrectorProtocolScheduler.exclude_nodes, [self.nodes])]
class AbstractErrorParser(metaclass=ABCMeta):
"""
    Abstract class for parsing errors originating from the scheduler system and errors that are not reported by the
    program itself, e.g. segmentation faults.

    A concrete implementation of this class for a specific scheduler needs to provide the error_definitions
    property: a dictionary mapping each error class to a file specifier, the string to look for in that file, and a
    meta_filter dict that maps each meta-data key to a [regular expression, match-group index] pair:

    error_definitions = {ErrorClass: {
                             'file_specifier': {
                                 'string': "the string to be looked for",
                                 'meta_filter': {'key': [r"regular expression", group_index]}
                             }
                         }
                        }
"""
def __init__(self, err_file, out_file=None, run_err_file=None, batch_err_file=None):
self.files = {'err': err_file, 'out': out_file, 'run_err': run_err_file, 'batch_err': batch_err_file}
self.errors = []
@property
    @abstractmethod
def error_definitions(self):
return dict()
def parse_single(self, errmsg):
"""
Parse the provided files for the corresponding strings.
"""
found = False
message = None
metadata = None
for k in errmsg.keys():
if self.files[k] is not None:
                # print('parsing ', self.files[k], ' for ', errmsg[k]['string'])
                try:
                    with open(self.files[k], mode='r') as f:
                        lines = f.read().split('\n')
                    for line in lines:
                        if errmsg[k]['string'] in line:
                            message = line
                            found = True
                    if found:
                        metadata = self.extract_metadata(lines, errmsg[k]['meta_filter'])
                except (IOError, OSError):
                    print(self.files[k], 'not found')
                except TypeError:
                    print('type error:', self.files[k], 'has type', type(self.files[k]), 'should be string.')
return found, message, metadata
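    # parse_single relies on an extract_metadata helper that does not appear in this section. The sketch below is a
    # plausible reconstruction based on the meta_filter format used in the error definitions (each entry maps a
    # meta-data key to a [regular expression, match-group index] pair); it is not necessarily the original code.
    @staticmethod
    def extract_metadata(lines, meta_filter):
        meta_dict = {}
        for key, (pattern, group) in meta_filter.items():
            values = []
            for line in lines:
                match = re.match(pattern, line)
                if match is not None:
                    values.append(match.group(group))
            meta_dict[key] = values
        return meta_dict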
def parse(self):
"""
Parse for the occurens of all errors defined in ERRORS
"""
errors_tested = 0
for error in self.error_definitions:
errors_tested += 1
result = self.parse_single(self.error_definitions[error])
if result[0]:
self.errors.append(error(result[1], result[2]))
if len(self.errors) > 0:
print('QUEUE_ERROR FOUND')
for error in self.errors:
print(error)
return errors_tested
class SlurmErrorParser(AbstractErrorParser):
"""
Implementation of the error definitions for the Slurm scheduler
"""
@property
def error_definitions(self):
return {
SubmitError: {
'batch_err': {
'string': "Batch job submission failed",
'meta_filter': {}
}
},
FullQueueError: {
'batch_err': {
'string': "Job violates accounting/QOS policy",
'meta_filter': {}
}
},
MemoryCancelError: {
'err': {
'string': "Exceeded job memory limit",
'meta_filter': {}
}
},
#slurmstepd: error: *** JOB 1803480 CANCELLED AT 2015-12-16T14:57:32 DUE TO TIME LIMIT on lmWn009 ***
#slurmstepd: error: *** JOB 1803712 CANCELLED AT 2015-12-17T15:21:41 DUE TO TIME LIMIT on lmWn001 ***
TimeCancelError: {
'err': {
'string': "DUE TO TIME LIMIT",
'meta_filter': {
'time_of_cancel': [r"(.*)JOB (\d+) CANCELLED AT (\S*) DUE TO TIME LIMIT(.*)", 3]
}
}
},
NodeFailureError: {
'run_err': {
'string': "can't open /dev/ipath, network down",
'meta_filter': {
'nodes': [r"node(\d+)\.(\d+)can't open (\S*), network down \(err=26\)", 1]
}
}
},
AbstractError: {
'out': {
'string': "a string to be found",
'meta_filter': {}
}
}
}
class PBSErrorParser(AbstractErrorParser):
"""
Implementation for the PBS scheduler
PBS: job killed: walltime 932 exceeded limit 900
PBS: job killed: walltime 46 exceeded limit 30
PBS: job killed: vmem 2085244kb exceeded limit 1945600kb
"""
@property
def error_definitions(self):
return {
TimeCancelError: {
'out': {
'string': "job killed: walltime",
'meta_filter': {
'broken_limit': [r"=>> PBS: job killed: walltime (\d+) exceeded limit (\d+)", 2]
}
}
},
AbstractError: {
'out': {
'string': "a string to be found",
'meta_filter': {}
}
},
MemoryCancelError: {
'out': {
'string': "job killed: vmem",
'meta_filter': {
'broken_limit': [r"(.*)job killed: vmem (\d+)kb exceeded limit (\d+)kb", 3]
}
}
}
}
ALL_PARSERS = {'slurm': SlurmErrorParser, 'pbspro': PBSErrorParser, 'torque': PBSErrorParser}
def get_parser(scheduler, err_file, out_file=None, run_err_file=None, batch_err_file=None):
"""
    Factory function providing the parser for the specified scheduler. If the scheduler is not implemented, None is
    returned. The file arguments are strings with the names of the out and err files:

    err_file        stderr of the scheduler
    out_file        stdout of the scheduler
    run_err_file    stderr of the application
    batch_err_file  stderr of the submission

    Returns:
        None if the scheduler is not supported.
"""
cls = ALL_PARSERS.get(scheduler)
return cls if cls is None else cls(err_file, out_file, run_err_file, batch_err_file)
if __name__ == "__main__":
my_parser = get_parser('pbs', err_file='queue.err', out_file='queue.out', run_err_file='run.err',
batch_err_file='sbatch.err')
my_parser.parse()
print('parser.errors', my_parser.errors)
for my_error in my_parser.errors:
print(my_error)