Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2# (c) Stefan Countryman, 2018 

3 

4""" 

5Analysis ``Pipeline`` instances are Directed Acyclic Graphs (DAGs) 

6specifying sets of ``FileHandler`` classes that together form an analysis 

7pipeline. Use ``Pipeline`` instances to specify exactly which analysis steps 

8you want to run; select a subset of the maximal ``DEFAULT_PIPELINE`` if you 

9only need certain analysis outputs. 

10""" 

11 

12import logging 

13import inspect 

14import types 

15from operator import xor, is_ 

16from argparse import Action, Namespace 

17import llama.files 

18from llama.utils import ( 

19 vecstr, 

20 DOTFMT, 

21 EDGEFMT, 

22 plot_graphviz, 

23) 

24from llama.classes import ( 

25 ImmutableDict, 

26 NamespaceMappable, 

27 FileHandlerTuple, 

28) 

29from llama.filehandler import ( 

30 FileGraph, 

31) 

32from llama.files import ( 

33 CoincSignificanceSubthresholdI3Lvc, 

34) 

35from llama.cli import CliParser 

36 

# Module-wide logger, named after this module so log records can be filtered
# per-module by the standard ``logging`` hierarchy.
LOGGER = logging.getLogger(name=__name__)

38 

39 

class Pipeline(ImmutableDict, NamespaceMappable):
    """
    A pipeline specifies a specific set of data inputs and the functions
    that act on them in terms of intermediate data products and the functions
    used to generate them in a Directed Acyclic Graph (DAG); these products are
    bundled into FileHandlers. FileHandlers are graph nodes with
    ``DEPENDENCIES`` (edges) specified. A Pipeline DAG can be built purely by
    specifying the specific FileHandlers, which can be done trivially and
    clearly at the file-system level by putting the FileHandler code into a
    single directory for each pipeline.

    Parameters
    ----------
    kwargs : dict
        Names of ``FileHandler`` classes mapped to the classes themselves.
    args : array-like
        ``FileHandler`` classes. The ``__name__`` of each ``FileHandler`` will
        be used as the key.

    Returns
    -------
    pipeline : Pipeline
        A new ``Pipeline`` instance containing all of the ``FileHandler``
        classes specified in ``args`` and ``kwargs``.

    Raises
    ------
    TypeError
        If there are any name collisions between classes in the input ``args``
        and ``kwargs``, if any of the ``FileHandler`` classes it contains are
        abstract (non-implemented) classes, or if any of the ``FileHandler``
        classes it contains have missing ``required_attributes``.
    """

    def __new__(cls, *args, **kwargs):
        """See ``Pipeline`` class docstring for details."""
        argdict = {fh.__name__: fh for fh in args}
        if set(kwargs).intersection(argdict):
            raise TypeError("Conflicting ``FileHandler`` class names given in "
                            f"``kwargs`` ({kwargs}) and ``args`` ({args}).")
        argdict.update(kwargs)
        for fhcls in argdict.values():
            # Abstract classes cannot be instantiated, so reject them here
            # rather than when instances are eventually created.
            if inspect.isabstract(fhcls):
                raise TypeError("Abstract classes are prohibited in "
                                f"``Pipeline`` instances. Abstract: {fhcls}")
            for required in fhcls.required_attributes():
                if getattr(fhcls, required, None) is None:
                    raise TypeError("Must define class attribute "
                                    f"``{required}`` for ``{fhcls}`` ")
        return ImmutableDict.__new__(cls, argdict)

    def _qualified_names(self):
        """Return a sorted tuple of ``module.name`` strings, one per class in
        this pipeline; this is the identity used for hashing and equality."""
        return tuple(sorted(f"{fh.__module__}.{fh.__name__}"
                            for fh in self.values()))

    def __hash__(self):
        """Hash is based on module and class names used."""
        return hash(self._qualified_names())

    def __eq__(self, other):
        """Two pipelines are equal when they contain the same classes, as
        identified by their fully-qualified ``module.name`` (keys are not
        compared). Comparison with non-``Pipeline`` objects is delegated back
        to Python via ``NotImplemented`` instead of comparing hashes, which
        could collide or raise ``TypeError`` for unhashable objects."""
        if not isinstance(other, Pipeline):
            return NotImplemented
        return self._qualified_names() == other._qualified_names()

    def __ne__(self, other):
        """Negation of ``__eq__``, defined explicitly so that an inherited
        mapping ``__ne__`` cannot disagree with ``__eq__``."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    @classmethod
    def from_module(cls, module):
        """Create a pipeline from every ``FileHandlerTuple`` subclass found in
        the namespace of ``module`` (including classes it merely imports).

        Parameters
        ----------
        module : types.ModuleType
            The module whose attributes are scanned.

        Returns
        -------
        pipeline : Pipeline
            A pipeline whose keys are the attribute names under which the
            classes appear in ``module``.

        Raises
        ------
        ValueError
            If ``module`` is not a module object.
        """
        if not isinstance(module, types.ModuleType):
            raise ValueError("Argument must be a module.")
        # Collect the classes themselves (nothing is instantiated here); the
        # ``isinstance(..., type)`` guard keeps ``issubclass`` from raising
        # on non-class attributes.
        file_handlers = {
            name: obj for name, obj in vars(module).items()
            if isinstance(obj, type) and issubclass(obj, FileHandlerTuple)
        }
        return cls(**file_handlers)

    def file_handler_instances(self, *args, **kwargs):
        """Return a ``FileGraph`` mapping each key of this pipeline to an
        instance of the corresponding ``FileHandler`` class, with every
        instance built from the same initialization arguments (e.g. so that
        all instances refer to the same event)."""
        return FileGraph({key: fhcls(*args, **kwargs)
                          for key, fhcls in self.items()})

    def downselect(self, invert=False, reducer=all, **kwargs):
        """Return a ``Pipeline`` instance whose ``FileHandler`` classes match
        the given query parameters, with per-query results combined by
        ``reducer``.

        With no query parameters, ``all`` of an empty sequence is ``True``,
        so every class is kept (or none is kept when ``invert`` is true).

        Parameters
        ----------
        invert : bool, optional
            Invert results, keeping only classes that do NOT match.
            (Default: False)
        reducer : function, optional
            Specify ``any`` builtin to match if any check passes. Specify
            ``all`` to match only when every check passes. (Default: ``all``)
        type : type, optional
            The type of the ``FileHandler`` must exactly match the given
            ``FileHandler``.
        typename : str or list of str, optional
            The ``FileHandler`` type's name must match this string. A list of
            names is also accepted (presumably matching any of them, via
            ``vecstr`` -- confirm against ``llama.utils.vecstr``).
        subclass : type, optional
            The ``FileHandler`` must be a subclass of this ``FileHandler``.
        subgraph : type, optional
            The ``FileHandler`` must be either this ``FileHandler`` or one of
            its ``UR_DEPENDENCIES``; use this to make a ``Pipeline`` that only
            generates the subgraph leading to this ``FileHandler``.

        Raises
        ------
        KeyError
            If an unrecognized query parameter name is given (only raised
            when there is at least one class to check).
        """
        # a bunch of checks for the file handler and the query arg
        checks = {
            'type': is_,
            'typename': vecstr(lambda fh, q: fh.__name__ == q),
            'subclass': issubclass,
            'subgraph': lambda fh, q: fh is q or fh in q.UR_DEPENDENCIES,
        }
        return type(self)(**{
            key: value for key, value in self.items()
            if xor(
                invert,
                reducer(checks[query_name](value, query_value)
                        for query_name, query_value in kwargs.items())
            )
        })

    def check_consistency(self, other):
        """Check whether two ``Pipeline`` instances use the same keys to
        describe the same ``FileHandler`` classes, raising a ``ValueError`` if
        they don't."""
        for key, fhcls in self.items():
            if key in other and fhcls != other[key]:
                raise ValueError(("Matching keys must correspond to same "
                                  "``FileHandler``. self: {} other: "
                                  "{}").format(self, other))

    def __add__(self, other):
        """Take the union of two pipelines. Raises a ``ValueError`` if the
        pipelines have matching keys corresponding to different ``FileHandler``
        classes."""
        self.check_consistency(other)
        fhs = dict()  # make a mutable dict
        fhs.update(self)
        fhs.update(other)
        return type(self)(**fhs)  # final result still immutable

    def __sub__(self, other):
        """Remove ``FileHandler`` classes in ``other`` from ``self``. Raises a
        ``ValueError`` if the pipelines have matching keys corresponding to
        different ``FileHandler`` classes."""
        self.check_consistency(other)
        return type(self)(**{key: fhcls for key, fhcls in self.items()
                             if key not in other})

    def __str__(self):
        return '{}(<{}>)'.format(type(self).__name__, sorted(self.keys()))

    def dependency_graph(self, outfile: str = None, title: str = 'Pipeline',
                         url: types.FunctionType = None, bgcolor: str = 'black'):
        """Return a graphviz .dot graph of ``DEPENDENCIES`` between file
        handlers in this pipeline. Optionally plot the graph to an output image
        file visualizing the graph.

        Optional file extensions for outfile:

        - *dot*: just save the dotfile in .dot format.
        - *png*: save the image in PNG format.
        - *pdf*: save the image in PDF format.
        - *svg*: save the image in svg format.

        Dependencies that are not themselves part of this pipeline are still
        drawn as nodes, and a warning listing them is logged.

        Parameters
        ----------
        outfile : str, optional
            If not provided, return a string in ``.dot`` file format specifying
            graph relations. If an output file is specified, infer the filetype
            and write to that file.
        title : str, optional
            The title of the pipeline graph plot.
        url : FunctionType, optional
            A function taking ``FileHandler`` classes as input and returning a
            URL that will be added to each ``FileHandler`` class's node in the
            output graph. Allows you to add links. If not included, URLs will
            not be included.
        bgcolor : str, optional
            The background color to use for the generated plot.

        Returns
        -------
        dot : str
            The dependency graph in ``.dot`` format (can be used as input to
            ``dot`` at the command line). This is returned regardless of
            whether an outfile is specified.
        """
        # Some dependency nodes may not be included in this pipeline; include
        # them as nodes anyway so every edge has both endpoints, and warn.
        allnodes = sorted({fh for base in self.values()
                           for fh in [base] + list(base.DEPENDENCIES)},
                          key=lambda t: f"{t.__module__}.{t.__name__}")
        members = set(self.values())
        missing = [fh.__name__ for fh in allnodes if fh not in members]
        if missing:
            LOGGER.warning("Pipeline graph %r includes dependencies that are "
                           "not in the pipeline: %s", title, missing)
        # https://www.graphviz.org/doc/info/shapes.html
        nodedot = (
            f'"{fh.__module__}.{fh.__name__}" '
            f'[label=<{{{{<B>{fh.__name__}</B>|<I>{fh.FILENAME}</I>|'
            f'{"<BR/>".join(b.__name__ for b in fh.__bases__)}}}}}>, '
            f'shape="record", style=filled, target=_top, '
            f'URL="{url(fh) if url is not None else ""}", '
            'fillcolor="#cccccc"];'
            for fh in allnodes
        )
        edgedot = (
            EDGEFMT.format(
                depnum=f"{d.__module__}.{d.__name__}",
                num=f"{fh.__module__}.{fh.__name__}",
                color="red"
            )
            for fh in self.values() for d in fh.DEPENDENCIES
        )
        dot = DOTFMT.format(name=title, nodes='\n'.join(sorted(nodedot)),
                            edges='\n'.join(sorted(edgedot)), bgcolor=bgcolor)
        if outfile is not None:
            plot_graphviz(dot, outfile)
        return dot

263 

264 

# Every ``FileHandlerTuple`` subclass reachable from the ``llama.files``
# namespace, keyed by the attribute name it is bound to there.
DEFAULT_PIPELINE = Pipeline.from_module(llama.files)
# Attach a human-readable listing of the available FileHandler names; this is
# what ``--print-default-pipeline`` prints (after ``.strip()``).
DEFAULT_PIPELINE.__doc__ = """
The ``DEFAULT_PIPELINE`` contains the full set of currently-used
``FileHandler`` classes. You can downselect from this pipeline to create more
focused pipelines for specific tasks.

``FileHandler`` Classes
~~~~~~~~~~~~~~~~~~~~~~~

{}

""".format(''.join(f"- ``{fh}``\n" for fh in sorted(DEFAULT_PIPELINE)))

# A pipeline for the post-O3a (mid-O3) subthreshold simulations. Only generates
# the subthreshold significance, i.e. CoincSignificanceSubthresholdI3Lvc and
# the classes in its ``UR_DEPENDENCIES``.
SUBTHRESHOLD_PIPELINE = DEFAULT_PIPELINE.downselect(
    subgraph=CoincSignificanceSubthresholdI3Lvc)

# A pipeline for LLAMA 2.0 reviewers featuring the core of the analysis.
# FileHandlers that communicate with the outside world are omitted, as are
# diagnostic or experimental FileHandlers. Takes a strict subset of the
# DEFAULT_PIPELINE, selected by class name.
# NOTE: any module-level ``Pipeline`` defined here becomes a ``--pipeline``
# choice, since ``Parsers`` scans ``globals()`` for ``Pipeline`` instances.
LLAMA2_REVIEW_PIPELINE = DEFAULT_PIPELINE.downselect(
    typename=[
        'LvcSkymapHdf5',
        'SkymapInfo',
        'LvcSkymapFits',
        'LVCGraceDbEventData',
        'IceCubeNeutrinoList',
        'LvcDistancesJson',
        'PAstro',
        # TODO update I3LvcGcnDraft to use output of
        # LLAMA 2.0 significance calculation.
        # 'I3LvcGcnDraft',
        'IceCubeNeutrinoListTxt',
        'IceCubeNeutrinoListCoincTxt',
        'IceCubeNeutrinoListTex',
        'CoincSignificanceI3Lvc',
        'CoincScatterI3LvcPdf',
        'CoincScatterI3LvcPng',
        'CoincSummaryI3LvcTex',
    ]
)

