Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 47 additions & 2 deletions crates/fiber-bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use fnn::ckb::{contracts::try_init_contracts_context, CkbChainActor};
use fnn::event_handler::forward_event_to_client;
use fnn::fiber::{graph::NetworkGraph, network::init_chain_hash, network::NetworkActorMessage};
use fnn::rpc::server::start_rpc;
use fnn::store::actor::{StoreActor, StoreActorInitializationParameter};
use fnn::store::open_store;
use fnn::store::restore::restore;
use fnn::tasks::{
cancel_tasks_and_wait_for_completion, new_tokio_cancellation_token, new_tokio_task_tracker,
};
Expand Down Expand Up @@ -93,12 +95,31 @@ pub async fn main() -> Result<(), ExitMessage> {
}
}

if let Some(source_path) = &config.restore {
info!("Starting manual restore process from: {:?}", source_path);

let parsed_fiber_config = config
.parsed_fiber()
.ok_or(ExitMessage("fiber config must be set".to_string()))?;

let store_path = parsed_fiber_config.store_path();

restore(source_path, &store_path)
.map_err(|err| ExitMessage(format!("Failed to restore database: {}", err)))?;

info!("Successfully restored database to {:?}.", store_path);
info!("All channels have been marked as 'Stale' for safety audit.");

std::process::exit(0);
}

let parsed_fiber_config = config
.parsed_fiber()
.ok_or(ExitMessage("fiber config must be set".to_string()))?;

// Derive store_path: prefer fiber config, fall back to base_dir/fiber/store
let store_path = parsed_fiber_config.store_path();

let raw_store = open_store(store_path).map_err(|err| ExitMessage(err.to_string()))?;

if config.cch.is_some() || config.rpc.is_some() {
Expand Down Expand Up @@ -131,7 +152,7 @@ async fn run_node(
});

#[allow(unused_variables)]
let (network_actor, ckb_chain_actor, network_graph) = match config.fiber.clone() {
let (network_actor, ckb_chain_actor, network_graph, store_actor) = match config.fiber.clone() {
Some(fiber_config) => {
// TODO: this is not a super user friendly error message which has actionable information
// for the user to fix the error and start the node.
Expand Down Expand Up @@ -191,6 +212,27 @@ async fn run_node(

info!("Starting fiber");

let backup_path = fiber_config.base_dir().join("backups");
let ckb_key_path = ckb_config.base_dir().join("key");
let fiber_key_path = fiber_config.base_dir().join("sk");
let store_actor = Actor::spawn_linked(
Some("store_actor".to_string()),
StoreActor {
_phantom: std::marker::PhantomData,
},
StoreActorInitializationParameter {
store: store.clone(),
backup_path,
ckb_key_path,
fiber_key_path,
backup_interval_hours: 24,
},
root_actor.get_cell(),
)
.await
.map_err(|err| ExitMessage(format!("failed to start store actor: {}", err)))?
.0;

let chain_client = CkbRpcClient::new(&ckb_config);
let network_actor: ActorRef<NetworkActorMessage> = start_network(
fiber_config.clone(),
Expand All @@ -200,6 +242,7 @@ async fn run_node(
new_tokio_task_tracker(),
root_actor.get_cell(),
store.clone(),
Some(store_actor.clone()),
network_graph.clone(),
default_shutdown_script,
)
Expand Down Expand Up @@ -334,9 +377,10 @@ async fn run_node(
Some(network_actor),
Some(ckb_chain_actor),
Some(network_graph),
Some(store_actor),
)
}
None => (None, None, None),
None => (None, None, None, None),
};

let cch_currency = config
Expand Down Expand Up @@ -427,6 +471,7 @@ async fn run_node(
network_actor,
cch_actor,
store,
store_actor,
network_graph,
root_actor.get_cell(),
store_change_port,
Expand Down
3 changes: 3 additions & 0 deletions crates/fiber-json-types/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,9 @@ pub enum ChannelState {
/// Both we and our counterparty consider the funding transaction confirmed and the channel is
/// now operational.
ChannelReady,
/// The channel state is potentially outdated (e.g., after a database restore).
/// We must perform a passive audit with the peer before resuming operations.
Stale,
/// We've successfully negotiated a `closing_signed` dance. At this point, the `ChannelManager`
ShuttingDown(#[schemars(schema_with = "schema_as_string")] ShuttingDownFlags),
/// This channel is closed.
Expand Down
4 changes: 3 additions & 1 deletion crates/fiber-json-types/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl From<InternalChannelState> for JsonChannelState {
JsonChannelState::AwaitingChannelReady(flags.bits().into())
}
InternalChannelState::ChannelReady => JsonChannelState::ChannelReady,
InternalChannelState::Stale => JsonChannelState::Stale,
InternalChannelState::ShuttingDown(flags) => {
JsonChannelState::ShuttingDown(flags.bits().into())
}
Expand All @@ -137,7 +138,8 @@ impl JsonChannelState {
| JsonChannelState::CollaboratingFundingTx(_)
| JsonChannelState::SigningCommitment(_)
| JsonChannelState::AwaitingTxSignatures(_)
| JsonChannelState::AwaitingChannelReady(_) => true,
| JsonChannelState::AwaitingChannelReady(_)
| JsonChannelState::Stale => true,
JsonChannelState::ChannelReady | JsonChannelState::ShuttingDown(_) => false,
JsonChannelState::Closed(bits) => {
// FUNDING_ABORTED or ABANDONED are "pending-failed" states
Expand Down
1 change: 1 addition & 0 deletions crates/fiber-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ jsonrpsee = { version = "0.25.1", features = [
"macros",
"http-client",
] }
tokio = { version = "1.x", features = ["test-util"] }

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
fiber-store = { path = "../fiber-store", features = ["browser-test"] }
Expand Down
9 changes: 9 additions & 0 deletions crates/fiber-lib/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub struct Config {
pub base_dir: PathBuf,
/// When true, validate the database and exit without starting services.
pub check_validate: bool,
/// Restorage path, restore database from a backup file and exit.
pub restore: Option<PathBuf>,
}

impl Config {
Expand Down Expand Up @@ -115,6 +117,10 @@ pub mod native {
#[arg(long, default_value_t = false)]
check_validate: bool,

/// Restore database from a backup file and exit
#[arg(long = "restore", value_name = "BACKUP_PATH")]
restore: Option<std::path::PathBuf>,

/// config for fiber network
#[command(flatten)]
pub fiber: <FiberConfig as ClapSerde>::Opt,
Expand Down Expand Up @@ -163,6 +169,7 @@ pub mod native {
// Base directory for all things to be stored to disk
let base_dir = args.base_dir.clone().unwrap_or(get_default_base_dir());
let check_validate = args.check_validate;
let restore = args.restore;

// Get config file by
// 1. Using the explicitly set command line argument `config`
Expand Down Expand Up @@ -251,6 +258,7 @@ pub mod native {
ckb,
base_dir,
check_validate,
restore,
}
}
}
Expand Down Expand Up @@ -318,6 +326,7 @@ mod wasm {
ckb,
base_dir: PathBuf::from_str(&database_prefix).unwrap(),
check_validate: false,
restore: None,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/fiber-lib/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ impl From<StoreError> for Error {
fn from(e: StoreError) -> Self {
match e {
StoreError::DBInternalError(msg) => Error::DBInternalError(msg),
StoreError::IOError(err) => Error::IO(err),
StoreError::RestoreError(msg) => Error::DBInternalError(msg),
StoreError::BackupError(msg) => Error::DBInternalError(msg),
}
}
}
89 changes: 76 additions & 13 deletions crates/fiber-lib/src/fiber/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::fiber::fee::{check_open_channel_parameters, check_tlc_delta_with_epoc
#[cfg(debug_assertions)]
use crate::fiber::network::DebugEvent;
use crate::fiber::types::{BroadcastMessageWithTimestamp, TxSignatures};
use crate::store::actor::StoreActorMessage;
use crate::time::{SystemTime, UNIX_EPOCH};
use crate::utils::actor::ActorHandleLogGuard;
use crate::utils::payment::is_invoice_fulfilled;
Expand Down Expand Up @@ -408,6 +409,7 @@ pub struct ChannelActor<S> {
remote_pubkey: Pubkey,
network: ActorRef<NetworkActorMessage>,
store: S,
store_actor: Option<ActorRef<StoreActorMessage>>,
}

impl<S> ChannelActor<S>
Expand All @@ -419,12 +421,14 @@ where
remote_pubkey: Pubkey,
network: ActorRef<NetworkActorMessage>,
store: S,
store_actor: Option<ActorRef<StoreActorMessage>>,
) -> Self {
Self {
local_pubkey,
remote_pubkey,
network,
store,
store_actor,
}
}

Expand Down Expand Up @@ -3354,14 +3358,16 @@ where
remote_commitment_number: channel.get_remote_commitment_number(),
};

self.network
.send_message(NetworkActorMessage::new_command(
NetworkActorCommand::SendFiberMessage(FiberMessageWithTarget::new(
self.get_remote_pubkey(),
FiberMessage::reestablish_channel(reestablish_channel),
)),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
if channel.state != ChannelState::Stale {
self.network
.send_message(NetworkActorMessage::new_command(
NetworkActorCommand::SendFiberMessage(FiberMessageWithTarget::new(
self.get_remote_pubkey(),
FiberMessage::reestablish_channel(reestablish_channel),
)),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
}

channel
}
Expand Down Expand Up @@ -3585,6 +3591,14 @@ where

if self.should_persist_channel_state(state) {
self.store.insert_channel_actor_state(state.clone());
if state.needs_backup {
if let Some(ref store_actor) = self.store_actor {
store_actor
.cast(StoreActorMessage::RequestBackup)
.map_err(|e| e.to_string())?;
}
state.needs_backup = false;
}
} else {
debug!(
"Skip persisting channel state during external funding pre-submission phase: {}",
Expand Down Expand Up @@ -3932,6 +3946,11 @@ pub struct ChannelActorState {
// signing key
#[doc = "skip_store"]
pub private_key: Option<Privkey>,

// Indicates that the state has changed and a backup should be triggered
// at the end of the current message processing loop.
#[doc = "skip_store"]
pub needs_backup: bool,
}

impl std::ops::Deref for ChannelActorState {
Expand Down Expand Up @@ -3968,6 +3987,7 @@ impl<'de> Deserialize<'de> for ChannelActorState {
deferred_peer_tlc_updates: VecDeque::new(),
ephemeral_config: Default::default(),
private_key: None,
needs_backup: false,
})
}
}
Expand Down Expand Up @@ -4210,6 +4230,13 @@ impl ChannelActorState {
self.local_tlc_info.enabled
}

pub fn is_risk_of_penalty(&self) -> bool {
matches!(
self.state,
ChannelState::ChannelReady | ChannelState::ShuttingDown(_)
)
}

pub fn set_waiting_peer_response(&mut self) {
self.waiting_peer_response = Some(now_timestamp_as_millis_u64());
}
Expand Down Expand Up @@ -4748,6 +4775,7 @@ impl ChannelActorState {
deferred_peer_tlc_updates: VecDeque::new(),
ephemeral_config: Default::default(),
private_key: Some(private_key),
needs_backup: true,
};
if let Some(nonce) = remote_channel_announcement_nonce {
state.update_remote_channel_announcement_nonce(&nonce);
Expand Down Expand Up @@ -4839,6 +4867,7 @@ impl ChannelActorState {
deferred_peer_tlc_updates: VecDeque::new(),
ephemeral_config: Default::default(),
private_key: Some(private_key),
needs_backup: true,
};
state.log_ack_state("[ack] new_outbound_channel");
state
Expand Down Expand Up @@ -4966,11 +4995,14 @@ impl ChannelActorState {
}

pub(crate) fn update_state(&mut self, new_state: ChannelState) {
debug!(
"Updating channel state from {:?} to {:?}",
&self.state, &new_state
);
self.state = new_state;
if self.state != new_state {
debug!(
"Updating channel state from {:?} to {:?}",
&self.state, &new_state
);
self.state = new_state;
self.needs_backup = true;
}
}

pub(crate) fn local_is_node1(&self) -> bool {
Expand Down Expand Up @@ -7136,6 +7168,36 @@ impl ChannelActorState {
debug_event!(network, "Reestablished channel in ChannelReady");
}
}
ChannelState::Stale => {
let my_local = self.get_local_commitment_number();
let my_remote = self.get_remote_commitment_number();
let peer_local = reestablish_channel.local_commitment_number;
let peer_remote = reestablish_channel.remote_commitment_number;

if peer_local.abs_diff(my_remote) > 1 || peer_remote.abs_diff(my_local) > 1 {
error!(
"Audit Failed for Stale channel: Local(L:{}, R:{}), Peer(L:{}, R:{})",
my_local, my_remote, peer_local, peer_remote
);
return Err(ProcessingChannelError::InvalidParameter(
"reestablish channel message with invalid commitment numbers during audit"
.to_string(),
));
}

info!(
"Passive audit passed for channel {}. Resuming to Ready.",
self.get_id()
);
self.update_state(ChannelState::ChannelReady);

return Box::pin(self.handle_reestablish_channel_message(
myself,
reestablish_channel,
pending_commit_diff,
))
.await;
}
ChannelState::ShuttingDown(flags) => {
// Resend the shutdown message to the peer if we have not received the peer's shutdown message.
if !flags.contains(ShuttingDownFlags::THEIR_SHUTDOWN_SENT) {
Expand Down Expand Up @@ -8124,6 +8186,7 @@ pub trait ChannelActorStateStore {
.collect()
}
fn get_channel_state_by_outpoint(&self, id: &OutPoint) -> Option<ChannelActorState>;
fn get_all_channel_states(&self) -> Vec<ChannelActorState>;
fn insert_payment_custom_records(
&self,
payment_hash: &Hash256,
Expand Down
Loading
Loading