Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# (c) Stefan Countryman July 12, 2016 - 2018
3"""
4Abstract definitions of FileHandler classes. FileHandlers provide methods for
5defining and working with data associated with a trigger.
6"""
8import re
9import sys
10import os
11from abc import ABC, abstractmethod, abstractproperty
12import json
13import datetime
14import time
15import tempfile
16import filecmp
17import shutil
18import difflib
19import functools
20import traceback
21from subprocess import Popen, PIPE
22from mmap import mmap, ACCESS_READ
23from pathlib import Path
24from hashlib import sha256
25from copy import deepcopy
26from operator import xor
27import logging
28from collections import namedtuple, OrderedDict
29from tabulate import tabulate
30# DAGRR
31from llama.vetoes import (
32 VetoMixin,
33 VetoException,
34)
35from llama.flags import FlagsMixin, flag_table
36from llama.meta import MetaDataMixin
37from llama.versioning import GitDirMixin
38from llama.lock import LockMixin
39from llama.intent import CoolDownParams, CoolDownMixin, IntentMixin
40# DAGRR
41from llama.utils import (
42 COLOR as COL,
43 tbhighlight,
44 color,
45 EDGEFMT,
46 DOTFMT,
47 DEFAULT_RUN_DIR,
48 GenerationError,
49 vecstr,
50 veccls,
51 vecfh,
52 plot_graphviz,
53 sizeof_fmt,
54)
55from llama.classes import (
56 LOCAL_TMPDIR_NAME,
57 RequiredAttributeMixin,
58 ImmutableDict,
59 NamespaceMappable,
60 AbstractFileHandler,
61)
62from llama.filehandler.classes import GenerationResult
63from llama.io.registry import get_io
64from llama.io.classes import IOMixin
# Graphviz ``record``-node template for the file-status color legend shown in
# dependency-graph plots. Literal braces are quadrupled because this string
# passes through two rounds of ``str.format`` (once when building the node
# text, once more when it is substituted into ``DOTFMT``).
NODELEGENDFMT = (r'"l" [label=<{{{{<B>FILE STATUS LEGEND</B>|{rows}}}}}>, '
                 r'shape="record", style=filled];')
# One legend row: a colored block glyph followed by an italicized label.
NODELEGEND_COLOR_FMT = r'<FONT COLOR="{color}">▉</FONT> <I>{label}</I>'
# colors taken from: https://www.graphviz.org/doc/info/colors.html
# Maps human-readable file-status labels to fill colors used both in graphviz
# plots and for colored terminal output; an ``OrderedDict`` so legend rows
# render in a stable order.
NODELEGEND_COLORS = OrderedDict()
NODELEGEND_COLORS["EXISTS"] = "#7fffd4"  # aquamarine
NODELEGEND_COLORS["EXISTS, LOCKED"] = "#ffffff"  # white
NODELEGEND_COLORS["EXISTS, VETOED"] = "#ffffff"  # white
NODELEGEND_COLORS["OBSOLETE"] = "#ffff33"  # dim gold
NODELEGEND_COLORS["ANCESTOR OBSOLETE"] = "#cfcf00"  # yellow
NODELEGEND_COLORS["VETOED"] = "#999999"  # gray
NODELEGEND_COLORS["ANCESTOR VETOED"] = "#cccccc"  # gray
NODELEGEND_COLORS["FAILED, COOLING DOWN"] = "#ff0000"  # red
NODELEGEND_COLORS["FAILED"] = "#ff3322"  # light red
NODELEGEND_COLORS["AWAITS EXTERNAL TRIGGER"] = "#984ea3"  # purple
NODELEGEND_COLORS["ANCESTOR AWAITS TRIGGER"] = "#dd83ea"  # light purple
NODELEGEND_COLORS["READY TO GENERATE"] = "#7fff00"  # chartreuse
NODELEGEND_COLORS["ANCESTORS EXIST"] = "#bdff7a"  # pale green
NODELEGEND_COLORS["NOT YET MADE"] = "#ff0000"  # red
NODELEGEND_COLORS["ANCESTOR COOLING DOWN"] = "#ff7f00"  # orange
NODELEGEND_COLORS["ANCESTOR FAILED"] = "#ffa954"  # light orange
NODELEGEND_COLORS["GENERATING"] = "#f781bf"  # pink
NODELEGEND_COLORS["REGENERATING"] = "#f781bf"  # pink
# The fully-rendered legend node: one row per status color above.
NODELEGEND = NODELEGENDFMT.format(
    rows=r'<BR ALIGN="LEFT"/>'.join(
        [
            NODELEGEND_COLOR_FMT.format(color=c, label=l)
            for l, c in NODELEGEND_COLORS.items()
        ] + [""]  # put in one last aligning line break
    )
)
# Graphviz node template for a single ``FileHandler``: class name, status,
# file name, optional stats rows, an optional URL attribute, and fill color.
NODEFMT = (r'"{num}" [label=<{{{{<B>{name}</B>|<B>{status}</B>|<I>{fname}</I>'
           r'{stats}}}}}>, shape="record", style=filled, '
           r'{url}'
           r'fillcolor="{color}"];')
# Graphviz node template listing the event's flags, one row per flag.
NODEFLAGSFMT = (r'"f" [label=<{{{{<B>EVENT FLAGS</B>|{rows}}}}}>, '
                r'shape="record", '
                r'style=filled, '
                r'];')
# A single flag row within the flags node: bold name, italic value.
NODEFLAGS_ROW_FMT = r'<B>{flag}:</B> <I>{value}</I>'
LOGGER = logging.getLogger(__name__)
# File extensions regarded as plain text — presumably used to decide which
# files can be diffed/displayed as text; confirm at use sites.
TEXT_FILE_EXTENSIONS = (
    'tex',
    'txt',
    'log',
    'csv',
    'tsv',
    'json',
    'yml',
    'xml',
)
# Shorthand for the current local time.
NOW = datetime.datetime.now
# 128 KiB; buffer size (usage not shown in this chunk).
BUFSIZE = 2**17
# this is just a function, so don't give it an uppercase constant var name
# pylint: disable=invalid-name
utcnow = datetime.datetime.utcnow
# this is just a function, so don't give it an uppercase constant var name
# pylint: disable=invalid-name
fromtimestamp = datetime.datetime.fromtimestamp
# A bunch of checks for the file handler and the query arg. See
# ``FileGraph.downselect`` for usage. Each check should be a function taking
# a FileHandler instance as its first argument and some kind of query value
# against which the FileHandler instance will be judged. If the variable
# returns true, the check has passed.
# NOTE Make sure to update the ``FileGraph.downselect`` docstring when
# adding, removing, or changing checks below!
# The ``vecfh``/``veccls``/``vecstr`` wrappers (from ``llama.utils``)
# presumably vectorize a check over a single query value or an iterable of
# them — confirm against their definitions.
DOWNSELECT_CHECKS = {
    # identity / type checks
    'equals': vecfh(lambda fh, q: fh == q),
    'instanceof': veccls(isinstance),
    'type': veccls(lambda fh, q: type(fh) is q),  # pylint: disable=C0123
    'typename': vecstr(lambda fh, q: type(fh).__name__ == q),
    # filename checks
    'extension': vecstr(lambda fh, q: fh.FILENAME.split('.')[-1] == q),
    'startswith': vecstr(lambda fh, q: fh.FILENAME.startswith(q)),
    'endswith': vecstr(lambda fh, q: fh.FILENAME.endswith(q)),
    'inname': vecstr(lambda fh, q: q in fh.FILENAME),
    'nameis': vecstr(lambda fh, q: fh.FILENAME == q),
    # dependency-graph relationship checks
    'depends': veccls(lambda fh, q: q in fh.DEPENDENCIES),
    'ancestor': veccls(lambda fh, q: q in fh.UR_DEPENDENCIES),
    'descendent': veccls(lambda fh, q: type(fh) in q.UR_DEPENDENCIES),
    'dependsname': vecstr(lambda fh, q: q in (d.__name__ for d in
                                              fh.DEPENDENCIES)),
    'dependsfile': vecstr(lambda fh, q: q in (d('foo').FILENAME
                                              for d in fh.DEPENDENCIES)),
    # file-state checks; ``q`` is a bool compared by identity (``is``)
    'exists': lambda fh, q: fh.exists() is q,
    'cooldown': lambda fh, q: fh.cooldown.in_progress() is q,
    'intent': lambda fh, q: fh.intent.in_progress() is q,
    'vetoed': lambda fh, q: fh.veto.permanently_vetoed() is q,
    'urvetoed': lambda fh, q: bool([f for f in fh.UR_DEPENDENCIES
                                    if f(fh).veto.permanently_vetoed()]) is q,
    # NOTE: ``and`` binds looser than ``is``, so these read as
    # ``fh.exists() and (fh.is_obsolete() is q)`` — obsolescence implies
    # existence.
    'obsolete': lambda fh, q: fh.exists() and fh.is_obsolete() is q,
    'selfobsolete': lambda fh, q: (fh.exists() and
                                   fh.is_obsolete(ancestors=False) is q),
    'depsmet': lambda fh, q: fh.are_dependencies_met() is q,
}
# this references other downselect check functions, so it can't be placed in
# the dictionary literal declaration of DOWNSELECT_CHECKS.
# "needs regeneration": not vetoed (self or ancestors), not cooling down,
# dependencies met, and either missing or self-obsolete.
DOWNSELECT_CHECKS['needregen'] = lambda fh, q: q is (
    DOWNSELECT_CHECKS['vetoed'](fh, False) and
    DOWNSELECT_CHECKS['urvetoed'](fh, False) and
    DOWNSELECT_CHECKS['cooldown'](fh, False) and
    DOWNSELECT_CHECKS['depsmet'](fh, True) and
    (
        DOWNSELECT_CHECKS['exists'](fh, False) or
        DOWNSELECT_CHECKS['selfobsolete'](fh, True)
    )
)
# Like ``descendent`` but also matches the query classes themselves, defining
# a subgraph rooted at the given FileHandler classes.
DOWNSELECT_CHECKS['subgraph'] = lambda fh, q: (
    DOWNSELECT_CHECKS['descendent'](fh, q) or
    DOWNSELECT_CHECKS['type'](fh, q)
)
# (status label, stats rows, fill color) triple describing current state.
Status = namedtuple("Status", ("status", "stats", "color"))
# Lightweight identifier for a file graph: event ID, run directory, pipeline.
FileGraphTuple = namedtuple("FileGraphTuple", ("eventid", "rundir",
                                               "pipeline"))
class FileGraph(ImmutableDict, NamespaceMappable):
    """
    Used to store a list of ``FileHandler`` instances, e.g. those associated
    with a particular GraceDB event. In that example, one might access the
    LvcSkymapHdf5 file handler associated with some event using

    > event.files.LvcSkymapHdf5

    Has the nice feature of being able to take a dictionary as an
    initialization argument and create a dot-notation accessible map from that
    dictionary's key-value pairs.
    """

    def status(self):
        """
        Return a status description of this graph for use in status-checking
        scripts and webpages. Values are taken from NODELEGEND_COLOR_FMT so
        that they can be colored appropriately.

        Returns
        -------
        status : Status
            A ``Status`` namedtuple of ``(status, stats, color)`` summarizing
            the whole graph.
        """
        if not any(fh.exists() for fh in self.values()):
            status = "NOT YET STARTED"
            color = NODELEGEND_COLORS["NOT YET MADE"]
        # a file modified within the last 2 seconds is treated as actively
        # being generated even if no generation intent is registered
        elif any((fh.intent.in_progress() or
                  (fh.modtime(ts=0).timestamp() > time.time() - 2))
                 for fh in self.values()):
            status = "GENERATING FILES"
            color = NODELEGEND_COLORS["GENERATING"]
        # flags are shared event-wide, so checking any one handler suffices
        elif next(iter(self.values())).flags['MANUAL'] == 'true':
            status = "MANUAL MODE"
            color = NODELEGEND_COLORS["ANCESTOR AWAITS TRIGGER"]
        else:
            status = "EXISTS"
            color = NODELEGEND_COLORS["EXISTS"]
        stats = ""
        return Status(status, stats, color)

    def dependency_graph_term(self, unicode=True, plot=None, highlight=None):
        """
        Like ``dependency_graph``, but return a terminal-friendly text plot of
        this ``FileGraph`` with current statuses as a string. By default, uses
        unicode characters to print a more legible graph; use pure ascii by
        passing ``unicode=False``. Highlight a specific line in the output
        table by specifying a substring in that row as ``highlight``.

        Skip the graph plot with ``plot=False``; if ``plot=None``, try to
        make the graph but give up if ``Graph::Easy`` is not installed.

        The plot requires ``Perl`` and the ``Graph::Easy`` module to be
        installed (with the ``graph-easy`` script in the current path); if
        ``plot=True`` and ``graph-easy`` is not available, a ``FileNotFound``
        error will be raised.
        """
        # color code info: https://stackoverflow.com/a/33206814/3601493
        if not len({fh.eventdir for fh in self.values()}) == 1:
            raise ValueError(("All ``FileHandler`` instances must refer to "
                              "same event. Instead, got: {}").format(self))
        tty = shutil.get_terminal_size((200, 20))
        RES_FMT = "{graph}{flagtab}\n\n{tab}"

        def bold(tab):
            """Bold/magenta-highlight table rows matching ``highlight``
            (no-op for ascii output or when no highlight was requested)."""
            if not unicode or not highlight:
                return tab
            tablines = tab.split('\n')
            hl = [highlight] if isinstance(highlight, str) else highlight
            for i, line in enumerate(tablines):
                if all(h not in line for h in hl):
                    continue
                tablines[i] = re.sub('([^|]+)',
                                     lambda m: (COL.BOLD + COL.MAGENTA +
                                                m.group(1) + COL.CLEAR), line)
            return '\n'.join(tablines)

        # let graph-easy start while we sort out subgraph status
        if plot is not False:
            cmd = ['graph-easy', '--as=boxart' if unicode else '--as=ascii']
            try:
                proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
                plot = True
            except FileNotFoundError:
                # only fatal if the caller explicitly demanded a plot
                if plot is True:
                    raise
                plot = False
        try:
            # ``obscheck`` memoizes obsolescence checks across handlers
            obscheck = dict()
            statuses = {fh: fh.status(obscheck=obscheck) for fh in
                        self.values()}
            # sort by ancestor count (zero-padded so it sorts lexically),
            # then by class name, so dependencies come before dependents
            nodes = sorted(self.values(), key=lambda f:
                           (f"{len(f.UR_DEPENDENCIES):05d}" +
                            type(f).__name__))
            flagtab = flag_table(list(self.values())[0].flags, color=unicode)
            if plot:
                graphtxt = ''
                for i, fh in enumerate(nodes):
                    for d in fh.DEPENDENCIES:
                        graphtxt += f'[ {nodes.index(d(fh))} ] -> [ {i} ]\n'
                res, err = proc.communicate(input=graphtxt.encode())
                if proc.returncode:
                    msg = (f"graph-easy failed with code {proc.returncode}.\n"
                           f"STDOUT:\n{res}\nSTDERR:\n{err}")
                    LOGGER.error(msg)
                    raise RuntimeError(msg)
                LOGGER.debug(f"Generated text dependency graph, stderr: {err}")
        except BaseException:  # reap the subprocess on ANY failure, then re-raise
            if plot:
                LOGGER.error("Text dep graph failed, killing graph-easy")
                proc.kill()
            raise
        graph = res.decode()+'\n\n' if plot else ''

        # make a table; narrow terminals get two stacked tables instead of
        # one wide one
        if tty.columns < 200:
            tabs = [
                tabulate(
                    [
                        (i, type(fh).__name__, statuses[fh].status,
                         ','.join(str(ii) for ii in sorted(nodes.index(d(fh))
                                                           for d in
                                                           fh.DEPENDENCIES)))
                        for i, fh in enumerate(nodes)
                    ],
                    ['No.', 'Class', 'Status', 'Deps'],
                    tablefmt='presto',
                ),
                tabulate(
                    [
                        (i, fh.FILENAME,
                         '; '.join([' '.join(s.split())
                                    for s in statuses[fh].stats]))
                        for i, fh in enumerate(nodes)
                    ],
                    ['No.', 'File Name', 'Stats'],
                    tablefmt='presto',
                ),
            ]
        else:
            tabs = [
                tabulate(
                    [
                        (i, type(fh).__name__, statuses[fh].status,
                         ','.join(str(ii) for ii in sorted(nodes.index(d(fh))
                                                           for d in
                                                           fh.DEPENDENCIES)),
                         fh.FILENAME,
                         '; '.join([' '.join(s.split())
                                    for s in statuses[fh].stats]))
                        for i, fh in enumerate(nodes)
                    ],
                    ['No.', 'Class', 'Status', 'Deps', 'File Name', 'Stats'],
                    tablefmt='presto',
                ),
            ]
        if not unicode:
            # plain-ascii output gets no per-cell coloring
            tab = '\n\n'.join(bold(t) for t in tabs)
            return RES_FMT.format(graph=graph, flagtab=flagtab, tab=tab)
        # colorize the status and deps columns of the first table
        tablines = tabs[0].split('\n')
        for i in range(len(tablines)):
            if len(tablines[i]) < 2:
                continue
            # only touch data rows (second char is a digit/space), skipping
            # header and separator lines
            if not tablines[i][1] in ' 0123456789':
                continue
            split = tablines[i].split('|')
            status = split[2].strip()
            if status not in NODELEGEND_COLORS:
                continue
            split[2] = (color(bg=NODELEGEND_COLORS[status], fg='#000000') +
                        split[2] + COL.CLEAR)
            # color deps: odd indices of the split are the node numbers
            deps = re.split(r'(\d+)', split[3])
            for ii in range(1, len(deps), 2):
                dstat = statuses[nodes[int(deps[ii])]].status
                deps[ii] = (color(fg=NODELEGEND_COLORS[dstat]) +
                            deps[ii] + COL.CLEAR)
            split[3] = ''.join(deps)
            tablines[i] = '|'.join(split)
        tabs[0] = '\n'.join(tablines)
        # color graph node boxes
        if plot:
            graphlines = graph.split('\n')
            # coords[i] holds [j_start, j_end, node_number] spans to colorize
            # on line i
            coords = [[] for _ in graphlines]
            for n in range(len(nodes)):
                for i, gline in enumerate(graphlines):
                    # locate the node number (surrounded by spaces) in the box
                    parts = re.split(f'([ ]+{n}[ ]+)', gline)
                    if len(parts) == 1:
                        continue
                    j_start = len(parts[0])
                    j_end = j_start + len(parts[1])
                    i_start = i_end = i
                    coords[i].append([j_start, j_end, n])
                    # extend the colored span up and down through the blank
                    # interior rows of the node's box
                    while graphlines[i_start-1][j_start] == ' ':
                        i_start -= 1
                        coords[i_start].append([j_start, j_end, n])
                    while graphlines[i_end+1][j_start] == ' ':
                        i_end += 1
                        coords[i_end].append([j_start, j_end, n])
                    break
            for i, gline in enumerate(graphlines):
                colorline = [gline]
                # insert color codes right-to-left so earlier spans' indices
                # stay valid
                for j_start, j_end, n in sorted(coords[i], key=lambda c: -c[0]):
                    col = color(bg=NODELEGEND_COLORS[statuses[nodes[n]].status],
                                fg="#000000")
                    colorline = [colorline[0][:j_start],
                                 col+colorline[0][j_start:j_end]+COL.CLEAR,
                                 colorline[0][j_end:]] + colorline[1:]
                graphlines[i] = ''.join(colorline)
            graph = '\n'.join(graphlines)
        tab = '\n\n'.join(bold(t) for t in tabs)
        return RES_FMT.format(graph=graph, flagtab=flagtab, tab=tab)

    def dependency_graph(self, outfile: str = None, title: str = "FileGraph",
                         urls: str = None, bgcolor: str = 'black'):
        """
        Return a graphviz .dot graph of the ``FileHandler`` instances in this
        ``FileGraph`` and their ``DEPENDENCIES`` on each other. Plot their
        status (whether the file exists) as well as their metadata. Optionally
        plot the graph to an output image file visualizing the graph.

        Parameters
        ----------
        outfile : str, optional
            If not provided, return a string in ``.dot`` file format
            specifying graph relations. If an output file is specified, infer
            the filetype and write to that file.
        title : str, optional
            The name of the graph to use. If not provided, "FileGraph"
            will be used.
        urls : str, optional
            A format string for including URLs attributes for each
            ``FileHandler`` node. If provided, include a URL attribute
            pointing to each FILENAME for use on the summary page website,
            where the ``FILENAME`` of each ``FileHandler`` instance will be
            provided to the ``urls`` format string as the only ``format``
            argument. The URL attribute is not compatible with all
            ``graphviz`` output formats
            (http://www.graphviz.org/doc/info/attrs.html#d:URL) so make sure
            to turn it off if publishing to an incompatible format.
        bgcolor : str, optional
            The background color to use for the generated plot.

        Returns
        -------
        dot : str
            The dependency graph in ``.dot`` format (can be used as input to
            ``dot`` at the command line). This is returned regardless of
            whether an outfile is specified.

        Raises
        ------
        ValueError
            If the ``FileHandler`` instances in this ``FileGraph`` do not all
            refer to the same event.

        Optional file extensions for outfile:

        - *dot*: just save the dotfile in .dot format.
        - *png*: save the image in PNG format.
        - *pdf*: save the image in PDF format.
        - *svg*: save the image in svg format.
        """
        if not len({fh.eventdir for fh in self.values()}) == 1:
            raise ValueError(("All ``FileHandler`` instances must refer to "
                              "same event. Instead, got: {}").format(self))
        nodes = {fh: i for i, fh in enumerate(self.values())}
        # memoize obsolescence checks across all handlers
        obscheck = dict()
        statuses = {fh: fh.status(obscheck=obscheck) for fh in nodes}
        flagnode = NODEFLAGSFMT.format(rows=r'<BR ALIGN="LEFT"/>'.join([
            NODEFLAGS_ROW_FMT.format(flag=f, value=v)
            for f, v in list(self.values())[0].flags.items()
        ] + [""]))  # get a trailing left-aligned line break

        def join_stats(stats):
            """Add formatting to stats list."""
            if stats:
                stats.append("")  # one last BR to ensure left-aligned
                return "|" + '<BR ALIGN="LEFT"/>'.join(stats)
            return ""

        nodedot = NODELEGEND + '\n' + flagnode + '\n' + '\n'.join(
            NODEFMT.format(
                num=i,
                name=type(fh).__name__,
                fname=fh.FILENAME,
                url=(
                    'URL="{}", '.format(urls.format(fh.FILENAME))
                    if urls else ''
                ),
                status=statuses[fh].status,
                stats=join_stats(statuses[fh].stats),
                color=statuses[fh].color,
            ) for fh, i in nodes.items()
        )
        # edges are colored to match the status of the dependency they
        # originate from
        edgedot = '\n'.join(EDGEFMT.format(depnum=nodes[d(fh)], num=i,
                                           color=statuses[d(fh)].color)
                            for fh, i in nodes.items()
                            for d in fh.DEPENDENCIES)
        dot = DOTFMT.format(name=title, nodes=nodedot, edges=edgedot,
                            bgcolor=bgcolor)
        if outfile is not None:
            plot_graphviz(dot, outfile)
        return dot

    def _check_compatible(self, other):
        """
        Check whether these two ``FileGraph`` instances are consistent and can
        thus be added/subtracted etc. Raises a ``ValueError`` if they are not
        compatible.
        """
        if not isinstance(other, FileGraph):
            raise ValueError(("Both objects must be ``FileGraph`` "
                              "instances, instead got {} of type "
                              "{}").format(other, type(other)))
        # any key present in both graphs must map to equal values
        contradictions = {k: (self[k], other[k])
                          for k in self
                          if k in other and self[k] != other[k]}
        if contradictions:
            raise ValueError(("Both objects must have matching values when "
                              "keys match to be added. The following keys "
                              "did not match between ``FileGraph`` 1 {} "
                              "and 2 {}: {}").format(self, other,
                                                     contradictions))

    def __add__(self, other):
        """
        Get the union of two ``FileGraph`` instances.
        """
        self._check_compatible(other)
        selfdict = dict(self)
        selfdict.update(other)
        return FileGraph(selfdict)

    def __sub__(self, other):
        """
        Return a ``FileGraph`` with all the elements of ``self`` and none
        of the elements of ``other``. Works even if ``other`` contains
        ``FileHandler`` instances that are not in ``self``.
        """
        self._check_compatible(other)
        return FileGraph({k: v for k, v in self.items() if k not in other})

    def downselect(self, invert=False, reducer=all, **kwargs):
        """
        Get a ``FileGraph`` of file handlers that match *all* of the
        provided criteria. Checks in this docstring are defined in
        DOWNSELECT_CHECKS.

        Parameters
        ----------
        invert : bool
            Invert results. (Default: False)
        reducer : function
            Specify the ``any`` builtin to match if any check passes. Specify
            ``all`` to match only when every check passes. (Default: ``all``)
        equals : AbstractFileHandler or list
            The filehandler must equal the provided ``FileHandler`` instance
            or, if a list of instances is provided, it must be equal to one
            of them.
        instanceof : type
            The ``FileHandler`` must be an instance of this class.
        type : type
            The type of the ``FileHandler`` must exactly match.
        typename : str or list
            The FileHandler type's name must match this string (or one of
            these strings if given an iterable of strings).
        extension : str or list
            The file extension for the file name (or one of these strings if
            given an iterable of strings).
        startswith : str or list
            The file name starts with this string (or one of these strings
            if given an iterable of strings).
        endswith : str or list
            The file name ends with this string (or one of these strings if
            given an iterable of strings).
        inname : str or list
            The file name contains this substring (or one of these strings
            if given an iterable of strings).
        nameis : str or list
            The filename equals this string (or one of these strings if
            given an iterable of strings).
        depends : type or list
            Has this ``FileHandler`` class (or one of these ``FileHandler``
            classes) as a dependency.
        ancestor : type or list
            Has this ``FileHandler`` class (or one of these ``FileHandler``
            classes) as an ur dependency, i.e. steps *AFTER* this
            ``FileHandler``.
        descendent : type or list
            Has this ``FileHandler`` class (or one of these ``FileHandler``
            classes) as a descendent, i.e. steps *BEFORE* this
            ``FileHandler``.
        subgraph : type or list
            Same as ``descendent``, but also include FileHandlers that are
            instances of the query arguments. Defines a subgraph of the
            original ``FileGraph`` graph containing only the given
            ``FileHandler`` instances and their ``DEPENDENCIES``.
        dependsname : str or list
            Has the ``FileHandler`` with this name (or one of these names if
            given an iterable of strings) as a dependency.
        dependsfile : str or list
            Has the ``FileHandler`` with this filename (or one of these
            names if given an iterable of strings) as a dependency.
        exists : bool
            Whether the returned ``FileHandler`` instances' output files
            exist.
        cooldown : bool
            Whether the returned ``FileHandler`` instances are currently
            cooling down.
        intent : bool
            Whether the returned ``FileHandler`` instances are currently
            being generated.
        vetoed : bool
            Whether the returned ``FileHandler`` instances have been
            permanently vetoed.
        urvetoed : bool
            Whether the returned ``FileHandler`` instances have had any of
            their ancestors vetoed. Does not check the instances themselves
            (combine with ``vetoed`` for that).
        obsolete : bool
            Whether the returned ``FileHandler`` instances'
            output files exist but are obsoleted due to newer input data
            being available. **NB** Obsolescense *implies* that the file
            exists! Will not include ``FileHandler`` results whose files
            don't exist.
        selfobsolete : bool
            Same as ``obsolete``, but don't run obsolescence checks on each
            file's ancestors; only run checks relevant to each
            ``FileHandler`` instance.
        depsmet : bool
            Whether the returned ``FileHandler`` instances' ``DEPENDENCIES``
            have been met. These can be generated immediately if they don't
            exist.
        needregen : bool
            Whether the returned ``FileHandler`` instances that have their
            ``DEPENDENCIES`` met and either exist (but are obsolete) or have
            not been generated yet.

        Returns
        -------
        matches : FileGraph
            A new ``FileGraph`` instance containing a subset of the
            ``FileHandler`` instances for this ``FileGraph`` that match the
            given downselection criteria.
        """
        return type(self)({
            key: value for key, value in self.items()
            if xor(
                invert,
                reducer(DOWNSELECT_CHECKS[query_name](value, query_value)
                        for query_name, query_value in kwargs.items())
            )
        })

    def update(self, **downselect):
        """
        Generate any files that fit the FileGraph downselection
        criteria specified in ``downselect``. By default, generate all files
        that have not been generated and regenerate all files that have been
        obsoleted because their data ``DEPENDENCIES`` have changed. Will only
        generate files that are *immediately* generateable; if some of the
        files you want to generate depend on files that haven't been
        generated yet, you'll need to keep running this method until you've
        generated each successive layer of dependencies (and, finally, your
        target files). You can do this by running the method repeatedly until
        it returns ``False``.

        Parameters
        ----------
        **downselect
            Keyword arguments that can be passed to ``Event.downselect`` to
            narrow down the set of ``FileHandler`` instances that should be
            generated.

        Returns
        -------
        files_submitted_for_generation : Iterable
            An iterable of ``FileHandler`` instances that have been checked
            out and submitted for generation.
        """
        if not downselect:
            downselect = ImmutableDict({'needregen': True})
        regenlist = self.downselect(**downselect)
        # only generate files whose DEPENDENCIES do not also need to be
        # regenerated (in order to maintain self-consistency)
        notyet = regenlist.downselect(ancestor=[type(f) for f in
                                                regenlist.values()])
        regenlist = regenlist - notyet
        if not regenlist:
            return tuple()
        LOGGER.debug("Regenlist (downselection) to update (after removing "
                     "obsolete ``DEPENDENCIES``): %s", regenlist)
        return get_io('file').generate.submit(regenlist)
def recursive_obsolescence(func):
    """
    Store ``is_obsolete`` values for repeated calls to make sure that we don't
    recompute them while recursively checking ``is_obsolete`` values.
    """
    @functools.wraps(func)
    def wrapper(self, checked=None, **kwargs):
        # share one cache dict across the whole recursive traversal; a fresh
        # top-level call (checked=None) starts with an empty cache
        cache = {} if checked is None else checked
        try:
            return cache[self]
        except KeyError:
            result = func(self, checked=cache, **kwargs)
            cache[self] = result
            return result
    return wrapper
712class FileHandler(AbstractFileHandler, RequiredAttributeMixin,
713 IntentMixin, CoolDownMixin, GitDirMixin, FlagsMixin,
714 VetoMixin, LockMixin, MetaDataMixin, IOMixin):
715 """
716 A class for generating, opening, and checking existence of data files
717 associated with these events. Specify the maximum amount of time that
718 should be spent generating each file by setting the TIMEOUT attribute of
719 the relevant implementation class. *Instances are immutable* other than
720 their ``parent`` and ``graph`` attributes.
722 ``FileHandler`` instances can check whether their corresponding output data
723 has been generated and stored. If the data in their input ``DEPENDENCIES``
724 have changed, they can dynamically check whether their corresponding
725 outputs need to be regenerated. Use ``DEP_CHECKSUM_KWARGS`` to specify
726 which subsets of input data are relevant to each ``FileHandler``; these
727 keyword arguments will be fed to the ``checksum`` methods of each
728 ``dependency`` to see whether relevant subsets of input data have changed
729 (causing the current version of the ``FileHandler`` instance to become
730 obsolete and triggering automatic file regeneration on the next update).
732 Required Class Attributes
733 -------------------------
734 The following class attributes must be defined in subclasses, either
735 manually or programmatically (see: ``FileHandler.set_class_attributes`` and
736 its implementations in subclasses).
738 FILENAME : str
739 The base filename for this filehandler as it will appear in an event
740 directory.
742 DEPENDENCIES : Tuple[FileHandler]
743 A tuple of other ``FileHandler`` subclasses whose data this
744 ``FileHandler`` uses in order to ``generate`` its own output.
746 MANIFEST_TYPES : Tuple[FileHandler]
747 A tuple of other ``FileHandler`` subclasses that are generated at the
748 same time as this one. In other words, running ``self.generate`` for
749 any of the subclasses in ``MANIFEST_TYPES`` will produce all of the
750 files in that tuple.
752 UR_DEPENDENCIES : Tuple[FileHandler]
753 Return a list of ur-dependencies, i.e. ``DEPENDENCIES`` (of
754 ``DEPENDENCIES`` etc.) of this ``FileHandler``, i.e. ``FileHandler``
755 classes whose data is ultimately used (after some number of steps in
756 the DAG) to generate this ``FileHandler``. The list is ordered such
757 that its files can be generated in order without encountering missing
758 dependencies (i.e. items deepest in ``cls.UR_DEPENDENCY_TREE`` come
759 first).
761 UR_DEPENDENCY_TREE : ImmutableDict
762 A dict of all ``DEPENDENCIES`` of ``DEPENDENCIES`` going back to the
763 original input files that are ultimately required to generate this
764 file. Maps ``FileHandler`` classes to dictionaries of their own
765 ancestry trees recursively starting at the current FileHandler. The
766 deepest items in the dictionary are the furthest ``DEPENDENCIES`` up
767 the dependency graph (and consequently the files that must be generated
768 first in order for the shallower files to be generated). See
769 ``FileHandler.UR_DEPENDENCIES`` for a flattened, ordered version.
771 Parameters
772 ----------
773 eventid_or_fh : str, llama.Event, or llama.FileHandler
774 the ``eventid`` of the file handler or else another ``FileHandler`` or
775 ``Event`` instance (though anything with ``eventid`` and ``rundir``
776 properties will work). If such an object is provided, the ``rundir``
777 will be inferred therefrom.
778 rundir : str, optional
779 The directory in which events from this run are being stored. If
780 ``eventid_or_fh`` is an object with a ``rundir`` attribute, then that
781 value of ``rundir`` will be used and this argument will be ignored.
782 (See ``DEFAULT_RUN_DIR`` for default value.) Overrides the ``rundir``
783 specified in ``eventid_or_fh``.
784 parent : FileHandler, optional
785 If this ``FileHandler`` instance is being used to generate a different
786 ``FileHandler`` instance, specify the original instance as the
787 ``parent``. Overrides the ``rundir`` specified in ``eventid_or_fh``.
789 Raises
790 ------
791 AssertionError
792 If class constants in ``cls.required_attributes`` are not defined or if
793 ``cls.MANIFEST_TYPES`` has any inconsistencies.
794 """
796 COOLDOWN_PARAMS = CoolDownParams(base=60, increment=60, maximum=240*60)
798 FILENAME = None
799 DEPENDENCIES = None
800 MANIFEST_TYPES = None
801 UR_DEPENDENCIES = None
802 UR_DEPENDENCY_TREE = None
804 _REQUIRED = ("FILENAME", "DEPENDENCIES", "MANIFEST_TYPES",
805 "UR_DEPENDENCIES", "UR_DEPENDENCY_TREE")
807 @classmethod
808 def set_class_attributes(cls, subclass):
809 """
810 Decorater for a new subclass that sets its ``MANIFEST_TYPES`` class
811 attribute to the default ``FileHandler`` value without requiring them
812 to be manually specified. Also determines the ``UR_DEPENDENCIES`` and
813 ``UR_DEPENDENCY_TREE`` based on ``subclass.DEPENDENCIES``. **NB:
814 manually set class attributes will be overwritten by this decorator.**
816 Parameters
817 ----------
818 subclass : type
819 The subclass whose class attributes need to be set.
821 Returns
822 -------
823 subclass : type
824 The same decorated subclass.
825 """
826 subclass.MANIFEST_TYPES = (subclass,)
827 if subclass.DEPENDENCIES is None:
828 raise ValueError("You must define DEPENDENCIES first for subclass "
829 f"{subclass}. Got ``None``.")
830 subclass.UR_DEPENDENCY_TREE = ImmutableDict({
831 d: d.UR_DEPENDENCY_TREE for d in subclass.DEPENDENCIES
832 })
833 # flatten this to get UR_DEPENDENCIES
834 next_branches = [subclass.UR_DEPENDENCY_TREE]
835 ur_dependencies = list()
836 while next_branches:
837 ur_dependencies += [dep for b in next_branches for dep in b]
838 next_branches = [b[dep] for b in next_branches for dep in b]
839 sorted_unique_ur_deps = list()
840 for dep in ur_dependencies[::-1]:
841 if dep not in sorted_unique_ur_deps:
842 sorted_unique_ur_deps.append(dep)
843 subclass.UR_DEPENDENCIES = tuple(sorted_unique_ur_deps)
844 return subclass
846 def __getnewargs__(self):
847 """
848 Get arguments to pass to ``new``. Used for pickling and copying.
849 """
850 return (self.eventid, self.rundir)
    # Required constants for subclasses go in _REQUIRED

    # Optional mapping from dependency FileHandler classes to keyword
    # arguments for their ``checksum`` methods; lets ``dep_checksums``
    # hash only the subset of a dependency's contents that this handler
    # actually consumes (empty by default, i.e. hash everything).
    DEP_CHECKSUM_KWARGS = ImmutableDict({})
856 def checksum(self, **kwargs):
857 """
858 Get the sha256 checksum of this file's contents. Use this to version
859 files and check whether their contents have changed.
861 Parameters
862 ----------
863 kwargs : dict
864 **(Ignored in the base** ``FileHandler`` **implementation)**.
865 Subclass implementations can optionally use input arguments to
866 identify relevant subsets of the file's contents for versioning or
867 diff-checking purposes. In this way, checks can be made for changes
868 on only specific subsets of file contents. This is useful for
869 ignoring changes to unused input data when checking for
870 obsolescence.
871 """
872 with self.open('rb') as infile:
873 return sha256(infile.read()).hexdigest()
875 def dep_checksums(self):
876 """
877 Recalculate the ``sha256`` sums of the contents of the input files
878 (i.e. ``DEPENDENCIES``) for this ``FileHandler`` instance's
879 ``manifest_filehandlers`` (either to store them or to check whether they
880 have changed from the stored values).
882 Returns
883 -------
884 dep_checksums : dict
885 Keys are ``FileHandler.clsname`` values for each of this
886 ``FileHandler`` class's ``DEPENDENCIES`` and values are the
887 corresponding ``checksums`` of each file. These checksums uniquely
888 determine the exact input files used and are suitable for creating
889 snapshots of pipeline state.
890 dep_subset_checksums : dict
891 Same as ``checksums`` but with ``self.DEP_CHECKSUM_KWARGS`` applied
892 to each dependency's ``checksum`` method to **only** check whether
893 the data used by ``self.generate`` has changed. These checksums
894 determine whether the ``DEPENDENCIES`` have changed *in a way that
895 is meaningful to this specific filehandler* (since unused fields
896 are ignored) and are suitable for determining whether a
897 ``FileHandler`` needs to be regenerated based on the availability
898 of new data. (To avoid unnecessary computation, these checksums are
899 only calculated separately from ``checksums`` when a dependency has
900 a set of ``DEP_CHECKSUM_KWARGS`` defined for it.)
901 """
902 sums = dict()
903 sub_sums = dict()
904 for dep in self.DEPENDENCIES:
905 depfh = dep(self)
906 sums[depfh.clsname] = depfh.checksum()
907 sum_kwargs = self.DEP_CHECKSUM_KWARGS.get(dep)
908 if sum_kwargs:
909 sub_sums[depfh.clsname] = depfh.checksum(**sum_kwargs)
910 else:
911 sub_sums[depfh.clsname] = sums[depfh.clsname]
912 assert set(sums) == set(sub_sums)
913 return sums, sub_sums
915 @property
916 def filename_for_download(self):
917 """
918 Get a filename that includes the ``eventid``, revision number, and
919 version hash for this file (i.e. what version number this is in the
920 version history; e.g. if three versions of *this* file exist in the
921 version history, then this is version 3). If this file does not appear
922 in the git history, it will be marked 'v0' and the hash will be
923 'UNVERSIONED'. The output format is ``eventid``, version, first 7
924 digits of commit hash, and filename, split by hyphens, so that the
925 third version of ``skymap_info.json`` for event ``S1234a`` with git
926 hash ``dedb33f`` would be called
927 ``S1234a-v3-dedb33f-skymap_info.json``. Use this for file downloads or
928 files sent to other services in order to facilitate data product
929 tracking outside the highly-organized confines of a pipeline run
930 directory.
931 """
932 return self.git.filename_for_download(self.FILENAME)
934 @property
935 def auxiliary_paths(self):
936 """
937 Return all names of *possible* auxiliary (rider) files associated
938 with this ``FileHandler`` instance. These are things like metadata,
939 cooldown, veto, and locking files.
940 """
941 return (f for f in (list(self.meta.filenames) +
942 list(self.cooldown.filenames) +
943 list(self.intent.filenames) +
944 list(self.veto.vetofilenames) +
945 list(self.lock.lockfiles) +
946 list(self.lock.obsolescence_files)))
947 # if os.path.exists(os.path.join(self.eventdir, f)))
949 def open(self, mode='r'):
950 """
951 Open this file in readonly mode and return the resulting object.
952 """
953 # mode can only be 'r' or 'rb'
954 if mode not in ['r', 'rb', 'br']:
955 raise ValueError('File is readonly.')
956 return open(self.fullpath, mode)
958 def __copy__(self):
959 """
960 Return an instance of this ``FileHandler`` with the same
961 properties, *including* ``graph`` and ``parent``.
962 """
963 dup = type(self)(self)
964 if hasattr(self, '_graph'):
965 dup.graph = self.graph
966 if hasattr(self, '_parent'):
967 dup.parent = self.parent
968 return dup
970 def __deepcopy__(self, memo=None):
971 """
972 Same as ``__copy__`` but make recursive deepcopies of attributes.
973 """
974 dup = type(self)(
975 deepcopy(self.eventid),
976 rundir=deepcopy(self.rundir),
977 )
978 if hasattr(self, '_graph'):
979 dup.graph = deepcopy(self.graph, memo=memo)
980 if hasattr(self, '_parent'):
981 dup.parent = deepcopy(self.parent, memo=memo)
982 return dup
984 @property
985 def subgraph(self):
986 """
987 Get all ur ``DEPENDENCIES`` along with this ``FileHandler`` instance in
988 a single ``FileGraph`` instance.
989 """
990 fhs = list(self.UR_DEPENDENCIES) + [type(self)]
991 return FileGraph({fh.__name__: fh(self) for fh in fhs})
993 @property
994 def graph(self):
995 """
996 Get the ``FileGraph`` to which this ``FileHandler`` instance belongs.
997 This can be set dynamically to associate a ``FileHandler`` with a given
998 ``FileGraph`` (and hence a given ``Pipeline``). If ``graph`` is not
999 manually set, it defaults to the ``subgraph`` of this ``FileHandler``.
1000 """
1001 return getattr(self, '_graph', self.subgraph)
1003 @graph.setter
1004 def graph(self, graph):
1005 if not isinstance(graph, FileGraph):
1006 raise ValueError(("Must be an instance of ``FileGraph``, instead "
1007 "got {}").format(graph))
1008 setattr(self, '_graph', graph)
1010 def diff_contents(self, other, force=False):
1011 """
1012 Return a diff of two text files. If the files are not text files
1013 (i.e. if their file extensions are not in ``TEXT_FILE_EXTENSIONS``),
1014 prints "Binary files ``self`` and ``other`` differ" (with ``self`` and
1015 ``other`` replaced with their full file paths).
1017 Parameters
1018 ----------
1019 other : FileHandler, str
1020 The ``FileHandler`` to diff to this one or a path to a file to diff
1021 with this one.
1022 force : bool, optional
1023 Whether to force the diff (as if ``self`` and ``other`` are both
1024 text files) even if the files are not recognized as a form of text
1025 file.
1027 Returns
1028 diff : str
1029 A textual diff of the two files' contents, provided they are both
1030 recognized as text files (or ``force`` is ``True``); or else a
1031 message saying that they differ. Prints nothing if the files are
1032 the same.
1033 """
1034 BIN_FMT = "Binary files {} and {} differ"
1035 otherpath = other.fullpath if isinstance(other, FileHandler) else other
1036 selfpath = self.fullpath
1037 txt = force or (
1038 any(selfpath.endswith(x) for x in TEXT_FILE_EXTENSIONS) and
1039 any(otherpath.endswith(x) for x in TEXT_FILE_EXTENSIONS)
1040 )
1041 if not txt:
1042 if self.compare_contents(other):
1043 return ""
1044 return BIN_FMT.format(selfpath, otherpath)
1045 with self.open() as selffile, open(otherpath, 'r') as otherfile:
1046 selflines = selffile.readlines()
1047 otherlines = otherfile.readlines()
1048 # make sure last line ends with newline
1049 for lines in selflines, otherlines:
1050 if lines[-1][-1] != '\n':
1051 lines[-1] += '\n'
1052 return "".join(difflib.unified_diff(selflines, otherlines))
1054 def compare_contents(self, other):
1055 """
1056 Check whether the contents of this ``FileHandler`` instance's file are
1057 the same as the contents of the other ``FileHandler`` instance's file.
1059 Parameters
1060 ----------
1061 other : FileHandler, str
1062 The ``FileHandler`` to compare to this one or else a path to a file
1063 to compare to this one.
1065 Returns
1066 same : bool
1067 Returns ``True`` if both files exist and have the same contents.
1068 Otherwise, returns ``False``.
1069 """
1070 # TODO This is not database implementation friendly
1071 otherpath = other.fullpath if isinstance(other, FileHandler) else other
1072 selfpath = self.fullpath
1073 for path in (selfpath, otherpath):
1074 if not os.path.isfile(path):
1075 LOGGER.warning("Tried to compare contents of %s, but file %s "
1076 "does not exist. Returning False.", self, path)
1077 return False
1078 return filecmp.cmp(selfpath, otherpath)
1080 def exists(self):
1081 """
1082 Check whether the file associated with this handler has yet been
1083 generated.
1084 """
1085 return os.path.isfile(self.fullpath)
    @recursive_obsolescence
    def is_obsolete(self, checked=None, ancestors=True):
        """
        Check whether this file exists but needs to be regenerated by seeing
        whether any of its ``DEPENDENCIES`` have updated their file contents
        since this file was made. This is a somewhat conservative check to see
        whether any files need to be regenerated automatically; it will return
        False if there is any ambiguity (in which case you will need to
        manually delete and regenerate child files). You can extend this
        definition with extra obsolescence criteria, but make sure to call
        ``super`` to keep this automatic regeneration functionality in response
        to regenerated ``DEPENDENCIES``.

        If the file is marked as "locked" (see: ``llama.lock.LockHandler``),
        the file will **never** be marked obsolete. This provides a way to
        manually prevent file obsolescence in situations in which it does not
        apply.

        If any of this file's ancestors are obsolete, then this file will be
        marked obsolete. You can skip this check with ``ancestors=False``.

        Failing that, if the output file does not exist, or if its input
        ``DEPENDENCIES`` do not exist, it is not considered obsolete, and this
        method will return False. Likewise, if the file has no
        ``DEPENDENCIES``, it cannot be naively obsoleted, and this method will
        return False.

        Failing that, it will be marked obsolete if its
        ``dep_subset_checksums()[1]`` (see: ``FileHandler.dep_checksums``
        second return value) have changed from their previously recorded
        values. If those checksums are not recorded, it will not be marked as
        obsolete.

        Parameters
        ----------
        checked : dict, optional
            A dictionary mapping ``FileHandler`` instances that have previously
            had their obsolescences checked mapped to whether they are yet
            obsolete; used internally to track whether this ``FileHandler``
            instance's ``DEPENDENCIES`` are obsolete without recomputing them.
        ancestors : bool, optional
            If ``False``, don't check whether ancestors are obsolete; only run
            this ``FileHandler`` instance's obsolescence checks.

        Returns
        -------
        obsolete : bool
            Whether this file should be regenerated.
        """
        # a manual lock always wins: locked files are never obsolete
        if self.lock.is_locked:
            return False
        if ancestors:
            # obsolescence propagates down the DAG: one obsolete ancestor
            # makes this file obsolete too
            for dep in self.DEPENDENCIES:
                if dep(self).is_obsolete(checked=checked):
                    return True
        # a file that does not exist cannot be obsolete
        if not self.exists():
            return False
        # no inputs means nothing to have changed under this file
        if not self.DEPENDENCIES:
            return False
        # missing inputs leave obsolescence ambiguous; err on "not obsolete"
        for dep in self.DEPENDENCIES:
            if not dep(self).exists():
                return False
        try:
            # compare subset checksums (only data this handler uses) to
            # the values recorded at generation time
            dep_checksums = self.dep_checksums()[1]
            prev_dep_checksums = self.meta.dep_subset_checksums
            if dep_checksums != prev_dep_checksums:
                # LOGGER.debug(f"{self} found obsolete due to changed "
                #              "dependency checksums. current checksums: "
                #              f"{dep_checksums}, checksums at gen time: "
                #              f"{prev_dep_checksums}")
                return True
        except (FileNotFoundError, KeyError, IOError):
            # no recorded checksums (or unreadable metadata): treat as
            # not obsolete, per the conservative policy described above
            pass
        return False
1157 def delete(self):
1158 """
1159 Delete this file if it exists, along with all of its rider files, and
1160 commit that change to version control.
1161 """
1162 if not self.exists():
1163 raise IOError("Can't delete, doesn't exist: " + self.fullpath)
1164 evdir = Path(self.eventdir)
1165 aux = [evdir/p for p in self.auxiliary_paths if (evdir/p).exists()]
1166 LOGGER.info(f"Deleting file {self.fullpath} and aux paths {aux}")
1167 self.git.remove(self.FILENAME, *aux)
1169 @property
1170 def manifest_filehandlers(self):
1171 """
1172 A set of ``FileHandler`` instances generated by this ``FileHandler``
1173 (not including temp files that should be deleted after generation). By
1174 default, this set only includes the filename for this FileHandler. If
1175 ``generate`` fails, these files will be removed to enforce atomicity.
1177 If you are making a bunch of ``FileHandler`` subclasses that are all
1178 generated with the same method, you should define an abstract base
1179 class for those ``FileHandler`` classes with a suitable ``_generate``
1180 method and a ``manifest_filehandlers`` property that returns all of
1181 their filenames. The subclasses then only need to return those
1182 ``FileHandler`` instances (and any other relevant methods and
1183 properties unique to each subclass ``FileHandler``).
1184 """
1185 return [c(self) for c in self.MANIFEST_TYPES]
1187 @property
1188 def manifest(self):
1189 """
1190 A set of filenames generated by this ``FileHandler`` (not
1191 including temp files that should be deleted after generation). By
1192 default, this set only includes the filename for this FileHandler.
1193 If ``generate`` fails, these files will be removed to enforce atomicity.
1195 In general, you should not need to modify this, since it will return
1196 filenames from the ``FileHandler`` instances in
1197 ``self.manifest_filehandlers``, which should contain an exhaustive list
1198 of ``FileHandler`` instances associated with the ``_generate`` method
1199 that creates them.
1200 """
1201 filenames = {fh.FILENAME for fh in self.manifest_filehandlers}
1202 assert len(filenames) == len(self.manifest_filehandlers)
1203 return filenames
1205 def modtime(self, ts=None):
1206 """
1207 Get a ``datetime`` object with the modification time of this
1208 file, assuming it exists.
1210 Parameters
1211 ----------
1212 ts : int or float, optional
1213 If the file does not exist, return a datetime parsed from the UNIX
1214 timestamp given by ``ts`` if provided; otherwise, return ``None``.
1215 Use this to provide a fallback modification time in cases when
1216 modification times might need to be compared between existing files
1217 and files whose existence is uncertain.
1218 """
1219 if self.exists():
1220 return fromtimestamp(os.path.getmtime(self.fullpath))
1221 elif ts is not None:
1222 return fromtimestamp(ts)
1223 return None
1225 @property
1226 def _tempdir(self):
1227 """
1228 Path to a directory for temporary files associated with this
1229 ``Event``. This temporary directory is located within
1230 ``self.eventdir``.
1231 """
1232 return os.path.join(self.eventdir, LOCAL_TMPDIR_NAME)
1234 @property
1235 def chmod(self, mode):
1236 """
1237 Change read/write/execute permissions of a file. ``mode`` has same
1238 meaning as in ``os.chmod``.
1239 """
1240 os.chmod(self.fullpath, mode)
1242 @property
1243 def read_bytes(self, memmap=False):
1244 """
1245 Read full file into memory as a ``bytes`` object. If ``memmap`` is
1246 ``True``, create a memory-map object (workalike to a ``bytes`` object)
1247 mapped to the data on disk (what you want to use if you don't know the
1248 file size, since it could be larger than available memory). Note that
1249 if ``memmap=True``, you'll need to manually close the returned
1250 bytes-like object to free up the file descriptor when you're done using
1251 its ``close`` method. Returns the resulting bytes-like object
1252 containing the file's data. Raises a ``FileNotFoundError`` if the file
1253 does not exist.
1254 """
1255 if not os.path.isfile(self.fullpath):
1256 raise FileNotFoundError(f"Cannot read bytes, {self.fullpath} "
1257 "does not exist.}")
1258 if memmap:
1259 return mmap(self.open('rb').fileno(), 0, access=ACCESS_READ)
1260 else:
1261 return Path(self.fullpath).read_bytes()
1263 @property
1264 def write_bytes(self, data):
1265 """
1266 Write ``data`` (a bytes-like object) to file, replacing any existing
1267 file contents. ``data`` can be a memory-mapped object or a file-object
1268 opened in binary mode (autodetected), allowing ``write_bytes`` to work
1269 on memory-mapped byte arrays returned by ``read_bytes``. Raises a
1270 ``ValueError`` if ``bytes`` can't be read from ``data``.
1271 """
1272 if hasattr(data, 'read'):
1273 buf = data.read(BUFSIZE)
1274 if not isinstance(buf, bytes):
1275 raise ValueError(f"Cannot write bytes to {self.fullpath}, "
1276 f"input {data} is not in a ``bytes`` format.")
1277 with open(self.fullpath, 'wb') as out:
1278 while buf:
1279 out.write(buf)
1280 buf = data.read(BUFSIZE)
1281 return
1282 Path(self.fullpath).write_bytes(data)
    @IntentMixin.decorate_checkout
    @CoolDownMixin.decorate_checkout
    @GitDirMixin.decorate_checkout
    @VetoMixin.decorate_checkout
    @FlagsMixin.decorate_checkout
    def checkout(self):
        """
        Create a temporary directory for generating new files in. This
        ensures that, in the event of file generation failure, the original
        event directory is unchanged. It also ensures that the state of the
        original event directory is not modified until the file generation is
        completed, assisting in parallel file generation. ``DEPENDENCIES`` are
        copied read-only to the temporary event directory (NOT hardlinked --
        see the in-loop comment below) to ensure that no extra data is being
        drawn from files not explicitly declared as such.

        Returns
        -------
        tmp_self : FileHandler
            A copy of the current FileHandler pointing to the temporary
            directory where this file will actually be generated.
        """
        if not os.path.exists(self._tempdir):
            os.mkdir(self._tempdir)
        # one fresh working directory per generation attempt, named after
        # this FileHandler class for easier debugging
        tmp_rundir = tempfile.mkdtemp(prefix=type(self).__name__+'-',
                                      dir=self._tempdir)
        tmp_self = type(self)(self.eventid, rundir=tmp_rundir, parent=self)
        # don't init the event because we don't want to initialize a git repo
        os.mkdir(tmp_self.eventdir)
        for dep in self.DEPENDENCIES:
            # DO NOT link files, since we can't prevent the user from writing
            # to them; MUST copy.
            source = dep(self)
            dest = dep(tmp_self)
            shutil.copy(source.fullpath, dest.fullpath)
            # make readonly; should raise an exception if it gets edited,
            # helping ensure that generation functions do not mutate inputs
            os.chmod(dest.fullpath, 0o400)
            # TODO don't include aux paths
            # also copy auxiliary paths, which might be needed
            for aux in source.auxiliary_paths:
                srcaux = os.path.join(source.eventdir, aux)
                destaux = os.path.join(dest.eventdir, aux)
                if os.path.isfile(srcaux):
                    try:
                        shutil.copy(srcaux, destaux)
                    except shutil.SameFileError:
                        pass
                    # rider files are made read-only as well
                    os.chmod(destaux, 0o400)
        return tmp_self
