Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# (c) Stefan Countryman 2019 

2 

3""" 

4Classes for tracking whether files are cooling down after a failed generation 

5attempt or are currently being generated. 

6""" 

7 

8import threading 

9import atexit 

10import os 

11import functools 

12import time 

13import logging 

14import datetime 

15from queue import Queue 

16from collections import namedtuple 

17from dateutil.parser import parse as parsetime 

18from llama.utils import GenerationError 

19from llama.classes import ( 

20 JsonRiderMixin, 

21 RiderFile, 

22 IntentException, 

23 CoolDownException, 

24) 

25 

# Logger for this module, named after the module itself.
LOGGER = logging.getLogger(__name__)

# Number of seconds added to an event lock directory's modification time to
# obtain the moment at which that lock is considered expired.
LOCK_TIMEOUT = 20

# this is just a function, so don't give it an uppercase constant var name
# pylint: disable=invalid-name
# NOTE(review): ``utcnow`` returns a *naive* datetime; the rest of this module
# relies on that (naive subtraction, ``.timestamp()``), so keep them in sync.
utcnow = datetime.datetime.utcnow


# Cooldown timing parameters: initial cooldown, per-attempt increment and the
# upper bound on the cooldown (see ``CoolDown.in_progress``).
CoolDownParams = namedtuple(
    "CoolDownParams",
    ["base", "increment", "maximum"],
)

36 

37 

class CoolDown(RiderFile, JsonRiderMixin):
    """
    An object for keeping track of whether a particular file failed to
    generate recently and determining how long the pipeline should wait before
    trying again to generate it. Instances of this class are meant to be
    calculated as properties from FileHandlers and similar classes.

    Parameters
    ----------
    eventdir : str
        Path to the directory containing the files.
    manifest : list
        The files that need to be generated (and which might fail,
        necessitating a cooldown countdown).

    NOTE(review): the constructor is inherited from ``RiderFile``; the order
    above mirrors how this module instantiates the class
    (``CoolDown(eventdir, manifest_filehandlers)``) -- confirm against
    ``RiderFile``.
    """

    rider_fmt = '.{}.cooldown.json'

    def in_progress(self):
        """Determine whether a cooldown is still in progress for generating
        this file, i.e. whether this file generation routine failed too
        recently for another generation attempt.

        The cooldown after the ``n``-th consecutive failure is
        ``min(base + (n - 1) * increment, maximum)`` seconds, measured from
        the recorded time of the last attempt.

        Returns
        -------
        cooling : bool
            ``True`` if the last failed attempt is more recent than the
            current cooldown period; ``False`` otherwise, including when no
            cooldown data has been recorded.
        """
        # every file handler in the manifest must share one set of cooldown
        # parameters, otherwise the cooldown length would be ambiguous
        params = {f.COOLDOWN_PARAMS for f in self.manifest_filehandlers}
        assert len(params) == 1
        base, inc, maximum = params.pop()
        data = self.read_json()
        if data is None:
            return False  # no cooldown files exist
        # ``write`` stores a naive UTC ISO string, so a naive ``utcnow()`` is
        # the matching reference for the subtraction
        deltat = (utcnow() - parsetime(data['last_attempt'])).total_seconds()
        nattempt = data['num_attempts']
        cooldown = min(base + (nattempt - 1) * inc, maximum)
        return deltat < cooldown

    def write(self):
        """Write a json file with the name .<FILENAME>.cooldown.json for each
        file in the manifest containing
        the time of the last attempt at generating the file as well as the
        number of consecutive failed attempts at generating the file. Format is

        {
            "num_attempts": 3,
            "last_attempt": "2016-09-12T17:16:20.337867"
        }

        Where last_attempt in particular is an ISO formatted time string.

        The first recorded failure is stored as ``"num_attempts": 1`` so that
        the count really is the number of failed attempts and the first
        cooldown computed by ``in_progress`` is exactly ``base`` seconds.
        """
        LOGGER.debug("Writing cooldown for %s", self)
        last_attempt_str = utcnow().isoformat()
        olddata = self.read_json()
        # Previously the counter started at 0, which made the first cooldown
        # ``base - increment`` (possibly zero or negative) instead of ``base``.
        previous_attempts = 0 if olddata is None else olddata['num_attempts']
        self.write_json({
            'last_attempt': last_attempt_str,
            'num_attempts': previous_attempts + 1,
        })

95 

96 

