@@ -64,7 +64,7 @@ def __init__(
6464 """
6565
6666self .connection = connection
67- self .backend = backend # Store the backend client directly
67+ self .backend = backend
6868self .arraysize = arraysize
6969self .buffer_size_bytes = buffer_size_bytes
7070self ._next_row_index = 0
@@ -115,12 +115,12 @@ def fetchall(self) -> List[Row]:
115115pass
116116
117117@abstractmethod
118- def fetchmany_arrow (self ,size :int )-> Any :
118+ def fetchmany_arrow (self ,size :int )-> "pyarrow.Table" :
119119"""Fetch the next set of rows as an Arrow table."""
120120pass
121121
122122@abstractmethod
123- def fetchall_arrow (self )-> Any :
123+ def fetchall_arrow (self )-> "pyarrow.Table" :
124124"""Fetch all remaining rows as an Arrow table."""
125125pass
126126
@@ -207,7 +207,7 @@ def _fill_results_buffer(self):
207207use_cloud_fetch = self ._use_cloud_fetch ,
208208 )
209209self .results = results
210- self ._has_more_rows = has_more_rows
210+ self .has_more_rows = has_more_rows
211211
212212def _convert_columnar_table (self ,table ):
213213column_names = [c [0 ]for c in self .description ]
@@ -291,7 +291,7 @@ def fetchmany_arrow(self, size: int) -> "pyarrow.Table":
291291while (
292292n_remaining_rows > 0
293293and not self .has_been_closed_server_side
294- and self ._has_more_rows
294+ and self .has_more_rows
295295 ):
296296self ._fill_results_buffer ()
297297partial_results = self .results .next_n_rows (n_remaining_rows )
@@ -316,7 +316,7 @@ def fetchmany_columnar(self, size: int):
316316while (
317317n_remaining_rows > 0
318318and not self .has_been_closed_server_side
319- and self ._has_more_rows
319+ and self .has_more_rows
320320 ):
321321self ._fill_results_buffer ()
322322partial_results = self .results .next_n_rows (n_remaining_rows )
@@ -331,7 +331,7 @@ def fetchall_arrow(self) -> "pyarrow.Table":
331331results = self .results .remaining_rows ()
332332self ._next_row_index += results .num_rows
333333
334- while not self .has_been_closed_server_side and self ._has_more_rows :
334+ while not self .has_been_closed_server_side and self .has_more_rows :
335335self ._fill_results_buffer ()
336336partial_results = self .results .remaining_rows ()
337337if isinstance (results ,ColumnTable )and isinstance (
@@ -357,7 +357,7 @@ def fetchall_columnar(self):
357357results = self .results .remaining_rows ()
358358self ._next_row_index += results .num_rows
359359
360- while not self .has_been_closed_server_side and self ._has_more_rows :
360+ while not self .has_been_closed_server_side and self .has_more_rows :
361361self ._fill_results_buffer ()
362362partial_results = self .results .remaining_rows ()
363363results = self .merge_columnar (results ,partial_results )
@@ -402,6 +402,33 @@ def fetchmany(self, size: int) -> List[Row]:
402402
403403@staticmethod
404404def _get_schema_description (table_schema_message ):
405+ """
406+ Takes a TableSchema message and returns a description 7-tuple as specified by PEP-249
407+ """
408+
409+ def map_col_type (type_ ):
410+ if type_ .startswith ("decimal" ):
411+ return "decimal"
412+ else :
413+ return type_
414+
415+ return [
416+ (column .name ,map_col_type (column .datatype ),None ,None ,None ,None ,None )
417+ for column in table_schema_message .columns
418+ ]
419+
420+
421+ class SeaResultSet (ResultSet ):
422+ """ResultSet implementation for the SEA backend."""
423+
424+ def __init__ (
425+ self ,
426+ connection :"Connection" ,
427+ execute_response :"ExecuteResponse" ,
428+ sea_client :"SeaDatabricksClient" ,
429+ buffer_size_bytes :int = 104857600 ,
430+ arraysize :int = 10000 ,
431+ ):
405432"""
406433 Initialize a SeaResultSet with the response from a SEA query execution.
407434
@@ -413,53 +440,19 @@ def _get_schema_description(table_schema_message):
413440 execute_response: Response from the execute command (new style)
414441 sea_response: Direct SEA response (legacy style)
415442 """
416- # Handle both initialization styles
417- if execute_response is not None :
418- # New style with ExecuteResponse
419- command_id = execute_response .command_id
420- status = execute_response .status
421- has_been_closed_server_side = execute_response .has_been_closed_server_side
422- has_more_rows = execute_response .has_more_rows
423- results_queue = execute_response .results_queue
424- description = execute_response .description
425- is_staging_operation = execute_response .is_staging_operation
426- self ._response = getattr (execute_response ,"sea_response" , {})
427- self .statement_id = command_id .to_sea_statement_id ()if command_id else None
428- elif sea_response is not None :
429- # Legacy style with direct sea_response
430- self ._response = sea_response
431- # Extract values from sea_response
432- command_id = CommandId .from_sea_statement_id (
433- sea_response .get ("statement_id" ,"" )
434- )
435- self .statement_id = sea_response .get ("statement_id" ,"" )
436-
437- # Extract status
438- status_data = sea_response .get ("status" , {})
439- status = CommandState .from_sea_state (status_data .get ("state" ,"PENDING" ))
440-
441- # Set defaults for other fields
442- has_been_closed_server_side = False
443- has_more_rows = False
444- results_queue = None
445- description = None
446- is_staging_operation = False
447- else :
448- raise ValueError ("Either execute_response or sea_response must be provided" )
449443
450- # Call parent constructor with common attributes
451444super ().__init__ (
452445connection = connection ,
453446backend = sea_client ,
454447arraysize = arraysize ,
455448buffer_size_bytes = buffer_size_bytes ,
456- command_id = command_id ,
457- status = status ,
458- has_been_closed_server_side = has_been_closed_server_side ,
459- has_more_rows = has_more_rows ,
460- results_queue = results_queue ,
461- description = description ,
462- is_staging_operation = is_staging_operation ,
449+ command_id = execute_response . command_id ,
450+ status = execute_response . status ,
451+ has_been_closed_server_side = execute_response . has_been_closed_server_side ,
452+ has_more_rows = execute_response . has_more_rows ,
453+ results_queue = execute_response . results_queue ,
454+ description = execute_response . description ,
455+ is_staging_operation = execute_response . is_staging_operation ,
463456 )
464457
465458def _fill_results_buffer (self ):