Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# (c) Stefan Countryman 2016-2018, Rainer Corley 2016 

2 

3""" 

4Methods and ``FileHandler`` classes associated with fetching IceCube neutrino 

5triggers. 

6""" 

7 

8import os 

9import re 

10from math import pi 

11from copy import deepcopy 

12from random import random 

13from fnmatch import fnmatch 

14import shutil 

15import logging 

16import datetime 

17import warnings 

18from deprecated import deprecated 

19from llama.utils import ( 

20 mjd2gps, 

21 utc2gps, 

22 gps2mjd, 

23 parameter_factory, 

24) 

25from llama.files.i3.utils import zen_az2ra_dec 

26from llama.filehandler import JSONFile, recursive_obsolescence, GenerationError 

27from llama.files.healpix import HEALPixPSF 

28from llama.files.healpix.psf import PsfGaussian 

29from llama.files.skymap_info import SkymapInfo 

30from llama.files.lvc_skymap import LvcSkymapHdf5 

31from llama.files.slack import SlackReceiptLlama 

32from llama.files.gracedb import GraceDBReceipt 

33from llama import detectors 

34from llama.classes import ( 

35 OptionalFeatureWarning, 

36 registerstub, 

37) 

38from llama.com.s3 import PrivateFileCacher 

39# try loading icecube functions from the icecube environment if possible; 

40# otherwise, define usable stub functions. 

41 

42LOGGER = logging.getLogger(__name__) 

43 

44try: 

45 from realtime_tools.neutrino_singlet_pulls import calc_cr_pull 

46 from realtime_tools.live import get_events 

47except ImportError: 

48 msg = ("Could not import ``realtime_tools`` IceCube project. Will " 

49 "use stub implementations of IceCube software instead for " 

50 "fetching IceCube data. If you have a workable version of " 

51 "``realtime_tools`` somewhere, add it to your ``PYTHONPATH`` " 

52 "environmental variable to use it here.") 

53 LOGGER.debug(msg) 

54 warnings.warn(msg, OptionalFeatureWarning) 

55 from llama.files.i3.realtime_tools_stubs import ( 

56 calc_cr_pull, 

57 get_events, 

58 ) 

59 registerstub('calc_cr_pull', __name__, calc_cr_pull) 

60 registerstub('get_events', __name__, get_events) 

61try: 

62 from realtime_gfu.eventcache import NeutrinoEventStream, _events_dtype 

63 

64 def parse_neutrino_event_stream(messages, dtype=_events_dtype): 

65 """Parses and formats a bunch of neutrinos in the format returned by 

66 icecube.realtime_tools.get_live, picking the correct fields from those 

67 neutrinos such that the result is suitable for analyses. Based on 

68 discussions with Thomas Kintscher and Josh Wood.""" 

69 return NeutrinoEventStream().parse(messages) 

70 

71except ImportError: 

72 msg = ("Could not import ``realtime_gfu``. Will use stub " 

73 "implementations of IceCube software instead for " 

74 "formatting IceCube GFU neutrinos. If you have a workable version " 

75 "of ``realtime_gfu`` somewhere, add it to your ``PYTHONPATH`` " 

76 "environmental variable to use it here.") 

77 LOGGER.debug(msg) 

78 warnings.warn(msg, OptionalFeatureWarning) 

79 from llama.files.i3.realtime_tools_stubs import ( 

80 parse_neutrino_event_stream, 

81 ) 

82 registerstub('parse_neutrino_event_stream', __name__, 

83 parse_neutrino_event_stream) 

84 

85# CONSTANTS 

