Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitfa0672a

Browse files
authored
feat: add super cluster support for metrics (#9225)
1 parentb75aa57 commitfa0672a

File tree

22 files changed

+1008
-407
lines changed

22 files changed

+1008
-407
lines changed

‎src/common/infra/cluster/nats.rs‎

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -147,18 +147,8 @@ async fn register() -> Result<()> {
147147
id: new_node_id,
148148
uuid:LOCAL_NODE.uuid.clone(),
149149
name: cfg.common.instance_name.clone(),
150-
http_addr:format!(
151-
"{}://{}:{}",
152-
get_http_schema(),
153-
get_local_http_ip(),
154-
cfg.http.port
155-
),
156-
grpc_addr:format!(
157-
"{}://{}:{}",
158-
get_grpc_schema(),
159-
get_local_grpc_ip(),
160-
cfg.grpc.port
161-
),
150+
http_addr:get_local_http_addr(),
151+
grpc_addr:get_local_grpc_addr(),
162152
role:LOCAL_NODE.role.clone(),
163153
role_group:LOCAL_NODE.role_group,
164154
cpu_num: cfg.limit.cpu_numasu64,
@@ -229,18 +219,8 @@ pub(crate) async fn set_status(status: NodeStatus) -> Result<()> {
229219
id:LOCAL_NODE_ID.load(Ordering::Relaxed),
230220
uuid:LOCAL_NODE.uuid.clone(),
231221
name: cfg.common.instance_name.clone(),
232-
http_addr:format!(
233-
"{}://{}:{}",
234-
get_http_schema(),
235-
get_local_http_ip(),
236-
cfg.http.port
237-
),
238-
grpc_addr:format!(
239-
"{}://{}:{}",
240-
get_grpc_schema(),
241-
get_local_grpc_ip(),
242-
cfg.grpc.port
243-
),
222+
http_addr:get_local_http_addr(),
223+
grpc_addr:get_local_grpc_addr(),
244224
role:LOCAL_NODE.role.clone(),
245225
role_group:LOCAL_NODE.role_group,
246226
cpu_num: cfg.limit.cpu_numasu64,

‎src/config/src/cluster.rs‎

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,8 @@ pub fn load_local_node() -> Node {
3838
role:load_local_node_role(),
3939
role_group:load_role_group(),
4040
name: cfg.common.instance_name.clone(),
41-
http_addr:format!(
42-
"{}://{}:{}",
43-
get_http_schema(),
44-
get_local_http_ip(),
45-
cfg.http.port
46-
),
47-
grpc_addr:format!(
48-
"{}://{}:{}",
49-
get_grpc_schema(),
50-
get_local_grpc_ip(),
51-
cfg.grpc.port
52-
),
41+
http_addr:get_local_http_addr(),
42+
grpc_addr:get_local_grpc_addr(),
5343
cpu_num: cfg.limit.cpu_numasu64,
5444
scheduled:false,
5545
broadcasted:false,
@@ -77,6 +67,24 @@ pub fn load_role_group() -> RoleGroup {
7767
RoleGroup::from(get_config().common.node_role_group.as_str())
7868
}
7969

70+
pubfnget_local_http_addr() ->String{
71+
format!(
72+
"{}://{}:{}",
73+
get_http_schema(),
74+
get_local_http_ip(),
75+
get_config().http.port
76+
)
77+
}
78+
79+
pubfnget_local_grpc_addr() ->String{
80+
format!(
81+
"{}://{}:{}",
82+
get_grpc_schema(),
83+
get_local_grpc_ip(),
84+
get_config().grpc.port
85+
)
86+
}
87+
8088
pubfnget_local_http_ip() ->String{
8189
let cfg =get_config();
8290
if !cfg.http.addr.is_empty(){

‎src/config/src/meta/cluster.rs‎

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub trait NodeInfo: Debug + Send + Sync {
3737
fnget_cluster(&self) ->String{
3838
crate::config::get_cluster_name()
3939
}
40+
fnis_local(&self) ->bool;
4041
}
4142

4243
#[derive(Clone,Debug,PartialEq,Serialize,Deserialize,ToSchema)]
@@ -162,6 +163,10 @@ impl NodeInfo for Node {
162163
fnget_grpc_addr(&self) ->String{
163164
self.grpc_addr.clone()
164165
}
166+
167+
fnis_local(&self) ->bool{
168+
self.grpc_addr ==crate::cluster::get_local_grpc_addr()
169+
}
165170
}
166171

167172
pubtraitIntoArcVec{
@@ -301,3 +306,14 @@ pub enum CompactionJobType {
301306
Current,
302307
Historical,
303308
}
309+
310+
#[cfg(test)]
311+
mod tests{
312+
usesuper::*;
313+
314+
#[test]
315+
fntest_node_is_local(){
316+
let local_node =&*crate::cluster::LOCAL_NODE;
317+
assert!(local_node.is_local());
318+
}
319+
}

‎src/config/src/meta/promql/mod.rs‎

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,54 @@
1616
use hashbrown::HashMap;
1717
use proto::prometheus_rpc;
1818
use regex::Regex;
19-
use serde::{Deserialize,Serialize};
19+
use serde::{Deserialize,Deserializer,Serialize};
2020
use strum::Display;
2121
use utoipa::ToSchema;
2222

23+
usecrate::meta::search::SearchEventType;
24+
25+
/// Custom deserializer that accepts either a comma-separated string or a string array
26+
fndeserialize_string_or_vec<'de,D>(deserializer:D) ->Result<Vec<String>,D::Error>
27+
where
28+
D:Deserializer<'de>,
29+
{
30+
use serde::de::{self,SeqAccess,Visitor};
31+
32+
structStringOrVec;
33+
34+
impl<'de>Visitor<'de>forStringOrVec{
35+
typeValue =Vec<String>;
36+
37+
fnexpecting(&self,formatter:&mut std::fmt::Formatter) -> std::fmt::Result{
38+
formatter.write_str("a string or array of strings")
39+
}
40+
41+
fnvisit_str<E>(self,value:&str) ->Result<Self::Value,E>
42+
where
43+
E: de::Error,
44+
{
45+
if value.is_empty(){
46+
Ok(Vec::new())
47+
}else{
48+
Ok(value.split(',').map(|s| s.trim().to_string()).collect())
49+
}
50+
}
51+
52+
fnvisit_seq<A>(self,mutseq:A) ->Result<Self::Value,A::Error>
53+
where
54+
A:SeqAccess<'de>,
55+
{
56+
letmut vec =Vec::new();
57+
whileletSome(item) = seq.next_element::<String>()?{
58+
vec.push(item);
59+
}
60+
Ok(vec)
61+
}
62+
}
63+
64+
deserializer.deserialize_any(StringOrVec)
65+
}
66+
2367
pubmod grpc;
2468
pubmod value;
2569

@@ -152,6 +196,12 @@ pub struct RequestRangeQuery {
152196
pubuse_cache:Option<bool>,
153197
/// Use streaming output.
154198
pubuse_streaming:Option<bool>,
199+
#[serde(skip_serializing_if ="Option::is_none",default)]
200+
pubsearch_type:Option<SearchEventType>,
201+
#[serde(default, deserialize_with ="deserialize_string_or_vec")]
202+
pubregions:Vec<String>,// default query all regions, local: only query local region clusters
203+
#[serde(default, deserialize_with ="deserialize_string_or_vec")]
204+
pubclusters:Vec<String>,// default query all clusters, local: only query local cluster
155205
}
156206

157207
#[derive(Debug,Deserialize)]
@@ -376,6 +426,33 @@ mod tests {
376426
assert_eq!(MetricType::Unknown.to_string(),"unknown");
377427
}
378428

429+
#[test]
430+
fntest_deserialize_string_or_vec(){
431+
// Test with comma-separated string
432+
let json =r#"{"regions": "region1,region2,region3", "clusters": "cluster1"}"#;
433+
let result:RequestRangeQuery = serde_json::from_str(json).unwrap();
434+
assert_eq!(result.regions, vec!["region1","region2","region3"]);
435+
assert_eq!(result.clusters, vec!["cluster1"]);
436+
437+
// Test with array
438+
let json =r#"{"regions": ["region1", "region2"], "clusters": ["cluster1", "cluster2"]}"#;
439+
let result:RequestRangeQuery = serde_json::from_str(json).unwrap();
440+
assert_eq!(result.regions, vec!["region1","region2"]);
441+
assert_eq!(result.clusters, vec!["cluster1","cluster2"]);
442+
443+
// Test with empty string
444+
let json =r#"{"regions": "", "clusters": []}"#;
445+
let result:RequestRangeQuery = serde_json::from_str(json).unwrap();
446+
assert!(result.regions.is_empty());
447+
assert!(result.clusters.is_empty());
448+
449+
// Test with default (missing fields)
450+
let json =r#"{}"#;
451+
let result:RequestRangeQuery = serde_json::from_str(json).unwrap();
452+
assert!(result.regions.is_empty());
453+
assert!(result.clusters.is_empty());
454+
}
455+
379456
#[test]
380457
fntest_api_func_response_serialize(){
381458
let ok =ApiFuncResponse::ok("hello".to_owned(),None);

‎src/config/src/meta/promql/value.rs‎

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use serde::{
2626

2727
usecrate::{
2828
FxIndexMap,
29-
meta::promql::NAME_LABEL,
29+
meta::{promql::NAME_LABEL, search::SearchEventType},
3030
utils::{json, sort::sort_float},
3131
};
3232

@@ -404,10 +404,10 @@ pub struct EvalContext {
404404
implEvalContext{
405405
pubfnnew(start:i64,end:i64,step:i64,trace_id:String) ->Self{
406406
Self{
407+
trace_id,
407408
start,
408409
end,
409410
step,
410-
trace_id,
411411
}
412412
}
413413

@@ -429,6 +429,21 @@ impl EvalContext {
429429
}
430430
}
431431

432+
#[derive(Debug,Clone)]
433+
pubstructQueryContext{
434+
pubtrace_id:String,
435+
puborg_id:String,
436+
pubquery_exemplars:bool,
437+
pubquery_data:bool,
438+
pubneed_wal:bool,
439+
pubuse_cache:bool,
440+
pubtimeout:u64,// seconds, query timeout
441+
pubsearch_event_type:Option<SearchEventType>,
442+
pubregions:Vec<String>,
443+
pubclusters:Vec<String>,
444+
pubis_super_cluster:bool,
445+
}
446+
432447
#[derive(Debug,Default,Clone)]
433448
pubstructRangeValue{
434449
publabels:Labels,

‎src/handler/grpc/mod.rs‎

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,14 @@ impl From<crate::service::promql::MetricsQueryRequest> for cluster_rpc::MetricsQ
5050
end: req.end,
5151
step: req.step,
5252
query_exemplars: req.query_exemplars,
53+
query_data:false,
54+
label_selector:vec![],
5355
};
5456

57+
let trace_id = ider::generate_trace_id();
5558
let job = cluster_rpc::Job{
56-
trace_id:ider::generate_trace_id(),
57-
job:"".to_string(),
59+
trace_id:trace_id.to_string(),
60+
job:trace_id[..7].to_string(),
5861
stage:0,
5962
partition:0,
6063
};
@@ -64,8 +67,12 @@ impl From<crate::service::promql::MetricsQueryRequest> for cluster_rpc::MetricsQ
6467
org_id:"".to_string(),
6568
need_wal:false,
6669
query:Some(req_query),
67-
timeout:0,
6870
use_cache: req.use_cache.unwrap_or(true),
71+
timeout:0,
72+
search_event_type: req.search_type.map(|v| v.to_string()).unwrap_or_default(),
73+
regions: req.regions.clone(),
74+
clusters: req.clusters.clone(),
75+
is_super_cluster:false,
6976
}
7077
}
7178
}

‎src/handler/grpc/request/metrics/querier.rs‎

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,14 @@
1313
// You should have received a copy of the GNU Affero General Public License
1414
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1515

16+
use std::pin::Pin;
17+
1618
use config::{meta::stream::StreamType, metrics};
19+
use futures::Stream;
1720
use infra::errors;
1821
use opentelemetry::global;
22+
use tokio::sync::mpsc;
23+
use tokio_stream::wrappers::ReceiverStream;
1924
use tonic::{Request,Response,Status};
2025
use tracing_opentelemetry::OpenTelemetrySpanExt;
2126

@@ -31,6 +36,9 @@ pub struct MetricsQuerier;
3136

3237
#[tonic::async_trait]
3338
implMetricsforMetricsQuerier{
39+
typeDataStream =
40+
Pin<Box<dynStream<Item =Result<MetricsQueryResponse,Status>> +Send +'static>>;
41+
3442
#[tracing::instrument(name ="grpc:metrics:query", skip_all, fields(org_id = req.get_ref().org_id))]
3543
asyncfnquery(
3644
&self,
@@ -70,4 +78,37 @@ impl Metrics for MetricsQuerier {
7078

7179
Ok(Response::new(result))
7280
}
81+
82+
#[tracing::instrument(name ="grpc:metrics:data", skip_all, fields(org_id = req.get_ref().org_id))]
83+
asyncfndata(
84+
&self,
85+
req:Request<MetricsQueryRequest>,
86+
) ->Result<Response<Self::DataStream>,Status>{
87+
let cap = std::cmp::max(2, config::get_config().limit.cpu_num);
88+
let(tx, rx) = mpsc::channel::<Result<MetricsQueryResponse,Status>>(cap);
89+
letmut req:MetricsQueryRequest = req.into_inner();
90+
req.query.as_mut().unwrap().query_data =true;
91+
92+
log::info!(
93+
"[trace_id {}] promql->data->grpc: org_id: {}, use_cache: {}, time_range: [{},{}), step: {}, query: {}, label_selector: {:?}",
94+
req.job.as_ref().unwrap().trace_id,
95+
req.org_id,
96+
req.use_cache,
97+
req.query.as_ref().unwrap().start,
98+
req.query.as_ref().unwrap().end,
99+
req.query.as_ref().unwrap().step,
100+
req.query.as_ref().unwrap().query,
101+
req.query.as_ref().unwrap().label_selector,
102+
);
103+
104+
// spawn a task to push streaming responses
105+
tokio::task::spawn(asyncmove{
106+
ifletErr(e) =crate::service::promql::search::grpc::data(&req, tx).await{
107+
log::error!("[gRPC:metrics:data] get data error: req:{req:?}, error:{e:?}")
108+
}
109+
});
110+
111+
let out_stream =ReceiverStream::new(rx);
112+
Ok(Response::new(Box::pin(out_stream)asSelf::DataStream))
113+
}
73114
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp