Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit e23e8c5

Browse files
committed
feat: migrate service graph to SQL-based daemon architecture
Replace in-memory EdgeStore with daemon-based architecture that periodically
queries trace streams using SQL aggregation in DataFusion.

Architecture:
- Daemon runs in compactor job every 1 hour (default, configurable)
- Queries last 60 minutes of trace data (configurable window)
- Uses DataFusion SQL with CTE for efficient aggregation
- Calculates p50, p95, p99 percentiles using approx functions
- Writes aggregated edges to internal `_o2_service_graph` stream
- Zero impact on trace ingestion performance

Key Changes:
- Add `processor.rs` with SQL-based daemon that queries trace streams
- Add `aggregator.rs` to write SQL results to `_o2_service_graph` stream (uses json! macro)
- Refactor `api.rs` to query pre-aggregated data from stream
- Move business logic to enterprise repo (`build_topology()`, `span_to_graph_span()`)
- Remove ALL inline processing during trace ingestion
- Delete in-memory EdgeStore, worker threads, and buffering code
- Feature-gated for enterprise builds with non-enterprise stubs

SQL Query:
- CTE extracts client/server from span_kind ('2'=SERVER, '3'=CLIENT)
- Aggregates by (client, server) with COUNT, percentiles, error rate
- Filters: span_kind IN ('2','3') AND peer_service IS NOT NULL

Configuration:
- O2_SERVICE_GRAPH_ENABLED (default: false)
- O2_SERVICE_GRAPH_PROCESSING_INTERVAL_SECS (default: 3600) - Daemon run frequency
- O2_SERVICE_GRAPH_QUERY_TIME_RANGE_MINUTES (default: 60) - Query window size

Technical Details:
- Stream name: `_o2_service_graph` (internal stream)
- Edge timestamp: Uses query end time for API compatibility
- Span kinds: '2' (SERVER), '3' (CLIENT) as VARCHAR in parquet
- Percentiles: approx_median(), approx_percentile_cont(0.95), approx_percentile_cont(0.99)
1 parent 4ecc550 commit e23e8c5

File tree

22 files changed

+2858
-588
lines changed

22 files changed

+2858
-588
lines changed

‎Cargo.lock‎

Lines changed: 1993 additions & 179 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Cargo.toml‎

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,21 @@ publish = false
2222
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
2323

2424
[features]
25-
default = []
26-
enterprise = []
27-
cloud = []
25+
default = ["enterprise"]
26+
enterprise = [
27+
"dep:o2_enterprise",
28+
"dep:o2_openfga",
29+
"dep:o2_dex",
30+
"dep:o2_ratelimit",
31+
"o2_enterprise/vectorscan",
32+
]
33+
enterprise-no-vectorscan = [
34+
"dep:o2_enterprise",
35+
"dep:o2_openfga",
36+
"dep:o2_dex",
37+
"dep:o2_ratelimit",
38+
]
39+
cloud = ["infra/cloud","enterprise","o2_enterprise/cloud"]
2840
mimalloc = ["dep:mimalloc"]
2941
jemalloc = ["dep:tikv-jemallocator"]
3042
profiling = ["dep:pprof"]
@@ -110,6 +122,10 @@ maxminddb = "0.26"
110122
md5.workspace =true
111123
memchr.workspace =true
112124
mimalloc = {version ="0.1.43",default-features =false,optional =true }
125+
o2_dex = {version ="0.1.0",path ="../o2-enterprise/o2_dex",optional =true }
126+
o2_enterprise = {version ="0.1.0",path ="../o2-enterprise/o2_enterprise",optional =true,default-features =false }
127+
o2_openfga = {version ="0.1.0",path ="../o2-enterprise/o2_openfga",optional =true }
128+
o2_ratelimit = {version ="0.1.0",path ="../o2-enterprise/o2_ratelimit",optional =true }
113129
once_cell.workspace =true
114130
opentelemetry.workspace =true
115131
opentelemetry_sdk.workspace =true

