Pull request 123: TRUST-241: Add logging and fix hanging connections

Squashed commit of the following:

commit 07e1a490f3e7a7f381de5f91c9645911aa4f3ada
Author: Andrey Meshkov <am@adguard.com>
Date:   Sat Dec 27 13:26:26 2025 +0300

    Fix tests

commit ed836634066c89934bac989e7785096f2a86b5c2
Author: Andrey Meshkov <am@adguard.com>
Date:   Fri Dec 26 21:03:36 2025 +0300

    Added cross-compile instructions

commit 497733e542bad09ae9606aa6fc0c244d755ddae5
Author: Andrey Meshkov <am@adguard.com>
Date:   Fri Dec 26 20:50:39 2025 +0300

    Added tcp keepalive

commit 8a4c9c2b56c35ade23ccbd1b6ec2413c4ad072ad
Author: Andrey Meshkov <am@adguard.com>
Date:   Fri Dec 26 20:37:19 2025 +0300

    Fix eof in tcp forwarder

commit 84af0fecbef3a2c96a63f4d0e2c5bbd6a97fa26d
Merge: 6adfe25 8d8addd
Author: Andrey Meshkov <am@adguard.com>
Date:   Fri Dec 26 18:30:03 2025 +0300

    Merge branch 'master' into add_logging

commit 6adfe2576824f18f9174c6d7e0c3a86b272d350f
Merge: f723e93 0daedac
Author: Andrey Meshkov <am@adguard.com>
Date:   Fri Dec 26 18:26:27 2025 +0300

    Merge branch 'fix/TRUST-231' into add_logging

commit f723e93bbd8f98afd686db3895621654946e49d0
Author: Andrey Meshkov <am@adguard.com>
Date:   Fri Dec 26 18:18:50 2025 +0300

    Added more logging

commit 0daedac118bcd6e4e85e8223ca120fc5be41f275
Author: Zhavoronkov Aleksei <a.zhavoronkov@adguard.com>
Date:   Thu Dec 25 13:49:49 2025 +0300

    Use tls_handshake_timeout for handshake
This commit is contained in:
Andrey Meshkov
2025-12-29 12:41:55 +03:00
committed by Sergey Fionov
parent 8d8addd0f2
commit ddffd17835
9 changed files with 297 additions and 61 deletions

9
Cargo.lock generated
View File

