Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# (c) Stefan Countryman, 2016-2019 

2 

3""" 

Tools for receiving, parsing, and archiving GCN Notices. When run as a script
(i.e. from the command line), starts a GCN VOEvent listener process that
listens for incoming GCN triggers and runs ``all_llama_handlers`` on those
triggers.

8""" 

9 

10import os 

11from datetime import datetime 

12from glob import glob 

13import logging 

14import warnings 

15import base64 

16from collections import namedtuple 

17from lxml.etree import fromstring, XMLSyntaxError 

18import gcn 

19from gcn.notice_types import ( 

20 LVC_PRELIMINARY, 

21 LVC_INITIAL, 

22 LVC_UPDATE, 

23 LVC_RETRACTION, 

24) 

25from six.moves.urllib.parse import quote_plus 

26from six import string_types 

27from pygments import highlight 

28# http://pygments.org/docs/lexers/#lexers-for-html-xml-and-related-markup 

29from pygments.lexers.html import XmlLexer 

30from llama.utils import ( 

31 OUTPUT_DIR, 

32 DEFAULT_RUN_DIR, 

33 CACHEDIR, 

34 COLOR, 

35 TB_FORMATTER, 

36) 

37from llama.event import Event 

38from llama.run import Run 

39from llama.files.lvc_gcn_xml import LvcGcnXml, LvcRetractionXml, parse_ivorn 

40from llama.com.slack import alert_maintainers 

41from llama.files.advok import Advok 

42 

# Port numbers and default host address used by this module.
PRIVATE_GCN_PORT = 8096  # historically used for LVC GCN circulars pre-OPA
PUBLIC_GCN_PORT = 8099
DEFAULT_HOST = '68.169.57.253'

# Filesystem locations derived from the llama-wide cache/output directories.
LOCKDIR = os.path.join(CACHEDIR, 'gcnd')

# Module logger; emits everything from DEBUG upward.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

ALERT_TYPE = 'gcn'

# Root directory under which received GCN Notices are archived. Try to create
# it at import time; failure is reported (log + warning) but is not fatal.
GCN_ARCHIVE = os.path.join(OUTPUT_DIR, 'gcn', 'archive')
if not os.path.isdir(GCN_ARCHIVE):
    try:
        os.makedirs(GCN_ARCHIVE)
    except OSError as err:
        LOGGER.error(
            "Could not create default directory %s. Error: %s",
            GCN_ARCHIVE,
            err,
        )
        warnings.warn(
            "Could not create default directory expected by "
            "llama: {} Error: {}".format(GCN_ARCHIVE, err)
        )

# XPath-style queries (relative to the VOEvent root element) for the ``Param``
# elements whose ``value`` attributes are read from LVC notices.
LVC_XML_QUERIES = {
    "superid": "./What/Param[@name='GraceID']",
    "alerttype": "./What/Param[@name='AlertType']",
}

63 

64 

def get_handle_lvc_gcn(filehandler, notice_types, rundir=DEFAULT_RUN_DIR):
    """Build a GCN handler that stores LVC notices as events in ``rundir``.

    Parameters
    ----------
    filehandler : type
        File handler class instantiated with the ``Event`` for the notice; its
        ``generate`` method is called with the payload and its ``role``
        attribute decides which flag preset is applied.
    notice_types : iterable
        Notice types passed to ``gcn.include_notice_types``; the returned
        handler only reacts to these.
    rundir : str, optional
        Run directory in which events are looked up or initialized.

    Returns
    -------
    function
        A ``handle_lvc_gcn(payload, root)`` handler.
    """
    # pylint: disable=no-member
    @gcn.include_notice_types(*notice_types)
    def handle_lvc_gcn(payload, root):
        """Save an LVC GCN notice into its event directory under ``rundir``.

        Checks that every ``LVC_XML_QUERIES`` parameter is present (returning
        early with a logged payload dump if not), initializes the event if it
        does not exist yet, generates the notice file with ``filehandler``,
        applies the ``TRIGGERED_PUBLIC`` or ``TRIGGERED_TEST`` flag preset
        depending on the notice role, and finally generates skymap info from
        the saved notice.
        """
        LOGGER.info("Running ``handle_lvc_gcn`` on rundir %s with handler %s",
                    rundir, filehandler.__name__)

        # Read each required Param; a missing element surfaces as an
        # AttributeError (``find`` returned None), a missing ``value``
        # attribute as a KeyError.
        params = {}
        try:
            for name, query in LVC_XML_QUERIES.items():
                params[name] = root.find(query).attrib['value']
                LOGGER.info('%s: %s: %s', datetime.utcnow(), name,
                            params[name])
        except AttributeError:
            LOGGER.exception('%s: Query %s for %s failed, dumping '
                             'payload:', datetime.utcnow(), query, name)
            LOGGER.exception(payload)
            return
        except KeyError:
            LOGGER.exception('%s: %s Param has no ``value``, dumping '
                             'payload:', datetime.utcnow(), name)
            LOGGER.exception(payload)
            return

        # Locate (or create) the event this notice belongs to.
        eventid = parse_ivorn(payload)[1]
        event = Event(eventid, rundir)
        if not event.exists():
            LOGGER.info("Event does not exist, initializing %s", event.eventid)
            event.init()
        else:
            LOGGER.info("Event exists, not initializing %s", event.eventid)

        # Write the notice into the event directory.
        voe = filehandler(event)
        voe.generate(payload)

        # Anything that is not a real observation is flagged as a test event.
        role = voe.role
        if role != 'observation':
            if role not in event.flags.ALLOWED_VALUES['ROLE']:
                LOGGER.error("Could not understand VOE role '%s'.", role)
            event.flags.update(event.flags.PRESETS.TRIGGERED_TEST)
            LOGGER.info("Setting flags to preset 'TRIGGERED_TEST': %s",
                        event.flags.PRESETS.TRIGGERED_TEST)
        else:
            event.flags.update(event.flags.PRESETS.TRIGGERED_PUBLIC)
            LOGGER.info("Setting flags to preset 'TRIGGERED_PUBLIC': %s",
                        event.flags.PRESETS.TRIGGERED_PUBLIC)
            # Do not page maintainers while the test suite is running.
            if "PYTEST" not in os.environ:
                alert_maintainers("NEW EVENT: {}".format(event.eventdir),
                                  __name__)

        event.files.SkymapInfo.generate_from_lvc_gcn_xml(voe)

    return handle_lvc_gcn

118 

119 

# Immutable record holding the configuration of a ``LlamaHandlers`` instance.
LlamaHandlersTuple = namedtuple(
    'LlamaHandlersTuple',
    ['rundir', 'gcn_archive', 'manually_included'],
)

122 

123 

def get_subdir(archive_dir, receipt_time):
    """Return the dated archive subdirectory for a VOEvent.

    VOEvents are archived beneath ``archive_dir`` in a ``YYYY-MM/DD``
    directory layout determined by when the notice was received.

    Parameters
    ----------
    archive_dir : str
        The root directory for all GCN Notices.
    receipt_time : datetime.datetime
        The time of receipt of the GCN Notice.

    Returns
    -------
    str
        ``archive_dir`` joined with the year-month and day-of-month
        components of ``receipt_time``.
    """
    month_dir = receipt_time.strftime("%Y-%m")
    day_dir = receipt_time.strftime("%d")
    return os.path.join(archive_dir, month_dir, day_dir)

139 

140 