‎src/config/src/meta/mod.rs‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub mod promql;
3434
pubmod ratelimit;
3535
pubmod search;
3636
pubmod self_reporting;
37+
pubmod service_graph;
3738
pubmod short_url;
3839
pubmod sql;
3940
pubmod stream;
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright 2025 OpenObserve Inc.
2+
//
3+
// This program is free software: you can redistribute it and/or modify
4+
// it under the terms of the GNU Affero General Public License as published by
5+
// the Free Software Foundation, either version 3 of the License, or
6+
// (at your option) any later version.
7+
//
8+
// This program is distributed in the hope that it will be useful
9+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
// GNU Affero General Public License for more details.
12+
//
13+
// You should have received a copy of the GNU Affero General Public License
14+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
15+
16+
use serde::{Deserialize,Serialize};
17+
use utoipa::ToSchema;
18+
19+
/// Service Graph topology snapshot record
20+
/// Stored in ServiceGraph stream for historical queries
21+
#[derive(Clone,Debug,Serialize,Deserialize,ToSchema)]
22+
pubstructServiceGraphSnapshot{
23+
/// Snapshot timestamp (microseconds since epoch)
24+
#[serde(rename ="_timestamp")]
25+
pubtimestamp:i64,
26+
27+
/// Organization identifier
28+
puborg_id:String,
29+
30+
/// Source trace stream name
31+
pubtrace_stream_name:String,
32+
33+
/// Client service name (initiator)
34+
pubclient_service:String,
35+
36+
/// Server service name (receiver)
37+
pubserver_service:String,
38+
39+
/// Connection type: "standard", "database", "messaging", "virtual"
40+
pubconnection_type:String,
41+
42+
/// Total requests (cumulative counter)
43+
pubtotal_requests:u64,
44+
45+
/// Failed requests (cumulative counter)
46+
pubfailed_requests:u64,
47+
48+
/// Error rate percentage (0-100)
49+
puberror_rate:f64,
50+
51+
/// P50 latency in nanoseconds
52+
pubp50_latency_ns:u64,
53+
54+
/// P95 latency in nanoseconds
55+
pubp95_latency_ns:u64,
56+
57+
/// P99 latency in nanoseconds
58+
pubp99_latency_ns:u64,
59+
60+
/// First time this edge was seen (microseconds)
61+
pubfirst_seen:i64,
62+
63+
/// Last time this edge was seen (microseconds)
64+
publast_seen:i64,
65+
66+
/// Snapshot version (monotonic counter for deduplication)
67+
pubsnapshot_version:u64,
68+
}
69+
70+
implServiceGraphSnapshot{
71+
/// Convert to JSON value for stream ingestion
72+
pubfnto_json(&self) -> serde_json::Value{
73+
serde_json::to_value(self).expect("Failed to serialize ServiceGraphSnapshot")
74+
}
75+
}
76+
77+
/// Graph format for frontend visualization
78+
#[derive(Clone,Debug,Serialize,Deserialize,ToSchema)]
79+
pubstructServiceGraphData{
80+
pubnodes:Vec<ServiceNode>,
81+
pubedges:Vec<ServiceEdge>,
82+
}
83+
84+
/// Node in service graph
85+
#[derive(Clone,Debug,Serialize,Deserialize,ToSchema)]
86+
pubstructServiceNode{
87+
pubid:String,
88+
publabel:String,
89+
pubrequests:u64,
90+
puberrors:u64,
91+
puberror_rate:f64,
92+
}
93+
94+
/// Edge in service graph
95+
#[derive(Clone,Debug,Serialize,Deserialize,ToSchema)]
96+
pubstructServiceEdge{
97+
pubfrom:String,
98+
pubto:String,
99+
pubtotal_requests:u64,
100+
pubfailed_requests:u64,
101+
puberror_rate:f64,
102+
pubp50_latency_ns:u64,
103+
pubp95_latency_ns:u64,
104+
pubp99_latency_ns:u64,
105+
pubconnection_type:String,
106+
}

‎src/config/src/meta/stream.rs‎

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,11 @@ impl PartialEq for DataField {
106106
}
107107
}
108108