@@ -310,9 +310,9 @@ checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3"
[[package]]
name = "cc"
version = "1.2.50"
version = "1.2.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f50d563227a1c37cc0a263f64eca3334388c01c5e4c4861a9def205c614383c"
checksum = "7a0aeaff4ff1a90589618835a598e545176939b97874f7abc7851caa0618f203"
dependencies = [
"find-msvc-tools",
"shlex",
@@ -681,9 +681,9 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "find-msvc-tools"
version = "0.1.5"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844"
checksum = "645cbb3a84e60b7531617d5ae4e57f7e27308f6445f5abf653209ea76dec8dff"
[[package]]
name = "flate2"
@@ -3343,6 +3343,7 @@ dependencies = [
"rustls-pki-types",
"serde",
"smallvec",
"socket2 0.5.10",
"tls-parser",
"tokio",
"tokio-rustls 0.24.1",

View File

@@ -1,6 +1,17 @@
# Development documentation
## Getting Started with building the endpoint
## Table of Contents
- [Getting Started](#getting-started)
- [Prerequisites](#prerequisites)
- [Building](#building)
- [Cross-compiling for Linux](#cross-compiling-for-linux)
- [Usage](#usage)
- [Setup](#setup)
- [Customized Configuration](#customized-configuration)
- [See Also](#see-also)
## Getting Started
### Prerequisites
@@ -12,17 +23,17 @@ Run `make init` to prepare the development environment.
- macOS/Linux: `curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain 1.85 -y`
- CMake version 3.31.6 or higher
- macOS: `brew install cmake`
- Linux: (Debian/Ubuntu): `apt install cmake`
- Linux (Debian/Ubuntu): `apt install cmake`
- [libclang](https://rust-lang.github.io/rust-bindgen/requirements.html#installing-clang) library 9.0 or higher.
- C++ compiler
- macOS: `xcode-select --install` but you likely already have it if you are using `brew`
- Linux (Debian/Ubuntu): `apt install build-essential`
For running linters and tests you need additionally:
For running linters and tests, you additionally need:
- Node.js version 22.12 or higher
- macOS: `brew install node`
- Linux: (Debian/Ubuntu): `apt install nodejs`
- Linux (Debian/Ubuntu): `apt install nodejs`
- Markdownlint
- `npm install -g markdownlint-cli`
@@ -34,13 +45,28 @@ Build the binaries using Cargo:
cargo build --bins --release
```
or to build binaries for debug:
Or to build binaries for debug:
```shell
cargo build --bins
```
This command will generate the executables in the `target/release` or `target/debug` directory accordingly.
These commands will generate the executables in the `target/release` or `target/debug` directory accordingly.
### Cross-compiling for Linux
To build for Linux (x86_64-unknown-linux-musl) from macOS or other platforms, use the Docker-based build:
```shell
docker run --rm --platform linux/amd64 -v "$(pwd)":/work -w /work adguard/core-libs:2.6 sh -c '\
CC=x86_64-linux-musl-gcc \
CXX=x86_64-linux-musl-g++ \
CARGO_TARGET_X86_64_UNKNOWN_LINUX_MUSL_LINKER=x86_64-linux-musl-gcc \
BINDGEN_EXTRA_CLANG_ARGS="--sysroot=/opt/cross/x86_64-linux-musl" \
cargo build --release --target x86_64-unknown-linux-musl'
```
This will produce the binaries in `target/x86_64-unknown-linux-musl/release/`.
## Usage
@@ -68,14 +94,14 @@ These commands perform the following actions:
5. Start the endpoint.
Alternatively, you can run the endpoint in a docker container:
Alternatively, you can run the endpoint in a Docker container:
```shell
docker build -t trusttunnel-endpoint:latest . # build an image
docker build -t trusttunnel-endpoint:latest . # Build an image
docker run -it trusttunnel-endpoint:latest --name trusttunnel-endpoint # create docker container and start it in an interactive mode
docker run -it trusttunnel-endpoint:latest --name trusttunnel-endpoint # Create a Docker container and start it in interactive mode
docker start -i trusttunnel-endpoint # if you need to start your vpn endpoint again
docker start -i trusttunnel-endpoint # If you need to start your VPN endpoint again
```
### Customized Configuration
@@ -83,8 +109,8 @@ docker start -i trusttunnel-endpoint # if you need to start your vpn endpoint ag
For a more customized configuration experience, run the following commands:
```shell
make endpoint/build-wizard # If you skipped the previous chapter
cargo run --bin setup_wizard # Launches a dialogue session allowing you to tweak the settings
make endpoint/build-wizard # If you skipped the previous section
cargo run --bin setup_wizard # Launch a dialogue session allowing you to tweak the settings
cargo run --bin trusttunnel_endpoint -- <lib-settings> <hosts-settings> # File names depend on the previous step
```

View File

@@ -20,7 +20,7 @@ bytes = "1.4.0"
chrono = { version = "0.4.26", default-features = false, features = ["clock"] }
dynfmt = { version = "0.1.5", features = ["curly"], default-features = false }
futures = "0.3.28"
h2 = "0.3.20"
h2 = "0.3.26"
hex = "0.4.3"
http = "0.2.9"
httparse = "1.8.0"
@@ -37,6 +37,7 @@ rustls = { version = "0.21.2", features = ["logging"] }
rustls-pki-types = "1.13.2"
serde = "1.0.164"
smallvec = "1.10.0"
socket2 = "0.5"
tokio = { version = "1.42", features = ["net", "rt", "sync", "time", "macros", "rt-multi-thread"] }
tokio-rustls = "0.24.1"
toml_edit = "0.19.10"

View File

@@ -19,6 +19,7 @@ use crate::{
authentication, http_ping_handler, http_speedtest_handler, log_id, log_utils, metrics,
net_utils, reverse_proxy, rules, settings, tls_demultiplexer, tunnel,
};
use socket2::SockRef;
use std::io;
use std::io::ErrorKind;
use std::sync::atomic::{AtomicU64, Ordering};
@@ -173,8 +174,13 @@ impl Core {
log_utils::CLIENT_ID_FMT,
self.context.next_client_id.fetch_add(1, Ordering::Relaxed),
));
log_id!(trace, client_id, "Accepting TCP connection");
let (stream, client_addr) = match tcp_listener.accept().await.and_then(|(s, a)| {
s.set_nodelay(true)?;
// Enable TCP keepalive to detect broken connections.
let sock_ref = SockRef::from(&s);
sock_ref.set_keepalive(true)?;
Ok((s, a))
}) {
Ok((stream, addr)) => {
@@ -195,12 +201,18 @@ impl Core {
let context = self.context.clone();
let tls_listener = tls_listener.clone();
async move {
log_id!(trace, client_id, "Starting TLS handshake");
let handshake_timeout = context.settings.tls_handshake_timeout;
match tokio::time::timeout(handshake_timeout, tls_listener.listen(stream))
.await
.unwrap_or_else(|_| Err(io::Error::from(ErrorKind::TimedOut)))
{
Ok(acceptor) => {
log_id!(
trace,
client_id,
"TLS handshake complete, processing connection"
);
if let Err((client_id, message)) = Core::on_new_tls_connection(
context.clone(),
acceptor,
@@ -264,6 +276,12 @@ impl Core {
client_ip: std::net::IpAddr,
client_id: log_utils::IdChain<u64>,
) -> Result<(), (log_utils::IdChain<u64>, String)> {
log_id!(
trace,
client_id,
"Processing TLS connection from {}",
client_ip
);
let sni = match acceptor.sni() {
Some(s) => s,
None => {
@@ -273,6 +291,12 @@ impl Core {
))
}
};
log_id!(
trace,
client_id,
"TLS SNI: {}",
net_utils::scrub_sni(sni.to_string())
);
// Apply connection filtering rules
if let Err(deny_reason) = Self::evaluate_connection_rules(
&context,
@@ -311,31 +335,51 @@ impl Core {
tls_connection_meta
);
let stream = match acceptor
.accept(
log_id!(
trace,
client_id,
"Accepting TLS connection with protocol {:?}",
tls_connection_meta.protocol
);
let stream = match tokio::time::timeout(
context.settings.tls_handshake_timeout,
acceptor.accept(
tls_connection_meta.protocol,
tls_connection_meta.cert_chain,
tls_connection_meta.key,
&client_id,
)
.await
),
)
.await
{
Ok(s) => {
Ok(Ok(s)) => {
log_id!(debug, client_id, "New TLS client: {:?}", s);
s
}
Err(e) => {
Ok(Err(e)) => {
return Err((client_id, format!("TLS connection failed: {}", e)));
}
Err(_) => {
return Err((
client_id,
"TLS connection failed: handshake timed out".to_string(),
));
}
};
log_id!(
trace,
client_id,
"Routing to channel: {:?}",
tls_connection_meta.channel
);
match tls_connection_meta.channel {
net_utils::Channel::Tunnel => {
let tunnel_id = client_id.extended(log_utils::IdItem::new(
log_utils::TUNNEL_ID_FMT,
context.next_tunnel_id.fetch_add(1, Ordering::Relaxed),
));
log_id!(trace, tunnel_id, "Creating tunnel");
Self::on_tunnel_request(
context,
tls_connection_meta.protocol,

View File

@@ -87,8 +87,9 @@ where
impl<IO: AsyncRead + AsyncWrite + Send + Unpin> HttpCodec for Http2Codec<IO> {
async fn listen(&mut self) -> io::Result<Option<Box<dyn http_codec::Stream>>> {
if let State::Handshake(handshake) = &mut self.state {
log_id!(trace, self.parent_id_chain, "H2 handshake in progress");
self.state = State::Established(handshake.await.map_err(h2_to_io_error)?);
log_id!(trace, self.parent_id_chain, "HTTP2 connection established");
log_id!(trace, self.parent_id_chain, "H2 connection established");
}
let session = match &mut self.state {
@@ -96,6 +97,7 @@ impl<IO: AsyncRead + AsyncWrite + Send + Unpin> HttpCodec for Http2Codec<IO> {
State::Established(s) => s,
};
log_id!(trace, self.parent_id_chain, "H2 waiting for stream");
match session.accept().await {
Some(Ok((request, respond))) => {
let (request, rx) = request.into_parts();
@@ -106,7 +108,12 @@ impl<IO: AsyncRead + AsyncWrite + Send + Unpin> HttpCodec for Http2Codec<IO> {
// @note: [`h2::StreamId`] cannot be converted to raw integer, so just log it
// to have a link between stream and out own generated IDs in the logs
// @note: could be worked around by allowing any id type in the id chain
log_id!(debug, id, "H2 stream id: {:?}", rx.stream_id());
log_id!(
debug,
id,
"H2 stream accepted, stream_id: {:?}",
rx.stream_id()
);
Ok(Some(Box::new(Stream {
request: Request {
request,
@@ -117,25 +124,64 @@ impl<IO: AsyncRead + AsyncWrite + Send + Unpin> HttpCodec for Http2Codec<IO> {
respond: Respond { tx: respond, id },
})))
}
Some(Err(e)) if e.is_io() => Err(e.into_io().unwrap()),
Some(Err(e)) if e.reason() == Some(Reason::NO_ERROR) => Ok(None),
Some(Err(e)) => Err(h2_to_io_error(e)),
None => Ok(None),
Some(Err(e)) if e.is_io() => {
log_id!(trace, self.parent_id_chain, "H2 stream accept IO error");
Err(e.into_io().unwrap())
}
Some(Err(e)) if e.reason() == Some(Reason::NO_ERROR) => {
log_id!(
trace,
self.parent_id_chain,
"H2 connection closed gracefully"
);
Ok(None)
}
Some(Err(e)) => {
log_id!(
trace,
self.parent_id_chain,
"H2 stream accept error: {:?}",
e.reason()
);
Err(h2_to_io_error(e))
}
None => {
log_id!(trace, self.parent_id_chain, "H2 no more streams");
Ok(None)
}
}
}
async fn graceful_shutdown(&mut self) -> io::Result<()> {
let session = match &mut self.state {
State::Handshake(_) => return Ok(()),
State::Handshake(_) => {
log_id!(
trace,
self.parent_id_chain,
"H2 graceful shutdown (handshake state)"
);
return Ok(());
}
State::Established(s) => s,
};
log_id!(
trace,
self.parent_id_chain,
"H2 initiating graceful shutdown"
);
session.graceful_shutdown();
loop {
match session.accept().await {
None => break Ok(()),
Some(Err(e)) if e.is_io() => break Err(e.into_io().unwrap()),
None => {
log_id!(trace, self.parent_id_chain, "H2 graceful shutdown complete");
break Ok(());
}
Some(Err(e)) if e.is_io() => {
log_id!(trace, self.parent_id_chain, "H2 graceful shutdown IO error");
break Err(e.into_io().unwrap());
}
Some(_) => continue,
}
}
@@ -193,11 +239,24 @@ impl pipe::Source for RequestStream {
}
async fn read(&mut self) -> io::Result<pipe::Data> {
log_id!(trace, self.id, "H2 stream reading data");
match self.rx.data().await {
None => Ok(pipe::Data::Eof),
Some(Ok(chunk)) => Ok(pipe::Data::Chunk(chunk)),
Some(Err(e)) if e.reason().is_none_or(|r| r == Reason::NO_ERROR) => Ok(pipe::Data::Eof),
Some(Err(e)) => Err(h2_to_io_error(e)),
None => {
log_id!(trace, self.id, "H2 stream read EOF");
Ok(pipe::Data::Eof)
}
Some(Ok(chunk)) => {
log_id!(trace, self.id, "H2 stream read {} bytes", chunk.len());
Ok(pipe::Data::Chunk(chunk))
}
Some(Err(e)) if e.reason().is_none_or(|r| r == Reason::NO_ERROR) => {
log_id!(trace, self.id, "H2 stream read EOF (NO_ERROR)");
Ok(pipe::Data::Eof)
}
Some(Err(e)) => {
log_id!(trace, self.id, "H2 stream read error: {:?}", e.reason());
Err(h2_to_io_error(e))
}
}
}
@@ -220,10 +279,10 @@ impl http_codec::PendingRespond for Respond {
eof: bool,
) -> io::Result<Box<dyn http_codec::RespondedStreamSink>> {
log_id!(
debug,
trace,
self.id,
"Sending response: {:?} (eof={})",
response,
"H2 sending response: status={} (eof={})",
response.status,
eof
);
@@ -232,6 +291,7 @@ impl http_codec::PendingRespond for Respond {
.send_response(http::Response::from_parts(response, ()), eof)
.map_err(h2_to_io_error)?;
log_id!(trace, self.id, "H2 response sent successfully");
Ok(Box::new(RespondStream { tx, id: self.id }))
}
}
@@ -273,20 +333,31 @@ impl pipe::Sink for RespondStream {
}
fn write(&mut self, mut data: Bytes) -> io::Result<Bytes> {
let original_len = data.len();
self.tx.reserve_capacity(data.len());
let to_send = self.tx.capacity();
self.tx
.send_data(data.split_to(self.tx.capacity()), false)
.send_data(data.split_to(to_send), false)
.map_err(h2_to_io_error)?;
log_id!(
trace,
self.id,
"H2 stream wrote {}/{} bytes",
to_send,
original_len
);
Ok(data)
}
fn eof(&mut self) -> io::Result<()> {
log_id!(trace, self.id, "H2 stream sending EOF");
self.tx
.send_data(Bytes::new(), true)
.map_err(h2_to_io_error)
}
async fn wait_writable(&mut self) -> io::Result<()> {
log_id!(trace, self.id, "H2 stream waiting for writable");
WaitWritable {
stream: &mut self.tx,
}

View File

@@ -94,6 +94,13 @@ impl Downstream for HttpDownstream {
};
let request = stream.request().request();
let stream_id = stream.id();
log_id!(
trace,
stream_id,
"HTTP downstream received request: {} {}",
request.method,
request.uri
);
log_id!(
debug,
stream_id,
@@ -104,14 +111,23 @@ impl Downstream for HttpDownstream {
let protocol = self.protocol();
let settings = self.core_settings.clone();
let shutdown = self.shutdown.clone();
match self.request_demux.select(self.protocol(), request) {
let channel = self.request_demux.select(self.protocol(), request);
log_id!(
trace,
stream_id,
"HTTP downstream routing to channel: {:?}",
channel
);
match channel {
net_utils::Channel::Tunnel => {
log_id!(trace, stream_id, "HTTP downstream: tunnel request");
break Ok(Some(Box::new(PendingRequest {
stream,
id: stream_id,
})))
})));
}
net_utils::Channel::Ping => {
log_id!(trace, stream_id, "HTTP downstream: ping request");
tokio::spawn(async move {
http_ping_handler::listen(
shutdown.clone(),
@@ -123,6 +139,7 @@ impl Downstream for HttpDownstream {
});
}
net_utils::Channel::Speedtest => {
log_id!(trace, stream_id, "HTTP downstream: speedtest request");
tokio::spawn(async move {
http_speedtest_handler::listen(
shutdown.clone(),
@@ -134,6 +151,7 @@ impl Downstream for HttpDownstream {
});
}
net_utils::Channel::ReverseProxy => {
log_id!(trace, stream_id, "HTTP downstream: reverse proxy request");
tokio::spawn({
let sni = self.tls_domain.clone();
async move {

View File

@@ -27,7 +27,7 @@ struct StreamTx {
/// supported in the wild. For example,
/// nginx https://mailman.nginx.org/pipermail/nginx/2008-September/007388.html, or
/// golang https://github.com/golang/go/issues/18527.
is_shut_down: bool,
eof_pending: bool,
id: log_utils::IdChain<u64>,
}
@@ -45,7 +45,7 @@ impl TcpForwarder {
Box::new(StreamRx { rx, id: id.clone() }),
Box::new(StreamTx {
tx,
is_shut_down: false,
eof_pending: false,
id,
}),
)
@@ -124,7 +124,17 @@ impl TcpConnector for TcpForwarder {
s.set_nodelay(true)?;
Ok(s)
})
.map(|s| TcpForwarder::pipe_from_stream(s, id))
.map(|s| {
if let Ok(local_addr) = s.local_addr() {
log_id!(
trace,
id,
"Connection established, local port: {}",
local_addr.port()
);
}
TcpForwarder::pipe_from_stream(s, id)
})
.map_err(io_to_connection_error)
}
}
@@ -162,7 +172,7 @@ impl pipe::Sink for StreamTx {
}
fn write(&mut self, mut data: Bytes) -> io::Result<Bytes> {
if self.is_shut_down {
if self.eof_pending {
return Err(io::Error::new(
ErrorKind::Other,
"Already shut down".to_string(),
@@ -181,12 +191,14 @@ impl pipe::Sink for StreamTx {
}
fn eof(&mut self) -> io::Result<()> {
self.is_shut_down = true;
// Mark eof as pending but do not close the connection yet, it will
// be done in flush.
self.eof_pending = true;
Ok(())
}
async fn wait_writable(&mut self) -> io::Result<()> {
if self.is_shut_down {
if self.eof_pending {
return Err(io::Error::new(
ErrorKind::Other,
"Already shut down".to_string(),
@@ -197,7 +209,11 @@ impl pipe::Sink for StreamTx {
}
async fn flush(&mut self) -> io::Result<()> {
self.tx.flush().await
self.tx.flush().await?;
if self.eof_pending {
self.tx.shutdown().await?;
}
Ok(())
}
}

View File

@@ -90,6 +90,7 @@ impl Tunnel {
async fn listen_inner(&mut self) -> io::Result<()> {
loop {
log_id!(trace, self.id, "Tunnel waiting for request");
let request = match tokio::time::timeout(
self.context.settings.client_listener_timeout,
self.downstream.listen(),
@@ -100,9 +101,18 @@ impl Tunnel {
log_id!(debug, self.id, "Tunnel closed gracefully");
return Ok(());
}
Ok(Ok(Some(r))) => r,
Ok(Err(e)) => return Err(e),
Err(_) => return Err(io::Error::from(ErrorKind::TimedOut)),
Ok(Ok(Some(r))) => {
log_id!(trace, self.id, "Tunnel received request");
r
}
Ok(Err(e)) => {
log_id!(trace, self.id, "Tunnel listen error: {}", e);
return Err(e);
}
Err(_) => {
log_id!(trace, self.id, "Tunnel listen timeout");
return Err(io::Error::from(ErrorKind::TimedOut));
}
};
let context = self.context.clone();
@@ -121,6 +131,7 @@ impl Tunnel {
tokio::spawn(async move {
let request_id = request.id();
log_id!(trace, request_id, "Processing tunnel request");
let auth_info = request
.auth_info()
.map(|x| x.map(authentication::Source::into_owned));
@@ -162,9 +173,17 @@ impl Tunnel {
}
};
log_id!(
trace,
request_id,
"Authentication complete, promoting request"
);
match request.promote_to_next_state() {
Ok(None) => (), // skip forwarder authentication for health check requests
Ok(None) => {
log_id!(trace, request_id, "Health check request completed");
}
Ok(Some(PendingDemultiplexedRequest::TcpConnect(request))) => {
log_id!(trace, request_id, "Handling TCP connect request");
if let Err((request, message, e)) = Tunnel::on_tcp_connect_request(
context,
forwarder,
@@ -182,6 +201,7 @@ impl Tunnel {
}
}
Ok(Some(PendingDemultiplexedRequest::DatagramMultiplexer(request))) => {
log_id!(trace, request_id, "Handling datagram multiplexer request");
if let Err((request, message, e)) = Tunnel::on_datagram_mux_request(
context,
forwarder,
@@ -222,8 +242,12 @@ impl Tunnel {
),
> {
let request_id = request.id();
log_id!(trace, request_id, "TCP connect: extracting destination");
let destination = match request.destination() {
Ok(d) => d,
Ok(d) => {
log_id!(trace, request_id, "TCP connect: destination={:?}", d);
d
}
Err(e) => {
return Err((
Some(request),
@@ -250,6 +274,7 @@ impl Tunnel {
user_agent: request.user_agent(),
};
log_id!(trace, request_id, "TCP connect: connecting to peer");
let connector = forwarder.lock().unwrap().tcp_connector();
let (fwd_rx, fwd_tx) = match tokio::time::timeout(
context.settings.connection_establishment_timeout,
@@ -258,13 +283,32 @@ impl Tunnel {
.await
.unwrap_or(Err(ConnectionError::Timeout))
{
Ok(x) => x,
Ok(x) => {
log_id!(
trace,
request_id,
"TCP connect: peer connection established"
);
x
}
Err(e) => return Err((Some(request), "Connection to peer failed", e)),
};
log_id!(debug, request_id, "Successfully connected to {:?}", meta);
log_id!(
trace,
request_id,
"TCP connect: promoting downstream request"
);
let (dstr_rx, dstr_tx) = match request.promote_to_next_state() {
Ok(x) => x,
Ok(x) => {
log_id!(
trace,
request_id,
"TCP connect: downstream ready, starting pipe"
);
x
}
Err(e) => return Err((None, "Failed to complete request", ConnectionError::Io(e))),
};
@@ -274,15 +318,19 @@ impl Tunnel {
update_metrics,
);
log_id!(trace, request_id, "TCP connect: pipe exchange started");
match pipe
.exchange(context.settings.tcp_connections_timeout)
.await
{
Ok(_) => {
log_id!(trace, request_id, "Both ends closed gracefully");
log_id!(trace, request_id, "TCP connect: pipe closed gracefully");
Ok(())
}
Err(e) => Err((None, "Error on pipe", ConnectionError::Io(e))),
Err(e) => {
log_id!(trace, request_id, "TCP connect: pipe error: {}", e);
Err((None, "Error on pipe", ConnectionError::Io(e)))
}
}
}

View File

@@ -29,11 +29,22 @@ macro_rules! reverse_proxy_tests {
assert_eq!(body.as_ref(), b"how much watch?");
};
// Pin both tasks to avoid moving them
tokio::pin!(client_task);
tokio::pin!(proxy_task);
tokio::select! {
_ = run_endpoint(&endpoint_address, &proxy_address) => unreachable!(),
_ = proxy_task => unreachable!(),
_ = client_task => (),
_ = tokio::time::sleep(Duration::from_secs(10)) => panic!("Timed out"),
// Wait for client_task first; if proxy_task completes, continue waiting for client
_ = &mut client_task => (),
_ = &mut proxy_task => {
// Proxy completed (expected after handling request), now wait for client
tokio::select! {
_ = client_task => (),
_ = tokio::time::sleep(Duration::from_secs(5)) => panic!("Client timed out after proxy completed"),
}
},
}
}
)*