Movatterモバイル変換


[0]ホーム

URL:


Skip to content

RealtimeSession

Bases: RealtimeModelListener

A connection to a realtime model. It streams events from the model to you, and allows you to send messages and audio to the model.

Example
runner = RealtimeRunner(agent)
async with await runner.run() as session:
    # Send messages
    await session.send_message("Hello")
    await session.send_audio(audio_bytes)

    # Stream events
    async for event in session:
        if event.type == "audio":
            # Handle audio event
            pass
Source code in src/agents/realtime/session.py
 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 9910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872
9730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874
class RealtimeSession(RealtimeModelListener):
    """A connection to a realtime model. It streams events from the model to you, and allows you to
    send messages and audio to the model.

    Example:
        ```python
        runner = RealtimeRunner(agent)
        async with await runner.run() as session:
            # Send messages
            await session.send_message("Hello")
            await session.send_audio(audio_bytes)

            # Stream events
            async for event in session:
                if event.type == "audio":
                    # Handle audio event
                    pass
        ```
    """

    def __init__(
        self,
        model: RealtimeModel,
        agent: RealtimeAgent,
        context: TContext | None,
        model_config: RealtimeModelConfig | None = None,
        run_config: RealtimeRunConfig | None = None,
    ) -> None:
        """Initialize the session.

        Args:
            model: The model to use.
            agent: The current agent.
            context: The context object.
            model_config: Model configuration.
            run_config: Runtime configuration including guardrails.
        """
        self._model = model
        self._current_agent = agent
        self._context_wrapper = RunContextWrapper(context)
        self._event_info = RealtimeEventInfo(context=self._context_wrapper)
        self._history: list[RealtimeItem] = []
        self._model_config = model_config or {}
        self._run_config = run_config or {}
        initial_model_settings = self._model_config.get("initial_model_settings")
        run_config_settings = self._run_config.get("model_settings")
        # Later entries win: initial model settings override run-config model settings.
        self._base_model_settings: RealtimeSessionModelSettings = {
            **(run_config_settings or {}),
            **(initial_model_settings or {}),
        }
        self._event_queue: asyncio.Queue[RealtimeSessionEvent] = asyncio.Queue()
        self._closed = False
        self._stored_exception: BaseException | None = None

        # Guardrails state tracking
        self._interrupted_response_ids: set[str] = set()
        self._item_transcripts: dict[str, str] = {}  # item_id -> accumulated transcript
        self._item_guardrail_run_counts: dict[str, int] = {}  # item_id -> run count
        self._debounce_text_length = self._run_config.get("guardrails_settings", {}).get(
            "debounce_text_length", 100
        )

        self._guardrail_tasks: set[asyncio.Task[Any]] = set()
        self._tool_call_tasks: set[asyncio.Task[Any]] = set()
        self._async_tool_calls: bool = bool(self._run_config.get("async_tool_calls", True))

    @property
    def model(self) -> RealtimeModel:
        """Access the underlying model for adding listeners or other direct interaction."""
        return self._model

    async def __aenter__(self) -> RealtimeSession:
        """Start the session by connecting to the model. After this, you will be able to stream
        events from the model and send messages and audio to the model.

        If resolving the settings or connecting raises, the session is removed from the
        model's listeners again before the exception propagates.
        """
        # Add ourselves as a listener
        self._model.add_listener(self)

        try:
            model_config = self._model_config.copy()
            model_config["initial_model_settings"] = (
                await self._get_updated_model_settings_from_agent(
                    starting_settings=self._model_config.get("initial_model_settings", None),
                    agent=self._current_agent,
                )
            )

            # Connect to the model
            await self._model.connect(model_config)
        except BaseException:
            # __aexit__ never runs when __aenter__ fails, so undo the registration here;
            # otherwise a retry would register this session a second time.
            self._model.remove_listener(self)
            raise

        # Emit initial history update
        await self._put_event(
            RealtimeHistoryUpdated(
                history=self._history,
                info=self._event_info,
            )
        )

        return self

    async def enter(self) -> RealtimeSession:
        """Enter the async context manager. We strongly recommend using the async context manager
        pattern instead of this method. If you use this, you need to manually call `close()` when
        you are done.
        """
        return await self.__aenter__()

    async def __aexit__(self, _exc_type: Any, _exc_val: Any, _exc_tb: Any) -> None:
        """End the session."""
        await self.close()

    async def __aiter__(self) -> AsyncIterator[RealtimeSessionEvent]:
        """Iterate over events from the session.

        Raises:
            BaseException: The stored exception, if one was recorded; resources are cleaned
                up before it is raised.
        """
        while not self._closed:
            try:
                # Check if there's a stored exception to raise
                if self._stored_exception is not None:
                    # Clean up resources before raising
                    await self._cleanup()
                    raise self._stored_exception

                event = await self._event_queue.get()
                yield event

            except asyncio.CancelledError:
                break

    async def close(self) -> None:
        """Close the session."""
        await self._cleanup()

    async def send_message(self, message: RealtimeUserInput) -> None:
        """Send a message to the model."""
        await self._model.send_event(RealtimeModelSendUserInput(user_input=message))

    async def send_audio(self, audio: bytes, *, commit: bool = False) -> None:
        """Send a raw audio chunk to the model."""
        await self._model.send_event(RealtimeModelSendAudio(audio=audio, commit=commit))

    async def interrupt(self) -> None:
        """Interrupt the model."""
        await self._model.send_event(RealtimeModelSendInterrupt())

    async def update_agent(self, agent: RealtimeAgent) -> None:
        """Update the active agent for this session and apply its settings to the model."""
        self._current_agent = agent

        updated_settings = await self._get_updated_model_settings_from_agent(
            starting_settings=None,
            agent=self._current_agent,
        )

        await self._model.send_event(
            RealtimeModelSendSessionUpdate(session_settings=updated_settings)
        )

    async def on_event(self, event: RealtimeModelEvent) -> None:
        """Handle an event from the model.

        Every event is first re-emitted as a `RealtimeRawModelEvent`; it is then dispatched on
        `event.type` to update history, start tool calls and guardrail checks, and enqueue the
        corresponding session events.
        """
        await self._put_event(RealtimeRawModelEvent(data=event, info=self._event_info))

        if event.type == "error":
            await self._put_event(RealtimeError(info=self._event_info, error=event.error))
        elif event.type == "function_call":
            # Snapshot the agent so a handoff that happens meanwhile does not change which
            # agent's tools are used for this call.
            agent_snapshot = self._current_agent
            if self._async_tool_calls:
                self._enqueue_tool_call_task(event, agent_snapshot)
            else:
                await self._handle_tool_call(event, agent_snapshot=agent_snapshot)
        elif event.type == "audio":
            await self._put_event(
                RealtimeAudio(
                    info=self._event_info,
                    audio=event,
                    item_id=event.item_id,
                    content_index=event.content_index,
                )
            )
        elif event.type == "audio_interrupted":
            await self._put_event(
                RealtimeAudioInterrupted(
                    info=self._event_info,
                    item_id=event.item_id,
                    content_index=event.content_index,
                )
            )
        elif event.type == "audio_done":
            await self._put_event(
                RealtimeAudioEnd(
                    info=self._event_info,
                    item_id=event.item_id,
                    content_index=event.content_index,
                )
            )
        elif event.type == "input_audio_transcription_completed":
            prev_len = len(self._history)
            self._history = RealtimeSession._get_new_history(self._history, event)
            # If a new user item was appended (no existing item),
            # emit history_added for incremental UIs.
            if len(self._history) > prev_len and len(self._history) > 0:
                new_item = self._history[-1]
                await self._put_event(RealtimeHistoryAdded(info=self._event_info, item=new_item))
            else:
                await self._put_event(
                    RealtimeHistoryUpdated(info=self._event_info, history=self._history)
                )
        elif event.type == "input_audio_timeout_triggered":
            await self._put_event(
                RealtimeInputAudioTimeoutTriggered(
                    info=self._event_info,
                )
            )
        elif event.type == "transcript_delta":
            # Accumulate transcript text for guardrail debouncing per item_id
            item_id = event.item_id
            if item_id not in self._item_transcripts:
                self._item_transcripts[item_id] = ""
                self._item_guardrail_run_counts[item_id] = 0

            self._item_transcripts[item_id] += event.delta
            self._history = self._get_new_history(
                self._history,
                AssistantMessageItem(
                    item_id=item_id,
                    content=[AssistantAudio(transcript=self._item_transcripts[item_id])],
                ),
            )

            # Check if we should run guardrails based on debounce threshold
            current_length = len(self._item_transcripts[item_id])
            threshold = self._debounce_text_length
            next_run_threshold = (self._item_guardrail_run_counts[item_id] + 1) * threshold

            if current_length >= next_run_threshold:
                self._item_guardrail_run_counts[item_id] += 1
                # Pass response_id so we can ensure only a single interrupt per response
                self._enqueue_guardrail_task(self._item_transcripts[item_id], event.response_id)
        elif event.type == "item_updated":
            is_new = not any(item.item_id == event.item.item_id for item in self._history)

            # Preserve previously known transcripts when updating existing items.
            # This prevents transcripts from disappearing when an item is later
            # retrieved without transcript fields populated.
            incoming_item = event.item
            existing_item = next(
                (i for i in self._history if i.item_id == incoming_item.item_id), None
            )

            if (
                existing_item is not None
                and existing_item.type == "message"
                and incoming_item.type == "message"
            ):
                try:
                    # Merge transcripts for matching content indices
                    existing_content = existing_item.content
                    new_content = []
                    for idx, entry in enumerate(incoming_item.content):
                        # Only attempt to preserve for audio-like content
                        if entry.type in ("audio", "input_audio"):
                            # Use tuple form for Python 3.9 compatibility
                            assert isinstance(entry, (InputAudio, AssistantAudio))
                            # Determine if transcript is missing/empty on the incoming entry
                            entry_transcript = entry.transcript
                            if not entry_transcript:
                                preserved: str | None = None
                                # First prefer any transcript from the existing history item
                                if idx < len(existing_content):
                                    this_content = existing_content[idx]
                                    if isinstance(this_content, (AssistantAudio, InputAudio)):
                                        preserved = this_content.transcript

                                # If still missing and this is an assistant item, fall back to
                                # accumulated transcript deltas tracked during the turn.
                                # The `not preserved` guard keeps a transcript recovered from
                                # history from being overwritten (possibly with None).
                                if not preserved and incoming_item.role == "assistant":
                                    preserved = self._item_transcripts.get(incoming_item.item_id)

                                if preserved:
                                    entry = entry.model_copy(update={"transcript": preserved})

                        new_content.append(entry)

                    if new_content:
                        incoming_item = incoming_item.model_copy(update={"content": new_content})
                except Exception:
                    # Best effort: on failure fall through with the unmerged incoming item.
                    logger.error("Error merging transcripts", exc_info=True)

            self._history = self._get_new_history(self._history, incoming_item)
            if is_new:
                new_item = next(
                    item for item in self._history if item.item_id == event.item.item_id
                )
                await self._put_event(RealtimeHistoryAdded(info=self._event_info, item=new_item))
            else:
                await self._put_event(
                    RealtimeHistoryUpdated(info=self._event_info, history=self._history)
                )
        elif event.type == "item_deleted":
            deleted_id = event.item_id
            self._history = [item for item in self._history if item.item_id != deleted_id]
            await self._put_event(
                RealtimeHistoryUpdated(info=self._event_info, history=self._history)
            )
        elif event.type == "connection_status":
            pass
        elif event.type == "turn_started":
            await self._put_event(
                RealtimeAgentStartEvent(
                    agent=self._current_agent,
                    info=self._event_info,
                )
            )
        elif event.type == "turn_ended":
            # Clear guardrail state for next turn
            self._item_transcripts.clear()
            self._item_guardrail_run_counts.clear()

            await self._put_event(
                RealtimeAgentEndEvent(
                    agent=self._current_agent,
                    info=self._event_info,
                )
            )
        elif event.type == "exception":
            # Store the exception to be raised in __aiter__
            self._stored_exception = event.exception
        elif event.type == "other":
            pass
        elif event.type == "raw_server_event":
            pass
        else:
            assert_never(event)

    async def _put_event(self, event: RealtimeSessionEvent) -> None:
        """Put an event into the queue."""
        await self._event_queue.put(event)

    async def _handle_tool_call(
        self,
        event: RealtimeModelToolCallEvent,
        *,
        agent_snapshot: RealtimeAgent | None = None,
    ) -> None:
        """Handle a tool call event.

        Args:
            event: The tool call requested by the model.
            agent_snapshot: Agent whose tools and handoffs are consulted; falls back to the
                current agent when omitted.

        Raises:
            UserError: If a handoff does not return a `RealtimeAgent`.
            ModelBehaviorError: If `event.name` matches neither a function tool nor a handoff.
        """
        agent = agent_snapshot or self._current_agent
        tools, handoffs = await asyncio.gather(
            agent.get_all_tools(self._context_wrapper),
            self._get_handoffs(agent, self._context_wrapper),
        )
        function_map = {tool.name: tool for tool in tools if isinstance(tool, FunctionTool)}
        handoff_map = {handoff.tool_name: handoff for handoff in handoffs}

        if event.name in function_map:
            await self._put_event(
                RealtimeToolStart(
                    info=self._event_info,
                    tool=function_map[event.name],
                    agent=agent,
                    arguments=event.arguments,
                )
            )

            func_tool = function_map[event.name]
            tool_context = ToolContext(
                context=self._context_wrapper.context,
                usage=self._context_wrapper.usage,
                tool_name=event.name,
                tool_call_id=event.call_id,
                tool_arguments=event.arguments,
            )
            result = await func_tool.on_invoke_tool(tool_context, event.arguments)

            await self._model.send_event(
                RealtimeModelSendToolOutput(
                    tool_call=event, output=str(result), start_response=True
                )
            )

            await self._put_event(
                RealtimeToolEnd(
                    info=self._event_info,
                    tool=func_tool,
                    output=result,
                    agent=agent,
                    arguments=event.arguments,
                )
            )
        elif event.name in handoff_map:
            handoff = handoff_map[event.name]
            tool_context = ToolContext(
                context=self._context_wrapper.context,
                usage=self._context_wrapper.usage,
                tool_name=event.name,
                tool_call_id=event.call_id,
                tool_arguments=event.arguments,
            )

            # Execute the handoff to get the new agent
            result = await handoff.on_invoke_handoff(self._context_wrapper, event.arguments)
            if not isinstance(result, RealtimeAgent):
                raise UserError(
                    f"Handoff {handoff.tool_name} returned invalid result: {type(result)}"
                )

            # Store previous agent for event
            previous_agent = agent

            # Update current agent
            self._current_agent = result

            # Get updated model settings from new agent
            updated_settings = await self._get_updated_model_settings_from_agent(
                starting_settings=None,
                agent=self._current_agent,
            )

            # Send handoff event
            await self._put_event(
                RealtimeHandoffEvent(
                    from_agent=previous_agent,
                    to_agent=self._current_agent,
                    info=self._event_info,
                )
            )

            # First, send the session update so the model receives the new instructions
            await self._model.send_event(
                RealtimeModelSendSessionUpdate(session_settings=updated_settings)
            )

            # Then send tool output to complete the handoff (this triggers a new response)
            transfer_message = handoff.get_transfer_message(result)
            await self._model.send_event(
                RealtimeModelSendToolOutput(
                    tool_call=event,
                    output=transfer_message,
                    start_response=True,
                )
            )
        else:
            raise ModelBehaviorError(f"Tool {event.name} not found")

    @classmethod
    def _get_new_history(
        cls,
        old_history: list[RealtimeItem],
        event: RealtimeModelInputAudioTranscriptionCompletedEvent | RealtimeItem,
    ) -> list[RealtimeItem]:
        """Return a new history list with `event` applied; `old_history` is not mutated.

        A transcription-completed event writes its transcript into the matching user message,
        or appends a new user message when none matches. A `RealtimeItem` updates the item
        with the same `item_id` (merging message content so existing text/transcripts are not
        lost to empty incoming fields), is inserted after `previous_item_id` when that item is
        found, and is appended otherwise.
        """
        if isinstance(event, RealtimeModelInputAudioTranscriptionCompletedEvent):
            new_history: list[RealtimeItem] = []
            existing_item_found = False
            for item in old_history:
                if item.item_id == event.item_id and item.type == "message" and item.role == "user":
                    content: list[InputText | InputAudio] = []
                    for entry in item.content:
                        if entry.type == "input_audio":
                            copied_entry = entry.model_copy(update={"transcript": event.transcript})
                            content.append(copied_entry)
                        else:
                            content.append(entry)  # type: ignore
                    new_history.append(
                        item.model_copy(update={"content": content, "status": "completed"})
                    )
                    existing_item_found = True
                else:
                    new_history.append(item)

            if not existing_item_found:
                new_history.append(
                    UserMessageItem(
                        item_id=event.item_id, content=[InputText(text=event.transcript)]
                    )
                )
            return new_history

        # TODO (rm) Add support for audio storage config

        # If the item already exists, update it
        existing_index = next(
            (i for i, item in enumerate(old_history) if item.item_id == event.item_id), None
        )
        if existing_index is not None:
            new_history = old_history.copy()
            # NOTE(review): a non-message item, or a message with empty content, leaves the
            # existing entry untouched here - confirm that this is intended.
            if event.type == "message" and event.content is not None and len(event.content) > 0:
                existing_item = old_history[existing_index]
                if existing_item.type == "message":
                    # Merge content preserving existing transcript/text when incoming entry is empty
                    if event.role == "assistant" and existing_item.role == "assistant":
                        assistant_existing_content = existing_item.content
                        assistant_incoming = event.content
                        assistant_new_content: list[AssistantText | AssistantAudio] = []
                        for idx, ac in enumerate(assistant_incoming):
                            if idx >= len(assistant_existing_content):
                                assistant_new_content.append(ac)
                                continue
                            assistant_current = assistant_existing_content[idx]
                            if ac.type == "audio":
                                if ac.transcript is None:
                                    assistant_new_content.append(assistant_current)
                                else:
                                    assistant_new_content.append(ac)
                            else:  # text
                                cur_text = (
                                    assistant_current.text
                                    if isinstance(assistant_current, AssistantText)
                                    else None
                                )
                                if cur_text is not None and ac.text is None:
                                    assistant_new_content.append(assistant_current)
                                else:
                                    assistant_new_content.append(ac)
                        updated_assistant = event.model_copy(
                            update={"content": assistant_new_content}
                        )
                        new_history[existing_index] = updated_assistant
                    elif event.role == "user" and existing_item.role == "user":
                        user_existing_content = existing_item.content
                        user_incoming = event.content

                        # Start from incoming content (prefer latest fields)
                        user_new_content: list[InputText | InputAudio | InputImage] = list(
                            user_incoming
                        )

                        # Merge by type with special handling for images and transcripts
                        def _image_url_str(val: object) -> str | None:
                            if isinstance(val, InputImage):
                                return val.image_url or None
                            return None

                        # 1) Preserve any existing images that are missing from the incoming payload
                        incoming_image_urls: set[str] = set()
                        for part in user_incoming:
                            if isinstance(part, InputImage):
                                u = _image_url_str(part)
                                if u:
                                    incoming_image_urls.add(u)

                        missing_images: list[InputImage] = []
                        for part in user_existing_content:
                            if isinstance(part, InputImage):
                                u = _image_url_str(part)
                                if u and u not in incoming_image_urls:
                                    missing_images.append(part)

                        # Insert missing images at the beginning to keep them visible and stable
                        if missing_images:
                            user_new_content = missing_images + user_new_content

                        # 2) For text/audio entries, preserve existing when incoming entry is empty
                        merged: list[InputText | InputAudio | InputImage] = []
                        for idx, uc in enumerate(user_new_content):
                            if uc.type == "input_audio":
                                # Attempt to preserve transcript if empty
                                transcript = getattr(uc, "transcript", None)
                                if transcript is None and idx < len(user_existing_content):
                                    prev = user_existing_content[idx]
                                    if isinstance(prev, InputAudio) and prev.transcript is not None:
                                        uc = uc.model_copy(update={"transcript": prev.transcript})
                                merged.append(uc)
                            elif uc.type == "input_text":
                                text = getattr(uc, "text", None)
                                if (text is None or text == "") and idx < len(
                                    user_existing_content
                                ):
                                    prev = user_existing_content[idx]
                                    if isinstance(prev, InputText) and prev.text:
                                        uc = uc.model_copy(update={"text": prev.text})
                                merged.append(uc)
                            else:
                                merged.append(uc)

                        updated_user = event.model_copy(update={"content": merged})
                        new_history[existing_index] = updated_user
                    elif event.role == "system" and existing_item.role == "system":
                        system_existing_content = existing_item.content
                        system_incoming = event.content
                        # Prefer existing non-empty text when incoming is empty
                        system_new_content: list[InputText] = []
                        for idx, sc in enumerate(system_incoming):
                            if idx >= len(system_existing_content):
                                system_new_content.append(sc)
                                continue
                            system_current = system_existing_content[idx]
                            cur_text = system_current.text
                            if cur_text is not None and sc.text is None:
                                system_new_content.append(system_current)
                            else:
                                system_new_content.append(sc)
                        updated_system = event.model_copy(update={"content": system_new_content})
                        new_history[existing_index] = updated_system
                    else:
                        # Role changed or mismatched; just replace
                        new_history[existing_index] = event
                else:
                    # If the existing item is not a message, just replace it.
                    new_history[existing_index] = event
            return new_history

        # Otherwise, insert it after the previous_item_id if that is set
        elif event.previous_item_id:
            # Insert the new item after the previous item
            previous_index = next(
                (
                    i
                    for i, item in enumerate(old_history)
                    if item.item_id == event.previous_item_id
                ),
                None,
            )
            if previous_index is not None:
                new_history = old_history.copy()
                new_history.insert(previous_index + 1, event)
                return new_history

        # Otherwise, add it to the end
        return old_history + [event]

    async def _run_output_guardrails(self, text: str, response_id: str) -> bool:
        """Run output guardrails on the given text. Returns True if any guardrail was triggered.

        Guardrails from the current agent and the run config are de-duplicated by identity.
        At most one interrupt is issued per `response_id`.
        """
        combined_guardrails = self._current_agent.output_guardrails + self._run_config.get(
            "output_guardrails", []
        )
        seen_ids: set[int] = set()
        output_guardrails = []
        for guardrail in combined_guardrails:
            guardrail_id = id(guardrail)
            if guardrail_id not in seen_ids:
                output_guardrails.append(guardrail)
                seen_ids.add(guardrail_id)

        # If we've already interrupted this response, skip
        if not output_guardrails or response_id in self._interrupted_response_ids:
            return False

        triggered_results = []

        for guardrail in output_guardrails:
            try:
                result = await guardrail.run(
                    # TODO (rm) Remove this cast, it's wrong
                    self._context_wrapper,
                    cast(Agent[Any], self._current_agent),
                    text,
                )
                if result.output.tripwire_triggered:
                    triggered_results.append(result)
            except Exception:
                # Continue with other guardrails if one fails, but leave a trace of the
                # failure instead of dropping it silently.
                logger.error("Error running output guardrail", exc_info=True)
                continue

        if triggered_results:
            # Double-check: bail if already interrupted for this response
            if response_id in self._interrupted_response_ids:
                return False

            # Mark as interrupted immediately (before any awaits) to minimize race window
            self._interrupted_response_ids.add(response_id)

            # Emit guardrail tripped event
            await self._put_event(
                RealtimeGuardrailTripped(
                    guardrail_results=triggered_results,
                    message=text,
                    info=self._event_info,
                )
            )

            # Interrupt the model
            await self._model.send_event(RealtimeModelSendInterrupt(force_response_cancel=True))

            # Send guardrail triggered message
            guardrail_names = [result.guardrail.get_name() for result in triggered_results]
            await self._model.send_event(
                RealtimeModelSendUserInput(
                    user_input=f"guardrail triggered: {', '.join(guardrail_names)}"
                )
            )

            return True

        return False

    def _enqueue_guardrail_task(self, text: str, response_id: str) -> None:
        """Schedule a guardrail run for `text` as a tracked background task."""
        # Runs the guardrails in a separate task to avoid blocking the main loop
        task = asyncio.create_task(self._run_output_guardrails(text, response_id))
        self._guardrail_tasks.add(task)

        # Add callback to remove completed tasks and handle exceptions
        task.add_done_callback(self._on_guardrail_task_done)

    def _on_guardrail_task_done(self, task: asyncio.Task[Any]) -> None:
        """Handle completion of a guardrail task."""
        # Remove from tracking set
        self._guardrail_tasks.discard(task)

        # Check for exceptions and propagate as events
        if not task.cancelled():
            exception = task.exception()
            if exception:
                # Create an exception event instead of raising
                asyncio.create_task(
                    self._put_event(
                        RealtimeError(
                            info=self._event_info,
                            error={"message": f"Guardrail task failed: {str(exception)}"},
                        )
                    )
                )

    def _cleanup_guardrail_tasks(self) -> None:
        """Cancel every unfinished guardrail task and clear the tracking set."""
        for task in self._guardrail_tasks:
            if not task.done():
                task.cancel()
        self._guardrail_tasks.clear()

    def _enqueue_tool_call_task(
        self, event: RealtimeModelToolCallEvent, agent_snapshot: RealtimeAgent
    ) -> None:
        """Run tool calls in the background to avoid blocking realtime transport."""
        task = asyncio.create_task(self._handle_tool_call(event, agent_snapshot=agent_snapshot))
        self._tool_call_tasks.add(task)
        task.add_done_callback(self._on_tool_call_task_done)

    def _on_tool_call_task_done(self, task: asyncio.Task[Any]) -> None:
        """Handle completion of a background tool call task.

        A failure is logged, stored (if nothing is stored yet) so `__aiter__` raises it, and
        also surfaced as a `RealtimeError` event.
        """
        self._tool_call_tasks.discard(task)

        if task.cancelled():
            return

        exception = task.exception()
        if exception is None:
            return

        logger.exception("Realtime tool call task failed", exc_info=exception)

        if self._stored_exception is None:
            self._stored_exception = exception

        asyncio.create_task(
            self._put_event(
                RealtimeError(
                    info=self._event_info,
                    error={"message": f"Tool call task failed: {exception}"},
                )
            )
        )

    def _cleanup_tool_call_tasks(self) -> None:
        """Cancel every unfinished tool call task and clear the tracking set."""
        for task in self._tool_call_tasks:
            if not task.done():
                task.cancel()
        self._tool_call_tasks.clear()

    async def _cleanup(self) -> None:
        """Clean up all resources and mark session as closed."""
        # Cancel and cleanup guardrail tasks
        self._cleanup_guardrail_tasks()
        self._cleanup_tool_call_tasks()

        # Remove ourselves as a listener
        self._model.remove_listener(self)

        # Close the model connection
        await self._model.close()

        # Mark as closed
        self._closed = True

    async def _get_updated_model_settings_from_agent(
        self,
        starting_settings: RealtimeSessionModelSettings | None,
        agent: RealtimeAgent,
    ) -> RealtimeSessionModelSettings:
        """Build the model settings for `agent`.

        Layering, lowest to highest precedence: base settings from run/model config, the
        agent's prompt/instructions/tools/handoffs, then `starting_settings`. When tracing is
        disabled in the run config, `"tracing"` is forced to None.
        """
        # Start with the merged base settings from run and model configuration.
        updated_settings = self._base_model_settings.copy()

        if agent.prompt is not None:
            updated_settings["prompt"] = agent.prompt

        instructions, tools, handoffs = await asyncio.gather(
            agent.get_system_prompt(self._context_wrapper),
            agent.get_all_tools(self._context_wrapper),
            self._get_handoffs(agent, self._context_wrapper),
        )
        updated_settings["instructions"] = instructions or ""
        updated_settings["tools"] = tools or []
        updated_settings["handoffs"] = handoffs or []

        # Apply starting settings (from model config) next
        if starting_settings:
            updated_settings.update(starting_settings)

        disable_tracing = self._run_config.get("tracing_disabled", False)
        if disable_tracing:
            updated_settings["tracing"] = None

        return updated_settings

    @classmethod
    async def _get_handoffs(
        cls, agent: RealtimeAgent[Any], context_wrapper: RunContextWrapper[Any]
    ) -> list[Handoff[Any, RealtimeAgent[Any]]]:
        """Return the agent's enabled handoffs.

        `RealtimeAgent` entries are wrapped with `realtime_handoff`; each handoff's
        `is_enabled` may be a bool or a (possibly async) callable of `(context_wrapper, agent)`.
        """
        handoffs: list[Handoff[Any, RealtimeAgent[Any]]] = []
        for handoff_item in agent.handoffs:
            if isinstance(handoff_item, Handoff):
                handoffs.append(handoff_item)
            elif isinstance(handoff_item, RealtimeAgent):
                handoffs.append(realtime_handoff(handoff_item))

        async def _check_handoff_enabled(handoff_obj: Handoff[Any, RealtimeAgent[Any]]) -> bool:
            attr = handoff_obj.is_enabled
            if isinstance(attr, bool):
                return attr
            res = attr(context_wrapper, agent)
            if inspect.isawaitable(res):
                return await res
            return res

        results = await asyncio.gather(*(_check_handoff_enabled(h) for h in handoffs))
        enabled = [h for h, ok in zip(handoffs, results) if ok]
        return enabled

