Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
2# (c) Stefan Countryman 2016-2019
3# pylint: disable=C0325
5"""
6The main interface for running the LLAMA pipeline. Turns on the automated
7pipeline, either in the foreground (for immediate work) or in the background
8(launching a long-running daemon process).
10The pipeline checks for new LLAMA events and event data and
11(re)fetches/(re)generates data files for each event as new data becomes
12available. Provides a very flexible and powerful command line interface for
13running LLAMA.
14"""
16import traceback
17import subprocess
18from signal import SIGTERM
19import atexit
20import time
21from argparse import RawDescriptionHelpFormatter, Namespace
22import logging
23import sys
24import os
25from hashlib import sha1
26from glob import glob
27from urllib.parse import quote_plus, unquote_plus # python3
28from psutil import Process
29import llama
30from llama.cli import (
31 log_exceptions_and_recover,
32 safe_launch_daemon,
33 pidfile,
34 printprocs,
35 print_running_procs_action,
36 get_logging_cli,
37 Parsers,
38 CliParser,
39)
40from llama.run import (
41 Run,
42 Parsers as RunParsers,
43)
44from llama.event import Event
45from llama.utils import COLOR as COL, CACHEDIR
46from llama.classes import ImmutableDict
# Command-line flags that request the relaunch loop; ``main`` strips them
# before re-running the command line in a child process.
RELAUNCH_FLAGS = ('-r', '--relaunch')
# Display names for commands when printing running ``llama run`` processes.
CMD_NICKNAMES = ImmutableDict({('llama', 'run'): 'llama run',
                               ('llama.run',): 'llama run'})
# Parent directory holding one lock directory per ``llama run`` invocation
# (see ``lockdir``).
LLAMAD_RUNDIR = os.path.join(CACHEDIR, 'llamad.run.d')
# ``exist_ok`` avoids a race where another process creates the directory
# between an existence check and the ``makedirs`` call.
os.makedirs(LLAMAD_RUNDIR, exist_ok=True)
# Default archival logfile for ``llama run``.
LOGFILE = os.path.join(llama.LOGDIR, 'llamad.log')
# reStructuredText examples appended to ``--help`` output; blank lines and the
# indentation under ``.. code::`` are required by RST.
EPILOG = """EXAMPLES
--------

*Note that by default,* ``llama run`` *will start a background process and exit
from the foreground, allowing you to keep working or disconnect your SSH
session while* ``llama run`` *progresses. Override this behaviour with* ``-F``.

Continually monitor and update the default run directory

.. code::

    llama run

Continually monitor and update a temporary test directory but NOT the default
run directory

.. code::

    llama run /tmp/llamatest

Keep monitoring only the current directory

.. code::

    llama run .

Make a single attempt to generate ``IceCubeNeutrinoList`` in the current
directory (if its input files exist) and then quit:

.. code::

    llama run . -o -f IceCubeNeutrinoList

Same as above, but generate any missing ``IceCubeNeutrinoList`` inputs if
possible:

.. code::

    llama run . -o +f IceCubeNeutrinoList

Same as above, but instead of spinning up a background process, keep control of
the terminal and print all logging output until the job is done:

.. code::

    llama run . -o +f IceCubeNeutrinoList -F

"""
# Seconds to sleep between successive update passes in ``main``.
SLEEP_WAIT = 2
LOGGER = logging.getLogger(__name__)
def postprocess_updater_downselections(_self: CliParser, namespace: Namespace):
    """Downselect every ``Run`` in ``namespace.run`` to its newest events.

    Each ``Run`` is replaced by ``run.downselect(...)`` sorted on
    ``Event.modification_time`` in descending order (most recently modified
    first), passing ``limit=namespace.num_updated``. ``namespace`` is modified
    in place and nothing is returned.

    Parameters
    ----------
    _self : CliParser
        The parser running this postprocessor (unused).
    namespace : Namespace
        Parsed arguments; must provide ``run`` (a list of ``Run`` objects) and
        ``num_updated`` (an int).
    """
    namespace.run = [r.downselect(sortkey=Event.modification_time,
                                  reverse=True, limit=namespace.num_updated)
                     for r in namespace.run]
