Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 43dc49f

Browse files
authored
fix: result cache for histogram (#9483) (#9486)
1 parent bb0a180 · commit 43dc49f

File tree

8 files changed

+98
-119
lines changed

8 files changed

+98
-119
lines changed

‎src/config/src/meta/search.rs‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1327,7 +1327,7 @@ const AGGREGATION_CACHE_INTERVALS: [(Option<Duration>, Interval); 6] = [
13271327
(Duration::try_minutes(15),Interval::FiveMinutes),
13281328
];
13291329

1330-
#[derive(Debug,Clone,Copy,PartialEq)]
1330+
#[derive(Debug,Clone,Copy,PartialEq,PartialOrd)]
13311331
pubenumInterval{
13321332
Zero =0,
13331333
FiveMinutes =5,

‎src/service/search/cache/cacher.rs‎

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ pub async fn check_cache(
358358
}),
359359
req.query.start_time,
360360
req.query.end_time,
361+
histogram_interval,
361362
&mut deltas,
362363
);
363364

@@ -388,9 +389,13 @@ pub async fn check_cache(
388389
}
389390
}
390391
};
391-
multi_resp.deltas = c_resp.deltas.clone();
392392
multi_resp.has_cached_data = c_resp.has_cached_data;
393-
multi_resp.cached_response.push(c_resp);
393+
if !c_resp.deltas.is_empty(){
394+
multi_resp.deltas = c_resp.deltas.clone();
395+
};
396+
if c_resp.has_cached_data{
397+
multi_resp.cached_response.push(c_resp);
398+
}
394399
multi_resp.took = start.elapsed().as_millis()asusize;
395400
multi_resp.cache_query_response =true;
396401
multi_resp.limit = sql.limitasi64;
@@ -482,19 +487,22 @@ pub async fn get_cached_results(
482487
let last_ts =get_ts_value(&cache_req.ts_column, cached_response.hits.last().unwrap());
483488
let data_start_time = std::cmp::min(first_ts, last_ts);
484489
let data_end_time = std::cmp::max(first_ts, last_ts);
490+
// convert histogram interval to microseconds
491+
let histogram_interval = cached_response.histogram_interval.unwrap_or_default()*1_000_000;
485492
// check if need to filter the data
486493
if data_start_time < cache_req.q_start_time || data_end_time > cache_req.q_end_time{
487494
cached_response.hits.retain(|hit|{
488495
let hit_ts =get_ts_value(&cache_req.ts_column, hit);
489-
hit_ts < hits_allowed_end_time && hit_ts >= hits_allowed_start_time
496+
hit_ts+ histogram_interval< hits_allowed_end_time && hit_ts >= hits_allowed_start_time
490497
});
491498
// if the data is empty after filtering, return None
492499
if cached_response.hits.is_empty(){
493500
returnNone;
494501
}
495502
// reset the start and end time
496503
let first_ts =get_ts_value(&cache_req.ts_column, cached_response.hits.first().unwrap());
497-
let last_ts =get_ts_value(&cache_req.ts_column, cached_response.hits.last().unwrap());
504+
let last_ts =get_ts_value(&cache_req.ts_column, cached_response.hits.last().unwrap())
505+
+ histogram_interval;
498506
matching_meta.start_time = std::cmp::min(first_ts, last_ts);
499507
matching_meta.end_time = std::cmp::max(first_ts, last_ts);
500508
}
@@ -523,6 +531,7 @@ pub fn calculate_deltas(
523531
result_meta:&ResultCacheMeta,
524532
query_start_time:i64,
525533
query_end_time:i64,
534+
histogram_interval:i64,
526535
deltas:&mutVec<QueryDelta>,
527536
){
528537
if query_start_time == result_meta.start_time && query_end_time == result_meta.end_time{
@@ -535,8 +544,13 @@ pub fn calculate_deltas(
535544
// for delta start time we need to add 1 microsecond to the end time
536545
// because we will include the start_time, if we don't add 1 microsecond
537546
// the start_time will be include in the next search, we will get duplicate data
547+
let delta_start_time =if histogram_interval >0{
548+
result_meta.end_time
549+
}else{
550+
result_meta.end_time +1
551+
};
538552
deltas.push(QueryDelta{
539-
delta_start_time: result_meta.end_time +1,
553+
delta_start_time,
540554
delta_end_time: query_end_time,
541555
});
542556
}

‎src/service/search/cache/mod.rs‎

Lines changed: 54 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,10 @@ pub async fn search(
293293
log::info!(
294294
"{}",
295295
search_inspector_fields(
296-
format!("[trace_id {trace_id}] cache done"),
296+
format!(
297+
"[trace_id {trace_id}] search for cache is done, took: {} ms",
298+
start.elapsed().as_millis()
299+
),
297300
SearchInspectorFieldsBuilder::new()
298301
.node_name(LOCAL_NODE.name.clone())
299302
.component("summary".to_string())
@@ -423,6 +426,10 @@ pub async fn search(
423426
&& res.new_end_time.is_none()
424427
&& res.function_error.is_empty()
425428
&& !res.hits.is_empty();
429+
log::info!(
430+
"[trace_id {trace_id}] should_cache_results: {should_cache_results}, is_http2_streaming: {is_http2_streaming}, hits: {}",
431+
res.hits.len()
432+
);
426433
if should_cache_results
427434
&&(results.first().is_some_and(|res| !res.hits.is_empty())
428435
|| results.last().is_some_and(|res| !res.hits.is_empty()))
@@ -583,27 +590,27 @@ pub async fn prepare_cache_response(
583590
#[allow(clippy::too_many_arguments)]
584591
pubfnmerge_response(
585592
trace_id:&str,
586-
cache_responses:&mutVec<config::meta::search::Response>,
587-
search_response:&mutVec<config::meta::search::Response>,
593+
cached_responses:&mutVec<config::meta::search::Response>,
594+
search_responses:&mutVec<config::meta::search::Response>,
588595
ts_column:&str,
589596
limit:i64,
590597
is_descending:bool,
591598
cache_took:usize,
592599
order_by:Vec<(String,OrderBy)>,
593600
) -> config::meta::search::Response{
594-
cache_responses.retain(|res| !res.hits.is_empty());
595-
search_response.retain(|res| !res.hits.is_empty());
601+
cached_responses.retain(|res| !res.hits.is_empty());
602+
search_responses.retain(|res| !res.hits.is_empty());
596603

597-
ifcache_responses.is_empty() &&search_response.is_empty(){
604+
ifcached_responses.is_empty() &&search_responses.is_empty(){
598605
return config::meta::search::Response::default();
599606
}
600607
letmut fn_error =vec![];
601608

602-
letmut cache_response =ifcache_responses.is_empty(){
609+
letmut cache_response =ifcached_responses.is_empty(){
603610
config::meta::search::Response::default()
604611
}else{
605612
letmut resp = config::meta::search::Response::default();
606-
for resincache_responses{
613+
for resincached_responses{
607614
resp.total += res.total;
608615
resp.scan_size += res.scan_size;
609616
resp.scan_records += res.scan_records;
@@ -621,13 +628,15 @@ pub fn merge_response(
621628
};
622629

623630
if cache_response.hits.is_empty()
624-
&& !search_response.is_empty()
625-
&&search_response
631+
&& !search_responses.is_empty()
632+
&&search_responses
626633
.first()
627634
.is_none_or(|res| res.hits.is_empty())
628-
&& search_response.last().is_none_or(|res| res.hits.is_empty())
635+
&& search_responses
636+
.last()
637+
.is_none_or(|res| res.hits.is_empty())
629638
{
630-
for resinsearch_response{
639+
for resinsearch_responses{
631640
cache_response.total += res.total;
632641
cache_response.scan_size += res.scan_size;
633642
cache_response.took += res.took;
@@ -639,7 +648,7 @@ pub fn merge_response(
639648
cache_response.function_error = fn_error;
640649
return cache_response;
641650
}
642-
letcache_hits_len = cache_response.hits.len();
651+
letcached_hits_len = cache_response.hits.len();
643652

644653
cache_response.scan_size =0;
645654

@@ -648,7 +657,7 @@ pub fn merge_response(
648657

649658
letmut res_took =ResponseTook::default();
650659

651-
for resinsearch_response.clone(){
660+
for resinsearch_responses.clone(){
652661
cache_response.total += res.total;
653662
cache_response.scan_size += res.scan_size;
654663
cache_response.took += res.took;
@@ -682,32 +691,32 @@ pub fn merge_response(
682691
cache_response.total = cache_response.hits.len();
683692
}
684693

685-
if !search_response.is_empty(){
686-
cache_response.cached_ratio = files_cache_ratio /search_response.len();
694+
if !search_responses.is_empty(){
695+
cache_response.cached_ratio = files_cache_ratio /search_responses.len();
687696
}
688697
cache_response.size = cache_response.hits.len()asi64;
689698
log::info!(
690-
"[trace_id {trace_id}]cache_response.hits.len: {},Result cache len: {}",
691-
cache_hits_len,
699+
"[trace_id {trace_id}]cachedhitslen: {},result cache len: {}",
700+
cached_hits_len,
692701
result_cache_len
693702
);
694703
cache_response.took_detail = res_took;
695-
cache_response.order_by =search_response
704+
cache_response.order_by =search_responses
696705
.first()
697706
.map(|res| res.order_by)
698707
.unwrap_or_default();
699-
cache_response.order_by_metadata =search_response
708+
cache_response.order_by_metadata =search_responses
700709
.first()
701710
.map(|res| res.order_by_metadata.clone())
702711
.unwrap_or_default();
703-
cache_response.result_cache_ratio =(((cache_hits_lenasf64)*100_f64)
704-
/((result_cache_len +cache_hits_len)asf64))
712+
cache_response.result_cache_ratio =(((cached_hits_lenasf64)*100_f64)
713+
/((result_cache_len +cached_hits_len)asf64))
705714
asusize;
706715
if !fn_error.is_empty(){
707716
cache_response.function_error.extend(fn_error);
708717
cache_response.is_partial =true;
709718
}
710-
cache_response.is_histogram_eligible =search_response
719+
cache_response.is_histogram_eligible =search_responses
711720
.first()
712721
.map(|res| res.is_histogram_eligible)
713722
.unwrap_or_default();
@@ -836,6 +845,7 @@ pub async fn write_results(
836845
// 1. alignment time range for incomplete records for histogram
837846
letmut accept_start_time = req_query_start_time;
838847
letmut accept_end_time = req_query_end_time;
848+
letmut need_adjust_end_time =false;
839849
if is_aggregate
840850
&&letSome(interval) = res.histogram_interval
841851
&& interval >0
@@ -847,6 +857,7 @@ pub async fn write_results(
847857
}
848858
// previous interval of end_time
849859
if(accept_end_time % interval) !=0{
860+
need_adjust_end_time =true;
850861
accept_end_time = accept_end_time -(accept_end_time % interval) - interval;
851862
}
852863
}
@@ -858,12 +869,9 @@ pub async fn write_results(
858869
let(data_start_time, data_end_time) =
859870
extract_timestamp_range(&res.hits, ts_column, is_time_ordered);
860871
let delay_ts =second_micros(get_config().limit.cache_delay_secs);
861-
let accept_end_time = std::cmp::min(Utc::now().timestamp_micros() - delay_ts, accept_end_time);
862-
863-
// Track if we need to recalculate timestamp range after filtering
864-
let needs_filtering = data_start_time < accept_start_time || data_end_time > accept_end_time;
865-
866-
if needs_filtering{
872+
letmut accept_end_time =
873+
std::cmp::min(Utc::now().timestamp_micros() - delay_ts, accept_end_time);
874+
if data_start_time < accept_start_time || data_end_time > accept_end_time{
867875
res.hits.retain(|hit|{
868876
ifletSome(hit_ts) = hit.get(ts_column)
869877
&&letSome(hit_ts_datetime) =convert_ts_value_to_datetime(hit_ts)
@@ -885,27 +893,26 @@ pub async fn write_results(
885893
return;
886894
}
887895

888-
// 3.5. Determine final time range for cache filename
889-
// If we filtered data, recalculate; otherwise use the original data range
890-
let(final_start_time, final_end_time) =if needs_filtering{
891-
// Recalculate after filtering - only happens when data was actually filtered
892-
extract_timestamp_range(&res.hits, ts_column, is_time_ordered)
893-
}else{
894-
// No filtering occurred, use the original data range
895-
(data_start_time, data_end_time)
896-
};
897-
898896
// 4. check if the time range is less than discard_duration
899-
if(final_end_time -final_start_time) < delay_ts{
897+
if(accept_end_time -accept_start_time) < delay_ts{
900898
log::info!("[trace_id {trace_id}] Time range is too short for caching, skipping caching");
901899
return;
902900
}
903901

904-
// 5. cache to disk
902+
// 5. adjust the cache time range
903+
if need_adjust_end_time
904+
&& is_aggregate
905+
&&letSome(interval) = res.histogram_interval
906+
&& interval >0
907+
{
908+
accept_end_time += interval*1000*1000;
909+
}
910+
911+
// 6. cache to disk
905912
let file_name =format!(
906913
"{}_{}_{}_{}.json",
907-
final_start_time,
908-
final_end_time,
914+
accept_start_time,
915+
accept_end_time,
909916
if is_aggregate{1} else{0},
910917
if is_descending{1} else{0}
911918
);
@@ -919,8 +926,8 @@ pub async fn write_results(
919926
&file_name,
920927
res_cache,
921928
clear_cache,
922-
Some(final_start_time),
923-
Some(final_end_time),
929+
Some(accept_start_time),
930+
Some(accept_end_time),
924931
)
925932
.await
926933
{
@@ -934,8 +941,8 @@ pub async fn write_results(
934941
.entry(query_key)
935942
.or_insert_with(Vec::new)
936943
.push(ResultCacheMeta{
937-
start_time:final_start_time,
938-
end_time:final_end_time,
944+
start_time:accept_start_time,
945+
end_time:accept_end_time,
939946
is_aggregate,
940947
is_descending,
941948
});

‎src/service/search/mod.rs‎

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -813,12 +813,11 @@ pub async fn search_partition(
813813
returnOk(response);
814814
};
815815

816-
log::info!("[trace_id {trace_id}] search_partition: getting nodes");
817816
let nodes = infra_cluster::get_cached_online_querier_nodes(Some(RoleGroup::Interactive))
818817
.await
819818
.unwrap_or_default();
820819
if nodes.is_empty(){
821-
log::error!("no querier node online");
820+
log::error!("[trace_id {trace_id}] search_partition:no querier node online");
822821
returnErr(Error::Message("no querier node online".to_string()));
823822
}
824823
let cpu_cores = nodes.iter().map(|n| n.cpu_num).sum::<u64>()asusize;
@@ -1045,8 +1044,13 @@ pub async fn search_partition(
10451044
query.end_time,
10461045
)
10471046
}else{
1048-
matchdiscover_cache_for_query(&cache_file_path, query.start_time, query.end_time)
1049-
.await
1047+
matchdiscover_cache_for_query(
1048+
&cache_file_path,
1049+
query.start_time,
1050+
query.end_time,
1051+
cache_interval,
1052+
)
1053+
.await
10501054
{
10511055
Ok(result) => result,
10521056
Err(e) =>{

‎src/service/search/partition.rs‎

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,8 @@ impl PartitionGenerator {
158158
order_by:OrderBy,
159159
add_mini_partition:bool,
160160
) ->Vec<[i64;2]>{
161-
let mini_partition_size =self.calculate_mini_partition_size(step);
162-
163161
letmut partitions =if add_mini_partition{
162+
let mini_partition_size =self.calculate_mini_partition_size(step);
164163
self.generate_histogram_aligned_partitions_with_mini(
165164
start_time,
166165
end_time,
@@ -190,13 +189,13 @@ impl PartitionGenerator {
190189
// Handle the alignment for the first and last partition
191190
letmut new_end = end_time;
192191
let last_partition_step = end_time %self.min_step;
193-
if last_partition_step >0 && duration >self.min_step*3{
192+
if last_partition_step >0 && duration >self.min_step{
194193
new_end = end_time - last_partition_step;
195194
partitions.push([new_end, end_time]);
196195
}
197196
letmut new_start = start_time;
198197
let last_partition_step = start_time %self.min_step;
199-
if last_partition_step >0 && duration >self.min_step*3{
198+
if last_partition_step >0 && duration >self.min_step{
200199
new_start = start_time +self.min_step - last_partition_step;
201200
partitions.push([start_time, new_start]);
202201
}

‎src/service/search/streaming/cache.rs‎

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ pub async fn write_results_to_cache(
4545
returnOk(());
4646
}
4747

48+
let start = std::time::Instant::now();
4849
log::info!(
49-
"[HTTP2_STREAM]: Writing results to file for trace_id: {}, file_path: {}, accumulated_results len: {}",
50+
"[HTTP2_STREAM trace_id {}] Writing results to file: {}, accumulated_results len: {}",
5051
c_resp.trace_id,
5152
c_resp.file_path,
5253
accumulated_results.len()
@@ -104,9 +105,10 @@ pub async fn write_results_to_cache(
104105
)
105106
.await;
106107
log::info!(
107-
"[HTTP2_STREAM]: Results written to file for trace_id: {},file_path: {}",
108+
"[HTTP2_STREAM trace_id {}] Results written to file: {},async cache task created, took: {} ms",
108109
c_resp.trace_id,
109110
c_resp.file_path,
111+
start.elapsed().as_millis()
110112
);
111113
}
112114

0 commit comments

Comments (0)

[8]ページ先頭

©2009-2025 Movatter.jp