109-
pubconstALL_STREAM_TYPES:[StreamType;7] =[
109+
pubconstALL_STREAM_TYPES:[StreamType;8] =[
110110
StreamType::Logs,
111111
StreamType::Metrics,
112112
StreamType::Traces,
113+
StreamType::ServiceGraph,
113114
StreamType::EnrichmentTables,
114115
StreamType::Filelist,
115116
StreamType::Metadata,
@@ -123,6 +124,8 @@ pub enum StreamType {
123124
Logs,
124125
Metrics,
125126
Traces,
127+
#[serde(rename ="service_graph")]
128+
ServiceGraph,
126129
#[serde(rename ="enrichment_tables")]
127130
EnrichmentTables,
128131
#[serde(rename ="file_list")]
@@ -151,6 +154,7 @@ impl StreamType {
151154
StreamType::Logs =>"logs",
152155
StreamType::Metrics =>"metrics",
153156
StreamType::Traces =>"traces",
157+
StreamType::ServiceGraph =>"service_graph",
154158
StreamType::EnrichmentTables =>"enrichment_tables",
155159
StreamType::Filelist =>"file_list",
156160
StreamType::Metadata =>"metadata",
@@ -165,6 +169,7 @@ impl From<&str> for StreamType {
165169
"logs" =>StreamType::Logs,
166170
"metrics" =>StreamType::Metrics,
167171
"traces" =>StreamType::Traces,
172+
"service_graph" =>StreamType::ServiceGraph,
168173
"enrichment_tables" |"enrich" =>StreamType::EnrichmentTables,
169174
"file_list" =>StreamType::Filelist,
170175
"metadata" =>StreamType::Metadata,
@@ -186,6 +191,7 @@ impl std::fmt::Display for StreamType {
186191
StreamType::Logs =>write!(f,"logs"),
187192
StreamType::Metrics =>write!(f,"metrics"),
188193
StreamType::Traces =>write!(f,"traces"),
194+
StreamType::ServiceGraph =>write!(f,"service_graph"),
189195
StreamType::EnrichmentTables =>write!(f,"enrichment_tables"),
190196
StreamType::Filelist =>write!(f,"file_list"),
191197
StreamType::Metadata =>write!(f,"metadata"),

‎src/handler/grpc/request/ingest.rs‎

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,27 @@ impl Ingest for Ingester {
155155
}
156156
}
157157
}
158+
StreamType::ServiceGraph =>{
159+
// Service graph edges - use same pattern as Logs
160+
let log_ingestion_type = req.ingestion_type.unwrap_or_default();
161+
let data = bytes::Bytes::from(in_data.data);
162+
matchcreate_log_ingestion_req(log_ingestion_type,&data){
163+
Err(e) =>Err(e),
164+
Ok(ingestion_req) =>crate::service::logs::ingest::ingest(
165+
0,
166+
&org_id,
167+
&stream_name,
168+
ingestion_req,
169+
"",
170+
None,
171+
is_derived,
172+
)
173+
.await
174+
.map_or_else(Err, |_|Ok(())),
175+
}
176+
}
158177
_ =>Err(Error::IngestionError(
159-
"InternalgPRC ingestion service currently only supports LogsandEnrichmentTables"
178+
"InternalgRPC ingestion service currently only supports Logs, EnrichmentTables,andServiceGraph"
160179
.to_string(),
161180
)),
162181
};

‎src/handler/http/models/alerts/mod.rs‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,9 @@ impl From<meta_stream::StreamType> for StreamType {
454454
meta_stream::StreamType::Logs =>Self::Logs,
455455
meta_stream::StreamType::Metrics =>Self::Metrics,
456456
meta_stream::StreamType::Traces =>Self::Traces,
457+
meta_stream::StreamType::ServiceGraph =>Self::Metadata,// ServiceGraph not
458+
// alertable, map to
459+
// Metadata
457460
meta_stream::StreamType::EnrichmentTables =>Self::EnrichmentTables,
458461
meta_stream::StreamType::Filelist =>Self::Filelist,
459462
meta_stream::StreamType::Metadata =>Self::Metadata,

‎src/handler/http/request/traces/mod.rs‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use tracing::{Instrument, Span};
2929
#[cfg(feature ="cloud")]
3030
usecrate::service::ingestion::check_ingestion_allowed;
3131
// Re-export service graph API handlers
32-
pubusecrate::service::traces::service_graph::{self,get_service_graph_metrics, get_store_stats};
32+
pubusecrate::service::traces::service_graph::{self,get_current_topology};
3333
usecrate::{
3434
common::{
3535
meta::{self, http::HttpResponseasMetaHttpResponse},

‎src/handler/http/router/mod.rs‎

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -633,8 +633,7 @@ pub fn get_service_routes(svc: &mut web::ServiceConfig) {
633633
.service(domain_management::set_domain_management_config)
634634
.service(license::get_license_info)
635635
.service(license::store_license)
636-
.service(traces::get_service_graph_metrics)
637-
.service(traces::get_store_stats)
636+
.service(traces::get_current_topology)
638637
.service(patterns::extract_patterns);
639638

640639
#[cfg(feature ="enterprise")]

‎src/handler/http/router/openapi.rs‎

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ use crate::{common::meta, handler::http::request};
5252
request::logs::loki::loki_push,
5353
request::traces::traces_write,
5454
request::traces::get_latest_traces,
55-
crate::service::traces::service_graph::api::get_service_graph_metrics,
56-
crate::service::traces::service_graph::api::get_store_stats,
5755
request::metrics::ingest::json,
5856
request::promql::remote_write,
5957
request::promql::query_get,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp