Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9dfb623

Browse files
committed
simplified CB config
Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com>
1 parente3d85f4 commit9dfb623

File tree

8 files changed

+506
-843
lines changed

8 files changed

+506
-843
lines changed

‎src/databricks/sql/telemetry/circuit_breaker_manager.py‎

Lines changed: 9 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,15 @@
1717
logger=logging.getLogger(__name__)
1818

1919
# Circuit Breaker Configuration Constants
20-
DEFAULT_FAILURE_THRESHOLD=0.5
21-
DEFAULT_MINIMUM_CALLS=20
22-
DEFAULT_TIMEOUT=30
23-
DEFAULT_RESET_TIMEOUT=30
24-
DEFAULT_EXPECTED_EXCEPTION= (Exception,)
25-
DEFAULT_NAME="telemetry-circuit-breaker"
20+
MINIMUM_CALLS=20
21+
RESET_TIMEOUT=30
22+
CIRCUIT_BREAKER_NAME="telemetry-circuit-breaker"
2623

2724
# Circuit Breaker State Constants
2825
CIRCUIT_BREAKER_STATE_OPEN="open"
2926
CIRCUIT_BREAKER_STATE_CLOSED="closed"
3027
CIRCUIT_BREAKER_STATE_HALF_OPEN="half-open"
3128
CIRCUIT_BREAKER_STATE_DISABLED="disabled"
32-
CIRCUIT_BREAKER_STATE_NOT_INITIALIZED="not_initialized"
3329

3430
# Logging Message Constants
3531
LOG_CIRCUIT_BREAKER_STATE_CHANGED="Circuit breaker state changed from %s to %s for %s"
@@ -76,56 +72,18 @@ def state_change(self, cb: CircuitBreaker, old_state, new_state) -> None:
7672
logger.info(LOG_CIRCUIT_BREAKER_HALF_OPEN,cb.name)
7773

7874

79-
@dataclass(frozen=True)
80-
classCircuitBreakerConfig:
81-
"""Configuration for circuit breaker behavior.
82-
83-
This class is immutable to prevent modification of circuit breaker settings.
84-
All configuration values are set to constants defined at the module level.
85-
"""
86-
87-
# Failure threshold percentage (0.0 to 1.0)
88-
failure_threshold:float=DEFAULT_FAILURE_THRESHOLD
89-
90-
# Minimum number of calls before circuit can open
91-
minimum_calls:int=DEFAULT_MINIMUM_CALLS
92-
93-
# Time window for counting failures (in seconds)
94-
timeout:int=DEFAULT_TIMEOUT
95-
96-
# Time to wait before trying to close circuit (in seconds)
97-
reset_timeout:int=DEFAULT_RESET_TIMEOUT
98-
99-
# Expected exception types that should trigger circuit breaker
100-
expected_exception:tuple=DEFAULT_EXPECTED_EXCEPTION
101-
102-
# Name for the circuit breaker (for logging)
103-
name:str=DEFAULT_NAME
104-
105-
10675
classCircuitBreakerManager:
10776
"""
10877
Manages circuit breaker instances for telemetry requests.
10978
11079
This class provides a singleton pattern to manage circuit breaker instances
11180
per host, ensuring that telemetry failures don't impact main SQL operations.
81+
82+
Circuit breaker configuration is fixed and cannot be overridden.
11283
"""
11384

11485
_instances:Dict[str,CircuitBreaker]= {}
11586
_lock=threading.RLock()
116-
_config:Optional[CircuitBreakerConfig]=None
117-
118-
@classmethod
119-
definitialize(cls,config:CircuitBreakerConfig)->None:
120-
"""
121-
Initialize the circuit breaker manager with configuration.
122-
123-
Args:
124-
config: Circuit breaker configuration
125-
"""
126-
withcls._lock:
127-
cls._config=config
128-
logger.debug("CircuitBreakerManager initialized with config: %s",config)
12987

13088
@classmethod
13189
defget_circuit_breaker(cls,host:str)->CircuitBreaker:
@@ -138,10 +96,6 @@ def get_circuit_breaker(cls, host: str) -> CircuitBreaker:
13896
Returns:
13997
CircuitBreaker instance for the host
14098
"""
141-
ifnotcls._config:
142-
# Return a no-op circuit breaker if not initialized
143-
returncls._create_noop_circuit_breaker()
144-
14599
withcls._lock:
146100
ifhostnotincls._instances:
147101
cls._instances[host]=cls._create_circuit_breaker(host)
@@ -160,93 +114,16 @@ def _create_circuit_breaker(cls, host: str) -> CircuitBreaker:
160114
Returns:
161115
New CircuitBreaker instance
162116
"""
163-
config=cls._config
164-
ifconfigisNone:
165-
raiseRuntimeError("CircuitBreakerManager not initialized")
166-
167-
# Create circuit breaker with configuration
117+
# Create circuit breaker with fixed configuration
168118
breaker=CircuitBreaker(
169-
fail_max=config.minimum_calls,# Number of failures before circuit opens
170-
reset_timeout=config.reset_timeout,
171-
name=f"{config.name}-{host}",
119+
fail_max=MINIMUM_CALLS,
120+
reset_timeout=RESET_TIMEOUT,
121+
name=f"{CIRCUIT_BREAKER_NAME}-{host}",
172122
)
173-
174-
# Add state change listeners for logging
175123
breaker.add_listener(CircuitBreakerStateListener())
176124