86ARCHIVAL_NEUTRINOS = { 

87 'version-002-p03': { 

88 'IC86_2017_data.npy': PrivateFileCacher( 

89 'llama/objects/72c3c52642192d08b3dc07e048d2bae05a8e948ce58b0f66cf442e5f9daf903c', 

90 'llama' 

91 ), 

92 'README': PrivateFileCacher( 

93 'llama/objects/1147be5116304327993dcbbcc290d0ff2b1245245f16f049c5529f54b7f04b47', 

94 'llama' 

95 ), 

96 'IC86_2016_data.npy': PrivateFileCacher( 

97 'llama/objects/8fd4eca468fcd8314aa2c54179c44bf52ba2deac35071c5d4e88b7642b93eb30', 

98 'llama' 

99 ), 

100 'IC86_2015_data.npy': PrivateFileCacher( 

101 'llama/objects/2b17257a472fc26b12be072b5d93d65317892c05a40f01cdd62a450efb67e0fb', 

102 'llama' 

103 ), 

104 }, 

105 'version-002-p04': { 

106 'IC86_2018_data.npy': PrivateFileCacher( 

107 'llama/objects/3a744c78e71088dd92725341cb24a2ac649e33555cb8cba4cdb851e6c135ce34', 

108 'llama' 

109 ), 

110 'IC86_2011_data.npy': PrivateFileCacher( 

111 'llama/objects/c353bde7c87e706da7472144d0805e6cec65ff0ca475e272a2023c83a653403b', 

112 'llama' 

113 ), 

114 'IC86_2017_data.npy': PrivateFileCacher( 

115 'llama/objects/a6db2bb0998fd160cdceaf1eb8642fb10bd05105d647fdb9224d81fc0a30bf54', 

116 'llama' 

117 ), 

118 'IC86_2013_data.npy': PrivateFileCacher( 

119 'llama/objects/d27802ddb13b0a6c134c15680f1bc1772a39cb185b0380cd43f3938388424362', 

120 'llama' 

121 ), 

122 'README': PrivateFileCacher( 

123 'llama/objects/6038294f52bb921191a0349805d361045c95c5886385fcd193f2cb7c94c88610', 

124 'llama' 

125 ), 

126 'IC86_2012_data.npy': PrivateFileCacher( 

127 'llama/objects/ff8903825f6850a4eb4e5bc0cab5cddbf63d47333531818e55538abac9b1ac75', 

128 'llama' 

129 ), 

130 'IC86_2016_data.npy': PrivateFileCacher( 

131 'llama/objects/57c6b21b6e3b6c0ea8db115fa78d2731c6d9662d5a1026b303dda55c3ade70db', 

132 'llama' 

133 ), 

134 'IC86_2014_data.npy': PrivateFileCacher( 

135 'llama/objects/4ea085eb95eaddd1a6b04b4a48d2491e89e792a2948a94d6af59b29ee936ec55', 

136 'llama' 

137 ), 

138 'IC86_2015_data.npy': PrivateFileCacher( 

139 'llama/objects/6f383d315c243adb25b97535dd9274840cfa1cd6f21860f9d93fb8a4ac17b43c', 

140 'llama' 

141 ), 

142 }, 

143 'version-002-p05': { 

144 'IC86_2018_data.npy': PrivateFileCacher( 

145 'llama/objects/09a8b00889a59f746ad16552e72cf8b07a43123788e82664c48f1aa3001f00b8', 

146 'llama' 

147 ), 

148 'IC86_2011_data.npy': PrivateFileCacher( 

149 'llama/objects/ac630a536db6207a3be87cc1938633c162b31493fe9e5002afadff9774ac0ac9', 

150 'llama' 

151 ), 

152 'IC86_2017_data.npy': PrivateFileCacher( 

153 'llama/objects/88d5068e96d5260dae8648982196782d9a6ae3079f907f7eabe88d78562bae7c', 

154 'llama' 

155 ), 

156 'IC86_2013_data.npy': PrivateFileCacher( 

157 'llama/objects/f0848891cb47f0d5c99f8c16fed4c15ecaf306b1b408730b4b6b8d66d2ba557b', 

158 'llama' 

159 ), 

160 'README': PrivateFileCacher( 

161 'llama/objects/22ad2a675500fbf0d81518cf8a00b04fa59b74b2de5b73b49126236cb6c92d76', 

162 'llama' 

163 ), 

164 'IC86_2012_data.npy': PrivateFileCacher( 

165 'llama/objects/88d4944325c3495ee4a106b8e108d42d1aea1ffdc59449a7987c8143dd5f4cdd', 

166 'llama' 

167 ), 

168 'IC86_2016_data.npy': PrivateFileCacher( 

169 'llama/objects/d5011765d5b5f6c02e6f9c683a2e8ef15cf35192764bdb7089a39a29c0e7d47d', 

170 'llama' 

171 ), 

172 'IC86_2014_data.npy': PrivateFileCacher( 

173 'llama/objects/fbb7236a03c2670c07f2619c252326eda9156d44b66bcd919b9d61857144f4dd', 

174 'llama' 

175 ), 

176 'IC86_2015_data.npy': PrivateFileCacher( 

177 'llama/objects/74c28037cba7dbf6465fa9b0267463916cf5c1fda9280ffa39bfe9cd07db60b4', 

178 'llama' 

179 ), 

180 } 

181} 

182DEFAULT_ARCHIVE = ARCHIVAL_NEUTRINOS['version-002-p04'] 

183DELTAT_MINUS = 500 # how many seconds before the event to search in 

184DELTAT_PLUS = 500 # how many seconds after the event to search in 

185ICECUBE_MAX_DELAY = 100 

186ICECUBE_COMPLETE_CHECK_MAX_RETRIES = 10 

187MJD_TOPIC_SWITCH_16_TO_17 = 57891.168 

188MJD_TOPIC_SWITCH_17_TO_BLANK = 58309.741 

189# time intervals when we can unblind neutrinos in MJD 

190UNBLINDED_TIMES = ( 

191 (58574.0, 58940.0), # 2019-04-01 to 2020-04-01 

192) 

193# [deg] the floor on the angular uncertainty. Specified by Josh Wood via email 

194# pre-ER14. 

195MIN_SIGMA = 0.2 

196 

197# CONVERSIONS AND TOOLS 

198 

199 

200def get_dec_filter(dec): 

201 """Return a function that will filter neutrinos based on DECLINATION, e.g. 

202 for use with the ``filter`` function. 

203 

204 Parameters 

205 ---------- 

206 dec : list 

207 A list of declination intervals (also specified as lists) into 

208 which a selected neutrino should fall, e.g. [[ 21, 41 ],[ 92, 102 ]]. 

209 

210 Returns 

211 ------- 

212 dec_filter : function 

213 A function which returns ``True`` when passed a neutrino that falls 

214 within at least one of the declination intervals passed included in 

215 ``dec``. The neutrino is expected to be in the format returned by 

216 ``format_neutrinos``. 

217 

218 Examples 

219 -------- 

220 >>> neutrino = { 

221 ... 'azimuth': 246.16444118441663, 

222 ... 'bdt_up': -0.0155, 

223 ... 'energy': 498.10789, 

224 ... 'mjd': 57982.5321185039, 

225 ... 'sigma': 1.5478695443038595, 

226 ... 'zenith': 113.7504569829126 

227 ... } 

228 >>> ra, dec = zen_az2ra_dec( 

229 ... neutrino['mjd'], 

230 ... neutrino['zenith'], 

231 ... neutrino['azimuth'] 

232 ... ) 

233 >>> f"RA: {ra:.3f}, Dec: {dec:.3f}" 

234 'RA: 1.253, Dec: 23.653' 

235 >>> get_dec_filter([[0,30],[50,60]])(neutrino) 

236 True 

237 >>> get_dec_filter([[0,20],[50,60]])(neutrino) 

238 False 

239 """ 

