Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# (c) Stefan Countryman, 2016-2019

"""
Tools for receiving, parsing, and archiving GCN Notices. When run as a script
(i.e. from the command line), starts a GCN VOEvent listener process that
listens for incoming GCN triggers and runs ``all_llama_handlers`` on those
triggers.
"""
10import os
11from datetime import datetime
12from glob import glob
13import logging
14import warnings
15import base64
16from collections import namedtuple
17from lxml.etree import fromstring, XMLSyntaxError
18import gcn
19from gcn.notice_types import (
20 LVC_PRELIMINARY,
21 LVC_INITIAL,
22 LVC_UPDATE,
23 LVC_RETRACTION,
24)
25from six.moves.urllib.parse import quote_plus
26from six import string_types
27from pygments import highlight
28# http://pygments.org/docs/lexers/#lexers-for-html-xml-and-related-markup
29from pygments.lexers.html import XmlLexer
30from llama.utils import (
31 OUTPUT_DIR,
32 DEFAULT_RUN_DIR,
33 CACHEDIR,
34 COLOR,
35 TB_FORMATTER,
36)
37from llama.event import Event
38from llama.run import Run
39from llama.files.lvc_gcn_xml import LvcGcnXml, LvcRetractionXml, parse_ivorn
40from llama.com.slack import alert_maintainers
41from llama.files.advok import Advok
PRIVATE_GCN_PORT = 8096  # historically used for LVC GCN circulars pre-OPA
PUBLIC_GCN_PORT = 8099
DEFAULT_HOST = '68.169.57.253'
LOCKDIR = os.path.join(CACHEDIR, 'gcnd')
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
ALERT_TYPE = 'gcn'
# Default root directory under which every received GCN Notice is archived.
GCN_ARCHIVE = os.path.join(OUTPUT_DIR, 'gcn', 'archive')
# Best-effort creation of the default archive at import time: a failure is
# logged and turned into a warning rather than aborting the import.
if not os.path.isdir(GCN_ARCHIVE):
    try:
        os.makedirs(GCN_ARCHIVE)
    except OSError as err:
        LOGGER.error("Could not create default directory %s. Error: %s",
                     GCN_ARCHIVE, err)
        warnings.warn(("Could not create default directory expected by "
                       "llama: {} Error: {}").format(GCN_ARCHIVE, err))
# Element queries (run with ``root.find``) for the ``Param`` elements whose
# ``value`` attribute is read from every LVC notice.
LVC_XML_QUERIES = {
    "superid": "./What/Param[@name='GraceID']",
    "alerttype": "./What/Param[@name='AlertType']",
}
def get_handle_lvc_gcn(filehandler, notice_types, rundir=DEFAULT_RUN_DIR):
    """Get a ``handle_lvc_gcn`` function that outputs triggers to directories
    in ``rundir`` using the specified filehandler.

    Parameters
    ----------
    filehandler : class
        Called as ``filehandler(event)``; the returned object must provide a
        ``generate(payload)`` method and a ``role`` attribute (e.g.
        ``LvcGcnXml``).
    notice_types : array-like
        GCN notice types the returned handler reacts to; the handler is
        wrapped with ``gcn.include_notice_types(*notice_types)``.
    rundir : str, optional
        Run directory in which the ``Event`` for the notice is looked up or
        initialized. Defaults to ``DEFAULT_RUN_DIR``.

    Returns
    -------
    handle_lvc_gcn : function
        A GCN handler taking ``payload`` and ``root`` as its only arguments.
    """
    # pylint: disable=no-member
    @gcn.include_notice_types(*notice_types)
    def handle_lvc_gcn(payload, root):
        """Process LIGO/Virgo Initial GCN notices. Save the payload to a new
        event directory if it doesn't exist (corresponding to the GraceDB
        Event) and mark the trigger as having an OPA, allowing GCN Notices and
        Circulars to be distributed. Saves to the run directory specified when
        ``get_handle_lvc_gcn`` was called."""
        LOGGER.info("Running ``handle_lvc_gcn`` on rundir %s with handler %s",
                    rundir, filehandler.__name__)
        # Check (and log) every expected Param before touching the run
        # directory; a notice missing any of them is dumped and skipped.
        try:
            for name, query in LVC_XML_QUERIES.items():
                value = root.find(query).attrib['value']
                LOGGER.info('%s: %s: %s', datetime.utcnow(), name, value)
        except AttributeError:
            # ``root.find`` returned ``None``, i.e. the Param is absent.
            LOGGER.exception('%s: Query %s for %s failed, dumping '
                             'payload:', datetime.utcnow(), query, name)
            LOGGER.exception(payload)
            return
        except KeyError:
            LOGGER.exception('%s: %s Param has no ``value``, dumping '
                             'payload:', datetime.utcnow(), name)
            LOGGER.exception(payload)
            return
        _, eventid, _, _, _ = parse_ivorn(payload)
        event = Event(eventid, rundir)
        if event.exists():
            LOGGER.info("Event exists, not initializing %s", event.eventid)
        else:
            LOGGER.info("Event does not exist, initializing %s", event.eventid)
            event.init()
        voe = filehandler(event)
        voe.generate(payload)
        if voe.role == 'observation':
            event.flags.update(event.flags.PRESETS.TRIGGERED_PUBLIC)
            LOGGER.info("Setting flags to preset 'TRIGGERED_PUBLIC': %s",
                        event.flags.PRESETS.TRIGGERED_PUBLIC)
            # Maintainers are not alerted while the PYTEST variable is set.
            if "PYTEST" not in os.environ:
                alert_maintainers("NEW EVENT: {}".format(event.eventdir),
                                  __name__)
        else:
            # Any non-observation role is flagged as a test trigger; an
            # unrecognized role is additionally reported as an error.
            if voe.role not in event.flags.ALLOWED_VALUES['ROLE']:
                LOGGER.error("Could not understand VOE role '%s'.", voe.role)
            event.flags.update(event.flags.PRESETS.TRIGGERED_TEST)
            LOGGER.info("Setting flags to preset 'TRIGGERED_TEST': %s",
                        event.flags.PRESETS.TRIGGERED_TEST)
        event.files.SkymapInfo.generate_from_lvc_gcn_xml(voe)
    return handle_lvc_gcn
# Immutable record backing ``LlamaHandlers``: the run directory, the GCN
# archive directory, and the explicitly supplied handlers (or ``None``).
LlamaHandlersTuple = namedtuple('LlamaHandlersTuple', ('rundir', 'gcn_archive',
                                                       'manually_included'))
def get_subdir(archive_dir, receipt_time):
    """Get the subdirectory where a VOEvent would be archived. Since VOEvents
    are archived in subdirectories (based on date) of the main
    ``archive_directory``, this path depends on the ``receipt_time`` of the
    VOEvent.

    Parameters
    ----------
    archive_dir : str
        The root directory for all GCN Notices.
    receipt_time : datetime.datetime
        The time of receipt of the GCN Notice.

    Returns
    -------
    subdir : str
        ``archive_dir`` joined with a ``YYYY-MM`` directory and a
        zero-padded day-of-month directory, e.g. ``<archive>/2019-03/07``.
    """
    return os.path.join(archive_dir, receipt_time.strftime("%Y-%m"),
                        receipt_time.strftime("%d"))
