Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Add response wrapper type toTensorzeroHttpClient#5253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Open
Aaron1011 wants to merge2 commits intomain
base:main
Choose a base branch
Loading
fromaaron/response-wrapper
Open
Show file tree
Hide file tree
Changes from1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
NextNext commit
Add response wrapper type toTensorzeroHttpClient
Previously, calling the `send` method would return a`reqwest::Response` and drop our `LimitedClientTicket`.This will incorrectly signal that we now have additionalspace in the pool (even though the HTTP connection is stillneeded to read the response body). It also will interfere withstoring a `Span` to track the total HTTP duration (for the upcoming'tensorzero_overhead' metric)I've introduced a new wrapper type. It's similar to`TensorzeroRequestBuilder` - we hold on to our `LimitedClientTicket`,and expose accessor methods that forward to the underlying type.
  • Loading branch information
@Aaron1011
Aaron1011 committedDec 17, 2025
commit14b9fb66eb0b2f6b1e72796c984196105f5d3b01
5 changes: 3 additions & 2 deletionstensorzero-core/src/client/mod.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -6,6 +6,7 @@ use crate::config::snapshot::ConfigSnapshot;
use crate::config::unwritten::UnwrittenConfig;
use crate::endpoints::openai_compatible::types::embeddings::OpenAICompatibleEmbeddingParams;
use crate::endpoints::openai_compatible::types::embeddings::OpenAIEmbeddingResponse;
use crate::http::TensorzeroResponseWrapper;
use crate::http::{DEFAULT_HTTP_CLIENT_TIMEOUT, TensorzeroHttpClient, TensorzeroRequestBuilder};
use crate::inference::types::stored_input::StoragePathResolver;
use crate::utils::gateway::DropWrapper;
Expand DownExpand Up@@ -83,8 +84,8 @@ pub struct HTTPGateway {
impl HTTPGateway {
pub async fn check_http_response(
&self,
resp: Result<reqwest::Response, reqwest::Error>,
) -> Result<reqwest::Response, TensorZeroError> {
resp: Result<TensorzeroResponseWrapper, reqwest::Error>,
) -> Result<TensorzeroResponseWrapper, TensorZeroError> {
let resp = resp.map_err(|e| {
if e.is_timeout() {
TensorZeroError::RequestTimeout
Expand Down
110 changes: 102 additions & 8 deletionstensorzero-core/src/http.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
use chrono::Duration;
use http_body::{Frame, SizeHint};
use once_cell::sync::OnceCell;
use opentelemetry_http::HeaderInjector;
use std::{
Expand All@@ -13,7 +14,7 @@ use tracing::Span;
use tracing_futures::Instrument;

use futures::Stream;
use http::{HeaderMap, HeaderName, HeaderValue};
use http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use pin_project::pin_project;
use reqwest::{Body, Response};
use reqwest::{Client, IntoUrl, NoProxy, Proxy, RequestBuilder};
Expand DownExpand Up@@ -318,6 +319,94 @@ impl Stream for TensorZeroEventSource {
}
}

/// A wrapper type around `reqwest::Response`
/// We use this to extend the lifetime of a `Span`,
/// and drop it when the response is fully consumed
/// (e.g. after `text`) is called.
///
// At the moment, we don't actually store a Span - this will
// be added in a future PR
pub struct TensorzeroResponseWrapper {
/// IMPORTANT - do *not* directly expose this field.
/// Instead, add accessor methods to `TensorzeroResponseWrapper`,
/// so that the caller is forced to hold on to the entire `TensorzeroResponseWrapper`
/// until it gets 'consumed' (e.g. calling `text`)
response: Response,
/// We hold onto a ticket, since holding a `Response` still uses a logical HTTP connection
/// (since the body will not be read until `text` is called)
ticket: LimitedClientTicket<'static>,
}

#[pin_project]
/// A wrapper over a `reqwest::Body` that holds on to a `LimitedClientTicket`
/// We use this to extend the lifetime of our ticket until the body is fully consumed
/// (since the underlying HTTP connection is still in use as long as we're reading data from the body)
pub struct TensorzeroBodyWrapper {
#[pin]
body: reqwest::Body,
ticket: LimitedClientTicket<'static>,
}

#[deny(clippy::missing_trait_methods)]
impl http_body::Body for TensorzeroBodyWrapper {
type Data = <reqwest::Body as http_body::Body>::Data;
type Error = <reqwest::Body as http_body::Body>::Error;

fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
self.project().body.poll_frame(cx)
}

fn is_end_stream(&self) -> bool {
self.body.is_end_stream()
}

fn size_hint(&self) -> SizeHint {
self.body.size_hint()
}
}

impl TensorzeroResponseWrapper {
pub fn status(&self) -> StatusCode {
self.response.status()
}

pub fn headers(&self) -> &HeaderMap {
self.response.headers()
}

pub fn error_for_status_ref(&self) -> Result<&Self, reqwest::Error> {
self.response.error_for_status_ref()?;
Ok(self)
}

// These methods consume the `TensorzeroResponseWrapper`,
// and drop the ticket. They do *not* give the caller ownership of `self.response`
pub async fn text(self) -> Result<String, reqwest::Error> {
self.response.text().await
}

pub async fn json<T: DeserializeOwned>(self) -> Result<T, reqwest::Error> {
self.response.json().await
}

pub async fn bytes(self) -> Result<bytes::Bytes, reqwest::Error> {
self.response.bytes().await
}

/// Converts this `TensorzeroResponseWrapper` into an `http::Response<TensorzeroBodyWrapper>`.
/// preserving our `LimitedClientTicket` until the body is fully consumed
pub fn into_http_response(self) -> http::Response<TensorzeroBodyWrapper> {
let resp: http::Response<reqwest::Body> = self.response.into();
resp.map(|body| TensorzeroBodyWrapper {
body,
ticket: self.ticket,
})
}
}

// Workaround for https://github.com/hyperium/h2/issues/763
// The 'h2' crate creates a long-lived span for outgoing HTTP connections.
// Due to connection pooling, these spans can live for a long time -
Expand DownExpand Up@@ -432,14 +521,19 @@ impl<'a> TensorzeroRequestBuilder<'a> {
})
}

// This method takes an owned `self`, so we'll drop `self.ticket` when this method
// returns (after we've gotten a response)
pub async fn send(mut self) -> Result<Response, reqwest::Error> {
// This method preserves our ticket (by storing it in the `TensorzeroResponseWrapper`),
// since holding a `Reponse` still requires an active connection (since the
// body will not be read until `text()` is called)
pub async fn send(mut self) -> Result<TensorzeroResponseWrapper, reqwest::Error> {
self = self.with_otlp_headers();
self.builder
.send()
.instrument(tensorzero_h2_workaround_span())
.await
Ok(TensorzeroResponseWrapper {
response: self
.builder
.send()
.instrument(tensorzero_h2_workaround_span())
.await?,
ticket: self.ticket.into_owned(),
})
}

pub async fn send_and_parse_json<T: DeserializeOwned>(
Expand Down
2 changes: 1 addition & 1 deletiontensorzero-core/src/providers/aws_http_client.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -172,7 +172,7 @@ impl HttpConnector for ReqwestConnector {
let reqwest_response = req_builder.send().await.map_err(CallError::from)?;

// Converts from a reqwest Response into an http::Response<SdkBody>.
let (parts, body) =http::Response::from(reqwest_response).into_parts();
let (parts, body) = reqwest_response.into_http_response().into_parts();
let http_response = http::Response::from_parts(parts, SdkBody::from_body_1_x(body));

Ok(
Expand Down
4 changes: 2 additions & 2 deletionstensorzero-core/src/providers/helpers.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -8,7 +8,7 @@ use uuid::Uuid;

use crate::{
error::{DisplayOrDebugGateway, Error, ErrorDetails, IMPOSSIBLE_ERROR_MESSAGE},
http::{TensorZeroEventSource, TensorzeroRequestBuilder},
http::{TensorZeroEventSource, TensorzeroRequestBuilder, TensorzeroResponseWrapper},
inference::types::{
ProviderInferenceResponseChunk,
batch::{ProviderBatchInferenceOutput, ProviderBatchInferenceResponse},
Expand DownExpand Up@@ -172,7 +172,7 @@ pub async fn inject_extra_request_data_and_send(
model_name: &str,
mut body: serde_json::Value,
builder: TensorzeroRequestBuilder<'_>,
) -> Result<(reqwest::Response, String), Error> {
) -> Result<(TensorzeroResponseWrapper, String), Error> {
let headers = inject_extra_request_data(
config,
extra_headers_config,
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp