Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python 

2# (c) Stefan Countryman 2016-2019 

3# pylint: disable=C0325 

4 

5""" 

6The main interface for running the LLAMA pipeline. Turns on the automated 

7pipeline, either in the foreground (for immediate work) or in the background 

8(launching a long-running daemon process). 

9 

10The pipeline checks for new LLAMA events and event data and 

11(re)fetches/(re)generates data files for each event as new data becomes 

12available. Provides a very flexible and powerful command line interface for 

13running LLAMA. 

14""" 

15 

16import traceback 

17import subprocess 

18from signal import SIGTERM 

19import atexit 

20import time 

21from argparse import RawDescriptionHelpFormatter, Namespace 

22import logging 

23import sys 

24import os 

25from hashlib import sha1 

26from glob import glob 

27from urllib.parse import quote_plus, unquote_plus # python3 

28from psutil import Process 

29import llama 

30from llama.cli import ( 

31 log_exceptions_and_recover, 

32 safe_launch_daemon, 

33 pidfile, 

34 printprocs, 

35 print_running_procs_action, 

36 get_logging_cli, 

37 Parsers, 

38 CliParser, 

39) 

40from llama.run import ( 

41 Run, 

42 Parsers as RunParsers, 

43) 

44from llama.event import Event 

45from llama.utils import COLOR as COL, CACHEDIR 

46from llama.classes import ImmutableDict 

47 

# Command-line flags that turn on the self-relaunching wrapper; ``main``
# strips them from ``sys.argv`` before spawning the wrapped child.
RELAUNCH_FLAGS = ('-r', '--relaunch')
# Passed to ``printprocs``/``print_running_procs_action`` as
# ``command_nicknames``; presumably maps command-line prefixes to the display
# name shown for a process -- confirm against ``llama.cli``.
CMD_NICKNAMES = ImmutableDict({('llama', 'run'): 'llama run',
                               ('llama.run',): 'llama run'})
# Directory holding one lock directory per running ``llama run`` process
# (see ``lockdir``).
LLAMAD_RUNDIR = os.path.join(CACHEDIR, 'llamad.run.d')
# exist_ok=True: several ``llama`` processes may import this module at the
# same time; a check-then-create would crash whichever one loses the race.
os.makedirs(LLAMAD_RUNDIR, exist_ok=True)
# Default archival logfile passed to ``get_logging_cli``.
LOGFILE = os.path.join(llama.LOGDIR, 'llamad.log')
# Usage examples appended to ``--help`` output (rendered raw thanks to
# ``RawDescriptionHelpFormatter``).
EPILOG = """EXAMPLES
--------

*Note that by default,* ``llama run`` *will start a background process and exit
from the foreground, allowing you to keep working or disconnect your SSH
session while* ``llama run`` *progresses. Override this behaviour with* ``-F``.

Continually monitor and update the default run directory

.. code::

    llama run

Continually monitor and update a temporary test directory but NOT the default
run directory

.. code::

    llama run /tmp/llamatest

Keep monitoring only the current directory

.. code::

    llama run .

Make a single attempt to generate ``IceCubeNeutrinoList`` in the current
directory (if its input files exist) and then quit:

.. code::

    llama run . -o -f IceCubeNeutrinoList

Same as above, but generate any missing ``IceCubeNeutrinoList`` inputs if
possible:

.. code::

    llama run . -o +f IceCubeNeutrinoList

Same as above, but instead of spinning up a background process, keep control of
the terminal and print all logging output until the job is done:

.. code::

    llama run . -o +f IceCubeNeutrinoList -F

"""
# Seconds to wait between update passes (and between relaunch attempts).
SLEEP_WAIT = 2
LOGGER = logging.getLogger(__name__)

105 

106 