240 

241 def dec_filter(neutrino): 

242 """Filter a neutrino based on Dec.""" 

243 dec_neutrino = zen_az2ra_dec( 

244 neutrino['mjd'], 

245 neutrino['zenith'], 

246 neutrino['azimuth'] 

247 )[1] 

248 for interval in dec: 

249 if dec_neutrino > interval[0] and dec_neutrino < interval[1]: 

250 return True 

251 return False 

252 

253 return dec_filter 

254 

255 

256def get_ra_filter(ra): # pylint: disable=C0103 

257 """Return a function that will filter neutrinos based on RIGHT ASCENSION, 

258 e.g. for use with the ``filter`` function. 

259 

260 Parameters 

261 ---------- 

262 ra : list 

263 A list of right ascension intervals (also specified as lists) into 

264 which a selected neutrino should fall, e.g. [[ 21, 41 ],[ 92, 102 ]]. 

265 

266 Returns 

267 ------- 

268 ra_filter : function 

269 A function which returns ``True`` when passed a neutrino that falls 

270 within at least one of the right ascension intervals passed included in 

271 ``ra``. The neutrino is expected to be in the format returned by 

272 ``format_neutrinos``. 

273 

274 Examples 

275 -------- 

276 >>> neutrino = { 

277 ... 'azimuth': 246.16444118441663, 

278 ... 'bdt_up': -0.0155, 

279 ... 'energy': 498.10789, 

280 ... 'mjd': 57982.5321185039, 

281 ... 'sigma': 1.5478695443038595, 

282 ... 'zenith': 113.7504569829126 

283 ... } 

284 >>> ra, dec = zen_az2ra_dec( 

285 ... neutrino['mjd'], 

286 ... neutrino['zenith'], 

287 ... neutrino['azimuth'] 

288 ... ) 

289 >>> f"RA: {ra:.3f}, Dec: {dec:.3f}" 

290 'RA: 1.253, Dec: 23.653' 

291 >>> get_ra_filter([[0, 30], [50, 60]])(neutrino) 

292 True 

293 >>> get_ra_filter([[5, 30], [50, 60]])(neutrino) 

294 False 

295 """ 

296 

297 # RA is periodic, so wrap it around in the interval [0, 360) for both 

298 # the neutrino RA and the interval RAs 

299 for interval in ra: 

300 interval[0] %= 360. 

301 interval[1] %= 360. 

302 

303 def ra_filter(neutrino): 

304 """Filter a neutrino based on RA.""" 

305 ra_neutrino = zen_az2ra_dec( 

306 neutrino['mjd'], 

307 neutrino['zenith'], 

308 neutrino['azimuth'] 

309 )[0] 

310 ra_neutrino %= 360 

311 for interval in ra: 

312 if ra_neutrino > interval[0] and ra_neutrino < interval[1]: 

313 return True 

314 return False 

315 

316 return ra_filter 

317 

318 

319def blinder(neutrino, unblinded_times=tuple()): 

320 """Randomize neutrino azimuth (and hence right ascension) for neutrinos 

321 triggered at times that are not explicitly approved for unblinded analysis. 

322 

323 Parameters 

324 ---------- 

325 neutrino : dict 

326 A neutrino dictionary in the format returned by format_neutrinos. 

327 unblinded_times : array_like, optional 

328 An iterable of [start, end] times (specified in UTC MJD format) in 

329 which we are approved by IceCube to pull unblinded neutrinos for our 

330 analysis. If not provided, it is assumed that ALL neutrinos should be 

331 blinded (DEFAULT: ``tuple()``). 

332 

333 Returns 

334 ------- 

335 blinded : dict 

336 The same neutrino provided as a parameter with the azimuth (and hence 

337 right ascension) with RA scrambled if necessary. If the neutrino's MJD 

338 falls inside one of these intervals, it will be returned unchanged. If 

339 it falls outside all of these intervals, its azimuth will be set to a 

340 random value. This is a fresh copy of the neutrino; the neutrino 

341 provided as input is *not* modified in place. 

342 

343 Examples 

344 -------- 

345 >>> neutrino_time = 57990 

346 >>> unblinded_times = [[neutrino_time-1, neutrino_time+1]] 

347 >>> neutrino = { 

348 ... "mjd": neutrino_time, 

349 ... "zenith": 90, 

350 ... "sigma": 0.3, 

351 ... "energy": 5000, 

352 ... "azimuth": 270, 

353 ... "bdt_up": 1, 

354 ... } 

355 >>> blind = blinder(neutrino, unblinded_times=[]) 

356 >>> blind['azimuth'] == neutrino['azimuth'] 

357 False 

358 >>> unblind = blinder(neutrino, unblinded_times=unblinded_times) 

359 >>> unblind['azimuth'] == neutrino['azimuth'] 

360 True 

361 """ 

362 blinded = deepcopy(neutrino) 

363 if not any(neutrino['mjd'] > start and neutrino['mjd'] < end 

364 for start, end in unblinded_times): 

365 blinded['azimuth'] = 360.*random() 

