Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# (c) Stefan Countryman 2019 

2 

3""" 

4Tools for fetching remote archival neutrino data from IceCube's servers. You 

5will need IceCube login credentials to be able to run these scripts. 

6""" 

7 

8import os 

9import logging 

10from typing import List 

11from hashlib import sha256 

12from tempfile import TemporaryDirectory 

13from pathlib import Path 

14from llama.dev.upload import DEFAULT_KEY_ROOT 

15from llama.classes import optional_env_var 

16from llama.com.s3 import upload_file, DEFAULT_BUCKET, PrivateFileCacher 

17 

BLOCKSIZE = 2**20  # 1MB at a time (chunk size used when hashing files)
LOGGER = logging.getLogger(__name__)
# Remote locations of the archival GFU neutrino data.
DEFAULT_ROOT = '/data/ana/analyses/gfu/'
I3PUB = 'pub.icecube.wisc.edu'  # public SSH gateway into the IceCube network
COBALT = 'cobalt'  # internal analysis host reachable from the gateway
# Template for one entry of the generated ``PrivateFileCacher`` dict code
# string. NOTE: this is an f-string, so the doubled braces survive as single
# braces, leaving ``{fname}``/``{key}``/``{bucket}`` placeholders to be filled
# in later with ``str.format``.
CACHER_DECLARATION_FMT = f"""
    '{{fname}}': {PrivateFileCacher.__name__}(
        '{{key}}',
        '{{bucket}}'
    ),"""
# IceCube username read from the environment; ``None`` if unset (functions
# below raise ``ValueError`` in that case).
I3_USERNAME = optional_env_var(
    ['I3_USERNAME'],
    "You need to specify your IceCube username to access remote IceCube data."
)[0]

32 

33 

def available_versions(root: str = DEFAULT_ROOT) -> List[str]:
    """Get a list of available GFU archival neutrino versions from the IceCube
    servers.

    Parameters
    ----------
    root : str, optional
        The root directory in which to search for neutrino archive versions.

    Returns
    -------
    version_dirs : List[str]
        A list of relative subdirectories to ``root`` on the remote IceCube
        server. If you picked ``root`` correctly, this should be a list of
        neutrino archive version snapshots in ascending temporal order.

    Raises
    ------
    ValueError
        If ``I3_USERNAME`` is ``None`` (probably happened because you didn't
        set ``I3_USERNAME`` to your IceCube server username).
    IOError
        If the remote directory listing could not be fetched.

    Examples
    --------
    You should be able to get archival neutrinos used in the early-o3 era:

    >>> 'version-002-p04' in available_versions()
    True
    """
    if I3_USERNAME is None:
        raise ValueError("Set I3_USERNAME in your environmental variables.")
    from subprocess import Popen, PIPE
    # Hop through the public gateway to the internal host and list the
    # immediate subdirectories of ``root`` (basenames only). The global
    # options ``-maxdepth``/``-mindepth`` are placed before the ``-type``
    # test, since GNU find warns on stderr when they follow a test.
    cmd = ['ssh', f"{I3_USERNAME}@{I3PUB}", f"ssh {I3_USERNAME}@{COBALT} "
           f"'find \"{root}\" -maxdepth 1 -mindepth 1 -type d "
           "-exec basename {} \\;'"]
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
    res, err = proc.communicate()
    if proc.returncode:
        LOGGER.error("Could not fetch available neutrino archives.")
        LOGGER.error("STDOUT:\n%s\nSTDERR:\n%s", res, err)
        raise IOError("Could not fetch available neutrinos.")
    return sorted(res.decode().strip().split("\n"))

76 

77 

def fetch_archival_neutrinos(version_dir: str, dest: str,
                             root: str = DEFAULT_ROOT):
    """Same as ``secure_transfer_to_s3``, but store the files in the ``dest``
    directory instead of uploading them to S3. Mainly used as an
    implementation tool for ``secure_transfer_to_s3``.

    Parameters
    ----------
    version_dir : str
        The name of the archive version directory (relative to ``root``)
        whose ``*README`` and ``*data.npy`` files should be downloaded.
    dest : str
        An existing local directory in which to store the downloaded files.
    root : str, optional
        The root directory on the IceCube server in which neutrino archive
        versions are stored.

    Raises
    ------
    ValueError
        If ``I3_USERNAME`` is ``None`` (probably happened because you didn't
        set ``I3_USERNAME`` to your IceCube server username).
    FileNotFoundError
        If ``dest`` is not an existing directory.
    """
    if I3_USERNAME is None:
        raise ValueError("Set I3_USERNAME in your environmental variables.")
    from subprocess import Popen, PIPE
    from sys import stdout
    if not os.path.isdir(dest):
        raise FileNotFoundError("``dest`` must be an existing directory, got: "
                                f"{dest}.")
    arc = f'{I3_USERNAME}@{COBALT}:{root}/{version_dir}/'
    LOGGER.debug("Archive path: %s", arc)
    # Capture output when not attached to a terminal (e.g. in scripts);
    # otherwise let scp print progress straight to the console.
    if not stdout.isatty():
        stdout = PIPE
    # *data.npy are data files; use the MC files if you want those in future
    cmd = ['scp',
           '-oProxyCommand=ssh -o StrictHostKeyChecking=no -W '
           f'%h:%p {I3_USERNAME}@{I3PUB}',
           arc+'*README', arc+'*data.npy', dest]
    LOGGER.debug("Download command: %s", cmd)
    proc = Popen(cmd, stdout=stdout, stderr=stdout)
    res, err = proc.communicate()
    if proc.returncode:
        # NOTE: a failed transfer is logged but deliberately not raised;
        # callers should check that ``dest`` contains the expected files.
        LOGGER.error("Archival neutrino fetching failed. "
                     "STDOUT:\n%s\nSTDERR:\n%s", res, err)
    else:
        LOGGER.info("Finished fetching files. STDOUT:\n%s\nSTDERR:\n%s", res,
                    err)

110 

111 

def secure_transfer_to_s3(version_dir: str, root: str = DEFAULT_ROOT) -> str:
    """Fetch data files containing archival IceCube neutrino track events from
    IceCube servers and upload them to non-public AWS S3/DigitalOcean Spaces
    storage. Returns information on the file locations that can be used for
    automatic authenticated data retrieval.

    Parameters
    ----------
    version_dir : str
        The name of the directory (as a relative path from ``root``) in which
        the archival neutrinos you want are stored as ``.npy`` or ``.root``
        files. Should be one of the return values of
        ``available_versions(root=root)``.
    root : str, optional
        The root directory in which to search for neutrino archive versions on
        the IceCube server.

    Returns
    -------
    private_file_cachers : str
        A code string that's a ``dict`` of ``llama.com.s3.PrivateFileCacher``
        declarations pointing to the relevant files; you should paste this
        into the ``llama.files.i3.json.ARCHIVAL_NEUTRINOS`` dictionary to
        enable access to these files.
    """
    with TemporaryDirectory() as tmpdir:
        fetch_archival_neutrinos(version_dir, dest=tmpdir, root=root)
        LOGGER.debug("Fetched remote neutrinos and stored them in %s", tmpdir)
        tmp = Path(tmpdir)
        cachers = f"'{version_dir}': {{"
        for fname in os.listdir(tmp):
            # Hash the file in 1MB chunks so the S3 key is content-addressed
            # (identical files map to identical keys).
            sha = sha256()
            with open(tmp/fname, 'rb') as infile:
                for file_buffer in iter(lambda: infile.read(BLOCKSIZE), b''):
                    sha.update(file_buffer)
            shasum = sha.hexdigest()
            key = DEFAULT_KEY_ROOT + shasum
            fpath = str(tmp/fname)
            LOGGER.debug("Uploading file %s to bucket %s, key %s", fpath,
                         DEFAULT_BUCKET, key)
            upload_file(fpath, key, bucket=DEFAULT_BUCKET, public=False)
            LOGGER.info("Upload successful: %s -> %s %s", fpath,
                        DEFAULT_BUCKET, key)
            cacher = CACHER_DECLARATION_FMT.format(fname=fname, key=key,
                                                   bucket=DEFAULT_BUCKET)
            LOGGER.debug("Adding cacher declaration line: %s", cacher)
            cachers += cacher
        return cachers + "\n},"