Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# (c) Stefan Countryman, 2019
3"""
4Execute actions across a bunch of DigitalOcean servers based on their names.
5Useful for background and sensitivity runs. Executes a shell command on each
6server using SSH. You can either specify shell scripts to run (in
7non-interactive mode, as if you were calling "ssh foo@bar 'my shell command'")
8or can provide your own shell command as an argument to the script.
9"""
11import os
12import re
13import functools
14from math import inf
15from argparse import ArgumentParser
16from copy import deepcopy
17from fnmatch import fnmatch
18from textwrap import indent, wrap
19from time import time, sleep
20from shlex import quote, split as shplit
21from textwrap import dedent
22from subprocess import Popen, PIPE
23from concurrent.futures import ThreadPoolExecutor, as_completed
24from digitalocean import Manager
# Default value of the ``--format`` option: the report line printed for each
# droplet once its command has finished.
DEFAULT_FMT = 'On droplet {name} at {ip} -- Retval={retval} Stdout: {stdout}'
# Default value of ``--connections`` (max simultaneous connections/processes).
DEFAULT_CONNECTIONS = 50
# Default value of ``--pattern``: fnmatch glob that droplet names must match.
DEFAULT_GLOB = "llama-*"
# Seconds to sleep between successive polls of a running child process.
POLL_INTERVAL = 0.1
# Text shown after the option list in ``--help`` output (passed to
# ``ArgumentParser`` as ``epilog``). It documents the magic variables
# (DROPVEC, HELP, SETUP, CLEANUP) that ``Command`` extracts from the scripts.
EPILOG = """
    New commands can be added as bash scripts to ``llama.dev.dv.SCRIPTS`` (keep
    in mind that these run in a non-interactive SSH session, so you will need
    to initialize your environment accordingly).

    These scripts can optionally specify default
    options (like "--format" below) by putting those arguments in a variable on
    a new line called "DROPVEC", e.g. DROPVEC="--timeout 5 --connections 2".

    A short description of the command can be specified in a variable on a new
    line called HELP, e.g. HELP="check running servers".

    Functions to run before and after the vectorized commands ("SETUP" and
    "CLEANUP" commands, respectively) can be defined using those variables in
    the command scripts, similarly to DROPVEC. For example, to make a directory
    called "foo" before any of the vectorized commands run, put SETUP="mkdir
    foo" somewhere in the script. To remove that directory afterwards, put
    CLEANUP="rm -r foo" somewhere in the script. You can specify multiple SETUP
    and CLEANUP commands this way; they will be run sequentially as
    encountered. Environmental variable expansion will be performed on "SETUP"
    and "CLEANUP" commands in the local environment; specify environmental
    variables using python format strings. The number of droplets being run on
    is available in SETUP and CLEANUP scripts as DROPNUM.
"""
# Named scripts that can be vectorized across droplets. Keys are the values
# accepted for the ``cmd`` CLI argument; values are the script source.
#
# Conventions read back out of the script text by ``Command``:
#   HELP=...     one-line description shown by ``--descriptions``
#   DROPVEC=...  default CLI flags for this command
#   SETUP=...    local command(s) run once before the per-droplet runs
#   CLEANUP=...  local command(s) run once after the per-droplet runs
#
# NOTE: these are ordinary (non-raw) Python strings, so a trailing backslash
# followed by a newline is consumed by Python and the continued shell lines
# reach the shell as one physical line.
SCRIPTS = {
    "destroy": """
        #!/bin/bash
        HELP="Delete all output files in the output directory for the specified run type. Will not prompt for confirmation, so think twice before deleting all your work (i.e. only run this if you've already pulled results or if you made some mistake that you want to flush). Specify injection type to delete as 'signal', 'doublet', or 'bg'."
        DROPVEC="--format 'Deleted output products on {name} at {ip} -- Retval={retval}'"
        ROOTDIR='/home/vagrant/pre-er14-background'

        [ $# -ne 1 ] && set -- __invalid_choice
        case "$1" in
            bg)
                TARGET=events;;
            signal)
                TARGET=signal-events;;
            doublet)
                TARGET=signal-doublet-events;;
            *)
                echo "Must specify bg, signal, or doublet."
                exit 1
        esac
        rm -rf "$ROOTDIR"/"$TARGET"
        mkdir -p "$ROOTDIR"/"$TARGET"
    """,
    "done": """
        #!/bin/bash
        HELP="See how many events have been processed so far for the specified injection type. Options are 'signal', 'doublet', or 'bg'."
        DROPVEC="done bg --reduce sum --format 'Droplet {name} at {ip} finished analyses: {stdout}'"
        ROOTDIR=/home/vagrant/pre-er14-background
        OUTFILE=significance_lvc-i3.json

        [ $# -ne 1 ] && set -- __invalid_choice
        case "$1" in
            bg)
                TARGET=events;;
            signal)
                TARGET=signal-events;;
            doublet)
                TARGET=signal-doublet-events;;
            *)
                echo "Must specify bg, signal, or doublet."
                exit 1
        esac
        find "$ROOTDIR"/"$TARGET" -name "$OUTFILE" | wc -l
    """,
    "eval": """
        HELP="Evaluate the provided arguments as code on all droplets."
        eval "$@"
    """,
    "forget": """
        #!/bin/bash
        HELP="Remove active droplet host keys from ~/.ssh/known_hosts."
        DROPVEC="--local -n 1 --format 'Removing {name} at {ip} from ~/.ssh/known_hosts. Retval: {retval} Stderr: {stderr}'"
        SETUP="cp {HOME}/.ssh/known_hosts {HOME}/.ssh/known_hosts.orig"
        CLEANUP="diff {HOME}/.ssh/known_hosts {HOME}/.ssh/known_hosts.orig"
        sed -i '' "/$DROPIP/d" "$HOME"/.ssh/known_hosts
    """,
    "init": """
        #!/bin/bash
        HELP="Add all droplets' host keys to the list of known hosts."
        DROPVEC="--local --format 'Adding hostkey to {name} at {ip}. Retval: {retval} Stdout: {stdout}'"
        ssh -o StrictHostKeyChecking=no vagrant@$DROPIP </dev/null "hostname"
    """,
    "killall": """
        #!/bin/bash
        DROPVEC="--format 'Killed all LLAMA processes on {name} at {ip}'"
        HELP='Kill all LLAMA `llama run` and event-running processes on all droplets.'
        ps -ax \
            | grep -E "llama run|er14|mid-o3" \
            | sed '/grep/d;/DROPVEC/d' \
            | awk '{print $1}' \
            | xargs kill 2>/dev/null || true
    """,
    "launch": """
        #!/bin/bash
        HELP="Setup droplets and start injection runs. Specify injection type as 'signal', 'doublet', or 'bg' (DEFAULT: bg). Optionally, specify a list of skymaps (on the remote machine) to perform the run on. Only skymaps in this list will be used, which is handy for running on a specific population. You *must* provide injection type if you want to specify a skymap list."
        DROPVEC="launch bg --prelaunch 'Starting launch on {name} at {ip}...'"
        LOGDIR='/home/vagrant/pre-er14-background/logs'
        case "$1" in
            bg)
                RUNNER=pre-er14-background-event-runner
                LOGPREFIX=''
                ;;
            signal)
                RUNNER=pre-er14-signal-event-runner
                LOGPREFIX=signal-
                ;;
            doublet)
                RUNNER=pre-er14-signal-doublet-event-runner
                LOGPREFIX=signal-doublet-
                ;;
            *)
                echo "Must specify bg, signal, or doublet."
                exit 1
        esac
        echo "linking /var/www/html to injection event directory..."
        sudo rm /var/www/html
        sudo ln -s /home/vagrant/pre-er14-background/events /var/www/html
        printf "%s" "Creating swapfile..."
        if [[ -f /swapfile ]]; then
            echo "/swapfile exists"
        else
            sudo fallocate -l 12G /swapfile && echo "created new /swapfile"
            sudo chmod 0600 /swapfile && echo "set swapfile mode 0600"
            sudo mkswap /swapfile && echo "mkswap /swapfile"
            sudo swapon /swapfile
        fi
        sudo swapon -s \
            && echo "turned swapfile on" \
            || echo "swapfile already on"
        free -m
        cd ~/dev/multimessenger-pipeline
        git fetch
        git checkout subthresh-integration
        git pull || true
        cd
        echo "in directory: $(pwd)"
        echo "launching event runner..."
        nohup >>"${LOGDIR}"/"${LOGPREFIX}"$(date "+%s").log 2>&1 "${RUNNER}" "$2" &
    """,
    "batch": """
        #!/bin/bash
        HELP="Set up droplets and start llama batch. Specify injection type: 'subthresh' (DEFAULT: subthresh)"
        DROPVEC="batch subthresh --prelaunch 'STARTING BATCH on {name} at {ip}...'"
        CODEROOT=/home/llama/dev/multimessenger-pipeline/bin
        export PATH="/opt/anaconda/bin:$PATH"
        source /home/llama/creds
        echo "ENVIRONMENT:"
        echo
        env
        case "$1" in
            subthresh)
                RUNNER=$CODEROOT/mid-o3-subthreshold-background-event-runner
                ;;
            *)
                echo "Must specify subthresh"
                exit 1
                ;;
        esac
        printf "%s" "Creating swapfile..."
        if [[ -f /swapfile ]]; then
            echo "/swapfile exists"
        else
            sudo fallocate -l 40G /swapfile && echo "created new /swapfile"
            sudo chmod 0600 /swapfile && echo "set swapfile mode 0600"
            sudo mkswap /swapfile && echo "mkswap /swapfile"
            sudo swapon /swapfile
        fi
        sudo swapon -s \
            && echo "turned swapfile on" \
            || echo "swapfile already on"
        free -m
        cd ~/dev/multimessenger-pipeline
        git fetch
        git checkout subthresh-integration
        git pull || true
        cd
        echo "in directory: $(pwd)"
        for i in {1..4}; do
            echo "launching event runner $i..."
            nohup >>~/"batch-${i}-"$(date "+%s").log 2>&1 "${RUNNER}" "${i}" &
        done
    """,
    "ls-procs": """
        #!/bin/bash
        DROPVEC="--reduce linecount --format 'Droplet {name} at {ip} llama run procs: {stdout}'"
        HELP='List `llama run` processes on each droplet and print the total count.'
        ps -ax \
            | grep -E "llama run|er14|mid-o3" \
            | sed "/grep/d;s/^/ /"
    """,
    "pull": """
        #!/bin/bash
        DROPVEC="pull */*/scatterplot_lvc-i3.pdf */*/skymap_info.json */*/significance_lvc-i3.json */*/icecube_neutrino_list.json --prelaunch 'Starting rsync for {name} at {ip}...' --format 'Finished rsyncing {name} at {ip}.' --connections=3 --local"
        HELP='Run in directory where results should go. Only copies important information and sorts by server name (so make sure your servers have unique names). Add extra rsync include patterns as positional command-line arguments (specified from the root of the analysis directory). File name patterns cannot have spaces in them.'

        rsync -a \
            --include='*/*/' \
            `printf -- "--include=%s " $@` \
            --exclude='*/*' \
            --prune-empty-dirs \
            vagrant@$DROPIP:/home/vagrant/pre-er14-background/ \
            $DROPNAME
    """,
    "push": """
        #!/usr/bin/env python3

        import os
        import sys
        from datetime import datetime
        from math import ceil
        from shutil import rmtree
        from subprocess import Popen
        from glob import glob
        from tempfile import mkdtemp

        DROPVEC="push signal --connections 3 --local --format 'Split and uploaded for {name} at {ip}'"
        HELP="Split injection neutrino lists evenly between available droplets and upload them. Neutrino lists should be JSON formatted and located in subdirectories of the current directory. Their names should exactly match the GW skymap filenames they correspond to. Specify injection type as 'signal' or 'doublet' (DEFAULT: signal). Note that bg runs are MC and are not split a priori."
        DROPNUM = int(os.environ['DROPNUM'])
        DROPI = int(os.environ['DROPI'])
        DROPIP = os.environ['DROPIP']

        if sys.argv[1] == 'signal':
            REMOTEDIR = '/home/vagrant/pre-er14-background/signal-neutrinos'
        elif sys.argv[1] == 'doublet':
            REMOTEDIR = '/home/vagrant/pre-er14-background/signal-doublet-neutrinos'
        else:
            print("ERROR unrecognized argument ", sys.argv[1])
            exit(1)

        MKDIR_CMD = '''
        if [[ -d {remotedir} ]]; then
            echo "Moving {remotedir} to make way for new one"
            mv {remotedir} {remotedir}.{timestamp}
        fi
        mkdir {remotedir}
        '''.format(remotedir=REMOTEDIR, timestamp=int(datetime.now().timestamp()))
        FILES = sorted(glob(os.path.join('**', '*.json'), recursive=True))
        FILES_PER_DIR = ceil(len(FILES)/DROPNUM)
        TMP = mkdtemp()

        print("Linking paths to tmpdir: ", TMP)
        for p in FILES[FILES_PER_DIR*DROPI:FILES_PER_DIR*(DROPI+1)]:
            d, f = os.path.split(p)
            dout = os.path.join(TMP, d)
            if not os.path.isdir(dout):
                os.makedirs(dout)
            os.link(p, os.path.join(TMP, p))

        print("Making remote dir: ", REMOTEDIR)
        proc = Popen(['ssh', 'vagrant@'+DROPIP, MKDIR_CMD])
        proc.communicate()
        if not proc.returncode:
            cmd = ['rsync', '-a', os.path.join(TMP, ''),
                   'vagrant@{}:{}'.format(DROPIP, REMOTEDIR)]
            print("Trying to rsync: ", cmd)
            proc = Popen(cmd)
            proc.communicate()
            if not proc.returncode:
                print("Succeeded rsyncing files to remote, unlinking local tmpdir.")
                rmtree(TMP)
            else:
                print("Could not rsync, local files still in ", TMP)
                exit(1)
        else:
            print("Could not make remote directory.")
            exit(1)
    """,
    "tail": """
        #!/bin/bash
        HELP="Tail 30 lines of most recent log on each droplet. Specify injection type with 'subthresh', 'signal', 'doublet', or 'bg' as first argument (DEFAULT: bg)."
        LOGDIR=pre-er14-background/logs
        case "${1:-bg}" in
            subthresh)
                PATTERN='batch-[0-9]-'
                LOGDIR=~/
                ;;
            signal)
                PATTERN='signal-'
                ;;
            doublet)
                PATTERN='signal-doublet-'
                ;;
            bg)
                PATTERN=''
                ;;
            *)
                echo "Invalid log choice $1, exiting."
                exit 1
                ;;
        esac
        LOGFILE="$( \
            find "$LOGDIR" -name "${PATTERN}[0-9]*.log" \
                | sort \
                | tail -1 \
        )"
        stat "${LOGFILE}"
        tail -30 "${LOGFILE}"
        echo CURRENT TIME: $(date)
    """,
}

