Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9e67c79

Browse files
authored
streaming ingestion support for PUT operation (#643)
* streaming put supoprtSigned-off-by: Sreekanth Vadigi <sreekanth.vadigi@databricks.com>* code formattingSigned-off-by: Sreekanth Vadigi <sreekanth.vadigi@databricks.com>* simplifying testsSigned-off-by: Sreekanth Vadigi <sreekanth.vadigi@databricks.com>---------Signed-off-by: Sreekanth Vadigi <sreekanth.vadigi@databricks.com>
1 parente2a972e commit9e67c79

File tree

5 files changed

+282
-14
lines changed

5 files changed

+282
-14
lines changed

‎examples/streaming_put.py‎

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Simple example of streaming PUT operations.
4+
5+
This demonstrates the basic usage of streaming PUT with the __input_stream__ token.
6+
"""
7+
8+
importio
9+
importos
10+
fromdatabricksimportsql
11+
12+
withsql.connect(
13+
server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
14+
http_path=os.getenv("DATABRICKS_HTTP_PATH"),
15+
access_token=os.getenv("DATABRICKS_TOKEN"),
16+
)asconnection:
17+
18+
withconnection.cursor()ascursor:
19+
# Create a simple data stream
20+
data=b"Hello, streaming world!"
21+
stream=io.BytesIO(data)
22+
23+
# Get catalog, schema, and volume from environment variables
24+
catalog=os.getenv("DATABRICKS_CATALOG")
25+
schema=os.getenv("DATABRICKS_SCHEMA")
26+
volume=os.getenv("DATABRICKS_VOLUME")
27+
28+
# Upload to Unity Catalog volume
29+
cursor.execute(
30+
f"PUT '__input_stream__' INTO '/Volumes/{catalog}/{schema}/{volume}/hello.txt' OVERWRITE",
31+
input_stream=stream
32+
)
33+
34+
print("File uploaded successfully!")

‎src/databricks/sql/client.py‎

Lines changed: 68 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
importtime
2-
fromtypingimportDict,Tuple,List,Optional,Any,Union,Sequence
2+
fromtypingimportDict,Tuple,List,Optional,Any,Union,Sequence,BinaryIO
33
importpandas
44

55
try:
@@ -662,7 +662,9 @@ def _check_not_closed(self):
662662
)
663663

664664
def_handle_staging_operation(
665-
self,staging_allowed_local_path:Union[None,str,List[str]]
665+
self,
666+
staging_allowed_local_path:Union[None,str,List[str]],
667+
input_stream:Optional[BinaryIO]=None,
666668
):
667669
"""Fetch the HTTP request instruction from a staging ingestion command
668670
and call the designated handler.
@@ -671,6 +673,28 @@ def _handle_staging_operation(
671673
is not descended from staging_allowed_local_path.
672674
"""
673675

676+
assertself.active_result_setisnotNone
677+
row=self.active_result_set.fetchone()
678+
assertrowisnotNone
679+
680+
# May be real headers, or could be json string
681+
headers= (
682+
json.loads(row.headers)ifisinstance(row.headers,str)elserow.headers
683+
)
684+
headers=dict(headers)ifheaderselse {}
685+
686+
# Handle __input_stream__ token for PUT operations
687+
if (
688+
row.operation=="PUT"
689+
andgetattr(row,"localFile",None)=="__input_stream__"
690+
):
691+
returnself._handle_staging_put_stream(
692+
presigned_url=row.presignedUrl,
693+
stream=input_stream,
694+
headers=headers,
695+
)
696+
697+
# For non-streaming operations, validate staging_allowed_local_path
674698
ifisinstance(staging_allowed_local_path,type(str())):
675699
_staging_allowed_local_paths= [staging_allowed_local_path]
676700
elifisinstance(staging_allowed_local_path,type(list())):
@@ -685,10 +709,6 @@ def _handle_staging_operation(
685709
os.path.abspath(i)foriin_staging_allowed_local_paths
686710
]
687711

688-
assertself.active_result_setisnotNone
689-
row=self.active_result_set.fetchone()
690-
assertrowisnotNone
691-
692712
# Must set to None in cases where server response does not include localFile
693713
abs_localFile=None
694714

@@ -711,19 +731,16 @@ def _handle_staging_operation(
711731
session_id_hex=self.connection.get_session_id_hex(),
712732
)
713733

714-
# May be real headers, or could be json string
715-
headers= (
716-
json.loads(row.headers)ifisinstance(row.headers,str)elserow.headers
717-
)
718-
719734
handler_args= {
720735
"presigned_url":row.presignedUrl,
721736
"local_file":abs_localFile,
722-
"headers":dict(headers)or {},
737+
"headers":headers,
723738
}
724739

725740
logger.debug(
726-
f"Attempting staging operation indicated by server:{row.operation} -{getattr(row,'localFile','')}"
741+
"Attempting staging operation indicated by server: %s - %s",
742+
row.operation,
743+
getattr(row,"localFile",""),
727744
)
728745

729746
# TODO: Create a retry loop here to re-attempt if the request times out or fails
@@ -762,6 +779,10 @@ def _handle_staging_put(
762779
HttpMethod.PUT,presigned_url,body=fh.read(),headers=headers
763780
)
764781

782+
self._handle_staging_http_response(r)
783+
784+
def_handle_staging_http_response(self,r):
785+
765786
# fmt: off
766787
# HTTP status codes
767788
OK=200
@@ -784,6 +805,37 @@ def _handle_staging_put(
784805
+"but not yet applied on the server. It's possible this command may fail later."
785806
)
786807

808+
@log_latency(StatementType.SQL)
809+
def_handle_staging_put_stream(
810+
self,
811+
presigned_url:str,
812+
stream:BinaryIO,
813+
headers:dict= {},
814+
)->None:
815+
"""Handle PUT operation with streaming data.
816+
817+
Args:
818+
presigned_url: The presigned URL for upload
819+
stream: Binary stream to upload
820+
headers: HTTP headers
821+
822+
Raises:
823+
ProgrammingError: If no input stream is provided
824+
OperationalError: If the upload fails
825+
"""
826+
827+
ifnotstream:
828+
raiseProgrammingError(
829+
"No input stream provided for streaming operation",
830+
session_id_hex=self.connection.get_session_id_hex(),
831+
)
832+
833+
r=self.connection.http_client.request(
834+
HttpMethod.PUT,presigned_url,body=stream.read(),headers=headers
835+
)
836+
837+
self._handle_staging_http_response(r)
838+
787839
@log_latency(StatementType.SQL)
788840
def_handle_staging_get(
789841
self,local_file:str,presigned_url:str,headers:Optional[dict]=None
@@ -840,6 +892,7 @@ def execute(
840892
operation:str,
841893
parameters:Optional[TParameterCollection]=None,
842894
enforce_embedded_schema_correctness=False,
895+
input_stream:Optional[BinaryIO]=None,
843896
)->"Cursor":
844897
"""
845898
Execute a query and wait for execution to complete.
@@ -914,7 +967,8 @@ def execute(
914967

915968
ifself.active_result_setandself.active_result_set.is_staging_operation:
916969
self._handle_staging_operation(
917-
staging_allowed_local_path=self.connection.staging_allowed_local_path
970+
staging_allowed_local_path=self.connection.staging_allowed_local_path,
971+
input_stream=input_stream,
918972
)
919973

920974
returnself
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#!/usr/bin/env python3
2+
"""
3+
E2E tests for streaming PUT operations.
4+
"""
5+
6+
importio
7+
importlogging
8+
importpytest
9+
10+
logger=logging.getLogger(__name__)
11+
12+
13+
classPySQLStreamingPutTestSuiteMixin:
14+
"""Test suite for streaming PUT operations."""
15+
16+
deftest_streaming_put_basic(self,catalog,schema):
17+
"""Test basic streaming PUT functionality."""
18+
19+
# Create test data
20+
test_data=b"Hello, streaming world! This is test data."
21+
filename="streaming_put_test.txt"
22+
file_path=f"/Volumes/{catalog}/{schema}/e2etests/{filename}"
23+
24+
try:
25+
withself.connection()asconn:
26+
withconn.cursor()ascursor:
27+
self._cleanup_test_file(file_path)
28+
29+
withio.BytesIO(test_data)asstream:
30+
cursor.execute(
31+
f"PUT '__input_stream__' INTO '{file_path}'",
32+
input_stream=stream
33+
)
34+
35+
# Verify file exists
36+
cursor.execute(f"LIST '/Volumes/{catalog}/{schema}/e2etests/'")
37+
files=cursor.fetchall()
38+
39+
# Check if our file is in the list
40+
file_paths= [row[0]forrowinfiles]
41+
assertfile_pathinfile_paths,f"File{file_path} not found in{file_paths}"
42+
finally:
43+
self._cleanup_test_file(file_path)
44+
45+
deftest_streaming_put_missing_stream(self,catalog,schema):
46+
"""Test that missing stream raises appropriate error."""
47+
48+
withself.connection()asconn:
49+
withconn.cursor()ascursor:
50+
# Test without providing stream
51+
withpytest.raises(Exception):# Should fail
52+
cursor.execute(
53+
f"PUT '__input_stream__' INTO '/Volumes/{catalog}/{schema}/e2etests/test.txt'"
54+
# Note: No input_stream parameter
55+
)
56+
57+
def_cleanup_test_file(self,file_path):
58+
"""Clean up a test file if it exists."""
59+
try:
60+
withself.connection(extra_params={"staging_allowed_local_path":"/"})asconn:
61+
withconn.cursor()ascursor:
62+
cursor.execute(f"REMOVE '{file_path}'")
63+
logger.info("Successfully cleaned up test file: %s",file_path)
64+
exceptExceptionase:
65+
logger.error("Cleanup failed for %s: %s",file_path,e)

‎tests/e2e/test_driver.py‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
fromtests.e2e.common.retry_test_mixinsimportPySQLRetryTestsMixin
5151

5252
fromtests.e2e.common.uc_volume_testsimportPySQLUCVolumeTestSuiteMixin
53+
fromtests.e2e.common.streaming_put_testsimportPySQLStreamingPutTestSuiteMixin
5354

5455
fromdatabricks.sql.excimportSessionAlreadyClosedError
5556

@@ -290,6 +291,7 @@ class TestPySQLCoreSuite(
290291
PySQLStagingIngestionTestSuiteMixin,
291292
PySQLRetryTestsMixin,
292293
PySQLUCVolumeTestSuiteMixin,
294+
PySQLStreamingPutTestSuiteMixin,
293295
):
294296
validate_row_value_type=True
295297
validate_result=True

‎tests/unit/test_streaming_put.py‎

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
importio
2+
fromunittest.mockimportpatch,Mock,MagicMock
3+
4+
importpytest
5+
6+
importdatabricks.sql.clientasclient
7+
8+
9+
classTestStreamingPut:
10+
"""Unit tests for streaming PUT functionality."""
11+
12+
@pytest.fixture
13+
defcursor(self):
14+
returnclient.Cursor(connection=Mock(),backend=Mock())
15+
16+
def_setup_mock_staging_put_stream_response(self,mock_backend):
17+
"""Helper method to set up mock staging PUT stream response."""
18+
mock_result_set=Mock()
19+
mock_result_set.is_staging_operation=True
20+
mock_backend.execute_command.return_value=mock_result_set
21+
22+
mock_row=Mock()
23+
mock_row.operation="PUT"
24+
mock_row.localFile="__input_stream__"
25+
mock_row.presignedUrl="https://example.com/upload"
26+
mock_row.headers="{}"
27+
mock_result_set.fetchone.return_value=mock_row
28+
29+
returnmock_result_set
30+
31+
deftest_execute_with_valid_stream(self,cursor):
32+
"""Test execute method with valid input stream."""
33+
34+
# Mock the backend response
35+
self._setup_mock_staging_put_stream_response(cursor.backend)
36+
37+
# Test with valid stream
38+
test_stream=io.BytesIO(b"test data")
39+
40+
withpatch.object(cursor,"_handle_staging_put_stream")asmock_handler:
41+
cursor.execute(
42+
"PUT '__input_stream__' INTO '/Volumes/test/cat/schema/vol/file.txt'",
43+
input_stream=test_stream,
44+
)
45+
46+
# Verify staging handler was called
47+
mock_handler.assert_called_once()
48+
49+
deftest_execute_with_none_stream_for_staging_put(self,cursor):
50+
"""Test execute method rejects None stream for streaming PUT operations."""
51+
52+
# Mock staging operation response for None case
53+
self._setup_mock_staging_put_stream_response(cursor.backend)
54+
55+
# None with __input_stream__ raises ProgrammingError
56+
withpytest.raises(client.ProgrammingError)asexcinfo:
57+
cursor.execute(
58+
"PUT '__input_stream__' INTO '/Volumes/test/cat/schema/vol/file.txt'",
59+
input_stream=None,
60+
)
61+
error_msg=str(excinfo.value)
62+
assert"No input stream provided for streaming operation"inerror_msg
63+
64+
deftest_handle_staging_put_stream_success(self,cursor):
65+
"""Test successful streaming PUT operation."""
66+
67+
presigned_url="https://example.com/upload"
68+
headers= {"Content-Type":"text/plain"}
69+
70+
withpatch.object(
71+
cursor.connection.http_client,"request"
72+
)asmock_http_request:
73+
mock_response=MagicMock()
74+
mock_response.status=200
75+
mock_response.data=b"success"
76+
mock_http_request.return_value=mock_response
77+
78+
test_stream=io.BytesIO(b"test data")
79+
cursor._handle_staging_put_stream(
80+
presigned_url=presigned_url,stream=test_stream,headers=headers
81+
)
82+
83+
# Verify the HTTP client was called correctly
84+
mock_http_request.assert_called_once()
85+
call_args=mock_http_request.call_args
86+
# Check positional arguments: (method, url, body=..., headers=...)
87+
assertcall_args[0][0].value=="PUT"# First positional arg is method
88+
assertcall_args[0][1]==presigned_url# Second positional arg is url
89+
# Check keyword arguments
90+
assertcall_args[1]["body"]==b"test data"
91+
assertcall_args[1]["headers"]==headers
92+
93+
deftest_handle_staging_put_stream_http_error(self,cursor):
94+
"""Test streaming PUT operation with HTTP error."""
95+
96+
presigned_url="https://example.com/upload"
97+
98+
withpatch.object(
99+
cursor.connection.http_client,"request"
100+
)asmock_http_request:
101+
mock_response=MagicMock()
102+
mock_response.status=500
103+
mock_response.data=b"Internal Server Error"
104+
mock_http_request.return_value=mock_response
105+
106+
test_stream=io.BytesIO(b"test data")
107+
withpytest.raises(client.OperationalError)asexcinfo:
108+
cursor._handle_staging_put_stream(
109+
presigned_url=presigned_url,stream=test_stream
110+
)
111+
112+
# Check for the actual error message format
113+
assert"500"instr(excinfo.value)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp