Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Results

RunResultBasedataclass

Bases:ABC

Source code insrc/agents/result.py
@dataclassclassRunResultBase(abc.ABC):input:str|list[TResponseInputItem]"""The original input items i.e. the items before run() was called. This may be a mutated    version of the input, if there are handoff input filters that mutate the input.    """new_items:list[RunItem]"""The new items generated during the agent run. These include things like new messages, tool    calls and their outputs, etc.    """raw_responses:list[ModelResponse]"""The raw LLM responses generated by the model during the agent run."""final_output:Any"""The output of the last agent."""input_guardrail_results:list[InputGuardrailResult]"""Guardrail results for the input messages."""output_guardrail_results:list[OutputGuardrailResult]"""Guardrail results for the final output of the agent."""tool_input_guardrail_results:list[ToolInputGuardrailResult]"""Tool input guardrail results from all tools executed during the run."""tool_output_guardrail_results:list[ToolOutputGuardrailResult]"""Tool output guardrail results from all tools executed during the run."""context_wrapper:RunContextWrapper[Any]"""The context wrapper for the agent run."""_trace_state:TraceState|None=field(default=None,init=False,repr=False)"""Serialized trace metadata captured during the run."""@property@abc.abstractmethoddeflast_agent(self)->Agent[Any]:"""The last agent that was run."""defrelease_agents(self,*,release_new_items:bool=True)->None:"""        Release strong references to agents held by this result. After calling this method,        accessing `item.agent` or `last_agent` may return `None` if the agent has been garbage        collected. Callers can use this when they are done inspecting the result and want to        eagerly drop any associated agent graph.        """ifrelease_new_items:foriteminself.new_items:release=getattr(item,"release_agent",None)ifcallable(release):release()self._release_last_agent_reference()def__del__(self)->None:try:# Fall back to releasing agents automatically in case the caller never invoked# `release_agents()` explicitly so GC of the RunResult drops the last strong reference.# We pass `release_new_items=False` so RunItems that the user intentionally keeps# continue exposing their originating agent until that agent itself is collected.self.release_agents(release_new_items=False)exceptException:# Avoid raising from __del__.pass@abc.abstractmethoddef_release_last_agent_reference(self)->None:"""Release stored agent reference specific to the concrete result type."""deffinal_output_as(self,cls:type[T],raise_if_incorrect_type:bool=False)->T:"""A convenience method to cast the final output to a specific type. By default, the cast        is only for the typechecker. If you set `raise_if_incorrect_type` to True, we'll raise a        TypeError if the final output is not of the given type.        Args:            cls: The type to cast the final output to.            raise_if_incorrect_type: If True, we'll raise a TypeError if the final output is not of                the given type.        Returns:            The final output casted to the given type.        """ifraise_if_incorrect_typeandnotisinstance(self.final_output,cls):raiseTypeError(f"Final output is not of type{cls.__name__}")returncast(T,self.final_output)defto_input_list(self)->list[TResponseInputItem]:"""Creates a new input list, merging the original input with all the new items generated."""original_items:list[TResponseInputItem]=ItemHelpers.input_to_new_input_list(self.input)new_items:list[TResponseInputItem]=[]foriteminself.new_items:ifisinstance(item,ToolApprovalItem):continuenew_items.append(item.to_input_item())returnoriginal_items+new_items@propertydeflast_response_id(self)->str|None:"""Convenience method to get the response ID of the last model response."""ifnotself.raw_responses:returnNonereturnself.raw_responses[-1].response_id

inputinstance-attribute

input:str|list[TResponseInputItem]

The original input items i.e. the items before run() was called. This may be a mutatedversion of the input, if there are handoff input filters that mutate the input.

new_itemsinstance-attribute

new_items:list[RunItem]

The new items generated during the agent run. These include things like new messages, toolcalls and their outputs, etc.

raw_responsesinstance-attribute

raw_responses:list[ModelResponse]

The raw LLM responses generated by the model during the agent run.

final_outputinstance-attribute

final_output:Any

The output of the last agent.

input_guardrail_resultsinstance-attribute

input_guardrail_results:list[InputGuardrailResult]

Guardrail results for the input messages.

output_guardrail_resultsinstance-attribute

output_guardrail_results:list[OutputGuardrailResult]

Guardrail results for the final output of the agent.

tool_input_guardrail_resultsinstance-attribute

tool_input_guardrail_results:list[ToolInputGuardrailResult]

Tool input guardrail results from all tools executed during the run.

tool_output_guardrail_resultsinstance-attribute

tool_output_guardrail_results:list[ToolOutputGuardrailResult]

Tool output guardrail results from all tools executed during the run.

context_wrapperinstance-attribute

context_wrapper:RunContextWrapper[Any]

The context wrapper for the agent run.

last_agentabstractmethodproperty

last_agent:Agent[Any]

The last agent that was run.

last_response_idproperty

last_response_id:str|None

Convenience method to get the response ID of the last model response.

release_agents

release_agents(*,release_new_items:bool=True)->None

Release strong references to agents held by this result. After calling this method,accessingitem.agent orlast_agent may returnNone if the agent has been garbagecollected. Callers can use this when they are done inspecting the result and want toeagerly drop any associated agent graph.