class Intent(RiderFile, JsonRiderMixin):
    """
    Records the intent to make a file so that other processes can tell
    whether they should start making it themselves. The stored JSON value is
    the timestamp at which the intent expires, saved to a rider file for each
    file in a ``FileHandler``'s manifest.
    """

    rider_fmt = '.{}.intent.json'

    def in_progress(self):
        """Return ``True`` if an intent has been recorded and its expiry
        timestamp still lies in the future, ``False`` otherwise (including
        when no intent data can be read)."""
        expires_at = self.read_json()
        # NOTE(review): ``utcnow()`` is naive, and ``.timestamp()`` on a naive
        # datetime is interpreted as local time, so this is only a true UNIX
        # timestamp when local time is UTC. It is self-consistent as long as
        # the writer computes its timeout the same way -- confirm.
        return expires_at is not None and utcnow().timestamp() < expires_at

    def write(self, timeout):
        """Store ``timeout``, the timestamp after which this intent counts as
        timed out and another attempt at generating the file may begin."""
        self.write_json(timeout)

118 

119 

# Queue consumed by the ``_cleanup`` thread. Within this module, ``EventLock``
# puts ``('active', path)`` when a lock directory is created and
# ``('finished', path)`` when it is removed, and an ``atexit`` handler puts
# ``SystemExit`` to request final cleanup.
_CLEANUP_QUEUE = Queue()

121 

122 

def _track_lock_event(event, active):
    """Apply one queue entry to ``active``; return ``True`` for ``SystemExit``.

    ``('active', path)`` adds ``path`` to the set and ``('finished', path)``
    removes it. Anything else is logged and ignored.
    """
    if event is SystemExit:
        return True
    try:
        status, path = event[0], event[1]
        if status == 'active':
            active.add(path)
        elif status == 'finished':
            active.discard(path)
        else:
            LOGGER.error("Unrecognized lock instruction: %s", event)
    except (TypeError, IndexError):
        LOGGER.error("Unrecognized lock structure: %s", event)
    return False


def _remove_lock_dir(path):
    """Remove the lock directory ``path``, logging (not raising) failures."""
    try:
        os.rmdir(path)
    except FileNotFoundError:
        LOGGER.info("NO LOCK TO REMOVE: %s", path)
    except NotADirectoryError:
        LOGGER.critical("LOCKDIR NOT A DIR, CORRUPTED, "
                        "MANUALLY RM %s", path)
    except OSError as err:
        # best effort: one stubborn directory must not keep the remaining
        # locks from being removed
        LOGGER.critical("COULD NOT REMOVE LOCK %s: %s", path, err)
    else:
        LOGGER.info("REMOVED LOCK: %s", path)


def _cleanup(cleanup_queue):
    """
    Keep track of which lock directories are currently held, based on the
    entries read from ``cleanup_queue``, and remove the ones still held once
    ``SystemExit`` is read from the queue. This runs in a separate thread
    and should not be called anywhere else.

    Entries are applied in the order in which they were queued, so a lock
    that is released and then acquired again (e.g. when a timed-out lock is
    stolen) stays tracked. Locks whose directory has disappeared are
    forgotten.

    Parameters
    ----------
    cleanup_queue : queue.Queue
        Queue of ``('active', path)`` / ``('finished', path)`` tuples and,
        eventually, ``SystemExit``. ``None`` is used internally as an
        end-of-batch marker.
    """
    active = set()
    while True:
        time.sleep(0.05)
        # marker so that we only block until everything queued so far has
        # been read
        cleanup_queue.put(None)
        go_down = False
        event = cleanup_queue.get()
        while event is not None:
            if _track_lock_event(event, active):
                LOGGER.info("Got SystemExit in cleanup thread")
                go_down = True
            event = cleanup_queue.get()
        # forget locks whose directory is gone without a 'finished' entry
        for missing in [path for path in active if not os.path.isdir(path)]:
            LOGGER.debug("%s marked still active but does not exist "
                         "any more, removing from cleanup queue",
                         missing)
            active.discard(missing)
        if not go_down:
            continue
        LOGGER.debug("GOING DOWN; REMOVE EVENT LOCKS")
        # apply anything that was queued after the end-of-batch marker
        while not cleanup_queue.empty():
            event = cleanup_queue.get()
            if event is None:
                LOGGER.debug("EVENTLOCK CLEANUP SEMAPHORE, SKIPPING")
            elif _track_lock_event(event, active):
                LOGGER.debug("EVENTLOCK TERMINATE SEMAPHORE, SKIPPING")
        for path in active:
            _remove_lock_dir(path)
        LOGGER.debug("EVENTLOCK CLEANUP COMPLETED SUCCESSFULLY.")
        return

194 

195 

# Background thread running ``_cleanup`` on ``_CLEANUP_QUEUE``. It is started
# as a side effect of importing this module.
_CLEANUP_THREAD = threading.Thread(
    target=_cleanup,
    args=(_CLEANUP_QUEUE,),
    name='LockClean',
    daemon=True,
)
_CLEANUP_THREAD.start()
# ``atexit`` runs handlers in reverse order of registration, so at exit the
# ``SystemExit`` marker is queued first (second registration) and only then
# is the thread joined (first registration). Do not swap these two lines.
atexit.register(lambda t: t.is_alive() and t.join(), _CLEANUP_THREAD)
atexit.register(_CLEANUP_QUEUE.put, SystemExit)

205 

206 

class EventLock:
    """
    Cooperative lock on an ``Event`` directory, implemented as a
    subdirectory named ``EVENTLOCK`` inside that directory.
    """

    def __init__(self, eventdir):
        self.eventdir = eventdir
        self._fullpath = os.path.join(eventdir, 'EVENTLOCK')

    def acquire(self, wait=True, steal=True):
        """
        Try to take the cooperative lock by creating the lock directory. The
        lock expires ``LOCK_TIMEOUT`` seconds after the directory was last
        modified. A successful acquisition is reported to the cleanup queue
        so the directory can be removed when the program exits.

        Parameters
        ----------
        wait : bool, optional
            If ``True`` and the lock is taken, keep retrying until it can be
            acquired (this never times out unless ``steal`` is also
            ``True``). If ``False``, give up immediately.
        steal : bool, optional
            If this and ``wait`` are ``True`` and the existing lock has
            expired, remove it on the assumption that the previous holder
            failed, then retry. Another thread/process may still win the
            lock between the removal and the retry.

        Returns
        -------
        success : bool
            Whether lock was acquired.
        """
        while True:
            try:
                os.mkdir(self._fullpath)
            except FileExistsError:
                if not wait:
                    LOGGER.debug('EventLock failed for %s', self.eventdir)
                    return False
                if steal and time.time() > self.expiration():
                    LOGGER.debug('Stealing timed-out EventLock for %s',
                                 self.eventdir)
                    self.release()
            else:
                _CLEANUP_QUEUE.put(('active', self._fullpath))
                LOGGER.debug('Acquired EventLock for %s', self.eventdir)
                return True
            # pause briefly before the next attempt
            time.sleep(0.05)

    def renew(self):
        """
        Update the lock directory's modification time to now, which pushes
        back the expiration. Raises ``FileNotFoundError`` if the lock
        directory does not exist. Callers must renew well within
        ``LOCK_TIMEOUT`` seconds to avoid losing the lock.
        """
        os.utime(self._fullpath)

    def release(self):
        """
        Remove the lock directory.

        Returns
        -------
        removed : bool
            ``True`` if an existing lock was removed (this is also reported
            to the cleanup queue), ``False`` if there was no lock to remove.
        """
        try:
            os.rmdir(self._fullpath)
        except FileNotFoundError:
            return False
        _CLEANUP_QUEUE.put(('finished', self._fullpath))
        LOGGER.debug("Removed event lock for %s", self.eventdir)
        return True

    def expiration(self):
        """
        Return the timestamp at which the lock expires: the lock directory's
        modification time plus ``LOCK_TIMEOUT``. Returns ``0`` when there is
        no lock. Past this time the holder has probably crashed and the lock
        may be freed.
        """
        try:
            modified = os.path.getmtime(self._fullpath)
        except FileNotFoundError:
            return 0
        return modified + LOCK_TIMEOUT

290 

291 