366 blinded['type'] = 'blinded' 

367 # For the sake of the unit test, don't let the output neutrino match 

368 # the input neutrino's azimuth. This will have no significant effect on 

369 # anything else. 

370 if blinded['azimuth'] == neutrino['azimuth']: 

371 blinded['azimuth'] = (blinded['azimuth'] + 180.) % 360. 

372 return blinded 

373 

374 

375def neutrino_archive_available_years(): 

376 """Get a list of years for which archival GFU neutrinos are available. 

377 These files are stored remotely on LLAMA S3 (see ``llama.com.s3``) as 

378 ``.npy`` files and are cached locally in ``llama.utils.OBJECT_DIR``; these 

379 are private files that require LLAMA S3 credentials to access. Archival 

380 neutrino releases available via LLAMA S3 are recorded in 

381 ``ARCHIVAL_NEUTRINOS``, and the default archival release version is 

382 contained in ``DEFAULT_ARCHIVE``. 

383 

384 Examples 

385 -------- 

386 Confirm that we have access to archival neutrinos from 2011 through 2018: 

387 

388 >>> neutrino_archive_available_years() 

389 [2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018] 

390 """ 

391 regex = re.compile(r'IC\d+_(\d\d\d\d)_data.npy') 

392 matches = [regex.match(k) for k in DEFAULT_ARCHIVE] 

393 return sorted(int(m.groups()[0]) for m in matches if m is not None) 

394 

395 

396def get_archive(mjd_start, mjd_end): 

397 """Fetch neutrinos from a local archive. Archive must be defined and stored 

398 on LLAMA S3 for the queried neutrinos, **and you must have access to LLAMA 

399 S3 API credentials to retrieve the archive**. 

400 

401 Parameters 

402 ---------- 

403 mjd_start : float 

404 The start of the time window in which to search for neutrinos in 

405 Modified Julian Date (MJD) format. Use ``gps2mjd`` or ``utc2mjd`` from 

406 the ``llama.utils`` module to make the time conversions to the 

407 appropriate format. 

408 mjd_end : float 

409 The end of the time window in which to search for neutrinos. 

410 

411 Returns 

412 ------- 

413 formatted : list 

414 A list of neutrino dictionaries formatted as expected by 

415 IceCubeNeutrinoList. If mjd_start > mjd_end, no error will be raised; 

416 instead, an empty list of neutrinos will be returned. 

417 

418 Raises 

419 ------ 

420 FileNotFoundError 

421 If data for the requested time range is not available. 

422 

423 Examples 

424 -------- 

425 You can confirm that the archival neutrino data is available by fetching a 

426 specific trigger from the archive: 

427 >>> neutrino = get_archive(58392, 58393)[0] 

428 >>> neutrino['run'] 

429 131569 

430 >>> neutrino['event'] 

431 43188689 

432 """ 

433 from astropy.time import Time 

434 start_year = Time(mjd_start, format='mjd').datetime.year 

435 end_year = Time(mjd_end, format='mjd').datetime.year 

436 if not {start_year, end_year}.issubset(neutrino_archive_available_years()): 

437 raise FileNotFoundError(("Could not find GFU archive neutrinos for " 

438 "requested years: {}, {}").format(start_year, 

439 end_year)) 

440 archives = [v for k, v in DEFAULT_ARCHIVE.items() 

441 if fnmatch(k, "IC*_*.npy")] 

442 if not archives: 

443 return [] 

444 import numpy as np 

445 neutrinos = [ 

446 { 

447 'run': int(n[0]), 

448 'event': int(n[1]), 

449 'subevent': int(n[2]), 

450 'type': 'archival', 

451 'file': str(arc.localpath), 

452 'mjd': float(n[3]), 

453 'azimuth': float(n[4]*180./np.pi), 

454 'zenith': float(n[5]*180./np.pi), 

455 'ra': float(n[6]*180./np.pi), 

456 'dec': float(n[7]*180./np.pi), 

457 'energy': float(10**n[9]), 

458 'sigma': float(n[8]*180./np.pi), 

459 } 

460 for arc in archives 

461 for n in np.load(arc.get()) 

462 if n[3] > mjd_start and n[3] < mjd_end 

463 ] 

464 assert all(n['ra'] <= 360 and n['ra'] >= 0 for n in neutrinos) 

465 assert all(n['dec'] <= 90 and n['dec'] >= -90 for n in neutrinos) 

466 assert all(n['azimuth'] <= 360 and n['azimuth'] >= 0 for n in neutrinos) 

467 assert all(n['zenith'] <= 180 and n['zenith'] >= 0 for n in neutrinos) 

468 assert all(n['energy'] <= 10**9 for n in neutrinos) 

469 assert all(n['sigma'] <= 180 for n in neutrinos) 

470 assert all(n['mjd'] > 0 for n in neutrinos) 

471 return neutrinos 

472 

473 

474def get_real(mjd_start, mjd_end, check_complete=False): 

