Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# (c) Stefan Countryman 2016-2018, IceCube collaboration 

2 

3""" 

4Provide ``realtime_tools`` functionality from IceCube so that the pipeline can 

5run without installing a full-fledged IceCube environment. In particular, 

6implement ``live.get_events`` and ``neutrino_singlet_pulls.calc_cr_pull``. 

7 

8User must provide a valid IceCube username and password as environmental 

9variables ``ICECUBE_USERNAME`` and ``ICECUBE_PASSWORD``, respectively. 

10""" 

11 

12import json 

13import time 

14import logging 

15import datetime 

16from math import pi 

17from llama.classes import optional_env_var 

18from llama.utils import RemoteFileCacher, utc2mjd 

19from llama.files.i3.utils import zen_az2ra_dec 

20 

21ANG_ERR_FLOOR = 0.2 * pi / 180 

22LOGGER = logging.getLogger(__name__) 

23URL_PFOW = 'https://live.icecube.wisc.edu/pfow_data/' 

24# some polynomial coefficients from neutrino_singlet_pulls.py 

25CR_POLY = [-0.033012, 0.71735, -5.9243, 23.795, -46.633, 36.409] 

26PRE_2018_TOPICS = [ 

27 'neutrino16', 

28 'neutrino17', 

29 'heseEvent16', 

30 'eheEvent16', 

31 'hbeheEvent16', 

32] 

33SPLINEMPEFAST_BOOTSTRAP_MUEX = RemoteFileCacher( 

34 'https://nyc3.digitaloceanspaces.com/llama/llama/objects/' 

35 '9f98ccd73b18c74a38cd07eb696bf9c339fcfb0513318cf4bbe7065890eac263' 

36) 

37SPLINEMPEFAST_BOOTSTRAP_TRUNCATEDE = RemoteFileCacher( 

38 'https://nyc3.digitaloceanspaces.com/llama/llama/objects/' 

39 '79e01972b1f046dfaa42e2e39a8c604baa5f47989c8c252ba38579f15d58fa46' 

40) 

41SPLINEMPEFAST_CRAMERRAO_MUEX = RemoteFileCacher( 

42 'https://nyc3.digitaloceanspaces.com/llama/llama/objects/' 

43 '9376ffcd48ca837eb376ef1dcd55f8958fc15c672e841dc110cdb2821e7084b5' 

44) 

45SPLINEMPEFAST_CRAMERRAO_TRUNCATEDE = RemoteFileCacher( 

46 'https://nyc3.digitaloceanspaces.com/llama/llama/objects/' 

47 '71ee87bb35fbbea18b1199610504dfa89d5ed92a22e1b22ee5c2fbac85a24e07' 

48) 

49SPLINEMPEFAST_PARABOLOID_MUEX = RemoteFileCacher( 

50 'https://nyc3.digitaloceanspaces.com/llama/llama/objects/' 

51 'bf08b1386d78ed319f907378f40e49e48bbc79c1887cc58fb93efb73e1fc6a98' 

52) 

53SPLINEMPEFAST_PARABOLOID_TRUNCATEDE = RemoteFileCacher( 

54 'https://nyc3.digitaloceanspaces.com/llama/llama/objects/' 

55 '96dcf3d58a0fa90da7566d3cb0bed9a77850122671d061091e4d6c9ef4e01d3e' 

56) 

57 

