Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# (c) Stefan Countryman 2016-2018 

2 

3""" 

4Utilities for working with a Run Directory, i.e. a directory containing 

5multiple subdirectories, one per event. Includes tools for automatically 

6finding events that can be updated and keeping them updated. Useful for running 

7a pipeline or batch job. 

8""" 

9 

10import re 

11import json 

12from textwrap import dedent 

13from datetime import datetime 

14import logging 

15import os 

16from argparse import Action, Namespace 

17from os.path import isfile, join 

18from collections import namedtuple 

19from fnmatch import fnmatch 

20from glob import glob 

21from llama.classes import ( 

22 ImmutableDict, 

23 IntentException, 

24 CoolDownException, 

25) 

26from llama.pipeline import DEFAULT_PIPELINE 

27from llama.event import Event 

28from llama.utils import ( 

29 archive_figs, 

30 PAST_RUNS_DIR, 

31 DEFAULT_RUN_DIR, 

32 COLOR, 

33 GenerationError, 

34) 

35from llama.vetoes import VetoException 

36from llama.cli import CliParser 

37from llama.pipeline import Parsers as PipeParsers 

38 

# Logger shared by everything in this module.
LOGGER = logging.getLogger(__name__)

# Event glob applied when a run is given without an explicit event pattern.
DEFAULT_EVENT_GLOB = '*'


def _lvk_sortkey(event):
    """Return ``event.eventid`` with every letters-digits-letters run
    rewritten so that its trailing letter group is right-aligned in a field
    of four characters, padded on the left with ``0``."""

    def pad_suffix(match):
        return f'{match.group(1)}{match.group(2):0>4}'

    return re.sub('([a-zA-Z]+[0-9]+)([a-zA-Z]+)', pad_suffix, event.eventid)


# Named sort-key functions; each one takes a single event and returns the
# value that event should be ordered by.
SORTKEYS = ImmutableDict({
    'mtime': lambda event: event.modification_time(),
    'ctime': lambda event: event.change_time(),
    'v0time': lambda event: event.v0_time(),
    'lvk': _lvk_sortkey,
})

49 

50 

def downselect_events(events, **kwargs):
    """Take a list of events and downselect them using the checks described in
    ``Run.downselect``.

    Parameters
    ----------
    events : iterable
        The events to filter.
    **kwargs
        Downselection criteria. ``invert``, ``sortkey``, ``limit`` and
        ``reverse`` steer the selection as a whole; every other keyword must
        name one of the per-event checks defined below and supplies the query
        value for that check.

    Returns
    -------
    results : list
        Events for which all checks pass (or, with ``invert``, events for
        which at least one check fails), sorted with ``sortkey`` if one was
        given and truncated to ``limit`` entries.

    Raises
    ------
    KeyError
        If a criterion name is not a known check (raised when the first event
        is tested).
    ValueError
        If ``sortkey`` is a string that does not name a ``SORTKEYS`` entry.
    """
    # Options that apply to the selection as a whole, not to single events.
    invert = kwargs.pop('invert', False)
    sortkey = kwargs.pop('sortkey', None)
    limit = kwargs.pop('limit', None)
    reverse = kwargs.pop('reverse', False)
    # A string can never be called as a sort key, so treat it as the name of
    # one of the predefined key functions.
    if isinstance(sortkey, str):
        if sortkey not in SORTKEYS:
            raise ValueError(f"Unknown sortkey name {sortkey!r}; expected a "
                             f"callable or one of {sorted(SORTKEYS)}")
        sortkey = SORTKEYS[sortkey]
    # a bunch of checks for the event ``e`` and the query arg ``q``
    checks = {
        'eventid_filter': lambda e, q: fnmatch(e.eventid, q),
        'fileexists': lambda e, q: isfile(join(e.eventdir, q)),
        'fhexists': lambda e, q: q(e).exists(),
        'fhnameexists': lambda e, q: e.files[q].exists(),
        'fhmeta': lambda e, q: any(isfile(f) for f in q(e).meta.fullpaths),
        'fhnamemeta': lambda e, q: any(isfile(f)
                                       for f in e.files[q].meta.fullpaths),
        'vetoed': lambda e, q: (e.flags['VETOED'] == 'true') == q,
        'manual': lambda e, q: (e.flags['MANUAL'] == 'true') == q,
        # A falsy v0 time is swapped for a value on the wrong side of ``q``,
        # so such events never pass either v0 check.
        'v0before': lambda e, q: (e.v0_time() or q+1) < q,
        'v0after': lambda e, q: (e.v0_time() or q-1) > q,
        'modbefore': lambda e, q: e.modification_time() < q,
    }
    checks['modafter'] = lambda e, q: not checks['modbefore'](e, q)
    # timestamp gets set when downselection applied
    nowts = datetime.now().timestamp()
    checks['sec_since_mod_gt'] = lambda e, q: checks['modbefore'](e, nowts - q)
    checks['sec_since_mod_lt'] = lambda e, q: checks['modafter'](e, nowts - q)
    checks['sec_since_v0_gt'] = lambda e, q: checks['v0before'](e, nowts - q)
    checks['sec_since_v0_lt'] = lambda e, q: checks['v0after'](e, nowts - q)

    results = []
    for event in events:
        # ``all`` stops at the first failing check, like an early ``break``.
        passed = all(checks[name](event, query)
                     for name, query in kwargs.items())
        # Keep events passing every check, or, when inverted, events failing
        # at least one.
        if passed != bool(invert):
            results.append(event)
    if sortkey is not None:
        results = sorted(results, key=sortkey, reverse=reverse)
    return results[:limit]

