turbo_broccoli.parallel

Guarded parallel calls

## Usage

It works in a way that is similar to
[`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html),
for example

```py
from math import sqrt
import turbo_broccoli as tb

# Note the use of `tb.delayed` instead of `joblib.delayed`.
#            ↓
jobs = [tb.delayed(sqrt)(i**2) for i in range(5)]
executor = tb.Parallel("foo/bar.json", only_one_arg=True, n_jobs=2)
results = executor(jobs)
```

gives

```py
{0: 0.0, 1: 1.0, 2: 2.0, 3: 3.0, 4: 4.0}
```

however, only the calls for which the corresponding entry in `foo/bar.json`
does not exist will actually be executed; the others will simply be loaded.
Note that unlike `joblib.Parallel`, the result is a `dict` of arg/result, not
just a `list` of results.
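The guarded mechanism itself can be illustrated without turbo_broccoli: keep a file of completed results and only compute the entries that are missing. A minimal stdlib sketch, not the library's actual implementation (the function name and cache file are hypothetical):

```python
import json
from math import sqrt
from pathlib import Path

def guarded_map(f, args, path):
    """Compute f(a) for each argument, skipping entries already saved."""
    p = Path(path)
    done = json.loads(p.read_text()) if p.exists() else {}
    for a in args:
        key = str(a)  # plain JSON object keys must be strings
        if key not in done:
            done[key] = f(a)
            p.write_text(json.dumps(done))  # save after every new result
    return done

results = guarded_map(sqrt, [i**2 for i in range(5)], "guarded_cache.json")
# A second call with the same file computes nothing: every key is loaded.
```

Running the same call again simply reloads all five entries from the file, which is the behavior `tb.Parallel` provides on top of actual parallel execution.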
If the function in the jobs takes more than one argument, simply drop the
`only_one_arg=True` argument:

```py
import turbo_broccoli as tb

f = lambda a, b: a * b

jobs = [tb.delayed(f)(i, j) for i in range(5) for j in range(5)]
executor = tb.Parallel("foo/bar.json", n_jobs=2)
results = executor(jobs)
```

gives

```py
{(0, 0): 0, (0, 1): 0, ..., (4, 4): 16}
```
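The key of each entry in the result dict is the job's single argument when `only_one_arg=True`, and the tuple of its arguments otherwise (this mirrors the `_key` helper in the module source). A standalone sketch of that rule:

```python
def job_key(args, only_one_arg):
    # Mirrors how the result dict is keyed: the lone argument itself,
    # or the full tuple of positional arguments.
    return args[0] if only_one_arg else tuple(args)

k1 = job_key((9,), only_one_arg=True)    # 9
k2 = job_key((2, 3), only_one_arg=False) # (2, 3)
```

This is also why arguments must be hashable: they are used directly as dict keys.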
## Notes & caveats

* The result of `executor(jobs)` is a dict or a generator of key/value pairs.

* The order of the results is guaranteed to be consistent with the order of
  the jobs.

* The argument(s) in the jobs must be hashable. Furthermore, if a job has
  kwargs, a `ValueError` is raised.

* Every job must have a different tuple of arguments, or in other words, every
  job must be unique. So something like this is not acceptable:

  ```py
  f = lambda a: 2 * a

  jobs = [tb.delayed(f)(i % 2) for i in range(5)]
  executor = tb.Parallel(...)
  results = executor(jobs)
  ```

  because `f` is in effect called multiple times with value `0`. In
  particular, TurboBroccoli's `Parallel` is not suited for functions with no
  arguments (unless they are executed only once, but that rather defeats the
  idea of parallelism).

* Beyond the arguments documented in `Parallel`, [`joblib.Parallel`
  arguments](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
  can be passed as kwargs to the constructor.

* TurboBroccoli's `Parallel` honors the `return_as` argument of
  [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
  (which can be `"list"` or `"generator"`). However, the value
  `return_as="generator_unordered"` is not supported and will fall back to
  `"generator"` with a warning. Also, note that even though you might set
  `return_as="list"`, the result will still be a dict.

* Every time a new result is obtained, it is immediately written to the output
  file. This means that if there are N jobs, the output file will be written
  to up to N times. If the results are accessed in quick succession (e.g. with
  `return_as="list"`, which is the default), this can slam the filesystem
  pretty hard.

* If the outputs of the jobs' functions are large, it might be inefficient to
  rewrite every past result to the output file each time a new result is
  generated. Using [embedded
  objects](https://altaris.github.io/turbo-broccoli/turbo_broccoli/custom/embedded.html)
  can make writes faster.

  ```py
  def f(i):
      ...
      return tb.EmbeddedDict({"result": something_big, ...})

  jobs = [tb.delayed(f)(i) for i in range(1000)]
  executor = tb.Parallel(...)
  results = executor(jobs)
  ```
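The per-result write behavior described above can be sketched in plain Python: a generator that rewrites the accumulated results after each new one before yielding it (loosely mirroring `_execute` in the source; the names and output file here are illustrative, not the library's API):

```python
import json
from pathlib import Path

def execute_and_save(f, args, path):
    """Yield (arg, result) pairs, rewriting the whole result file each time."""
    results, p = {}, Path(path)
    for a in args:
        results[a] = f(a)
        p.write_text(json.dumps(results))  # one full rewrite per new result
        yield a, results[a]

pairs = list(execute_and_save(lambda x: 2 * x, [1, 2, 3], "stream_out.json"))
```

For N jobs this performs up to N full rewrites of the file, which is why large outputs benefit from embedded objects.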
1""" 2Guarded parallel calls 3 4## Usage 5 6It works in a way that is similar to 7[`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html), 8for example 9 10```py 11from math import sqrt 12import turbo_broccoli as tb 13 14# Note the use of `tb.delayed` instead of `joblib.delayed`. 15# ↓ 16jobs = [tb.delayed(sqrt)(i**2) for i in range(5)] 17executor = tb.Parallel("foo/bar.json", only_one_arg=True, n_jobs=2) 18results = executor(jobs) 19``` 20 21gives 22 23```py 24{0: 0.0, 1: 1.0, 2: 2.0, 3: 3.0, 4: 4.0, 5: 5.0} 25``` 26 27however, only the calls for which the corresponding entry in `out/foo.json` 28does not exist will actually be executed, the others will simply be loaded. 29Note that unlike `joblib.Parallel`, the result is a `dict` of arg/result, not 30just a `list` of results. 31 32If the function in the jobs take more than one argument, simply drop the 33`only_one_arg=True` argument: 34 35```py 36from math import sqrt 37import turbo_broccoli as tb 38 39f = lambda a, b: a * b 40 41jobs = [tb.delayed(f)(i, j) for i in range(5) for j in range(5)] 42executor = tb.Parallel("foo/bar.json", n_jobs=2) 43results = executor(jobs) 44``` 45 46gives 47 48```py 49{(0, 0): 0, (0, 1): 0, ..., (4, 4): 16} 50``` 51 52## Notes & caveats 53 54* The result of `executor(jobs)` is a dict or a generator of key/value pairs. 55 56* The order of the results is guaranteed to be consistent with the order of the 57 jobs. 58 59* The argument(s) in the jobs must be hashable. Furthermore, if a job has 60 kwargs, a `ValueError` is raised. 61 62* Every job must have a different tuple of argument, or in other words, every 63 job must be unique. So something like this is not acceptable: 64 ```py 65 f = lambda a: 2 * a 66 67 jobs = [tb.delayed(f)(i % 2) for i in range(5)] 68 executor = tb.Parallel(...) 69 results = executor(jobs) 70 ``` 71 because `f` is in effect called multiple times with value `0`. 
In 72 particular, TurboBroccoli's `Parallel` is not suited for functions with no 73 arguments (unless if they are executed only once but that kind of defeats 74 the idea of parallelism). 75 76* Beyond the arguments documented in `Parallel`, [`joblib.Parallel` 77 arguments](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html) 78 can be passed as kwargs to the constructor. 79 80* TurboBroccoli's `Parallel` honors the `return_as` argument of 81 [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html) 82 (which can be `"list"` or `"generator"`). However, the value 83 `return_as="generator_unordered"` is not supported and will fall back to 84 `"generator"` with a warning. Also, note that eventhough you might set 85 `return_as="list"`, the result will still be a dict. 86 87* Everytime a new result is obtained, it is immediately written to the output 88 file. This means that if there are N jobs, the output file will be written to 89 up to N times. If the results are accessed in quick succession (e.g. 90 `return_as="list"` which is the default), this can slam the filesystem pretty 91 hard. 92 93* If the output of the job's function are large, it might be inefficient to 94 rewrite every past results the output file each time a new result is 95 generated. Using [embedded 96 objects](https://altaris.github.io/turbo-broccoli/turbo_broccoli/custom/embedded.html), 97 can make writes faster. 98 99 ```py 100 def f(i): 101 ... 102 return tb.EmbeddedDict({"result": something_big, ...}) 103 104 jobs = [tb.delayed(f)(i) for i in range(1000)] 105 executor = tb.Parallel(...) 
106 results = executor(jobs) 107 ``` 108""" 109 110from itertools import combinations 111from pathlib import Path 112from typing import Any, Callable, Generator, Iterable 113 114import joblib 115 116try: 117 from loguru import logger as logging 118except ModuleNotFoundError: 119 import logging # type: ignore 120 121from .context import Context 122from .turbo_broccoli import load_json, save_json 123 124 125class _DelayedCall: 126 127 function: Callable 128 args: tuple[Any, ...] 129 130 def __call__(self, *args: Any, **kwds: Any) -> "_DelayedCall": 131 if kwds: 132 raise ValueError("Keyword arguments are not supported") 133 self.args = args 134 return self 135 136 def __init__(self, function: Callable) -> None: 137 self.function = function 138 139 def to_joblib_delayed(self) -> Callable: 140 """ 141 Returns a `joblib.delayed` object that can be used with 142 `joblib.Parallel`. 143 """ 144 return joblib.delayed(self.function)(*self.args) 145 146 147class Parallel: 148 """ 149 Guarded analogue to 150 [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html). 151 See module documentation. 152 """ 153 154 context: Context 155 executor: joblib.Parallel 156 lonly_one_arg: bool 157 158 def __init__( 159 self, 160 output_file: str | Path, 161 context: Context | None = None, 162 only_one_arg: bool = False, 163 **kwargs: Any, 164 ) -> None: 165 """ 166 Args: 167 output_file (str | Path): 168 context (Context | None, optional): 169 only_one_arg (bool, optional): If `True`, assumes that every job 170 has exactly one argument. This produces more compact output 171 files. 172 kwargs (Any): Forwarded to 173 [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html) 174 """ 175 if kwargs.get("return_as") == "generator_unordered": 176 logging.warning( 177 "The option return_as='generator_unordered' is not supported. " 178 "Using 'generator' instead." 
179 ) 180 kwargs["return_as"] = "generator" 181 self.context = context or Context(output_file) 182 self.executor = joblib.Parallel(**kwargs) 183 self.only_one_arg = only_one_arg 184 185 def __call__( 186 self, jobs: Iterable[_DelayedCall] 187 ) -> dict | Generator[tuple[Any, Any], None, None]: 188 jobs = list(jobs) 189 self.sanity_check(jobs) 190 g = self._execute(jobs) 191 if self.executor.return_generator: 192 return g 193 return dict(g) 194 195 # pylint: disable=stop-iteration-return 196 def _execute( 197 self, jobs: Iterable[_DelayedCall] 198 ) -> Generator[tuple[Any, Any], None, None]: 199 """ 200 Executes the jobs in parallel and yields the results. Saves to the 201 output file each time a new result (i.e. one that was not already 202 present in the output file) is obtained. 203 204 Args: 205 jobs (Iterable[_DelayedCall]): All the jobs, including those whose 206 results are already in the output file (and therefore shall not 207 be run again) 208 """ 209 210 def _key(j: _DelayedCall) -> Any: 211 """What the key of a job should be in the result dict""" 212 return j.args[0] if self.only_one_arg else tuple(j.args) 213 214 job_status = { 215 _key(j): {"job": j, "done": False, "result": None} for j in jobs 216 } 217 218 # Check if some jobs already have their results in the output file 219 assert self.context.file_path is not None # for typechecking 220 if self.context.file_path.exists(): 221 results = load_json(self.context.file_path, self.context) 222 if not isinstance(results, dict): 223 raise RuntimeError( 224 f"The contents of '{self.context.file_path}' is not a dict" 225 ) 226 # Mark the jobs that are already done 227 for k, r in results.items(): 228 if k in job_status: 229 job_status[k]["done"], job_status[k]["result"] = True, r 230 else: 231 results = {} 232 233 new_results_it = iter( 234 self.executor( 235 d["job"].to_joblib_delayed() # type: ignore 236 for d in job_status.values() 237 if not d["done"] 238 ) 239 ) 240 # This loops strongly assumes that 
`jobs`, `loaded_results`, 241 # `job_status`, and `new_results` are ordered in a consistent way 242 for k, s in job_status.items(): 243 if s["done"]: 244 yield k, s["result"] 245 else: 246 results[k] = next(new_results_it) 247 save_json(results, self.context.file_path, self.context) 248 yield k, results[k] 249 250 # At this point, new_results should have been fully consumed 251 try: 252 next(new_results_it) 253 raise RuntimeError("The executor returned too many results") 254 except StopIteration: 255 pass 256 257 def sanity_check(self, jobs: list[_DelayedCall]) -> None: 258 """ 259 Performs various sanity checks on a list of jobs. 260 """ 261 if self.only_one_arg: 262 for i, j in enumerate(jobs): 263 if len(j.args) != 1: 264 raise ValueError( 265 f"The only_one_arg option is set to True but job {i} " 266 f"has {len(j.args)} arguments: {j.args}" 267 ) 268 for i1, i2 in combinations(range(len(jobs)), 2): 269 if jobs[i1].args == jobs[i2].args: 270 raise ValueError( 271 f"Jobs {i1} and {i2} have the same arguments: " 272 f"{jobs[i1].args}" 273 ) 274 275 276def delayed(function: Callable) -> Callable: 277 """Use this like `joblib.delayed`""" 278 return _DelayedCall(function)