Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# (c) Stefan Countryman, 2019

"""
Batch process large sets of organized data using LLAMA. Input and output files
can be stored locally or on cloud data. Good for large-scale simulations of
injected or randomized data. Use the command-line interface to specify where
input data should be loaded from, where outputs should be stored, and how many
events should be processed. These source/destination paths can be specified as
python format strings, in which case their variables will be populated with
variables sourced from a list of possible values (specified with ``--varlist``)
or from the environment (if no ``--varlist`` is specified for that variable
name).
"""
15import os
16import sys
17import logging
18from collections import OrderedDict
19from tempfile import TemporaryDirectory
20from llama.flags.cli import Parsers as FlagParsers
21from llama.event import Event
22from llama.cli import (
23 get_logging_cli,
24 CliParser,
25 Parsers,
26 get_postprocess_required_arg,
27)
28from llama.pipeline import Parsers as PipeParsers
29from llama.utils import LOGDIR
30from llama.batch import (
31 DictAction,
32 postprocess_param_iterator,
33 read_file,
34 put_file,
35 batch_error_alert_maintainers,
36 batch_error_dump,
37)
38from llama.run import Run
39from llama.cli import log_exceptions_and_recover, _ERR_ALERT
# Default log file for batch runs; passed to ``get_logging_cli`` when the
# command-line parser is built.
SIMULATION_LOGFILE = os.path.join(LOGDIR, 'llama.batch.log')
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
def get_parser():
    """Get CLI Parser.

    Build the command-line parser for batch processing. The parser inherits
    the dev-mode, error-alert, pipeline, flag and logging options from its
    parent parsers, registers two extra postprocessors, and adds a
    "simulation configuration" argument group describing where inputs are
    read from and where outputs are written.

    Returns
    -------
    parser : CliParser
        The fully configured parser; call ``parse_args`` on it to obtain the
        arguments consumed by ``main``.
    """
    parser = CliParser(
        description=__doc__,
        prefix_chars="-+",
        parents=(
            Parsers.dev_mode,
            Parsers.erralert,
            PipeParsers.pipeline,
            FlagParsers.flags,
            get_logging_cli(SIMULATION_LOGFILE, 'info'),
        )
    )
    # Extra postprocessing steps for the parsed arguments; presumably applied
    # by ``CliParser`` after parsing -- confirm in ``llama.cli``.
    parser.POSTPROCESSORS += (
        postprocess_param_iterator,
        get_postprocess_required_arg('params'),
    )
    sim = parser.add_argument_group(
        "simulation configuration",
        """
        Remote file paths in the below arguments can
        be specified using regular URLs (by prefixing ``http://`` or
        ``https://`` before the path) or Amazon/DigitalOcean S3 paths by
        prefixing ``s3://{bucket}/`` before the path (where ``{bucket}`` is
        replaced with the relevant bucket name). Note that this will only
        work for unauthenticated HTTP/HTTPS endpoints, and the S3 downloads
        will only work if you have S3 configured (see: ``llama.com.s3``
        documentation). Further note that *uploads* to HTTP/HTTPS endpoints
        are not supported.
        """
    )
    arg = sim.add_argument
    arg("params", nargs="*", action=DictAction, help="""
        Specify variable names and files that contain their possible values.
        Same syntax as ``bash`` variable assignments, but with the value set as
        the file containing possible values. The list of possible values can be
        a JSON or newline-delimited list. For example, if you have a list of
        values for the ``graceid`` located at ``data/graceids.txt`` and a list
        of values for ``neutrino_filename`` located in S3 bucket ``llama`` under
        the path ``data/neutrino_filenames.json``, you would
        write ``--params graceid=data/graceids.txt
        neutrino_filename=s3://llama/data/neutrino_filenames.json``. Note that
        the pipeline will run through the full Cartesian product of variable
        values from these lists; you can instead run through a pre-determined
        number of random vectors from this space (see ``--random`` below).""")
    arg("--get", nargs="*", default=OrderedDict(), action=DictAction, help="""
        Specify paths from which files can be loaded to be used as inputs for a
        simulation run. This looks a lot like the ``--params`` syntax, but
        with the variable names equal to the output filename and the option of
        including variables using python format string syntax. For example, to
        download ``skymap_info.json`` files from S3 directories with the
        GraceID as part of the path name, you might specify ``--get
        skymap_info.json=s3://llama/sim/{graceid}/skymap_info.json``.""")
    arg("--put", nargs="*", default=OrderedDict(), action=DictAction, help="""
        Like ``--get``, but instead you specify the *destination* to which an
        output file should be saved. This can be a local path or an S3
        path.""")
    arg("--errdump", nargs="*", default=(), help="""
        Specify directories in which to dump the contents of the active event
        directory in case of an unhandled exception. You can specify multiple
        places for this dump (e.g. upload to S3 and save to a local directory)
        in order to aid debugging. Git history is not copied; everything else
        is. This argument is ignored during normal operation.""")
    arg("--format", nargs="*", help="""
        Specify filenames (as relative paths within the event directory) whose
        contents should be loaded and string formatted with values from
        ``--params`` and the environment. This operation happens after
        ``--get`` and replaces the contents of the file, allowing
        you to use boilerplate files with parametric values inserted.""")
    arg("--eventdir", help="""
        Specify a local path in which each simulated event should be saved.
        Once again, you can use a format string to specify a path that includes
        the ``--params``. For example, save events to directories based on
        their ``graceid`` and ``neutrino_filename`` with ``--eventdir
        ~/sim/{graceid}-{neutrino_filename}/``. If this argument is omitted,
        then only outputs specified using ``--put`` will be persisted, and the
        actual simulation directory will be discarded immediately after those
        files have been copied.""")
    arg("--random", metavar="N", type=int, help="""
        If the ``--random`` flag is specified, then the pipeline will be run on
        random sequences of the variables specified in the ``--params``
        files, and the simulation will run ``N`` times on randomized inputs
        from those files. Set ``N`` to a negative integer to run
        indefinitely. If ``--random`` is *not* specified, then the full
        Cartesian product of input variables from the ``--params`` will be used
        in sequence.""")
    arg("--public", action="store_true", help="""
        If specified, files uploaded to S3 will be publicly available for
        download. Otherwise, those files will require S3 credentials for
        access.""")
    return parser
def main():
    """
    Run CLI.

    Parse the command-line arguments and then, for every parameter set in
    ``args.params``:

    1. build a formatting environment from ``os.environ`` overlaid with the
       current parameter set;
    2. create the event, in ``--eventdir`` if given or otherwise in a
       temporary directory that is removed once the event has been handled;
    3. register the error callbacks (maintainer alert when enabled, plus the
       error dump);
    4. initialize the event, set its flags, fetch the ``--get`` inputs,
       apply ``--format`` templating, update the event through ``Run``, and
       copy the ``--put`` outputs to their destinations.

    Exits through ``parser.error`` if a file named in ``--format`` is not
    also provided through ``--get``.
    """
    LOGGER.debug("Running simulations. Command line arguments: %s", sys.argv)
    parser = get_parser()
    args = parser.parse_args()
    if args.format:
        # Every templated file has to be fetched first; compute the offenders
        # once and reuse the list in the error message.
        missing = [f for f in args.format if f not in args.get]
        if missing:
            parser.error("Files specified with `--format` must first be "
                         "added to each event with `--get`. Files in "
                         "`--format` not found in `--get`: " +
                         " ".join(missing))
    for params in args.params:
        LOGGER.info("Simulating event params: %s", params)
        # Parameter values take precedence over environment variables of the
        # same name.
        env = os.environ.copy()
        env.update(params)
        # NOTE(review): this logs the complete environment, which may include
        # credentials -- confirm this is acceptable at debug level.
        LOGGER.debug("Environment with params included: %s", env)
        get = {k: v.format(**env) for k, v in args.get.items()}
        LOGGER.debug("files to get: %s", get)
        put = {k: v.format(**env) for k, v in args.put.items()}
        LOGGER.debug("files to put: %s", put)
        errcallbacks = []
        # Format with ``env`` like every other path template so environment
        # variables can be used here too; ``env`` is a superset of ``params``,
        # so templates that only use parameters are unaffected.
        errdump = [d.format(**env) for d in args.errdump]

        # Bound before the ``try`` so the ``finally`` clause never hits an
        # unbound name if creating the temporary directory itself fails.
        tmp = None
        try:
            if args.eventdir is None:
                tmp = TemporaryDirectory()  # we will clean this up later
                event = Event.fromdir(tmp.name, pipeline=args.pipeline)
            else:
                event = Event.fromdir(args.eventdir.format(**env),
                                      pipeline=args.pipeline)
            if _ERR_ALERT[0]:
                LOGGER.debug("preparing error alert callback...")
                errcallbacks.append(
                    batch_error_alert_maintainers(errdump, event, params)
                )
            LOGGER.debug("configuring error dump callback...")
            errcallbacks.append(batch_error_dump(errdump, event))

            @log_exceptions_and_recover(errcallbacks)
            def run_event():
                """Run the event. This function gets wrapped in
                ``log_exceptions_and_recover`` so that cleanup can be run and
                errors recovered from."""
                event.init()
                LOGGER.info("Setting flags: %s", args.flags)
                # necessary check since an empty ``args.flags`` list will not
                # lead to an update
                if args.flags:
                    event.flags.update(args.flags)
                else:
                    event.flags.update(event.flags.DEFAULT_FLAGS)
                LOGGER.debug("Event being run: %s", event)
                # Fetch each input into the event directory under its
                # destination filename.
                for dest, source in get.items():
                    with open(os.path.join(event.eventdir, dest), 'wb') as out:
                        out.write(read_file(source))
                LOGGER.debug("contents of event directory after get files: %s",
                             os.listdir(event.eventdir))
                if args.format:
                    LOGGER.debug("formatting contents of files: %s",
                                 args.format)
                    # Rewrite each template in place with values from ``env``
                    # substituted in.
                    for fname in args.format:
                        fpath = os.path.join(event.eventdir, fname)
                        with open(fpath) as fmt:
                            contents = fmt.read()
                        with open(fpath, 'w') as fmt:
                            fmt.write(contents.format(**env))
                # use ``Run.update`` to loop through the full event
                Run(event.rundir).downselect(
                    eventid_filter=event.eventid).update()
                LOGGER.info("Finished simulating; copying results out")
                for sourcename, dest in put.items():
                    source = os.path.join(event.eventdir, sourcename)
                    LOGGER.debug("put %s -> %s", source, dest)
                    put_file(source, dest, args.public)
                LOGGER.info("Finished copying results.")

            run_event()
        finally:
            # Only a temporary event directory is removed; a user-specified
            # ``--eventdir`` is left in place.
            if tmp is not None:
                tmp.cleanup()
# Run the batch CLI only when this module is executed as a script.
if __name__ == '__main__':
    main()