Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2# (c) Stefan Countryman, 2018
4"""
5Analysis ``Pipeline`` instances are Directed Acyclic Graphs (DAGs)
6specifying sets of ``FileHandler`` classes that together form an analysis
7pipeline. Use ``Pipeline`` instances to specify exactly which analysis steps
8you want to run; select a subset of the maximal ``DEFAULT_PIPELINE`` if you
9only need certain analysis outputs.
10"""
import inspect
import logging
import sys
import types
from argparse import Action, Namespace
from operator import xor, is_

import llama.files
from llama.utils import (
    vecstr,
    DOTFMT,
    EDGEFMT,
    plot_graphviz,
)
from llama.classes import (
    ImmutableDict,
    NamespaceMappable,
    FileHandlerTuple,
)
from llama.filehandler import (
    FileGraph,
)
from llama.files import (
    CoincSignificanceSubthresholdI3Lvc,
)
from llama.cli import CliParser
# Module-level logger named after this module.
LOGGER = logging.getLogger(__name__)
class Pipeline(ImmutableDict, NamespaceMappable):
    """
    A pipeline specifies a set of data inputs and the functions that act on
    them, in terms of intermediate data products and the functions used to
    generate them, as a Directed Acyclic Graph (DAG); these products are
    bundled into FileHandlers. FileHandlers are graph nodes with
    ``DEPENDENCIES`` (edges) specified. A Pipeline DAG can therefore be built
    purely by specifying the FileHandlers it contains, which can be done
    trivially and clearly at the file-system level by putting the FileHandler
    code into a single directory (module) for each pipeline.

    Two pipelines compare equal (and hash equally) when they contain the same
    ``FileHandler`` classes, identified by module and class name, regardless
    of the keys used to store them.

    Parameters
    ----------
    kwargs : dict
        Names of ``FileHandler`` classes mapped to the classes themselves.
    args : array-like
        ``FileHandler`` classes. The ``__name__`` of each ``FileHandler`` will
        be used as the key.

    Returns
    -------
    pipeline : Pipeline
        A new ``Pipeline`` instance containing all of the ``FileHandler``
        classes specified in ``args`` and ``kwargs``.

    Raises
    ------
    TypeError
        If there are any name collisions between classes in the input ``args``
        and ``kwargs``, if any of the ``FileHandler`` classes it contains are
        abstract (non-implemented) classes, or if any of the ``FileHandler``
        classes it contains have missing ``required_attributes``.
    """

    def __new__(cls, *args, **kwargs):
        """See ``Pipeline`` class docstring for details."""
        argdict = {fh.__name__: fh for fh in args}
        if set(kwargs).intersection(argdict):
            raise TypeError("Conflicting ``FileHandler`` class names given in "
                            f"``kwargs`` ({kwargs}) and ``args`` ({args}).")
        argdict.update(kwargs)
        # validate every class before building the immutable mapping
        for fhcls in argdict.values():
            if inspect.isabstract(fhcls):
                raise TypeError("Abstract classes are prohibited in "
                                f"``Pipeline`` instances. Abstract: {fhcls}")
            for required in fhcls.required_attributes():
                if getattr(fhcls, required, None) is None:
                    raise TypeError("Must define class attribute "
                                    f"``{required}`` for ``{fhcls}`` ")
        return ImmutableDict.__new__(cls, argdict)

    def _qualified_names(self):
        """Return the sorted ``module.ClassName`` strings of the contained
        ``FileHandler`` classes; the shared basis of hashing and equality."""
        return tuple(sorted(f"{v.__module__}.{v.__name__}"
                            for v in self.values()))

    def __hash__(self):
        """Hash is based on module and class names used."""
        return hash(self._qualified_names())

    def __eq__(self, other):
        """Return whether ``other`` is a ``Pipeline`` containing the same
        ``FileHandler`` classes (compared by module and class name).

        The names are compared directly rather than via their hashes, so hash
        collisions cannot produce false positives; non-``Pipeline`` operands
        return ``NotImplemented`` instead of raising for unhashable objects.
        """
        if not isinstance(other, Pipeline):
            return NotImplemented
        return self._qualified_names() == other._qualified_names()

    @classmethod
    def from_module(cls, module):
        """Create a pipeline from all ``FileHandler`` classes in a module.

        Parameters
        ----------
        module : types.ModuleType
            The module to scan. Every attribute that is a class subclassing
            ``FileHandlerTuple`` is included, keyed by its attribute name.

        Returns
        -------
        pipeline : Pipeline
            A pipeline containing the collected classes.

        Raises
        ------
        ValueError
            If ``module`` is not a module.
        TypeError
            Propagated from the constructor if a collected class is abstract
            or is missing ``required_attributes``.
        """
        if not isinstance(module, types.ModuleType):
            raise ValueError("Argument must be a module.")
        # keep only classes (not instances or functions) that are FileHandler
        # subclasses; the classes themselves are stored, not instances
        file_handlers = {
            name: obj for name, obj in vars(module).items()
            if isinstance(obj, type) and issubclass(obj, FileHandlerTuple)
        }
        return cls(**file_handlers)

    def file_handler_instances(self, *args, **kwargs):
        """Return a ``FileGraph`` of ``FileHandler`` instances sharing the
        same initialization arguments, e.g. for ``FileHandler`` instances that
        all refer to the same event. Keys match this pipeline's keys."""
        return FileGraph({key: fhcls(*args, **kwargs)
                          for key, fhcls in self.items()})

    def downselect(self, invert=False, reducer=all, **kwargs):
        """Return a ``Pipeline`` instance whose ``FileHandler`` classes match
        the given query parameters, combined with ``reducer`` (by default a
        class must match ALL of them).

        Parameters
        ----------
        invert : bool, optional
            Invert results, keeping only the classes that do NOT match.
            (Default: False)
        reducer : function, optional
            Specify ``any`` builtin to match if any check passes. Specify
            ``all`` to match only when every check passes. (Default: ``all``)
        type : type, optional
            The type of the ``FileHandler`` must exactly match the given
            ``FileHandler``.
        typename : str, optional
            The ``FileHandler`` type's name must match this string. The check
            is wrapped with ``vecstr`` and is called elsewhere in this module
            with a list of names (presumably matching any of them; confirm
            against ``llama.utils.vecstr``).
        subclass : type, optional
            The ``FileHandler`` must be a subclass of this ``FileHandler``.
        subgraph : type, optional
            The ``FileHandler`` must be either this ``FileHandler`` or one of
            its ``UR_DEPENDENCIES``; use this to make a ``Pipeline`` that only
            generates the subgraph leading to this ``FileHandler``.

        Raises
        ------
        KeyError
            If an unknown query name is evaluated.
        """
        # a bunch of checks for the file handler and the query arg
        checks = {
            'type': is_,
            'typename': vecstr(lambda fh, q: fh.__name__ == q),
            'subclass': issubclass,
            'subgraph': lambda fh, q: fh is q or fh in q.UR_DEPENDENCIES,
        }
        return type(self)(**{
            key: value for key, value in self.items()
            if xor(
                invert,
                reducer(checks[query_name](value, query_value)
                        for query_name, query_value in kwargs.items())
            )
        })

    def check_consistency(self, other):
        """Check whether two ``Pipeline`` instances use the same keys to
        describe the same ``FileHandler`` classes, raising a ``ValueError`` if
        they don't."""
        for key, fhcls in self.items():
            if key in other and fhcls != other[key]:
                raise ValueError(("Matching keys must correspond to same "
                                  "``FileHandler``. self: {} other: "
                                  "{}").format(self, other))

    def __add__(self, other):
        """Take the union of two pipelines. Raises a ``ValueError`` if the
        pipelines have matching keys corresponding to different ``FileHandler``
        classes."""
        self.check_consistency(other)
        fhs = dict(self)  # make a mutable dict
        fhs.update(other)
        return type(self)(**fhs)  # final result still immutable

    def __sub__(self, other):
        """Remove ``FileHandler`` classes in ``other`` from ``self``. Raises a
        ``ValueError`` if the pipelines have matching keys corresponding to
        different ``FileHandler`` classes."""
        self.check_consistency(other)
        return type(self)(**{key: fhcls for key, fhcls in self.items()
                             if key not in other})

    def __str__(self):
        return '{}(<{}>)'.format(type(self).__name__, sorted(self.keys()))

    def dependency_graph(self, outfile: str = None, title: str = 'Pipeline',
                         url: types.FunctionType = None,
                         bgcolor: str = 'black'):
        """Return a graphviz .dot graph of ``DEPENDENCIES`` between file
        handlers in this pipeline. Optionally plot the graph to an output image
        file visualizing the graph.

        Optional file extensions for outfile:

        - *dot*: just save the dotfile in .dot format.
        - *png*: save the image in PNG format.
        - *pdf*: save the image in PDF format.
        - *svg*: save the image in svg format.

        Direct ``DEPENDENCIES`` of the pipeline's ``FileHandler`` classes are
        drawn as nodes even when they are not part of this pipeline; a warning
        listing them is logged in that case.

        Parameters
        ----------
        outfile : str, optional
            If not provided, only return a string in ``.dot`` file format
            specifying graph relations. If an output file is specified, infer
            the filetype and write to that file.
        title : str, optional
            The title of the pipeline graph plot.
        url : FunctionType, optional
            A function taking ``FileHandler`` classes as input and returning a
            URL that will be added to each ``FileHandler`` class's node in the
            output graph. Allows you to add links. If not included, URLs will
            not be included.
        bgcolor : str, optional
            The background color to use for the generated plot.

        Returns
        -------
        dot : str
            The dependency graph in ``.dot`` format (can be used as input to
            ``dot`` at the command line). This is returned regardless of
            whether an outfile is specified.
        """
        def qualname(fhcls):
            """Fully-qualified name used as the graphviz node identifier."""
            return f"{fhcls.__module__}.{fhcls.__name__}"

        members = set(self.values())
        allnodes = sorted({fh for base in members
                           for fh in (base, *base.DEPENDENCIES)},
                          key=qualname)
        # possible that some dependency nodes are not included in this
        # pipeline; issue a warning in this case.
        outside = [qualname(fh) for fh in allnodes if fh not in members]
        if outside:
            LOGGER.warning("Pipeline %r plots dependencies that it does not "
                           "contain: %s", title, outside)
        # https://www.graphviz.org/doc/info/shapes.html
        nodedot = (
            f'"{qualname(fh)}" '
            f'[label=<{{{{<B>{fh.__name__}</B>|<I>{fh.FILENAME}</I>|'
            f'{"<BR/>".join(b.__name__ for b in fh.__bases__)}}}}}>, '
            f'shape="record", style=filled, target=_top, '
            f'URL="{url(fh) if url is not None else ""}", '
            'fillcolor="#cccccc"];'
            for fh in allnodes
        )
        edgedot = (
            EDGEFMT.format(depnum=qualname(d), num=qualname(fh), color="red")
            for fh in self.values() for d in fh.DEPENDENCIES
        )
        dot = DOTFMT.format(name=title, nodes='\n'.join(sorted(nodedot)),
                            edges='\n'.join(sorted(edgedot)), bgcolor=bgcolor)
        if outfile is not None:
            plot_graphviz(dot, outfile)
        return dot
# The maximal pipeline: every ``FileHandlerTuple`` subclass found in the
# ``llama.files`` namespace (see ``Pipeline.from_module``).
DEFAULT_PIPELINE = Pipeline.from_module(llama.files)
# Attach a human-readable listing of the pipeline's keys (sorted) as the
# instance docstring; ``--print-default-pipeline`` prints this text.
DEFAULT_PIPELINE.__doc__ = """
The ``DEFAULT_PIPELINE`` contains the full set of currently-used
``FileHandler`` classes. You can downselect from this pipeline to create more
focused pipelines for specific tasks.

``FileHandler`` Classes
~~~~~~~~~~~~~~~~~~~~~~~

{}

""".format(''.join(f"- ``{fh}``\n" for fh in sorted(DEFAULT_PIPELINE)))
# A pipeline for the post-O3a (mid-O3) subthreshold simulations. Only generates
# the subthreshold significance: the ``subgraph`` query keeps
# ``CoincSignificanceSubthresholdI3Lvc`` plus its ``UR_DEPENDENCIES``.
SUBTHRESHOLD_PIPELINE = DEFAULT_PIPELINE.downselect(
    subgraph=CoincSignificanceSubthresholdI3Lvc)
# A pipeline for LLAMA 2.0 reviewers featuring the core of the analysis.
# FileHandlers that communicate with the outside world are omitted, as are
# diagnostic or experimental FileHandlers. Takes a strict subset of the
# DEFAULT_PIPELINE, selecting FileHandlers by class name (``typename``).
LLAMA2_REVIEW_PIPELINE = DEFAULT_PIPELINE.downselect(
    typename=[
        'LvcSkymapHdf5',
        'SkymapInfo',
        'LvcSkymapFits',
        'LVCGraceDbEventData',
        'IceCubeNeutrinoList',
        'LvcDistancesJson',
        'PAstro',
        # TODO update I3LvcGcnDraft to use output of
        # LLAMA 2.0 significance calculation.
        # 'I3LvcGcnDraft',
        'IceCubeNeutrinoListTxt',
        'IceCubeNeutrinoListCoincTxt',
        'IceCubeNeutrinoListTex',
        'CoincSignificanceI3Lvc',
        'CoincScatterI3LvcPdf',
        'CoincScatterI3LvcPng',
        'CoincSummaryI3LvcTex',
    ]
)
class PrintDefaultPipeline(Action):
    """Print DEFAULT_PIPELINE in a readable format and quit."""

    def __call__(self, parser, namespace, values, option_string=None):
        """Only called if flag is explicitly specified.

        Prints the ``DEFAULT_PIPELINE`` listing to stdout and exits with
        status 0 through ``parser.exit`` (the argparse idiom for actions that
        terminate parsing). The ``exit()`` builtin is injected by ``site`` for
        interactive use and is unavailable when Python runs with ``-S``.
        """
        print(DEFAULT_PIPELINE.__doc__.strip())
        parser.exit()