modelproperty

Access the underlying model for adding listeners or other direct interaction.

__init__

__init__(model: RealtimeModel, agent: RealtimeAgent, context: TContext | None, model_config: RealtimeModelConfig | None = None, run_config: RealtimeRunConfig | None = None) -> None

Initialize the session.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| model | RealtimeModel | The model to use. | required |
| agent | RealtimeAgent | The current agent. | required |
| context | TContext \| None | The context object. | required |
| model_config | RealtimeModelConfig \| None | Model configuration. | None |
| run_config | RealtimeRunConfig \| None | Runtime configuration including guardrails. | None |
Source code in src/agents/realtime/session.py
def __init__(
    self,
    model: RealtimeModel,
    agent: RealtimeAgent,
    context: TContext | None,
    model_config: RealtimeModelConfig | None = None,
    run_config: RealtimeRunConfig | None = None,
) -> None:
    """Initialize the session.

    Args:
        model: The model to use.
        agent: The current agent.
        context: The context object.
        model_config: Model configuration.
        run_config: Runtime configuration including guardrails.
    """
    self._model = model
    self._current_agent = agent
    self._context_wrapper = RunContextWrapper(context)
    self._event_info = RealtimeEventInfo(context=self._context_wrapper)
    self._history: list[RealtimeItem] = []
    self._model_config = model_config or {}
    self._run_config = run_config or {}
    initial_model_settings = self._model_config.get("initial_model_settings")
    run_config_settings = self._run_config.get("model_settings")
    # Later entries win: initial model settings override run-config model settings.
    self._base_model_settings: RealtimeSessionModelSettings = {
        **(run_config_settings or {}),
        **(initial_model_settings or {}),
    }
    self._event_queue: asyncio.Queue[RealtimeSessionEvent] = asyncio.Queue()
    self._closed = False
    self._stored_exception: BaseException | None = None

    # Guardrails state tracking
    self._interrupted_response_ids: set[str] = set()
    self._item_transcripts: dict[str, str] = {}  # item_id -> accumulated transcript
    self._item_guardrail_run_counts: dict[str, int] = {}  # item_id -> run count
    self._debounce_text_length = self._run_config.get("guardrails_settings", {}).get(
        "debounce_text_length", 100
    )

    self._guardrail_tasks: set[asyncio.Task[Any]] = set()
    self._tool_call_tasks: set[asyncio.Task[Any]] = set()
    self._async_tool_calls: bool = bool(self._run_config.get("async_tool_calls", True))

