axum/serve/
listener.rs

1use std::{fmt, future::Future, time::Duration};
2
3use tokio::{
4    io::{self, AsyncRead, AsyncWrite},
5    net::{TcpListener, TcpStream},
6};
7
8/// Types that can listen for connections.
9pub trait Listener: Send + 'static {
10    /// The listener's IO type.
11    type Io: AsyncRead + AsyncWrite + Unpin + Send + 'static;
12
13    /// The listener's address type.
14    type Addr: Send;
15
16    /// Accept a new incoming connection to this listener.
17    ///
18    /// If the underlying accept call can return an error, this function must
19    /// take care of logging and retrying.
20    fn accept(&mut self) -> impl Future<Output = (Self::Io, Self::Addr)> + Send;
21
22    /// Returns the local address that this listener is bound to.
23    fn local_addr(&self) -> io::Result<Self::Addr>;
24}
25
26impl Listener for TcpListener {
27    type Io = TcpStream;
28    type Addr = std::net::SocketAddr;
29
30    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
31        loop {
32            match Self::accept(self).await {
33                Ok(tup) => return tup,
34                Err(e) => handle_accept_error(e).await,
35            }
36        }
37    }
38
39    #[inline]
40    fn local_addr(&self) -> io::Result<Self::Addr> {
41        Self::local_addr(self)
42    }
43}
44
45#[cfg(unix)]
46impl Listener for tokio::net::UnixListener {
47    type Io = tokio::net::UnixStream;
48    type Addr = tokio::net::unix::SocketAddr;
49
50    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
51        loop {
52            match Self::accept(self).await {
53                Ok(tup) => return tup,
54                Err(e) => handle_accept_error(e).await,
55            }
56        }
57    }
58
59    #[inline]
60    fn local_addr(&self) -> io::Result<Self::Addr> {
61        Self::local_addr(self)
62    }
63}
64
65/// Extensions to [`Listener`].
66pub trait ListenerExt: Listener + Sized {
67    /// Run a mutable closure on every accepted `Io`.
68    ///
69    /// # Example
70    ///
71    /// ```
72    /// use axum::{Router, routing::get, serve::ListenerExt};
73    /// use tracing::trace;
74    ///
75    /// # async {
76    /// let router = Router::new().route("/", get(|| async { "Hello, World!" }));
77    ///
78    /// let listener = tokio::net::TcpListener::bind("0.0.0.0:3000")
79    ///     .await
80    ///     .unwrap()
81    ///     .tap_io(|tcp_stream| {
82    ///         if let Err(err) = tcp_stream.set_nodelay(true) {
83    ///             trace!("failed to set TCP_NODELAY on incoming connection: {err:#}");
84    ///         }
85    ///     });
86    /// axum::serve(listener, router).await.unwrap();
87    /// # };
88    /// ```
89    fn tap_io<F>(self, tap_fn: F) -> TapIo<Self, F>
90    where
91        F: FnMut(&mut Self::Io) + Send + 'static,
92    {
93        TapIo {
94            listener: self,
95            tap_fn,
96        }
97    }
98}
99
100impl<L: Listener> ListenerExt for L {}
101
102/// Return type of [`ListenerExt::tap_io`].
103///
104/// See that method for details.
105pub struct TapIo<L, F> {
106    listener: L,
107    tap_fn: F,
108}
109
110impl<L, F> fmt::Debug for TapIo<L, F>
111where
112    L: Listener + fmt::Debug,
113{
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        f.debug_struct("TapIo")
116            .field("listener", &self.listener)
117            .finish_non_exhaustive()
118    }
119}
120
121impl<L, F> Listener for TapIo<L, F>
122where
123    L: Listener,
124    F: FnMut(&mut L::Io) + Send + 'static,
125{
126    type Io = L::Io;
127    type Addr = L::Addr;
128
129    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
130        let (mut io, addr) = self.listener.accept().await;
131        (self.tap_fn)(&mut io);
132        (io, addr)
133    }
134
135    fn local_addr(&self) -> io::Result<Self::Addr> {
136        self.listener.local_addr()
137    }
138}
139
140async fn handle_accept_error(e: io::Error) {
141    if is_connection_error(&e) {
142        return;
143    }
144
145    // [From `hyper::Server` in 0.14](https://github.com/hyperium/hyper/blob/v0.14.27/src/server/tcp.rs#L186)
146    //
147    // > A possible scenario is that the process has hit the max open files
148    // > allowed, and so trying to accept a new connection will fail with
149    // > `EMFILE`. In some cases, it's preferable to just wait for some time, if
150    // > the application will likely close some files (or connections), and try
151    // > to accept the connection again. If this option is `true`, the error
152    // > will be logged at the `error` level, since it is still a big deal,
153    // > and then the listener will sleep for 1 second.
154    //
155    // hyper allowed customizing this but axum does not.
156    error!("accept error: {e}");
157    tokio::time::sleep(Duration::from_secs(1)).await;
158}
159
160fn is_connection_error(e: &io::Error) -> bool {
161    matches!(
162        e.kind(),
163        io::ErrorKind::ConnectionRefused
164            | io::ErrorKind::ConnectionAborted
165            | io::ErrorKind::ConnectionReset
166    )
167}