Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8ed5061

Browse files
authored
Merge branch 'main' into feat/metrics-cache
2 parents8bd579c +d360151 commit8ed5061

File tree

13 files changed

+96
-134
lines changed

13 files changed

+96
-134
lines changed

‎src/handler/http/request/status/mod.rs‎

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -419,11 +419,6 @@ pub async fn cache_status() -> Result<HttpResponse, Error> {
419419
}),
420420
);
421421

422-
let last_file_list_offset = db::compact::file_list::get_offset().await.unwrap();
423-
stats.insert(
424-
"COMPACT",
425-
json::json!({"file_list_offset": last_file_list_offset}),
426-
);
427422
stats.insert(
428423
"DATAFUSION",
429424
json::json!({"file_stat_cache":{

‎src/service/compact/mod.rs‎

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,13 @@ pub async fn run_merge(job_tx: mpsc::Sender<worker::MergeJob>) -> Result<(), any
366366
ifLOCAL_NODE.name.ne(&node_name){
367367
need_release_ids.push(job.id);// not this node
368368
}
369+
370+
// check if there is another job running for this stream
371+
if db::compact::stream::is_running(&job.stream){
372+
need_release_ids.push(job.id);// another job is running
373+
}else{
374+
db::compact::stream::set_running(&job.stream);
375+
}
369376
}
370377
}
371378

‎src/service/compact/worker.rs‎

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,14 @@ impl JobScheduler {
129129
e
130130
);
131131
}
132+
// release locked stream
133+
let key =format!(
134+
"{}/{}/{}",
135+
job.org_id,
136+
job.stream_type.as_str(),
137+
job.stream_name
138+
);
139+
crate::service::db::compact::stream::clear_running(&key);
132140
}
133141
}
134142
}

‎src/service/db/compact/file_list.rs‎

Lines changed: 0 additions & 102 deletions
This file was deleted.

‎src/service/db/compact/mod.rs‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515

1616
pubmod compactor_manual_jobs;
1717
pubmod downsampling;
18-
pubmod file_list;
1918
pubmod files;
2019
pubmod organization;
2120
pubmod retention;
2221
pubmod stats;
22+
pubmod stream;

‎src/service/db/compact/stream.rs‎

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright 2025 OpenObserve Inc.
2+
//
3+
// This program is free software: you can redistribute it and/or modify
4+
// it under the terms of the GNU Affero General Public License as published by
5+
// the Free Software Foundation, either version 3 of the License, or
6+
// (at your option) any later version.
7+
//
8+
// This program is distributed in the hope that it will be useful
9+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
// GNU Affero General Public License for more details.
12+
//
13+
// You should have received a copy of the GNU Affero General Public License
14+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
15+
16+
use config::RwHashSet;
17+
use once_cell::sync::Lazy;
18+
19+
staticSTREAMS:Lazy<RwHashSet<String>> =Lazy::new(Default::default);
20+
21+
pubfnis_running(stream:&str) ->bool{
22+
STREAMS.contains(stream)
23+
}
24+
25+
pubfnset_running(stream:&str){
26+
STREAMS.insert(stream.to_string());
27+
}
28+
29+
pubfnclear_running(stream:&str){
30+
STREAMS.remove(stream);
31+
}

‎src/service/organization.rs‎

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ use crate::{
6161
pubasyncfnget_summary(org_id:&str) ->OrgSummary{
6262
let streams =get_streams(org_id,None,false,None).await;
6363
letmut stream_summary =StreamSummary::default();
64+
letmut has_trigger_stream =false;
6465
for streamin streams.iter(){
66+
if stream.name == usage::TRIGGERS_USAGE_STREAM{
67+
has_trigger_stream =true;
68+
}
6569
if !stream.stream_type.eq(&StreamType::Index)
6670
&& !stream.stream_type.eq(&StreamType::Metadata)
6771
{
@@ -73,19 +77,23 @@ pub async fn get_summary(org_id: &str) -> OrgSummary {
7377
}
7478
}
7579

76-
let sql =format!(
77-
"SELECT module, status FROM {} WHERE org = '{}' GROUP BY module, status, key",
78-
usage::TRIGGERS_USAGE_STREAM,
79-
org_id
80-
);
81-
let end_time = time::now_micros();
82-
let start_time = end_time - time::second_micros(900);// 15 mins
83-
let trigger_status_results = self_reporting::search::get_usage(sql, start_time, end_time)
84-
.await
85-
.unwrap_or_default()
86-
.into_iter()
87-
.filter_map(|v| json::from_value::<TriggerStatusSearchResult>(v).ok())
88-
.collect::<Vec<_>>();
80+
let trigger_status_results =if !has_trigger_stream{
81+
vec![]
82+
}else{
83+
let sql =format!(
84+
"SELECT module, status FROM {} WHERE org = '{}' GROUP BY module, status, key",
85+
usage::TRIGGERS_USAGE_STREAM,
86+
org_id
87+
);
88+
let end_time = time::now_micros();
89+
let start_time = end_time - time::second_micros(900);// 15 mins
90+
self_reporting::search::get_usage(sql, start_time, end_time)
91+
.await
92+
.unwrap_or_default()
93+
.into_iter()
94+
.filter_map(|v| json::from_value::<TriggerStatusSearchResult>(v).ok())
95+
.collect::<Vec<_>>()
96+
};
8997

9098
let pipelines = db::pipeline::list_by_org(org_id).await.unwrap_or_default();
9199
let pipeline_summary =PipelineSummary{

‎src/service/promql/engine.rs‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1319,7 +1319,7 @@ async fn selector_load_data_from_datafusion(
13191319
}
13201320

13211321
log::info!(
1322-
"[trace_id: {trace_id}] load series took: {:?}",
1322+
"[trace_id: {trace_id}] loadhashing andseries took: {:?}",
13231323
start_time.elapsed()
13241324
);
13251325

‎src/service/promql/search/mod.rs‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ async fn search_in_cluster(
257257
let scan_stats = response.scan_stats.as_ref().unwrap();
258258

259259
log::info!(
260-
"[trace_id {trace_id}] promql->search->grpc: result node: {}, need_wal: {}, files: {}, scan_size: {}bytes, took: {} ms",
260+
"[trace_id {trace_id}] promql->search->grpc: result node: {}, need_wal: {}, files: {}, scan_size: {}mb, took: {} ms",
261261
&node.get_grpc_addr(),
262262
req_need_wal,
263263
scan_stats.files,
@@ -322,7 +322,7 @@ async fn search_in_cluster(
322322
returnErr(server_internal_error("invalid result type"));
323323
};
324324
log::info!(
325-
"[trace_id {trace_id}] promql->search->result: files: {}, scan_size: {}bytes, took: {} ms",
325+
"[trace_id {trace_id}] promql->search->result: files: {}, scan_size: {}mb, took: {} ms",
326326
scan_stats.files,
327327
scan_stats.original_size,
328328
op_start.elapsed().as_millis(),

‎web/src/components/dashboards/addPanel/ChartSelection.vue‎

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
3939
item.id != 'metric' &&
4040
item.id != 'gauge' &&
4141
item.id != 'html' &&
42-
item.id != 'markdown') ||
43-
(allowedchartstype &&
44-
allowedchartstype.length > 0 &&
42+
item.id != 'markdown' &&
43+
item.id != 'custom_chart') ||
44+
(allowedchartstype &&
45+
allowedchartstype.length > 0 &&
4546
!allowedchartstype.includes(item.id))
4647
"
4748
:key="index"

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp