28 changes: 28 additions & 0 deletions bigframes/core/blocks.py
@@ -1014,6 +1014,34 @@ def aggregate_all_and_stack(
index_labels=self.index.names,
)

def aggregate_size(
self,
by_column_ids: typing.Sequence[str] = (),
*,
dropna: bool = True,
):
"""Returns a block object to compute the size(s) of groups."""
agg_specs = [
(ex.NullaryAggregation(agg_ops.SizeOp()), guid.generate_guid()),
]
output_col_ids = [agg_spec[1] for agg_spec in agg_specs]
result_expr = self.expr.aggregate(agg_specs, by_column_ids, dropna=dropna)
names: typing.List[Label] = []
for by_col_id in by_column_ids:
if by_col_id in self.value_columns:
names.append(self.col_id_to_label[by_col_id])
else:
names.append(self.col_id_to_index_name[by_col_id])
return (
Block(
result_expr,
index_columns=by_column_ids,
column_labels=["size"],
index_labels=names,
),
output_col_ids,
)

def select_column(self, id: str) -> Block:
return self.select_columns([id])

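For context, a hedged sketch of how a caller might use the new method; `block` stands for any existing `Block` instance and `"col_a"` for one of its column IDs (both hypothetical names, not from this diff):

# Group an existing block by one column and compute per-group sizes.
agg_block, output_col_ids = block.aggregate_size(
    by_column_ids=["col_a"],
    dropna=True,  # drop null group keys, matching the pandas default
)
# agg_block uses "col_a" as its index and carries a single "size" column;
# output_col_ids holds the generated GUID that names the size column.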
19 changes: 18 additions & 1 deletion bigframes/core/compile/aggregate_compiler.py
@@ -36,6 +36,8 @@ def compile_aggregate(
bindings: typing.Dict[str, ibis_types.Value],
order_by: typing.Sequence[ibis_types.Value] = [],
) -> ibis_types.Value:
if isinstance(aggregate, ex.NullaryAggregation):
return compile_nullary_agg(aggregate.op)
if isinstance(aggregate, ex.UnaryAggregation):
input = scalar_compiler.compile_expression(aggregate.arg, bindings=bindings)
if aggregate.op.can_order_by:
@@ -55,7 +57,9 @@ def compile_analytic(
window: window_spec.WindowSpec,
bindings: typing.Dict[str, ibis_types.Value],
) -> ibis_types.Value:
if isinstance(aggregate, ex.UnaryAggregation):
if isinstance(aggregate, ex.NullaryAggregation):
return compile_nullary_agg(aggregate.op, window)
elif isinstance(aggregate, ex.UnaryAggregation):
input = scalar_compiler.compile_expression(aggregate.arg, bindings=bindings)
return compile_unary_agg(aggregate.op, input, window)
elif isinstance(aggregate, ex.BinaryAggregation):
@@ -93,6 +97,14 @@ def compile_ordered_unary_agg(
raise ValueError(f"Can't compile unrecognized operation: {op}")


@functools.singledispatch
def compile_nullary_agg(
op: agg_ops.WindowOp,
window: Optional[window_spec.WindowSpec] = None,
) -> ibis_types.Value:
raise ValueError(f"Can't compile unrecognized operation: {op}")


def numeric_op(operation):
@functools.wraps(operation)
def constrained_op(
@@ -118,6 +130,11 @@ def constrained_op(
### Specific Op implementations Below


@compile_nullary_agg.register
def _(op: agg_ops.SizeOp, window=None) -> ibis_types.NumericValue:
return _apply_window_if_present(vendored_ibis_ops.count(1), window)


@compile_unary_agg.register
@numeric_op
def _(
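A minimal sketch of the new dispatch path, assuming it is invoked the same way as the existing unary path: `compile_aggregate` recognizes `ex.NullaryAggregation` and hands the op to the `functools.singledispatch` registry, where the `SizeOp` overload emits a scalar count:

# Hypothetical driver; all names come from this diff.
aggregate = ex.NullaryAggregation(agg_ops.SizeOp())
value = compile_aggregate(aggregate, bindings={})  # nullary: nothing to bind
# Internally: compile_nullary_agg(agg_ops.SizeOp()) returns
# _apply_window_if_present(vendored_ibis_ops.count(1), None), i.e. COUNT(1).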
10 changes: 10 additions & 0 deletions bigframes/core/expression.py
@@ -46,6 +46,16 @@ def output_type(
...


@dataclasses.dataclass(frozen=True)
class NullaryAggregation(Aggregation):
op: agg_ops.NullaryWindowOp = dataclasses.field()

def output_type(
self, input_types: dict[str, bigframes.dtypes.Dtype]
) -> dtypes.ExpressionType:
return self.op.output_type()


@dataclasses.dataclass(frozen=True)
class UnaryAggregation(Aggregation):
op: agg_ops.UnaryWindowOp = dataclasses.field()
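Because a nullary aggregation reads no input columns, its output type comes entirely from the op; the `input_types` argument exists only to satisfy the shared `Aggregation` interface. A quick sketch:

agg = NullaryAggregation(agg_ops.SizeOp())
agg.output_type(input_types={})  # dtypes.INT_DTYPE, whatever the inputs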
21 changes: 21 additions & 0 deletions bigframes/core/groupby/__init__.py
@@ -104,6 +104,20 @@ def __getitem__(
dropna=self._dropna,
)

def size(self) -> typing.Union[df.DataFrame, series.Series]:
agg_block, _ = self._block.aggregate_size(
by_column_ids=self._by_col_ids,
dropna=self._dropna,
)
agg_block = agg_block.with_column_labels(pd.Index(["size"]))
dataframe = df.DataFrame(agg_block)

if self._as_index:
series = dataframe["size"]
return series.rename(None)
else:
return self._convert_index(dataframe)

def sum(self, numeric_only: bool = False, *args) -> df.DataFrame:
if not numeric_only:
self._raise_on_non_numeric("sum")
@@ -520,6 +534,13 @@ def std(self, *args, **kwargs) -> series.Series:
def var(self, *args, **kwargs) -> series.Series:
return self._aggregate(agg_ops.var_op)

def size(self) -> series.Series:
agg_block, _ = self._block.aggregate_size(
by_column_ids=self._by_col_ids,
dropna=self._dropna,
)
return series.Series(agg_block, name=self._value_name)

def skew(self, *args, **kwargs) -> series.Series:
block = block_ops.skew(self._block, [self._value_column], self._by_col_ids)
return series.Series(block)
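The user-facing behavior these two methods enable, mirroring the system tests further down; `df` is a hypothetical bigframes DataFrame with the same columns as the test fixtures:

# DataFrameGroupBy.size(): a Series when as_index=True (the default),
# a DataFrame with a "size" column when as_index=False.
sizes = df.groupby("string_col").size()
frame = df.groupby("string_col", as_index=False).size()

# SeriesGroupBy.size(): the result keeps the grouped series' name.
per_group = df["int64_too"].groupby(df["bool_col"]).size()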
23 changes: 23 additions & 0 deletions bigframes/operations/aggregations.py
@@ -47,6 +47,13 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT
...


@dataclasses.dataclass(frozen=True)
class NullaryWindowOp(WindowOp):
@property
def arguments(self) -> int:
return 0


@dataclasses.dataclass(frozen=True)
class UnaryWindowOp(WindowOp):
@property
@@ -72,6 +79,13 @@ def arguments(self) -> int:
...


@dataclasses.dataclass(frozen=True)
class NullaryAggregateOp(AggregateOp, NullaryWindowOp):
@property
def arguments(self) -> int:
return 0


@dataclasses.dataclass(frozen=True)
class UnaryAggregateOp(AggregateOp, UnaryWindowOp):
@property
@@ -86,6 +100,14 @@ def arguments(self) -> int:
return 2


@dataclasses.dataclass(frozen=True)
class SizeOp(NullaryAggregateOp):
name: ClassVar[str] = "size"

def output_type(self, *input_types: dtypes.ExpressionType):
return dtypes.INT_DTYPE


@dataclasses.dataclass(frozen=True)
class SumOp(UnaryAggregateOp):
name: ClassVar[str] = "sum"
@@ -446,6 +468,7 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT
)


size_op = SizeOp()
sum_op = SumOp()
mean_op = MeanOp()
median_op = MedianOp()
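`SizeOp` inherits from `NullaryAggregateOp`, which mixes `AggregateOp` with `NullaryWindowOp`, so the one op serves both plain aggregation and windowed contexts. A quick check, assuming `bigframes.dtypes` is imported as `dtypes`:

assert size_op.arguments == 0              # nullary: no input column needed
assert size_op.output_type() == dtypes.INT_DTYPE
assert size_op.name == "size"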
107 changes: 84 additions & 23 deletions tests/system/small/test_groupby.py
@@ -18,6 +18,10 @@
import bigframes.pandas as bpd
from tests.system.utils import assert_pandas_df_equal

# =================
# DataFrame.groupby
# =================


@pytest.mark.parametrize(
("operator"),
@@ -269,21 +273,26 @@ def test_dataframe_groupby_analytic(
pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False)


def test_series_groupby_skew(scalars_df_index, scalars_pandas_df_index):
bf_result = scalars_df_index.groupby("bool_col")["int64_too"].skew().to_pandas()
pd_result = scalars_pandas_df_index.groupby("bool_col")["int64_too"].skew()
def test_dataframe_groupby_size_as_index_false(
scalars_df_index, scalars_pandas_df_index
):
bf_result = scalars_df_index.groupby("string_col", as_index=False).size()
bf_result_computed = bf_result.to_pandas()
pd_result = scalars_pandas_df_index.groupby("string_col", as_index=False).size()

pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
pd.testing.assert_frame_equal(
pd_result, bf_result_computed, check_dtype=False, check_index_type=False
)


def test_series_groupby_kurt(scalars_df_index, scalars_pandas_df_index):
bf_result = scalars_df_index.groupby("bool_col")["int64_too"].kurt().to_pandas()
tswast (Collaborator, Author) commented on Mar 20, 2024:
Moved below to other series_groupby tests. I also updated the test to actually do a Series.groupby.

# Pandas doesn't have groupby.kurt yet: https://github.com/pandas-dev/pandas/issues/40139
pd_result = scalars_pandas_df_index.groupby("bool_col")["int64_too"].apply(
pd.Series.kurt
)
def test_dataframe_groupby_size_as_index_true(
scalars_df_index, scalars_pandas_df_index
):
bf_result = scalars_df_index.groupby("string_col", as_index=True).size()
pd_result = scalars_pandas_df_index.groupby("string_col", as_index=True).size()
bf_result_computed = bf_result.to_pandas()

pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
pd.testing.assert_series_equal(pd_result, bf_result_computed, check_dtype=False)


def test_dataframe_groupby_skew(scalars_df_index, scalars_pandas_df_index):
Expand Down Expand Up @@ -356,6 +365,30 @@ def test_dataframe_groupby_getitem_list(
pd.testing.assert_frame_equal(pd_result, bf_result, check_dtype=False)


def test_dataframe_groupby_nonnumeric_with_mean():
df = pd.DataFrame(
{
"key1": ["a", "a", "a", "b"],
"key2": ["a", "a", "c", "c"],
"key3": [1, 2, 3, 4],
"key4": [1.6, 2, 3, 4],
}
)
pd_result = df.groupby(["key1", "key2"]).mean()

with bpd.option_context("bigquery.location", "US"):
bf_result = bpd.DataFrame(df).groupby(["key1", "key2"]).mean().to_pandas()

pd.testing.assert_frame_equal(
pd_result, bf_result, check_index_type=False, check_dtype=False
)


# ==============
# Series.groupby
# ==============


def test_series_groupby_agg_string(scalars_df_index, scalars_pandas_df_index):
bf_result = (
scalars_df_index["int64_col"]
Expand Down Expand Up @@ -392,21 +425,49 @@ def test_series_groupby_agg_list(scalars_df_index, scalars_pandas_df_index):
)


def test_dataframe_groupby_nonnumeric_with_mean():
df = pd.DataFrame(
{
"key1": ["a", "a", "a", "b"],
"key2": ["a", "a", "c", "c"],
"key3": [1, 2, 3, 4],
"key4": [1.6, 2, 3, 4],
}
def test_series_groupby_kurt(scalars_df_index, scalars_pandas_df_index):
bf_result = (
scalars_df_index["int64_too"]
.groupby(scalars_df_index["bool_col"])
.kurt()
.to_pandas()
)
# Pandas doesn't have groupby.kurt yet: https://github.com/pandas-dev/pandas/issues/40139
pd_result = scalars_pandas_df_index.groupby("bool_col")["int64_too"].apply(
pd.Series.kurt
)
pd_result = df.groupby(["key1", "key2"]).mean()
bf_result = bpd.DataFrame(df).groupby(["key1", "key2"]).mean().to_pandas()

pd.testing.assert_frame_equal(
pd_result, bf_result, check_index_type=False, check_dtype=False
pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)


def test_series_groupby_size(scalars_df_index, scalars_pandas_df_index):
bf_result = (
scalars_df_index["int64_too"].groupby(scalars_df_index["bool_col"]).size()
)
pd_result = (
scalars_pandas_df_index["int64_too"]
.groupby(scalars_pandas_df_index["bool_col"])
.size()
)
bf_result_computed = bf_result.to_pandas()

pd.testing.assert_series_equal(pd_result, bf_result_computed, check_dtype=False)


def test_series_groupby_skew(scalars_df_index, scalars_pandas_df_index):
bf_result = (
scalars_df_index["int64_too"]
.groupby(scalars_df_index["bool_col"])
.skew()
.to_pandas()
)
pd_result = (
scalars_pandas_df_index["int64_too"]
.groupby(scalars_pandas_df_index["bool_col"])
.skew()
)

pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)


@pytest.mark.parametrize(
10 changes: 10 additions & 0 deletions third_party/bigframes_vendored/ibis/expr/operations/analytic.py
@@ -2,10 +2,19 @@

from __future__ import annotations

import ibis
import ibis.expr.operations as ops
import ibis.expr.rules as rlz


# TODO(swast): We can remove this if ibis adds aggregates over scalar values.
# See: https://github.com/ibis-project/ibis/issues/8698
@ibis.udf.agg.builtin
def count(value: int) -> int:
"""Count of a scalar."""
return 0 # pragma: NO COVER


class FirstNonNullValue(ops.Analytic):
"""Retrieve the first element."""

@@ -21,6 +30,7 @@ class LastNonNullValue(ops.Analytic):


__all__ = [
"count",
"FirstNonNullValue",
"LastNonNullValue",
]
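The `@ibis.udf.agg.builtin` decorator declares a pass-through to the backend's built-in aggregate of the same name, so the Python body is never executed (hence the coverage pragma) and a call such as `count(1)` renders as `COUNT(1)` in the generated SQL. A hedged sketch of how the compiler change above consumes it:

# As used in aggregate_compiler.py in this diff: a windowless size aggregate.
value = vendored_ibis_ops.count(1)  # ibis expression compiling to COUNT(1)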