Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2# Stefan Countryman, July 21, 2016 

3 

4"""A class for interacting with the files associated with a given 

5event. This class contains methods for working with these files individually 

6(with FileHandlers) or as a group. In particular, the update method 

7can be used to abstractly update the files in an event directory 

8with the latest available information. This approach is robust and allows for 

9trivial modifications and additions to the types of data files associated with 

10events. 

11""" 

12 

13import logging 

14import datetime 

15import filecmp 

16import os 

17import tarfile 

18from shutil import copytree, rmtree 

19from tempfile import mkdtemp 

20from collections import namedtuple 

21from llama.classes import LOCAL_TMPDIR_NAME 

22from llama.pipeline import DEFAULT_PIPELINE 

23from llama.utils import color_logger, COLOR as COL, DEFAULT_RUN_DIR 

24from llama.versioning import GitDirMixin, GitRepoUninitialized 

25from llama.filehandler import ( 

26 EventTriggeredFileHandler, 

27 FlagsMixin, 

28) 

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Names of entries that may appear in an event directory and that describe the
# state of the event as a whole rather than any single data file.
EVENT_AUX_PATHS = ['.git', '.gitignore', LOCAL_TMPDIR_NAME, 'FLAGS.json']

# Shorthand for the "current local time" callable.
NOW = datetime.datetime.now


# Event is implemented as a subclass of this namedtuple so that its identifying
# fields (eventid, rundir, pipeline) are immutable.
EventTuple = namedtuple("EventTuple", ["eventid", "rundir", "pipeline"])

42 

43 

class Event(EventTuple, FlagsMixin, GitDirMixin):
    """
    An event object, used to update and access data associated with a
    specific trigger (which receives its own directory or ``eventdir``).
    An ``Event`` can be created from an event ID string, or from a
    ``FileHandler`` or another ``Event`` (or anything with ``eventid`` and
    ``rundir`` attributes), in which case it will correspond to the same
    event as the object provided as an argument. One can also provide a
    ``Pipeline``, which is used to build the ``FileGraph`` returned by
    ``Event.files`` (i.e. it specifies which files should be made for this
    event). If no pipeline is given, ``DEFAULT_PIPELINE`` is used.

    Parameters
    ----------
    eventid_or_event : str or EventTuple or llama.filehandler.FileHandlerTuple
        This can be a string with the unique ID of this event (which can
        simply be a filename-friendly descriptive string for tests or manual
        analyses), in which case the next arguments, ``rundir`` and
        ``pipeline``, will be used; *OR*, alternatively, it can be an
        ``EventTuple`` (e.g. another ``Event`` instance), ``FileHandlerTuple``
        (e.g. any ``FileHandler`` instance), or any object with valid
        ``eventid`` and ``rundir`` attributes. In this case, those attributes
        from the provided object will be re-used, and the ``rundir`` argument
        will be ignored. The provided object's pipeline (if any) is *not*
        re-used; only the ``pipeline`` argument is consulted. This makes it
        easy to get a new ``Event`` instance describing the same underlying
        event but with a different ``Pipeline`` specified, or alternatively
        to get the ``Event`` corresponding to a given ``FileHandler``
        *(though in this case you should take care to manually specify the
        ``Pipeline`` you want to use!)*.
    rundir : str, optional
        The ``rundir``, i.e. the directory where all events for a given run
        are stored, if it differs from the default and is not specified by
        ``eventid_or_event``.
    pipeline : llama.pipeline.Pipeline, optional
        The ``pipeline``, i.e. the set of FileHandlers that we want to
        generate, if it differs from the default pipeline. If none is
        provided, use ``DEFAULT_PIPELINE``.

    Returns
    -------
    event : Event
        A new ``Event`` instance with the given properties.

    Raises
    ------
    ValueError
        If the ``eventid_or_event`` argument does not conform to the above
        expectations or if the ``rundir`` directory for the run does not
        exist, a ValueError with a descriptive message will be thrown.
    """

    def clone(self, commit="HEAD", rundir=None, clobber=False):
        """Make a clone of this event in a temporary directory for quick
        manipulations on a specific version of a file.

        Parameters
        ----------
        commit : str, optional
            The commit hash to check out when cloning this event. If not
            specified, the most recent commit will be used. Unsaved changes
            will be discarded.
        rundir : str, optional
            The run directory in which to store the cloned event. If not
            specified, a temporary directory will be created and used. The
            contents of this directory will NOT be deleted automatically.
        clobber : bool, optional
            Whether this cloned event should overwrite existing state.

        Returns
        -------
        clone_event : llama.event.Event
            A clone of this event, of the same class and with the same
            ``pipeline`` as this one. The full history is saved, but the
            specified ``commit`` is checked out. Any uncommitted changes in
            the working directory will not be copied over to the
            ``clone_event``. If ``clone_event`` already seems to be a valid
            event with the correct ``commit`` hash, no further action will be
            taken (thus repeated cloning has little performance penalty).

        Raises
        ------
        llama.versioning.GitRepoUninitialized
            If this is called on an ``Event`` that has not had its git history
            initialized.
        IOError
            If this event already exists in the specified ``rundir`` and is
            checked out to a different hash, unless ``clobber`` is True, in
            which case that working directory will be deleted and replaced
            with the desired commit.

        Notes
        -----
        NOTE(review): the "already checked out" shortcut compares the clone's
        current hash to ``commit`` by plain equality, so a symbolic name such
        as the default ``"HEAD"`` presumably never matches an existing clone;
        confirm against ``GitDirMixin`` before relying on the shortcut.
        """
        if rundir is None:
            rundir = mkdtemp(prefix="llama-", suffix="-"+self.eventid)
        # Build the clone with the same class and pipeline as this event so
        # that ``clone_event.files`` describes the same set of files.
        clone_event = type(self)(self.eventid, rundir=rundir,
                                 pipeline=self.pipeline)
        clone_git = os.path.join(clone_event.eventdir, '.git')
        if clone_event.exists():
            current = clone_event.git.current_hash
            if current == commit:
                return clone_event
            if not clobber:
                raise IOError(("Clone target {} -> {} has different hash ({})"
                               " than desired ({}); specify ``clobber=True`` "
                               "to force overwrite.").format(self, clone_event,
                                                             current, commit))
            # Clobbering: remove the stale history (directory or plain file)
            # so that ``copytree`` below can recreate it.
            if os.path.isdir(clone_git):
                rmtree(clone_git)
            elif os.path.isfile(clone_git):
                os.unlink(clone_git)
        copytree(os.path.join(self.eventdir, '.git'), clone_git)
        clone_event.git.reset_hard(commit)
        return clone_event

    def save_tarball(self, outfile):
        """Save this event and all its contents as a gzipped tarball. You
        should probably use a ``.tar.gz`` extension for the ``outfile`` name.
        Inside the archive, the event directory is stored under the name
        ``self.eventid``.
        """
        with tarfile.open(outfile, "w:gz") as tar:
            tar.add(self.eventdir, arcname=self.eventid)

    def __new__(cls, eventid_or_event, rundir=DEFAULT_RUN_DIR, pipeline=None):
        """Create the underlying ``EventTuple``; see the class docstring for
        the meaning of the arguments and the ``ValueError`` conditions."""
        pipeline = DEFAULT_PIPELINE if pipeline is None else pipeline
        if (hasattr(eventid_or_event, 'eventid') and
                hasattr(eventid_or_event, 'rundir')):
            # Event-like object: reuse its identity, ignore ``rundir`` arg.
            eventid = eventid_or_event.eventid
            rundir = eventid_or_event.rundir
        elif isinstance(eventid_or_event, str):
            eventid = eventid_or_event
        else:
            raise ValueError(
                'eventid_or_event must be a str or have attributes "eventid" '
                'and "rundir". instead, received: ' + str(eventid_or_event)
            )
        if not os.path.isdir(rundir):
            raise ValueError('rundir must exist.')
        # Store an absolute path so the event does not depend on the cwd.
        rundir = os.path.abspath(rundir)
        return EventTuple.__new__(cls, eventid, rundir, pipeline)

    def init(self):
        """Initialize the directory for this event, making sure it is in a
        proper state for processing data. Make sure the ``eventdir`` exists by
        creating it if necessary. Also initializes version control and sets
        flags to the defaults specified in ``FlagsMixin.DEFAULT_FLAGS`` (which
        ``Event`` inherits).

        Returns
        -------
        self
            Returns this ``Event`` instance to allow command chaining.

        Raises
        ------
        ValueError
            If the ``eventdir`` path exists but is not a directory or a link
            to a directory; we don't want to overwrite it to make the
            directory.
        """
        if not os.path.isdir(self.eventdir):
            if os.path.exists(self.eventdir):
                raise ValueError(
                    (
                        "Tried to make a new directory for a new event with "
                        "``eventdir`` {} (resolving to canonical path {}), "
                        "but a non-directory object already exists at that "
                        "path on the filesystem."
                    ).format(self.eventdir, os.path.realpath(self.eventdir))
                )
            os.mkdir(self.eventdir)
        self.git.init()
        self.flags = self.flags.DEFAULT_FLAGS
        return self

    def compare_contents(self, other):
        """Compare the file contents of this event to another event using
        ``filecmp.cmpfiles`` (though results are given as ``FileHandler``
        instances rather than file paths). Use this to see whether two event
        directories contain the same contents under a given pipeline.

        Parameters
        ----------
        other : Event, str
            The other ``Event`` instance to compare this one to, or else a
            directory containing files that can be compared to this ``Event``
            (though in that case the filenames must still follow the expected
            format).

        Returns
        -------
        match : FileGraph
            A ``FileGraph`` for this ``Event`` whose files have the same
            contents as those corresponding to the ``other`` event.
        mismatch : FileGraph
            A ``FileGraph`` for this ``Event`` whose files have differing
            contents as those corresponding to the ``other`` event.
        errors : FileGraph
            A ``FileGraph`` for this ``Event`` whose corresponding files do
            not exist or otherwise could not be accessed for comparison
            (either for the files corresponding to this ``Event`` or the
            ``other`` one).

        Raises
        ------
        ValueError
            If the ``Pipeline`` instances of this ``Event`` and the ``other``
            one are not equal, it does not make sense to compare them, and a
            ``ValueError`` will be raised.
        """
        if not isinstance(other, Event):
            # Treat ``other`` as a path to an event directory.
            other = type(self).fromdir(other, pipeline=self.pipeline)
        if self.pipeline != other.pipeline:
            raise ValueError("Pipeline mismatch: {} vs {}".format(self, other))
        # ``files`` rebuilds the graph on every access; build it once.
        files = self.files
        filenames = [fh.FILENAME for fh in files.values()]
        result = filecmp.cmpfiles(self.eventdir, other.eventdir, filenames)
        # ``result`` is (match, mismatch, errors), each a list of filenames.
        return [files.downselect(nameis=names) for names in result]

    @property
    def files(self):
        """Get a ``FileGraph`` full of ``FileHandler`` instances for the
        files in this event with this particular ``pipeline``."""
        return self.pipeline.file_handler_instances(self)

    @classmethod
    def fromdir(cls, eventdir='.', **kwargs):
        """Initialize an event just by providing a filepath to its event
        directory. If no directory is specified, default to the current
        directory and try to treat that like an event.
        Note that the returned event will eliminate symbolic links when
        determining paths for ``rundir`` and ``eventid``. Useful for quickly
        making events during interactive work.

        Parameters
        ----------
        eventdir : str, optional
            The event directory from which to initialize a new event.
        **kwargs
            Remaining keyword arguments to pass to ``Event()``.
        """
        rundir, eventid = os.path.split(os.path.realpath(eventdir))
        return cls(eventid, rundir=rundir, **kwargs)

    @property
    def eventdir(self):
        """The full path to the directory containing files related to this
        event."""
        return os.path.join(self.rundir, self.eventid)

    def change_time(self):
        """The ``st_ctime`` of this event directory as reported by
        ``os.stat``. On Unix this is the time of the most recent metadata
        change (e.g. permissions); on Windows it is the creation time. Note
        that you probably are more interested in ``modification_time``."""
        return os.stat(self.eventdir).st_ctime

    def modification_time(self):
        """The time at which this event directory was modified (according to
        the underlying storage system), i.e. its ``st_mtime``."""
        return os.stat(self.eventdir).st_mtime

    def v0_time(self):
        """
        Return the timestamp of the first file version commit, catching the
        error if the event does not have versioning initialized/has no
        versions and returning ``False``.
        """
        try:
            # ``min`` of an empty sequence raises ValueError, handled below.
            return min(float(t) for t in self.git.hashes('--', pretty='%at'))
        except (GitRepoUninitialized, ValueError):
            return False

    # TODO ADAGR factor out the LVC-specificity.
    def gpstime(self):
        """Return the GPS time of this event. Returns -1 if none can be
        parsed. The ``SkymapInfo`` file is tried first, then ``LvcGcnXml`` as
        a fallback; each failure is logged as a warning."""
        try:
            return self.files.SkymapInfo.event_time_gps
        except IOError:
            LOGGER.warning("Could not find GPS time in skymap_info for %s. Is "
                           "this an old event using the deprecated file "
                           "structure?", self.eventid)
        try:
            return self.files.LvcGcnXml.event_time_gps
        except (KeyError, IOError):
            LOGGER.warning("Could not find a VOEvent file either for %s. "
                           "gpstime not defined. Is this really an event "
                           "directory?", self.eventid)
        except AttributeError:
            LOGGER.warning("You need to have a VOEvent filehandler in your "
                           "pipeline to fall back on this check.")
        return -1

    @property
    def auxiliary_paths(self):
        """Names of *possible* auxiliary paths in the directory that are
        used to track the state of the Event as a whole."""
        return tuple(EVENT_AUX_PATHS)

    def exists(self):
        """Check whether this event already exists, i.e. whether its
        ``eventdir`` is a directory."""
        return os.path.isdir(self.eventdir)

    @property
    def cruft_files(self):
        """Return a list of files in the event directory that are not
        associated with any file handler nor with event state directories."""
        non_cruft = set(self.auxiliary_paths)
        for fh in self.files.values():
            non_cruft.add(fh.FILENAME)
            non_cruft.update(fh.auxiliary_paths)
        return [f for f in os.listdir(self.eventdir) if f not in non_cruft]

    def printstatus(self, cruft=False, highlight=None, unicode=True,
                    plot=None):
        """Get a user-readable message indicating the current status of this
        event. Include a list of files not in the selected
        pipeline with ``cruft=True``. Bold lines in the summary table
        containing strings in ``highlight`` as substrings. Use nice unicode
        characters and terminal colors with ``unicode=True``, or use plain
        ascii with ``unicode=False``. Include a status graph plot with
        ``plot=True``, or exclude it with ``plot=False``; if ``plot=None``,
        include the plot only if the underlying ``Graph::Easy`` Perl library
        is available on the host.

        Returns the message as a single newline-joined string.
        """
        res = []

        def log(*parts):
            """Append one output line built by concatenating ``parts``."""
            res.append(''.join(parts))

        dir_exists = os.path.isdir(self.eventdir)
        log(COL.magenta("Event: "), self.eventid)
        log(COL.magenta("Directory: "), self.eventdir)
        log(COL.magenta("Directory Exists: "), str(dir_exists).upper())
        log(COL.magenta("Directory Under Version Control: "),
            str(os.path.isdir(os.path.join(self.eventdir, '.git'))).upper())
        if cruft:
            log(COL.magenta("Cruft files:"))
            # A missing event directory has no cruft; listing it would raise
            # instead of letting us report "Directory Exists: FALSE" above.
            for fname in (self.cruft_files if dir_exists else ()):
                log(4*' ', COL.YELLOW, fname, COL.CLEAR)
        graph = self.files.dependency_graph_term(highlight=highlight,
                                                 unicode=unicode, plot=plot)
        log('\n'+graph)
        return '\n'.join(res)

    def update(self, **downselect):
        """Generate any files that fit the ``FileGraph`` downselection
        criteria specified in ``downselect``. By default, generate all files
        that have not been generated and regenerate all files that have been
        obsoleted because their data dependencies have changed. Returns
        ``True`` if files were updated, ``False`` if no files in need of
        update were found."""
        return self.files.update(**downselect)

    def __str__(self):
        """Multi-line, constructor-like description of this event with the
        ``rundir`` and ``pipeline`` lines aligned under ``eventid``."""
        name = type(self).__name__
        fmt = ('{}(eventid="{}",\n' + len(name)*' ' + ' rundir="{}",\n' +
               len(name)*' ' + ' pipeline="{}")')
        return fmt.format(name, self.eventid, self.rundir, self.pipeline)

    def __repr__(self):
        """Same representation as ``str(self)``."""
        return str(self)

395 

396 

def _print_file_status(filehandler, indent='', log=None):
    """Print the status of ``filehandler`` recursively to show all missing
    files that it depends on.

    Parameters
    ----------
    filehandler
        The file handler whose ``DEPENDENCIES`` should be reported. Each
        dependency class is instantiated by calling it with ``filehandler``.
    indent : str, optional
        Prefix written before each line; grows by one level per recursion.
    log : callable, optional
        Function called with the pieces of each output line. If ``None``, a
        new logger from ``color_logger()`` is used.
    """
    if log is None:
        log = color_logger()
    for fhdep in [d(filehandler) for d in filehandler.DEPENDENCIES]:
        # Check existence once so the printed status and the decision to
        # recurse can never disagree (and the filesystem is hit only once).
        present = fhdep.exists()
        exists = COL.green('y') if present else COL.red('N')
        log(indent, "├─", fhdep.FILENAME, ' [exists: ', exists, ']',
            COL.blue(' -> '), COL.blue(type(fhdep).__name__))
        if not present:
            # Only missing files are expanded, to show what they still need.
            _print_file_status(fhdep, indent=indent+'│ ', log=log)