SeaDatabricksClient: Add Metadata Commands #593
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| from __future__ import annotations | ||
| import logging | ||
| import time | ||
| import re | ||
| @@ -10,11 +12,12 @@ | ||
| ResultDisposition, | ||
| ResultCompression, | ||
| WaitTimeout, | ||
| MetadataCommands, | ||
| ) | ||
| if TYPE_CHECKING: | ||
| from databricks.sql.client import Cursor | ||
| from databricks.sql.result_set import SeaResultSet | ||
| from databricks.sql.backend.databricks_client import DatabricksClient | ||
| from databricks.sql.backend.types import ( | ||
| @@ -24,7 +27,7 @@ | ||
| BackendType, | ||
| ExecuteResponse, | ||
| ) | ||
| from databricks.sql.exc import DatabaseError, ProgrammingError, ServerOperationError | ||
| from databricks.sql.backend.sea.utils.http_client import SeaHttpClient | ||
| from databricks.sql.types import SSLOptions | ||
| @@ -169,7 +172,7 @@ def _extract_warehouse_id(self, http_path: str) -> str: | ||
| f"Note: SEA only works for warehouses." | ||
| ) | ||
| logger.error(error_message) | ||
| raise ProgrammingError(error_message) | ||
| @property | ||
| def max_download_threads(self) -> int: | ||
| @@ -241,14 +244,14 @@ def close_session(self, session_id: SessionId) -> None: | ||
| session_id: The session identifier returned by open_session() | ||
| Raises: | ||
| ProgrammingError: If the session ID is invalid | ||
| OperationalError: If there's an error closing the session | ||
| """ | ||
| logger.debug("SeaDatabricksClient.close_session(session_id=%s)", session_id) | ||
| if session_id.backend_type != BackendType.SEA: | ||
| raiseProgrammingError("Not a valid SEA session ID") | ||
| sea_session_id = session_id.to_sea_session_id() | ||
| request_data = DeleteSessionRequest( | ||
| @@ -400,12 +403,12 @@ def execute_command( | ||
| max_rows: int, | ||
| max_bytes: int, | ||
| lz4_compression: bool, | ||
| cursor: Cursor, | ||
| use_cloud_fetch: bool, | ||
| parameters: List[Dict[str, Any]], | ||
| async_op: bool, | ||
| enforce_embedded_schema_correctness: bool, | ||
| ) -> Union[SeaResultSet, None]: | ||
| """ | ||
| Execute a SQL command using the SEA backend. | ||
| @@ -426,7 +429,7 @@ def execute_command( | ||
| """ | ||
| if session_id.backend_type != BackendType.SEA: | ||
| raiseProgrammingError("Not a valid SEA session ID") | ||
| sea_session_id = session_id.to_sea_session_id() | ||
| @@ -501,11 +504,11 @@ def cancel_command(self, command_id: CommandId) -> None: | ||
| command_id: Command identifier to cancel | ||
| Raises: | ||
| ProgrammingError: If the command ID is invalid | ||
| """ | ||
| if command_id.backend_type != BackendType.SEA: | ||
| raiseProgrammingError("Not a valid SEA command ID") | ||
| sea_statement_id = command_id.to_sea_statement_id() | ||
| @@ -524,11 +527,11 @@ def close_command(self, command_id: CommandId) -> None: | ||
| command_id: Command identifier to close | ||
| Raises: | ||
| ProgrammingError: If the command ID is invalid | ||
| """ | ||
| if command_id.backend_type != BackendType.SEA: | ||
| raiseProgrammingError("Not a valid SEA command ID") | ||
| sea_statement_id = command_id.to_sea_statement_id() | ||
| @@ -550,7 +553,7 @@ def get_query_state(self, command_id: CommandId) -> CommandState: | ||
| CommandState: The current state of the command | ||
| Raises: | ||
| ProgrammingError: If the command ID is invalid | ||
| """ | ||
| if command_id.backend_type != BackendType.SEA: | ||
| @@ -572,8 +575,8 @@ def get_query_state(self, command_id: CommandId) -> CommandState: | ||
| def get_execution_result( | ||
| self, | ||
| command_id: CommandId, | ||
| cursor: Cursor, | ||
| ) ->SeaResultSet: | ||
| """ | ||
| Get the result of a command execution. | ||
| @@ -582,14 +585,14 @@ def get_execution_result( | ||
| cursor: Cursor executing the command | ||
| Returns: | ||
| SeaResultSet: A SeaResultSet instance with the execution results | ||
| Raises: | ||
| ValueError: If the command ID is invalid | ||
| """ | ||
| if command_id.backend_type != BackendType.SEA: | ||
| raiseProgrammingError("Not a valid SEA command ID") | ||
| sea_statement_id = command_id.to_sea_statement_id() | ||
| @@ -626,47 +629,141 @@ def get_catalogs( | ||
| session_id: SessionId, | ||
| max_rows: int, | ||
| max_bytes: int, | ||
| cursor: Cursor, | ||
| ) -> SeaResultSet: | ||
| """Get available catalogs by executing 'SHOW CATALOGS'.""" | ||
| result = self.execute_command( | ||
| operation=MetadataCommands.SHOW_CATALOGS.value, | ||
| session_id=session_id, | ||
| max_rows=max_rows, | ||
| max_bytes=max_bytes, | ||
| lz4_compression=False, | ||
Contributor: not using compression for metadata?
Contributor (author): This is a side effect of setting …
| cursor=cursor, | ||
| use_cloud_fetch=False, | ||
| parameters=[], | ||
| async_op=False, | ||
| enforce_embedded_schema_correctness=False, | ||
Contributor: this is a thrift-specific param?
Contributor (author): Yes, but it is a param passed to …
| ) | ||
| assert result is not None, "execute_command returned None in synchronous mode" | ||
| return result | ||
| def get_schemas( | ||
| self, | ||
| session_id: SessionId, | ||
| max_rows: int, | ||
| max_bytes: int, | ||
| cursor: Cursor, | ||
| catalog_name: Optional[str] = None, | ||
| schema_name: Optional[str] = None, | ||
| ) -> SeaResultSet: | ||
| """Get schemas by executing 'SHOW SCHEMAS IN catalog [LIKE pattern]'.""" | ||
| if not catalog_name: | ||
| raise DatabaseError("Catalog name is required for get_schemas") | ||
| operation = MetadataCommands.SHOW_SCHEMAS.value.format(catalog_name) | ||
| if schema_name: | ||
| operation += MetadataCommands.LIKE_PATTERN.value.format(schema_name) | ||
| result = self.execute_command( | ||
| operation=operation, | ||
| session_id=session_id, | ||
| max_rows=max_rows, | ||
| max_bytes=max_bytes, | ||
| lz4_compression=False, | ||
| cursor=cursor, | ||
| use_cloud_fetch=False, | ||
| parameters=[], | ||
| async_op=False, | ||
| enforce_embedded_schema_correctness=False, | ||
| ) | ||
| assert result is not None, "execute_command returned None in synchronous mode" | ||
| return result | ||
| def get_tables( | ||
| self, | ||
| session_id: SessionId, | ||
| max_rows: int, | ||
| max_bytes: int, | ||
| cursor: Cursor, | ||
| catalog_name: Optional[str] = None, | ||
| schema_name: Optional[str] = None, | ||
| table_name: Optional[str] = None, | ||
| table_types: Optional[List[str]] = None, | ||
| ) -> SeaResultSet: | ||
| """Get tables by executing 'SHOW TABLES IN catalog [SCHEMA LIKE pattern] [LIKE pattern]'.""" | ||
| operation = ( | ||
| MetadataCommands.SHOW_TABLES_ALL_CATALOGS.value | ||
| if catalog_name in [None, "*", "%"] | ||
| else MetadataCommands.SHOW_TABLES.value.format( | ||
| MetadataCommands.CATALOG_SPECIFIC.value.format(catalog_name) | ||
| ) | ||
| ) | ||
| if schema_name: | ||
| operation += MetadataCommands.SCHEMA_LIKE_PATTERN.value.format(schema_name) | ||
| if table_name: | ||
| operation += MetadataCommands.LIKE_PATTERN.value.format(table_name) | ||
| result = self.execute_command( | ||
| operation=operation, | ||
| session_id=session_id, | ||
| max_rows=max_rows, | ||
| max_bytes=max_bytes, | ||
| lz4_compression=False, | ||
| cursor=cursor, | ||
| use_cloud_fetch=False, | ||
| parameters=[], | ||
| async_op=False, | ||
| enforce_embedded_schema_correctness=False, | ||
| ) | ||
| assert result is not None, "execute_command returned None in synchronous mode" | ||
| # Apply client-side filtering by table_types | ||
| from databricks.sql.backend.sea.utils.filters import ResultSetFilter | ||
| result = ResultSetFilter.filter_tables_by_type(result, table_types) | ||
| return result | ||
| def get_columns( | ||
| self, | ||
| session_id: SessionId, | ||
| max_rows: int, | ||
| max_bytes: int, | ||
| cursor: Cursor, | ||
| catalog_name: Optional[str] = None, | ||
| schema_name: Optional[str] = None, | ||
| table_name: Optional[str] = None, | ||
| column_name: Optional[str] = None, | ||
| ) -> SeaResultSet: | ||
| """Get columns by executing 'SHOW COLUMNS IN CATALOG catalog [SCHEMA LIKE pattern] [TABLE LIKE pattern] [LIKE pattern]'.""" | ||
| if not catalog_name: | ||
| raise DatabaseError("Catalog name is required for get_columns") | ||
| operation = MetadataCommands.SHOW_COLUMNS.value.format(catalog_name) | ||
| if schema_name: | ||
| operation += MetadataCommands.SCHEMA_LIKE_PATTERN.value.format(schema_name) | ||
| if table_name: | ||
| operation += MetadataCommands.TABLE_LIKE_PATTERN.value.format(table_name) | ||
| if column_name: | ||
| operation += MetadataCommands.LIKE_PATTERN.value.format(column_name) | ||
| result = self.execute_command( | ||
| operation=operation, | ||
| session_id=session_id, | ||
| max_rows=max_rows, | ||
| max_bytes=max_bytes, | ||
| lz4_compression=False, | ||
| cursor=cursor, | ||
| use_cloud_fetch=False, | ||
| parameters=[], | ||
| async_op=False, | ||
| enforce_embedded_schema_correctness=False, | ||
| ) | ||
| assert result is not None, "execute_command returned None in synchronous mode" | ||
| return result | ||
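For context, here is a minimal sketch of how these methods compose their SQL operations. The actual template strings live in the `MetadataCommands` enum, which is outside this diff; the values below are assumptions reconstructed from the docstrings and `format()` calls above (note that `get_schemas` and `get_columns` require a catalog name and raise `DatabaseError` without one).

```python
# Hedged sketch only: a hypothetical stand-in for the MetadataCommands enum,
# with template values inferred from the docstrings in this diff.
from enum import Enum
from typing import Optional


class MetadataCommandsSketch(Enum):
    SHOW_CATALOGS = "SHOW CATALOGS"
    SHOW_SCHEMAS = "SHOW SCHEMAS IN {}"
    SHOW_TABLES = "SHOW TABLES IN {}"
    SHOW_TABLES_ALL_CATALOGS = "SHOW TABLES IN ALL CATALOGS"
    SHOW_COLUMNS = "SHOW COLUMNS IN CATALOG {}"
    CATALOG_SPECIFIC = "CATALOG {}"
    SCHEMA_LIKE_PATTERN = " SCHEMA LIKE '{}'"
    TABLE_LIKE_PATTERN = " TABLE LIKE '{}'"
    LIKE_PATTERN = " LIKE '{}'"


def build_show_tables(
    catalog_name: Optional[str],
    schema_name: Optional[str],
    table_name: Optional[str],
) -> str:
    """Mirror the operation-building logic of get_tables above."""
    operation = (
        MetadataCommandsSketch.SHOW_TABLES_ALL_CATALOGS.value
        if catalog_name in [None, "*", "%"]
        else MetadataCommandsSketch.SHOW_TABLES.value.format(
            MetadataCommandsSketch.CATALOG_SPECIFIC.value.format(catalog_name)
        )
    )
    if schema_name:
        operation += MetadataCommandsSketch.SCHEMA_LIKE_PATTERN.value.format(schema_name)
    if table_name:
        operation += MetadataCommandsSketch.LIKE_PATTERN.value.format(table_name)
    return operation


print(build_show_tables("main", "def%", "tbl%"))
# SHOW TABLES IN CATALOG main SCHEMA LIKE 'def%' LIKE 'tbl%'
print(build_show_tables(None, None, None))
# SHOW TABLES IN ALL CATALOGS
```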
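And a usage sketch of how these backend methods are reached from application code, assuming the connector's standard cursor metadata helpers (`catalogs()`, `schemas()`, `tables()`, `columns()`) delegate to them when the SEA backend is active; hostname, path, token, and object names are placeholders, and how the SEA backend is selected at connect time is not shown in this diff.

```python
from databricks import sql

# All connection values below are placeholders; enabling the SEA backend at
# connect time is configuration outside this diff.
with sql.connect(
    server_hostname="example.cloud.databricks.com",
    http_path="/sql/1.0/warehouses/abc123",
    access_token="dapi...",
) as connection:
    with connection.cursor() as cursor:
        cursor.catalogs()                    # SHOW CATALOGS
        print(cursor.fetchall())

        cursor.schemas(catalog_name="main")  # SHOW SCHEMAS IN main
        print(cursor.fetchall())

        cursor.tables(
            catalog_name="main",
            schema_name="default",
            table_types=["TABLE", "VIEW"],   # applied client-side via ResultSetFilter
        )
        print(cursor.fetchall())

        cursor.columns(
            catalog_name="main",
            schema_name="default",
            table_name="my_table",           # hypothetical table name
        )
        print(cursor.fetchall())
```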