1334 @staticmethod
1335 def _checkin_cleanup(func):
1336 """
1337 Make sure to wrap ``checkin`` with this decorator at the outermost
1338 level to delete the temporary directory regardless of whether the
1339 attempt was successful.
1340 """
1342 @functools.wraps(func)
1343 def wrapper(self, gen_result, *args, **kwargs):
1344 """
1345 Delete temporary directory.
1346 """
1347 try:
1348 return func(self, gen_result, *args, **kwargs)
1349 finally:
1350 LOGGER.debug("Deleting tempfiles from %s from atomic "
1351 "generation of %s", gen_result, self)
1352 try:
1353 # a separate temp run directory is made for each attempt
1354 shutil.rmtree(gen_result.fh.rundir)
1355 except OSError:
1356 LOGGER.warning(
1357 "Generation attempt done, could not remove "
1358 "temporary generation directory for %s",
1359 self
1360 )
1362 return wrapper
1364 @_checkin_cleanup.__func__
1365 @IntentMixin.decorate_checkin
1366 @GitDirMixin.decorate_checkin
1367 @CoolDownMixin.decorate_checkin
1368 @VetoMixin.decorate_checkin
1369 @MetaDataMixin.decorate_checkin
1370 def checkin(self, gen_result):
1371 """
1372 Copy all generated output files generated by the temporary
1373 ``FileHandler`` created with ``checkout`` (as listed in
1374 ``manifest``) to this event directory once generation is complete.
1376 gen_result : str
1377 The generation result to be checked in, containing the temporary
1378 filehandler with the results of the generation attempt as well as
1379 any errors raised.
1381 Returns
1382 -------
1383 self : FileHandler
1384 Returns the successfully checked-in ``FileHandler`` instance.
1385 """
1386 LOGGER.info(f"Checking in {gen_result} to {self}.")
1387 if gen_result.err is not None:
1388 LOGGER.error(f"Exception while generating {self}: {gen_result}")
1389 raise gen_result.err
1390 src_dest_paths = [
1391 (os.path.join(gen_result.fh.eventdir, f),
1392 os.path.join(self.eventdir, f))
1393 for f in self.manifest
1394 ]
1395 if not all(os.path.isfile(s) for s, d in src_dest_paths):
1396 LOGGER.error("temp directory is missing output files specified in"
1397 "the manifest! aborting file generation.")
1398 raise GenerationError(("Could not find all generated files "
1399 "in temp directory {} that are defined in "
1400 "manifest; ABORTING.\nManifest:\n"
1401 "{}").format(gen_result.fh.eventdir,
1402 self.manifest))
1403 for src, dest in src_dest_paths:
1404 try:
1405 os.link(src, dest)
1406 except OSError:
1407 shutil.copy(src, dest)
1408 return self
1410 @property
1411 def eventdir(self):
1412 """
1413 The directory where data for the event this ``FileHandler``
1414 corresponds to is stored.
1415 """
1416 return os.path.join(self.rundir, self.eventid)
1418 @property
1419 def fullpath(self):
1420 """
1421 The full path to the file referred to by this ``FileHandler``.
1422 """
1423 return os.path.join(self.eventdir, self.FILENAME)
1425 @abstractmethod
1426 def _generate(self, *args, **kwargs):
1427 """
1428 Generate this file, whether by taking input data, reading input
1429 files, or fetching data through an API.
1430 """
1432 def __str__(self):
1433 return '{}(eventid="{}", rundir="{}"{})'.format(
1434 type(self).__name__,
1435 self.eventid,
1436 self.rundir,
1437 f', parent="{self.parent}"' if self.parent is not None else '',
1438 )
1440 def __repr__(self):
1441 return str(self)
    # Generic timeout constant -- presumably seconds; its usage is outside
    # this part of the class body, so confirm before changing (TODO confirm).
    TIMEOUT = 20