Source code insrc/agents/result.py
defrelease_agents(self,*,release_new_items:bool=True)->None:"""    Release strong references to agents held by this result. After calling this method,    accessing `item.agent` or `last_agent` may return `None` if the agent has been garbage    collected. Callers can use this when they are done inspecting the result and want to    eagerly drop any associated agent graph.    """ifrelease_new_items:foriteminself.new_items:release=getattr(item,"release_agent",None)ifcallable(release):release()self._release_last_agent_reference()

final_output_as

final_output_as(cls:type[T],raise_if_incorrect_type:bool=False)->T

A convenience method to cast the final output to a specific type. By default, the castis only for the typechecker. If you setraise_if_incorrect_type to True, we'll raise aTypeError if the final output is not of the given type.

Parameters:

NameTypeDescriptionDefault
clstype[T]

The type to cast the final output to.

required
raise_if_incorrect_typebool

If True, we'll raise a TypeError if the final output is not ofthe given type.

False

Returns:

TypeDescription
T

The final output casted to the given type.

Source code insrc/agents/result.py
deffinal_output_as(self,cls:type[T],raise_if_incorrect_type:bool=False)->T:"""A convenience method to cast the final output to a specific type. By default, the cast    is only for the typechecker. If you set `raise_if_incorrect_type` to True, we'll raise a    TypeError if the final output is not of the given type.    Args:        cls: The type to cast the final output to.        raise_if_incorrect_type: If True, we'll raise a TypeError if the final output is not of            the given type.    Returns:        The final output casted to the given type.    """ifraise_if_incorrect_typeandnotisinstance(self.final_output,cls):raiseTypeError(f"Final output is not of type{cls.__name__}")returncast(T,self.final_output)

to_input_list

to_input_list()->list[TResponseInputItem]

Creates a new input list, merging the original input with all the new items generated.

Source code insrc/agents/result.py
defto_input_list(self)->list[TResponseInputItem]:"""Creates a new input list, merging the original input with all the new items generated."""original_items:list[TResponseInputItem]=ItemHelpers.input_to_new_input_list(self.input)new_items:list[TResponseInputItem]=[]foriteminself.new_items:ifisinstance(item,ToolApprovalItem):continuenew_items.append(item.to_input_item())returnoriginal_items+new_items

RunResultdataclass

Bases:RunResultBase

Source code insrc/agents/result.py
@dataclassclassRunResult(RunResultBase):_last_agent:Agent[Any]_last_agent_ref:weakref.ReferenceType[Agent[Any]]|None=field(init=False,repr=False,default=None,)_last_processed_response:ProcessedResponse|None=field(default=None,repr=False)"""The last processed model response. This is needed for resuming from interruptions."""_tool_use_tracker_snapshot:dict[str,list[str]]=field(default_factory=dict,repr=False)_current_turn_persisted_item_count:int=0"""Number of items from new_items already persisted to session for the    current turn."""_current_turn:int=0"""The current turn number. This is preserved when converting to RunState."""_model_input_items:list[RunItem]=field(default_factory=list,repr=False)"""Filtered items used to build model input when resuming runs."""_original_input:str|list[TResponseInputItem]|None=field(default=None,repr=False)"""The original input for the current run segment.    This is updated when handoffs or resume logic replace the input history, and used by to_state()    to preserve the correct originalInput when serializing state."""_conversation_id:str|None=field(default=None,repr=False)"""Conversation identifier for server-managed runs."""_previous_response_id:str|None=field(default=None,repr=False)"""Response identifier returned by the server for the last turn."""_auto_previous_response_id:bool=field(default=False,repr=False)"""Whether automatic previous response tracking was enabled."""max_turns:int=10"""The maximum number of turns allowed for this run."""interruptions:list[ToolApprovalItem]=field(default_factory=list)"""Pending tool approval requests (interruptions) for this run."""def__post_init__(self)->None:self._last_agent_ref=weakref.ref(self._last_agent)@propertydeflast_agent(self)->Agent[Any]:"""The last agent that was run."""agent=cast("Agent[Any] | None",self.__dict__.get("_last_agent"))ifagentisnotNone:returnagentifself._last_agent_ref:agent=self._last_agent_ref()ifagentisnotNone:returnagentraiseAgentsException("Last agent reference is no longer available.")def_release_last_agent_reference(self)->None:agent=cast("Agent[Any] | None",self.__dict__.get("_last_agent"))ifagentisNone:returnself._last_agent_ref=weakref.ref(agent)# Preserve dataclass field so repr/asdict continue to succeed.self.__dict__["_last_agent"]=Nonedefto_state(self)->RunState[Any]:"""Create a RunState from this result to resume execution.        This is useful when the run was interrupted (e.g., for tool approval). You can        approve or reject the tool calls on the returned state, then pass it back to        `Runner.run()` to continue execution.        Returns:            A RunState that can be used to resume the run.        Example:            ```python            # Run agent until it needs approval            result = await Runner.run(agent, "Use the delete_file tool")            if result.interruptions:                # Approve the tool call                state = result.to_state()                state.approve(result.interruptions[0])                # Resume the run                result = await Runner.run(agent, state)            ```        """# Create a RunState from the current resultoriginal_input_for_state=getattr(self,"_original_input",None)state=RunState(context=self.context_wrapper,original_input=original_input_for_stateiforiginal_input_for_stateisnotNoneelseself.input,starting_agent=self.last_agent,max_turns=self.max_turns,)return_populate_state_from_result(state,self,current_turn=self._current_turn,last_processed_response=self._last_processed_response,current_turn_persisted_item_count=self._current_turn_persisted_item_count,tool_use_tracker_snapshot=self._tool_use_tracker_snapshot,conversation_id=self._conversation_id,previous_response_id=self._previous_response_id,auto_previous_response_id=self._auto_previous_response_id,)def__str__(self)->str:returnpretty_print_result(self)

max_turnsclass-attributeinstance-attribute

max_turns:int=10

The maximum number of turns allowed for this run.

interruptionsclass-attributeinstance-attribute

interruptions:list[ToolApprovalItem]=field(default_factory=list)

Pending tool approval requests (interruptions) for this run.

last_agentproperty

last_agent:Agent[Any]

The last agent that was run.

inputinstance-attribute

input:str|list[TResponseInputItem]

The original input items i.e. the items before run() was called. This may be a mutatedversion of the input, if there are handoff input filters that mutate the input.

new_itemsinstance-attribute

new_items:list[RunItem]

The new items generated during the agent run. These include things like new messages, toolcalls and their outputs, etc.

raw_responsesinstance-attribute

raw_responses:list[ModelResponse]

The raw LLM responses generated by the model during the agent run.

final_outputinstance-attribute

final_output:Any

The output of the last agent.

input_guardrail_resultsinstance-attribute

input_guardrail_results:list[InputGuardrailResult]

Guardrail results for the input messages.

output_guardrail_resultsinstance-attribute

output_guardrail_results:list[OutputGuardrailResult]

Guardrail results for the final output of the agent.

tool_input_guardrail_resultsinstance-attribute

tool_input_guardrail_results:list[ToolInputGuardrailResult]

Tool input guardrail results from all tools executed during the run.

tool_output_guardrail_resultsinstance-attribute

tool_output_guardrail_results:list[ToolOutputGuardrailResult]

Tool output guardrail results from all tools executed during the run.

context_wrapperinstance-attribute

context_wrapper:RunContextWrapper[Any]

The context wrapper for the agent run.

last_response_idproperty

last_response_id:str|None

Convenience method to get the response ID of the last model response.

to_state

to_state()->RunState[Any]

Create a RunState from this result to resume execution.

This is useful when the run was interrupted (e.g., for tool approval). You canapprove or reject the tool calls on the returned state, then pass it back toRunner.run() to continue execution.

Returns:

TypeDescription
RunState[Any]

A RunState that can be used to resume the run.

Example
# Run agent until it needs approvalresult=awaitRunner.run(agent,"Use the delete_file tool")ifresult.interruptions:# Approve the tool callstate=result.to_state()state.approve(result.interruptions[0])# Resume the runresult=awaitRunner.run(agent,state)
Source code insrc/agents/result.py
defto_state(self)->RunState[Any]:"""Create a RunState from this result to resume execution.    This is useful when the run was interrupted (e.g., for tool approval). You can    approve or reject the tool calls on the returned state, then pass it back to    `Runner.run()` to continue execution.    Returns:        A RunState that can be used to resume the run.    Example:        ```python        # Run agent until it needs approval        result = await Runner.run(agent, "Use the delete_file tool")        if result.interruptions:            # Approve the tool call            state = result.to_state()            state.approve(result.interruptions[0])            # Resume the run            result = await Runner.run(agent, state)        ```    """# Create a RunState from the current resultoriginal_input_for_state=getattr(self,"_original_input",None)state=RunState(context=self.context_wrapper,original_input=original_input_for_stateiforiginal_input_for_stateisnotNoneelseself.input,starting_agent=self.last_agent,max_turns=self.max_turns,)return_populate_state_from_result(state,self,current_turn=self._current_turn,last_processed_response=self._last_processed_response,current_turn_persisted_item_count=self._current_turn_persisted_item_count,tool_use_tracker_snapshot=self._tool_use_tracker_snapshot,conversation_id=self._conversation_id,previous_response_id=self._previous_response_id,auto_previous_response_id=self._auto_previous_response_id,)

release_agents

release_agents(*,release_new_items:bool=True)->None

Release strong references to agents held by this result. After calling this method,accessingitem.agent orlast_agent may returnNone if the agent has been garbagecollected. Callers can use this when they are done inspecting the result and want toeagerly drop any associated agent graph.

Source code insrc/agents/result.py
defrelease_agents(self,*,release_new_items:bool=True)->None:"""    Release strong references to agents held by this result. After calling this method,    accessing `item.agent` or `last_agent` may return `None` if the agent has been garbage    collected. Callers can use this when they are done inspecting the result and want to    eagerly drop any associated agent graph.    """ifrelease_new_items:foriteminself.new_items:release=getattr(item,"release_agent",None)ifcallable(release):release()self._release_last_agent_reference()

final_output_as

final_output_as(cls:type[T],raise_if_incorrect_type:bool=False)->T

