Skip to content

Commit ac75e2f

Browse files
committed
Split up serve functions for Serve and WithGracefulShutdown again
We should not spawn a tokio task to wait on a signal that can never arrive.
1 parent c1162d3 commit ac75e2f

File tree

1 file changed

+80
-47
lines changed

1 file changed

+80
-47
lines changed

axum/src/serve/mod.rs

Lines changed: 80 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,33 @@ where
166166
}
167167
}
168168

169+
#[cfg(all(feature = "tokio", any(feature = "http1", feature = "http2")))]
170+
impl<L, M, S> Serve<L, M, S>
171+
where
172+
L: Listener,
173+
L::Addr: Debug,
174+
M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S> + Send + 'static,
175+
for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send,
176+
S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
177+
S::Future: Send,
178+
{
179+
async fn run(self) -> ! {
180+
let Self {
181+
mut listener,
182+
mut make_service,
183+
_marker,
184+
} = self;
185+
186+
let (signal_tx, _signal_rx) = watch::channel(());
187+
let (_close_tx, close_rx) = watch::channel(());
188+
189+
loop {
190+
let (io, remote_addr) = listener.accept().await;
191+
handle_connection(&mut make_service, &signal_tx, &close_rx, io, remote_addr).await;
192+
}
193+
}
194+
}
195+
169196
#[cfg(all(feature = "tokio", any(feature = "http1", feature = "http2")))]
170197
impl<L, M, S> Debug for Serve<L, M, S>
171198
where
@@ -201,10 +228,7 @@ where
201228
type IntoFuture = private::ServeFuture;
202229

203230
fn into_future(self) -> Self::IntoFuture {
204-
private::ServeFuture(Box::pin(async move {
205-
do_serve(self.listener, self.make_service, std::future::pending()).await;
206-
Ok(())
207-
}))
231+
private::ServeFuture(Box::pin(async move { self.run().await }))
208232
}
209233
}
210234

@@ -229,6 +253,57 @@ where
229253
}
230254
}
231255

256+
#[cfg(all(feature = "tokio", any(feature = "http1", feature = "http2")))]
257+
impl<L, M, S, F> WithGracefulShutdown<L, M, S, F>
258+
where
259+
L: Listener,
260+
L::Addr: Debug,
261+
M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S> + Send + 'static,
262+
for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send,
263+
S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
264+
S::Future: Send,
265+
F: Future<Output = ()> + Send + 'static,
266+
{
267+
async fn run(self) {
268+
let Self {
269+
mut listener,
270+
mut make_service,
271+
signal,
272+
_marker,
273+
} = self;
274+
275+
let (signal_tx, signal_rx) = watch::channel(());
276+
tokio::spawn(async move {
277+
signal.await;
278+
trace!("received graceful shutdown signal. Telling tasks to shutdown");
279+
drop(signal_rx);
280+
});
281+
282+
let (close_tx, close_rx) = watch::channel(());
283+
284+
loop {
285+
let (io, remote_addr) = tokio::select! {
286+
conn = listener.accept() => conn,
287+
_ = signal_tx.closed() => {
288+
trace!("signal received, not accepting new connections");
289+
break;
290+
}
291+
};
292+
293+
handle_connection(&mut make_service, &signal_tx, &close_rx, io, remote_addr).await;
294+
}
295+
296+
drop(close_rx);
297+
drop(listener);
298+
299+
trace!(
300+
"waiting for {} task(s) to finish",
301+
close_tx.receiver_count()
302+
);
303+
close_tx.closed().await;
304+
}
305+
}
306+
232307
#[cfg(all(feature = "tokio", any(feature = "http1", feature = "http2")))]
233308
impl<L, M, S, F> Debug for WithGracefulShutdown<L, M, S, F>
234309
where
@@ -269,54 +344,12 @@ where
269344

270345
fn into_future(self) -> Self::IntoFuture {
271346
private::ServeFuture(Box::pin(async move {
272-
do_serve(self.listener, self.make_service, self.signal).await;
347+
self.run().await;
273348
Ok(())
274349
}))
275350
}
276351
}
277352

278-
#[cfg(all(feature = "tokio", any(feature = "http1", feature = "http2")))]
279-
async fn do_serve<L, M, F, S>(mut listener: L, mut make_service: M, signal: F)
280-
where
281-
L: Listener,
282-
L::Addr: Debug,
283-
M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S> + Send + 'static,
284-
for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send,
285-
S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
286-
S::Future: Send,
287-
F: Future<Output = ()> + Send + 'static,
288-
{
289-
let (signal_tx, signal_rx) = watch::channel(());
290-
tokio::spawn(async move {
291-
signal.await;
292-
trace!("received graceful shutdown signal. Telling tasks to shutdown");
293-
drop(signal_rx);
294-
});
295-
296-
let (close_tx, close_rx) = watch::channel(());
297-
298-
loop {
299-
let (io, remote_addr) = tokio::select! {
300-
conn = listener.accept() => conn,
301-
_ = signal_tx.closed() => {
302-
trace!("signal received, not accepting new connections");
303-
break;
304-
}
305-
};
306-
307-
handle_connection(&mut make_service, &signal_tx, &close_rx, io, remote_addr).await;
308-
}
309-
310-
drop(close_rx);
311-
drop(listener);
312-
313-
trace!(
314-
"waiting for {} task(s) to finish",
315-
close_tx.receiver_count()
316-
);
317-
close_tx.closed().await;
318-
}
319-
320353
async fn handle_connection<L, M, S>(
321354
make_service: &mut M,
322355
signal_tx: &watch::Sender<()>,

0 commit comments

Comments
 (0)