@@ -138,19 +138,18 @@ class TelemetryClient(BaseTelemetryClient):
138138TELEMETRY_AUTHENTICATED_PATH = "/telemetry-ext"
139139TELEMETRY_UNAUTHENTICATED_PATH = "/telemetry-unauth"
140140
141- DEFAULT_BATCH_SIZE = 100
142-
143141def __init__ (
144142self ,
145143telemetry_enabled ,
146144session_id_hex ,
147145auth_provider ,
148146host_url ,
149147executor ,
148+ batch_size ,
150149 ):
151150logger .debug ("Initializing TelemetryClient for connection: %s" ,session_id_hex )
152151self ._telemetry_enabled = telemetry_enabled
153- self ._batch_size = self . DEFAULT_BATCH_SIZE
152+ self ._batch_size = batch_size
154153self ._session_id_hex = session_id_hex
155154self ._auth_provider = auth_provider
156155self ._user_agent = None
@@ -318,7 +317,7 @@ def close(self):
318317class TelemetryClientFactory :
319318"""
320319 Static factory class for creating and managing telemetry clients.
321- It uses a thread pool to handle asynchronous operations.
320+ It uses a thread pool to handle asynchronous operations and a single flush thread for all clients .
322321 """
323322
324323_clients :Dict [
@@ -331,6 +330,13 @@ class TelemetryClientFactory:
331330_original_excepthook = None
332331_excepthook_installed = False
333332
333+ # Shared flush thread for all clients
334+ _flush_thread = None
335+ _flush_event = threading .Event ()
336+ _flush_interval_seconds = 90
337+
338+ DEFAULT_BATCH_SIZE = 100
339+
334340@classmethod
335341def _initialize (cls ):
336342"""Initialize the factory if not already initialized"""
@@ -341,11 +347,39 @@ def _initialize(cls):
341347max_workers = 10
342348 )# Thread pool for async operations
343349cls ._install_exception_hook ()
350+ cls ._start_flush_thread ()
344351cls ._initialized = True
345352logger .debug (
346353"TelemetryClientFactory initialized with thread pool (max_workers=10)"
347354 )
348355
356+ @classmethod
357+ def _start_flush_thread (cls ):
358+ """Start the shared background thread for periodic flushing of all clients"""
359+ cls ._flush_event .clear ()
360+ cls ._flush_thread = threading .Thread (target = cls ._flush_worker ,daemon = True )
361+ cls ._flush_thread .start ()
362+
363+ @classmethod
364+ def _flush_worker (cls ):
365+ """Background worker thread for periodic flushing of all clients"""
366+ while not cls ._flush_event .wait (cls ._flush_interval_seconds ):
367+ logger .debug ("Performing periodic flush for all telemetry clients" )
368+
369+ with cls ._lock :
370+ clients_to_flush = list (cls ._clients .values ())
371+
372+ for client in clients_to_flush :
373+ client ._flush ()
374+
375+ @classmethod
376+ def _stop_flush_thread (cls ):
377+ """Stop the shared background flush thread"""
378+ if cls ._flush_thread is not None :
379+ cls ._flush_event .set ()
380+ cls ._flush_thread .join (timeout = 1.0 )
381+ cls ._flush_thread = None
382+
349383@classmethod
350384def _install_exception_hook (cls ):
351385"""Install global exception handler for unhandled exceptions"""
@@ -374,6 +408,7 @@ def initialize_telemetry_client(
374408session_id_hex ,
375409auth_provider ,
376410host_url ,
411+ batch_size ,
377412 ):
378413"""Initialize a telemetry client for a specific connection if telemetry is enabled"""
379414try :
@@ -395,6 +430,7 @@ def initialize_telemetry_client(
395430auth_provider = auth_provider ,
396431host_url = host_url ,
397432executor = TelemetryClientFactory ._executor ,
433+ batch_size = batch_size ,
398434 )
399435else :
400436TelemetryClientFactory ._clients [
@@ -433,6 +469,7 @@ def close(session_id_hex):
433469"No more telemetry clients, shutting down thread pool executor"
434470 )
435471try :
472+ TelemetryClientFactory ._stop_flush_thread ()
436473TelemetryClientFactory ._executor .shutdown (wait = True )
437474TelemetryHttpClient .close ()
438475except Exception as e :
@@ -458,6 +495,7 @@ def connection_failure_log(
458495session_id_hex = UNAUTH_DUMMY_SESSION_ID ,
459496auth_provider = None ,
460497host_url = host_url ,
498+ batch_size = TelemetryClientFactory .DEFAULT_BATCH_SIZE ,
461499 )
462500
463501telemetry_client = TelemetryClientFactory .get_telemetry_client (