Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8716007
Replay distributed work onto users/qiazh/pre-merge-tikv-bugfix
May 20, 2026
4186747
Fix unneede diff
TerrenceZhangX May 20, 2026
ee97d3f
Remove unused stride-shard experiment
TerrenceZhangX May 20, 2026
4df704f
InsertVectors: dedupe branches, log InsertThreadNum ignore in bulk path
TerrenceZhangX May 20, 2026
c27a109
Restore (layers+1) multiplier in BlockController IO queue size
TerrenceZhangX May 20, 2026
f3a9de9
SetVersionBatch: bypass LRU cache, read TiKV directly
TerrenceZhangX May 20, 2026
f35ae85
Drop high-priority job queue from SPDKThreadPool
TerrenceZhangX May 20, 2026
a49b26d
Fix space
TerrenceZhangX May 20, 2026
689e5b2
Fix distributed benchmark README + drop dead orchestrator code
TerrenceZhangX May 20, 2026
ee405d4
README: clarify driver = worker 0 + dispatcher; workers peer-to-peer
TerrenceZhangX May 20, 2026
6cf7d36
README: drop unused TiKV pre-split helper section
TerrenceZhangX May 20, 2026
07bdc03
Clean comment
TerrenceZhangX May 20, 2026
f0d8fe5
Extract IsRemoteOwnedHead predicate for owner-ring checks
TerrenceZhangX May 21, 2026
d55de54
VersionMap extend: use stride formula capacity*numWorkers
TerrenceZhangX May 21, 2026
3703866
RemotePostingOps: move RPC chunk/retry/timeout/inflight into INI options
TerrenceZhangX May 21, 2026
9619b2f
Async Split/Merge jobs: retry counter + re-enqueue on failure
TerrenceZhangX May 21, 2026
864e268
DispatchResult: carry SPTAG::ErrorCode back to driver
TerrenceZhangX May 21, 2026
1cd19f1
AppendCallback: HandleRaceCondition gate against in-flight split/merge
TerrenceZhangX May 21, 2026
dca197b
SPANN distributed: TTL-based remote lock lease
TerrenceZhangX May 21, 2026
489ff4e
SPANN distributed: watchdog for failed async append batches
TerrenceZhangX May 21, 2026
7093d40
SPANN distributed: durable HeadSync log + Split WAL scaffolding
TerrenceZhangX May 21, 2026
111d37c
SPANN distributed: full lease-fencing with monotonic fencing tokens
TerrenceZhangX May 21, 2026
74c0350
SPANN distributed: wire split path through fenced cross-owner write
TerrenceZhangX May 21, 2026
de3fa64
SPANN distributed: route inner layers, retire async-job UAF, larger R…
TerrenceZhangX May 21, 2026
06d8899
feat(distributed): receiver-side durable Batch WAL for RemoteAppend (…
TerrenceZhangX May 21, 2026
7aca9f0
fix(distributed): receiver-side admission control for Batch WAL
TerrenceZhangX May 21, 2026
2088e13
fix(distributed): stop replaying moved-out items + per-layer remote-o…
TerrenceZhangX May 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion AnnService/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ include_directories(${Zstd}/lib)
file(GLOB_RECURSE HDR_FILES ${AnnService}/inc/Core/*.h ${AnnService}/inc/Helper/*.h)
file(GLOB_RECURSE SRC_FILES ${AnnService}/src/Core/*.cpp ${AnnService}/src/Helper/*.cpp)

# Include Socket sources in core lib for PostingRouter
file(GLOB SOCKET_HDR_FILES ${AnnService}/inc/Socket/*.h)
file(GLOB SOCKET_SRC_FILES ${AnnService}/src/Socket/*.cpp)
list(APPEND HDR_FILES ${SOCKET_HDR_FILES})
list(APPEND SRC_FILES ${SOCKET_SRC_FILES})

set(SPDK_LIBRARIES "")
if (SPDK)
set(Spdk ${PROJECT_SOURCE_DIR}/ThirdParty/spdk/build)
Expand Down Expand Up @@ -73,7 +79,7 @@ endif()
add_library (SPTAGLib SHARED ${SRC_FILES} ${HDR_FILES} ${TiKV_PROTO_SOURCES})
target_link_libraries (SPTAGLib DistanceUtils ${RocksDB_LIBRARIES} ${uring_LIBRARIES} libzstd_shared ${NUMA_LIBRARY} ${TBB_LIBRARIES} ${SPDK_LIBRARIES} ${TiKV_LIBRARIES})
add_library (SPTAGLibStatic STATIC ${SRC_FILES} ${HDR_FILES} ${TiKV_PROTO_SOURCES})
target_link_libraries (SPTAGLibStatic DistanceUtils ${RocksDB_LIBRARIES} ${uring_LIBRARIES} libzstd_static ${NUMA_LIBRARY_STATIC} ${TBB_LIBRARIES} ${SPDK_LIBRARIES} ${TiKV_LIBRARIES})
target_link_libraries (SPTAGLibStatic DistanceUtils ${RocksDB_LIBRARIES} ${uring_LIBRARIES} libzstd_static ${NUMA_LIBRARY_STATIC} ${TBB_LIBRARIES} ${SPDK_LIBRARIES} ${TiKV_LIBRARIES} ${Boost_LIBRARIES})

if (MSVC)
# SPANNIndex.cpp can exceed COFF section limits in Debug without /bigobj.
Expand Down
21 changes: 13 additions & 8 deletions AnnService/inc/Core/Common/FineGrainedLock.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ namespace SPTAG
{
return idx;
}

// Bucket index for the internal mutex-sharded unordered_map of
// per-posting locks. Exposed for callers that need an array sized
// to BucketCount and indexed by the same granularity as the lock
// pool (e.g. ExtraDynamicSearcher::m_remoteBucketLocked).
static inline unsigned BucketIndex(SizeType idx)
{
unsigned key = static_cast<unsigned>(idx);
return ((unsigned)(key * 99991) + _rotl(key, 2) + 101) & BucketMask;
}

static const int BucketMask = 32767;
static const int BucketCount = BucketMask + 1;
private:
struct Bucket {
std::mutex mutex;
Expand All @@ -76,14 +89,6 @@ namespace SPTAG
return *iter->second;
}

static inline unsigned BucketIndex(SizeType idx)
{
unsigned key = static_cast<unsigned>(idx);
return ((unsigned)(key * 99991) + _rotl(key, 2) + 101) & BucketMask;
}

static const int BucketMask = 32767;
static const int BucketCount = BucketMask + 1;
mutable std::unique_ptr<Bucket[]> m_buckets;
};
}
Expand Down
12 changes: 12 additions & 0 deletions AnnService/inc/Core/Common/IVersionMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ namespace SPTAG
virtual uint8_t GetVersion(const SizeType& key) = 0;
virtual uint8_t GetVersion(const SizeType& key, VersionReadPolicy policy) { return GetVersion(key); }
virtual void SetVersion(const SizeType& key, const uint8_t& version) = 0;

/// Batch SetVersion: apply (vids[i] -> versions[i]) for all i.
/// Default impl is a per-VID loop. TiKV-backed maps override this
/// to group writes by chunk so N records in the same chunk only
/// trigger 1 ReadChunk + 1 WriteChunk RPC pair
virtual void SetVersionBatch(const std::vector<SizeType>& vids, const std::vector<uint8_t>& versions)
{
size_t n = std::min(vids.size(), versions.size());
for (size_t i = 0; i < n; i++) {
SetVersion(vids[i], versions[i]);
}
}
/// Increment the version of a VID.
/// @param expectedOld If not 0xff, the caller asserts the current version should be this value.
/// If TiKV already holds (expectedOld+1)&0x7f, treat as success (another node did the same increment).
Expand Down
55 changes: 55 additions & 0 deletions AnnService/inc/Core/Common/TiKVVersionMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,61 @@ namespace SPTAG
else if (oldVal != 0xfe && version == 0xfe) m_deleted++;
}

// Group writes by chunk: 1 ReadChunk + N byte-modifications + 1 WriteChunk
// per chunk, instead of N × (ReadChunk + WriteChunk). Bypasses the LRU
// cache because runs that exercise this path always have
// VersionCacheMaxChunks=0; reading TiKV directly removes a layer of
// bookkeeping (cache invalidate-on-write) we no longer benefit from.
void SetVersionBatch(const std::vector<SizeType>& vids, const std::vector<uint8_t>& versions) override
{
size_t n = std::min(vids.size(), versions.size());
if (n == 0) return;
const SizeType localCount = m_count.load();

// Group (idx into vids/versions) by chunk id.
std::unordered_map<SizeType, std::vector<size_t>> byChunk;
byChunk.reserve(n);
for (size_t i = 0; i < n; i++) {
SizeType vid = vids[i];
if (vid < 0 || vid >= localCount) continue;
byChunk[ChunkId(vid)].push_back(i);
}
if (byChunk.empty()) return;

long deletedDelta = 0;
for (auto& kv : byChunk) {
SizeType cid = kv.first;
auto& idxs = kv.second;
std::lock_guard<std::mutex> lock(ChunkMutex(cid));
std::string chunk = ReadChunk(cid);
if (chunk.empty()) {
chunk.assign(m_chunkSize, static_cast<char>(0xff));
}
bool dirty = false;
for (size_t i : idxs) {
SizeType vid = vids[i];
uint8_t newVal = versions[i];
int offset = ChunkOffset(vid);
if (offset < 0 || offset >= (int)chunk.size()) continue;
uint8_t oldVal = static_cast<uint8_t>(chunk[offset]);
if (oldVal == newVal) continue;
if (oldVal == 0xfe && newVal != 0xfe) deletedDelta--;
else if (oldVal != 0xfe && newVal == 0xfe) deletedDelta++;
chunk[offset] = static_cast<char>(newVal);
dirty = true;
}
if (dirty) {
auto ret = WriteChunk(cid, chunk);
if (ret != ErrorCode::Success) {
SPTAGLIB_LOG(Helper::LogLevel::LL_Error,
"TiKVVersionMap::SetVersionBatch: WriteChunk failed chunk=%d layer=%d\n",
cid, m_layer);
}
}
}
if (deletedDelta != 0) m_deleted += deletedDelta;
}

bool IncVersion(const SizeType& key, uint8_t* newVersion, uint8_t expectedOld = 0xff) override
{
if (key < 0 || key >= m_count.load()) {
Expand Down
177 changes: 177 additions & 0 deletions AnnService/inc/Core/SPANN/Distributed/AsyncJobWatchdog.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

#ifndef _SPTAG_SPANN_DISTRIBUTED_ASYNCJOBWATCHDOG_H_
#define _SPTAG_SPANN_DISTRIBUTED_ASYNCJOBWATCHDOG_H_

#include "inc/Helper/Logging.h"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

namespace SPTAG {
namespace SPANN {
namespace Distributed {

// AsyncJobWatchdog tracks async (fire-and-forget) inter-node dispatches
// and resends them on timeout or transport failure.
//
// Today the only fire-and-forget path is QueueRemoteAppend auto-flush in
// WorkerNode: it ships a batch of RemoteAppendRequests to a peer with no
// synchronous error propagation. Without a watchdog, transient network
// or peer-crash failures silently lose those appends.
//
// The watchdog is intentionally small: callers register a batch with a
// resend callback; the watchdog reschedules the callback up to
// MaxAttempts with exponential backoff. RemoteAppend is idempotent on
// the receive side (HandleRemoteAppend de-dups via per-posting RMW), so
// at-least-once delivery is safe.
class AsyncJobWatchdog {
public:
using ResendFn = std::function<bool()>; // returns true on success

AsyncJobWatchdog(int maxAttempts = 3,
int initialBackoffMs = 200)
: m_maxAttempts(maxAttempts),
m_initialBackoffMs(initialBackoffMs),
m_stop(false) {
m_worker = std::thread([this]() { Loop(); });
}

~AsyncJobWatchdog() {
{
std::lock_guard<std::mutex> lk(m_mutex);
m_stop = true;
}
m_cv.notify_all();
if (m_worker.joinable()) m_worker.join();
}

// Submit a fire-and-forget dispatch. The watchdog calls `resend` if
// and only if a prior attempt has failed; the caller is responsible
// for the initial attempt. After success, call MarkSuccess(id).
uint64_t Track(ResendFn resend, std::string tag = "") {
std::lock_guard<std::mutex> lk(m_mutex);
uint64_t id = ++m_nextId;
Entry e;
e.resend = std::move(resend);
e.attempts = 0;
e.tag = std::move(tag);
e.nextDeadline = std::chrono::steady_clock::time_point::max();
m_entries.emplace(id, std::move(e));
return id;
}

void MarkSuccess(uint64_t id) {
std::lock_guard<std::mutex> lk(m_mutex);
m_entries.erase(id);
}

// Schedule a resend after backoff for entry `id`. Called by producer
// when its synchronous attempt fails. Gives up after MaxAttempts.
void MarkFailureAndScheduleResend(uint64_t id) {
std::unique_lock<std::mutex> lk(m_mutex);
auto it = m_entries.find(id);
if (it == m_entries.end()) return;
if (++it->second.attempts >= m_maxAttempts) {
SPTAGLIB_LOG(Helper::LogLevel::LL_Error,
"AsyncJobWatchdog: %s giving up after %d attempts\n",
it->second.tag.c_str(), it->second.attempts);
m_entries.erase(it);
return;
}
int backoffMs = m_initialBackoffMs << (it->second.attempts - 1);
it->second.nextDeadline =
std::chrono::steady_clock::now() +
std::chrono::milliseconds(backoffMs);
lk.unlock();
m_cv.notify_all();
}

size_t OutstandingCount() const {
std::lock_guard<std::mutex> lk(m_mutex);
return m_entries.size();
}

private:
struct Entry {
ResendFn resend;
int attempts;
std::string tag;
std::chrono::steady_clock::time_point nextDeadline;
};

void Loop() {
std::unique_lock<std::mutex> lk(m_mutex);
while (!m_stop) {
auto now = std::chrono::steady_clock::now();
auto nextWake = now + std::chrono::seconds(1);
std::vector<uint64_t> due;
for (auto& kv : m_entries) {
if (kv.second.nextDeadline <= now) {
due.push_back(kv.first);
} else if (kv.second.nextDeadline < nextWake) {
nextWake = kv.second.nextDeadline;
}
}
for (uint64_t id : due) {
auto it = m_entries.find(id);
if (it == m_entries.end()) continue;
ResendFn fn = it->second.resend;
std::string tag = it->second.tag;
int attempt = it->second.attempts;
it->second.nextDeadline =
std::chrono::steady_clock::time_point::max();
lk.unlock();
SPTAGLIB_LOG(Helper::LogLevel::LL_Info,
"AsyncJobWatchdog: resending %s attempt=%d\n",
tag.c_str(), attempt + 1);
bool ok = false;
try { ok = fn(); } catch (...) { ok = false; }
lk.lock();
if (ok) {
m_entries.erase(id);
} else {
auto it2 = m_entries.find(id);
if (it2 != m_entries.end()) {
if (++it2->second.attempts >= m_maxAttempts) {
SPTAGLIB_LOG(Helper::LogLevel::LL_Error,
"AsyncJobWatchdog: %s giving up after %d attempts\n",
it2->second.tag.c_str(), it2->second.attempts);
m_entries.erase(it2);
} else {
int backoffMs =
m_initialBackoffMs << (it2->second.attempts - 1);
it2->second.nextDeadline =
std::chrono::steady_clock::now() +
std::chrono::milliseconds(backoffMs);
}
}
}
}
m_cv.wait_until(lk, nextWake, [this]() { return m_stop; });
}
}

mutable std::mutex m_mutex;
std::condition_variable m_cv;
std::unordered_map<uint64_t, Entry> m_entries;
uint64_t m_nextId = 0;
int m_maxAttempts;
int m_initialBackoffMs;
bool m_stop;
std::thread m_worker;
};

} // namespace Distributed
} // namespace SPANN
} // namespace SPTAG

#endif // _SPTAG_SPANN_DISTRIBUTED_ASYNCJOBWATCHDOG_H_
Loading