# Strip the indentation that comes from embedding the scripts in this module.
# Only existing keys are reassigned, so mutating during iteration is safe.
for name, script in SCRIPTS.items():
    SCRIPTS[name] = dedent(script)
def _reduce_sum(outs):
    """Print the sum of the integers printed by each droplet.

    Each entry of ``outs`` is one droplet's STDOUT; blank or missing output
    counts as zero. Raises ``ValueError`` if an entry is not an integer.
    """
    total = sum(int((out or '').strip() or 0) for out in outs)
    print("SUM: {}".format(total))


def _reduce_linecount(outs):
    """Print the total number of STDOUT lines across all droplets.

    Outputs that are empty or whitespace-only contribute no lines (a bare
    newline used to be counted as one line).
    """
    total = sum(
        len(out.strip().split('\n'))
        for out in outs
        if out and out.strip()
    )
    print("LINES: ", total)


# Functions selectable with ``--reduce``; each takes the list of per-droplet
# STDOUT strings and prints a single aggregate.
REDUCERS = {
    'sum': _reduce_sum,
    'linecount': _reduce_linecount,
}
def argset_string(args):
    """Build a ``set -- ...`` shell line that installs ``args`` as the
    positional parameters (``$1`` ... ``$N``) of a bash script, with every
    argument shell-quoted."""
    quoted = map(quote, args)
    return "set -- {}".format(" ".join(quoted))
class Command():
    """A command that can be used on many Droplets. Can have arguments passed
    to it."""

    def __init__(self, commandname, local, args):
        """Specify the name of the command and the arguments to use.

        Parameters
        ----------
        commandname : str
            The name of the script to run. Scripts are defined in
            ``llama.dev.dv.SCRIPTS``.
        local : bool
            Whether this command is meant to run LOCALLY (in which case it will
            run once for each droplet, with the droplet names set as
            DROPNAME and the IP addresses set as DROPIP environmental
            variables). If ``False``, execute the command remotely; remote
            commands must be bash scripts.
        args : list
            A list of command-line arguments to feed to the ``cmd`` script as
            positional variables. For remote commands (``local=False`` above),
            this is accomplished with a ``set -- arg1 ... argN`` line
            before the rest of the script specified by ``cmd`` is executed.

        Raises
        ------
        KeyError
            If ``commandname`` is not a script in ``llama.dev.dv.SCRIPTS``
        """
        self.name = commandname
        self.local = local
        self._raw = SCRIPTS[commandname]
        self.args = args

    def _script_vars(self, varname):
        """Return the values of every ``varname=...`` line in the script.

        The text after ``=`` (to the end of the line) is split with shell
        rules and its first token is the value, so ``FOO="a b"`` yields
        ``a b``. Lines with nothing after the ``=`` are skipped.
        """
        values = []
        for raw_value in re.findall(re.escape(varname) + '=(.*)', self._raw):
            tokens = shplit(raw_value)
            if tokens:
                values.append(tokens[0])
        return values

    def _shebang(self):
        """Return the shebang line of the script split into an argv list, or
        an empty list if the first non-blank line is not a shebang."""
        for line in self._raw.splitlines():
            line = line.strip()
            if not line:
                continue
            return shplit(line[2:]) if line.startswith('#!') else []
        return []

    def cmdline(self, ip):
        """The command line arguments in a list format (as expected by the
        first argument of ``subprocess.Popen``).

        Remote commands are sent through ``ssh`` with the arguments installed
        by a leading ``set --`` line. Local commands are run with ``bash -c``,
        unless the script's shebang names a python interpreter, in which case
        that interpreter is used (``bash`` cannot execute a python script).
        """
        if not self.local:
            return ["ssh", "llama@"+ip,
                    argset_string(self.args) + "\n" + self._raw]
        args = list(self.args)
        interpreter = self._shebang()
        if any(os.path.basename(tok).startswith('python')
               for tok in interpreter):
            # ``python -c code a b`` gives sys.argv == ['-c', 'a', 'b'], so
            # no ``--`` placeholder is needed to keep args at index 1..N.
            return interpreter + ['-c', self._raw] + args
        # For ``bash -c``, the first extra word becomes $0; ``--`` fills that
        # slot so that the real arguments start at $1.
        return ['bash', '-c', self._raw, '--'] + args

    @property
    def default_str(self):
        """Get the raw defaults spec string for this command from the original
        script (parsed into ``argparse`` arguments using ``Command.defaults``).
        Empty string if the script has no ``DROPVEC=...`` line."""
        values = self._script_vars('DROPVEC')
        return values[0] if values else ""

    @property
    def defaults(self):
        """Get the default argparse args for this command. Specified as
        'DROPVEC=...' anywhere in the file (including in a comment, which is
        useful for non-bash scripts with different variable setting syntax)."""
        default_str = self.default_str
        defaults = shplit(default_str) if default_str else []
        # allow a literal backslash-n in DROPVEC to stand for a newline
        default_flags = [f.replace(r'\n', '\n') for f in defaults]
        return get_parser().parse_args(default_flags)

    def run(self, droplet_name, ip, timeout, i, num_drops):
        """Run a command synchronously using ``subprocess.Popen``.

        Parameters
        ----------
        droplet_name : str
            The name of the droplet to run on.
        ip : str
            The IP address of the droplet to run on.
        timeout : float
            The maximum execution time in seconds, after which the process will
            be killed. ``inf`` (or ``None``) means wait forever.
        i : int
            The number of the droplet in the full list of droplets.
        num_drops : int
            The total number of droplets run.

        Returns
        -------
        stdout : str
            STDOUT from the process.
        stderr : str
            STDERR from the process.
        retval : int
            The return code of the process (negative signal number if it was
            killed after timing out).
        """
        # A plain dict copy: ``deepcopy(os.environ)`` produces another
        # ``os._Environ`` whose item assignment still writes through to the
        # real process environment.
        env = dict(os.environ)
        if self.local:
            env['DROPIP'] = ip
            env['DROPNAME'] = droplet_name
            env['DROPI'] = str(i)
            env['DROPNUM'] = str(num_drops)
        proc = Popen(self.cmdline(ip), env=env, stdout=PIPE, stderr=PIPE)
        # ``communicate`` drains both pipes while waiting; polling without
        # reading deadlocks as soon as the child fills a pipe buffer. It does
        # not accept an infinite timeout, hence the translation to ``None``.
        limit = None if timeout is None or timeout == inf else timeout
        try:
            out, err = proc.communicate(timeout=limit)
        except TimeoutExpired:
            proc.kill()
            out, err = proc.communicate()
        return (out.decode(errors='replace'), err.decode(errors='replace'),
                proc.returncode)

    @property
    def help(self):
        """Return the help string for this command. Specified as 'HELP=...'
        anywhere in the file (including in a comment, which is useful for
        non-bash scripts with different variable setting syntax). Empty string
        if the script does not define one."""
        values = self._script_vars('HELP')
        return values[0] if values else ""

    def _runvar(self, varname, env):
        """Run a command saved as a ``bash`` variable in this ``Command``
        script, where the command is pulled out of the given ``varname`` (e.g.
        "SETUP" for a setup command defined as ``SETUP="mkdir foo"``). Parse
        it and substitute local environmental variables (specified as a
        **python** format string) into it. Every matching line is run, in
        order; return codes are printed but otherwise ignored.

        Raises
        ------
        KeyError
            If the command references an environmental variable that is not
            set locally.
        """
        for template in self._script_vars(varname):
            matchstr = template.format(**os.environ)
            print("Running {} command: {}".format(varname, matchstr))
            proc = Popen(shplit(matchstr), env=env)
            proc.communicate()
            print("{} command returned: {}".format(varname, proc.returncode))

    def setup(self, env):
        """Run a setup command BEFORE the parallel processes run for each
        server. Specified as a standard ``bash`` command in 'SETUP=...'
        anywhere in the file (including in a comment, which is useful for
        non-bash scripts with different variable setting syntax). This command
        is *ALWAYS* run once locally, regardless of how the vectorized commands
        run. STDOUT and STDERR are not piped."""
        self._runvar("SETUP", env)

    def cleanup(self, env):
        """Run a cleanup command AFTER the parallel processes run for each
        server. Specified as a standard ``bash`` command in 'CLEANUP=...'
        anywhere in the file (including in a comment, which is useful for
        non-bash scripts with different variable setting syntax). This command
        is *ALWAYS* run once locally, regardless of how the vectorized commands
        run. STDOUT and STDERR are not piped."""
        self._runvar("CLEANUP", env)
