Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# (c) Stefan Countryman, 2019
3"""
4Tools for listening to LVAlerts, LIGO/Virgo's internal GW trigger alert system.
5"""
7import sys
8import time
9import traceback
10import json
11import os
12import datetime
13import logging
14from random import choices
15# from queue import Queue
16from multiprocessing import Queue # since our queue is shared between procs
17from typing import Tuple
18from types import FunctionType
19from six import string_types
20from llama.listen import flag_preset_freeze_veto
21from llama.event import Event
22from llama.classes import optional_env_var
23from llama.files.lvc_skymap import SKYMAP_FILENAMES
24from llama.files.skymap_info.utils import is_super
# module-level logger, named after this module per logging convention
LOGGER = logging.getLogger(__name__)
# hostname of the production LVAlert server
SERVER = 'lvalert.cgca.uwm.edu'
# hostname of the playground LVAlert server; not referenced in this module,
# presumably used by callers that want to listen to test alerts
PLAYGROUND_SERVER = 'lvalert-playground.cgca.uwm.edu'
# LVAlert credentials read from the environment; presumably falls back to
# placeholder/None values with the given warning when the variables are
# unset -- confirm against ``llama.classes.optional_env_var``
LVALERT_USERNAME, LVALERT_PASSWORD = optional_env_var(
    ['LVALERT_USERNAME', 'LVALERT_PASSWORD'],
    "Could not load LVAlert credentials; ``llama listen lvalert`` will fail.",
)

# recorded as the ``alert_type`` when generating ADVOK marker files (see
# ``process_alert_json``)
ALERT_TYPE = 'lvalert'
# labels which indicate that an event is ADVOK
ADVOK_LABELS = {
    'ADVOK',
}
# labels which indicate that an advocate has been requested
ADVREQ_LABELS = {
    'ADVREQ',
}
# nodes carrying test events/messages only; triggers arriving on these are
# flagged TRIGGERED_TEST by ``process_alert_json``
TEST_NODES = {
    'stc-testnode',
    'test_superevent',
}
# full set of LVAlert nodes a ``Client`` subscribes to by default
NODES = {
    'superevent',
    'cbc_gstlal',
    'cbc_pycbc',
    'cbc_mbtaonline',
    'cbc_spiir',
    'cbc_lowmass',
    'stc-testnode',
    'burst_cwb',
    'test_superevent',
}
def get_client(server: str):
    """Return an ``LVAlertClient`` for the given ``server`` URL, using the
    login credentials stored in the ``LVALERT_USERNAME`` and
    ``LVALERT_PASSWORD`` environmental variables (if available)."""
    # imported lazily so that importing this module does not require the
    # ``ligo.lvalert`` package unless a client is actually requested
    from ligo.lvalert import LVAlertClient
    return LVAlertClient(
        username=LVALERT_USERNAME,
        password=LVALERT_PASSWORD,
        server=server,
    )
