# (c) Stefan Countryman, 2019

"""
Tools for listening to LVAlerts, LIGO/Virgo's internal GW trigger alert system.
"""

import sys
import time
import traceback
import json
import os
import datetime
import logging
from random import choices
# from queue import Queue
from multiprocessing import Queue  # since our queue is shared between procs
from typing import Tuple
from types import FunctionType
from six import string_types
from llama.listen import flag_preset_freeze_veto
from llama.event import Event
from llama.classes import optional_env_var
from llama.files.lvc_skymap import SKYMAP_FILENAMES
from llama.files.skymap_info.utils import is_super

LOGGER = logging.getLogger(__name__)
SERVER = 'lvalert.cgca.uwm.edu'
PLAYGROUND_SERVER = 'lvalert-playground.cgca.uwm.edu'
LVALERT_USERNAME, LVALERT_PASSWORD = optional_env_var(
    ['LVALERT_USERNAME', 'LVALERT_PASSWORD'],
    "Could not load LVAlert credentials; ``llama listen lvalert`` will fail.",
)

ALERT_TYPE = 'lvalert'
# labels which indicate that an event is ADVOK
ADVOK_LABELS = {
    'ADVOK',
}
ADVREQ_LABELS = {
    'ADVREQ',
}
TEST_NODES = {
    'stc-testnode',
    'test_superevent',
}
NODES = {
    'superevent',
    'cbc_gstlal',
    'cbc_pycbc',
    'cbc_mbtaonline',
    'cbc_spiir',
    'cbc_lowmass',
    'stc-testnode',
    'burst_cwb',
    'test_superevent',
}


def get_client(server: str):
    """Get an ``LVAlertClient`` at the specified ``server`` URL using the
    login credentials stored in the ``LVALERT_USERNAME`` and
    ``LVALERT_PASSWORD`` environment variables (if available)."""
    from ligo.lvalert import LVAlertClient
    return LVAlertClient(username=LVALERT_USERNAME, password=LVALERT_PASSWORD,
                         server=server)
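
# Illustrative usage sketch (comments only; nothing here runs at import
# time). ``get_client`` just wraps ``LVAlertClient`` construction; the
# connect/process calls below mirror how this module itself drives the
# client. Choosing ``PLAYGROUND_SERVER`` is an example, not a requirement.
#
#     client = get_client(PLAYGROUND_SERVER)
#     if client.connect():
#         client.process(block=False)  # start the client's event loop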


class Client:
    """
    A wrapper around ``LVAlertClient`` that provides an interface for
    subscribing to LVAlert nodes and starting a listener. Specify the new
    alert processor as ``processor`` and the URL of the LVAlert server to use
    as ``server``.
    """

    server: str
    processor: FunctionType
    nodes: Tuple[str]
    heartbeat_queue: Queue
    heartbeat_interval: float

    HEARTBEAT_NODE = 'stc-testnode'

    def __init__(self, processor: FunctionType, server: str = SERVER,
                 nodes: Tuple[str] = tuple(NODES),
                 heartbeat_interval: float = 180):
        """Initialize a ``Client``."""
        self.server = server
        self.processor = processor
        self.nodes = nodes
        self.heartbeat_queue = Queue()
        self.heartbeat_interval = heartbeat_interval
        self._client = get_client(server)

    def heartbeat_message(self):
        """Get a random message with which to run a heartbeat check."""
        randmsg = ''.join(
            choices(
                b''.join(
                    int.to_bytes(i, 1, sys.byteorder)
                    for i in range(65, 91)
                    # for i in range(40, 127)
                ).decode(),
                k=16
            )
        )
        return randmsg

    def check_heartbeat(self, timeout=20):
        """Publish a test message to ``self.HEARTBEAT_NODE`` and wait for
        that same message to be parroted back by this instance's ``listen``
        function to ``self.heartbeat_queue``. If, after waiting for
        ``timeout`` seconds, nothing has been received, or if the received
        message does not match the random string sent out, return ``False``;
        otherwise, return ``True``. You should probably restart your
        subscriptions if ``check_heartbeat`` fails.

        This heartbeat check creates a new ``LVAlertClient`` and reconnects
        each time it is called. It is therefore fairly slow and should only
        be called every few minutes at most."""
        # no idea if LVAlertClient is threadsafe, so just make a brand new one
        LOGGER.info("Checking heartbeat; creating a new publishing client "
                    "for server %s", self.server)
        pubclient = get_client(self.server)
        if pubclient.connect():
            pubclient.process(block=False)
            LOGGER.debug("Heartbeat publisher connected to %s", self.server)
        else:
            LOGGER.error("Could not connect to %s with heartbeat_check "
                         "publishing client.", self.server)
            return False
        # a random string
        randmsg = self.heartbeat_message()
        msg = json.dumps({"llama_heartbeat": randmsg})
        pubclient.publish(self.HEARTBEAT_NODE, msg)
        start = time.time()
        while time.time() - start < timeout:
            if not self.heartbeat_queue.empty():
                echo = self.heartbeat_queue.get()
                if echo == randmsg:
                    LOGGER.debug("Heartbeat answered in %s seconds; things "
                                 "are running nominally.",
                                 time.time() - start)
                    return True
                LOGGER.error("Heartbeat answered WRONG in %s seconds. "
                             "Expected %s but got %s", time.time() - start,
                             randmsg, echo)
                return False
            time.sleep(1)
        LOGGER.error("Heartbeat check TIMEOUT after %s seconds with no "
                     "response received.", timeout)
        return False
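
    # Heartbeat round trip, sketched (comments only): ``check_heartbeat``
    # publishes a random token to ``HEARTBEAT_NODE`` and waits for ``listen``
    # to echo it back onto ``heartbeat_queue``, so a standalone call only
    # succeeds while a ``listen`` loop is already running; ``listen`` calls
    # this method periodically on its own. ``client`` below is a hypothetical
    # ``Client`` instance.
    #
    #     if not client.check_heartbeat(timeout=20):
    #         # subscriptions are probably dead; restart the listener
    #         ...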


    def listen(self, sleep=3):
        """Connect to ``self.server``, subscribe to the desired
        ``self.nodes``, and start listening for new LVAlerts, reacting with
        ``self.processor``. Sleep for ``sleep`` seconds between checks for
        ``KeyboardInterrupt`` or ``SystemExit``. Keeps checking to make sure
        all original subscriptions are still active; only quits on a keyboard
        interrupt or system exit call."""
        try:
            LOGGER.info("Attempting to connect to %s", self.server)
            if self._client.connect():
                self._client.process(block=False)
                LOGGER.debug("Connected successfully.")
            else:
                LOGGER.critical("Could not connect; exiting.")
                try:
                    self._client.abort()
                except AttributeError as err:
                    LOGGER.critical(f"Error while aborting client: {err}")
                sys.exit(1)

            LOGGER.debug("Trying to subscribe to %s", self.nodes)
            all_nodes = list(self._client.get_nodes())
            nodes = [n for n in self.nodes if n in all_nodes]
            defunct = [n for n in self.nodes if n not in all_nodes]
            if defunct:
                LOGGER.error("The following requested nodes were not "
                             "available on the server; will not attempt to "
                             "subscribe to these: %s", defunct)
            # subscribing inside the loop is no slower and is nice because
            # you'll see exactly where it failed if it fails
            for node in nodes:
                self._client.subscribe(node)
                LOGGER.debug("Subscribed successfully to %s", node)

            def heartbeat_wrapper(node, alert_json):
                """Put a safe wrapper around ``self.processor`` that catches
                exceptions and logs their tracebacks before they propagate up
                to the XMPP machinery where, in the past, exceptions have
                sometimes caused severed subscriptions. Also checks for
                heartbeat messages and reports via ``self.heartbeat_queue``
                that they were received (these are not passed off to
                ``self.processor``)."""
                try:
                    alert = json.loads(alert_json)
                    # handle a heartbeat message
                    if tuple(alert.keys()) == ('llama_heartbeat',):
                        self.heartbeat_queue.put(alert['llama_heartbeat'])
                        return
                    LOGGER.info("Received new trigger, launching %s",
                                self.processor)
                    self.processor(node, alert_json)
                except Exception:
                    LOGGER.error("EXCEPTION CAUGHT from node %s; returning "
                                 "to normal", node)
                    LOGGER.error(traceback.format_exc())
                    LOGGER.error("OFFENDING NODE: %s", node)
                    LOGGER.error("OFFENDING ALERT JSON: %s", alert_json)

            self._client.listen(heartbeat_wrapper)

            last_heartbeat = 0  # a long time ago
            while True:
                # check heartbeat to make sure we can receive our own messages
                if time.time() - last_heartbeat > self.heartbeat_interval:
                    if not self.check_heartbeat():
                        LOGGER.error("HEARTBEAT CHECK FAILED; aborting this "
                                     "client.")
                        self._client.abort()
                        return
                    last_heartbeat = time.time()
                # check subscriptions to make sure they are what we originally
                # set
                subs = set(self._client.get_subscriptions())
                if subs != set(nodes):
                    LOGGER.error("SUBSCRIPTIONS HAVE BEEN LOST; aborting this "
                                 "client. Final active subscription list: %s "
                                 "vs. expected %s (missing: %s)", subs,
                                 set(nodes), set(nodes) - subs)
                    self._client.abort()
                    return
                time.sleep(sleep)

        except (KeyboardInterrupt, SystemExit):
            LOGGER.critical("Disconnecting from server: %s", self.server)
            self._client.abort()
            sys.exit()
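
# Minimal usage sketch (comments only; nothing runs at import time). The
# processor receives the node name and the raw alert JSON string;
# ``my_processor`` is a hypothetical example, not part of this module.
#
#     def my_processor(node, alert_json):
#         LOGGER.info("alert from %s: %s", node, alert_json)
#
#     client = Client(processor=my_processor, server=PLAYGROUND_SERVER)
#     client.listen()  # blocks; exits only on KeyboardInterrupt/SystemExit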


class Alert(dict):
    """
    A JSON alert dictionary from LVAlert. Parses the alert and exposes various
    properties.
    """

    def __init__(self, *args, **kwargs):
        """If only one argument is provided and it is positional and a string,
        assume it is JSON and decode it. Otherwise, initialize the way you
        usually would for a dictionary."""
        if ((not kwargs) and len(args) == 1 and
                isinstance(args[0], string_types)):
            # remember the original JSON string so the ``json`` property can
            # return it verbatim
            self._json = args[0]
            super().__init__(json.loads(args[0]))
        else:
            super().__init__(*args, **kwargs)

    @property
    def json(self):
        """Get the JSON value. If this was originally constructed from JSON,
        return that; otherwise, try to serialize it. Raises a ``TypeError``
        if this ``Alert`` cannot be represented as a JSON string."""
        return getattr(self, "_json", json.dumps(self))

    def __str__(self):
        """Get this Alert as a JSON string if possible. If it cannot be
        represented as a JSON string, fall back to the default ``dict``
        string representation."""
        try:
            return self.json
        except TypeError:
            return super().__str__()

    @property
    def new_skymap(self):
        """Check whether the alert describes the availability of a new
        skymap."""
        return (self['alert_type'] == 'log' and
                self['data']['filename'] in SKYMAP_FILENAMES)

    @property
    def advok(self):
        """Check whether the alert describes an ADVOK label being applied."""
        return (
            (
                self['alert_type'] == 'label_added' and
                self['data']['name'] in ADVOK_LABELS
            ) or (
                bool(ADVOK_LABELS.intersection(self['object']['labels']))
            )
        )

    @property
    def advreq(self):
        """Check whether the alert has received the advocate-requested label
        (ADVREQ) and is therefore a promising trigger."""
        return (
            (
                self['alert_type'] == 'label_added' and
                self['data']['name'] in ADVREQ_LABELS
            ) or (
                bool(ADVREQ_LABELS.intersection(self['object']['labels']))
            )
        )

    @property
    def is_superevent(self):
        """Check whether the alert describes a superevent."""
        return 'superevent_id' in self['object']

    @property
    def superid(self):
        """Return the superevent GraceID if this is a superevent. Raises a
        ``KeyError`` if not available."""
        return self['object']['superevent_id']

    @property
    def eventid(self):
        """Return the preferred event GraceID if this is a superevent or else
        return this event's GraceID if it is just a normal event."""
        if self.is_superevent:
            return self['object']['preferred_event']
        return self['uid']

    @property
    def production(self):
        """Return whether this LVAlert object is marked as "Production"."""
        return self["object"]["category"] == "Production"

    @property
    def graceid(self):
        """Return the GraceID associated with this trigger; works for both
        events and superevents."""
        return self['uid']
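
# Illustrative sketch (comments only): parsing a raw LVAlert payload. The
# JSON below is a made-up minimal example with a placeholder GraceID, not a
# real GraceDB alert.
#
#     alert = Alert('{"uid": "S123456a", "alert_type": "label_added", '
#                   '"data": {"name": "ADVOK"}, '
#                   '"object": {"superevent_id": "S123456a", "labels": []}}')
#     alert.is_superevent   # -> True
#     alert.advok           # -> True
#     alert.graceid         # -> 'S123456a'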


def process_alert_json(node, alert_json, rundir):
    """Process an LVAlert JSON string as read from stdin when called by
    ``lvalert-listen``."""
    # load json alert string
    LOGGER.info('\nNEW ALERT')
    LOGGER.info(' JSON: ')
    LOGGER.info(alert_json)
    # parse json alert string
    alert = Alert(alert_json)

    # TODO run anyway even if not advok/advreq IF FAR is low
    # exit if this isn't an alert of interest
    if not ((alert.advok or alert.advreq) and alert.is_superevent):
        LOGGER.info(" NOTHING TO DO FOR THIS LVALERT. EXITING.")
        return

    # otherwise, make the event directory if it does not exist
    event = Event(alert.graceid, rundir=rundir)
    LOGGER.info(' EVENTDIR EXISTS' if
                os.path.isdir(event.eventdir) else ' MAKING EVENTDIR')
    event.init()
    # we have special test nodes in LVAlert that are for test events/messages
    if node in TEST_NODES or not alert.production:
        flag_preset_freeze_veto(event.flags, 'TRIGGERED_TEST')
    else:
        # TODO check if public; if public, immediately mark as
        # TRIGGERED_PUBLIC
        flag_preset_freeze_veto(event.flags, 'TRIGGERED_INTERNAL')
    # always dump the lvalert contents to file if this is a new event.
    # skymap_info.json is not created yet, so this will not cause the analysis
    # to move further.
    lvalert = event.files.LVAlertJSON
    if not lvalert.exists():
        LOGGER.info(' EVENT DOES NOT EXIST ALREADY!')
        LOGGER.info(' CREATING EVENT AND LVALERT JSON FILE.')
        lvalert.generate(alert_json)
    else:
        LOGGER.info(' LVALERT JSON FILE EXISTS, NOT REGENERATING.')
    skyinfo = event.files.SkymapInfo
    if not skyinfo.exists():
        if skyinfo.intent.in_progress():
            LOGGER.info(' SKYMAP INFO INTENT SET, NOT COMPETING TO GENERATE')
        else:
            LOGGER.info(' GENERATING SKYMAP INFO FILE.')
            graceid = lvalert.graceid
            if is_super(graceid):
                skyinfo.generate_from_gracedb_superevent(graceid)
            else:
                skyinfo.generate_from_gracedb(graceid)
    else:
        LOGGER.info(' SKYMAP INFO FILE EXISTS, NOT REGENERATING.')

    # handle whichever case applies.
    if alert.advreq and alert.is_superevent:
        LOGGER.info(' NEW ADVREQ LABEL!')
    if alert.advok and alert.is_superevent:
        LOGGER.info(' ADVOK LABEL APPLIED!')
        LOGGER.info(' MAKING ADVOK LVALERT FILE')
        event.files.lvalert_em_ready_json.generate(alert_json)
        LOGGER.info(' MAKING ADVOK JSON MARKER')
        event.files.Advok.generate(
            alert_type=ALERT_TYPE,
            notice_time_iso=datetime.datetime.utcnow().isoformat(),
        )
    LOGGER.info('EVENT PROCESSED.')
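
# Illustrative wiring sketch (comments only): ``Client`` calls its processor
# as ``processor(node, alert_json)``, so ``rundir`` must be bound first. The
# directory path below is a hypothetical placeholder.
#
#     from functools import partial
#
#     processor = partial(process_alert_json, rundir='/path/to/rundir')
#     Client(processor=processor).listen()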