class VoicePipeline:
    """An opinionated voice agent pipeline. It works in three steps:
    1. Transcribe audio input into text.
    2. Run the provided `workflow`, which produces a sequence of text responses.
    3. Convert the text responses into streaming audio output.
    """

    def __init__(
        self,
        *,
        workflow: VoiceWorkflowBase,
        stt_model: STTModel | str | None = None,
        tts_model: TTSModel | str | None = None,
        config: VoicePipelineConfig | None = None,
    ):
        """Create a new voice pipeline.

        Args:
            workflow: The workflow to run. See `VoiceWorkflowBase`.
            stt_model: The speech-to-text model to use. If not provided, a default OpenAI
                model will be used.
            tts_model: The text-to-speech model to use. If not provided, a default OpenAI
                model will be used.
            config: The pipeline configuration. If not provided, a default configuration will
                be used.
        """
        self.workflow = workflow
        self.stt_model = stt_model if isinstance(stt_model, STTModel) else None
        self.tts_model = tts_model if isinstance(tts_model, TTSModel) else None
        self._stt_model_name = stt_model if isinstance(stt_model, str) else None
        self._tts_model_name = tts_model if isinstance(tts_model, str) else None
        self.config = config or VoicePipelineConfig()

    async def run(self, audio_input: AudioInput | StreamedAudioInput) -> StreamedAudioResult:
        """Run the voice pipeline.

        Args:
            audio_input: The audio input to process. This can either be an `AudioInput`
                instance, which is a single static buffer, or a `StreamedAudioInput` instance,
                which is a stream of audio data that you can append to.

        Returns:
            A `StreamedAudioResult` instance. You can use this object to stream audio events
            and play them out.
        """
        if isinstance(audio_input, AudioInput):
            return await self._run_single_turn(audio_input)
        elif isinstance(audio_input, StreamedAudioInput):
            return await self._run_multi_turn(audio_input)
        else:
            raise UserError(f"Unsupported audio input type: {type(audio_input)}")

    def _get_tts_model(self) -> TTSModel:
        if not self.tts_model:
            self.tts_model = self.config.model_provider.get_tts_model(self._tts_model_name)
        return self.tts_model

    def _get_stt_model(self) -> STTModel:
        if not self.stt_model:
            self.stt_model = self.config.model_provider.get_stt_model(self._stt_model_name)
        return self.stt_model

    async def _process_audio_input(self, audio_input: AudioInput) -> str:
        model = self._get_stt_model()
        return await model.transcribe(
            audio_input,
            self.config.stt_settings,
            self.config.trace_include_sensitive_data,
            self.config.trace_include_sensitive_audio_data,
        )

    async def _run_single_turn(self, audio_input: AudioInput) -> StreamedAudioResult:
        # Since this is single turn, we can use the TraceCtxManager to manage starting/ending
        # the trace
        with TraceCtxManager(
            workflow_name=self.config.workflow_name or "Voice Agent",
            trace_id=None,  # Automatically generated
            group_id=self.config.group_id,
            metadata=self.config.trace_metadata,
            disabled=self.config.tracing_disabled,
        ):
            input_text = await self._process_audio_input(audio_input)
            output = StreamedAudioResult(
                self._get_tts_model(), self.config.tts_settings, self.config
            )

            async def stream_events():
                try:
                    async for text_event in self.workflow.run(input_text):
                        await output._add_text(text_event)
                    await output._turn_done()
                    await output._done()
                except Exception as e:
                    logger.error(f"Error processing single turn: {e}")
                    await output._add_error(e)
                    raise e

            output._set_task(asyncio.create_task(stream_events()))
            return output

    async def _run_multi_turn(self, audio_input: StreamedAudioInput) -> StreamedAudioResult:
        with TraceCtxManager(
            workflow_name=self.config.workflow_name or "Voice Agent",
            trace_id=None,
            group_id=self.config.group_id,
            metadata=self.config.trace_metadata,
            disabled=self.config.tracing_disabled,
        ):
            output = StreamedAudioResult(
                self._get_tts_model(), self.config.tts_settings, self.config
            )

            try:
                async for intro_text in self.workflow.on_start():
                    await output._add_text(intro_text)
            except Exception as e:
                logger.warning(f"on_start() failed: {e}")

            transcription_session = await self._get_stt_model().create_session(
                audio_input,
                self.config.stt_settings,
                self.config.trace_include_sensitive_data,
                self.config.trace_include_sensitive_audio_data,
            )

            async def process_turns():
                try:
                    async for input_text in transcription_session.transcribe_turns():
                        result = self.workflow.run(input_text)
                        async for text_event in result:
                            await output._add_text(text_event)
                        await output._turn_done()
                except Exception as e:
                    logger.error(f"Error processing turns: {e}")
                    await output._add_error(e)
                    raise e
                finally:
                    await transcription_session.close()
                    await output._done()

            output._set_task(asyncio.create_task(process_turns()))
            return output