@@ -293,7 +293,10 @@ pub async fn search(
293293 log:: info!(
294294"{}" ,
295295 search_inspector_fields(
296- format!( "[trace_id {trace_id}] cache done" ) ,
296+ format!(
297+ "[trace_id {trace_id}] search for cache is done, took: {} ms" ,
298+ start. elapsed( ) . as_millis( )
299+ ) ,
297300SearchInspectorFieldsBuilder :: new( )
298301. node_name( LOCAL_NODE . name. clone( ) )
299302. component( "summary" . to_string( ) )
@@ -423,6 +426,10 @@ pub async fn search(
423426 && res. new_end_time . is_none ( )
424427 && res. function_error . is_empty ( )
425428 && !res. hits . is_empty ( ) ;
429+ log:: info!(
430+ "[trace_id {trace_id}] should_cache_results: {should_cache_results}, is_http2_streaming: {is_http2_streaming}, hits: {}" ,
431+ res. hits. len( )
432+ ) ;
426433if should_cache_results
427434 &&( results. first ( ) . is_some_and ( |res| !res. hits . is_empty ( ) )
428435 || results. last ( ) . is_some_and ( |res| !res. hits . is_empty ( ) ) )
@@ -583,27 +590,27 @@ pub async fn prepare_cache_response(
583590#[ allow( clippy:: too_many_arguments) ]
584591pub fn merge_response (
585592trace_id : & str ,
586- cache_responses : & mut Vec < config:: meta:: search:: Response > ,
587- search_response : & mut Vec < config:: meta:: search:: Response > ,
593+ cached_responses : & mut Vec < config:: meta:: search:: Response > ,
594+ search_responses : & mut Vec < config:: meta:: search:: Response > ,
588595ts_column : & str ,
589596limit : i64 ,
590597is_descending : bool ,
591598cache_took : usize ,
592599order_by : Vec < ( String , OrderBy ) > ,
593600) -> config:: meta:: search:: Response {
594- cache_responses . retain ( |res| !res. hits . is_empty ( ) ) ;
595- search_response . retain ( |res| !res. hits . is_empty ( ) ) ;
601+ cached_responses . retain ( |res| !res. hits . is_empty ( ) ) ;
602+ search_responses . retain ( |res| !res. hits . is_empty ( ) ) ;
596603
597- if cache_responses . is_empty ( ) &&search_response . is_empty ( ) {
604+ if cached_responses . is_empty ( ) &&search_responses . is_empty ( ) {
598605return config:: meta:: search:: Response :: default ( ) ;
599606}
600607let mut fn_error =vec ! [ ] ;
601608
602- let mut cache_response =if cache_responses . is_empty ( ) {
609+ let mut cache_response =if cached_responses . is_empty ( ) {
603610 config:: meta:: search:: Response :: default ( )
604611} else {
605612let mut resp = config:: meta:: search:: Response :: default ( ) ;
606- for resin cache_responses {
613+ for resin cached_responses {
607614 resp. total += res. total ;
608615 resp. scan_size += res. scan_size ;
609616 resp. scan_records += res. scan_records ;
@@ -621,13 +628,15 @@ pub fn merge_response(
621628} ;
622629
623630if cache_response. hits . is_empty ( )
624- && !search_response . is_empty ( )
625- &&search_response
631+ && !search_responses . is_empty ( )
632+ &&search_responses
626633. first ( )
627634. is_none_or ( |res| res. hits . is_empty ( ) )
628- && search_response. last ( ) . is_none_or ( |res| res. hits . is_empty ( ) )
635+ && search_responses
636+ . last ( )
637+ . is_none_or ( |res| res. hits . is_empty ( ) )
629638{
630- for resin search_response {
639+ for resin search_responses {
631640 cache_response. total += res. total ;
632641 cache_response. scan_size += res. scan_size ;
633642 cache_response. took += res. took ;
@@ -639,7 +648,7 @@ pub fn merge_response(
639648 cache_response. function_error = fn_error;
640649return cache_response;
641650}
642- let cache_hits_len = cache_response. hits . len ( ) ;
651+ let cached_hits_len = cache_response. hits . len ( ) ;
643652
644653 cache_response. scan_size =0 ;
645654
@@ -648,7 +657,7 @@ pub fn merge_response(
648657
649658let mut res_took =ResponseTook :: default ( ) ;
650659
651- for resin search_response . clone ( ) {
660+ for resin search_responses . clone ( ) {
652661 cache_response. total += res. total ;
653662 cache_response. scan_size += res. scan_size ;
654663 cache_response. took += res. took ;
@@ -682,32 +691,32 @@ pub fn merge_response(
682691 cache_response. total = cache_response. hits . len ( ) ;
683692}
684693
685- if !search_response . is_empty ( ) {
686- cache_response. cached_ratio = files_cache_ratio /search_response . len ( ) ;
694+ if !search_responses . is_empty ( ) {
695+ cache_response. cached_ratio = files_cache_ratio /search_responses . len ( ) ;
687696}
688697 cache_response. size = cache_response. hits . len ( ) as i64 ;
689698 log:: info!(
690- "[trace_id {trace_id}]cache_response. hits. len: {},Result cache len: {}" ,
691- cache_hits_len ,
699+ "[trace_id {trace_id}]cached hits len: {},result cache len: {}" ,
700+ cached_hits_len ,
692701 result_cache_len
693702) ;
694703 cache_response. took_detail = res_took;
695- cache_response. order_by =search_response
704+ cache_response. order_by =search_responses
696705. first ( )
697706. map ( |res| res. order_by )
698707. unwrap_or_default ( ) ;
699- cache_response. order_by_metadata =search_response
708+ cache_response. order_by_metadata =search_responses
700709. first ( )
701710. map ( |res| res. order_by_metadata . clone ( ) )
702711. unwrap_or_default ( ) ;
703- cache_response. result_cache_ratio =( ( ( cache_hits_len as f64 ) * 100_f64 )
704- /( ( result_cache_len +cache_hits_len ) as f64 ) )
712+ cache_response. result_cache_ratio =( ( ( cached_hits_len as f64 ) * 100_f64 )
713+ /( ( result_cache_len +cached_hits_len ) as f64 ) )
705714as usize ;
706715if !fn_error. is_empty ( ) {
707716 cache_response. function_error . extend ( fn_error) ;
708717 cache_response. is_partial =true ;
709718}
710- cache_response. is_histogram_eligible =search_response
719+ cache_response. is_histogram_eligible =search_responses
711720. first ( )
712721. map ( |res| res. is_histogram_eligible )
713722. unwrap_or_default ( ) ;
@@ -836,6 +845,7 @@ pub async fn write_results(
836845// 1. alignment time range for incomplete records for histogram
837846let mut accept_start_time = req_query_start_time;
838847let mut accept_end_time = req_query_end_time;
848+ let mut need_adjust_end_time =false ;
839849if is_aggregate
840850 &&let Some ( interval) = res. histogram_interval
841851 && interval >0
@@ -847,6 +857,7 @@ pub async fn write_results(
847857}
848858// previous interval of end_time
849859if ( accept_end_time % interval) !=0 {
860+ need_adjust_end_time =true ;
850861 accept_end_time = accept_end_time -( accept_end_time % interval) - interval;
851862}
852863}
@@ -858,12 +869,9 @@ pub async fn write_results(
858869let ( data_start_time, data_end_time) =
859870extract_timestamp_range ( & res. hits , ts_column, is_time_ordered) ;
860871let delay_ts =second_micros ( get_config ( ) . limit . cache_delay_secs ) ;
861- let accept_end_time = std:: cmp:: min ( Utc :: now ( ) . timestamp_micros ( ) - delay_ts, accept_end_time) ;
862-
863- // Track if we need to recalculate timestamp range after filtering
864- let needs_filtering = data_start_time < accept_start_time || data_end_time > accept_end_time;
865-
866- if needs_filtering{
872+ let mut accept_end_time =
873+ std:: cmp:: min ( Utc :: now ( ) . timestamp_micros ( ) - delay_ts, accept_end_time) ;
874+ if data_start_time < accept_start_time || data_end_time > accept_end_time{
867875 res. hits . retain ( |hit|{
868876if let Some ( hit_ts) = hit. get ( ts_column)
869877 &&let Some ( hit_ts_datetime) =convert_ts_value_to_datetime ( hit_ts)
@@ -885,27 +893,26 @@ pub async fn write_results(
885893return ;
886894}
887895
888- // 3.5. Determine final time range for cache filename
889- // If we filtered data, recalculate; otherwise use the original data range
890- let ( final_start_time, final_end_time) =if needs_filtering{
891- // Recalculate after filtering - only happens when data was actually filtered
892- extract_timestamp_range ( & res. hits , ts_column, is_time_ordered)
893- } else {
894- // No filtering occurred, use the original data range
895- ( data_start_time, data_end_time)
896- } ;
897-
898896// 4. check if the time range is less than discard_duration
899- if ( final_end_time -final_start_time ) < delay_ts{
897+ if ( accept_end_time -accept_start_time ) < delay_ts{
900898 log:: info!( "[trace_id {trace_id}] Time range is too short for caching, skipping caching" ) ;
901899return ;
902900}
903901
904- // 5. cache to disk
902+ // 5. adjust the cache time range
903+ if need_adjust_end_time
904+ && is_aggregate
905+ &&let Some ( interval) = res. histogram_interval
906+ && interval >0
907+ {
908+ accept_end_time += interval* 1000 * 1000 ;
909+ }
910+
911+ // 6. cache to disk
905912let file_name =format ! (
906913"{}_{}_{}_{}.json" ,
907- final_start_time ,
908- final_end_time ,
914+ accept_start_time ,
915+ accept_end_time ,
909916if is_aggregate{ 1 } else{ 0 } ,
910917if is_descending{ 1 } else{ 0 }
911918) ;
@@ -919,8 +926,8 @@ pub async fn write_results(
919926& file_name,
920927 res_cache,
921928 clear_cache,
922- Some ( final_start_time ) ,
923- Some ( final_end_time ) ,
929+ Some ( accept_start_time ) ,
930+ Some ( accept_end_time ) ,
924931)
925932. await
926933{
@@ -934,8 +941,8 @@ pub async fn write_results(
934941. entry ( query_key)
935942. or_insert_with ( Vec :: new)
936943. push ( ResultCacheMeta {
937- start_time : final_start_time ,
938- end_time : final_end_time ,
944+ start_time : accept_start_time ,
945+ end_time : accept_end_time ,
939946 is_aggregate,
940947 is_descending,
941948} ) ;