class Client:
    """
    A wrapper around ``LVAlertClient`` that provides an interface for
    subscribing to LVAlert nodes and starting a listener. Specify the new alert
    processor as ``processor`` and the URL of the LVAlert server to use as
    ``server``.
    """

    # URL of the LVAlert server to connect to
    server: str
    # called as ``processor(node, alert_json)`` for each non-heartbeat alert
    processor: FunctionType
    # names of the LVAlert nodes to subscribe to
    nodes: Tuple[str]
    # used by ``listen`` to echo heartbeat payloads back to ``check_heartbeat``
    heartbeat_queue: Queue
    # seconds between heartbeat checks while listening
    heartbeat_interval: float

    # node to which heartbeat test messages are published; ``listen`` must be
    # subscribed to this node for ``check_heartbeat`` to succeed
    HEARTBEAT_NODE = 'stc-testnode'

    def __init__(self, processor: FunctionType, server: str = SERVER,
                 nodes: Tuple[str] = tuple(NODES),
                 heartbeat_interval: float = 180):
        """Initialize a ``Client``.

        Parameters
        ----------
        processor : FunctionType
            Callable taking ``(node, alert_json)``, run on each new alert.
        server : str
            URL of the LVAlert server to connect to.
        nodes : Tuple[str]
            Names of the LVAlert nodes to subscribe to.
        heartbeat_interval : float
            Seconds to wait between heartbeat checks in ``listen``.
        """
        self.server = server
        self.processor = processor
        self.nodes = nodes
        self.heartbeat_queue = Queue()
        self.heartbeat_interval = heartbeat_interval
        self._client = get_client(server)

    def heartbeat_message(self):
        """Get a random message with which to run a heartbeat check."""
        # 16 random uppercase ASCII letters ('A'-'Z'); equivalent to (but
        # simpler than) the previous implementation that assembled the
        # alphabet from raw bytes via ``int.to_bytes``
        alphabet = ''.join(chr(i) for i in range(ord('A'), ord('Z') + 1))
        return ''.join(choices(alphabet, k=16))

    def check_heartbeat(self, timeout=20):
        """Publish a test message to ``self.HEARTBEAT_NODE`` and wait for
        that same message to be parroted back by this instance's ``listen``
        function to ``self.heartbeat_queue``. If, after waiting for ``timeout``
        seconds, nothing has been received, or if the received message does not
        match the random string sent out, return ``False``; otherwise,
        return ``True``. You should probably restart your subscriptions if
        ``check_heartbeat`` fails.

        This heartbeat check creates a new ``LVAlertClient`` and reconnects
        each time it is called. It is therefore fairly slow and should only be
        called at most every few minutes or so."""
        # no idea if LVAlertClient is threadsafe, so just make a brand new one.
        LOGGER.info("Checking heartbeat; creating a new publishing client "
                    "for server %s", self.server)
        pubclient = get_client(self.server)
        if pubclient.connect():
            pubclient.process(block=False)
            LOGGER.debug("Heartbeat publisher connected to %s", self.server)
        else:
            LOGGER.error("Could not connect to %s with heartbeat_check "
                         "publishing client.", self.server)
            return False
        # a random string
        randmsg = self.heartbeat_message()
        msg = json.dumps({"llama_heartbeat": randmsg})
        pubclient.publish(self.HEARTBEAT_NODE, msg)
        start = time.time()
        while time.time() - start < timeout:
            if not self.heartbeat_queue.empty():
                echo = self.heartbeat_queue.get()
                if echo == randmsg:
                    LOGGER.debug("Heartbeat answered in %s seconds; things "
                                 "are running nominally.", time.time() - start)
                    return True
                # BUGFIX: corrected "anwered" typo in the log message
                LOGGER.error("Heartbeat answered WRONG in %s seconds. "
                             "Expected %s but got %s", time.time() - start,
                             randmsg, echo)
                return False
            time.sleep(1)
        # BUGFIX: the original call had a ``%s`` placeholder with no matching
        # argument, so logging emitted a formatting error instead of this
        # message; pass ``timeout`` explicitly
        LOGGER.error("Heartbeat check TIMEOUT after %s seconds with no "
                     "response received.", timeout)
        return False

    def listen(self, sleep=3):
        """Connect to ``self.server``, subscribe to desired ``self.nodes``, and
        start listening for new LVAlerts, reacting with ``self.processor``.
        Sleep for ``sleep`` seconds between checks for ``KeyboardInterrupt`` or
        ``SystemExit``. Will keep checking to make sure all original
        subscriptions are still active. Will only quit on keyboard interrupt or
        system exit call."""
        try:
            LOGGER.info("Attempting to connect to %s", self.server)
            if self._client.connect():
                self._client.process(block=False)
                LOGGER.debug("Connected successfully.")
            else:
                LOGGER.critical("Could not connect; exiting.")
                try:
                    self._client.abort()
                except AttributeError as err:
                    # use lazy %-formatting so the message is only built if
                    # the record is actually emitted
                    LOGGER.critical("Error while aborting client: %s", err)
                # FIX: ``sys.exit`` instead of the interactive-only ``exit``
                # builtin, which is not guaranteed to exist at runtime
                sys.exit(1)
            LOGGER.debug("Trying to subscribe to %s", self.nodes)
            all_nodes = list(self._client.get_nodes())
            nodes = [n for n in self.nodes if n in all_nodes]
            defunct = [n for n in self.nodes if n not in all_nodes]
            if defunct:
                LOGGER.error("The following requested nodes were not available"
                             " on the server; will not attempt to subscribe to"
                             " these: %s", defunct)
            # putting it in the loop is no slower and is nice because you'll
            # see exactly where it failed if it fails
            for node in nodes:
                self._client.subscribe(node)
                LOGGER.debug("Subscribed successfully to %s", node)

            def heartbeat_wrapper(node, alert_json):
                """Put a safe wrapper around ``self.processor`` that catches
                exceptions and logs their tracebacks before they propagate up
                to the XMPP machinery where, in the past, exceptions have
                sometimes caused severed subscriptions. Also checks for
                heartbeat messages and reports via ``self.heartbeat_queue``
                that they were received (these are not passed off to
                ``self.processor``)."""
                try:
                    alert = json.loads(alert_json)
                    # handle a heartbeat message
                    if tuple(alert.keys()) == ('llama_heartbeat',):
                        self.heartbeat_queue.put(alert['llama_heartbeat'])
                        return
                    LOGGER.info("Received new trigger, launching %s",
                                self.processor)
                    self.processor(node, alert_json)
                except Exception:  # deliberately broad; see docstring
                    LOGGER.error("EXCEPTION CAUGHT from node %s; returning to "
                                 "normal", node)
                    LOGGER.error(traceback.format_exc())
                    LOGGER.error("OFFENDING NODE: %s", node)
                    LOGGER.error("OFFENDING ALERT JSON: %s", alert_json)

            self._client.listen(heartbeat_wrapper)

            last_heartbeat = 0  # long time ago
            while True:
                # check heartbeat to make sure we can receive our own messages
                if time.time() - last_heartbeat > self.heartbeat_interval:
                    if not self.check_heartbeat():
                        LOGGER.error("HEARTBEAT CHECK FAILED; aborting this "
                                     "client.")
                        self._client.abort()
                        return
                    last_heartbeat = time.time()
                # check subscriptions to make sure they are what we originally
                # set
                subs = set(self._client.get_subscriptions())
                if subs != set(nodes):
                    LOGGER.error("SUBSCRIPTIONS HAVE BEEN LOST; aborting this "
                                 "client. Final active subscription list: %s "
                                 "vs. expected %s (missing: %s)", subs,
                                 set(nodes), set(nodes) - subs)
                    self._client.abort()
                    return
                time.sleep(sleep)
        except (KeyboardInterrupt, SystemExit):
            LOGGER.critical("Disconnecting from server: %s", self.server)
            self._client.abort()
            sys.exit()
class Alert(dict):
    """
    A JSON alert dictionary from LVAlert. Parses the alert and reads various
    properties.
    """

    def __init__(self, *args, **kwargs):
        """If only one argument is provided and it is positional and a string,
        assume it is JSON and decode it. Otherwise initialize the way you
        usually would for a dictionary."""
        # NB: this module is python-3-only (it uses f-strings), so a plain
        # ``str`` check replaces the old ``six.string_types`` loop
        if (not kwargs) and len(args) == 1 and isinstance(args[0], str):
            # BUGFIX: remember the original JSON so ``self.json`` can return
            # it verbatim, as documented; previously ``_json`` was never set
            self._json = args[0]
            super().__init__(json.loads(args[0]))
        else:
            super().__init__(*args, **kwargs)

    @property
    def json(self):
        """Get the JSON value. If this was originally constructed as JSON,
        return that; otherwise, try to serialize it. Raises a ``TypeError`` if
        this ``Alert`` cannot be represented as a JSON string."""
        # BUGFIX: ``getattr(self, "_json", json.dumps(self))`` evaluated the
        # fallback eagerly, re-serializing (and possibly raising) even when
        # ``_json`` existed; EAFP defers serialization to when it's needed
        try:
            return self._json
        except AttributeError:
            return json.dumps(self)

    def __str__(self):
        """Get this Alert as a JSON string if possible. If it cannot be
        represented as a JSON string, just return whatever default string
        conversion it would have according to python."""
        try:
            return self.json
        except TypeError:
            return super().__str__()

    @property
    def new_skymap(self):
        """Check whether the alert describes the availability of a new
        skymap."""
        return (self['alert_type'] == 'log' and
                self['data']['filename'] in SKYMAP_FILENAMES)

    @property
    def advok(self):
        """Check whether the alert describes an ADVOK label being applied."""
        return (
            (
                self['alert_type'] == 'label_added' and
                self['data']['name'] in ADVOK_LABELS
            ) or (
                bool(ADVOK_LABELS.intersection(self['object']['labels']))
            )
        )

    @property
    def advreq(self):
        """Check whether the alert has received the advocate requested label
        (ADVREQ) and is therefore a promising trigger."""
        return (
            (
                self['alert_type'] == 'label_added' and
                self['data']['name'] in ADVREQ_LABELS
            ) or (
                bool(ADVREQ_LABELS.intersection(self['object']['labels']))
            )
        )

    @property
    def is_superevent(self):
        """Check whether the alert describes a superevent."""
        return 'superevent_id' in self['object']

    @property
    def superid(self):
        """Return the superevent GraceID if this is a superevent. Raises a
        ``KeyError`` if not available."""
        return self['object']['superevent_id']

    @property
    def eventid(self):
        """Return the preferred event GraceID if this is a superevent or else
        return this event's GraceID if it is just a normal event."""
        if self.is_superevent:
            return self['object']['preferred_event']
        return self['uid']

    @property
    def production(self):
        """Return whether this LVAlert object is marked as "production"."""
        return self["object"]["category"] == "Production"

    @property
    def graceid(self):
        """Return the GraceID associated with this trigger; works for both
        events and superevents."""
        return self['uid']
