Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2# Stefan Countryman, July 21, 2016
4"""A class for interacting with the files associated with a given
5event. This class contains methods for working with these files individually
6(with FileHandlers) or as a group. In particular, the update method
7can be used to abstractly update the files in an event directory
8with the latest available information. This approach is robust and allows for
9trivial modifications and additions to the types of data files associated with
10events.
11"""
13import logging
14import datetime
15import filecmp
16import os
17import tarfile
18from shutil import copytree, rmtree
19from tempfile import mkdtemp
20from collections import namedtuple
21from llama.classes import LOCAL_TMPDIR_NAME
22from llama.pipeline import DEFAULT_PIPELINE
23from llama.utils import color_logger, COLOR as COL, DEFAULT_RUN_DIR
24from llama.versioning import GitDirMixin, GitRepoUninitialized
25from llama.filehandler import (
26 EventTriggeredFileHandler,
27 FlagsMixin,
28)
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Names of entries that may appear in an event directory and that track the
# state of the event as a whole (as opposed to any single data file).
EVENT_AUX_PATHS = ['.git', '.gitignore', LOCAL_TMPDIR_NAME, 'FLAGS.json']

# Shorthand for fetching the current local time.
NOW = datetime.datetime.now


# Implement Event as a subclass of a namedtuple to ensure immutability
EventTuple = namedtuple("EventTuple", ["eventid", "rundir", "pipeline"])
class Event(EventTuple, FlagsMixin, GitDirMixin):
    """
    An event object, used to update and access data associated with a
    specific trigger (which receives its own directory or ``eventdir``).
    An ``Event`` can be constructed from an event ID string, or from a
    FileHandler or another Event (or anything with 'eventid' and 'rundir'
    properties) as an input argument, in which case it will correspond to the
    same event as the object provided as an argument. The ``pipeline``
    determines which ``FileHandler`` instances are exposed through
    ``Event.files`` (i.e. it specifies which files should be made for this
    event); it defaults to ``DEFAULT_PIPELINE``.

    Parameters
    ----------
    eventid_or_event : str or EventTuple or llama.filehandler.FileHandlerTuple
        This can be a string with the unique ID of this event (which can
        simply be a filename-friendly descriptive string for tests or manual
        analyses), in which case the next arguments, ``rundir`` and
        ``pipeline``, will be used; *OR*, alternatively, it can be an
        ``EventTuple`` (e.g. another ``Event`` instance), ``FileHandlerTuple``
        (e.g. any ``FileHandler`` instance), or any object with valid
        ``eventid`` and ``rundir`` attributes. In this case, those attributes
        from the provided object will be re-used, and the ``rundir`` argument
        will be ignored. This makes it easy to get a new ``Event`` instance
        describing the same underlying event but with a different ``Pipeline``
        specified, or alternatively to get the ``Event`` corresponding to a
        given ``FileHandler`` *(though in this case you should take care to
        manually specify the ``Pipeline`` you want to use!)*.
    rundir : str, optional
        The ``rundir``, i.e. the directory where all events for a given run
        are stored, if it differs from the default and is not specified by
        ``eventid_or_event``. It is stored as an absolute path.
    pipeline : llama.pipeline.Pipeline, optional
        The ``pipeline``, i.e. the set of FileHandlers that we want to
        generate, if it differs from the default pipeline. If none is
        provided, use ``DEFAULT_PIPELINE``.

    Returns
    -------
    event : Event
        A new ``Event`` instance with the given properties.

    Raises
    ------
    ValueError
        If the ``eventid_or_event`` argument does not conform to the above
        expectations or if the ``rundir`` directory for the run does not
        exist, a ValueError with a descriptive message will be thrown.
    """

    def clone(self, commit="HEAD", rundir=None, clobber=False):
        """Make a clone of this event in a temporary directory for quick
        manipulations on a specific version of a file.

        Only the ``.git`` directory of this event is copied; the requested
        ``commit`` is then checked out with a hard reset in the clone. The
        clone uses the same ``pipeline`` as this event.

        Parameters
        ----------
        commit : str, optional
            The commit hash to check out when cloning this event. If not
            specified, the most recent commit will be used. Unsaved changes
            will be discarded.
        rundir : str, optional
            The run directory in which to store the cloned event. If not
            specified, a temporary directory will be created and used. The
            contents of this directory will NOT be deleted automatically.
        clobber : bool, optional
            Whether this cloned event should overwrite existing state.

        Returns
        -------
        clone_event : llama.event.Event
            A clone of this event. The full history is saved, but the
            specified ``commit`` is checked out. Any uncommitted changes in
            the working directory will not be copied over to the
            ``clone_event``. If ``clone_event`` already exists and its
            current hash equals ``commit``, no further action will be taken
            (thus repeated cloning has little performance penalty).

        Raises
        ------
        llama.versioning.GitRepoUninitialized
            Not raised directly by this method; presumably raised by the
            underlying git helpers when this ``Event`` has not had its git
            history initialized (TODO confirm against ``GitDirMixin``).
        IOError
            If this event already exists in the specified ``rundir`` and is
            checked out to a different hash, unless ``clobber`` is True, in
            which case the clone's ``.git`` directory will be deleted and
            replaced before checking out the desired commit.
        """
        if rundir is None:
            rundir = mkdtemp(prefix="llama-", suffix="-"+self.eventid)
        # Propagate our pipeline so that the clone describes the same set of
        # files as this event rather than silently reverting to the default.
        clone_event = Event(self.eventid, rundir=rundir,
                            pipeline=self.pipeline)
        clone_git = os.path.join(clone_event.eventdir, '.git')
        if clone_event.exists():
            # NOTE(review): ``commit`` is compared literally, so a symbolic
            # ref such as the default "HEAD" only short-circuits here if
            # ``current_hash`` can return that same string -- confirm.
            current = clone_event.git.current_hash
            if current == commit:
                return clone_event
            if not clobber:
                raise IOError(("Clone target {} -> {} has different hash ({})"
                               " than desired ({}); specify ``clobber=True`` "
                               "to force overwrite.").format(self, clone_event,
                                                             current, commit))
        # Clear out any stale git metadata before copying ours into place;
        # ``copytree`` requires that the destination not exist yet.
        if os.path.isdir(clone_git):
            rmtree(clone_git)
        elif os.path.isfile(clone_git):
            os.unlink(clone_git)
        copytree(os.path.join(self.eventdir, '.git'), clone_git)
        clone_event.git.reset_hard(commit)
        return clone_event

    def save_tarball(self, outfile):
        """Save this event and all its contents as a gzipped tarball. You
        should probably use a ``.tar.gz`` extension for the ``outfile`` name.
        The event directory is stored in the archive under the name
        ``eventid``.
        """
        with tarfile.open(outfile, "w:gz") as tar:
            tar.add(self.eventdir, arcname=self.eventid)

    def __new__(cls, eventid_or_event, rundir=DEFAULT_RUN_DIR, pipeline=None):
        pipeline = DEFAULT_PIPELINE if pipeline is None else pipeline
        if (hasattr(eventid_or_event, 'eventid') and
                hasattr(eventid_or_event, 'rundir')):
            # Event-like object: reuse its identity, ignoring ``rundir``.
            eventid = eventid_or_event.eventid
            rundir = eventid_or_event.rundir
        elif isinstance(eventid_or_event, str):
            eventid = eventid_or_event
        else:
            # Built from adjacent literals so that no source newline or
            # indentation leaks into the user-facing message.
            raise ValueError(
                'eventid_or_event must be a str or have attributes "eventid" '
                'and "rundir". instead, received: ' + str(eventid_or_event)
            )
        if not os.path.isdir(rundir):
            raise ValueError('rundir must exist.')
        rundir = os.path.abspath(rundir)
        return EventTuple.__new__(cls, eventid, rundir, pipeline)

    def init(self):
        """Initialize the directory for this event, making sure it is in a
        proper state for processing data. Make sure the ``eventdir`` exists by
        creating it if necessary. Also initializes version control and sets
        flags to the defaults given by ``self.flags.DEFAULT_FLAGS``.

        Returns
        -------
        self
            Returns this ``Event`` instance to allow command chaining.

        Raises
        ------
        ValueError
            If the ``eventdir`` path exists but is not a directory or a link
            to a directory; we don't want to overwrite it to make the
            directory.
        """
        if not os.path.isdir(self.eventdir):
            if os.path.exists(self.eventdir):
                raise ValueError(
                    (
                        "Tried to make a new directory for a new event with "
                        "``eventdir`` {} (resolving to canonical path {}), "
                        "but a non-directory object already exists at that "
                        "path on the filesystem."
                    ).format(self.eventdir, os.path.realpath(self.eventdir))
                )
            os.mkdir(self.eventdir)
        self.git.init()
        self.flags = self.flags.DEFAULT_FLAGS
        return self

    def compare_contents(self, other):
        """Compare the file contents of this event to another event using
        ``filecmp.cmpfiles`` (though results are given as ``FileHandler``
        instances rather than file paths). Use this to see whether two event
        directories contain the same contents under a given pipeline.

        Parameters
        ----------
        other : Event, str
            The other ``Event`` instance to compare this one to, or else a
            directory containing files that can be compared to this ``Event``
            (though in that case the filenames must still follow the expected
            format). A directory is converted with ``fromdir`` using this
            event's ``pipeline``.

        Returns
        -------
        A three-element list ``[match, mismatch, errors]``:

        match : FileGraph
            A ``FileGraph`` for this ``Event`` whose files have the same
            contents as those corresponding to the ``other`` event.
        mismatch : FileGraph
            A ``FileGraph`` for this ``Event`` whose files have differing
            contents as those corresponding to the ``other`` event.
        errors : FileGraph
            A ``FileGraph`` for this ``Event`` whose corresponding files do
            not exist or otherwise could not be accessed for comparison
            (either for the files corresponding to this ``Event`` or the
            ``other`` one).

        Raises
        ------
        ValueError
            If the ``Pipeline`` instances of this ``Event`` and the ``other``
            one are not equal, it does not make sense to compare them, and a
            ``ValueError`` will be raised.
        """
        if not isinstance(other, Event):
            other = type(self).fromdir(other, pipeline=self.pipeline)
        if self.pipeline != other.pipeline:
            raise ValueError("Pipeline mismatch: {} vs {}".format(self, other))
        filenames = [fh.FILENAME for fh in self.files.values()]
        # ``cmpfiles`` returns (match, mismatch, errors) lists of filenames.
        result = filecmp.cmpfiles(self.eventdir, other.eventdir, filenames)
        return [self.files.downselect(nameis=names) for names in result]

    @property
    def files(self):
        """Get a ``FileGraph`` full of ``FileHandler`` instances for the
        files in this event with this particular ``pipeline``."""
        return self.pipeline.file_handler_instances(self)

    @classmethod
    def fromdir(cls, eventdir='.', **kwargs):
        """Initialize an event just by providing a filepath to its event
        directory. If no directory is specified, default to the current
        directory and try to treat that like an event.
        Note that the returned event will eliminate symbolic links when
        determining paths for ``rundir`` and ``eventid``. Useful for quickly
        making events during interactive work.

        Parameters
        ----------
        eventdir : str, optional
            The event directory from which to initialize a new event.
        **kwargs
            Remaining keyword arguments to pass to ``Event()``.
        """
        rundir, eventid = os.path.split(os.path.realpath(eventdir))
        return cls(eventid, rundir=rundir, **kwargs)

    @property
    def eventdir(self):
        """The full path to the directory containing files related to this
        event."""
        return os.path.join(self.rundir, self.eventid)

    def change_time(self):
        """The ``st_ctime`` of this event directory as reported by
        ``os.stat`` (on Unix, the time of the most recent metadata change,
        e.g. a permissions change). Note that you probably are more
        interested in ``modification_time``."""
        return os.stat(self.eventdir).st_ctime

    def modification_time(self):
        """The time at which this event directory was modified (according to
        the underlying storage system)."""
        return os.stat(self.eventdir).st_mtime

    def v0_time(self):
        """
        Return the timestamp of the first file version commit, catching the
        error if the event does not have versioning initialized/has no
        versions and returning ``False``.
        """
        try:
            # ``min`` raises ValueError on an empty sequence (no commits).
            return min(float(t)
                       for t in self.git.hashes('--', pretty='%at'))
        except (GitRepoUninitialized, ValueError):
            return False

    # TODO ADAGR factor out the LVC-specificity.
    def gpstime(self):
        """Return the GPS time of this event. Returns -1 if none can be
        parsed.

        The ``SkymapInfo`` file is tried first; if reading it raises an
        ``IOError``, the ``LvcGcnXml`` file is used as a fallback. Each
        failure is logged as a warning.
        """
        try:
            return self.files.SkymapInfo.event_time_gps
        except IOError:
            LOGGER.warning("Could not find GPS time in skymap_info for %s. Is "
                           "this an old event using the deprecated file "
                           "structure?", self.eventid)
        try:
            return self.files.LvcGcnXml.event_time_gps
        except (KeyError, IOError):
            LOGGER.warning("Could not find a VOEvent file either for %s. "
                           "gpstime not defined. Is this really an event "
                           "directory?", self.eventid)
        except AttributeError:
            LOGGER.warning("You need to have a VOEvent filehandler in your "
                           "pipeline to fall back on this check.")
        return -1

    @property
    def auxiliary_paths(self):
        """Names of *possible* auxiliary paths in the directory that are
        used to track the state of the Event as a whole."""
        return tuple(EVENT_AUX_PATHS)

    def exists(self):
        """Check whether this event already exists, i.e. whether its
        ``eventdir`` is a directory."""
        return os.path.isdir(self.eventdir)

    @property
    def cruft_files(self):
        """Return a list of files in the event directory that are not
        associated with any file handler nor with event state directories."""
        non_cruft = set(self.auxiliary_paths)
        for fh in self.files.values():
            non_cruft.add(fh.FILENAME)
            non_cruft.update(fh.auxiliary_paths)
        return [f for f in os.listdir(self.eventdir) if f not in non_cruft]

    def printstatus(self, cruft=False, highlight=None, unicode=True,
                    plot=None):
        """Get a user-readable message indicating the current status of this
        event. Include a list of files not in the selected
        pipeline with ``cruft=True``. Bold lines in the summary table
        containing strings in ``highlight`` as substrings. Use nice unicode
        characters and terminal colors with ``unicode=True``, or use plain
        ascii with ``unicode=False``. Include a status graph plot with
        ``plot=True``, or exclude it with ``plot=False``; if ``plot=None``,
        include the plot only if the underlying ``Graph::Easy`` Perl library
        is available on the host.

        Despite the name, nothing is printed: the message is returned as a
        single newline-joined string.
        """
        res = []

        def log(*parts):
            """Append one output line made by concatenating ``parts``."""
            res.append(''.join(parts))

        log(COL.magenta("Event: "), self.eventid)
        log(COL.magenta("Directory: "), self.eventdir)
        log(COL.magenta("Directory Exists: "),
            str(os.path.isdir(self.eventdir)).upper())
        log(COL.magenta("Directory Under Version Control: "),
            str(os.path.isdir(os.path.join(self.eventdir, '.git'))).upper())
        if cruft:
            log(COL.magenta("Cruft files:"))
            for fname in self.cruft_files:
                log(4*' ', COL.YELLOW, fname, COL.CLEAR)
        graph = self.files.dependency_graph_term(highlight=highlight,
                                                 unicode=unicode, plot=plot)
        log('\n'+graph)
        return '\n'.join(res)

    def update(self, **downselect):
        """Generate any files that fit the ``FileGraph`` downselection
        criteria specified in ``downselect``. By default, generate all files
        that have not been generated and regenerate all files that have been
        obsoleted because their data dependencies have changed. Returns
        ``True`` if files were updated, ``False`` if no files in need of update
        were found."""
        return self.files.update(**downselect)

    def __str__(self):
        name = type(self).__name__
        # Continuation lines are padded by the class name's width so that the
        # keyword arguments line up under ``eventid``.
        fmt = ('{}(eventid="{}",\n' + len(name)*' ' + ' rundir="{}",\n' +
               len(name)*' ' + ' pipeline="{}")')
        return fmt.format(name, self.eventid, self.rundir, self.pipeline)

    def __repr__(self):
        return str(self)


def _print_file_status(filehandler, indent='', log=None):
    """Print the status of ``filehandler`` recursively to show all missing
    files that it depends on.

    Parameters
    ----------
    filehandler
        The file handler whose ``DEPENDENCIES`` should be reported. Each
        dependency is instantiated by calling it with ``filehandler``.
    indent : str, optional
        Prefix written before each line; grows with each level of recursion.
    log : callable, optional
        Called with the string fragments making up one output line. If not
        given, the result of ``color_logger()`` is used.
    """
    if log is None:
        log = color_logger()
    for fhdep in [d(filehandler) for d in filehandler.DEPENDENCIES]:
        # Check the filesystem once so that the printed status and the
        # decision to recurse can never disagree.
        present = fhdep.exists()
        exists = COL.green('y') if present else COL.red('N')
        log(indent, "├─", fhdep.FILENAME, ' [exists: ', exists, ']',
            COL.blue(' -> '), COL.blue(type(fhdep).__name__))
        if not present:
            # Only missing files are expanded to show what they are
            # themselves waiting on.
            _print_file_status(fhdep, indent=indent+'│ ', log=log)