58SPLINE_MANIFEST = """ 

59{ 

60 "SplineMPEfast.Bootstrap.MuEX.json": [ 

61 "https://nyc3.digitaloceanspaces.com/llama/llama/objects/9f98ccd73b18c74a38cd07eb696bf9c339fcfb0513318cf4bbe7065890eac263", 

62 "9f98ccd73b18c74a38cd07eb696bf9c339fcfb0513318cf4bbe7065890eac263" 

63 ], 

64 "SplineMPEfast.Bootstrap.TruncatedE.json": [ 

65 "https://nyc3.digitaloceanspaces.com/llama/llama/objects/79e01972b1f046dfaa42e2e39a8c604baa5f47989c8c252ba38579f15d58fa46", 

66 "79e01972b1f046dfaa42e2e39a8c604baa5f47989c8c252ba38579f15d58fa46" 

67 ], 

68 "SplineMPEfast.CramerRao.MuEX.json": [ 

69 "https://nyc3.digitaloceanspaces.com/llama/llama/objects/9376ffcd48ca837eb376ef1dcd55f8958fc15c672e841dc110cdb2821e7084b5", 

70 "9376ffcd48ca837eb376ef1dcd55f8958fc15c672e841dc110cdb2821e7084b5" 

71 ], 

72 "SplineMPEfast.CramerRao.TruncatedE.json": [ 

73 "https://nyc3.digitaloceanspaces.com/llama/llama/objects/71ee87bb35fbbea18b1199610504dfa89d5ed92a22e1b22ee5c2fbac85a24e07", 

74 "71ee87bb35fbbea18b1199610504dfa89d5ed92a22e1b22ee5c2fbac85a24e07" 

75 ], 

76 "SplineMPEfast.Paraboloid.MuEX.json": [ 

77 "https://nyc3.digitaloceanspaces.com/llama/llama/objects/bf08b1386d78ed319f907378f40e49e48bbc79c1887cc58fb93efb73e1fc6a98", 

78 "bf08b1386d78ed319f907378f40e49e48bbc79c1887cc58fb93efb73e1fc6a98" 

79 ], 

80 "SplineMPEfast.Paraboloid.TruncatedE.json": [ 

81 "https://nyc3.digitaloceanspaces.com/llama/llama/objects/96dcf3d58a0fa90da7566d3cb0bed9a77850122671d061091e4d6c9ef4e01d3e", 

82 "96dcf3d58a0fa90da7566d3cb0bed9a77850122671d061091e4d6c9ef4e01d3e" 

83 ] 

84} 

85""" 

86 

87# read IceCube username and password 

88ICECUBE_USERNAME, ICECUBE_PASSWORD = optional_env_var( 

89 ['ICECUBE_USERNAME', 'ICECUBE_PASSWORD'], 

90 """ 

91 Must specify IceCube username and password as environmental 

92 variables ``ICECUBE_USERNAME`` and ``ICECUBE_PASSWORD``, 

93 "respectively in order to fetch IceCube data. 

94 """ 

95) 

96 

97 

98def to_datestring(value): 

99 """ 

100 Convert a given timestamp into a string of 'YYYY-MM-DD hh:mm:ss.ffffff'. 

101 

102 The input must be either an MJD (float) or a datetime object. 

103 

104 NOTE: This is modified from the IceCube code to run stand-alone. It *DOES* 

105 *NOT ACCEPT* ``I3Time`` instances. 

106 """ 

107 if not isinstance(value, datetime.datetime): 

108 import astropy 

109 value = astropy.time.Time(value, format='mjd').datetime 

110 return str(value) 

111 

112 

113def get_events(topic, starttime, stoptime, timekey=None, retries=10, delay=30): 

114 """ 

115 Query events from the I3Live database. 

116 

117 Parameters 

118 ---------- 

119 topic : str 

120 Name of the stream (e.g. 'neutrino16'). 

121 starttime : int or float 

122 start of timewindow 

123 stoptime : int or float 

124 stop of timewindow 

125 timekey : str 

126 which date/time to compare to, such as ``insert_time`` or 

127 ``value.data.eventtime`` (default: auto-detect) 

128 retries : int 

129 Number of retries in case of HTTP errors 

130 delay : int or float 

131 time delay between retries (default: 10 seconds) 

132 

133 Returns 

134 ------- 

135 events : list or None 

136 Events matching the given criteria or None in case of (repeated) HTTP 

137 error. 

138 

139 NOTE: This is modified from the IceCube code to run stand-alone. 

140 """ 

141 # Post-2018, actually specify varname as distinct from topic 

142 varname = topic if topic in PRE_2018_TOPICS else 'realtimeEvent' 

