turbo_broccoli.custom.pandas

pandas (de)serialization utilities.

  1"""pandas (de)serialization utilities."""
  2
  3import json
  4from io import StringIO
  5from typing import Any, Callable, Tuple
  6
  7import pandas as pd
  8
  9from turbo_broccoli.context import Context
 10from turbo_broccoli.exceptions import DeserializationError, TypeNotSupported
 11
 12
 13def _dataframe_to_json(df: pd.DataFrame, ctx: Context) -> dict:
 14    dtypes = [[str(k), v.name] for k, v in df.dtypes.items()]
 15    if df.memory_usage(deep=True).sum() <= ctx.min_artifact_size:
 16        return {
 17            "__type__": "pandas.dataframe",
 18            "__version__": 2,
 19            "data": json.loads(df.to_json(date_format="iso", date_unit="ns")),
 20            "dtypes": dtypes,
 21        }
 22    fmt = ctx.pandas_format
 23    path, name = ctx.new_artifact_path()
 24    getattr(df, f"to_{fmt}")(path, **ctx.pandas_kwargs)
 25    return {
 26        "__type__": "pandas.dataframe",
 27        "__version__": 2,
 28        "dtypes": dtypes,
 29        "id": name,
 30        "format": fmt,
 31    }
 32
 33
 34def _json_to_dataframe(dct: dict, ctx: Context) -> pd.DataFrame:
 35    decoders = {
 36        2: _json_to_dataframe_v2,
 37    }
 38    return decoders[dct["__version__"]](dct, ctx)
 39
 40
 41def _json_to_dataframe_v2(dct: dict, ctx: Context) -> pd.DataFrame:
 42    if "data" in dct:
 43        df = pd.read_json(StringIO(json.dumps(dct["data"])))
 44    else:
 45        fmt = dct["format"]
 46        path = ctx.id_to_artifact_path(dct["id"])
 47        if fmt in ["h5", "hdf"]:
 48            df = pd.read_hdf(path, "main")
 49        else:
 50            df = getattr(pd, f"read_{fmt}")(path)
 51    # Rename columns with non-string names
 52    # df.rename({str(d[0]): d[0] for d in dct["dtypes"]}, inplace=True)
 53    df = df.astype(
 54        {
 55            str(a): b
 56            for a, b in dct["dtypes"]
 57            if not str(b).startswith("datetime")
 58        }
 59    )
 60    for a, _ in filter(lambda x: x[1].startswith("datetime"), dct["dtypes"]):
 61        df[a] = pd.to_datetime(df[a]).dt.tz_localize(None)
 62    return df
 63
 64
 65def _json_to_series(dct: dict, ctx: Context) -> pd.Series:
 66    ctx.raise_if_nodecode("pandas.dataframe")
 67    decoders = {
 68        2: _json_to_series_v2,
 69    }
 70    return decoders[dct["__version__"]](dct, ctx)
 71
 72
 73def _json_to_series_v2(dct: dict, ctx: Context) -> pd.Series:
 74    return dct["data"][dct["name"]]
 75
 76
 77def _series_to_json(ser: pd.Series, ctx: Context) -> dict:
 78    name = ser.name if ser.name is not None else "main"
 79    return {
 80        "__type__": "pandas.series",
 81        "__version__": 2,
 82        "data": ser.to_frame(name=name),
 83        "name": name,
 84    }
 85
 86
 87# pylint: disable=missing-function-docstring
 88def from_json(dct: dict, ctx: Context) -> Any:
 89    decoders = {
 90        "pandas.dataframe": _json_to_dataframe,
 91        "pandas.series": _json_to_series,
 92    }
 93    try:
 94        type_name = dct["__type__"]
 95        return decoders[type_name](dct, ctx)
 96    except KeyError as exc:
 97        raise DeserializationError() from exc
 98
 99
def to_json(obj: Any, ctx: Context) -> dict:
    """
    Serializes a pandas object into JSON by cases. See the README for the
    precise list of supported types. The returned dict has the following
    structure:

    - `pandas.DataFrame`: A dataframe is processed differently depending on
      its size and on the minimum artifact size of the context (documented as
      the `TB_MAX_NBYTES` environment variable). If the dataframe is small,
      i.e. at most that many bytes, then it is directly stored in the
      resulting JSON document as

        ```py
        {
            "__type__": "pandas.dataframe",
            "__version__": 2,
            "data": {...},
            "dtypes": [
                [col1, dtype1],
                [col2, dtype2],
                ...
            ],
        }
        ```

      where `{...}` is the result of `pandas.DataFrame.to_json` (in `dict`
      form). On the other hand, if the dataframe is too large, then its
      content is stored in an artifact, whose format follows the
      `TB_PANDAS_FORMAT` environment variable (CSV by default). The resulting
      JSON document looks like

        ```py
        {
            "__type__": "pandas.dataframe",
            "__version__": 2,
            "dtypes": [
                [col1, dtype1],
                [col2, dtype2],
                ...
            ],
            "id": <UUID4 str>,
            "format": <str>
        }
        ```

    - `pandas.Series`: A series is converted to a dataframe before being
      serialized. The final document looks like this:

        ```py
        {
            "__type__": "pandas.series",
            "__version__": 2,
            "data": {...},
            "name": <str>,
        }
        ```

      where `{...}` is the document of the dataframe'd series, see above.

    Any other type of object raises a `TypeNotSupported` exception.

    Warning:
        Series and column names must be strings!

    """
    # Dataframes are tested first, then series
    if isinstance(obj, pd.DataFrame):
        return _dataframe_to_json(obj, ctx)
    if isinstance(obj, pd.Series):
        return _series_to_json(obj, ctx)
    raise TypeNotSupported()
