Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

# (c) Stefan Countryman, 2019

"""
Base classes for input/output operations.
"""

6 

import functools
from abc import ABC, abstractmethod
from concurrent.futures import Future
from pathlib import Path
from typing import (
    List,
    Iterable,
    NamedTuple,
    Optional,
    Callable,
    Type,
    Tuple,
)

from .registry import io_class_from_rundir

21 

22 

class NamedTupleABCMeta(NamedTuple.__class__, ABC.__class__):
    """
    Metaclass resolver for ``NamedTuple`` and ``ABC``. See:

    https://stackoverflow.com/questions/11276037/resolving-metaclass-conflicts
    """
    # The class body is intentionally empty: the only job of this metaclass is
    # to inherit from both the metaclass of ``NamedTuple`` and the metaclass
    # of ``ABC`` so that a class can list both as bases.
    #
    # NOTE(review): this relies on ``typing.NamedTuple`` being a class whose
    # ``__class__`` is a subclassable metaclass. On newer interpreters
    # ``NamedTuple`` appears to be a plain function, in which case this line
    # fails at import time -- confirm the supported Python versions.

29 

30 

class IOComponent(ABC):
    """
    Base class providing utility methods for all ``IO`` components.
    """
    # No utility methods are defined yet; the class currently serves only as
    # a common ancestor for the component base classes.

35 

36 

class RunIOComponent(NamedTuple, IOComponent, metaclass=NamedTupleABCMeta):
    """
    Base class for an IO component of ``IO`` that needs to know the active run.
    """
    # Location of the active run.
    rundir: str

42 

43 

class EventIOComponent(NamedTuple, IOComponent, metaclass=NamedTupleABCMeta):
    """
    Base class for an IO component of ``IO`` that needs to know the active run
    and the eventid.
    """
    # Location of the active run.
    rundir: str
    # Identifier of the event within that run.
    eventid: str

51 

52 

class ManifestIOComponent(NamedTuple, IOComponent,
                          metaclass=NamedTupleABCMeta):
    """
    Base class for an IO component of ``IO`` that needs to know the active run,
    eventid, and a manifest of filenames related to file generation. All
    filenames in the manifest share a ``rundir`` and ``eventid``.
    """
    # Location of the active run.
    rundir: str
    # Identifier of the event within that run.
    eventid: str
    # Filenames related to file generation. Annotated as a variable-length
    # tuple: ``Tuple[str]`` would describe a tuple of exactly one string.
    manifest: Tuple[str, ...]

63 

64 

class RunIO(RunIOComponent):
    """
    Base class for input/output operations on ``llama.Run`` classes.
    """

69 

70 

class EventIO(EventIOComponent):
    """
    Base class for input/output operations on ``llama.Events`` classes.
    """

75 

76 

class FileHandlerIO(ManifestIOComponent):
    """
    Base class for input/output operations on ``llama.FileHandler`` classes.
    """

81 

82 

class GraphExecutor(ABC):
    """
    An implementation-agnostic interface for submitting iterables of
    ``FileHandler`` instances that need to be updated.
    """

    @classmethod
    @abstractmethod
    def submit(cls, graph) -> Iterable[Tuple]:
        """
        Submit a ``FileGraph`` instance ``graph`` for (potentially parallel)
        generation of its files. Returns an iterable of ``Tuple[FileHandler,
        Future]`` instances matching the ``FileHandler`` instance that is
        being generated to a ``Future`` that will either return the
        same successfully-generated ``FileHandler`` instance or raise any
        exceptions occurring during generation when its ``result`` method is
        called. *An attempt will be made to generate all files in the graph,
        so downselect accordingly.*
        """

102 

103 