98 

99 

def past_runs(paths=(PAST_RUNS_DIR,), pipeline=DEFAULT_PIPELINE):
    """Collect a ``Run`` for every subdirectory of the given parent
    directories.

    Parameters
    ----------
    paths : tuple, optional
        Directories in which to search for past run directories.
    pipeline : llama.pipeline.Pipeline
        The pipeline to use for the returned ``Run`` instances in ``runs``.

    Returns
    -------
    runs : dict
        Maps each run directory found (the path as matched beneath the
        corresponding entry of ``paths``, without a trailing separator) to
        a ``Run`` built for that directory with ``pipeline``.
    """
    runs = {}
    for parent in paths:
        # The trailing separator in the pattern limits matches to
        # directories; ``dirname`` then removes that separator again.
        for matched in glob(os.path.join(parent, '*/')):
            rundir = os.path.dirname(matched)
            runs[rundir] = Run(rundir=rundir, pipeline=pipeline)
    return runs

122 

123 

RunTuple = namedtuple("RunTuple", ("rundir", "pipeline", "downselection"))


class Run(RunTuple):
    """
    A single directory containing multiple event directories combined with a
    pipeline (i.e. a selection of analysis steps to use) and a set of
    downselection criteria for picking events:

        Run Directory
        ├─ Event Directory 1
        ├─ Event Directory 2
        └─ Event Directory 3

    This should ordinarily correspond to a run of some sort (an observing run,
    engineering run, offline run, test run, etc.) where the events are somehow
    related. Since this class mostly just provides methods for organizing and
    selecting ``Event`` instances with tailored ``Pipeline`` instances, it's up
    to you to decide how to best organize a run. Run objects are immutable to
    simplify hashing and uniqueness checks.

    These tools allow the user to conveniently check on the status of all
    events in a given ``Run``. A dictionary of downselection arguments (as
    fed to ``downselect``) can be used to restrict the set of events that will
    be returned by ``events``.

    Parameters
    ----------
    rundir : str
        The directory where all events are stored. Files for individual
        events are stored in per-event subdirectories of ``rundir``. Will be
        converted to a canonical path with ``os.path.realpath`` to help ensure
        unique ``Run`` definitions.
    pipeline : llama.pipeline.Pipeline, optional
        A ``Pipeline`` instance holding ``FileHandler`` classes that should
        be used for this analysis. Defaults to the main pipeline in
        production use.
    downselection : tuple, optional
        A tuple of dictionaries of keyword arguments of the type passed to
        ``downselect``. The events returned by ``events`` will match these
        downselection criteria with each downselection dict applied in the
        order they appear in this argument (to allow more complex chained
        downselections). You probably don't want to manually specify this;
        a more pythonic way to provide downselection arguments is to use
        the ``downselect`` method to return a downselection from a starting
        ``Run``.
    """

    def __new__(cls, rundir=DEFAULT_RUN_DIR, pipeline=DEFAULT_PIPELINE,
                downselection=tuple()):
        # Tuples are immutable, so the canonical path has to be computed here
        # in ``__new__`` rather than in ``__init__``.
        return RunTuple.__new__(cls, os.path.realpath(rundir), pipeline,
                                downselection)

    @property
    def events(self):
        """Return a list of events in this run directory with
        ``self.downselection`` criteria applied (see ``downselect`` for a list
        of possible downselection criteria).

        One ``Event`` is built for every subdirectory of ``rundir``; the
        downselection layers are then applied one after another in the order
        they are stored. No sorting happens here beyond what an individual
        layer requests through its own ``sortkey``/``reverse`` entries.
        """
        # The empty last component makes the pattern end in a separator, so
        # only directories match.
        events = [Event.fromdir(p, pipeline=self.pipeline)
                  for p in glob(join(self.rundir, '*', ''))]
        for downselect_args in self.downselection:
            events = downselect_events(events, **downselect_args)
        return events

    def update(self, **downselect):
        """
        Get a list of ``Event`` instances matching this ``Run`` instance's
        downselection criteria and update each event directory. Run until all
        events are up-to-date. Will queue files from each event that are ready
        to update, allowing them to be handled in parallel, and will track
        outstanding jobs. Optionally specify a ``FileGraph.downselect``
        downselection argument to pass to each ``FileGraph`` being updated
        (default is to regenerate all files needing regeneration). Be careful
        with this argument, as it will cause file generation attempts for
        matching files without checking whether they need to be generated.

        Returns
        -------
        processed_files : list
            Every file handler whose job was collected, in the order the jobs
            were collected. Handlers whose job ended in one of the exceptions
            handled below are included as well.

        Raises
        ------
        RuntimeError
            If a job hands back a file handler different from the one it was
            queued for. Any exception from a job other than the ones handled
            below also propagates.
        """
        # (file handler, future) pairs for jobs that are still outstanding.
        running_files = []
        # File handlers whose jobs have been collected.
        processed_files = []
        while True:
            still_running = []
            for filehandler, future in running_files:
                if future.running():
                    still_running.append((filehandler, future))
                    continue
                try:
                    res = future.result()
                    if filehandler != res:
                        raise RuntimeError(
                            "Got the wrong FileHandler back from our "
                            f"executor. Expected {filehandler}, got {res}")
                    LOGGER.info('%sFinished generating %s%s', COLOR.GREEN,
                                filehandler, COLOR.CLEAR)
                except (IntentException, CoolDownException):
                    # Deliberately silent: the handler is still recorded as
                    # processed below so it is not queued again in this call.
                    pass
                except GenerationError as err:
                    LOGGER.warning('%sGenerationError while generating %s: '
                                   '%s%s', COLOR.YELLOW, filehandler, err,
                                   COLOR.CLEAR)
                except VetoException:
                    LOGGER.info('%sVetoed %s%s',
                                COLOR.GREEN, filehandler, COLOR.CLEAR)
                processed_files.append(filehandler)
            running_files = still_running
            for event in self.events:
                # Leave out everything already handled or still in flight.
                new_files = event.files.downselect(
                    invert=True,
                    equals=processed_files+[fh for fh, f in running_files],
                )
                running_files += new_files.update(**downselect)
            if not running_files:
                break
        if processed_files:
            LOGGER.info('%sRun.update complete. Processed files: %s%s',
                        COLOR.GREEN, processed_files, COLOR.CLEAR)
        return processed_files

    def downselect(self, **kwargs):
        """
        Get another ``Run`` instance identical to the current one but
        with the following downselection criteria applied to the ``Event``
        instances returned by ``self.events``. Can also specify a sorting
        function and a maximum number of returned values:

        Parameters
        ----------
        invert : bool, optional
            Invert what matches and what doesn't. Default: False
        eventid_filter : str, optional
            A glob (as taken by ``fnmatch``) that the ``eventid`` must match.
        fileexists : str, optional
            The event directory contains a file with this name.
        fhexists : llama.filehandler.FileHandler, optional
            The eventdir contains the file for this FileHandler.
        fhnameexists : str, optional
            The eventdir contains the file for the FileHandler with this name.
        fhmeta : llama.filehandler.FileHandler, optional
            The eventdir contains a metadata rider for the file for this
            FileHandler.
        fhnamemeta : str, optional
            The eventdir contains a metadata rider for the file for the
            FileHandler with this name.
        vetoed : bool, optional
            Whether the events have been vetoed by the VETOED flag or not.
        manual : bool, optional
            Whether the events have been marked as manual by the MANUAL flag.
        modbefore : float, optional
            Select events whose directory modtimes were before this timestamp.
        modafter : float, optional
            Select events whose directory modtimes were after this timestamp.
        sec_since_mod_gt : float, optional
            Select events whose directory modtimes are more than this many
            seconds ago.
        sec_since_mod_lt : float, optional
            Select events whose directory modtimes are less than this many
            seconds ago.
        v0before : float, optional
            Select events whose first event state version was generated before
            this timestamp. Will **IGNORE** directories that do not have any
            versioned files.
        v0after : float, optional
            Select events whose first event state version was generated after
            this timestamp. Will **IGNORE** directories that do not have any
            versioned files.
        sec_since_v0_gt : float, optional
            Select events whose first event state version was generated more
            than this many seconds ago. Will **IGNORE** directories that do not
            have any versioned files.
        sec_since_v0_lt : float, optional
            Select events whose first event state version was generated less
            than this many seconds ago. Will **IGNORE** directories that do not
            have any versioned files.
        sortkey : function, optional
            A function taking ``Event`` instances that can be passed to
            ``sorted`` to sort the downselected ``Event`` instances. Default:
            None (i.e. no sorting)
        reverse : bool, optional
            Whether to reverse the order of sorting (i.e. put the results in
            descending order) before applying ``limit``. Default: False
        limit : int, optional
            Return up to this number of events. Most useful if ``sortkey`` has
            also been provided. Default: None (i.e. no limit)
        """
        if kwargs:
            # Append a new layer instead of merging into an existing one so
            # the layers are applied in the order they were added.
            return type(self)(
                rundir=self.rundir,
                pipeline=self.pipeline,
                downselection=self.downselection + (ImmutableDict(kwargs),)
            )
        return self

    def downselect_pipeline(self, invert=False, **kwargs):
        """Return a ``Run`` instance with a pipeline that has been downselected
        using ``Pipeline.downselect``; ``rundir`` and the event downselection
        layers are carried over unchanged."""
        kwargs['invert'] = invert
        return type(self)(rundir=self.rundir,
                          pipeline=self.pipeline.downselect(**kwargs),
                          downselection=self.downselection)

    @property
    def vis(self):
        """A collection of visualization methods for this ``Run`` instance."""
        return RunVisualization(self)

    def __str__(self):
        name = type(self).__name__
        # The second line is indented so that ``pipeline`` lines up under
        # ``rundir``.
        fmt = ('{}(rundir="{}",\n' + len(name)*' ' + ' pipeline="{}")')
        return fmt.format(name, self.rundir, self.pipeline)

    def __repr__(self):
        return str(self)