__aenter__async

__aenter__() -> RealtimeSession

Start the session by connecting to the model. After this, you will be able to stream events from the model and send messages and audio to the model.

Source code in src/agents/realtime/session.py
async def __aenter__(self) -> RealtimeSession:
    """Start the session by connecting to the model. After this, you will be able to stream
    events from the model and send messages and audio to the model.

    If resolving the settings or connecting raises, the session is removed from the
    model's listeners again before the exception propagates.
    """
    # Add ourselves as a listener
    self._model.add_listener(self)

    try:
        model_config = self._model_config.copy()
        model_config["initial_model_settings"] = await self._get_updated_model_settings_from_agent(
            starting_settings=self._model_config.get("initial_model_settings", None),
            agent=self._current_agent,
        )

        # Connect to the model
        await self._model.connect(model_config)
    except BaseException:
        # __aexit__ never runs when __aenter__ fails, so undo the registration here;
        # otherwise a retry would register this session a second time.
        self._model.remove_listener(self)
        raise

    # Emit initial history update
    await self._put_event(
        RealtimeHistoryUpdated(
            history=self._history,
            info=self._event_info,
        )
    )

    return self

enterasync

Enter the async context manager. We strongly recommend using the async context manager pattern instead of this method. If you use this, you need to manually call close() when you are done.

