Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# (c) Stefan Countryman 2016-2018, Rainer Corley 2016
3"""
4Methods and ``FileHandler`` classes associated with fetching IceCube neutrino
5triggers.
6"""
8import os
9import re
10from math import pi
11from copy import deepcopy
12from random import random
13from fnmatch import fnmatch
14import shutil
15import logging
16import datetime
17import warnings
18from deprecated import deprecated
19from llama.utils import (
20 mjd2gps,
21 utc2gps,
22 gps2mjd,
23 parameter_factory,
24)
25from llama.files.i3.utils import zen_az2ra_dec
26from llama.filehandler import JSONFile, recursive_obsolescence, GenerationError
27from llama.files.healpix import HEALPixPSF
28from llama.files.healpix.psf import PsfGaussian
29from llama.files.skymap_info import SkymapInfo
30from llama.files.lvc_skymap import LvcSkymapHdf5
31from llama.files.slack import SlackReceiptLlama
32from llama.files.gracedb import GraceDBReceipt
33from llama import detectors
34from llama.classes import (
35 OptionalFeatureWarning,
36 registerstub,
37)
38from llama.com.s3 import PrivateFileCacher
39# try loading icecube functions from the icecube environment if possible;
40# otherwise, define usable stub functions.
LOGGER = logging.getLogger(__name__)

# Prefer the real IceCube ``realtime_tools`` implementations; if they are not
# on ``PYTHONPATH``, fall back to the local stub implementations and register
# them as stubs so the rest of the pipeline can tell they are not the real
# thing.
try:
    from realtime_tools.neutrino_singlet_pulls import calc_cr_pull
    from realtime_tools.live import get_events
except ImportError:
    msg = ("Could not import ``realtime_tools`` IceCube project. Will "
           "use stub implementations of IceCube software instead for "
           "fetching IceCube data. If you have a workable version of "
           "``realtime_tools`` somewhere, add it to your ``PYTHONPATH`` "
           "environmental variable to use it here.")
    LOGGER.debug(msg)
    warnings.warn(msg, OptionalFeatureWarning)
    from llama.files.i3.realtime_tools_stubs import (
        calc_cr_pull,
        get_events,
    )
    registerstub('calc_cr_pull', __name__, calc_cr_pull)
    registerstub('get_events', __name__, get_events)
# Same fallback strategy for the GFU event-stream parser: use the real
# ``realtime_gfu`` package when importable, otherwise the local stub.
try:
    from realtime_gfu.eventcache import NeutrinoEventStream, _events_dtype

    def parse_neutrino_event_stream(messages, dtype=_events_dtype):
        """Parses and formats a bunch of neutrinos in the format returned by
        icecube.realtime_tools.get_live, picking the correct fields from those
        neutrinos such that the result is suitable for analyses. Based on
        discussions with Thomas Kintscher and Josh Wood."""
        # NOTE(review): ``dtype`` is accepted but never used — presumably kept
        # for signature compatibility with the stub; confirm before removing.
        return NeutrinoEventStream().parse(messages)
except ImportError:
    msg = ("Could not import ``realtime_gfu``. Will use stub "
           "implementations of IceCube software instead for "
           "formatting IceCube GFU neutrinos. If you have a workable version "
           "of ``realtime_gfu`` somewhere, add it to your ``PYTHONPATH`` "
           "environmental variable to use it here.")
    LOGGER.debug(msg)
    warnings.warn(msg, OptionalFeatureWarning)
    from llama.files.i3.realtime_tools_stubs import (
        parse_neutrino_event_stream,
    )
    registerstub('parse_neutrino_event_stream', __name__,
                 parse_neutrino_event_stream)
# CONSTANTS

