@@ -458,7 +458,7 @@ pub async fn run_evaluation_core_streaming(
458458} ;
459459
460460// Process all datapoints across all variants
461- let ( mut join_set, task_id_map) =process_batch ( & batch_params, dataset, & variants) ;
461+ let ( mut join_set, task_id_map) =process_batch ( & batch_params, dataset, & variants) . await ? ;
462462
463463// Get a shared reference to the evaluation config (which includes evaluators)
464464let evaluators = evaluation_config. clone ( ) ;
@@ -745,10 +745,10 @@ pub enum BatchItemResult {
745745}
746746
747747/// Return type for process_batch: a JoinSet of results and a map from task ID to (datapoint_id, variant).
748- type ProcessBatchResult =(
748+ type ProcessBatchResult =Result < (
749749JoinSet < Result < DatapointVariantResult > > ,
750750HashMap < tokio:: task:: Id , ( Uuid , Arc < EvaluationVariant > ) > ,
751- ) ;
751+ ) > ;
752752
753753/// Process a batch of datapoints across all variants.
754754///
@@ -771,7 +771,7 @@ type ProcessBatchResult = (
771771/// # Returns
772772/// A JoinSet that yields `DatapointVariantResult` as tasks complete, along with a mapping
773773/// from task ID to (datapoint_id, variant) for error reporting.
774- pub fn process_batch (
774+ pub async fn process_batch (
775775params : & ProcessBatchParams ,
776776datapoints : Vec < Datapoint > ,
777777variants : & [ Arc < EvaluationVariant > ] ,
@@ -782,8 +782,22 @@ pub fn process_batch(
782782let EvaluationConfig :: Inference ( inference_evaluation_config) =& * params. evaluation_config ;
783783let function_name = inference_evaluation_config. function_name . clone ( ) ;
784784
785+ // Pre-resolve all datapoint inputs before spawning tasks (avoids redundant work per variant)
786+ let mut datapoints_with_inputs: Vec < ( Arc < Datapoint > , Arc < Input > ) > =
787+ Vec :: with_capacity ( datapoints. len ( ) ) ;
785788for datapointin datapoints{
786- let datapoint =Arc :: new ( datapoint) ;
789+ let stored_input = datapoint
790+ . input ( )
791+ . clone ( )
792+ . into_stored_input_without_file_handling ( ) ?;
793+ let resolved_input = stored_input
794+ . reresolve ( & params. clients . tensorzero_client )
795+ . await ?;
796+ let input =Arc :: new ( resolved_input_to_client_input ( resolved_input) ?) ;
797+ datapoints_with_inputs. push ( ( Arc :: new ( datapoint) , input) ) ;
798+ }
799+
800+ for ( datapoint, input) in datapoints_with_inputs{
787801let datapoint_id = datapoint. id ( ) ;
788802
789803for variantin variants{
@@ -800,6 +814,7 @@ pub fn process_batch(
800814let variant_for_map = variant. clone ( ) ; // Clone before moving into async block
801815let datapoint = datapoint. clone ( ) ;
802816let function_name = function_name. clone ( ) ;
817+ let input = input. clone ( ) ;
803818
804819// Skip feedback for dynamic variants (they're not production-ready)
805820let send_feedback = !matches ! ( variant. as_ref( ) , EvaluationVariant :: Info ( _) ) ;
@@ -813,14 +828,6 @@ pub fn process_batch(
813828anyhow ! ( "Function '{function_name}' not found in function configs table" )
814829} ) ?;
815830
816- // Resolve input from datapoint
817- let stored_input = datapoint
818- . input ( )
819- . clone ( )
820- . into_stored_input_without_file_handling ( ) ?;
821- let resolved_input = stored_input. reresolve ( & clients. tensorzero_client ) . await ?;
822- let input =Arc :: new ( resolved_input_to_client_input ( resolved_input) ?) ;
823-
824831// Run inference
825832let inference_response =Arc :: new (
826833infer_datapoint ( InferDatapointParams {
@@ -886,7 +893,7 @@ pub fn process_batch(
886893}
887894}
888895
889- ( join_set, task_id_map)
896+ Ok ( ( join_set, task_id_map) )
890897}
891898
892899/// Collect results from a JoinSet into BatchItemResults.