475 """Fetch real neutrinos from ICECUBE's database; takes gps time of the 

476 events and the time range abnut this central value over which to search 

477 for neutrinos. 

478 

479 Parameters 

480 ---------- 

481 mjd_start : float 

482 The start of the time window in which to search for neutrinos in 

483 Modified Julian Date (MJD) format. Use ``gps2mjd`` or ``utc2mjd`` from 

484 the ``llama.utils`` module to make the time conversions to the 

485 appropriate format. 

486 mjd_end : float 

487 The end of the time window in which to search for neutrinos. 

488 check_complete : bool, optional 

489 Neutrinos from IceCube can be delayed due to communications issues; to 

490 combat this, when ``check_complete`` is ``True``, a check will be run 

491 to see if the next neutrino after the end of the search window is yet 

492 available. If it is not available, it is assumed that not all neutrinos 

493 needed for the search have arrived from south pole, and a 

494 ``GenerationError`` will be raised, allowing the pipeline to cooldown 

495 and retry the neutrino pull later. 

496 

497 Returns 

498 ------- 

499 formatted : list 

500 A list of neutrino dictionaries formatted as expected by 

501 IceCubeNeutrinoList. If mjd_start > mjd_end, no error will be raised; 

502 instead, an empty list of neutrinos will be returned. 

503 

504 Raises 

505 ------ 

506 ValueError 

507 If the start time of the neutrino pull predates the switch from the 

508 'neutrino17' topic to the 'neutrino' topic (in late 2018), a 

509 ``ValueError`` is raised. The date of this switch is given by 

510 ``MJD_TOPIC_SWITCH_17_TO_BLANK``. 

511 

512 Examples 

513 -------- 

514 Get some neutrinos from late-2018 on the 'neutrino' topic: 

515 >>> from llama.utils import utc2mjd 

516 >>> mjd = utc2mjd("2018-09-30") 

517 >>> len(get_real(mjd-500/86400., mjd+500/86400.)) > 0 

518 True 

519 

520 If your end time is earlier than your start time, you'll get an empty 

521 neutrino list: 

522 >>> get_real(mjd, mjd-1) 

523 [] 

524 

525 You should be able to fetch this neutrino from the online GFU database: 

526 >>> neutrino = get_archive(58392, 58393)[0] 

527 >>> neutrino['run'] 

528 131569 

529 >>> neutrino['event'] 

530 43188689 

531 """ 

532 if mjd_start >= mjd_end: 

533 return [] 

534 # the neutrino 'topic' changed in spring of 2017 

535 if mjd_end < MJD_TOPIC_SWITCH_17_TO_BLANK: 

536 LOGGER.error("Cannot pull neutrinos from earlier than %s; requested " 

537 "interval: (%s, %s)", MJD_TOPIC_SWITCH_17_TO_BLANK, 

538 mjd_start, mjd_end) 

539 raise ValueError(("Pulling GFU neutrinos from the neutrino17 and " 

540 "earlier topics is no longer supported. Queried " 

541 "interval ({}, {}).").format(mjd_start, mjd_end)) 

542 LOGGER.debug('Pulling ICECUBE neutrinos from %s to %s', mjd_start, mjd_end) 

543 # the neutrino 'topic' changed again in july 2018 

544 formatted = format_neutrinos(get_events('neutrino', mjd_start, mjd_end)) 

545 LOGGER.debug('Formatted ICECUBE neutrinos:') 

546 LOGGER.debug(str(formatted)) 

547 if check_complete: 

548 # make sure that all neutrinos are in by confirming that more neutrinos 

549 # from outside the time window are available. 

550 check_end = mjd_end 

551 for attempt in range(ICECUBE_COMPLETE_CHECK_MAX_RETRIES): 

552 check_start = check_end 

553 check_end = check_start + ICECUBE_MAX_DELAY / 86400 

554 LOGGER.debug('Checking whether this list is complete by pulling ' 

555 'neutrinos from next time slice %s - %s (attempt %s)', 

556 check_start, check_end, attempt) 

557 next_nu = get_real(check_start, check_end, check_complete=False) 

558 if next_nu: 

559 LOGGER.debug('Found extra neutrinos after the search window; ' 

560 'all neutrinos are likely available. Post-window ' 

561 'neutrinos: %s', next_nu) 

562 break 

563 else: 

564 msg = ('Could not confirm that all neutrinos are available from ' 

565 f'MJD search window {mjd_start} - {mjd_end} since no ' 

566 'subsequent neutrinos were found in ' 

567 f'{ICECUBE_COMPLETE_CHECK_MAX_RETRIES} consecutive time ' 

568 f'windows of width {ICECUBE_MAX_DELAY} seconds. It is ' 

569 'likely that not all neutrinos from the search window have ' 

570 'arrived from south pole.') 

571 LOGGER.error(msg) 

572 raise GenerationError(msg) 

573 return formatted 

574 

575 

576def get_archive_and_real_neutrinos(mjd_start, mjd_end, blinded_neutrinos=None, 

577 check_complete=False): 

578 """Get a mix of archival and real online GFU neutrinos (automatically calls 

579 ``get_archive`` and ``get_real`` as appropriate). Pulls online neutrinos 

580 for dates after ``MJD_TOPIC_SWITCH_17_TO_BLANK`` and archival neutrinos for 

581 dates prior. 

582 

583 Parameters 

584 ---------- 

585 mjd_start : float 

586 The start of the time window in which to search for neutrinos in 

587 Modified Julian Date (MJD) format. Use ``gps2mjd`` or ``utc2mjd`` from 

588 the ``llama.utils`` module to make the time conversions to the 

589 appropriate format. 

590 mjd_end : float 

591 The end of the time window in which to search for neutrinos. 

592 blinded_neutrinos : bool, optional 

593 Whether the neutrinos should be forcefully blinded or not. Use this to 

594 blind the neutrinos even if they are from time windows approved for 

595 unblinding (e.g. when doing background sensitivity studies). If not 

596 specified, use ``UNBLINDED_TIMES`` to determine which neutrinos can be 

597 unblinded. 

598 check_complete : bool, optional 

599 Neutrinos from IceCube can be delayed due to communications issues; to 

600 combat this, when neutrinos are pulled from the online data stream 

601 and ``check_complete`` is ``True``, a check will be run to see if the 

602 next neutrino after the end of the search window is yet available. If 

603 it is not available, it is assumed that not all neutrinos needed for 

604 the search have arrived from south pole, and a ``GenerationError`` will 

605 be raised, allowing the pipeline to cooldown and retry the neutrino 

606 pull later. 

607 

608 Returns 

609 ------- 

610 formatted : list 

611 A list of neutrino dictionaries formatted as expected by 

612 IceCubeNeutrinoList. If mjd_start > mjd_end, no error will be raised; 

613 instead, an empty list of neutrinos will be returned. Each neutrino 

614 will be blinded if ``blinded_neutrinos`` is ``True`` or if its event 

615 time falls outside the list of approved ``UNBLINDED_TIMES``. 

616 

617 Examples 

618 -------- 

619 Pull some blinded archival neutrinos 

620 >>> a_start = MJD_TOPIC_SWITCH_17_TO_BLANK-1.77 

621 >>> a_end = MJD_TOPIC_SWITCH_17_TO_BLANK 

622 >>> arc = get_archive_and_real_neutrinos(a_start, a_end, True) 

623 >>> len(arc) > 0 

624 True 

625 

626 Pull some blinded GFU neutrinos 

627 >>> l_start = MJD_TOPIC_SWITCH_17_TO_BLANK 

628 >>> l_end = MJD_TOPIC_SWITCH_17_TO_BLANK + 500/86400. 

629 >>> live = get_archive_and_real_neutrinos(l_start, l_end, True) 

630 >>> len(live) > 0 

631 True 

632 

633 Pull a mix of archival and blinded neutrinos 

634 >>> mix = get_archive_and_real_neutrinos(a_start, l_end, True) 

635 >>> len(arc) + len(live) == len(mix) 

636 True 

637 """ 

638 neutrinos = [] 

639 if mjd_start < MJD_TOPIC_SWITCH_17_TO_BLANK: 

640 if mjd_end < MJD_TOPIC_SWITCH_17_TO_BLANK: 

641 end = mjd_end 

642 else: 

643 end = MJD_TOPIC_SWITCH_17_TO_BLANK 

644 neutrinos += get_archive(mjd_start, end) 

645 if mjd_end > MJD_TOPIC_SWITCH_17_TO_BLANK: 

646 if mjd_start > MJD_TOPIC_SWITCH_17_TO_BLANK: 

647 start = mjd_start 

648 else: 

649 start = MJD_TOPIC_SWITCH_17_TO_BLANK 

650 neutrinos += get_real(start, mjd_end, check_complete=check_complete) 

651 if blinded_neutrinos is None: 

652 LOGGER.info("``blinded_neutrinos`` not specified, unblinding as " 

653 "specified in ``UNBLINDED_TIMES``.") 

654 neutrinos = [blinder(n, UNBLINDED_TIMES) for n in neutrinos] 

655 elif blinded_neutrinos: 

656 LOGGER.info("``blinded_neutrinos`` is 'true'; force-blinding " 

657 "all neutrinos.") 

658 neutrinos = [blinder(n) for n in neutrinos] 

659 else: 

660 LOGGER.info("``blinded_neutrinos`` is 'false'; unblinding.") 

661 for neutrino in neutrinos: 

662 for required_neutrino_prop in NEUTRINO_PROPERTIES: 

663 if required_neutrino_prop not in neutrino: 

664 raise ValueError('Input list of neutrinos incomplete:' + 

665 str(required_neutrino_prop)) 

666 return neutrinos 

667 

668 

669def format_neutrinos(neutrinos): 

670 """Read in ICECUBE neutrinos as returned by realtime_tools.live.get_events 

671 and put them in the format that LLAMA expects.""" 

672 parsed_events = parse_neutrino_event_stream(neutrinos) 

673 formatted = [{k: e[i] for i, k in enumerate(parsed_events.dtype.names)} 

674 for e in parsed_events] 

675 # fit our internal formatting 

676 for neutrino in formatted: 

677 neutrino['ra'] = float(neutrino['ra']) * 180. / pi 

678 neutrino['dec'] = float(neutrino['dec']) * 180. / pi 

679 neutrino['mjd'] = float(neutrino['time']) 

680 del neutrino['time'] 

681 neutrino['sigma'] = float(neutrino['angErr'] * 180. / pi) 

682 del neutrino['angErr'] 

683 neutrino['zenith'] = float(neutrino['zen'] * 180. / pi) 

684 del neutrino['zen'] 

685 neutrino['azimuth'] = float(neutrino['azi'] * 180. / pi) 

686 del neutrino['azi'] 

687 neutrino['energy'] = float(10**neutrino['logE']) 

688 del neutrino['logE'] 

689 neutrino['event'] = int(neutrino['event']) 

690 neutrino['run'] = int(neutrino['run']) 

691 # mark as observational 

692 neutrino['type'] = 'observation' 

693 return formatted 

694 

695 

696NEUTRINO_PROPERTIES = ['mjd', 'zenith', 'azimuth', 'energy', 'sigma', 'type'] 

697NEUTRINO_TYPES = ['observation', 'blinded', 'injected', 'unspecified', 

698 'archival'] 

699 

700 

701# these methods are used for the Neutrino class 

702def _neutrino_gps(self): 

