Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# (c) Stefan Countryman, 2019
3"""
4Primitive base classes used throughout the pipeline.
5"""
7import os
8import sys
9import json
10import warnings
11import logging
12import queue
13from tempfile import NamedTemporaryFile
14import yaml
15from types import FunctionType
16from textwrap import wrap, dedent, indent
17from typing import List
18from abc import ABC, abstractproperty, abstractmethod
19from collections import namedtuple
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Name of the scratch subdirectory that temporary files are written into.
LOCAL_TMPDIR_NAME = '.tmp.d'
25# Colors for logging etc.
26def _colorfmt(color, clear):
27 """Return a string-formatting function that formats strings and colors them
28 by wrapping them in the terminal escape code specified by ``color`` and
29 then ending the string with escape code ``clear``."""
30 def colorfmt(fmt, *args):
31 """Format a string using ``fmt`` as the format string and ``args`` as
32 the arguments for it and wrap it in the terminal escape codes specified
33 in this function's constructor."""
34 return color + fmt.format(*args) + clear
35 return colorfmt
# Raw terminal escape codes.
_BLUE = '\033[94m'
_RED = '\033[91m'
_GREEN = '\033[92m'
_MAGENTA = '\033[95m'
_YELLOW = '\033[93m'
_CLEAR = '\033[0m'
_BOLD = '\033[1m'
_UNDERLINE = '\033[4m'

# A container with strings for color formatting. The upper-case fields are
# raw escape codes: print one to STDOUT to set the color/formatting of the
# text that follows, and print ``COLOR.CLEAR`` to reset to the default. The
# lower-case fields are formatting functions (built by ``_colorfmt``) that
# wrap a formatted string in the matching escape code followed by the clear
# code.
Colors = namedtuple(
    'Colors',
    [
        'BLUE', 'RED', 'GREEN', 'MAGENTA', 'YELLOW', 'CLEAR', 'BOLD',
        'UNDERLINE',
        'blue', 'red', 'green', 'magenta', 'yellow', 'bold', 'underline',
    ],
)

COLOR = Colors(
    # raw escape codes, in field order
    _BLUE, _RED, _GREEN, _MAGENTA, _YELLOW, _CLEAR, _BOLD, _UNDERLINE,
    # one formatter per style, in the same order as the lower-case fields
    *(_colorfmt(code, _CLEAR)
      for code in (_BLUE, _RED, _GREEN, _MAGENTA, _YELLOW, _BOLD,
                   _UNDERLINE)),
)
76# pylint: disable=R0903
class GenerationError(IOError):
    """
    Signals that a file could not be generated at the time its ``generate()``
    method was called, either because ``DEPENDENCIES`` were missing or
    because of some other benign failure mode. Raise this when behavior is
    otherwise nominal, to say that a particular file simply could not be
    produced at this particular moment.
    """
class IntentException(GenerationError):
    """
    Signals that a ``FileHandler`` is being generated right now and should
    not be touched for the time being.
    """
class CoolDownException(GenerationError):
    """
    Signals that a ``FileHandler`` is cooling down after a failed generation
    attempt and should not be touched for the time being.
    """