177125
returnbreaker
178126

179-
@classmethod
180-
def_create_noop_circuit_breaker(cls)->CircuitBreaker:
181-
"""
182-
Create a no-op circuit breaker that always allows calls.
183-
184-
Returns:
185-
CircuitBreaker that never opens
186-
"""
187-
# Create a circuit breaker with very high thresholds so it never opens
188-
breaker=CircuitBreaker(
189-
fail_max=1000000,# Very high threshold
190-
reset_timeout=1,# Short reset time
191-
name="noop-circuit-breaker",
192-
)
193-
returnbreaker
194-
195-
@classmethod
196-
defget_circuit_breaker_state(cls,host:str)->str:
197-
"""
198-
Get the current state of the circuit breaker for a host.
199-
200-
Args:
201-
host: The hostname
202-
203-
Returns:
204-
Current state of the circuit breaker
205-
"""
206-
ifnotcls._config:
207-
returnCIRCUIT_BREAKER_STATE_DISABLED
208-
209-
withcls._lock:
210-
ifhostnotincls._instances:
211-
returnCIRCUIT_BREAKER_STATE_NOT_INITIALIZED
212-
213-
breaker=cls._instances[host]
214-
returnbreaker.current_state
215-
216-
@classmethod
217-
defreset_circuit_breaker(cls,host:str)->None:
218-
"""
219-
Reset the circuit breaker for a host to closed state.
220-
221-
Args:
222-
host: The hostname
223-
"""
224-
withcls._lock:
225-
ifhostincls._instances:
226-
# pybreaker doesn't have a reset method, we need to recreate the breaker
227-
delcls._instances[host]
228-
logger.info("Reset circuit breaker for host: %s",host)
229-
230-
@classmethod
231-
defclear_circuit_breaker(cls,host:str)->None:
232-
"""
233-
Remove the circuit breaker instance for a host.
234-
235-
Args:
236-
host: The hostname
237-
"""
238-
withcls._lock:
239-
ifhostincls._instances:
240-
delcls._instances[host]
241-
logger.debug("Cleared circuit breaker for host: %s",host)
242-
243-
@classmethod
244-
defclear_all_circuit_breakers(cls)->None:
245-
"""Clear all circuit breaker instances."""
246-
withcls._lock:
247-
cls._instances.clear()
248-
logger.debug("Cleared all circuit breakers")
249-
250127

251128
defis_circuit_breaker_error(exception:Exception)->bool:
252129
"""

‎src/databricks/sql/telemetry/telemetry_client.py‎

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
CircuitBreakerTelemetryPushClient,
4848
)
4949
fromdatabricks.sql.telemetry.circuit_breaker_managerimport (
50-
CircuitBreakerConfig,
5150
is_circuit_breaker_error,
5251
)
5352

@@ -200,34 +199,15 @@ def __init__(
200199

201200
# Create telemetry push client based on circuit breaker enabled flag
202201
ifclient_context.telemetry_circuit_breaker_enabled:
203-
# Create circuit breaker configuration from client context or use defaults
204-
self._circuit_breaker_config=CircuitBreakerConfig(
205-
failure_threshold=getattr(
206-
client_context,"telemetry_circuit_breaker_failure_threshold",0.5
207-
),
208-
minimum_calls=getattr(
209-
client_context,"telemetry_circuit_breaker_minimum_calls",20
210-
),
211-
timeout=getattr(
212-
client_context,"telemetry_circuit_breaker_timeout",30
213-
),
214-
reset_timeout=getattr(
215-
client_context,"telemetry_circuit_breaker_reset_timeout",30
216-
),
217-
name=f"telemetry-circuit-breaker-{session_id_hex}",
218-
)
219-
220-
# Create circuit breaker telemetry push client
202+
# Create circuit breaker telemetry push client with fixed configuration
221203
self._telemetry_push_client:ITelemetryPushClient= (
222204
CircuitBreakerTelemetryPushClient(
223205
TelemetryPushClient(self._http_client),
224206
host_url,
225-
self._circuit_breaker_config,
226207
)
227208
)
228209
else:
229210
# Circuit breaker disabled - use direct telemetry push client
230-
self._circuit_breaker_config=None
231211
self._telemetry_push_client:ITelemetryPushClient=TelemetryPushClient(
232212
self._http_client
233213
)
@@ -410,18 +390,6 @@ def close(self):
410390
logger.debug("Closing TelemetryClient for connection %s",self._session_id_hex)
411391
self._flush()
412392

413-
defget_circuit_breaker_state(self)->str:
414-
"""Get the current state of the circuit breaker."""
415-
returnself._telemetry_push_client.get_circuit_breaker_state()
416-
417-
defis_circuit_breaker_open(self)->bool:
418-
"""Check if the circuit breaker is currently open."""
419-
returnself._telemetry_push_client.is_circuit_breaker_open()
420-
421-
defreset_circuit_breaker(self)->None:
422-
"""Reset the circuit breaker."""
423-
self._telemetry_push_client.reset_circuit_breaker()
424-
425393

426394
classTelemetryClientFactory:
427395
"""