def process_alert_json(node, alert_json, rundir):
    """Process an LVAlert JSON string as read from stdin when called by
    ``lvalert-listen``."""
    # dump the raw payload to the log before touching it
    LOGGER.info('\nNEW ALERT')
    LOGGER.info(' JSON: ')
    LOGGER.info(alert_json)
    # parse the JSON alert string into an ``Alert``
    alert = Alert(alert_json)

    # TODO run anyway even if not advok/advreq IF FAR is low
    # bail out early unless this is a superevent carrying an ADVOK or ADVREQ
    # label -- those are the only alerts this handler acts on
    actionable = (alert.advok or alert.advreq) and alert.is_superevent
    if not actionable:
        LOGGER.info(" NOTHING TO DO FOR THIS LVALERT. EXITING.")
        return

    # otherwise, make the event directory if it does not exist
    evt = Event(alert.graceid, rundir=rundir)
    if os.path.isdir(evt.eventdir):
        LOGGER.info(' EVENTDIR EXISTS')
    else:
        LOGGER.info(' MAKING EVENTDIR')
    evt.init()
    # we have special testnodes in LVAlert that are for test events/messages.
    if node in TEST_NODES or not alert.production:
        flag_preset_freeze_veto(evt.flags, 'TRIGGERED_TEST')
    else:
        # TODO check if public; if public, immediately mark as TRIGGERED_PUBLIC
        flag_preset_freeze_veto(evt.flags, 'TRIGGERED_INTERNAL')
    # always dump the lvalert contents to file if this is a new event.
    # skymap_info.json is not created yet, so this will not cause the analysis
    # to move further.
    lvalert_file = evt.files.LVAlertJSON
    if lvalert_file.exists():
        LOGGER.info(' LVALERT JSON FILE EXISTS, NOT REGENERATING.')
    else:
        LOGGER.info(' EVENT DOES NOT EXIST ALREADY!')
        LOGGER.info(' CREATING EVENT AND LVALERT JSON FILE.')
        lvalert_file.generate(alert_json)
    skymap_info = evt.files.SkymapInfo
    if skymap_info.exists():
        LOGGER.info(' SKYMAP INFO FILE EXISTS, NOT REGENERATING.')
    elif skymap_info.intent.in_progress():
        # another process claimed generation; don't race it
        LOGGER.info(' SKYMAP INFO INTENT SET, NOT COMPETING TO GENERATE')
    else:
        LOGGER.info(' GENERATING SKYMAP INFO FILE.')
        gid = lvalert_file.graceid
        if is_super(gid):
            skymap_info.generate_from_gracedb_superevent(gid)
        else:
            skymap_info.generate_from_gracedb(gid)

    # handle whichever case applies.
    if alert.advreq and alert.is_superevent:
        LOGGER.info(' NEW ADVREQ LABEL!')
    if alert.advok and alert.is_superevent:
        LOGGER.info(' ADVOK LABEL APPLIED!')
        LOGGER.info(' MAKING ADVOK LVALERT FILE')
        evt.files.lvalert_em_ready_json.generate(alert_json)
        LOGGER.info(' MAKING ADVOK JSON MARKER')
        # NOTE(review): ``datetime.utcnow`` is deprecated in python 3.12;
        # switching to an aware datetime would change the ISO string, so it
        # is left as-is here
        evt.files.Advok.generate(
            alert_type=ALERT_TYPE,
            notice_time_iso=datetime.datetime.utcnow().isoformat(),
        )
    LOGGER.info('EVENT PROCESSED.')