diff --git a/Cargo.lock b/Cargo.lock index 183fdff2e..c2dc86f16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2360,6 +2360,7 @@ dependencies = [ "fiber-wasm-db-common", "indicatif 0.18.4", "rusqlite", + "tempfile", "thiserror 1.0.69", "tracing", "wasm-bindgen", diff --git a/crates/fiber-bin/src/main.rs b/crates/fiber-bin/src/main.rs index 8ff0135b1..dd3175052 100644 --- a/crates/fiber-bin/src/main.rs +++ b/crates/fiber-bin/src/main.rs @@ -11,7 +11,9 @@ use fnn::ckb::{contracts::try_init_contracts_context, CkbChainActor}; use fnn::event_handler::forward_event_to_client; use fnn::fiber::{graph::NetworkGraph, network::init_chain_hash, network::NetworkActorMessage}; use fnn::rpc::server::start_rpc; +use fnn::store::actor::{StoreActor, StoreActorInitializationParameter}; use fnn::store::open_store; +use fnn::store::restore::restore; use fnn::tasks::{ cancel_tasks_and_wait_for_completion, new_tokio_cancellation_token, new_tokio_task_tracker, }; @@ -93,12 +95,31 @@ pub async fn main() -> Result<(), ExitMessage> { } } + if let Some(source_path) = &config.restore { + info!("Starting manual restore process from: {:?}", source_path); + + let parsed_fiber_config = config + .parsed_fiber() + .ok_or(ExitMessage("fiber config must be set".to_string()))?; + + let store_path = parsed_fiber_config.store_path(); + + restore(source_path, &store_path) + .map_err(|err| ExitMessage(format!("Failed to restore database: {}", err)))?; + + info!("Successfully restored database to {:?}.", store_path); + info!("All channels have been marked as 'Stale' for safety audit."); + + std::process::exit(0); + } + let parsed_fiber_config = config .parsed_fiber() .ok_or(ExitMessage("fiber config must be set".to_string()))?; // Derive store_path: prefer fiber config, fall back to base_dir/fiber/store let store_path = parsed_fiber_config.store_path(); + let raw_store = open_store(store_path).map_err(|err| ExitMessage(err.to_string()))?; if config.cch.is_some() || config.rpc.is_some() { @@ -131,7 +152,7 @@ async fn run_node( }); #[allow(unused_variables)] - let (network_actor, ckb_chain_actor, network_graph) = match config.fiber.clone() { + let (network_actor, ckb_chain_actor, network_graph, store_actor) = match config.fiber.clone() { Some(fiber_config) => { // TODO: this is not a super user friendly error message which has actionable information // for the user to fix the error and start the node. @@ -191,6 +212,27 @@ async fn run_node( info!("Starting fiber"); + let backup_path = fiber_config.base_dir().join("backups"); + let ckb_key_path = ckb_config.base_dir().join("key"); + let fiber_key_path = fiber_config.base_dir().join("sk"); + let store_actor = Actor::spawn_linked( + Some("store_actor".to_string()), + StoreActor { + _phantom: std::marker::PhantomData, + }, + StoreActorInitializationParameter { + store: store.clone(), + backup_path, + ckb_key_path, + fiber_key_path, + backup_interval_hours: 24, + }, + root_actor.get_cell(), + ) + .await + .map_err(|err| ExitMessage(format!("failed to start store actor: {}", err)))? 
+ .0; + let chain_client = CkbRpcClient::new(&ckb_config); let network_actor: ActorRef<NetworkActorMessage> = start_network( fiber_config.clone(), @@ -200,6 +242,7 @@ async fn run_node( new_tokio_task_tracker(), root_actor.get_cell(), store.clone(), + Some(store_actor.clone()), network_graph.clone(), default_shutdown_script, ) @@ -334,9 +377,10 @@ async fn run_node( Some(network_actor), Some(ckb_chain_actor), Some(network_graph), + Some(store_actor), ) } - None => (None, None, None), + None => (None, None, None, None), }; let cch_currency = config @@ -427,6 +471,7 @@ async fn run_node( network_actor, cch_actor, store, + store_actor, network_graph, root_actor.get_cell(), store_change_port, diff --git a/crates/fiber-json-types/src/channel.rs b/crates/fiber-json-types/src/channel.rs index 78455f79c..2b091a765 100644 --- a/crates/fiber-json-types/src/channel.rs +++ b/crates/fiber-json-types/src/channel.rs @@ -401,6 +401,9 @@ pub enum ChannelState { /// Both we and our counterparty consider the funding transaction confirmed and the channel is /// now operational. ChannelReady, + /// The channel state is potentially outdated (e.g., after a database restore). + /// We must perform a passive audit with the peer before resuming operations. + Stale, /// We've successfully negotiated a `closing_signed` dance. At this point, the `ChannelManager` ShuttingDown(#[schemars(schema_with = "schema_as_string")] ShuttingDownFlags), /// This channel is closed. diff --git a/crates/fiber-json-types/src/convert.rs b/crates/fiber-json-types/src/convert.rs index 4013c4878..322848cd0 100644 --- a/crates/fiber-json-types/src/convert.rs +++ b/crates/fiber-json-types/src/convert.rs @@ -121,6 +121,7 @@ impl From<InternalChannelState> for JsonChannelState { JsonChannelState::AwaitingChannelReady(flags.bits().into()) } InternalChannelState::ChannelReady => JsonChannelState::ChannelReady, + InternalChannelState::Stale => JsonChannelState::Stale, InternalChannelState::ShuttingDown(flags) => { JsonChannelState::ShuttingDown(flags.bits().into()) } @@ -137,7 +138,8 @@ impl JsonChannelState { | JsonChannelState::CollaboratingFundingTx(_) | JsonChannelState::SigningCommitment(_) | JsonChannelState::AwaitingTxSignatures(_) - | JsonChannelState::AwaitingChannelReady(_) => true, + | JsonChannelState::AwaitingChannelReady(_) + | JsonChannelState::Stale => true, JsonChannelState::ChannelReady | JsonChannelState::ShuttingDown(_) => false, JsonChannelState::Closed(bits) => { // FUNDING_ABORTED or ABANDONED are "pending-failed" states diff --git a/crates/fiber-lib/Cargo.toml b/crates/fiber-lib/Cargo.toml index 236e140c1..6d52974ee 100644 --- a/crates/fiber-lib/Cargo.toml +++ b/crates/fiber-lib/Cargo.toml @@ -139,6 +139,7 @@ jsonrpsee = { version = "0.25.1", features = [ "macros", "http-client", ] } +tokio = { version = "1.x", features = ["test-util"] } [target.'cfg(target_arch = "wasm32")'.dev-dependencies] fiber-store = { path = "../fiber-store", features = ["browser-test"] } diff --git a/crates/fiber-lib/src/config.rs b/crates/fiber-lib/src/config.rs index 4703f3ed3..f12401d2b 100644 --- a/crates/fiber-lib/src/config.rs +++ b/crates/fiber-lib/src/config.rs @@ -25,6 +25,8 @@ pub struct Config { pub base_dir: PathBuf, /// When true, validate the database and exit without starting services. pub check_validate: bool, + /// Restore path: restore the database from a backup file and exit.
+ pub restore: Option, } impl Config { @@ -115,6 +117,10 @@ pub mod native { #[arg(long, default_value_t = false)] check_validate: bool, + /// Restore database from a backup file and exit + #[arg(long = "restore", value_name = "BACKUP_PATH")] + restore: Option, + /// config for fiber network #[command(flatten)] pub fiber: ::Opt, @@ -163,6 +169,7 @@ pub mod native { // Base directory for all things to be stored to disk let base_dir = args.base_dir.clone().unwrap_or(get_default_base_dir()); let check_validate = args.check_validate; + let restore = args.restore; // Get config file by // 1. Using the explicitly set command line argument `config` @@ -251,6 +258,7 @@ pub mod native { ckb, base_dir, check_validate, + restore, } } } @@ -318,6 +326,7 @@ mod wasm { ckb, base_dir: PathBuf::from_str(&database_prefix).unwrap(), check_validate: false, + restore: None, } } } diff --git a/crates/fiber-lib/src/errors.rs b/crates/fiber-lib/src/errors.rs index 8c8f2548f..012e435c5 100644 --- a/crates/fiber-lib/src/errors.rs +++ b/crates/fiber-lib/src/errors.rs @@ -75,6 +75,9 @@ impl From for Error { fn from(e: StoreError) -> Self { match e { StoreError::DBInternalError(msg) => Error::DBInternalError(msg), + StoreError::IOError(err) => Error::IO(err), + StoreError::RestoreError(msg) => Error::DBInternalError(msg), + StoreError::BackupError(msg) => Error::DBInternalError(msg), } } } diff --git a/crates/fiber-lib/src/fiber/channel.rs b/crates/fiber-lib/src/fiber/channel.rs index c3bbdd741..1b20022c4 100644 --- a/crates/fiber-lib/src/fiber/channel.rs +++ b/crates/fiber-lib/src/fiber/channel.rs @@ -12,6 +12,7 @@ use crate::fiber::fee::{check_open_channel_parameters, check_tlc_delta_with_epoc #[cfg(debug_assertions)] use crate::fiber::network::DebugEvent; use crate::fiber::types::{BroadcastMessageWithTimestamp, TxSignatures}; +use crate::store::actor::StoreActorMessage; use crate::time::{SystemTime, UNIX_EPOCH}; use crate::utils::actor::ActorHandleLogGuard; use crate::utils::payment::is_invoice_fulfilled; @@ -408,6 +409,7 @@ pub struct ChannelActor { remote_pubkey: Pubkey, network: ActorRef, store: S, + store_actor: Option>, } impl ChannelActor @@ -419,12 +421,14 @@ where remote_pubkey: Pubkey, network: ActorRef, store: S, + store_actor: Option>, ) -> Self { Self { local_pubkey, remote_pubkey, network, store, + store_actor, } } @@ -3354,14 +3358,16 @@ where remote_commitment_number: channel.get_remote_commitment_number(), }; - self.network - .send_message(NetworkActorMessage::new_command( - NetworkActorCommand::SendFiberMessage(FiberMessageWithTarget::new( - self.get_remote_pubkey(), - FiberMessage::reestablish_channel(reestablish_channel), - )), - )) - .expect(ASSUME_NETWORK_ACTOR_ALIVE); + if channel.state != ChannelState::Stale { + self.network + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::SendFiberMessage(FiberMessageWithTarget::new( + self.get_remote_pubkey(), + FiberMessage::reestablish_channel(reestablish_channel), + )), + )) + .expect(ASSUME_NETWORK_ACTOR_ALIVE); + } channel } @@ -3585,6 +3591,14 @@ where if self.should_persist_channel_state(state) { self.store.insert_channel_actor_state(state.clone()); + if state.needs_backup { + if let Some(ref store_actor) = self.store_actor { + store_actor + .cast(StoreActorMessage::RequestBackup) + .map_err(|e| e.to_string())?; + } + state.needs_backup = false; + } } else { debug!( "Skip persisting channel state during external funding pre-submission phase: {}", @@ -3932,6 +3946,11 @@ pub struct ChannelActorState { // signing key #[doc = 
"skip_store"] pub private_key: Option, + + // Indicates that the state has changed and a backup should be triggered + // at the end of the current message processing loop. + #[doc = "skip_store"] + pub needs_backup: bool, } impl std::ops::Deref for ChannelActorState { @@ -3968,6 +3987,7 @@ impl<'de> Deserialize<'de> for ChannelActorState { deferred_peer_tlc_updates: VecDeque::new(), ephemeral_config: Default::default(), private_key: None, + needs_backup: false, }) } } @@ -4210,6 +4230,13 @@ impl ChannelActorState { self.local_tlc_info.enabled } + pub fn is_risk_of_penalty(&self) -> bool { + matches!( + self.state, + ChannelState::ChannelReady | ChannelState::ShuttingDown(_) + ) + } + pub fn set_waiting_peer_response(&mut self) { self.waiting_peer_response = Some(now_timestamp_as_millis_u64()); } @@ -4748,6 +4775,7 @@ impl ChannelActorState { deferred_peer_tlc_updates: VecDeque::new(), ephemeral_config: Default::default(), private_key: Some(private_key), + needs_backup: true, }; if let Some(nonce) = remote_channel_announcement_nonce { state.update_remote_channel_announcement_nonce(&nonce); @@ -4839,6 +4867,7 @@ impl ChannelActorState { deferred_peer_tlc_updates: VecDeque::new(), ephemeral_config: Default::default(), private_key: Some(private_key), + needs_backup: true, }; state.log_ack_state("[ack] new_outbound_channel"); state @@ -4966,11 +4995,14 @@ impl ChannelActorState { } pub(crate) fn update_state(&mut self, new_state: ChannelState) { - debug!( - "Updating channel state from {:?} to {:?}", - &self.state, &new_state - ); - self.state = new_state; + if self.state != new_state { + debug!( + "Updating channel state from {:?} to {:?}", + &self.state, &new_state + ); + self.state = new_state; + self.needs_backup = true; + } } pub(crate) fn local_is_node1(&self) -> bool { @@ -7136,6 +7168,36 @@ impl ChannelActorState { debug_event!(network, "Reestablished channel in ChannelReady"); } } + ChannelState::Stale => { + let my_local = self.get_local_commitment_number(); + let my_remote = self.get_remote_commitment_number(); + let peer_local = reestablish_channel.local_commitment_number; + let peer_remote = reestablish_channel.remote_commitment_number; + + if peer_local.abs_diff(my_remote) > 1 || peer_remote.abs_diff(my_local) > 1 { + error!( + "Audit Failed for Stale channel: Local(L:{}, R:{}), Peer(L:{}, R:{})", + my_local, my_remote, peer_local, peer_remote + ); + return Err(ProcessingChannelError::InvalidParameter( + "reestablish channel message with invalid commitment numbers during audit" + .to_string(), + )); + } + + info!( + "Passive audit passed for channel {}. Resuming to Ready.", + self.get_id() + ); + self.update_state(ChannelState::ChannelReady); + + return Box::pin(self.handle_reestablish_channel_message( + myself, + reestablish_channel, + pending_commit_diff, + )) + .await; + } ChannelState::ShuttingDown(flags) => { // Resend the shutdown message to the peer if we have not received the peer's shutdown message. 
if !flags.contains(ShuttingDownFlags::THEIR_SHUTDOWN_SENT) { @@ -8124,6 +8186,7 @@ pub trait ChannelActorStateStore { .collect() } fn get_channel_state_by_outpoint(&self, id: &OutPoint) -> Option; + fn get_all_channel_states(&self) -> Vec; fn insert_payment_custom_records( &self, payment_hash: &Hash256, diff --git a/crates/fiber-lib/src/fiber/network.rs b/crates/fiber-lib/src/fiber/network.rs index d45a35eb0..1c52267d2 100644 --- a/crates/fiber-lib/src/fiber/network.rs +++ b/crates/fiber-lib/src/fiber/network.rs @@ -1,3 +1,4 @@ +use crate::store::actor::StoreActorMessage; use ckb_hash::blake2b_256; use ckb_sdk::rpc::ckb_indexer::{Order, ScriptType, SearchKey, SearchMode}; use ckb_types::core::tx_pool::TxStatus; @@ -785,6 +786,7 @@ pub struct NetworkActor { event_sender: mpsc::Sender, chain_actor: ActorRef, store: S, + store_actor: Option>, network_graph: Arc>>, chain_client: C, } @@ -808,6 +810,7 @@ where event_sender: mpsc::Sender, chain_actor: ActorRef, store: S, + store_actor: Option>, network_graph: Arc>>, chain_client: C, ) -> Self { @@ -815,6 +818,7 @@ where event_sender, chain_actor, store: store.clone(), + store_actor, network_graph, chain_client, } @@ -1414,6 +1418,11 @@ where flags.remove(CloseFlags::WAITING_ONCHAIN_SETTLEMENT); actor_state.state = ChannelState::Closed(flags); self.store.insert_channel_actor_state(actor_state); + if let Some(ref store_actor) = state.store_actor { + store_actor + .cast(StoreActorMessage::RequestBackup) + .map_err(|e| Error::DBInternalError(e.to_string()))?; + } info!("Channel {channel_id:?} on-chain settlement completed"); } } @@ -3564,6 +3573,7 @@ where pub struct NetworkActorState { store: S, state_to_be_persisted: PersistentNetworkActorState, + store_actor: Option>, // The name of the node to be announced to the network, may be empty. 
node_name: Option, announced_addrs: Vec, @@ -3990,7 +4000,13 @@ where &self.get_public_key(), &remote_pubkey, )), - ChannelActor::new(self.get_public_key(), remote_pubkey, network.clone(), store), + ChannelActor::new( + self.get_public_key(), + remote_pubkey, + network.clone(), + store, + self.store_actor.clone(), + ), ChannelInitializationParameter { operation: ChannelInitializationOperation::OpenChannel(OpenChannelParameter { funding_amount, @@ -4101,7 +4117,13 @@ where &self.get_public_key(), &remote_pubkey, )), - ChannelActor::new(self.get_public_key(), remote_pubkey, network.clone(), store), + ChannelActor::new( + self.get_public_key(), + remote_pubkey, + network.clone(), + store, + self.store_actor.clone(), + ), ChannelInitializationParameter { operation: ChannelInitializationOperation::OpenChannelWithExternalFunding( OpenChannelWithExternalFundingParameter { @@ -4187,7 +4209,13 @@ where &self.get_public_key(), &remote_pubkey, )), - ChannelActor::new(self.get_public_key(), remote_pubkey, network.clone(), store), + ChannelActor::new( + self.get_public_key(), + remote_pubkey, + network.clone(), + store, + self.store_actor.clone(), + ), ChannelInitializationParameter { operation: ChannelInitializationOperation::AcceptChannel(AcceptChannelParameter { funding_amount, @@ -4567,6 +4595,11 @@ where ShuttingDownFlags::WAITING_COMMITMENT_CONFIRMATION, )); self.store.insert_channel_actor_state(state); + if let Some(ref store_actor) = self.store_actor { + store_actor + .cast(StoreActorMessage::RequestBackup) + .map_err(|e| Error::DBInternalError(e.to_string()))?; + } let _ = rpc_reply.send(Ok(())); Ok(()) @@ -4667,6 +4700,7 @@ where remote_pubkey, self.network.clone(), self.store.clone(), + self.store_actor.clone(), ), ChannelInitializationParameter { operation: ChannelInitializationOperation::ReestablishChannel(channel_id), @@ -5567,6 +5601,7 @@ where let mut state = NetworkActorState { store: self.store.clone(), state_to_be_persisted, + store_actor: self.store_actor.clone(), node_name: config.announced_node_name, announced_addrs, auto_announce: config.auto_announce_node(), @@ -5919,6 +5954,7 @@ pub async fn start_network< tracker: TaskTracker, root_actor: ActorCell, store: S, + store_actor: Option>, network_graph: Arc>>, default_shutdown_script: Script, ) -> ActorRef { @@ -5930,6 +5966,7 @@ pub async fn start_network< event_sender, chain_actor, store, + store_actor, network_graph, chain_client, ), diff --git a/crates/fiber-lib/src/fiber/tests/channel.rs b/crates/fiber-lib/src/fiber/tests/channel.rs index 1c3b7715e..9e1f1aabb 100644 --- a/crates/fiber-lib/src/fiber/tests/channel.rs +++ b/crates/fiber-lib/src/fiber/tests/channel.rs @@ -45,6 +45,7 @@ use fiber_types::{ PaymentStatus, Privkey, RemoveTlcFulfill, RemoveTlcReason, ShuttingDownFlags, SigningCommitmentFlags, TLCId, TlcErrorCode, TlcStatus, NO_SHARED_SECRET, }; + use fiber_types::{CloseFlags, FeatureVector}; use musig2::secp::Point; use musig2::KeyAggContext; @@ -121,13 +122,14 @@ fn test_channel_state_bincode_compatibility() { &[4, 0, 0, 0, 0, 0, 0, 0], ); assert_channel_state_encoding(ChannelState::ChannelReady, &[5, 0, 0, 0]); + assert_channel_state_encoding(ChannelState::Stale, &[6, 0, 0, 0]); assert_channel_state_encoding( ChannelState::ShuttingDown(ShuttingDownFlags::empty()), - &[6, 0, 0, 0, 0, 0, 0, 0], + &[7, 0, 0, 0, 0, 0, 0, 0], ); assert_channel_state_encoding( ChannelState::Closed(CloseFlags::empty()), - &[7, 0, 0, 0, 0, 0, 0, 0], + &[8, 0, 0, 0, 0, 0, 0, 0], ); assert_channel_state_encoding( 
ChannelState::NegotiatingFunding(NegotiatingFundingFlags::AWAITING_EXTERNAL_FUNDING), @@ -8068,3 +8070,134 @@ async fn test_external_funding_signed_submission_not_aborted_by_stale_timeout() "channel should not be aborted by stale external funding timeout" ); } + +#[tokio::test] +async fn test_channel_stale_passive_wait_no_proactive_send() { + init_tracing(); + + let (node_a, node_b, channel_id) = + create_nodes_with_established_channel(9900000000, 9900000000, true).await; + + let mut state_a = node_a.get_channel_actor_state(channel_id); + state_a.state = ChannelState::Stale; + state_a.reestablishing = true; + node_a.store.insert_channel_actor_state(state_a); + let state_b = node_a.get_channel_actor_state(channel_id); + let peer_commitment_number = state_b.get_local_commitment_number(); + + node_a + .network_actor + .send_message(NetworkActorMessage::Command( + NetworkActorCommand::SendFiberMessage(FiberMessageWithTarget::new( + node_b.pubkey, + FiberMessage::reestablish_channel(ReestablishChannel { + channel_id, + local_commitment_number: peer_commitment_number, + remote_commitment_number: peer_commitment_number, + }), + )), + )) + .expect("send reestablish message"); + + let current_state = node_a.get_channel_actor_state(channel_id); + assert_eq!(current_state.state, ChannelState::Stale); +} + +#[tokio::test] +async fn test_channel_stale_audit_success_resumes_ready() { + init_tracing(); + + let (node_a, node_b, channel_id) = + create_nodes_with_established_channel(9900000000, 9900000000, true).await; + + let mut state_a = node_a.get_channel_actor_state(channel_id); + let original_cn = state_a.commitment_numbers.local; + state_a.state = ChannelState::Stale; + state_a.reestablishing = true; + node_a.store.insert_channel_actor_state(state_a); + + node_a + .network_actor + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::DisconnectPeer( + node_b.pubkey, + PeerDisconnectReason::Requested, + None, + ), + )) + .expect("disconnect sent"); + node_a + .network_actor + .send_message(NetworkActorMessage::Command( + NetworkActorCommand::SendFiberMessage(FiberMessageWithTarget::new( + node_b.pubkey, + FiberMessage::reestablish_channel(ReestablishChannel { + channel_id, + local_commitment_number: original_cn, + remote_commitment_number: original_cn, + }), + )), + )) + .expect("send reestablish message"); + + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + + let current_state = node_a.get_channel_actor_state(channel_id); + assert_eq!( + current_state.state, + ChannelState::ChannelReady, + "Should resume to ChannelReady" + ); + assert!( + !current_state.reestablishing, + "Reestablishing should be finished" + ); +} + +#[tokio::test] +async fn test_channel_stale_audit_failure_blocks_channel() { + init_tracing(); + + let (mut node_a, node_b, channel_id) = + create_nodes_with_established_channel(9900000000, 9900000000, true).await; + + let state_a = node_a.get_channel_actor_state(channel_id); + let mut state_b = node_b.get_channel_actor_state(channel_id); + let original_cn = state_a.commitment_numbers.local; + + let mut state_a_stale = state_a.clone(); + state_a_stale.state = ChannelState::Stale; + state_a_stale.reestablishing = true; + state_b.commitment_numbers.remote = original_cn + 5; + + node_a.store.insert_channel_actor_state(state_a_stale); + node_b.store.insert_channel_actor_state(state_b); + + node_a.restart().await; + + node_b + .network_actor + .send_message(NetworkActorMessage::Command( + NetworkActorCommand::SendFiberMessage(FiberMessageWithTarget::new( + 
node_a.pubkey, + FiberMessage::reestablish_channel(ReestablishChannel { + channel_id, + local_commitment_number: original_cn + 5, + remote_commitment_number: original_cn, + }), + )), + )) + .expect("send reestablish message"); + + let current_state = node_a.get_channel_actor_state(channel_id); + + assert_eq!( + current_state.state, + ChannelState::Stale, + "Audit should fail and stay in Stale" + ); + assert!( + current_state.reestablishing, + "Should still be in reestablishing phase" + ); +} diff --git a/crates/fiber-lib/src/fiber/tests/rpc.rs b/crates/fiber-lib/src/fiber/tests/rpc.rs index c9180de8a..6276c58f5 100644 --- a/crates/fiber-lib/src/fiber/tests/rpc.rs +++ b/crates/fiber-lib/src/fiber/tests/rpc.rs @@ -7,7 +7,6 @@ use crate::gen_rand_sha256_hash; use crate::invoice::CkbInvoice; use crate::rpc::channel::{ChannelState, ShutdownChannelParams}; use crate::rpc::config::RpcConfig; -use crate::rpc::info::NodeInfoResult; use crate::rpc::invoice::Attribute; use crate::tests::*; use crate::{ @@ -23,6 +22,7 @@ use crate::{ use biscuit_auth::macros::biscuit; use biscuit_auth::{KeyPair, PrivateKey}; use ckb_types::packed::Script; +use fiber_json_types::info::NodeInfoResult; use std::str::FromStr; fn rpc_config_with_auth() -> (RpcConfig, KeyPair) { diff --git a/crates/fiber-lib/src/fiber/tests/settle_tlc_set_command_tests.rs b/crates/fiber-lib/src/fiber/tests/settle_tlc_set_command_tests.rs index 80a801e7e..b3a895097 100644 --- a/crates/fiber-lib/src/fiber/tests/settle_tlc_set_command_tests.rs +++ b/crates/fiber-lib/src/fiber/tests/settle_tlc_set_command_tests.rs @@ -146,6 +146,10 @@ impl ChannelActorStateStore for MockStore { None } + fn get_all_channel_states(&self) -> Vec<ChannelActorState> { + vec![] + } + fn insert_payment_custom_records( &self, _payment_hash: &Hash256, @@ -311,6 +315,7 @@ fn create_test_channel_state_with_tlc( deferred_peer_tlc_updates: std::collections::VecDeque::new(), ephemeral_config: Default::default(), private_key: None, + needs_backup: false, } } diff --git a/crates/fiber-lib/src/rpc/README.md b/crates/fiber-lib/src/rpc/README.md index f4f569d91..2eaa752d1 100644 --- a/crates/fiber-lib/src/rpc/README.md +++ b/crates/fiber-lib/src/rpc/README.md @@ -39,6 +39,7 @@ You may refer to the e2e test cases in the `tests/bruno/e2e` directory for examp * [Method `graph_channels`](#graph-graph_channels) * [Module Info](#module-info) * [Method `node_info`](#info-node_info) + * [Method `backup_now`](#info-backup_now) * [Module Invoice](#module-invoice) * [Method `new_invoice`](#invoice-new_invoice) * [Method `parse_invoice`](#invoice-parse_invoice) @@ -641,6 +642,22 @@ Get the node information. + +#### Method `backup_now` + +Trigger an immediate backup of the node database and key files. + +##### Params +* `target_path` - The path the backup will be written to + +##### Returns + +* None + +--- + + + ### Module `Invoice` RPC module for invoice management. @@ -1339,6 +1356,8 @@ The state of a channel. funding transaction to confirm. * `ChannelReady` - Both we and our counterparty consider the funding transaction confirmed and the channel is now operational. +* `Stale` - The channel state is potentially outdated (e.g., after a database restore). + We must perform a passive audit with the peer before resuming operations. * `ShuttingDown` - `ShuttingDownFlags`, We've successfully negotiated a `closing_signed` dance. At this point, the `ChannelManager` * `Closed` - `CloseFlags`, This channel is closed.
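For reference, the passive audit that lets a channel leave `Stale` reduces to a single predicate over the four commitment numbers exchanged in `reestablish_channel`. A minimal sketch as a standalone function (a hypothetical helper for illustration only; the patch implements this check inline in the `ChannelState::Stale` match arm of `channel.rs`):

```rust
/// Sketch of the Stale-channel audit: the restored snapshot is accepted only
/// when each side's view of the other differs by at most one commitment.
fn stale_audit_passes(my_local: u64, my_remote: u64, peer_local: u64, peer_remote: u64) -> bool {
    // Negation of the failure condition in the patch:
    // `peer_local.abs_diff(my_remote) > 1 || peer_remote.abs_diff(my_local) > 1`
    peer_local.abs_diff(my_remote) <= 1 && peer_remote.abs_diff(my_local) <= 1
}
```

If the predicate fails, the channel stays in `Stale` and the reestablish message is rejected, which is what `test_channel_stale_audit_failure_blocks_channel` asserts above.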
--- diff --git a/crates/fiber-lib/src/rpc/info.rs b/crates/fiber-lib/src/rpc/info.rs index 573934bb8..c472402dd 100644 --- a/crates/fiber-lib/src/rpc/info.rs +++ b/crates/fiber-lib/src/rpc/info.rs @@ -6,18 +6,27 @@ use ckb_jsonrpc_types::Script; use jsonrpsee::proc_macros::rpc; use jsonrpsee::types::ErrorObjectOwned; +pub use fiber_json_types::NodeInfoResult; use ractor::{call, ActorRef}; -pub use fiber_json_types::NodeInfoResult; +use crate::store::actor::StoreActorMessage; +#[cfg(not(target_arch = "wasm32"))] +use std::path::Path; pub struct InfoRpcServerImpl { actor: ActorRef<NetworkActorMessage>, + #[allow(unused)] + store_actor: Option<ActorRef<StoreActorMessage>>, default_funding_lock_script: Script, } impl InfoRpcServerImpl { #[allow(unused_variables)] - pub fn new(actor: ActorRef<NetworkActorMessage>, config: CkbConfig) -> Self { + pub fn new( + actor: ActorRef<NetworkActorMessage>, + store_actor: Option<ActorRef<StoreActorMessage>>, + config: CkbConfig, + ) -> Self { #[cfg(not(test))] let default_funding_lock_script = config .get_default_funding_lock_script() @@ -31,6 +40,7 @@ impl InfoRpcServerImpl { InfoRpcServerImpl { actor, + store_actor, default_funding_lock_script, } } @@ -43,6 +53,10 @@ trait InfoRpc { /// Get the node information. #[method(name = "node_info")] async fn node_info(&self) -> Result<NodeInfoResult, ErrorObjectOwned>; + + /// Trigger an immediate backup of the node database and key files. + #[method(name = "backup_now")] + async fn backup_now(&self, target_path: &Path) -> Result<(), ErrorObjectOwned>; } #[async_trait::async_trait] impl InfoRpcServer for InfoRpcServerImpl { async fn node_info(&self) -> Result<NodeInfoResult, ErrorObjectOwned> { self.node_info().await } + + async fn backup_now(&self, target_path: &Path) -> Result<(), ErrorObjectOwned> { + self.backup_now(target_path).await + } } impl InfoRpcServerImpl { pub async fn node_info(&self) -> Result<NodeInfoResult, ErrorObjectOwned> { @@ -81,4 +99,13 @@ impl InfoRpcServerImpl { udt_cfg_infos: response.udt_cfg_infos.into(), }) } + + #[cfg(not(target_arch = "wasm32"))] + pub async fn backup_now(&self, target_path: &Path) -> Result<(), ErrorObjectOwned> { + if let Some(ref store_actor) = self.store_actor { + handle_actor_call!(store_actor, StoreActorMessage::ForceBackup, target_path) + } else { + log_and_error!(target_path, "Backup service is not initialized".to_string()) + } + } } diff --git a/crates/fiber-lib/src/rpc/mod.rs b/crates/fiber-lib/src/rpc/mod.rs index 9cdb19c62..c8acebd86 100644 --- a/crates/fiber-lib/src/rpc/mod.rs +++ b/crates/fiber-lib/src/rpc/mod.rs @@ -77,6 +77,7 @@ pub mod server { use tracing::debug; use super::biscuit::BiscuitAuth; + use crate::store::actor::StoreActorMessage; use crate::store::store_impl::StoreChange; use ractor::{ActorCell, OutputPort}; @@ -273,6 +274,7 @@ pub mod server { network_actor: Option<ActorRef<NetworkActorMessage>>, cch_actor: Option<ActorRef<CchMessage>>, store: S, + store_actor: Option<ActorRef<StoreActorMessage>>, network_graph: Option<Arc<RwLock<NetworkGraph<S>>>>, supervisor: ActorCell, store_change_port: Option<Arc<OutputPort<StoreChange>>>, @@ -299,8 +301,12 @@ pub mod server { if config.is_module_enabled("invoice") { modules .merge( - InvoiceRpcServerImpl::new(store.clone(), network_actor.clone(), fiber_config) - .into_rpc(), + InvoiceRpcServerImpl::new( + store.clone(), + network_actor.clone(), + fiber_config.clone(), + ) + .into_rpc(), ) .unwrap(); } @@ -317,6 +323,7 @@ pub mod server { .merge( InfoRpcServerImpl::new( network_actor.clone(), + store_actor, ckb_config.clone().expect("ckb config should be set"), ) .into_rpc(), diff --git a/crates/fiber-lib/src/store/.schema.json b/crates/fiber-lib/src/store/.schema.json index a9a192741..c9563b68d 100644 --- a/crates/fiber-lib/src/store/.schema.json +++ b/crates/fiber-lib/src/store/.schema.json @@ -16,7 +16,7 @@ "ChannelData":
"3bd20dae5a7371be757e9e78719e3a93878b170d3b90f2781d4aa540520ee407", "ChannelOpenRecord": "1b33dac86d37a7b0b53cb42f22c555d528c91da9bc706f9d65c99d0f58528d58", "ChannelOpeningStatus": "1588a026af0393659412c3ae76b00a63170a1c51bcb162d57e8e33adb0496a40", - "ChannelState": "109ed23959f282262164e378f45f45cac8a51e7a2abc3fc709168b427034c246", + "ChannelState": "4d72c4564f7b09b47bed3cc1c28ccd0a43a1667ad02c71d85d55bbd0fe27c048", "ChannelTlcInfo": "85df5da63c9a909a28fe54fa271043c942ca15a249a42c9a21a3c8b2ade456b6", "ChannelUpdate": "7ccc47e6a94391c5ac9b24e585563dca87b12cd8107a59ed46972677e4b918bb", "CkbInvoice": "9a5420d5a777497202a54ca3993db4b81093fe25e51730b380808a0ca78a8c11", diff --git a/crates/fiber-lib/src/store/actor.rs b/crates/fiber-lib/src/store/actor.rs new file mode 100644 index 000000000..6ea41a857 --- /dev/null +++ b/crates/fiber-lib/src/store/actor.rs @@ -0,0 +1,189 @@ +use fiber_store::StorageBackend; +use ractor::{Actor, ActorProcessingErr, ActorRef}; +#[cfg(not(any(target_arch = "wasm32", test)))] +use std::path::Path; +use std::path::PathBuf; +use std::time::{Duration, Instant}; +use tracing::{debug, error, info}; + +pub struct StoreActorInitializationParameter { + pub store: S, + pub backup_path: PathBuf, + pub ckb_key_path: PathBuf, + pub fiber_key_path: PathBuf, + pub backup_interval_hours: u64, +} + +pub enum StoreActorMessage { + /// Backup requests triggered when channel status changes + RequestBackup, + /// Scheduler tick to check if the deadline is reached + PeriodicCheck, + /// Manual trigger for rpc + ForceBackup(ractor::RpcReplyPort>), +} + +pub struct StoreActorState { + pub store: S, + pub backup_path: PathBuf, + pub ckb_key_path: PathBuf, + pub fiber_key_path: PathBuf, + pub backup_interval_hours: u64, + /// The specific moment the next backup is scheduled to run + pub next_backup_time: Instant, +} + +pub struct StoreActor { + pub _phantom: std::marker::PhantomData, +} + +impl StoreActor { + pub fn new() -> Self { + Self { + _phantom: std::marker::PhantomData, + } + } +} + +impl Default for StoreActor { + fn default() -> Self { + Self::new() + } +} + +#[async_trait::async_trait] +impl Actor for StoreActor +where + S: StorageBackend + Send + Sync + 'static, +{ + type Msg = StoreActorMessage; + type State = StoreActorState; + type Arguments = StoreActorInitializationParameter; + + async fn pre_start( + &self, + myself: ActorRef, + args: Self::Arguments, + ) -> Result { + // Run a high-frequency tick (e.g., 1s) to act as the scheduler engine + myself.send_interval(Duration::from_secs(1), || StoreActorMessage::PeriodicCheck); + + let first_deadline = Instant::now() - Duration::from_secs(61); + + Ok(StoreActorState { + store: args.store, + backup_path: args.backup_path, + ckb_key_path: args.ckb_key_path, + fiber_key_path: args.fiber_key_path, + backup_interval_hours: args.backup_interval_hours, + next_backup_time: first_deadline, + }) + } + + async fn handle( + &self, + _myself: ActorRef, + message: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + let now = Instant::now(); + match message { + StoreActorMessage::RequestBackup => { + // Deadline Pulling Logic: + // If the currently scheduled backup is more than 60s away, + // pull the deadline closer to (now + 60s). + // If now time >= scheduled backup, + // then backup now. 
+ if now >= state.next_backup_time { + state.next_backup_time = now; + debug!("StoreActor: Deadline reached, scheduling immediate backup."); + } else if state.next_backup_time.saturating_duration_since(now) + > Duration::from_secs(60) + { + state.next_backup_time = now + Duration::from_secs(60); + debug!("StoreActor: High-priority change detected. Backup scheduled for 60s from now."); + } + } + StoreActorMessage::PeriodicCheck => { + // Scheduler Engine: + // Execute backup if the deadline has been reached or passed. + if now >= state.next_backup_time { + if let Err(e) = self.do_backup(state).await { + error!("StoreActor: Scheduled backup failed but continuing: {}", e); + } + // After success, reset the deadline to the routine interval (e.g., 24h) + state.next_backup_time = + now + Duration::from_secs(state.backup_interval_hours * 3600); + } + } + StoreActorMessage::ForceBackup(reply) => { + // Backup immediately + let result = self.do_backup(state).await; + state.next_backup_time = + Instant::now() + Duration::from_secs(state.backup_interval_hours * 3600); + let _ = reply.send(result.map_err(|e| e.to_string())); + } + } + Ok(()) + } +} + +impl StoreActor +where + S: StorageBackend + Send + Sync + 'static, +{ + async fn do_backup(&self, state: &mut StoreActorState) -> Result<(), String> { + info!("StoreActor: Starting backup to {:?}", state.backup_path); + #[cfg(not(any(target_arch = "wasm32", test)))] + perform_key_backup( + &state.backup_path, + &state.ckb_key_path, + &state.fiber_key_path, + )?; + match state.store.backup(&state.backup_path) { + Ok(_) => { + info!( + "StoreActor: Backup successful. Next routine backup in {} hours.", + state.backup_interval_hours + ); + Ok(()) + } + Err(e) => { + error!("StoreActor: Backup failed: {:?}", e); + Err(format!("Backup failed: {e}")) + // Note: We don't reset the deadline here to avoid an immediate retry loop + // if the failure is persistent (e.g., disk full). + // It will retry in the next routine cycle or on next RequestBackup. + } + } + } +} + +#[cfg(not(any(target_arch = "wasm32", test)))] +/// Backup the node key files to a specified path. 
+fn perform_key_backup( + target_dir: &Path, + ckb_key_path: &Path, + fiber_key_path: &Path, +) -> Result<(), String> { + if let Err(e) = std::fs::create_dir_all(target_dir) { + return Err(format!( + "Failed to create backup dir {:?}: {}", + target_dir, e + )); + } + let keys_to_copy = [(ckb_key_path, "key"), (fiber_key_path, "sk")]; + + for (src_file, dest_name) in keys_to_copy { + if src_file.exists() { + let dest_file = target_dir.join(dest_name); + if let Err(e) = std::fs::copy(src_file, &dest_file) { + return Err(format!("Failed to copy key file {:?}: {}", src_file, e)); + } + tracing::info!("Successfully backed up key: {}", dest_name); + } else { + tracing::warn!("Key file not found at {:?}, skipping", src_file); + } + } + Ok(()) +} diff --git a/crates/fiber-lib/src/store/mod.rs b/crates/fiber-lib/src/store/mod.rs index b613940aa..77897d978 100644 --- a/crates/fiber-lib/src/store/mod.rs +++ b/crates/fiber-lib/src/store/mod.rs @@ -1,3 +1,5 @@ +pub mod actor; +pub mod restore; #[cfg(any(test, feature = "sample"))] pub mod sample; pub mod store_impl; diff --git a/crates/fiber-lib/src/store/restore.rs b/crates/fiber-lib/src/store/restore.rs new file mode 100644 index 000000000..e4a5701e4 --- /dev/null +++ b/crates/fiber-lib/src/store/restore.rs @@ -0,0 +1,45 @@ +use crate::errors::{Error, Result}; +use crate::fiber::channel::ChannelActorStateStore; +use crate::store::open_store; +use fiber_store::StorageBackend; +use fiber_types::ChannelState; +use std::path::Path; +use tracing::info; + +pub fn restore(restore_path: &Path, base_path: &Path) -> Result<()> { + let store = open_store(base_path).map_err(Error::DBInternalError)?; + #[cfg(not(target_arch = "wasm32"))] + restore_node_keys(restore_path, base_path)?; + store.restore(restore_path, base_path)?; + + // The handle above still references the pre-restore files after the physical + // swap, so reopen the store before scanning the restored data. + drop(store); + let store = open_store(base_path).map_err(Error::DBInternalError)?; + + info!("Scanning for channels that must be marked stale."); + for mut channel in store.get_all_channel_states() { + if channel.is_risk_of_penalty() { + channel.update_state(ChannelState::Stale); + // Persist the change; update_state only mutates this in-memory copy. + store.insert_channel_actor_state(channel); + } + } + Ok(()) +} + +#[cfg(not(target_arch = "wasm32"))] +pub fn restore_node_keys(restore_path: &Path, base_dir: &Path) -> Result<()> { + let keys = [("key", "key"), ("sk", "sk")]; + + for (src_name, dest_name) in keys { + let src = restore_path.join(src_name); + if src.exists() { + let dest = base_dir.join(dest_name); + std::fs::copy(&src, &dest).map_err(|e| { + Error::DBInternalError(format!("Failed to restore key {}: {}", src_name, e)) + })?; + tracing::info!("Restored key file: {}", dest_name); + } + } + Ok(()) +} diff --git a/crates/fiber-lib/src/store/sample/sample_channel.rs b/crates/fiber-lib/src/store/sample/sample_channel.rs index 519ecb571..e77ae414f 100644 --- a/crates/fiber-lib/src/store/sample/sample_channel.rs +++ b/crates/fiber-lib/src/store/sample/sample_channel.rs @@ -37,6 +37,7 @@ impl ChannelActorState { deferred_peer_tlc_updates: VecDeque::new(), ephemeral_config: Default::default(), private_key: None, + needs_backup: false, } } @@ -59,6 +60,7 @@ impl ChannelActorState { deferred_peer_tlc_updates: VecDeque::new(), ephemeral_config: Default::default(), private_key: None, + needs_backup: false, } } } diff --git a/crates/fiber-lib/src/store/store_impl/mod.rs b/crates/fiber-lib/src/store/store_impl/mod.rs index 097dc8252..80bac47bb 100644 --- a/crates/fiber-lib/src/store/store_impl/mod.rs +++ b/crates/fiber-lib/src/store/store_impl/mod.rs @@ -4,6 +4,7 @@ use ckb_types::packed::Script; use crate::store::store_trait::{FiberStore, PrefixIterOptions}; use fiber_store::backend::{BatchWriter, StorageBackend, TakeWhileFn}; use
fiber_store::iterator::{IteratorDirection, KVPair}; +use fiber_store::StoreError; use std::path::Path; use std::sync::Arc; @@ -106,6 +107,14 @@ impl StorageBackend for Store { self.inner .collect_iterator(start, direction, take_while_fn, limit) } + + fn backup(&self, path: &Path) -> Result<(), StoreError> { + self.inner.backup(path) + } + + fn restore(&self, restore_path: &Path, db_path: &Path) -> Result<(), StoreError> { + self.inner.restore(restore_path, db_path) + } } #[derive(Copy, Clone)] @@ -585,6 +594,14 @@ impl ChannelActorStateStore for Store { .and_then(|channel_id: Hash256| self.get_channel_actor_state(&channel_id)) } + fn get_all_channel_states(&self) -> Vec { + let prefix = &[CHANNEL_ACTOR_STATE_PREFIX]; + self.collect_by_prefix(prefix) + .into_iter() + .map(|kv| deserialize_from(kv.value.as_ref(), "ChannelActorState")) + .collect() + } + fn insert_payment_custom_records( &self, payment_hash: &Hash256, diff --git a/crates/fiber-lib/src/store/tests/store.rs b/crates/fiber-lib/src/store/tests/store.rs index eb0491cae..b21cc0705 100644 --- a/crates/fiber-lib/src/store/tests/store.rs +++ b/crates/fiber-lib/src/store/tests/store.rs @@ -21,6 +21,7 @@ use crate::gen_rand_fiber_public_key; use crate::gen_rand_sha256_hash; use crate::invoice::*; use crate::now_timestamp_as_millis_u64; +#[cfg(not(target_arch = "wasm32"))] use crate::store::open_store; #[cfg(not(target_arch = "wasm32"))] use crate::store::sample::StoreSample; @@ -685,6 +686,7 @@ fn test_channel_actor_state_store() { deferred_peer_tlc_updates: Default::default(), ephemeral_config: Default::default(), private_key: None, + needs_backup: false, }; let bincode_encoded = bincode::serialize(&state).unwrap(); @@ -820,6 +822,7 @@ fn test_serde_channel_actor_state_ciborium() { deferred_peer_tlc_updates: Default::default(), ephemeral_config: Default::default(), private_key: None, + needs_backup: false, }; let mut serialized = Vec::new(); @@ -1281,3 +1284,141 @@ fn test_store_get_broadcast_messages_reverse_excludes_cursor() { .get_broadcast_messages_reverse(Some(&first_cursor), 1) .is_empty()); } + +#[cfg(all(test, not(target_arch = "wasm32")))] +mod store_actor_tests { + use crate::actors::RootActor; + use crate::store::actor::{StoreActor, StoreActorInitializationParameter, StoreActorMessage}; + use crate::tasks::{new_tokio_cancellation_token, new_tokio_task_tracker}; + use fiber_store::backend::TakeWhileFn; + use fiber_store::{IteratorDirection, KVPair, StorageBackend, StoreError}; + use std::path::Path; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering::SeqCst; + use std::sync::Arc; + use std::time::Duration; + use tempfile::tempdir; + use tokio::time::{advance, sleep}; + + struct MockStore { + backup_count: Arc, + } + + impl StorageBackend for MockStore { + type Batch = ::Batch; + + fn get>(&self, _key: K) -> Option> { + todo!() + } + fn put, V: AsRef<[u8]>>(&self, _key: K, _value: V) { + todo!() + } + fn delete>(&self, _key: K) { + todo!() + } + fn batch(&self) -> Self::Batch { + todo!() + } + fn collect_iterator( + &self, + _: Vec, + _: IteratorDirection, + _: TakeWhileFn, + _: usize, + ) -> Vec { + todo!() + } + fn restore(&self, _: &Path, _: &Path) -> Result<(), StoreError> { + todo!() + } + + fn backup(&self, _path: &std::path::Path) -> Result<(), StoreError> { + self.backup_count.fetch_add(1, SeqCst); + Ok(()) + } + } + + #[tokio::test(start_paused = true)] + async fn test_store_actor_buffered_backup() { + let backup_count = Arc::new(AtomicUsize::new(0)); + let mock_store = MockStore { + backup_count: 
Arc::clone(&backup_count), + }; + let temp_dir = tempdir().unwrap(); + + let (tracker, token) = (new_tokio_task_tracker(), new_tokio_cancellation_token()); + let root_actor = RootActor::start(tracker, token).await; + + let args = StoreActorInitializationParameter { + store: mock_store, + backup_path: temp_dir.path().to_path_buf(), + ckb_key_path: temp_dir.path().to_path_buf(), + fiber_key_path: temp_dir.path().to_path_buf(), + backup_interval_hours: 24, + }; + + let (store_actor, _handle) = + ractor::Actor::spawn_linked(None, StoreActor::new(), args, root_actor.get_cell()) + .await + .unwrap(); + + // First request should trigger the backup immediately + store_actor.cast(StoreActorMessage::RequestBackup).unwrap(); + tokio::task::yield_now().await; + sleep(Duration::from_millis(100)).await; + + advance(Duration::from_secs(61)).await; + sleep(Duration::from_millis(100)).await; + + assert_eq!( + backup_count.load(SeqCst), + 1, + "The backup request must be executed." + ); + } + + #[tokio::test(start_paused = true)] + async fn test_store_actor_backup_throttling() { + let backup_count = Arc::new(AtomicUsize::new(0)); + let mock_store = MockStore { + backup_count: Arc::clone(&backup_count), + }; + let temp_dir = tempdir().unwrap(); + let (tracker, token) = (new_tokio_task_tracker(), new_tokio_cancellation_token()); + let root_actor = RootActor::start(tracker, token).await; + + let (store_actor, _) = ractor::Actor::spawn_linked( + None, + StoreActor::new(), + StoreActorInitializationParameter { + store: mock_store, + backup_path: temp_dir.path().to_path_buf(), + ckb_key_path: temp_dir.path().to_path_buf(), + fiber_key_path: temp_dir.path().to_path_buf(), + backup_interval_hours: 24, + }, + root_actor.get_cell(), + ) + .await + .unwrap(); + + // Trigger initial backup + store_actor.cast(StoreActorMessage::RequestBackup).unwrap(); + tokio::task::yield_now().await; + + // Send multiple requests within the 60-second cooldown period + for _ in 0..20 { + store_actor.cast(StoreActorMessage::RequestBackup).unwrap(); + } + tokio::task::yield_now().await; + + advance(Duration::from_secs(61)).await; + sleep(Duration::from_millis(100)).await; + + assert_eq!( + backup_count.load(SeqCst), + 1, + "Requests during cooldown should be throttled and not trigger new backups." 
+ ); + } +} diff --git a/crates/fiber-lib/src/tests/test_utils.rs b/crates/fiber-lib/src/tests/test_utils.rs index e66a6c58d..a5c3c7fed 100644 --- a/crates/fiber-lib/src/tests/test_utils.rs +++ b/crates/fiber-lib/src/tests/test_utils.rs @@ -1576,6 +1576,7 @@ impl NetworkNode { event_sender, chain_actor.clone(), store.clone(), + None, network_graph.clone(), chain_client.clone(), ), @@ -1658,6 +1659,7 @@ impl NetworkNode { Some(network_actor.clone()), None, store.clone(), + None, Some(network_graph.clone()), root.get_cell(), None, @@ -2040,6 +2042,11 @@ impl NetworkNode { pub fn get_store(&self) -> &Store { &self.store } + + #[cfg(test)] + pub fn get_actor(&self) -> ActorRef { + self.network_actor.clone() + } } pub async fn create_mock_chain_actor() -> ActorRef { diff --git a/crates/fiber-store/Cargo.toml b/crates/fiber-store/Cargo.toml index e7a83068d..a44853a0e 100644 --- a/crates/fiber-store/Cargo.toml +++ b/crates/fiber-store/Cargo.toml @@ -24,6 +24,9 @@ rocksdb = { package = "ckb-rocksdb", version = "=0.21.1", features = [ "lz4", ], default-features = false, optional = true } +[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] +tempfile = "3.10.1" + [target.'cfg(target_arch = "wasm32")'.dependencies] anyhow = "1.0.81" console_error_panic_hook = "0.1.7" diff --git a/crates/fiber-store/src/backend.rs b/crates/fiber-store/src/backend.rs index db2c57a4a..cfc80cfaa 100644 --- a/crates/fiber-store/src/backend.rs +++ b/crates/fiber-store/src/backend.rs @@ -1,3 +1,6 @@ +use std::path::Path; + +use crate::error::StoreError; use crate::iterator::{IteratorDirection, KVPair, PrefixIterator}; /// A function that determines whether to keep taking items during iteration. @@ -72,4 +75,10 @@ pub trait StorageBackend: Send + Sync { ) -> PrefixIterator<'_, Self> { PrefixIterator::new_from(self, prefix.into(), start_key.into()) } + + /// Backup the node database to a specified path. + fn backup(&self, path: &Path) -> Result<(), StoreError>; + + /// Restore the node database with a specified path. 
+ fn restore(&self, restore_path: &Path, db_path: &Path) -> Result<(), StoreError>; } diff --git a/crates/fiber-store/src/browser.rs b/crates/fiber-store/src/browser.rs index 0ab4489ca..07e066a7d 100644 --- a/crates/fiber-store/src/browser.rs +++ b/crates/fiber-store/src/browser.rs @@ -25,6 +25,7 @@ use web_sys::js_sys::Uint8Array; use crate::backend::{BatchWriter, StorageBackend, TakeWhileFn}; use crate::iterator::{IteratorDirection, KVPair}; +use crate::StoreError; type TakeWhileCallback = Box bool + 'static>; @@ -167,6 +168,18 @@ impl StorageBackend for Store { }) .collect() } + + fn backup(&self, _path: &Path) -> Result<(), StoreError> { + Err(StoreError::BackupError( + "Not supported on browser yet".into(), + )) + } + + fn restore(&self, _restore_path: &Path, _db_path: &Path) -> Result<(), StoreError> { + Err(StoreError::RestoreError( + "Not supported on browser yet".into(), + )) + } } pub struct Batch { diff --git a/crates/fiber-store/src/browser_test.rs b/crates/fiber-store/src/browser_test.rs index 0e45a16b8..44e383ef4 100644 --- a/crates/fiber-store/src/browser_test.rs +++ b/crates/fiber-store/src/browser_test.rs @@ -3,6 +3,7 @@ use std::{cell::RefCell, collections::BTreeMap, path::Path, rc::Rc}; use crate::backend::{BatchWriter, StorageBackend, TakeWhileFn}; use crate::iterator::{IteratorDirection, KVPair}; +use crate::StoreError; #[derive(Clone)] pub struct Store { @@ -112,6 +113,18 @@ impl StorageBackend for Store { ) -> Vec { self.collect_from_btree(start, direction, &take_while_fn, limit) } + + fn backup(&self, _path: &Path) -> Result<(), StoreError> { + Err(StoreError::BackupError( + "Not supported on browser yet".into(), + )) + } + + fn restore(&self, _restore_path: &Path, _db_path: &Path) -> Result<(), StoreError> { + Err(StoreError::RestoreError( + "Not supported on browser yet".into(), + )) + } } struct KV { diff --git a/crates/fiber-store/src/error.rs b/crates/fiber-store/src/error.rs index 63c769675..68d8643f6 100644 --- a/crates/fiber-store/src/error.rs +++ b/crates/fiber-store/src/error.rs @@ -4,4 +4,13 @@ use thiserror::Error; pub enum StoreError { #[error("Database internal error: {0}")] DBInternalError(String), + + #[error("IO error: {0}")] + IOError(#[from] std::io::Error), + + #[error("Backup error: {0}")] + BackupError(String), + + #[error("Restore error: {0}")] + RestoreError(String), } diff --git a/crates/fiber-store/src/lib.rs b/crates/fiber-store/src/lib.rs index f45d7473a..48660c42c 100644 --- a/crates/fiber-store/src/lib.rs +++ b/crates/fiber-store/src/lib.rs @@ -39,3 +39,6 @@ pub use browser::{Batch, Store}; mod browser_test; #[cfg(all(target_arch = "wasm32", feature = "browser-test"))] pub use browser_test::{Batch, Store}; + +#[cfg(test)] +pub mod tests; diff --git a/crates/fiber-store/src/native.rs b/crates/fiber-store/src/native.rs index f84b5b7a8..dd553795c 100644 --- a/crates/fiber-store/src/native.rs +++ b/crates/fiber-store/src/native.rs @@ -1,11 +1,13 @@ -use rocksdb::Direction as DbDirection; -use rocksdb::IteratorMode; -use rocksdb::{prelude::*, DBCompressionType, WriteBatch, DB}; -use std::path::Path; +pub use rocksdb::Direction as DbDirection; +pub use rocksdb::IteratorMode; +use rocksdb::{checkpoint::Checkpoint, prelude::*, DBCompressionType, WriteBatch, DB}; +use std::path::{Path, PathBuf}; use std::sync::Arc; use crate::backend::{BatchWriter, StorageBackend, TakeWhileFn}; +use crate::error::StoreError; use crate::iterator::{IteratorDirection, KVPair}; +use tracing::{info, warn}; #[derive(Clone, Debug)] pub struct Store { @@ -84,6 +86,122 
@@ impl StorageBackend for Store { results } + + fn backup(&self, path: &Path) -> Result<(), StoreError> { + let target_dir = PathBuf::from(&path); + + // Prevent overwriting existing data + if target_dir.exists() { + return Err(StoreError::BackupError(format!( + "Backup directory: {:?} already exists", + path + ))); + } + + if let Err(e) = std::fs::create_dir_all(&target_dir) { + return Err(StoreError::BackupError(format!( + "Failed to create backup directory: {}", + e + ))); + } + info!("Starting node backup to: {:?}", target_dir); + + let db_backup_path = target_dir.join("db"); + let checkpoint = + Checkpoint::new(&self.db).map_err(|e| StoreError::BackupError(e.into_string()))?; + if let Err(e) = checkpoint.create_checkpoint(&db_backup_path) { + return Err(StoreError::BackupError(format!( + "Failed to create DB checkpoint: {}", + e + ))); + } + + info!("Backup completed successfully"); + + Ok(()) + } + + /// Restore the RocksDB database from a checkpoint. + fn restore(&self, restore_path: &Path, db_path: &Path) -> Result<(), StoreError> { + // Usually, the actual RocksDB data is stored in a 'db' subdirectory of the backup + let source_dir = restore_path.join("db"); + + if !source_dir.exists() { + return Err(StoreError::RestoreError(format!( + "Restore source path does not exist: {:?}", + source_dir + ))); + } + + // Create a safety backup path for the current database + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Duration since unix epoch") + .as_millis() as u64; + let backup_path = db_path.with_extension(format!("bak.{}", now)); + + info!("Starting physical database restoration."); + info!("Source (Checkpoint): {:?}", source_dir); + info!("Target (Current DB): {:?}", db_path); + + let mut db_was_moved = false; + if db_path.exists() { + info!( + "Moving current database to safety backup: {:?}", + backup_path + ); + std::fs::rename(db_path, &backup_path).map_err(|e| { + StoreError::RestoreError(format!("Failed to move current database: {}", e)) + })?; + db_was_moved = true; + } + + info!("Copying files from checkpoint to target..."); + if let Err(e) = copy_dir_all(&source_dir, db_path) { + // Logging the error before starting rollback + warn!( + "Failed to copy checkpoint files: {}. Starting rollback...", + e + ); + + if db_was_moved { + // Attempt to restore the original database from the safety backup + if let Err(rollback_err) = std::fs::rename(&backup_path, db_path) { + return Err(StoreError::RestoreError(format!( + "Critical failure during copy and subsequent rollback. Error: {}", + rollback_err + ))); + } else { + warn!("Rollback successful. Original database restored."); + } + } else { + // If we didn't have an old DB to move, just clean up the partial copy + let _ = std::fs::remove_dir_all(db_path); + } + + return Err(StoreError::RestoreError(format!( + "Failed to restore database from checkpoint: {}", + e + ))); + } + + info!("Physical database swap completed successfully."); + Ok(()) + } +} + +fn copy_dir_all(src: impl AsRef, dst: impl AsRef) -> std::io::Result<()> { + std::fs::create_dir_all(&dst)?; + for entry in std::fs::read_dir(src)? 
{ + let entry = entry?; + let ty = entry.file_type()?; + if ty.is_dir() { + copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?; + } else { + std::fs::copy(entry.path(), dst.as_ref().join(entry.file_name()))?; + } + } + Ok(()) } pub struct Batch { diff --git a/crates/fiber-store/src/sqlite.rs b/crates/fiber-store/src/sqlite.rs index fdaed745b..27d54976f 100644 --- a/crates/fiber-store/src/sqlite.rs +++ b/crates/fiber-store/src/sqlite.rs @@ -1,9 +1,10 @@ -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use rusqlite::Connection; use crate::backend::{BatchWriter, StorageBackend, TakeWhileFn}; +use crate::error::StoreError; use crate::iterator::{IteratorDirection, KVPair}; /// SQLite-backed key-value store. @@ -131,6 +132,90 @@ impl StorageBackend for Store { } results } + + fn backup(&self, path: &Path) -> Result<(), StoreError> { + let target_dir = PathBuf::from(&path); + + if target_dir.exists() { + return Err(StoreError::BackupError(format!( + "Backup directory: {:?} already exists", + path + ))); + } + + if let Err(e) = std::fs::create_dir_all(&target_dir) { + return Err(StoreError::BackupError(format!( + "Failed to create backup directory: {}", + e + ))); + } + + let db_file_path = target_dir.join("data.sqlite"); + let db_file_str = db_file_path + .to_str() + .ok_or_else(|| StoreError::BackupError("Invalid backup path encoding".to_string()))?; + + let conn = self + .conn + .lock() + .map_err(|_| StoreError::BackupError("lock poisoned".to_string()))?; + + conn.execute("VACUUM INTO ?1", [db_file_str]) + .map_err(|e| StoreError::BackupError(format!("SQLite VACUUM INTO failed: {}", e)))?; + + Ok(()) + } + + fn restore(&self, restore_path: &Path, db_path: &Path) -> Result<(), StoreError> { + // Restore source: restore_path/data.sqlite + let source_file = restore_path.join("data.sqlite"); + // Target db file: db_path/data.sqlite + let target_file = db_path.join("data.sqlite"); + + if !source_file.exists() || !source_file.is_file() { + return Err(StoreError::RestoreError(format!( + "Backup source file not found at: {:?}", + source_file + ))); + } + + if !db_path.exists() { + std::fs::create_dir_all(db_path).map_err(|e| { + StoreError::RestoreError(format!("Failed to create DB directory: {}", e)) + })?; + } + + if target_file.exists() { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Duration since unix epoch") + .as_millis() as u64; + // Prevent accidental deletion of the current database + let backup_name = format!("data.sqlite.bak.{}", now); + let safety_bak_path = db_path.join(backup_name); + + std::fs::rename(&target_file, &safety_bak_path).map_err(|e| { + StoreError::RestoreError(format!("Failed to backup current DB: {}", e)) + })?; + + // Old logs and indexes must be deleted to prevent offset conflicts when the new database is started. 
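+ // (SQLite in WAL mode keeps `data.sqlite-wal`/`data.sqlite-shm` sidecar files;
+ // leftovers belonging to the replaced database could otherwise be replayed
+ // into the freshly restored file on the next open.)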
+ let wal_file = db_path.join("data.sqlite-wal"); + let shm_file = db_path.join("data.sqlite-shm"); + + if wal_file.exists() { + let _ = std::fs::remove_file(wal_file); + } + if shm_file.exists() { + let _ = std::fs::remove_file(shm_file); + } + } + + std::fs::copy(&source_file, &target_file).map_err(|e| { + StoreError::RestoreError(format!("Failed to copy database file: {}", e)) + })?; + + Ok(()) + } } enum BatchOp { diff --git a/crates/fiber-store/src/tests/backend_test.rs b/crates/fiber-store/src/tests/backend_test.rs new file mode 100644 index 000000000..4e3386013 --- /dev/null +++ b/crates/fiber-store/src/tests/backend_test.rs @@ -0,0 +1,171 @@ +use crate::backend::StorageBackend; +use std::fs; +#[allow(unused_imports)] +use std::path::Path; +use tempfile::tempdir; + +// --- RocksDB (Native) Tests --- +#[cfg(all( + not(target_arch = "wasm32"), + feature = "rocksdb", + not(feature = "sqlite") +))] +mod testrocksdb_tests { + use super::*; + use crate::native::Store as RocksdbStore; + + #[test] + fn test_rocksdb_backup_and_restore() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("rocksdb_data"); + let backup_dir = dir.path().join("rocksdb_backup"); + + // Initialize and put data + let store = RocksdbStore::open_db(&db_path).expect("Open RocksDB failed"); + store.put(b"key1", b"value1"); + + // Perform Backup + // RocksDB stores actual data in backup_dir/db + store.backup(&backup_dir).expect("RocksDB backup failed"); + assert!(backup_dir.join("db").exists()); + + // Simulate data change + store.put(b"key1", b"new_value"); + assert_eq!(store.get(b"key1").unwrap(), b"new_value"); + + // Perform Restore + // In physical restore, we drop the store to release file locks + drop(store); + let store_for_restore = RocksdbStore::open_db(&db_path).unwrap(); + store_for_restore + .restore(&backup_dir, &db_path) + .expect("Restore failed"); + drop(store_for_restore); + + // Verify restored data + let restored_store = RocksdbStore::open_db(&db_path).unwrap(); + assert_eq!(restored_store.get(b"key1").unwrap(), b"value1"); + } + + #[test] + fn test_rocksdb_restore_rollback_mechanism() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("rocksdb_rollback"); + let invalid_backup = dir.path().join("non_existent_source"); + + // Setup original database + let store = RocksdbStore::open_db(&db_path).unwrap(); + store.put(b"critical", b"data"); + drop(store); + + // Attempt restore from invalid path + let store_for_restore = RocksdbStore::open_db(&db_path).unwrap(); + let result = store_for_restore.restore(&invalid_backup, &db_path); + assert!(result.is_err()); + drop(store_for_restore); + + // Verify original data was preserved (safety backup worked) + let restored_store = RocksdbStore::open_db(&db_path).unwrap(); + assert_eq!(restored_store.get(b"critical").unwrap(), b"data"); + } + + #[test] + fn test_perform_key_backup_logic() { + let dir = tempdir().unwrap(); + let base_dir = dir.path(); + let backup_dir = base_dir.join("key_backup"); + fs::create_dir_all(&backup_dir).unwrap(); + + // Create mock key files + let ckb_key_path = base_dir.join("ckb_key"); + let fiber_key_path = base_dir.join("fiber_key"); + fs::write(&ckb_key_path, b"ckb_secret_content").unwrap(); + fs::write(&fiber_key_path, b"fiber_secret_content").unwrap(); + + // Execute key backup + perform_key_backup_internal(&backup_dir, &ckb_key_path, &fiber_key_path) + .expect("Key backup failed"); + + // Assert keys are copied with correct names + assert_eq!( + fs::read(backup_dir.join("key")).unwrap(), + 
b"ckb_secret_content" + ); + assert_eq!( + fs::read(backup_dir.join("sk")).unwrap(), + b"fiber_secret_content" + ); + } + + fn perform_key_backup_internal( + target_dir: &Path, + ckb_key_path: &Path, + fiber_key_path: &Path, + ) -> Result<(), String> { + let keys_to_copy = [(ckb_key_path, "key"), (fiber_key_path, "sk")]; + for (src_file, dest_name) in keys_to_copy { + if src_file.exists() { + let dest_file = target_dir.join(dest_name); + std::fs::copy(src_file, &dest_file).map_err(|e| e.to_string())?; + } + } + Ok(()) + } +} + +// --- SQLite Tests --- +#[cfg(all(not(target_arch = "wasm32"), feature = "sqlite"))] +mod sqlite_tests { + use super::*; + use crate::sqlite::Store as SqliteStore; + + #[test] + fn test_sqlite_backup_and_restore() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("sqlite_data"); + let backup_dir = dir.path().join("sqlite_backup"); + + // Initialize and put data + let store = SqliteStore::open_db(&db_path).expect("Open SQLite failed"); + store.put(b"sql_key", b"sql_value"); + + // Perform Backup + // SQLite stores actual data in backup_dir/data.sqlite + store.backup(&backup_dir).expect("SQLite backup failed"); + assert!(backup_dir.join("data.sqlite").exists()); + + // Modify data + store.put(b"sql_key", b"corrupted"); + + // Perform Restore + drop(store); + let store_for_restore = SqliteStore::open_db(&db_path).unwrap(); + store_for_restore + .restore(&backup_dir, &db_path) + .expect("SQLite Restore failed"); + + // WAL and SHM files should be cleaned up during restore + assert!(!db_path.join("data.sqlite-wal").exists()); + assert!(!db_path.join("data.sqlite-shm").exists()); + + drop(store_for_restore); + + // Verify restored data and sidecar files removal + let restored_store = SqliteStore::open_db(&db_path).unwrap(); + assert_eq!(restored_store.get(b"sql_key").unwrap(), b"sql_value"); + } + + #[test] + fn test_sqlite_backup_already_exists_error() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("sqlite_err"); + let backup_dir = dir.path().join("already_exists"); + + fs::create_dir_all(&backup_dir).unwrap(); + let store = SqliteStore::open_db(&db_path).unwrap(); + + let result = store.backup(&backup_dir); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("already exists")); + } +} diff --git a/crates/fiber-store/src/tests/mod.rs b/crates/fiber-store/src/tests/mod.rs new file mode 100644 index 000000000..b4e1cc264 --- /dev/null +++ b/crates/fiber-store/src/tests/mod.rs @@ -0,0 +1 @@ +pub mod backend_test; diff --git a/crates/fiber-types/src/channel.rs b/crates/fiber-types/src/channel.rs index 29cd16083..c9c7d4ed6 100644 --- a/crates/fiber-types/src/channel.rs +++ b/crates/fiber-types/src/channel.rs @@ -252,6 +252,9 @@ pub enum ChannelState { /// Both we and our counterparty consider the funding transaction confirmed and the channel is /// now operational. ChannelReady, + /// The channel state is potentially outdated (e.g., after a database restore). + /// We must perform a passive audit with the peer before resuming operations. + Stale, /// We've successfully negotiated a `closing_signed` dance. ShuttingDown(ShuttingDownFlags), /// This channel is closed. 
diff --git a/crates/fiber-wasm/src/lib.rs b/crates/fiber-wasm/src/lib.rs
index 14c9395ed..a625594b6 100644
--- a/crates/fiber-wasm/src/lib.rs
+++ b/crates/fiber-wasm/src/lib.rs
@@ -211,6 +211,7 @@ pub async fn fiber(
         new_tokio_task_tracker(),
         root_actor.get_cell(),
         store.clone(),
+        None,
         network_graph.clone(),
         default_shutdown_script,
     )
@@ -282,7 +283,11 @@ pub async fn fiber(
         .set(WrappedFiberWasm {
             channel: ChannelRpcServerImpl::new(network_actor.clone(), store.clone()),
             graph: GraphRpcServerImpl::new(network_graph.clone(), store.clone()),
-            info: InfoRpcServerImpl::new(network_actor.clone(), config.ckb.unwrap_or_default()),
+            info: InfoRpcServerImpl::new(
+                network_actor.clone(),
+                None,
+                config.ckb.unwrap_or_default(),
+            ),
             invoice: InvoiceRpcServerImpl::new(
                 store.clone(),
                 Some(network_actor.clone()),
diff --git a/openrpc-json-generator/Cargo.lock b/openrpc-json-generator/Cargo.lock
index b01f09c7a..6b4d80163 100644
--- a/openrpc-json-generator/Cargo.lock
+++ b/openrpc-json-generator/Cargo.lock
@@ -3802,6 +3802,7 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
 name = "openrpc-json-generator"
 version = "0.1.0"
 dependencies = [
+ "fiber-json-types",
 "fnn",
 "openrpsee",
 "phf 0.13.1",
diff --git a/openrpc-json-generator/Cargo.toml b/openrpc-json-generator/Cargo.toml
index d13bba67d..22f29116b 100644
--- a/openrpc-json-generator/Cargo.toml
+++ b/openrpc-json-generator/Cargo.toml
@@ -5,6 +5,7 @@ version = "0.1.0"
 
 [dependencies]
 fnn = {path = "../crates/fiber-lib", features = ["rocksdb"]}
+fiber-json-types = { path = "../crates/fiber-json-types" }
 openrpsee = {path = "./openrpsee-0.1.0"}
 phf = {version = "0.13.1", features = ["macros"]}
 serde_json = "1"
diff --git a/openrpc-json-generator/build.rs b/openrpc-json-generator/build.rs
index 930f35061..215426ced 100644
--- a/openrpc-json-generator/build.rs
+++ b/openrpc-json-generator/build.rs
@@ -59,7 +59,7 @@ fn main() {
             .open(current_output_dir.join("rpc_openrpc.rs"))
             .unwrap();
         generated_rpc_defs
-            .write(format!("use fnn::rpc::{}::*;", file_name.replace(".rs", "")).as_bytes())
+            .write(format!("#[allow(unused_imports)]\nuse fnn::rpc::{}::*;\n#[allow(unused_imports)]\nuse fiber_json_types::*;\nuse std::path::Path;", file_name.replace(".rs", "")).as_bytes())
             .unwrap();
     }
     root_mod_file.write(format!("pub const API_METHODS: [ (&str, &::phf::Map<&str, openrpsee::openrpc::RpcMethod>); {}] = [",rpc_mods.len()).as_bytes()).unwrap();
diff --git a/openrpc-json-generator/src/methods.rs b/openrpc-json-generator/src/methods.rs
index 17f84f001..45cc0511d 100644
--- a/openrpc-json-generator/src/methods.rs
+++ b/openrpc-json-generator/src/methods.rs
@@ -2,3 +2,5 @@ pub const PARAM_PARAMS_DESC: &str = "";
 pub const PARAM_PAYMENT_HASH_DESC: &str = "";
 pub const PARAM_SETTLE_INVOICE_DESC: &str = "";
 pub const PARAM_CTX_DESC: &str = "";
+pub const PARAM_PATH_DESC: &str = "";
+pub const PARAM_TARGET_PATH_DESC: &str = "";
\ No newline at end of file
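One last note on the fiber-wasm hunk above: the wasm build passes `None` for the new store-actor argument, which suggests the backup machinery is simply absent on targets without a filesystem (an assumption on my part). A hedged sketch of that Option-based wiring; `StoreActorHandle`, `schedule_backups`, and `start_network_with` are invented placeholder names, not APIs from this PR.

// Invented placeholder standing in for the real store actor reference.
struct StoreActorHandle;

impl StoreActorHandle {
    fn schedule_backups(&self, interval_hours: u64) {
        // On native targets this would drive the periodic backup loop.
        println!("backups scheduled every {} hours", interval_hours);
    }
}

// The startup path takes an Option so both targets share one signature.
fn start_network_with(store_actor: Option<StoreActorHandle>) {
    if let Some(actor) = &store_actor {
        actor.schedule_backups(24);
    }
    // ...the rest of startup proceeds identically with or without a store actor.
}

fn main() {
    #[cfg(not(target_arch = "wasm32"))]
    start_network_with(Some(StoreActorHandle));

    #[cfg(target_arch = "wasm32")]
    start_network_with(None);
}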