class ImmutableDict(frozenset):
    """
    A hashable, read-only mapping inspired by ``namedtuple`` and stored as a
    ``frozenset`` of ``(key, value)`` pairs (so both keys and values must be
    hashable). Build one from a dict (anything with an ``items`` method) or
    from an iterable of ``(key, value)`` tuples. Contents are read through
    the map interface: ``d[key]``, ``d.get``, ``in``, ``keys``, ``values``
    and ``items``.
    """

    def __new__(cls, mappable):
        """See ``ImmutableDict`` class docstring for details."""
        source = mappable if hasattr(mappable, 'items') else dict(mappable)
        return frozenset.__new__(cls, source.items())

    def __reduce__(self):
        """Pickle support: rebuild the instance from a plain ``dict`` of its
        contents. (``__getnewargs__`` was found to be ignored by the pickler,
        hence this manual override.)"""
        contents = dict(self)
        return (type(self), (contents,))

    def items(self):
        """Return an iterator over the stored ``(key, value)`` pairs."""
        return frozenset.__iter__(self)

    def values(self):
        """Return an iterator over the stored values."""
        return (pair[1] for pair in frozenset.__iter__(self))

    def __iter__(self):
        """Iterate over the keys only, the way a ``dict`` does."""
        return (pair[0] for pair in frozenset.__iter__(self))

    # Everything below is written in terms of the methods above, so a
    # subclass that changes the storage layout does not need to redefine it.

    def get(self, k, d=None):  # pylint: disable=invalid-name
        """Return ``self[k]`` if ``k`` is a key, otherwise ``d`` (which
        defaults to ``None``)."""
        if k in self:
            return self[k]
        return d

    def keys(self):
        """Return an iterator over the keys."""
        return iter(self)

    def __contains__(self, item):
        """Membership is tested against the keys, not the stored pairs."""
        return item in self.keys()

    def __getitem__(self, key):
        """Return the value stored under ``key``; raise ``KeyError`` if there
        is none. This is a linear scan over the stored pairs."""
        matches = (pair[1] for pair in self.items() if pair[0] == key)
        try:
            return next(matches)
        except StopIteration:
            raise KeyError(key) from None

    def __repr__(self):
        return f'{type(self).__name__}({dict(self)!r})'
class NamespaceMappable(ABC):
    """A mappable class implementing a dot-notation for accessing members.

    Subclasses must implement ``__getitem__``. ``__dir__`` additionally calls
    ``self.keys()``, so subclasses need a ``keys`` method for ``dir()`` to
    work."""

    @abstractmethod
    def __getitem__(self, key):
        """We need a dictionary interface to get items; this is supplied by
        __getitem__."""

    def __dir__(self):
        """List the class attributes together with the mapping's keys, sorted
        and without duplicates."""
        return sorted(set(dir(type(self))).union(self.keys()))

    def __getattr__(self, key):
        """Fall back to ``self[key]`` when normal attribute lookup fails.

        Raises
        ------
        AttributeError
            If ``key`` is not in the mapping. The message names the missing
            attribute, and the internal ``KeyError`` is suppressed so that
            the traceback reads like an ordinary attribute error.
        """
        try:
            return self[key]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {key!r}"
            ) from None
