Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# (c) Stefan Countryman, 2019 

2 

3""" 

4Execute actions across a bunch of DigitalOcean servers based on their names. 

5Useful for background and sensitivity runs. Executes a shell command on each 

6server using SSH. You can either specify shell scripts to run (in 

7non-interactive mode, as if you were calling "ssh foo@bar 'my shell command'") 

8or can provide your own shell command as an argument to the script. 

9""" 

10 

11import os 

12import re 

13import functools 

14from math import inf 

15from argparse import ArgumentParser 

16from copy import deepcopy 

17from fnmatch import fnmatch 

18from textwrap import indent, wrap 

19from time import time, sleep 

20from shlex import quote, split as shplit 

21from textwrap import dedent 

22from subprocess import Popen, PIPE 

23from concurrent.futures import ThreadPoolExecutor, as_completed 

24from digitalocean import Manager 

25 

# Default per-droplet report line (the ``--format`` default); a python format
# string filled in with the droplet name, IP, return value and STDOUT.
DEFAULT_FMT = 'On droplet {name} at {ip} -- Retval={retval} Stdout: {stdout}'
# Default maximum number of simultaneous connections (the ``--connections``
# default).
DEFAULT_CONNECTIONS = 50
# Default fnmatch pattern that droplet names must match (the ``--pattern``
# default).
DEFAULT_GLOB = "llama-*"
# Interval, in seconds, to sleep between subprocess status polls.
POLL_INTERVAL = 0.1

30EPILOG = """ 

31 New commands can be added as bash scripts to ``llama.dev.dv.SCRIPTS`` (keep 

32 in mind that these run in a non-interactive SSH session, so you will need 

33 to initialize your environment accordingly). 

34 

35 These scripts can optionally specify default 

36 options (like "--format" below) by putting those arguments in a variable on 

37 a new line called "DROPVEC", e.g. DROPVEC="--timeout 5 --connections 2". 

38 

39 A short description of the command can be specified in a variable on a new 

40 line called HELP, e.g. HELP="check running servers". 

41 

42 Functions to run before and after the vectorized commands ("SETUP" and 

43 "CLEANUP" commands, respectively) can be defined using those variables in 

44 the command scripts, similarly to DROPVEC. For example, to make a directory 

45 called "foo" before any of the vectorized commands run, put SETUP="mkdir 

46 foo" somewhere in the script. To remove that directory afterwards, put 

47 CLEANUP="rm -r foo" somewhere in the script. You can specify multiple SETUP 

48 and CLEANUP commands this way; they will be run sequentially as 

49 encountered. Environmental variable expansion will be performed on "SETUP" 

50 and "CLEANUP" commands in the local environment; specify environmental 

51 variables using python format strings. The number of droplets being run on 

52 is available in SETUP and CLEANUP scripts as DROPNUM. 

53""" 

