Skip to content

Commit dab8e47

Browse files
authored
use http_body::Body (#92)
* implement bodies using http_body::Body * erase the Incoming type it only needs to be an internal implementation detail * complete has trailers too * switch up constructors * docs, fixes, minor tweaks * http server: more complex streaming example * add methods for stream splice, write_all that bypasses AsyncRead/AsyncWrite traits because the traits have ?Send bounds, but the actual AsyncInputStream and AsyncOutputStream are both Send, so when we bypass the traits Rust can prove that the Future for splicing is Send. Same with write_all: when used through AsyncWrite its ?Send, but when used on AsyncOutputStream the Future for write_all is Send. Together, these changes allow us to show Rust that the Body::send and http::Client::send futures are Send. * rename forward to copy_to (code review yosh) * remove all added async_trait stuff (unnecessary, and i dont want it) * no you cannot
1 parent 27a9d59 commit dab8e47

28 files changed

+1113
-1057
lines changed

Cargo.toml

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,13 @@ default = ["json"]
1717
json = ["dep:serde", "dep:serde_json"]
1818

1919
[dependencies]
20+
anyhow.workspace = true
2021
async-task.workspace = true
21-
futures-core.workspace = true
22+
async-trait.workspace = true
23+
bytes.workspace = true
24+
futures-lite.workspace = true
25+
http-body-util.workspace = true
26+
http-body.workspace = true
2227
http.workspace = true
2328
itoa.workspace = true
2429
pin-project-lite.workspace = true
@@ -33,7 +38,7 @@ serde_json = { workspace = true, optional = true }
3338
[dev-dependencies]
3439
anyhow.workspace = true
3540
clap.workspace = true
36-
futures-lite.workspace = true
41+
http-body-util.workspace = true
3742
futures-concurrency.workspace = true
3843
humantime.workspace = true
3944
serde = { workspace = true, features = ["derive"] }
@@ -65,6 +70,8 @@ authors = [
6570
[workspace.dependencies]
6671
anyhow = "1"
6772
async-task = "4.7"
73+
async-trait = "*"
74+
bytes = "1.10.1"
6875
cargo_metadata = "0.22"
6976
clap = { version = "4.5.26", features = ["derive"] }
7077
futures-core = "0.3.19"
@@ -73,6 +80,8 @@ futures-concurrency = "7.6"
7380
humantime = "2.1.0"
7481
heck = "0.5"
7582
http = "1.1"
83+
http-body = "1.0.1"
84+
http-body-util = "0.1.3"
7685
itoa = "1"
7786
pin-project-lite = "0.2.8"
7887
quote = "1.0"

examples/complex_http_client.rs

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use anyhow::{anyhow, Result};
22
use clap::{ArgAction, Parser};
33
use std::str::FromStr;
4-
use wstd::http::{
5-
body::BodyForthcoming, Client, HeaderMap, HeaderName, HeaderValue, Method, Request, Uri,
6-
};
4+
use wstd::http::{Body, BodyExt, Client, HeaderMap, HeaderName, HeaderValue, Method, Request, Uri};
5+
use wstd::io::AsyncWrite;
76

87
/// Complex HTTP client
98
///
@@ -86,23 +85,29 @@ async fn main() -> Result<()> {
8685
trailers.insert(HeaderName::from_str(key)?, HeaderValue::from_str(value)?);
8786
}
8887

89-
// Send the request.
90-
91-
let request = request.body(BodyForthcoming)?;
88+
let body = if args.body {
89+
Body::from_try_stream(wstd::io::stdin().into_inner().into_stream()).into_boxed_body()
90+
} else {
91+
Body::empty().into_boxed_body()
92+
};
93+
let t = trailers.clone();
94+
let body = body.with_trailers(async move {
95+
if t.is_empty() {
96+
None
97+
} else {
98+
Some(Ok(t))
99+
}
100+
});
101+
let request = request.body(Body::from_http_body(body))?;
92102

103+
// Send the request.
93104
eprintln!("> {} / {:?}", request.method(), request.version());
94105
for (key, value) in request.headers().iter() {
95106
let value = String::from_utf8_lossy(value.as_bytes());
96107
eprintln!("> {key}: {value}");
97108
}
98109

99-
let (mut outgoing_body, response) = client.start_request(request).await?;
100-
101-
if args.body {
102-
wstd::io::copy(wstd::io::stdin(), &mut outgoing_body).await?;
103-
} else {
104-
wstd::io::copy(wstd::io::empty(), &mut outgoing_body).await?;
105-
}
110+
let response = client.send(request).await?;
106111

107112
if !trailers.is_empty() {
108113
eprintln!("...");
@@ -112,10 +117,6 @@ async fn main() -> Result<()> {
112117
eprintln!("> {key}: {value}");
113118
}
114119

115-
Client::finish(outgoing_body, Some(trailers))?;
116-
117-
let response = response.await?;
118-
119120
// Print the response.
120121

121122
eprintln!("< {:?} {}", response.version(), response.status());
@@ -124,10 +125,12 @@ async fn main() -> Result<()> {
124125
eprintln!("< {key}: {value}");
125126
}
126127

127-
let mut body = response.into_body();
128-
wstd::io::copy(&mut body, wstd::io::stdout()).await?;
128+
let body = response.into_body().into_boxed_body().collect().await?;
129+
let trailers = body.trailers().cloned();
130+
wstd::io::stdout()
131+
.write_all(body.to_bytes().as_ref())
132+
.await?;
129133

130-
let trailers = body.finish().await?;
131134
if let Some(trailers) = trailers {
132135
for (key, value) in trailers.iter() {
133136
let value = String::from_utf8_lossy(value.as_bytes());

examples/http_client.rs

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
use anyhow::{anyhow, Result};
22
use clap::{ArgAction, Parser};
3-
use wstd::http::{
4-
body::{IncomingBody, StreamedBody},
5-
request::Builder,
6-
Body, Client, Method, Request, Response, Uri,
7-
};
3+
use wstd::http::{Body, BodyExt, Client, Method, Request, Uri};
4+
use wstd::io::AsyncWrite;
85

96
/// Simple HTTP client
107
///
@@ -75,39 +72,35 @@ async fn main() -> Result<()> {
7572

7673
// Send the request.
7774

78-
async fn send_request<B: Body>(
79-
client: &Client,
80-
request: Builder,
81-
body: B,
82-
) -> Result<Response<IncomingBody>> {
83-
let request = request.body(body)?;
75+
let body = if args.body {
76+
Body::from_try_stream(wstd::io::stdin().into_inner().into_stream())
77+
} else {
78+
Body::empty()
79+
};
8480

85-
eprintln!("> {} / {:?}", request.method(), request.version());
86-
for (key, value) in request.headers().iter() {
87-
let value = String::from_utf8_lossy(value.as_bytes());
88-
eprintln!("> {key}: {value}");
89-
}
81+
let request = request.body(body)?;
9082

91-
Ok(client.send(request).await?)
83+
eprintln!("> {} / {:?}", request.method(), request.version());
84+
for (key, value) in request.headers().iter() {
85+
let value = String::from_utf8_lossy(value.as_bytes());
86+
eprintln!("> {key}: {value}");
9287
}
93-
let response = if args.body {
94-
send_request(&client, request, StreamedBody::new(wstd::io::stdin())).await
95-
} else {
96-
send_request(&client, request, wstd::io::empty()).await
97-
}?;
9888

99-
// Print the response.
89+
let response = client.send(request).await?;
10090

91+
// Print the response.
10192
eprintln!("< {:?} {}", response.version(), response.status());
10293
for (key, value) in response.headers().iter() {
10394
let value = String::from_utf8_lossy(value.as_bytes());
10495
eprintln!("< {key}: {value}");
10596
}
10697

107-
let mut body = response.into_body();
108-
wstd::io::copy(&mut body, wstd::io::stdout()).await?;
98+
let body = response.into_body().into_boxed_body().collect().await?;
99+
let trailers = body.trailers().cloned();
100+
wstd::io::stdout()
101+
.write_all(body.to_bytes().as_ref())
102+
.await?;
109103

110-
let trailers = body.finish().await?;
111104
if let Some(trailers) = trailers {
112105
for (key, value) in trailers.iter() {
113106
let value = String::from_utf8_lossy(value.as_bytes());

examples/http_server.rs

Lines changed: 110 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,38 @@
1-
use wstd::http::body::{BodyForthcoming, IncomingBody, OutgoingBody};
2-
use wstd::http::server::{Finished, Responder};
3-
use wstd::http::{IntoBody, Request, Response, StatusCode};
4-
use wstd::io::{copy, empty, AsyncWrite};
1+
use anyhow::{Context, Result};
2+
use futures_lite::stream::{once_future, unfold};
3+
use http_body_util::{BodyExt, StreamBody};
4+
use std::convert::Infallible;
5+
use wstd::http::body::{Body, Bytes, Frame};
6+
use wstd::http::{Error, HeaderMap, Request, Response, StatusCode};
57
use wstd::time::{Duration, Instant};
68

79
#[wstd::http_server]
8-
async fn main(request: Request<IncomingBody>, responder: Responder) -> Finished {
9-
match request.uri().path_and_query().unwrap().as_str() {
10-
"/wait" => http_wait(request, responder).await,
11-
"/echo" => http_echo(request, responder).await,
12-
"/echo-headers" => http_echo_headers(request, responder).await,
13-
"/echo-trailers" => http_echo_trailers(request, responder).await,
14-
"/fail" => http_fail(request, responder).await,
15-
"/bigfail" => http_bigfail(request, responder).await,
16-
"/" => http_home(request, responder).await,
17-
_ => http_not_found(request, responder).await,
10+
async fn main(request: Request<Body>) -> Result<Response<Body>, Error> {
11+
let path = request.uri().path_and_query().unwrap().as_str();
12+
println!("serving {path}");
13+
match path {
14+
"/" => http_home(request).await,
15+
"/wait-response" => http_wait_response(request).await,
16+
"/wait-body" => http_wait_body(request).await,
17+
"/stream-body" => http_stream_body(request).await,
18+
"/echo" => http_echo(request).await,
19+
"/echo-headers" => http_echo_headers(request).await,
20+
"/echo-trailers" => http_echo_trailers(request).await,
21+
"/response-status" => http_response_status(request).await,
22+
"/response-fail" => http_response_fail(request).await,
23+
"/response-body-fail" => http_body_fail(request).await,
24+
_ => http_not_found(request).await,
1825
}
1926
}
2027

21-
async fn http_home(_request: Request<IncomingBody>, responder: Responder) -> Finished {
28+
async fn http_home(_request: Request<Body>) -> Result<Response<Body>> {
2229
// To send a single string as the response body, use `Responder::respond`.
23-
responder
24-
.respond(Response::new("Hello, wasi:http/proxy world!\n".into_body()))
25-
.await
30+
Ok(Response::new(
31+
"Hello, wasi:http/proxy world!\n".to_owned().into(),
32+
))
2633
}
2734

28-
async fn http_wait(_request: Request<IncomingBody>, responder: Responder) -> Finished {
35+
async fn http_wait_response(_request: Request<Body>) -> Result<Response<Body>> {
2936
// Get the time now
3037
let now = Instant::now();
3138

@@ -35,60 +42,107 @@ async fn http_wait(_request: Request<IncomingBody>, responder: Responder) -> Fin
3542
// Compute how long we slept for.
3643
let elapsed = Instant::now().duration_since(now).as_millis();
3744

38-
// To stream data to the response body, use `Responder::start_response`.
39-
let mut body = responder.start_response(Response::new(BodyForthcoming));
40-
let result = body
41-
.write_all(format!("slept for {elapsed} millis\n").as_bytes())
42-
.await;
43-
Finished::finish(body, result, None)
45+
Ok(Response::new(
46+
format!("slept for {elapsed} millis\n").into(),
47+
))
4448
}
4549

46-
async fn http_echo(mut request: Request<IncomingBody>, responder: Responder) -> Finished {
47-
// Stream data from the request body to the response body.
48-
let mut body = responder.start_response(Response::new(BodyForthcoming));
49-
let result = copy(request.body_mut(), &mut body).await;
50-
Finished::finish(body, result, None)
51-
}
50+
async fn http_wait_body(_request: Request<Body>) -> Result<Response<Body>> {
51+
// Get the time now
52+
let now = Instant::now();
53+
54+
let body = async move {
55+
// Sleep for one second.
56+
wstd::task::sleep(Duration::from_secs(1)).await;
57+
58+
// Compute how long we slept for.
59+
let elapsed = Instant::now().duration_since(now).as_millis();
60+
Ok::<_, Infallible>(Bytes::from(format!("slept for {elapsed} millis\n")))
61+
};
5262

53-
async fn http_fail(_request: Request<IncomingBody>, responder: Responder) -> Finished {
54-
let body = responder.start_response(Response::new(BodyForthcoming));
55-
Finished::fail(body)
63+
Ok(Response::new(Body::from_try_stream(once_future(body))))
5664
}
5765

58-
async fn http_bigfail(_request: Request<IncomingBody>, responder: Responder) -> Finished {
59-
async fn write_body(body: &mut OutgoingBody) -> wstd::io::Result<()> {
60-
for _ in 0..0x10 {
61-
body.write_all("big big big big\n".as_bytes()).await?;
66+
async fn http_stream_body(_request: Request<Body>) -> Result<Response<Body>> {
67+
// Get the time now
68+
let start = Instant::now();
69+
70+
let body = move |iters: usize| async move {
71+
if iters == 0 {
72+
return None;
6273
}
63-
body.flush().await?;
64-
Ok(())
65-
}
74+
// Sleep for 0.1 second.
75+
wstd::task::sleep(Duration::from_millis(100)).await;
76+
77+
// Compute how long we slept for.
78+
let elapsed = Instant::now().duration_since(start).as_millis();
79+
Some((
80+
Ok::<_, Infallible>(Bytes::from(format!(
81+
"stream started {elapsed} millis ago\n"
82+
))),
83+
iters - 1,
84+
))
85+
};
6686

67-
let mut body = responder.start_response(Response::new(BodyForthcoming));
68-
let _ = write_body(&mut body).await;
69-
Finished::fail(body)
87+
Ok(Response::new(Body::from_try_stream(unfold(5, body))))
7088
}
7189

72-
async fn http_echo_headers(request: Request<IncomingBody>, responder: Responder) -> Finished {
90+
async fn http_echo(request: Request<Body>) -> Result<Response<Body>> {
91+
let (_parts, body) = request.into_parts();
92+
Ok(Response::new(body))
93+
}
94+
95+
async fn http_echo_headers(request: Request<Body>) -> Result<Response<Body>> {
7396
let mut response = Response::builder();
7497
*response.headers_mut().unwrap() = request.into_parts().0.headers;
75-
let response = response.body(empty()).unwrap();
76-
responder.respond(response).await
98+
Ok(response.body("".to_owned().into())?)
99+
}
100+
101+
async fn http_echo_trailers(request: Request<Body>) -> Result<Response<Body>> {
102+
let collected = request.into_body().into_boxed_body().collect().await?;
103+
let trailers = collected.trailers().cloned().unwrap_or_else(|| {
104+
let mut trailers = HeaderMap::new();
105+
trailers.insert("x-no-trailers", "1".parse().unwrap());
106+
trailers
107+
});
108+
109+
let body = StreamBody::new(once_future(async move {
110+
anyhow::Ok(Frame::<Bytes>::trailers(trailers))
111+
}));
112+
Ok(Response::new(Body::from_http_body(body)))
77113
}
78114

79-
async fn http_echo_trailers(request: Request<IncomingBody>, responder: Responder) -> Finished {
80-
let body = responder.start_response(Response::new(BodyForthcoming));
81-
let (trailers, result) = match request.into_body().finish().await {
82-
Ok(trailers) => (trailers, Ok(())),
83-
Err(err) => (Default::default(), Err(std::io::Error::other(err))),
115+
async fn http_response_status(request: Request<Body>) -> Result<Response<Body>> {
116+
let status = if let Some(header_val) = request.headers().get("x-response-status") {
117+
header_val
118+
.to_str()
119+
.context("contents of x-response-status")?
120+
.parse::<u16>()
121+
.context("u16 value from x-response-status")?
122+
} else {
123+
500
84124
};
85-
Finished::finish(body, result, trailers)
125+
Ok(Response::builder()
126+
.status(status)
127+
.body(String::new().into())?)
128+
}
129+
130+
async fn http_response_fail(_request: Request<Body>) -> Result<Response<Body>> {
131+
Err(anyhow::anyhow!("error creating response"))
132+
}
133+
134+
async fn http_body_fail(_request: Request<Body>) -> Result<Response<Body>> {
135+
let body = StreamBody::new(once_future(async move {
136+
Err::<Frame<Bytes>, _>(anyhow::anyhow!("error creating body"))
137+
}));
138+
139+
Ok(Response::new(Body::from_http_body(body)))
86140
}
87141

88-
async fn http_not_found(_request: Request<IncomingBody>, responder: Responder) -> Finished {
142+
async fn http_not_found(_request: Request<Body>) -> Result<Response<Body>> {
89143
let response = Response::builder()
90144
.status(StatusCode::NOT_FOUND)
91-
.body(empty())
145+
.body(Body::empty())
92146
.unwrap();
93-
responder.respond(response).await
147+
Ok(response)
94148
}

0 commit comments

Comments
 (0)