Skip to content

Commit c4b6a66

Browse files
committed
feat(network/onion): detect liveness via TcpStream close
instead of polling
1 parent b75a785 commit c4b6a66

3 files changed

Lines changed: 86 additions & 30 deletions

File tree

test/src/specs/tor/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ mod tor_basic;
22
mod tor_connect;
33
mod tor_hash_password;
44
mod tor_reconnect;
5-
use ckb_async_runtime::Runtime;
5+
use ckb_async_runtime::{Runtime, new_global_runtime};
66
use ckb_logger::{error, info};
77
use std::{path::Path, process::Child};
88
use tempfile::{TempDir, tempdir};
@@ -52,9 +52,15 @@ impl TorServer {
5252
pub fn tor_wait_bootstrap_done(&self) {
5353
let tor_controller_url = format!("127.0.0.1:{}", self.control_port);
5454
let controller_password = self.controller_password.clone();
55-
Runtime::new().unwrap().block_on(async {
56-
let tor_controller =
57-
ckb_onion::TorController::new(tor_controller_url, controller_password, None).await;
55+
let (handle, _handle_stop_rx, _runtime) = new_global_runtime(None);
56+
handle.block_on(async {
57+
let tor_controller = ckb_onion::TorController::new(
58+
tor_controller_url,
59+
controller_password,
60+
None,
61+
handle.clone(),
62+
)
63+
.await;
5864
let mut tor_controller = tor_controller.unwrap();
5965
if let Err(err) = tor_controller.wait_tor_server_bootstrap_done().await {
6066
error!("wait tor server bootstrap done error: {:?}", err);

util/onion/src/onion_service.rs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ impl OnionService {
104104
let tor_controller = self.config.tor_controller.to_string();
105105
let tor_password = self.config.tor_password.clone();
106106

107-
let mut tor_controller = TorController::new(tor_controller, tor_password, None).await?;
107+
let mut tor_controller =
108+
TorController::new(tor_controller, tor_password, None, self.handle.clone()).await?;
108109

109110
tor_controller.wait_tor_server_bootstrap_done().await?;
110111

@@ -128,22 +129,15 @@ impl OnionService {
128129
);
129130

130131
self.handle.spawn(async move {
131-
let mut ticker = tokio::time::interval(tokio::time::Duration::from_secs(3));
132-
loop {
133-
tokio::select! {
134-
_ = ticker.tick() => {
135-
let uptime = tor_controller.get_uptime().await;
136-
if let Err(err) = uptime {
137-
error!("Failed to get tor server uptime: {:?}", err);
138-
drop(tor_server_alive_tx);
139-
return;
140-
}
141-
}
142-
_ = stop_rx.cancelled() => {
143-
info!("OnionService received stop signal, exiting...");
144-
drop(tor_server_alive_tx);
145-
return;
146-
}
132+
tokio::select! {
133+
_ = tor_controller.wait_for_disconnect() => {
134+
error!("not alive");
135+
drop(tor_server_alive_tx);
136+
137+
}
138+
_ = stop_rx.cancelled() => {
139+
info!("OnionService received stop signal, exiting...");
140+
drop(tor_server_alive_tx);
147141
}
148142
}
149143
});

util/onion/src/tor_controller.rs

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1+
use ckb_async_runtime::Handle;
12
use ckb_error::{Error, InternalErrorKind};
23
use ckb_logger::{debug, error, info, warn};
34
use futures::future::BoxFuture;
45
use std::borrow::Cow;
56
use std::net::SocketAddr;
67
use std::time::Duration;
78
use tokio::fs::File;
8-
use tokio::io::AsyncReadExt;
9+
use tokio::io::{duplex, AsyncRead, AsyncReadExt, AsyncWrite, DuplexStream};
910
use tokio::net::TcpStream;
11+
use tokio::sync::mpsc;
1012
use torut::control::{TorAuthData, TorAuthMethod, UnauthenticatedConn, COOKIE_LENGTH};
1113
use torut::{
1214
control::{AsyncEvent, AuthenticatedConn, ConnError},
@@ -15,12 +17,18 @@ use torut::{
1517

1618
use crate::TorEventHandlerFn;
1719

18-
type TorAuthenticatedConn =
19-
AuthenticatedConn<TcpStream, fn(AsyncEvent<'_>) -> BoxFuture<'static, Result<(), ConnError>>>;
20+
type TorAuthenticatedConn = AuthenticatedConn<
21+
DuplexStream,
22+
fn(AsyncEvent<'_>) -> BoxFuture<'static, Result<(), ConnError>>,
23+
>;
2024

2125
/// A controller for a Tor server.
2226
pub struct TorController {
2327
inner: TorAuthenticatedConn,
28+
/// Notified when the underlying TCP connection to the Tor control port is
29+
/// closed or encounters a fatal I/O error. Receiving a value indicates
30+
/// the connection is dead.
31+
_disconnect_rx: mpsc::UnboundedReceiver<()>,
2432
}
2533

2634
impl TorController {
@@ -30,8 +38,13 @@ impl TorController {
3038
tor_controller_url: String,
3139
tor_password: Option<String>,
3240
event_handler: Option<TorEventHandlerFn>,
41+
handle: Handle,
3342
) -> Result<Self, Error> {
34-
let s = TcpStream::connect(tor_controller_url.clone())
43+
let (client, server) = duplex(1024);
44+
45+
let (disconnect_tx, disconnect_rx) = mpsc::unbounded_channel();
46+
47+
let raw_stream = TcpStream::connect(tor_controller_url.clone())
3548
.await
3649
.map_err(|err| {
3750
InternalErrorKind::Other.other(format!(
@@ -40,15 +53,45 @@ impl TorController {
4053
))
4154
})?;
4255

43-
let mut utc: UnauthenticatedConn<TcpStream> = UnauthenticatedConn::new(s);
56+
handle.spawn(async move {
57+
let (mut tcp_read, mut tcp_write) = raw_stream.into_split();
58+
let (mut server_read, mut server_write) = tokio::io::split(server);
59+
60+
tokio::select! {
61+
// TCP -> Duplex
62+
res = tokio::io::copy(&mut tcp_read, &mut server_write) => {
63+
match res {
64+
Ok(n) => debug!("Tor TCP connection closed after {} bytes", n),
65+
Err(e) => error!("Tor TCP read error: {}", e),
66+
}
67+
_ = disconnect_tx.send(());
68+
}
69+
70+
// Duplex -> TCP
71+
res = tokio::io::copy(&mut server_read, &mut tcp_write) => {
72+
match res {
73+
Ok(n) => debug!("Tor Duplex closed after {} bytes", n),
74+
Err(e) => error!("Tor TCP write error: {}", e),
75+
}
76+
_ = disconnect_tx.send(());
77+
}
78+
}
79+
80+
debug!("Tor DuplexStream exited.");
81+
});
82+
83+
let mut utc: UnauthenticatedConn<_> = UnauthenticatedConn::new(client);
4484

4585
authenticate(tor_password, &mut utc).await?;
4686

4787
let mut ac = utc.into_authenticated().await;
4888

4989
ac.set_async_event_handler(event_handler);
5090

51-
Ok(TorController { inner: ac })
91+
Ok(TorController {
92+
inner: ac,
93+
_disconnect_rx: disconnect_rx,
94+
})
5295
}
5396

5497
/// get tor server's status
@@ -112,6 +155,16 @@ impl TorController {
112155
Ok(())
113156
}
114157

158+
/// Waits asynchronously until the underlying TCP connection to the Tor
159+
/// controller is severed (either cleanly closed or due to an I/O error).
160+
///
161+
/// Returns `Some(())` when the connection is lost, or `None` if the internal
162+
/// notification channel was closed unexpectedly (which should not occur during
163+
/// normal operation).
164+
pub async fn wait_for_disconnect(&mut self) -> Option<()> {
165+
self._disconnect_rx.recv().await
166+
}
167+
115168
/// Add a new v3 onion service to the Tor server.
116169
pub async fn add_onion_v3(
117170
&mut self,
@@ -125,10 +178,13 @@ impl TorController {
125178
}
126179

127180
/// Authenticates with the Tor controller using the given password or cookie.
128-
pub async fn authenticate(
181+
pub async fn authenticate<S>(
129182
tor_password: Option<String>,
130-
utc: &mut UnauthenticatedConn<TcpStream>,
131-
) -> Result<(), Error> {
183+
utc: &mut UnauthenticatedConn<S>,
184+
) -> Result<(), Error>
185+
where
186+
S: AsyncRead + AsyncWrite + Unpin,
187+
{
132188
let proto_info = utc.load_protocol_info().await.map_err(|err| {
133189
InternalErrorKind::Other.other(format!("Failed to load protocol info: {:?}", err))
134190
})?;

0 commit comments

Comments
 (0)