def postprocess_updater_downselections(_self: CliParser, namespace: Namespace):
    """Restrict every ``Run`` in ``namespace.run`` to its most recently
    modified events.

    Each ``Run`` is replaced by ``run.downselect(...)`` called with
    ``sortkey=Event.modification_time``, ``reverse=True`` (newest first) and
    ``limit=namespace.num_updated``. The sort key is the event *modification*
    time, not its creation time.

    Parameters
    ----------
    _self : CliParser
        The parser running this postprocessor (unused).
    namespace : Namespace
        Parsed arguments; must provide ``run`` (a list of ``Run`` instances)
        and ``num_updated`` (the ``--num-updated`` value). ``namespace.run``
        is reassigned with the downselected runs.
    """
    namespace.run = [run.downselect(sortkey=Event.modification_time,
                                    reverse=True,
                                    limit=namespace.num_updated)
                     for run in namespace.run]

114 

115 

def get_parser():
    """Construct the command-line parser for ``llama run``.

    The parser inherits options from ``Parsers.dev_mode``,
    ``Parsers.erralert``, the ``pipeline_and_eventfiltering`` parser of
    ``RunParsers(downselect="manual=False")`` and ``get_logging_cli``; accepts
    both ``-`` and ``+`` as option prefixes; and appends
    ``postprocess_updater_downselections`` to its ``POSTPROCESSORS``.

    Returns
    -------
    CliParser
        The fully configured parser.
    """
    parents = (
        Parsers.dev_mode,
        Parsers.erralert,
        RunParsers(downselect="manual=False").pipeline_and_eventfiltering,
        get_logging_cli(LOGFILE, 'info'),
    )
    parser = CliParser(
        prog="llama run",
        description=__doc__,
        epilog=RunParsers.__doc__ + '\n\n' + EPILOG,
        prefix_chars="-+",
        parents=parents,
        formatter_class=RawDescriptionHelpFormatter,
    )
    parser.POSTPROCESSORS += (postprocess_updater_downselections,)

    add = parser.add_argument
    add('-o', '--runonce', action='store_true', help="""
        If specified, each event directory will only be updated once, and then
        the script will exit.""")
    add('-n', '--num-updated', type=int, default=40, help="""
        Number of event directories to keep updated. Older event directories
        will be skipped and not updated if they do not fall within the top
        ``num_updated``.""")
    add('-F', '--foreground', action='store_true', help="""
        Run ``llama run`` in the foreground instead of immediately sending it
        to the background as a daemon process. If running in the foreground,
        logs are printed to STDERR (in addition to the global archival logfile,
        see ``--logfile``); if running in the background (i.e. omitting this
        flag), logs are appended to a logfile specific to this set of event
        directories (again, in addition to the archival logfile). This
        CLI-argument-specific logfile goes in the same log directory as the
        default ``--logfile`` option but has a filename specific to this
        ``llama run`` invocation; the name is the same as the ``lockdir`` name
        (see: ``lockdir``), but with ``.log`` appended.""")
    show_daemons = print_running_procs_action(LLAMAD_RUNDIR,
                                              command_nicknames=CMD_NICKNAMES)
    add('-R', '--running-daemons', nargs=0, action=show_daemons, help=f"""
        Print the currently-running llama daemons as recorded in the
        ``llama run`` lockfile directory, {LLAMAD_RUNDIR}. Note that this will
        not catch ``llama run`` processes whose lock files have somehow
        been deleted from this directory. Run ``ps -ax | grep 'llama run'`` if
        you're paranoid about missing anything.""")
    add('-k', '--kill', action='store_true', help="""
        Kill any llama processes covering potentially competing ``run`` and
        ``eventidfilter`` values (as decided by conservative comparison
        algorithms) by finding their PIDs in their semaphore lock directories
        and killing the associated processes (if they are still running). You
        can run this if a ``llama run`` instance fails to launch due to
        competing instances in order to kill them safely.""")
    add('-K', '--killfirst', action='store_true', help="""
        Like ``--kill``, but continues execution after killing. In other words,
        start a new process but kill competition first.""")
    add(*RELAUNCH_FLAGS, action='store_true', help="""
        If ``relaunch`` is specified, then wrap ``llama run`` as a subprocess
        and keep relaunching it if it fails; if it exits gracefully (returncode
        0), it will not be relaunched. All command line arguments (besides this
        one) will be passed to subprocess ``llama run`` invocations.""")
    return parser

176 

177 

def lockdir(rundirstoupdate, eventidfilter):
    """Return the lock-directory path for a ``llama run`` invocation.

    The name depends on the arguments, because multiple ``llama run``
    processes must not work on the same event directory. It is
    ``<quote_rundirs(rundirstoupdate)>+<quote_plus(simplify_glob(filter))>``
    inside ``LLAMAD_RUNDIR``. ``LLAMAD_RUNDIR`` is created if missing; the lock
    directory itself is NOT created here.

    Parameters
    ----------
    rundirstoupdate : iterable of str
        Run directories monitored by the invocation.
    eventidfilter : str
        Eventid glob of the invocation (simplified before encoding).

    Returns
    -------
    str
        Path of the lock directory.
    """
    quoted_filter = quote_plus(simplify_glob(eventidfilter))
    if not os.path.isdir(LLAMAD_RUNDIR):
        LOGGER.info("``llama run`` lockfile dir not found; creating")
        # exist_ok=True: another process may create it between the check
        # above and this call.
        os.makedirs(LLAMAD_RUNDIR, exist_ok=True)
    return os.path.join(LLAMAD_RUNDIR,
                        '{}+{}'.format(quote_rundirs(rundirstoupdate),
                                       quoted_filter))

190 

191 

def quote_rundirs(rundirstoupdate):
    """Encode a collection of run directories as one filename-safe string.

    Each directory is resolved with ``os.path.realpath``, URL-quoted with
    ``quote_plus``, sorted, and comma-joined, so the result does not depend on
    input order. If that string exceeds 120 characters, its sha1 hex digest is
    returned instead. 120 is a conservative margin that keeps the combined
    lockdir name (which also embeds the quoted eventid filter) under the
    common 255-byte filename limit.

    Parameters
    ----------
    rundirstoupdate : iterable of str
        Run directories to encode.

    Returns
    -------
    str
        The comma-joined quoted paths, or their sha1 hex digest.
    """
    quoted_paths = sorted(quote_plus(os.path.realpath(path))
                          for path in rundirstoupdate)
    dirstr = ','.join(quoted_paths)
    LOGGER.debug("dirstr: %s", dirstr)
    if len(dirstr) <= 120:
        return dirstr
    # sha1 works on bytes, so encode the (ASCII, since quoted) string first.
    digest = sha1(dirstr.encode()).hexdigest()
    LOGGER.debug("len(dirstr) > 120, taking hash: %s", digest)
    return digest

206 

207 

def simplify_glob(glb):
    """Generalize a glob to the ``<prefix>*<suffix>`` form used for overlap
    checks.

    Every ``?``, ``[`` and ``]`` is first turned into ``*``. A glob with no
    wildcard is returned unchanged. Otherwise everything between the first
    and last ``*`` collapses into a single ``*``, so the result matches a
    superset of what the input matches.

    Parameters
    ----------
    glb : str
        The glob to simplify.

    Returns
    -------
    str
        The simplified glob, containing at most one ``*``.
    """
    for special in '?[]':
        glb = glb.replace(special, '*')
    if '*' in glb:
        pieces = glb.split('*')
        return pieces[0] + '*' + pieces[-1]
    return glb

216 

217 

def kill_matching(rundirstoupdate, eventidfilter):
    """Kill competing ``llama run`` processes and clean up their locks.

    Candidates are this invocation's own lockdir (same run directories and
    filter) plus every entry returned by ``competing_processes``. For each
    candidate lockdir:

    - with a pidfile: the PID is read, the process terminated via
      ``psutil.Process.terminate`` and the pidfile and lockdir removed;
    - without a pidfile: the lockdir is removed right away;
    - missing: the candidate is skipped, since nothing holds that lock.

    Parameters
    ----------
    rundirstoupdate : list
        Run directories of the ``llama run`` invocation to clear the way for.
    eventidfilter : str
        Eventid glob of that invocation.
    """
    # Local import: only needed here, to tolerate processes that already exited.
    from psutil import NoSuchProcess

    kill_list = ([(rundirstoupdate, eventidfilter)] +
                 competing_processes(rundirstoupdate, eventidfilter))
    pids = []
    locks = []
    for rundirs, filt in kill_list:
        lockd = lockdir(rundirs, filt)
        pidpath = pidfile(lockd)
        if os.path.isfile(pidpath):
            try:
                with open(pidpath) as pidf:
                    pid = int(pidf.read())
            except (OSError, ValueError) as err:
                # empty/corrupt pidfile: we don't know what to kill, so leave
                # the lock alone rather than risk orphaning a live process
                print(f"Could not read a PID from {pidpath} ({err}); skipping.")
                continue
            pids.append(pid)
            locks.append((lockd, pidpath))
        elif os.path.isdir(lockd):
            print("No pidfile found for ", rundirs, " with filter ", filt,
                  ", removing lockdir and continuing...")
            try:
                os.rmdir(lockd)
            except OSError as err:
                print(f"Could not remove {lockd}: {err}")
        # else: no lock at all (e.g. nothing is running with exactly these
        # arguments), so there is nothing to kill or clean up.
    print("TERMINATING THESE PROCESSES NOW (with SIGTERM):")
    printprocs(pids, command_nicknames=CMD_NICKNAMES)
    for pid in pids:
        try:
            Process(pid).terminate()
        except NoSuchProcess:
            print(f"Process {pid} is no longer running.")
    print("Done killing. Cleaning up pidfiles and lockdirs...")
    for lockd, pidpath in locks:
        if os.path.isfile(pidpath):
            print(f"RM {pidpath}")
            try:
                os.unlink(pidpath)
            except FileNotFoundError:
                pass  # the terminated process removed it first
        if os.path.isdir(lockd):
            print(f"RMDIR {lockd}")
            try:
                os.rmdir(lockd)
            except OSError as err:
                print(f"Could not remove {lockd}: {err}")
    print("Done. If you had auto-restart (enabled by default), you")
    print("might need to run the last command a couple more times to make")
    print("sure the ``llama run`` processes you wanted to kill are dead.")
    print("Try this a few times to give the ``llama run`` processes a chance")
    print("to gracefully exit before you forcibly kill them.")

253 

254 

def globs_intersect(glob1, glob2):
    r"""Conservatively decide whether two UNIX filepath globs (as understood
    by python's ``glob`` module) could match a common path.

    Returns False only when no path can match both globs; True means they
    might overlap. BOTH globs must be strictly of the form [^\*]*\*[^\*]*
    (``sed`` regex), i.e. contain at most one wildcard, and that wildcard
    must be '*'.
    """
    wild1 = '*' in glob1
    wild2 = '*' in glob2
    if not (wild1 or wild2):
        # two literal names overlap only when they are identical
        return glob1 == glob2
    if wild1 and wild2:
        # The wildcard can absorb any difference in length, so two
        # ``prefix*suffix`` globs overlap exactly when the shorter prefix is a
        # prefix of the longer one and the shorter suffix is a suffix of the
        # longer one.
        head1, tail1 = glob1.split('*')
        head2, tail2 = glob2.split('*')
        short_head, long_head = sorted((head1, head2), key=len)
        short_tail, long_tail = sorted((tail1, tail2), key=len)
        return (long_head.startswith(short_head)
                and long_tail.endswith(short_tail))
    # exactly one glob is a literal; test whether the wildcard glob matches it
    literal, pattern = (glob2, glob1) if wild1 else (glob1, glob2)
    head, tail = pattern.split('*')
    if not literal.startswith(head):
        return False
    # drop the matched prefix so it cannot also be reused to satisfy the suffix
    return literal[len(head):].endswith(tail)

293 

294 