# Archival GFU neutrino releases stored on LLAMA S3, keyed by release version
# and then by archive filename. Each value is a ``PrivateFileCacher`` pointing
# at a content-addressed object path (the hex strings are presumably SHA256
# digests of the files — the S3 layer defines this; see ``llama.com.s3``).
ARCHIVAL_NEUTRINOS = {
    'version-002-p03': {
        'IC86_2017_data.npy': PrivateFileCacher(
            'llama/objects/72c3c52642192d08b3dc07e048d2bae05a8e948ce58b0f66cf442e5f9daf903c',
            'llama'
        ),
        'README': PrivateFileCacher(
            'llama/objects/1147be5116304327993dcbbcc290d0ff2b1245245f16f049c5529f54b7f04b47',
            'llama'
        ),
        'IC86_2016_data.npy': PrivateFileCacher(
            'llama/objects/8fd4eca468fcd8314aa2c54179c44bf52ba2deac35071c5d4e88b7642b93eb30',
            'llama'
        ),
        'IC86_2015_data.npy': PrivateFileCacher(
            'llama/objects/2b17257a472fc26b12be072b5d93d65317892c05a40f01cdd62a450efb67e0fb',
            'llama'
        ),
    },
    'version-002-p04': {
        'IC86_2018_data.npy': PrivateFileCacher(
            'llama/objects/3a744c78e71088dd92725341cb24a2ac649e33555cb8cba4cdb851e6c135ce34',
            'llama'
        ),
        'IC86_2011_data.npy': PrivateFileCacher(
            'llama/objects/c353bde7c87e706da7472144d0805e6cec65ff0ca475e272a2023c83a653403b',
            'llama'
        ),
        'IC86_2017_data.npy': PrivateFileCacher(
            'llama/objects/a6db2bb0998fd160cdceaf1eb8642fb10bd05105d647fdb9224d81fc0a30bf54',
            'llama'
        ),
        'IC86_2013_data.npy': PrivateFileCacher(
            'llama/objects/d27802ddb13b0a6c134c15680f1bc1772a39cb185b0380cd43f3938388424362',
            'llama'
        ),
        'README': PrivateFileCacher(
            'llama/objects/6038294f52bb921191a0349805d361045c95c5886385fcd193f2cb7c94c88610',
            'llama'
        ),
        'IC86_2012_data.npy': PrivateFileCacher(
            'llama/objects/ff8903825f6850a4eb4e5bc0cab5cddbf63d47333531818e55538abac9b1ac75',
            'llama'
        ),
        'IC86_2016_data.npy': PrivateFileCacher(
            'llama/objects/57c6b21b6e3b6c0ea8db115fa78d2731c6d9662d5a1026b303dda55c3ade70db',
            'llama'
        ),
        'IC86_2014_data.npy': PrivateFileCacher(
            'llama/objects/4ea085eb95eaddd1a6b04b4a48d2491e89e792a2948a94d6af59b29ee936ec55',
            'llama'
        ),
        'IC86_2015_data.npy': PrivateFileCacher(
            'llama/objects/6f383d315c243adb25b97535dd9274840cfa1cd6f21860f9d93fb8a4ac17b43c',
            'llama'
        ),
    },
    'version-002-p05': {
        'IC86_2018_data.npy': PrivateFileCacher(
            'llama/objects/09a8b00889a59f746ad16552e72cf8b07a43123788e82664c48f1aa3001f00b8',
            'llama'
        ),
        'IC86_2011_data.npy': PrivateFileCacher(
            'llama/objects/ac630a536db6207a3be87cc1938633c162b31493fe9e5002afadff9774ac0ac9',
            'llama'
        ),
        'IC86_2017_data.npy': PrivateFileCacher(
            'llama/objects/88d5068e96d5260dae8648982196782d9a6ae3079f907f7eabe88d78562bae7c',
            'llama'
        ),
        'IC86_2013_data.npy': PrivateFileCacher(
            'llama/objects/f0848891cb47f0d5c99f8c16fed4c15ecaf306b1b408730b4b6b8d66d2ba557b',
            'llama'
        ),
        'README': PrivateFileCacher(
            'llama/objects/22ad2a675500fbf0d81518cf8a00b04fa59b74b2de5b73b49126236cb6c92d76',
            'llama'
        ),
        'IC86_2012_data.npy': PrivateFileCacher(
            'llama/objects/88d4944325c3495ee4a106b8e108d42d1aea1ffdc59449a7987c8143dd5f4cdd',
            'llama'
        ),
        'IC86_2016_data.npy': PrivateFileCacher(
            'llama/objects/d5011765d5b5f6c02e6f9c683a2e8ef15cf35192764bdb7089a39a29c0e7d47d',
            'llama'
        ),
        'IC86_2014_data.npy': PrivateFileCacher(
            'llama/objects/fbb7236a03c2670c07f2619c252326eda9156d44b66bcd919b9d61857144f4dd',
            'llama'
        ),
        'IC86_2015_data.npy': PrivateFileCacher(
            'llama/objects/74c28037cba7dbf6465fa9b0267463916cf5c1fda9280ffa39bfe9cd07db60b4',
            'llama'
        ),
    }
}
# the archival release used by ``get_archive`` and
# ``neutrino_archive_available_years``
DEFAULT_ARCHIVE = ARCHIVAL_NEUTRINOS['version-002-p04']
DELTAT_MINUS = 500  # how many seconds before the event to search in
DELTAT_PLUS = 500  # how many seconds after the event to search in
# [s] width of the look-ahead windows used by ``get_real``'s completeness
# check (and padding added to ``end_of_neutrino_window``)
ICECUBE_MAX_DELAY = 100
# how many consecutive look-ahead windows ``get_real`` tries before declaring
# the neutrino list possibly incomplete
ICECUBE_COMPLETE_CHECK_MAX_RETRIES = 10
# MJD dates at which the IceCube Kafka-style neutrino 'topic' name changed
MJD_TOPIC_SWITCH_16_TO_17 = 57891.168
MJD_TOPIC_SWITCH_17_TO_BLANK = 58309.741
# time intervals when we can unblind neutrinos in MJD
UNBLINDED_TIMES = (
    (58574.0, 58940.0),  # 2019-04-01 to 2020-04-01
)
# [deg] the floor on the angular uncertainty. Specified by Josh Wood via email
# pre-ER14.
MIN_SIGMA = 0.2
197# CONVERSIONS AND TOOLS
def get_dec_filter(dec):
    """Build a predicate that selects neutrinos by DECLINATION, e.g. for use
    with the ``filter`` builtin.

    Parameters
    ----------
    dec : list
        Declination intervals, each a ``[low, high]`` pair in degrees, e.g.
        ``[[ 21, 41 ],[ 92, 102 ]]``, into which a selected neutrino should
        fall.

    Returns
    -------
    dec_filter : function
        A function returning ``True`` when passed a neutrino (in the format
        returned by ``format_neutrinos``) whose declination lies strictly
        inside at least one of the intervals in ``dec``.

    Examples
    --------
    >>> neutrino = {
    ...     'azimuth': 246.16444118441663,
    ...     'bdt_up': -0.0155,
    ...     'energy': 498.10789,
    ...     'mjd': 57982.5321185039,
    ...     'sigma': 1.5478695443038595,
    ...     'zenith': 113.7504569829126
    ... }
    >>> get_dec_filter([[0,30],[50,60]])(neutrino)
    True
    >>> get_dec_filter([[0,20],[50,60]])(neutrino)
    False
    """

    def dec_filter(neutrino):
        """Return whether this neutrino's declination falls in range."""
        declination = zen_az2ra_dec(
            neutrino['mjd'],
            neutrino['zenith'],
            neutrino['azimuth']
        )[1]
        return any(interval[0] < declination < interval[1]
                   for interval in dec)
    return dec_filter
