Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

[471] - Close underlying HTTP Client on closingConnection#674

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Open
Varun0157 wants to merge37 commits intodatabricks:main
base:main
Choose a base branch
Loading
fromVarun0157:close-conn
Open
Show file tree
Hide file tree
Changes fromall commits
Commits
Show all changes
37 commits
Select commitHold shift + click to select a range
4437a2a
Refactor codebase to use a unified http client
vikrantpuppalaAug 8, 2025
30c04a6
Some more fixes and aligned tests
vikrantpuppalaAug 8, 2025
4294600
Fix all tests
vikrantpuppalaAug 8, 2025
3155211
fmt
vikrantpuppalaAug 8, 2025
1143838
preliminary connection closure func
Varun0157Aug 9, 2025
68cc822
unit test for backend closure
Varun0157Aug 9, 2025
ef1d9fd
remove redundant comment
Varun0157Aug 9, 2025
4bb2e4b
assert SEA http client closure in unit tests
Varun0157Aug 10, 2025
734dd06
correct docstrng
Varun0157Aug 10, 2025
d00e3c8
fix e2e
vikrantpuppalaAug 11, 2025
000d3a3
fix unit
vikrantpuppalaAug 11, 2025
cba3da7
more fixes
vikrantpuppalaAug 11, 2025
2a1f719
more fixes
vikrantpuppalaAug 11, 2025
1dd40a1
review comments
vikrantpuppalaAug 12, 2025
3847aca
fix warnings
vikrantpuppalaAug 12, 2025
d9a4797
fix check-types
vikrantpuppalaAug 12, 2025
ba2a3a9
remove separate http client for telemetry
vikrantpuppalaAug 12, 2025
d1f045e
more clean up
vikrantpuppalaAug 12, 2025
ea3b0b0
Merge remote-tracking branch 'target/http-client-refactor-2' into clo…
Varun0157Aug 13, 2025
4e66230
remove excess release_connection call
Varun0157Aug 13, 2025
bf0a2f6
Merge remote-tracking branch 'target/main' into close-conn
Varun0157Aug 13, 2025
67020f1
formatting (black) - fix some closures
Varun0157Aug 13, 2025
496d7f7
Revert "formatting (black) - fix some closures"
Varun0157Aug 13, 2025
84ec33a
add more http_client closures
Varun0157Aug 13, 2025
76ce5ce
remove excess close call
Varun0157Aug 13, 2025
4452725
wait for _flush before closing HTTP client
Varun0157Aug 14, 2025
d90ac80
make close() async
Varun0157Aug 14, 2025
8ff6552
Merge remote-tracking branch 'target/main' into close-conn
Varun0157Sep 3, 2025
666fe62
Merge remote-tracking branch 'origin/main' into close-conn
Varun0157Sep 20, 2025
1c88a01
Merge branch 'main' into close-conn
Varun0157Sep 26, 2025
f78456b
Merge branch 'main' into close-conn
Varun0157Sep 30, 2025
b97ac03
Merge branch 'main' into close-conn
Varun0157Oct 27, 2025
c78bace
simplify close_session (remove secondary _close_session invocation)
Varun0157Oct 28, 2025
bedfc06
simplify changes
Varun0157Oct 28, 2025
a36353b
simplify diff
Varun0157Oct 28, 2025
76fb623
simplify imports, log TelemetryClient closure
Varun0157Oct 28, 2025
66192b4
simplify diff
Varun0157Oct 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletionsrc/databricks/sql/auth/thrift_http_client.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -147,11 +147,14 @@ def open(self):
else:
self.__pool = pool_class(self.host, self.port, **_pool_kwargs)

defclose(self):
defrelease_connection(self):
self.__resp and self.__resp.drain_conn()
self.__resp and self.__resp.release_conn()
self.__resp = None

def close(self):
self.__pool.close()

def read(self, sz):
return self.__resp.read(sz)

Expand Down
7 changes: 5 additions & 2 deletionssrc/databricks/sql/backend/sea/backend.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -287,9 +287,9 @@ def close_session(self, session_id: SessionId) -> None:

logger.debug("SeaDatabricksClient.close_session(session_id=%s)", session_id)

if session_id.backend_type != BackendType.SEA:
raise ValueError("Not a valid SEA session ID")
sea_session_id = session_id.to_sea_session_id()
if sea_session_id is None:
raise ValueError("Not a valid SEA session ID")

request_data = DeleteSessionRequest(
warehouse_id=self.warehouse_id,
Expand All@@ -302,6 +302,9 @@ def close_session(self, session_id: SessionId) -> None:
data=request_data.to_dict(),
)

# close the HTTP client
self._http_client.close()

def _extract_description_from_manifest(
self, manifest: ResultManifest
) -> List[Tuple]:
Expand Down
2 changes: 1 addition & 1 deletionsrc/databricks/sql/backend/sea/utils/http_client.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -183,7 +183,7 @@ def _open(self):
def close(self):
"""Close the connection pool."""
if self._pool:
self._pool.clear()
self._pool.close()

def using_proxy(self) -> bool:
"""Check if proxy is being used."""
Expand Down
4 changes: 2 additions & 2 deletionssrc/databricks/sql/backend/thrift_backend.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -483,8 +483,8 @@ def attempt_request(attempt):
)
)
finally:
# Calling `close()` here releases the active HTTP connection back to the pool
self._transport.close()
# Calling `release_connection()` here releases the active HTTP connection back to the pool
self._transport.release_connection()

return RequestErrorInfo(
error=error,
Expand Down
34 changes: 30 additions & 4 deletionssrc/databricks/sql/telemetry/telemetry_client.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -2,8 +2,7 @@
import time
import logging
import json
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor, wait
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional, TYPE_CHECKING
from databricks.sql.telemetry.models.event import (
Expand DownExpand Up@@ -182,6 +181,7 @@ def __init__(
self._user_agent = None
self._events_batch = []
self._lock = threading.RLock()
self._pending_futures = set()
self._driver_connection_params = None
self._host_url = host_url
self._executor = executor
Expand DownExpand Up@@ -245,6 +245,9 @@ def _send_telemetry(self, events):
timeout=900,
)

with self._lock:
self._pending_futures.add(future)

future.add_done_callback(
lambda fut: self._telemetry_request_callback(fut, sent_count=sent_count)
)
Expand DownExpand Up@@ -303,6 +306,9 @@ def _telemetry_request_callback(self, future, sent_count: int):

except Exception as e:
logger.debug("Telemetry request failed with exception: %s", e)
finally:
with self._lock:
self._pending_futures.discard(future)

def _export_telemetry_log(self, **telemetry_event_kwargs):
"""
Expand DownExpand Up@@ -356,10 +362,30 @@ def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id):
)

def close(self):
"""Flush remaining events before closing"""
logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
"""Schedule client closure."""
logger.debug(
"Scheduling closure for TelemetryClient of connection %s",
self._session_id_hex,
)
self._executor.submit(self._close_and_wait)

def _close_and_wait(self):
"""Flush remaining events and wait for them to complete before closing."""
self._flush()

with self._lock:
pending_events = list(self._pending_futures)

if pending_events:
logger.debug(
"Waiting for %s pending telemetry requests to complete.",
len(pending_events),
)
wait(pending_events)

logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
self._http_client.close()


class TelemetryClientFactory:
"""
Expand Down
1 change: 1 addition & 0 deletionstests/unit/test_sea_backend.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -222,6 +222,7 @@ def test_session_management(self, sea_client, mock_http_client, thrift_session_i
path=sea_client.SESSION_PATH_WITH_ID.format("test-session-789"),
data={"session_id": "test-session-789", "warehouse_id": "abc123"},
)
mock_http_client.close.assert_called_once()

# Test close_session with invalid ID type
with pytest.raises(ValueError) as excinfo:
Expand Down
9 changes: 8 additions & 1 deletiontests/unit/test_thrift_backend.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -1436,8 +1436,12 @@ def test_op_handle_respected_in_close_command(self, tcli_service_class):
)

@patch("databricks.sql.backend.thrift_backend.TCLIService.Client", autospec=True)
def test_session_handle_respected_in_close_session(self, tcli_service_class):
@patch("databricks.sql.auth.thrift_http_client.THttpClient", autospec=True)
def test_session_handle_respected_in_close_session(
self, mock_http_client_class, tcli_service_class
):
tcli_service_instance = tcli_service_class.return_value
mock_http_client_instance = mock_http_client_class.return_value
thrift_backend = ThriftDatabricksClient(
"foobar",
443,
Expand All@@ -1447,12 +1451,15 @@ def test_session_handle_respected_in_close_session(self, tcli_service_class):
ssl_options=SSLOptions(),
http_client=MagicMock(),
)
thrift_backend._transport = mock_http_client_instance

session_id = SessionId.from_thrift_handle(self.session_handle)
thrift_backend.close_session(session_id)
self.assertEqual(
tcli_service_instance.CloseSession.call_args[0][0].sessionHandle,
self.session_handle,
)
mock_http_client_instance.close.assert_called_once()

@patch("databricks.sql.backend.thrift_backend.TCLIService.Client", autospec=True)
def test_non_arrow_non_column_based_set_triggers_exception(
Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp