Source code for catalyst.utils.parallel
# description :Multiprocessing and tqdm wrapper for easy paralelizing
# author :Vsevolod Poletaev
# author_email :poletaev.va@gmail.com
# date :20190822
# version :19.08.7
# ==============================================================================
from typing import List, TypeVar, Union # isort:skip
from multiprocessing.pool import Pool
from tqdm import tqdm
T = TypeVar("T")
[docs]class DumbPool:
[docs] def imap_unordered(self, func, args):
return map(func, args)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
return self
[docs]def parallel_imap(
func,
args,
pool: Union[Pool, DumbPool],
) -> List[T]:
result = list(pool.imap_unordered(func, args))
return result
[docs]def tqdm_parallel_imap(
func,
args,
pool: Union[Pool, DumbPool],
total: int = None,
pbar=tqdm,
) -> List[T]:
if total is None and hasattr(args, "__len__"):
total = len(args)
if pbar is None:
result = parallel_imap(func, args, pool)
else:
result = list(pbar(pool.imap_unordered(func, args), total=total))
return result
[docs]def get_pool(workers: int) -> Union[Pool, DumbPool]:
pool = Pool(workers) if workers > 0 and workers is not None else DumbPool()
return pool