A convenience method to cast the final output to a specific type. By default, the castis only for the typechecker. If you setraise_if_incorrect_type to True, we'll raise aTypeError if the final output is not of the given type.

Parameters:

NameTypeDescriptionDefault
clstype[T]

The type to cast the final output to.

required
raise_if_incorrect_typebool

If True, we'll raise a TypeError if the final output is not ofthe given type.

False

Returns:

TypeDescription
T

The final output casted to the given type.

Source code insrc/agents/result.py
deffinal_output_as(self,cls:type[T],raise_if_incorrect_type:bool=False)->T:"""A convenience method to cast the final output to a specific type. By default, the cast    is only for the typechecker. If you set `raise_if_incorrect_type` to True, we'll raise a    TypeError if the final output is not of the given type.    Args:        cls: The type to cast the final output to.        raise_if_incorrect_type: If True, we'll raise a TypeError if the final output is not of            the given type.    Returns:        The final output casted to the given type.    """ifraise_if_incorrect_typeandnotisinstance(self.final_output,cls):raiseTypeError(f"Final output is not of type{cls.__name__}")returncast(T,self.final_output)

to_input_list

to_input_list()->list[TResponseInputItem]

Creates a new input list, merging the original input with all the new items generated.

Source code insrc/agents/result.py
defto_input_list(self)->list[TResponseInputItem]:"""Creates a new input list, merging the original input with all the new items generated."""original_items:list[TResponseInputItem]=ItemHelpers.input_to_new_input_list(self.input)new_items:list[TResponseInputItem]=[]foriteminself.new_items:ifisinstance(item,ToolApprovalItem):continuenew_items.append(item.to_input_item())returnoriginal_items+new_items

RunResultStreamingdataclass

Bases:RunResultBase

The result of an agent run in streaming mode. You can use thestream_events method toreceive semantic events as they are generated.

The streaming method will raise:- A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.- A GuardrailTripwireTriggered exception if a guardrail is tripped.

Source code insrc/agents/result.py
306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689
@dataclassclassRunResultStreaming(RunResultBase):"""The result of an agent run in streaming mode. You can use the `stream_events` method to    receive semantic events as they are generated.    The streaming method will raise:    - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.    - A GuardrailTripwireTriggered exception if a guardrail is tripped.    """current_agent:Agent[Any]"""The current agent that is running."""current_turn:int"""The current turn number."""max_turns:int"""The maximum number of turns the agent can run for."""final_output:Any"""The final output of the agent. This is None until the agent has finished running."""_current_agent_output_schema:AgentOutputSchemaBase|None=field(repr=False)trace:Trace|None=field(repr=False)is_complete:bool=False"""Whether the agent has finished running."""_current_agent_ref:weakref.ReferenceType[Agent[Any]]|None=field(init=False,repr=False,default=None,)_model_input_items:list[RunItem]=field(default_factory=list,repr=False)"""Filtered items used to build model input between streaming turns."""# Queues that the background run_loop writes to_event_queue:asyncio.Queue[StreamEvent|QueueCompleteSentinel]=field(default_factory=asyncio.Queue,repr=False)_input_guardrail_queue:asyncio.Queue[InputGuardrailResult]=field(default_factory=asyncio.Queue,repr=False)# Store the asyncio tasks that we're waiting onrun_loop_task:asyncio.Task[Any]|None=field(default=None,repr=False)_input_guardrails_task:asyncio.Task[Any]|None=field(default=None,repr=False)_output_guardrails_task:asyncio.Task[Any]|None=field(default=None,repr=False)_stored_exception:Exception|None=field(default=None,repr=False)_cancel_mode:Literal["none","immediate","after_turn"]=field(default="none",repr=False)_last_processed_response:ProcessedResponse|None=field(default=None,repr=False)"""The last processed model response. This is needed for resuming from interruptions."""interruptions:list[ToolApprovalItem]=field(default_factory=list)"""Pending tool approval requests (interruptions) for this run."""_waiting_on_event_queue:bool=field(default=False,repr=False)_current_turn_persisted_item_count:int=0"""Number of items from new_items already persisted to session for the    current turn."""_stream_input_persisted:bool=False"""Whether the input has been persisted to the session. Prevents double-saving."""_original_input_for_persistence:list[TResponseInputItem]=field(default_factory=list)"""Original turn input before session history was merged, used for    persistence (matches JS sessionInputOriginalSnapshot)."""_max_turns_handled:bool=field(default=False,repr=False)_original_input:str|list[TResponseInputItem]|None=field(default=None,repr=False)"""The original input from the first turn. Unlike `input`, this is never updated during the run.    Used by to_state() to preserve the correct originalInput when serializing state."""_tool_use_tracker_snapshot:dict[str,list[str]]=field(default_factory=dict,repr=False)_state:Any=field(default=None,repr=False)"""Internal reference to the RunState for streaming results."""_conversation_id:str|None=field(default=None,repr=False)"""Conversation identifier for server-managed runs."""_previous_response_id:str|None=field(default=None,repr=False)"""Response identifier returned by the server for the last turn."""_auto_previous_response_id:bool=field(default=False,repr=False)"""Whether automatic previous response tracking was enabled."""_run_impl_task:InitVar[asyncio.Task[Any]|None]=Nonedef__post_init__(self,_run_impl_task:asyncio.Task[Any]|None)->None:self._current_agent_ref=weakref.ref(self.current_agent)# Store the original input at creation time (it will be set via input field)ifself._original_inputisNone:self._original_input=self.input# Compatibility shim: accept legacy `_run_impl_task` constructor keyword.ifself.run_loop_taskisNoneand_run_impl_taskisnotNone:self.run_loop_task=_run_impl_task@propertydeflast_agent(self)->Agent[Any]:"""The last agent that was run. Updates as the agent run progresses, so the true last agent        is only available after the agent run is complete.        """agent=cast("Agent[Any] | None",self.__dict__.get("current_agent"))ifagentisnotNone:returnagentifself._current_agent_ref:agent=self._current_agent_ref()ifagentisnotNone:returnagentraiseAgentsException("Last agent reference is no longer available.")def_release_last_agent_reference(self)->None:agent=cast("Agent[Any] | None",self.__dict__.get("current_agent"))ifagentisNone:returnself._current_agent_ref=weakref.ref(agent)# Preserve dataclass field so repr/asdict continue to succeed.self.__dict__["current_agent"]=Nonedefcancel(self,mode:Literal["immediate","after_turn"]="immediate")->None:"""Cancel the streaming run.        Args:            mode: Cancellation strategy:                - "immediate": Stop immediately, cancel all tasks, clear queues (default)                - "after_turn": Complete current turn gracefully before stopping                    * Allows LLM response to finish                    * Executes pending tool calls                    * Saves session state properly                    * Tracks usage accurately                    * Stops before next turn begins        Example:            ```python            result = Runner.run_streamed(agent, "Task", session=session)            async for event in result.stream_events():                if user_interrupted():                    result.cancel(mode="after_turn")  # Graceful                    # result.cancel()  # Immediate (default)            ```        Note: After calling cancel(), you should continue consuming stream_events()        to allow the cancellation to complete properly.        """# Store the cancel mode for the background task to checkself._cancel_mode=modeifmode=="immediate":# Existing behavior - immediate shutdownself._cleanup_tasks()# Cancel all running tasksself.is_complete=True# Mark the run as complete to stop event streamingwhilenotself._input_guardrail_queue.empty():self._input_guardrail_queue.get_nowait()# Unblock any streamers waiting on the event queue.self._event_queue.put_nowait(QueueCompleteSentinel())ifnotself._waiting_on_event_queue:self._drain_event_queue()elifmode=="after_turn":# Soft cancel - just set the flag# The streaming loop will check this and stop gracefully# Don't call _cleanup_tasks() or clear queues yetpassasyncdefstream_events(self)->AsyncIterator[StreamEvent]:"""Stream deltas for new items as they are generated. We're using the types from the        OpenAI Responses API, so these are semantic events: each event has a `type` field that        describes the type of the event, along with the data for that event.        This will raise:        - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.        - A GuardrailTripwireTriggered exception if a guardrail is tripped.        """cancelled=Falsetry:whileTrue:self._check_errors()ifself._stored_exception:logger.debug("Breaking due to stored exception")self.is_complete=Truebreakifself.is_completeandself._event_queue.empty():breaktry:self._waiting_on_event_queue=Trueitem=awaitself._event_queue.get()exceptasyncio.CancelledError:cancelled=Trueself.cancel()raisefinally:self._waiting_on_event_queue=Falseifisinstance(item,QueueCompleteSentinel):# Await input guardrails if they are still running, so late# exceptions are captured.awaitself._await_task_safely(self._input_guardrails_task)self._event_queue.task_done()# Check for errors, in case the queue was completed# due to an exceptionself._check_errors()breakyielditemself._event_queue.task_done()finally:ifcancelled:# Cancellation should return promptly, so avoid waiting on long-running tasks.# Tasks have already been cancelled above.self._cleanup_tasks()else:# Ensure main execution completes before cleanup to avoid race conditions# with session operationsawaitself._await_task_safely(self.run_loop_task)# Safely terminate all background tasks after main execution has finishedself._cleanup_tasks()# Allow any pending callbacks (e.g., cancellation handlers) to enqueue their# completion sentinels before we clear the queues for observability.awaitasyncio.sleep(0)# Drain queues so callers observing internal state see them empty after completion.self._drain_event_queue()self._drain_input_guardrail_queue()ifself._stored_exception:raiseself._stored_exceptiondef_create_error_details(self)->RunErrorDetails:"""Return a `RunErrorDetails` object considering the current attributes of the class."""returnRunErrorDetails(input=self.input,new_items=self.new_items,raw_responses=self.raw_responses,last_agent=self.current_agent,context_wrapper=self.context_wrapper,input_guardrail_results=self.input_guardrail_results,output_guardrail_results=self.output_guardrail_results,)def_check_errors(self):ifself.current_turn>self.max_turnsandnotself._max_turns_handled:max_turns_exc=MaxTurnsExceeded(f"Max turns ({self.max_turns}) exceeded")max_turns_exc.run_data=self._create_error_details()self._stored_exception=max_turns_exc# Fetch all the completed guardrail results from the queue and raise if neededwhilenotself._input_guardrail_queue.empty():guardrail_result=self._input_guardrail_queue.get_nowait()ifguardrail_result.output.tripwire_triggered:tripwire_exc=InputGuardrailTripwireTriggered(guardrail_result)tripwire_exc.run_data=self._create_error_details()self._stored_exception=tripwire_exc# Check the tasks for any exceptionsifself.run_loop_taskandself.run_loop_task.done():ifnotself.run_loop_task.cancelled():run_impl_exc=self.run_loop_task.exception()ifrun_impl_excandisinstance(run_impl_exc,Exception):ifisinstance(run_impl_exc,AgentsException)andrun_impl_exc.run_dataisNone:run_impl_exc.run_data=self._create_error_details()self._stored_exception=run_impl_excifself._input_guardrails_taskandself._input_guardrails_task.done():ifnotself._input_guardrails_task.cancelled():in_guard_exc=self._input_guardrails_task.exception()ifin_guard_excandisinstance(in_guard_exc,Exception):ifisinstance(in_guard_exc,AgentsException)andin_guard_exc.run_dataisNone:in_guard_exc.run_data=self._create_error_details()self._stored_exception=in_guard_excifself._output_guardrails_taskandself._output_guardrails_task.done():ifnotself._output_guardrails_task.cancelled():out_guard_exc=self._output_guardrails_task.exception()ifout_guard_excandisinstance(out_guard_exc,Exception):if(isinstance(out_guard_exc,AgentsException)andout_guard_exc.run_dataisNone):out_guard_exc.run_data=self._create_error_details()self._stored_exception=out_guard_excdef_cleanup_tasks(self):ifself.run_loop_taskandnotself.run_loop_task.done():self.run_loop_task.cancel()ifself._input_guardrails_taskandnotself._input_guardrails_task.done():self._input_guardrails_task.cancel()ifself._output_guardrails_taskandnotself._output_guardrails_task.done():self._output_guardrails_task.cancel()def__str__(self)->str:returnpretty_print_run_result_streaming(self)asyncdef_await_task_safely(self,task:asyncio.Task[Any]|None)->None:"""Await a task if present, ignoring cancellation and storing exceptions elsewhere.        This ensures we do not lose late guardrail exceptions while not surfacing        CancelledError to callers of stream_events.        """iftaskandnottask.done():try:awaittaskexceptasyncio.CancelledError:# Task was cancelled (e.g., due to result.cancel()). Nothing to do here.passexceptException:# The exception will be surfaced via _check_errors() if needed.passdef_drain_event_queue(self)->None:"""Remove any pending items from the event queue and mark them done."""whilenotself._event_queue.empty():try:self._event_queue.get_nowait()self._event_queue.task_done()exceptasyncio.QueueEmpty:breakexceptValueError:# task_done called too many times; nothing more to drain.breakdef_drain_input_guardrail_queue(self)->None:"""Remove any pending items from the input guardrail queue."""whilenotself._input_guardrail_queue.empty():try:self._input_guardrail_queue.get_nowait()exceptasyncio.QueueEmpty:breakdefto_state(self)->RunState[Any]:"""Create a RunState from this streaming result to resume execution.        This is useful when the run was interrupted (e.g., for tool approval). You can        approve or reject the tool calls on the returned state, then pass it back to        `Runner.run_streamed()` to continue execution.        Returns:            A RunState that can be used to resume the run.        Example:            ```python            # Run agent until it needs approval            result = Runner.run_streamed(agent, "Use the delete_file tool")            async for event in result.stream_events():                pass            if result.interruptions:                # Approve the tool call                state = result.to_state()                state.approve(result.interruptions[0])                # Resume the run                result = Runner.run_streamed(agent, state)                async for event in result.stream_events():                    pass            ```        """# Create a RunState from the current result# Use _original_input (updated on handoffs/resume when input history changes).# This avoids serializing a mutated view of input history.state=RunState(context=self.context_wrapper,original_input=self._original_inputifself._original_inputisnotNoneelseself.input,starting_agent=self.last_agent,max_turns=self.max_turns,)return_populate_state_from_result(state,self,current_turn=self.current_turn,last_processed_response=self._last_processed_response,current_turn_persisted_item_count=self._current_turn_persisted_item_count,tool_use_tracker_snapshot=self._tool_use_tracker_snapshot,conversation_id=self._conversation_id,previous_response_id=self._previous_response_id,auto_previous_response_id=self._auto_previous_response_id,)

current_agentinstance-attribute

current_agent:Agent[Any]

The current agent that is running.

current_turninstance-attribute

current_turn:int

The current turn number.

max_turnsinstance-attribute

max_turns:int

The maximum number of turns the agent can run for.

final_outputinstance-attribute

final_output:Any

The final output of the agent. This is None until the agent has finished running.

is_completeclass-attributeinstance-attribute

is_complete:bool=False

Whether the agent has finished running.

interruptionsclass-attributeinstance-attribute

interruptions:list[ToolApprovalItem]=field(default_factory=list)

Pending tool approval requests (interruptions) for this run.

last_agentproperty

last_agent:Agent[Any]

The last agent that was run. Updates as the agent run progresses, so the true last agentis only available after the agent run is complete.

inputinstance-attribute

input:str|list[TResponseInputItem]

The original input items i.e. the items before run() was called. This may be a mutatedversion of the input, if there are handoff input filters that mutate the input.

new_itemsinstance-attribute

new_items:list[RunItem]

The new items generated during the agent run. These include things like new messages, toolcalls and their outputs, etc.

raw_responsesinstance-attribute

raw_responses:list[ModelResponse]

The raw LLM responses generated by the model during the agent run.

input_guardrail_resultsinstance-attribute

input_guardrail_results:list[InputGuardrailResult]

Guardrail results for the input messages.

output_guardrail_resultsinstance-attribute

output_guardrail_results:list[OutputGuardrailResult]

Guardrail results for the final output of the agent.

tool_input_guardrail_resultsinstance-attribute

tool_input_guardrail_results:list[ToolInputGuardrailResult]

Tool input guardrail results from all tools executed during the run.

tool_output_guardrail_resultsinstance-attribute

tool_output_guardrail_results:list[ToolOutputGuardrailResult]

Tool output guardrail results from all tools executed during the run.

context_wrapperinstance-attribute

context_wrapper:RunContextWrapper[Any]

The context wrapper for the agent run.

last_response_idproperty

last_response_id:str|None

Convenience method to get the response ID of the last model response.

cancel

cancel(mode:Literal["immediate","after_turn"]="immediate",)->None

Cancel the streaming run.

Parameters:

NameTypeDescriptionDefault
modeLiteral['immediate', 'after_turn']

Cancellation strategy:- "immediate": Stop immediately, cancel all tasks, clear queues (default)- "after_turn": Complete current turn gracefully before stopping * Allows LLM response to finish * Executes pending tool calls * Saves session state properly * Tracks usage accurately * Stops before next turn begins

'immediate'
Example
result=Runner.run_streamed(agent,"Task",session=session)asyncforeventinresult.stream_events():ifuser_interrupted():result.cancel(mode="after_turn")# Graceful# result.cancel()  # Immediate (default)

Note: After calling cancel(), you should continue consuming stream_events()to allow the cancellation to complete properly.

Source code insrc/agents/result.py
defcancel(self,mode:Literal["immediate","after_turn"]="immediate")->None:"""Cancel the streaming run.    Args:        mode: Cancellation strategy:            - "immediate": Stop immediately, cancel all tasks, clear queues (default)            - "after_turn": Complete current turn gracefully before stopping                * Allows LLM response to finish                * Executes pending tool calls                * Saves session state properly                * Tracks usage accurately                * Stops before next turn begins    Example:        ```python        result = Runner.run_streamed(agent, "Task", session=session)        async for event in result.stream_events():            if user_interrupted():                result.cancel(mode="after_turn")  # Graceful                # result.cancel()  # Immediate (default)        ```    Note: After calling cancel(), you should continue consuming stream_events()    to allow the cancellation to complete properly.    """# Store the cancel mode for the background task to checkself._cancel_mode=modeifmode=="immediate":# Existing behavior - immediate shutdownself._cleanup_tasks()# Cancel all running tasksself.is_complete=True# Mark the run as complete to stop event streamingwhilenotself._input_guardrail_queue.empty():self._input_guardrail_queue.get_nowait()# Unblock any streamers waiting on the event queue.self._event_queue.put_nowait(QueueCompleteSentinel())ifnotself._waiting_on_event_queue:self._drain_event_queue()elifmode=="after_turn":# Soft cancel - just set the flag# The streaming loop will check this and stop gracefully# Don't call _cleanup_tasks() or clear queues yetpass

stream_eventsasync

stream_events()->AsyncIterator[StreamEvent]

Stream deltas for new items as they are generated. We're using the types from theOpenAI Responses API, so these are semantic events: each event has atype field thatdescribes the type of the event, along with the data for that event.

This will raise:- A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.- A GuardrailTripwireTriggered exception if a guardrail is tripped.

Source code insrc/agents/result.py
asyncdefstream_events(self)->AsyncIterator[StreamEvent]:"""Stream deltas for new items as they are generated. We're using the types from the    OpenAI Responses API, so these are semantic events: each event has a `type` field that    describes the type of the event, along with the data for that event.    This will raise:    - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.    - A GuardrailTripwireTriggered exception if a guardrail is tripped.    """cancelled=Falsetry:whileTrue:self._check_errors()ifself._stored_exception:logger.debug("Breaking due to stored exception")self.is_complete=Truebreakifself.is_completeandself._event_queue.empty():breaktry:self._waiting_on_event_queue=Trueitem=awaitself._event_queue.get()exceptasyncio.CancelledError:cancelled=Trueself.cancel()raisefinally:self._waiting_on_event_queue=Falseifisinstance(item,QueueCompleteSentinel):# Await input guardrails if they are still running, so late# exceptions are captured.awaitself._await_task_safely(self._input_guardrails_task)self._event_queue.task_done()# Check for errors, in case the queue was completed# due to an exceptionself._check_errors()breakyielditemself._event_queue.task_done()finally:ifcancelled:# Cancellation should return promptly, so avoid waiting on long-running tasks.# Tasks have already been cancelled above.self._cleanup_tasks()else:# Ensure main execution completes before cleanup to avoid race conditions# with session operationsawaitself._await_task_safely(self.run_loop_task)# Safely terminate all background tasks after main execution has finishedself._cleanup_tasks()# Allow any pending callbacks (e.g., cancellation handlers) to enqueue their# completion sentinels before we clear the queues for observability.awaitasyncio.sleep(0)# Drain queues so callers observing internal state see them empty after completion.self._drain_event_queue()self._drain_input_guardrail_queue()ifself._stored_exception:raiseself._stored_exception

to_state

to_state()->RunState[Any]

Create a RunState from this streaming result to resume execution.

This is useful when the run was interrupted (e.g., for tool approval). You canapprove or reject the tool calls on the returned state, then pass it back toRunner.run_streamed() to continue execution.

Returns:

TypeDescription
RunState[Any]

A RunState that can be used to resume the run.

Example
# Run agent until it needs approvalresult=Runner.run_streamed(agent,"Use the delete_file tool")asyncforeventinresult.stream_events():passifresult.interruptions:# Approve the tool callstate=result.to_state()state.approve(result.interruptions[0])# Resume the runresult=Runner.run_streamed(agent,state)asyncforeventinresult.stream_events():pass
Source code insrc/agents/result.py
defto_state(self)->RunState[Any]:"""Create a RunState from this streaming result to resume execution.    This is useful when the run was interrupted (e.g., for tool approval). You can    approve or reject the tool calls on the returned state, then pass it back to    `Runner.run_streamed()` to continue execution.    Returns:        A RunState that can be used to resume the run.    Example:        ```python        # Run agent until it needs approval        result = Runner.run_streamed(agent, "Use the delete_file tool")        async for event in result.stream_events():            pass        if result.interruptions:            # Approve the tool call            state = result.to_state()            state.approve(result.interruptions[0])            # Resume the run            result = Runner.run_streamed(agent, state)            async for event in result.stream_events():                pass        ```    """# Create a RunState from the current result# Use _original_input (updated on handoffs/resume when input history changes).# This avoids serializing a mutated view of input history.state=RunState(context=self.context_wrapper,original_input=self._original_inputifself._original_inputisnotNoneelseself.input,starting_agent=self.last_agent,max_turns=self.max_turns,)return_populate_state_from_result(state,self,current_turn=self.current_turn,last_processed_response=self._last_processed_response,current_turn_persisted_item_count=self._current_turn_persisted_item_count,tool_use_tracker_snapshot=self._tool_use_tracker_snapshot,conversation_id=self._conversation_id,previous_response_id=self._previous_response_id,auto_previous_response_id=self._auto_previous_response_id,)

release_agents

release_agents(*,release_new_items:bool=True)->None

Release strong references to agents held by this result. After calling this method,accessingitem.agent orlast_agent may returnNone if the agent has been garbagecollected. Callers can use this when they are done inspecting the result and want toeagerly drop any associated agent graph.

Source code insrc/agents/result.py
defrelease_agents(self,*,release_new_items:bool=True)->None:"""    Release strong references to agents held by this result. After calling this method,    accessing `item.agent` or `last_agent` may return `None` if the agent has been garbage    collected. Callers can use this when they are done inspecting the result and want to    eagerly drop any associated agent graph.    """ifrelease_new_items:foriteminself.new_items:release=getattr(item,"release_agent",None)ifcallable(release):release()self._release_last_agent_reference()

final_output_as

final_output_as(cls:type[T],raise_if_incorrect_type:bool=False)->T

A convenience method to cast the final output to a specific type. By default, the castis only for the typechecker. If you setraise_if_incorrect_type to True, we'll raise aTypeError if the final output is not of the given type.

Parameters:

NameTypeDescriptionDefault
clstype[T]

The type to cast the final output to.

required
raise_if_incorrect_typebool

If True, we'll raise a TypeError if the final output is not ofthe given type.

False

Returns:

TypeDescription
T

The final output casted to the given type.

Source code insrc/agents/result.py
deffinal_output_as(self,cls:type[T],raise_if_incorrect_type:bool=False)->T:"""A convenience method to cast the final output to a specific type. By default, the cast    is only for the typechecker. If you set `raise_if_incorrect_type` to True, we'll raise a    TypeError if the final output is not of the given type.    Args:        cls: The type to cast the final output to.        raise_if_incorrect_type: If True, we'll raise a TypeError if the final output is not of            the given type.    Returns:        The final output casted to the given type.    """ifraise_if_incorrect_typeandnotisinstance(self.final_output,cls):raiseTypeError(f"Final output is not of type{cls.__name__}")returncast(T,self.final_output)

to_input_list

to_input_list()->list[TResponseInputItem]

Creates a new input list, merging the original input with all the new items generated.

Source code insrc/agents/result.py
defto_input_list(self)->list[TResponseInputItem]:"""Creates a new input list, merging the original input with all the new items generated."""original_items:list[TResponseInputItem]=ItemHelpers.input_to_new_input_list(self.input)new_items:list[TResponseInputItem]=[]foriteminself.new_items:ifisinstance(item,ToolApprovalItem):continuenew_items.append(item.to_input_item())returnoriginal_items+new_items

[8]ページ先頭

©2009-2026 Movatter.jp