def scripts():
    """Return the names of the available scripts as a new list, in the order
    they are defined in ``SCRIPTS``."""
    return [script_name for script_name in SCRIPTS]
def print_descriptions():
    """Print each available command name followed by its wrapped, indented
    ``HELP`` text (nothing extra is printed for commands without one)."""
    print("AVAILABLE COMMANDS (use --defaults to show default CLI flags):")
    padding = " "*4
    for cmdname in scripts():
        print('- ' + cmdname)
        # ``local`` has no influence on the help text, any value works here
        wrapped = wrap(Command(cmdname, True, []).help)
        helpstr = indent('\n'.join(wrapped), padding)
        if not helpstr:
            continue
        print(helpstr)
def print_defaults():
    """Print, for every available command, the parsed default command line
    arguments (from its ``DROPVEC`` line) that ``dropvec`` will use."""
    print("DEFAULT FLAGS FOR AVAILABLE COMMANDS:")
    padding = " "*4
    for cmdname in scripts():
        print('- ' + cmdname)
        parsed = Command(cmdname, True, []).defaults
        wrapped = wrap(str(parsed))
        defaults = indent('\n'.join(wrapped), padding)
        if not defaults:
            continue
        print(defaults)
def get_droplets(pattern):
    """Return the droplets whose names match the fnmatch-style ``pattern``.

    Raises ``IOError`` when no DigitalOcean API token is configured.
    """
    from llama.com.do import TOKEN, NO_TOKEN_WARNING
    if TOKEN is None:
        raise IOError(NO_TOKEN_WARNING)
    manager = Manager(token=TOKEN)
    matching = []
    for droplet in manager.get_all_droplets():
        if fnmatch(droplet.name, pattern):
            matching.append(droplet)
    return matching