Source code in src/agents/realtime/session.py
async def enter(self) -> RealtimeSession:
    """Enter the async context manager. We strongly recommend using the async context manager
    pattern instead of this method. If you use this, you need to manually call `close()` when
    you are done.

    Returns:
        Whatever `__aenter__` returns (the started session).
    """
    return await self.__aenter__()

__aexit__async

__aexit__(_exc_type: Any, _exc_val: Any, _exc_tb: Any) -> None

End the session.

Source code in src/agents/realtime/session.py
async def __aexit__(self, _exc_type: Any, _exc_val: Any, _exc_tb: Any) -> None:
    """End the session.

    The exception details are ignored; the session is closed either way and, because
    nothing truthy is returned, an in-flight exception is not suppressed.
    """
    await self.close()

__aiter__async

__aiter__() -> AsyncIterator[RealtimeSessionEvent]

Iterate over events from the session.

Source code in src/agents/realtime/session.py
async def __aiter__(self) -> AsyncIterator[RealtimeSessionEvent]:
    """Iterate over events from the session.

    Yields queued events until the session is marked closed or the iteration is cancelled.

    Raises:
        BaseException: The stored exception, if one was recorded; resources are cleaned
            up before it is raised.
    """
    while not self._closed:
        try:
            # Check if there's a stored exception to raise
            if self._stored_exception is not None:
                # Clean up resources before raising
                await self._cleanup()
                raise self._stored_exception

            event = await self._event_queue.get()
            yield event

        except asyncio.CancelledError:
            # Cancellation ends the iteration quietly instead of propagating.
            break