143 timekey = 'value.data.eventtime' if timekey is None else timekey 

144 return _get_pfow(varname, topic, timekey, starttime, stoptime, retries, 

145 delay) 

146 

147 

148def _get_pfow(varname, topic, timekey, starttime, stoptime, retries=10, 

149 delay=30): 

150 """ 

151 Implementation for ``get_events`` and ``get_events_data`` from the 

152 original IceCube realtime_tools module. 

153 """ 

154 from urllib.request import Request, urlopen 

155 from urllib.error import HTTPError 

156 from urllib.parse import urlencode 

157 if ICECUBE_USERNAME is None or ICECUBE_PASSWORD is None: 

158 raise RuntimeError("Must provide both `ICECUBE_USERNAME` and " 

159 "`ICECUBE_PASSWORD` as environmental variables.") 

160 try: 

161 data = urlencode({ 

162 'user': ICECUBE_USERNAME, 

163 'pass': ICECUBE_PASSWORD, 

164 'varname': varname, # this used to just be topic pre-2018 

165 'topic': topic, 

166 'timekey': timekey, 

167 'start': to_datestring(starttime), 

168 'stop': to_datestring(stoptime) 

169 }).encode('utf-8') # python2/python3 compatible conversion to bytes 

170 req = Request(URL_PFOW, data) 

171 

172 conn = urlopen(req) 

173 raw = conn.read() 

174 msgs = json.loads(raw) 

175 return msgs 

176 except HTTPError as err: 

177 LOGGER.warn('HTTP Error {0:}: {1:s}'.format(err.code, err.reason)) 

178 if retries > 0: 

179 LOGGER.warn('Trying again in %.2f seconds (%d tries left)', 

180 delay, retries) 

181 time.sleep(delay) 

182 return _get_pfow(varname, topic, timekey, starttime, stoptime, 

183 retries-1) 

184 LOGGER.error('Not trying again!') 

185 return None 

186 

187 

188def calc_cr_pull(cr_zen, cr_azi, zenith, muex): 

189 """Calculate CR pull corrected value, based on cr_zen,cr_azi, 

190 zenith and muex values based on Thomas's polynomial fit for 

191 neutrino16 sample 

192 

193 NOTE: This is modified from the IceCube code to run stand-alone.""" 

194 import numpy as np 

195 logmu = np.log10(muex) 

196 pull_corr = (CR_POLY[5] + 

197 CR_POLY[4]*logmu + 

198 CR_POLY[3]*np.power(logmu, 2) + 

199 CR_POLY[2]*np.power(logmu, 3) + 

200 CR_POLY[1]*np.power(logmu, 4) + 

201 CR_POLY[0]*np.power(logmu, 5)) 

202 avg_cr = np.sqrt(cr_zen**2 + (cr_azi*np.sin(zenith))**2)/np.sqrt(2) 

203 LOGGER.info("pullcr:" + " %s"*5, cr_zen, cr_azi, zenith, muex, pull_corr) 

204 return pull_corr*avg_cr 

205 

206 

207def events_dtype(): 

208 """Datatypes for events.""" 

209 import numpy as np 

210 return [ 

211 ('run', np.int), 

212 ('event', np.int), 

213 ('azi', np.float), 

214 ('zen', np.float), 

215 ('ra', np.float), 

216 ('dec', np.float), 

217 ('angErr', np.float), 

218 ('logE', np.float), 

219 ('time', np.float), 

220 ] 

221 

222 

223 

224def _supports(topics, version): 

225 """ 

226 Stub version of the 

227 ``icecube.realtime_gfu.eventcache.NeutrinoEventStream().supports`` method, 

228 published at: 

229 

230 https://code.icecube.wisc.edu/projects/icecube/browser/IceCube/ 

231 projects/realtime_gfu/releases/V19-02-00/python/eventcache.py 

232 

233 """ 

234 return ('neutrino' in topics) and (version == '2018a') 

235 

236 

237def parse_neutrino_event_stream(messages, dtype=None): 

238 """ 

239 Stub version of the 

240 ``icecube.realtime_gfu.eventcache.NeutrinoEventStream().parse`` method, 

241 published at: 

242 

243 https://code.icecube.wisc.edu/projects/icecube/browser/IceCube/ 

244 projects/realtime_gfu/releases/V19-02-00/python/eventcache.py 

245 

246 Parses and formats a bunch of neutrinos in the format returned by 

247 icecube.realtime_tools.get_live, picking the correct fields from those 

248 neutrinos such that the result is suitable for analyses. Based on 

249 discussions with Thomas Kintscher and Josh Wood. 

250 """ 

251 import numpy as np 

252 if dtype is None: 

253 dtype = events_dtype() 

254 

255 def ensure_finite(val): 

256 assert np.isfinite(val) 

257 return val 

258 

259 def none_to_nan(val): 

260 return val if val is not None else float('nan') 

261 

262 # ensure that this is a list of messages, even if it has only one entry 

263 messages = messages if isinstance(messages, list) else [ messages ] 

264 

265 # filter messages that can actually be parsed 

266 messages = [ msg for msg in messages 

267 if _supports(msg['value']['streams'], msg['value']['version']) ] 

268 

269 # create and fill output array 

270 arr = np.zeros(len(messages), dtype=dtype) 

271 for i, msg in enumerate(messages): 

272 logMuEX = np.log10(ensure_finite(msg['value']['data']['reco']['energy']['spline_muex'])) 

273 logTruncated = np.log10(none_to_nan(msg['value']['data']['reco']['energy']['truncated'])) 

274 azimuth = ensure_finite(msg['value']['data']['reco']['splinempe']['azi']) 

275 zenith = ensure_finite(msg['value']['data']['reco']['splinempe']['zen']) 

276 cr_azi = ensure_finite(msg['value']['data']['reco']['splinempe']['cr_azi']) 

277 cr_zen = ensure_finite(msg['value']['data']['reco']['splinempe']['cr_zen']) 

278 cramerrao = np.sqrt(0.5*(cr_zen**2. + cr_azi**2.*np.sin(zenith)**2.)) 

279 

280 pb_err1 = none_to_nan(msg['value']['data']['reco']['splinempe']['pb_err1']) 

281 pb_err2 = none_to_nan(msg['value']['data']['reco']['splinempe']['pb_err2']) 

282 paraboloid = np.sqrt(0.5*(pb_err1**2. + pb_err2**2.)) 

283 

284 bootstrap = none_to_nan(msg['value']['data']['reco']['splinempe']['bs_est']) 

285 

286 arr[i]['run'] = msg['value']['data']['run_id'] 

287 arr[i]['event'] = msg['value']['data']['event_id'] 

288 arr[i]['azi'] = azimuth 

289 arr[i]['zen'] = zenith 

290 arr[i]['logE'] = logMuEX 

291 arr[i]['time'] = utc2mjd(msg['value']['data']['eventtime']) 

292 ra_deg, dec_deg = zen_az2ra_dec( 

293 arr[i]['time'], # already in MJD, see above 

294 arr[i]['zen'] * 180. / np.pi, 

295 arr[i]['azi'] * 180. / np.pi 

296 ) 

297 arr[i]['ra'] = ra_deg * np.pi / 180. 

298 arr[i]['dec'] = dec_deg * np.pi / 180. 

299 arr[i]['angErr'] = pull_correct_online_2017(cramerrao, paraboloid, bootstrap, logMuEX, 'MuEX') 

300 

301 # apply floor on angular errors 

302 arr['angErr'] = arr['angErr'].clip(min=ANG_ERR_FLOOR) 

303 

304 return arr 

305 

306 

307def _finite_in_range(var, lo, hi): 

308 """Taken from: 

309 

310 https://code.icecube.wisc.edu/projects/icecube/browser/IceCube/ 

311 projects/realtime_gfu/releases/V19-02-00/python/angular_errors.py 

312 

313 """ 

314 import numpy as np 

315 finite_var = np.nan_to_num(var) 

316 return np.where(np.isfinite(var) & (lo < finite_var) & (finite_var < hi), finite_var, np.nan) 

317 

318 

319def load_spline(filename): 

320 """Taken from: 

321 

322 https://code.icecube.wisc.edu/projects/icecube/browser/IceCube/ 

323 projects/realtime_gfu/releases/V19-02-00/python/angular_errors.py 

324 

325 """ 

326 import scipy.interpolate 

327 with open(filename, 'r') as f: 

328 kwargs = json.load(f) 

329 spline = scipy.interpolate.UnivariateSpline(**kwargs) 

330 return spline 

331 

332 

333def pull_correct_online_2017(cramerrao, paraboloid, bootstrap, logE, mode='MuEX'): 

334 """Taken from: 

335 

336 https://code.icecube.wisc.edu/projects/icecube/browser/IceCube/ 

337 projects/realtime_gfu/releases/V19-02-00/python/angular_errors.py 

338 

339 """ 

340 import numpy as np 

341 paraboloid = _finite_in_range(paraboloid, 0., np.radians(25.)) 

342 bootstrap = _finite_in_range(bootstrap, np.radians(0.01), np.pi) 

343 if mode == 'MuEX': 

344 spline_bs = load_spline(SPLINEMPEFAST_BOOTSTRAP_MUEX.get()) 

345 spline_pb = load_spline(SPLINEMPEFAST_PARABOLOID_MUEX.get()) 

346 spline_cr = load_spline(SPLINEMPEFAST_CRAMERRAO_MUEX.get()) 

347 return pull_correction([bootstrap, paraboloid, cramerrao], logE, 

348 [spline_bs, spline_pb, spline_cr], 

349 fixed_fallback=None) 

350 elif mode == 'TruncatedEnergy': 

351 spline_bs = load_spline(SPLINEMPEFAST_BOOTSTRAP_TRUNCATEDE.get()) 

352 spline_pb = load_spline(SPLINEMPEFAST_PARABOLOID_TRUNCATEDE.get()) 

353 spline_cr = load_spline(SPLINEMPEFAST_CRAMERRAO_TRUNCATEDE.get()) 

354 return pull_correction([bootstrap, paraboloid, cramerrao], logE, 

355 [spline_bs, spline_pb, spline_cr], 

356 fixed_fallback=None) 

357 else: 

358 raise AttributeError('Unknown mode!') 

359 

360 

361def pull_correction(values, logE, splines, fixed_fallback=None): 

362 """Apply pull-correction to angular error estimators. Taken from: 

363 

364 https://code.icecube.wisc.edu/projects/icecube/browser/IceCube/ 

365 projects/realtime_gfu/releases/V19-02-00/python/angular_errors.py 

366 

367 Parameters 

368 ---------- 

369 values : list 

370 List of NumPy arrays with the sigma estimates. 

371 logE : array 

372 The logE of the events. 

373 splines : list 

374 Splines to be used for the corrections. 

375 fixed_fallback : float 

376 Should all estimators fail, use this fixed value 

377 """ 

378 import numpy as np 

379 values = [ np.atleast_1d(estimate) for estimate in values ] 

380 logE = np.atleast_1d(logE) 

381 assert len(values) == len(splines), \ 

382 'Need one spline per estimator.' 

383 assert all([ (len(values[i]) == len(logE)) for i in range(len(values)) ]), \ 

384 'The (all) estimator arrays and the logE array must have the same length.' 

385 

386 result = np.ones(len(values[0])) 

387 result.fill(np.nan) 

388 

389 masks = [] 

390 for i in range(len(values)): 

391 mask = np.isfinite(values[i]) & (np.nan_to_num(values[i]) > 0.) & np.isnan(result) 

392 result[mask] = values[i][mask] * np.power(10., splines[i](logE[mask])) 

393 

394 if fixed_fallback is not None: 

395 mask = np.isnan(result) 

396 result[mask] = fixed_fallback 

397 

398 return result