def get_parser():
    """Build and return a fresh ``ArgumentParser`` describing this tool's
    command line interface."""
    parser = ArgumentParser(description=__doc__, epilog=EPILOG)
    add = parser.add_argument

    # positionals: the script to run and the arguments handed to it
    add("cmd", nargs="?", choices=scripts(), help="""
        The command to run remotely.""")
    add("args", nargs="*", help="""
        Optional additional arguments to feed to the script specified in
        ``cmd``. These arguments will be set using ``set -- arg1 arg2 ...
        argN`` before the script specified by ``cmd`` is evaluated, effectively
        making it as if these arguments were passed as ``$1`` through ``$N`` to
        that shell script.""")

    # informational flags
    add("-d", "--descriptions", action="store_true", help="""
        Print names and descriptions of available commands (see "cmd"
        above).""")
    add("-D", "--defaults", action="store_true", help="""
        Print default command line flags for each available command.""")

    # droplet selection and execution mode
    add("-p", "--pattern", default=DEFAULT_GLOB, help="""
        An fnmatch-style pattern that droplet names must match. Droplets whose
        names do not match this pattern will not be included. (default:
        {})""".format(DEFAULT_GLOB))
    add("-l", "--local", action="store_true", help="""
        Run the given command locally instead of on the remote droplet. This is
        useful for e.g. rsync commands (and anything that is meant to interact
        with each droplet but which must run on the local machine). These
        scripts will have the DROPNAME and DROPIP variables set to each
        droplet's name and IP address respectively.""")

    # reporting
    add("-f", "--format", default=DEFAULT_FMT, help="""
        The format string to use to describe what happened for each command.
        Should be a typical python format string with "name" referring to the
        droplet name, "ip" referring to its IP address, "stdout" referring to
        the standard output of the command, "stderr" the standard error, and
        "retval" referring to the return code of the SSH call.""")
    add("-P", "--prelaunch", default='', help="""
        A python format string to print for each droplet BEFORE launch (for
        long-running processes). Specify the droplet name and IP as in
        ``--format`` (stdout, stderr, and retval are obviously not available
        before the thread starts).""")

    # concurrency and limits
    add("-n", "--connections", type=int, default=DEFAULT_CONNECTIONS,
        help=f"""
        The max number of simultaneous connections (or processes) to make. For
        actions involving data transfer, set this to something low (probably
        1). DEFAULT: {DEFAULT_CONNECTIONS}""")
    add("-t", "--timeout", default=inf, type=float, help="""
        Max execution time on each droplet in s. No timeout by default.""")
    add("-r", "--reduce", choices=tuple(REDUCERS), help="""
        Specify a function that reduces the STDOUT values, e.g. by summing
        them.""")
    return parser
