Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Circuit breaker changes using pybreaker#705

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Open
nikhilsuri-db wants to merge15 commits intomain
base:main
Choose a base branch
Loading
fromPECOBLR-993
Open
Show file tree
Hide file tree
Changes fromall commits
Commits
Show all changes
15 commits
Select commitHold shift + click to select a range
37ec282
Added driver connection params
nikhilsuri-dbOct 24, 2025
2504053
Added model fields for chunk/result latency
nikhilsuri-dbOct 24, 2025
ef41f4c
fixed linting issues
nikhilsuri-dbOct 24, 2025
2f54be8
lint issue fixing
nikhilsuri-dbOct 27, 2025
db93974
circuit breaker changes using pybreaker
nikhilsuri-dbSep 26, 2025
1f9c4d3
Added interface layer top of http client to use circuit rbeaker
nikhilsuri-dbSep 30, 2025
939b548
Added test cases to validate ciruit breaker
nikhilsuri-dbSep 30, 2025
6c72f86
fixing broken tests
nikhilsuri-dbSep 30, 2025
ac845a5
fixed linting issues
nikhilsuri-dbSep 30, 2025
a602c39
fixed failing test cases
nikhilsuri-dbSep 30, 2025
c1b6e25
fixed urllib3 issue
nikhilsuri-dbSep 30, 2025
e3d85f4
added more test cases for telemetry
nikhilsuri-dbSep 30, 2025
9dfb623
simplified CB config
nikhilsuri-dbOct 6, 2025
e7e8b4b
poetry lock
nikhilsuri-dbNov 4, 2025
dab4b38
fix minor issues & improvement
nikhilsuri-dbNov 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletionspoetry.lock
View file
Open in desktop

Some generated files are not rendered by default. Learn more abouthow customized files appear on GitHub.

1 change: 1 addition & 0 deletionspyproject.toml
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -26,6 +26,7 @@ pyarrow = [
{ version = ">=18.0.0", python = ">=3.13", optional=true }
]
pyjwt = "^2.0.0"
pybreaker = "^1.0.0"
requests-kerberos = {version = "^0.15.0", optional = true}


Expand Down
2 changes: 2 additions & 0 deletionssrc/databricks/sql/auth/common.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -51,6 +51,7 @@ def __init__(
pool_connections: Optional[int] = None,
pool_maxsize: Optional[int] = None,
user_agent: Optional[str] = None,
telemetry_circuit_breaker_enabled: Optional[bool] = None,
):
self.hostname = hostname
self.access_token = access_token
Expand DownExpand Up@@ -83,6 +84,7 @@ def __init__(
self.pool_connections = pool_connections or 10
self.pool_maxsize = pool_maxsize or 20
self.user_agent = user_agent
self.telemetry_circuit_breaker_enabled = bool(telemetry_circuit_breaker_enabled)


def get_effective_azure_login_app_id(hostname) -> str:
Expand Down
35 changes: 34 additions & 1 deletionsrc/databricks/sql/client.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -9,6 +9,7 @@
import json
import os
import decimal
from urllib.parse import urlparse
from uuid import UUID

from databricks.sql import __version__
Expand DownExpand Up@@ -322,6 +323,20 @@ def read(self) -> Optional[OAuthToken]:
session_id_hex=self.get_session_id_hex()
)

# Determine proxy usage
use_proxy = self.http_client.using_proxy()
proxy_host_info = None
if (
use_proxy
and self.http_client.proxy_uri
and isinstance(self.http_client.proxy_uri, str)
):
parsed = urlparse(self.http_client.proxy_uri)
proxy_host_info = HostDetails(
host_url=parsed.hostname or self.http_client.proxy_uri,
port=parsed.port or 8080,
)

driver_connection_params = DriverConnectionParameters(
http_path=http_path,
mode=DatabricksClientType.SEA
Expand All@@ -331,13 +346,31 @@ def read(self) -> Optional[OAuthToken]:
auth_mech=TelemetryHelper.get_auth_mechanism(self.session.auth_provider),
auth_flow=TelemetryHelper.get_auth_flow(self.session.auth_provider),
socket_timeout=kwargs.get("_socket_timeout", None),
azure_workspace_resource_id=kwargs.get("azure_workspace_resource_id", None),
azure_tenant_id=kwargs.get("azure_tenant_id", None),
use_proxy=use_proxy,
use_system_proxy=use_proxy,
proxy_host_info=proxy_host_info,
use_cf_proxy=False, # CloudFlare proxy not yet supported in Python
cf_proxy_host_info=None, # CloudFlare proxy not yet supported in Python
non_proxy_hosts=None,
allow_self_signed_support=kwargs.get("_tls_no_verify", False),
use_system_trust_store=True, # Python uses system SSL by default
enable_arrow=pyarrow is not None,
enable_direct_results=True, # Always enabled in Python
enable_sea_hybrid_results=kwargs.get("use_hybrid_disposition", False),
http_connection_pool_size=kwargs.get("pool_maxsize", None),
rows_fetched_per_block=DEFAULT_ARRAY_SIZE,
async_poll_interval_millis=2000, # Default polling interval
support_many_parameters=True, # Native parameters supported
enable_complex_datatype_support=_use_arrow_native_complex_types,
allowed_volume_ingestion_paths=self.staging_allowed_local_path,
)

self._telemetry_client.export_initial_telemetry_log(
driver_connection_params=driver_connection_params,
user_agent=self.session.useragent_header,
)
self.staging_allowed_local_path = kwargs.get("staging_allowed_local_path", None)