def get_ra_filter(ra):  # pylint: disable=C0103
    """Return a function that will filter neutrinos based on RIGHT ASCENSION,
    e.g. for use with the ``filter`` function.

    Parameters
    ----------
    ra : list
        A list of right ascension intervals (also specified as lists) into
        which a selected neutrino should fall, e.g. [[ 21, 41 ],[ 92, 102 ]].
        The input list is not modified.

    Returns
    -------
    ra_filter : function
        A function which returns ``True`` when passed a neutrino that falls
        within at least one of the right ascension intervals passed included
        in ``ra``. The neutrino is expected to be in the format returned by
        ``format_neutrinos``.

    Examples
    --------
    >>> neutrino = {
    ...     'azimuth': 246.16444118441663,
    ...     'bdt_up': -0.0155,
    ...     'energy': 498.10789,
    ...     'mjd': 57982.5321185039,
    ...     'sigma': 1.5478695443038595,
    ...     'zenith': 113.7504569829126
    ... }
    >>> get_ra_filter([[0, 30], [50, 60]])(neutrino)
    True
    >>> get_ra_filter([[5, 30], [50, 60]])(neutrino)
    False
    """
    # RA is periodic, so wrap the interval endpoints into [0, 360). Build a
    # normalized copy rather than mutating the caller's list in place (the
    # previous implementation modified ``ra`` as a side effect, surprising
    # callers that reuse their interval list).
    intervals = [[interval[0] % 360., interval[1] % 360.] for interval in ra]

    def ra_filter(neutrino):
        """Return whether this neutrino's RA falls in range."""
        ra_neutrino = zen_az2ra_dec(
            neutrino['mjd'],
            neutrino['zenith'],
            neutrino['azimuth']
        )[0]
        ra_neutrino %= 360
        return any(low < ra_neutrino < high for low, high in intervals)
    return ra_filter
def blinder(neutrino, unblinded_times=tuple()):
    """Return a copy of ``neutrino``, scrambling its azimuth (and hence right
    ascension) unless the trigger time is approved for unblinded analysis.

    Parameters
    ----------
    neutrino : dict
        A neutrino dictionary in the format returned by ``format_neutrinos``.
    unblinded_times : array_like, optional
        An iterable of [start, end] times (specified in UTC MJD format) in
        which we are approved by IceCube to pull unblinded neutrinos for our
        analysis. If not provided, ALL neutrinos are blinded
        (DEFAULT: ``tuple()``).

    Returns
    -------
    blinded : dict
        A fresh copy of the input neutrino (the input is never modified in
        place). If the neutrino's MJD lies strictly inside one of the
        ``unblinded_times`` intervals, the copy is identical to the input;
        otherwise its azimuth is replaced with a random value (guaranteed to
        differ from the input azimuth) and its ``type`` is set to
        ``'blinded'``.

    Examples
    --------
    >>> neutrino_time = 57990
    >>> unblinded_times = [[neutrino_time-1, neutrino_time+1]]
    >>> neutrino = {
    ...     "mjd": neutrino_time,
    ...     "zenith": 90,
    ...     "sigma": 0.3,
    ...     "energy": 5000,
    ...     "azimuth": 270,
    ...     "bdt_up": 1,
    ... }
    >>> blinder(neutrino, unblinded_times=[])['azimuth'] == 270
    False
    >>> blinder(neutrino, unblinded_times=unblinded_times)['azimuth'] == 270
    True
    """
    result = deepcopy(neutrino)
    mjd = neutrino['mjd']
    if any(start < mjd < end for start, end in unblinded_times):
        # approved window: return the untouched copy
        return result
    scrambled = 360.*random()
    # For the sake of the unit test, never let the scrambled azimuth equal the
    # input azimuth. This has no significant effect on anything else.
    if scrambled == neutrino['azimuth']:
        scrambled = (scrambled + 180.) % 360.
    result['azimuth'] = scrambled
    result['type'] = 'blinded'
    return result