54SCRIPTS = { 

55 "destroy": """ 

56 #!/bin/bash 

57 HELP="Delete all output files in the ouptut directory for the specified run type. Will not prompt for confirmation, so think twice before deleting all your work (i.e. only run this if you've already pulled results or if you made some mistake that you want to flush).' Specify injection type to delete as 'signal', 'doublet', or 'bg'." 

58 DROPVEC="--format 'Deleted output products on {name} at {ip} -- Retval={retval}'" 

59 ROOTDIR='/home/vagrant/pre-er14-background' 

60 

61 [ $# -ne 1 ] && set -- __invalid_choice 

62 case "$1" in 

63 bg) 

64 TARGET=events;; 

65 signal) 

66 TARGET=signal-events;; 

67 doublet) 

68 TARGET=signal-doublet-events;; 

69 *) 

70 echo "Must specify bg, signal, or doublet." 

71 exit 1 

72 esac 

73 rm -rf "$ROOTDIR"/"$TARGET" 

74 mkdir -p "$ROOTDIR"/"$TARGET" 

75 """, 

76 "done": """ 

77 #!/bin/bash 

78 HELP="See how many events have been processed so far for the specified injection type. Options are 'signal', 'doublet', or 'bg'." 

79 DROPVEC="done bg --reduce sum --format 'Droplet {name} at {ip} finished analyses: {stdout}'" 

80 ROOTDIR=/home/vagrant/pre-er14-background 

81 OUTFILE=significance_lvc-i3.json 

82 

83 [ $# -ne 1 ] && set -- __invalid_choice 

84 case "$1" in 

85 bg) 

86 TARGET=events;; 

87 signal) 

88 TARGET=signal-events;; 

89 doublet) 

90 TARGET=signal-doublet-events;; 

91 *) 

92 echo "Must specify bg, signal, or doublet." 

93 exit 1 

94 esac 

95 find "$ROOTDIR"/"$TARGET" -name "$OUTFILE" | wc -l 

96 """, 

97 "eval": """ 

98 HELP="Evaluate the provided arguments as code on all droplets." 

99 eval "$@" 

100 """, 

101 "forget": """ 

102 #!/bin/bash 

103 HELP="Remove active droplet host keys from ~/.ssh/known_hosts." 

104 DROPVEC="--local -n 1 --format 'Removing {name} at {ip} from ~/.ssh/known_hosts. Retval: {retval} Stderr: {stderr}'" 

105 SETUP="cp {HOME}/.ssh/known_hosts {HOME}/.ssh/known_hosts.orig" 

106 CLEANUP="diff {HOME}/.ssh/known_hosts {HOME}/.ssh/known_hosts.orig" 

107 sed -i '' "/$DROPIP/d" "$HOME"/.ssh/known_hosts 

108 """, 

109 "init": """ 

110 #!/bin/bash 

111 HELP="Add all droplets' host keys to the list of known hosts." 

112 DROPVEC="--local --format 'Adding hostkey to {name} at {ip}. Retval: {retval} Stdout: {stdout}'" 

113 ssh -o StrictHostKeyChecking=no vagrant@$DROPIP </dev/null "hostname" 

114 """, 

115 "killall": """ 

116 #!/bin/bash 

117 DROPVEC="--format 'Killed all LLAMA processes on {name} at {ip}'" 

118 HELP='Kill all LLAMA `llama run` and event-running processes on all droplets.' 

119 ps -ax \ 

120 | grep -E "llama run|er14|mid-o3" \ 

121 | sed '/grep/d;/DROPVEC/d' \ 

122 | awk '{print $1}' \ 

123 | xargs kill 2>/dev/null || true 

124 """, 

125 "launch": """ 

126 #!/bin/bash 

127 HELP="Setup droplets and start injection runs. Specify injection type as 'signal', 'doublet', or 'bg' (DEFAULT: bg). Optionally, specify a list of skymaps (on the remote machine) to perform the run on. Only skymaps in this list will be used, which is handy for running on a specific population. You *must* provide injection type if you want to specify a skymap list." 

128 DROPVEC="launch bg --prelaunch 'Starting launch on {name} at {ip}...'" 

129 LOGDIR='/home/vagrant/pre-er14-background/logs' 

130 case "$1" in 

131 bg) 

132 RUNNER=pre-er14-background-event-runner 

133 LOGPREFIX='' 

134 ;; 

135 signal) 

136 RUNNER=pre-er14-signal-event-runner 

137 LOGPREFIX=signal- 

138 ;; 

139 doublet) 

140 RUNNER=pre-er14-signal-doublet-event-runner 

141 LOGPREFIX=signal-doublet- 

142 ;; 

143 *) 

144 echo "Must specify bg, signal, or doublet." 

145 exit 1 

146 esac 

147 echo "linking /var/www/html to injection event directory..." 

148 sudo rm /var/www/html 

149 sudo ln -s /home/vagrant/pre-er14-background/events /var/www/html 

150 printf "%s" "Creating swapfile..." 

151 if [[ -f /swapfile ]]; then 

152 echo "/swapfile exists" 

153 else 

154 sudo fallocate -l 12G /swapfile && echo "created new /swapfile" 

155 sudo chmod 0600 /swapfile && echo "set swapfile mode 0600" 

156 sudo mkswap /swapfile && echo "mkswap /swapfile" 

157 sudo swapon /swapfile 

158 fi 

159 sudo swapon -s \ 

160 && echo "turned swapfile on" \ 

161 || echo "swapfile already on" 

162 free -m 

163 cd ~/dev/multimessenger-pipeline 

164 git fetch 

165 git checkout subthresh-integration 

166 git pull || true 

167 cd 

168 echo "in directory: $(pwd)" 

169 echo "launching event runner..." 

170 nohup >>"${LOGDIR}"/"${LOGPREFIX}"$(date "+%s").log 2>&1 "${RUNNER}" "$2" & 

171 """, 

172 "batch": """ 

173 #!/bin/bash 

174 HELP="Set up droplets and start llama batch. Specify injection type: 'subthresh' (DEFAULT: subthresh)" 

175 DROPVEC="batch subthresh --prelaunch 'STARTING BATCH on {name} at {ip}...'" 

176 CODEROOT=/home/llama/dev/multimessenger-pipeline/bin 

177 export PATH="/opt/anaconda/bin:$PATH" 

178 source /home/llama/creds 

179 echo "ENVIRONMENT:" 

180 echo 

181 env 

182 case "$1" in 

183 subthresh) 

184 RUNNER=$CODEROOT/mid-o3-subthreshold-background-event-runner 

185 ;; 

186 *) 

187 echo "Must specify subthresh" 

188 exit 1 

189 ;; 

190 esac 

191 printf "%s" "Creating swapfile..." 

192 if [[ -f /swapfile ]]; then 

193 echo "/swapfile exists" 

194 else 

195 sudo fallocate -l 40G /swapfile && echo "created new /swapfile" 

196 sudo chmod 0600 /swapfile && echo "set swapfile mode 0600" 

197 sudo mkswap /swapfile && echo "mkswap /swapfile" 

198 sudo swapon /swapfile 

199 fi 

200 sudo swapon -s \ 

201 && echo "turned swapfile on" \ 

202 || echo "swapfile already on" 

203 free -m 

204 cd ~/dev/multimessenger-pipeline 

205 git fetch 

206 git checkout subthresh-integration 

207 git pull || true 

208 cd 

209 echo "in directory: $(pwd)" 

210 for i in {1..4}; do 

211 echo "launching event runner $i..." 

212 nohup >>~/"batch-${i}-"$(date "+%s").log 2>&1 "${RUNNER}" "${i}" & 

213 done 

214 """, 

215 "ls-procs": """ 

216 #!/bin/bash 

217 DROPVEC="--reduce linecount --format 'Droplet {name} at {ip} llama run procs: {stdout}'" 

218 HELP='List `llama run` processes on each droplet and print the total count.' 

219 ps -ax \ 

220 | grep -E "llama run|er14|mid-o3" \ 

221 | sed "/grep/d;s/^/ /" 

222 """, 

223 "pull": """ 

224 #!/bin/bash 

225 DROPVEC="pull */*/scatterplot_lvc-i3.pdf */*/skymap_info.json */*/significance_lvc-i3.json */*/icecube_neutrino_list.json --prelaunch 'Starting rsync for {name} at {ip}...' --format 'Finished rsyncing {name} at {ip}.' --connections=3 --local" 

226 HELP='Run in directory where results should go. Only copies important information and sorts by server name (so make sure your servers have unique names). Add extra rsync include patterns as positional command-line arguments (specified from the root of the analysis directory). File name patterns cannot have spaces in them.' 

227 

228 rsync -a \ 

229 --include='*/*/' \ 

230 `printf -- "--include=%s " $@` \ 

231 --exclude='*/*' \ 

232 --prune-empty-dirs \ 

233 vagrant@$DROPIP:/home/vagrant/pre-er14-background/ \ 

234 $DROPNAME 

235 """, 

236 "push": """ 

237 #!/usr/bin/env python3 

238 

239 import os 

240 import sys 

241 from datetime import datetime 

242 from math import ceil 

243 from shutil import rmtree 

244 from subprocess import Popen 

245 from glob import glob 

246 from tempfile import mkdtemp 

247 

248 DROPVEC="push signal --connections 3 --local --format 'Split and uploaded for {name} at {ip}'" 

249 HELP="Split injection neutrino lists evenly between available droplets and upload them. Neutrino lists should be JSON formatted and located in subdirectories of the current directory. Their names should exactly match the GW skymap filenames they correspond to. Specify injection type as 'signal' or 'doublet' (DEFAULT: signal). Note that bg runs are MC and are not split a priori." 

250 DROPNUM = int(os.environ['DROPNUM']) 

251 DROPI = int(os.environ['DROPI']) 

252 DROPIP = os.environ['DROPIP'] 

253 

254 if sys.argv[1] == 'signal': 

255 REMOTEDIR = '/home/vagrant/pre-er14-background/signal-neutrinos' 

256 elif sys.argv[1] == 'doublet': 

257 REMOTEDIR = '/home/vagrant/pre-er14-background/signal-doublet-neutrinos' 

258 else: 

259 print("ERROR unrecognized argument ", sys.argv[1]) 

260 exit(1) 

261 

262 MKDIR_CMD = ''' 

263 if [[ -d {remotedir} ]]; then 

264 echo "Moving {remotedir} to make way for new one" 

265 mv {remotedir} {remotedir}.{timestamp} 

266 fi 

267 mkdir {remotedir} 

268 '''.format(remotedir=REMOTEDIR, timestamp=int(datetime.now().timestamp()))  

269 FILES = sorted(glob(os.path.join('**', '*.json'), recursive=True)) 

270 FILES_PER_DIR = ceil(len(FILES)/DROPNUM) 

271 TMP = mkdtemp() 

272 

273 print("Linking paths to tmpdir: ", TMP) 

274 for p in FILES[FILES_PER_DIR*DROPI:FILES_PER_DIR*(DROPI+1)]: 

275 d, f = os.path.split(p) 

276 dout = os.path.join(TMP, d) 

277 if not os.path.isdir(dout): 

278 os.makedirs(dout) 

279 os.link(p, os.path.join(TMP, p)) 

280 

281 print("Making remote dir: ", REMOTEDIR) 

282 proc = Popen(['ssh', 'vagrant@'+DROPIP, MKDIR_CMD]) 

283 proc.communicate() 

284 if not proc.returncode: 

285 cmd = ['rsync', '-a', os.path.join(TMP, ''), 

286 'vagrant@{}:{}'.format(DROPIP, REMOTEDIR)] 

287 print("Trying to rsync: ", cmd) 

288 proc = Popen(cmd) 

289 proc.communicate() 

290 if not proc.returncode: 

291 print("Succeeded rsyncing files to remote, unlinking local tmpdir.") 

292 rmtree(TMP) 

293 else: 

294 print("Could not rsync, local files still in ", TMP) 

295 exit(1) 

296 else: 

297 print("Could not make remote directory.") 

298 exit(1) 

299 """, 

300 "tail": """ 

301 #!/bin/bash 

302 HELP="Tail 30 lines of most recent log on each droplet. Specify injection type with 'subthresh', 'signal', 'doublet', or 'bg' as first argument (DEFAULT: bg)." 

303 LOGDIR=pre-er14-background/logs 

304 case "${1:-bg}" in 

305 subthresh) 

306 PATTERN='batch-[0-9]-' 

307 LOGDIR=~/ 

308 ;; 

309 signal) 

310 PATTERN='signal-' 

311 ;; 

312 doublet) 

313 PATTERN='signal-doublet-' 

314 ;; 

315 bg) 

316 PATTERN='' 

317 ;; 

318 *) 

319 echo "Invalid log choice $1, exiting." 

320 ;; 

321 esac 

322 LOGFILE="$( \ 

323 find "$LOGDIR" -name "${PATTERN}[0-9]*.log" \ 

324 | sort \ 

325 | tail -1 \ 

326 )" 

327 stat "${LOGFILE}" 

328 tail -30 "${LOGFILE}" 

329 echo CURRENT TIME: $(date) 

330 """, 

331} 

332 

# Remove the common leading indentation from every script body so that the
# triple-quoted literals in ``SCRIPTS`` can be indented along with the
# surrounding python source. Iterate over a snapshot; only values are replaced.
for name, script in tuple(SCRIPTS.items()):
    SCRIPTS.update({name: dedent(script)})

335 

336 

def _reduce_sum(outs):
    """Print the sum of the droplets' STDOUT values parsed as integers.

    Each output is stripped of surrounding whitespace before parsing; an
    output that is empty after stripping counts as 0.
    """
    print("SUM: {}".format(sum(int(o.strip() or 0) for o in outs)))


def _reduce_linecount(outs):
    """Print the total number of lines across the droplets' STDOUT values.

    Falsy outputs (empty string, ``None``) are skipped, and so are outputs
    that contain only whitespace: a droplet that printed nothing but a
    newline contributes zero lines rather than one.
    """
    stripped = (o.strip() for o in outs if o)
    # Count per-output instead of concatenating lists with ``sum(..., [])``,
    # which copies the accumulated list for every droplet.
    total = sum(len(text.split('\n')) for text in stripped if text)
    print("LINES: ", total)


# Functions selectable with ``--reduce``; each takes the list of STDOUT
# strings collected from the droplets and prints a single summary.
REDUCERS = {
    'sum': _reduce_sum,
    'linecount': _reduce_linecount,
}

