Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# (c) Stefan Countryman 2019 

2 

3""" 

4Generate files locally. 

5""" 

6 

7import atexit 

8import logging 

9from typing import Iterable, Tuple 

10from concurrent.futures import ProcessPoolExecutor, Future 

11from threading import Lock 

12from llama.classes import AbstractFileHandler, optional_env_var 

13from llama.utils import label_worker_proc 

14from ..classes import GraphExecutor 

15 

# Module-level logger and the lock guarding lazy creation of the shared pool.
LOGGER = logging.getLogger(__name__)
_LOCK = Lock()

# The default is kept as a string because it is passed to ``optional_env_var``
# as a fallback value and only converted to ``int`` afterwards.
_DEFAULT_MAX_WORKERS = '4'

# Help text describing the environment variable that sets the pool size.
_MAX_WORKERS_HELP = f"""
    Specify the maximum number of subprocesses to use for local file generation
    using LLAMA's `default` file generation manager. If not specified, the
    default of {_DEFAULT_MAX_WORKERS} subprocesses will be used.
    """

# Maximum number of worker subprocesses: the first value returned by
# ``optional_env_var`` for ``LLAMA_IO_DEFAULT_GEN_MAX_WORKERS``, as an int.
MAX_WORKERS = int(
    optional_env_var(
        ['LLAMA_IO_DEFAULT_GEN_MAX_WORKERS'],
        _MAX_WORKERS_HELP,
        [_DEFAULT_MAX_WORKERS],
    )[0]
)

29 

30 

class MultiprocessingGraphExecutor(GraphExecutor):
    """
    Submit iterables of ``AbstractFileHandler`` instances and generate their
    files in parallel in subprocesses of the main process.
    """

    @classmethod
    def submit(cls, graph) -> Tuple[Tuple[AbstractFileHandler, Future], ...]:
        """
        Submit each file in the ``FileGraph`` ``graph`` that is ready to be
        generated to the file generation ``ProcessPoolExecutor`` and generate
        them in parallel. Returns a tuple of ``Tuple[AbstractFileHandler,
        Future]`` pairs matching the ``AbstractFileHandler`` instance that
        is being generated to a ``Future`` that will either return the same
        successfully-generated ``AbstractFileHandler`` instance or raise any
        exceptions occurring during generation when its ``result`` method is
        called. *An attempt will be made to generate all files in the graph,
        so downselect accordingly.*
        """
        executor = cls.gen_manager()
        # One ``generate`` call is scheduled per value of ``graph``; the
        # handler is paired with its future so callers can match results.
        return tuple((fh, executor.submit(fh.generate))
                     for fh in graph.values())

    # Holds at most one pool; filled lazily by ``gen_manager``.
    _GEN_MANAGER = []

    @classmethod
    def gen_manager(cls):
        """
        Get a file generation manager (following the
        ``concurrent.futures.Executor`` interface). This base implementation
        uses ``ProcessPoolExecutor`` to generate files in parallel using
        multiple subprocesses.

        The pool is created on first use with ``MAX_WORKERS`` workers, each
        initialized with ``label_worker_proc``, and its ``shutdown`` method is
        registered with ``atexit``. Later calls return the same pool.
        """
        # ``with`` guarantees the lock is released even if creating the pool
        # or registering the exit hook raises; a bare acquire/release pair
        # would leave the lock held and block every later caller.
        with _LOCK:
            if not cls._GEN_MANAGER:
                LOGGER.debug("Starting MultiprocessingGraphExecutor pool...")
                pool = ProcessPoolExecutor(max_workers=MAX_WORKERS,
                                           initializer=label_worker_proc)
                atexit.register(pool.shutdown)
                cls._GEN_MANAGER.append(pool)
            return cls._GEN_MANAGER[0]