From 0985eec4719ecf2ac94f184a57996bd1c0920522 Mon Sep 17 00:00:00 2001 From: Thanatat Tamtan Date: Sun, 31 May 2026 14:17:49 +0700 Subject: [PATCH] Add an optional idle timeout for downstream HTTP/2 connections --- pingora-core/src/apps/mod.rs | 35 ++++-- pingora-core/src/protocols/http/v2/mod.rs | 113 +++++++++++++++---- pingora-core/src/protocols/http/v2/server.rs | 47 ++++++-- 3 files changed, 156 insertions(+), 39 deletions(-) diff --git a/pingora-core/src/apps/mod.rs b/pingora-core/src/apps/mod.rs index 93bea8b4..6dd3dd23 100644 --- a/pingora-core/src/apps/mod.rs +++ b/pingora-core/src/apps/mod.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use log::{debug, error}; use std::any::Any; use std::sync::Arc; +use std::time::Duration; use crate::protocols::http::v2::server; use crate::protocols::http::ServerSession; @@ -81,6 +82,12 @@ pub struct HttpServerOptions { /// /// Unlike nginx, the default behavior here is _no limit_. pub keepalive_request_limit: Option, + + /// If set, close a downstream HTTP/2 connection that has been idle + /// for this duration. + /// + /// Default: `None` + pub h2_idle_timeout: Option, } /// Settings persisted across HTTP/1.x keepalive requests on the same downstream connection. @@ -262,15 +269,25 @@ where // the same code path is exercised by tests in `protocols::http::v2`. let app = self.clone(); let shutdown_for_session = shutdown.clone(); - server::accept_downstream_sessions(h2_conn, digest, shutdown.clone(), |h2_stream| { - let app = app.clone(); - let shutdown = shutdown_for_session.clone(); - pingora_runtime::current_handle().spawn(async move { - // Note, `PersistentSettings` not currently relevant for h2 - app.process_new_http(ServerSession::new_http2(h2_stream), &shutdown) - .await; - }); - }) + let h2_idle_timeout = self.server_options().and_then(|o| o.h2_idle_timeout); + server::accept_downstream_sessions( + h2_conn, + digest, + shutdown.clone(), + h2_idle_timeout, + |h2_stream, guard| { + let app = app.clone(); + let shutdown = shutdown_for_session.clone(); + pingora_runtime::current_handle().spawn(async move { + // hold `guard` for the session's lifetime so the accept + // loop's idle timeout sees this connection as busy. + let _guard = guard; + // Note, `PersistentSettings` not currently relevant for h2 + app.process_new_http(ServerSession::new_http2(h2_stream), &shutdown) + .await; + }); + }, + ) .await; } else if custom || matches!(stream.selected_alpn_proto(), Some(ALPN::Custom(_))) { return self.clone().process_custom_session(stream, shutdown).await; diff --git a/pingora-core/src/protocols/http/v2/mod.rs b/pingora-core/src/protocols/http/v2/mod.rs index 8f664c9d..2710d94e 100644 --- a/pingora-core/src/protocols/http/v2/mod.rs +++ b/pingora-core/src/protocols/http/v2/mod.rs @@ -349,14 +349,20 @@ mod test { }); let mut session_handles = vec![]; - server::accept_downstream_sessions(connection, digest, shutdown_rx, |mut session| { - session_handles.push(tokio::spawn(async move { - let req = session.req_header(); - assert_eq!(req.method, Method::GET); - let resp = Box::new(ResponseHeader::build(200, None).unwrap()); - session.write_response_header(resp, true).unwrap(); - })); - }) + server::accept_downstream_sessions( + connection, + digest, + shutdown_rx, + None, + |mut session, _guard| { + session_handles.push(tokio::spawn(async move { + let req = session.req_header(); + assert_eq!(req.method, Method::GET); + let resp = Box::new(ResponseHeader::build(200, None).unwrap()); + session.write_response_header(resp, true).unwrap(); + })); + }, + ) .await; trigger.await.unwrap(); @@ -443,12 +449,18 @@ mod test { }); let mut session_handles = vec![]; - server::accept_downstream_sessions(connection, digest, shutdown_rx, |mut session| { - session_handles.push(tokio::spawn(async move { - let resp = Box::new(ResponseHeader::build(200, None).unwrap()); - session.write_response_header(resp, true).unwrap(); - })); - }) + server::accept_downstream_sessions( + connection, + digest, + shutdown_rx, + None, + |mut session, _guard| { + session_handles.push(tokio::spawn(async move { + let resp = Box::new(ResponseHeader::build(200, None).unwrap()); + session.write_response_header(resp, true).unwrap(); + })); + }, + ) .await; trigger.await.unwrap(); @@ -502,9 +514,15 @@ mod test { let result = pingora_timeout::timeout( Duration::from_secs(2), - server::accept_downstream_sessions(connection, digest, shutdown_rx, |_session| { - panic!("did not expect any sessions on an idle connection"); - }), + server::accept_downstream_sessions( + connection, + digest, + shutdown_rx, + None, + |_session, _guard| { + panic!("did not expect any sessions on an idle connection"); + }, + ), ) .await; assert!(result.is_ok(), "accept loop hung after shutdown"); @@ -513,6 +531,49 @@ mod test { client_handle.await.unwrap(); } + #[tokio::test] + async fn test_h2_idle_timeout_closes_idle_connection() { + let (mut client, server) = duplex(65536); + // Keep the sender alive so `shutdown.changed()` stays pending — the only + // thing that should end the accept loop is the idle timeout. + let (_shutdown_tx, shutdown_rx) = watch::channel(false); + + let client_handle = tokio::spawn(async move { + client + .write_all(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") + .await + .unwrap(); + let mut codec: h2::Codec = h2::Codec::new(client); + codec.send(Settings::default().into()).await.unwrap(); + codec.send(Settings::ack().into()).await.unwrap(); + // Open no streams; stay connected and drain frames until the server + // drops the connection on idle timeout (codec returns None on EOF). + while let Some(frame) = codec.next().await { + let _ = frame; + } + }); + + let connection = handshake(Box::new(server), None).await.unwrap(); + let digest = Arc::new(Digest::default()); + + // No shutdown is signaled and no stream is opened; a short idle timeout + // must make the accept loop return on its own. + let result = pingora_timeout::timeout( + Duration::from_secs(2), + server::accept_downstream_sessions( + connection, + digest, + shutdown_rx, + Some(Duration::from_millis(100)), + |_session, _guard| panic!("did not expect any sessions on an idle connection"), + ), + ) + .await; + assert!(result.is_ok(), "idle timeout did not close the connection"); + + client_handle.await.unwrap(); + } + #[tokio::test] async fn test_graceful_shutdown_refuses_stream_above_last_stream_id() { // After the server commits to a final last_stream_id and emits the @@ -600,12 +661,18 @@ mod test { let mut session_handles = vec![]; let result = pingora_timeout::timeout( Duration::from_secs(5), - server::accept_downstream_sessions(connection, digest, shutdown_rx, |mut session| { - session_handles.push(tokio::spawn(async move { - let resp = Box::new(ResponseHeader::build(200, None).unwrap()); - session.write_response_header(resp, true).unwrap(); - })); - }), + server::accept_downstream_sessions( + connection, + digest, + shutdown_rx, + None, + |mut session, _guard| { + session_handles.push(tokio::spawn(async move { + let resp = Box::new(ResponseHeader::build(200, None).unwrap()); + session.write_response_header(resp, true).unwrap(); + })); + }, + ), ) .await; assert!(result.is_ok(), "accept loop hung after shutdown"); diff --git a/pingora-core/src/protocols/http/v2/server.rs b/pingora-core/src/protocols/http/v2/server.rs index 604d53c6..0aa92106 100644 --- a/pingora-core/src/protocols/http/v2/server.rs +++ b/pingora-core/src/protocols/http/v2/server.rs @@ -25,6 +25,7 @@ use http::{header, HeaderMap, Response}; use log::{debug, warn}; use pingora_http::{RequestHeader, ResponseHeader}; use pingora_timeout::timeout; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::ready; use std::time::Duration; @@ -87,23 +88,29 @@ pub async fn handshake(io: Stream, options: Option) -> Result( mut conn: H2Connection, digest: Arc, mut shutdown: ShutdownWatch, + idle_timeout: Option, mut on_session: F, ) where - F: FnMut(HttpSession), + F: FnMut(HttpSession, StreamGuard), { let mut shutdown_initiated = false; + // In-flight sessions, decremented by the `StreamGuard` given to `on_session`. + let active = Arc::new(AtomicUsize::new(0)); loop { let h2_stream = if shutdown_initiated { HttpSession::from_h2_conn(&mut conn, digest.clone()).await @@ -119,6 +126,16 @@ pub(crate) async fn accept_downstream_sessions( continue; } h2_stream = HttpSession::from_h2_conn(&mut conn, digest.clone()) => h2_stream, + // Idle timeout: re-armed each iteration, so any accepted stream resets it + _ = pingora_timeout::sleep(idle_timeout.unwrap_or_default()), if idle_timeout.is_some() => { + if active.load(Ordering::Relaxed) == 0 { + // Idle with nothing in flight: drop `conn` to close the + // socket now (no graceful GOAWAY wait that could hang on + // a dead peer). + return; + } + continue; + } } }; match h2_stream { @@ -130,11 +147,27 @@ pub(crate) async fn accept_downstream_sessions( } // None means the connection is ready to be closed Ok(None) => return, - Ok(Some(session)) => on_session(session), + Ok(Some(session)) => { + active.fetch_add(1, Ordering::Relaxed); + on_session(session, StreamGuard(active.clone())); + } } } } +/// Tracks one in-flight downstream H2 session for [`accept_downstream_sessions`]. +/// `on_session` receives it alongside each session; keep it alive for as long as +/// the session is being processed (e.g. move it into the spawned task) so the +/// accept loop's idle timeout can tell a busy connection from an idle one. It +/// decrements the in-flight counter when dropped. +pub(crate) struct StreamGuard(Arc); + +impl Drop for StreamGuard { + fn drop(&mut self) { + self.0.fetch_sub(1, Ordering::Relaxed); + } +} + use futures::task::Context; use futures::task::Poll; use std::pin::Pin;