libraries

2024-09-28 22:52:53 -07:00
parent 5cdaf1f76b
commit 4929d1fa66
7378 changed files with 1550978 additions and 14 deletions

View File

@@ -0,0 +1,130 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import ClassVar, Callable
import pandas as pd
from pandas import DataFrame
from seaborn._core.scales import Scale
from seaborn._core.groupby import GroupBy
from seaborn._stats.base import Stat
from seaborn._statistics import (
EstimateAggregator,
WeightedAggregator,
)
from seaborn._core.typing import Vector
@dataclass
class Agg(Stat):
"""
Aggregate data along the value axis using given method.
Parameters
----------
func : str or callable
Name of a :class:`pandas.Series` method or a vector -> scalar function.
See Also
--------
objects.Est : Aggregation with error bars.
Examples
--------
.. include:: ../docstrings/objects.Agg.rst
"""
func: str | Callable[[Vector], float] = "mean"
group_by_orient: ClassVar[bool] = True
def __call__(
self, data: DataFrame, groupby: GroupBy, orient: str, scales: dict[str, Scale],
) -> DataFrame:
var = {"x": "y", "y": "x"}.get(orient)
res = (
groupby
.agg(data, {var: self.func})
.dropna(subset=[var])
.reset_index(drop=True)
)
return res
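
A minimal usage sketch (assuming this class is the one exposed as `so.Agg` in the `seaborn.objects` interface, and that a matplotlib backend is available for saving):

import pandas as pd
import seaborn.objects as so

df = pd.DataFrame({"day": ["Mon", "Mon", "Tue", "Tue"], "value": [1, 3, 2, 6]})

# Aggregate y within each level of x; func may be a Series method name or a callable.
p = so.Plot(df, x="day", y="value").add(so.Bar(), so.Agg("mean"))
p.save("agg.png")
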
@dataclass
class Est(Stat):
"""
Calculate a point estimate and error bar interval.
For more information about the various `errorbar` choices, see the
:doc:`errorbar tutorial </tutorial/error_bars>`.
Additional variables:
- **weight**: When passed to a layer that uses this stat, a weighted estimate
will be computed. Note that use of weights currently limits the choice of
function and error bar method to `"mean"` and `"ci"`, respectively.
Parameters
----------
func : str or callable
Name of a :class:`numpy.ndarray` method or a vector -> scalar function.
errorbar : str, (str, float) tuple, or callable
Name of errorbar method (one of "ci", "pi", "se" or "sd"), or a tuple
with a method name and a level parameter, or a function that maps from a
vector to a (min, max) interval.
n_boot : int
Number of bootstrap samples to draw for "ci" errorbars.
seed : int
Seed for the PRNG used to draw bootstrap samples.
Examples
--------
.. include:: ../docstrings/objects.Est.rst
"""
func: str | Callable[[Vector], float] = "mean"
errorbar: str | tuple[str, float] = ("ci", 95)
n_boot: int = 1000
seed: int | None = None
group_by_orient: ClassVar[bool] = True
def _process(
self, data: DataFrame, var: str, estimator: EstimateAggregator
) -> DataFrame:
# Needed because GroupBy.apply assumes func is DataFrame -> DataFrame
# which we could probably make more general to allow Series return
res = estimator(data, var)
return pd.DataFrame([res])
def __call__(
self, data: DataFrame, groupby: GroupBy, orient: str, scales: dict[str, Scale],
) -> DataFrame:
boot_kws = {"n_boot": self.n_boot, "seed": self.seed}
if "weight" in data:
engine = WeightedAggregator(self.func, self.errorbar, **boot_kws)
else:
engine = EstimateAggregator(self.func, self.errorbar, **boot_kws)
var = {"x": "y", "y": "x"}[orient]
res = (
groupby
.apply(data, self._process, var, engine)
.dropna(subset=[var])
.reset_index(drop=True)
)
res = res.fillna({f"{var}min": res[var], f"{var}max": res[var]})
return res
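
A usage sketch (again assuming the `so.Est`, `so.Dot`, and `so.Range` names from the objects interface); `seed` is fixed so the bootstrap interval is reproducible:

import numpy as np
import pandas as pd
import seaborn.objects as so

rng = np.random.default_rng(0)
df = pd.DataFrame({"group": np.repeat(["a", "b"], 50), "value": rng.normal(size=100)})

# Mean point estimates with bootstrapped 95% confidence intervals.
(
    so.Plot(df, x="group", y="value")
    .add(so.Dot(), so.Est("mean", errorbar=("ci", 95), seed=0))
    .add(so.Range(), so.Est("mean", errorbar=("ci", 95), seed=0))
    .save("est.png")
)
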
@dataclass
class Rolling(Stat):
...
def __call__(self, data, groupby, orient, scales):
...

View File

@@ -0,0 +1,65 @@
"""Base module for statistical transformations."""
from __future__ import annotations
from collections.abc import Iterable
from dataclasses import dataclass
from typing import ClassVar, Any
import warnings
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from pandas import DataFrame
from seaborn._core.groupby import GroupBy
from seaborn._core.scales import Scale
@dataclass
class Stat:
"""Base class for objects that apply statistical transformations."""
# The class supports a partial-function application pattern. The object is
# initialized with desired parameters and the result is a callable that
# accepts and returns dataframes.
# The statistical transformation logic should not add any state to the instance
# beyond what is defined with the initialization parameters.
# Subclasses can declare whether the orient dimension should be used in grouping
# TODO consider whether this should be a parameter. Motivating example:
# use the same KDE class for violin plots and univariate density estimation.
# In the former case, we would expect separate densities for each unique
# value on the orient axis, but we would not in the latter case.
group_by_orient: ClassVar[bool] = False
def _check_param_one_of(self, param: str, options: Iterable[Any]) -> None:
"""Raise when parameter value is not one of a specified set."""
value = getattr(self, param)
if value not in options:
*most, last = options
option_str = ", ".join(f"{x!r}" for x in most) + f" or {last!r}"
err = " ".join([
f"The `{param}` parameter for `{self.__class__.__name__}` must be",
f"one of {option_str}; not {value!r}.",
])
raise ValueError(err)
def _check_grouping_vars(
self, param: str, data_vars: list[str], stacklevel: int = 2,
) -> None:
"""Warn if vars are named in parameter without being present in the data."""
param_vars = getattr(self, param)
undefined = set(param_vars) - set(data_vars)
if undefined:
param = f"{self.__class__.__name__}.{param}"
names = ", ".join(f"{x!r}" for x in undefined)
msg = f"Undefined variable(s) passed for {param}: {names}."
warnings.warn(msg, stacklevel=stacklevel)
def __call__(
self,
data: DataFrame,
groupby: GroupBy,
orient: str,
scales: dict[str, Scale],
) -> DataFrame:
"""Apply statistical transform to data subgroups and return combined result."""
return data
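
The partial-application pattern described in the comments above can be illustrated with a hypothetical subclass (the `Shift` name and its behavior are invented for illustration; it is not part of seaborn):

from dataclasses import dataclass
from pandas import DataFrame
from seaborn._stats.base import Stat

@dataclass
class Shift(Stat):
    """Hypothetical stat: offset values along the non-orient axis."""
    offset: float = 0

    def __call__(self, data, groupby, orient, scales) -> DataFrame:
        # All state comes from the init parameters; dataframes flow in and out.
        var = {"x": "y", "y": "x"}[orient]
        return data.assign(**{var: data[var] + self.offset})
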

View File

@@ -0,0 +1,232 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import ClassVar
import numpy as np
import pandas as pd
from pandas import DataFrame
from seaborn._core.groupby import GroupBy
from seaborn._core.scales import Scale
from seaborn._stats.base import Stat
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from numpy.typing import ArrayLike
@dataclass
class Count(Stat):
"""
Count distinct observations within groups.
See Also
--------
Hist : A more fully-featured transform including binning and/or normalization.
Examples
--------
.. include:: ../docstrings/objects.Count.rst
"""
group_by_orient: ClassVar[bool] = True
def __call__(
self, data: DataFrame, groupby: GroupBy, orient: str, scales: dict[str, Scale],
) -> DataFrame:
var = {"x": "y", "y": "x"}[orient]
res = (
groupby
.agg(data.assign(**{var: data[orient]}), {var: len})
.dropna(subset=["x", "y"])
.reset_index(drop=True)
)
return res
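
A short usage sketch (assuming the `so.Count` spelling from the objects interface); only the orient variable is mapped, and the stat fills in the count on the other axis:

import pandas as pd
import seaborn.objects as so

df = pd.DataFrame({"species": ["cat", "cat", "dog", "cat", "dog"]})

# Bar heights give the number of observations per category.
so.Plot(df, x="species").add(so.Bar(), so.Count()).save("count.png")
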
@dataclass
class Hist(Stat):
"""
Bin observations, count them, and optionally normalize or cumulate.
Parameters
----------
stat : str
Aggregate statistic to compute in each bin:
- `count`: the number of observations
- `density`: normalize so that the total area of the histogram equals 1
- `percent`: normalize so that bar heights sum to 100
- `probability` or `proportion`: normalize so that bar heights sum to 1
- `frequency`: divide the number of observations by the bin width
bins : str, int, or ArrayLike
Generic parameter that can be the name of a reference rule, the number
of bins, or the bin breaks. Passed to :func:`numpy.histogram_bin_edges`.
binwidth : float
Width of each bin; overrides `bins` but can be used with `binrange`.
Note that if `binwidth` does not evenly divide the bin range, the actual
bin width used will be only approximately equal to the parameter value.
binrange : (min, max)
Lowest and highest value for bin edges; can be used with either
`bins` (when a number) or `binwidth`. Defaults to data extremes.
common_norm : bool or list of variables
When not `False`, the normalization is applied across groups. Use
`True` to normalize across all groups, or pass variable name(s) that
define normalization groups.
common_bins : bool or list of variables
When not `False`, the same bins are used for all groups. Use `True` to
share bins across all groups, or pass variable name(s) to share within.
cumulative : bool
If True, cumulate the bin values.
discrete : bool
If True, set `binwidth` and `binrange` so that bins have unit width and
are centered on integer values.
Notes
-----
The choice of bins for computing and plotting a histogram can exert
substantial influence on the insights that one is able to draw from the
visualization. If the bins are too large, they may erase important features.
On the other hand, bins that are too small may be dominated by random
variability, obscuring the shape of the true underlying distribution. The
default bin size is determined using a reference rule that depends on the
sample size and variance. This works well in many cases (i.e., with
"well-behaved" data), but it fails in others. It is always a good idea to try
different bin sizes to be sure that you are not missing something important.
This function allows you to specify bins in several different ways, such as
by setting the total number of bins to use, the width of each bin, or the
specific locations where the bins should break.
Examples
--------
.. include:: ../docstrings/objects.Hist.rst
"""
stat: str = "count"
bins: str | int | ArrayLike = "auto"
binwidth: float | None = None
binrange: tuple[float, float] | None = None
common_norm: bool | list[str] = True
common_bins: bool | list[str] = True
cumulative: bool = False
discrete: bool = False
def __post_init__(self):
stat_options = [
"count", "density", "percent", "probability", "proportion", "frequency"
]
self._check_param_one_of("stat", stat_options)
def _define_bin_edges(self, vals, weight, bins, binwidth, binrange, discrete):
"""Inner function that takes bin parameters as arguments."""
vals = vals.replace(-np.inf, np.nan).replace(np.inf, np.nan).dropna()
if binrange is None:
start, stop = vals.min(), vals.max()
else:
start, stop = binrange
if discrete:
bin_edges = np.arange(start - .5, stop + 1.5)
else:
if binwidth is not None:
bins = int(round((stop - start) / binwidth))
bin_edges = np.histogram_bin_edges(vals, bins, binrange, weight)
# TODO warning or cap on too many bins?
return bin_edges
def _define_bin_params(self, data, orient, scale_type):
"""Given data, return numpy.histogram parameters to define bins."""
vals = data[orient]
weights = data.get("weight", None)
# TODO We'll want this for ordinal / discrete scales too
# (Do we need discrete as a parameter or just infer from scale?)
discrete = self.discrete or scale_type == "nominal"
bin_edges = self._define_bin_edges(
vals, weights, self.bins, self.binwidth, self.binrange, discrete,
)
if isinstance(self.bins, (str, int)):
n_bins = len(bin_edges) - 1
bin_range = bin_edges.min(), bin_edges.max()
bin_kws = dict(bins=n_bins, range=bin_range)
else:
bin_kws = dict(bins=bin_edges)
return bin_kws
def _get_bins_and_eval(self, data, orient, groupby, scale_type):
bin_kws = self._define_bin_params(data, orient, scale_type)
return groupby.apply(data, self._eval, orient, bin_kws)
def _eval(self, data, orient, bin_kws):
vals = data[orient]
weights = data.get("weight", None)
density = self.stat == "density"
hist, edges = np.histogram(vals, **bin_kws, weights=weights, density=density)
width = np.diff(edges)
center = edges[:-1] + width / 2
return pd.DataFrame({orient: center, "count": hist, "space": width})
def _normalize(self, data):
hist = data["count"]
if self.stat == "probability" or self.stat == "proportion":
hist = hist.astype(float) / hist.sum()
elif self.stat == "percent":
hist = hist.astype(float) / hist.sum() * 100
elif self.stat == "frequency":
hist = hist.astype(float) / data["space"]
if self.cumulative:
if self.stat in ["density", "frequency"]:
hist = (hist * data["space"]).cumsum()
else:
hist = hist.cumsum()
return data.assign(**{self.stat: hist})
def __call__(
self, data: DataFrame, groupby: GroupBy, orient: str, scales: dict[str, Scale],
) -> DataFrame:
scale_type = scales[orient].__class__.__name__.lower()
grouping_vars = [str(v) for v in data if v in groupby.order]
if not grouping_vars or self.common_bins is True:
bin_kws = self._define_bin_params(data, orient, scale_type)
data = groupby.apply(data, self._eval, orient, bin_kws)
else:
if self.common_bins is False:
bin_groupby = GroupBy(grouping_vars)
else:
bin_groupby = GroupBy(self.common_bins)
self._check_grouping_vars("common_bins", grouping_vars)
data = bin_groupby.apply(
data, self._get_bins_and_eval, orient, groupby, scale_type,
)
if not grouping_vars or self.common_norm is True:
data = self._normalize(data)
else:
if self.common_norm is False:
norm_groupby = GroupBy(grouping_vars)
else:
norm_groupby = GroupBy(self.common_norm)
self._check_grouping_vars("common_norm", grouping_vars)
data = norm_groupby.apply(data, self._normalize)
other = {"x": "y", "y": "x"}[orient]
return data.assign(**{other: data[self.stat]})
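
To make the normalization options above concrete, a sketch (assuming the `so.Hist` and `so.Bars` names from the objects interface); with `common_norm=False`, each group's proportions sum to 1 independently:

import numpy as np
import pandas as pd
import seaborn.objects as so

rng = np.random.default_rng(0)
df = pd.DataFrame({"x": rng.normal(size=200), "group": rng.choice(["a", "b"], 200)})

# Shared bins across groups (the common_bins=True default), per-group proportions.
(
    so.Plot(df, x="x", color="group")
    .add(so.Bars(), so.Hist(stat="proportion", bins=20, common_norm=False))
    .save("hist.png")
)
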

View File

@@ -0,0 +1,214 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Callable
import numpy as np
from numpy import ndarray
import pandas as pd
from pandas import DataFrame
try:
from scipy.stats import gaussian_kde
_no_scipy = False
except ImportError:
from seaborn.external.kde import gaussian_kde
_no_scipy = True
from seaborn._core.groupby import GroupBy
from seaborn._core.scales import Scale
from seaborn._stats.base import Stat
@dataclass
class KDE(Stat):
"""
Compute a univariate kernel density estimate.
Parameters
----------
bw_adjust : float
Factor that multiplicatively scales the value chosen using
`bw_method`. Increasing will make the curve smoother. See Notes.
bw_method : string, scalar, or callable
Method for determining the smoothing bandwidth to use. Passed directly
to :class:`scipy.stats.gaussian_kde`; see there for options.
common_norm : bool or list of variables
If `True`, normalize so that the areas of all curves sum to 1.
If `False`, normalize each curve independently. If a list, defines
variable(s) to group by and normalize within.
common_grid : bool or list of variables
If `True`, all curves will share the same evaluation grid.
If `False`, each evaluation grid is independent. If a list, defines
variable(s) to group by and share a grid within.
gridsize : int or None
Number of points in the evaluation grid. If None, the density is
evaluated at the original datapoints.
cut : float
Factor, multiplied by the kernel bandwidth, that determines how far
the evaluation grid extends past the extreme datapoints. When set to 0,
the curve is truncated at the data limits.
cumulative : bool
If True, estimate a cumulative distribution function. Requires scipy.
Notes
-----
The *bandwidth*, or standard deviation of the smoothing kernel, is an
important parameter. Much like histogram bin width, using the wrong
bandwidth can produce a distorted representation. Over-smoothing can erase
true features, while under-smoothing can create false ones. The default
uses a rule-of-thumb that works best for distributions that are roughly
bell-shaped. It is a good idea to check the default by varying `bw_adjust`.
Because the smoothing is performed with a Gaussian kernel, the estimated
density curve can extend to values that may not make sense. For example, the
curve may be drawn over negative values when the data are naturally
positive. The `cut` parameter can be used to control the evaluation range,
but datasets that have many observations close to a natural boundary may be
better served by a different method.
Similar distortions may arise when a dataset is naturally discrete or "spiky"
(containing many repeated observations of the same value). KDEs will always
produce a smooth curve, which could be misleading.
The units on the density axis are a common source of confusion. While kernel
density estimation produces a probability distribution, the height of the curve
at each point gives a density, not a probability. A probability can be obtained
only by integrating the density across a range. The curve is normalized so
that the integral over all possible values is 1, meaning that the scale of
the density axis depends on the data values.
If scipy is installed, its Cython-accelerated implementation will be used.
Examples
--------
.. include:: ../docstrings/objects.KDE.rst
"""
bw_adjust: float = 1
bw_method: str | float | Callable[[gaussian_kde], float] = "scott"
common_norm: bool | list[str] = True
common_grid: bool | list[str] = True
gridsize: int | None = 200
cut: float = 3
cumulative: bool = False
def __post_init__(self):
if self.cumulative and _no_scipy:
raise RuntimeError("Cumulative KDE evaluation requires scipy")
def _check_var_list_or_boolean(self, param: str, grouping_vars: Any) -> None:
"""Do input checks on grouping parameters."""
value = getattr(self, param)
if not (
isinstance(value, bool)
or (isinstance(value, list) and all(isinstance(v, str) for v in value))
):
param_name = f"{self.__class__.__name__}.{param}"
raise TypeError(f"{param_name} must be a boolean or list of strings.")
self._check_grouping_vars(param, grouping_vars, stacklevel=3)
def _fit(self, data: DataFrame, orient: str) -> gaussian_kde:
"""Fit and return a KDE object."""
# TODO need to handle singular data
fit_kws: dict[str, Any] = {"bw_method": self.bw_method}
if "weight" in data:
fit_kws["weights"] = data["weight"]
kde = gaussian_kde(data[orient], **fit_kws)
kde.set_bandwidth(kde.factor * self.bw_adjust)
return kde
def _get_support(self, data: DataFrame, orient: str) -> ndarray:
"""Define the grid that the KDE will be evaluated on."""
if self.gridsize is None:
return data[orient].to_numpy()
kde = self._fit(data, orient)
bw = np.sqrt(kde.covariance.squeeze())
gridmin = data[orient].min() - bw * self.cut
gridmax = data[orient].max() + bw * self.cut
return np.linspace(gridmin, gridmax, self.gridsize)
def _fit_and_evaluate(
self, data: DataFrame, orient: str, support: ndarray
) -> DataFrame:
"""Transform single group by fitting a KDE and evaluating on a support grid."""
empty = pd.DataFrame(columns=[orient, "weight", "density"], dtype=float)
if len(data) < 2:
return empty
try:
kde = self._fit(data, orient)
except np.linalg.LinAlgError:
return empty
if self.cumulative:
s_0 = support[0]
density = np.array([kde.integrate_box_1d(s_0, s_i) for s_i in support])
else:
density = kde(support)
weight = data["weight"].sum()
return pd.DataFrame({orient: support, "weight": weight, "density": density})
def _transform(
self, data: DataFrame, orient: str, grouping_vars: list[str]
) -> DataFrame:
"""Transform multiple groups by fitting KDEs and evaluating."""
empty = pd.DataFrame(columns=[*data.columns, "density"], dtype=float)
if len(data) < 2:
return empty
try:
support = self._get_support(data, orient)
except np.linalg.LinAlgError:
return empty
grouping_vars = [x for x in grouping_vars if data[x].nunique() > 1]
if not grouping_vars:
return self._fit_and_evaluate(data, orient, support)
groupby = GroupBy(grouping_vars)
return groupby.apply(data, self._fit_and_evaluate, orient, support)
def __call__(
self, data: DataFrame, groupby: GroupBy, orient: str, scales: dict[str, Scale],
) -> DataFrame:
if "weight" not in data:
data = data.assign(weight=1)
data = data.dropna(subset=[orient, "weight"])
# Transform each group separately
grouping_vars = [str(v) for v in data if v in groupby.order]
if not grouping_vars or self.common_grid is True:
res = self._transform(data, orient, grouping_vars)
else:
if self.common_grid is False:
grid_vars = grouping_vars
else:
self._check_var_list_or_boolean("common_grid", grouping_vars)
grid_vars = [v for v in self.common_grid if v in grouping_vars]
res = (
GroupBy(grid_vars)
.apply(data, self._transform, orient, grouping_vars)
)
# Normalize, potentially within groups
if not grouping_vars or self.common_norm is True:
res = res.assign(group_weight=data["weight"].sum())
else:
if self.common_norm is False:
norm_vars = grouping_vars
else:
self._check_var_list_or_boolean("common_norm", grouping_vars)
norm_vars = [v for v in self.common_norm if v in grouping_vars]
res = res.join(
data.groupby(norm_vars)["weight"].sum().rename("group_weight"),
on=norm_vars,
)
res["density"] *= res.eval("weight / group_weight")
value = {"x": "y", "y": "x"}[orient]
res[value] = res["density"]
return res.drop(["weight", "group_weight"], axis=1)
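
The effect of `bw_adjust` described in the Notes can be sketched as follows (assuming the `so.KDE` and `so.Line` names from the objects interface):

import numpy as np
import pandas as pd
import seaborn.objects as so

rng = np.random.default_rng(0)
df = pd.DataFrame({"x": np.concatenate([rng.normal(0, 1, 100), rng.normal(4, .5, 100)])})

# Default bandwidth vs. a heavily smoothed estimate that may erase the second mode.
(
    so.Plot(df, x="x")
    .add(so.Line(), so.KDE())
    .add(so.Line(linestyle="--"), so.KDE(bw_adjust=3))
    .save("kde.png")
)
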

View File

@@ -0,0 +1,78 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import ClassVar, cast
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal # type: ignore
import numpy as np
from pandas import DataFrame
from seaborn._core.scales import Scale
from seaborn._core.groupby import GroupBy
from seaborn._stats.base import Stat
from seaborn.utils import _version_predates
# From https://github.com/numpy/numpy/blob/main/numpy/lib/function_base.pyi
_MethodKind = Literal[
"inverted_cdf",
"averaged_inverted_cdf",
"closest_observation",
"interpolated_inverted_cdf",
"hazen",
"weibull",
"linear",
"median_unbiased",
"normal_unbiased",
"lower",
"higher",
"midpoint",
"nearest",
]
@dataclass
class Perc(Stat):
"""
Replace observations with percentile values.
Parameters
----------
k : list of numbers or int
If a list of numbers, this gives the percentiles (in [0, 100]) to compute.
If an integer, compute `k` evenly-spaced percentiles between 0 and 100.
For example, `k=5` computes the 0, 25, 50, 75, and 100th percentiles.
method : str
Method for interpolating percentiles between observed datapoints.
See :func:`numpy.percentile` for valid options and more information.
Examples
--------
.. include:: ../docstrings/objects.Perc.rst
"""
k: int | list[float] = 5
method: str = "linear"
group_by_orient: ClassVar[bool] = True
def _percentile(self, data: DataFrame, var: str) -> DataFrame:
k = list(np.linspace(0, 100, self.k)) if isinstance(self.k, int) else self.k
method = cast(_MethodKind, self.method)
values = data[var].dropna()
if _version_predates(np, "1.22"):
res = np.percentile(values, k, interpolation=method) # type: ignore
else:
res = np.percentile(values, k, method=method)
return DataFrame({var: res, "percentile": k})
def __call__(
self, data: DataFrame, groupby: GroupBy, orient: str, scales: dict[str, Scale],
) -> DataFrame:
var = {"x": "y", "y": "x"}[orient]
return groupby.apply(data, self._percentile, var)
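
A usage sketch (assuming the `so.Perc` name from the objects interface), marking each group's quartiles:

import numpy as np
import pandas as pd
import seaborn.objects as so

rng = np.random.default_rng(0)
df = pd.DataFrame({"group": np.repeat(["a", "b"], 100), "value": rng.gamma(2, size=200)})

# Replace raw observations with the 25th, 50th, and 75th percentiles per group.
(
    so.Plot(df, x="group", y="value")
    .add(so.Dot(), so.Perc([25, 50, 75]))
    .save("perc.png")
)
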

View File

@@ -0,0 +1,50 @@
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
import pandas as pd
from seaborn._stats.base import Stat
@dataclass
class PolyFit(Stat):
"""
Fit a polynomial of the given order and resample data onto predicted curve.
"""
# This is a provisional class that is useful for building out functionality.
# It may or may not change substantially in form or disappear as we think
# through the organization of the stats subpackage.
order: int = 2
gridsize: int = 100
def _fit_predict(self, data):
x = data["x"]
y = data["y"]
if x.nunique() <= self.order:
# TODO warn?
xx = yy = []
else:
p = np.polyfit(x, y, self.order)
xx = np.linspace(x.min(), x.max(), self.gridsize)
yy = np.polyval(p, xx)
return pd.DataFrame(dict(x=xx, y=yy))
# TODO we should have a way of identifying the method that will be applied
# and then only define __call__ on a base-class of stats with this pattern
def __call__(self, data, groupby, orient, scales):
return (
groupby
.apply(data.dropna(subset=["x", "y"]), self._fit_predict)
)
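
A usage sketch (assuming the `so.PolyFit` and `so.Dots` names from the objects interface), overlaying a quadratic fit on noisy data:

import numpy as np
import pandas as pd
import seaborn.objects as so

rng = np.random.default_rng(0)
x = rng.uniform(-3, 3, 100)
df = pd.DataFrame({"x": x, "y": x ** 2 + rng.normal(scale=1, size=100)})

# Scatter the raw points and draw the order-2 polynomial predictions.
(
    so.Plot(df, x="x", y="y")
    .add(so.Dots())
    .add(so.Line(), so.PolyFit(order=2))
    .save("polyfit.png")
)
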
@dataclass
class OLSFit(Stat):
...