# (c) Stefan Countryman, 2019

"""
Batch process large sets of organized data using LLAMA. Input and output files
can be stored locally or in cloud storage. Good for large-scale simulations of
injected or randomized data. Use the command-line interface to specify where
input data should be loaded from, where outputs should be stored, and how many
events should be processed. These source/destination paths can be specified as
python format strings, in which case their variables will be populated with
values sourced from a list of possible values (specified with ``--params``) or
from the environment (if no ``--params`` entry is given for that variable
name).
"""
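# Illustrative invocation, stitched together from the examples in the argument
# help strings below. The program name, bucket, paths, and output filename are
# placeholders (how the CLI is exposed depends on how LLAMA is installed):
#
#   <llama-batch-cli> \
#       --params graceid=data/graceids.txt \
#                neutrino_filename=s3://llama/data/neutrino_filenames.json \
#       --get skymap_info.json=s3://llama/sim/{graceid}/skymap_info.json \
#       --put some_output.json=s3://llama/out/{graceid}/some_output.json \
#       --eventdir ~/sim/{graceid}-{neutrino_filename}/ \
#       --random 10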

import os
import sys
import logging
from collections import OrderedDict
from tempfile import TemporaryDirectory
from llama.flags.cli import Parsers as FlagParsers
from llama.event import Event
from llama.cli import (
    get_logging_cli,
    CliParser,
    Parsers,
    get_postprocess_required_arg,
)
from llama.pipeline import Parsers as PipeParsers
from llama.utils import LOGDIR
from llama.batch import (
    DictAction,
    postprocess_param_iterator,
    read_file,
    put_file,
    batch_error_alert_maintainers,
    batch_error_dump,
)
from llama.run import Run
from llama.cli import log_exceptions_and_recover, _ERR_ALERT

SIMULATION_LOGFILE = os.path.join(LOGDIR, 'llama.batch.log')
LOGGER = logging.getLogger(__name__)


def get_parser():
    """Get CLI Parser."""
    parser = CliParser(
        description=__doc__,
        prefix_chars="-+",
        parents=(
            Parsers.dev_mode,
            Parsers.erralert,
            PipeParsers.pipeline,
            FlagParsers.flags,
            get_logging_cli(SIMULATION_LOGFILE, 'info'),
        )
    )
    parser.POSTPROCESSORS += (
        postprocess_param_iterator,
        get_postprocess_required_arg('params'),
    )
    sim = parser.add_argument_group(
        "simulation configuration",
        """
        Remote file paths in the arguments below can
        be specified using regular URLs (by prefixing ``http://`` or
        ``https://`` before the path) or Amazon/DigitalOcean S3 paths by
        prefixing ``s3://{bucket}/`` before the path (where ``{bucket}`` is
        replaced with the relevant bucket name). Note that this will only
        work for unauthenticated HTTP/HTTPS endpoints, and the S3 downloads
        will only work if you have S3 configured (see: ``llama.com.s3``
        documentation). Further note that *uploads* to HTTP/HTTPS endpoints
        are not supported.
        """
    )
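    # Illustrative path forms accepted by the arguments below (the host and
    # bucket names here are placeholders, not guaranteed-to-exist resources):
    #   local path:  data/graceids.txt
    #   HTTP(S):     https://example.com/data/graceids.txt  (download only)
    #   S3:          s3://llama/data/graceids.txt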

    arg = sim.add_argument
    arg("params", nargs="*", action=DictAction, help="""
        Specify variable names and files that contain their possible values.
        Uses the same syntax as ``bash`` variable assignments, but with each
        value set to the path of a file containing the possible values. The
        list of possible values can be a JSON or newline-delimited list. For
        example, if you have a list of values for the ``graceid`` located at
        ``data/graceids.txt`` and a list of values for ``neutrino_filename``
        located in S3 bucket ``llama`` under the path
        ``data/neutrino_filenames.json``, you would write
        ``--params graceid=data/graceids.txt
        neutrino_filename=s3://llama/data/neutrino_filenames.json``. Note that
        the pipeline will run through the full Cartesian product of variable
        values from these lists; you can instead run through a pre-determined
        number of random vectors from this space (see ``--random`` below).""")
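    # For illustration only (hypothetical event IDs), a newline-delimited
    # values file such as ``data/graceids.txt`` would contain one value per
    # line, while a JSON values file would hold the same values as a list:
    #
    #   S123456a
    #   S123457b
    #
    #   ["S123456a", "S123457b"]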

    arg("--get", nargs="*", default=OrderedDict(), action=DictAction, help="""
        Specify paths from which files can be loaded to be used as inputs for
        a simulation run. This looks a lot like the ``--params`` syntax, but
        with each variable name set to the desired output filename and with
        the option of including variables using python format string syntax.
        For example, to download ``skymap_info.json`` files from S3
        directories with the GraceID as part of the path name, you might
        specify ``--get
        skymap_info.json=s3://llama/sim/{graceid}/skymap_info.json``.""")
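    # The substitution applied to these paths (and, via ``--format`` below, to
    # file contents) is plain ``str.format`` with values drawn from the params
    # and the environment; e.g. with a hypothetical ``graceid`` of S123456a:
    #   "s3://llama/sim/{graceid}/skymap_info.json".format(graceid="S123456a")
    #   -> "s3://llama/sim/S123456a/skymap_info.json"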

    arg("--put", nargs="*", default=OrderedDict(), action=DictAction, help="""
        Like ``--get``, but instead you specify the *destination* to which an
        output file should be saved. This can be a local path or an S3
        path.""")
    arg("--errdump", nargs="*", default=(), help="""
        Specify directories in which to dump the contents of the active event
        directory in case of an unhandled exception. You can specify multiple
        places for this dump (e.g. upload to S3 and save to a local directory)
        in order to aid debugging. Git history is not copied; everything else
        is. This argument is ignored during normal operation.""")
    arg("--format", nargs="*", help="""
        Specify filenames (as relative paths within the event directory) whose
        contents should be loaded and string formatted with values from
        ``--params`` and the environment. This operation happens after
        ``--get`` and replaces the contents of the file, allowing
        you to use boilerplate files with parametric values inserted.""")
    arg("--eventdir", help="""
        Specify a local path in which each simulated event should be saved.
        Once again, you can use a format string to specify a path that
        includes the ``--params``. For example, save events to directories
        based on their ``graceid`` and ``neutrino_filename`` with
        ``--eventdir ~/sim/{graceid}-{neutrino_filename}/``. If this argument
        is omitted, then only outputs specified using ``--put`` will be
        persisted, and the actual simulation directory will be discarded
        immediately after those files have been copied.""")
    arg("--random", metavar="N", type=int, help="""
        If the ``--random`` flag is specified, then the pipeline will be run
        on random sequences of the variables specified in the ``--params``
        files, and the simulation will run ``N`` times on randomized inputs
        from those files. Set ``N`` to a negative integer to run
        indefinitely. If ``--random`` is *not* specified, then the full
        Cartesian product of input variables from the ``--params`` will be
        used in sequence.""")
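    # Conceptually (this is a sketch, not the actual ``llama.batch``
    # iterator), the two iteration modes behave roughly like:
    #
    #   import itertools, random
    #   values = {"graceid": [...], "neutrino_filename": [...]}
    #   # default: every combination of values, in order
    #   combos = (dict(zip(values, combo))
    #             for combo in itertools.product(*values.values()))
    #   # --random N: N independent draws, one random value per variable
    #   draws = ({key: random.choice(vals) for key, vals in values.items()}
    #            for _ in range(N))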

    arg("--public", action="store_true", help="""
        If specified, files uploaded to S3 will be publicly available for
        download. Otherwise, those files will require S3 credentials for
        access.""")
    return parser


def main():
    """
    Run CLI.
    """
    LOGGER.debug("Running simulations. Command line arguments: %s", sys.argv)
    parser = get_parser()
    args = parser.parse_args()
    if args.format:
        if any(f not in args.get for f in args.format):
            parser.error("Files specified with `--format` must first be "
                         "added to each event with `--get`. Files in "
                         "`--format` not found in `--get`: " +
                         " ".join(f for f in args.format
                                  if f not in args.get))
    # ``args.params`` yields one mapping of variable names to concrete values
    # per simulated event (see ``postprocess_param_iterator``).
    for params in args.params:
        LOGGER.info("Simulating event params: %s", params)
        env = os.environ.copy()
        env.update(params)
        LOGGER.debug("Environment with params included: %s", env)
        get = {k: v.format(**env) for k, v in args.get.items()}
        LOGGER.debug("files to get: %s", get)
        put = {k: v.format(**env) for k, v in args.put.items()}
        LOGGER.debug("files to put: %s", put)
        errcallbacks = []
        errdump = [d.format(**params) for d in args.errdump]

        try:
            if args.eventdir is None:
                tmp = TemporaryDirectory()  # we will clean this up later
                event = Event.fromdir(tmp.name, pipeline=args.pipeline)
            else:
                event = Event.fromdir(args.eventdir.format(**env),
                                      pipeline=args.pipeline)
            if _ERR_ALERT[0]:
                LOGGER.debug("preparing error alert callback...")
                errcallbacks.append(
                    batch_error_alert_maintainers(errdump, event, params)
                )
            LOGGER.debug("configuring error dump callback...")
            errcallbacks.append(batch_error_dump(errdump, event))

            @log_exceptions_and_recover(errcallbacks)
            def run_event():
                """Run the event. This function gets wrapped in
                ``log_exceptions_and_recover`` so that cleanup can be run and
                errors recovered from."""
                event.init()
                LOGGER.info("Setting flags: %s", args.flags)
                # necessary check since an empty ``args.flags`` list will not
                # lead to an update
                if args.flags:
                    event.flags.update(args.flags)
                else:
                    event.flags.update(event.flags.DEFAULT_FLAGS)
                LOGGER.debug("Event being run: %s", event)
                for dest, source in get.items():
                    with open(os.path.join(event.eventdir, dest), 'wb') as out:
                        out.write(read_file(source))
                LOGGER.debug("contents of event directory after get files: %s",
                             os.listdir(event.eventdir))
                if args.format:
                    LOGGER.debug("formatting contents of files: %s",
                                 args.format)
                    for fname in args.format:
                        fpath = os.path.join(event.eventdir, fname)
                        with open(fpath) as fmt:
                            contents = fmt.read()
                        with open(fpath, 'w') as fmt:
                            fmt.write(contents.format(**env))
                # use ``Run.update`` to loop through the full event
                Run(event.rundir).downselect(
                    eventid_filter=event.eventid).update()
                LOGGER.info("Finished simulating; copying results out")
                for sourcename, dest in put.items():
                    source = os.path.join(event.eventdir, sourcename)
                    LOGGER.debug("put %s -> %s", source, dest)
                    put_file(source, dest, args.public)
                LOGGER.info("Finished copying results.")

            run_event()
        finally:
            if args.eventdir is None:
                tmp.cleanup()


if __name__ == '__main__':
    main()