347 

348 

class ParseRunsAction(Action):  # pylint: disable=too-few-public-methods
    """
    Take a bunch of pathnames and parse them into ``Run`` instances with
    associated ``eventid`` glob filters. See ``Parsers`` docstring for
    details.
    """

    # Run specifications used when no value is supplied on the command line.
    _default_run = tuple()

    def __call__(self, parser, namespace, values, option_string=None):
        """Split each given path into a run directory and an event glob and
        append the resulting downselected ``Run`` to the list stored on
        ``namespace`` under ``self.dest`` (created if not yet present)."""
        if getattr(namespace, self.dest, None) is None:
            setattr(namespace, self.dest, list())
        runs = getattr(namespace, self.dest)
        if not values:
            values = self._default_run
        if not isinstance(values, (list, tuple)):
            values = [values]
        for path in values:
            if '/' not in path:
                # A bare name is taken relative to the current directory and
                # expanded to a full path.
                path = os.path.realpath(path)
                if path == '/':
                    parser.error("You can't specify the root directory '/' "
                                 "as the ``Event`` directory to manipulate. "
                                 "``Event`` directories must always be "
                                 "subdirectories of a ``Run`` directory. Why "
                                 "would you want to do this anyway?")
                if re.findall(r'^/[^/]*$', path):
                    path = '/' + path  # if they are matching a subdir of '/'
            assert '/' in path
            splitpath = path.split('/')
            # An empty run part selects the default run directory; an empty
            # last part selects the default event glob.
            rundir = '/'.join(splitpath[:-1]) or DEFAULT_RUN_DIR
            eventidfilt = splitpath[-1] or DEFAULT_EVENT_GLOB
            runs.append(
                Run(rundir=rundir).downselect(eventid_filter=eventidfilt))
        # Read the runs back through ``self.dest`` so the log line also works
        # when this action is registered under a destination other than
        # ``run``.
        LOGGER.info("Selected run directories: %s", [r.rundir for r in runs])

