- Notifications
You must be signed in to change notification settings - Fork 126
Complete Fetch Phase (for INLINE disposition and JSON_ARRAY format) #594
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Uh oh!
There was an error while loading. Please reload this page.
Changes from all commits
138c2ae3e3ab944a781650dac4aa1b794c7da5a6fe686ade431e6c8369ea23866d751771feef9ae9862fd8aa69edb139bcb977b12da615c00da04a6ea9d4568985c62d9bcdbeee9fa1c24c615267fd101271fcafbf26ea3ed7cf91dae15e3db5bbead5d36996137a3d75b07734494dcd4d0aeca7cece5e8977c060216d7ad97463b139e2464cb15fde3ee4e4f448a8f82ca1eee96a07827158b1dee47f7d3200c48a014f039c41ab2cd04df067a01948c83e0281a9e9192901d55f5c45edc36b581280e7c1d3be25ee4136b881ab053bf71545a32bee3fe299e8038d32f6ec1973bc282e385d5b484064e030edf84e07f1e65e7c6b30f8266033ae7333821f471b451a170f33940f79b5c038d5a3e22c6c716304be96e5b8787f1f7165c4f3a6e40d052e3088641c09b8bd12d8ffded6e227f6b368657a33940eec37813ba267c9f4296711947fd60d982fdf29e14d48be1997ee8e8ee705ee4e73ffa8982952d8d89e2aa0cbace3fc075b07c62f76d199402e8ac574b398ca70b1acc5bef2a7ee699942daf8f74e5540c5cefe388136ab59b1d57c99df6dac2ad0e527ed446a038e4b5c94879c01809956da5260c0385ffb23963fcdd4371534a7f66715cc13a0705bc1b90c4af7c11b9349c0216229848fd5235664e58b02903473b300709021ff4cadecd53bfc1f010a2cdfd90bb09ccd2238982e0f8be64b81b27564ca5ab9bbe1ab6e87f469c2468ec65fffd478ef6d873d28675f53578659871302322dc252390f59228308fe2712d1c984e8ee0ce144d50cc1e2242307a35f1ef0a515d2659b1330293e356dd40beb14057aca4d5bdb156421aeb1a9b4e9b13148ede41409a1b115e01e7b21c389d734321a9f0f96904a1936278b8cd91b7f7f7a5ae13f1776f361433318949d0c5eaded4eeed9a1f23388668ac4377fd0845ea7ff73563da71a018273c0e98f47343035ec500b60b3e91d7664e44db7b8e5f75f2b5e2d4ef521e3078bb015e6bb948a0921a8c1cc5203dcc868323f1fd930bdf8f928b4d7b245aa777d21ad14d10dcc14c562531a0e52e86e75570350984566cb172a5cd3511c449c21ff5e72100b9aab33a19a6db300135d33cc9db8bbb135fcFile filter
Filter by extension
Conversations
Uh oh!
There was an error while loading. Please reload this page.
Jump to
Uh oh!
There was an error while loading. Please reload this page.
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -17,7 +17,7 @@ | ||
| if TYPE_CHECKING: | ||
| from databricks.sql.client import Cursor | ||
| from databricks.sql.backend.sea.result_set import SeaResultSet | ||
| from databricks.sql.backend.databricks_client import DatabricksClient | ||
| from databricks.sql.backend.types import ( | ||
| @@ -251,7 +251,7 @@ def close_session(self, session_id: SessionId) -> None: | ||
| logger.debug("SeaDatabricksClient.close_session(session_id=%s)", session_id) | ||
| if session_id.backend_type != BackendType.SEA: | ||
| raiseValueError("Not a valid SEA session ID") | ||
varun-edachali-dbx marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| sea_session_id = session_id.to_sea_session_id() | ||
| request_data = DeleteSessionRequest( | ||
| @@ -290,7 +290,7 @@ def get_allowed_session_configurations() -> List[str]: | ||
| def _extract_description_from_manifest( | ||
| self, manifest: ResultManifest | ||
| ) -> List[Tuple]: | ||
| """ | ||
| Extract column description from a manifest object, in the format defined by | ||
| the spec: https://peps.python.org/pep-0249/#description | ||
| @@ -299,15 +299,12 @@ def _extract_description_from_manifest( | ||
| manifest: The ResultManifest object containing schema information | ||
| Returns: | ||
| List[Tuple]: A list of column tuples | ||
| """ | ||
| schema_data = manifest.schema | ||
| columns_data = schema_data.get("columns", []) | ||
| columns = [] | ||
| for col_data in columns_data: | ||
| # Format: (name, type_code, display_size, internal_size, precision, scale, null_ok) | ||
| @@ -323,7 +320,7 @@ def _extract_description_from_manifest( | ||
| ) | ||
| ) | ||
| return columns | ||
| def _results_message_to_execute_response( | ||
| self, response: GetStatementResponse | ||
| @@ -429,7 +426,7 @@ def execute_command( | ||
| """ | ||
| if session_id.backend_type != BackendType.SEA: | ||
| raiseValueError("Not a valid SEA session ID") | ||
varun-edachali-dbx marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| sea_session_id = session_id.to_sea_session_id() | ||
| @@ -508,9 +505,11 @@ def cancel_command(self, command_id: CommandId) -> None: | ||
| """ | ||
| if command_id.backend_type != BackendType.SEA: | ||
| raiseValueError("Not a valid SEA command ID") | ||
| sea_statement_id = command_id.to_sea_statement_id() | ||
| if sea_statement_id is None: | ||
| raise ValueError("Not a valid SEA command ID") | ||
| request = CancelStatementRequest(statement_id=sea_statement_id) | ||
| self.http_client._make_request( | ||
| @@ -531,9 +530,11 @@ def close_command(self, command_id: CommandId) -> None: | ||
| """ | ||
| if command_id.backend_type != BackendType.SEA: | ||
| raiseValueError("Not a valid SEA command ID") | ||
| sea_statement_id = command_id.to_sea_statement_id() | ||
| if sea_statement_id is None: | ||
| raise ValueError("Not a valid SEA command ID") | ||
| request = CloseStatementRequest(statement_id=sea_statement_id) | ||
| self.http_client._make_request( | ||
| @@ -560,6 +561,8 @@ def get_query_state(self, command_id: CommandId) -> CommandState: | ||
| raise ValueError("Not a valid SEA command ID") | ||
| sea_statement_id = command_id.to_sea_statement_id() | ||
| if sea_statement_id is None: | ||
| raise ValueError("Not a valid SEA command ID") | ||
| request = GetStatementRequest(statement_id=sea_statement_id) | ||
| response_data = self.http_client._make_request( | ||
| @@ -592,9 +595,11 @@ def get_execution_result( | ||
| """ | ||
| if command_id.backend_type != BackendType.SEA: | ||
| raiseValueError("Not a valid SEA command ID") | ||
| sea_statement_id = command_id.to_sea_statement_id() | ||
| if sea_statement_id is None: | ||
| raise ValueError("Not a valid SEA command ID") | ||
| # Create the request model | ||
| request = GetStatementRequest(statement_id=sea_statement_id) | ||
| @@ -608,18 +613,18 @@ def get_execution_result( | ||
| response = GetStatementResponse.from_dict(response_data) | ||
| # Create and return a SeaResultSet | ||
| from databricks.sql.backend.sea.result_set import SeaResultSet | ||
| execute_response = self._results_message_to_execute_response(response) | ||
| return SeaResultSet( | ||
| connection=cursor.connection, | ||
| execute_response=execute_response, | ||
| sea_client=self, | ||
| result_data=response.result, | ||
| manifest=response.manifest, | ||
| buffer_size_bytes=cursor.buffer_size_bytes, | ||
| arraysize=cursor.arraysize, | ||
| ) | ||
| # == Metadata Operations == | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| from __future__ import annotations | ||
| from abc import ABC | ||
| from typing import List, Optional, Tuple | ||
| from databricks.sql.backend.sea.backend import SeaDatabricksClient | ||
| from databricks.sql.backend.sea.models.base import ResultData, ResultManifest | ||
| from databricks.sql.backend.sea.utils.constants import ResultFormat | ||
| from databricks.sql.exc import ProgrammingError | ||
| from databricks.sql.utils import ResultSetQueue | ||
class SeaResultSetQueueFactory(ABC):
    """Factory for building the appropriate ResultSetQueue for a SEA response."""

    @staticmethod
    def build_queue(
        sea_result_data: ResultData,
        manifest: ResultManifest,
        statement_id: str,
        # NOTE: default changed from a shared mutable `[]` to a None sentinel
        # (classic mutable-default pitfall); `description` is currently unused
        # here, so behavior is identical for all callers.
        description: Optional[List[Tuple]] = None,
        max_download_threads: Optional[int] = None,
        sea_client: Optional[SeaDatabricksClient] = None,
        lz4_compressed: bool = False,
    ) -> ResultSetQueue:
        """
        Factory method to build a result set queue for SEA backend.

        Args:
            sea_result_data (ResultData): Result data from SEA response
            manifest (ResultManifest): Manifest from SEA response
            statement_id (str): Statement ID for the query
            description (List[Tuple]): Column descriptions
            max_download_threads (int): Maximum number of download threads
            sea_client (SeaDatabricksClient): SEA client for fetching additional links
            lz4_compressed (bool): Whether the data is LZ4 compressed

        Returns:
            ResultSetQueue: The appropriate queue for the result data

        Raises:
            NotImplementedError: For ARROW_STREAM (EXTERNAL_LINKS) results,
                which are not yet supported.
            ProgrammingError: If the manifest reports an unrecognized format.
        """
        if description is None:
            description = []

        if manifest.format == ResultFormat.JSON_ARRAY.value:
            # INLINE disposition with JSON_ARRAY format
            return JsonQueue(sea_result_data.data)
        elif manifest.format == ResultFormat.ARROW_STREAM.value:
            # EXTERNAL_LINKS disposition
            raise NotImplementedError(
                "EXTERNAL_LINKS disposition is not implemented for SEA backend"
            )
        raise ProgrammingError("Invalid result format")
varun-edachali-dbx marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
class JsonQueue(ResultSetQueue):
    """Queue implementation for JSON_ARRAY format data.

    Wraps an in-memory list of rows (each row a list of strings, as returned
    inline by the SEA JSON_ARRAY format) and serves them out sequentially.
    """

    def __init__(self, data_array: Optional[List[List[str]]]):
        """Initialize with JSON array data; None is treated as no rows."""
        self.data_array = data_array or []
        # Index of the next row to hand out.
        self.cur_row_index = 0
        self.num_rows = len(self.data_array)

    def next_n_rows(self, num_rows: int) -> List[List[str]]:
        """Get the next n rows from the data array.

        Returns fewer than ``num_rows`` rows when the data is exhausted, and
        an empty list for ``num_rows <= 0`` (clamped so a negative request
        can never move the cursor backwards — `slice` also renamed to avoid
        shadowing the builtin).
        """
        length = max(0, min(num_rows, self.num_rows - self.cur_row_index))
        rows = self.data_array[self.cur_row_index : self.cur_row_index + length]
        self.cur_row_index += length
        return rows

    def remaining_rows(self) -> List[List[str]]:
        """Get all remaining rows from the data array."""
        rows = self.data_array[self.cur_row_index :]
        self.cur_row_index += len(rows)
        return rows
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.