def _rundirs_may_overlap(other_rundirs, my_rundirs):
    """Return True if the decoded rundirs of another lockdir might share a
    directory with ``my_rundirs`` (a set of real paths)."""
    # ``quote_rundirs`` stores real (hence absolute) paths, or a sha1 hex
    # digest when the joined paths were too long. A non-absolute entry is
    # therefore a digest whose directories are unknown: assume overlap.
    if not all(os.path.isabs(d) for d in other_rundirs):
        return True
    return not other_rundirs.isdisjoint(my_rundirs)


def competing_processes(rundirstoupdate, eventidfilter):
    """Find other ``llama run`` lockdirs that may be monitoring the same
    events.

    Each entry in ``LLAMAD_RUNDIR`` other than this invocation's own lockdir
    is decoded into its run directories and (simplified) eventid filter. It
    counts as competing when its run directories may overlap with
    ``rundirstoupdate`` AND its filter may match the same event IDs. Events
    in different run directories are different events, so filter overlap
    alone is not enough. Both checks err on the side of "may overlap".

    Parameters
    ----------
    rundirstoupdate : iterable of str
        Run directories of this invocation (compared by real path).
    eventidfilter : str
        Eventid glob of this invocation. It is passed through
        ``simplify_glob`` first, just like the filters stored in lockdir
        names, so ``globs_intersect`` always gets single-wildcard globs.

    Returns
    -------
    list of (set of str, str)
        ``(rundirs, simplified_filter)`` pairs decoded from competing lockdir
        names.
    """
    this_proc = lockdir(rundirstoupdate, eventidfilter)
    my_rundirs = {os.path.realpath(d) for d in rundirstoupdate}
    my_filter = simplify_glob(eventidfilter)
    competing = []
    for path in glob(os.path.join(LLAMAD_RUNDIR, '*')):
        if path == this_proc:
            continue
        name = os.path.basename(path)
        if '+' not in name:
            continue  # not a name produced by ``lockdir``
        # quote_plus turns spaces in paths into '+', so split on the LAST '+'
        # (assumes eventid filters contain no spaces).
        quoted_dirs, quoted_filter = name.rsplit('+', 1)
        other_dirs = {unquote_plus(e) for e in quoted_dirs.split(',')}
        other_filter = unquote_plus(quoted_filter)
        if (_rundirs_may_overlap(other_dirs, my_rundirs) and
                globs_intersect(my_filter, other_filter)):
            competing.append((other_dirs, other_filter))
    return competing

306 

307 

def exit_if_competing(rundirstoupdate, eventidfilter):
    """Build a no-argument check that exits when another ``llama run``
    process appears to be monitoring the same run directories.

    This only *returns* the check; nothing happens until the caller calls it.

    Parameters
    ----------
    rundirstoupdate : iterable of str
        Run directories of this invocation.
    eventidfilter : str
        Eventid glob of this invocation.

    Returns
    -------
    callable
        ``check_and_exit()``: if ``competing_processes`` reports anything, it
        logs each competitor (and its PID when its pidfile is readable) and
        calls ``sys.exit(2)``; otherwise it returns ``None``.
    """

    def check_and_exit():
        """Pass as a ``post_fork`` hook to ``safe_launch_daemon``."""
        competing = competing_processes(rundirstoupdate, eventidfilter)
        if not competing:
            return
        LOGGER.error("Could not start this ``llama run`` process because "
                     "another currently running ``llama run`` process "
                     "seems to be monitoring the same event directories.")
        for evdirs, filt in competing:
            LOGGER.error("Competing rundirs and (simplified) "
                         "eventidfilters: %s, %s", evdirs, filt)
            try:
                with open(pidfile(lockdir(evdirs, filt))) as pidf:
                    competing_pid = int(pidf.read())
            # ValueError: empty/corrupt pidfile; don't let it mask the exit
            except (OSError, ValueError):
                LOGGER.error("Could not get PID of competing process.")
            else:
                LOGGER.error("PID of competing process: %d", competing_pid)
        LOGGER.error("EXITING.")
        sys.exit(2)

    return check_and_exit

