Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Processors

ConsoleSpanExporter

Bases: TracingExporter

Prints the traces and spans to the console.

Source code in src/agents/tracing/processors.py
classConsoleSpanExporter(TracingExporter):"""Prints the traces and spans to the console."""defexport(self,items:list[Trace|Span[Any]])->None:foriteminitems:ifisinstance(item,Trace):print(f"[Exporter] Export trace_id={item.trace_id}, name={item.name}, ")else:print(f"[Exporter] Export span:{item.export()}")

BackendSpanExporter

Bases: TracingExporter

Source code insrc/agents/tracing/processors.py
class BackendSpanExporter(TracingExporter):
    """Posts traces/spans to a backend HTTP endpoint with retries and backoff."""

    def __init__(
        self,
        api_key: str | None = None,
        organization: str | None = None,
        project: str | None = None,
        endpoint: str = "https://api.openai.com/v1/traces/ingest",
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 30.0,
    ):
        """
        Args:
            api_key: The API key for the "Authorization" header. Defaults to
                `os.environ["OPENAI_API_KEY"]` if not provided.
            organization: The OpenAI organization to use. Defaults to
                `os.environ["OPENAI_ORG_ID"]` if not provided.
            project: The OpenAI project to use. Defaults to
                `os.environ["OPENAI_PROJECT_ID"]` if not provided.
            endpoint: The HTTP endpoint to which traces/spans are posted.
            max_retries: Maximum number of retries upon failures.
            base_delay: Base delay (in seconds) for the first backoff.
            max_delay: Maximum delay (in seconds) for backoff growth.
        """
        self._api_key = api_key
        self._organization = organization
        self._project = project
        self.endpoint = endpoint
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

        # Keep a client open for connection pooling across multiple export calls
        self._client = httpx.Client(timeout=httpx.Timeout(timeout=60, connect=5.0))

    def set_api_key(self, api_key: str):
        """Set the OpenAI API key for the exporter.

        Args:
            api_key: The OpenAI API key to use. This is the same key used by the OpenAI Python
                client.
        """
        # We're specifically setting the underlying cached property as well
        self._api_key = api_key
        self.api_key = api_key

    @cached_property
    def api_key(self):
        # Lazy fallback to the environment; cached after the first read.
        return self._api_key or os.environ.get("OPENAI_API_KEY")

    @cached_property
    def organization(self):
        return self._organization or os.environ.get("OPENAI_ORG_ID")

    @cached_property
    def project(self):
        return self._project or os.environ.get("OPENAI_PROJECT_ID")

    def export(self, items: list[Trace | Span[Any]]) -> None:
        """POST the exported form of *items* to the endpoint, retrying transient failures."""
        if not items:
            return

        if not self.api_key:
            logger.warning("OPENAI_API_KEY is not set, skipping trace export")
            return

        # BUGFIX: the original evaluated item.export() twice per item (once in the
        # condition, once in the result); bind it once with a walrus instead.
        data = [exported for item in items if (exported := item.export())]
        payload = {"data": data}

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "OpenAI-Beta": "traces=v1",
        }
        if self.organization:
            headers["OpenAI-Organization"] = self.organization
        if self.project:
            headers["OpenAI-Project"] = self.project

        # Exponential backoff loop
        attempt = 0
        delay = self.base_delay
        while True:
            attempt += 1
            try:
                response = self._client.post(url=self.endpoint, headers=headers, json=payload)

                # If the response is successful, break out of the loop
                if response.status_code < 300:
                    logger.debug(f"Exported {len(items)} items")
                    return

                # If the response is a client error (4xx), we won't retry
                if 400 <= response.status_code < 500:
                    logger.error(
                        f"[non-fatal] Tracing client error {response.status_code}: {response.text}"
                    )
                    return

                # For 5xx or other unexpected codes, treat it as transient and retry
                logger.warning(
                    f"[non-fatal] Tracing: server error {response.status_code}, retrying."
                )
            except httpx.RequestError as exc:
                # Network or other I/O error, we'll retry
                logger.warning(f"[non-fatal] Tracing: request failed: {exc}")

            # If we reach here, we need to retry or give up
            if attempt >= self.max_retries:
                logger.error("[non-fatal] Tracing: max retries reached, giving up on this batch.")
                return

            # Exponential backoff + jitter
            sleep_time = delay + random.uniform(0, 0.1 * delay)  # 10% jitter
            time.sleep(sleep_time)
            delay = min(delay * 2, self.max_delay)

    def close(self):
        """Close the underlying HTTP client."""
        self._client.close()

__init__

__init__(api_key:str|None=None,organization:str|None=None,project:str|None=None,endpoint:str="https://api.openai.com/v1/traces/ingest",max_retries:int=3,base_delay:float=1.0,max_delay:float=30.0,)

Parameters:

NameTypeDescriptionDefault
api_keystr | None

The API key for the "Authorization" header. Defaults to os.environ["OPENAI_API_KEY"] if not provided.

None
organizationstr | None

The OpenAI organization to use. Defaults to os.environ["OPENAI_ORG_ID"] if not provided.

None
projectstr | None

The OpenAI project to use. Defaults to os.environ["OPENAI_PROJECT_ID"] if not provided.

None
endpointstr

The HTTP endpoint to which traces/spans are posted.

'https://api.openai.com/v1/traces/ingest'
max_retriesint

Maximum number of retries upon failures.

3
base_delayfloat

Base delay (in seconds) for the first backoff.

1.0
max_delayfloat

Maximum delay (in seconds) for backoff growth.

30.0
Source code insrc/agents/tracing/processors.py
def__init__(self,api_key:str|None=None,organization:str|None=None,project:str|None=None,endpoint:str="https://api.openai.com/v1/traces/ingest",max_retries:int=3,base_delay:float=1.0,max_delay:float=30.0,):"""    Args:        api_key: The API key for the "Authorization" header. Defaults to            `os.environ["OPENAI_API_KEY"]` if not provided.        organization: The OpenAI organization to use. Defaults to            `os.environ["OPENAI_ORG_ID"]` if not provided.        project: The OpenAI project to use. Defaults to            `os.environ["OPENAI_PROJECT_ID"]` if not provided.        endpoint: The HTTP endpoint to which traces/spans are posted.        max_retries: Maximum number of retries upon failures.        base_delay: Base delay (in seconds) for the first backoff.        max_delay: Maximum delay (in seconds) for backoff growth.    """self._api_key=api_keyself._organization=organizationself._project=projectself.endpoint=endpointself.max_retries=max_retriesself.base_delay=base_delayself.max_delay=max_delay# Keep a client open for connection pooling across multiple export callsself._client=httpx.Client(timeout=httpx.Timeout(timeout=60,connect=5.0))

set_api_key

set_api_key(api_key:str)

Set the OpenAI API key for the exporter.

Parameters:

NameTypeDescriptionDefault
api_keystr

The OpenAI API key to use. This is the same key used by the OpenAI Python client.

required
Source code insrc/agents/tracing/processors.py
defset_api_key(self,api_key:str):"""Set the OpenAI API key for the exporter.    Args:        api_key: The OpenAI API key to use. This is the same key used by the OpenAI Python            client.    """# We're specifically setting the underlying cached property as wellself._api_key=api_keyself.api_key=api_key

close

close()

Close the underlying HTTP client.

Source code insrc/agents/tracing/processors.py
defclose(self):"""Close the underlying HTTP client."""self._client.close()

BatchTraceProcessor

Bases: TracingProcessor

Some implementation notes:
1. Using Queue, which is thread-safe.
2. Using a background thread to export spans, to minimize any performance issues.
3. Spans are stored in memory until they are exported.