class Hdf5Storage:
    """
    Dictionary-style access to an HDF5 cache file. Each operation opens the
    file, does its work and closes it again, so callers never deal with file
    handles.
    """

    def __init__(self, filename):
        """Use ``filename`` as an HDF5 cache. Only the absolute path is
        recorded here; the file itself is not opened until an item is read or
        written."""
        self.filename = os.path.abspath(filename)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key``, replacing any existing entry."""
        import h5py
        with h5py.File(self.filename, 'a') as cache:
            # drop any existing entry first so the assignment replaces it
            if key in cache:
                del cache[key]
            cache[key] = value

    def __getitem__(self, key):
        """Return the full contents of the entry stored under ``key``."""
        import h5py
        # NOTE(review): reads also open the file in 'a' mode, which
        # presumably creates the file if it does not exist yet -- confirm
        # this is intended before switching to read-only mode.
        with h5py.File(self.filename, 'a') as cache:
            entry = cache[key]
            return entry[()]

    def __iter__(self):
        """Iterate over the top-level keys. They are all read up front so
        that the file can be closed right away."""
        import h5py
        with h5py.File(self.filename, 'a') as cache:
            names = list(cache)
        return iter(names)
208# Implement FileHandler as a subclass of a namedtuple to ensure immutability
209# and to make it possible to identify FileHandler instances without having to
210# import FileHandler implementations
FileHandlerTuple = namedtuple(
    "FileHandlerTuple",
    ("eventid", "rundir", "parent", "clsname"),
)


class AbstractFileHandler(FileHandlerTuple, ABC):
    """
    An abstract ``FileHandler`` class defining a partial ``FileHandler``
    interface. Instances are ``FileHandlerTuple`` namedtuples with fields
    ``eventid``, ``rundir`` (stored as an absolute path), ``parent`` and
    ``clsname`` (the ``module.ClassName`` of the concrete class).
    """

    def __new__(cls, eventid_or_fh, rundir=None, parent=None):
        """
        Initialize a new ``FileHandler``. See ``FileHandler.__init__``
        docstring for details.

        Parameters
        ----------
        eventid_or_fh : str or object
            Either the event ID itself, or an object with ``eventid`` and
            ``rundir`` attributes (and optionally ``parent``) to copy them
            from.
        rundir : str, optional
            Run directory; overrides the one taken from ``eventid_or_fh``.
            When neither supplies one, ``DEFAULT_RUN_DIR`` is used.
        parent : optional
            Parent value; overrides the one taken from ``eventid_or_fh``.

        Raises
        ------
        ValueError
            If ``eventid_or_fh`` is neither a ``str`` nor has the required
            attributes, or if the resolved ``rundir`` is not an existing
            directory.
        """
        if (hasattr(eventid_or_fh, 'eventid') and
                hasattr(eventid_or_fh, 'rundir')):
            uid = eventid_or_fh.eventid
            # explicit arguments win over the values of the source object
            if rundir is None:
                rundir = eventid_or_fh.rundir
            if parent is None and hasattr(eventid_or_fh, 'parent'):
                parent = eventid_or_fh.parent
        elif isinstance(eventid_or_fh, str):
            uid = eventid_or_fh
        else:
            raise ValueError(
                """eventid_or_fh must be a str or have attributes "eventid" and
                "rundir". instead, received: {}""".format(eventid_or_fh)
            )
        # Resolve the default BEFORE validating: checking ``isdir(None)``
        # first raised a TypeError and made the fallback unreachable.
        if rundir is None:
            # NOTE(review): ``DEFAULT_RUN_DIR`` is not defined in the part of
            # this module visible here -- confirm it is provided at module
            # scope.
            rundir = DEFAULT_RUN_DIR
        if not os.path.isdir(rundir):
            raise ValueError(f'rundir must exist. rundir specified: {rundir}')
        return FileHandlerTuple.__new__(
            cls,
            uid,
            os.path.abspath(rundir),
            parent,
            cls.__module__ + '.' + cls.__name__,
        )

    @abstractmethod
    def generate(self, *args, **kwargs):
        """
        Generate a file safely.
        """
class RequiredAttributeMixin:
    """
    Mixin providing a ``required_attributes`` classmethod that gathers the
    ``_REQUIRED`` attribute names of a class and of all its superclasses. Use
    it to check whether subclasses have implemented every attribute their
    abstract superclasses require.

    To declare new required attributes, give the class a ``_REQUIRED`` tuple
    of attribute names (``str``). Names already declared by superclasses do
    not have to be repeated.
    """

    _REQUIRED = ()

    @classmethod
    def required_attributes(cls):
        """Return the set of required attribute names for this class: its
        own ``_REQUIRED`` entries plus, recursively, those reported by every
        direct base class that has a ``required_attributes`` method."""
        # the hasattr check below guards the call, so ignore pylint here
        # pylint: disable=no-member
        required = set(cls._REQUIRED)
        for base in cls.__bases__:
            if hasattr(base, 'required_attributes'):
                required.update(base.required_attributes())
        return required
ManifestTuple = namedtuple('ManifestTuple',
                           ('eventdir', 'manifest_filehandlers'))


class RiderFile(ManifestTuple, ABC):
    """
    A rider class that specifies ``filenames`` and ``fullpaths`` for a given
    manifest and a given ``rider_fmt``. There is one rider file per
    ``FileHandler`` in the manifest, named by formatting that handler's
    ``FILENAME`` into ``rider_fmt`` and located in ``eventdir``.
    """

    @property
    @abstractmethod
    def rider_fmt(self):
        """The format string for this rider file. Must be implemented by
        subclasses as a property or attribute."""

    # pylint: disable=super-init-not-called
    def __init__(self, eventdir, manifest_filehandlers):
        """Validate the manifest.

        Raises
        ------
        TypeError
            If ``manifest_filehandlers`` is empty, or if its members do not
            all share the exact same ``DEPENDENCIES`` and the same
            ``_generate`` method.
        """
        if not manifest_filehandlers:
            raise TypeError("Cannot have an empty manifest.")
        dependencies = {frozenset(f.DEPENDENCIES)
                        for f in manifest_filehandlers}
        if len(dependencies) != 1:
            # format the clashing values INTO the message (they used to be
            # passed as a second exception argument by mistake)
            raise TypeError("All ``FileHandler`` instances in "
                            "manifest_filehandlers must have the same "
                            "``DEPENDENCIES``. Got clashing sets: "
                            "{}".format(dependencies))
        generators = {type(f)._generate for f in manifest_filehandlers}
        if len(generators) != 1:
            raise TypeError("All ``FileHandler`` instances in "
                            "manifest_filehandlers must have same "
                            "``_generate`` methods. Got clashing generators: "
                            "{}".format(generators))

    @property
    def filenames(self):
        """A generator over the rider file names, one per ``FileHandler`` in
        the manifest (``rider_fmt`` formatted with its ``FILENAME``)."""
        return (self.rider_fmt.format(f.FILENAME)
                for f in self.manifest_filehandlers)

    @property
    def fullpaths(self):
        """A generator over the full paths of the rider files, i.e. each of
        ``filenames`` joined onto ``eventdir``."""
        return (os.path.join(self.eventdir, f) for f in self.filenames)

    def exists(self):
        """Check whether any of the rider files in this manifest exist."""
        return any(os.path.exists(f) for f in self.fullpaths)

    def delete(self):
        """Delete all rider files associated with this manifest. Files that
        do not exist are skipped."""
        for path in self.fullpaths:
            try:
                os.remove(path)
            except FileNotFoundError:
                # already gone (possibly removed concurrently): nothing to do
                pass
class JsonRiderMixin:
    """
    Class for reading and writing JSON to multiple rider files in a
    manifest. The host class must provide ``fullpaths`` (an iterable of file
    paths); ``write_json`` additionally uses ``eventdir`` and ``rider_fmt``.
    """

    def read_json(self, err=False):
        """Load and return the JSON content of the first of ``fullpaths``
        that is an existing file.

        Returns ``None`` if none of the ``fullpaths`` exists, unless ``err``
        is ``True``, in which case a ``FileNotFoundError`` listing the paths
        that were tried will be raised."""
        for path in self.fullpaths:
            if os.path.isfile(path):
                with open(path) as infile:
                    return json.load(infile)
        if err:
            tried = ', '.join(str(path) for path in self.fullpaths)
            raise FileNotFoundError("Rider file not found: " + tried)
        return None

    def write_json(self, outdict):
        """Write ``outdict`` to all the files in ``self.fullpaths``.

        The JSON is written once to a temporary file in the
        ``LOCAL_TMPDIR_NAME`` subdirectory of ``eventdir`` (created if
        needed). Each rider path is then replaced by a hard link to that
        temporary file."""
        outdir = os.path.join(self.eventdir, LOCAL_TMPDIR_NAME)
        os.makedirs(outdir, exist_ok=True)
        with NamedTemporaryFile('w', prefix=self.rider_fmt.format('TMP'),
                                dir=outdir) as tmp:
            json.dump(outdict, tmp, indent=4, sort_keys=True)
            # Flush before linking: the hard links share the temporary
            # file's contents, so without this a reader could see an empty
            # or partial rider file until the temporary file is closed.
            tmp.flush()
            for outf in self.fullpaths:
                if os.path.isfile(outf):
                    os.unlink(outf)
                os.link(tmp.name, outf)
def rider_mixin_factory(classname, **kwargs):
    """Build a mixin class named ``classname`` for ``FileHandler`` classes.

    Each keyword argument becomes a read-only property on the mixin: the
    keyword is the property name and its value, which is expected to be a
    ``RiderFile`` subclass, is instantiated with the host instance's
    ``eventdir`` and ``manifest_filehandlers`` whenever the property is read.
    """
    doc_template = ("Get a ``{name}`` for this instance. See "
                    "``{module}.{name}`` for full documentation.")
    namespace = {}
    for attrname, rider_cls in kwargs.items():
        # bind the rider class as a default argument so that every property
        # keeps its own class rather than the last one of the loop
        def newmethod(self, rider=rider_cls):
            return rider(self.eventdir, self.manifest_filehandlers)
        newmethod.__name__ = attrname
        newmethod.__doc__ = doc_template.format(
            name=rider_cls.__name__,
            module=rider_cls.__module__,
        )
        namespace[attrname] = property(newmethod)
    rider_names = ', '.join(c.__name__ for c in kwargs.values())
    namespace['__doc__'] = (
        """
        Mixin with properties for fetching the following ``RiderFile``
        subclass instances for the given ``FileHandler`` instance: {riders}
        Generated by ``{__name__}.rider_mixin_factory``.
        """
    ).format(riders=rider_names, __name__=__name__)
    return type(classname, (object,), namespace)
class OptionalFeatureWarning(UserWarning):
    """Warns that an optional feature will not be available. This is usually
    worth suppressing, except when running tests, checking the state of a
    fresh install/upgrade, or debugging."""
class EnvVarRegistry:
    """
    Records the environmental variables requested by the pipeline so that
    they can be listed in help documentation. Registrations are kept in a
    class-level ``queue.Queue`` shared by every caller.
    """

    _OPTIONAL_ENV_VARS = queue.Queue()

    @classmethod
    def register(
            cls: type,
            varnames: List[str],
            errmsg: str,
            values: List[str],
            module: str,
            loaded: bool,
    ) -> None:
        """
        Record the outcome of one call to ``optional_env_var``: the variable
        names requested, the warning message, the values assigned, the name
        of the requesting module and whether everything was loaded from the
        environment.
        """
        record = (varnames, errmsg, values, module, loaded)
        cls._OPTIONAL_ENV_VARS.put(record)

    @classmethod
    def print_and_quit(cls):
        """
        Print every registered environmental variable, grouped by whether it
        was loaded and whether a default was used, then exit the interpreter.
        This empties the registry queue.
        """
        registered = []
        # drain the queue, expanding each registration to one dict per name
        while True:
            try:
                entry = cls._OPTIONAL_ENV_VARS.get_nowait()
            except queue.Empty:
                break
            varnames, errmsg, values, module, loaded = entry
            registered.extend([
                {
                    "name": name,
                    "loaded from env": loaded,
                    "in module": module,
                    "value assigned": values[idx],
                    "warning if not loaded": errmsg,
                } for idx, name in enumerate(varnames)
            ])
        not_loaded = [r for r in registered if not r['loaded from env']]
        categories = {
            "LOADED SUCCESSFULLY": [r for r in registered
                                    if r['loaded from env']],
            "NOT LOADED, DEFAULT USED":
                [r for r in not_loaded if r['value assigned'] is not None],
            "NOT LOADED, NO DEFAULT REGISTERED":
                [r for r in not_loaded if r['value assigned'] is None],
        }
        for title, entries in categories.items():
            print(COLOR.bold(COLOR.underline(COLOR.magenta(f'{title}:'))))
            for entry in sorted(entries, key=lambda r: r['name']):
                print(f' - {COLOR.green(entry["name"])}:')
                for field in sorted(k for k in entry if k != 'name'):
                    text = str(entry[field])
                    longest = max(len(line) for line in text.split('\n'))
                    # long values go on their own indented lines
                    if longest > 72:
                        text = '|\n' + indent(text, ' '*8)
                    print(f' {COLOR.blue(field)}: {text}')
        sys.exit()
def optional_env_var(
        varnames: List[str],
        errmsg: str = '',
        defaults: List[str] = None,
        register: bool = True,
) -> List[str]:
    """Look up the environmental variables ``varnames`` in ``os.environ``.

    If any of them is not set, the problem is logged at debug level and an
    ``OptionalFeatureWarning`` is issued. In that case the values of
    ``defaults`` are returned when given; otherwise each unset variable is
    reported as ``None``. Unless ``register`` is false, the lookup is also
    recorded with ``EnvVarRegistry``.

    Parameters
    ----------
    varnames : List[str]
        A list of environmental variables to import.
    errmsg : str, optional
        A descriptive message to display in logs and warnings if the user has
        not configured the specified environmental variables. It is appended
        to a standard message stating which variables were missing and which
        module asked for them.
    defaults : List[str]
        Default values corresponding to ``varnames``. If ANY variable is
        missing, the whole list of defaults is used in place of the values
        read from the environment.
    register : bool, optional
        Whether to register the lookup with ``EnvVarRegistry`` (which allows
        it to be printed at the command line when requesting help).

    Returns
    -------
    values : List[str]
        The values of the environmental variables in ``varnames``, with
        ``None`` for each one that is not set (or a copy of ``defaults`` as
        described above).

    Raises
    ------
    ValueError
        If ``defaults`` is not None but has a different length than
        ``varnames``.
    """
    if defaults is not None and len(varnames) != len(defaults):
        raise ValueError("defaults and varnames must have the same length."
                         f" defaults: {defaults} varnames: {varnames}")
    environ = os.environ
    values = [environ.get(name) for name in varnames]
    missing = [name for name in varnames if name not in environ]
    if missing:
        highlighted = ', '.join(map(COLOR.red, missing))
        # frame 1 is our caller: report the module that asked for the vars
        caller = sys._getframe(1).f_globals['__name__']
        errmsg = (f"{caller} tried to load "
                  f"undefined environmental variables: {highlighted} -- "
                  f"{COLOR.YELLOW}{dedent(errmsg)}{COLOR.CLEAR}")
        LOGGER.debug(errmsg)
        warnings.warn(errmsg, OptionalFeatureWarning)
        if defaults is not None:
            values = list(defaults)
    if register:
        EnvVarRegistry.register(
            varnames,
            errmsg,
            values,
            sys._getframe(1).f_globals['__name__'],
            not missing,
        )
    return values
542# placeholders should immediately cause errors
543_PLACEHOLDER_CLASSES = dict()
544_PLACEHOLDER_FUNCTIONS = dict()
546# stubs should mimick desired functionality, but we still want to track them
547# because they might not be full replacements.
548_STUBS = dict()
def placeholderclass(name, modulename, bases=(object,)):
    """
    Define and return a placeholder class that raises a
    ``NotImplementedError`` on instantiation. Placeholders are cached in
    ``_PLACEHOLDER_CLASSES``, so repeated calls with the same arguments
    return the same class.

    Parameters
    ----------
    name : str
        The name of the new class.
    modulename : str
        The name of the module you would like to import from.
    bases : tuple
        The base classes of the new placeholder class. A single class is
        also accepted and treated as a one-element tuple.

    Returns
    -------
    newclass : type
        A placeholder class that immediately raises a ``NotImplementedError``
        when you try to instantiate it.
    """
    # ``(object)`` is just ``object``, not a tuple; accept a bare class so
    # that such calls work instead of failing inside ``type()``.
    if isinstance(bases, type):
        bases = (bases,)
    doc = "A placeholder for the {}.{} class. Will fail on init."

    # we don't care about any of the arguments anyway; we just don't want to
    # raise an exception related to the arguments because that will mask the
    # NotImplementedError, which is the really important one, since that
    # reminds us that this is a placeholder class.
    # pylint: disable=unused-argument
    def init(self, *args, **kwargs):
        """
        A placeholder initializer that immediately raises a
        ``NotImplementedError`` when called.
        """
        msg = "This is a placeholder; {}.{} could not be loaded."
        raise NotImplementedError(msg.format(modulename, name))

    cache_key = (name, modulename, bases)
    if cache_key not in _PLACEHOLDER_CLASSES:
        newclassdict = {
            # ``wrap`` returns a list of lines; ``__doc__`` must be a string
            '__doc__': '\n'.join(wrap(doc.format(modulename, name))),
            '__init__': init,
        }
        _PLACEHOLDER_CLASSES[cache_key] = type(name, bases, newclassdict)
    return _PLACEHOLDER_CLASSES[cache_key]
def registerstub(name, modulename, stub):
    """
    Record a stub object in ``_STUBS``. This keeps track of partial
    implementations that stand in for modules that could not be imported,
    since such stubs might not offer the full functionality of what they
    replace.

    Parameters
    ----------
    name : str
        The variable name of the stub object.
    modulename : str
        The name of the module where it is defined; you should probably set
        this to ``__name__`` to get the name of the defining scope for the
        stub.
    stub : function or type
        The stub object itself.

    Raises
    ------
    ValueError
        If a stub was already registered under ``(name, modulename)``.
    """
    key = (name, modulename)
    if key in _STUBS:
        raise ValueError('Stub already registered: {}'.format(key))
    _STUBS[key] = stub
def MetaClassFactory(function, meth_names=None):
    """
    Create a MetaClass that wraps all methods in ``function``. Use it by
    passing the returned class as the ``metaclass=`` keyword in the class
    statement of a class whose methods should be wrapped thus.

    Parameters
    ----------
    function: func
        The function that should wrap each method of a class. It is called
        with each selected function and its return value replaces that
        function in the new class.
    meth_names: list, optional
        If provided, only wrap these methods. Otherwise, wrap all methods
        (every attribute, inherited ones included, that is a plain function).

    Raises
    ------
    TypeError
        If ``meth_names`` is provided, all of the specified strings must refer
        to functions in the class to be created, otherwise a ``TypeError`` will
        be raised (this includes names that are missing altogether).
    """

    class MetaClass(type):

        def __new__(cls, classname, bases, class_dict):
            # Build a throwaway class so that inherited attributes are
            # resolved through the real MRO. It is created through
            # ``type.__new__`` rather than ``type(...)``: the latter
            # re-dispatches to this metaclass whenever a base already uses
            # it, which recursed without end when subclassing.
            mro = super().__new__(cls, 'mro', bases, {})
            resolved_dict = {k: class_dict.get(k, getattr(mro, k, None))
                             for k in set(dir(mro)).union(class_dict)}
            if meth_names is None:
                items = {k: v for k, v in resolved_dict.items()
                         if type(v) is FunctionType}
            else:
                # ``.get`` maps a missing name to ``None`` so that it is
                # reported by the TypeError below instead of a KeyError
                items = {k: resolved_dict.get(k) for k in meth_names}
                if any(type(v) is not FunctionType for v in items.values()):
                    msg = (f"Wrapped class {classname} expected to have all "
                           f"attributes from list be functions: {meth_names}")
                    LOGGER.error(msg)
                    raise TypeError(msg)
            LOGGER.debug("Wrapping classname %s, metaclass %s methods %s with "
                         "function %s", classname, cls, meth_names, function)
            # the wrapped versions (including wrapped copies of inherited
            # functions) are defined directly on the new class
            class_dict.update({k: function(v) for k, v in items.items()})
            return super().__new__(cls, classname, bases, class_dict)

    return MetaClass