Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# (c) Stefan Countryman 2019
3"""
4Tools for fetching remote archival neutrino data from IceCube's servers. You
5will need IceCube login credentials to be able to run these scripts.
6"""
8import os
9import logging
10from typing import List
11from hashlib import sha256
12from tempfile import TemporaryDirectory
13from pathlib import Path
14from llama.dev.upload import DEFAULT_KEY_ROOT
15from llama.classes import optional_env_var
16from llama.com.s3 import upload_file, DEFAULT_BUCKET, PrivateFileCacher
BLOCKSIZE = 2**20  # read files 1 MiB at a time when hashing them
LOGGER = logging.getLogger(__name__)
DEFAULT_ROOT = '/data/ana/analyses/gfu/'
I3PUB = 'pub.icecube.wisc.edu'
COBALT = 'cobalt'
# Two-stage template for one generated ``PrivateFileCacher`` dict entry. The
# f-string bakes in the class name now; the doubled braces survive as
# ``{fname!r}``/``{key!r}``/``{bucket!r}`` placeholders that are filled later
# with ``str.format(fname=..., key=..., bucket=...)``. The ``!r`` conversion
# emits each value as a properly quoted Python literal, so the generated code
# stays valid even if a value contains a quote character.
CACHER_DECLARATION_FMT = f"""
    {{fname!r}}: {PrivateFileCacher.__name__}(
        {{key!r}},
        {{bucket!r}}
    ),"""
# IceCube username used for the SSH/SCP connections below. Presumably ``None``
# when the ``I3_USERNAME`` environment variable is unset (TODO confirm against
# ``optional_env_var``); the functions below check for ``None`` and raise
# ``ValueError``.
I3_USERNAME = optional_env_var(
    ['I3_USERNAME'],
    "You need to specify your IceCube username to access remote IceCube data."
)[0]


def available_versions(root: str = DEFAULT_ROOT) -> List[str]:
    """Get a list of available GFU archival neutrino versions from the IceCube
    servers.

    Logs into ``COBALT`` by way of ``I3PUB`` (nested ``ssh``) as
    ``I3_USERNAME`` and lists the names of the immediate subdirectories of
    ``root``.

    Parameters
    ----------
    root : str, optional
        The root directory in which to search for neutrino archive versions.

    Returns
    -------
    version_dirs : List[str]
        A sorted list of relative subdirectories to ``root`` on the remote
        IceCube server (empty if there are none). If you picked ``root``
        correctly, this should be a list of neutrino archive version snapshots
        in ascending temporal order.

    Raises
    ------
    ValueError
        If ``I3_USERNAME`` is ``None`` (probably happened because you didn't
        set the ``I3_USERNAME`` to your IceCube server username).
    IOError
        If the remote listing command exits with a nonzero status.

    Examples
    --------
    You should be able to get archival neutrinos used in the early-o3 era:

    >>> 'version-002-p04' in available_versions()
    True
    """
    import shlex
    from subprocess import PIPE, run
    if I3_USERNAME is None:
        raise ValueError("Set I3_USERNAME in your environmental variables.")
    # Command executed by the shell on COBALT. The global options -mindepth and
    # -maxdepth must precede the -type test or GNU find prints a warning;
    # shlex.quote keeps ``root`` intact even if it has spaces, quotes or ``$``.
    find_cmd = (f"find {shlex.quote(root)} -mindepth 1 -maxdepth 1 -type d "
                "-exec basename {} \\;")
    # The remote command is parsed by two shells (first on I3PUB, then on
    # COBALT), so quote ``find_cmd`` once more for the outer hop.
    cmd = ['ssh', f"{I3_USERNAME}@{I3PUB}",
           f"ssh {I3_USERNAME}@{COBALT} {shlex.quote(find_cmd)}"]
    proc = run(cmd, stdout=PIPE, stderr=PIPE)
    if proc.returncode:
        LOGGER.error("Could not fetch available neutrino archives.")
        LOGGER.error("STDOUT:\n%s\nSTDERR:\n%s", proc.stdout, proc.stderr)
        raise IOError("Could not fetch available neutrinos.")
    # splitlines() + filtering avoids returning [''] when find prints nothing.
    return sorted(line for line in proc.stdout.decode().splitlines() if line)