def from_json(dct: dict, ctx: turbo_broccoli.context.Context) -> Any:
def from_json(dct: dict, ctx: Context) -> Any:
    """
    Deserializes a pandas document by dispatching on its `__type__` entry,
    which must be `pandas.dataframe` or `pandas.series`.

    Any `KeyError` raised during the process (missing or unknown `__type__`,
    or a key error inside the selected decoder) is re-raised as a
    `DeserializationError`.
    """
    handlers = {
        "pandas.series": _json_to_series,
        "pandas.dataframe": _json_to_dataframe,
    }
    try:
        return handlers[dct["__type__"]](dct, ctx)
    except KeyError as error:
        raise DeserializationError() from error
def to_json(obj: Any, ctx: turbo_broccoli.context.Context) -> dict:
def to_json(obj: Any, ctx: Context) -> dict:
    """
    Serializes a pandas object into JSON by cases. See the README for the
    precise list of supported types. The returned dict has the following
    structure:

    - `pandas.DataFrame`: A dataframe is processed differently depending on
      its size and on the minimum artifact size of the context (documented as
      the `TB_MAX_NBYTES` environment variable). If the dataframe is small,
      i.e. at most that many bytes, then it is directly stored in the
      resulting JSON document as

        ```py
        {
            "__type__": "pandas.dataframe",
            "__version__": 2,
            "data": {...},
            "dtypes": [
                [col1, dtype1],
                [col2, dtype2],
                ...
            ],
        }
        ```

      where `{...}` is the result of `pandas.DataFrame.to_json` (in `dict`
      form). On the other hand, if the dataframe is too large, then its
      content is stored in an artifact, whose format follows the
      `TB_PANDAS_FORMAT` environment variable (CSV by default). The resulting
      JSON document looks like

        ```py
        {
            "__type__": "pandas.dataframe",
            "__version__": 2,
            "dtypes": [
                [col1, dtype1],
                [col2, dtype2],
                ...
            ],
            "id": <UUID4 str>,
            "format": <str>
        }
        ```

    - `pandas.Series`: A series is converted to a dataframe before being
      serialized. The final document looks like this:

        ```py
        {
            "__type__": "pandas.series",
            "__version__": 2,
            "data": {...},
            "name": <str>,
        }
        ```

      where `{...}` is the document of the dataframe'd series, see above.

    Any other type of object raises a `TypeNotSupported` exception.

    Warning:
        Series and column names must be strings!

    """
    # Dataframes are tested first, then series
    if isinstance(obj, pd.DataFrame):
        return _dataframe_to_json(obj, ctx)
    if isinstance(obj, pd.Series):
        return _series_to_json(obj, ctx)
    raise TypeNotSupported()

Serializes a pandas object into JSON by cases. See the README for the precise list of supported types. The return dict has the following structure:

  • pandas.DataFrame: A dataframe is processed differently depending on its size and on the TB_MAX_NBYTES environment variable. If the dataframe is small, i.e. at most TB_MAX_NBYTES bytes, then it is directly stored in the resulting JSON document as

    {
        "__type__": "pandas.dataframe",
        "__version__": 2,
        "data": {...},
        "dtypes": [
            [col1, dtype1],
            [col2, dtype2],
            ...
        ],
    }
    

    where {...} is the result of pandas.DataFrame.to_json (in dict form). On the other hand, if the dataframe is too large, then its content is stored in an artifact, whose format follows the TB_PANDAS_FORMAT environment variable (CSV by default). The resulting JSON document looks like

    {
        "__type__": "pandas.dataframe",
        "__version__": 2,
        "dtypes": [
            [col1, dtype1],
            [col2, dtype2],
            ...
        ],
        "id": <UUID4 str>,
        "format": <str>
    }
    
  • pandas.Series: A series will be converted to a dataframe before being serialized. The final document will look like this

    {
        "__type__": "pandas.series",
        "__version__": 2,
        "data": {...},
        "name": <str>,
    }
    

    where {...} is the document of the dataframe'd series, see above.

Warning: Series and column names must be strings!