Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitdb93974

Browse files
committed
circuit breaker changes using pybreaker
Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com>
1 parent2f54be8 commitdb93974

10 files changed

+1687
-3
lines changed

‎docs/parameters.md‎

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,3 +254,73 @@ You should only set `use_inline_params=True` in the following cases:
254254
4. Your client code uses[sequences as parameter values](#passing-sequences-as-parameter-values)
255255

256256
We expect limitations (1) and (2) to be addressed in a future Databricks Runtime release.
257+
258+
#Telemetry Circuit Breaker Configuration
259+
260+
The Databricks SQL connector includes a circuit breaker pattern for telemetry requests to prevent telemetry failures from impacting main SQL operations. This feature is enabled by default and can be controlled through a connection parameter.
261+
262+
##Overview
263+
264+
The circuit breaker monitors telemetry request failures and automatically blocks telemetry requests when the failure rate exceeds a configured threshold. This prevents telemetry service issues from affecting your main SQL operations.
265+
266+
##Configuration Parameter
267+
268+
| Parameter| Type| Default| Description|
269+
|-----------|------|---------|-------------|
270+
|`telemetry_circuit_breaker_enabled`| bool|`True`| Enable or disable the telemetry circuit breaker|
271+
272+
##Usage Examples
273+
274+
###Default Configuration (Circuit Breaker Enabled)
275+
276+
```python
277+
from databricksimport sql
278+
279+
# Circuit breaker is enabled by default
280+
with sql.connect(
281+
server_hostname="your-host.cloud.databricks.com",
282+
http_path="/sql/1.0/warehouses/your-warehouse-id",
283+
access_token="your-token"
284+
)as conn:
285+
# Your SQL operations here
286+
pass
287+
```
288+
289+
###Disable Circuit Breaker
290+
291+
```python
292+
from databricksimport sql
293+
294+
# Disable circuit breaker entirely
295+
with sql.connect(
296+
server_hostname="your-host.cloud.databricks.com",
297+
http_path="/sql/1.0/warehouses/your-warehouse-id",
298+
access_token="your-token",
299+
telemetry_circuit_breaker_enabled=False
300+
)as conn:
301+
# Your SQL operations here
302+
pass
303+
```
304+
305+
##Circuit Breaker States
306+
307+
The circuit breaker operates in three states:
308+
309+
1.**Closed**: Normal operation, telemetry requests are allowed
310+
2.**Open**: Circuit breaker is open, telemetry requests are blocked
311+
3.**Half-Open**: Testing state, limited telemetry requests are allowed
312+
313+
314+
##Performance Impact
315+
316+
The circuit breaker has minimal performance impact on SQL operations:
317+
318+
- Circuit breaker only affects telemetry requests, not SQL queries
319+
- When circuit breaker is open, telemetry requests are simply skipped
320+
- No additional latency is added to successful operations
321+
322+
##Best Practices
323+
324+
1.**Keep circuit breaker enabled**: The default configuration works well for most use cases
325+
2.**Don't disable unless necessary**: Circuit breaker provides important protection against telemetry failures
326+
3.**Monitor application logs**: Circuit breaker state changes are logged for troubleshooting

‎pyproject.toml‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pyarrow = [
2626
{version =">=18.0.0",python =">=3.13",optional=true }
2727
]
2828
pyjwt ="^2.0.0"
29+
pybreaker ="^1.0.0"
2930
requests-kerberos = {version ="^0.15.0",optional =true}
3031

3132

‎src/databricks/sql/auth/common.py‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ def __init__(
5151
pool_connections:Optional[int]=None,
5252
pool_maxsize:Optional[int]=None,
5353
user_agent:Optional[str]=None,
54+
# Telemetry circuit breaker configuration
55+
telemetry_circuit_breaker_enabled:Optional[bool]=None,
5456
):
5557
self.hostname=hostname
5658
self.access_token=access_token
@@ -83,6 +85,9 @@ def __init__(
8385
self.pool_connections=pool_connectionsor10
8486
self.pool_maxsize=pool_maxsizeor20
8587
self.user_agent=user_agent
88+
89+
# Telemetry circuit breaker configuration
90+
self.telemetry_circuit_breaker_enabled=telemetry_circuit_breaker_enablediftelemetry_circuit_breaker_enabledisnotNoneelseTrue
8691

8792

8893
defget_effective_azure_login_app_id(hostname)->str:
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
"""
2+
Circuit breaker implementation for telemetry requests.
3+
4+
This module provides circuit breaker functionality to prevent telemetry failures
5+
from impacting the main SQL operations. It uses pybreaker library to implement
6+
the circuit breaker pattern with configurable thresholds and timeouts.
7+
"""
8+
9+
importlogging
10+
importthreading
11+
fromtypingimportDict,Optional,Any
12+
fromdataclassesimportdataclass
13+
14+
importpybreaker
15+
frompybreakerimportCircuitBreaker,CircuitBreakerError
16+
17+
logger=logging.getLogger(__name__)
18+
19+
20+
@dataclass
21+
classCircuitBreakerConfig:
22+
"""Configuration for circuit breaker behavior."""
23+
24+
# Failure threshold percentage (0.0 to 1.0)
25+
failure_threshold:float=0.5
26+
27+
# Minimum number of calls before circuit can open
28+
minimum_calls:int=20
29+
30+
# Time window for counting failures (in seconds)
31+
timeout:int=30
32+
33+
# Time to wait before trying to close circuit (in seconds)
34+
reset_timeout:int=30
35+
36+
# Expected exception types that should trigger circuit breaker
37+
expected_exception:tuple= (Exception,)
38+
39+
# Name for the circuit breaker (for logging)
40+
name:str="telemetry-circuit-breaker"
41+
42+
43+
classCircuitBreakerManager:
44+
"""
45+
Manages circuit breaker instances for telemetry requests.
46+
47+
This class provides a singleton pattern to manage circuit breaker instances
48+
per host, ensuring that telemetry failures don't impact main SQL operations.
49+
"""
50+
51+
_instances:Dict[str,CircuitBreaker]= {}
52+
_lock=threading.RLock()
53+
_config:Optional[CircuitBreakerConfig]=None
54+
55+
@classmethod
56+
definitialize(cls,config:CircuitBreakerConfig)->None:
57+
"""
58+
Initialize the circuit breaker manager with configuration.
59+
60+
Args:
61+
config: Circuit breaker configuration
62+
"""
63+
withcls._lock:
64+
cls._config=config
65+
logger.debug("CircuitBreakerManager initialized with config: %s",config)
66+
67+
@classmethod
68+
defget_circuit_breaker(cls,host:str)->CircuitBreaker:
69+
"""
70+
Get or create a circuit breaker instance for the specified host.
71+
72+
Args:
73+
host: The hostname for which to get the circuit breaker
74+
75+
Returns:
76+
CircuitBreaker instance for the host
77+
"""
78+
ifnotcls._config:
79+
# Return a no-op circuit breaker if not initialized
80+
returncls._create_noop_circuit_breaker()
81+
82+
withcls._lock:
83+
ifhostnotincls._instances:
84+
cls._instances[host]=cls._create_circuit_breaker(host)
85+
logger.debug("Created circuit breaker for host: %s",host)
86+
87+
returncls._instances[host]
88+
89+
@classmethod
90+
def_create_circuit_breaker(cls,host:str)->CircuitBreaker:
91+
"""
92+
Create a new circuit breaker instance for the specified host.
93+
94+
Args:
95+
host: The hostname for the circuit breaker
96+
97+
Returns:
98+
New CircuitBreaker instance
99+
"""
100+
config=cls._config
101+
102+
# Create circuit breaker with configuration
103+
breaker=CircuitBreaker(
104+
fail_max=config.minimum_calls,
105+
reset_timeout=config.reset_timeout,
106+
name=f"{config.name}-{host}"
107+
)
108+
109+
# Set failure threshold
110+
breaker.failure_threshold=config.failure_threshold
111+
112+
# Add state change listeners for logging
113+
breaker.add_listener(cls._on_state_change)
114+
115+
returnbreaker
116+
117+
@classmethod
118+
def_create_noop_circuit_breaker(cls)->CircuitBreaker:
119+
"""
120+
Create a no-op circuit breaker that always allows calls.
121+
122+
Returns:
123+
CircuitBreaker that never opens
124+
"""
125+
# Create a circuit breaker with very high thresholds so it never opens
126+
breaker=CircuitBreaker(
127+
fail_max=1000000,# Very high threshold
128+
reset_timeout=1,# Short reset time
129+
name="noop-circuit-breaker"
130+
)
131+
breaker.failure_threshold=1.0# 100% failure threshold
132+
returnbreaker
133+
134+
@classmethod
135+
def_on_state_change(cls,old_state:str,new_state:str,breaker:CircuitBreaker)->None:
136+
"""
137+
Handle circuit breaker state changes.
138+
139+
Args:
140+
old_state: Previous state of the circuit breaker
141+
new_state: New state of the circuit breaker
142+
breaker: The circuit breaker instance
143+
"""
144+
logger.info(
145+
"Circuit breaker state changed from %s to %s for %s",
146+
old_state,new_state,breaker.name
147+
)
148+
149+
ifnew_state=="open":
150+
logger.warning(
151+
"Circuit breaker opened for %s - telemetry requests will be blocked",
152+
breaker.name
153+
)
154+
elifnew_state=="closed":
155+
logger.info(
156+
"Circuit breaker closed for %s - telemetry requests will be allowed",
157+
breaker.name
158+
)
159+
elifnew_state=="half-open":
160+
logger.info(
161+
"Circuit breaker half-open for %s - testing telemetry requests",
162+
breaker.name
163+
)
164+
165+
@classmethod
166+
defget_circuit_breaker_state(cls,host:str)->str:
167+
"""
168+
Get the current state of the circuit breaker for a host.
169+
170+
Args:
171+
host: The hostname
172+
173+
Returns:
174+
Current state of the circuit breaker
175+
"""
176+
ifnotcls._config:
177+
return"disabled"
178+
179+
withcls._lock:
180+
ifhostnotincls._instances:
181+
return"not_initialized"
182+
183+
breaker=cls._instances[host]
184+
returnbreaker.current_state
185+
186+
@classmethod
187+
defreset_circuit_breaker(cls,host:str)->None:
188+
"""
189+
Reset the circuit breaker for a host to closed state.
190+
191+
Args:
192+
host: The hostname
193+
"""
194+
withcls._lock:
195+
ifhostincls._instances:
196+
# pybreaker doesn't have a reset method, we need to recreate the breaker
197+
delcls._instances[host]
198+
logger.info("Reset circuit breaker for host: %s",host)
199+
200+
@classmethod
201+
defclear_circuit_breaker(cls,host:str)->None:
202+
"""
203+
Remove the circuit breaker instance for a host.
204+
205+
Args:
206+
host: The hostname
207+
"""
208+
withcls._lock:
209+
ifhostincls._instances:
210+
delcls._instances[host]
211+
logger.debug("Cleared circuit breaker for host: %s",host)
212+
213+
@classmethod
214+
defclear_all_circuit_breakers(cls)->None:
215+
"""Clear all circuit breaker instances."""
216+
withcls._lock:
217+
cls._instances.clear()
218+
logger.debug("Cleared all circuit breakers")
219+
220+
221+
defis_circuit_breaker_error(exception:Exception)->bool:
222+
"""
223+
Check if an exception is a circuit breaker error.
224+
225+
Args:
226+
exception: The exception to check
227+
228+
Returns:
229+
True if the exception is a circuit breaker error
230+
"""
231+
returnisinstance(exception,CircuitBreakerError)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp