pandas accessors¤
pandas accessors provide syntactic sugar for dataframes and series. they make it possible to manipulate pandas objects using the familiar fluent interface permitted by chaining pandas methods.
further, we consider calling coroutines and asynchronous objects during pandas computations. at the end, we have a powerful, pluggable interface that permits pandas affordances to be used consistently in an application. designing accessors makes it possible to develop reusable interfaces outside the context of an application.
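for orientation, the underlying pandas extension api we build on registers a namespace on a collection with a decorator. a minimal sketch of that raw interface, where the demo accessor name and its lengths method are purely illustrative:

import pandas

@pandas.api.extensions.register_dataframe_accessor("demo")
class Demo:
    def __init__(self, df):
        self._df = df
    def lengths(self):
        # length of each string in the "word" column; assumes that column exists
        return self._df["word"].str.len()

pandas.DataFrame({"word": ["fluent", "interface"]}).demo.lengths()

the Accessor and Method classes below wrap this registration step so that subclasses opt in to it declaratively.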
import pandas, anyio, typing, inspect, pathlib
from functools import partial
from operator import *
from functools import singledispatch, partialmethod
from itertools import chain
from enum import auto, IntFlag
class Kind(IntFlag): INDEX, SERIES, DATAFRAME = auto(),auto(),auto()
INDEX, SERIES, FRAME = Kind.INDEX, Kind.SERIES, Kind.DATAFRAME
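the flags compose with | and support the containment test the registration code below relies on; a tiny illustrative check:

both = INDEX | SERIES
assert SERIES in both and FRAME not in both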
each pandas collection has a mapping method, but the methods vary based on the type of the object. the apply function is an async-aware generalized mapper for pandas collections that always returns a series or dataframe.
the Accessor class implements the base pattern for pandas accessors. we add some conveniences for registering accessors during class creation.
class Accessor:
    def __init__(self, object):
        self.object = object

    def __init_subclass__(cls, method=None, types=INDEX | SERIES | FRAME, name=None):
        cls.method = method
        # register the subclass with pandas for each collection named in the types flag
        for t in (INDEX, SERIES, FRAME):
            if t in types:
                getattr(pandas.api.extensions, F"register_{t.name.lower()}_accessor")(name or cls.__name__.lower())(cls)
ACC_METHODS = {pandas.Index: "index", pandas.Series: "series", pandas.DataFrame: "dataframe"}
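subclassing Accessor is all it takes to register; a minimal sketch using the base class above, where the zscore name and behavior are only illustrative:

class ZScore(Accessor, types=SERIES, name="zscore"):
    def __call__(self):
        # standardize the wrapped series stored on self.object
        return (self.object - self.object.mean()) / self.object.std()

pandas.Series([1., 2., 3., 4.]).zscore()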
the Method accessor generically consumes an object's methods to create accessors that expose entire method apis.
class Method(Accessor):
    def __init_subclass__(cls, method=None, types=INDEX | SERIES | FRAME, name=None):
        super().__init_subclass__(method=method, types=types, name=name)
        # lift each attribute of the wrapped api onto the accessor, routed through apply
        for k, v in vars(method).items():
            if hasattr(cls, k): continue
            if callable(v) and not isinstance(v, classmethod):
                setattr(cls, k, partialmethod(cls.apply, v))
            elif isinstance(v, property):
                setattr(cls, k, property(lambda self, f=attrgetter(k): apply(self.object, f)))

    def apply(self, f, *args, **kwargs):
        return apply(self.object, f, *args, **kwargs)
a generalized, async-aware apply method¤
@singledispatch
def apply(x, f, *args, **kwargs):
    # anything that is not already a pandas collection is promoted to an index
    return apply_index(pandas.Index(x), f, *args, **kwargs)

@apply.register(pandas.Index)
def apply_index(x, f, *args, **kwargs):
    return apply_series(x.to_series(), f, *args, **kwargs)

@apply.register(pandas.Series)
def apply_series(x, f, *args, **kwargs):
    return x.apply(f, args=args, **kwargs).rename(getattr(f, "__name__", None)).pipe(_sync_or_async)

@apply.register(pandas.DataFrame)
def apply_frame(x, f, *args, **kwargs):
    return x.apply(f, axis=1, args=args, **kwargs).pipe(_sync_or_async)

@apply.register(pandas.core.groupby.DataFrameGroupBy)
def apply_group(x, f, *args, **kwargs):
    return x.apply(f, *args, **kwargs).pipe(_sync_or_async)
async def _asyncgen(ait): return [x async for x in ait]

async def _update(s, gen=None, asyn=None):
    from asyncio import gather
    if not len(s): return ()
    # drain async generators and iterables into lists
    if isinstance(s[0], (typing.AsyncGenerator, typing.AsyncIterable)):
        s = s.apply(_asyncgen)
    # gather coroutines concurrently and write the results back into the series
    if isinstance(s[0], typing.Coroutine):
        y = await gather(*s.tolist())
        s.update(dict(zip(s.index, y)))
    return s

def _sync_or_async(s):
    return _update(s) if _isasync(s) else s

def _isasync(x):
    if len(x):
        return isinstance(x[0], (typing.AsyncGenerator, typing.AsyncIterable, typing.AsyncIterator, typing.Coroutine))
    return False
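a few quick checks of the machinery so far, assuming the definitions above and a notebook that allows top-level await like the cells further down. plain iterables are promoted to an index and applied as a series, frames are applied row-wise, and coroutine-valued results come back as something to await; _one is a throwaway coroutine written only for this check:

apply(range(3), float)  # promoted to an index, returned as a series named "float"
apply(pandas.DataFrame(dict(a=[1, 2], b=[3, 4])), lambda row: row.a + row.b)  # row-wise

async def _one(x): return x
await apply(pandas.Series([1, 2]), _one)  # a series of coroutines is gathered once awaited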
let's use that boring ass sleep example to show that pandas is in fact running async.
def hold(x): __import__("time").sleep(x*.1); return x
async def ahold(x): await __import__("asyncio").sleep(x*.1); return x
when comparing the async and sync versions of the sleep code we find that our wall time, the actual duration of the run, can be drastically reduced. this comes at the cost of cpu time; the async cpu time is greater because more code is required to handle coroutines and async evaluations.
N = 10
__import__("nest_asyncio").apply()
print("the async version runs faster in parallel")
%time __import__("asyncio").run(apply_index(pandas.RangeIndex(N)[::-1], ahold))
print("while the sync version runs in serial")
%time apply_index(pandas.RangeIndex(N)[::-1], hold)
print("the difference in the times allows us to see that are accessors are running in parallel.")
sync and async paths¤
class Path(Method, method=pathlib.Path, name="path", types=INDEX | SERIES):
    def __call__(self, *args, **kwargs):
        # calling the accessor converts the raw values into pathlib.Path objects
        if isinstance(self.object, pandas.Index):
            return self.object.map(self.method)
        return self.object.apply(self.method)
sync_dirs = (
    pandas.Index([""]).path().path.glob("*.ipynb")
).apply(list).explode()
normal pandas apply methods are not async aware.
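to see the problem, the stock apply hands back unawaited coroutine objects instead of results; reusing the ahold coroutine from the timing example:

coros = pandas.Series([1, 2]).apply(ahold)
print(coros.tolist())  # two raw coroutine objects, nothing has actually slept
for c in coros: c.close()  # close them so python does not warn about never-awaited coroutines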
class APath(Method, method=anyio.Path, name="apath", types=INDEX | SERIES):
    def __call__(self, *args, **kwargs):
        if isinstance(self.object, pandas.Index):
            return self.object.map(self.method)
        return self.object.apply(self.method)
async_dirs is returned from the async variant, which requires an await statement added to the sync_dirs definition.
async_dirs = (
    await pandas.Index([""]).apath().apath.glob("*.ipynb")
).apply(list).explode()
assert pandas.testing.assert_series_equal(sync_dirs, async_dirs) is None
(
    await async_dirs.pipe(pandas.Index).apath.read_text()
).apply(pandas.read_json, typ="series").explode("cells")
import jinja2

class Template(Method, method=jinja2.Environment(), types=FRAME):
    def render_string(self, tpl, **kwargs):
        tpl = self.method.from_string(tpl)
        return apply_frame(self.object, self._render_series, tpl, **kwargs)

    def _render_series(self, row, tpl, **kwargs):
        # each dataframe row supplies the template context
        if self.method.is_async:
            return tpl.render_async(**row, **kwargs)
        return tpl.render(**row, **kwargs)

    def render_template(self, tpl, **kwargs):
        tpl = self.method.get_template(tpl)
        return apply_frame(self.object, self._render_series, tpl, **kwargs)

class ATemplate(Template, method=jinja2.Environment(enable_async=True), types=FRAME):
    pass
df = sync_dirs.pipe(pandas.Index, name="path").to_series().to_frame()
sync_dirs.pipe(pandas.Index, name="path").to_series().to_frame().template.render_string("sha256sum {{path}}")
await sync_dirs.pipe(pandas.Index, name="path").to_series().to_frame().atemplate.render_string("sha256sum {{path}}")