308 

309 

class PrintDefaultPipeline(Action):
    """Print DEFAULT_PIPELINE in a readable format and quit."""

    def __call__(self, parser, namespace, values, option_string=None):
        """Only called if flag is explicitly specified. Prints the
        ``DEFAULT_PIPELINE`` listing and exits with status 0."""
        print(DEFAULT_PIPELINE.__doc__.strip())
        # ``parser.exit`` is argparse's own way to quit from an action; the
        # bare ``exit`` builtin is injected by ``site`` and is not guaranteed
        # to exist (e.g. under ``python -S`` or in frozen executables).
        parser.exit()

317 

318 

class FileHandlerSubgraphAction(Action):
    """
    Build a ``llama.pipeline.Pipeline`` from the requested filehandler names.

    Flags prefixed with ``+`` select each named filehandler together with all
    of its ancestors (a ``subgraph`` downselect); flags prefixed with ``-``
    select only the named filehandlers themselves (a ``type`` downselect).
    Repeated invocations accumulate into the same destination, so both flag
    styles can be combined on one command line.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Merge the pipeline selected by ``values`` into ``self.dest``."""
        accumulated = getattr(namespace, self.dest, None)
        if accumulated is None:
            accumulated = Pipeline()
            setattr(namespace, self.dest, accumulated)

        query_for_prefix = {'+': 'subgraph', '-': 'type'}
        keyword = query_for_prefix.get(option_string[:1])
        if keyword is None:
            parser.error(f"Unrecognized prefix on option flag {option_string};"
                         " expected + or -.")

        pieces = [
            DEFAULT_PIPELINE.downselect(**{keyword: fhcls})
            for name, fhcls in DEFAULT_PIPELINE.items()
            if name in values
        ]
        for piece in pieces:
            accumulated = accumulated + piece
        setattr(namespace, self.dest, accumulated)