384 

385 

def postprocess_select_pipeline(_self: CliParser, namespace: Namespace):
    """
    Rebuild every ``Run`` in ``namespace.run`` so that it uses
    ``namespace.pipeline`` (as chosen through ``--pipeline`` and/or
    ``--filehandlers``) in place of its current pipeline. Each run keeps its
    ``rundir`` and ``downselection``.
    """
    LOGGER.info("Pipeline specified: %s", namespace.pipeline)
    # Slice assignment replaces the contents of the existing list object, so
    # ``namespace.run`` remains the same list.
    namespace.run[:] = [
        Run(rundir=old.rundir,
            pipeline=namespace.pipeline,
            downselection=old.downselection)
        for old in namespace.run
    ]
    LOGGER.info("Selected runs: %s", namespace.run)

398 

399 

def postprocess_dry_run(_self: CliParser, namespace: Namespace):
    """
    If ``--dry-run-dirs`` is true, print the directories that would be
    affected by the given arguments and quit without taking further action. If
    you want to extend this, print more dry run information and then call this
    function to print run/event information before quitting.

    Raises
    ------
    SystemExit
        Always, once the report has been printed, when
        ``namespace.dry_run_dirs`` is true.
    """
    if not namespace.dry_run_dirs:
        return
    print("DOWNSELECTIONS IN EFFECT:")
    # Only the first run's layers are listed; an empty run list simply has
    # no layers to show.
    layers = namespace.run[0].downselection if namespace.run else ()
    for i, downselect in enumerate(layers):
        print(f" - LAYER {i}:")
        for arg, value in downselect.items():
            # ``default=repr`` keeps the report from crashing on values JSON
            # cannot encode (e.g. a callable ``sortkey``).
            print(f" {arg}: {json.dumps(value, default=repr)}")
    print("RUNS AFFECTED:")
    for run in namespace.run:
        print(run.rundir)
        for event in run.events:
            print(f" {os.path.basename(event.eventdir)}")
    # Raise directly instead of calling ``exit()``, which only exists when
    # the ``site`` module has been loaded.
    raise SystemExit()

