-
Notifications
You must be signed in to change notification settings - Fork 417
[diskann-garnet] Implement BIN and Q8 quantizers #1050
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,7 +26,7 @@ | |
| use crossbeam::queue::ArrayQueue; | ||
| use std::sync::{ | ||
| RwLock, | ||
| atomic::{AtomicBool, AtomicU32, Ordering}, | ||
| atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}, | ||
| }; | ||
| use thiserror::Error; | ||
|
|
||
|
|
@@ -47,11 +47,23 @@ pub enum FsmError { | |
| } | ||
|
|
||
| pub struct FreeSpaceMap { | ||
| /// Garnet callbacks for reading/writing FSM keys | ||
| callbacks: Callbacks, | ||
| /// A flag to signal whether there are free IDs in the FSM. | ||
| /// This is set after a scan of the FSM, and is used to prevent extraneous reads | ||
| /// of FSM blocks. | ||
| has_free_ids: AtomicBool, | ||
| /// A queue of previously deleted IDs to prevent excessive reads to the FSM | ||
| fast_free_list: ArrayQueue<u32>, | ||
| /// The maximum block ID stored in the FSM | ||
| max_block: RwLock<u32>, | ||
| /// The next ID that will be minted if reusing previously deleted IDs is unavailable | ||
| next_id: AtomicU32, | ||
| /// The total number of IDs marked used in the FSM | ||
| total_used: AtomicUsize, | ||
| /// Lock that prevents reuse of previously used IDs. | ||
| /// This is used to disable ID reuse during quantization backfill. | ||
| reuse_lock: AtomicUsize, | ||
| } | ||
|
|
||
| impl FreeSpaceMap { | ||
|
|
@@ -60,13 +72,16 @@ impl FreeSpaceMap { | |
| let fast_free_list = ArrayQueue::new(FAST_SIZE); | ||
| let max_block = RwLock::new(u32::MAX); | ||
| let next_id = AtomicU32::new(0); | ||
| let total_used = AtomicUsize::new(0); | ||
|
|
||
| let mut this = Self { | ||
| callbacks, | ||
| has_free_ids, | ||
| fast_free_list, | ||
| max_block, | ||
| next_id, | ||
| total_used, | ||
| reuse_lock: AtomicUsize::new(0), | ||
| }; | ||
|
|
||
| // Attempt to load state from Garnet. | ||
|
|
@@ -97,6 +112,7 @@ impl FreeSpaceMap { | |
|
|
||
| let mut block = vec![0u8; BLOCK_SIZE_BYTES]; | ||
| let mut last_used_id = -1i64; | ||
| let mut total_used = 0usize; | ||
|
|
||
| for block_id in (0..max_block_id).rev() { | ||
| let block_key = Self::block_key(block_id); | ||
|
|
@@ -115,8 +131,9 @@ impl FreeSpaceMap { | |
| let used = bit_used(byte, bidx); | ||
| if used { | ||
| last_used_id = last_used_id.max(id as i64); | ||
| } else if (id as i64) < last_used_id && self.fast_free_list.push(id).is_err() { | ||
| break; | ||
| total_used += 1; | ||
| } else if (id as i64) < last_used_id { | ||
| let _ = self.fast_free_list.push(id); | ||
| } | ||
|
|
||
| id = id.saturating_sub(1); | ||
|
|
@@ -130,6 +147,8 @@ impl FreeSpaceMap { | |
| self.next_id | ||
| .store((last_used_id + 1) as u32, Ordering::Release); | ||
|
|
||
| self.total_used.store(total_used, Ordering::Release); | ||
|
|
||
| if !self.fast_free_list.is_empty() { | ||
| self.has_free_ids.store(true, Ordering::Release); | ||
| } | ||
|
|
@@ -174,6 +193,14 @@ impl FreeSpaceMap { | |
| return Err(FsmError::Garnet(GarnetError::Write)); | ||
| } | ||
|
|
||
| if changed { | ||
| if used { | ||
| self.total_used.fetch_add(1, Ordering::AcqRel); | ||
| } else { | ||
| self.total_used.fetch_sub(1, Ordering::AcqRel); | ||
| } | ||
| } | ||
|
|
||
| // NOTE: We don't modify the free list if the id was already free. | ||
| if !used && changed { | ||
| // Push the id onto the fast free list. If the queue is full, ignore it. | ||
|
|
@@ -214,7 +241,7 @@ impl FreeSpaceMap { | |
| /// This may be a fresh ID larger than all the others, or it may be a reused ID that | ||
| /// previously belonged to a deleted element. The returned ID is marked as used. | ||
| pub fn next_id(&self, ctx: Context) -> Result<u32, FsmError> { | ||
| if self.has_free_ids.load(Ordering::Acquire) { | ||
| if self.can_reuse() && self.has_free_ids.load(Ordering::Acquire) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this check correct? |
||
| // We retry reusing a freed ID until there are none or we get one and marking it used | ||
| // succeeds in changing the value. | ||
| loop { | ||
|
|
@@ -270,6 +297,10 @@ impl FreeSpaceMap { | |
| self.next_id.load(Ordering::Acquire).saturating_sub(1) | ||
| } | ||
|
|
||
| pub fn total_used(&self) -> usize { | ||
| self.total_used.load(Ordering::Acquire) | ||
| } | ||
|
|
||
| /// Return the FSM block number, byte index, and bit index for a given ID. | ||
| /// The block number is the block which stores this ID, the byte index is byte offset | ||
| /// within the block which contains the status bits, and the bit index is the bit index | ||
|
|
@@ -301,9 +332,9 @@ impl FreeSpaceMap { | |
|
|
||
| let mut has_free_ids = false; | ||
| let mut id = 0u32; | ||
| let mut block = vec![0u8; BLOCK_SIZE_BYTES]; | ||
| 'scan: for block_id in 0..*max_block { | ||
| let block_key = Self::block_key(block_id); | ||
| let mut block = vec![0u8; BLOCK_SIZE_BYTES]; | ||
| if !self | ||
| .callbacks | ||
| .read_single_wid(ctx.term(Term::Metadata), block_key, &mut block) | ||
|
|
@@ -363,6 +394,66 @@ impl FreeSpaceMap { | |
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// Visit each used id in the FSM, invoking f on each id. | ||
| pub fn visit_used<F>(&self, ctx: Context, mut f: F) -> Result<(), FsmError> | ||
| where | ||
| F: FnMut(u32) -> bool, | ||
| { | ||
| let max_block = { *self.max_block.read().unwrap() }; | ||
| let mut block = vec![0u8; BLOCK_SIZE_BYTES]; | ||
| let mut id = 0u32; | ||
|
|
||
| for block_id in 0..max_block + 1 { | ||
| let block_key = Self::block_key(block_id); | ||
| if !self | ||
| .callbacks | ||
| .read_single_wid(ctx.term(Term::Metadata), block_key, &mut block) | ||
| { | ||
| return Err(FsmError::Garnet(GarnetError::Read)); | ||
| } | ||
|
|
||
| for &byte in &block { | ||
| if byte == 0x00 { | ||
| id += 8; | ||
| continue; | ||
| } | ||
|
|
||
| for bidx in 0..8 { | ||
| if bit_used(byte, bidx) { | ||
| let keep_going = f(id); | ||
| if !keep_going { | ||
| return Ok(()); | ||
| } | ||
| } | ||
| id += 1; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// Returns whether previously deleted IDs may be reused. | ||
| fn can_reuse(&self) -> bool { | ||
| self.reuse_lock.load(Ordering::Acquire) == 0 | ||
| } | ||
|
|
||
| /// Prevent the reuse of previously deleted IDs. | ||
| /// Each call to this increments a counter, and only once the counter is back to zero | ||
| /// will reuse be allowed again. | ||
| pub fn lock_reuse(&self) { | ||
| self.reuse_lock.fetch_add(1, Ordering::AcqRel); | ||
| } | ||
|
|
||
| /// Resume reuse of previously deleted IDs. | ||
| /// Each call to this decrements a counter, and only once the counter is back to zero | ||
| /// will reuse be allowed. This returns whether reuse was actually enabled. | ||
| pub fn unlock_reuse(&self) -> bool { | ||
| let prev = self.reuse_lock.fetch_sub(1, Ordering::AcqRel); | ||
| debug_assert_ne!(prev, 0); | ||
| prev == 1 | ||
| } | ||
| } | ||
|
|
||
| /// Return whether the `bidx`th bit is set in byte, where bits are labeled from left to right. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this used outside the crate?