class LlamaHandlers(LlamaHandlersTuple):
    """
    A gcn handler (see pygcn documentation) that runs all gcn handlers
    specified in ``LlamaHandlers.included`` on the given ``payload`` (the
    VOEvent XML string) and ``root`` (the ``lxml`` VOEvent object for the
    payload). Add handlers either by passing ``included`` or by defining
    methods/properties on this class whose names start with "handle". All
    handlers are run in succession, so it's up to you to make sure they don't
    interfere with each other. Set up as a callable class so that parameters
    for handlers can be stored in instances (e.g. output directories).

    Parameters
    ----------
    rundir : str, optional
        Path to the run directory (i.e. where new LLAMA triggers are saved
        by the GCN handler). Defaults to ``DEFAULT_RUN_DIR``. Stored as a
        real (symlink-resolved) path.
    gcn_archive : str, optional
        Path to the GCN Notice archive directory (i.e. where ALL new GCN
        Notices are archived). Defaults to ``GCN_ARCHIVE``. Stored as a real
        (symlink-resolved) path; dated subdirectories are created as needed
        by ``handle_archive``.
    included : array-like, optional
        A tuple of handlers to call when this instance is called; these
        functions must take ``payload`` and ``root`` as their only arguments.
        Defaults to a list of all methods and callable properties of
        ``LlamaHandlers`` whose names start with "handle", but this can be
        overridden. Note that you can include other valid GCN handlers that are
        not ``LlamaHandlers`` methods if you choose (see the ``pygcn``
        documentation for details on valid GCN handlers).
    """

    def __new__(cls, rundir=DEFAULT_RUN_DIR, gcn_archive=GCN_ARCHIVE,
                included=None):
        """Create a new ``LlamaHandlers`` instance."""
        rundir = os.path.realpath(rundir)
        gcn_archive = os.path.realpath(gcn_archive)
        # ``None`` means "not specified"; ``included`` then falls back to
        # the handlers discovered on the class.
        manually_included = tuple(included) if included is not None else None
        return LlamaHandlersTuple.__new__(cls, rundir, gcn_archive,
                                          manually_included)

    def __call__(self, payload, root):
        """Run all handlers specified in ``LlamaHandlers.included``."""
        # Tolerate a missing IVORN here so that ``handle_archive`` still
        # gets the chance to archive the notice under a fallback name.
        LOGGER.info("NEW PAYLOAD. IVORN: %s",
                    root.attrib.get('ivorn', 'IVORN_NOT_FOUND'))
        for handler in self.included:
            LOGGER.info("Running handler: %s from %s", handler.__name__,
                        handler.__module__)
            handler(payload, root)

    @property
    def included(self):
        """Get a tuple of GCN handlers to include in this overall handler. If
        this was not specified when ``LlamaHandlers`` was instantiated,
        return all GCN handlers defined in ``LlamaHandlers`` whose names start
        with "handle"."""
        if self.manually_included is not None:
            return self.manually_included
        handlers = [getattr(self, h) for h in dir(self)
                    if h.startswith("handle")]
        return tuple(h for h in handlers if callable(h))

    @property
    def handle_lvc_preliminary(self):
        """Get a ``handle_lvc_gcn`` function for GCN Preliminary notices that
        saves to ``self.rundir``."""
        return get_handle_lvc_gcn(filehandler=LvcGcnXml,
                                  notice_types=[LVC_PRELIMINARY],
                                  rundir=self.rundir)

    @property
    def handle_lvc_initial_or_update(self):
        """Get a ``handle_lvc_gcn`` function for GCN Initial or Update notices
        that saves to ``self.rundir`` and adds an ``Advok`` marker."""
        notice_types = [LVC_INITIAL, LVC_UPDATE]
        handle_gcn = get_handle_lvc_gcn(filehandler=LvcGcnXml,
                                        notice_types=notice_types,
                                        rundir=self.rundir)

        @gcn.include_notice_types(*notice_types)
        def handle(payload, root):
            """Save the GCN Notice and mark as ADVOK."""
            _, eventid, _, _, _ = parse_ivorn(payload)
            handle_gcn(payload, root)
            adv = Advok(eventid, rundir=self.rundir)
            adv.generate('gcn', root.find('./Who/Date').text)

        return handle

    def handle_lvc_retraction(self, payload, root):
        """Mark all auto-generated events associated with this graceid as
        vetoed and save the retraction."""
        # ``get_notice_type`` must be *called* on ``root``; comparing the
        # function object itself to the notice type is never equal, which
        # made this handler return unconditionally.
        if gcn.get_notice_type(root) != LVC_RETRACTION:
            return
        _, _, graceid, _, _ = parse_ivorn(payload)
        run = Run(rundir=self.rundir)
        for event in run.downselect(eventid_filter=graceid+'*').events:
            event.files.LvcRetractionXml.generate(payload)
            try:
                event.files.RctSlkLmaLvcRetractionXml.generate()
            # NOTE(review): ``GenerationError`` is not among the imports
            # visible at the top of this file; confirm it is in scope, or
            # this clause raises ``NameError`` when ``generate`` fails.
            except GenerationError as err:
                alert_maintainers('Could not send Slack message for '
                                  f'retraction of {event}. '
                                  f'Error message: {err}')
            event.flags['VETOED'] = 'true'

    def handle_archive(self, payload, root):
        """Save a GCN Notice VOEvent to the archive specified by this
        ``LlamaHandlers`` instance. Applies to all GCN Notices including ones
        processed by the pipeline. Will not overwrite existing VOEvent files
        (since VOEvent filenames are based on IVORNs, which are supposed to be
        unique). If the filename is already in use, the xml will be logged at
        the ``info`` level (useful for production to catch unexpected
        behavior)."""
        LOGGER.info("Running handle_archive on gcn archive directory %s",
                    self.gcn_archive)
        now = datetime.utcnow()
        # The highlighted payload is used for log output only; the raw
        # payload is what gets written to disk.
        try:
            pretty_payload = highlight(payload.decode(), XmlLexer(),
                                       TB_FORMATTER)
        except UnicodeDecodeError:
            LOGGER.error('Could not decode payload as Unicode: %s', payload)
            pretty_payload = payload
        try:
            ivorn = root.attrib['ivorn']
            LOGGER.debug("Received GCN notice: %s", ivorn)
        except KeyError:
            # Fall back to a timestamped placeholder name.
            ivorn = "IVORN_NOT_FOUND_{}".format(now.isoformat())
            LOGGER.error("Could not parse IVORN from notice; archiving as "
                         "%s:\n%s", ivorn, pretty_payload)
        filename = quote_plus(ivorn)
        subdirs = get_subdir(self.gcn_archive, now)
        # ``exist_ok`` avoids failing if the directory appears between a
        # separate existence check and its creation.
        os.makedirs(subdirs, exist_ok=True)
        outpath = os.path.join(subdirs, filename)
        LOGGER.info("Selected outpath %s", outpath)
        if not os.path.exists(outpath):
            with open(outpath, 'wb') as outfile:
                outfile.write(payload)
        else:
            LOGGER.info("outpath %s exists, not overwriting. dumping xml:\n%s",
                        outpath, pretty_payload)
        LOGGER.info("archived %s", ivorn)
