1+ from __future__import annotations
2+
13import logging
24import time
35import re
1012ResultDisposition ,
1113ResultCompression ,
1214WaitTimeout ,
15+ MetadataCommands ,
1316)
1417
1518if TYPE_CHECKING :
1619from databricks .sql .client import Cursor
17- from databricks .sql .result_set import ResultSet
20+ from databricks .sql .result_set import SeaResultSet
1821
1922from databricks .sql .backend .databricks_client import DatabricksClient
2023from databricks .sql .backend .types import (
2427BackendType ,
2528ExecuteResponse ,
2629)
27- from databricks .sql .exc import DatabaseError ,ServerOperationError
30+ from databricks .sql .exc import DatabaseError ,ProgrammingError , ServerOperationError
2831from databricks .sql .backend .sea .utils .http_client import SeaHttpClient
2932from databricks .sql .types import SSLOptions
3033
@@ -169,7 +172,7 @@ def _extract_warehouse_id(self, http_path: str) -> str:
169172f"Note: SEA only works for warehouses."
170173 )
171174logger .error (error_message )
172- raise ValueError (error_message )
175+ raise ProgrammingError (error_message )
173176
174177@property
175178def max_download_threads (self )-> int :
@@ -241,14 +244,14 @@ def close_session(self, session_id: SessionId) -> None:
241244 session_id: The session identifier returned by open_session()
242245
243246 Raises:
244- ValueError : If the session ID is invalid
247+ ProgrammingError : If the session ID is invalid
245248 OperationalError: If there's an error closing the session
246249 """
247250
248251logger .debug ("SeaDatabricksClient.close_session(session_id=%s)" ,session_id )
249252
250253if session_id .backend_type != BackendType .SEA :
251- raise ValueError ("Not a valid SEA session ID" )
254+ raise ProgrammingError ("Not a valid SEA session ID" )
252255sea_session_id = session_id .to_sea_session_id ()
253256
254257request_data = DeleteSessionRequest (
@@ -400,13 +403,13 @@ def execute_command(
400403max_rows :int ,
401404max_bytes :int ,
402405lz4_compression :bool ,
403- cursor :" Cursor" ,
406+ cursor :Cursor ,
404407use_cloud_fetch :bool ,
405408parameters :List [Dict [str ,Any ]],
406409async_op :bool ,
407410enforce_embedded_schema_correctness :bool ,
408411row_limit :Optional [int ]= None ,
409- )-> Union ["ResultSet" ,None ]:
412+ )-> Union [SeaResultSet ,None ]:
410413"""
411414 Execute a SQL command using the SEA backend.
412415
@@ -427,7 +430,7 @@ def execute_command(
427430 """
428431
429432if session_id .backend_type != BackendType .SEA :
430- raise ValueError ("Not a valid SEA session ID" )
433+ raise ProgrammingError ("Not a valid SEA session ID" )
431434
432435sea_session_id = session_id .to_sea_session_id ()
433436
@@ -502,11 +505,11 @@ def cancel_command(self, command_id: CommandId) -> None:
502505 command_id: Command identifier to cancel
503506
504507 Raises:
505- ValueError : If the command ID is invalid
508+ ProgrammingError : If the command ID is invalid
506509 """
507510
508511if command_id .backend_type != BackendType .SEA :
509- raise ValueError ("Not a valid SEA command ID" )
512+ raise ProgrammingError ("Not a valid SEA command ID" )
510513
511514sea_statement_id = command_id .to_sea_statement_id ()
512515
@@ -525,11 +528,11 @@ def close_command(self, command_id: CommandId) -> None:
525528 command_id: Command identifier to close
526529
527530 Raises:
528- ValueError : If the command ID is invalid
531+ ProgrammingError : If the command ID is invalid
529532 """
530533
531534if command_id .backend_type != BackendType .SEA :
532- raise ValueError ("Not a valid SEA command ID" )
535+ raise ProgrammingError ("Not a valid SEA command ID" )
533536
534537sea_statement_id = command_id .to_sea_statement_id ()
535538
@@ -551,7 +554,7 @@ def get_query_state(self, command_id: CommandId) -> CommandState:
551554 CommandState: The current state of the command
552555
553556 Raises:
554- ValueError : If the command ID is invalid
557+ ProgrammingError : If the command ID is invalid
555558 """
556559
557560if command_id .backend_type != BackendType .SEA :
@@ -573,8 +576,8 @@ def get_query_state(self, command_id: CommandId) -> CommandState:
573576def get_execution_result (
574577self ,
575578command_id :CommandId ,
576- cursor :" Cursor" ,
577- )-> "ResultSet" :
579+ cursor :Cursor ,
580+ )-> SeaResultSet :
578581"""
579582 Get the result of a command execution.
580583
@@ -583,14 +586,14 @@ def get_execution_result(
583586 cursor: Cursor executing the command
584587
585588 Returns:
586- ResultSet : A SeaResultSet instance with the execution results
589+ SeaResultSet : A SeaResultSet instance with the execution results
587590
588591 Raises:
589592 ValueError: If the command ID is invalid
590593 """
591594
592595if command_id .backend_type != BackendType .SEA :
593- raise ValueError ("Not a valid SEA command ID" )
596+ raise ProgrammingError ("Not a valid SEA command ID" )
594597
595598sea_statement_id = command_id .to_sea_statement_id ()
596599
@@ -627,47 +630,141 @@ def get_catalogs(
627630session_id :SessionId ,
628631max_rows :int ,
629632max_bytes :int ,
630- cursor :"Cursor" ,
631- ):
632- """Not implemented yet."""
633- raise NotImplementedError ("get_catalogs is not yet implemented for SEA backend" )
633+ cursor :Cursor ,
634+ )-> SeaResultSet :
635+ """Get available catalogs by executing 'SHOW CATALOGS'."""
636+ result = self .execute_command (
637+ operation = MetadataCommands .SHOW_CATALOGS .value ,
638+ session_id = session_id ,
639+ max_rows = max_rows ,
640+ max_bytes = max_bytes ,
641+ lz4_compression = False ,
642+ cursor = cursor ,
643+ use_cloud_fetch = False ,
644+ parameters = [],
645+ async_op = False ,
646+ enforce_embedded_schema_correctness = False ,
647+ )
648+ assert result is not None ,"execute_command returned None in synchronous mode"
649+ return result
634650
635651def get_schemas (
636652self ,
637653session_id :SessionId ,
638654max_rows :int ,
639655max_bytes :int ,
640- cursor :" Cursor" ,
656+ cursor :Cursor ,
641657catalog_name :Optional [str ]= None ,
642658schema_name :Optional [str ]= None ,
643- ):
644- """Not implemented yet."""
645- raise NotImplementedError ("get_schemas is not yet implemented for SEA backend" )
659+ )-> SeaResultSet :
660+ """Get schemas by executing 'SHOW SCHEMAS IN catalog [LIKE pattern]'."""
661+ if not catalog_name :
662+ raise DatabaseError ("Catalog name is required for get_schemas" )
663+
664+ operation = MetadataCommands .SHOW_SCHEMAS .value .format (catalog_name )
665+
666+ if schema_name :
667+ operation += MetadataCommands .LIKE_PATTERN .value .format (schema_name )
668+
669+ result = self .execute_command (
670+ operation = operation ,
671+ session_id = session_id ,
672+ max_rows = max_rows ,
673+ max_bytes = max_bytes ,
674+ lz4_compression = False ,
675+ cursor = cursor ,
676+ use_cloud_fetch = False ,
677+ parameters = [],
678+ async_op = False ,
679+ enforce_embedded_schema_correctness = False ,
680+ )
681+ assert result is not None ,"execute_command returned None in synchronous mode"
682+ return result
646683
647684def get_tables (
648685self ,
649686session_id :SessionId ,
650687max_rows :int ,
651688max_bytes :int ,
652- cursor :" Cursor" ,
689+ cursor :Cursor ,
653690catalog_name :Optional [str ]= None ,
654691schema_name :Optional [str ]= None ,
655692table_name :Optional [str ]= None ,
656693table_types :Optional [List [str ]]= None ,
657- ):
658- """Not implemented yet."""
659- raise NotImplementedError ("get_tables is not yet implemented for SEA backend" )
694+ )-> SeaResultSet :
695+ """Get tables by executing 'SHOW TABLES IN catalog [SCHEMA LIKE pattern] [LIKE pattern]'."""
696+ operation = (
697+ MetadataCommands .SHOW_TABLES_ALL_CATALOGS .value
698+ if catalog_name in [None ,"*" ,"%" ]
699+ else MetadataCommands .SHOW_TABLES .value .format (
700+ MetadataCommands .CATALOG_SPECIFIC .value .format (catalog_name )
701+ )
702+ )
703+
704+ if schema_name :
705+ operation += MetadataCommands .SCHEMA_LIKE_PATTERN .value .format (schema_name )
706+
707+ if table_name :
708+ operation += MetadataCommands .LIKE_PATTERN .value .format (table_name )
709+
710+ result = self .execute_command (
711+ operation = operation ,
712+ session_id = session_id ,
713+ max_rows = max_rows ,
714+ max_bytes = max_bytes ,
715+ lz4_compression = False ,
716+ cursor = cursor ,
717+ use_cloud_fetch = False ,
718+ parameters = [],
719+ async_op = False ,
720+ enforce_embedded_schema_correctness = False ,
721+ )
722+ assert result is not None ,"execute_command returned None in synchronous mode"
723+
724+ # Apply client-side filtering by table_types
725+ from databricks .sql .backend .sea .utils .filters import ResultSetFilter
726+
727+ result = ResultSetFilter .filter_tables_by_type (result ,table_types )
728+
729+ return result
660730
661731def get_columns (
662732self ,
663733session_id :SessionId ,
664734max_rows :int ,
665735max_bytes :int ,
666- cursor :" Cursor" ,
736+ cursor :Cursor ,
667737catalog_name :Optional [str ]= None ,
668738schema_name :Optional [str ]= None ,
669739table_name :Optional [str ]= None ,
670740column_name :Optional [str ]= None ,
671- ):
672- """Not implemented yet."""
673- raise NotImplementedError ("get_columns is not yet implemented for SEA backend" )
741+ )-> SeaResultSet :
742+ """Get columns by executing 'SHOW COLUMNS IN CATALOG catalog [SCHEMA LIKE pattern] [TABLE LIKE pattern] [LIKE pattern]'."""
743+ if not catalog_name :
744+ raise DatabaseError ("Catalog name is required for get_columns" )
745+
746+ operation = MetadataCommands .SHOW_COLUMNS .value .format (catalog_name )
747+
748+ if schema_name :
749+ operation += MetadataCommands .SCHEMA_LIKE_PATTERN .value .format (schema_name )
750+
751+ if table_name :
752+ operation += MetadataCommands .TABLE_LIKE_PATTERN .value .format (table_name )
753+
754+ if column_name :
755+ operation += MetadataCommands .LIKE_PATTERN .value .format (column_name )
756+
757+ result = self .execute_command (
758+ operation = operation ,
759+ session_id = session_id ,
760+ max_rows = max_rows ,
761+ max_bytes = max_bytes ,
762+ lz4_compression = False ,
763+ cursor = cursor ,
764+ use_cloud_fetch = False ,
765+ parameters = [],
766+ async_op = False ,
767+ enforce_embedded_schema_correctness = False ,
768+ )
769+ assert result is not None ,"execute_command returned None in synchronous mode"
770+ return result