class StreamedAudioResult:
    """The output of a `VoicePipeline`. Streams events and audio data as they're generated.

    Text fed in through `_add_text` / `_turn_done` is split into segments. Each segment is
    synthesized by its own `_stream_audio` task, which writes into a private per-segment
    queue. `_dispatch_audio` drains those queues strictly in the order the segments were
    created and forwards their events to the queue consumed by `stream()`.
    """

    def __init__(
        self,
        tts_model: TTSModel,
        tts_settings: TTSModelSettings,
        voice_pipeline_config: VoicePipelineConfig,
    ):
        """Create a new `StreamedAudioResult` instance.

        Args:
            tts_model: The TTS model to use.
            tts_settings: The TTS settings to use.
            voice_pipeline_config: The voice pipeline config to use.
        """
        self.tts_model = tts_model
        self.tts_settings = tts_settings
        self.total_output_text = ""
        self.instructions = tts_settings.instructions
        self.text_generation_task: asyncio.Task[Any] | None = None

        self._voice_pipeline_config = voice_pipeline_config
        self._text_buffer = ""
        self._turn_text_buffer = ""
        self._queue: asyncio.Queue[VoiceStreamEvent] = asyncio.Queue()
        self._tasks: list[asyncio.Task[Any]] = []
        # One local queue per text segment, kept in creation order; drained by `_dispatch_audio`.
        self._ordered_tasks: list[asyncio.Queue[VoiceStreamEvent | None]] = []
        # Task that forwards events from the per-segment queues to `_queue`, in order.
        self._dispatcher_task: asyncio.Task[Any] | None = None

        self._done_processing = False
        self._buffer_size = tts_settings.buffer_size
        self._started_processing_turn = False
        self._first_byte_received = False
        self._generation_start_time: str | None = None
        self._completed_session = False
        self._stored_exception: BaseException | None = None
        self._tracing_span: Span[SpeechGroupSpanData] | None = None

    async def _start_turn(self):
        """Open a speech-group tracing span and emit `turn_started`, once per turn."""
        if self._started_processing_turn:
            return

        self._tracing_span = speech_group_span()
        self._tracing_span.start()
        self._started_processing_turn = True
        self._first_byte_received = False
        self._generation_start_time = time_iso()
        await self._queue.put(VoiceStreamEventLifecycle(event="turn_started"))

    def _set_task(self, task: asyncio.Task[Any]):
        """Remember the text-generation task so `_cleanup_tasks` can cancel it."""
        self.text_generation_task = task

    async def _add_error(self, error: Exception):
        """Push an error event; `stream()` stores it and re-raises it after the loop."""
        await self._queue.put(VoiceStreamEventError(error))

    def _transform_audio_buffer(
        self, buffer: list[bytes], output_dtype: npt.DTypeLike
    ) -> npt.NDArray[np.int16 | np.float32]:
        """Convert buffered PCM byte chunks into a numpy array of the requested dtype.

        Args:
            buffer: Raw byte chunks, decoded here as native-endian ``int16`` samples.
            output_dtype: Anything ``np.dtype`` resolves to ``int16`` or ``float32``
                (e.g. ``np.float32`` or ``"float32"``).

        Returns:
            For ``int16``: the flat 1-D sample array. For ``float32``: the samples divided
            by ``32767.0`` and reshaped to a single column of shape ``(n, 1)``.

        Raises:
            UserError: If ``output_dtype`` does not resolve to int16 or float32.
        """
        # Normalize so DTypeLike spellings such as "float32" are accepted, not only classes.
        try:
            target = np.dtype(output_dtype)
        except (TypeError, ValueError):
            raise UserError("Invalid output dtype") from None

        np_array = np.frombuffer(b"".join(buffer), dtype=np.int16)

        if target == np.int16:
            return np_array
        elif target == np.float32:
            return (np_array.astype(np.float32) / 32767.0).reshape(-1, 1)
        else:
            raise UserError("Invalid output dtype")

    def _audio_event(self, buffer: list[bytes]) -> VoiceStreamEventAudio:
        """Build an audio event from ``buffer``, applying the optional ``transform_data`` hook."""
        audio_np = self._transform_audio_buffer(buffer, self.tts_settings.dtype)
        if self.tts_settings.transform_data:
            audio_np = self.tts_settings.transform_data(audio_np)
        return VoiceStreamEventAudio(data=audio_np)

    async def _stream_audio(
        self,
        text: str,
        local_queue: asyncio.Queue[VoiceStreamEvent | None],
        finish_turn: bool = False,
    ):
        """Synthesize ``text`` with the TTS model and push audio events onto ``local_queue``.

        Non-empty chunks are emitted in batches of ``tts_settings.buffer_size``. On success
        the segment is terminated with a ``turn_ended`` lifecycle event (when
        ``finish_turn``) or with ``None``. On failure, the error is recorded on the span, a
        ``session_ended`` event is queued, and the exception is re-raised.
        """
        with speech_span(
            model=self.tts_model.model_name,
            input=text if self._voice_pipeline_config.trace_include_sensitive_data else "",
            model_config={
                "voice": self.tts_settings.voice,
                "instructions": self.instructions,
                "speed": self.tts_settings.speed,
            },
            output_format="pcm",
            parent=self._tracing_span,
        ) as tts_span:
            try:
                first_byte_received = False
                buffer: list[bytes] = []
                full_audio_data: list[bytes] = []

                async for chunk in self.tts_model.run(text, self.tts_settings):
                    if not first_byte_received:
                        first_byte_received = True
                        tts_span.span_data.first_content_at = time_iso()

                    if chunk:
                        buffer.append(chunk)
                        full_audio_data.append(chunk)
                        if len(buffer) >= self._buffer_size:
                            await local_queue.put(self._audio_event(buffer))
                            buffer = []

                # Flush whatever is left over after the model finishes.
                if buffer:
                    await local_queue.put(self._audio_event(buffer))

                if self._voice_pipeline_config.trace_include_sensitive_audio_data:
                    tts_span.span_data.output = _audio_to_base64(full_audio_data)
                else:
                    tts_span.span_data.output = ""

                if finish_turn:
                    await local_queue.put(VoiceStreamEventLifecycle(event="turn_ended"))
                else:
                    await local_queue.put(None)  # Signal completion for this segment
            except Exception as e:
                tts_span.set_error(
                    {
                        "message": str(e),
                        "data": {
                            "text": text
                            if self._voice_pipeline_config.trace_include_sensitive_data
                            else "",
                        },
                    }
                )
                logger.error(f"Error streaming audio:{e}")

                # Signal completion for whole session because of error
                await local_queue.put(VoiceStreamEventLifecycle(event="session_ended"))
                raise

    async def _add_text(self, text: str):
        """Append generated text and schedule TTS for any text the splitter marks as ready."""
        await self._start_turn()

        self._text_buffer += text
        self.total_output_text += text
        self._turn_text_buffer += text

        combined_sentences, self._text_buffer = self.tts_settings.text_splitter(self._text_buffer)

        # The splitter has already removed `combined_sentences` from the buffer, so any
        # non-blank ready text must be dispatched here; otherwise it would be lost.
        if combined_sentences.strip():
            local_queue: asyncio.Queue[VoiceStreamEvent | None] = asyncio.Queue()
            self._ordered_tasks.append(local_queue)
            self._tasks.append(
                asyncio.create_task(self._stream_audio(combined_sentences, local_queue))
            )
            if self._dispatcher_task is None:
                self._dispatcher_task = asyncio.create_task(self._dispatch_audio())

    async def _turn_done(self):
        """Close the current turn and wait for every TTS task scheduled so far.

        Remaining buffered text is sent as the final segment of the turn.
        """
        if self._text_buffer:
            local_queue: asyncio.Queue[VoiceStreamEvent | None] = asyncio.Queue()
            self._ordered_tasks.append(local_queue)  # Append the local queue for the final segment
            self._tasks.append(
                asyncio.create_task(
                    self._stream_audio(self._text_buffer, local_queue, finish_turn=True)
                )
            )
            self._text_buffer = ""
        elif self._started_processing_turn:
            # Every piece of this turn's text was already dispatched, so no segment will
            # emit "turn_ended". Queue it directly so the dispatcher still closes the turn.
            local_queue = asyncio.Queue()
            local_queue.put_nowait(VoiceStreamEventLifecycle(event="turn_ended"))
            self._ordered_tasks.append(local_queue)
        self._done_processing = True
        if self._dispatcher_task is None:
            self._dispatcher_task = asyncio.create_task(self._dispatch_audio())
        await asyncio.gather(*self._tasks)

    def _finish_turn(self):
        """Finish the turn's tracing span (if any) and reset per-turn state."""
        if self._tracing_span:
            if self._voice_pipeline_config.trace_include_sensitive_data:
                self._tracing_span.span_data.input = self._turn_text_buffer
            else:
                self._tracing_span.span_data.input = ""

            self._tracing_span.finish()
            self._tracing_span = None
        self._turn_text_buffer = ""
        self._started_processing_turn = False

    async def _done(self):
        """Mark the session complete and wait for all TTS and dispatch work to finish."""
        self._completed_session = True
        await self._wait_for_completion()

    async def _dispatch_audio(self):
        """Forward events from each segment's local queue to `_queue`, in segment order.

        Runs until the session is marked complete and no segment queues remain, then emits
        `session_ended`.
        """
        while True:
            if len(self._ordered_tasks) == 0:
                if self._completed_session:
                    break
                # NOTE(review): polling with sleep(0) keeps this task spinning on the event
                # loop while waiting for the next segment; an asyncio.Event would avoid that.
                await asyncio.sleep(0)
                continue
            local_queue = self._ordered_tasks.pop(0)
            while True:
                chunk = await local_queue.get()
                if chunk is None:
                    # Segment finished without ending the turn; move to the next one.
                    break
                await self._queue.put(chunk)
                if isinstance(chunk, VoiceStreamEventLifecycle):
                    local_queue.task_done()
                    if chunk.event == "turn_ended":
                        self._finish_turn()
                        break
        await self._queue.put(VoiceStreamEventLifecycle(event="session_ended"))

    async def _wait_for_completion(self):
        """Await all segment TTS tasks plus the dispatcher task, if one was started."""
        # Copy so the dispatcher is not appended to `self._tasks` itself; the original
        # aliasing mutated the instance list and duplicated it on repeated calls.
        tasks: list[asyncio.Task[Any]] = list(self._tasks)
        if self._dispatcher_task is not None:
            tasks.append(self._dispatcher_task)
        await asyncio.gather(*tasks)

    def _cleanup_tasks(self):
        """Finish the current turn and cancel any TTS, dispatch or text-generation task still running."""
        self._finish_turn()

        for task in self._tasks:
            if not task.done():
                task.cancel()

        if self._dispatcher_task and not self._dispatcher_task.done():
            self._dispatcher_task.cancel()

        if self.text_generation_task and not self.text_generation_task.done():
            self.text_generation_task.cancel()

    def _check_errors(self):
        """Store the first exception raised by a finished TTS or dispatcher task."""
        candidates: list[asyncio.Task[Any]] = list(self._tasks)
        if self._dispatcher_task is not None:
            candidates.append(self._dispatcher_task)
        for task in candidates:
            # `Task.exception()` raises CancelledError for cancelled tasks, so skip them.
            if task.done() and not task.cancelled() and task.exception():
                self._stored_exception = task.exception()
                break

    async def stream(self) -> AsyncIterator[VoiceStreamEvent]:
        """Stream the events and audio data as they're generated."""
        while True:
            try:
                event = await self._queue.get()
            except asyncio.CancelledError:
                break
            if isinstance(event, VoiceStreamEventError):
                self._stored_exception = event.error
                logger.error(f"Error processing output:{event.error}")
                break
            if event is None:
                break
            yield event
            if event.type == "voice_stream_event_lifecycle" and event.event == "session_ended":
                break

        self._check_errors()
        self._cleanup_tasks()

        if self._stored_exception:
            raise self._stored_exception