1445 @staticmethod
1446 def _generate_cleanup(func):
1447 """
1448 Catch any exceptions raised by ``generate`` wrappers, log them, and put
1449 them in a generation result.
1450 """
1452 @functools.wraps(func)
1453 def wrapper(self, *args, **kwargs):
1454 try:
1455 return func(self, *args, **kwargs)
1456 except Exception as err:
1457 LOGGER.error("Cleaning unhandled generation error type %s, "
1458 "message: %s", type(err), err)
1459 LOGGER.error("TRACEBACK:\n%s",
1460 tbhighlight(traceback.format_exc()))
1461 return GenerationResult(self, err)
1463 return wrapper
    @_generate_cleanup.__func__
    @VetoMixin.decorate_generate
    @MetaDataMixin.decorate_generate
    def generate_unsafe(self, *args, **kwargs):
        """
        Generate this file without performing any of the usual ``checkout`` and
        ``checkin`` procedures; no cleanup will happen if file generation
        fails. You **almost certainly** want to call ``generate`` instead, or
        if you need to execute more than a single step of the pipeline,
        ``FileGraph.update`` with an appropriate downselection.

        Returns
        -------
        gen_result : GenerationResult
            A ``GenerationResult`` containing any errors raised during any part
            of file generation.
        """
        if list(self.DEPENDENCIES) and (not self.are_dependencies_met()):
            LOGGER.error("%s has unmet dependencies; cannot generate yet.",
                         self)
            # raised outside the try block below, so this propagates up to
            # the ``_generate_cleanup`` wrapper, which converts it into a
            # GenerationResult
            raise GenerationError(f"Unmet dependencies for {self}.")
        # close all figure windows to conserve memory
        if 'matplotlib.pyplot' in sys.modules:
            sys.modules['matplotlib.pyplot'].close('all')
        try:
            LOGGER.info(COL.BLUE + COL.BOLD + "Generating " +
                        str(self) + COL.CLEAR)
            # invalidate obsolescence caches for all files (robust
            # against code changes that change obsolescence
            # definitions) TODO remove this obsolescence caching crap
            self.lock.remove_all_obsolescence()
            self._generate()
            # confirm that every file promised in MANIFEST_TYPES was
            # actually written before declaring success
            generated = {f: f(self).exists() for f in self.MANIFEST_TYPES}
            if all(generated.values()):
                LOGGER.info(COL.GREEN + f"{self} files written." + COL.CLEAR)
            else:
                msg = f"Files missing after {self} generation: {generated}"
                LOGGER.error(COL.RED + msg + COL.CLEAR)
                raise GenerationError(msg)
            return GenerationResult(self)
        except Exception as err:
            # wrap (rather than re-raise) so callers always get a
            # GenerationResult back from this method
            LOGGER.warning('Wrapping exception raised while generating '
                           '%s: %s("%s")', self, type(err).__name__, err)
            return GenerationResult(self, err=err)
    def generate(self, *args, **kwargs):
        """
        Make the next version of a file synchronously, assuming it does not
        exist or is in need of updating. In the event that some sort of
        error causes file generation to fail, delete the output file (if it
        exists). You should ALWAYS use this instead of ``_generate()`` in
        order to generate and version files safely and atomically without
        risk of disrupting parallel file manipulations. If you want to do
        something more complicated, like updating multiple files in an
        event directory, or even multiple events, use functions like
        ``Run.events.update``, ``Event.update``,
        ``FileHandler.subgraph.update``, or ``FileGraph.downselect().update``.
        """
        # checkout -> generate in an isolated temp dir -> atomically check
        # the results back in (see ``checkout``/``checkin`` for details)
        return self.checkin(self.checkout().generate_unsafe())
1525 def are_dependencies_met(self):
1526 """
1527 Check whether the data needed to generate this filetype exists. If
1528 there are no ``DEPENDENCIES`` (i.e. no input), this file cannot
1529 possibly be made (since outputs are considered to be pseudo-functional
1530 mappings from inputs, having no inputs means that no meaningful output
1531 can exist).
1532 """
1533 if not self.DEPENDENCIES:
1534 return False
1535 for filehandler in self.DEPENDENCIES:
1536 if not filehandler(self).exists():
1537 return False
1538 return True
1540 def size(self):
1541 """Get the size in bytes of this file. Raises a ``FileNotFoundError``
1542 if this file does not exist."""
1543 return os.stat(self.fullpath).st_size
1545 def are_ur_dependencies_met(self):
1546 """
1547 Recursively check whether ``DEPENDENCIES`` for this file can be
1548 generated (by in turn running this same check on their
1549 ``DEPENDENCIES``). In short, this is a check as to whether we can
1550 eventually get to generating this file or whether the required data to
1551 get to this node in the FileHandler DAG is simply not in the eventdir.
1552 If there are no ``DEPENDENCIES`` (i.e. no input), this file cannot
1553 possibly be made (since outputs are considered to be pseudo-functional
1554 mappings from inputs, having no inputs means that no meaningful output
1555 can exist).
1556 """
1557 if not self.DEPENDENCIES:
1558 return False
1559 return (self.are_dependencies_met() or
1560 all(fh(self).exists() or fh(self).are_ur_dependencies_met()
1561 for fh in self.DEPENDENCIES))
    def status(self, obscheck=None):
        """
        Return the status (see ``NODELEGEND_COLORS`` for the possible
        status strings) for this ``FileHandler`` instance.

        Parameters
        ----------
        obscheck : dict, optional
            A cache mapping already-checked ``FileHandler`` instances to
            whether they are obsolete; passed through to ``is_obsolete``
            and updated here when this file is found obsolete (since its
            descendants are then obsolete too).

        Returns
        -------
        status : Status
            A ``Status`` built from the status string, a list of
            human-readable detail strings, and the legend color associated
            with the status.
        """
        stats_list = []
        if self.exists():
            # wall time is read from generation metadata; may be absent
            try:
                walltime = f"{self.meta.wall_time:8.3f}s"
            except IOError:
                walltime = "N/A"
            mtime = self.modtime()
            stats_list += [
                (
                    f"{mtime.year}-{mtime.month}-{mtime.day} "
                    f"{mtime.hour}:{mtime.minute:02d}:{mtime.second}"
                ),
                "Wall: {}".format(walltime),
                "Size: {}".format(sizeof_fmt(self.size())),
            ]
            if self.veto.permanently_vetoed():
                status = "EXISTS, VETOED"
            elif self.is_obsolete(ancestors=False):
                status = "OBSOLETE"
                if obscheck is not None:
                    obscheck[self] = True  # if obsolete, so are descendants
            elif self.is_obsolete(obscheck):
                # if self.lock.is_locked:
                #     status = "OBSOLETE, LOCKED"
                # else:
                status = "ANCESTOR OBSOLETE"
                # list any dependencies modified after this file; TypeError
                # is raised (and ignored) when a modtime is unavailable
                try:
                    newer = [f.__name__ for f in self.DEPENDENCIES
                             if f(self).modtime(ts=0) > self.modtime()]
                    if newer:
                        stats_list.append("Newer deps: "+", ".join(newer))
                except TypeError:
                    pass
            else:
                if self.lock.is_locked:
                    status = "EXISTS, LOCKED"
                else:
                    status = "EXISTS"
        elif self.intent.in_progress():
            status = "REGENERATING"
        elif self.veto.permanently_vetoed():
            status = "VETOED"
            # surface the vetoing function and its explanation
            veto_dict = self.veto.read_json()
            veto_func = veto_dict['veto_function']
            veto_msg = veto_dict['veto_message']
            stats_list += [
                "Veto: {}".format(
                    veto_func.replace("<", "").replace(">", "")
                ),
                f"Why: {veto_msg}",
            ]
        else:
            # the file does not exist and is not vetoed; figure out where
            # in the pipeline it is stuck (if anywhere)
            vetoed_ancestors = [fh for fh in self.UR_DEPENDENCIES
                                if fh(self).veto.permanently_vetoed()]
            if self.intent.in_progress():
                status = "GENERATING"
            elif self.cooldown.in_progress():
                status = "FAILED, COOLING DOWN"
            elif self.cooldown.exists() or self.intent.exists():
                status = "FAILED"
            elif isinstance(self, EventTriggeredFileHandler):
                status = "AWAITS EXTERNAL TRIGGER"
            elif self.are_dependencies_met():
                status = "READY TO GENERATE"
            elif vetoed_ancestors:
                status = "ANCESTOR VETOED"
                veto_desc = ', '.join(fh.__name__ for fh in vetoed_ancestors)
                stats_list += [
                    "Vetoed: {}".format(veto_desc),
                ]
            elif any(f(self).cooldown.in_progress()
                     for f in self.UR_DEPENDENCIES):
                status = "ANCESTOR COOLING DOWN"
            elif any(f(self).cooldown.exists()
                     for f in self.UR_DEPENDENCIES):
                status = "ANCESTOR FAILED"
            elif self.are_ur_dependencies_met():
                status = "ANCESTORS EXIST"
            elif [f for f in self.UR_DEPENDENCIES
                  if (issubclass(f, EventTriggeredFileHandler) and
                      not f(self).exists())]:
                status = "ANCESTOR AWAITS TRIGGER"
            else:
                status = "NOT YET MADE"
        return Status(status, stats_list, NODELEGEND_COLORS[status])