class FileHandlerSubgraphAction(Action):
    """
    Accumulate a ``llama.pipeline.Pipeline`` from prefixed filehandler flags.

    Each named filehandler is looked up in ``DEFAULT_PIPELINE`` and merged
    into the pipeline stored at ``namespace.<dest>`` (an empty ``Pipeline``
    is created first if nothing is stored yet):

    - an option string starting with ``+`` adds the filehandler together with
      all of its ancestors (a ``subgraph`` downselection);
    - an option string starting with ``-`` adds only the filehandler itself
      (a ``type`` downselection).

    Because each call merges into the existing value, both kinds of flag can
    be combined in one command-line invocation.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Take the provided ``values`` and build a pipeline."""
        if getattr(namespace, self.dest, None) is None:
            setattr(namespace, self.dest, Pipeline())

        if option_string.startswith('+'):
            query = 'subgraph'
        elif option_string.startswith('-'):
            query = 'type'
        else:
            parser.error(f"Unrecognized prefix on option flag {option_string};"
                         " expected + or -.")

        # fold each requested selection into the running pipeline, in
        # DEFAULT_PIPELINE order, starting from the value already stored
        merged = getattr(namespace, self.dest)
        for name, filehandler in DEFAULT_PIPELINE.items():
            if name in values:
                merged = merged + DEFAULT_PIPELINE.downselect(
                    **{query: filehandler})
        setattr(namespace, self.dest, merged)
class PipelineAction(Action):
    """
    Set the pipeline from the available pipelines defined in
    ``llama.pipeline``.

    The option value is looked up by name among this module's globals and
    must name a ``Pipeline`` instance; anything else is reported through
    ``parser.error``.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Store the module-level ``Pipeline`` named by ``values``."""
        pipeline = globals().get(values)
        # ``choices`` normally restricts ``values`` already; guard anyway so
        # that, if the action is reused without ``choices``, it cannot select
        # an arbitrary non-pipeline global or fail with a bare ``KeyError``.
        if not isinstance(pipeline, Pipeline):
            parser.error(f"{values!r} is not a pipeline defined in "
                         "``llama.pipeline``.")
        setattr(namespace, self.dest, pipeline)
def postprocess_pipeline_selection(self: CliParser, namespace: Namespace):
    """Merge ``namespace.filehandlers`` into ``namespace.pipeline``.

    When a pipeline was already chosen, any selected filehandlers are added
    to it with ``+=``. Otherwise the filehandler selection is used on its
    own, falling back to ``DEFAULT_PIPELINE`` when nothing was selected.
    """
    chosen = namespace.filehandlers
    if namespace.pipeline is not None:
        if chosen is not None:
            namespace.pipeline += chosen
        return
    namespace.pipeline = DEFAULT_PIPELINE if chosen is None else chosen
def postprocess_pipeline_dry_run(self: CliParser, namespace: Namespace):
    """Print the filehandlers in ``namespace.pipeline`` and quit without
    evaluating further if ``namespace.dry_run_pipeline`` is ``True``.

    Each line shows the pipeline key and the ``FILENAME`` of its
    ``FileHandler``. Exits through ``sys.exit()`` (``SystemExit`` with code
    ``None``, i.e. status 0) rather than the ``site``-injected ``exit()``
    builtin, which is meant for interactive use and absent under ``-S``.
    """
    if not namespace.dry_run_pipeline:
        return
    print("PIPELINE SPECIFIED:")
    for fhname, filehandler in namespace.pipeline.items():
        print(f" {fhname:<40} -> {filehandler.FILENAME}")
    sys.exit()
class Parsers:
    """
    A collection of CLI parsers implementing ``pipeline`` related
    functionality. *Note that you will need to specify ``prefix_chars="-+"`` to
    use the ``++filehandlers`` functionality.* See ``llama.cli.Parsers`` for
    more information.

    Attributes
    ----------
    pipeline : CliParser
        Parser created with ``add_help=False`` that holds the pipeline
        selection options defined below. NOTE(review): presumably intended to
        be combined into other parsers (e.g. as an argparse parent); confirm
        against ``llama.cli.Parsers``.
    """

    pipeline = CliParser(prefix_chars="-+", add_help=False)
    _pgroup = pipeline.add_argument_group('choose pipeline (see '
                                          '``llama.pipeline``)')
    # ``-f``/``--filehandlers``: the ``-`` prefix makes
    # FileHandlerSubgraphAction add exactly the named FileHandlers.
    _pgroup.add_argument('-f', '--filehandlers', nargs='*',
                         action=FileHandlerSubgraphAction,
                         choices=list(DEFAULT_PIPELINE.keys()),
                         metavar="FILEHANDLER", help=f"""
    A list of ``FileHandler`` class names which
    should be used. If provided, ``FileHandler`` classes whose names are in
    this list will be included in the pipeline. If the dependencies of a
    requested file are not available, no attempt will be made to generate
    them unless they too are listed explicitly. Available filehandlers are
    drawn from the ``DEFAULT_PIPELINE`` (print them with
    ``--print-default-pipeline``).""")
    # ``+f``/``++filehandlers``: the ``+`` prefix makes
    # FileHandlerSubgraphAction add the named FileHandlers plus all ancestors
    # (a ``subgraph`` downselection). Writes to the same ``filehandlers`` dest.
    _pgroup.add_argument('+f', '++filehandlers', nargs='*',
                         action=FileHandlerSubgraphAction,
                         choices=list(DEFAULT_PIPELINE.keys()),
                         metavar="FILEHANDLER", help="""
    Exact same as ``--filehandlers``, but all ancestors of the files listed
    with the ``+`` prefix will also be included. This means that, if you
    ask to generate a single file, an attempt will be made to **also
    generate everything it depends on** if necessary. If you want all those
    files made, this is a handy shortcut; if you want file generation to
    fail when ancestors are missing, use the ``-`` prefix instead.""")
    # Names of the module-level ``Pipeline`` instances defined before this
    # class body executes (e.g. ``DEFAULT_PIPELINE``); offered as choices.
    _pchoices = [k for k, v in globals().items() if isinstance(v, Pipeline)]
    _pgroup.add_argument('-p', '--pipeline', action=PipelineAction,
                         choices=_pchoices, help=f"""
    The name of the pipeline to use. Must be the name of a Pipeline
    instance from ``llama.pipeline``. Available choices: {_pchoices}. If
    both this and ``filehandlers`` are specified, then the resulting
    pipeline will include all requested filehandlers from both options.""")
    # ``nargs=0``: the flag takes no value; PrintDefaultPipeline prints the
    # listing and exits as soon as the flag is parsed.
    _pgroup.add_argument('--print-default-pipeline', nargs=0,
                         action=PrintDefaultPipeline, help="""
    Print the contents of the default pipeline and quit.""")
    _pgroup.add_argument('--dry-run-pipeline', action='store_true', help="""
    Print the pipeline selected by the user and quit without taking further
    action. Use this to make sure you've specified the correct
    pipeline.""")
    # Selection merging runs before the dry-run printout. NOTE(review):
    # presumably CliParser invokes these after parsing; confirm in llama.cli.
    pipeline.POSTPROCESSORS = (postprocess_pipeline_selection,
                               postprocess_pipeline_dry_run)
    # drop the temporary group handle so it is not left as a class attribute
    del _pgroup