diff --git a/lightning/Cargo.toml b/lightning/Cargo.toml index 03edeb252..e740b49c2 100644 --- a/lightning/Cargo.toml +++ b/lightning/Cargo.toml @@ -52,6 +52,7 @@ libm = { version = "0.2", default-features = false } inventory = { version = "0.3", optional = true } # RGB and related +bincode = "1.3" futures = "0.3" rgb-lib = { version = "0.3.0-beta.5", features = [ "electrum", @@ -60,7 +61,6 @@ rgb-lib = { version = "0.3.0-beta.5", features = [ serde = { version = "^1.0", features = [ "derive", ] } -serde_json = "^1.0" tokio = { version = "1.14.1", features = [ "macros", "rt-multi-thread", diff --git a/lightning/src/ln/chan_utils.rs b/lightning/src/ln/chan_utils.rs index 3e337e1aa..a26abb6e3 100644 --- a/lightning/src/ln/chan_utils.rs +++ b/lightning/src/ln/chan_utils.rs @@ -36,6 +36,7 @@ use crate::ln::msgs::DecodeError; use crate::rgb_utils::{color_htlc, is_tx_colored}; use crate::sign::EntropySource; use crate::types::payment::{PaymentHash, PaymentPreimage}; +use crate::util::persist::KVStoreSync; use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, Writeable, Writer}; use crate::util::transaction_utils; @@ -2156,6 +2157,7 @@ impl<'a> TrustedCommitmentTransaction<'a> { pub fn get_htlc_sigs( &self, htlc_base_key: &SecretKey, channel_parameters: &DirectedChannelTransactionParameters, entropy_source: &ES, secp_ctx: &Secp256k1, ldk_data_dir: &PathBuf, + rgb_kv_store: &dyn KVStoreSync, ) -> Result, ()> where ES::Target: EntropySource { let inner = self.inner; let keys = &inner.keys; @@ -2167,7 +2169,7 @@ impl<'a> TrustedCommitmentTransaction<'a> { assert!(this_htlc.transaction_output_index.is_some()); let mut htlc_tx = build_htlc_transaction(&txid, inner.feerate_per_kw, channel_parameters.contest_delay(), &this_htlc, &self.channel_type_features, &keys.broadcaster_delayed_payment_key, &keys.revocation_key); if inner.is_colored() { - if let Err(_e) = color_htlc(&mut htlc_tx, this_htlc, ldk_data_dir) { + if let Err(_e) = color_htlc(&mut htlc_tx, this_htlc, ldk_data_dir, rgb_kv_store) { return Err(()); } } diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index a3f8114e3..ce87251fb 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -78,9 +78,8 @@ use crate::ln::types::ChannelId; use crate::ln::LN_MAX_MSG_LEN; use crate::offers::static_invoice::StaticInvoice; use crate::rgb_utils::{ - color_closing, color_commitment, color_htlc, get_rgb_channel_info_path, - get_rgb_channel_info_pending, parse_rgb_channel_info, rename_rgb_files, - update_rgb_channel_amount_pending, + color_closing, color_commitment, color_htlc, get_rgb_channel_info_pending, update_rgb_channel_id, + update_rgb_channel_amount_pending, RgbKvStoreExt, }; use crate::routing::gossip::NodeId; use crate::sign::ecdsa::EcdsaChannelSigner; @@ -104,6 +103,8 @@ use crate::prelude::*; use crate::sign::type_resolver::ChannelSignerType; #[cfg(any(test, fuzzing, debug_assertions))] use crate::sync::Mutex; +use crate::sync::Arc; +use crate::util::persist::KVStoreSync; use core::ops::Deref; use core::time::Duration; use core::{cmp, fmt, mem}; @@ -3147,6 +3148,9 @@ where pub(super) is_colored: bool, pub(crate) ldk_data_dir: PathBuf, + + /// KVStore for RGB data persistence + pub(crate) rgb_kv_store: Arc, } /// A channel struct implementing this trait can receive an initial counterparty commitment @@ -3247,7 +3251,7 @@ where let temporary_channel_id = context.channel_id; context.channel_id = channel_id; if context.is_colored() { - rename_rgb_files(&context.channel_id, &temporary_channel_id, &context.ldk_data_dir); + update_rgb_channel_id(&context.channel_id, &temporary_channel_id, context.rgb_kv_store.as_ref()); } assert!(!context.channel_state.is_monitor_update_in_progress()); // We have not had any monitor(s) yet to fail update! @@ -3417,6 +3421,7 @@ where open_channel_fields: msgs::CommonOpenChannelFields, push_asset_amount: Option, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Result<(FundingScope, ChannelContext), ChannelError> where ES::Target: EntropySource, @@ -3741,6 +3746,7 @@ where is_colored: funding.consignment_endpoint.is_some(), ldk_data_dir, + rgb_kv_store, }; Ok((funding, channel_context)) @@ -3767,6 +3773,7 @@ where consignment_endpoint: Option, ldk_data_dir: PathBuf, push_asset_amount: Option, + rgb_kv_store: Arc, ) -> Result<(FundingScope, ChannelContext), APIError> where ES::Target: EntropySource, @@ -3987,6 +3994,7 @@ where is_colored: funding.consignment_endpoint.is_some(), ldk_data_dir, + rgb_kv_store, }; Ok((funding, channel_context)) @@ -4365,13 +4373,8 @@ where /// Get the channel local RGB amount pub fn get_local_rgb_amount(&self) -> u64 { - let info_file_path = get_rgb_channel_info_path( - &self.channel_id.0.as_hex().to_string(), - &self.ldk_data_dir, - false, - ); - if info_file_path.exists() { - let rgb_info = parse_rgb_channel_info(&info_file_path); + let channel_id_str = self.channel_id.0.as_hex().to_string(); + if let Ok(rgb_info) = self.rgb_kv_store.read_rgb_channel_info(&channel_id_str, false) { rgb_info.local_rgb_amount } else { 0 @@ -4380,13 +4383,8 @@ where /// Get the channel remote RGB amount pub fn get_remote_rgb_amount(&self) -> u64 { - let info_file_path = get_rgb_channel_info_path( - &self.channel_id.0.as_hex().to_string(), - &self.ldk_data_dir, - false, - ); - if info_file_path.exists() { - let rgb_info = parse_rgb_channel_info(&info_file_path); + let channel_id_str = self.channel_id.0.as_hex().to_string(); + if let Ok(rgb_info) = self.rgb_kv_store.read_rgb_channel_info(&channel_id_str, false) { rgb_info.remote_rgb_amount } else { 0 @@ -5101,7 +5099,7 @@ where &holder_keys.revocation_key, ); if self.is_colored() { - color_htlc(&mut htlc_tx, htlc, &self.ldk_data_dir) + color_htlc(&mut htlc_tx, htlc, &self.ldk_data_dir, self.rgb_kv_store.as_ref()) .expect("successful htlc coloring"); } @@ -7345,6 +7343,7 @@ where &self.context.channel_id, &mut closing_transaction, &self.context.ldk_data_dir, + self.context.rgb_kv_store.as_ref(), ) .expect("successful closing TX coloring"); } @@ -8944,7 +8943,7 @@ where &self.context.channel_id, rgb_offered_htlc, rgb_received_htlc, - &self.context.ldk_data_dir, + self.context.rgb_kv_store.as_ref(), ); } @@ -11758,7 +11757,7 @@ where let were_node_one = node_id.as_slice() < counterparty_node_id.as_slice(); let contract_id = if self.context.is_colored() { - let (rgb_info, _) = get_rgb_channel_info_pending(&self.context.channel_id, &self.context.ldk_data_dir); + let rgb_info = get_rgb_channel_info_pending(&self.context.channel_id, self.context.rgb_kv_store.as_ref()); Some(rgb_info.contract_id) } else { None @@ -12912,7 +12911,7 @@ where } } if self.context.is_colored() && rgb_received_htlc > 0 { - update_rgb_channel_amount_pending(&self.context.channel_id, 0, rgb_received_htlc, &self.context.ldk_data_dir); + update_rgb_channel_amount_pending(&self.context.channel_id, 0, rgb_received_htlc, self.context.rgb_kv_store.as_ref()); } if let Some((feerate, update_state)) = self.context.pending_update_fee { if update_state == FeeUpdateState::AwaitingRemoteRevokeToAnnounce { @@ -13579,6 +13578,7 @@ where fee_estimator: &LowerBoundedFeeEstimator, entropy_source: &ES, signer_provider: &SP, counterparty_node_id: PublicKey, their_features: &InitFeatures, channel_value_satoshis: u64, push_msat: u64, user_id: u128, config: &UserConfig, current_chain_height: u32, outbound_scid_alias: u64, temporary_channel_id: Option, logger: L, consignment_endpoint: Option, ldk_data_dir: PathBuf, push_asset_amount: Option, + rgb_kv_store: Arc, ) -> Result, APIError> where ES::Target: EntropySource, F::Target: FeeEstimator, @@ -13619,6 +13619,7 @@ where consignment_endpoint, ldk_data_dir, push_asset_amount, + rgb_kv_store, )?; let unfunded_context = UnfundedChannelContext { unfunded_channel_age_ticks: 0, @@ -13702,7 +13703,7 @@ where let temporary_channel_id = self.context.channel_id; self.context.channel_id = ChannelId::v1_from_funding_outpoint(funding_txo); if self.context.is_colored() { - rename_rgb_files(&self.context.channel_id, &temporary_channel_id, &self.context.ldk_data_dir); + update_rgb_channel_id(&self.context.channel_id, &temporary_channel_id, self.context.rgb_kv_store.as_ref()); } // If the funding transaction is a coinbase transaction, we need to set the minimum depth to 100. @@ -13956,7 +13957,8 @@ where fee_estimator: &LowerBoundedFeeEstimator, entropy_source: &ES, signer_provider: &SP, counterparty_node_id: PublicKey, our_supported_features: &ChannelTypeFeatures, their_features: &InitFeatures, msg: &msgs::OpenChannel, user_id: u128, config: &UserConfig, - current_chain_height: u32, logger: &L, is_0conf: bool, ldk_data_dir: PathBuf + current_chain_height: u32, logger: &L, is_0conf: bool, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Result, ChannelError> where ES::Target: EntropySource, F::Target: FeeEstimator, @@ -13998,6 +14000,7 @@ where msg.common_fields.clone(), msg.push_asset_amount, ldk_data_dir, + rgb_kv_store, )?; let unfunded_context = UnfundedChannelContext { unfunded_channel_age_ticks: 0, @@ -14198,7 +14201,7 @@ where counterparty_node_id: PublicKey, their_features: &InitFeatures, funding_satoshis: u64, funding_inputs: Vec, user_id: u128, config: &UserConfig, current_chain_height: u32, outbound_scid_alias: u64, funding_confirmation_target: ConfirmationTarget, - logger: L, ldk_data_dir: PathBuf, + logger: L, ldk_data_dir: PathBuf, rgb_kv_store: Arc, ) -> Result where ES::Target: EntropySource, F::Target: FeeEstimator, @@ -14242,6 +14245,7 @@ where None, ldk_data_dir, None, + rgb_kv_store, )?; let unfunded_context = UnfundedChannelContext { unfunded_channel_age_ticks: 0, @@ -14352,7 +14356,7 @@ where holder_node_id: PublicKey, counterparty_node_id: PublicKey, our_supported_features: &ChannelTypeFeatures, their_features: &InitFeatures, msg: &msgs::OpenChannelV2, user_id: u128, config: &UserConfig, current_chain_height: u32, logger: &L, - ldk_data_dir: PathBuf, + ldk_data_dir: PathBuf, rgb_kv_store: Arc, ) -> Result where ES::Target: EntropySource, F::Target: FeeEstimator, @@ -14399,6 +14403,7 @@ where msg.common_fields.clone(), None, ldk_data_dir, + rgb_kv_store, )?; let channel_id = ChannelId::v2_from_revocation_basepoints( &funding.get_holder_pubkeys().revocation_basepoint, @@ -15089,16 +15094,16 @@ where } } -impl<'a, 'b, 'c, ES: Deref, SP: Deref> - ReadableArgs<(&'a ES, &'b SP, &'c ChannelTypeFeatures, PathBuf)> for FundedChannel +impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, &'c ChannelTypeFeatures, PathBuf, Arc)> + for FundedChannel where ES::Target: EntropySource, SP::Target: SignerProvider, { fn read( - reader: &mut R, args: (&'a ES, &'b SP, &'c ChannelTypeFeatures, PathBuf), + reader: &mut R, args: (&'a ES, &'b SP, &'c ChannelTypeFeatures, PathBuf, Arc), ) -> Result { - let (entropy_source, signer_provider, our_supported_features, ldk_data_dir) = args; + let (entropy_source, signer_provider, our_supported_features, ldk_data_dir, rgb_kv_store) = args; let ver = read_ver_prefix!(reader, SERIALIZATION_VERSION); if ver <= 2 { return Err(DecodeError::UnknownVersion); @@ -15897,6 +15902,7 @@ where interactive_tx_signing_session, is_colored: consignment_endpoint.is_some(), ldk_data_dir, + rgb_kv_store, }, holder_commitment_point, pending_splice, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index f0bd1c6bd..32b939a85 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -119,10 +119,7 @@ use crate::onion_message::messenger::{ MessageRouter, MessageSendInstructions, Responder, ResponseInstruction, }; use crate::onion_message::offers::{OffersMessage, OffersMessageHandler}; -use crate::rgb_utils::{ - get_rgb_channel_info, get_rgb_payment_info_path, handle_funding, is_channel_rgb, - parse_rgb_payment_info, -}; +use crate::rgb_utils::{handle_funding, is_channel_rgb, RgbKvStoreExt}; use crate::routing::router::{ BlindedTail, FixedRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteParameters, RouteParametersConfig, Router, @@ -139,6 +136,7 @@ use crate::types::string::UntrustedString; use crate::util::config::{ChannelConfig, ChannelConfigOverrides, ChannelConfigUpdate, UserConfig}; use crate::util::errors::APIError; use crate::util::logger::{Level, Logger, WithContext}; +use crate::util::persist::KVStoreSync; use crate::util::scid_utils::fake_scid; use crate::util::ser::{ BigSize, FixedLengthReader, LengthReadable, MaybeReadable, Readable, ReadableArgs, VecWriter, @@ -2963,6 +2961,9 @@ pub struct ChannelManager< logger: L, ldk_data_dir: PathBuf, + + /// KVStore for RGB data persistence + rgb_kv_store: Arc, } /// Chain-related parameters used to construct a new `ChannelManager`. @@ -3986,7 +3987,8 @@ where pub fn new( fee_est: F, chain_monitor: M, tx_broadcaster: T, router: R, message_router: MR, logger: L, entropy_source: ES, node_signer: NS, signer_provider: SP, config: UserConfig, - params: ChainParameters, current_timestamp: u32, ldk_data_dir: PathBuf + params: ChainParameters, current_timestamp: u32, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Self where L: Clone, @@ -4015,7 +4017,7 @@ where best_block: RwLock::new(params.best_block), outbound_scid_aliases: Mutex::new(new_hash_set()), - pending_outbound_payments: OutboundPayments::new(new_hash_map(), ldk_data_dir.clone()), + pending_outbound_payments: OutboundPayments::new(new_hash_map(), rgb_kv_store.clone()), forward_htlcs: Mutex::new(new_hash_map()), decode_update_add_htlcs: Mutex::new(new_hash_map()), claimable_payments: Mutex::new(ClaimablePayments { claimable_payments: new_hash_map(), pending_claiming_payments: new_hash_map() }), @@ -4062,6 +4064,7 @@ where testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()), ldk_data_dir, + rgb_kv_store, } } @@ -4180,7 +4183,8 @@ where }; match OutboundV1Channel::new(&self.fee_estimator, &self.entropy_source, &self.signer_provider, their_network_key, their_features, channel_value_satoshis, push_msat, user_channel_id, config, - self.best_block.read().unwrap().height, outbound_scid_alias, temporary_channel_id, &*self.logger, consignment_endpoint, self.ldk_data_dir.clone(), push_asset_amount) + self.best_block.read().unwrap().height, outbound_scid_alias, temporary_channel_id, &*self.logger, consignment_endpoint, self.ldk_data_dir.clone(), push_asset_amount, + self.rgb_kv_store.clone()) { Ok(res) => res, Err(e) => { @@ -5284,18 +5288,11 @@ where // The top-level caller should hold the total_consistency_lock read lock. debug_assert!(self.total_consistency_lock.try_write().is_err()); - let rgb_payment_info_hash_path_outbound = - get_rgb_payment_info_path(payment_hash, &self.ldk_data_dir, false); - let needs_rgb_modification = if rgb_payment_info_hash_path_outbound.exists() { - let info = parse_rgb_payment_info(&rgb_payment_info_hash_path_outbound); - if !info.swap_payment { - Some(info) - } else { - None - } - } else { - None - }; + let needs_rgb_modification = + match self.rgb_kv_store.read_rgb_payment_info(payment_hash, false) { + Ok(info) if !info.swap_payment => Some(info), + _ => None, + }; let modified_path; let path = if let Some(rgb_payment_info) = needs_rgb_modification { modified_path = { @@ -7601,15 +7598,14 @@ where .contains(&outgoing_amt_msat); if is_in_range && chan.context.is_usable() { if let Some((cid, outgoing_amount_rgb)) = outgoing_rgb_payment { - if !is_channel_rgb(&chan.context.channel_id, &self.ldk_data_dir) + if !is_channel_rgb(&chan.context.channel_id, self.rgb_kv_store.as_ref()) { return None; } - let (rgb_chan_info, _) = get_rgb_channel_info( + let rgb_chan_info = self.rgb_kv_store.read_rgb_channel_info( &chan.context.channel_id.0.as_hex().to_string(), - &self.ldk_data_dir, false, - ); + ).expect("channel info must exist in KVStore"); if rgb_chan_info.contract_id == *cid && rgb_chan_info.local_rgb_amount >= *outgoing_amount_rgb { @@ -10071,7 +10067,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ InboundV1Channel::new( &self.fee_estimator, &self.entropy_source, &self.signer_provider, *counterparty_node_id, &self.channel_type_features(), &peer_state.latest_features, &open_channel_msg, - user_channel_id, &config, best_block_height, &self.logger, accept_0conf, self.ldk_data_dir.clone() + user_channel_id, &config, best_block_height, &self.logger, accept_0conf, self.ldk_data_dir.clone(), + self.rgb_kv_store.clone(), ).map_err(|err| MsgHandleErrInternal::from_chan_no_close(err, *temporary_channel_id) ).map(|mut channel| { let logger = WithChannelContext::from(&self.logger, &channel.context, None); @@ -10093,6 +10090,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ user_channel_id, &config, best_block_height, &self.logger, self.ldk_data_dir.clone(), + self.rgb_kv_store.clone(), ).map_err(|e| { let channel_id = open_channel_msg.common_fields.temporary_channel_id; MsgHandleErrInternal::from_chan_no_close(e, channel_id) @@ -10362,7 +10360,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ let mut channel = InboundV1Channel::new( &self.fee_estimator, &self.entropy_source, &self.signer_provider, *counterparty_node_id, &self.channel_type_features(), &peer_state.latest_features, msg, user_channel_id, - &self.config.read().unwrap(), best_block_height, &self.logger, /*is_0conf=*/false, self.ldk_data_dir.clone() + &self.config.read().unwrap(), best_block_height, &self.logger, /*is_0conf=*/false, self.ldk_data_dir.clone(), + self.rgb_kv_store.clone(), ).map_err(|e| MsgHandleErrInternal::from_chan_no_close(e, msg.common_fields.temporary_channel_id))?; let logger = WithChannelContext::from(&self.logger, &channel.context, None); let message_send_event = channel.accept_inbound_channel(&&logger).map(|msg| { @@ -10380,6 +10379,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ &peer_state.latest_features, msg, user_channel_id, &self.config.read().unwrap(), best_block_height, &self.logger, self.ldk_data_dir.clone(), + self.rgb_kv_store.clone(), ).map_err(|e| MsgHandleErrInternal::from_chan_no_close(e, msg.common_fields.temporary_channel_id))?; let message_send_event = MessageSendEvent::SendAcceptChannelV2 { node_id: *counterparty_node_id, @@ -10464,7 +10464,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ Some(Ok(inbound_chan)) => { let logger = WithChannelContext::from(&self.logger, &inbound_chan.context, None); if let Some(consignment_endpoint) = &inbound_chan.funding.consignment_endpoint { - match handle_funding(&msg.temporary_channel_id, msg.funding_txid.to_string(), &self.ldk_data_dir, consignment_endpoint.clone(), inbound_chan.funding.push_asset_amount) { + match handle_funding(&msg.temporary_channel_id, msg.funding_txid.to_string(), &self.ldk_data_dir, consignment_endpoint.clone(), inbound_chan.funding.push_asset_amount, self.rgb_kv_store.as_ref()) { Ok(()) => (), Err(e) => { // at this point the channel initiator already transitioned its channel to the funded channel ID @@ -16858,6 +16858,9 @@ pub struct ChannelManagerReadArgs< /// LDK data directory pub ldk_data_dir: PathBuf, + + /// KVStore for RGB data persistence + pub rgb_kv_store: Arc, } impl< @@ -16892,6 +16895,7 @@ where config: UserConfig, mut channel_monitors: Vec<&'a ChannelMonitor<::EcdsaSigner>>, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Self { Self { entropy_source, @@ -16908,6 +16912,7 @@ where channel_monitors.drain(..).map(|monitor| (monitor.channel_id(), monitor)), ), ldk_data_dir, + rgb_kv_store, } } } @@ -17011,6 +17016,7 @@ where &args.signer_provider, &provided_channel_type_features(&args.config), args.ldk_data_dir.clone(), + args.rgb_kv_store.clone(), ), )?; let logger = WithChannelContext::from(&args.logger, &channel.context, None); @@ -17440,8 +17446,7 @@ where } pending_outbound_payments = Some(outbounds); } - let pending_outbounds = - OutboundPayments::new(pending_outbound_payments.unwrap(), args.ldk_data_dir.clone()); + let pending_outbounds = OutboundPayments::new(pending_outbound_payments.unwrap(), args.rgb_kv_store.clone()); for (peer_pubkey, peer_storage) in peer_storage_dir { if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) { @@ -18384,6 +18389,7 @@ where testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()), ldk_data_dir: args.ldk_data_dir, + rgb_kv_store: args.rgb_kv_store, }; let mut processed_claims: HashSet> = new_hash_set(); diff --git a/lightning/src/ln/outbound_payment.rs b/lightning/src/ln/outbound_payment.rs index 1178b4477..395c28a9c 100644 --- a/lightning/src/ln/outbound_payment.rs +++ b/lightning/src/ln/outbound_payment.rs @@ -26,9 +26,7 @@ use crate::offers::invoice::{Bolt12Invoice, DerivedSigningPubkey, InvoiceBuilder use crate::offers::invoice_request::InvoiceRequest; use crate::offers::nonce::Nonce; use crate::offers::static_invoice::StaticInvoice; -use crate::rgb_utils::{ - filter_first_hops, get_rgb_payment_info_path, is_payment_rgb, parse_rgb_payment_info, -}; +use crate::rgb_utils::RgbKvStoreExt; use crate::routing::router::{ BlindedTail, InFlightHtlcs, Path, PaymentParameters, Route, RouteParameters, RouteParametersConfig, Router, @@ -47,10 +45,9 @@ use core::ops::Deref; use core::sync::atomic::{AtomicBool, Ordering}; use core::time::Duration; -use std::path::PathBuf; - use crate::prelude::*; -use crate::sync::Mutex; +use crate::sync::{Arc, Mutex}; +use crate::util::persist::KVStoreSync; /// The number of ticks of [`ChannelManager::timer_tick_occurred`] until we time-out the idempotency /// of payments by [`PaymentId`]. See [`OutboundPayments::remove_stale_payments`]. @@ -846,13 +843,13 @@ pub(super) struct OutboundPayments { pub(super) pending_outbound_payments: Mutex>, awaiting_invoice: AtomicBool, retry_lock: Mutex<()>, - ldk_data_dir: PathBuf, + rgb_kv_store: Arc, } impl OutboundPayments { pub(super) fn new( pending_outbound_payments: HashMap, - ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Self { let has_invoice_requests = pending_outbound_payments.values().any(|payment| { matches!( @@ -867,7 +864,7 @@ impl OutboundPayments { pending_outbound_payments: Mutex::new(pending_outbound_payments), awaiting_invoice: AtomicBool::new(has_invoice_requests), retry_lock: Mutex::new(()), - ldk_data_dir, + rgb_kv_store, } } } @@ -1050,9 +1047,11 @@ impl OutboundPayments { } let mut filtered_first_hops = first_hops.into_iter().collect::>(); - let rgb_payment = is_payment_rgb(&self.ldk_data_dir, &payment_hash).then(|| { - filter_first_hops(&self.ldk_data_dir, &payment_hash, &mut filtered_first_hops) - }); + let rgb_payment = if self.rgb_kv_store.is_payment_rgb(&payment_hash) { + Some(self.rgb_kv_store.filter_first_hops(&payment_hash, &mut filtered_first_hops)) + } else { + None + }; let mut route_params = RouteParameters::from_payment_params_and_value( PaymentParameters::from_bolt12_invoice(&invoice) .with_user_config_ignoring_fee_limit(params_config), invoice.amount_msats(), @@ -1411,14 +1410,11 @@ impl OutboundPayments { .. } = pmt { - let rgb_payment_info_path = - get_rgb_payment_info_path(payment_hash, &self.ldk_data_dir, false); - let rgb_payment = if rgb_payment_info_path.exists() { - let rgb_payment_info = parse_rgb_payment_info(&rgb_payment_info_path); - Some((rgb_payment_info.contract_id, rgb_payment_info.amount)) - } else { - None - }; + let rgb_payment = + match self.rgb_kv_store.read_rgb_payment_info(payment_hash, false) { + Ok(info) => Some((info.contract_id, info.amount)), + Err(_) => None, + }; if pending_amt_msat < total_msat { retry_id_route_params = Some(( *payment_hash, @@ -1570,9 +1566,9 @@ impl OutboundPayments { SP: Fn(SendAlongPathArgs) -> Result<(), APIError>, { let mut filtered_first_hops = first_hops.into_iter().collect::>(); - is_payment_rgb(&self.ldk_data_dir, &payment_hash).then(|| { - filter_first_hops(&self.ldk_data_dir, &payment_hash, &mut filtered_first_hops) - }); + if self.rgb_kv_store.is_payment_rgb(&payment_hash) { + self.rgb_kv_store.filter_first_hops(&payment_hash, &mut filtered_first_hops); + } let route = self.find_initial_route( payment_id, payment_hash, &recipient_onion, keysend_preimage, None, &mut route_params, router, &filtered_first_hops, &inflight_htlcs, node_signer, best_block_height, logger, @@ -1627,9 +1623,9 @@ impl OutboundPayments { } let mut filtered_first_hops = first_hops.into_iter().collect::>(); - is_payment_rgb(&self.ldk_data_dir, &payment_hash).then(|| { - filter_first_hops(&self.ldk_data_dir, &payment_hash, &mut filtered_first_hops) - }); + if self.rgb_kv_store.is_payment_rgb(&payment_hash) { + self.rgb_kv_store.filter_first_hops(&payment_hash, &mut filtered_first_hops); + } let mut route = match router.find_route_with_id( &node_signer.get_node_id(Recipient::Node).unwrap(), &route_params, diff --git a/lightning/src/rgb_utils/mod.rs b/lightning/src/rgb_utils/mod.rs index 52d79e38a..d412b3b5e 100644 --- a/lightning/src/rgb_utils/mod.rs +++ b/lightning/src/rgb_utils/mod.rs @@ -10,6 +10,7 @@ use crate::ln::types::ChannelId; use crate::sign::SignerProvider; use crate::types::features::ChannelTypeFeatures; use crate::types::payment::PaymentHash; +use crate::util::persist::KVStoreSync; use bitcoin::blockdata::transaction::Transaction; use bitcoin::hex::DisplayHex; @@ -29,13 +30,15 @@ use rgb_lib::{ use serde::{Deserialize, Serialize}; use tokio::runtime::Handle; +use crate::io; use core::ops::Deref; use std::collections::HashMap; use std::fs; use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::sync::Arc; -/// Static blinding costant (will be removed in the future) +/// Static blinding constant (will be removed in the future) pub const STATIC_BLINDING: u64 = 777; /// Name of the file containing the bitcoin network pub const BITCOIN_NETWORK_FNAME: &str = "bitcoin_network"; @@ -49,10 +52,24 @@ pub const WALLET_ACCOUNT_XPUB_VANILLA_FNAME: &str = "wallet_account_xpub_vanilla pub const WALLET_ACCOUNT_XPUB_COLORED_FNAME: &str = "wallet_account_xpub_colored"; /// Name of the file containing the master fingerprint of the wallet pub const WALLET_MASTER_FINGERPRINT_FNAME: &str = "wallet_master_fingerprint"; -const INBOUND_EXT: &str = "inbound"; -const OUTBOUND_EXT: &str = "outbound"; const VANILLA_SYNC_LOOKBACK: u32 = 20; +// kv_store namespace constants for RGB data persistence +/// Primary namespace for all RGB data +pub const RGB_PRIMARY_NS: &str = "rgb"; +/// Secondary namespace for channel info +pub const RGB_CHANNEL_INFO_NS: &str = "channel_info"; +/// Secondary namespace for pending channel info +pub const RGB_CHANNEL_INFO_PENDING_NS: &str = "channel_info_pending"; +/// Secondary namespace for inbound payment info +pub const RGB_PAYMENT_INFO_INBOUND_NS: &str = "payment_info_inbound"; +/// Secondary namespace for outbound payment info +pub const RGB_PAYMENT_INFO_OUTBOUND_NS: &str = "payment_info_outbound"; +/// Secondary namespace for transfer info +pub const RGB_TRANSFER_INFO_NS: &str = "transfer_info"; +/// Secondary namespace for consignment data +pub const RGB_CONSIGNMENT_NS: &str = "consignment"; + /// RGB channel info #[derive(Debug, Clone, Deserialize, Serialize)] pub struct RgbInfo { @@ -239,18 +256,6 @@ async fn _accept_transfer( .unwrap() } -/// Read TransferInfo file -pub fn read_rgb_transfer_info(path: &Path) -> TransferInfo { - let serialized_info = fs::read_to_string(path).expect("able to read transfer info file"); - serde_json::from_str(&serialized_info).expect("valid transfer info") -} - -/// Write TransferInfo file -pub fn write_rgb_transfer_info(path: &PathBuf, info: &TransferInfo) { - let serialized_info = serde_json::to_string(&info).expect("valid transfer info"); - fs::write(path, serialized_info).expect("able to write transfer info file") -} - fn _counterparty_output_index( outputs: &[TxOut], channel_type_features: &ChannelTypeFeatures, payment_key: &PublicKey, ) -> Option { @@ -283,10 +288,11 @@ where { let channel_id = &channel_context.channel_id; let ldk_data_dir = channel_context.ldk_data_dir.as_path(); + let kv_store = channel_context.rgb_kv_store.as_ref(); let commitment_tx = commitment_transaction.clone().built.transaction; - let (rgb_info, _) = get_rgb_channel_info_pending(channel_id, ldk_data_dir); + let rgb_info = get_rgb_channel_info_pending(channel_id, kv_store); let contract_id = rgb_info.contract_id; let chan_id = channel_id.0.as_hex(); @@ -307,31 +313,28 @@ where let htlc_payment_hash = htlc.payment_hash.0.as_hex().to_string(); let htlc_proxy_id = format!("{chan_id}{htlc_payment_hash}"); - let mut rgb_payment_info_proxy_id_path = ldk_data_dir.join(htlc_proxy_id); - let rgb_payment_info_path = ldk_data_dir.join(htlc_payment_hash); - let mut rgb_payment_info_path = rgb_payment_info_path.clone(); - if inbound { - rgb_payment_info_proxy_id_path.set_extension(INBOUND_EXT); - rgb_payment_info_path.set_extension(INBOUND_EXT); - } else { - rgb_payment_info_proxy_id_path.set_extension(OUTBOUND_EXT); - rgb_payment_info_path.set_extension(OUTBOUND_EXT); - } - let rgb_payment_info_tmp_path = _append_pending_extension(&rgb_payment_info_path); + let pending_key = format!("{htlc_payment_hash}_pending"); + let namespace = + if inbound { RGB_PAYMENT_INFO_INBOUND_NS } else { RGB_PAYMENT_INFO_OUTBOUND_NS }; - if rgb_payment_info_tmp_path.exists() { - let mut rgb_payment_info = parse_rgb_payment_info(&rgb_payment_info_tmp_path); + if let Ok(data) = kv_store.read(RGB_PRIMARY_NS, namespace, &pending_key) { + let mut rgb_payment_info: RgbPaymentInfo = + bincode::deserialize(&data).expect("valid data"); rgb_payment_info.local_rgb_amount = rgb_info.local_rgb_amount; rgb_payment_info.remote_rgb_amount = rgb_info.remote_rgb_amount; - let serialized_info = - serde_json::to_string(&rgb_payment_info).expect("valid rgb payment info"); - fs::write(&rgb_payment_info_proxy_id_path, serialized_info) - .expect("able to write rgb payment info file"); - fs::remove_file(rgb_payment_info_tmp_path).expect("able to remove file"); + let data = bincode::serialize(&rgb_payment_info).expect("valid rgb payment info"); + kv_store + .write(RGB_PRIMARY_NS, namespace, &htlc_proxy_id, data) + .expect("able to write rgb payment info"); + kv_store + .remove(RGB_PRIMARY_NS, namespace, &pending_key, false) + .expect("able to remove pending payment info"); } - let rgb_payment_info = if rgb_payment_info_proxy_id_path.exists() { - parse_rgb_payment_info(&rgb_payment_info_proxy_id_path) + let rgb_payment_info = if let Ok(data) = + kv_store.read(RGB_PRIMARY_NS, namespace, &htlc_proxy_id) + { + bincode::deserialize(&data).expect("valid data") } else { let rgb_payment_info = RgbPaymentInfo { contract_id, @@ -341,12 +344,13 @@ where swap_payment: true, inbound, }; - let serialized_info = - serde_json::to_string(&rgb_payment_info).expect("valid rgb payment info"); - fs::write(rgb_payment_info_proxy_id_path, serialized_info.clone()) - .expect("able to write rgb payment info file"); - fs::write(rgb_payment_info_path, serialized_info) - .expect("able to write rgb payment info file"); + let data = bincode::serialize(&rgb_payment_info).expect("valid rgb payment info"); + kv_store + .write(RGB_PRIMARY_NS, namespace, &htlc_proxy_id, data.clone()) + .expect("able to write rgb payment info"); + kv_store + .write(RGB_PRIMARY_NS, namespace, &htlc_payment_hash, data) + .expect("able to write rgb payment info"); rgb_payment_info }; @@ -415,15 +419,13 @@ where wallet.consume_fascia(fascia.clone(), Some(WitnessOrd::Ignored)).unwrap(); - // save RGB transfer data to disk let rgb_amount = if counterparty { vout_p2wpkh_amt + rgb_offered_htlc } else { vout_p2wsh_amt + rgb_received_htlc }; let transfer_info = TransferInfo { contract_id, rgb_amount }; - let transfer_info_path = ldk_data_dir.join(format!("{txid}_transfer_info")); - write_rgb_transfer_info(&transfer_info_path, &transfer_info); + kv_store.write_rgb_transfer_info(&txid.to_string(), &transfer_info); Ok(()) } @@ -431,6 +433,7 @@ where /// Color HTLC transaction pub(crate) fn color_htlc( htlc_tx: &mut Transaction, htlc: &HTLCOutputInCommitment, ldk_data_dir: &Path, + kv_store: &dyn KVStoreSync, ) -> Result<(), ChannelError> { if htlc.rgb_payment.is_none_or(|(_, a)| a == 0) { return Ok(()); @@ -440,8 +443,7 @@ pub(crate) fn color_htlc( let consignment_htlc_outpoint = htlc_tx.input.first().unwrap().previous_output; let commitment_txid = consignment_htlc_outpoint.txid.to_string(); - let transfer_info_path = ldk_data_dir.join(format!("{commitment_txid}_transfer_info")); - let transfer_info = read_rgb_transfer_info(&transfer_info_path); + let transfer_info = kv_store.read_rgb_transfer_info(&commitment_txid); let contract_id = transfer_info.contract_id; let asset_coloring_info = AssetColoringInfo { @@ -469,10 +471,8 @@ pub(crate) fn color_htlc( wallet.consume_fascia(fascia.clone(), Some(WitnessOrd::Ignored)).unwrap(); - // save RGB transfer data to disk let transfer_info = TransferInfo { contract_id, rgb_amount: htlc_amount_rgb }; - let transfer_info_path = ldk_data_dir.join(format!("{txid}_transfer_info")); - write_rgb_transfer_info(&transfer_info_path, &transfer_info); + kv_store.write_rgb_transfer_info(&txid.to_string(), &transfer_info); Ok(()) } @@ -480,10 +480,11 @@ pub(crate) fn color_htlc( /// Color closing transaction pub(crate) fn color_closing( channel_id: &ChannelId, closing_transaction: &mut ClosingTransaction, ldk_data_dir: &Path, + kv_store: &dyn KVStoreSync, ) -> Result<(), ChannelError> { let closing_tx = closing_transaction.clone().built; - let (rgb_info, _) = get_rgb_channel_info_pending(channel_id, ldk_data_dir); + let rgb_info = get_rgb_channel_info_pending(channel_id, kv_store); let contract_id = rgb_info.contract_id; let holder_vout_amount = rgb_info.local_rgb_amount; @@ -534,85 +535,35 @@ pub(crate) fn color_closing( wallet.consume_fascia(fascia.clone(), Some(WitnessOrd::Ignored)).unwrap(); - // save RGB transfer data to disk let transfer_info = TransferInfo { contract_id, rgb_amount: holder_vout_amount }; - let transfer_info_path = ldk_data_dir.join(format!("{txid}_transfer_info")); - write_rgb_transfer_info(&transfer_info_path, &transfer_info); + kv_store.write_rgb_transfer_info(&txid.to_string(), &transfer_info); Ok(()) } -/// Get RgbPaymentInfo file path -pub fn get_rgb_payment_info_path( - payment_hash: &PaymentHash, ldk_data_dir: &Path, inbound: bool, -) -> PathBuf { - let mut path = ldk_data_dir.join(payment_hash.0.as_hex().to_string()); - path.set_extension(if inbound { INBOUND_EXT } else { OUTBOUND_EXT }); - path -} - -/// Parse RgbPaymentInfo -pub fn parse_rgb_payment_info(rgb_payment_info_path: &PathBuf) -> RgbPaymentInfo { - let serialized_info = - fs::read_to_string(rgb_payment_info_path).expect("valid rgb payment info"); - serde_json::from_str(&serialized_info).expect("valid rgb info file") -} - -/// Get RgbInfo file path -pub fn get_rgb_channel_info_path(channel_id: &str, ldk_data_dir: &Path, pending: bool) -> PathBuf { - let mut info_file_path = ldk_data_dir.join(channel_id); - if pending { - info_file_path.set_extension("pending"); - } - info_file_path -} - -/// Get RgbInfo file +/// Get RgbInfo from KVStore pub(crate) fn get_rgb_channel_info( - channel_id: &str, ldk_data_dir: &Path, pending: bool, -) -> (RgbInfo, PathBuf) { - let info_file_path = get_rgb_channel_info_path(channel_id, ldk_data_dir, pending); - let info = parse_rgb_channel_info(&info_file_path); - (info, info_file_path) -} - -/// Get pending RgbInfo file -pub fn get_rgb_channel_info_pending( - channel_id: &ChannelId, ldk_data_dir: &Path, -) -> (RgbInfo, PathBuf) { - get_rgb_channel_info(&channel_id.0.as_hex().to_string(), ldk_data_dir, true) -} - -/// Parse RgbInfo -pub fn parse_rgb_channel_info(rgb_channel_info_path: &PathBuf) -> RgbInfo { - let serialized_info = fs::read_to_string(rgb_channel_info_path).expect("valid rgb info file"); - serde_json::from_str(&serialized_info).expect("valid rgb info file") -} - -/// Whether the channel data for a channel exist -pub fn is_channel_rgb(channel_id: &ChannelId, ldk_data_dir: &Path) -> bool { - get_rgb_channel_info_path(&channel_id.0.as_hex().to_string(), ldk_data_dir, false).exists() + channel_id: &str, pending: bool, kv_store: &dyn KVStoreSync, +) -> RgbInfo { + kv_store.read_rgb_channel_info(channel_id, pending).expect("channel info must exist in KVStore") } -/// Write RgbInfo file -pub fn write_rgb_channel_info(path: &PathBuf, rgb_info: &RgbInfo) { - let serialized_info = serde_json::to_string(&rgb_info).expect("valid rgb info"); - fs::write(path, serialized_info).expect("able to write") +/// Get pending RgbInfo from KVStore +pub fn get_rgb_channel_info_pending(channel_id: &ChannelId, kv_store: &dyn KVStoreSync) -> RgbInfo { + get_rgb_channel_info(&channel_id.0.as_hex().to_string(), true, kv_store) } -fn _append_pending_extension(path: &Path) -> PathBuf { - let mut new_path = path.to_path_buf(); - new_path.set_extension(format!("{}_pending", new_path.extension().unwrap().to_string_lossy())); - new_path +/// Whether the channel has RGB data in KVStore +pub fn is_channel_rgb(channel_id: &ChannelId, kv_store: &dyn KVStoreSync) -> bool { + let channel_id_str = channel_id.0.as_hex().to_string(); + kv_store.read_rgb_channel_info(&channel_id_str, false).is_ok() } -/// Write RGB payment info to file -pub fn write_rgb_payment_info_file( - ldk_data_dir: &Path, payment_hash: &PaymentHash, contract_id: ContractId, amount_rgb: u64, - swap_payment: bool, inbound: bool, +/// Write RGB payment info to database +pub fn write_rgb_payment_info( + payment_hash: &PaymentHash, contract_id: ContractId, amount_rgb: u64, swap_payment: bool, + inbound: bool, kv_store: &Arc, ) { - let rgb_payment_info_path = get_rgb_payment_info_path(payment_hash, ldk_data_dir, inbound); - let rgb_payment_info_tmp_path = _append_pending_extension(&rgb_payment_info_path); let rgb_payment_info = RgbPaymentInfo { contract_id, amount: amount_rgb, @@ -621,42 +572,40 @@ pub fn write_rgb_payment_info_file( swap_payment, inbound, }; - let serialized_info = serde_json::to_string(&rgb_payment_info).expect("valid rgb payment info"); - std::fs::write(rgb_payment_info_path, serialized_info.clone()) - .expect("able to write rgb payment info file"); - std::fs::write(rgb_payment_info_tmp_path, serialized_info) - .expect("able to write rgb payment info tmp file"); -} - -/// Rename RGB files from temporary to final channel ID -pub(crate) fn rename_rgb_files( - channel_id: &ChannelId, temporary_channel_id: &ChannelId, ldk_data_dir: &Path, + kv_store.write_rgb_payment_info(payment_hash, &rgb_payment_info); + let payment_hash_hex = payment_hash.0.as_hex(); + let pending_key = format!("{payment_hash_hex}_pending"); + let namespace = + if inbound { RGB_PAYMENT_INFO_INBOUND_NS } else { RGB_PAYMENT_INFO_OUTBOUND_NS }; + let data = bincode::serialize(&rgb_payment_info).expect("valid rgb payment info"); + kv_store + .write(RGB_PRIMARY_NS, namespace, &pending_key, data) + .expect("able to write rgb payment info pending"); +} + +/// update RGB data from temporary to final channel ID in KVStore +pub(crate) fn update_rgb_channel_id( + channel_id: &ChannelId, temporary_channel_id: &ChannelId, kv_store: &dyn KVStoreSync, ) { + if channel_id == temporary_channel_id { + return; + } let temp_chan_id = temporary_channel_id.0.as_hex().to_string(); let chan_id = channel_id.0.as_hex().to_string(); - fs::rename( - get_rgb_channel_info_path(&temp_chan_id, ldk_data_dir, false), - get_rgb_channel_info_path(&chan_id, ldk_data_dir, false), - ) - .expect("rename ok"); - fs::rename( - get_rgb_channel_info_path(&temp_chan_id, ldk_data_dir, true), - get_rgb_channel_info_path(&chan_id, ldk_data_dir, true), - ) - .expect("rename ok"); + kv_store.update_rgb_channel_info(&temp_chan_id, &chan_id, false).expect("rename ok"); + kv_store.update_rgb_channel_info(&temp_chan_id, &chan_id, true).expect("rename ok"); - let funding_consignment_tmp = ldk_data_dir.join(format!("consignment_{}", temp_chan_id)); - if funding_consignment_tmp.exists() { - let funding_consignment = ldk_data_dir.join(format!("consignment_{}", chan_id)); - fs::rename(funding_consignment_tmp, funding_consignment).expect("rename ok"); + if let Ok(consignment_data) = kv_store.read_rgb_consignment(&temp_chan_id) { + kv_store.write_rgb_consignment(&chan_id, consignment_data); + kv_store.remove_rgb_consignment(&temp_chan_id); } } /// Handle funding on the receiver side pub(crate) fn handle_funding( temporary_channel_id: &ChannelId, funding_txid: String, ldk_data_dir: &Path, - consignment_endpoint: RgbTransport, push_asset_amount: Option, + consignment_endpoint: RgbTransport, push_asset_amount: Option, kv_store: &dyn KVStoreSync, ) -> Result<(), ChannelError> { let handle = Handle::current(); let _ = handle.enter(); @@ -713,24 +662,19 @@ pub(crate) fn handle_funding( batch_transfer_idx: None, }; let temporary_channel_id_str = temporary_channel_id.0.as_hex().to_string(); - write_rgb_channel_info( - &get_rgb_channel_info_path(&temporary_channel_id_str, ldk_data_dir, true), - &rgb_info, - ); - write_rgb_channel_info( - &get_rgb_channel_info_path(&temporary_channel_id_str, ldk_data_dir, false), - &rgb_info, - ); + + kv_store.write_rgb_channel_info(&temporary_channel_id_str, &rgb_info, true); + kv_store.write_rgb_channel_info(&temporary_channel_id_str, &rgb_info, false); Ok(()) } -/// Update RGB channel amount +/// Update RGB channel amount in KVStore pub fn update_rgb_channel_amount( - channel_id: &str, rgb_offered_htlc: u64, rgb_received_htlc: u64, ldk_data_dir: &Path, - pending: bool, + channel_id: &str, rgb_offered_htlc: u64, rgb_received_htlc: u64, pending: bool, + kv_store: &dyn KVStoreSync, ) { - let (mut rgb_info, info_file_path) = get_rgb_channel_info(channel_id, ldk_data_dir, pending); + let mut rgb_info = get_rgb_channel_info(channel_id, pending, kv_store); if rgb_offered_htlc > rgb_received_htlc { let spent = rgb_offered_htlc - rgb_received_htlc; @@ -742,45 +686,152 @@ pub fn update_rgb_channel_amount( rgb_info.remote_rgb_amount -= received; } - write_rgb_channel_info(&info_file_path, &rgb_info) + kv_store.write_rgb_channel_info(channel_id, &rgb_info, pending); } /// Update pending RGB channel amount pub(crate) fn update_rgb_channel_amount_pending( - channel_id: &ChannelId, rgb_offered_htlc: u64, rgb_received_htlc: u64, ldk_data_dir: &Path, + channel_id: &ChannelId, rgb_offered_htlc: u64, rgb_received_htlc: u64, + kv_store: &dyn KVStoreSync, ) { update_rgb_channel_amount( &channel_id.0.as_hex().to_string(), rgb_offered_htlc, rgb_received_htlc, - ldk_data_dir, true, + kv_store, ) } -/// Whether the payment is colored -pub(crate) fn is_payment_rgb(ldk_data_dir: &Path, payment_hash: &PaymentHash) -> bool { - get_rgb_payment_info_path(payment_hash, ldk_data_dir, false).exists() - || get_rgb_payment_info_path(payment_hash, ldk_data_dir, true).exists() -} - -/// Detect the contract ID of the payment and then filter hops based on contract ID and amount -pub(crate) fn filter_first_hops( - ldk_data_dir: &Path, payment_hash: &PaymentHash, first_hops: &mut Vec, -) -> (ContractId, u64) { - let rgb_payment_info_path = get_rgb_payment_info_path(payment_hash, ldk_data_dir, false); - let rgb_payment_info = parse_rgb_payment_info(&rgb_payment_info_path); - let contract_id = rgb_payment_info.contract_id; - let rgb_amount = rgb_payment_info.amount; - first_hops.retain(|h| { - let info_file_path = ldk_data_dir.join(h.channel_id.0.as_hex().to_string()); - if !info_file_path.exists() { - return false; - } - let serialized_info = fs::read_to_string(info_file_path).expect("valid rgb info file"); - let rgb_info: RgbInfo = - serde_json::from_str(&serialized_info).expect("valid rgb info file"); - rgb_info.contract_id == contract_id && rgb_info.local_rgb_amount >= rgb_amount - }); - (contract_id, rgb_amount) +/// extension trait for RGB-specific KVStore operations +pub trait RgbKvStoreExt { + /// read transfer info from KVStore + fn read_rgb_transfer_info(&self, txid: &str) -> TransferInfo; + /// write transfer info to KVStore + fn write_rgb_transfer_info(&self, txid: &str, info: &TransferInfo); + /// read channel info from KVStore + fn read_rgb_channel_info(&self, channel_id: &str, pending: bool) -> Result; + /// write channel info to KVStore + fn write_rgb_channel_info(&self, channel_id: &str, rgb_info: &RgbInfo, pending: bool); + /// read payment info from KVStore + fn read_rgb_payment_info( + &self, payment_hash: &PaymentHash, inbound: bool, + ) -> Result; + /// write payment info to KVStore + fn write_rgb_payment_info(&self, payment_hash: &PaymentHash, info: &RgbPaymentInfo); + /// read consignment from KVStore + fn read_rgb_consignment(&self, id: &str) -> Result, io::Error>; + /// write consignment to KVStore + fn write_rgb_consignment(&self, id: &str, data: Vec); + /// remove channel info from KVStore + fn remove_rgb_channel_info(&self, channel_id: &str, pending: bool) -> Result<(), io::Error>; + /// remove consignment from KVStore + fn remove_rgb_consignment(&self, id: &str); + /// move channel info from one key to another (read + write + remove) + fn update_rgb_channel_info( + &self, old_channel_id: &str, new_channel_id: &str, pending: bool, + ) -> Result<(), io::Error>; + /// whether the payment is colored + fn is_payment_rgb(&self, payment_hash: &PaymentHash) -> bool; + /// filter first hops to only include channels with sufficient RGB assets + fn filter_first_hops( + &self, payment_hash: &PaymentHash, first_hops: &mut Vec, + ) -> (ContractId, u64); +} + +impl RgbKvStoreExt for K { + fn read_rgb_transfer_info(&self, txid: &str) -> TransferInfo { + let data = self.read(RGB_PRIMARY_NS, RGB_TRANSFER_INFO_NS, txid) + .expect("KVStore read failed"); + bincode::deserialize(&data).expect("valid transfer info") + } + + fn write_rgb_transfer_info(&self, txid: &str, info: &TransferInfo) { + let data = bincode::serialize(info).expect("valid transfer info"); + self.write(RGB_PRIMARY_NS, RGB_TRANSFER_INFO_NS, txid, data) + .expect("KVStore write failed"); + } + + fn read_rgb_channel_info(&self, channel_id: &str, pending: bool) -> Result { + let namespace = if pending { RGB_CHANNEL_INFO_PENDING_NS } else { RGB_CHANNEL_INFO_NS }; + let data = self.read(RGB_PRIMARY_NS, namespace, channel_id)?; + bincode::deserialize(&data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) + } + + fn write_rgb_channel_info(&self, channel_id: &str, rgb_info: &RgbInfo, pending: bool) { + let namespace = if pending { RGB_CHANNEL_INFO_PENDING_NS } else { RGB_CHANNEL_INFO_NS }; + let data = bincode::serialize(rgb_info).expect("valid rgb channel info"); + self.write(RGB_PRIMARY_NS, namespace, channel_id, data) + .expect("KVStore write failed"); + } + + fn read_rgb_payment_info( + &self, payment_hash: &PaymentHash, inbound: bool, + ) -> Result { + let namespace = + if inbound { RGB_PAYMENT_INFO_INBOUND_NS } else { RGB_PAYMENT_INFO_OUTBOUND_NS }; + let key = payment_hash.0.as_hex().to_string(); + let data = self.read(RGB_PRIMARY_NS, namespace, &key)?; + bincode::deserialize(&data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) + } + + fn write_rgb_payment_info(&self, payment_hash: &PaymentHash, info: &RgbPaymentInfo) { + let namespace = + if info.inbound { RGB_PAYMENT_INFO_INBOUND_NS } else { RGB_PAYMENT_INFO_OUTBOUND_NS }; + let key = payment_hash.0.as_hex().to_string(); + let data = bincode::serialize(info).expect("valid rgb payment info"); + self.write(RGB_PRIMARY_NS, namespace, &key, data) + .expect("KVStore write failed"); + } + + fn read_rgb_consignment(&self, id: &str) -> Result, io::Error> { + self.read(RGB_PRIMARY_NS, RGB_CONSIGNMENT_NS, id) + } + + fn write_rgb_consignment(&self, id: &str, data: Vec) { + self.write(RGB_PRIMARY_NS, RGB_CONSIGNMENT_NS, id, data) + .expect("KVStore write failed"); + } + + fn remove_rgb_channel_info(&self, channel_id: &str, pending: bool) -> Result<(), io::Error> { + let namespace = if pending { RGB_CHANNEL_INFO_PENDING_NS } else { RGB_CHANNEL_INFO_NS }; + self.remove(RGB_PRIMARY_NS, namespace, channel_id, false) + } + + fn remove_rgb_consignment(&self, id: &str) { + self.remove(RGB_PRIMARY_NS, RGB_CONSIGNMENT_NS, id, false) + .expect("KVStore remove failed"); + } + + fn update_rgb_channel_info( + &self, old_channel_id: &str, new_channel_id: &str, pending: bool, + ) -> Result<(), io::Error> { + let rgb_info = self.read_rgb_channel_info(old_channel_id, pending)?; + self.write_rgb_channel_info(new_channel_id, &rgb_info, pending); + self.remove_rgb_channel_info(old_channel_id, pending) + } + + fn is_payment_rgb(&self, payment_hash: &PaymentHash) -> bool { + self.read_rgb_payment_info(payment_hash, false).is_ok() + || self.read_rgb_payment_info(payment_hash, true).is_ok() + } + + fn filter_first_hops( + &self, payment_hash: &PaymentHash, first_hops: &mut Vec, + ) -> (ContractId, u64) { + let rgb_payment_info = self.read_rgb_payment_info(payment_hash, false) + .expect("payment info must exist"); + let contract_id = rgb_payment_info.contract_id; + let rgb_amount = rgb_payment_info.amount; + first_hops.retain(|h| { + let channel_id_str = h.channel_id.0.as_hex().to_string(); + match self.read_rgb_channel_info(&channel_id_str, false) { + Ok(rgb_info) => { + rgb_info.contract_id == contract_id && rgb_info.local_rgb_amount >= rgb_amount + }, + Err(_) => false, + } + }); + (contract_id, rgb_amount) + } } diff --git a/lightning/src/sign/mod.rs b/lightning/src/sign/mod.rs index 58eaeccac..5f9babfad 100644 --- a/lightning/src/sign/mod.rs +++ b/lightning/src/sign/mod.rs @@ -38,6 +38,7 @@ use bitcoin::{secp256k1, Psbt, Sequence, Txid, WPubkeyHash, Witness}; use lightning_invoice::RawBolt11Invoice; use std::path::PathBuf; +use std::sync::Arc; use crate::chain::transaction::OutPoint; use crate::crypto::utils::{hkdf_extract_expand_twice, sign, sign_with_aux_rand}; @@ -62,6 +63,7 @@ use crate::rgb_utils::color_htlc; use crate::types::features::ChannelTypeFeatures; use crate::types::payment::PaymentPreimage; use crate::util::async_poll::AsyncResult; +use crate::util::persist::KVStoreSync; use crate::util::ser::{ReadableArgs, Writeable}; use crate::util::transaction_utils; @@ -1218,6 +1220,8 @@ pub struct InMemorySigner { entropy_source: RandomBytes, /// The LDK data directory ldk_data_dir: PathBuf, + /// KVStore for RGB data persistence + rgb_kv_store: Arc, } impl PartialEq for InMemorySigner { @@ -1248,6 +1252,7 @@ impl Clone for InMemorySigner { channel_keys_id: self.channel_keys_id, entropy_source: RandomBytes::new(self.get_secure_random_bytes()), ldk_data_dir: self.ldk_data_dir.clone(), + rgb_kv_store: self.rgb_kv_store.clone(), } } } @@ -1259,6 +1264,7 @@ impl InMemorySigner { payment_key_v2: SecretKey, v2_remote_key_derivation: bool, delayed_payment_base_key: SecretKey, htlc_base_key: SecretKey, commitment_seed: [u8; 32], channel_keys_id: [u8; 32], ldk_data_dir: PathBuf, rand_bytes_unique_start: [u8; 32], + rgb_kv_store: Arc, ) -> InMemorySigner { InMemorySigner { funding_key: sealed::MaybeTweakedSecretKey::from(funding_key), @@ -1272,6 +1278,7 @@ impl InMemorySigner { channel_keys_id, entropy_source: RandomBytes::new(rand_bytes_unique_start), ldk_data_dir, + rgb_kv_store, } } @@ -1281,6 +1288,7 @@ impl InMemorySigner { payment_key_v2: SecretKey, v2_remote_key_derivation: bool, delayed_payment_base_key: SecretKey, htlc_base_key: SecretKey, commitment_seed: [u8; 32], channel_keys_id: [u8; 32], ldk_data_dir: PathBuf, rand_bytes_unique_start: [u8; 32], + rgb_kv_store: Arc, ) -> InMemorySigner { InMemorySigner { funding_key: sealed::MaybeTweakedSecretKey::from(funding_key), @@ -1294,6 +1302,7 @@ impl InMemorySigner { channel_keys_id, entropy_source: RandomBytes::new(rand_bytes_unique_start), ldk_data_dir, + rgb_kv_store, } } @@ -1568,7 +1577,7 @@ impl EcdsaChannelSigner for InMemorySigner { &keys.revocation_key, ); if commitment_tx.is_colored() { - if let Err(_e) = color_htlc(&mut htlc_tx, htlc, &self.ldk_data_dir) { + if let Err(_e) = color_htlc(&mut htlc_tx, htlc, &self.ldk_data_dir, self.rgb_kv_store.as_ref()) { return Err(()); } } @@ -1993,6 +2002,7 @@ pub struct KeysManager { starting_time_secs: u64, starting_time_nanos: u32, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, } impl KeysManager { @@ -2021,6 +2031,7 @@ impl KeysManager { pub fn new( seed: &[u8; 32], starting_time_secs: u64, starting_time_nanos: u32, v2_remote_key_derivation: bool, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Self { // Constants for key derivation path indices used in this function. const NODE_SECRET_INDEX: ChildNumber = ChildNumber::Hardened { index: 0 }; @@ -2115,6 +2126,7 @@ impl KeysManager { starting_time_secs, starting_time_nanos, ldk_data_dir, + rgb_kv_store, }; let secp_seed = res.get_secure_random_bytes(); res.secp_ctx.seeded_randomize(&secp_seed); @@ -2234,6 +2246,7 @@ impl KeysManager { params.clone(), self.ldk_data_dir.clone(), prng_seed, + self.rgb_kv_store.clone(), ) } @@ -2651,6 +2664,7 @@ impl PhantomKeysManager { pub fn new( seed: &[u8; 32], starting_time_secs: u64, starting_time_nanos: u32, cross_node_seed: &[u8; 32], v2_remote_key_derivation: bool, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Self { let inner = KeysManager::new( seed, @@ -2658,6 +2672,7 @@ impl PhantomKeysManager { starting_time_nanos, v2_remote_key_derivation, ldk_data_dir, + rgb_kv_store, ); let (inbound_key, phantom_key) = hkdf_extract_expand_twice( b"LDK Inbound and Phantom Payment Key Expansion",