def neutrino_archive_available_years(archive=None):
    """Get a list of years for which archival GFU neutrinos are available.
    These files are stored remotely on LLAMA S3 (see ``llama.com.s3``) as
    ``.npy`` files and are cached locally in ``llama.utils.OBJECT_DIR``; these
    are private files that require LLAMA S3 credentials to access. Archival
    neutrino releases available via LLAMA S3 are recorded in
    ``ARCHIVAL_NEUTRINOS``, and the default archival release version is
    contained in ``DEFAULT_ARCHIVE``.

    Parameters
    ----------
    archive : dict, optional
        A mapping whose keys are archive filenames like
        ``'IC86_2015_data.npy'`` (DEFAULT: ``DEFAULT_ARCHIVE``).

    Returns
    -------
    years : list
        Sorted list of integer years for which archival data is available.

    Examples
    --------
    Confirm that we have access to archival neutrinos from 2011 through 2018:

    >>> neutrino_archive_available_years()
    [2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018]
    """
    if archive is None:
        archive = DEFAULT_ARCHIVE
    # escape the dot so that only literal '.npy' suffixes match (the previous
    # pattern's bare '.' matched any character)
    regex = re.compile(r'IC\d+_(\d{4})_data\.npy')
    matches = (regex.match(key) for key in archive)
    return sorted(int(match.group(1)) for match in matches if match)
def get_archive(mjd_start, mjd_end):
    """Fetch neutrinos from a local archive. Archive must be defined and stored
    on LLAMA S3 for the queried neutrinos, **and you must have access to LLAMA
    S3 API credentials to retrieve the archive**.

    Parameters
    ----------
    mjd_start : float
        The start of the time window in which to search for neutrinos in
        Modified Julian Date (MJD) format. Use ``gps2mjd`` or ``utc2mjd`` from
        the ``llama.utils`` module to make the time conversions to the
        appropriate format.
    mjd_end : float
        The end of the time window in which to search for neutrinos.

    Returns
    -------
    formatted : list
        A list of neutrino dictionaries formatted as expected by
        IceCubeNeutrinoList. If mjd_start > mjd_end, no error will be raised;
        instead, an empty list of neutrinos will be returned.

    Raises
    ------
    FileNotFoundError
        If data for the requested time range is not available.

    Examples
    --------
    You can confirm that the archival neutrino data is available by fetching a
    specific trigger from the archive:

    >>> neutrino = get_archive(58392, 58393)[0]
    >>> neutrino['run']
    131569
    >>> neutrino['event']
    43188689
    """
    # astropy (and numpy below) are imported lazily so the module stays
    # importable without them
    from astropy.time import Time
    start_year = Time(mjd_start, format='mjd').datetime.year
    end_year = Time(mjd_end, format='mjd').datetime.year
    # fail fast if either endpoint year is missing from the archive release
    if not {start_year, end_year}.issubset(neutrino_archive_available_years()):
        raise FileNotFoundError(("Could not find GFU archive neutrinos for "
                                 "requested years: {}, {}").format(start_year,
                                                                   end_year))
    # select the per-year ``IC*_*.npy`` data files, skipping e.g. the README
    archives = [v for k, v in DEFAULT_ARCHIVE.items()
                if fnmatch(k, "IC*_*.npy")]
    if not archives:
        return []
    import numpy as np
    # Each archive row ``n`` is indexed positionally; angles are stored in
    # radians (converted to degrees here) and energy as log10(E) (converted
    # to a linear value). ``arc.get()`` downloads/caches the ``.npy`` file.
    neutrinos = [
        {
            'run': int(n[0]),
            'event': int(n[1]),
            'subevent': int(n[2]),
            'type': 'archival',
            'file': str(arc.localpath),
            'mjd': float(n[3]),
            'azimuth': float(n[4]*180./np.pi),
            'zenith': float(n[5]*180./np.pi),
            'ra': float(n[6]*180./np.pi),
            'dec': float(n[7]*180./np.pi),
            'energy': float(10**n[9]),
            'sigma': float(n[8]*180./np.pi),
        }
        for arc in archives
        for n in np.load(arc.get())
        if n[3] > mjd_start and n[3] < mjd_end
    ]
    # Sanity-check the decoded values. NOTE(review): ``assert`` statements
    # are stripped under ``python -O``, so these checks disappear in
    # optimized runs.
    assert all(n['ra'] <= 360 and n['ra'] >= 0 for n in neutrinos)
    assert all(n['dec'] <= 90 and n['dec'] >= -90 for n in neutrinos)
    assert all(n['azimuth'] <= 360 and n['azimuth'] >= 0 for n in neutrinos)
    assert all(n['zenith'] <= 180 and n['zenith'] >= 0 for n in neutrinos)
    assert all(n['energy'] <= 10**9 for n in neutrinos)
    assert all(n['sigma'] <= 180 for n in neutrinos)
    assert all(n['mjd'] > 0 for n in neutrinos)
    return neutrinos