class LlamaHandlers(LlamaHandlersTuple):
    """
    A gcn handler (see pygcn documentation) that runs all gcn handlers
    specified in ``LlamaHandlers.included`` on the given ``payload`` (the
    VOEvent XML string) and ``root`` (the ``lxml`` VOEvent object for the
    payload). All handlers are run in succession, so it's up to you to make
    sure they don't interfere with each other. Set up as a callable class so
    that parameters for handlers can be stored in instances (e.g. output
    directories).

    Parameters
    ----------
    rundir : str, optional
        Path to the run directory (i.e. where new LLAMA triggers are saved
        by the GCN handler). Defaults to ``DEFAULT_RUN_DIR``. Stored as a
        real (symlink-resolved) path.
    gcn_archive : str, optional
        Path to the GCN Notice archive directory (i.e. where ALL new GCN
        Notices are archived). Defaults to ``GCN_ARCHIVE``. Stored as a real
        (symlink-resolved) path; dated subdirectories are created on demand
        by ``handle_archive``.
    included : array-like, optional
        Handlers to call when this instance is called; these functions must
        take ``payload`` and ``root`` as their only arguments. If omitted,
        defaults to all callable attributes of the instance whose names
        start with "handle". Note that you can include other valid GCN
        handlers that are not ``LlamaHandlers`` methods if you choose (see
        the ``pygcn`` documentation for details on valid GCN handlers).
    """

    def __new__(cls, rundir=DEFAULT_RUN_DIR, gcn_archive=GCN_ARCHIVE,
                included=None):
        """Create a new ``LlamaHandlers`` instance, resolving both directory
        arguments to real paths and freezing ``included`` into a tuple (or
        ``None`` if it was not given)."""
        rundir = os.path.realpath(rundir)
        gcn_archive = os.path.realpath(gcn_archive)
        manually_included = tuple(included) if included is not None else None
        return LlamaHandlersTuple.__new__(cls, rundir, gcn_archive,
                                          manually_included)

    def __call__(self, payload, root):
        """Run all handlers specified in ``LlamaHandlers.included``, in
        order, on ``payload`` and ``root``. An exception raised by one
        handler propagates and prevents later handlers from running."""
        LOGGER.info("NEW PAYLOAD. IVORN: %s", root.attrib['ivorn'])
        for handler in self.included:
            LOGGER.info("Running handler: %s from %s", handler.__name__,
                        handler.__module__)
            handler(payload, root)

    @property
    def included(self):
        """Get a tuple of GCN handlers to include in this overall handler.
        If handlers were passed as ``included`` when ``LlamaHandlers`` was
        instantiated, return exactly those; otherwise return all callable
        attributes of this instance whose names start with "handle"."""
        # Honour an explicit handler list; previously it was stored but never
        # consulted, so the ``included`` constructor argument had no effect.
        if self.manually_included is not None:
            return self.manually_included
        candidates = (getattr(self, name) for name in dir(self)
                      if name.startswith("handle"))
        return tuple(h for h in candidates if callable(h))

    @property
    def handle_lvc_preliminary(self):
        """Get a ``handle_lvc_gcn`` function for GCN Preliminary notices that
        saves to ``self.rundir``."""
        return get_handle_lvc_gcn(filehandler=LvcGcnXml,
                                  notice_types=[LVC_PRELIMINARY],
                                  rundir=self.rundir)

    @property
    def handle_lvc_initial_or_update(self):
        """Get a ``handle_lvc_gcn`` function for GCN Initial or Update notices
        that saves to ``self.rundir`` and adds an ``Advok`` marker."""
        notice_types = [LVC_INITIAL, LVC_UPDATE]
        handle_gcn = get_handle_lvc_gcn(filehandler=LvcGcnXml,
                                        notice_types=notice_types,
                                        rundir=self.rundir)

        @gcn.include_notice_types(*notice_types)
        def handle(payload, root):
            """Save the GCN Notice and mark as ADVOK."""
            _, eventid, _, _, _ = parse_ivorn(payload)
            handle_gcn(payload, root)
            adv = Advok(eventid, rundir=self.rundir)
            # The notice's ``Who/Date`` text is passed along with the alert
            # type when generating the marker.
            adv.generate('gcn', root.find('./Who/Date').text)

        return handle

    def handle_lvc_retraction(self, payload, root):
        """Mark all auto-generated events associated with this graceid as
        vetoed and save the retraction. Does nothing for notices that are
        not of type ``LVC_RETRACTION``."""
        # ``get_notice_type`` must be *called* on the VOEvent root; comparing
        # the function object itself to the notice type is never equal, which
        # made this handler return early for every notice.
        if gcn.get_notice_type(root) != LVC_RETRACTION:
            return
        _, _, graceid, _, _ = parse_ivorn(payload)
        run = Run(rundir=self.rundir)
        # Every event whose ID starts with the graceid is affected.
        for event in run.downselect(eventid_filter=graceid+'*').events:
            event.files.LvcRetractionXml.generate(payload)
            try:
                event.files.RctSlkLmaLvcRetractionXml.generate()
            # NOTE(review): ``GenerationError`` is not among the imports
            # visible at the top of this file; confirm it is in scope,
            # otherwise this clause raises NameError instead of alerting.
            except GenerationError as err:
                alert_maintainers('Could not send Slack message for '
                                  f'retraction of {event}. '
                                  f'Error message: {err}')
            # Veto the event even if the Slack notification failed.
            event.flags['VETOED'] = 'true'

    def handle_archive(self, payload, root):
        """Save a GCN Notice VOEvent to the archive specified by this
        ``LlamaHandlers`` instance. Applies to all GCN Notices including ones
        processed by the pipeline. Will not overwrite existing VOEvent files
        (since VOEvent filenames are based on IVORNs, which are supposed to be
        unique). If the filename is already in use, the xml will be logged at
        the ``info`` level (useful for production to catch unexpected
        behavior).

        Parameters
        ----------
        payload : bytes
            The raw VOEvent XML; written to disk unmodified.
        root : lxml.etree._Element
            The parsed VOEvent; its ``ivorn`` attribute determines the
            archive filename.
        """
        LOGGER.info("Running handle_archive on gcn archive directory %s",
                    self.gcn_archive)
        now = datetime.utcnow()
        # Syntax-highlighted copy of the payload, used only in log messages.
        try:
            pretty_payload = highlight(payload.decode(), XmlLexer(),
                                       TB_FORMATTER)
        except UnicodeDecodeError:
            LOGGER.error('Could not decode payload as Unicode: %s', payload)
            pretty_payload = payload
        try:
            ivorn = root.attrib['ivorn']
            LOGGER.debug("Received GCN notice: %s", ivorn)
        except KeyError:
            # Fall back to a timestamped placeholder so the notice is still
            # archived.
            ivorn = "IVORN_NOT_FOUND_{}".format(now.isoformat())
            LOGGER.error("Could not parse IVORN from notice; archiving as "
                         "%s:\n%s", ivorn, pretty_payload)
        # URL-quote the IVORN so that it is usable as a single filename.
        filename = quote_plus(ivorn)
        subdirs = get_subdir(self.gcn_archive, now)
        # ``exist_ok`` avoids failing if the directory appears between a
        # separate existence check and its creation.
        os.makedirs(subdirs, exist_ok=True)
        outpath = os.path.join(subdirs, filename)
        LOGGER.info("Selected outpath %s", outpath)
        if not os.path.exists(outpath):
            with open(outpath, 'wb') as outfile:
                outfile.write(payload)
        else:
            LOGGER.info("outpath %s exists, not overwriting. dumping xml:\n%s",
                        outpath, pretty_payload)
        LOGGER.info("archived %s", ivorn)