351 ) 

352 

353 

class PipelineAction(Action):
    """
    Set the pipeline from the available pipelines defined in
    ``llama.pipeline``.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Store the module-level object named ``values`` on ``namespace``
        under ``self.dest``; an unknown name raises ``KeyError``."""
        module_scope = globals()
        chosen = module_scope[values]
        setattr(namespace, self.dest, chosen)

363 

364 

def postprocess_pipeline_selection(self: CliParser, namespace: Namespace):
    """Merge ``namespace.filehandlers`` into ``namespace.pipeline``.

    If neither was given, fall back to ``DEFAULT_PIPELINE``; if only the
    filehandlers were given, they become the pipeline; if both were given,
    the pipeline becomes their union (via ``+=``).
    """
    chosen = namespace.pipeline
    extra = namespace.filehandlers
    if chosen is not None:
        if extra is not None:
            chosen += extra
            namespace.pipeline = chosen
        return
    namespace.pipeline = DEFAULT_PIPELINE if extra is None else extra

376 

377 

def postprocess_pipeline_dry_run(self: CliParser, namespace: Namespace):
    """Print the filehandlers in ``namespace.pipeline`` and quit without
    evaluating further if ``namespace.dry_run_pipeline`` is ``True``.

    Each line shows a pipeline key and the ``FILENAME`` of its class. Exits
    with status 0 via ``SystemExit`` after printing; does nothing otherwise.
    """
    # Local import: ``sys.exit`` replaces the ``site``-injected ``exit``
    # builtin, which is not guaranteed to exist (e.g. under ``python -S``).
    import sys

    if not namespace.dry_run_pipeline:
        return
    print("PIPELINE SPECIFIED:")
    for fhname, filehandler in namespace.pipeline.items():
        print(f" {fhname:<40} -> {filehandler.FILENAME}")
    sys.exit()