class VersioningIO(ManifestIOComponent):
    """
    Base class for input/output operations on ``llama.Event`` versioning
    classes.
    """

    @abstractmethod
    def commit_changes(self, message):
        """
        Commit the current snapshot of the event and record a descriptive
        message of the last round of changes.
        """

    @property
    def current_hash(self):
        """
        Get a unique ID for the latest version of this event. The ID can vary
        by storage engine, but it must be unique to the file configuration
        specified. Equivalent to the first element of
        ``hashes('--', max_results=1)``.
        """
        return self.hashes('--', max_results=1)[0]

    @abstractmethod
    def reset_hard(self, ref=None):
        """
        Reset the state of the pipeline to the specified ``ref`` (as returned
        by ``current_hash`` or ``hashes``), cleaning messy state if it exists
        (for transactional storage engines, such state does not exist). This
        does not delete history; instead, it labels ``ref`` as the official
        current snapshot from which future history will be generated. To clean
        state and reset to the last good commit, leave ``ref=None``.
        """

    @abstractmethod
    def init(self):
        """
        Perform any initialization tasks required to record versions of an
        event.
        """

    @abstractmethod
    def hashes(self, *filenames, pretty="", last_hash=None, max_results=None):
        """
        Return a list of unique IDs corresponding to event snapshots that
        include updates to the specified filenames. The IDs can vary by storage
        engine, but they must be unique to the file configuration specified by
        each snapshot. If specified, return at most ``max_results`` results.
        Returns a list of unique IDs, with optional ``pretty`` formatting
        following git pretty-print format string conventions (note that not all
        pretty print features might be implemented). If specified, return
        results up-to and including ``last_hash``.
        """

    @abstractmethod
    def filename_for_download(self, filename, last_hash=None):
        """
        Interface placeholder taking a ``filename`` and an optional
        ``last_hash``; the contract is not specified here yet.
        """
        # TODO copy this implementation from versioning.py

    # FIXME this should be the interface, and git.remove should operate on
    # individual files/aux files. fix wherever this happens.
    @abstractmethod
    def remove(self, *filehandlers):
        """
        Create a new snapshot with ``filehandlers`` and any auxiliary data
        removed.
        """

    # FIXME rename to ``exists`` wherever it occurs.
    @abstractmethod
    def is_repo(self):
        """
        Whether snapshots of this event exist in the storage engine. This can
        be distinct from the question of whether files for this event exist in
        storage.
        """

    @abstractmethod
    def text_graph(self, *filenames, style='html'):
        """
        Interface placeholder taking any number of ``filenames`` and a
        ``style`` (default ``'html'``); the contract is not specified here
        yet.
        """
        # TODO copy implementation from versioning.py

    @abstractmethod
    def show_log(self, ref):
        """
        Show the commit message for the specified ``ref``, which can be a UID
        of the sort returned by ``hashes`` or else a storage-engine-specific
        unambiguous identifier for a snapshot.
        """

193 

194 

class EventLock(EventIOComponent):
    """
    Base class for input/output operations on an ``EventLock`` object.
    """
    # TODO fill out

200 

201 

class IntentIO(ManifestIOComponent):
    """
    Base class for input/output operations on an ``Intent`` object.
    """
    # TODO fill out

207 

208 

class CoolDownIO(ManifestIOComponent):
    """
    Base class for input/output operations on a cooldown object.
    """
    # TODO fill out

214 

215 

class FlagsIO(EventIOComponent):
    """
    Base class for input/output operations on ``llama.Event`` flags.
    """
    # TODO fill out

221 

222 

class VetoIO(ManifestIOComponent):
    """
    Base class for input/output operations on ``llama.FileHandler`` vetoes.
    """
    # TODO fill out

228 

class LockIO(ManifestIOComponent):
    """
    Base class for input/output operations on ``llama.FileHandler`` version
    locks.
    """
    # TODO fill out

235 

class MetaDataIO(ManifestIOComponent):
    """
    Base class for input/output operations on ``llama.FileHandler`` metadata.
    """
    # TODO fill out

241 

242 

class IO(NamedTuple):
    """
    An abstract interface for safe input/output primitives used by LLAMA.
    Allows for multiple underlying file storage and computational
    implementations. Partial implementations are allowed by omitting arguments
    for certain components.
    """
    # Variable-length tuple of scheme names (``Tuple[str]`` would mean a tuple
    # of exactly one string).
    schemes: Tuple[str, ...]
    # Every component below defaults to ``None``, meaning "not implemented by
    # this ``IO``"; the annotations are therefore all ``Optional``.
    generate: Optional[GraphExecutor] = None
    # The remaining fields hold component *classes* (not instances).
    run: Optional[Type[RunIO]] = None
    event: Optional[Type[EventIO]] = None
    fh: Optional[Type[FileHandlerIO]] = None
    ver: Optional[Type[VersioningIO]] = None
    eventlock: Optional[Type[EventLock]] = None
    intent: Optional[Type[IntentIO]] = None
    cooldown: Optional[Type[CoolDownIO]] = None
    flags: Optional[Type[FlagsIO]] = None
    veto: Optional[Type[VetoIO]] = None
    lock: Optional[Type[LockIO]] = None
    meta: Optional[Type[MetaDataIO]] = None

263 

264 

def io_fetcher_helper(func: Callable):
    """
    Wrapper to implement ``IO`` instantiation.

    ``func`` supplies only a name and a return annotation; its body is never
    called. The returned ``wrapper(self)`` looks up the attribute named
    ``func.__name__`` on ``self.io`` (expected to be an ``IOComponent``
    class), instantiates it with as many of ``self.rundir``, ``self.eventid``
    and ``self.manifest`` as that kind of component takes, and returns the
    instance.

    Raises (from ``wrapper``):
        NotImplementedError: if ``self.io`` provides no such component
            (the attribute is ``None``).
        TypeError: if the component needs ``eventid`` and/or ``manifest`` but
            ``self`` has ``None`` for them.
    """
    component_name = func.__name__
    hiddenname = '_' + component_name

    @functools.wraps(func)
    def wrapper(self):
        # Reuse a previously cached instance when the host object has one.
        if hasattr(self, hiddenname):
            return getattr(self, hiddenname)
        io = getattr(self.io, component_name)
        if io is None:
            raise NotImplementedError(
                f"{type(self)} has no ``{component_name}`` IO component"
            )
        # ``io`` is a component *class*, so its kind must be tested with
        # ``issubclass``; ``isinstance`` on a class is always False here and
        # would send every component down the ``rundir``-only branch.
        is_class = isinstance(io, type)
        if is_class and issubclass(io, ManifestIOComponent):
            if self.eventid is None or self.manifest is None:
                raise TypeError(f"{type(self)} cannot instantiate {io}")
            component = io(self.rundir, self.eventid, self.manifest)
        elif is_class and issubclass(io, EventIOComponent):
            if self.eventid is None:
                raise TypeError(f"{type(self)} cannot instantiate {io}")
            component = io(self.rundir, self.eventid)
        else:
            component = io(self.rundir)
        try:
            setattr(self, hiddenname, component)
        except AttributeError:
            # Hosts without an instance ``__dict__`` (e.g. ``NamedTuple``
            # instances) cannot store the cache attribute; fall back to
            # returning the freshly built component without caching.
            pass
        return component

    # Read the annotation from ``func`` itself rather than relying on
    # ``functools.wraps`` having copied it onto ``wrapper``.
    rettype = func.__annotations__['return']
    # ``__doc__`` is ``None`` for undocumented classes and under ``-OO``.
    wrapper.__doc__ = (rettype.__doc__ or "") + (
        f" This property returns an instance of ``{rettype}`` "
        "with specific implementation set by IO class "
        "and ``rundir``, ``eventid``, and ``manifest`` "
        "properties. Will raise a ``TypeError`` if any of "
        "those argument are required by the ``IOComponent`` "
        "but unavailable for this object type."
    )

    return wrapper

300 

301 

class IOFetcher(NamedTuple):
    """
    Class for fetching instantiated ``IO`` components. Each property below is
    wrapped by ``io_fetcher_helper``, which looks up the component class of
    the same name on ``io`` and instantiates it using ``rundir``, ``eventid``
    and ``manifest`` as required by that component.
    """
    # The ``IO`` implementation whose component classes are instantiated.
    io: IO
    rundir: str
    # ``None`` when the owning object has no event / manifest.
    eventid: Optional[str] = None
    manifest: Optional[Tuple[str, ...]] = None

    # The property bodies are placeholders: ``io_fetcher_helper`` uses only
    # each function's name and return annotation and never calls the body.

    @property
    @io_fetcher_helper
    def run(self) -> RunIO:
        pass

    @property
    @io_fetcher_helper
    def event(self) -> EventIO:
        pass

    @property
    @io_fetcher_helper
    def fh(self) -> FileHandlerIO:
        pass

    @property
    @io_fetcher_helper
    def ver(self) -> VersioningIO:
        pass

    @property
    @io_fetcher_helper
    def eventlock(self) -> EventLock:
        pass

    @property
    @io_fetcher_helper
    def intent(self) -> IntentIO:
        pass

    @property
    @io_fetcher_helper
    def cooldown(self) -> CoolDownIO:
        pass

    @property
    @io_fetcher_helper
    def flags(self) -> FlagsIO:
        pass

    @property
    @io_fetcher_helper
    def veto(self) -> VetoIO:
        pass

    @property
    @io_fetcher_helper
    def lock(self) -> LockIO:
        pass

    @property
    @io_fetcher_helper
    def meta(self) -> MetaDataIO:
        pass

365 

366 

class IOMixin:
    """
    A mixin providing an ``io`` attribute returning an ``IOFetcher`` for this
    instance with the correct ``IO`` implementation and attributes.
    """

    @property
    def io(self):
        """
        An implementation-agnostic interface for input/output primitives. The
        specific implementation will be chosen based on the URL scheme of the
        ``rundir`` attribute of this class; see ``llama.io.get_schemes`` for a
        list of registered IO schemes.
        """
        if not hasattr(self, '_io'):
            # Built on first access and stored on the instance as ``_io``.
            # ``eventid`` and ``MANIFEST`` are optional on the host class and
            # are passed as ``None`` when it does not define them.
            self._io = IOFetcher(
                io=io_class_from_rundir(self.rundir),
                rundir=self.rundir,
                eventid=getattr(self, 'eventid', None),
                manifest=getattr(self, 'MANIFEST', None),
            )
        return self._io