turbo_broccoli.custom.pandas
pandas (de)serialization utilities.
"""pandas (de)serialization utilities."""

import json
from io import StringIO
from typing import Any, Callable, Tuple

import pandas as pd

from turbo_broccoli.context import Context
from turbo_broccoli.exceptions import DeserializationError, TypeNotSupported


def _dataframe_to_json(df: pd.DataFrame, ctx: Context) -> dict:
    """
    Serializes a dataframe to a JSON document. Small dataframes (at most
    `ctx.min_artifact_size` bytes, deep memory usage) are embedded directly;
    larger ones are written to an artifact file in the format given by
    `ctx.pandas_format` and referenced by id.
    """
    # Column dtypes are recorded separately so they can be restored on decode
    # (to_json / CSV round-trips lose dtype information).
    dtypes = [[str(k), v.name] for k, v in df.dtypes.items()]
    if df.memory_usage(deep=True).sum() <= ctx.min_artifact_size:
        # Small enough: inline the dataframe's JSON representation.
        return {
            "__type__": "pandas.dataframe",
            "__version__": 2,
            "data": json.loads(df.to_json(date_format="iso", date_unit="ns")),
            "dtypes": dtypes,
        }
    # Too large: dump to an artifact and only store its id and format.
    fmt = ctx.pandas_format
    path, name = ctx.new_artifact_path()
    getattr(df, f"to_{fmt}")(path, **ctx.pandas_kwargs)
    return {
        "__type__": "pandas.dataframe",
        "__version__": 2,
        "dtypes": dtypes,
        "id": name,
        "format": fmt,
    }


def _json_to_dataframe(dct: dict, ctx: Context) -> pd.DataFrame:
    """
    Deserializes a `pandas.dataframe` document, dispatching on its
    `__version__` field.
    """
    decoders = {
        2: _json_to_dataframe_v2,
    }
    return decoders[dct["__version__"]](dct, ctx)


def _json_to_dataframe_v2(dct: dict, ctx: Context) -> pd.DataFrame:
    """
    Deserializes a version-2 `pandas.dataframe` document, either from the
    inlined `data` field or from the artifact referenced by `id`/`format`.
    """
    if "data" in dct:
        df = pd.read_json(StringIO(json.dumps(dct["data"])))
    else:
        fmt = dct["format"]
        path = ctx.id_to_artifact_path(dct["id"])
        if fmt in ["h5", "hdf"]:
            # HDF artifacts are written under the fixed key "main".
            df = pd.read_hdf(path, "main")
        else:
            df = getattr(pd, f"read_{fmt}")(path)
    # TODO: rename columns with non-string names, e.g.
    # df.rename({str(d[0]): d[0] for d in dct["dtypes"]}, inplace=True)
    # Restore non-datetime dtypes in one astype pass...
    df = df.astype(
        {
            str(a): b
            for a, b in dct["dtypes"]
            if not str(b).startswith("datetime")
        }
    )
    # ...and datetime columns via to_datetime, dropping the timezone to match
    # the naive datetimes produced at serialization time.
    for a, _ in filter(lambda x: x[1].startswith("datetime"), dct["dtypes"]):
        df[a] = pd.to_datetime(df[a]).dt.tz_localize(None)
    return df


def _json_to_series(dct: dict, ctx: Context) -> pd.Series:
    """
    Deserializes a `pandas.series` document, dispatching on its `__version__`
    field. Series decoding relies on dataframe decoding, hence the nodecode
    check.
    """
    ctx.raise_if_nodecode("pandas.dataframe")
    decoders = {
        2: _json_to_series_v2,
    }
    return decoders[dct["__version__"]](dct, ctx)


def _json_to_series_v2(dct: dict, ctx: Context) -> pd.Series:
    """
    Deserializes a version-2 `pandas.series` document. `dct["data"]` is
    expected to already be a decoded dataframe (decoded recursively by the
    surrounding machinery), from which the named column is extracted.
    """
    return dct["data"][dct["name"]]


def _series_to_json(ser: pd.Series, ctx: Context) -> dict:
    """
    Serializes a series by wrapping it in a single-column dataframe. The
    dataframe is left as a raw object in `data`; the surrounding encoder is
    expected to serialize it recursively (NOTE(review): confirm against the
    top-level encoder).
    """
    name = ser.name if ser.name is not None else "main"
    return {
        "__type__": "pandas.series",
        "__version__": 2,
        "data": ser.to_frame(name=name),
        "name": name,
    }


# pylint: disable=missing-function-docstring
def from_json(dct: dict, ctx: Context) -> Any:
    decoders = {
        "pandas.dataframe": _json_to_dataframe,
        "pandas.series": _json_to_series,
    }
    try:
        type_name = dct["__type__"]
        return decoders[type_name](dct, ctx)
    except KeyError as exc:
        # Missing "__type__", unknown type name, or a malformed document all
        # surface as a DeserializationError.
        raise DeserializationError() from exc


def to_json(obj: Any, ctx: Context) -> dict:
    """
    Serializes a pandas object into JSON by cases. See the README for the
    precise list of supported types. The return dict has the following
    structure:

    - `pandas.DataFrame`: A dataframe is processed differently depending on
      its size and on the `TB_MAX_NBYTES` environment variable. If the
      dataframe is small, i.e. at most `TB_MAX_NBYTES` bytes, then it is
      directly stored in the resulting JSON document as

        ```py
        {
            "__type__": "pandas.dataframe",
            "__version__": 2,
            "data": {...},
            "dtypes": [
                [col1, dtype1],
                [col2, dtype2],
                ...
            ],
        }
        ```

      where `{...}` is the result of `pandas.DataFrame.to_json` (in `dict`
      form). If, on the other hand, the dataframe is too large, then its
      content is stored in an artifact, whose format follows the
      `TB_PANDAS_FORMAT` environment variable (CSV by default). The resulting
      JSON document looks like

        ```py
        {
            "__type__": "pandas.dataframe",
            "__version__": 2,
            "dtypes": [
                [col1, dtype1],
                [col2, dtype2],
                ...
            ],
            "id": <UUID4 str>,
            "format": <str>
        }
        ```

    - `pandas.Series`: A series will be converted to a dataframe before being
      serialized. The final document will look like this

        ```py
        {
            "__type__": "pandas.series",
            "__version__": 2,
            "data": {...},
            "name": <str>,
        }
        ```

      where `{...}` is the document of the dataframe'd series, see above.

    Warning:
        Series and column names must be strings!

    """
    encoders: list[Tuple[type, Callable[[Any, Context], dict]]] = [
        (pd.DataFrame, _dataframe_to_json),
        (pd.Series, _series_to_json),
    ]
    for t, f in encoders:
        if isinstance(obj, t):
            return f(obj, ctx)
    raise TypeNotSupported()
101def to_json(obj: Any, ctx: Context) -> dict: 102 """ 103 Serializes a pandas object into JSON by cases. See the README for the 104 precise list of supported types. The return dict has the following 105 structure: 106 107 - `pandas.DataFrame`: A dataframe is processed differently depending on its 108 size and on the `TB_MAX_NBYTES` environment variable. If the dataframe is 109 small, i.e. at most `TB_MAX_NBYTES` bytes, then it is directly stored in 110 the resulting JSON document as 111 112 ```py 113 { 114 "__type__": "pandas.dataframe", 115 "__version__": 2, 116 "data": {...}, 117 "dtypes": [ 118 [col1, dtype1], 119 [col2, dtype2], 120 ... 121 ], 122 } 123 ``` 124 125 where `{...}` is the result of `pandas.DataFrame.to_json` (in `dict` 126 form). On the other hand, the dataframe is too large, then its content is 127 stored in an artifact, whose format follows the `TB_PANDAS_FORMAT` 128 environment (CSV by default). The resulting JSON document looks like 129 130 ```py 131 { 132 "__type__": "pandas.dataframe", 133 "__version__": 2, 134 "dtypes": [ 135 [col1, dtype1], 136 [col2, dtype2], 137 ... 138 ], 139 "id": <UUID4 str>, 140 "format": <str> 141 } 142 ``` 143 144 - `pandas.Series`: A series will be converted to a dataframe before being 145 serialized. The final document will look like this 146 147 ```py 148 { 149 "__type__": "pandas.series", 150 "__version__": 2, 151 "data": {...}, 152 "name": <str>, 153 } 154 ``` 155 156 where `{...}` is the document of the dataframe'd series, see above. 157 158 Warning: 159 Series and column names must be strings! 160 161 """ 162 encoders: list[Tuple[type, Callable[[Any, Context], dict]]] = [ 163 (pd.DataFrame, _dataframe_to_json), 164 (pd.Series, _series_to_json), 165 ] 166 for t, f in encoders: 167 if isinstance(obj, t): 168 return f(obj, ctx) 169 raise TypeNotSupported()
Serializes a pandas object into JSON by cases. See the README for the precise list of supported types. The return dict has the following structure:
`pandas.DataFrame`: A dataframe is processed differently depending on its
size and on the `TB_MAX_NBYTES` environment variable. If the dataframe is
small, i.e. at most `TB_MAX_NBYTES` bytes, then it is directly stored in the
resulting JSON document as
`{ "__type__": "pandas.dataframe", "__version__": 2, "data": {...}, "dtypes": [ [col1, dtype1], [col2, dtype2], ... ], }`
where `{...}` is the result of `pandas.DataFrame.to_json` (in `dict` form).
If, on the other hand, the dataframe is too large, then its content is stored
in an artifact, whose format follows the `TB_PANDAS_FORMAT` environment
variable (CSV by default). The resulting JSON document looks like
`{ "__type__": "pandas.dataframe", "__version__": 2, "dtypes": [ [col1, dtype1], [col2, dtype2], ... ], "id": <UUID4 str>, "format": <str> }`
`pandas.Series`: A series will be converted to a dataframe before being
serialized. The final document will look like this
`{ "__type__": "pandas.series", "__version__": 2, "data": {...}, "name": <str>, }`
where `{...}` is the document of the dataframe'd series, see above.
Warning: Series and column names must be strings!