Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# (c) Stefan Countryman, 2019 

2 

3""" 

4Primitive base classes used throughout the pipeline. 

5""" 

6 

7import os 

8import sys 

9import json 

10import warnings 

11import logging 

12import queue 

13from tempfile import NamedTemporaryFile 

14import yaml 

15from types import FunctionType 

16from textwrap import wrap, dedent, indent 

17from typing import List 

18from abc import ABC, abstractproperty, abstractmethod 

19from collections import namedtuple 

20 

# Module-level logger, named after this module.
LOGGER = logging.getLogger(name=__name__)
# Name of the scratch subdirectory (inside an event directory) where rider
# files are written before being hard-linked into place.
LOCAL_TMPDIR_NAME = '.tmp.d'

23 

24 

25# Colors for logging etc. 

def _colorfmt(color, clear):
    """Build a formatter that colors its output.

    The returned callable formats a string and wraps the result between the
    terminal escape code ``color`` and the terminating escape code ``clear``.
    """
    def colorfmt(fmt, *args):
        """Apply ``str.format`` to ``fmt`` with ``args`` and surround the
        result with the escape codes captured when this formatter was built."""
        body = fmt.format(*args)
        return ''.join((color, body, clear))
    return colorfmt

36 

37 

# Raw ANSI escape sequences: foreground colors, text attributes, and the
# reset sequence that restores default formatting.
_BLUE, _RED, _GREEN, _MAGENTA, _YELLOW = (
    '\033[94m', '\033[91m', '\033[92m', '\033[95m', '\033[93m',
)
_CLEAR, _BOLD, _UNDERLINE = '\033[0m', '\033[1m', '\033[4m'

46 

# A container holding the strings used for color formatting. Print one of the
# upper-case members to STDOUT to set the color/formatting of the text that
# follows; use ``COLOR.CLEAR`` to reset colors and formatting to default. The
# lower-case members are formatter functions that format a string and wrap it
# in the matching escape code, appending ``CLEAR`` at the end.
Colors = namedtuple(
    'Colors',
    ('BLUE', 'RED', 'GREEN', 'MAGENTA', 'YELLOW', 'CLEAR', 'BOLD',
     'UNDERLINE', 'blue', 'red', 'green', 'magenta', 'yellow', 'bold',
     'underline'),
)

# Positional order must match the field order declared above: the eight raw
# codes first, then one formatter per color/attribute (CLEAR has none).
COLOR = Colors(
    _BLUE, _RED, _GREEN, _MAGENTA, _YELLOW, _CLEAR, _BOLD, _UNDERLINE,
    *(_colorfmt(code, _CLEAR)
      for code in (_BLUE, _RED, _GREEN, _MAGENTA, _YELLOW, _BOLD,
                   _UNDERLINE)),
)

74 

75 

# pylint: disable=R0903
class GenerationError(IOError):
    """
    Signals that a file could not be generated at the moment its
    ``generate()`` method was called, e.g. because its ``DEPENDENCIES`` are
    missing (or because of some other benign failure mode). Raise it while
    everything is otherwise behaving nominally, to indicate that this
    particular file simply cannot be produced right now.
    """

86 

87 

class IntentException(GenerationError):
    """
    Raised when a ``FileHandler`` is already in the middle of being generated
    and should not be touched for the time being.
    """

93 

94 

class CoolDownException(GenerationError):
    """
    Raised when a ``FileHandler`` is in a cool-down period following a failed
    generation attempt and should not be touched for the time being.
    """

100 

101 

class ImmutableDict(frozenset):
    """
    A hashable, immutable mapping inspired by ``namedtuple``. Initialize by
    passing a dict (anything with an ``items`` method) or an iterable of
    ``(key, value)`` tuples. Values are accessed through the map interface
    (``d[key]``, ``d.get``, ``d.keys``, ``d.values``, ``d.items``); this class
    does not itself provide dot-notation attribute access.

    Internally the instance is a ``frozenset`` of ``(key, value)`` pairs, so
    keys and values must both be hashable, and key lookups are linear scans.
    """

    def __new__(cls, mappable):
        """See ``ImmutableDict`` class docstring for details."""
        if not hasattr(mappable, 'items'):
            # collapse duplicate keys (the last pair wins), like ``dict``
            mappable = {k: v for k, v in mappable}
        return frozenset.__new__(cls, mappable.items())

    def __reduce__(self):
        """Pickle support. ``__getnewargs__`` is ignored by the pickler for
        this ``frozenset`` subclass, so rebuild explicitly from a plain
        ``dict`` copy."""
        return (type(self), (dict(self),))

    def items(self):
        """Return an iterator over the ``(key, value)`` pairs."""
        return frozenset.__iter__(self)

    def values(self):
        """Return an iterator over the values."""
        for i in frozenset.__iter__(self):
            yield i[1]

    def __iter__(self):
        """Iterate over just the keys, mimicking ``dict`` behavior."""
        for i in frozenset.__iter__(self):
            yield i[0]

    # methods below do not need to be redefined when changing the storage
    # behavior of a subclass; they are written in terms of the ones above

    def get(self, k, d=None):  # pylint: disable=invalid-name
        """D.get(k[,d]) -> D[k] if k in D, else d. d defaults to None."""
        # a single scan, instead of a membership test followed by a lookup
        try:
            return self[k]
        except KeyError:
            return d

    def keys(self):
        """Return an iterator over the keys."""
        return iter(self)

    def __contains__(self, item):
        """Test key membership (not ``(key, value)`` pair membership)."""
        return item in self.keys()

    def __getitem__(self, key):
        """Return the value stored under ``key``; raise ``KeyError`` if it is
        absent."""
        for item in self.items():
            if item[0] == key:
                return item[1]
        raise KeyError(key)

    def __repr__(self):
        return '{}({})'.format(type(self).__name__, repr(dict(self)))

156 

157 

class NamespaceMappable(ABC):
    """A mappable class implementing a dot-notation for accessing members.

    Subclasses must implement ``__getitem__``; ``__dir__`` additionally relies
    on a ``keys`` method. Attribute lookups that fail through the normal
    mechanism fall back to ``self[key]``.
    """

    @abstractmethod
    def __getitem__(self, key):
        """We need a dictionary interface to get items; this is supplied by
        __getitem__."""

    def __dir__(self):
        return sorted(set(dir(type(self)) + list(self.keys())))

    def __getattr__(self, key):
        """Fall back to item lookup for attributes not found normally. A
        missing key becomes an ``AttributeError`` naming the attribute, so
        ``hasattr`` and ``getattr(obj, name, default)`` behave as usual."""
        try:
            return self[key]
        except KeyError:
            # ``from None``: the KeyError is an implementation detail
            raise AttributeError(
                "{!r} object has no attribute {!r}".format(
                    type(self).__name__, key)) from None

174 

175 