343 

344 

def argset_string(args):
    """Build a ``set -- arg1 ... argN`` shell line that installs ``args`` as
    the positional parameters of a bash script, shell-quoting each one."""
    quoted = [quote(arg) for arg in args]
    return "set -- {}".format(" ".join(quoted))

349 

350 

class Command():
    """A command that can be used on many Droplets. Can have arguments passed
    to it.

    The command body is looked up by name in ``SCRIPTS``. Metadata is read
    from the script text itself by searching for ``DROPVEC=...``,
    ``HELP=...``, ``SETUP=...`` and ``CLEANUP=...`` assignments.
    """

    def __init__(self, commandname, local, args):
        """Specify the name of the command and the arguments to use.

        Parameters
        ----------
        commandname : str
            The name of the bash script to run. Scripts are defined as bash
            scripts in ``llama.dev.dv``.
        local : bool
            Whether this command is meant to run LOCALLY (in which case it will
            run once for each droplet, with the droplet names set as
            DROPNAME and the IP addresses set as DROPIP environmental
            variables). If ``False``, execute the command remotely; remote
            commands must be bash scripts.
        args : list
            A list of command-line arguments to feed to the ``cmd`` script as
            positional variables. For remote commands (``local=False`` above),
            this is accomplished with a ``set -- arg1 ... argN`` line
            before the rest of the script specified by ``cmd`` is executed.

        Raises
        ------
        KeyError
            If ``commandname`` is not a script in ``llama.dev.dv.SCRIPTS``
        """
        self.name = commandname
        self.local = local
        self._raw = SCRIPTS[commandname]
        self.args = args

    def cmdline(self, ip):
        """The command line arguments in a list format (as expected by the
        first argument of ``subprocess.Popen``).

        Remote commands become an ``ssh`` invocation whose remote command is
        the script prefixed with a ``set -- ...`` line. Local commands become
        ``bash -c <script> -- <args...>``, where ``--`` fills ``$0`` so that
        ``args`` start at ``$1``.
        """
        if not self.local:
            return ["ssh", "llama@"+ip,
                    argset_string(self.args) + "\n" + self._raw]
        return ['bash', '-c', self._raw, '--'] + list(self.args)

    @property
    def default_str(self):
        """Get the raw defaults spec string for this command from the original
        script (parsed into ``argparse`` arguments using ``Command.defaults``).

        Only the first ``DROPVEC=`` occurrence is used, and only the first
        shell word following it (i.e. the quoted value). Returns an empty
        string when the script has no ``DROPVEC=``.
        """
        match = re.findall('DROPVEC=(.*)', self._raw)
        return shplit(match[0])[0] if match else ""

    @property
    def defaults(self):
        """Get the default argparse args for this command. Specified as
        'DROPVEC=...' anywhere in the file (including in a comment, which is
        useful for non-bash scripts with different variable setting syntax).

        Returns the ``argparse.Namespace`` produced by feeding the default
        flags to ``get_parser()``.
        """
        default_str = self.default_str
        defaults = shplit(default_str) if default_str else []
        # A literal backslash-n in a default flag stands for a real newline.
        default_flags = [f.replace(r'\n', '\n') for f in defaults]
        return get_parser().parse_args(default_flags)

    def run(self, droplet_name, ip, timeout, i, num_drops):
        """Run a command synchronously using ``subprocess.Popen``.

        Parameters
        ----------
        droplet_name : str
            The name of the droplet to run on.
        ip : str
            The IP address of the droplet to run on.
        timeout : float
            The maximum execution time in seconds, after which the process will
            be killed. ``inf`` (or ``None``) means no limit.
        i : int
            The number of the droplet in the full list of droplets.
        num_drops : int
            The total number of droplets run.

        Returns
        -------
        stdout : str
            STDOUT from the process.
        stderr : str
            STDERR from the process.
        retval : int
            The return code of the process.
        """
        from subprocess import TimeoutExpired

        # Take a plain ``dict`` snapshot. A deep copy of ``os.environ`` is
        # still an ``os._Environ``, whose item assignment calls ``putenv`` and
        # would alter this process's real environment from every worker.
        env = dict(os.environ)
        if self.local:
            env['DROPIP'] = ip
            env['DROPNAME'] = droplet_name
            env['DROPI'] = str(i)
            env['DROPNUM'] = str(num_drops)
        proc = Popen(self.cmdline(ip), env=env, stdout=PIPE, stderr=PIPE)
        # ``communicate`` drains both pipes while it waits. Polling without
        # reading would block forever once the child fills a pipe buffer.
        # An infinite timeout must be passed as ``None``.
        limit = None if timeout is None or timeout == inf else timeout
        try:
            out, err = proc.communicate(timeout=limit)
        except TimeoutExpired:
            proc.kill()
            # Collect whatever was produced before the kill and reap the child.
            out, err = proc.communicate()
        return out.decode(), err.decode(), proc.returncode

    @property
    def help(self):
        """Return the help string for this command. Specified as 'HELP=...'
        anywhere in the file (including in a comment, which is useful for
        non-bash scripts with different variable setting syntax). Returns an
        empty string when the script has no ``HELP=``."""
        match = re.findall('HELP=(.*)', self._raw)
        if match:
            return shplit(match[0])[0]
        return ""

    def _runvar(self, varname, env):
        """Run every command saved as a ``bash`` variable named ``varname`` in
        this ``Command`` script (e.g. "SETUP" for a setup command defined as
        ``SETUP="mkdir foo"``). Each value is parsed as a shell word, local
        environmental variables are substituted into it (it is treated as a
        **python** format string over ``os.environ``), and the result is run
        locally with the environment ``env``. Commands run sequentially in
        the order they appear; their return codes are printed, not checked.
        No shell is involved, so ``~`` is not expanded; use ``{HOME}``.

        Parameters
        ----------
        varname : str
            The variable name to search the script for.
        env : mapping
            The environment passed to each spawned process.

        Raises
        ------
        KeyError
            If a command references an environmental variable (``{NAME}``)
            that is not set locally.
        ValueError
            If a value cannot be parsed by ``shlex.split`` (e.g. an unclosed
            quote).
        """
        matches = re.findall(varname+'=(.*)', self._raw)
        for match in matches:
            matchstr = shplit(match)[0].format(**os.environ)
            print("Running {} command: {}".format(varname, matchstr))
            proc = Popen(shplit(matchstr), env=env)
            proc.communicate()
            print("{} command returned: {}".format(varname, proc.returncode))

    def setup(self, env):
        """Run a setup command BEFORE the parallel processes run for each
        server. Specified as a standard ``bash`` command in 'SETUP=...'
        anywhere in the file (including in a comment, which is useful for
        non-bash scripts with different variable setting syntax). This command
        is *ALWAYS* run once locally, regardless of how the vectorized commands
        run. STDOUT and STDERR are not piped."""
        self._runvar("SETUP", env)

    def cleanup(self, env):
        """Run a cleanup command AFTER the parallel processes run for each
        server. Specified as a standard ``bash`` command in 'CLEANUP=...'
        anywhere in the file (including in a comment, which is useful for
        non-bash scripts with different variable setting syntax). This command
        is *ALWAYS* run once locally, regardless of how the vectorized commands
        run. STDOUT and STDERR are not piped."""
        self._runvar("CLEANUP", env)