419 

420 

def _parse_downselect_value(text):
    """Convert one raw ``--downselect`` value: ``True``/``False`` become
    booleans, then ``int`` and ``float`` parsing are tried in that order, and
    anything else is returned unchanged as a string."""
    if text == 'True':
        return True
    if text == 'False':
        return False
    for convert in (int, float):
        try:
            return convert(text)
        except ValueError:
            continue
    return text


def postprocess_downselect(_self: CliParser, namespace: Namespace):
    """
    If ``namespace.downselect`` is not empty, parse it as a comma-separated
    list of ``key=value`` pairs, where ``value`` will be parsed as a boolean
    if it equals either ``True`` or ``False``, as an int or float if it can
    be parsed as such, and as a string otherwise. Use these arguments to
    ``downselect`` each of the runs specified in ``namespace.run``.

    Empty segments (e.g. from a trailing comma) are skipped and whitespace
    around keys is removed.

    Raises
    ------
    ValueError
        If a non-empty segment contains no ``=``.
    """
    if not namespace.downselect:
        return
    kwargs = {}
    for pair in namespace.downselect.split(','):
        if not pair.strip():
            continue
        # Split on the first ``=`` only, so values may themselves contain
        # ``=``.
        key, sep, raw = pair.partition('=')
        if not sep:
            raise ValueError(f"Malformed --downselect entry {pair!r}: "
                             "expected key=value")
        kwargs[key.strip()] = _parse_downselect_value(raw)
    namespace.run = [r.downselect(**kwargs) for r in namespace.run]

446 

447 

class PrintDownselectionsAction(Action):
    """
    Print a dedented docstring for ``Run.downselect`` and exit.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Print the ``Run.downselect`` documentation, then raise
        ``SystemExit`` with status 0."""
        # ``__doc__`` is ``None`` when docstrings are stripped (``-OO``).
        print(dedent(Run.downselect.__doc__ or ''))
        # Raise directly instead of calling ``exit()``, which only exists
        # when the ``site`` module has been loaded.
        raise SystemExit(0)

456 

457 

458# pylint: disable=missing-docstring 

class Parsers:
    # An f-string cannot serve as a docstring, so the class documentation is
    # assigned to ``__doc__`` explicitly.
    __doc__ = dedent(f"""
        Specifying Directories
        ----------------------

        Each LLAMA trigger gets its own directory. The name of this directory
        is called the ``eventid`` and the trigger itself is a LLAMA ``Event``
        (see: ``llama.event``). For a given LLAMA run, all event directories
        should go in a commond directory called a "run directory"; the
        collection of events is called a ``Run`` (see: ``llama.run``). Most
        things the pipeline does work on a single ``Run`` and are meant to
        affect one or more matching ``Event`` instances. When you specify
        directories, you are implicitly specifying the ``Run`` (i.e. collection
        of triggers) as well as a UNIX-style glob (like the asterisk matching
        all files, ``*``) which describes the ``eventid`` pattern you want to
        match. For example, matching all event IDs that start with "S"
        (corresponding to O3 LIGO/Virgo superevents) would require using ``S*``
        as your event glob.

        *If you want to explicitly print which currently-existing* ``Event``
        *directories will be impacted by the arguments you provide, you can
        use* ``--dry-run-dirs`` *to print the impacted directories and exit
        without taking further action. This is good practice while getting used
        to this interface.*

        The syntax for specifying the ``Run`` and ``Event`` glob is the path of
        the run directory followed by a slash followed by the event glob with
        **no slash at the end** (be sure to escape the ``*`` so the shell
        doesn't expand it):

        .. code::

            '/run/directory/event*glob'

        Specify **only** the event glob by leaving the run directory out but
        keeping the leading ``/`` (if for some insane reason your root
        directory is your run directory, a double-leading ``/`` will
        communicate your perverse desire). In this case the default ``Run``
        directory ``{DEFAULT_RUN_DIR}`` is implied, so the following are
        equivalent:

        .. code::

            '/event*glob'
            {DEFAULT_RUN_DIR}'event*glob'

        Specify **only** the ``Run`` directory by leaving a trailing slash and
        omitting the event glob; in this case, the default event glob
        ``{DEFAULT_EVENT_GLOB}`` will be used, so the following are equivalent:

        .. code::

            /run/directory/
            /run/directory/'{DEFAULT_EVENT_GLOB}'

        You can use relative paths for the ``Run`` directory, the final part of
        the path will **not** be expanded and will be treated as the base
        directory. The only exception to this is if you are using relative
        paths and don't put *any* ``/`` in the specified path, in which case
        the relative path will be expanded. This allows the common and
        intuitive behavior of running specific events in the current directory
        when you pass their name alone, or alternatively to treat the current
        directory as the only event directory by passing a single ``.`` as the
        run argument. Something like ``./.``, however, will be interpreted as
        meaning you want the *current* directory to be the run directory only
        matching ``Event`` ids of ``.``.

        Specifying Directories: Examples
        --------------------------------

        The following examples assume you are currently in the event directory
        ``/some/directory/``. Let's say this is the event directory, and you
        want to update **only** the contents of this directory. You can specify
        the run as ``/some/`` and the event glob as ``directory`` with either
        of the following commands paths:

        .. code::

            .
            /some/directory

        Alternatively, if ``/some/directory/`` is a run directory, and you want
        to affect the event directories it contains that match the default
        event glob ``{DEFAULT_EVENT_GLOB}``, you can run use any of the
        following (note again that the event glob is in quotes to prevent your
        shell from expanding it into multiple arguments):

        .. code::

            ./
            ./'{DEFAULT_EVENT_GLOB}'
            /some/directory/
            /some/directory/'{DEFAULT_EVENT_GLOB}'

        If you want to use the name of the current directory as your event glob
        (so that only ``eventids`` that have the *same* basename as your
        current directory are used) while **keeping** the default run directory
        ``{DEFAULT_RUN_DIR}``, you would have to place a leading slash followed
        by the actual name of the run directory; as noted above, ``/.`` not
        work because the dot will be treated literally as the eventid you want
        to use. (Note that you usually wouldn't want to do this; why would you
        be in this directory if you want to operate on an event stored in a
        different run directory?):

        .. code::

            /directory
            {DEFAULT_RUN_DIR}directory

        You can further specify which types of events should be processed by
        specifying ``--downselect`` followed by a string to be passed
        as the arguments to ``Run.downselect`` (run ``--print-downselections``
        to see possible options).

        See ``llama.run`` and ``llama.event`` for more information on ``Run``
        and ``Event`` objects.
    """).strip()

    def __init__(
            self,
            downselect=None,
            run=(os.path.join(DEFAULT_RUN_DIR, DEFAULT_EVENT_GLOB),)
    ):
        """Create a new parser collection with the specified ``downselect``
        arguments to be passed to parsed ``Run`` instances (no downselection if
        not provided) and a default list of ``run`` inputs (as typed at the
        command line, i.e. paths with UNIX globs for ``eventid``).
        """
        self.downselect = downselect
        self.run = run

    @property
    def eventfiltering(self):
        """A ``CliParser`` to be used for downselecting runs and events.

        A new parser is built on every access. It defines the positional
        ``run`` argument plus ``--dry-run-dirs``, ``--downselect`` and
        ``--print-downselections``, and registers ``postprocess_downselect``
        followed by ``postprocess_dry_run`` as its post-processors.
        """

        class ParseRunsWithDefaultAction(ParseRunsAction):
            """``ParseRunsAction`` that falls back to this collection's
            ``run`` defaults when no value is given."""

            _default_run = self.run

        # Text shown as the default in the ``run`` help. Guard against an
        # empty ``self.run`` and build the fallback the same way the
        # ``__init__`` default is built.
        default_run = ((self.run[0] if self.run else '')
                       or os.path.join(DEFAULT_RUN_DIR, DEFAULT_EVENT_GLOB))
        # NOTE(review): inside a method, bare ``__doc__`` resolves to the
        # module docstring, not the class-level ``__doc__`` assigned above --
        # confirm which text is wanted as the epilog.
        eventfiltering = CliParser(add_help=False, epilog=__doc__)
        fgroup = eventfiltering.add_argument_group(
            'filter runs and events (see: ``llama.run``)')
        arg = fgroup.add_argument
        arg('run', nargs="?", action=ParseRunsWithDefaultAction, help=f"""
            A pattern specifying a list of directories to update of the
            form ``/run/directory/event*glob``. See end of ``llama run -h``
            documentation for details. (default:
            ``{default_run}``)""")
        arg('--dry-run-dirs', action='store_true', help="""
            Print the runs and event directories that would be affected and
            exit without taking further action.""")
        arg('--downselect', default=self.downselect, help=f"""
            Arguments to pass to the ``downselect`` method of runs selected
            by the ``run`` argument (note that ``eventid_filter`` is
            already implicitly set by the glob pattern specified in
            ``run``). Arguments will only be parsed as booleans (if they
            equal "True" or "False"), ints (if they can be parsed as such),
            floats (if they can be parsed as such), or strings and should
            be separated by commas, e.g. ``--downselect
            manual=False,fhnameexists=PAstro``.
            Omit a list of downselections or provide an empty string to
            specify no further downselections beyond the one implied by the
            ``run`` argument. (default: {self.downselect})""")
        arg('--print-downselections', action=PrintDownselectionsAction,
            nargs=0, help="Print available downselections.")
        eventfiltering.POSTPROCESSORS = (
            postprocess_downselect,
            postprocess_dry_run,
        )
        return eventfiltering

    @property
    def pipeline_and_eventfiltering(self):
        """
        Get a combination of ``llama.pipeline.Parsers.pipeline`` and
        ``llama.run.Parsers.eventfiltering`` processors in the correct order
        and includes the extra step of using the pipeline specified in the
        first parser in the ``Run`` instances returned by the second parser.
        """
        parser = CliParser(
            add_help=False,
            prefix_chars="-+",
            parents=(
                PipeParsers.pipeline,
                self.eventfiltering,
            ),
        )
        # Appended last so it runs after the post-processors already present
        # on the combined parser.
        parser.POSTPROCESSORS += (postprocess_select_pipeline,)
        return parser

