Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 55 additions & 29 deletions bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import pyarrow as pa
import pyarrow.feather as pa_feather

import bigframes.core.compile
import bigframes.core.expression as ex
import bigframes.core.guid
import bigframes.core.identifiers as ids
Expand All @@ -35,15 +34,13 @@
import bigframes.core.nodes as nodes
from bigframes.core.ordering import OrderingExpression
import bigframes.core.ordering as orderings
import bigframes.core.rewrite
import bigframes.core.schema as schemata
import bigframes.core.tree_properties
import bigframes.core.utils
from bigframes.core.window_spec import WindowSpec
import bigframes.dtypes
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.session._io.bigquery

if typing.TYPE_CHECKING:
from bigframes.session import Session
Expand Down Expand Up @@ -199,6 +196,8 @@ def as_cached(

def _try_evaluate_local(self):
    """Evaluate this value's node locally; meant only for unit-testing paths.

    This path is not fully featured: it throws an exception when local
    evaluation fails.
    """
    # The compile package is imported lazily, inside the method body.
    import bigframes.core.compile

    evaluate = bigframes.core.compile.test_only_try_evaluate
    return evaluate(self.node)

def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
Expand Down Expand Up @@ -422,22 +421,7 @@ def relational_join(
l_mapping = { # Identity mapping, only rename right side
lcol.name: lcol.name for lcol in self.node.ids
}
r_mapping = { # Rename conflicting names
rcol.name: rcol.name
if (rcol.name not in l_mapping)
else bigframes.core.guid.generate_guid()
for rcol in other.node.ids
}
other_node = other.node
if set(other_node.ids) & set(self.node.ids):
other_node = nodes.SelectionNode(
other_node,
tuple(
(ex.deref(old_id), ids.ColumnId(new_id))
for old_id, new_id in r_mapping.items()
),
)

other_node, r_mapping = self.prepare_join_names(other)
join_node = nodes.JoinNode(
left_child=self.node,
right_child=other_node,
Expand All @@ -449,14 +433,63 @@ def relational_join(
)
return ArrayValue(join_node), (l_mapping, r_mapping)

def try_align_as_projection(
def try_row_join(
    self,
    other: ArrayValue,
    conditions: typing.Tuple[typing.Tuple[str, str], ...] = (),
) -> Optional[
    typing.Tuple[ArrayValue, typing.Tuple[dict[str, str], dict[str, str]]]
]:
    """Attempt a row-aligned join of ``other`` onto this value.

    The join is delegated to
    ``bigframes.core.rewrite.try_join_as_projection``.

    Args:
        other: The right-hand value to align with ``self``. Colliding
            column names are renamed via ``prepare_join_names``.
        conditions: Pairs of (left column name, right column name) that are
            forwarded unchanged to the rewrite.

    Returns:
        ``None`` when the rewrite returns ``None``. Otherwise, a tuple of the
        combined ``ArrayValue`` and a ``(left, right)`` pair of mappings from
        each side's original column names to their names in the result.
        The left mapping is always the identity.

    NOTE(review): ``conditions`` are passed using ``other``'s original
    column names, while the right node may have had colliding columns
    renamed by ``prepare_join_names``. Confirm that the rewrite resolves
    such names correctly.
    """
    # Only the right side is ever renamed, so the left side maps to itself.
    left_names = {col.name: col.name for col in self.node.ids}
    renamed_other, right_names = self.prepare_join_names(other)
    import bigframes.core.rewrite

    projected = bigframes.core.rewrite.try_join_as_projection(
        self.node, renamed_other, conditions
    )
    if projected is not None:
        return ArrayValue(projected), (left_names, right_names)
    return None

def prepare_join_names(
    self, other: ArrayValue
) -> Tuple[bigframes.core.nodes.BigFrameNode, dict[str, str]]:
    """Prepare ``other`` to be the right side of a join with ``self``.

    If any column id of ``other`` also appears among this value's column
    ids, each right-side column is handled as follows:

    * If its name appears in ``self.column_ids``, it is renamed to a freshly
      generated guid.
    * Otherwise, it keeps its name.

    The renaming is applied by wrapping ``other.node`` in a
    ``SelectionNode``. When nothing overlaps, ``other.node`` is returned
    unchanged.

    Args:
        other: The value that will become the right side of the join.

    Returns:
        A pair ``(node, mapping)``. ``node`` is the (possibly wrapped)
        right-side node. ``mapping`` maps each original right-side column
        name to its name in ``node``.
    """
    if not (set(other.node.ids) & set(self.node.ids)):
        # No collisions: use the right side as-is with an identity mapping.
        return other.node, {name: name for name in other.column_ids}

    # Build the lookup set once, rather than scanning ``self.column_ids`` for
    # every right-side column.
    left_names = set(self.column_ids)
    r_mapping = {  # Rename conflicting names
        rcol.name: (
            bigframes.core.guid.generate_guid()
            if rcol.name in left_names
            else rcol.name
        )
        for rcol in other.node.ids
    }
    renamed = nodes.SelectionNode(
        other.node,
        tuple(
            (ex.deref(old_id), ids.ColumnId(new_id))
            for old_id, new_id in r_mapping.items()
        ),
    )
    return renamed, r_mapping

def try_legacy_row_join(
self,
other: ArrayValue,
join_type: join_def.JoinType,
join_keys: typing.Tuple[join_def.CoalescedColumnMapping, ...],
mappings: typing.Tuple[join_def.JoinColumnMapping, ...],
) -> typing.Optional[ArrayValue]:
result = bigframes.core.rewrite.join_as_projection(
import bigframes.core.rewrite

result = bigframes.core.rewrite.legacy_join_as_projection(
self.node, other.node, join_keys, mappings, join_type
)
if result is not None:
Expand Down Expand Up @@ -488,11 +521,4 @@ def _gen_namespaced_uid(self) -> str:
return self._gen_namespaced_uids(1)[0]

def _gen_namespaced_uids(self, n: int) -> List[str]:
i = len(self.node.defined_variables)
genned_ids: List[str] = []
while len(genned_ids) < n:
attempted_id = f"col_{i}"
if attempted_id not in self.node.defined_variables:
genned_ids.append(attempted_id)
i = i + 1
return genned_ids
return [ids.ColumnId.unique().name for _ in range(n)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this make the column IDs less deterministic than the previous logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not any less deterministic than we are now. If we want an isomorphism between query structure and output syntax, there is a bit more work that needs to be done to the system, which I think basically amounts to late binding identifiers serially through the tree.

40 changes: 36 additions & 4 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2341,7 +2341,9 @@ def join(
# Handle null index, which only supports row join
# This is the canonical way of aligning on null index, so always allow (ignore block_identity_join)
if self.index.nlevels == other.index.nlevels == 0:
result = try_row_join(self, other, how=how)
result = try_legacy_row_join(self, other, how=how) or try_new_row_join(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the `how` argument not necessary in the new row join?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not needed now, because we are now emulating the more recent pandas alignment logic, where both sides need to match exactly, without any view-like interpretation of filtered objects.

self, other
)
if result is not None:
return result
raise bigframes.exceptions.NullIndexError(
Expand All @@ -2354,7 +2356,9 @@ def join(
and (self.index.nlevels == other.index.nlevels)
and (self.index.dtypes == other.index.dtypes)
):
result = try_row_join(self, other, how=how)
result = try_legacy_row_join(self, other, how=how) or try_new_row_join(
self, other
)
if result is not None:
return result

Expand Down Expand Up @@ -2693,7 +2697,35 @@ def is_uniquely_named(self: BlockIndexProperties):
return len(set(self.names)) == len(self.names)


def try_row_join(
def try_new_row_join(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto re: `try_row_join` here, but perhaps in a separate PR, since I see that doing that here would make it harder to identify the ones that should be replaced with `try_legacy_row_join`.

left: Block, right: Block
) -> Optional[Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]],]]:
join_keys = tuple(
(left_id, right_id)
for left_id, right_id in zip(left.index_columns, right.index_columns)
)
join_result = left.expr.try_row_join(right.expr, join_keys)
if join_result is None: # did not succeed
return None
combined_expr, (get_column_left, get_column_right) = join_result
# Keep the left index column, and drop the matching right column
index_cols_post_join = [get_column_left[id] for id in left.index_columns]
combined_expr = combined_expr.drop_columns(
[get_column_right[id] for id in right.index_columns]
)
block = Block(
combined_expr,
index_columns=index_cols_post_join,
column_labels=left.column_labels.append(right.column_labels),
index_labels=left.index.names,
)
return (
block,
(get_column_left, get_column_right),
)


def try_legacy_row_join(
left: Block,
right: Block,
*,
Expand Down Expand Up @@ -2727,7 +2759,7 @@ def try_row_join(
)
for id in right.value_columns
]
combined_expr = left_expr.try_align_as_projection(
combined_expr = left_expr.try_legacy_row_join(
right_expr,
join_type=how,
join_keys=join_keys,
Expand Down
6 changes: 6 additions & 0 deletions bigframes/core/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import itertools
from typing import Generator

import bigframes.core.guid


def standard_id_strings(prefix: str = "col_") -> Generator[str, None, None]:
i = 0
Expand Down Expand Up @@ -47,6 +49,10 @@ def local_normalized(self) -> ColumnId:
def __lt__(self, other: ColumnId) -> bool:
return self.sql < other.sql

@classmethod
def unique(cls) -> ColumnId:
    """Create a new ``ColumnId`` whose name is a freshly generated guid.

    The result is always a plain ``ColumnId``, even when this classmethod
    is invoked on a subclass.
    """
    fresh_name = bigframes.core.guid.generate_guid()
    return ColumnId(name=fresh_name)


@dataclasses.dataclass(frozen=True)
class SerialColumnId(ColumnId):
Expand Down
Loading