def get_real(mjd_start, mjd_end, check_complete=False):
    """Fetch real neutrinos from ICECUBE's database; takes gps time of the
    events and the time range about this central value over which to search
    for neutrinos.

    Parameters
    ----------
    mjd_start : float
        The start of the time window in which to search for neutrinos in
        Modified Julian Date (MJD) format. Use ``gps2mjd`` or ``utc2mjd`` from
        the ``llama.utils`` module to make the time conversions to the
        appropriate format.
    mjd_end : float
        The end of the time window in which to search for neutrinos.
    check_complete : bool, optional
        Neutrinos from IceCube can be delayed due to communications issues; to
        combat this, when ``check_complete`` is ``True``, a check will be run
        to see if the next neutrino after the end of the search window is yet
        available. If it is not available, it is assumed that not all neutrinos
        needed for the search have arrived from south pole, and a
        ``GenerationError`` will be raised, allowing the pipeline to cooldown
        and retry the neutrino pull later.

    Returns
    -------
    formatted : list
        A list of neutrino dictionaries formatted as expected by
        IceCubeNeutrinoList. If mjd_start > mjd_end, no error will be raised;
        instead, an empty list of neutrinos will be returned.

    Raises
    ------
    ValueError
        If the start time of the neutrino pull predates the switch from the
        'neutrino17' topic to the 'neutrino' topic (in late 2018), a
        ``ValueError`` is raised. The date of this switch is given by
        ``MJD_TOPIC_SWITCH_17_TO_BLANK``.

    Examples
    --------
    Get some neutrinos from late-2018 on the 'neutrino' topic:

    >>> from llama.utils import utc2mjd
    >>> mjd = utc2mjd("2018-09-30")
    >>> len(get_real(mjd-500/86400., mjd+500/86400.)) > 0
    True

    If your end time is earlier than your start time, you'll get an empty
    neutrino list:

    >>> get_real(mjd, mjd-1)
    []

    You should be able to fetch this neutrino from the online GFU database:

    >>> neutrino = get_archive(58392, 58393)[0]
    >>> neutrino['run']
    131569
    >>> neutrino['event']
    43188689
    """
    # an empty or inverted window is not an error; just return no neutrinos
    if mjd_start >= mjd_end:
        return []
    # the neutrino 'topic' changed in spring of 2017
    if mjd_end < MJD_TOPIC_SWITCH_17_TO_BLANK:
        LOGGER.error("Cannot pull neutrinos from earlier than %s; requested "
                     "interval: (%s, %s)", MJD_TOPIC_SWITCH_17_TO_BLANK,
                     mjd_start, mjd_end)
        raise ValueError(("Pulling GFU neutrinos from the neutrino17 and "
                          "earlier topics is no longer supported. Queried "
                          "interval ({}, {}).").format(mjd_start, mjd_end))
    LOGGER.debug('Pulling ICECUBE neutrinos from %s to %s', mjd_start, mjd_end)
    # the neutrino 'topic' changed again in july 2018
    formatted = format_neutrinos(get_events('neutrino', mjd_start, mjd_end))
    LOGGER.debug('Formatted ICECUBE neutrinos:')
    LOGGER.debug(str(formatted))
    if check_complete:
        # make sure that all neutrinos are in by confirming that more neutrinos
        # from outside the time window are available.
        check_end = mjd_end
        for attempt in range(ICECUBE_COMPLETE_CHECK_MAX_RETRIES):
            # slide a look-ahead window of ICECUBE_MAX_DELAY seconds forward
            # from the end of the search window until a later neutrino appears
            check_start = check_end
            check_end = check_start + ICECUBE_MAX_DELAY / 86400
            LOGGER.debug('Checking whether this list is complete by pulling '
                         'neutrinos from next time slice %s - %s (attempt %s)',
                         check_start, check_end, attempt)
            # recurse with check_complete=False to avoid infinite recursion
            next_nu = get_real(check_start, check_end, check_complete=False)
            if next_nu:
                LOGGER.debug('Found extra neutrinos after the search window; '
                             'all neutrinos are likely available. Post-window '
                             'neutrinos: %s', next_nu)
                break
        else:
            # for/else: the loop finished without ``break``, i.e. no later
            # neutrino was found in any retry window
            msg = ('Could not confirm that all neutrinos are available from '
                   f'MJD search window {mjd_start} - {mjd_end} since no '
                   'subsequent neutrinos were found in '
                   f'{ICECUBE_COMPLETE_CHECK_MAX_RETRIES} consecutive time '
                   f'windows of width {ICECUBE_MAX_DELAY} seconds. It is '
                   'likely that not all neutrinos from the search window have '
                   'arrived from south pole.')
            LOGGER.error(msg)
            raise GenerationError(msg)
    return formatted
def get_archive_and_real_neutrinos(mjd_start, mjd_end, blinded_neutrinos=None,
                                   check_complete=False):
    """Get a mix of archival and real online GFU neutrinos (automatically
    calls ``get_archive`` and ``get_real`` as appropriate). Online neutrinos
    are pulled for dates after ``MJD_TOPIC_SWITCH_17_TO_BLANK`` and archival
    neutrinos for dates prior.

    Parameters
    ----------
    mjd_start : float
        The start of the time window in which to search for neutrinos in
        Modified Julian Date (MJD) format. Use ``gps2mjd`` or ``utc2mjd`` from
        the ``llama.utils`` module to make the time conversions to the
        appropriate format.
    mjd_end : float
        The end of the time window in which to search for neutrinos.
    blinded_neutrinos : bool, optional
        Whether to force-blind the neutrinos, e.g. for background sensitivity
        studies, even when they fall in time windows approved for unblinding.
        If not specified, ``UNBLINDED_TIMES`` determines which neutrinos can
        be unblinded.
    check_complete : bool, optional
        When neutrinos are pulled from the online data stream and this is
        ``True``, check whether the next neutrino after the search window is
        available yet; if not, raise a ``GenerationError`` so the pipeline
        can cool down and retry (see ``get_real``).

    Returns
    -------
    formatted : list
        A list of neutrino dictionaries formatted as expected by
        IceCubeNeutrinoList (empty if mjd_start > mjd_end). Each neutrino is
        blinded if ``blinded_neutrinos`` is ``True`` or if its event time
        falls outside the approved ``UNBLINDED_TIMES``.

    Raises
    ------
    ValueError
        If any returned neutrino is missing one of ``NEUTRINO_PROPERTIES``.

    Examples
    --------
    Pull some blinded archival neutrinos

    >>> a_start = MJD_TOPIC_SWITCH_17_TO_BLANK-1.77
    >>> a_end = MJD_TOPIC_SWITCH_17_TO_BLANK
    >>> arc = get_archive_and_real_neutrinos(a_start, a_end, True)
    >>> len(arc) > 0
    True

    Pull some blinded GFU neutrinos

    >>> l_start = MJD_TOPIC_SWITCH_17_TO_BLANK
    >>> l_end = MJD_TOPIC_SWITCH_17_TO_BLANK + 500/86400.
    >>> live = get_archive_and_real_neutrinos(l_start, l_end, True)
    >>> len(live) > 0
    True

    Pull a mix of archival and blinded neutrinos

    >>> mix = get_archive_and_real_neutrinos(a_start, l_end, True)
    >>> len(arc) + len(live) == len(mix)
    True
    """
    cutoff = MJD_TOPIC_SWITCH_17_TO_BLANK
    neutrinos = []
    # archival portion: the part of the window before the topic switch
    if mjd_start < cutoff:
        neutrinos += get_archive(mjd_start, min(mjd_end, cutoff))
    # online portion: the part of the window after the topic switch
    if mjd_end > cutoff:
        neutrinos += get_real(max(mjd_start, cutoff), mjd_end,
                              check_complete=check_complete)
    if blinded_neutrinos is None:
        LOGGER.info("``blinded_neutrinos`` not specified, unblinding as "
                    "specified in ``UNBLINDED_TIMES``.")
        neutrinos = [blinder(n, UNBLINDED_TIMES) for n in neutrinos]
    elif blinded_neutrinos:
        LOGGER.info("``blinded_neutrinos`` is 'true'; force-blinding "
                    "all neutrinos.")
        neutrinos = [blinder(n) for n in neutrinos]
    else:
        LOGGER.info("``blinded_neutrinos`` is 'false'; unblinding.")
    # validate that every neutrino carries the required properties
    for neutrino in neutrinos:
        for prop in NEUTRINO_PROPERTIES:
            if prop not in neutrino:
                raise ValueError('Input list of neutrinos incomplete:' +
                                 str(prop))
    return neutrinos
def format_neutrinos(neutrinos):
    """Read in ICECUBE neutrinos as returned by realtime_tools.live.get_events
    and convert them to the dictionary format that LLAMA expects (angles in
    degrees, linear energy, ``mjd``/``sigma``/``zenith``/``azimuth`` key
    names, and ``type`` marked as ``'observation'``)."""
    parsed = parse_neutrino_event_stream(neutrinos)
    field_names = parsed.dtype.names
    formatted = [dict(zip(field_names, row)) for row in parsed]
    for nu in formatted:
        # radians -> degrees for all angular quantities
        nu['ra'] = float(nu['ra']) * 180. / pi
        nu['dec'] = float(nu['dec']) * 180. / pi
        # rename fields to LLAMA's internal naming convention
        nu['mjd'] = float(nu.pop('time'))
        nu['sigma'] = float(nu.pop('angErr') * 180. / pi)
        nu['zenith'] = float(nu.pop('zen') * 180. / pi)
        nu['azimuth'] = float(nu.pop('azi') * 180. / pi)
        # log10(energy) -> linear energy
        nu['energy'] = float(10**nu.pop('logE'))
        nu['event'] = int(nu['event'])
        nu['run'] = int(nu['run'])
        # mark as observational
        nu['type'] = 'observation'
    return formatted
# properties that every neutrino dictionary must contain (validated in
# ``get_archive_and_real_neutrinos``)
NEUTRINO_PROPERTIES = ['mjd', 'zenith', 'azimuth', 'energy', 'sigma', 'type']
# allowed values for a neutrino's 'type' property (checked when loading in
# ``IceCubeNeutrinoList.neutrinos``)
NEUTRINO_TYPES = ['observation', 'blinded', 'injected', 'unspecified',
                  'archival']
701# these methods are used for the Neutrino class
def _neutrino_gps(self):
    """GPS time, i.e. number of GPS seconds since start of GPS epoch, when
    the neutrino candidate was detected (converted from ``self.mjd``)."""
    return mjd2gps(self.mjd)
def _neutrino_ra(self):
    """Right Ascension of neutrino candidate in degrees (derived from
    ``mjd``, ``zenith`` and ``azimuth`` via ``zen_az2ra_dec``)."""
    return zen_az2ra_dec(self.mjd, self.zenith, self.azimuth)[0]
def _neutrino_dec(self):
    """Declination of neutrino candidate in degrees (derived from ``mjd``,
    ``zenith`` and ``azimuth`` via ``zen_az2ra_dec``)."""
    return zen_az2ra_dec(self.mjd, self.zenith, self.azimuth)[1]
def _neutrino_psf(self):
    """A Psf object for this neutrino that can be used to take products of this
    neutrino's localization PSF with other skymap objects. Built as a
    ``PsfGaussian`` from the neutrino's ``ra``, ``dec`` and ``sigma``."""
    return PsfGaussian(ra=self.ra, dec=self.dec, sigma=self.sigma)