‎src/databricks/sql/telemetry/telemetry_push_client.py‎

Lines changed: 3 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@
2020
fromdatabricks.sql.common.unified_http_clientimportUnifiedHttpClient
2121
fromdatabricks.sql.common.httpimportHttpMethod
2222
fromdatabricks.sql.telemetry.circuit_breaker_managerimport (
23-
CircuitBreakerConfig,
2423
CircuitBreakerManager,
2524
is_circuit_breaker_error,
26-
CIRCUIT_BREAKER_STATE_OPEN,
2725
)
2826

2927
logger=logging.getLogger(__name__)
@@ -55,21 +53,6 @@ def request_context(
5553
"""Context manager for making HTTP requests."""
5654
pass
5755

58-
@abstractmethod
59-
defget_circuit_breaker_state(self)->str:
60-
"""Get the current state of the circuit breaker."""
61-
pass
62-
63-
@abstractmethod
64-
defis_circuit_breaker_open(self)->bool:
65-
"""Check if the circuit breaker is currently open."""
66-
pass
67-
68-
@abstractmethod
69-
defreset_circuit_breaker(self)->None:
70-
"""Reset the circuit breaker to closed state."""
71-
pass
72-
7356

7457
classTelemetryPushClient(ITelemetryPushClient):
7558
"""Direct HTTP client implementation for telemetry requests."""
@@ -108,47 +91,27 @@ def request_context(
10891
)asresponse:
10992
yieldresponse
11093

111-
defget_circuit_breaker_state(self)->str:
112-
"""Circuit breaker is not available in direct implementation."""
113-
return"not_available"
114-
115-
defis_circuit_breaker_open(self)->bool:
116-
"""Circuit breaker is not available in direct implementation."""
117-
returnFalse
118-
119-
defreset_circuit_breaker(self)->None:
120-
"""Circuit breaker is not available in direct implementation."""
121-
pass
122-
12394

12495
classCircuitBreakerTelemetryPushClient(ITelemetryPushClient):
12596
"""Circuit breaker wrapper implementation for telemetry requests."""
12697

127-
def__init__(
128-
self,delegate:ITelemetryPushClient,host:str,config:CircuitBreakerConfig
129-
):
98+
def__init__(self,delegate:ITelemetryPushClient,host:str):
13099
"""
131100
Initialize the circuit breaker telemetry push client.
132101
133102
Args:
134103
delegate: The underlying telemetry push client to wrap
135104
host: The hostname for circuit breaker identification
136-
config: Circuit breaker configuration
137105
"""
138106
self._delegate=delegate
139107
self._host=host
140-
self._config=config
141108

142-
# Initialize circuit breaker manager with config
143-
CircuitBreakerManager.initialize(config)
144-
145-
# Get circuit breaker for this host
109+
# Get circuit breaker for this host (creates if doesn't exist)
146110
self._circuit_breaker=CircuitBreakerManager.get_circuit_breaker(host)
147111

148112
logger.debug(
149-
"CircuitBreakerTelemetryPushClient initialized for host %s with config: %s",
113+
"CircuitBreakerTelemetryPushClient initialized for host %s",
150114
host,
151-
config,
152115
)
153116

154117
defrequest(
@@ -208,15 +171,3 @@ def _make_request():
208171
# Re-raise non-circuit breaker exceptions
209172
logger.debug("Telemetry request failed for host %s: %s",self._host,e)
210173
raise
211-
212-
defget_circuit_breaker_state(self)->str:
213-
"""Get the current state of the circuit breaker."""
214-
returnCircuitBreakerManager.get_circuit_breaker_state(self._host)
215-
216-
defis_circuit_breaker_open(self)->bool:
217-
"""Check if the circuit breaker is currently open."""
218-
returnself.get_circuit_breaker_state()==CIRCUIT_BREAKER_STATE_OPEN
219-
220-
defreset_circuit_breaker(self)->None:
221-
"""Reset the circuit breaker to closed state."""
222-
CircuitBreakerManager.reset_circuit_breaker(self._host)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp