- Notifications
You must be signed in to change notification settings - Fork126
[PECOBLR-201] add variant support#560
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Merged
Uh oh!
There was an error while loading.Please reload this page.
Merged
Changes fromall commits
Commits
Show all changes
7 commits Select commitHold shift + click to select a range
b834ce7 add variant support
shivam26803e8cce3 add extensive tests for data types
shivam2680d0e39ec addressed comments
shivam26805354788 Merge branch 'main' into variant
shivam2680177c197 resolve merge conflicts
shivam26800d122e8 variant type detection
shivam26806880834 address comments
shivam2680File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Jump to file
Failed to load files.
Loading
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
55 changes: 42 additions & 13 deletionssrc/databricks/sql/backend/thrift_backend.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -735,7 +735,7 @@ def convert_col(t_column_desc): | ||
| return pyarrow.schema([convert_col(col) for col in t_table_schema.columns]) | ||
| @staticmethod | ||
| def _col_to_description(col,field=None,session_id_hex=None): | ||
| type_entry = col.typeDesc.types[0] | ||
| if type_entry.primitiveEntry: | ||
| @@ -764,12 +764,39 @@ def _col_to_description(col, session_id_hex=None): | ||
| else: | ||
| precision, scale = None, None | ||
| # Extract variant type from field if available | ||
| if field is not None: | ||
| try: | ||
| # Check for variant type in metadata | ||
| if field.metadata and b"Spark:DataType:SqlName" in field.metadata: | ||
| sql_type = field.metadata.get(b"Spark:DataType:SqlName") | ||
| if sql_type == b"VARIANT": | ||
| cleaned_type = "variant" | ||
| except Exception as e: | ||
| logger.debug(f"Could not extract variant type from field: {e}") | ||
| return col.columnName, cleaned_type, None, None, precision, scale, None | ||
| @staticmethod | ||
| def _hive_schema_to_description( | ||
| t_table_schema, schema_bytes=None, session_id_hex=None | ||
| ): | ||
| field_dict = {} | ||
| if pyarrow and schema_bytes: | ||
| try: | ||
| arrow_schema = pyarrow.ipc.read_schema(pyarrow.py_buffer(schema_bytes)) | ||
| # Build a dictionary mapping column names to fields | ||
| for field in arrow_schema: | ||
| field_dict[field.name] = field | ||
| except Exception as e: | ||
| logger.debug(f"Could not parse arrow schema: {e}") | ||
| return [ | ||
| ThriftDatabricksClient._col_to_description( | ||
| col, | ||
| field_dict.get(col.columnName) if field_dict else None, | ||
| session_id_hex, | ||
| ) | ||
| for col in t_table_schema.columns | ||
shivam2680 marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| ] | ||
| @@ -802,11 +829,6 @@ def _results_message_to_execute_response(self, resp, operation_state): | ||
| or direct_results.resultSet.hasMoreRows | ||
| ) | ||
| if pyarrow: | ||
| schema_bytes = ( | ||
| t_result_set_metadata_resp.arrowSchema | ||
| @@ -819,6 +841,12 @@ def _results_message_to_execute_response(self, resp, operation_state): | ||
| else: | ||
| schema_bytes = None | ||
| description = self._hive_schema_to_description( | ||
| t_result_set_metadata_resp.schema, | ||
| schema_bytes, | ||
| self._session_id_hex, | ||
| ) | ||
| lz4_compressed = t_result_set_metadata_resp.lz4Compressed | ||
| command_id = CommandId.from_thrift_handle(resp.operationHandle) | ||
| @@ -863,11 +891,6 @@ def get_execution_result( | ||
| t_result_set_metadata_resp = resp.resultSetMetadata | ||
| if pyarrow: | ||
| schema_bytes = ( | ||
| t_result_set_metadata_resp.arrowSchema | ||
| @@ -880,6 +903,12 @@ def get_execution_result( | ||
| else: | ||
| schema_bytes = None | ||
| description = self._hive_schema_to_description( | ||
| t_result_set_metadata_resp.schema, | ||
| schema_bytes, | ||
| self._session_id_hex, | ||
| ) | ||
| lz4_compressed = t_result_set_metadata_resp.lz4Compressed | ||
| is_staging_operation = t_result_set_metadata_resp.isStagingOperation | ||
| has_more_rows = resp.hasMoreRows | ||
91 changes: 91 additions & 0 deletionstests/e2e/test_variant_types.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| import pytest | ||
| from datetime import datetime | ||
| import json | ||
| try: | ||
| import pyarrow | ||
| except ImportError: | ||
| pyarrow = None | ||
| from tests.e2e.test_driver import PySQLPytestTestCase | ||
| from tests.e2e.common.predicates import pysql_supports_arrow | ||
| @pytest.mark.skipif(not pysql_supports_arrow(), reason="Requires arrow support") | ||
| class TestVariantTypes(PySQLPytestTestCase): | ||
shivam2680 marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| """Tests for the proper detection and handling of VARIANT type columns""" | ||
| @pytest.fixture(scope="class") | ||
| def variant_table(self, connection_details): | ||
| """A pytest fixture that creates a test table and cleans up after tests""" | ||
| self.arguments = connection_details.copy() | ||
| table_name = "pysql_test_variant_types_table" | ||
| with self.cursor() as cursor: | ||
| try: | ||
| # Create the table with variant columns | ||
| cursor.execute( | ||
| """ | ||
| CREATE TABLE IF NOT EXISTS pysql_test_variant_types_table ( | ||
| id INTEGER, | ||
| variant_col VARIANT, | ||
| regular_string_col STRING | ||
| ) | ||
| """ | ||
| ) | ||
| # Insert test records with different variant values | ||
| cursor.execute( | ||
| """ | ||
| INSERT INTO pysql_test_variant_types_table | ||
| VALUES | ||
| (1, PARSE_JSON('{"name": "John", "age": 30}'), 'regular string'), | ||
| (2, PARSE_JSON('[1, 2, 3, 4]'), 'another string') | ||
| """ | ||
| ) | ||
| yield table_name | ||
| finally: | ||
| cursor.execute(f"DROP TABLE IF EXISTS {table_name}") | ||
| def test_variant_type_detection(self, variant_table): | ||
| """Test that VARIANT type columns are properly detected in schema""" | ||
| with self.cursor() as cursor: | ||
| cursor.execute(f"SELECT * FROM {variant_table} LIMIT 0") | ||
| # Verify column types in description | ||
| assert ( | ||
| cursor.description[0][1] == "int" | ||
| ), "Integer column type not correctly identified" | ||
| assert ( | ||
| cursor.description[1][1] == "variant" | ||
| ), "VARIANT column type not correctly identified" | ||
| assert ( | ||
| cursor.description[2][1] == "string" | ||
| ), "String column type not correctly identified" | ||
| def test_variant_data_retrieval(self, variant_table): | ||
| """Test that VARIANT data is properly retrieved and can be accessed as JSON""" | ||
| with self.cursor() as cursor: | ||
| cursor.execute(f"SELECT * FROM {variant_table} ORDER BY id") | ||
| rows = cursor.fetchall() | ||
| # First row should have a JSON object | ||
| json_obj = rows[0][1] | ||
| assert isinstance( | ||
| json_obj, str | ||
| ), "VARIANT column should be returned as string" | ||
| parsed = json.loads(json_obj) | ||
| assert parsed.get("name") == "John" | ||
| assert parsed.get("age") == 30 | ||
| # Second row should have a JSON array | ||
| json_array = rows[1][1] | ||
| assert isinstance( | ||
| json_array, str | ||
| ), "VARIANT array should be returned as string" | ||
| # Parsing to verify it's valid JSON array | ||
| parsed_array = json.loads(json_array) | ||
| assert isinstance(parsed_array, list) | ||
| assert parsed_array == [1, 2, 3, 4] | ||
82 changes: 81 additions & 1 deletiontests/unit/test_thrift_backend.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.Learn more about bidirectional Unicode characters
Oops, something went wrong.
Uh oh!
There was an error while loading.Please reload this page.
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.