# (c) Stefan Countryman 2019

"""
Classes for tracking whether files are cooling down after a failed generation
attempt or are currently being generated.
"""

import threading
import atexit
import os
import functools
import time
import logging
import datetime
from queue import Queue
from collections import namedtuple
from dateutil.parser import parse as parsetime
from llama.utils import GenerationError
from llama.classes import (
    JsonRiderMixin,
    RiderFile,
    IntentException,
    CoolDownException,
)

LOGGER = logging.getLogger(__name__)
LOCK_TIMEOUT = 20

# this is just a function, so don't give it an uppercase constant var name
# pylint: disable=invalid-name
utcnow = datetime.datetime.utcnow


CoolDownParams = namedtuple("CoolDownParams", ("base", "increment", "maximum"))
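
# For example, a file handler might define its cooldown schedule as
# (the values here are hypothetical):
#
#     COOLDOWN_PARAMS = CoolDownParams(base=60, increment=120, maximum=600)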


class CoolDown(RiderFile, JsonRiderMixin):
    """
    An object for keeping track of whether a particular file failed to
    generate recently and for determining how long the pipeline should wait
    before trying to generate it again. Instances of this class are meant to
    be computed as properties of ``FileHandler`` and similar classes.

    Parameters
    ----------
    eventdir : str
        Path to the directory containing the files in the manifest.
    manifest : list
        List of file handlers for the files that need to be generated (and
        which might fail, necessitating a cooldown countdown).
    """

    rider_fmt = '.{}.cooldown.json'

    def in_progress(self):
        """Determine whether a cooldown is still in progress for generating
        this file, i.e. whether this file's generation routine failed too
        recently for another generation attempt."""
        params = {f.COOLDOWN_PARAMS for f in self.manifest_filehandlers}
        assert len(params) == 1
        base, inc, maximum = params.pop()
        data = self.read_json()
        if data is None:
            return False  # no cooldown files exist
        # get time since last attempt and number of attempts
        deltat = (utcnow() - parsetime(data['last_attempt'])).total_seconds()
        nattempt = data['num_attempts']
        cooldown = min(base + (nattempt - 1)*inc, maximum)
        return deltat < cooldown
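
    # A minimal sketch of the resulting backoff schedule, assuming the
    # manifest's handlers all define (hypothetical values)
    # ``COOLDOWN_PARAMS = CoolDownParams(base=60, increment=120, maximum=600)``:
    #
    #     num_attempts 1 -> min(60 + 0*120, 600) = 60 s
    #     num_attempts 2 -> min(60 + 1*120, 600) = 180 s
    #     num_attempts 5 -> min(60 + 4*120, 600) = 540 s
    #     num_attempts 6 -> min(60 + 5*120, 600) = 600 s (capped at maximum)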

    def write(self):
        """Write a JSON file with the name ``.<FILENAME>.cooldown.json`` for
        each file in the manifest containing the time of the last attempt at
        generating the file as well as the number of consecutive failed
        attempts at generating it. The format is:

            {
                "num_attempts": 3,
                "last_attempt": "2016-09-12T17:16:20.337867"
            }

        where ``last_attempt`` in particular is an ISO formatted time
        string."""
        LOGGER.debug("Writing cooldown for %s", self)
        last_attempt_str = utcnow().isoformat()
        olddata = self.read_json()
        if olddata is None:
            # this is the first failed attempt, so count it; starting at 1
            # makes ``in_progress`` compute a full ``base``-length cooldown
            data = {'last_attempt': last_attempt_str, 'num_attempts': 1}
        else:
            data = {
                'last_attempt': last_attempt_str,
                'num_attempts': olddata['num_attempts'] + 1,
            }
        self.write_json(data)


class Intent(RiderFile, JsonRiderMixin):
    """
    An object tracking intent to make a file. Tracks whether another process
    should start making this file. Saves the timestamp at which intent expires
    to a rider file for each file in a ``FileHandler``'s manifest.
    """

    rider_fmt = '.{}.intent.json'

    def in_progress(self):
        """Check whether this file is already being generated and has not
        timed out yet."""
        timeout = self.read_json()
        if timeout is None:
            return False
        return utcnow().timestamp() < timeout

    def write(self, timeout):
        """Write the ``timeout`` (as a UNIX timestamp) at which this file's
        generation attempt expires and another process can attempt to
        generate it."""
        self.write_json(timeout)
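
# A minimal usage sketch for ``Intent``, assuming a 60-second generation
# timeout (``eventdir`` and ``handlers`` are hypothetical placeholders):
#
#     intent = Intent(eventdir, handlers)
#     intent.write(utcnow().timestamp() + 60)  # claim the generation slot
#     intent.in_progress()  # -> True until the timeout passes
#     intent.delete()       # release the claim when done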


_CLEANUP_QUEUE = Queue()


def _cleanup(cleanup_queue):
    """
    Remove inactive locks from ``cleanup_queue`` to keep it from getting too
    long. If ``SystemExit`` is read from the queue, delete any lock
    directories still marked active and end the thread. This runs in a
    separate thread and should not be called anywhere else.
    """
    while True:
        time.sleep(0.05)
        # ``None`` marks the end of the current batch of instructions
        cleanup_queue.put(None)
        active = set()
        finished = set()
        go_down = False
        lock = cleanup_queue.get()
        while lock is not None:
            try:
                if lock is SystemExit:
                    LOGGER.info("Got SystemExit in cleanup thread")
                    go_down = True
                elif lock[0] == 'active':
                    active.add(lock[1])
                elif lock[0] == 'finished':
                    finished.add(lock[1])
                else:
                    LOGGER.error("Unrecognized lock instruction: %s", lock)
            except (TypeError, IndexError):
                LOGGER.error("Unrecognized lock structure: %s", lock)
            lock = cleanup_queue.get()
        # requeue locks that are still active and still exist on disk
        for still_active in active - finished:
            if os.path.isdir(still_active):
                cleanup_queue.put(('active', still_active))
            else:
                LOGGER.debug("%s marked still active but does not exist "
                             "any more, removing from cleanup queue",
                             still_active)
        if go_down:
            LOGGER.debug("GOING DOWN; REMOVE EVENT LOCKS")
            while not cleanup_queue.empty():
                lock = cleanup_queue.get()
                if lock is SystemExit:
                    LOGGER.debug("EVENTLOCK TERMINATE SEMAPHORE, SKIPPING")
                    continue
                if lock is None:
                    LOGGER.debug("EVENTLOCK CLEANUP SEMAPHORE, SKIPPING")
                    continue
                try:
                    status, path = lock
                except (ValueError, TypeError):
                    LOGGER.critical("UNRECOGNIZED LOCK FORMAT: %s", lock)
                    continue
                if status == 'finished':
                    continue
                if status != 'active':
                    LOGGER.critical("UNRECOGNIZED LOCK CLEANUP STATUS: %s",
                                    status)
                    continue
                try:
                    os.rmdir(path)
                    LOGGER.info("REMOVED LOCK: %s", path)
                except FileNotFoundError:
                    LOGGER.info("NO LOCK TO REMOVE: %s", path)
                except NotADirectoryError:
                    LOGGER.critical("LOCKDIR NOT A DIR, CORRUPTED, "
                                    "MANUALLY RM %s", path)
            LOGGER.debug("EVENTLOCK CLEANUP COMPLETED SUCCESSFULLY.")
            return


_CLEANUP_THREAD = threading.Thread(
    target=_cleanup,
    args=(_CLEANUP_QUEUE,),
    name='LockClean',
    daemon=True,
)
_CLEANUP_THREAD.start()
# atexit callbacks run in LIFO order, so ``SystemExit`` is enqueued first,
# then the main thread waits for the cleanup thread to finish
atexit.register(lambda t: t.is_alive() and t.join(), _CLEANUP_THREAD)
atexit.register(_CLEANUP_QUEUE.put, SystemExit)
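
# A minimal sketch of the cleanup-queue message protocol used above (the lock
# path is hypothetical):
#
#     _CLEANUP_QUEUE.put(('active', '/data/events/S190425z/EVENTLOCK'))
#                                     # lockdir acquired; remove at exit if
#                                     # still present
#     _CLEANUP_QUEUE.put(('finished', '/data/events/S190425z/EVENTLOCK'))
#                                     # lockdir released; no cleanup needed
#     _CLEANUP_QUEUE.put(None)        # end-of-batch sentinel
#     _CLEANUP_QUEUE.put(SystemExit)  # tell the cleanup thread to exit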


class EventLock:
    """
    A class for getting a cooperative lock on an ``Event`` directory.
    """

    def __init__(self, eventdir):
        self.eventdir = eventdir
        self._fullpath = os.path.join(eventdir, 'EVENTLOCK')

    def acquire(self, wait=True, steal=True):
        """
        Try to get a cooperative lock on the current directory. The lock
        expires after ``LOCK_TIMEOUT`` seconds. The lock directory will be
        cleaned up at program exit.

        Parameters
        ----------
        wait : bool, optional
            If ``True`` and the lock cannot be acquired, keep waiting until
            the lock goes away (note that this will not time out unless
            ``steal`` is also ``True``).
        steal : bool, optional
            If this and ``wait`` are ``True`` and the lock times out without
            being deleted, delete it and acquire the lock on the assumption
            that a previous transaction failed. Note that you can still fail
            to acquire the lock after the timeout if another thread/process
            manages to acquire it after the expiration but before you do.

        Returns
        -------
        success : bool
            Whether the lock was acquired.
        """
        while True:
            try:
                # ``mkdir`` is atomic, so whichever process creates the
                # lockdir first wins
                os.mkdir(self._fullpath)
                _CLEANUP_QUEUE.put(('active', self._fullpath))
                LOGGER.debug('Acquired EventLock for %s', self.eventdir)
                return True
            except FileExistsError:
                if not wait:
                    LOGGER.debug('EventLock failed for %s', self.eventdir)
                    return False
                if steal and time.time() > self.expiration():
                    LOGGER.debug('Stealing timed-out EventLock for %s',
                                 self.eventdir)
                    self.release()
                time.sleep(0.05)

    def renew(self):
        """
        Set the modification time of the lock directory to the current time,
        effectively extending the timeout. If the lockdir does not exist,
        ``FileNotFoundError`` will be raised. It's up to you to keep renewing
        your lock well before ``LOCK_TIMEOUT`` elapses to make sure you don't
        lose the lock.
        """
        os.utime(self._fullpath)

    def release(self):
        """
        Try removing the lock. Returns ``True`` if an existing lock was
        successfully removed and ``False`` if the lock did not previously
        exist.
        """
        try:
            os.rmdir(self._fullpath)
            _CLEANUP_QUEUE.put(('finished', self._fullpath))
            LOGGER.debug("Removed event lock for %s", self.eventdir)
            return True
        except FileNotFoundError:
            return False

    def expiration(self):
        """
        Get the UNIX timestamp at which the lock expires. If not locked,
        returns ``0``. After this time, it's likely that the locking process
        crashed and the lock can be safely freed.
        """
        try:
            return os.path.getmtime(self._fullpath) + LOCK_TIMEOUT
        except FileNotFoundError:
            return 0
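
# A minimal usage sketch for ``EventLock`` (the event directory path is
# hypothetical):
#
#     lock = EventLock('/data/events/S190425z')
#     if lock.acquire(wait=False):
#         try:
#             ...  # work on files in the event directory
#             lock.renew()  # extend the timeout during long operations
#         finally:
#             lock.release()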


class CoolDownMixin:
    """
    Add a ``cooldown`` property to ``FileHandler`` class.
    """

    @property
    def cooldown(self):
        """
        Set and get information about whether an attempt to generate this file
        failed, and, if so, whether enough time has passed to warrant retrying
        that file's generation.
        """
        return CoolDown(self.eventdir, self.manifest_filehandlers)

    @staticmethod
    def decorate_checkout(func):
        """
        Make sure the file is not cooling down before trying to generate it.
        If it is cooling down, raise a ``CoolDownException``. This will wrap a
        ``FileHandler.generate`` method, so it needs to have the same
        signature.
        """

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            """
            Make sure the file is not cooling down before trying to generate
            it. If it is cooling down, raise a ``CoolDownException``.
            """
            if self.cooldown.in_progress():
                raise CoolDownException(f'cooldown in progress for {self}, '
                                        'try later or delete cooldown file '
                                        f'with {self}.cooldown.delete()')
            try:
                return func(self, *args, **kwargs)
            except:  # noqa
                self.cooldown.write()
                raise

        return wrapper

    @staticmethod
    def decorate_checkin(func):
        """
        If file generation failed, mark the file as cooling down when the
        generation attempt is being checked back in.
        """

        @functools.wraps(func)
        def wrapper(self, gen_result, *args, **kwargs):
            """
            If file generation failed, mark the file as cooling down when the
            generation attempt is being checked back in. Checking in the file
            will reraise any exceptions raised during generation, so catch and
            handle them here. Also remove any old cooldown files if file
            generation succeeded this time.
            """
            try:
                res = func(self, gen_result, *args, **kwargs)
                self.cooldown.delete()
                return res
            except:  # noqa
                self.cooldown.write()
                raise

        return wrapper
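
# A minimal sketch of how the cooldown decorators might be applied
# (``MyHandler`` and its ``generate`` method are hypothetical):
#
#     class MyHandler(CoolDownMixin, FileHandler):
#
#         @CoolDownMixin.decorate_checkout
#         def generate(self):
#             ...  # any exception raised here triggers a cooldown write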


class IntentMixin:
    """
    Add ``intent`` and ``eventlock`` properties to ``FileHandler`` class.
    """

    @property
    def intent(self):
        """
        Set and get information about whether this file is being generated.
        """
        return Intent(self.eventdir, self.manifest_filehandlers)

    @property
    def eventlock(self):
        """
        Get a cooperative lock on the event directory.
        """
        return EventLock(self.eventdir)

    @staticmethod
    def decorate_checkout(func):
        """
        Make sure the file is not currently being generated before trying to
        generate it. If it is currently being generated, raise an
        ``IntentException``. This will wrap ``FileHandler.checkout``, so it
        needs to have the same signature.
        """

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            """
            Make sure the file is not currently being generated before trying
            to generate it. If it is currently being generated, raise an
            ``IntentException``.
            """
            if self.intent.in_progress():
                raise IntentException('file being generated, try later: '
                                      f'{self}')
            self.eventlock.acquire()
            try:
                self.intent.write(utcnow().timestamp() + self.TIMEOUT)
                return func(self, *args, **kwargs)
            except:  # noqa
                self.intent.delete()
                raise
            finally:
                self.eventlock.release()

        return wrapper

    @staticmethod
    def decorate_checkin(func):
        """
        Make sure the file's intent is still set, and clear it up regardless
        of whether checkin succeeds. This will wrap ``FileHandler.checkin``,
        so it needs to have the same signature.
        """

        @functools.wraps(func)
        def wrapper(self, gen_result, *args, **kwargs):
            """
            Make sure the file's intent is still set, and clear it up
            regardless of whether checkin succeeds. If the intent file has
            been removed, raise a ``GenerationError``.
            """
            if not self.intent.exists():
                # someone deleted the intent file; give up on committing the
                # finished file
                raise GenerationError("Intent file removed, not committing"
                                      f" finished file: {self}")
            self.eventlock.acquire()  # wait till we get the lock
            try:
                return func(self, gen_result, *args, **kwargs)
            finally:
                self.intent.delete()
                self.eventlock.release()

        return wrapper
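
# A minimal sketch of the overall flow these decorators enforce, assuming a
# handler class that mixes in both ``CoolDownMixin`` and ``IntentMixin``
# (``handler`` is hypothetical):
#
#     handler.checkout()           # raises IntentException if another process
#                                  # holds intent; otherwise writes intent
#     result = handler.generate()  # raises CoolDownException if cooling down;
#                                  # writes a cooldown rider on failure
#     handler.checkin(result)      # clears intent; writes a cooldown rider if
#                                  # checkin fails, deletes it on success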