Source code insrc/agents/tracing/processors.py
class BatchTraceProcessor(TracingProcessor):
    """Queues traces/spans in memory and exports them in batches.

    Implementation notes:
    1. Using Queue, which is thread-safe.
    2. Using a background thread to export spans, to minimize any performance issues.
    3. Spans are stored in memory until they are exported.
    """

    def __init__(
        self,
        exporter: TracingExporter,
        max_queue_size: int = 8192,
        max_batch_size: int = 128,
        schedule_delay: float = 5.0,
        export_trigger_ratio: float = 0.7,
    ):
        """
        Args:
            exporter: The exporter to use.
            max_queue_size: The maximum number of spans to store in the queue. After this, we will
                start dropping spans.
            max_batch_size: The maximum number of spans to export in a single batch.
            schedule_delay: The delay between checks for new spans to export.
            export_trigger_ratio: The ratio of the queue size at which we will trigger an export.
        """
        self._exporter = exporter
        self._queue: queue.Queue[Trace | Span[Any]] = queue.Queue(maxsize=max_queue_size)
        self._max_queue_size = max_queue_size
        self._max_batch_size = max_batch_size
        self._schedule_delay = schedule_delay
        self._shutdown_event = threading.Event()

        # Queue depth at which we export immediately rather than waiting for the timer.
        self._export_trigger_size = int(max_queue_size * export_trigger_ratio)

        # Deadline for the next *mandatory* scheduled export.
        self._next_export_time = time.time() + self._schedule_delay

        # The worker thread is started lazily, on the first enqueued trace/span.
        self._worker_thread: threading.Thread | None = None
        self._thread_start_lock = threading.Lock()

    def _ensure_thread_started(self) -> None:
        # Cheap check first, without taking the lock.
        if self._worker_thread and self._worker_thread.is_alive():
            return

        # Re-check under the lock (double-checked locking) so only one thread starts.
        with self._thread_start_lock:
            if self._worker_thread and self._worker_thread.is_alive():
                return

            self._worker_thread = threading.Thread(target=self._run, daemon=True)
            self._worker_thread.start()

    def on_trace_start(self, trace: Trace) -> None:
        # Make sure the worker exists before anything is queued.
        self._ensure_thread_started()

        try:
            self._queue.put_nowait(trace)
        except queue.Full:
            logger.warning("Queue is full, dropping trace.")

    def on_trace_end(self, trace: Trace) -> None:
        # Traces are enqueued at start time (on_trace_start); nothing to do here.
        pass

    def on_span_start(self, span: Span[Any]) -> None:
        # Spans are enqueued at end time (on_span_end); nothing to do here.
        pass

    def on_span_end(self, span: Span[Any]) -> None:
        # Make sure the worker exists before anything is queued.
        self._ensure_thread_started()

        try:
            self._queue.put_nowait(span)
        except queue.Full:
            logger.warning("Queue is full, dropping span.")

    def shutdown(self, timeout: float | None = None):
        """
        Called when the application stops. We signal our thread to stop, then join it.
        """
        self._shutdown_event.set()

        worker = self._worker_thread
        if worker and worker.is_alive():
            # The worker's final drain runs on its own thread; just wait for it.
            worker.join(timeout=timeout)
        else:
            # The worker never started: drain whatever is queued on this thread.
            self._export_batches(force=True)

    def force_flush(self):
        """
        Forces an immediate flush of all queued spans.
        """
        self._export_batches(force=True)

    def _run(self):
        # Worker loop: flush on schedule or when the queue crosses the trigger size.
        while not self._shutdown_event.is_set():
            now = time.time()
            pending = self._queue.qsize()

            if now >= self._next_export_time or pending >= self._export_trigger_size:
                self._export_batches(force=False)
                # Push the next mandatory flush out by one full delay.
                self._next_export_time = time.time() + self._schedule_delay
            else:
                # Nothing due yet; nap briefly instead of busy-waiting.
                time.sleep(0.2)

        # One last drain once shutdown has been signalled.
        self._export_batches(force=True)

    def _export_batches(self, force: bool = False):
        """Drains the queue and exports in batches. If force=True, export everything.

        Otherwise, export up to `max_batch_size` repeatedly until the queue is empty or below a
        certain threshold.
        """
        while True:
            batch: list[Span[Any] | Trace] = []

            # Pull items until the queue looks empty or the batch is full (unless forced).
            while not self._queue.empty() and (force or len(batch) < self._max_batch_size):
                try:
                    batch.append(self._queue.get_nowait())
                except queue.Empty:
                    # A concurrent consumer emptied the queue between checks.
                    break

            if not batch:
                # Nothing left to ship.
                break

            self._exporter.export(batch)

__init__

__init__(exporter:TracingExporter,max_queue_size:int=8192,max_batch_size:int=128,schedule_delay:float=5.0,export_trigger_ratio:float=0.7,)

Parameters:

NameTypeDescriptionDefault
exporterTracingExporter

The exporter to use.

required
max_queue_sizeint

The maximum number of spans to store in the queue. After this, we will start dropping spans.

8192
max_batch_sizeint

The maximum number of spans to export in a single batch.

128
schedule_delayfloat

The delay between checks for new spans to export.

5.0
export_trigger_ratiofloat

The ratio of the queue size at which we will trigger an export.

0.7
Source code insrc/agents/tracing/processors.py
def__init__(self,exporter:TracingExporter,max_queue_size:int=8192,max_batch_size:int=128,schedule_delay:float=5.0,export_trigger_ratio:float=0.7,):"""    Args:        exporter: The exporter to use.        max_queue_size: The maximum number of spans to store in the queue. After this, we will            start dropping spans.        max_batch_size: The maximum number of spans to export in a single batch.        schedule_delay: The delay between checks for new spans to export.        export_trigger_ratio: The ratio of the queue size at which we will trigger an export.    """self._exporter=exporterself._queue:queue.Queue[Trace|Span[Any]]=queue.Queue(maxsize=max_queue_size)self._max_queue_size=max_queue_sizeself._max_batch_size=max_batch_sizeself._schedule_delay=schedule_delayself._shutdown_event=threading.Event()# The queue size threshold at which we export immediately.self._export_trigger_size=int(max_queue_size*export_trigger_ratio)# Track when we next *must* perform a scheduled exportself._next_export_time=time.time()+self._schedule_delay# We lazily start the background worker thread the first time a span/trace is queued.self._worker_thread:threading.Thread|None=Noneself._thread_start_lock=threading.Lock()

shutdown

shutdown(timeout:float|None=None)

Called when the application stops. We signal our thread to stop, then join it.

Source code insrc/agents/tracing/processors.py
defshutdown(self,timeout:float|None=None):"""    Called when the application stops. We signal our thread to stop, then join it.    """self._shutdown_event.set()# Only join if we ever started the background thread; otherwise flush synchronously.ifself._worker_threadandself._worker_thread.is_alive():self._worker_thread.join(timeout=timeout)else:# No background thread: process any remaining items synchronously.self._export_batches(force=True)

force_flush

force_flush()

Forces an immediate flush of all queued spans.

Source code insrc/agents/tracing/processors.py
defforce_flush(self):"""    Forces an immediate flush of all queued spans.    """self._export_batches(force=True)

default_exporter

default_exporter()->BackendSpanExporter

The default exporter, which exports traces and spans to the backend in batches.

Source code insrc/agents/tracing/processors.py
defdefault_exporter()->BackendSpanExporter:"""The default exporter, which exports traces and spans to the backend in batches."""return_global_exporter

default_processor

default_processor()->BatchTraceProcessor

The default processor, which exports traces and spans to the backend in batches.

Source code insrc/agents/tracing/processors.py
defdefault_processor()->BatchTraceProcessor:"""The default processor, which exports traces and spans to the backend in batches."""return_global_processor

[8]ページ先頭

©2009-2025 Movatter.jp