501 

502 

def scripts():
    """Return a new list with the names of the available scripts (the keys of
    ``SCRIPTS``)."""
    return [*SCRIPTS]

506 

507 

def print_descriptions():
    """Print each available command name followed by its wrapped, indented
    help text (commands without help text print only their name)."""
    print("AVAILABLE COMMANDS (use --defaults to show default CLI flags):")
    for script_name in scripts():
        print('- ' + script_name)
        # ``local`` has no influence on the help text, so any value works
        description = Command(script_name, True, []).help
        wrapped_lines = wrap(description)
        if not wrapped_lines:
            continue
        print(indent('\n'.join(wrapped_lines), "    "))

517 

518 

def print_defaults():
    """Print, for every available command, the ``dropvec`` command line
    argument values that the command's ``DROPVEC`` defaults parse to."""
    print("DEFAULT FLAGS FOR AVAILABLE COMMANDS:")
    for script_name in scripts():
        print('- ' + script_name)
        namespace_text = str(Command(script_name, True, []).defaults)
        body = indent('\n'.join(wrap(namespace_text)), " " * 4)
        if body:
            print(body)

529 

530 

def get_droplets(pattern):
    """Return the list of droplets whose names match the fnmatch-style
    ``pattern``. Raises ``IOError`` when no API token is configured."""
    from llama.com.do import TOKEN, NO_TOKEN_WARNING
    if TOKEN is None:
        raise IOError(NO_TOKEN_WARNING)
    matching = []
    for droplet in Manager(token=TOKEN).get_all_droplets():
        if fnmatch(droplet.name, pattern):
            matching.append(droplet)
    return matching