def _set_use_inline_params_with_warning(self, value: Union[bool, str]):
"""Valid values are True, False, and "silent"
Expand Down
5 changes: 5 additions & 0 deletionssrc/databricks/sql/common/unified_http_client.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -301,6 +301,11 @@ def using_proxy(self) -> bool:
"""Check if proxy support is available (not whether it's being used for a specific request)."""
return self._proxy_pool_manager is not None

@property
def proxy_uri(self) -> Optional[str]:
"""Get the configured proxy URI, if any."""
return self._proxy_uri

def close(self):
"""Close the underlying connection pools."""
if self._direct_pool_manager:
Expand Down
138 changes: 138 additions & 0 deletionssrc/databricks/sql/telemetry/circuit_breaker_manager.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
"""
Circuit breaker implementation for telemetry requests.

This module provides circuit breaker functionality to prevent telemetry failures
from impacting the main SQL operations. It uses pybreaker library to implement
the circuit breaker pattern with configurable thresholds and timeouts.
"""

import logging
import threading
from typing import Dict, Optional, Any
from dataclasses import dataclass

import pybreaker
from pybreaker import CircuitBreaker, CircuitBreakerError, CircuitBreakerListener

logger = logging.getLogger(__name__)

# Circuit Breaker Configuration Constants
MINIMUM_CALLS = 20
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Hardcoded values as do not want user to configure them anywhere in the driver

RESET_TIMEOUT = 30
CIRCUIT_BREAKER_NAME = "telemetry-circuit-breaker"

# Circuit Breaker State Constants
CIRCUIT_BREAKER_STATE_OPEN = "open"
CIRCUIT_BREAKER_STATE_CLOSED = "closed"
CIRCUIT_BREAKER_STATE_HALF_OPEN = "half-open"
CIRCUIT_BREAKER_STATE_DISABLED = "disabled"

# Logging Message Constants
LOG_CIRCUIT_BREAKER_STATE_CHANGED = "Circuit breaker state changed from %s to %s for %s"
LOG_CIRCUIT_BREAKER_OPENED = (
"Circuit breaker opened for %s - telemetry requests will be blocked"
)
LOG_CIRCUIT_BREAKER_CLOSED = (
"Circuit breaker closed for %s - telemetry requests will be allowed"
)
LOG_CIRCUIT_BREAKER_HALF_OPEN = (
"Circuit breaker half-open for %s - testing telemetry requests"
)


class CircuitBreakerStateListener(CircuitBreakerListener):
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Only used for logging purposed for now

"""Listener for circuit breaker state changes."""

def before_call(self, cb: CircuitBreaker, func, *args, **kwargs) -> None:
"""Called before the circuit breaker calls a function."""
pass

def failure(self, cb: CircuitBreaker, exc: BaseException) -> None:
"""Called when a function called by the circuit breaker fails."""
pass

def success(self, cb: CircuitBreaker) -> None:
"""Called when a function called by the circuit breaker succeeds."""
pass

def state_change(self, cb: CircuitBreaker, old_state, new_state) -> None:
"""Called when the circuit breaker state changes."""
old_state_name = old_state.name if old_state else "None"
new_state_name = new_state.name if new_state else "None"

logger.info(
LOG_CIRCUIT_BREAKER_STATE_CHANGED, old_state_name, new_state_name, cb.name
)

if new_state_name == CIRCUIT_BREAKER_STATE_OPEN:
logger.warning(LOG_CIRCUIT_BREAKER_OPENED, cb.name)
elif new_state_name == CIRCUIT_BREAKER_STATE_CLOSED:
logger.info(LOG_CIRCUIT_BREAKER_CLOSED, cb.name)
elif new_state_name == CIRCUIT_BREAKER_STATE_HALF_OPEN:
logger.info(LOG_CIRCUIT_BREAKER_HALF_OPEN, cb.name)


class CircuitBreakerManager:
"""
Manages circuit breaker instances for telemetry requests.

This class provides a singleton pattern to manage circuit breaker instances
per host, ensuring that telemetry failures don't impact main SQL operations.

Circuit breaker configuration is fixed and cannot be overridden.
"""

_instances: Dict[str, CircuitBreaker] = {}
_lock = threading.RLock()

@classmethod
def get_circuit_breaker(cls, host: str) -> CircuitBreaker:
"""
Get or create a circuit breaker instance for the specified host.

Args:
host: The hostname for which to get the circuit breaker

Returns:
CircuitBreaker instance for the host
"""
with cls._lock:
if host not in cls._instances:
cls._instances[host] = cls._create_circuit_breaker(host)
logger.debug("Created circuit breaker for host: %s", host)

return cls._instances[host]

@classmethod
def _create_circuit_breaker(cls, host: str) -> CircuitBreaker:
"""
Create a new circuit breaker instance for the specified host.

Args:
host: The hostname for the circuit breaker

Returns:
New CircuitBreaker instance
"""
# Create circuit breaker with fixed configuration
breaker = CircuitBreaker(
fail_max=MINIMUM_CALLS,
reset_timeout=RESET_TIMEOUT,
name=f"{CIRCUIT_BREAKER_NAME}-{host}",
)
breaker.add_listener(CircuitBreakerStateListener())

return breaker


def is_circuit_breaker_error(exception: Exception) -> bool:
"""
Check if an exception is a circuit breaker error.

Args:
exception: The exception to check

Returns:
True if the exception is a circuit breaker error
"""
return isinstance(exception, CircuitBreakerError)
Loading
Loading

[8]ページ先頭

©2009-2025 Movatter.jp