class Hdf5Storage(object):
    """
    A class for reading and writing to an HDF5 cache. Has a dictionary
    interface and abstracts away file opening and closing: every access opens
    the file and closes it again before returning.
    """

    def __init__(self, filename):
        """Use ``filename`` (stored as an absolute path) as an HDF5 cache.
        Creates the file on first write if it doesn't exist; reads never
        create it."""
        self.filename = os.path.abspath(filename)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key``, replacing any existing entry."""
        import h5py
        # 'a': read/write, creating the file if it does not exist yet
        with h5py.File(self.filename, 'a') as h5file:
            if key in h5file:
                del h5file[key]
            h5file[key] = value

    def __getitem__(self, key):
        """Return the full contents stored under ``key``. Raises ``KeyError``
        if the cache file or the key does not exist."""
        if not os.path.isfile(self.filename):
            # nothing has been written yet, so no key can be present
            raise KeyError(key)
        import h5py
        # read-only: reading must neither create the file nor need write
        # permission on it
        with h5py.File(self.filename, 'r') as h5file:
            return h5file[key][()]

    def __iter__(self):
        """Iterate over the top-level keys. All keys are read in at once so
        that the file can be closed right away; a missing cache file has no
        keys."""
        if not os.path.isfile(self.filename):
            return iter(())
        import h5py
        with h5py.File(self.filename, 'r') as h5file:
            keys = list(h5file)
        return iter(keys)

206 

207 

# Implement FileHandler as a subclass of a namedtuple to ensure immutability
# and to make it possible to identify FileHandler instances without having to
# import FileHandler implementations
FileHandlerTuple = namedtuple("FileHandlerTuple", ("eventid", "rundir",
                                                   "parent", "clsname"))


class AbstractFileHandler(FileHandlerTuple, ABC):
    """
    An abstract ``FileHandler`` class defining a partial ``FileHandler``
    interface.

    Instances are immutable ``(eventid, rundir, parent, clsname)`` tuples,
    where ``rundir`` is stored as an absolute path and ``clsname`` is the
    fully-qualified name of the concrete class.
    """

    def __new__(cls, eventid_or_fh, rundir=None, parent=None):
        """
        Initialize a new ``FileHandler``. See ``FileHandler.__init__``
        docstring for details.

        Parameters
        ----------
        eventid_or_fh : str or FileHandler-like
            Either an event ID, or an object with ``eventid`` and ``rundir``
            attributes (and optionally ``parent``) whose values are reused
            unless overridden by the ``rundir``/``parent`` arguments.
        rundir : str, optional
            An existing directory. Required unless it can be taken from
            ``eventid_or_fh``.
        parent : optional
            Stored unchanged in the ``parent`` field.

        Raises
        ------
        ValueError
            If ``eventid_or_fh`` has the wrong type, or if ``rundir`` is not
            given or is not an existing directory.
        """
        if (hasattr(eventid_or_fh, 'eventid') and
                hasattr(eventid_or_fh, 'rundir')):
            uid = eventid_or_fh.eventid
            rundir = eventid_or_fh.rundir if rundir is None else rundir
            if hasattr(eventid_or_fh, 'parent'):
                parent = eventid_or_fh.parent if parent is None else parent
        elif isinstance(eventid_or_fh, str):
            uid = eventid_or_fh
        else:
            raise ValueError(
                """eventid_or_fh must be a str or have attributes "eventid" and
                "rundir". instead, received: {}""".format(eventid_or_fh)
            )
        # No default run directory is defined in this module, and
        # ``os.path.isdir(None)`` raises a TypeError, so require an explicit
        # rundir and report its absence like the other bad-argument cases.
        if rundir is None:
            raise ValueError('rundir must be specified. eventid_or_fh: '
                             f'{eventid_or_fh!r}')
        if not os.path.isdir(rundir):
            raise ValueError(f'rundir must exist. rundir specified: {rundir}')
        return FileHandlerTuple.__new__(
            cls,
            uid,
            os.path.abspath(rundir),
            parent,
            cls.__module__ + '.' + cls.__name__,
        )

    @abstractmethod
    def generate(self, *args, **kwargs):
        """
        Generate a file safely.
        """

254 

255 

class RequiredAttributeMixin:
    """
    Mixin providing a ``required_attributes`` classmethod that gathers the
    attribute names listed in ``_REQUIRED`` on a class and, recursively, on
    every base class that also offers ``required_attributes``. Use it to check
    that a subclass implements everything its abstract ancestors demand.

    To declare new required attributes, set ``_REQUIRED`` to a tuple of
    attribute names (``str`` instances). Names that a superclass already
    requires do not need to be repeated.
    """

    _REQUIRED = ()

    @classmethod
    def required_attributes(cls):
        """Return the set of required attribute names for ``cls``: its own
        ``_REQUIRED`` merged with the ``required_attributes()`` result of
        each direct base class that defines that method."""
        # membership is checked explicitly with hasattr below
        # pylint: disable=no-member
        required = set(cls._REQUIRED)
        for base in cls.__bases__:
            if hasattr(base, 'required_attributes'):
                required.update(base.required_attributes())
        return required

281 

282 

ManifestTuple = namedtuple('ManifestTuple', ('eventdir',
                                             'manifest_filehandlers'))


class RiderFile(ManifestTuple, ABC):
    """
    A rider class that specifies ``filenames`` and ``fullpaths`` for a given
    manifest and a given ``rider_fmt``.

    ``eventdir`` is the directory holding the rider files.
    ``manifest_filehandlers`` is the collection of ``FileHandler`` instances
    sharing this rider. Each one must expose ``DEPENDENCIES`` and
    ``FILENAME``, and its type must define ``_generate``.
    """

    @property
    @abstractmethod
    def rider_fmt(self):
        """The format string for this rider file; ``{}`` is replaced by each
        ``FileHandler``'s ``FILENAME``. Must be implemented by subclasses as a
        property or attribute."""

    # pylint: disable=super-init-not-called
    def __init__(self, eventdir, manifest_filehandlers):
        """Raise a ``TypeError`` if ``manifest_filehandlers`` is empty or if
        the ``manifest_filehandlers`` for this instance do not all have the
        exact same ``DEPENDENCIES`` and ``_generate`` method. The fields
        themselves are set by ``ManifestTuple.__new__``."""
        if not manifest_filehandlers:
            raise TypeError("Cannot have an empty manifest.")
        dependencies = {frozenset(f.DEPENDENCIES)
                        for f in manifest_filehandlers}
        if len(dependencies) != 1:
            raise TypeError("All ``FileHandler`` instances in "
                            "manifest_filehandlers must have the same "
                            "``DEPENDENCIES``. Got clashing sets: "
                            "{}".format(dependencies))
        # pylint: disable=protected-access
        generators = {type(f)._generate for f in manifest_filehandlers}
        if len(generators) != 1:
            raise TypeError("All ``FileHandler`` instances in "
                            "manifest_filehandlers must have same "
                            "``_generate`` methods. Got clashing generators: "
                            "{}".format(generators))

    @property
    def filenames(self):
        """Get a generator of the rider file names, one per ``FileHandler`` in
        the manifest, built by formatting ``rider_fmt`` with its
        ``FILENAME``."""
        return (self.rider_fmt.format(f.FILENAME)
                for f in self.manifest_filehandlers)

    @property
    def fullpaths(self):
        """Get a generator of the full paths of the rider files, i.e.
        ``filenames`` joined onto ``eventdir``."""
        return (os.path.join(self.eventdir, f) for f in self.filenames)

    def exists(self):
        """Check whether any of the rider files in this manifest exist."""
        return any(os.path.exists(f) for f in self.fullpaths)

    def delete(self):
        """Delete all rider files associated with this manifest."""
        for f in self.fullpaths:
            if os.path.exists(f):
                os.remove(f)

341 

342 

class JsonRiderMixin:
    """
    Class for reading and writing JSON to multiple rider files in a
    manifest. The host class must provide ``fullpaths``, ``eventdir`` and
    ``rider_fmt`` (as ``RiderFile`` does).
    """

    def read_json(self, err=False):
        """Load and return the JSON contents of the first existing file in
        ``fullpaths``.

        Returns ``None`` if none of the ``fullpaths`` exists, unless ``err``
        is ``True``, in which case a ``FileNotFoundError`` listing the checked
        paths is raised.
        """
        for path in self.fullpaths:
            if os.path.isfile(path):
                # JSON text is UTF-8; don't depend on the locale encoding
                with open(path, encoding='utf-8') as infile:
                    return json.load(infile)
        if err:
            raise FileNotFoundError(
                "Rider file not found: {}".format(list(self.fullpaths)))
        return None

    def write_json(self, outdict):
        """Write ``outdict`` as JSON to all the files in ``self.fullpaths``.

        The JSON is written once to a temporary file in the
        ``LOCAL_TMPDIR_NAME`` subdirectory of ``eventdir``. It is then
        hard-linked to every output path, so all riders share one inode and
        identical contents. The temporary name itself is removed when the
        temporary file is closed.
        """
        outdir = os.path.join(self.eventdir, LOCAL_TMPDIR_NAME)
        os.makedirs(outdir, exist_ok=True)
        with NamedTemporaryFile('w', prefix=self.rider_fmt.format('TMP'),
                                dir=outdir, encoding='utf-8') as tmp:
            json.dump(outdict, tmp, indent=4, sort_keys=True)
            # flush before linking; otherwise the linked riders are empty or
            # truncated until the temporary file is closed
            tmp.flush()
            for i, outf in enumerate(self.fullpaths):
                # link under a unique staging name, then atomically move it
                # over the destination so readers never find the rider missing
                staged = '{}.{}'.format(tmp.name, i)
                os.link(tmp.name, staged)
                try:
                    os.replace(staged, outf)
                except OSError:
                    os.unlink(staged)
                    raise

371 

372 

def rider_mixin_factory(classname, **kwargs):
    """Build a mixin class named ``classname`` for ``FileHandler`` classes.

    Every keyword argument becomes a property of the mixin. The keyword is
    the property name, and the value is a ``RiderFile`` subclass. Reading the
    property on an instance returns that subclass initialized with the
    instance's ``eventdir`` and ``manifest_filehandlers``.
    """
    rider_doc_fmt = ("Get a ``{name}`` for this instance. See "
                     "``{module}.{name}`` for full documentation.")
    classdict = {}
    for propname, rider in kwargs.items():
        # bind ``rider`` as a default so each getter keeps its own class
        def getter(self, rider=rider):
            return rider(self.eventdir, self.manifest_filehandlers)
        getter.__name__ = propname
        getter.__doc__ = rider_doc_fmt.format(module=rider.__module__,
                                              name=rider.__name__)
        # the property copies the getter's __doc__, so set it first
        classdict[propname] = property(getter)
    rider_names = ', '.join(c.__name__ for c in kwargs.values())
    classdict['__doc__'] = (
        """
        Mixin with properties for fetching the following ``RiderFile``
        subclass instances for the given ``FileHandler`` instance: {riders}
        Generated by ``{__name__}.rider_mixin_factory``.
        """
    ).format(riders=rider_names, __name__=__name__)
    return type(classname, (object,), classdict)

399 

400 

class OptionalFeatureWarning(UserWarning):
    """Warns that an optional feature will be unavailable. Normally you will
    want to suppress it, except when running tests, checking the state of a
    fresh install/upgrade, or debugging."""

405 

406 

class EnvVarRegistry:
    """
    Thread-safe registry of the environmental variables the pipeline tries to
    load, kept so they can be listed in help documentation.
    """

    # entries are (varnames, errmsg, values, module, loaded) tuples; a
    # ``queue.Queue`` makes concurrent ``put`` calls safe
    _OPTIONAL_ENV_VARS = queue.Queue()

    @classmethod
    def register(
            cls: type,
            varnames: List[str],
            errmsg: str,
            values: List[str],
            module: str,
            loaded: bool,
    ) -> None:
        """
        Record the outcome of a single ``optional_env_var`` call.
        """
        entry = (varnames, errmsg, values, module, loaded)
        cls._OPTIONAL_ENV_VARS.put(entry)

    @classmethod
    def print_and_quit(cls):
        """
        Drain the registry, print every registered environmental variable
        grouped by how it was resolved, then exit.
        """
        registered = []
        while True:
            try:
                varnames, errmsg, values, module, loaded = (
                    cls._OPTIONAL_ENV_VARS.get_nowait())
            except queue.Empty:
                break
            registered.extend(
                {
                    "name": name,
                    "loaded from env": loaded,
                    "in module": module,
                    "value assigned": values[idx],
                    "warning if not loaded": errmsg,
                }
                for idx, name in enumerate(varnames)
            )

        def not_loaded(with_default):
            # unloaded entries, split on whether a (non-None) value was used
            return [entry for entry in registered
                    if not entry['loaded from env']
                    and (entry['value assigned'] is not None) == with_default]

        categories = {
            "LOADED SUCCESSFULLY":
                [entry for entry in registered if entry['loaded from env']],
            "NOT LOADED, DEFAULT USED": not_loaded(True),
            "NOT LOADED, NO DEFAULT REGISTERED": not_loaded(False),
        }
        for category, entries in categories.items():
            print(COLOR.bold(COLOR.underline(COLOR.magenta(f'{category}:'))))
            for entry in sorted(entries, key=lambda e: e['name']):
                print(f' - {COLOR.green(entry["name"])}:')
                for field in sorted(entry):
                    if field == 'name':
                        continue
                    text = str(entry[field])
                    # long or multi-line values go in an indented block
                    if max(len(line) for line in text.split('\n')) > 72:
                        text = '|\n' + indent(text, ' ' * 8)
                    print(f' {COLOR.blue(field)}: {text}')
        sys.exit()

476 

477 

def optional_env_var(
        varnames: List[str],
        errmsg: str = '',
        defaults: List[str] = None,
        register: bool = True,
) -> List[str]:
    """Get environmental variables ``varnames`` from ``os.environ``.

    If any of the variables is not set, log the problem at debug level and
    warn the user with an ``OptionalFeatureWarning``. Unless ``register`` is
    ``False``, the outcome is recorded with ``EnvVarRegistry`` so it can be
    listed at the command line.

    Parameters
    ----------
    varnames : List[str]
        A list of environmental variables to import.
    errmsg : str, optional
        A descriptive message to display in logs and warnings if the user has
        not configured the specified environmental variables. It is appended
        to a default message naming the undefined variables and the module
        that tried to load them.
    defaults : List[str], optional
        A list of default values corresponding to ``varnames``. If *any* of
        the variables is undefined, the warning is raised and *all* returned
        values are taken from ``defaults``, including those of variables that
        were set. Without ``defaults``, each undefined variable is returned
        as ``None``.
    register : bool, optional
        Whether to register the result with ``EnvVarRegistry`` (which will
        allow its use to be printed at the command line when requesting
        help).

    Returns
    -------
    values : List[str]
        One entry per name in ``varnames``: the variable's value from
        ``os.environ``, or ``None`` if it is unset. If any variable is unset
        and ``defaults`` were given, a copy of ``defaults`` is returned
        instead.

    Raises
    ------
    ValueError
        If ``defaults`` is not None but has a different length than
        ``varnames``.
    """
    if defaults is not None and len(varnames) != len(defaults):
        raise ValueError("defaults and varnames must have the same length."
                         f" defaults: {defaults} varnames: {varnames}")
    # globals of the calling module; its ``__name__`` identifies the caller
    # pylint: disable=protected-access
    caller_globals = sys._getframe(1).f_globals
    values = [os.environ.get(v, None) for v in varnames]
    missing = [v for v in varnames if v not in os.environ]
    if missing:
        missingfmt = ', '.join(COLOR.red(m) for m in missing)
        # this full message (not the bare errmsg) is what gets registered
        errmsg = (f"{caller_globals['__name__']} tried to load "
                  f"undefined environmental variables: {missingfmt} -- "
                  f"{COLOR.YELLOW}{dedent(errmsg)}{COLOR.CLEAR}")
        LOGGER.debug(errmsg)
        warnings.warn(errmsg, OptionalFeatureWarning)
        if defaults is not None:
            values = list(defaults)
    if register:
        EnvVarRegistry.register(varnames, errmsg, values,
                                caller_globals['__name__'], not missing)
    return values

540 

541 

# placeholders should immediately cause errors
_PLACEHOLDER_CLASSES = dict()
_PLACEHOLDER_FUNCTIONS = dict()

# stubs should mimic desired functionality, but we still want to track them
# because they might not be full replacements.
_STUBS = dict()


def placeholderclass(name, modulename, bases=(object,)):
    """
    Define and return a placeholder class that raises a
    ``NotImplementedError`` on instantiation. Repeated calls with the same
    arguments return the same (cached) class.

    Parameters
    ----------
    name : str
        The name of the new class.
    modulename : str
        The name of the module you would like to import from.
    bases : tuple or type, optional
        The base classes of the new placeholder class. A single class is
        accepted and treated as a one-element tuple. Defaults to
        ``(object,)``.

    Returns
    -------
    newclass : type
        A placeholder class that immediately raises a ``NotImplementedError``
        when you try to instantiate it.
    """
    # accept a bare class as well as any iterable of classes; ``type()``
    # needs a tuple, and the cache key below must be hashable
    bases = (bases,) if isinstance(bases, type) else tuple(bases)
    doc = "A placeholder for the {}.{} class. Will fail on init."

    # we don't care about any of the arguments anyway; we just don't want to
    # raise an exception related to the arguments because that will mask the
    # NotImplementedError, which is the really important one, since that
    # reminds us that this is a placeholder class.
    # pylint: disable=unused-argument
    def init(self, *args, **kwargs):
        """
        A placeholder initializer that immediately raises a
        ``NotImplementedError`` when called.
        """
        msg = "This is a placeholder; {}.{} could not be loaded."
        raise NotImplementedError(msg.format(modulename, name))

    newclassdict = {
        # ``wrap`` returns a list of lines; join them so __doc__ is a str
        '__doc__': '\n'.join(wrap(doc.format(modulename, name))),
        '__init__': init,
    }
    args = (name, modulename, bases)
    if args not in _PLACEHOLDER_CLASSES:
        _PLACEHOLDER_CLASSES[args] = type(name, bases, newclassdict)
    return _PLACEHOLDER_CLASSES[args]


def registerstub(name, modulename, stub):
    """
    Register a stub object. Just a way to keep track of when partial
    implementations of objects have been defined as replacements for modules
    that cannot be imported for whatever reason (since these stub objects might
    not provide perfect/full functionality of the missing modules).

    Parameters
    ----------
    name : str
        The variable name of the stub object.
    modulename : str
        The name of the module where it is defined; you should probably set
        this to ``__name__`` to get the name of the defining scope for the
        stub.
    stub : function or type
        The stub object itself.

    Raises
    ------
    ValueError
        If a stub with the same ``(name, modulename)`` is already registered.
    """
    args = (name, modulename)
    if args in _STUBS:
        raise ValueError('Stub already registered: {}'.format(args))
    _STUBS[args] = stub

618 

619 

def MetaClassFactory(function, meth_names=None):
    """
    Create a metaclass that wraps methods of the classes it creates in
    ``function``. Use it by passing ``metaclass=<your new metaclass>`` in the
    class statement of a class whose methods should be wrapped thus (a
    Python 2 style ``__metaclass__`` class attribute has no effect in
    Python 3).

    Methods inherited from base classes are included: they are resolved on
    the bases, and the wrapped versions are stored on the new class.

    Parameters
    ----------
    function: func
        The function that should wrap each method of a class. It is called
        once per method with the plain function, and its return value
        replaces that method in the new class's namespace.
    meth_names: list, optional
        If provided, only wrap these methods. Otherwise, wrap every attribute
        that is a plain Python function.

    Returns
    -------
    MetaClass : type
        A ``type`` subclass that performs the wrapping at class creation.

    Raises
    ------
    TypeError
        If ``meth_names`` is provided, all of the specified strings must refer
        to functions in the class to be created (or its bases), otherwise a
        ``TypeError`` will be raised when the class is created.
    """

    class MetaClass(type):
        """Metaclass wrapping methods in ``function`` at class creation."""

        def __new__(cls, classname, bases, class_dict):
            # build a throwaway class from ``bases`` so that inherited
            # attributes can be resolved through the normal MRO
            mro = type('mro', bases, {})
            resolved_dict = {k: class_dict.get(k, getattr(mro, k, None))
                             for k in set(dir(mro)).union(class_dict)}
            # NOTE(review): ``getattr`` on a class returns an inherited
            # ``staticmethod`` as a plain function, so it is wrapped and
            # stored as a regular method here -- confirm this is intended.
            if meth_names is None:
                items = {k: v for k, v in resolved_dict.items()
                         if type(v) is FunctionType}
            else:
                # ``.get``: a name that doesn't exist at all must produce the
                # documented TypeError below rather than a KeyError
                items = {k: resolved_dict.get(k) for k in meth_names}
                if any(type(v) is not FunctionType for v in items.values()):
                    msg = (f"Wrapped class {classname} expected to have all "
                           f"attributes from list be functions: {meth_names}")
                    LOGGER.error(msg)
                    raise TypeError(msg)
            LOGGER.debug("Wrapping classname %s, metaclass %s methods %s with "
                         "function %s", classname, cls, sorted(items),
                         function)
            class_dict.update({k: function(v) for k, v in items.items()})
            return super().__new__(cls, classname, bases, class_dict)

    return MetaClass