538 

539 

def get_parser():
    """Get an ArgumentParser with this tool's command line interface.

    The parser uses the module docstring as its description and ``EPILOG`` as
    its epilog. ``cmd`` is optional and restricted to the names returned by
    ``scripts()``; ``--reduce`` is restricted to the keys of ``REDUCERS``.
    A new parser is built on every call.
    """
    parser = ArgumentParser(description=__doc__, epilog=EPILOG)
    # positional arguments: the script name and the arguments passed to it
    parser.add_argument("cmd", nargs="?",
                        choices=scripts(), help="""
        The command to run remotely.""")
    parser.add_argument("args", nargs="*", help="""
        Optional additional arguments to feed to the script specified in
        ``cmd``. These arguments will be set using ``set -- arg1 arg2 ...
        argN`` before the script specified by ``cmd`` is evaluated, effectively
        making it as if these arguments were passed as ``$1`` through ``$N`` to
        that shell script.""")
    # informational flags
    parser.add_argument("-d", "--descriptions", action="store_true", help="""
        Print names and descriptions of available commands (see "cmd"
        above).""")
    parser.add_argument("-D", "--defaults", action="store_true", help="""
        Print default command line flags for each available command.""")
    # droplet selection and execution mode
    parser.add_argument("-p", "--pattern", default=DEFAULT_GLOB, help=f"""
        An fnmatch-style pattern that droplet names must match. Droplets whose
        names do not match this pattern will not be included. (default:
        {DEFAULT_GLOB})""")
    parser.add_argument("-l", "--local", action="store_true", help="""
        Run the given command locally instead of on the remote droplet. This is
        useful for e.g. rsync commands (and anything that is meant to interact
        with each droplet but which must run on the local machine). These
        scripts will have the DROPNAME and DROPIP variables set to each
        droplet's name and IP address respectively.""")
    # reporting
    parser.add_argument("-f", "--format", default=DEFAULT_FMT, help="""
        The format string to use to describe what happened for each command.
        Should be a typical python format string with "name" referring to the
        droplet name, "ip" referring to its IP address, "stdout" referring to
        the standard output of the command, "stderr" the standard error, and
        "retval" referring to the return code of the SSH call.""")
    parser.add_argument("-P", "--prelaunch", default='', help="""
        A python format string to print for each droplet BEFORE launch (for
        long-running processes). Specify the droplet name and IP as in
        ``--format`` (stdout, stderr, and retval are obviously not available
        before the thread starts).""")
    # concurrency and time limits
    parser.add_argument("-n", "--connections", type=int,
                        default=DEFAULT_CONNECTIONS, help="""
        The max number of simultaneous connections (or processes) to make. For
        actions involving data transfer, set this to something low (probably
        1). DEFAULT: {}""".format(DEFAULT_CONNECTIONS))
    # the default of ``inf`` means there is no time limit
    parser.add_argument("-t", "--timeout", default=inf, type=float, help="""
        Max execution time on each droplet in s. No timeout by default.""")
    parser.add_argument("-r", "--reduce", choices=tuple(REDUCERS), help="""
        Specify a function that reduces the STDOUT values, e.g. by summing
        them.""")
    return parser

