Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# (c) Stefan Countryman 2016-2018, IceCube collaboration
3"""
4Provide ``realtime_tools`` functionality from IceCube so that the pipeline can
5run without installing a full-fledged IceCube environment. In particular,
6implement ``live.get_events`` and ``neutrino_singlet_pulls.calc_cr_pull``.
8User must provide a valid IceCube username and password as environmental
9variables ``ICECUBE_USERNAME`` and ``ICECUBE_PASSWORD``, respectively.
10"""
12import json
13import time
14import logging
15import datetime
16from math import pi
17from llama.classes import optional_env_var
18from llama.utils import RemoteFileCacher, utc2mjd
19from llama.files.i3.utils import zen_az2ra_dec
21ANG_ERR_FLOOR = 0.2 * pi / 180
22LOGGER = logging.getLogger(__name__)
23URL_PFOW = 'https://live.icecube.wisc.edu/pfow_data/'
24# some polynomial coefficients from neutrino_singlet_pulls.py
25CR_POLY = [-0.033012, 0.71735, -5.9243, 23.795, -46.633, 36.409]
26PRE_2018_TOPICS = [
27 'neutrino16',
28 'neutrino17',
29 'heseEvent16',
30 'eheEvent16',
31 'hbeheEvent16',
32]
33SPLINEMPEFAST_BOOTSTRAP_MUEX = RemoteFileCacher(
34 'https://nyc3.digitaloceanspaces.com/llama/llama/objects/'
35 '9f98ccd73b18c74a38cd07eb696bf9c339fcfb0513318cf4bbe7065890eac263'
36)
37SPLINEMPEFAST_BOOTSTRAP_TRUNCATEDE = RemoteFileCacher(
38 'https://nyc3.digitaloceanspaces.com/llama/llama/objects/'
39 '79e01972b1f046dfaa42e2e39a8c604baa5f47989c8c252ba38579f15d58fa46'
40)
41SPLINEMPEFAST_CRAMERRAO_MUEX = RemoteFileCacher(
42 'https://nyc3.digitaloceanspaces.com/llama/llama/objects/'
43 '9376ffcd48ca837eb376ef1dcd55f8958fc15c672e841dc110cdb2821e7084b5'
44)
45SPLINEMPEFAST_CRAMERRAO_TRUNCATEDE = RemoteFileCacher(
46 'https://nyc3.digitaloceanspaces.com/llama/llama/objects/'
47 '71ee87bb35fbbea18b1199610504dfa89d5ed92a22e1b22ee5c2fbac85a24e07'
48)
49SPLINEMPEFAST_PARABOLOID_MUEX = RemoteFileCacher(
50 'https://nyc3.digitaloceanspaces.com/llama/llama/objects/'
51 'bf08b1386d78ed319f907378f40e49e48bbc79c1887cc58fb93efb73e1fc6a98'
52)
53SPLINEMPEFAST_PARABOLOID_TRUNCATEDE = RemoteFileCacher(
54 'https://nyc3.digitaloceanspaces.com/llama/llama/objects/'
55 '96dcf3d58a0fa90da7566d3cb0bed9a77850122671d061091e4d6c9ef4e01d3e'
56)
58SPLINE_MANIFEST = """
59{
60 "SplineMPEfast.Bootstrap.MuEX.json": [
61 "https://nyc3.digitaloceanspaces.com/llama/llama/objects/9f98ccd73b18c74a38cd07eb696bf9c339fcfb0513318cf4bbe7065890eac263",
62 "9f98ccd73b18c74a38cd07eb696bf9c339fcfb0513318cf4bbe7065890eac263"
63 ],
64 "SplineMPEfast.Bootstrap.TruncatedE.json": [
65 "https://nyc3.digitaloceanspaces.com/llama/llama/objects/79e01972b1f046dfaa42e2e39a8c604baa5f47989c8c252ba38579f15d58fa46",
66 "79e01972b1f046dfaa42e2e39a8c604baa5f47989c8c252ba38579f15d58fa46"
67 ],
68 "SplineMPEfast.CramerRao.MuEX.json": [
69 "https://nyc3.digitaloceanspaces.com/llama/llama/objects/9376ffcd48ca837eb376ef1dcd55f8958fc15c672e841dc110cdb2821e7084b5",
70 "9376ffcd48ca837eb376ef1dcd55f8958fc15c672e841dc110cdb2821e7084b5"
71 ],
72 "SplineMPEfast.CramerRao.TruncatedE.json": [
73 "https://nyc3.digitaloceanspaces.com/llama/llama/objects/71ee87bb35fbbea18b1199610504dfa89d5ed92a22e1b22ee5c2fbac85a24e07",
74 "71ee87bb35fbbea18b1199610504dfa89d5ed92a22e1b22ee5c2fbac85a24e07"
75 ],
76 "SplineMPEfast.Paraboloid.MuEX.json": [
77 "https://nyc3.digitaloceanspaces.com/llama/llama/objects/bf08b1386d78ed319f907378f40e49e48bbc79c1887cc58fb93efb73e1fc6a98",
78 "bf08b1386d78ed319f907378f40e49e48bbc79c1887cc58fb93efb73e1fc6a98"
79 ],
80 "SplineMPEfast.Paraboloid.TruncatedE.json": [
81 "https://nyc3.digitaloceanspaces.com/llama/llama/objects/96dcf3d58a0fa90da7566d3cb0bed9a77850122671d061091e4d6c9ef4e01d3e",
82 "96dcf3d58a0fa90da7566d3cb0bed9a77850122671d061091e4d6c9ef4e01d3e"
83 ]
84}
85"""
87# read IceCube username and password
88ICECUBE_USERNAME, ICECUBE_PASSWORD = optional_env_var(
89 ['ICECUBE_USERNAME', 'ICECUBE_PASSWORD'],
90 """
91 Must specify IceCube username and password as environmental
92 variables ``ICECUBE_USERNAME`` and ``ICECUBE_PASSWORD``,
93 "respectively in order to fetch IceCube data.
94 """
95)
98def to_datestring(value):
99 """
100 Convert a given timestamp into a string of 'YYYY-MM-DD hh:mm:ss.ffffff'.
102 The input must be either an MJD (float) or a datetime object.
104 NOTE: This is modified from the IceCube code to run stand-alone. It *DOES*
105 *NOT ACCEPT* ``I3Time`` instances.
106 """
107 if not isinstance(value, datetime.datetime):
108 import astropy
109 value = astropy.time.Time(value, format='mjd').datetime
110 return str(value)
113def get_events(topic, starttime, stoptime, timekey=None, retries=10, delay=30):
114 """
115 Query events from the I3Live database.
117 Parameters
118 ----------
119 topic : str
120 Name of the stream (e.g. 'neutrino16').
121 starttime : int or float
122 start of timewindow
123 stoptime : int or float
124 stop of timewindow
125 timekey : str
126 which date/time to compare to, such as ``insert_time`` or
127 ``value.data.eventtime`` (default: auto-detect)
128 retries : int
129 Number of retries in case of HTTP errors
130 delay : int or float
131 time delay between retries (default: 10 seconds)
133 Returns
134 -------
135 events : list or None
136 Events matching the given criteria or None in case of (repeated) HTTP
137 error.
139 NOTE: This is modified from the IceCube code to run stand-alone.
140 """
141 # Post-2018, actually specify varname as distinct from topic
142 varname = topic if topic in PRE_2018_TOPICS else 'realtimeEvent'
143 timekey = 'value.data.eventtime' if timekey is None else timekey
144 return _get_pfow(varname, topic, timekey, starttime, stoptime, retries,
145 delay)
148def _get_pfow(varname, topic, timekey, starttime, stoptime, retries=10,
149 delay=30):
150 """
151 Implementation for ``get_events`` and ``get_events_data`` from the
152 original IceCube realtime_tools module.
153 """
154 from urllib.request import Request, urlopen
155 from urllib.error import HTTPError
156 from urllib.parse import urlencode
157 if ICECUBE_USERNAME is None or ICECUBE_PASSWORD is None:
158 raise RuntimeError("Must provide both `ICECUBE_USERNAME` and "
159 "`ICECUBE_PASSWORD` as environmental variables.")
160 try:
161 data = urlencode({
162 'user': ICECUBE_USERNAME,
163 'pass': ICECUBE_PASSWORD,
164 'varname': varname, # this used to just be topic pre-2018
165 'topic': topic,
166 'timekey': timekey,
167 'start': to_datestring(starttime),
168 'stop': to_datestring(stoptime)
169 }).encode('utf-8') # python2/python3 compatible conversion to bytes
170 req = Request(URL_PFOW, data)
172 conn = urlopen(req)
173 raw = conn.read()
174 msgs = json.loads(raw)
175 return msgs
176 except HTTPError as err:
177 LOGGER.warn('HTTP Error {0:}: {1:s}'.format(err.code, err.reason))
178 if retries > 0:
179 LOGGER.warn('Trying again in %.2f seconds (%d tries left)',
180 delay, retries)
181 time.sleep(delay)
182 return _get_pfow(varname, topic, timekey, starttime, stoptime,
183 retries-1)
184 LOGGER.error('Not trying again!')
185 return None
188def calc_cr_pull(cr_zen, cr_azi, zenith, muex):
189 """Calculate CR pull corrected value, based on cr_zen,cr_azi,
190 zenith and muex values based on Thomas's polynomial fit for
191 neutrino16 sample
193 NOTE: This is modified from the IceCube code to run stand-alone."""
194 import numpy as np
195 logmu = np.log10(muex)
196 pull_corr = (CR_POLY[5] +
197 CR_POLY[4]*logmu +
198 CR_POLY[3]*np.power(logmu, 2) +
199 CR_POLY[2]*np.power(logmu, 3) +
200 CR_POLY[1]*np.power(logmu, 4) +
201 CR_POLY[0]*np.power(logmu, 5))
202 avg_cr = np.sqrt(cr_zen**2 + (cr_azi*np.sin(zenith))**2)/np.sqrt(2)
203 LOGGER.info("pullcr:" + " %s"*5, cr_zen, cr_azi, zenith, muex, pull_corr)
204 return pull_corr*avg_cr
207def events_dtype():
208 """Datatypes for events."""
209 import numpy as np
210 return [
211 ('run', np.int),
212 ('event', np.int),
213 ('azi', np.float),
214 ('zen', np.float),
215 ('ra', np.float),
216 ('dec', np.float),
217 ('angErr', np.float),
218 ('logE', np.float),
219 ('time', np.float),
220 ]
224def _supports(topics, version):
225 """
226 Stub version of the
227 ``icecube.realtime_gfu.eventcache.NeutrinoEventStream().supports`` method,
228 published at:
230 https://code.icecube.wisc.edu/projects/icecube/browser/IceCube/
231 projects/realtime_gfu/releases/V19-02-00/python/eventcache.py
233 """
234 return ('neutrino' in topics) and (version == '2018a')
237def parse_neutrino_event_stream(messages, dtype=None):
238 """
239 Stub version of the
240 ``icecube.realtime_gfu.eventcache.NeutrinoEventStream().parse`` method,
241 published at:
243 https://code.icecube.wisc.edu/projects/icecube/browser/IceCube/
244 projects/realtime_gfu/releases/V19-02-00/python/eventcache.py
246 Parses and formats a bunch of neutrinos in the format returned by
247 icecube.realtime_tools.get_live, picking the correct fields from those
248 neutrinos such that the result is suitable for analyses. Based on
249 discussions with Thomas Kintscher and Josh Wood.
250 """
251 import numpy as np
252 if dtype is None:
253 dtype = events_dtype()
255 def ensure_finite(val):
256 assert np.isfinite(val)
257 return val
259 def none_to_nan(val):
260 return val if val is not None else float('nan')
262 # ensure that this is a list of messages, even if it has only one entry
263 messages = messages if isinstance(messages, list) else [ messages ]
265 # filter messages that can actually be parsed
266 messages = [ msg for msg in messages
267 if _supports(msg['value']['streams'], msg['value']['version']) ]
269 # create and fill output array
270 arr = np.zeros(len(messages), dtype=dtype)
271 for i, msg in enumerate(messages):
272 logMuEX = np.log10(ensure_finite(msg['value']['data']['reco']['energy']['spline_muex']))
273 logTruncated = np.log10(none_to_nan(msg['value']['data']['reco']['energy']['truncated']))
274 azimuth = ensure_finite(msg['value']['data']['reco']['splinempe']['azi'])
275 zenith = ensure_finite(msg['value']['data']['reco']['splinempe']['zen'])
276 cr_azi = ensure_finite(msg['value']['data']['reco']['splinempe']['cr_azi'])
277 cr_zen = ensure_finite(msg['value']['data']['reco']['splinempe']['cr_zen'])
278 cramerrao = np.sqrt(0.5*(cr_zen**2. + cr_azi**2.*np.sin(zenith)**2.))
280 pb_err1 = none_to_nan(msg['value']['data']['reco']['splinempe']['pb_err1'])
281 pb_err2 = none_to_nan(msg['value']['data']['reco']['splinempe']['pb_err2'])
282 paraboloid = np.sqrt(0.5*(pb_err1**2. + pb_err2**2.))
284 bootstrap = none_to_nan(msg['value']['data']['reco']['splinempe']['bs_est'])
286 arr[i]['run'] = msg['value']['data']['run_id']
287 arr[i]['event'] = msg['value']['data']['event_id']
288 arr[i]['azi'] = azimuth
289 arr[i]['zen'] = zenith
290 arr[i]['logE'] = logMuEX
291 arr[i]['time'] = utc2mjd(msg['value']['data']['eventtime'])
292 ra_deg, dec_deg = zen_az2ra_dec(
293 arr[i]['time'], # already in MJD, see above
294 arr[i]['zen'] * 180. / np.pi,
295 arr[i]['azi'] * 180. / np.pi
296 )
297 arr[i]['ra'] = ra_deg * np.pi / 180.
298 arr[i]['dec'] = dec_deg * np.pi / 180.
299 arr[i]['angErr'] = pull_correct_online_2017(cramerrao, paraboloid, bootstrap, logMuEX, 'MuEX')
301 # apply floor on angular errors
302 arr['angErr'] = arr['angErr'].clip(min=ANG_ERR_FLOOR)
304 return arr
307def _finite_in_range(var, lo, hi):
308 """Taken from:
310 https://code.icecube.wisc.edu/projects/icecube/browser/IceCube/
311 projects/realtime_gfu/releases/V19-02-00/python/angular_errors.py
313 """
314 import numpy as np
315 finite_var = np.nan_to_num(var)
316 return np.where(np.isfinite(var) & (lo < finite_var) & (finite_var < hi), finite_var, np.nan)
319def load_spline(filename):
320 """Taken from:
322 https://code.icecube.wisc.edu/projects/icecube/browser/IceCube/
323 projects/realtime_gfu/releases/V19-02-00/python/angular_errors.py
325 """
326 import scipy.interpolate
327 with open(filename, 'r') as f:
328 kwargs = json.load(f)
329 spline = scipy.interpolate.UnivariateSpline(**kwargs)
330 return spline
333def pull_correct_online_2017(cramerrao, paraboloid, bootstrap, logE, mode='MuEX'):
334 """Taken from:
336 https://code.icecube.wisc.edu/projects/icecube/browser/IceCube/
337 projects/realtime_gfu/releases/V19-02-00/python/angular_errors.py
339 """
340 import numpy as np
341 paraboloid = _finite_in_range(paraboloid, 0., np.radians(25.))
342 bootstrap = _finite_in_range(bootstrap, np.radians(0.01), np.pi)
343 if mode == 'MuEX':
344 spline_bs = load_spline(SPLINEMPEFAST_BOOTSTRAP_MUEX.get())
345 spline_pb = load_spline(SPLINEMPEFAST_PARABOLOID_MUEX.get())
346 spline_cr = load_spline(SPLINEMPEFAST_CRAMERRAO_MUEX.get())
347 return pull_correction([bootstrap, paraboloid, cramerrao], logE,
348 [spline_bs, spline_pb, spline_cr],
349 fixed_fallback=None)
350 elif mode == 'TruncatedEnergy':
351 spline_bs = load_spline(SPLINEMPEFAST_BOOTSTRAP_TRUNCATEDE.get())
352 spline_pb = load_spline(SPLINEMPEFAST_PARABOLOID_TRUNCATEDE.get())
353 spline_cr = load_spline(SPLINEMPEFAST_CRAMERRAO_TRUNCATEDE.get())
354 return pull_correction([bootstrap, paraboloid, cramerrao], logE,
355 [spline_bs, spline_pb, spline_cr],
356 fixed_fallback=None)
357 else:
358 raise AttributeError('Unknown mode!')
361def pull_correction(values, logE, splines, fixed_fallback=None):
362 """Apply pull-correction to angular error estimators. Taken from:
364 https://code.icecube.wisc.edu/projects/icecube/browser/IceCube/
365 projects/realtime_gfu/releases/V19-02-00/python/angular_errors.py
367 Parameters
368 ----------
369 values : list
370 List of NumPy arrays with the sigma estimates.
371 logE : array
372 The logE of the events.
373 splines : list
374 Splines to be used for the corrections.
375 fixed_fallback : float
376 Should all estimators fail, use this fixed value
377 """
378 import numpy as np
379 values = [ np.atleast_1d(estimate) for estimate in values ]
380 logE = np.atleast_1d(logE)
381 assert len(values) == len(splines), \
382 'Need one spline per estimator.'
383 assert all([ (len(values[i]) == len(logE)) for i in range(len(values)) ]), \
384 'The (all) estimator arrays and the logE array must have the same length.'
386 result = np.ones(len(values[0]))
387 result.fill(np.nan)
389 masks = []
390 for i in range(len(values)):
391 mask = np.isfinite(values[i]) & (np.nan_to_num(values[i]) > 0.) & np.isnan(result)
392 result[mask] = values[i][mask] * np.power(10., splines[i](logE[mask]))
394 if fixed_fallback is not None:
395 mask = np.isnan(result)
396 result[mask] = fixed_fallback
398 return result