class CoolDownMixin:
    """
    Add a ``cooldown`` property to ``FileHandler`` class, plus decorators
    that consult and update it around checkout and checkin.
    """

    @property
    def cooldown(self):
        """
        Set and get information about whether an attempt to generate this file
        failed, and, if so, whether enough time has passed to warrant retrying
        that file's generation. A new ``CoolDown`` is built on every access
        from ``self.eventdir`` and ``self.manifest_filehandlers``.
        """
        return CoolDown(self.eventdir, self.manifest_filehandlers)

    @staticmethod
    def decorate_checkout(func):
        """
        Make sure the file is not cooling down before trying to generate it. If
        it is cooling down, raise a ``CoolDownException``. The wrapper calls
        ``func(self, *args, **kwargs)``, so the wrapped method keeps its
        signature.
        """

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            """
            Make sure the file is not cooling down before trying to generate
            it. If it is cooling down, raise a ``CoolDownException``. If the
            wrapped call raises, record a cooldown and re-raise.
            """
            if self.cooldown.in_progress():
                # every fragment that interpolates ``self`` needs its own
                # ``f`` prefix; the last one used to be a plain string
                raise CoolDownException(
                    f'cooldown in progress for {self}, try'
                    ' later or delete cooldown file with '
                    f'{self}.cooldown.delete()'
                )
            try:
                return func(self, *args, **kwargs)
            except BaseException:
                # deliberately as broad as a bare ``except``: any failure
                # starts (or extends) the cooldown before propagating
                self.cooldown.write()
                raise

        return wrapper

    @staticmethod
    def decorate_checkin(func):
        """
        If file generation failed, mark the file as cooling down when the
        generation attempt is being checked back in.
        """

        @functools.wraps(func)
        def wrapper(self, gen_result, *args, **kwargs):
            """
            If file generation failed, mark the file as cooling down when the
            generation attempt is being checked back in. Checking in the file
            will reraise any exceptions raised during generation, so catch and
            handle them here. Also remove any old cooldown files if file
            generation succeeded this time.
            """
            try:
                res = func(self, gen_result, *args, **kwargs)
                self.cooldown.delete()
                return res
            except BaseException:
                # same breadth as the original bare ``except``
                self.cooldown.write()
                raise

        return wrapper

358 

359 

class IntentMixin:
    """
    Add ``intent`` and ``eventlock`` properties to a ``FileHandler`` class,
    plus decorators that use them to keep two callers from generating the
    same file at the same time.
    """

    @property
    def intent(self):
        """
        Set and get information about whether this file is being generated.
        A new ``Intent`` is built on every access.
        """
        return Intent(self.eventdir, self.manifest_filehandlers)

    @property
    def eventlock(self):
        """
        Get a cooperative lock on the event directory. A new ``EventLock``
        for ``self.eventdir`` is built on every access.
        """
        return EventLock(self.eventdir)

    @staticmethod
    def decorate_checkout(func):
        """
        Make sure the file is not currently being generated before trying to
        generate it. If it is currently being generated, raise an
        ``IntentException``. This will wrap ``FileHandler.checkout``, so it
        needs to have the same signature.
        """

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            """
            Make sure the file is not currently being generated before trying
            to generate it. If it is currently being generated, raise an
            ``IntentException``. Otherwise record the intent (expiring
            ``self.TIMEOUT`` seconds from now) and run the wrapped call while
            holding the event lock; if recording or the call fails, the
            intent is deleted again.
            """
            # cheap early exit that does not have to wait for the event lock
            if self.intent.in_progress():
                raise IntentException('file being generated, try later: '
                                      f'{self}')
            self.eventlock.acquire()
            try:
                # Check again now that the lock is held: someone else may
                # have recorded their intent while we waited for the lock.
                # Their intent must be left alone, hence this sits outside
                # the inner ``try``.
                if self.intent.in_progress():
                    raise IntentException('file being generated, try later: '
                                          f'{self}')
                try:
                    self.intent.write(utcnow().timestamp() + self.TIMEOUT)
                    return func(self, *args, **kwargs)
                except BaseException:
                    # same breadth as a bare ``except``: never leave our own
                    # intent behind when checkout did not complete
                    self.intent.delete()
                    raise
            finally:
                self.eventlock.release()

        return wrapper

    @staticmethod
    def decorate_checkin(func):
        """
        Make sure the file's intent is still set, and clear it up regardless of
        whether checkin succeeds. This will wrap ``FileHandler.checkin``, so it
        needs to have the same signature.
        """

        @functools.wraps(func)
        def wrapper(self, gen_result, *args, **kwargs):
            """
            Make sure the file's intent is still set, and clear it up
            regardless of whether checkin succeeds. If the intent no longer
            exists, raise a ``GenerationError`` instead of checking in.
            """
            # someone deleted the intent file; give up on committing
            # finished file
            if not self.intent.exists():
                raise GenerationError("Intent file removed, not committing"
                                      f" finished file: {self}")
            self.eventlock.acquire()  # wait till we get lock
            try:
                return func(self, gen_result, *args, **kwargs)
            finally:
                # release the lock even if removing the intent fails
                try:
                    self.intent.delete()
                finally:
                    self.eventlock.release()

        return wrapper