turbo_broccoli.parallel

  1"""
  2Guarded parallel calls
  3
  4## Usage
  5
  6It works in a way that is similar to
  7[`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html),
  8for example
  9
 10```py
 11from math import sqrt
 12import turbo_broccoli as tb
 13
 14# Note the use of `tb.delayed` instead of `joblib.delayed`.
 15#          ↓
 16jobs = [tb.delayed(sqrt)(i**2) for i in range(5)]
 17executor = tb.Parallel("foo/bar.json", only_one_arg=True, n_jobs=2)
 18results = executor(jobs)
 19```
 20
 21gives
 22
 23```py
 24{0: 0.0, 1: 1.0, 2: 2.0, 3: 3.0, 4: 4.0, 5: 5.0}
 25```
 26
 27however, only the calls for which the corresponding entry in `out/foo.json`
 28does not exist will actually be executed, the others will simply be loaded.
 29Note that unlike `joblib.Parallel`, the result is a `dict` of arg/result, not
 30just a `list` of results.
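
In other words, interrupted or repeated runs resume from the output file
instead of recomputing everything. Here is a minimal sketch of this behavior
(the `slow_square` function is hypothetical):

```py
import turbo_broccoli as tb

def slow_square(i):  # hypothetical stand-in for some expensive computation
    return i * i

jobs = [tb.delayed(slow_square)(i) for i in range(5)]
executor = tb.Parallel("foo/bar.json", only_one_arg=True, n_jobs=2)
results = executor(jobs)  # first run: all 5 calls are executed
results = executor(jobs)  # rerun: nothing is executed, all 5 entries are
                          # simply loaded from foo/bar.json
```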

If the function in the jobs takes more than one argument, simply drop the
`only_one_arg=True` argument:

```py
import turbo_broccoli as tb

f = lambda a, b: a * b

jobs = [tb.delayed(f)(i, j) for i in range(5) for j in range(5)]
executor = tb.Parallel("foo/bar.json", n_jobs=2)
results = executor(jobs)
```

gives

```py
{(0, 0): 0, (0, 1): 0, ..., (4, 4): 16}
```

## Notes & caveats

* The result of `executor(jobs)` is a `dict` or a generator of key/value
  pairs.

* The order of the results is guaranteed to be consistent with the order of
  the jobs.

* The argument(s) in the jobs must be hashable. Furthermore, if a job has
  kwargs, a `ValueError` is raised.

* Every job must have a different tuple of arguments, or in other words,
  every job must be unique. So something like this is not acceptable:
    ```py
    f = lambda a: 2 * a

    jobs = [tb.delayed(f)(i % 2) for i in range(5)]
    executor = tb.Parallel(...)
    results = executor(jobs)
    ```
    because `f` is in effect called multiple times with value `0`. In
    particular, TurboBroccoli's `Parallel` is not suited for functions with
    no arguments (unless they are executed only once, but that kind of
    defeats the idea of parallelism).

* Beyond the arguments documented in `Parallel`, [`joblib.Parallel`
  arguments](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
  can be passed as kwargs to the constructor.

* TurboBroccoli's `Parallel` honors the `return_as` argument of
  [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
  (which can be `"list"` or `"generator"`). However, the value
  `return_as="generator_unordered"` is not supported and will fall back to
  `"generator"` with a warning. Also, note that even though you might set
  `return_as="list"`, the result will still be a `dict`. See the sketch
  after this list.

* Every time a new result is obtained, it is immediately written to the
  output file. This means that if there are N jobs, the output file will be
  written to up to N times. If the results are accessed in quick succession
  (e.g. with `return_as="list"`, which is the default), this can slam the
  filesystem pretty hard.

* If the outputs of the jobs' functions are large, it might be inefficient
  to rewrite every past result to the output file each time a new result is
  generated. Using [embedded
  objects](https://altaris.github.io/turbo-broccoli/turbo_broccoli/custom/embedded.html)
  can make writes faster.

    ```py
    def f(i):
        ...
        return tb.EmbeddedDict({"result": something_big, ...})

    jobs = [tb.delayed(f)(i) for i in range(1000)]
    executor = tb.Parallel(...)
    results = executor(jobs)
    ```
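
To illustrate the caveats above about constructor kwargs and `return_as`,
here is a minimal sketch (with a stand-in function `f`) that forwards
`n_jobs` and `return_as="generator"` to `joblib.Parallel` and consumes the
key/value pairs lazily:

```py
import turbo_broccoli as tb

f = lambda a, b: a * b

jobs = [tb.delayed(f)(i, j) for i in range(3) for j in range(3)]
# `n_jobs` and `return_as` are forwarded to the underlying `joblib.Parallel`
executor = tb.Parallel("foo/bar.json", n_jobs=2, return_as="generator")
for key, value in executor(jobs):  # yields ((i, j), i * j) pairs, in order
    print(key, value)
```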
108"""

from itertools import combinations
from pathlib import Path
from typing import Any, Callable, Generator, Iterable

import joblib

try:
    from loguru import logger as logging
except ModuleNotFoundError:
    import logging  # type: ignore

from .context import Context
from .turbo_broccoli import load_json, save_json


class _DelayedCall:

    function: Callable
    args: tuple[Any, ...]

    def __call__(self, *args: Any, **kwds: Any) -> "_DelayedCall":
        if kwds:
            raise ValueError("Keyword arguments are not supported")
        self.args = args
        return self

    def __init__(self, function: Callable) -> None:
        self.function = function

    def to_joblib_delayed(self) -> Callable:
        """
        Returns a `joblib.delayed` object that can be used with
        `joblib.Parallel`.
        """
        return joblib.delayed(self.function)(*self.args)


class Parallel:
    """
    Guarded analogue to
    [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html).
    See module documentation.
    """

    context: Context
    executor: joblib.Parallel
    only_one_arg: bool

    def __init__(
        self,
        output_file: str | Path,
        context: Context | None = None,
        only_one_arg: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Args:
            output_file (str | Path): Path of the output JSON file where
                results are saved and loaded.
            context (Context | None, optional): Serialization context.
                Defaults to a new `Context` built from `output_file`.
            only_one_arg (bool, optional): If `True`, assumes that every job
                has exactly one argument. This produces more compact output
                files.
            kwargs (Any): Forwarded to
                [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
        """
        if kwargs.get("return_as") == "generator_unordered":
            logging.warning(
                "The option return_as='generator_unordered' is not supported. "
                "Using 'generator' instead."
            )
            kwargs["return_as"] = "generator"
        self.context = context or Context(output_file)
        self.executor = joblib.Parallel(**kwargs)
        self.only_one_arg = only_one_arg

    def __call__(
        self, jobs: Iterable[_DelayedCall]
    ) -> dict | Generator[tuple[Any, Any], None, None]:
        jobs = list(jobs)
        self.sanity_check(jobs)
        g = self._execute(jobs)
        if self.executor.return_generator:
            return g
        return dict(g)

    # pylint: disable=stop-iteration-return
    def _execute(
        self, jobs: Iterable[_DelayedCall]
    ) -> Generator[tuple[Any, Any], None, None]:
        """
        Executes the jobs in parallel and yields the results. Saves to the
        output file each time a new result (i.e. one that was not already
        present in the output file) is obtained.

        Args:
            jobs (Iterable[_DelayedCall]): All the jobs, including those whose
                results are already in the output file (and therefore shall not
                be run again)
        """

        def _key(j: _DelayedCall) -> Any:
            """What the key of a job should be in the result dict"""
            return j.args[0] if self.only_one_arg else tuple(j.args)

        job_status = {
            _key(j): {"job": j, "done": False, "result": None} for j in jobs
        }

        # Check if some jobs already have their results in the output file
        assert self.context.file_path is not None  # for typechecking
        if self.context.file_path.exists():
            results = load_json(self.context.file_path, self.context)
            if not isinstance(results, dict):
                raise RuntimeError(
                    f"The contents of '{self.context.file_path}' is not a dict"
                )
            # Mark the jobs that are already done
            for k, r in results.items():
                if k in job_status:
                    job_status[k]["done"], job_status[k]["result"] = True, r
        else:
            results = {}

        new_results_it = iter(
            self.executor(
                d["job"].to_joblib_delayed()  # type: ignore
                for d in job_status.values()
                if not d["done"]
            )
        )
        # This loop strongly assumes that `jobs`, `results`, `job_status`,
        # and `new_results_it` are ordered in a consistent way
        for k, s in job_status.items():
            if s["done"]:
                yield k, s["result"]
            else:
                results[k] = next(new_results_it)
                save_json(results, self.context.file_path, self.context)
                yield k, results[k]

        # At this point, new_results_it should have been fully consumed
        try:
            next(new_results_it)
            raise RuntimeError("The executor returned too many results")
        except StopIteration:
            pass

    def sanity_check(self, jobs: list[_DelayedCall]) -> None:
        """
        Performs various sanity checks on a list of jobs.
        """
        if self.only_one_arg:
            for i, j in enumerate(jobs):
                if len(j.args) != 1:
                    raise ValueError(
                        f"The only_one_arg option is set to True but job {i} "
                        f"has {len(j.args)} arguments: {j.args}"
                    )
        for i1, i2 in combinations(range(len(jobs)), 2):
            if jobs[i1].args == jobs[i2].args:
                raise ValueError(
                    f"Jobs {i1} and {i2} have the same arguments: "
                    f"{jobs[i1].args}"
                )


def delayed(function: Callable) -> Callable:
    """Use this like `joblib.delayed`"""
    return _DelayedCall(function)