703 """GPS time, i.e. number of GPS seconds since start of GPS epoch, when 

704 the neutrino candidate was detected.""" 

705 return mjd2gps(self.mjd) 

706 

707 

708def _neutrino_ra(self): 

709 """Right Ascension of neutrino candidate in degrees.""" 

710 return zen_az2ra_dec(self.mjd, self.zenith, self.azimuth)[0] 

711 

712 

713def _neutrino_dec(self): 

714 """Declination of neutrino candidate in degrees.""" 

715 return zen_az2ra_dec(self.mjd, self.zenith, self.azimuth)[1] 

716 

717 

718def _neutrino_psf(self): 

719 """A Psf object for this neutrino that can be used to take products of this 

720 neutrino's localization PSF with other skymap objects.""" 

721 return PsfGaussian(ra=self.ra, dec=self.dec, sigma=self.sigma) 

722 

723 

724Neutrino = parameter_factory( 

725 "Neutrino", 

726 """A class for holding neutrino trigger data in easily accessible form. 

727 Also provides convenience methods for coordinate conversions of underlying 

728 data. Good for plugging values into formulae. 

729 """, 

730 mjd="""Modified Julian Date in UTC epoch at which the neutrino candidate 

731 was detected.""", 

732 zenith="""Zenith angle of neutrino source direction, measured from 

733 IceCube's location at South Pole.""", 

734 azimuth="""Azimuthal angle of neutrino source direction, measured from 

735 IceCube's location at South Pole.""", 

736 energy="Reconstructed energy of the neutrino candidate in GeV.", 

737 sigma="Effective uncertainty radius in degrees.", 

738 gps=_neutrino_gps, 

739 ra=_neutrino_ra, 

740 dec=_neutrino_dec, 

741 psf=_neutrino_psf, 

742) 

743 

744 

745@GraceDBReceipt.upload_this('GWHEN ICECUBE GFU neutrinos from a +/- 500s ' 

746 'window surrounding the event.') 

747@SlackReceiptLlama.upload_this() 

748@HEALPixPSF.set_class_attributes 

749class IceCubeNeutrinoList(JSONFile, HEALPixPSF): 

750 """ 

751 Takes a list of dictionaries describing neutrinos and saves it to a 

752 text file. Validates the neutrino data to make sure each neutrino contains 

753 (minimally) the following properties: 

754 

755 - mjd 

756 - zenith 

757 - azimuth 

758 - sigma 

759 - energy 

760 - type (one the values found in ``NEUTRINO_TYPES``) 

761 

762 If this file is generated within the temporal search window for neutrinos, 

763 it is possible not all neutrinos needed for the final result will be 

764 available. In this case, the file will automatically become ``obsolete`` 

765 and will be regenerated after the window has elapsed. 

766 

767 Neutrinos from IceCube can be delayed due to communications issues; to 

768 combat this, when neutrinos are pulled from the online data stream after 

769 the temporal search window has elapsed, a check will be run to see if the 

770 next neutrino after the end of the search window is yet available. If it is 

771 not available, it is assumed that not all neutrinos needed for the search 

772 have arrived from south pole, and a ``GenerationError`` will be raised, 

773 allowing the pipeline to cooldown and retry the neutrino pull later. 

774 

775 Neutrinos pulled from before ``MJD_TOPIC_SWITCH_17_TO_BLANK`` will be 

776 automatically pulled from local GFU archives (stored online on LLAMA S3 and 

777 cached locally after use in ``llama.utils.CACHEDIR`` in ``*.npy`` format). 

778 Neutrinos from after this date will be pulled from the online GFU stream. 

779 """ 

780 

781 FILENAME = "icecube_neutrino_list.json" 

782 DEPENDENCIES = (SkymapInfo,) 

783 DETECTORS = (detectors.IceCube,) 

784 

785 def _generate(self): # pylint: disable=W0221 

786 """Take a list of neutrinos as an argument, validate, and save.""" 

787 from astropy.time import Time 

788 # Must add small value (1e-5) to avoid {u'error': u'invalid start 

789 # date'} due to I3Time.cxx problem with parsing even seconds when 

790 # handling microseconds (see issue #19 on github). This is a kludge. 

791 gpstime = SkymapInfo(self).event_time_gps_seconds + 1e-5 

792 mjd_start = gps2mjd(gpstime-DELTAT_MINUS) 

793 mjd_end = gps2mjd(gpstime+DELTAT_PLUS) 

794 # if BLINDED_NEUTRINOS is 'false', let the blinding be done 

795 # automatically 

796 blind = self.flags['BLINDED_NEUTRINOS'] == 'true' or None 

797 # if we are past the end of the temporal search window, make sure we're 

798 # getting all neutrinos by checking whether new ones are available 

799 # after the search window 

800 check_complete = Time.now().gps > self.end_of_neutrino_window() 

801 neutrinos = get_archive_and_real_neutrinos( 

802 mjd_start, 

803 mjd_end, 

804 blinded_neutrinos=blind, 

805 check_complete=check_complete, 

806 ) 

807 self._write_json({'triggers': neutrinos, 'complete': check_complete}) 

808 

809 @property 

810 def template_skymap_filehandler(self): 

811 return LvcSkymapHdf5 

812 

813 def source_locations(self): 

814 neutrinos = self.neutrinos 

815 locations = list() 

816 # pylint: disable=C0103 

817 for n in neutrinos: 

818 ra, dec = zen_az2ra_dec(n.mjd, n.zenith, n.azimuth) 

819 coords = (ra, dec, n.sigma) 