def fetch_archival_neutrinos(version_dir: str, dest: str,
                             root: str = DEFAULT_ROOT):
    """Download archival neutrino data files from the IceCube servers into the
    local directory ``dest``.

    Same as ``secure_transfer_to_s3``, but store the files in the ``dest``
    directory instead of uploading them to S3. Mainly used as an implementation
    tool for ``secure_transfer_to_s3``. Copies the ``*README`` and
    ``*data.npy`` files from ``{root}/{version_dir}/`` on ``COBALT`` with
    ``scp``, proxying the connection through ``I3PUB``.

    Parameters
    ----------
    version_dir : str
        The name of the archive directory, relative to ``root``; should be one
        of the values returned by ``available_versions(root=root)``.
    dest : str
        An existing local directory into which the files are copied.
    root : str, optional
        The root directory containing neutrino archive versions on the IceCube
        server.

    Raises
    ------
    ValueError
        If ``I3_USERNAME`` is ``None``.
    FileNotFoundError
        If ``dest`` is not an existing directory.
    IOError
        If ``scp`` exits with a nonzero status (e.g. a missing file or a failed
        login), so callers never proceed with a partial download.
    """
    import sys
    from subprocess import Popen, PIPE
    if I3_USERNAME is None:
        raise ValueError("Set I3_USERNAME in your environmental variables.")
    if not os.path.isdir(dest):
        raise FileNotFoundError("``dest`` must be an existing directory, got: "
                                f"{dest}.")
    arc = f'{I3_USERNAME}@{COBALT}:{root}/{version_dir}/'
    LOGGER.debug("Archive path: %s", arc)
    # Let scp's progress output reach an interactive terminal directly;
    # otherwise capture it so it can be logged below.
    output = sys.stdout if sys.stdout.isatty() else PIPE
    # *data.npy are data files; use the MC files if you want those in future
    # NOTE(review): StrictHostKeyChecking=no disables host-key verification
    # for the I3PUB jump host, which weakens protection against
    # man-in-the-middle attacks; consider dropping it once the key is known.
    cmd = ['scp',
           '-oProxyCommand=ssh -o StrictHostKeyChecking=no -W '
           f'%h:%p {I3_USERNAME}@{I3PUB}',
           arc+'*README', arc+'*data.npy', dest]
    LOGGER.debug("Download command: %s", cmd)
    proc = Popen(cmd, stdout=output, stderr=output)
    # res/err are None when output goes straight to the terminal.
    res, err = proc.communicate()
    if proc.returncode:
        LOGGER.error("Archival neutrino fetching failed. "
                     "STDOUT:\n%s\nSTDERR:\n%s", res, err)
        raise IOError(f"Could not fetch archival neutrinos from {arc}")
    LOGGER.info("Finished fetching files. STDOUT:\n%s\nSTDERR:\n%s", res,
                err)


def _sha256_file(path) -> str:
    """Return the hex SHA-256 digest of the file at ``path``, reading it in
    ``BLOCKSIZE`` chunks so large files are never loaded into memory whole."""
    sha = sha256()
    with open(path, 'rb') as infile:
        for block in iter(lambda: infile.read(BLOCKSIZE), b''):
            sha.update(block)
    return sha.hexdigest()


def secure_transfer_to_s3(version_dir: str, root: str = DEFAULT_ROOT) -> str:
    """Fetch data files containing archival IceCube neutrino track events from
    IceCube servers and upload them to non-public AWS S3/DigitalOcean Spaces
    storage. Returns information on the file locations that can be used for
    automatic authenticated data retrieval.

    Each file is stored in ``DEFAULT_BUCKET`` under the key
    ``DEFAULT_KEY_ROOT + <sha256 hex digest of the file>``.

    Parameters
    ----------
    version_dir : str
        The name of the directory (as a relative path from ``root``) in which
        the archival neutrinos you want are stored as ``.npy`` or ``.root``
        files. Should be one of the return values of
        ``available_versions(root=root)``.
    root : str, optional
        The root directory in which to search for neutrino archive versions on
        the IceCube server.

    Returns
    -------
    private_file_cachers : str
        A code string holding a single ``dict`` entry of the form
        ``'<version_dir>': {...},`` whose value maps each uploaded file name to
        a ``llama.com.s3.PrivateFileCacher`` declaration pointing at the
        uploaded file. Paste this string into the
        ``llama.files.i3.json.ARCHIVAL_NEUTRINOS`` dictionary to enable access
        to these files. Entries are sorted by file name so that re-running
        produces identical output.

    Raises
    ------
    IOError
        If no files were fetched for ``version_dir``, plus anything raised by
        ``fetch_archival_neutrinos`` or ``upload_file``.
    """
    with TemporaryDirectory() as tmpdir:
        fetch_archival_neutrinos(version_dir, dest=tmpdir, root=root)
        LOGGER.debug("Fetched remote neutrinos and stored them in %s", tmpdir)
        # Sort so the generated declarations come out in a stable order.
        paths = sorted(Path(tmpdir).iterdir())
        if not paths:
            raise IOError("No archival neutrino files were fetched for "
                          f"{version_dir!r}.")
        declarations = []
        for path in paths:
            key = DEFAULT_KEY_ROOT + _sha256_file(path)
            fpath = str(path)
            LOGGER.debug("Uploading file %s to bucket %s, key %s", fpath,
                         DEFAULT_BUCKET, key)
            upload_file(fpath, key, bucket=DEFAULT_BUCKET, public=False)
            LOGGER.info("Upload successful: %s -> %s %s", fpath,
                        DEFAULT_BUCKET, key)
            cacher = CACHER_DECLARATION_FMT.format(fname=path.name, key=key,
                                                   bucket=DEFAULT_BUCKET)
            LOGGER.debug("Adding cacher declaration line: %s", cacher)
            declarations.append(cacher)
    # repr() quotes the version name as a valid Python string literal.
    return f"{version_dir!r}: {{" + "".join(declarations) + "\n},"