280 

281 

def run_on_voevent_file(files="*.xml", handler=LlamaHandlers()):
    """Feed VOEvent XML files to a GCN handler as if they had just arrived.

    Parameters
    ----------
    files : str or list, optional
        Either a list of filenames, or a string interpreted as a UNIX-like
        glob pattern by ``glob`` (DEFAULT: "*.xml", matching XML files in the
        current directory). An exact filename works too, as long as it
        contains no glob metacharacters.
    handler : function, optional
        A GCN handler of the same sort used by ``gcn.listen``, called as
        ``handler(payload, root)`` (DEFAULT: a ``LlamaHandlers`` instance
        constructed with default arguments when this function is defined).

    Raises
    ------
    XMLSyntaxError
        Re-raised, after logging the base64-encoded file contents, as soon as
        one of the files fails to parse as XML. Files after the offending one
        are not processed.
    """
    # A string is a glob pattern; anything else is taken as the file list.
    paths = glob(files) if isinstance(files, string_types) else files
    for path in paths:
        LOGGER.info(COLOR.blue("Running %s handler on %s."), handler, path)
        with open(path, 'rb') as infile:
            raw = infile.read()
        try:
            root = fromstring(raw)
            handler(raw, root)
        except XMLSyntaxError:
            LOGGER.exception(
                COLOR.RED("failed to parse XML from %s, "
                          "base64-encoded payload is:\n%s"),
                os.path.realpath(path),
                base64.b64encode(raw),
            )
            raise