386 

387 

class Parsers:
    """
    A collection of CLI parsers implementing ``pipeline`` related
    functionality. *Note that you will need to specify ``prefix_chars="-+"`` to
    use the ``++filehandlers`` functionality.* See ``llama.cli.Parsers`` for
    more information.
    """

    # Parent parser (``add_help=False``) meant to be combined with others;
    # ``+`` is enabled as a prefix so ``+f``/``++filehandlers`` can be parsed.
    pipeline = CliParser(prefix_chars="-+", add_help=False)
    _pgroup = pipeline.add_argument_group('choose pipeline (see '
                                          '``llama.pipeline``)')
    # ``-f``: select exactly the named filehandlers (no ancestors).
    _pgroup.add_argument('-f', '--filehandlers', nargs='*',
                         action=FileHandlerSubgraphAction,
                         choices=list(DEFAULT_PIPELINE.keys()),
                         metavar="FILEHANDLER", help=f"""
        A list of ``FileHandler`` class names which
        should be used. If provided, ``FileHandler`` classes whose names are in
        this list will be included in the pipeline. If the dependencies of a
        requested file are not available, no attempt will be made to generate
        them unless they too are listed explicitly. Available filehandlers are
        drawn from the ``DEFAULT_PIPELINE`` (print them with
        ``--print-default-pipeline``).""")
    # ``+f``: select the named filehandlers plus all of their ancestors; both
    # flags share ``dest`` "filehandlers", so their selections accumulate.
    _pgroup.add_argument('+f', '++filehandlers', nargs='*',
                         action=FileHandlerSubgraphAction,
                         choices=list(DEFAULT_PIPELINE.keys()),
                         metavar="FILEHANDLER", help="""
        Exact same as ``--filehandlers``, but all ancestors of the files listed
        with the ``+`` prefix will also be included. This means that, if you
        ask to generate a single file, an attempt will be made to **also
        generate everything it depends on** if necessary. If you want all those
        files made, this is a handy shortcut; if you want file generation to
        fail when ancestors are missing, use the ``-`` prefix instead.""")
    # Named pipelines offered by ``--pipeline``: every module-level
    # ``Pipeline`` instance defined before this class body runs (in a class
    # body, ``globals()`` is the module namespace).
    _pchoices = [k for k, v in globals().items() if isinstance(v, Pipeline)]
    _pgroup.add_argument('-p', '--pipeline', action=PipelineAction,
                         choices=_pchoices, help=f"""
        The name of the pipeline to use. Must be the name of a Pipeline
        instance from ``llama.pipeline``. Available choices: {_pchoices}. If
        both this and ``filehandlers`` are specified, then the resulting
        pipeline will include all requested filehandlers from both options.""")
    # ``nargs=0`` makes this a bare flag whose action prints and exits.
    _pgroup.add_argument('--print-default-pipeline', nargs=0,
                         action=PrintDefaultPipeline, help="""
        Print the contents of the default pipeline and quit.""")
    _pgroup.add_argument('--dry-run-pipeline', action='store_true', help="""
        Print the pipeline selected by the user and quit without taking further
        action. Use this to make sure you've specified the correct
        pipeline.""")
    # Run after parsing, in order: first resolve the final pipeline, then
    # optionally print it and exit.
    pipeline.POSTPROCESSORS = (postprocess_pipeline_selection,
                               postprocess_pipeline_dry_run)
    # Drop the temporary group handle from the class namespace.
    del _pgroup