820 if all([isinstance(x, float) for x in coords]): 

821 locations.append(coords) 

822 else: 

823 LOGGER.warning('Excluding neutrinos with undefined coords: %s', 

824 coords) 

825 return locations 

826 

827 @property 

828 def neutrinos(self): 

829 """Return a list containing a ``Neutrino`` object for each neutrino; 

830 more convenient to work with when writing formulae than dealing 

831 directly with the neutrino dictionaries.""" 

832 data = self.read_json() 

833 if isinstance(data, dict): 

834 neutrinos = data['triggers'] 

835 else: 

836 neutrinos = data 

837 for neu in neutrinos: 

838 if 'type' not in neu: 

839 neu['type'] = 'unspecified' 

840 elif neu['type'] not in NEUTRINO_TYPES: 

841 raise ValueError(("loaded neutrinos, but given type {} not in " 

842 "allowed values {}").format(neu['type'], 

843 NEUTRINO_TYPES)) 

844 return [Neutrino(**n) for n in neutrinos] 

845 

846 @recursive_obsolescence 

847 def is_obsolete(self, checked=None, **kwargs): 

848 """``IceCubeNeutrinoList`` files can be fetched immediately after a GW 

849 event becomes available. However, we want to eventually run our 

850 analysis on all neutrinos in a certain time window around an event. We 

851 therefore want to re-fetch the neutrinos in that time window after the 

852 time window has passed (allowing a safety factor for the IceCube GFU 

853 API to process new neutrino triggers). To do this, we mark the 

854 ``IceCubeNeutrinoList`` as obsolete if the current time is after the 

855 time window + safety margin has elapsed and the neutrino list was 

856 generated before that time had passed (meaning there might have been 

857 new neutrinos saved that are useful for the analysis after the file was 

858 originally generated). Standard ``FileHandler`` obsolescence criteria 

859 are also used to determine obsolescence through a call to ``super``.""" 

860 if not self.exists(): 

861 return False 

862 if not self.is_complete(): 

863 return False 

864 return super().is_obsolete(checked=checked, **kwargs) 

865 

866 def is_complete(self): 

867 """ 

868 Whether the list of neutrinos is complete. Assumes incomplete if no 

869 ``complete`` key specified. 

870 """ 

871 return self.read_json().get('complete', False) 

872 

873 def end_of_neutrino_window(self): 

874 """ 

875 The GPS time at which all neutrinos should be in under normal 

876 circumstances. 

877 """ 

878 return (SkymapInfo(self).event_time_gps + DELTAT_PLUS + 

879 ICECUBE_MAX_DELAY) 

880 

881 @property 

882 def num_triggers(self): 

883 return len(self.neutrinos) 

884 

885 # pylint: disable=C0103 

886 @deprecated 

887 def downselect(self, ra=None, dec=None, ntype=None): 

888 """Downselect the neutrino list to match given criteria. Save the old 

889 neutrino list with the original filename prefixed by a timestamp and 

890 replace it with the downselected list. 

891 

892 Parameters 

893 ---------- 

894 ra : list 

895 Right-ascension intervals (in degrees) that the neutrinos must fall 

896 into, e.g. ``[(21, 41),(92, 102)]`` 

897 dec : list 

898 Decliination intervals (in degrees) that the neutrinos must fall 

899 into, e.g. ``[(-3, 20)]`` 

900 ntype : str 

901 A type that the neutrino must match, e.g. ``'observation'`` for 

902 real neutrinos. 

903 """ 

904 

905 # load the file 

906 neutrinos = self.read_json() 

907 

908 # convert to RA/DEC if either of those selections is specified so that 

909 # we can perform the actual downselect later 

910 if not (ra is None and dec is None): 

911 ra_list = list() 

912 dec_list = list() 

913 for neutrino in neutrinos: 

914 neutrino_ra, neutrino_dec = zen_az2ra_dec( 

915 neutrino['mjd'], 

916 neutrino['zenith'], 

917 neutrino['azimuth'] 

918 ) 

919 ra_list.append(neutrino_ra) 

920 dec_list.append(neutrino_dec) 

921 

922 # apply RA downselection 

923 if ra is not None: 

924 # neutrinos = filter(get_ra_filter(ra), neutrinos) 

925 test = get_ra_filter(ra) 

926 neutrinos = [n for n in neutrinos if test(n)] 

927 

928 # apply Dec downselection 

929 if dec is not None: 

930 # neutrinos = filter(get_dec_filter(dec), neutrinos) 

931 test = get_dec_filter(dec) 

932 neutrinos = [n for n in neutrinos if test(n)] 

933 

934 # apply the type downselection 

935 if ntype is not None: 

936 tmp_neutrinos = list() 

937 for neu in neutrinos: 

938 if 'type' in neu: 

939 if neu['type'] == ntype: 

940 tmp_neutrinos.append(neu) 

941 elif ntype == 'unspecified': 

942 neu_modified = deepcopy(neu) 

943 neu_modified['type'] = 'unspecified' 

944 tmp_neutrinos.append(neu_modified) 

945 neutrinos = tmp_neutrinos 

946 

947 # copy the old neutrino list as backup 

948 nowstr = datetime.datetime.utcnow().isoformat().replace(':', '_') 

949 outpath = os.path.join( 

950 self.eventdir, 

951 'preselect-{}-{}'.format(nowstr, self.FILENAME) 

952 ) 

953 shutil.copyfile(self.fullpath, outpath) 

954 

955 # save the downselected neutrinos to the old file 

956 self._write_json(neutrinos)