pandas accessors¤
pandas accessors provide syntactic sugar for dataframes and series.
they make it possible to manipulate pandas objects using the familiar
fluent interface permitted by chaining pandas methods.
further, we consider calling coroutines and other asynchronous objects during pandas computations. by the end, we have a powerful, pluggable interface that lets pandas
affordances be used consistently in an application. designing accessors also makes it possible to develop reusable interfaces outside the context of any single application.
import pandas, anyio, typing, inspect, pathlib
from functools import partial
from operator import *
from functools import singledispatch, partialmethod
from itertools import chain
from enum import auto, IntFlag
class Kind(IntFlag): INDEX, SERIES, DATAFRAME = auto(),auto(),auto()
INDEX, SERIES, FRAME = Kind.INDEX, Kind.SERIES, Kind.DATAFRAME
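a quick sanity check that flag membership behaves the way the registration logic below relies on, since each accessor class tests t in types to decide which pandas types to register against.
assert SERIES in (INDEX | SERIES) and FRAME not in (INDEX | SERIES)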
each pandas collection has a mapping method, but the methods vary based on the type of the pandas object. the apply function below is an async-aware, generalized mapper for pandas collections that always returns a series or dataframe.
the Accessor class implements the base pattern for pandas accessors. we add some conveniences for registering accessors
at class creation time.
class Accessor:
def __init__(self, object):
self.object = object
def __init_subclass__(cls, method=None, types=INDEX|SERIES|FRAME, name=None):
cls.method = method
for t in (INDEX, SERIES, FRAME):
if t in types:
getattr(pandas.api.extensions, F"register_{t.name.lower()}_accessor")(name or cls.__name__.lower())(cls)
ACC_METHODS = {pandas.Index: "index", pandas.Series: "series", pandas.DataFrame: "dataframe"}
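as a tiny, hypothetical example of the convention, subclassing Accessor is all that is needed to register a new accessor; the meta name and behavior below are illustrative only.
class Meta(Accessor, types=SERIES | FRAME, name="meta"):
    def __call__(self):
        # report which pandas type the accessor is bound to
        return type(self.object).__name__

pandas.Series([1]).meta(), pandas.DataFrame().meta()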
the Method class generically consumes objects to create accessors that expose entire method apis.
class Method(Accessor):
    def __init_subclass__(cls, method=None, types=INDEX | SERIES | FRAME, name=None):
        super().__init_subclass__(method=method, types=types, name=name)
        # lift every callable and property defined on the wrapped object onto the accessor
        for k, v in vars(method).items():
            if hasattr(cls, k): continue
            if callable(v) and not isinstance(v, classmethod):
                setattr(cls, k, partialmethod(cls.apply, v))
            elif isinstance(v, property):
                setattr(cls, k, property(lambda self, k=k: self.apply(attrgetter(k))))
    def apply(self, f, *args, **kwargs):
        return apply(self.object, f, *args, **kwargs)
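as a sketch of what Method buys us, wrapping the builtin str type lifts its whole method api onto a hypothetical text accessor; every string method becomes available on a series without writing any glue.
class Text(Method, method=str, name="text", types=SERIES):
    pass

pandas.Series(["sync", "async"]).text.upper()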
a generalized, async-aware apply method¤
@singledispatch
def apply(x, f, *args, **kwargs):
return apply_index(pandas.Index(x), f, *args, **kwargs)
@apply.register(pandas.Index)
def apply_index(x, f, *args, **kwargs):
return apply_series(x.to_series(), f, *args, **kwargs)
@apply.register(pandas.Series)
def apply_series(x, f, *args, **kwargs):
    return x.apply(f, args=args, **kwargs).rename(getattr(f, "__name__", x.name)).pipe(_sync_or_async)
@apply.register(pandas.DataFrame)
def apply_frame(x, f, *args, **kwargs):
return x.apply(f, axis=1, args=args, **kwargs).pipe(_sync_or_async)
@apply.register(pandas.core.groupby.DataFrameGroupBy)
def apply_group(x, f, *args, **kwargs):
return x.apply(f, *args, **kwargs).pipe(_sync_or_async)
async def _asyncgen(x): return [item async for item in x]
async def _update(s):
    # resolve any asynchronous values in the series, gathering coroutines concurrently
    from asyncio import gather
    if not len(s): return s
    if isinstance(s.iloc[0], (typing.AsyncGenerator, typing.AsyncIterable)):
        # drain async generators and iterables into coroutines that build lists
        s = s.apply(_asyncgen)
    if isinstance(s.iloc[0], typing.Coroutine):
        y = await gather(*s.tolist())
        s.update(dict(zip(s.index, y)))
    return s
def _sync_or_async(s):
    # return the series as-is, or a coroutine that resolves it when the values are async
    return _update(s) if _isasync(s) else s
def _isasync(x):
    if len(x):
        return isinstance(x.iloc[0], (typing.AsyncGenerator, typing.AsyncIterable, typing.AsyncIterator, typing.Coroutine))
    return False
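as a quick sanity check before timing anything, apply dispatches on the type of its first argument and falls back to building an index from plain iterables.
apply(range(3), str)                  # plain iterable -> index -> series of strings named "str"
apply(pandas.Series([1, 2]), float)   # dispatches straight to the series implementation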
let's use that boring-ass sleep example to verify that pandas is in fact running async.
def hold(x): __import__("time").sleep(x*.1); return x
async def ahold(x): await __import__("asyncio").sleep(x*.1); return x
when comparing the async and sync versions of the sleep code we find that the wall time, the actual elapsed duration, can be drastically reduced.
this comes at the cost of cpu time: the async cpu time is greater because more code is required to handle coroutines and async evaluation.
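we apply nest_asyncio so that asyncio.run can be called from inside the notebook's already-running event loop.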
N = 10
__import__("nest_asyncio").apply()
print("the async version runs faster in parallel")
%time __import__("asyncio").run(apply_index(pandas.RangeIndex(N)[::-1], ahold))
print("while the sync version runs in serial")
%time apply_index(pandas.RangeIndex(N)[::-1], hold)
print("the difference in the times allows us to see that are accessors are running in parallel.")
sync and async paths¤
class Path(Method, method=pathlib.Path, name="path", types=INDEX | SERIES):
def __call__(self, *args, **kwargs):
if isinstance(self.object, pandas.Index):
return self.object.map(self.method)
return self.object.apply(self.method)
sync_dirs = (
pandas.Index([""]).path().path.glob("*.ipynb")
).apply(list).explode()
normal pandas apply methods are not async aware.
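for example, a plain apply over the ahold coroutine just fills the series with unawaited coroutine objects rather than their results.
pandas.Series([1, 2]).apply(ahold)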
class APath(Method, method=anyio.Path, name="apath", types=INDEX | SERIES):
def __call__(self, *args, **kwargs):
if isinstance(self.object, pandas.Index):
return self.object.map(self.method)
return self.object.apply(self.method)
async_dirs is returned by the async variant, which requires an await statement added to the sync_dirs definition.
async_dirs = (
await pandas.Index([""]).apath().apath.glob("*.ipynb")
).apply(list).explode()
assert pandas.testing.assert_series_equal(sync_dirs, async_dirs) is None
(
await async_dirs.pipe(pandas.Index).apath.read_text()
).apply(pandas.read_json, typ="series").explode("cells")
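the same machinery extends to jinja2 templates. the Template accessor below renders each dataframe row into a template, and the async variant swaps in an async-enabled environment so that the render coroutines flow through the same apply logic.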
import jinja2
class Template(Method, method=jinja2.Environment(), types=FRAME):
    def render_string(self, tpl, **kwargs):
        # compile the template once, then render it against every row
        tpl = self.method.from_string(tpl)
        return apply_frame(self.object, self._render_series, tpl, **kwargs)
    def _render_series(self, row, tpl, **kwargs):
        # an async-enabled environment hands back coroutines that apply resolves later
        if self.method.is_async:
            return tpl.render_async(**row, **kwargs)
        return tpl.render(**row, **kwargs)
    def render_template(self, tpl, **kwargs):
        tpl = self.method.get_template(tpl)
        return apply_frame(self.object, self._render_series, tpl, **kwargs)
class ATemplate(Template, method=jinja2.Environment(enable_async=True), types=FRAME):
pass
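before pointing the template accessors at the file listings, a quick self-contained check with a couple of throwaway rows; the column name feeds the template variable of the same name.
pandas.DataFrame([dict(name="pandas"), dict(name="jinja2")]).template.render_string("hello {{name}}")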
df = sync_dirs.pipe(pandas.Index, name="path").to_series().to_frame()
df.template.render_string("sha256sum {{path}}")
await df.atemplate.render_string("sha256sum {{path}}")