def get_parser():
    """Construct and return the ``llama run`` command-line parser.

    Options shared with other LLAMA tools come from parent parsers (dev mode,
    error alerts, pipeline/event filtering from ``RunParsers``, and logging
    configured with ``LOGFILE``). Both ``-`` and ``+`` are accepted as option
    prefixes, and ``postprocess_updater_downselections`` is appended to the
    parser's ``POSTPROCESSORS``.
    """
    shared_parents = (
        Parsers.dev_mode,
        Parsers.erralert,
        RunParsers(downselect="manual=False").pipeline_and_eventfiltering,
        get_logging_cli(LOGFILE, 'info'),
    )
    parser = CliParser(
        prog="llama run",
        description=__doc__,
        epilog=RunParsers.__doc__ + '\n\n' + EPILOG,
        prefix_chars="-+",
        parents=shared_parents,
        formatter_class=RawDescriptionHelpFormatter,
    )
    parser.POSTPROCESSORS += (postprocess_updater_downselections,)
    parser.add_argument('-o', '--runonce', action='store_true', help="""
        If specified, each event directory will only be updated once, and then
        the script will exit.""")
    parser.add_argument('-n', '--num-updated', default=40, type=int, help="""
        Number of event directories to keep updated. Older event directories
        will be skipped and not updated if they do not fall within the top
        ``num_updated``.""")
    parser.add_argument('-F', '--foreground', action='store_true', help="""
        Run ``llama run`` in the foreground instead of immediately sending it
        to the background as a daemon process. If running in the foreground,
        logs are printed to STDERR (in addition to the global archival logfile,
        see ``--logfile``); if running in the background (i.e. omitting this
        flag), logs are appended to a logfile specific to this set of event
        directories (again, in addition to the archival logfile). This
        CLI-argument-specific logfile goes in the same log directory as the
        default ``--logfile`` option but has a filename specific to this
        ``llama run`` invocation; the name is the same as the ``lockdir`` name
        (see: ``lockdir``), but with ``.log`` appended.""")
    # ``nargs=0`` makes this a flag whose custom action prints running procs.
    running_action = print_running_procs_action(
        LLAMAD_RUNDIR,
        command_nicknames=CMD_NICKNAMES,
    )
    parser.add_argument('-R', '--running-daemons', nargs=0,
                        action=running_action, help=f"""
        Print the currently-running llama daemons as recorded in the
        ``llama run`` lockfile directory, {LLAMAD_RUNDIR}. Note that this will
        not catch ``llama run`` processes whose lock files have somehow
        been deleted from this directory. Run ``ps -ax | grep 'llama run'`` if
        you're paranoid about missing anything.""")
    parser.add_argument('-k', '--kill', action='store_true', help="""
        Kill any llama processes covering potentially competing ``run`` and
        ``eventidfilter`` values (as decided by conservative comparison
        algorithms) by finding their PIDs in their semaphore lock directories
        and killing the associated processes (if they are still running). You
        can run this if a ``llama run`` instance fails to launch due to
        competing instances in order to kill them safely.""")
    parser.add_argument('-K', '--killfirst', action='store_true', help="""
        Like ``--kill``, but continues execution after killing. In other words,
        start a new process but kill competition first.""")
    parser.add_argument(*RELAUNCH_FLAGS, action='store_true', help="""
        If ``relaunch`` is specified, then wrap ``llama run`` as a subprocess
        and keep relaunching it if it fails; if it exits gracefully (returncode
        0), it will not be relaunched. All command line arguments (besides this
        one) will be passed to subprocess ``llama run`` invocations.""")
    return parser
def lockdir(rundirstoupdate, eventidfilter):
    """Return the lock directory path for a ``llama run`` process.

    The name depends on the arguments (since we don't want multiple
    ``llama run`` processes working on the same event directory): it is
    ``<quoted rundirs>+<quoted simplified eventidfilter>`` inside
    ``LLAMAD_RUNDIR``. Only ``LLAMAD_RUNDIR`` itself is created here if
    missing; the returned lock directory is not created.

    Parameters
    ----------
    rundirstoupdate : iterable of str
        Run directories monitored by the process (encoded by
        ``quote_rundirs``).
    eventidfilter : str
        Event ID glob; reduced with ``simplify_glob`` before quoting.

    Returns
    -------
    str
        Path of the lock directory for this combination of arguments.
    """
    quoted_filter = quote_plus(simplify_glob(eventidfilter))
    if not os.path.isdir(LLAMAD_RUNDIR):
        LOGGER.info("``llama run`` lockfile dir not found; creating")
        # ``exist_ok``: another process may create the directory between the
        # check above and this call; that must not raise FileExistsError.
        os.makedirs(LLAMAD_RUNDIR, exist_ok=True)
    return os.path.join(LLAMAD_RUNDIR,
                        '{}+{}'.format(quote_rundirs(rundirstoupdate),
                                       quoted_filter))
def quote_rundirs(rundirstoupdate):
    """Encode run directories as one filesystem-safe, order-independent string.

    Each directory is resolved with ``os.path.realpath`` and quoted with
    ``quote_plus``; the quoted paths are sorted and joined with commas. The
    result is later combined with a quoted eventidfilter in a single filename,
    so anything longer than 120 characters (a conservative margin under the
    common 255-byte filename length limit) is replaced by its SHA-1 hex digest.
    """
    quoted_paths = [quote_plus(os.path.realpath(path))
                    for path in rundirstoupdate]
    quoted_paths.sort()
    joined = ','.join(quoted_paths)
    LOGGER.debug("dirstr: %s", joined)
    if len(joined) <= 120:
        return joined
    # ``sha1`` needs bytes; ``quote_plus`` output is ASCII, so the default
    # UTF-8 encoding is exact.
    digest = sha1(joined.encode()).hexdigest()
    LOGGER.debug("len(dirstr) > 120, taking hash: %s", digest)
    return digest
def simplify_glob(glb):
    """Reduce a glob to a literal or to the form ``<prefix>*<suffix>``.

    Every ``?``, ``[`` and ``]`` is turned into ``*``. If no ``*`` results, the
    string is returned unchanged; otherwise only the text before the first
    ``*`` and after the last ``*`` is kept around a single ``*``. The result
    matches at least everything the input matched, which is what the
    single-wildcard comparison in ``globs_intersect`` expects.
    """
    starred = glb.translate(str.maketrans('?[]', '***'))
    if '*' not in starred:
        return starred
    head, *_, tail = starred.split('*')
    return head + '*' + tail
def kill_matching(rundirstoupdate, eventidfilter):
    """Kill competing ``llama run`` processes.

    Targets the process registered under this invocation's own lockdir plus
    every ``(rundirs, filter)`` pair reported by ``competing_processes``. For
    each target the PID is read from its lockdir's pidfile; a lockdir without a
    pidfile is removed (when present) and skipped. Every recorded process is
    sent SIGTERM via ``psutil.Process.terminate``, after which the remaining
    pidfiles and lockdirs are deleted.

    Parameters
    ----------
    rundirstoupdate : list of str
        Run directories of this invocation.
    eventidfilter : str
        Event ID glob of this invocation.
    """
    # psutil is already a dependency of this module (``Process``).
    from psutil import NoSuchProcess

    kill_list = ([(rundirstoupdate, eventidfilter)] +
                 competing_processes(rundirstoupdate, eventidfilter))
    pids = []
    locks = []
    for rundirs, filt in kill_list:
        lockd = lockdir(rundirs, filt)
        pidpath = pidfile(lockd)
        if os.path.isfile(pidpath):
            with open(pidpath) as pidf:
                pids.append(int(pidf.read()))
            locks.append((lockd, pidpath))
        else:
            print("No pidfile found for ", rundirs, " with filter ", filt,
                  ", removing lockdir and continuing...")
            # The lockdir need not exist (e.g. this invocation's own lockdir
            # when nothing is running yet); rmdir would raise on it.
            if os.path.isdir(lockd):
                os.rmdir(lockd)
    print("TERMINATING THESE PROCESSES NOW (with SIGTERM):")
    printprocs(pids, command_nicknames=CMD_NICKNAMES)
    # Signal every recorded PID (not just the last one read).
    for pid in pids:
        try:
            Process(pid).terminate()
        except NoSuchProcess:
            # The process exited after its pidfile was read; nothing to kill.
            print(f"Process {pid} already exited.")
    print("Done killing. Cleaning up pidfiles and lockdirs...")
    for lockd, pidpath in locks:
        if os.path.isfile(pidpath):
            print(f"RM {pidpath}")
            os.unlink(pidpath)
        if os.path.isdir(lockd):
            print(f"RMDIR {lockd}")
            os.rmdir(lockd)
    print("Done. If you had auto-restart (enabled by default), you")
    print("might need to run the last command a couple more times to make")
    print("sure the ``llama run`` processes you wanted to kill are dead.")
    print("Try this a few times to give the ``llama run`` processes a chance")
    print("to gracefully exit before you forcibly kill them.")
