
# (c) Stefan Countryman July 12, 2016 - 2018

"""
Abstract definitions of FileHandler classes. FileHandlers provide methods for
defining and working with data associated with a trigger.
"""

import re
import sys
import os
from abc import ABC, abstractmethod, abstractproperty
import json
import datetime
import time
import tempfile
import filecmp
import shutil
import difflib
import functools
import traceback
from subprocess import Popen, PIPE
from mmap import mmap, ACCESS_READ
from pathlib import Path
from hashlib import sha256
from copy import deepcopy
from operator import xor
import logging
from collections import namedtuple, OrderedDict
from tabulate import tabulate
# DAGRR
from llama.vetoes import (
    VetoMixin,
    VetoException,
)
from llama.flags import FlagsMixin, flag_table
from llama.meta import MetaDataMixin
from llama.versioning import GitDirMixin
from llama.lock import LockMixin
from llama.intent import CoolDownParams, CoolDownMixin, IntentMixin
# DAGRR
from llama.utils import (
    COLOR as COL,
    tbhighlight,
    color,
    EDGEFMT,
    DOTFMT,
    DEFAULT_RUN_DIR,
    GenerationError,
    vecstr,
    veccls,
    vecfh,
    plot_graphviz,
    sizeof_fmt,
)
from llama.classes import (
    LOCAL_TMPDIR_NAME,
    RequiredAttributeMixin,
    ImmutableDict,
    NamespaceMappable,
    AbstractFileHandler,
)
from llama.filehandler.classes import GenerationResult
from llama.io.registry import get_io
from llama.io.classes import IOMixin


NODELEGENDFMT = (r'"l" [label=<{{{{<B>FILE STATUS LEGEND</B>|{rows}}}}}>, '
                 r'shape="record", style=filled];')
NODELEGEND_COLOR_FMT = r'<FONT COLOR="{color}">▉</FONT> <I>{label}</I>'
# colors taken from: https://www.graphviz.org/doc/info/colors.html
NODELEGEND_COLORS = OrderedDict()
NODELEGEND_COLORS["EXISTS"] = "#7fffd4"  # aquamarine
NODELEGEND_COLORS["EXISTS, LOCKED"] = "#ffffff"  # white
NODELEGEND_COLORS["EXISTS, VETOED"] = "#ffffff"  # white
NODELEGEND_COLORS["OBSOLETE"] = "#ffff33"  # dim gold
NODELEGEND_COLORS["ANCESTOR OBSOLETE"] = "#cfcf00"  # yellow
NODELEGEND_COLORS["VETOED"] = "#999999"  # gray
NODELEGEND_COLORS["ANCESTOR VETOED"] = "#cccccc"  # gray
NODELEGEND_COLORS["FAILED, COOLING DOWN"] = "#ff0000"  # red
NODELEGEND_COLORS["FAILED"] = "#ff3322"  # light red
NODELEGEND_COLORS["AWAITS EXTERNAL TRIGGER"] = "#984ea3"  # purple
NODELEGEND_COLORS["ANCESTOR AWAITS TRIGGER"] = "#dd83ea"  # light purple
NODELEGEND_COLORS["READY TO GENERATE"] = "#7fff00"  # chartreuse
NODELEGEND_COLORS["ANCESTORS EXIST"] = "#bdff7a"  # pale green
NODELEGEND_COLORS["NOT YET MADE"] = "#ff0000"  # red
NODELEGEND_COLORS["ANCESTOR COOLING DOWN"] = "#ff7f00"  # orange
NODELEGEND_COLORS["ANCESTOR FAILED"] = "#ffa954"  # light orange
NODELEGEND_COLORS["GENERATING"] = "#f781bf"  # pink
NODELEGEND_COLORS["REGENERATING"] = "#f781bf"  # pink
NODELEGEND = NODELEGENDFMT.format(
    rows=r'<BR ALIGN="LEFT"/>'.join(
        [
            NODELEGEND_COLOR_FMT.format(color=c, label=l)
            for l, c in NODELEGEND_COLORS.items()
        ] + [""]  # put in one last aligning line break
    )
)
NODEFMT = (r'"{num}" [label=<{{{{<B>{name}</B>|<B>{status}</B>|<I>{fname}</I>'
           r'{stats}}}}}>, shape="record", style=filled, '
           r'{url}'
           r'fillcolor="{color}"];')
NODEFLAGSFMT = (r'"f" [label=<{{{{<B>EVENT FLAGS</B>|{rows}}}}}>, '
                r'shape="record", '
                r'style=filled, '
                r'];')
NODEFLAGS_ROW_FMT = r'<B>{flag}:</B> <I>{value}</I>'
LOGGER = logging.getLogger(__name__)
TEXT_FILE_EXTENSIONS = (
    'tex',
    'txt',
    'log',
    'csv',
    'tsv',
    'json',
    'yml',
    'xml',
)
NOW = datetime.datetime.now
BUFSIZE = 2**17


# this is just a function, so don't give it an uppercase constant var name
# pylint: disable=invalid-name
utcnow = datetime.datetime.utcnow


# this is just a function, so don't give it an uppercase constant var name
# pylint: disable=invalid-name
fromtimestamp = datetime.datetime.fromtimestamp



# A bunch of checks for the file handler and the query arg. See
# ``FileGraph.downselect`` for usage. Each check should be a function taking
# a FileHandler instance as its first argument and some kind of query value
# against which the FileHandler instance will be judged. If the function
# returns true, the check has passed.
# NOTE Make sure to update the ``FileGraph.downselect`` docstring when
# adding, removing, or changing checks below!
DOWNSELECT_CHECKS = {
    'equals': vecfh(lambda fh, q: fh == q),
    'instanceof': veccls(isinstance),
    'type': veccls(lambda fh, q: type(fh) is q),  # pylint: disable=C0123
    'typename': vecstr(lambda fh, q: type(fh).__name__ == q),
    'extension': vecstr(lambda fh, q: fh.FILENAME.split('.')[-1] == q),
    'startswith': vecstr(lambda fh, q: fh.FILENAME.startswith(q)),
    'endswith': vecstr(lambda fh, q: fh.FILENAME.endswith(q)),
    'inname': vecstr(lambda fh, q: q in fh.FILENAME),
    'nameis': vecstr(lambda fh, q: fh.FILENAME == q),
    'depends': veccls(lambda fh, q: q in fh.DEPENDENCIES),
    'ancestor': veccls(lambda fh, q: q in fh.UR_DEPENDENCIES),
    'descendent': veccls(lambda fh, q: type(fh) in q.UR_DEPENDENCIES),
    'dependsname': vecstr(lambda fh, q: q in (d.__name__ for d in
                                              fh.DEPENDENCIES)),
    'dependsfile': vecstr(lambda fh, q: q in (d('foo').FILENAME
                                              for d in fh.DEPENDENCIES)),
    'exists': lambda fh, q: fh.exists() is q,
    'cooldown': lambda fh, q: fh.cooldown.in_progress() is q,
    'intent': lambda fh, q: fh.intent.in_progress() is q,
    'vetoed': lambda fh, q: fh.veto.permanently_vetoed() is q,
    'urvetoed': lambda fh, q: bool([f for f in fh.UR_DEPENDENCIES
                                    if f(fh).veto.permanently_vetoed()]) is q,
    'obsolete': lambda fh, q: fh.exists() and fh.is_obsolete() is q,
    'selfobsolete': lambda fh, q: (fh.exists() and
                                   fh.is_obsolete(ancestors=False) is q),
    'depsmet': lambda fh, q: fh.are_dependencies_met() is q,
}
# this references other downselect check functions, so it can't be placed in
# the dictionary literal declaration of DOWNSELECT_CHECKS.
DOWNSELECT_CHECKS['needregen'] = lambda fh, q: q is (
    DOWNSELECT_CHECKS['vetoed'](fh, False) and
    DOWNSELECT_CHECKS['urvetoed'](fh, False) and
    DOWNSELECT_CHECKS['cooldown'](fh, False) and
    DOWNSELECT_CHECKS['depsmet'](fh, True) and
    (
        DOWNSELECT_CHECKS['exists'](fh, False) or
        DOWNSELECT_CHECKS['selfobsolete'](fh, True)
    )
)
DOWNSELECT_CHECKS['subgraph'] = lambda fh, q: (
    DOWNSELECT_CHECKS['descendent'](fh, q) or
    DOWNSELECT_CHECKS['type'](fh, q)
)


Status = namedtuple("Status", ("status", "stats", "color"))


FileGraphTuple = namedtuple("FileGraphTuple", ("eventid", "rundir",
                                               "pipeline"))


class FileGraph(ImmutableDict, NamespaceMappable):
    """
    Used to store a list of ``FileHandler`` instances, e.g. those associated
    with a particular GraceDB event. In that example, one might access the
    LvcSkymapHdf5 file handler associated with some event using

    > event.files.LvcSkymapHdf5

    Has the nice feature of being able to take a dictionary as an
    initialization argument and create a dot-notation accessible map from that
    dictionary's key-value pairs.
    """

    def status(self):
        """
        Return a status description of this graph for use in status-checking
        scripts and webpages. Values are taken from ``NODELEGEND_COLORS`` so
        that they can be colored appropriately.
        """
        if not any(fh.exists() for fh in self.values()):
            status = "NOT YET STARTED"
            color = NODELEGEND_COLORS["NOT YET MADE"]
        elif any((fh.intent.in_progress() or
                  (fh.modtime(ts=0).timestamp() > time.time() - 2))
                 for fh in self.values()):
            status = "GENERATING FILES"
            color = NODELEGEND_COLORS["GENERATING"]
        elif next(x for x in self.values()).flags['MANUAL'] == 'true':
            status = "MANUAL MODE"
            color = NODELEGEND_COLORS["ANCESTOR AWAITS TRIGGER"]
        else:
            status = "EXISTS"
            color = NODELEGEND_COLORS["EXISTS"]
        stats = ""
        return Status(status, stats, color)


    def dependency_graph_term(self, unicode=True, plot=None, highlight=None):
        """
        Like ``dependency_graph``, but return a terminal-friendly text plot of
        this ``FileGraph`` with current statuses as a string. By default, uses
        unicode characters to print a more legible graph; use pure ascii by
        passing ``unicode=False``. Highlight a specific line in the output
        table by specifying a substring in that row as ``highlight``.

        Skip the graph plot with ``plot=False``; if ``plot=None``, try to make
        the graph but give up if ``Graph::Easy`` is not installed.

        The plot requires ``Perl`` and the ``Graph::Easy`` module to be
        installed (with the ``graph-easy`` script in the current path); if
        ``plot=True`` and ``graph-easy`` is not available, a
        ``FileNotFoundError`` will be raised.
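
        Example (a sketch; ``event.files`` is assumed to be a ``FileGraph``
        and ``'SkymapInfo'`` a hypothetical row substring to highlight)::

            print(event.files.dependency_graph_term(plot=None,
                                                    highlight='SkymapInfo'))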

241 """ 

242 # color code info: https://stackoverflow.com/a/33206814/3601493 

243 if not len({fh.eventdir for fh in self.values()}) == 1: 

244 raise ValueError(("All ``FileHandler`` instances must refer to " 

245 "same event. Instead, got: {}").format(self)) 

246 tty = shutil.get_terminal_size((200, 20)) 

247 RES_FMT = "{graph}{flagtab}\n\n{tab}" 

248 

249 def bold(tab): 

250 if not unicode or not highlight: 

251 return tab 

252 tablines = tab.split('\n') 

253 hl = [highlight] if isinstance(highlight, str) else highlight 

254 for i, line in enumerate(tablines): 

255 if all(h not in line for h in hl): 

256 continue 

257 tablines[i] = re.sub('([^|]+)', 

258 lambda m: (COL.BOLD + COL.MAGENTA + 

259 m.group(1) + COL.CLEAR), line) 

260 return '\n'.join(tablines) 

261 

262 # let graph-easy start while we sort out subgraph status 

263 if plot is not False: 

264 cmd = ['graph-easy', '--as=boxart' if unicode else '--as=ascii'] 

265 try: 

266 proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) 

267 plot = True 

268 except FileNotFoundError: 

269 if plot is True: 

270 raise 

271 else: 

272 plot = False 

273 try: 

274 obscheck = dict() 

275 statuses = {fh: fh.status(obscheck=obscheck) for fh in 

276 self.values()} 

277 nodes = sorted(self.values(), key=lambda f: 

278 (f"{len(f.UR_DEPENDENCIES):05d}" + 

279 type(f).__name__)) 

280 flagtab = flag_table(list(self.values())[0].flags, color=unicode) 

281 if plot: 

282 graphtxt = '' 

283 for i, fh in enumerate(nodes): 

284 for d in fh.DEPENDENCIES: 

285 graphtxt += f'[ {nodes.index(d(fh))} ] -> [ {i} ]\n' 

286 res, err = proc.communicate(input=graphtxt.encode()) 

287 if proc.returncode: 

288 msg = (f"graph-easy failed with code {proc.returncode}.\n" 

289 f"STDOUT:\n{res}\nSTDERR:\n{err}") 

290 LOGGER.error(msg) 

291 raise RuntimeError(msg) 

292 LOGGER.debug(f"Generated text dependency graph, stderr: {err}") 

293 except: # noqa 

294 if plot: 

295 LOGGER.error("Text dep graph failed, killing graph-easy") 

296 proc.kill() 

297 raise 

298 graph = res.decode()+'\n\n' if plot else '' 

299 

300 # make a table 

301 if tty.columns < 200: 

302 tabs = [ 

303 tabulate( 

304 [ 

305 (i, type(fh).__name__, statuses[fh].status, 

306 ','.join(str(ii) for ii in sorted(nodes.index(d(fh)) 

307 for d in 

308 fh.DEPENDENCIES))) 

309 for i, fh in enumerate(nodes) 

310 ], 

311 ['No.', 'Class', 'Status', 'Deps',], 

312 # tablefmt='fancy_grid' if unicode else 'grid', 

313 tablefmt='presto', 

314 ), 

315 tabulate( 

316 [ 

317 (i, fh.FILENAME, 

318 '; '.join([' '.join(s.split()) 

319 for s in statuses[fh].stats])) 

320 for i, fh in enumerate(nodes) 

321 ], 

322 ['No.', 'File Name', 'Stats'], 

323 # tablefmt='fancy_grid' if unicode else 'grid', 

324 tablefmt='presto', 

325 ), 

326 ] 

327 else: 

328 tabs = [ 

329 tabulate( 

330 [ 

331 (i, type(fh).__name__, statuses[fh].status, 

332 ','.join(str(ii) for ii in sorted(nodes.index(d(fh)) 

333 for d in 

334 fh.DEPENDENCIES)), 

335 fh.FILENAME, 

336 '; '.join([' '.join(s.split()) 

337 for s in statuses[fh].stats])) 

338 for i, fh in enumerate(nodes) 

339 ], 

340 ['No.', 'Class', 'Status', 'Deps', 'File Name', 'Stats'], 

341 # tablefmt='fancy_grid' if unicode else 'grid', 

342 tablefmt='presto', 

343 ), 

344 ] 

345 if not unicode: 

346 tab = '\n\n'.join(bold(t) for t in tabs) 

347 return RES_FMT.format(graph=graph, flagtab=flagtab, tab=tab) 

348 # graph = (COL.MAGENTA + 

349 # graph.replace('\n', COL.CLEAR+'\n'+COL.MAGENTA) + COL.CLEAR) 

350 tablines = tabs[0].split('\n') 

351 rowpattern = '('+re.sub('[^|]', '.', 

352 tablines[0]).replace('|', r')(\|)(')+')' 

353 for i in range(len(tablines)): 

354 if len(tablines[i]) < 2: 

355 continue 

356 if not tablines[i][1] in ' 0123456789': 

357 continue 

358 split = tablines[i].split('|') 

359 status = split[2].strip() 

360 if status not in NODELEGEND_COLORS: 

361 continue 

362 split[2] = (color(bg=NODELEGEND_COLORS[status], fg='#000000') + 

363 split[2] + COL.CLEAR) 

364 # color deps 

365 deps = re.split('(\d+)', split[3]) 

366 for ii in range(1, len(deps), 2): 

367 dstat = statuses[nodes[int(deps[ii])]].status 

368 deps[ii] = (color(fg=NODELEGEND_COLORS[dstat]) + 

369 deps[ii] + COL.CLEAR) 

370 split[3] = ''.join(deps) 

371 tablines[i] = '|'.join(split) 

372 tabs[0] = '\n'.join(tablines) 

373 # color graph node boxes 

374 if plot: 

375 graphlines = graph.split('\n') 

376 coords = [[] for l in graphlines] 

377 for n, node in enumerate(nodes): 

378 for i, gline in enumerate(graphlines): 

379 parts = re.split(f'([ ]+{n}[ ]+)', gline) 

380 if len(parts) == 1: 

381 continue 

382 j_start = len(parts[0]) 

383 j_end = j_start + len(parts[1]) 

384 i_start = i_end = i 

385 coords[i].append([j_start, j_end, n]) 

386 while graphlines[i_start-1][j_start] == ' ': 

387 i_start -= 1 

388 coords[i_start].append([j_start, j_end, n]) 

389 while graphlines[i_end+1][j_start] == ' ': 

390 i_end += 1 

391 coords[i_end].append([j_start, j_end, n]) 

392 # for ii in [i_start-1, i_end+1]: 

393 # graphlines[ii] = (graphlines[ii][:j_start-1] + 

394 # ' '*(j_end-j_start+2) + 

395 # graphlines[ii][j_end+1:]) 

396 # coords[ii].append([j_start, j_end, n]) 

397 break 

398 else: 

399 continue 

400 for i, gline in enumerate(graphlines): 

401 colorline = [gline] 

402 for j_start, j_end, n in sorted(coords[i], key=lambda c: -c[0]): 

403 col = color(bg=NODELEGEND_COLORS[statuses[nodes[n]].status], 

404 fg="#000000") 

405 colorline = [colorline[0][:j_start], 

406 col+colorline[0][j_start:j_end]+COL.CLEAR, 

407 colorline[0][j_end:]] + colorline[1:] 

408 # colorline = [colorline[0][:j_start-1], 

409 # col+' '+colorline[0][j_start:j_end]+' '+COL.CLEAR, 

410 # colorline[0][j_end+1:]] + colorline[1:] 

411 graphlines[i] = ''.join(colorline) 

412 graph = '\n'.join(graphlines) 

413 tab = '\n\n'.join(bold(t) for t in tabs) 

414 return RES_FMT.format(graph=graph, flagtab=flagtab, tab=tab) 


    def dependency_graph(self, outfile: str = None, title: str = "FileGraph",
                         urls: str = None, bgcolor: str = 'black'):
        """
        Return a graphviz .dot graph of the ``FileHandler`` instances in this
        ``FileGraph`` and their ``DEPENDENCIES`` on each other. Plot their
        status (whether the file exists) as well as their metadata. Optionally
        plot the graph to an output image file visualizing the graph.

        Parameters
        ----------
        outfile : str, optional
            If not provided, return a string in ``.dot`` file format
            specifying graph relations. If an output file is specified, infer
            the filetype and write to that file.
        title : str, optional
            The name of the graph to use. If not provided, "FileGraph"
            will be used.
        urls : str, optional
            A format string for including URL attributes for each
            ``FileHandler`` node. If provided, include a URL attribute
            pointing to each FILENAME for use on the summary page website,
            where the ``FILENAME`` of each ``FileHandler`` instance will be
            provided to the ``urls`` format string as the only ``format``
            argument. The URL attribute is not compatible with all
            ``graphviz`` output formats
            (http://www.graphviz.org/doc/info/attrs.html#d:URL) so make sure
            to turn it off if publishing to an incompatible format.
        bgcolor : str, optional
            The background color to use for the generated plot.

        Returns
        -------
        dot : str
            The dependency graph in ``.dot`` format (can be used as input to
            ``dot`` at the command line). This is returned regardless of
            whether an outfile is specified.

        Raises
        ------
        ValueError
            If the ``FileHandler`` instances in this ``FileGraph`` do not all
            refer to the same event.

        Supported file extensions for ``outfile``:

        - *dot*: just save the dotfile in .dot format.
        - *png*: save the image in PNG format.
        - *pdf*: save the image in PDF format.
        - *svg*: save the image in SVG format.
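
        Example (a sketch; the output path and URL pattern are
        hypothetical)::

            dot = event.files.dependency_graph(
                outfile='filegraph.png',
                urls='https://example.org/events/{}',
            )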

464 """ 

465 if not len({fh.eventdir for fh in self.values()}) == 1: 

466 raise ValueError(("All ``FileHandler`` instances must refer to " 

467 "same event. Instead, got: {}").format(self)) 

468 nodes = {fh: i for i, fh in enumerate(self.values())} 

469 obscheck = dict() 

470 statuses = {fh: fh.status(obscheck=obscheck) for fh in nodes} 

471 flagnode = NODEFLAGSFMT.format(rows=r'<BR ALIGN="LEFT"/>'.join([ 

472 NODEFLAGS_ROW_FMT.format(flag=f, value=v) 

473 for f, v in list(self.values())[0].flags.items() 

474 ] + [""])) # get a trailing left-aligned line break 

475 

476 def join_stats(stats): 

477 """Add formatting to stats list.""" 

478 if stats: 

479 stats.append("") # one last BR to ensure left-aligned 

480 return "|" + '<BR ALIGN="LEFT"/>'.join(stats) 

481 else: 

482 return "" 

483 

484 nodedot = NODELEGEND + '\n' + flagnode + '\n' + '\n'.join( 

485 NODEFMT.format( 

486 num=i, 

487 name=type(fh).__name__, 

488 fname=fh.FILENAME, 

489 url=( 

490 'URL="{}", '.format(urls.format(fh.FILENAME)) 

491 if urls else '' 

492 ), 

493 status=statuses[fh].status, 

494 stats=join_stats(statuses[fh].stats), 

495 color=statuses[fh].color, 

496 ) for fh, i in nodes.items() 

497 ) 

498 edgedot = '\n'.join(EDGEFMT.format(depnum=nodes[d(fh)], num=i, 

499 color=statuses[d(fh)].color) 

500 for fh, i in nodes.items() 

501 for d in fh.DEPENDENCIES) 

502 dot = DOTFMT.format(name=title, nodes=nodedot, edges=edgedot, 

503 bgcolor=bgcolor) 

504 if outfile is not None: 

505 plot_graphviz(dot, outfile) 

506 return dot 


    def _check_compatible(self, other):
        """
        Check whether these two ``FileGraph`` instances are consistent and can
        thus be added/subtracted etc. Raises a ``ValueError`` if they are not
        compatible.
        """
        if not isinstance(other, FileGraph):
            raise ValueError(("Both objects must be ``FileGraph`` "
                              "instances, instead got {} of type "
                              "{}").format(other, type(other)))
        contradictions = {k: (self[k], other[k])
                          for k in self
                          if k in other and self[k] != other[k]}
        if contradictions:
            raise ValueError(("Both objects must have matching values when "
                              "keys match to be added. The following keys "
                              "did not match between ``FileGraph`` 1 {} "
                              "and 2 {}: {}").format(self, other,
                                                     contradictions))

    def __add__(self, other):
        """
        Get the union of two ``FileGraph`` instances.
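
        For example, ``merged = graph_a + graph_b`` keeps every key from both
        graphs, provided any shared keys map to equal values (otherwise
        ``_check_compatible`` raises a ``ValueError``).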

531 """ 

532 self._check_compatible(other) 

533 selfdict = dict(self) 

534 selfdict.update(other) 

535 return FileGraph(selfdict) 

536 

537 def __sub__(self, other): 

538 """ 

539 Return a ``FileGraph`` with all the elements of ``self`` and none 

540 of the elements of ``other``. Works even if ``other`` contains 

541 ``FileHandler`` instances that are not in ``self``. 

542 """ 

543 self._check_compatible(other) 

544 return FileGraph({k: v for k, v in self.items() if k not in other}) 


    def downselect(self, invert=False, reducer=all, **kwargs):
        """
        Get a ``FileGraph`` of file handlers that match *all* of the
        provided criteria (or *any* of them; see ``reducer``). The checks
        documented below are defined in ``DOWNSELECT_CHECKS``.

        Parameters
        ----------
        invert : bool
            Invert results. (Default: False)
        reducer : function
            Specify the ``any`` builtin to match if any check passes. Specify
            ``all`` to match only when every check passes. (Default: ``all``)
        equals : AbstractFileHandler or list
            The filehandler must equal the provided ``FileHandler`` instance
            or, if a list of instances is provided, it must be equal to one of
            them.
        instanceof : type
            The ``FileHandler`` must be an instance of this class.
        type : type
            The type of the ``FileHandler`` must exactly match.
        typename : str or list
            The FileHandler type's name must match this string (or one of
            these strings if given an iterable of strings).
        extension : str or list
            The file extension for the file name (or one of these strings if
            given an iterable of strings).
        startswith : str or list
            The file name starts with this string (or one of these strings if
            given an iterable of strings).
        endswith : str or list
            The file name ends with this string (or one of these strings if
            given an iterable of strings).
        inname : str or list
            The file name contains this substring (or one of these strings if
            given an iterable of strings).
        nameis : str or list
            The filename equals this string (or one of these strings if given
            an iterable of strings).
        depends : type or list
            Has this ``FileHandler`` class (or one of these ``FileHandler``
            classes) as a dependency.
        ancestor : type or list
            Has this ``FileHandler`` class (or one of these ``FileHandler``
            classes) as an ur-dependency, i.e. steps *AFTER* this
            ``FileHandler``.
        descendent : type or list
            Has this ``FileHandler`` class (or one of these ``FileHandler``
            classes) as a descendent, i.e. steps *BEFORE* this
            ``FileHandler``.
        subgraph : type or list
            Same as ``descendent``, but also include FileHandlers that are
            instances of the query arguments. Defines a subgraph of the
            original ``FileGraph`` graph containing only the given
            ``FileHandler`` instances and their ``DEPENDENCIES``.
        dependsname : str or list
            Has the ``FileHandler`` with this name (or one of these names if
            given an iterable of strings) as a dependency.
        dependsfile : str or list
            Has the ``FileHandler`` with this filename (or one of these names
            if given an iterable of strings) as a dependency.
        exists : bool
            Whether the returned ``FileHandler`` instances' output files
            exist.
        cooldown : bool
            Whether the returned ``FileHandler`` instances are currently
            cooling down.
        intent : bool
            Whether the returned ``FileHandler`` instances are currently
            being generated.
        vetoed : bool
            Whether the returned ``FileHandler`` instances have been
            permanently vetoed.
        urvetoed : bool
            Whether the returned ``FileHandler`` instances have had any of
            their ancestors vetoed. Does not check the instances themselves
            (combine with ``vetoed`` for that).
        obsolete : bool
            Whether the returned ``FileHandler`` instances'
            output files exist but are obsoleted due to newer input data being
            available. **NB** Obsolescence *implies* that the file exists!
            Will not include ``FileHandler`` results whose files don't exist.
        selfobsolete : bool
            Same as ``obsolete``, but don't run obsolescence checks on each
            file's ancestors; only run checks relevant to each
            ``FileHandler`` instance.
        depsmet : bool
            Whether the returned ``FileHandler`` instances' ``DEPENDENCIES``
            have been met, i.e. they can be generated immediately if they
            don't exist.
        needregen : bool
            Whether the returned ``FileHandler`` instances have their
            ``DEPENDENCIES`` met and either are obsolete or have not been
            generated yet.

        Returns
        -------
        matches : FileGraph
            A new ``FileGraph`` instance containing a subset of the
            ``FileHandler`` instances for this ``FileGraph`` that match the
            given downselection criteria.
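
        Example (a sketch; ``event.files`` is assumed to be a
        ``FileGraph``)::

            # existing, unvetoed JSON files
            matches = event.files.downselect(extension='json', exists=True,
                                             vetoed=False)
            # everything *not* matching the above criteria
            rest = event.files.downselect(invert=True, extension='json',
                                          exists=True, vetoed=False)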

645 """ 

646 return type(self)({ 

647 key: value for key, value in self.items() 

648 if xor( 

649 invert, 

650 reducer(DOWNSELECT_CHECKS[query_name](value, query_value) 

651 for query_name, query_value in kwargs.items()) 

652 ) 

653 }) 


    def update(self, **downselect):
        """
        Generate any files that fit the FileGraph downselection
        criteria specified in ``downselect``. By default, generate all files
        that have not been generated and regenerate all files that have been
        obsoleted because their data ``DEPENDENCIES`` have changed. Will only
        generate files that are *immediately* generateable; if some of the
        files you want to generate depend on files that haven't been generated
        yet, you'll need to keep running this method until you've generated
        each successive layer of dependencies (and, finally, your target
        files). You can do this by running the method repeatedly until it
        returns an empty (falsy) result.

        Parameters
        ----------
        **downselect
            Keyword arguments that can be passed to ``Event.downselect`` to
            narrow down the set of ``FileHandler`` instances that should be
            generated.

        Returns
        -------
        files_submitted_for_generation : Iterable
            An iterable of ``FileHandler`` instances that have been checked
            out and submitted for generation.
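
        Example (a sketch; generate successive dependency layers until
        nothing is left to submit)::

            while event.files.update():
                pass  # each call submits the next generateable layer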

680 """ 

681 if not downselect: 

682 downselect = ImmutableDict({'needregen': True}) 

683 regenlist = self.downselect(**downselect) 

684 # only generate files whose DEPENDENCIES do not also need to be 

685 # regenerated (in order to maintain self-consistency) 

686 notyet = regenlist.downselect(ancestor=[type(f) for f in 

687 regenlist.values()]) 

688 regenlist = regenlist - notyet 

689 if not regenlist: 

690 return tuple() 

691 LOGGER.debug("Regenlist (downselection) to update (after removing " 

692 "obsolete ``DEPENDENCIES``): %s", regenlist) 

693 return get_io('file').generate.submit(regenlist) 



def recursive_obsolescence(func):
    """
    Store ``is_obsolete`` values for repeated calls to make sure that we don't
    recompute them while recursively checking ``is_obsolete`` values.
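
    The ``checked`` dict is threaded through recursive calls, so each
    ``FileHandler`` in the dependency graph is evaluated at most once per
    top-level ``is_obsolete`` call (a simple memoization pattern).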

700 """ 

701 @functools.wraps(func) 

702 def wrapper(self, checked=None, **kwargs): 

703 if checked is None: 

704 checked = dict() 

705 if self in checked: 

706 return checked[self] 

707 checked[self] = func(self, checked=checked, **kwargs) 

708 return checked[self] 

709 return wrapper 



class FileHandler(AbstractFileHandler, RequiredAttributeMixin,
                  IntentMixin, CoolDownMixin, GitDirMixin, FlagsMixin,
                  VetoMixin, LockMixin, MetaDataMixin, IOMixin):
    """
    A class for generating, opening, and checking existence of data files
    associated with these events. Specify the maximum amount of time that
    should be spent generating each file by setting the TIMEOUT attribute of
    the relevant implementation class. *Instances are immutable* other than
    their ``parent`` and ``graph`` attributes.

    ``FileHandler`` instances can check whether their corresponding output
    data has been generated and stored. If the data in their input
    ``DEPENDENCIES`` have changed, they can dynamically check whether their
    corresponding outputs need to be regenerated. Use ``DEP_CHECKSUM_KWARGS``
    to specify which subsets of input data are relevant to each
    ``FileHandler``; these keyword arguments will be fed to the ``checksum``
    methods of each dependency to see whether relevant subsets of input data
    have changed (causing the current version of the ``FileHandler`` instance
    to become obsolete and triggering automatic file regeneration on the next
    update).

    Required Class Attributes
    -------------------------
    The following class attributes must be defined in subclasses, either
    manually or programmatically (see: ``FileHandler.set_class_attributes``
    and its implementations in subclasses).

    FILENAME : str
        The base filename for this filehandler as it will appear in an event
        directory.

    DEPENDENCIES : Tuple[FileHandler]
        A tuple of other ``FileHandler`` subclasses whose data this
        ``FileHandler`` uses in order to ``generate`` its own output.

    MANIFEST_TYPES : Tuple[FileHandler]
        A tuple of other ``FileHandler`` subclasses that are generated at the
        same time as this one. In other words, running ``self.generate`` for
        any of the subclasses in ``MANIFEST_TYPES`` will produce all of the
        files in that tuple.

    UR_DEPENDENCIES : Tuple[FileHandler]
        A tuple of ur-dependencies, i.e. ``DEPENDENCIES`` (of
        ``DEPENDENCIES`` etc.) of this ``FileHandler``, i.e. ``FileHandler``
        classes whose data is ultimately used (after some number of steps in
        the DAG) to generate this ``FileHandler``. The tuple is ordered such
        that its files can be generated in order without encountering missing
        dependencies (i.e. items deepest in ``cls.UR_DEPENDENCY_TREE`` come
        first).

    UR_DEPENDENCY_TREE : ImmutableDict
        A dict of all ``DEPENDENCIES`` of ``DEPENDENCIES`` going back to the
        original input files that are ultimately required to generate this
        file. Maps ``FileHandler`` classes to dictionaries of their own
        ancestry trees recursively starting at the current FileHandler. The
        deepest items in the dictionary are the furthest ``DEPENDENCIES`` up
        the dependency graph (and consequently the files that must be
        generated first in order for the shallower files to be generated).
        See ``FileHandler.UR_DEPENDENCIES`` for a flattened, ordered version.

    Parameters
    ----------
    eventid_or_fh : str, llama.Event, or llama.FileHandler
        the ``eventid`` of the file handler or else another ``FileHandler`` or
        ``Event`` instance (though anything with ``eventid`` and ``rundir``
        properties will work). If such an object is provided, the ``rundir``
        will be inferred therefrom.
    rundir : str, optional
        The directory in which events from this run are being stored. If
        ``eventid_or_fh`` is an object with a ``rundir`` attribute, that value
        will be used unless this argument is explicitly provided, in which
        case this argument overrides it. (See ``DEFAULT_RUN_DIR`` for the
        default value.)
    parent : FileHandler, optional
        If this ``FileHandler`` instance is being used to generate a different
        ``FileHandler`` instance, specify the original instance as the
        ``parent``.

    Raises
    ------
    AssertionError
        If class constants in ``cls.required_attributes`` are not defined or
        if ``cls.MANIFEST_TYPES`` has any inconsistencies.
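
    Examples
    --------
    A minimal sketch of a concrete subclass (``ExampleJson`` and its
    ``_generate`` body are hypothetical)::

        @FileHandler.set_class_attributes
        class ExampleJson(FileHandler):
            FILENAME = 'example.json'
            DEPENDENCIES = ()  # no upstream FileHandlers

            def _generate(self):
                # write this handler's output file in the checked-out
                # temporary event directory
                with open(self.fullpath, 'w') as outfile:
                    json.dump({'example': True}, outfile)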

794 """ 

795 

796 COOLDOWN_PARAMS = CoolDownParams(base=60, increment=60, maximum=240*60) 

797 

798 FILENAME = None 

799 DEPENDENCIES = None 

800 MANIFEST_TYPES = None 

801 UR_DEPENDENCIES = None 

802 UR_DEPENDENCY_TREE = None 

803 

804 _REQUIRED = ("FILENAME", "DEPENDENCIES", "MANIFEST_TYPES", 

805 "UR_DEPENDENCIES", "UR_DEPENDENCY_TREE") 


    @classmethod
    def set_class_attributes(cls, subclass):
        """
        Decorator for a new subclass that sets its ``MANIFEST_TYPES`` class
        attribute to the default ``FileHandler`` value without requiring it
        to be manually specified. Also determines the ``UR_DEPENDENCIES`` and
        ``UR_DEPENDENCY_TREE`` based on ``subclass.DEPENDENCIES``. **NB:
        manually set class attributes will be overwritten by this decorator.**

        Parameters
        ----------
        subclass : type
            The subclass whose class attributes need to be set.

        Returns
        -------
        subclass : type
            The same decorated subclass.
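
        Example (a sketch): if ``C.DEPENDENCIES == (B,)`` and
        ``B.DEPENDENCIES == (A,)`` with ``A`` having no dependencies, then
        after decoration ``C.UR_DEPENDENCY_TREE == {B: {A: {}}}`` and
        ``C.UR_DEPENDENCIES == (A, B)``, i.e. deepest dependencies first.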

825 """ 

826 subclass.MANIFEST_TYPES = (subclass,) 

827 if subclass.DEPENDENCIES is None: 

828 raise ValueError("You must define DEPENDENCIES first for subclass " 

829 f"{subclass}. Got ``None``.") 

830 subclass.UR_DEPENDENCY_TREE = ImmutableDict({ 

831 d: d.UR_DEPENDENCY_TREE for d in subclass.DEPENDENCIES 

832 }) 

833 # flatten this to get UR_DEPENDENCIES 

834 next_branches = [subclass.UR_DEPENDENCY_TREE] 

835 ur_dependencies = list() 

836 while next_branches: 

837 ur_dependencies += [dep for b in next_branches for dep in b] 

838 next_branches = [b[dep] for b in next_branches for dep in b] 

839 sorted_unique_ur_deps = list() 

840 for dep in ur_dependencies[::-1]: 

841 if dep not in sorted_unique_ur_deps: 

842 sorted_unique_ur_deps.append(dep) 

843 subclass.UR_DEPENDENCIES = tuple(sorted_unique_ur_deps) 

844 return subclass 


    def __getnewargs__(self):
        """
        Get arguments to pass to ``__new__``. Used for pickling and copying.
        """
        return (self.eventid, self.rundir)

    # Required constants for subclasses go in _REQUIRED

    DEP_CHECKSUM_KWARGS = ImmutableDict({})

    def checksum(self, **kwargs):
        """
        Get the sha256 checksum of this file's contents. Use this to version
        files and check whether their contents have changed.

        Parameters
        ----------
        kwargs : dict
            **(Ignored in the base** ``FileHandler`` **implementation)**.
            Subclass implementations can optionally use input arguments to
            identify relevant subsets of the file's contents for versioning or
            diff-checking purposes. In this way, checks can be made for
            changes on only specific subsets of file contents. This is useful
            for ignoring changes to unused input data when checking for
            obsolescence.
        """
        with self.open('rb') as infile:
            return sha256(infile.read()).hexdigest()


    def dep_checksums(self):
        """
        Recalculate the ``sha256`` sums of the contents of the input files
        (i.e. ``DEPENDENCIES``) for this ``FileHandler`` instance's
        ``manifest_filehandlers`` (either to store them or to check whether
        they have changed from the stored values).

        Returns
        -------
        dep_checksums : dict
            Keys are ``FileHandler.clsname`` values for each of this
            ``FileHandler`` class's ``DEPENDENCIES`` and values are the
            corresponding ``checksums`` of each file. These checksums uniquely
            determine the exact input files used and are suitable for creating
            snapshots of pipeline state.
        dep_subset_checksums : dict
            Same as ``checksums`` but with ``self.DEP_CHECKSUM_KWARGS``
            applied to each dependency's ``checksum`` method to **only** check
            whether the data used by ``self.generate`` has changed. These
            checksums determine whether the ``DEPENDENCIES`` have changed *in
            a way that is meaningful to this specific filehandler* (since
            unused fields are ignored) and are suitable for determining
            whether a ``FileHandler`` needs to be regenerated based on the
            availability of new data. (To avoid unnecessary computation, these
            checksums are only calculated separately from ``checksums`` when a
            dependency has a set of ``DEP_CHECKSUM_KWARGS`` defined for it.)
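
        Example (a sketch; ``SkymapJson`` and its ``fields`` keyword are
        hypothetical): a subclass declaring
        ``DEP_CHECKSUM_KWARGS = ImmutableDict({SkymapJson: {'fields':
        ('ra',)}})`` gets subset checksums reflecting only the ``ra`` field
        of that dependency, so unrelated upstream changes won't mark it
        obsolete.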

901 """ 

902 sums = dict() 

903 sub_sums = dict() 

904 for dep in self.DEPENDENCIES: 

905 depfh = dep(self) 

906 sums[depfh.clsname] = depfh.checksum() 

907 sum_kwargs = self.DEP_CHECKSUM_KWARGS.get(dep) 

908 if sum_kwargs: 

909 sub_sums[depfh.clsname] = depfh.checksum(**sum_kwargs) 

910 else: 

911 sub_sums[depfh.clsname] = sums[depfh.clsname] 

912 assert set(sums) == set(sub_sums) 

913 return sums, sub_sums 


    @property
    def filename_for_download(self):
        """
        Get a filename that includes the ``eventid``, revision number, and
        version hash for this file (i.e. what version number this is in the
        version history; e.g. if three versions of *this* file exist in the
        version history, then this is version 3). If this file does not appear
        in the git history, it will be marked 'v0' and the hash will be
        'UNVERSIONED'. The output format is ``eventid``, version, first 7
        digits of commit hash, and filename, split by hyphens, so that the
        third version of ``skymap_info.json`` for event ``S1234a`` with git
        hash ``dedb33f`` would be called
        ``S1234a-v3-dedb33f-skymap_info.json``. Use this for file downloads or
        files sent to other services in order to facilitate data product
        tracking outside the highly-organized confines of a pipeline run
        directory.
        """
        return self.git.filename_for_download(self.FILENAME)

    @property
    def auxiliary_paths(self):
        """
        Return all names of *possible* auxiliary (rider) files associated
        with this ``FileHandler`` instance. These are things like metadata,
        cooldown, veto, and locking files.
        """
        return (f for f in (list(self.meta.filenames) +
                            list(self.cooldown.filenames) +
                            list(self.intent.filenames) +
                            list(self.veto.vetofilenames) +
                            list(self.lock.lockfiles) +
                            list(self.lock.obsolescence_files)))
        # if os.path.exists(os.path.join(self.eventdir, f)))

    def open(self, mode='r'):
        """
        Open this file in readonly mode and return the resulting object.
        """
        # mode can only be 'r' or 'rb'
        if mode not in ['r', 'rb', 'br']:
            raise ValueError('File is readonly.')
        return open(self.fullpath, mode)


    def __copy__(self):
        """
        Return an instance of this ``FileHandler`` with the same
        properties, *including* ``graph`` and ``parent``.
        """
        dup = type(self)(self)
        if hasattr(self, '_graph'):
            dup.graph = self.graph
        if hasattr(self, '_parent'):
            dup.parent = self.parent
        return dup

    def __deepcopy__(self, memo=None):
        """
        Same as ``__copy__`` but make recursive deepcopies of attributes.
        """
        dup = type(self)(
            deepcopy(self.eventid),
            rundir=deepcopy(self.rundir),
        )
        if hasattr(self, '_graph'):
            dup.graph = deepcopy(self.graph, memo=memo)
        if hasattr(self, '_parent'):
            dup.parent = deepcopy(self.parent, memo=memo)
        return dup


    @property
    def subgraph(self):
        """
        Get all ur ``DEPENDENCIES`` along with this ``FileHandler`` instance
        in a single ``FileGraph`` instance.
        """
        fhs = list(self.UR_DEPENDENCIES) + [type(self)]
        return FileGraph({fh.__name__: fh(self) for fh in fhs})

    @property
    def graph(self):
        """
        Get the ``FileGraph`` to which this ``FileHandler`` instance belongs.
        This can be set dynamically to associate a ``FileHandler`` with a
        given ``FileGraph`` (and hence a given ``Pipeline``). If ``graph`` is
        not manually set, it defaults to the ``subgraph`` of this
        ``FileHandler``.
        """
        return getattr(self, '_graph', self.subgraph)

    @graph.setter
    def graph(self, graph):
        if not isinstance(graph, FileGraph):
            raise ValueError(("Must be an instance of ``FileGraph``, instead "
                              "got {}").format(graph))
        setattr(self, '_graph', graph)


    def diff_contents(self, other, force=False):
        """
        Return a diff of two text files. If the files are not text files
        (i.e. if their file extensions are not in ``TEXT_FILE_EXTENSIONS``),
        returns "Binary files ``self`` and ``other`` differ" (with ``self``
        and ``other`` replaced with their full file paths).

        Parameters
        ----------
        other : FileHandler, str
            The ``FileHandler`` to diff to this one or a path to a file to
            diff with this one.
        force : bool, optional
            Whether to force the diff (as if ``self`` and ``other`` are both
            text files) even if the files are not recognized as a form of text
            file.

        Returns
        -------
        diff : str
            A textual diff of the two files' contents, provided they are both
            recognized as text files (or ``force`` is ``True``); or else a
            message saying that they differ. Returns an empty string if the
            files are the same.
        """
        BIN_FMT = "Binary files {} and {} differ"
        otherpath = other.fullpath if isinstance(other, FileHandler) else other
        selfpath = self.fullpath
        txt = force or (
            any(selfpath.endswith(x) for x in TEXT_FILE_EXTENSIONS) and
            any(otherpath.endswith(x) for x in TEXT_FILE_EXTENSIONS)
        )
        if not txt:
            if self.compare_contents(other):
                return ""
            return BIN_FMT.format(selfpath, otherpath)
        with self.open() as selffile, open(otherpath, 'r') as otherfile:
            selflines = selffile.readlines()
            otherlines = otherfile.readlines()
            # make sure the last line ends with a newline (guard against
            # empty files)
            for lines in selflines, otherlines:
                if lines and lines[-1][-1] != '\n':
                    lines[-1] += '\n'
            return "".join(difflib.unified_diff(selflines, otherlines))


    def compare_contents(self, other):
        """
        Check whether the contents of this ``FileHandler`` instance's file are
        the same as the contents of the other ``FileHandler`` instance's file.

        Parameters
        ----------
        other : FileHandler, str
            The ``FileHandler`` to compare to this one or else a path to a
            file to compare to this one.

        Returns
        -------
        same : bool
            Returns ``True`` if both files exist and have the same contents.
            Otherwise, returns ``False``.
        """
        # TODO This is not database implementation friendly
        otherpath = other.fullpath if isinstance(other, FileHandler) else other
        selfpath = self.fullpath
        for path in (selfpath, otherpath):
            if not os.path.isfile(path):
                LOGGER.warning("Tried to compare contents of %s, but file %s "
                               "does not exist. Returning False.", self, path)
                return False
        return filecmp.cmp(selfpath, otherpath)


    def exists(self):
        """
        Check whether the file associated with this handler has yet been
        generated.
        """
        return os.path.isfile(self.fullpath)


    @recursive_obsolescence
    def is_obsolete(self, checked=None, ancestors=True):
        """
        Check whether this file exists but needs to be regenerated by seeing
        whether any of its ``DEPENDENCIES`` have updated their file contents
        since this file was made. This is a somewhat conservative check to see
        whether any files need to be regenerated automatically; it will return
        False if there is any ambiguity (in which case you will need to
        manually delete and regenerate child files). You can extend this
        definition with extra obsolescence criteria, but make sure to call
        ``super`` to keep this automatic regeneration functionality in
        response to regenerated ``DEPENDENCIES``.

        If the file is marked as "locked" (see: ``llama.lock.LockHandler``),
        the file will **never** be marked obsolete. This provides a way to
        manually prevent file obsolescence in situations in which it does not
        apply.

        If any of this file's ancestors are obsolete, then this file will be
        marked obsolete. You can skip this check with ``ancestors=False``.

        Failing that, if the output file does not exist, or if its input
        ``DEPENDENCIES`` do not exist, it is not considered obsolete, and this
        method will return False. Likewise, if the file has no
        ``DEPENDENCIES``, it cannot be naively obsoleted, and this method will
        return False.

        Failing that, it will be marked obsolete if its dependency subset
        checksums (the second return value of ``FileHandler.dep_checksums``)
        have changed from their previously recorded values. If those checksums
        are not recorded, it will not be marked as obsolete.

        Parameters
        ----------
        checked : dict, optional
            A dictionary mapping ``FileHandler`` instances whose obsolescence
            has already been checked to the result of that check; used
            internally to track whether this ``FileHandler`` instance's
            ``DEPENDENCIES`` are obsolete without recomputing them.
        ancestors : bool, optional
            If ``False``, don't check whether ancestors are obsolete; only run
            this ``FileHandler`` instance's obsolescence checks.
        """
        if self.lock.is_locked:
            return False
        if ancestors:
            for dep in self.DEPENDENCIES:
                if dep(self).is_obsolete(checked=checked):
                    return True
        if not self.exists():
            return False
        if not self.DEPENDENCIES:
            return False
        for dep in self.DEPENDENCIES:
            if not dep(self).exists():
                return False
        try:
            dep_checksums = self.dep_checksums()[1]
            prev_dep_checksums = self.meta.dep_subset_checksums
            if dep_checksums != prev_dep_checksums:
                # LOGGER.debug(f"{self} found obsolete due to changed "
                #              "dependency checksums. current checksums: "
                #              f"{dep_checksums}, checksums at gen time: "
                #              f"{prev_dep_checksums}")
                return True
        except (FileNotFoundError, KeyError, IOError):
            pass
        return False


    def delete(self):
        """
        Delete this file if it exists, along with all of its rider files, and
        commit that change to version control.
        """
        if not self.exists():
            raise IOError("Can't delete, doesn't exist: " + self.fullpath)
        evdir = Path(self.eventdir)
        aux = [evdir/p for p in self.auxiliary_paths if (evdir/p).exists()]
        LOGGER.info(f"Deleting file {self.fullpath} and aux paths {aux}")
        self.git.remove(self.FILENAME, *aux)


    @property
    def manifest_filehandlers(self):
        """
        A set of ``FileHandler`` instances generated by this ``FileHandler``
        (not including temp files that should be deleted after generation). By
        default, this set only includes the filename for this FileHandler. If
        ``generate`` fails, these files will be removed to enforce atomicity.

        If you are making a bunch of ``FileHandler`` subclasses that are all
        generated with the same method, you should define an abstract base
        class for those ``FileHandler`` classes with a suitable ``_generate``
        method and a ``manifest_filehandlers`` property that returns all of
        their filenames. The subclasses then only need to return those
        ``FileHandler`` instances (and any other relevant methods and
        properties unique to each subclass ``FileHandler``).
        """
        return [c(self) for c in self.MANIFEST_TYPES]

    @property
    def manifest(self):
        """
        A set of filenames generated by this ``FileHandler`` (not
        including temp files that should be deleted after generation). By
        default, this set only includes the filename for this FileHandler.
        If ``generate`` fails, these files will be removed to enforce
        atomicity.

        In general, you should not need to modify this, since it will return
        filenames from the ``FileHandler`` instances in
        ``self.manifest_filehandlers``, which should contain an exhaustive
        list of ``FileHandler`` instances associated with the ``_generate``
        method that creates them.
        """
        filenames = {fh.FILENAME for fh in self.manifest_filehandlers}
        assert len(filenames) == len(self.manifest_filehandlers)
        return filenames


    def modtime(self, ts=None):
        """
        Get a ``datetime`` object with the modification time of this
        file, assuming it exists.

        Parameters
        ----------
        ts : int or float, optional
            If the file does not exist, return a datetime parsed from the UNIX
            timestamp given by ``ts`` if provided; otherwise, return ``None``.
            Use this to provide a fallback modification time in cases when
            modification times might need to be compared between existing
            files and files whose existence is uncertain.
        """
        if self.exists():
            return fromtimestamp(os.path.getmtime(self.fullpath))
        elif ts is not None:
            return fromtimestamp(ts)
        return None


    @property
    def _tempdir(self):
        """
        Path to a directory for temporary files associated with this
        ``Event``. This temporary directory is located within
        ``self.eventdir``.
        """
        return os.path.join(self.eventdir, LOCAL_TMPDIR_NAME)


    def chmod(self, mode):
        """
        Change read/write/execute permissions of a file. ``mode`` has the same
        meaning as in ``os.chmod``. (Note: this cannot be a ``property`` since
        it takes an argument.)
        """
        os.chmod(self.fullpath, mode)


    def read_bytes(self, memmap=False):
        """
        Read the full file into memory as a ``bytes`` object. If ``memmap`` is
        ``True``, create a memory-map object (workalike to a ``bytes`` object)
        mapped to the data on disk (what you want to use if you don't know the
        file size, since it could be larger than available memory). Note that
        if ``memmap=True``, you'll need to manually close the returned
        bytes-like object using its ``close`` method to free up the file
        descriptor when you're done. Returns the resulting bytes-like object
        containing the file's data. Raises a ``FileNotFoundError`` if the file
        does not exist.
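
        Example (a sketch; ``src`` and ``dest`` are assumed to be
        ``FileHandler`` instances): stream a possibly-large file without
        loading it all into memory::

            data = src.read_bytes(memmap=True)
            try:
                dest.write_bytes(data)
            finally:
                data.close()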

1254 """ 

1255 if not os.path.isfile(self.fullpath): 

1256 raise FileNotFoundError(f"Cannot read bytes, {self.fullpath} " 

1257 "does not exist.}") 

1258 if memmap: 

1259 return mmap(self.open('rb').fileno(), 0, access=ACCESS_READ) 

1260 else: 

1261 return Path(self.fullpath).read_bytes() 


    def write_bytes(self, data):
        """
        Write ``data`` (a bytes-like object) to file, replacing any existing
        file contents. ``data`` can be a memory-mapped object or a file object
        opened in binary mode (autodetected), allowing ``write_bytes`` to work
        on memory-mapped byte arrays returned by ``read_bytes``. Raises a
        ``ValueError`` if ``bytes`` can't be read from ``data``.
        """
        if hasattr(data, 'read'):
            buf = data.read(BUFSIZE)
            if not isinstance(buf, bytes):
                raise ValueError(f"Cannot write bytes to {self.fullpath}, "
                                 f"input {data} is not in a ``bytes`` format.")
            with open(self.fullpath, 'wb') as out:
                while buf:
                    out.write(buf)
                    buf = data.read(BUFSIZE)
            return
        Path(self.fullpath).write_bytes(data)


    @IntentMixin.decorate_checkout
    @CoolDownMixin.decorate_checkout
    @GitDirMixin.decorate_checkout
    @VetoMixin.decorate_checkout
    @FlagsMixin.decorate_checkout
    def checkout(self):
        """
        Create a temporary directory for generating new files in. This
        ensures that, in the event of file generation failure, the original
        event directory is unchanged. It also ensures that the state of the
        original event directory is not modified until the file generation is
        completed, assisting in parallel file generation. ``DEPENDENCIES`` are
        copied read-only into the temporary event directory to ensure that no
        extra data is being drawn from files not explicitly declared as such.

        Returns
        -------
        tmp_self : FileHandler
            A copy of the current FileHandler pointing to the temporary
            directory where this file will actually be generated.
        """
        if not os.path.exists(self._tempdir):
            os.mkdir(self._tempdir)
        tmp_rundir = tempfile.mkdtemp(prefix=type(self).__name__+'-',
                                      dir=self._tempdir)
        tmp_self = type(self)(self.eventid, rundir=tmp_rundir, parent=self)
        # don't init the event because we don't want to initialize a git repo
        os.mkdir(tmp_self.eventdir)
        for dep in self.DEPENDENCIES:
            # DO NOT link files, since we can't prevent the user from writing
            # to them; MUST copy.
            source = dep(self)
            dest = dep(tmp_self)
            shutil.copy(source.fullpath, dest.fullpath)
            # make readonly; should raise an exception if it gets edited,
            # helping ensure that generation functions do not mutate inputs
            os.chmod(dest.fullpath, 0o400)
            # TODO don't include aux paths
            # also copy auxiliary paths, which might be needed
            for aux in source.auxiliary_paths:
                srcaux = os.path.join(source.eventdir, aux)
                destaux = os.path.join(dest.eventdir, aux)
                if os.path.isfile(srcaux):
                    try:
                        shutil.copy(srcaux, destaux)
                    except shutil.SameFileError:
                        pass
                    os.chmod(destaux, 0o400)
        return tmp_self


    @staticmethod
    def _checkin_cleanup(func):
        """
        Make sure to wrap ``checkin`` with this decorator at the outermost
        level to delete the temporary directory regardless of whether the
        attempt was successful.
        """

        @functools.wraps(func)
        def wrapper(self, gen_result, *args, **kwargs):
            """
            Delete the temporary directory.
            """
            try:
                return func(self, gen_result, *args, **kwargs)
            finally:
                LOGGER.debug("Deleting tempfiles from %s from atomic "
                             "generation of %s", gen_result, self)
                try:
                    # a separate temp run directory is made for each attempt
                    shutil.rmtree(gen_result.fh.rundir)
                except OSError:
                    LOGGER.warning(
                        "Generation attempt done, could not remove "
                        "temporary generation directory for %s",
                        self
                    )

        return wrapper


    @_checkin_cleanup.__func__
    @IntentMixin.decorate_checkin
    @GitDirMixin.decorate_checkin
    @CoolDownMixin.decorate_checkin
    @VetoMixin.decorate_checkin
    @MetaDataMixin.decorate_checkin
    def checkin(self, gen_result):
        """
        Copy all output files generated by the temporary ``FileHandler``
        created with ``checkout`` (as listed in ``manifest``) to this event
        directory once generation is complete.

        Parameters
        ----------
        gen_result : GenerationResult
            The generation result to be checked in, containing the temporary
            filehandler with the results of the generation attempt as well as
            any errors raised.

        Returns
        -------
        self : FileHandler
            Returns the successfully checked-in ``FileHandler`` instance.
        """
        LOGGER.info(f"Checking in {gen_result} to {self}.")
        if gen_result.err is not None:
            LOGGER.error(f"Exception while generating {self}: {gen_result}")
            raise gen_result.err
        src_dest_paths = [
            (os.path.join(gen_result.fh.eventdir, f),
             os.path.join(self.eventdir, f))
            for f in self.manifest
        ]
        if not all(os.path.isfile(s) for s, d in src_dest_paths):
            LOGGER.error("temp directory is missing output files specified "
                         "in the manifest! aborting file generation.")
            raise GenerationError(("Could not find all generated files "
                                   "in temp directory {} that are defined in "
                                   "manifest; ABORTING.\nManifest:\n"
                                   "{}").format(gen_result.fh.eventdir,
                                                self.manifest))
        for src, dest in src_dest_paths:
            try:
                os.link(src, dest)
            except OSError:
                shutil.copy(src, dest)
        return self

1409 

1410 @property 

1411 def eventdir(self): 

1412 """ 

1413 The directory where data for the event this ``FileHandler`` 

1414 corresponds to is stored. 

1415 """ 

1416 return os.path.join(self.rundir, self.eventid) 

1417 

1418 @property 

1419 def fullpath(self): 

1420 """ 

1421 The full path to the file referred to by this ``FileHandler``. 
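
For example (all values hypothetical): a handler with ``rundir``
``"/data/llama/current_run"``, ``eventid`` ``"S190521g"``, and
``FILENAME`` ``"skymap_info.json"`` has ``fullpath``
``"/data/llama/current_run/S190521g/skymap_info.json"``.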

1422 """ 

1423 return os.path.join(self.eventdir, self.FILENAME) 

1424 

1425 @abstractmethod 

1426 def _generate(self, *args, **kwargs): 

1427 """ 

1428 Generate this file, whether by taking input data, reading input 

1429 files, or fetching data through an API. 

1430 """ 

1431 

1432 def __str__(self): 

1433 return '{}(eventid="{}", rundir="{}"{})'.format( 

1434 type(self).__name__, 

1435 self.eventid, 

1436 self.rundir, 

1437 f', parent="{self.parent}"' if self.parent is not None else '', 

1438 ) 

1439 

1440 def __repr__(self): 

1441 return str(self) 

1442 

1443 TIMEOUT = 20 

1444 

1445 @staticmethod 

1446 def _generate_cleanup(func): 

1447 """ 

1448 Catch any exceptions raised by ``generate`` wrappers, log them, and put 

1449 them in a generation result. 

1450 """ 

1451 

1452 @functools.wraps(func) 

1453 def wrapper(self, *args, **kwargs): 

1454 try: 

1455 return func(self, *args, **kwargs) 

1456 except Exception as err: 

1457 LOGGER.error("Cleaning unhandled generation error type %s, " 

1458 "message: %s", type(err), err) 

1459 LOGGER.error("TRACEBACK:\n%s", 

1460 tbhighlight(traceback.format_exc())) 

1461 return GenerationResult(self, err) 

1462 

1463 return wrapper 

1464 

1465 @_generate_cleanup.__func__ 

1466 @VetoMixin.decorate_generate 

1467 @MetaDataMixin.decorate_generate 

1468 def generate_unsafe(self, *args, **kwargs): 

1469 """ 

1470 Generate this file without performing any of the usual ``checkout`` and 

1471 ``checkin`` procedures; no cleanup will happen if file generation 

1472 fails. You **almost certainly** want to call ``generate`` instead, or 

1473 if you need to execute more than a single step of the pipeline, 

1474 ``FileGraph.update`` with an appropriate downselection. 

1475 

1476 Returns 

1477 ------- 

1478 gen_result : GenerationResult 

1479 A ``GenerationResult`` containing any errors raised during any part 

1480 of file generation. 

1481 """ 

1482 if list(self.DEPENDENCIES) and (not self.are_dependencies_met()): 

1483 LOGGER.error("%s has unmet dependencies; cannot generate yet.", 

1484 self) 

1485 raise GenerationError(f"Unmet dependencies for {self}.") 

1486 # close all figure windows to conserve memory 

1487 if 'matplotlib.pyplot' in sys.modules: 

1488 sys.modules['matplotlib.pyplot'].close('all') 

1489 try: 

1490 LOGGER.info(COL.BLUE + COL.BOLD + "Generating " + 

1491 str(self) + COL.CLEAR) 

1492 # invalidate obsolescence caches for all files (robust 

1493 # against code changes that change obsolescence 

1494 # definitions) TODO remove this obsolescence caching crap 

1495 self.lock.remove_all_obsolescence() 

1496 self._generate() 

1497 generated = {f: f(self).exists() for f in self.MANIFEST_TYPES} 

1498 if all(generated.values()): 

1499 LOGGER.info(COL.GREEN + f"{self} files written." + COL.CLEAR) 

1500 else: 

1501 msg = f"Files missing after {self} generation: {generated}" 

1502 LOGGER.error(COL.RED + msg + COL.CLEAR) 

1503 raise GenerationError(msg) 

1504 return GenerationResult(self) 

1505 except Exception as err: 

1506 LOGGER.warning('Wrapping exception raised while generating ' 

1507 '%s: %s("%s")', self, type(err).__name__, err) 

1508 return GenerationResult(self, err=err) 

1509 

1510 def generate(self, *args, **kwargs): 

1511 """ 

1512 Make the next version of a file synchronously, assuming it does not
1513 exist or is in need of updating. If an error causes file generation
1514 to fail, delete the

1515 output file (if it exists). You should ALWAYS use this instead of 

1516 ``_generate()`` in order to generate and version files safely and 

1517 atomically without risk of disrupting parallel file manipulations. If 

1518 you want to do something more complicated, like updating multiple files 

1519 in an event directory, or even multiple events, use functions like 

1520 ``Run.events.update``, ``Event.update``, 

1521 ``FileHandler.subgraph.update``, or ``FileGraph.downselect().update``. 
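
A minimal usage sketch (the class name and constructor arguments are
hypothetical; concrete subclasses define the real interface)::

    fh = MyFileHandler(eventid="S190521g", rundir="/data/llama/run")
    if not fh.exists() or fh.is_obsolete():
        fh.generate()  # checkout -> generate_unsafe -> checkin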

1522 """ 

1523 return self.checkin(self.checkout().generate_unsafe()) 

1524 

1525 def are_dependencies_met(self): 

1526 """ 

1527 Check whether the data needed to generate this filetype exists. If 

1528 there are no ``DEPENDENCIES`` (i.e. no input), this file cannot 

1529 possibly be made (since outputs are considered to be pseudo-functional 

1530 mappings from inputs, having no inputs means that no meaningful output 

1531 can exist). 

1532 """ 

1533 if not self.DEPENDENCIES: 

1534 return False 

1535 for filehandler in self.DEPENDENCIES: 

1536 if not filehandler(self).exists(): 

1537 return False 

1538 return True 

1539 

1540 def size(self): 

1541 """Get the size in bytes of this file. Raises a ``FileNotFoundError`` 

1542 if this file does not exist.""" 

1543 return os.stat(self.fullpath).st_size 

1544 

1545 def are_ur_dependencies_met(self): 

1546 """ 

1547 Recursively check whether ``DEPENDENCIES`` for this file can be 

1548 generated (by in turn running this same check on their 

1549 ``DEPENDENCIES``). In short, this is a check as to whether we can 

1550 eventually get to generating this file or whether the required data to 

1551 get to this node in the FileHandler DAG is simply not in the eventdir. 

1552 If there are no ``DEPENDENCIES`` (i.e. no input), this file cannot 

1553 possibly be made (since outputs are considered to be pseudo-functional 

1554 mappings from inputs, having no inputs means that no meaningful output 

1555 can exist). 
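
For instance, in a hypothetical chain where ``C`` depends on ``B`` and
``B`` depends on ``A``, ``C``'s ur-dependencies are met if ``B``
exists, or if ``A`` exists (so that ``B`` can eventually be
generated), but not if neither file exists.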

1556 """ 

1557 if not self.DEPENDENCIES: 

1558 return False 

1559 return (self.are_dependencies_met() or 

1560 all(fh(self).exists() or fh(self).are_ur_dependencies_met() 

1561 for fh in self.DEPENDENCIES)) 

1562 

1563 def status(self, obscheck=None): 

1564 """ 

1565 Return the status string (see ``NODELEGEND_COLORS``) for a given 

1566 ``FileHandler`` instance. 
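
For example, an existing file that is not locked, vetoed, or obsolete
gets the status string ``"EXISTS"``.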

1567 """ 

1568 stats_list = [] 

1569 if self.exists(): 

1570 try: 

1571 walltime = f"{self.meta.wall_time:8.3f}s" 

1572 except IOError: 

1573 walltime = "N/A" 

1574 mtime = self.modtime() 

1575 stats_list += [ 

1576 ( 

1577 f"{mtime.year}-{mtime.month}-{mtime.day} " 

1578 f"{mtime.hour}:{mtime.minute:02d}:{mtime.second}" 

1579 ), 

1580 "Wall: {}".format(walltime), 

1581 "Size: {}".format(sizeof_fmt(self.size())), 

1582 ] 

1583 if self.veto.permanently_vetoed(): 

1584 status = "EXISTS, VETOED" 

1585 elif self.is_obsolete(ancestors=False): 

1586 status = "OBSOLETE" 

1587 if obscheck is not None: 

1588 obscheck[self] = True # if obsolete, so are descendants 

1589 elif self.is_obsolete(obscheck): 

1590 # if self.lock.is_locked: 

1591 # status = "OBSOLETE, LOCKED" 

1592 # else: 

1593 status = "ANCESTOR OBSOLETE" 

1594 try: 

1595 newer = [f.__name__ for f in self.DEPENDENCIES 

1596 if f(self).modtime(ts=0) > self.modtime()] 

1597 if newer: 

1598 stats_list.append("Newer deps: "+", ".join(newer)) 

1599 except TypeError: 

1600 pass 

1601 else: 

1602 if self.lock.is_locked: 

1603 status = "EXISTS, LOCKED" 

1604 else: 

1605 status = "EXISTS" 

1606 elif self.intent.in_progress(): 

1607 status = "REGENERATING" 

1608 elif self.veto.permanently_vetoed(): 

1609 status = "VETOED" 

1610 veto_dict = self.veto.read_json() 

1611 veto_func = veto_dict['veto_function'] 

1612 veto_msg = veto_dict['veto_message'] 

1613 stats_list += [ 

1614 "Veto: {}".format( 

1615 veto_func.replace("<", "").replace(">", "") 

1616 ), 

1617 f"Why: {veto_msg}", 

1618 ] 

1619 else: 

1620 vetoed_ancestors = [fh for fh in self.UR_DEPENDENCIES 

1621 if fh(self).veto.permanently_vetoed()] 

1622 if self.intent.in_progress(): 

1623 status = "GENERATING" 

1624 elif self.cooldown.in_progress(): 

1625 status = "FAILED, COOLING DOWN" 

1626 elif self.cooldown.exists() or self.intent.exists(): 

1627 status = "FAILED" 

1628 elif isinstance(self, EventTriggeredFileHandler): 

1629 status = "AWAITS EXTERNAL TRIGGER" 

1630 elif self.are_dependencies_met(): 

1631 status = "READY TO GENERATE" 

1632 elif vetoed_ancestors: 

1633 status = "ANCESTOR VETOED" 

1634 veto_desc = ', '.join(fh.__name__ for fh in vetoed_ancestors) 

1635 stats_list += [ 

1636 "Vetoed: {}".format(veto_desc), 

1637 ] 

1638 elif any(f(self).cooldown.in_progress() 

1639 for f in self.UR_DEPENDENCIES): 

1640 status = "ANCESTOR COOLING DOWN" 

1641 elif any(f(self).cooldown.exists() 

1642 for f in self.UR_DEPENDENCIES): 

1643 status = "ANCESTOR FAILED" 

1644 elif self.are_ur_dependencies_met(): 

1645 status = "ANCESTORS EXIST" 

1646 elif [f for f in self.UR_DEPENDENCIES 

1647 if (issubclass(f, EventTriggeredFileHandler) and 

1648 not f(self).exists())]: 

1649 status = "ANCESTOR AWAITS TRIGGER" 

1650 else: 

1651 status = "NOT YET MADE" 

1652 return Status(status, stats_list, NODELEGEND_COLORS[status]) 

1653 

1654 @classmethod 

1655 def sync_shared_manifests(cls, *filehandlers): 

1656 """ 

1657 When implementing a ``FileHandler`` with its own subclasses in 

1658 its manifest (the easiest pattern for ensuring that the ``FileHandler`` 

1659 classes in the manifest share a common ``generate`` method and 

1660 ``DEPENDENCIES``), use this decorator before each class definition to 

1661 make sure that the base ``FileHandler`` (with the shared ``generate``
1662 function) and its subclasses (which distinguish between the base
1663 class's outputs by each having their own ``FILENAME`` attribute) all
share the same ``MANIFEST_TYPES``.

1664  

1665 Parameters 

1666 ---------- 

1667 *filehandlers : FileHandler or str 

1668 ``FileHandler`` subclasses that are generated together. 

1669 

1670 Returns 

1671 ------- 

1672 first_filehandler 

1673 The first ``FileHandler`` subclass from ``filehandlers`` (this 

1674 allows the function to operate as a class decorator). 

1675 

1676 Raises 

1677 ------ 

1678 ValueError 

1679 If provided ``filehandlers`` are not subclasses of this class or if 

1680 no ``filehandlers`` are provided. 

1681 TypeError 

1682 If provided ``filehandlers`` do not share the same ``DEPENDENCIES`` 

1683 and ``_generate`` methods as this class, or if any of them lack a 

1684 ``FILENAME`` attribute. 
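
Examples
--------
A minimal sketch (class names and filenames are hypothetical)::

    class CoincPlots(FileHandler):
        FILENAME = "coinc_plot_1.png"

        def _generate(self):
            ...  # write every file in the shared manifest

    @CoincPlots.sync_shared_manifests
    class CoincPlot2(CoincPlots):
        FILENAME = "coinc_plot_2.png"

``CoincPlots`` and ``CoincPlot2`` then share a single
``MANIFEST_TYPES`` tuple as well as the base class's ``_generate``
method and ``DEPENDENCIES``.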

1685 """ 

1686 if not filehandlers: 

1687 raise ValueError("No ``FileHandlers`` provided to sync.") 

1688 manifest = tuple(set(filehandlers).union(cls.MANIFEST_TYPES or ())) 

1689 for synced_cls in manifest + (cls,): 

1690 if not isinstance(synced_cls, type) or not issubclass(synced_cls, cls):

1691 raise ValueError(f"Must specify a subclass of {cls}, " 

1692 f"not {synced_cls}") 

1693 setattr(synced_cls, "MANIFEST_TYPES", manifest) 

1694 return filehandlers[0] 

1695 

1696 

1697# The following class is abstract, so we shouldn't need to define abstract 

1698# methods. 

1699# pylint: disable=abstract-method 

1700class EventTriggeredFileHandler(FileHandler): 

1701 """ 

1702 A data file that is downloaded or received as the result of an event 

1703 notification from some external source, e.g. a post request or a GCN 

1704 notification. The generate() method should be able to take any arguments 

1705 necessary for the creation of this file, since this method is likely to be 

1706 invoked by an event-handler script. These ``FileHandler`` classes should 

1707 not have ``DEPENDENCIES`` because they are created by events external to 

1708 the dependency graph. 

1709 """ 

1710 

1711 DEPENDENCIES = tuple() 

1712 

1713 

1714class TriggerList(FileHandler): 

1715 """ 

1716 A file that contains lists of triggers of some sort. 

1717 """ 

1718 

1719 DETECTORS = None 

1720 

1721 _REQUIRED = ("DETECTORS",) 

1722 

1723 @abstractproperty 

1724 def num_triggers(self): 

1725 """ 

1726 The number of triggers described by this file. Useful mostly for 

1727 quickly determining if this trigger list is empty. 

1728 """ 

1729 

1730 

1731# The following class is abstract, so we shouldn't need to define abstract 

1732# methods. 

1733# pylint: disable=abstract-method 

1734class JSONFile(FileHandler): 

1735 """ 

1736 A FileHandler abstract subclass providing tools to work with JSON 

1737 dictionaries. 

1738 """ 

1739 

1740 def checksum(self, fields=None, **kwargs):

1741 """ 

1742 Get the sha256 checksum of this file's contents. Use this to version 

1743 files and check whether their contents have changed. Optionally only 

1744 use specific fields in generating the checksum to ignore irrelevant 

1745 changes (e.g. when determining file obsolescence). 

1746 

1747 Parameters 

1748 ---------- 

1749 fields : tuple or list, optional 

1750 A tuple of tuples, with each sub-tuple containing

1751 strings or integers indexing into this JSON-file's contents 

1752 (strings for ``dict`` fields, integers for ``list`` fields) to 

1753 specify only relevant fields. See example below. 

1754 kwargs : dict 

1755 Remaining keyword arguments are ignored (see 

1756 ``FileHandler.checksum`` note on ``kwargs``). 

1757 

1758 Raises 

1759 ------ 

1760 KeyError 

1761 If a field that is expected in a sub-dictionary is not found. 

1762 IndexError 

1763 If an entry that is expected in a sub-list is not found. 

1764 TypeError 

1765 If you attempt to index into a sub field that is not a dictionary 

1766 using a string or if ``fields`` cannot be indexed into. 

1767 

1768 Examples 

1769 -------- 

1770 For a JSON dictionary like: 

1771 >>> foo = { 

1772 ... "names": ["stef", "countryman"], 

1773 ... "age": 27, 

1774 ... "eyes": { 

1775 ... "left": "brown", 

1776 ... "right": "brown", 

1777 ... } 

1778 ... } 

1779 

1780 You can calculate the checksum using *only* the first name and left 

1781 eye-color values by using these ``fields``: 

1782 >>> fields = ( 

1783 ... ('names', 0), 

1784 ... ('eyes', 'left') 

1785 ... ) 
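
Assuming a hypothetical ``JSONFile`` instance ``fh`` whose contents
equal ``foo``, the call would then be ``fh.checksum(fields=fields)``;
changes to ``foo["age"]`` or ``foo["eyes"]["right"]`` would leave that
checksum unchanged.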

1786 """ 

1787 if fields is None: 

1788 return super().checksum() 

1789 if fields: 

1790 fields[0] # make sure this can be indexed into 

1791 data = self.read_json() 

1792 compare = {'fields': fields, 'values': list()} 

1793 for field in fields: 

1794 value = data 

1795 for key in field: 

1796 value = value[key] 

1797 compare['values'].append(value) 

1798 assert (len(compare['fields']) == len(compare['values'])) 

1799 return sha256(json.dumps(compare, allow_nan=True, indent=4, 

1800 sort_keys=True).encode()).hexdigest() 

1801 

1802 

1803 def _write_json(self, outdict): 

1804 """ 

1805 Write this dictionary to a JSON file in a consistent and 

1806 deterministic format during file generation. 
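
The deterministic formatting comes from sorting keys; a minimal
doctest of the underlying ``json`` behavior (not of this method's
file I/O):

>>> import json
>>> (json.dumps({"b": 1, "a": 2}, sort_keys=True)
...  == json.dumps({"a": 2, "b": 1}, sort_keys=True))
True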

1807 """ 

1808 with open(self.fullpath, 'w') as outfile: 

1809 json.dump(outdict, outfile, indent=4, sort_keys=True) 

1810 

1811 def read_json(self): 

1812 """ 

1813 Read in this JSON file as a dictionary. 

1814 """ 

1815 with self.open() as jsonfile: 

1816 return json.load(jsonfile) 

1817 

1818 @property 

1819 def html_table(self): 

1820 """ 

1821 Generate an HTML table representation of the JSON data 

1822 contained in this ``FileHandler``. 

1823 """ 

1824 from json2html import json2html 

1825 return json2html.convert(json=self.read_json(), table_attributes="") 

1826 

1827 

1828class GenerateOnceMixin(object): 

1829 """ 

1830 An object that, once generated, never becomes obsolete; must be manually 

1831 regenerated. 
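
For example, a hypothetical ``class GcnAlert(GenerateOnceMixin,
EventTriggeredFileHandler)`` would keep its first downloaded alert
file until a user explicitly regenerates it.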

1832 """ 

1833 

1834 @recursive_obsolescence 

1835 def is_obsolete(self, checked=None, **kwargs): 

1836 """ 

1837 Because this class mixes in ``GenerateOnceMixin``, it is never
1838 marked obsolete automatically; it must be manually regenerated. This

1839 function always returns False. See ``FileHandler.is_obsolete`` for the 

1840 meaning of ``checked``. 

1841 """ 

1842 return False