class RealtimeSession(RealtimeModelListener):
    """A connection to a realtime model. It streams events from the model to you, and allows you to
    send messages and audio to the model.

    Example:
        ```python
        runner = RealtimeRunner(agent)
        async with await runner.run() as session:
            # Send messages
            await session.send_message("Hello")
            await session.send_audio(audio_bytes)

            # Stream events
            async for event in session:
                if event.type == "audio":
                    # Handle audio event
                    pass
        ```
    """

    def __init__(
        self,
        model: RealtimeModel,
        agent: RealtimeAgent,
        context: TContext | None,
        model_config: RealtimeModelConfig | None = None,
        run_config: RealtimeRunConfig | None = None,
    ) -> None:
        """Initialize the session.

        Args:
            model: The model to use.
            agent: The current agent.
            context: The context object.
            model_config: Model configuration.
            run_config: Runtime configuration including guardrails.
        """
        self._model = model
        self._current_agent = agent
        self._context_wrapper = RunContextWrapper(context)
        self._event_info = RealtimeEventInfo(context=self._context_wrapper)
        self._history: list[RealtimeItem] = []
        self._model_config = model_config or {}
        self._run_config = run_config or {}
        initial_model_settings = self._model_config.get("initial_model_settings")
        run_config_settings = self._run_config.get("model_settings")
        # Run-config model settings form the base; the model config's initial settings
        # are unpacked second, so they win on conflicting keys.
        self._base_model_settings: RealtimeSessionModelSettings = {
            **(run_config_settings or {}),
            **(initial_model_settings or {}),
        }
        self._event_queue: asyncio.Queue[RealtimeSessionEvent] = asyncio.Queue()
        self._closed = False
        # Set from "exception" model events / failed tool tasks; raised by __aiter__.
        self._stored_exception: BaseException | None = None

        # Guardrails state tracking
        self._interrupted_response_ids: set[str] = set()
        self._item_transcripts: dict[str, str] = {}  # item_id -> accumulated transcript
        self._item_guardrail_run_counts: dict[str, int] = {}  # item_id -> run count
        self._debounce_text_length = self._run_config.get("guardrails_settings", {}).get(
            "debounce_text_length", 100
        )

        self._guardrail_tasks: set[asyncio.Task[Any]] = set()
        self._tool_call_tasks: set[asyncio.Task[Any]] = set()
        self._async_tool_calls: bool = bool(self._run_config.get("async_tool_calls", True))

    @property
    def model(self) -> RealtimeModel:
        """Access the underlying model for adding listeners or other direct interaction."""
        return self._model

    async def __aenter__(self) -> RealtimeSession:
        """Start the session by connecting to the model. After this, you will be able to stream
        events from the model and send messages and audio to the model.
        """
        # Add ourselves as a listener
        self._model.add_listener(self)

        model_config = self._model_config.copy()
        model_config["initial_model_settings"] = await self._get_updated_model_settings_from_agent(
            starting_settings=self._model_config.get("initial_model_settings", None),
            agent=self._current_agent,
        )

        # Connect to the model
        await self._model.connect(model_config)

        # Emit initial history update
        await self._put_event(
            RealtimeHistoryUpdated(
                history=self._history,
                info=self._event_info,
            )
        )

        return self

    async def enter(self) -> RealtimeSession:
        """Enter the async context manager. We strongly recommend using the async context manager
        pattern instead of this method. If you use this, you need to manually call `close()` when
        you are done.
        """
        return await self.__aenter__()

    async def __aexit__(self, _exc_type: Any, _exc_val: Any, _exc_tb: Any) -> None:
        """End the session."""
        await self.close()

    async def __aiter__(self) -> AsyncIterator[RealtimeSessionEvent]:
        """Iterate over events from the session.

        If an exception has been stored (from an "exception" model event or a failed background
        tool call), resources are cleaned up and that exception is raised. Cancellation of the
        wait ends the iteration quietly.
        """
        while not self._closed:
            try:
                # Check if there's a stored exception to raise
                if self._stored_exception is not None:
                    # Clean up resources before raising
                    await self._cleanup()
                    raise self._stored_exception

                event = await self._event_queue.get()
                yield event
            except asyncio.CancelledError:
                break

    async def close(self) -> None:
        """Close the session."""
        await self._cleanup()

    async def send_message(self, message: RealtimeUserInput) -> None:
        """Send a message to the model."""
        await self._model.send_event(RealtimeModelSendUserInput(user_input=message))

    async def send_audio(self, audio: bytes, *, commit: bool = False) -> None:
        """Send a raw audio chunk to the model."""
        await self._model.send_event(RealtimeModelSendAudio(audio=audio, commit=commit))

    async def interrupt(self) -> None:
        """Interrupt the model."""
        await self._model.send_event(RealtimeModelSendInterrupt())

    async def update_agent(self, agent: RealtimeAgent) -> None:
        """Update the active agent for this session and apply its settings to the model."""
        self._current_agent = agent

        updated_settings = await self._get_updated_model_settings_from_agent(
            starting_settings=None,
            agent=self._current_agent,
        )

        await self._model.send_event(
            RealtimeModelSendSessionUpdate(session_settings=updated_settings)
        )

    async def on_event(self, event: RealtimeModelEvent) -> None:
        """Handle an event emitted by the model.

        Every model event is first forwarded as a ``RealtimeRawModelEvent``. Recognized event
        types are then translated into higher-level session events and/or used to update the
        session history and the guardrail bookkeeping.
        """
        await self._put_event(RealtimeRawModelEvent(data=event, info=self._event_info))

        if event.type == "error":
            await self._put_event(RealtimeError(info=self._event_info, error=event.error))
        elif event.type == "function_call":
            # Capture the agent now: in async mode the tool runs later, and the current agent
            # may have changed by then (e.g. via update_agent or a handoff).
            agent_snapshot = self._current_agent
            if self._async_tool_calls:
                self._enqueue_tool_call_task(event, agent_snapshot)
            else:
                await self._handle_tool_call(event, agent_snapshot=agent_snapshot)
        elif event.type == "audio":
            await self._put_event(
                RealtimeAudio(
                    info=self._event_info,
                    audio=event,
                    item_id=event.item_id,
                    content_index=event.content_index,
                )
            )
        elif event.type == "audio_interrupted":
            await self._put_event(
                RealtimeAudioInterrupted(
                    info=self._event_info, item_id=event.item_id, content_index=event.content_index
                )
            )
        elif event.type == "audio_done":
            await self._put_event(
                RealtimeAudioEnd(
                    info=self._event_info, item_id=event.item_id, content_index=event.content_index
                )
            )
        elif event.type == "input_audio_transcription_completed":
            prev_len = len(self._history)
            self._history = RealtimeSession._get_new_history(self._history, event)
            # If a new user item was appended (no existing item),
            # emit history_added for incremental UIs.
            if len(self._history) > prev_len:
                new_item = self._history[-1]
                await self._put_event(RealtimeHistoryAdded(info=self._event_info, item=new_item))
            else:
                await self._put_event(
                    RealtimeHistoryUpdated(info=self._event_info, history=self._history)
                )
        elif event.type == "input_audio_timeout_triggered":
            await self._put_event(
                RealtimeInputAudioTimeoutTriggered(
                    info=self._event_info,
                )
            )
        elif event.type == "transcript_delta":
            # Accumulate transcript text for guardrail debouncing per item_id
            item_id = event.item_id
            if item_id not in self._item_transcripts:
                self._item_transcripts[item_id] = ""
                self._item_guardrail_run_counts[item_id] = 0

            self._item_transcripts[item_id] += event.delta
            self._history = self._get_new_history(
                self._history,
                AssistantMessageItem(
                    item_id=item_id,
                    content=[AssistantAudio(transcript=self._item_transcripts[item_id])],
                ),
            )

            # Run guardrails each time the accumulated transcript crosses the next multiple
            # of the debounce threshold.
            current_length = len(self._item_transcripts[item_id])
            threshold = self._debounce_text_length
            next_run_threshold = (self._item_guardrail_run_counts[item_id] + 1) * threshold

            if current_length >= next_run_threshold:
                self._item_guardrail_run_counts[item_id] += 1
                # Pass response_id so we can ensure only a single interrupt per response
                self._enqueue_guardrail_task(self._item_transcripts[item_id], event.response_id)
        elif event.type == "item_updated":
            incoming_item = event.item
            existing_item = next(
                (i for i in self._history if i.item_id == incoming_item.item_id), None
            )
            is_new = existing_item is None

            # Preserve previously known transcripts when updating existing items.
            # This prevents transcripts from disappearing when an item is later
            # retrieved without transcript fields populated.
            if (
                existing_item is not None
                and existing_item.type == "message"
                and incoming_item.type == "message"
            ):
                try:
                    # Merge transcripts for matching content indices
                    existing_content = existing_item.content
                    new_content = []
                    for idx, entry in enumerate(incoming_item.content):
                        # Only attempt to preserve for audio-like content
                        if entry.type in ("audio", "input_audio"):
                            # Use tuple form for Python 3.9 compatibility
                            assert isinstance(entry, (InputAudio, AssistantAudio))
                            # Determine if transcript is missing/empty on the incoming entry
                            entry_transcript = entry.transcript
                            if not entry_transcript:
                                preserved: str | None = None
                                # First prefer any transcript from the existing history item
                                if idx < len(existing_content):
                                    this_content = existing_content[idx]
                                    if isinstance(this_content, (AssistantAudio, InputAudio)):
                                        preserved = this_content.transcript

                                # If still missing and this is an assistant item, fall back to
                                # accumulated transcript deltas tracked during the turn. This
                                # must not overwrite a transcript recovered above: the delta
                                # cache is cleared on turn_ended and would yield None.
                                if not preserved and incoming_item.role == "assistant":
                                    preserved = self._item_transcripts.get(incoming_item.item_id)

                                if preserved:
                                    entry = entry.model_copy(update={"transcript": preserved})

                        new_content.append(entry)

                    if new_content:
                        incoming_item = incoming_item.model_copy(update={"content": new_content})
                except Exception:
                    # Best effort: fall back to the unmerged incoming item.
                    logger.error("Error merging transcripts", exc_info=True)

            self._history = self._get_new_history(self._history, incoming_item)
            if is_new:
                new_item = next(
                    item for item in self._history if item.item_id == event.item.item_id
                )
                await self._put_event(RealtimeHistoryAdded(info=self._event_info, item=new_item))
            else:
                await self._put_event(
                    RealtimeHistoryUpdated(info=self._event_info, history=self._history)
                )
        elif event.type == "item_deleted":
            deleted_id = event.item_id
            self._history = [item for item in self._history if item.item_id != deleted_id]
            await self._put_event(
                RealtimeHistoryUpdated(info=self._event_info, history=self._history)
            )
        elif event.type == "connection_status":
            pass
        elif event.type == "turn_started":
            await self._put_event(
                RealtimeAgentStartEvent(
                    agent=self._current_agent,
                    info=self._event_info,
                )
            )
        elif event.type == "turn_ended":
            # Clear guardrail state for next turn
            self._item_transcripts.clear()
            self._item_guardrail_run_counts.clear()

            await self._put_event(
                RealtimeAgentEndEvent(
                    agent=self._current_agent,
                    info=self._event_info,
                )
            )
        elif event.type == "exception":
            # Store the exception to be raised in __aiter__
            self._stored_exception = event.exception
        elif event.type == "other":
            pass
        elif event.type == "raw_server_event":
            pass
        else:
            assert_never(event)

    async def _put_event(self, event: RealtimeSessionEvent) -> None:
        """Put an event into the queue."""
        await self._event_queue.put(event)

    async def _handle_tool_call(
        self,
        event: RealtimeModelToolCallEvent,
        *,
        agent_snapshot: RealtimeAgent | None = None,
    ) -> None:
        """Handle a tool call event.

        Resolves ``event.name`` against the function tools and enabled handoffs of
        ``agent_snapshot`` (or the current agent when no snapshot is given), runs it, and
        sends the output back to the model.

        Raises:
            UserError: If a handoff returns something other than a ``RealtimeAgent``.
            ModelBehaviorError: If no function tool or handoff matches ``event.name``.
        """
        agent = agent_snapshot or self._current_agent
        tools, handoffs = await asyncio.gather(
            agent.get_all_tools(self._context_wrapper),
            self._get_handoffs(agent, self._context_wrapper),
        )
        function_map = {tool.name: tool for tool in tools if isinstance(tool, FunctionTool)}
        handoff_map = {handoff.tool_name: handoff for handoff in handoffs}

        if event.name in function_map:
            func_tool = function_map[event.name]
            await self._put_event(
                RealtimeToolStart(
                    info=self._event_info,
                    tool=func_tool,
                    agent=agent,
                    arguments=event.arguments,
                )
            )

            tool_context = ToolContext(
                context=self._context_wrapper.context,
                usage=self._context_wrapper.usage,
                tool_name=event.name,
                tool_call_id=event.call_id,
                tool_arguments=event.arguments,
            )
            result = await func_tool.on_invoke_tool(tool_context, event.arguments)

            await self._model.send_event(
                RealtimeModelSendToolOutput(
                    tool_call=event, output=str(result), start_response=True
                )
            )

            await self._put_event(
                RealtimeToolEnd(
                    info=self._event_info,
                    tool=func_tool,
                    output=result,
                    agent=agent,
                    arguments=event.arguments,
                )
            )
        elif event.name in handoff_map:
            handoff = handoff_map[event.name]

            # Execute the handoff to get the new agent
            result = await handoff.on_invoke_handoff(self._context_wrapper, event.arguments)
            if not isinstance(result, RealtimeAgent):
                raise UserError(
                    f"Handoff {handoff.tool_name} returned invalid result: {type(result)}"
                )

            # Store previous agent for event
            previous_agent = agent

            # Update current agent
            self._current_agent = result

            # Get updated model settings from new agent
            updated_settings = await self._get_updated_model_settings_from_agent(
                starting_settings=None,
                agent=self._current_agent,
            )

            # Send handoff event
            await self._put_event(
                RealtimeHandoffEvent(
                    from_agent=previous_agent,
                    to_agent=self._current_agent,
                    info=self._event_info,
                )
            )

            # First, send the session update so the model receives the new instructions
            await self._model.send_event(
                RealtimeModelSendSessionUpdate(session_settings=updated_settings)
            )

            # Then send tool output to complete the handoff (this triggers a new response)
            transfer_message = handoff.get_transfer_message(result)
            await self._model.send_event(
                RealtimeModelSendToolOutput(
                    tool_call=event,
                    output=transfer_message,
                    start_response=True,
                )
            )
        else:
            raise ModelBehaviorError(f"Tool {event.name} not found")

    @classmethod
    def _get_new_history(
        cls,
        old_history: list[RealtimeItem],
        event: RealtimeModelInputAudioTranscriptionCompletedEvent | RealtimeItem,
    ) -> list[RealtimeItem]:
        """Return a new history list with ``event`` applied; ``old_history`` is not mutated.

        - A transcription-completed event sets the transcript on the ``input_audio`` entries of
          the matching user message and marks it completed. If there is no such message, a new
          ``UserMessageItem`` holding the transcript as text is appended.
        - An item whose ``item_id`` already exists is merged into (or replaces) that entry.
        - Otherwise, the item is inserted after ``previous_item_id`` when that item exists, or
          appended at the end.
        """
        if isinstance(event, RealtimeModelInputAudioTranscriptionCompletedEvent):
            new_history: list[RealtimeItem] = []
            existing_item_found = False
            for item in old_history:
                if item.item_id == event.item_id and item.type == "message" and item.role == "user":
                    content: list[InputText | InputAudio] = []
                    for entry in item.content:
                        if entry.type == "input_audio":
                            copied_entry = entry.model_copy(update={"transcript": event.transcript})
                            content.append(copied_entry)
                        else:
                            content.append(entry)  # type: ignore
                    new_history.append(
                        item.model_copy(update={"content": content, "status": "completed"})
                    )
                    existing_item_found = True
                else:
                    new_history.append(item)

            if not existing_item_found:
                new_history.append(
                    UserMessageItem(
                        item_id=event.item_id, content=[InputText(text=event.transcript)]
                    )
                )
            return new_history

        # TODO (rm) Add support for audio storage config

        # If the item already exists, update it
        existing_index = next(
            (i for i, item in enumerate(old_history) if item.item_id == event.item_id), None
        )
        if existing_index is not None:
            new_history = old_history.copy()
            # NOTE(review): when the incoming item is not a message with non-empty content
            # (e.g. a tool-call item update), the existing entry is kept unchanged — confirm
            # this is intended.
            if event.type == "message" and event.content is not None and len(event.content) > 0:
                existing_item = old_history[existing_index]
                if existing_item.type == "message":
                    # Merge content preserving existing transcript/text when incoming entry is empty
                    if event.role == "assistant" and existing_item.role == "assistant":
                        assistant_existing_content = existing_item.content
                        assistant_incoming = event.content
                        assistant_new_content: list[AssistantText | AssistantAudio] = []
                        for idx, ac in enumerate(assistant_incoming):
                            if idx >= len(assistant_existing_content):
                                assistant_new_content.append(ac)
                                continue
                            assistant_current = assistant_existing_content[idx]
                            if ac.type == "audio":
                                if ac.transcript is None:
                                    assistant_new_content.append(assistant_current)
                                else:
                                    assistant_new_content.append(ac)
                            else:  # text
                                cur_text = (
                                    assistant_current.text
                                    if isinstance(assistant_current, AssistantText)
                                    else None
                                )
                                if cur_text is not None and ac.text is None:
                                    assistant_new_content.append(assistant_current)
                                else:
                                    assistant_new_content.append(ac)
                        updated_assistant = event.model_copy(
                            update={"content": assistant_new_content}
                        )
                        new_history[existing_index] = updated_assistant
                    elif event.role == "user" and existing_item.role == "user":
                        user_existing_content = existing_item.content
                        user_incoming = event.content

                        # Start from incoming content (prefer latest fields)
                        user_new_content: list[InputText | InputAudio | InputImage] = list(
                            user_incoming
                        )

                        # Merge by type with special handling for images and transcripts
                        def _image_url_str(val: object) -> str | None:
                            if isinstance(val, InputImage):
                                return val.image_url or None
                            return None

                        # 1) Preserve any existing images that are missing from the incoming payload
                        incoming_image_urls: set[str] = set()
                        for part in user_incoming:
                            if isinstance(part, InputImage):
                                u = _image_url_str(part)
                                if u:
                                    incoming_image_urls.add(u)

                        missing_images: list[InputImage] = []
                        for part in user_existing_content:
                            if isinstance(part, InputImage):
                                u = _image_url_str(part)
                                if u and u not in incoming_image_urls:
                                    missing_images.append(part)

                        # Insert missing images at the beginning to keep them visible and stable
                        if missing_images:
                            user_new_content = missing_images + user_new_content

                        # 2) For text/audio entries, preserve existing when incoming entry is empty
                        merged: list[InputText | InputAudio | InputImage] = []
                        for idx, uc in enumerate(user_new_content):
                            if uc.type == "input_audio":
                                # Attempt to preserve transcript if empty
                                transcript = getattr(uc, "transcript", None)
                                if transcript is None and idx < len(user_existing_content):
                                    prev = user_existing_content[idx]
                                    if isinstance(prev, InputAudio) and prev.transcript is not None:
                                        uc = uc.model_copy(update={"transcript": prev.transcript})
                                merged.append(uc)
                            elif uc.type == "input_text":
                                text = getattr(uc, "text", None)
                                if (text is None or text == "") and idx < len(
                                    user_existing_content
                                ):
                                    prev = user_existing_content[idx]
                                    if isinstance(prev, InputText) and prev.text:
                                        uc = uc.model_copy(update={"text": prev.text})
                                merged.append(uc)
                            else:
                                merged.append(uc)

                        updated_user = event.model_copy(update={"content": merged})
                        new_history[existing_index] = updated_user
                    elif event.role == "system" and existing_item.role == "system":
                        system_existing_content = existing_item.content
                        system_incoming = event.content
                        # Prefer existing non-empty text when incoming is empty
                        system_new_content: list[InputText] = []
                        for idx, sc in enumerate(system_incoming):
                            if idx >= len(system_existing_content):
                                system_new_content.append(sc)
                                continue
                            system_current = system_existing_content[idx]
                            cur_text = system_current.text
                            if cur_text is not None and sc.text is None:
                                system_new_content.append(system_current)
                            else:
                                system_new_content.append(sc)
                        updated_system = event.model_copy(update={"content": system_new_content})
                        new_history[existing_index] = updated_system
                    else:
                        # Role changed or mismatched; just replace
                        new_history[existing_index] = event
                else:
                    # If the existing item is not a message, just replace it.
                    new_history[existing_index] = event
            return new_history

        # Otherwise, insert it after the previous_item_id if that is set
        elif event.previous_item_id:
            # Insert the new item after the previous item
            previous_index = next(
                (i for i, item in enumerate(old_history) if item.item_id == event.previous_item_id),
                None,
            )
            if previous_index is not None:
                new_history = old_history.copy()
                new_history.insert(previous_index + 1, event)
                return new_history

        # Otherwise, add it to the end
        return old_history + [event]

    async def _run_output_guardrails(self, text: str, response_id: str) -> bool:
        """Run output guardrails on the given text.

        Guardrails come from the current agent plus the run config, de-duplicated by object
        identity. At most one interrupt is issued per ``response_id``.

        Returns:
            True if any guardrail was triggered (and the model was interrupted), else False.
        """
        combined_guardrails = self._current_agent.output_guardrails + self._run_config.get(
            "output_guardrails", []
        )
        # De-duplicate by identity while keeping first-occurrence order.
        seen_ids: set[int] = set()
        output_guardrails = []
        for guardrail in combined_guardrails:
            guardrail_id = id(guardrail)
            if guardrail_id not in seen_ids:
                output_guardrails.append(guardrail)
                seen_ids.add(guardrail_id)

        # If we've already interrupted this response, skip
        if not output_guardrails or response_id in self._interrupted_response_ids:
            return False

        triggered_results = []

        for guardrail in output_guardrails:
            try:
                result = await guardrail.run(
                    # TODO (rm) Remove this cast, it's wrong
                    self._context_wrapper,
                    cast(Agent[Any], self._current_agent),
                    text,
                )
                if result.output.tripwire_triggered:
                    triggered_results.append(result)
            except Exception:
                # Continue with other guardrails if one fails, but leave a trace.
                logger.warning("Realtime output guardrail failed", exc_info=True)

        if triggered_results:
            # Double-check: bail if already interrupted for this response
            if response_id in self._interrupted_response_ids:
                return False

            # Mark as interrupted immediately (before any awaits) to minimize race window
            self._interrupted_response_ids.add(response_id)

            # Emit guardrail tripped event
            await self._put_event(
                RealtimeGuardrailTripped(
                    guardrail_results=triggered_results,
                    message=text,
                    info=self._event_info,
                )
            )

            # Interrupt the model
            await self._model.send_event(RealtimeModelSendInterrupt(force_response_cancel=True))

            # Send guardrail triggered message
            guardrail_names = [result.guardrail.get_name() for result in triggered_results]
            await self._model.send_event(
                RealtimeModelSendUserInput(
                    user_input=f"guardrail triggered: {', '.join(guardrail_names)}"
                )
            )

            return True

        return False

    def _enqueue_guardrail_task(self, text: str, response_id: str) -> None:
        """Schedule output guardrails as a tracked background task."""
        # Runs the guardrails in a separate task to avoid blocking the main loop
        task = asyncio.create_task(self._run_output_guardrails(text, response_id))
        self._guardrail_tasks.add(task)

        # Add callback to remove completed tasks and handle exceptions
        task.add_done_callback(self._on_guardrail_task_done)

    def _on_guardrail_task_done(self, task: asyncio.Task[Any]) -> None:
        """Handle completion of a guardrail task."""
        # Remove from tracking set
        self._guardrail_tasks.discard(task)

        # Check for exceptions and propagate as events
        if not task.cancelled():
            exception = task.exception()
            if exception:
                # Create an exception event instead of raising
                asyncio.create_task(
                    self._put_event(
                        RealtimeError(
                            info=self._event_info,
                            error={"message": f"Guardrail task failed: {str(exception)}"},
                        )
                    )
                )

    def _cleanup_guardrail_tasks(self) -> None:
        """Cancel all unfinished guardrail tasks and forget every tracked task."""
        for task in self._guardrail_tasks:
            if not task.done():
                task.cancel()
        self._guardrail_tasks.clear()

    def _enqueue_tool_call_task(
        self, event: RealtimeModelToolCallEvent, agent_snapshot: RealtimeAgent
    ) -> None:
        """Run tool calls in the background to avoid blocking realtime transport."""
        task = asyncio.create_task(self._handle_tool_call(event, agent_snapshot=agent_snapshot))
        self._tool_call_tasks.add(task)
        task.add_done_callback(self._on_tool_call_task_done)

    def _on_tool_call_task_done(self, task: asyncio.Task[Any]) -> None:
        """Untrack a finished tool task; on failure, log it, store it, and emit an error event.

        Only the first failure is stored, so ``__aiter__`` raises that one.
        """
        self._tool_call_tasks.discard(task)

        if task.cancelled():
            return

        exception = task.exception()
        if exception is None:
            return

        logger.exception("Realtime tool call task failed", exc_info=exception)

        if self._stored_exception is None:
            self._stored_exception = exception

        asyncio.create_task(
            self._put_event(
                RealtimeError(
                    info=self._event_info,
                    error={"message": f"Tool call task failed: {exception}"},
                )
            )
        )

    def _cleanup_tool_call_tasks(self) -> None:
        """Cancel all unfinished tool call tasks and forget every tracked task."""
        for task in self._tool_call_tasks:
            if not task.done():
                task.cancel()
        self._tool_call_tasks.clear()

    async def _cleanup(self) -> None:
        """Clean up all resources and mark session as closed."""
        # Cancel and cleanup guardrail tasks
        self._cleanup_guardrail_tasks()
        self._cleanup_tool_call_tasks()

        # Remove ourselves as a listener
        self._model.remove_listener(self)

        # Close the model connection
        await self._model.close()

        # Mark as closed
        self._closed = True

    async def _get_updated_model_settings_from_agent(
        self,
        starting_settings: RealtimeSessionModelSettings | None,
        agent: RealtimeAgent,
    ) -> RealtimeSessionModelSettings:
        """Build the model session settings for ``agent``.

        Layering, lowest to highest precedence: the merged base settings, then the agent's
        prompt/instructions/tools/enabled handoffs, then ``starting_settings``. Tracing is
        set to None when the run config has ``tracing_disabled``.
        """
        # Start with the merged base settings from run and model configuration.
        updated_settings = self._base_model_settings.copy()

        if agent.prompt is not None:
            updated_settings["prompt"] = agent.prompt

        instructions, tools, handoffs = await asyncio.gather(
            agent.get_system_prompt(self._context_wrapper),
            agent.get_all_tools(self._context_wrapper),
            self._get_handoffs(agent, self._context_wrapper),
        )
        updated_settings["instructions"] = instructions or ""
        updated_settings["tools"] = tools or []
        updated_settings["handoffs"] = handoffs or []

        # Apply starting settings (from model config) next
        if starting_settings:
            updated_settings.update(starting_settings)

        disable_tracing = self._run_config.get("tracing_disabled", False)
        if disable_tracing:
            updated_settings["tracing"] = None

        return updated_settings

    @classmethod
    async def _get_handoffs(
        cls, agent: RealtimeAgent[Any], context_wrapper: RunContextWrapper[Any]
    ) -> list[Handoff[Any, RealtimeAgent[Any]]]:
        """Return the agent's handoffs whose ``is_enabled`` check passes.

        Plain ``RealtimeAgent`` entries are wrapped with ``realtime_handoff``. ``is_enabled``
        may be a bool or a (possibly async) callable of ``(context_wrapper, agent)``.
        """
        handoffs: list[Handoff[Any, RealtimeAgent[Any]]] = []
        for handoff_item in agent.handoffs:
            if isinstance(handoff_item, Handoff):
                handoffs.append(handoff_item)
            elif isinstance(handoff_item, RealtimeAgent):
                handoffs.append(realtime_handoff(handoff_item))

        async def _check_handoff_enabled(handoff_obj: Handoff[Any, RealtimeAgent[Any]]) -> bool:
            attr = handoff_obj.is_enabled
            if isinstance(attr, bool):
                return attr
            res = attr(context_wrapper, agent)
            if inspect.isawaitable(res):
                return await res
            return res

        results = await asyncio.gather(*(_check_handoff_enabled(h) for h in handoffs))
        enabled = [h for h, ok in zip(handoffs, results) if ok]
        return enabled