332 

333 

def _relaunch_until_success():
    """Re-run this command (minus ``RELAUNCH_FLAGS``) as a subprocess until
    it exits with returncode 0, then exit this process.

    A nonzero returncode, including death by signal, triggers a relaunch
    after ``SLEEP_WAIT`` seconds. An ``atexit`` hook SIGTERMs the current
    child if this wrapper exits while the child is still running.
    """
    LOGGER.info("Relaunch flag set in args: %s", sys.argv)
    # remove the flag so the child doesn't start its own relaunch loop
    for relaunch_flag in RELAUNCH_FLAGS:
        while relaunch_flag in sys.argv:
            sys.argv.remove(relaunch_flag)
    child_pid = None

    @atexit.register
    def kill_child():
        """Kill subprocesses before exiting."""
        if child_pid is not None:
            LOGGER.info("Relauncher terminating; killing child NOW.")
            try:
                os.kill(child_pid, SIGTERM)
            except OSError:
                LOGGER.error("Could not kill child with PID %s", child_pid)

    while True:
        LOGGER.info("Launching llama in restart loop with args: %s",
                    sys.argv)
        proc = subprocess.Popen(sys.argv)
        child_pid = proc.pid
        proc.communicate()
        # The child has been reaped; forget its PID so ``kill_child`` can't
        # signal an unrelated process that later reuses it.
        child_pid = None
        if not proc.returncode:
            break
        LOGGER.error("``llama run`` failed with args: %s", sys.argv)
        LOGGER.error("Relaunching in restart loop.")
        # avoid a hot spin if the child fails immediately every time
        time.sleep(SLEEP_WAIT)
    sys.exit()


def main():
    """Run the LLAMA update loop.

    Steps:

    1. Parse the CLI.
    2. With ``--kill``/``--killfirst``, kill competing processes
       (``--kill`` then exits).
    3. Unless ``--foreground``, daemonize on POSIX.
    4. Exit with status 2 if a competing ``llama run`` is detected.
    5. With ``--relaunch``, supervise a child copy of this command instead of
       doing the work here.
    6. Otherwise update every run forever, sleeping ``SLEEP_WAIT`` seconds
       between passes (or exit after one pass with ``--runonce``).
    """
    parser = get_parser()
    args = parser.parse_args()
    # ``args.run`` is a list of ``Run`` instances; validate before indexing
    # (an ``assert`` would be stripped under ``python -O``).
    if not args.run:
        parser.error("at least one run directory must be specified")
    rundirs = [r.rundir for r in args.run]
    eventidfilter = args.run[0].downselection[0]['eventid_filter']
    LOGGER.info("CLI args: %s", sys.argv)

    if args.kill or args.killfirst:
        kill_matching(rundirs, eventidfilter)
        if args.kill:
            sys.exit()

    if not args.foreground:
        if os.name == 'posix':
            # the daemon branch will pick up from this point
            safe_launch_daemon(lockdir(rundirs, eventidfilter))
        else:
            LOGGER.error(
                COL.RED + "NOTE: ``llama run`` can only be run in the "
                "background on POSIX systems. Please use ``--foreground`` to "
                "explicitly run in the foreground. Running in foreground "
                "now..." + COL.CLEAR
            )
    # ``exit_if_competing`` only builds the check; it must be called to run.
    exit_if_competing(rundirs, eventidfilter)()

    if args.relaunch:
        _relaunch_until_success()

    # keep checking whether more files can be generated in each run directory
    LOGGER.info("Running on the following run directories:%s",
                "".join("\n- " + rundir for rundir in rundirs))
    LOGGER.info("eventidfilter: %s", eventidfilter)
    while True:
        for run in args.run:
            log_exceptions_and_recover()(run.update)()
        # if we only want to run this once, just exit after updating once
        if args.runonce:
            sys.exit()
        # wait before retrying
        time.sleep(SLEEP_WAIT)


if __name__ == "__main__":
    main()