def globs_intersect(glob1, glob2):
    r"""Conservatively decide whether two single-wildcard globs may overlap.

    Returns False only when the two UNIX filepath globs (compatible with the
    python ``glob`` module) certainly cannot describe a common path, and True
    when they might. Both globs must strictly have the form
    [^\*]*\*[^\*]* (``sed`` regex): at most one ``*`` and no other wildcard
    characters (see ``simplify_glob``). A glob with more than one ``*`` raises
    ValueError.
    """
    wild1 = '*' in glob1
    wild2 = '*' in glob2

    # Two literal paths overlap only when they are identical.
    if not (wild1 or wild2):
        return glob1 == glob2

    # Two ``prefix*suffix`` globs overlap iff one prefix extends the other and
    # one suffix extends the other: the shorter prefix/suffix plus the
    # wildcard can then absorb the longer one.
    if wild1 and wild2:
        head1, tail1 = glob1.split('*')
        head2, tail2 = glob2.split('*')
        short_head, long_head = sorted((head1, head2), key=len)
        short_tail, long_tail = sorted((tail1, tail2), key=len)
        return (long_head.startswith(short_head)
                and long_tail.endswith(short_tail))

    # Exactly one glob is a literal: check the pattern describes it.
    literal, pattern = (glob2, glob1) if wild1 else (glob1, glob2)
    head, tail = pattern.split('*')
    if not literal.startswith(head):
        return False
    # The suffix must fit in what remains after the prefix (no reuse of chars).
    return literal[len(head):].endswith(tail)
def competing_processes(rundirstoupdate, eventidfilter):
    """Get a list of ``(rundirs, eventidfilter)`` pairs for running
    ``llama run`` processes that are potentially monitoring the same events.

    Every lock directory in ``LLAMAD_RUNDIR`` other than this invocation's own
    (see ``lockdir``) is decoded back into a set of run directories and a
    (simplified) event ID filter. An entry counts as competing if it shares a
    run directory with ``rundirstoupdate`` or if its filter may overlap
    ``eventidfilter`` according to ``globs_intersect``.

    Parameters
    ----------
    rundirstoupdate : iterable of str
        Run directories of this invocation.
    eventidfilter : str
        Event ID glob of this invocation (any glob; simplified here).

    Returns
    -------
    list of (set of str, str)
        Decoded run directories and simplified filter of each competitor.
    """
    this_proc = lockdir(rundirstoupdate, eventidfilter)
    # Lockdir names store simplified filters and realpaths (see ``lockdir`` /
    # ``quote_rundirs``); normalize our side identically. ``globs_intersect``
    # requires single-wildcard globs, so a raw filter such as ``S19*_*`` or
    # ``S1?0*`` would otherwise raise or be misread.
    our_filter = simplify_glob(eventidfilter)
    our_dirs = {os.path.realpath(d) for d in rundirstoupdate}
    competing = []
    for lockd in glob(os.path.join(LLAMAD_RUNDIR, '*')):
        if lockd == this_proc:
            continue
        quoted_dirs, quoted_filter = os.path.basename(lockd).split('+')
        # NOTE: hashed rundir names (``quote_rundirs`` output > 120 chars)
        # decode to a digest and can only match through the filter test.
        dirs = {unquote_plus(e) for e in quoted_dirs.split(',')}
        filt = unquote_plus(quoted_filter)
        # NOTE(review): "the same events" arguably requires BOTH a shared
        # rundir AND overlapping filters; the broader ``or`` is preserved
        # here - confirm intent.
        if dirs.intersection(our_dirs) or globs_intersect(our_filter, filt):
            competing.append((dirs, filt))
    return competing
def exit_if_competing(rundirstoupdate, eventidfilter):
    """Build a zero-argument guard against competing ``llama run`` processes.

    The returned function asks ``competing_processes`` for other running
    ``llama run`` processes that may be looking at the same run directories.
    If there are none it returns ``None``. Otherwise it logs each competitor
    (with its PID when the competitor's pidfile can be read) and calls
    ``sys.exit(2)``.
    """

    def check_and_exit():
        """Pass as a ``post_fork`` hook to ``safe_launch_daemon``."""
        rivals = competing_processes(rundirstoupdate, eventidfilter)
        if not rivals:
            return
        LOGGER.error("Could not start this ``llama run`` process because "
                     "another currently running ``llama run`` process "
                     "seems to be monitoring the same event directories.")
        for rival_dirs, rival_filter in rivals:
            LOGGER.error("Competing rundirs and (simplified) "
                         "eventidfilters: %s, %s", rival_dirs, rival_filter)
            try:
                rival_pidpath = pidfile(lockdir(rival_dirs, rival_filter))
                with open(rival_pidpath) as handle:
                    rival_pid = int(handle.read())
            except OSError:
                LOGGER.error("Could not get PID of competing process.")
            else:
                LOGGER.error("PID of competing process: %d", rival_pid)
        LOGGER.error("EXITING.")
        sys.exit(2)

    return check_and_exit
