# Source code for catalyst.contrib.utils.parallel
# description     :Multiprocessing and tqdm wrapper for easy parallelizing
# author          :Vsevolod Poletaev
# author_email    :poletaev.va@gmail.com
# date            :20190822
# version         :19.08.7
# ==============================================================================
from typing import List, TypeVar, Union
from multiprocessing.pool import Pool
from tqdm import tqdm
# Type variable used as the element type in the ``List[T]`` return annotations
# of the helpers in this module.
T = TypeVar("T")
class DumbPool:
    """Single-process stand-in for ``multiprocessing.pool.Pool``.

    Implements only ``imap_unordered`` and the context-manager protocol, and
    runs every call in the current process.
    """

    def imap_unordered(self, func, args):
        """Lazily apply ``func`` to each item of ``args``.

        Args:
            func: callable that takes a single item.
            args: iterable of items to process.

        Returns:
            A lazy ``map`` iterator over ``func(item)``; results are
            produced in input order.
        """
        return map(func, args)

    def __enter__(self):
        """Enter the runtime context related to ``DumbPool`` object."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the runtime context related to ``DumbPool`` object.

        Returns:
            ``False``, so an exception raised inside the ``with`` body
            propagates to the caller.
        """
        # A truthy return value from ``__exit__`` tells the interpreter to
        # swallow the in-flight exception; returning ``self`` did exactly
        # that and silently hid every error raised inside the block.
        return False
def parallel_imap(func, args, pool: Union[Pool, DumbPool],) -> List[T]:
    """Apply ``func`` to every item of ``args`` through ``pool``.

    Args:
        func: callable applied to each item.
        args: iterable of items to process.
        pool: object exposing ``imap_unordered(func, args)``.

    Returns:
        List with one result per item, in the order that
        ``pool.imap_unordered`` yields them.
    """
    # ``list`` drains the iterator, so all work is finished on return.
    return list(pool.imap_unordered(func, args))
def tqdm_parallel_imap(
    func,
    args,
    pool: Union[Pool, DumbPool],
    total: Union[int, None] = None,
    pbar=tqdm,
) -> List[T]:
    """Apply ``func`` to every item of ``args`` through ``pool`` with progress.

    Args:
        func: callable applied to each item.
        args: iterable of items to process.
        pool: object exposing ``imap_unordered(func, args)``.
        total: expected number of results, forwarded to ``pbar``. When
            ``None`` and ``args`` defines ``__len__``, ``len(args)`` is used.
        pbar: progress-bar factory called as ``pbar(iterable, total=total)``;
            pass ``None`` to run without a progress bar.

    Returns:
        List with one result per item, in the order that
        ``pool.imap_unordered`` yields them.
    """
    if pbar is None:
        # No progress reporting requested: use the plain helper.
        return parallel_imap(func, args, pool)

    if total is None and hasattr(args, "__len__"):
        total = len(args)

    # The bar wraps the result iterator, so it advances per finished item.
    return list(pbar(pool.imap_unordered(func, args), total=total))
def get_pool(workers: Union[int, None]) -> Union[Pool, DumbPool]:
    """Create a pool for the requested number of workers.

    Args:
        workers: number of worker processes. ``None``, zero, or a negative
            value selects the in-process ``DumbPool``.

    Returns:
        ``Pool(workers)`` when ``workers`` is positive, otherwise a
        ``DumbPool`` instance.
    """
    # The ``None`` check must come first: ``None > 0`` raises ``TypeError``,
    # so the reversed order never reached the fallback for ``None``.
    if workers is not None and workers > 0:
        return Pool(workers)
    return DumbPool()
# Names exported by a star-import of this module; ``DumbPool`` is not listed.
__all__ = ["parallel_imap", "tqdm_parallel_imap", "get_pool"]