649 

650 

class RunVisualization:
    """Provide methods for visualizing the status of a run directory."""

    def __init__(self, run):
        """Create visualizations for a certain ``Run`` instance."""
        self._run = run

    def wall_times(self, outfile=None):
        """Create histograms of wall times (i.e. how long each file took to
        generate) for each FileHandler in this ``Run`` instance.

        Parameters
        ----------
        outfile : str, optional
            If provided, save all plots as PNG files to a gzipped tarfile with
            this filename.

        Returns
        -------
        plots : dict
            A dictionary of ``matplotlib.figure`` instances whose keys are the
            names of each ``FileHandler`` class and whose values are histograms
            of wall times for each ``FileHandler`` class.
        """
        # Imported here so that matplotlib is only needed when plotting.
        import matplotlib.pyplot as plt
        plots = dict()
        for fhname in self._run.pipeline.file_handlers:
            times = list()
            # Iterate over the downselected run's *events*; iterating the
            # ``Run`` itself would walk its tuple fields instead.
            for event in self._run.downselect(fhnamemeta=fhname).events:
                try:
                    times.append(event.files[fhname].wall_time)
                except KeyError:
                    pass
            fig = plt.figure()
            plt.hist(times)
            plt.gca().set_title("{} Wall Times".format(fhname))
            plt.gca().set_xlabel("Time to Generate File [s]")
            plt.gca().set_ylabel("Count")
            plots[fhname] = fig
        if outfile is not None:
            plotlist = list(plots.values())
            archive_figs(plotlist, outfile, exts=['png'],
                         fname_list=plots.keys())
        return plots

    def finished(self, outfile=None):
        """Create a bar plot showing the proportion of complete to incomplete
        files for each FileHandler in this ``Run`` instance.

        Parameters
        ----------
        outfile : str, optional
            If provided, save the plot to this filename.

        Returns
        -------
        fig : matplotlib.figure
            The bar plot figure.
        """
        # Imported here so that matplotlib is only needed when plotting.
        import matplotlib.pyplot as plt
        fhnames = list(self._run.pipeline.file_handlers.keys())
        ind = range(len(fhnames))
        # ``events`` is a property returning a list, so it is not called.
        ndirs = len(self._run.events)
        # Count matching events; ``len`` of a ``Run`` would only count its
        # tuple fields.
        done = [len(self._run.downselect(fhnameexists=n).events)
                for n in fhnames]
        notdone = [ndirs - d for d in done]
        fig = plt.figure()
        # matplotlib.org/gallery/lines_bars_and_markers/bar_stacked.html
        pdone = plt.bar(ind, done)
        pnotdone = plt.bar(ind, notdone, bottom=done)
        plt.title('Files finished generating (by FileHandler)')
        plt.xlabel('FileHandler name')
        plt.ylabel('Number of files')
        plt.xticks(ind, fhnames, rotation='vertical')
        # With no file handlers there are no bars to take legend handles
        # from.
        if fhnames:
            plt.legend((pdone[0], pnotdone[0]),
                       ('File exists', 'File not made'))
        # Shrink the x tick labels through the axes API rather than through
        # per-tick label attributes.
        plt.gca().tick_params(axis='x', labelsize=8)
        plt.tight_layout()
        if outfile is not None:
            fig.savefig(outfile)
        return fig