closeasync

close()->None

Close the session.

Source code in src/agents/realtime/session.py
async def close(self) -> None:
    """Close the session by delegating to `_cleanup()`."""
    await self._cleanup()

send_messageasync

send_message(message: RealtimeUserInput) -> None

Send a message to the model.

Source code in src/agents/realtime/session.py
async def send_message(self, message: RealtimeUserInput) -> None:
    """Send a message to the model.

    Args:
        message: The user input, forwarded as a `RealtimeModelSendUserInput` event.
    """
    await self._model.send_event(RealtimeModelSendUserInput(user_input=message))

send_audioasync

send_audio(audio: bytes, *, commit: bool = False) -> None

Send a raw audio chunk to the model.

Source code in src/agents/realtime/session.py
async def send_audio(self, audio: bytes, *, commit: bool = False) -> None:
    """Send a raw audio chunk to the model.

    Args:
        audio: The audio bytes to send.
        commit: Passed through unchanged on the `RealtimeModelSendAudio` event.
    """
    await self._model.send_event(RealtimeModelSendAudio(audio=audio, commit=commit))

interruptasync

interrupt()->None

Interrupt the model.

Source code in src/agents/realtime/session.py
async def interrupt(self) -> None:
    """Interrupt the model by sending it a `RealtimeModelSendInterrupt` event."""
    await self._model.send_event(RealtimeModelSendInterrupt())

update_agentasync

update_agent(agent: RealtimeAgent) -> None

Update the active agent for this session and apply its settings to the model.

Source code in src/agents/realtime/session.py
async def update_agent(self, agent: RealtimeAgent) -> None:
    """Update the active agent for this session and apply its settings to the model.

    Args:
        agent: The agent that becomes the session's current agent.
    """
    self._current_agent = agent

    # No starting settings here: the agent-derived values are used as resolved.
    updated_settings = await self._get_updated_model_settings_from_agent(
        starting_settings=None,
        agent=self._current_agent,
    )

    await self._model.send_event(
        RealtimeModelSendSessionUpdate(session_settings=updated_settings)
    )

[8]ページ先頭

©2009-2025 Movatter.jp