def run_on_voevent_file(files="*.xml", handler=LlamaHandlers()):
    """Run a handler on VOEvent XML files (as if they had just been received
    via GCN). By default, runs a default-constructed ``LlamaHandlers``
    instance on each VOEvent.

    Parameters
    ----------
    files : str or list, optional
        A list of filenames to operate on or else a string that can be
        interpreted as a UNIX-like glob pattern for matching files as handled
        by ``glob`` (DEFAULT: "*.xml", matching XML files in the current
        directory). Note that this means you can usually just provide the
        exact filename (as long as it can't be interpreted as a UNIX glob
        pattern).
    handler : function, optional
        The GCN Handler, of the same sort used by ``gcn.listen``; called as
        ``handler(payload, root)`` (DEFAULT: ``LlamaHandlers()``).

    Raises
    ------
    XMLSyntaxError
        If the files provided are not valid VOEvents, raises an exception once
        it gets to that VOEvent file.
    """
    if isinstance(files, string_types):
        files = glob(files)
    for filename in files:
        # NOTE(review): ``COLOR.blue`` and ``COLOR.RED`` (below) differ in
        # case; confirm both attributes exist on ``llama.utils.COLOR``.
        LOGGER.info(COLOR.blue("Running %s handler on %s."), handler, filename)
        with open(filename, 'rb') as voefile:
            payload = voefile.read()
        try:
            handler(payload, fromstring(payload))
        except XMLSyntaxError:
            # Dump the payload as base64 text so that unparseable (possibly
            # binary) content can be recovered from the log, then re-raise.
            LOGGER.exception(COLOR.RED("failed to parse XML from %s, "
                                       "base64-encoded payload is:\n%s"),
                             os.path.realpath(filename),
                             base64.b64encode(payload).decode('ascii'))
            raise