1654 @classmethod
1655 def sync_shared_manifests(cls, *filehandlers):
1656 """
1657 When implementing a ``FileHandler`` with its own subclasses in
1658 its manifest (the easiest pattern for ensuring that the ``FileHandler``
1659 classes in the manifest share a common ``generate`` method and
1660 ``DEPENDENCIES``), use this decorator before each class definition to
1661 make sure that the base ``FileHandler`` (with the shared ``generate``
1662 function) as well as its subclasses (which distinguish between its
1663 outputs by each having their own ``FILENAME`` attributes).
1665 Parameters
1666 ----------
1667 *filehandlers : FileHandler or str
1668 ``FileHandler`` subclasses that are generated together.
1670 Returns
1671 -------
1672 first_filehandler
1673 The first ``FileHandler`` subclass from ``filehandlers`` (this
1674 allows the function to operate as a class decorator).
1676 Raises
1677 ------
1678 ValueError
1679 If provided ``filehandlers`` are not subclasses of this class or if
1680 no ``filehandlers`` are provided.
1681 TypeError
1682 If provided ``filehandlers`` do not share the same ``DEPENDENCIES``
1683 and ``_generate`` methods as this class, or if any of them lack a
1684 ``FILENAME`` attribute.
1685 """
1686 if not filehandlers:
1687 raise ValueError("No ``FileHandlers`` provided to sync.")
1688 manifest = tuple(set(filehandlers).union(cls.MANIFEST_TYPES or ()))
1689 for synced_cls in manifest + (cls,):
1690 if not isinstance(cls, type) or not issubclass(synced_cls, cls):
1691 raise ValueError(f"Must specify a subclass of {cls}, "
1692 f"not {synced_cls}")
1693 setattr(synced_cls, "MANIFEST_TYPES", manifest)
1694 return filehandlers[0]
1697# The following class is abstract, so we shouldn't need to define abstract
1698# methods.
1699# pylint: disable=abstract-method
class EventTriggeredFileHandler(FileHandler):
    """
    A ``FileHandler`` for data that arrives via an external event
    notification — e.g. a POST request or a GCN notice — rather than being
    derived from other files. Since creation is driven from outside the
    dependency graph, ``generate()`` should accept whatever arguments the
    triggering event-handler script needs to pass in, and subclasses
    declare no ``DEPENDENCIES``.
    """

    # Externally-triggered files are roots of the dependency graph.
    DEPENDENCIES = ()
class TriggerList(FileHandler):
    """
    A file that contains lists of triggers of some sort.
    """

    # Which detectors produced these triggers; subclasses must override
    # (``_REQUIRED`` below enforces that it is set).
    DETECTORS = None

    _REQUIRED = ("DETECTORS",)

    # MODERNIZED: ``abstractproperty`` is deprecated (since Python 3.3);
    # stacking ``@property`` on ``@abstractmethod`` is the equivalent,
    # recommended spelling.  Both names are already imported at file top.
    @property
    @abstractmethod
    def num_triggers(self):
        """
        The number of triggers described by this file. Useful mostly for
        quickly determining if this trigger list is empty.
        """
1731# The following class is abstract, so we shouldn't need to define abstract
1732# methods.
1733# pylint: disable=abstract-method
class JSONFile(FileHandler):
    """
    A FileHandler abstract subclass providing tools to work with JSON
    dictionaries.
    """

    def checksum(self, fields=None, **kwargs):
        """
        Get the sha256 checksum of this file's contents. Use this to version
        files and check whether their contents have changed. Optionally only
        use specific fields in generating the checksum to ignore irrelevant
        changes (e.g. when determining file obsolescence).

        Parameters
        ----------
        fields : tuple or list, optional
            A tuple of tuples of strings, with each sub-tuple containing
            strings or integers indexing into this JSON-file's contents
            (strings for ``dict`` fields, integers for ``list`` fields) to
            specify only relevant fields. See example below.
        kwargs : dict
            Remaining keyword arguments are ignored (see
            ``FileHandler.checksum`` note on ``kwargs``).

        Raises
        ------
        KeyError
            If a field that is expected in a sub-dictionary is not found.
        IndexError
            If an entry that is expected in a sub-list is not found.
        TypeError
            If you attempt to index into a sub field that is not a dictionary
            using a string or if ``fields`` cannot be indexed into.

        Examples
        --------
        For a JSON dictionary like:
        >>> foo = {
        ...     "names": ["stef", "countryman"],
        ...     "age": 27,
        ...     "eyes": {
        ...         "left": "brown",
        ...         "right": "brown",
        ...     }
        ... }

        You can calculate the checksum using *only* the first name and left
        eye-color values by using these ``fields``:
        >>> fields = (
        ...     ('names', 0),
        ...     ('eyes', 'left')
        ... )
        """
        # BUGFIX: the docstring has always documented that extra keyword
        # arguments are accepted and ignored, but the signature lacked
        # ``**kwargs``, so callers following the ``FileHandler.checksum``
        # protocol would hit a ``TypeError``.  ``**kwargs`` is now accepted
        # (and, per the documented contract, deliberately ignored).
        if fields is None:
            return super().checksum()
        if fields:
            fields[0]  # fail fast (TypeError) if ``fields`` is not indexable
        data = self.read_json()
        # Include both the field paths and their values so that two different
        # field selections never collide on the same digest.
        compare = {'fields': fields, 'values': list()}
        for field in fields:
            value = data
            for key in field:  # walk down the nested JSON structure
                value = value[key]
            compare['values'].append(value)
        assert (len(compare['fields']) == len(compare['values']))
        # ``sort_keys``/``indent`` make the serialization deterministic so
        # the digest is stable across runs.
        return sha256(json.dumps(compare, allow_nan=True, indent=4,
                                 sort_keys=True).encode()).hexdigest()

    def _write_json(self, outdict):
        """
        Write this dictionary to a JSON file in a consistent and
        deterministic format during file generation.
        """
        with open(self.fullpath, 'w') as outfile:
            json.dump(outdict, outfile, indent=4, sort_keys=True)

    def read_json(self):
        """
        Read in this JSON file as a dictionary.
        """
        with self.open() as jsonfile:
            return json.load(jsonfile)

    @property
    def html_table(self):
        """
        Generate an HTML table representation of the JSON data
        contained in this ``FileHandler``.
        """
        # Imported lazily so the (third-party) dependency is only required
        # when an HTML rendering is actually requested.
        from json2html import json2html
        return json2html.convert(json=self.read_json(), table_attributes="")
class GenerateOnceMixin(object):
    """
    Mixin marking a file as never automatically obsolete: once it has been
    generated, it will only ever be regenerated by explicit manual action.
    """

    @recursive_obsolescence
    def is_obsolete(self, checked=None, **kwargs):
        """
        Always return ``False``: classes using ``GenerateOnceMixin`` are
        never flagged obsolete by the automatic obsolescence machinery and
        must instead be regenerated by hand. ``checked`` carries the same
        meaning as in ``FileHandler.is_obsolete``.
        """
        return False