Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# (c) Stefan Countryman 2019
3"""
4Generate files locally.
5"""
7import atexit
8import logging
9from typing import Iterable, Tuple
10from concurrent.futures import ProcessPoolExecutor, Future
11from threading import Lock
12from llama.classes import AbstractFileHandler, optional_env_var
13from llama.utils import label_worker_proc
14from ..classes import GraphExecutor
# Module-level logger for this file generation manager.
LOGGER = logging.getLogger(__name__)
# Module-wide lock used by ``MultiprocessingGraphExecutor.gen_manager`` to
# serialize the check-and-create of the shared process pool.
_LOCK = Lock()

# Default worker count. It is kept as a string (presumably because
# ``optional_env_var`` deals in string values, like environment variables do)
# and only converted with ``int`` below.
_DEFAULT_MAX_WORKERS = '4'
# Maximum number of worker subprocesses for the file generation pool, taken
# from ``LLAMA_IO_DEFAULT_GEN_MAX_WORKERS`` or else ``_DEFAULT_MAX_WORKERS``.
# NOTE(review): ``optional_env_var`` appears to return one value per requested
# variable name, hence the ``[0]`` -- confirm against its definition.
# A non-integer value makes ``int`` raise ``ValueError`` at import time.
MAX_WORKERS = int(optional_env_var(
    ['LLAMA_IO_DEFAULT_GEN_MAX_WORKERS'],
    f"""
    Specify the maximum number of subprocesses to use for local file generation
    using LLAMA's `default` file generation manager. If not specified, the
    default of {_DEFAULT_MAX_WORKERS} subprocesses will be used.
    """,
    [_DEFAULT_MAX_WORKERS],
)[0])
class MultiprocessingGraphExecutor(GraphExecutor):
    """
    Submit iterables of ``AbstractFileHandler`` instances and generate their
    files in parallel in subprocesses of the main process.
    """

    @classmethod
    def submit(cls, graph) -> Tuple[Tuple[AbstractFileHandler, Future], ...]:
        """
        Submit every file in the ``FileGraph`` ``graph`` to the file
        generation ``ProcessPoolExecutor`` and generate them in parallel.

        No readiness filtering is done here: *an attempt will be made to
        generate all files in the graph, so downselect accordingly* before
        calling.

        Parameters
        ----------
        graph : FileGraph
            Mapping whose values are the ``AbstractFileHandler`` instances to
            generate (only ``graph.values()`` is used).

        Returns
        -------
        Tuple[Tuple[AbstractFileHandler, Future], ...]
            One ``(file_handler, future)`` pair per submitted file, in the
            iteration order of ``graph.values()``. Each ``Future`` will either
            return the same successfully-generated ``AbstractFileHandler``
            instance or raise any exception occurring during generation when
            its ``result`` method is called.
        """
        executor = cls.gen_manager()
        return tuple((fh, executor.submit(fh.generate))
                     for fh in graph.values())

    # Lazily-populated cache holding at most one process pool. A list is
    # mutated in place (rather than rebinding the attribute), so subclasses
    # that do not override ``_GEN_MANAGER`` share this same pool.
    _GEN_MANAGER = []

    @classmethod
    def gen_manager(cls):
        """
        Get a file generation manager (following the
        ``concurrent.futures.Executor`` interface). This base implementation
        uses ``ProcessPoolExecutor`` to generate files in parallel using
        multiple subprocesses.

        The pool is created on first call with ``MAX_WORKERS`` workers and
        ``label_worker_proc`` as the worker initializer. Its ``shutdown`` is
        registered with ``atexit``. Later calls return the same pool.

        Returns
        -------
        concurrent.futures.ProcessPoolExecutor
            The shared process pool.
        """
        # ``with`` guarantees the lock is released even if creating the pool
        # or registering its shutdown hook raises; a bare acquire/release
        # pair would leave the lock held and deadlock every later caller.
        with _LOCK:
            if not cls._GEN_MANAGER:
                LOGGER.debug("Starting MultiprocessingGraphExecutor pool...")
                pool = ProcessPoolExecutor(max_workers=MAX_WORKERS,
                                           initializer=label_worker_proc)
                atexit.register(pool.shutdown)
                cls._GEN_MANAGER.append(pool)
            return cls._GEN_MANAGER[0]