def _relaunch_until_success():
    """Re-run the current command line in a child process until one succeeds.

    Flags that must not be forwarded are stripped from ``sys.argv`` in place.
    The command line is then run with ``subprocess.Popen`` and waited on; a
    nonzero returncode triggers another launch, and a returncode of 0 ends the
    loop with ``sys.exit()``. An ``atexit`` hook SIGTERMs a still-running child
    if this relauncher exits first. Never returns.
    """
    LOGGER.info("Relaunch flag set in args: %s", sys.argv)
    # Relaunch flags would make every child start its own relaunch loop.
    # Kill-first flags would make every child run ``kill_matching`` on this
    # same invocation's lockdir. That pidfile presumably holds this relauncher's
    # PID (it is the lockdir ``safe_launch_daemon`` was given), so the child
    # would kill its own parent. ``main`` already ran ``kill_matching`` before
    # calling us. NOTE(review): combined short flags (e.g. ``-FK``) and
    # abbreviated long options are not detected.
    dropped = RELAUNCH_FLAGS + ('-K', '--killfirst')
    sys.argv[:] = [arg for arg in sys.argv if arg not in dropped]
    child = None

    @atexit.register
    def kill_child():
        """Kill subprocesses before exiting."""
        # Only signal a child that is still running: a finished child has been
        # reaped, and its PID may already belong to an unrelated process.
        if child is not None and child.poll() is None:
            LOGGER.info("Relauncher terminating; killing child NOW.")
            try:
                child.send_signal(SIGTERM)
            except OSError:
                LOGGER.error("Could not kill child with PID %s", child.pid)

    while True:
        LOGGER.info("Launching llama in restart loop with args: %s",
                    sys.argv)
        child = subprocess.Popen(sys.argv)
        child.wait()
        if child.returncode == 0:
            break
        LOGGER.error("``llama run`` failed with args: %s", sys.argv)
        LOGGER.error("Relaunching in restart loop.")
    sys.exit()


def main():
    """Run the LLAMA update loop.

    Steps, in order:

    1. Parse the command line.
    2. With ``-k``/``-K``, kill competing ``llama run`` processes; ``-k``
       exits afterwards.
    3. Unless ``--foreground`` is given, daemonize on POSIX systems.
    4. With ``--relaunch``, hand off to a relaunch loop.
    5. Otherwise, update every run directory, sleeping ``SLEEP_WAIT`` seconds
       between passes, or exit after one pass with ``--runonce``.
    """
    parser = get_parser()
    args = parser.parse_args()
    # ``args.run`` is a list of runs and must be non-empty. Check it before
    # indexing into it (an ``assert`` here ran too late and vanishes under
    # ``python -O``).
    if not args.run:
        sys.exit("``llama run``: no run directories were specified.")
    rundirs = [r.rundir for r in args.run]
    eventidfilter = args.run[0].downselection[0]['eventid_filter']
    LOGGER.info("CLI args: %s", sys.argv)

    if args.kill or args.killfirst:
        kill_matching(rundirs, eventidfilter)
        if args.kill:
            sys.exit()

    if not args.foreground:
        if os.name == 'posix':
            # the daemon branch will pick up from this point
            safe_launch_daemon(lockdir(rundirs, eventidfilter))
        else:
            LOGGER.error(
                COL.RED + "NOTE: ``llama run`` can only be run in the "
                "background on POSIX systems. Please use ``--foreground`` to "
                "explicitly run in the foreground. Running in foreground "
                "now..." + COL.CLEAR
            )
    # NOTE(review): this only BUILDS the competition check and never calls it,
    # so no competition check runs here. Its docstring says it is meant as a
    # ``post_fork`` hook for ``safe_launch_daemon``. Confirm that API, and how
    # the daemon's working directory affects the realpaths in lockdir names,
    # before wiring it up.
    exit_if_competing(rundirs, eventidfilter)

    if args.relaunch:
        _relaunch_until_success()

    # keep checking whether more files can be generated in each run directory
    LOGGER.info("Running on the following run directories:%s",
                ''.join(f"\n- {rundir}" for rundir in rundirs))
    LOGGER.info("eventidfilter: %s", eventidfilter)
    while True:
        for run in args.run:
            log_exceptions_and_recover()(run.update)()
        # if we only want to run this once, just exit after updating once
        if args.runonce:
            sys.exit()
        # wait before retrying
        time.sleep(SLEEP_WAIT)


if __name__ == "__main__":
    main()