589 

590 

def argparse_to_dict(args):
    """Return a dictionary mapping every public attribute name of ``args``
    (those not starting with an underscore, as listed by ``dir``) to its
    value."""
    public = {}
    for attr in dir(args):
        if attr.startswith('_'):
            continue
        public[attr] = getattr(args, attr)
    return public

594 

595 

def apply_default_cli_args(args):
    """Overlay the real command line onto ``args`` and return ``args``.

    The process's command line is parsed with ``get_parser()``. Every option
    whose parsed value differs from the parser's own default is copied onto
    ``args`` (in place), overriding whatever ``args`` held; options left at
    the parser default keep the value already present in ``args``."""
    parser = get_parser()
    provided = argparse_to_dict(parser.parse_args())
    # parsing an empty argument list yields the parser's built-in defaults
    baseline = argparse_to_dict(parser.parse_args([]))
    overrides = {
        key: provided[key]
        for key in baseline
        if provided[key] != baseline[key]
    }
    for key, value in overrides.items():
        setattr(args, key, value)
    return args

614 

615 

def run_workers(cmd, droplets, fmt, prelaunch, connections, timeout):
    """Run ``cmd`` once per droplet on a thread pool and report each result.

    Parameters
    ----------
    cmd : Command
        Object whose ``run(name, ip, timeout, i, num_drops)`` method is called
        for each droplet and returns ``(stdout, stderr, retval)``.
    droplets : list
        Droplets to run on; each must have ``name`` and ``ip_address``
        attributes.
    fmt : str
        Python format string printed after each droplet completes, with
        ``name``, ``ip``, ``stdout``, ``stderr`` and ``retval`` available.
    prelaunch : str
        Python format string (``name`` and ``ip`` available) printed just
        before each droplet's command starts; nothing is printed if it is
        empty.
    connections : int
        Maximum number of commands running at the same time.
    timeout : float
        Passed through to ``cmd.run`` for each droplet.

    Returns
    -------
    outs : list
        The STDOUT value of every droplet, in completion order. If
        ``cmd.run`` raised, the exception text is used as both STDOUT and
        STDERR and the return value is reported as ``'-1'``.
    """
    outs = []

    def announce_and_run(msg):
        """Return a function wrapping ``cmd.run`` that prints ``msg`` (when
        non-empty) right before running, to announce that a thread has
        started."""
        runner = cmd.run

        @functools.wraps(runner)
        def announcer(*args):
            if msg:
                print(msg)
            return runner(*args)

        return announcer

    num_drops = len(droplets)
    with ThreadPoolExecutor(max_workers=connections) as executor:
        futures_to_droplets = {
            executor.submit(
                announce_and_run(
                    prelaunch.format(
                        name=d.name,
                        ip=d.ip_address
                    )
                ),
                d.name,
                d.ip_address,
                timeout,
                i,
                num_drops,
            ): d for i, d in enumerate(droplets)
        }
        for future in as_completed(futures_to_droplets):
            drop = futures_to_droplets[future]
            try:
                out, err, retval = future.result()
            except Exception as exc:
                # Best effort: report the failure for this droplet and keep
                # going with the others.
                out, err, retval = format(exc), format(exc), '-1'
            # Guard each stream on its own value; stderr must not depend on
            # whether stdout happens to be None.
            print(fmt.format(name=drop.name, ip=drop.ip_address,
                             stdout=out.strip() if out is not None else '',
                             stderr=err.strip() if err is not None else '',
                             retval=retval))
            outs.append(out)
    return outs
666 return outs