def argparse_to_dict(args):
    """Convert an ``argparse.Namespace`` into a plain dictionary holding every
    attribute whose name does not start with an underscore."""
    public_names = (attr for attr in dir(args) if not attr.startswith('_'))
    return {attr: getattr(args, attr) for attr in public_names}
def apply_default_cli_args(args):
    """Overlay the actual command line onto ``args`` and return ``args``.

    ``args`` (typically a command's own defaults) is modified in place: every
    option whose value on the real command line differs from the parser's
    built-in default replaces the corresponding attribute of ``args``. Options
    left at the parser default on the command line do not override ``args``.
    """
    parser = get_parser()
    # parse the real command line first, then an empty one for the baseline
    given = argparse_to_dict(parser.parse_args())
    baseline = argparse_to_dict(parser.parse_args([]))
    for option, fallback in baseline.items():
        value = given[option]
        if value != fallback:
            setattr(args, option, value)
    return args
def run_workers(cmd, droplets, fmt, prelaunch, connections, timeout):
    """Run ``cmd`` once per droplet on a thread pool and report each result.

    Parameters
    ----------
    cmd : Command
        Object whose ``run(name, ip, timeout, i, num_drops)`` method executes
        the command for one droplet and returns ``(stdout, stderr, retval)``.
    droplets : list
        Droplets to run on; each must have ``name`` and ``ip_address``.
    fmt : str
        Python format string printed as each droplet finishes. May reference
        ``name``, ``ip``, ``stdout``, ``stderr`` and ``retval``.
    prelaunch : str
        Python format string (``name`` and ``ip`` only) printed when a
        droplet's worker starts. Nothing is printed if it is empty.
    connections : int
        Maximum number of droplets worked on concurrently.
    timeout : float
        Passed through to ``cmd.run`` for every droplet.

    Returns
    -------
    outs : list
        The raw STDOUT of every droplet in completion order. If a worker
        raised, its entry is the exception message instead.
    """

    def announce_and_run(msg):
        """Return a function wrapping ``cmd.run`` that prints ``msg`` (when
        non-empty) inside the worker thread just before running."""
        runner = cmd.run

        @functools.wraps(runner)
        def announcer(*args):
            if msg:
                print(msg)
            return runner(*args)

        return announcer

    outs = []
    num_drops = len(droplets)
    with ThreadPoolExecutor(max_workers=connections) as executor:
        futures_to_droplets = {}
        for i, drop in enumerate(droplets):
            task = announce_and_run(
                prelaunch.format(name=drop.name, ip=drop.ip_address)
            )
            future = executor.submit(task, drop.name, drop.ip_address,
                                     timeout, i, num_drops)
            futures_to_droplets[future] = drop
        for future in as_completed(futures_to_droplets):
            drop = futures_to_droplets[future]
            try:
                out, err, retval = future.result()
            except Exception as exc:
                # Deliberately broad: one failing droplet must not abort the
                # others, so the failure is reported like any other result.
                out, err, retval = format(exc), format(exc), -1
            print(fmt.format(name=drop.name, ip=drop.ip_address,
                             stdout=out.strip() if out is not None else '',
                             # guard on ``err`` itself (this used to test
                             # ``out`` and crashed when only stderr was None)
                             stderr=err.strip() if err is not None else '',
                             retval=retval))
            outs.append(out)
    return outs