diff --git a/Cargo.lock b/Cargo.lock index 81f5c93207..04f6ea09d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1480,6 +1480,7 @@ dependencies = [ "linkerd-app-admin", "linkerd-app-core", "linkerd-app-test", + "linkerd-http-body-compat", "linkerd-meshtls", "linkerd-metrics", "linkerd-tracing", diff --git a/linkerd/app/integration/Cargo.toml b/linkerd/app/integration/Cargo.toml index 6b615ede73..249a8a8dde 100644 --- a/linkerd/app/integration/Cargo.toml +++ b/linkerd/app/integration/Cargo.toml @@ -63,6 +63,7 @@ flate2 = { version = "1", default-features = false, features = [ ] } # Log streaming isn't enabled by default globally, but we want to test it. linkerd-app-admin = { path = "../admin", features = ["log-streaming"] } +linkerd-http-body-compat = { path = "../../http/body-compat" } # No code from this crate is actually used; only necessary to enable the Rustls # implementation. linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] } diff --git a/linkerd/app/integration/src/tests/profiles.rs b/linkerd/app/integration/src/tests/profiles.rs index 2972930d28..c0e90e040f 100644 --- a/linkerd/app/integration/src/tests/profiles.rs +++ b/linkerd/app/integration/src/tests/profiles.rs @@ -682,7 +682,9 @@ mod grpc_retry { assert_eq!(res.status(), 200); assert_eq!(res.headers().get(&GRPC_STATUS), None); - let mut body = res.into_body(); + let mut body = res + .map(linkerd_http_body_compat::ForwardCompatibleBody::new) + .into_body(); let trailers = trailers(&mut body).await; assert_eq!(trailers.get(&GRPC_STATUS), Some(&GRPC_STATUS_OK)); assert_eq!(retries.load(Ordering::Relaxed), 2); @@ -726,7 +728,9 @@ mod grpc_retry { assert_eq!(res.status(), 200); assert_eq!(res.headers().get(&GRPC_STATUS), None); - let mut body = res.into_body(); + let mut body = res + .map(linkerd_http_body_compat::ForwardCompatibleBody::new) + .into_body(); let data = data(&mut body).await; assert_eq!(data, Bytes::from("hello world")); @@ -777,7 +781,9 @@ mod grpc_retry { assert_eq!(res.status(), 200); assert_eq!(res.headers().get(&GRPC_STATUS), None); - let mut body = res.into_body(); + let mut body = res + .map(linkerd_http_body_compat::ForwardCompatibleBody::new) + .into_body(); let frame1 = data(&mut body).await; assert_eq!(frame1, Bytes::from("hello")); @@ -790,21 +796,38 @@ mod grpc_retry { assert_eq!(retries.load(Ordering::Relaxed), 1); } - async fn data(body: &mut hyper::Body) -> Bytes { + async fn data(body: &mut linkerd_http_body_compat::ForwardCompatibleBody) -> B::Data + where + B: http_body::Body + Unpin, + B::Data: std::fmt::Debug, + B::Error: std::fmt::Debug, + { let data = body - .data() + .frame() .await - .expect("body data frame must not be eaten") - .unwrap(); + .expect("a result") + .expect("a frame") + .into_data() + .expect("a chunk of data"); tracing::info!(?data); data } - async fn trailers(body: &mut hyper::Body) -> http::HeaderMap { + + async fn trailers( + body: &mut linkerd_http_body_compat::ForwardCompatibleBody, + ) -> http::HeaderMap + where + B: http_body::Body + Unpin, + B::Error: std::fmt::Debug, + { let trailers = body - .trailers() + .frame() .await - .expect("trailers future should not fail") - .expect("response should have trailers"); + .expect("a result") + .expect("a frame") + .into_trailers() + .ok() + .expect("a trailers frame"); tracing::info!(?trailers); trailers }