SEA: Allow large metadata responses #653


Merged
varun-edachali-dbx merged 88 commits into main from robust-metadata-sea on Aug 4, 2025
Changes from all commits (88 commits):
5bf5d4c  Separate Session related functionality from Connection class (#571)  (varun-edachali-dbx, May 28, 2025)
400a8bd  Introduce Backend Interface (DatabricksClient) (#573)  (varun-edachali-dbx, May 30, 2025)
3c78ed7  Implement ResultSet Abstraction (backend interfaces for fetch phase) …  (varun-edachali-dbx, Jun 3, 2025)
9625229  Introduce Sea HTTP Client and test script (#583)  (varun-edachali-dbx, Jun 4, 2025)
0887bc1  Introduce `SeaDatabricksClient` (Session Implementation) (#582)  (varun-edachali-dbx, Jun 9, 2025)
6d63df0  Normalise Execution Response (clean backend interfaces) (#587)  (varun-edachali-dbx, Jun 11, 2025)
ba8d9fd  Introduce models for `SeaDatabricksClient` (#595)  (varun-edachali-dbx, Jun 12, 2025)
bb3f15a  Introduce preliminary SEA Result Set (#588)  (varun-edachali-dbx, Jun 12, 2025)
19f1fae  Merge branch 'main' into sea-migration  (varun-edachali-dbx, Jun 17, 2025)
6c5ba6d  remove invalid ExecuteResponse import  (varun-edachali-dbx, Jun 17, 2025)
5e5147b  Separate Session related functionality from Connection class (#571)  (varun-edachali-dbx, May 28, 2025)
57370b3  Introduce Backend Interface (DatabricksClient) (#573)  (varun-edachali-dbx, May 30, 2025)
75752bf  Implement ResultSet Abstraction (backend interfaces for fetch phase) …  (varun-edachali-dbx, Jun 3, 2025)
450b80d  remove un-necessary initialisation assertions  (varun-edachali-dbx, Jun 18, 2025)
a926f02  remove un-necessary line break s  (varun-edachali-dbx, Jun 18, 2025)
55ad001  more un-necessary line breaks  (varun-edachali-dbx, Jun 18, 2025)
fa15730  constrain diff of test_closing_connection_closes_commands  (varun-edachali-dbx, Jun 18, 2025)
019c7fb  reduce diff of test_closing_connection_closes_commands  (varun-edachali-dbx, Jun 18, 2025)
726abe7  use pytest-like assertions for test_closing_connection_closes_commands  (varun-edachali-dbx, Jun 18, 2025)
bf6d41c  ensure command_id is not None  (varun-edachali-dbx, Jun 18, 2025)
5afa733  line breaks after multi-line pyfocs  (varun-edachali-dbx, Jun 18, 2025)
e3dfd36  ensure non null operationHandle for commandId creation  (varun-edachali-dbx, Jun 18, 2025)
63360b3  use command_id methods instead of explicit guid_to_hex_id conversion  (varun-edachali-dbx, Jun 18, 2025)
13ffb8d  remove un-necessary artifacts in test_session, add back assertion  (varun-edachali-dbx, Jun 18, 2025)
a74d279  Implement SeaDatabricksClient (Complete Execution Spec) (#590)  (varun-edachali-dbx, Jun 18, 2025)
d759050  add from __future__ import annotations to remove string literals arou…  (varun-edachali-dbx, Jun 19, 2025)
1e21434  move docstring of DatabricksClient within class  (varun-edachali-dbx, Jun 24, 2025)
cd4015b  move ThriftResultSet import to top of file  (varun-edachali-dbx, Jun 24, 2025)
ed8b610  make backend/utils __init__ file empty  (varun-edachali-dbx, Jun 24, 2025)
94d951e  use from __future__ import annotations to remove string literals arou…  (varun-edachali-dbx, Jun 24, 2025)
c20058e  use lazy logging  (varun-edachali-dbx, Jun 24, 2025)
fe3acb1  replace getters with property tag  (varun-edachali-dbx, Jun 24, 2025)
9fb6a76  Merge branch 'main' into backend-refactors  (varun-edachali-dbx, Jun 24, 2025)
61dfc4d  set active_command_id to None, not active_op_handle  (varun-edachali-dbx, Jun 24, 2025)
64fb9b2  align test_session with pytest instead of unittest  (varun-edachali-dbx, Jun 24, 2025)
cbf63f9  Merge branch 'main' into sea-migration  (varun-edachali-dbx, Jun 26, 2025)
59b4825  remove duplicate test, correct active_command_id attribute  (varun-edachali-dbx, Jun 26, 2025)
e380654  SeaDatabricksClient: Add Metadata Commands (#593)  (varun-edachali-dbx, Jun 26, 2025)
677a7b0  SEA volume operations fix: assign `manifest.is_volume_operation` to `…  (varun-edachali-dbx, Jun 26, 2025)
45585d4  Introduce manual SEA test scripts for Exec Phase (#589)  (varun-edachali-dbx, Jun 27, 2025)
70c7dc8  Complete Fetch Phase (for `INLINE` disposition and `JSON_ARRAY` forma…  (varun-edachali-dbx, Jul 2, 2025)
abf9aab  Merge branch 'main' into sea-migration  (varun-edachali-dbx, Jul 3, 2025)
9b4b606  Merge branch 'main' into backend-refactors  (varun-edachali-dbx, Jul 3, 2025)
4f11ff0  Introduce `row_limit` param (#607)  (varun-edachali-dbx, Jul 7, 2025)
45f5c26  Merge branch 'main' into backend-refactors  (varun-edachali-dbx, Jul 10, 2025)
2c9368a  formatting (black)  (varun-edachali-dbx, Jul 10, 2025)
9b1b1f5  remove repetition from Session.__init__  (varun-edachali-dbx, Jul 10, 2025)
77e23d3  Merge branch 'backend-refactors' into sea-migration  (varun-edachali-dbx, Jul 11, 2025)
3bd3aef  fix merge artifacts  (varun-edachali-dbx, Jul 11, 2025)
6d4701f  correct patch paths  (varun-edachali-dbx, Jul 11, 2025)
dc1cb6d  fix type issues  (varun-edachali-dbx, Jul 14, 2025)
5d04cd0  Merge branch 'main' into sea-migration  (varun-edachali-dbx, Jul 15, 2025)
922c448  explicitly close result queue  (varun-edachali-dbx, Jul 15, 2025)
1a0575a  Complete Fetch Phase (`EXTERNAL_LINKS` disposition and `ARROW` format…  (varun-edachali-dbx, Jul 16, 2025)
c07beb1  SEA Session Configuration Fix: Explicitly convert values to `str` (#…  (varun-edachali-dbx, Jul 16, 2025)
640cc82  SEA: add support for `Hybrid` disposition (#631)  (varun-edachali-dbx, Jul 17, 2025)
8fbca9d  SEA: Reduce network calls for synchronous commands (#633)  (varun-edachali-dbx, Jul 19, 2025)
806e5f5  SEA: Decouple Link Fetching (#632)  (varun-edachali-dbx, Jul 21, 2025)
b57c3f3  Chunk download latency (#634)  (saishreeeee, Jul 21, 2025)
ef5836b  acquire lock before notif + formatting (black)  (varun-edachali-dbx, Jul 21, 2025)
4fd2a3f  Merge branch 'main' into sea-migration  (varun-edachali-dbx, Jul 23, 2025)
26f8947  fix imports  (varun-edachali-dbx, Jul 23, 2025)
2d44596  add get_chunk_link s  (varun-edachali-dbx, Jul 23, 2025)
99e7435  simplify description extraction  (varun-edachali-dbx, Jul 23, 2025)
54ec080  pass session_id_hex to ThriftResultSet  (varun-edachali-dbx, Jul 23, 2025)
f9f9f31  revert to main's extract description  (varun-edachali-dbx, Jul 23, 2025)
51cef2b  validate row count for sync query tests as well  (varun-edachali-dbx, Jul 23, 2025)
387102d  guid_hex -> hex_guid  (varun-edachali-dbx, Jul 23, 2025)
d53d1ea  reduce diff  (varun-edachali-dbx, Jul 23, 2025)
c7810aa  reduce diff  (varun-edachali-dbx, Jul 23, 2025)
b3072bd  reduce diff  (varun-edachali-dbx, Jul 23, 2025)
8be5264  set .value in compression  (varun-edachali-dbx, Jul 23, 2025)
80692e3  reduce diff  (varun-edachali-dbx, Jul 23, 2025)
83e45ae  is_direct_results -> has_more_rows  (varun-edachali-dbx, Jul 25, 2025)
89de17a  preliminary large metadata results  (varun-edachali-dbx, Jul 27, 2025)
d5ccf13  account for empty table in arrow table filter  (varun-edachali-dbx, Jul 28, 2025)
e6b256c  align flows  (varun-edachali-dbx, Jul 28, 2025)
20c9fbd  align flow of json with arrow  (varun-edachali-dbx, Jul 28, 2025)
a90bfeb  case sensitive support for arrow table  (varun-edachali-dbx, Jul 28, 2025)
11f2c54  remove un-necessary comment  (varun-edachali-dbx, Jul 28, 2025)
ea8ae9f  Merge branch 'main' into robust-metadata-sea  (varun-edachali-dbx, Jul 29, 2025)
12a5ff8  fix merge artifacts  (varun-edachali-dbx, Jul 29, 2025)
f126f4b  remove redundant method  (varun-edachali-dbx, Jul 29, 2025)
2b42ea0  remove incorrect docstring  (varun-edachali-dbx, Jul 29, 2025)
3ca9678  Merge branch 'main' into robust-metadata-sea  (varun-edachali-dbx, Jul 31, 2025)
81bf7db  Merge branch 'main' into robust-metadata-sea  (varun-edachali-dbx, Aug 4, 2025)
2f96e37  remove deepcopy  (varun-edachali-dbx, Aug 4, 2025)
2c5afe4  Merge branch 'main' into robust-metadata-sea  (varun-edachali-dbx, Aug 4, 2025)
src/databricks/sql/backend/sea/backend.py (9 changes: 5 additions & 4 deletions)
@@ -158,6 +158,7 @@ def __init__(
)

self.use_hybrid_disposition = kwargs.get("use_hybrid_disposition", True)
self.use_cloud_fetch = kwargs.get("use_cloud_fetch", True)

# Extract warehouse ID from http_path
self.warehouse_id = self._extract_warehouse_id(http_path)
@@ -694,7 +695,7 @@ def get_catalogs(
max_bytes=max_bytes,
lz4_compression=False,
cursor=cursor,
use_cloud_fetch=False,
use_cloud_fetch=self.use_cloud_fetch,
parameters=[],
async_op=False,
enforce_embedded_schema_correctness=False,
@@ -727,7 +728,7 @@ def get_schemas(
max_bytes=max_bytes,
lz4_compression=False,
cursor=cursor,
use_cloud_fetch=False,
use_cloud_fetch=self.use_cloud_fetch,
parameters=[],
async_op=False,
enforce_embedded_schema_correctness=False,
@@ -768,7 +769,7 @@ def get_tables(
max_bytes=max_bytes,
lz4_compression=False,
cursor=cursor,
use_cloud_fetch=False,
use_cloud_fetch=self.use_cloud_fetch,
parameters=[],
async_op=False,
enforce_embedded_schema_correctness=False,
@@ -815,7 +816,7 @@ def get_columns(
max_bytes=max_bytes,
lz4_compression=False,
cursor=cursor,
use_cloud_fetch=False,
use_cloud_fetch=self.use_cloud_fetch,
parameters=[],
async_op=False,
enforce_embedded_schema_correctness=False,
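The four metadata methods above (get_catalogs, get_schemas, get_tables, get_columns) previously hardcoded use_cloud_fetch=False; they now inherit the client-level self.use_cloud_fetch flag, which is what lets large metadata results come back through cloud fetch instead of being forced inline. A minimal usage sketch, assuming the connector's usual databricks.sql.connect entry point; the hostname, HTTP path, and token below are placeholders, and the cursor.columns() call is only one example of a metadata command that goes through this backend:

# Hedged usage sketch (not part of this PR): enable cloud fetch so that
# metadata commands may return large result sets via external links.
from databricks import sql

with sql.connect(
    server_hostname="example.cloud.databricks.com",  # placeholder
    http_path="/sql/1.0/warehouses/abc123",          # placeholder
    access_token="dapi-...",                         # placeholder
    use_cloud_fetch=True,  # read via kwargs.get("use_cloud_fetch", True) above
) as connection:
    with connection.cursor() as cursor:
        # A wide metadata query (e.g. all columns in a schema) no longer has
        # to fit into a single inline JSON response.
        cursor.columns(catalog_name="main", schema_name="default")
        rows = cursor.fetchall()
        print(len(rows))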
src/databricks/sql/backend/sea/utils/filters.py (235 changes: 184 additions & 51 deletions)
@@ -6,12 +6,12 @@

from __future__ import annotations

import io
import logging
from typing import (
List,
Optional,
Any,
Callable,
cast,
TYPE_CHECKING,
)
@@ -20,6 +20,16 @@
from databricks.sql.backend.sea.result_set import SeaResultSet

from databricks.sql.backend.types import ExecuteResponse
from databricks.sql.backend.sea.models.base import ResultData
from databricks.sql.backend.sea.backend import SeaDatabricksClient
from databricks.sql.utils import CloudFetchQueue, ArrowQueue

try:
import pyarrow
import pyarrow.compute as pc
except ImportError:
pyarrow = None
pc = None

logger = logging.getLogger(__name__)

@@ -30,32 +40,18 @@ class ResultSetFilter:
"""

@staticmethod
def _filter_sea_result_set(
result_set: SeaResultSet, filter_func: Callable[[List[Any]], bool]
) -> SeaResultSet:
def _create_execute_response(result_set: SeaResultSet) -> ExecuteResponse:
"""
Filter a SEA result set using the provided filter function.
Create an ExecuteResponse with parameters from the original result set.

Args:
result_set: The SEA result set to filter
filter_func: Function that takes a row and returns True if the row should be included
result_set: Original result set to copy parameters from

Returns:
A filtered SEA result set
ExecuteResponse: New execute response object
"""

# Get all remaining rows
all_rows = result_set.results.remaining_rows()

# Filter rows
filtered_rows = [row for row in all_rows if filter_func(row)]

# Reuse the command_id from the original result set
command_id = result_set.command_id

# Create an ExecuteResponse for the filtered data
execute_response = ExecuteResponse(
command_id=command_id,
return ExecuteResponse(
command_id=result_set.command_id,
status=result_set.status,
description=result_set.description,
has_been_closed_server_side=result_set.has_been_closed_server_side,
@@ -64,32 +60,145 @@ def _filter_sea_result_set(
is_staging_operation=False,
)

# Create a new ResultData object with filtered data
from databricks.sql.backend.sea.models.base import ResultData
@staticmethod
def _update_manifest(result_set: SeaResultSet, new_row_count: int):
"""
Create a copy of the manifest with updated row count.

Args:
result_set: Original result set to copy manifest from
new_row_count: New total row count for filtered data

result_data = ResultData(data=filtered_rows, external_links=None)
Returns:
Updated manifest copy
"""
filtered_manifest = result_set.manifest
filtered_manifest.total_row_count = new_row_count
return filtered_manifest

from databricks.sql.backend.sea.backend import SeaDatabricksClient
@staticmethod
def _create_filtered_result_set(
result_set: SeaResultSet,
result_data: ResultData,
row_count: int,
) -> "SeaResultSet":
"""
Create a new filtered SeaResultSet with the provided data.

Args:
result_set: Original result set to copy parameters from
result_data: New result data for the filtered set
row_count: Number of rows in the filtered data

Returns:
New filtered SeaResultSet
"""
from databricks.sql.backend.sea.result_set import SeaResultSet

# Create a new SeaResultSet with the filtered data
manifest = result_set.manifest
manifest.total_row_count = len(filtered_rows)
execute_response = ResultSetFilter._create_execute_response(result_set)
filtered_manifest = ResultSetFilter._update_manifest(result_set, row_count)

filtered_result_set = SeaResultSet(
return SeaResultSet(
connection=result_set.connection,
execute_response=execute_response,
sea_client=cast(SeaDatabricksClient, result_set.backend),
result_data=result_data,
manifest=manifest,
manifest=filtered_manifest,
buffer_size_bytes=result_set.buffer_size_bytes,
arraysize=result_set.arraysize,
)

return filtered_result_set
@staticmethod
def _filter_arrow_table(
table: Any, # pyarrow.Table
column_name: str,
allowed_values: List[str],
case_sensitive: bool = True,
) -> Any: # returns pyarrow.Table
"""
Filter a PyArrow table by column values.

Args:
table: The PyArrow table to filter
column_name: The name of the column to filter on
allowed_values: List of allowed values for the column
case_sensitive: Whether to perform case-sensitive comparison

Returns:
A filtered PyArrow table
"""
if not pyarrow:
raise ImportError("PyArrow is required for Arrow table filtering")

if table.num_rows == 0:
return table

# Handle case-insensitive filtering by normalizing both column and allowed values
if not case_sensitive:
# Convert allowed values to uppercase
allowed_values = [v.upper() for v in allowed_values]
# Get column values as uppercase
column = pc.utf8_upper(table[column_name])
else:
# Use column as-is
column = table[column_name]

# Convert allowed_values to PyArrow Array
allowed_array = pyarrow.array(allowed_values)

# Construct a boolean mask: True where column is in allowed_list
mask = pc.is_in(column, value_set=allowed_array)
return table.filter(mask)

@staticmethod
def _filter_arrow_result_set(
result_set: SeaResultSet,
column_index: int,
allowed_values: List[str],
case_sensitive: bool = True,
) -> SeaResultSet:
"""
Filter a SEA result set that contains Arrow tables.

Args:
result_set: The SEA result set to filter (containing Arrow data)
column_index: The index of the column to filter on
allowed_values: List of allowed values for the column
case_sensitive: Whether to perform case-sensitive comparison

Returns:
A filtered SEA result set
"""
# Validate column index and get column name
if column_index >= len(result_set.description):
raise ValueError(f"Column index {column_index} is out of bounds")
column_name = result_set.description[column_index][0]

# Get all remaining rows as Arrow table and filter it
arrow_table = result_set.results.remaining_rows()
filtered_table = ResultSetFilter._filter_arrow_table(
arrow_table, column_name, allowed_values, case_sensitive
)

# Convert the filtered table to Arrow stream format for ResultData
sink = io.BytesIO()
with pyarrow.ipc.new_stream(sink, filtered_table.schema) as writer:
writer.write_table(filtered_table)
arrow_stream_bytes = sink.getvalue()

# Create ResultData with attachment containing the filtered data
result_data = ResultData(
data=None, # No JSON data
external_links=None, # No external links
attachment=arrow_stream_bytes, # Arrow data as attachment
)

return ResultSetFilter._create_filtered_result_set(
result_set, result_data, filtered_table.num_rows
)

@staticmethod
def filter_by_column_values(
def _filter_json_result_set(
result_set: SeaResultSet,
column_index: int,
allowed_values: List[str],
@@ -107,22 +216,35 @@ def filter_by_column_values(
Returns:
A filtered result set
"""
# Validate column index (optional - not in arrow version but good practice)
if column_index >= len(result_set.description):
raise ValueError(f"Column index {column_index} is out of bounds")

# Convert to uppercase for case-insensitive comparison if needed
# Extract rows
all_rows = result_set.results.remaining_rows()

# Convert allowed values if case-insensitive
if not case_sensitive:
allowed_values = [v.upper() for v in allowed_values]
# Helper lambda to get column value based on case sensitivity
get_column_value = (
lambda row: row[column_index].upper()
if not case_sensitive
else row[column_index]
)

# Filter rows based on allowed values
filtered_rows = [
row
for row in all_rows
if len(row) > column_index and get_column_value(row) in allowed_values
]

# Create filtered result set
result_data = ResultData(data=filtered_rows, external_links=None)

return ResultSetFilter._filter_sea_result_set(
result_set,
lambda row: (
len(row) > column_index
and (
row[column_index].upper()
if not case_sensitive
else row[column_index]
)
in allowed_values
),
return ResultSetFilter._create_filtered_result_set(
result_set, result_data, len(filtered_rows)
)

@staticmethod
@@ -143,14 +265,25 @@ def filter_tables_by_type(
Returns:
A filtered result set containing only tables of the specified types
"""

# Default table types if none specified
DEFAULT_TABLE_TYPES = ["TABLE", "VIEW", "SYSTEM TABLE"]
valid_types = (
table_types if table_types and len(table_types) > 0 else DEFAULT_TABLE_TYPES
)
valid_types = table_types if table_types else DEFAULT_TABLE_TYPES

# Check if we have an Arrow table (cloud fetch) or JSON data
# Table type is the 6th column (index 5)
return ResultSetFilter.filter_by_column_values(
result_set, 5, valid_types, case_sensitive=True
)
if isinstance(result_set.results, (CloudFetchQueue, ArrowQueue)):
# For Arrow tables, we need to handle filtering differently
return ResultSetFilter._filter_arrow_result_set(
result_set,
column_index=5,
allowed_values=valid_types,
case_sensitive=True,
)
else:
# For JSON data, use the existing filter method
return ResultSetFilter._filter_json_result_set(
result_set,
column_index=5,
allowed_values=valid_types,
case_sensitive=True,
)
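For cloud-fetch results, filter_tables_by_type now routes through _filter_arrow_result_set, which filters the table column with pyarrow.compute.is_in and then re-serializes the filtered table into an Arrow IPC stream so it can travel as a ResultData attachment. Below is a self-contained sketch of that filter-and-reserialize round trip; the sample table, column name, and values are made up for illustration, and only the pyarrow calls mirror the code above:

# Standalone sketch of the Arrow filtering used in _filter_arrow_table and
# _filter_arrow_result_set; the sample data is illustrative only.
import io

import pyarrow as pa
import pyarrow.compute as pc

table = pa.table(
    {
        "TABLE_NAME": ["t1", "t2", "t3"],
        "TABLE_TYPE": ["TABLE", "VIEW", "SYSTEM TABLE"],
    }
)
allowed_values = ["table", "view"]

# Case-insensitive match: uppercase both the column and the allowed values,
# then build a boolean mask with is_in and filter the table with it.
column = pc.utf8_upper(table["TABLE_TYPE"])
allowed_array = pa.array([v.upper() for v in allowed_values])
mask = pc.is_in(column, value_set=allowed_array)
filtered = table.filter(mask)

# Round-trip through the Arrow IPC stream format, as the diff does before
# placing the bytes into ResultData(attachment=...).
sink = io.BytesIO()
with pa.ipc.new_stream(sink, filtered.schema) as writer:
    writer.write_table(filtered)
arrow_stream_bytes = sink.getvalue()

reloaded = pa.ipc.open_stream(arrow_stream_bytes).read_all()
assert reloaded.num_rows == 2  # t1 (TABLE) and t2 (VIEW) survive the filter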