# ``Neutrino`` is generated by ``parameter_factory``; presumably the string
# keyword arguments become documented data attributes and the function keyword
# arguments (``gps``, ``ra``, ``dec``, ``psf`` above) become derived
# attributes — see ``llama.utils.parameter_factory`` for the exact semantics.
Neutrino = parameter_factory(
    "Neutrino",
    """A class for holding neutrino trigger data in easily accessible form.
    Also provides convenience methods for coordinate conversions of underlying
    data. Good for plugging values into formulae.
    """,
    mjd="""Modified Julian Date in UTC epoch at which the neutrino candidate
    was detected.""",
    zenith="""Zenith angle of neutrino source direction, measured from
    IceCube's location at South Pole.""",
    azimuth="""Azimuthal angle of neutrino source direction, measured from
    IceCube's location at South Pole.""",
    energy="Reconstructed energy of the neutrino candidate in GeV.",
    sigma="Effective uncertainty radius in degrees.",
    gps=_neutrino_gps,
    ra=_neutrino_ra,
    dec=_neutrino_dec,
    psf=_neutrino_psf,
)
@GraceDBReceipt.upload_this('GWHEN ICECUBE GFU neutrinos from a +/- 500s '
                            'window surrounding the event.')
@SlackReceiptLlama.upload_this()
@HEALPixPSF.set_class_attributes
class IceCubeNeutrinoList(JSONFile, HEALPixPSF):
    """
    Takes a list of dictionaries describing neutrinos and saves it to a
    text file. Validates the neutrino data to make sure each neutrino contains
    (minimally) the following properties:

    - mjd
    - zenith
    - azimuth
    - sigma
    - energy
    - type (one of the values found in ``NEUTRINO_TYPES``)

    If this file is generated within the temporal search window for neutrinos,
    it is possible not all neutrinos needed for the final result will be
    available. In this case, the file will automatically become ``obsolete``
    and will be regenerated after the window has elapsed.

    Neutrinos from IceCube can be delayed due to communications issues; to
    combat this, when neutrinos are pulled from the online data stream after
    the temporal search window has elapsed, a check will be run to see if the
    next neutrino after the end of the search window is yet available. If it is
    not available, it is assumed that not all neutrinos needed for the search
    have arrived from south pole, and a ``GenerationError`` will be raised,
    allowing the pipeline to cooldown and retry the neutrino pull later.

    Neutrinos pulled from before ``MJD_TOPIC_SWITCH_17_TO_BLANK`` will be
    automatically pulled from local GFU archives (stored online on LLAMA S3 and
    cached locally after use in ``llama.utils.CACHEDIR`` in ``*.npy`` format).
    Neutrinos from after this date will be pulled from the online GFU stream.
    """

    FILENAME = "icecube_neutrino_list.json"
    DEPENDENCIES = (SkymapInfo,)
    DETECTORS = (detectors.IceCube,)

    def _generate(self):  # pylint: disable=W0221
        """Take a list of neutrinos as an argument, validate, and save."""
        from astropy.time import Time
        # Must add small value (1e-5) to avoid {u'error': u'invalid start
        # date'} due to I3Time.cxx problem with parsing even seconds when
        # handling microseconds (see issue #19 on github). This is a kludge.
        gpstime = SkymapInfo(self).event_time_gps_seconds + 1e-5
        # search window is [event - DELTAT_MINUS, event + DELTAT_PLUS] seconds
        mjd_start = gps2mjd(gpstime-DELTAT_MINUS)
        mjd_end = gps2mjd(gpstime+DELTAT_PLUS)
        # if BLINDED_NEUTRINOS is 'false', let the blinding be done
        # automatically
        blind = self.flags['BLINDED_NEUTRINOS'] == 'true' or None
        # if we are past the end of the temporal search window, make sure we're
        # getting all neutrinos by checking whether new ones are available
        # after the search window
        check_complete = Time.now().gps > self.end_of_neutrino_window()
        neutrinos = get_archive_and_real_neutrinos(
            mjd_start,
            mjd_end,
            blinded_neutrinos=blind,
            check_complete=check_complete,
        )
        # the 'complete' flag records whether the completeness check was run
        self._write_json({'triggers': neutrinos, 'complete': check_complete})

    @property
    def template_skymap_filehandler(self):
        """The ``FileHandler`` class providing the template skymap used by the
        ``HEALPixPSF`` machinery for this file."""
        return LvcSkymapHdf5

    def source_locations(self):
        """Return a list of ``(ra, dec, sigma)`` tuples (in degrees) for each
        neutrino whose coordinates are all defined floats; neutrinos with
        undefined coordinates are logged and excluded."""
        neutrinos = self.neutrinos
        locations = list()
        # pylint: disable=C0103
        for n in neutrinos:
            ra, dec = zen_az2ra_dec(n.mjd, n.zenith, n.azimuth)
            coords = (ra, dec, n.sigma)
            if all([isinstance(x, float) for x in coords]):
                locations.append(coords)
            else:
                LOGGER.warning('Excluding neutrinos with undefined coords: %s',
                               coords)
        return locations

    @property
    def neutrinos(self):
        """Return a list containing a ``Neutrino`` object for each neutrino;
        more convenient to work with when writing formulae than dealing
        directly with the neutrino dictionaries."""
        data = self.read_json()
        # support both the current dict format ({'triggers': [...], ...}) and
        # the legacy format where the file held a bare list of neutrinos
        if isinstance(data, dict):
            neutrinos = data['triggers']
        else:
            neutrinos = data
        for neu in neutrinos:
            # default missing types to 'unspecified'; reject unknown types
            if 'type' not in neu:
                neu['type'] = 'unspecified'
            elif neu['type'] not in NEUTRINO_TYPES:
                raise ValueError(("loaded neutrinos, but given type {} not in "
                                  "allowed values {}").format(neu['type'],
                                                              NEUTRINO_TYPES))
        return [Neutrino(**n) for n in neutrinos]

    @recursive_obsolescence
    def is_obsolete(self, checked=None, **kwargs):
        """``IceCubeNeutrinoList`` files can be fetched immediately after a GW
        event becomes available. However, we want to eventually run our
        analysis on all neutrinos in a certain time window around an event. We
        therefore want to re-fetch the neutrinos in that time window after the
        time window has passed (allowing a safety factor for the IceCube GFU
        API to process new neutrino triggers). To do this, we mark the
        ``IceCubeNeutrinoList`` as obsolete if the current time is after the
        time window + safety margin has elapsed and the neutrino list was
        generated before that time had passed (meaning there might have been
        new neutrinos saved that are useful for the analysis after the file was
        originally generated). Standard ``FileHandler`` obsolescence criteria
        are also used to determine obsolescence through a call to ``super``."""
        if not self.exists():
            return False
        # NOTE(review): an incomplete file is reported as NOT obsolete here,
        # which seems at odds with the class docstring's promise to regenerate
        # incomplete lists — confirm that regeneration is triggered elsewhere.
        if not self.is_complete():
            return False
        return super().is_obsolete(checked=checked, **kwargs)

    def is_complete(self):
        """
        Whether the list of neutrinos is complete. Assumes incomplete if no
        ``complete`` key specified.
        """
        return self.read_json().get('complete', False)

    def end_of_neutrino_window(self):
        """
        The GPS time at which all neutrinos should be in under normal
        circumstances (event time plus the search window plus the maximum
        expected IceCube delivery delay).
        """
        return (SkymapInfo(self).event_time_gps + DELTAT_PLUS +
                ICECUBE_MAX_DELAY)

    @property
    def num_triggers(self):
        """The number of neutrino triggers stored in this file."""
        return len(self.neutrinos)

    # pylint: disable=C0103
    @deprecated
    def downselect(self, ra=None, dec=None, ntype=None):
        """Downselect the neutrino list to match given criteria. Save the old
        neutrino list with the original filename prefixed by a timestamp and
        replace it with the downselected list.

        Parameters
        ----------
        ra : list
            Right-ascension intervals (in degrees) that the neutrinos must fall
            into, e.g. ``[(21, 41),(92, 102)]``
        dec : list
            Declination intervals (in degrees) that the neutrinos must fall
            into, e.g. ``[(-3, 20)]``
        ntype : str
            A type that the neutrino must match, e.g. ``'observation'`` for
            real neutrinos.
        """
        # load the file
        # NOTE(review): this assumes the legacy bare-list JSON format; files
        # written by ``_generate`` store a dict with a 'triggers' key, which
        # would be iterated as keys here — confirm before reviving this
        # deprecated method.
        neutrinos = self.read_json()

        # convert to RA/DEC if either of those selections is specified so that
        # we can perform the actual downselect later
        # NOTE(review): ``ra_list``/``dec_list`` are computed but never used
        # below; the filters recompute the coordinates themselves.
        if not (ra is None and dec is None):
            ra_list = list()
            dec_list = list()
            for neutrino in neutrinos:
                neutrino_ra, neutrino_dec = zen_az2ra_dec(
                    neutrino['mjd'],
                    neutrino['zenith'],
                    neutrino['azimuth']
                )
                ra_list.append(neutrino_ra)
                dec_list.append(neutrino_dec)

        # apply RA downselection
        if ra is not None:
            # neutrinos = filter(get_ra_filter(ra), neutrinos)
            test = get_ra_filter(ra)
            neutrinos = [n for n in neutrinos if test(n)]

        # apply Dec downselection
        if dec is not None:
            # neutrinos = filter(get_dec_filter(dec), neutrinos)
            test = get_dec_filter(dec)
            neutrinos = [n for n in neutrinos if test(n)]

        # apply the type downselection
        if ntype is not None:
            tmp_neutrinos = list()
            for neu in neutrinos:
                if 'type' in neu:
                    if neu['type'] == ntype:
                        tmp_neutrinos.append(neu)
                elif ntype == 'unspecified':
                    # untyped neutrinos count as 'unspecified'
                    neu_modified = deepcopy(neu)
                    neu_modified['type'] = 'unspecified'
                    tmp_neutrinos.append(neu_modified)
            neutrinos = tmp_neutrinos

        # copy the old neutrino list as backup
        nowstr = datetime.datetime.utcnow().isoformat().replace(':', '_')
        outpath = os.path.join(
            self.eventdir,
            'preselect-{}-{}'.format(nowstr, self.FILENAME)
        )
        shutil.copyfile(self.fullpath, outpath)

        # save the downselected neutrinos to the old file
        self._write_json(neutrinos)