1616
1717#pragma once
1818
19+ #include < folly/ScopeGuard.h>
1920#include < folly/container/F14Map.h>
2021#include < folly/lang/Align.h>
22+ #include < folly/logging/xlog.h>
2123
2224#include < atomic>
2325#include < mutex>
2426#include < optional>
2527#include < stdexcept>
28+ #include < string_view>
2629#include < vector>
2730
2831#include " cachelib/common/AtomicCounter.h"
32+ #include " cachelib/common/Serialization.h"
33+ #include " cachelib/common/Time.h"
2934#include " cachelib/common/Utils.h"
35+ #include " cachelib/navy/serialization/gen-cpp2/objects_types.h"
36+ #include " cachelib/shm/ShmManager.h"
3037
3138namespace facebook ::cachelib {
3239
@@ -42,10 +49,22 @@ namespace facebook::cachelib {
4249// mutex overhead on the read path.
4350class AccessTimeMap {
4451 public:
52+ static constexpr std::string_view kShmInfoName = " shm_atm_info" ;
53+ static constexpr std::string_view kShmDataName = " shm_atm_data" ;
54+ static constexpr uint32_t kVersion = 1 ;
55+
56+ // Flat entry layout for serialization into shared memory.
57+ struct ShmEntry {
58+ uint64_t keyHash;
59+ uint32_t accessTimeSecs;
60+ };
61+
4562 // maxSize: approximate upper bound on total entries across all shards.
4663 // Enforced per-shard as ceil(maxSize / numShards). 0 means unbounded.
4764 explicit AccessTimeMap (size_t numShards, size_t maxSize = 0 )
48- : shards_(numShards),
65+ : numShards_(numShards),
66+ maxSize_(maxSize),
67+ shards_(numShards),
4968 maxEntriesPerShard_(computeMaxEntriesPerShard(numShards, maxSize)) {}
5069
5170 // Store a timestamp for the given key hash.
@@ -120,8 +139,162 @@ class AccessTimeMap {
120139 // memory ordering, but avoids the contention of locking all shards.
121140 size_t size () const { return size_.load (std::memory_order_relaxed); }
122141
123- // Counters are read without a global barrier, so a snapshot may be
124- // internally inconsistent (e.g. hits + misses > gets).
142+ void persist (ShmManager& shm) {
143+ auto startTimeSecs = util::getCurrentTimeSec ();
144+
145+ size_t entryCount = 0 ;
146+ for (auto & shard : shards_) {
147+ std::lock_guard l (shard.mutex_ );
148+ entryCount += shard.map_ .size ();
149+ }
150+
151+ // Remove any stale segments to avoid recovering old state on warm roll.
152+ shm.removeShm (std::string (kShmDataName ));
153+ shm.removeShm (std::string (kShmInfoName ));
154+
155+ if (entryCount == 0 ) {
156+ XLOG (INFO) << " AccessTimeMap is empty, nothing to persist" ;
157+ return ;
158+ }
159+
160+ const size_t dataSize = entryCount * sizeof (ShmEntry);
161+ auto dataAddr = shm.createShm (std::string (kShmDataName ), dataSize);
162+ auto * entries = reinterpret_cast <ShmEntry*>(dataAddr.addr );
163+
164+ size_t idx = 0 ;
165+ for (auto & shard : shards_) {
166+ std::lock_guard l (shard.mutex_ );
167+ for (const auto & [keyHash, accessTimeSecs] : shard.map_ ) {
168+ if (idx >= entryCount) {
169+ break ;
170+ }
171+ entries[idx++] = ShmEntry{keyHash, accessTimeSecs};
172+ }
173+ if (idx >= entryCount) {
174+ break ;
175+ }
176+ }
177+
178+ navy::serialization::AccessTimeMapConfig cfg;
179+ *cfg.version () = static_cast <int32_t >(kVersion );
180+ *cfg.numShards () = static_cast <int64_t >(numShards_);
181+ *cfg.maxSize () = static_cast <int64_t >(maxSize_);
182+ *cfg.entryCount () = static_cast <int64_t >(idx);
183+
184+ auto ioBuf = Serializer::serializeToIOBuf (cfg);
185+ auto infoAddr = shm.createShm (std::string (kShmInfoName ), ioBuf->length ());
186+ Serializer serializer (
187+ reinterpret_cast <uint8_t *>(infoAddr.addr ),
188+ reinterpret_cast <uint8_t *>(infoAddr.addr ) + ioBuf->length ());
189+ serializer.writeToBuffer (std::move (ioBuf));
190+
191+ XLOGF (INFO,
192+ " Persisted AccessTimeMap: {} entries, {} bytes to shared memory "
193+ " in {} secs" ,
194+ idx, dataSize, util::getCurrentTimeSec () - startTimeSecs);
195+ }
196+
197+ // Restore entries from shared memory. On failure, starts empty.
198+ void recover (ShmManager& shm) {
199+ auto startTimeSecs = util::getCurrentTimeSec ();
200+
201+ ShmAddr infoAddr;
202+ try {
203+ infoAddr = shm.attachShm (std::string (kShmInfoName ));
204+ } catch (const std::exception&) {
205+ XLOG (INFO) << " No AccessTimeMap shm segments found, starting empty" ;
206+ return ;
207+ }
208+
209+ SCOPE_EXIT {
210+ shm.removeShm (std::string (kShmInfoName ));
211+ shm.removeShm (std::string (kShmDataName ));
212+ };
213+
214+ Deserializer deserializer (
215+ reinterpret_cast <uint8_t *>(infoAddr.addr ),
216+ reinterpret_cast <uint8_t *>(infoAddr.addr ) + infoAddr.size );
217+ auto cfg =
218+ deserializer.deserialize <navy::serialization::AccessTimeMapConfig>();
219+
220+ if (static_cast <uint32_t >(*cfg.version ()) != kVersion ) {
221+ XLOGF (WARN,
222+ " AccessTimeMap version mismatch: stored={}, current={}. "
223+ " Starting empty." ,
224+ *cfg.version (), kVersion );
225+ return ;
226+ }
227+
228+ const auto entryCount = static_cast <size_t >(*cfg.entryCount ());
229+ if (entryCount == 0 ) {
230+ // persist() skips empty maps, so this may indicate corruption.
231+ XLOG (WARN) << " AccessTimeMap shm segment has zero entries but info "
232+ " segment exists, possible corruption. Starting empty." ;
233+ return ;
234+ }
235+
236+ ShmAddr dataAddr;
237+ try {
238+ dataAddr = shm.attachShm (std::string (kShmDataName ));
239+ } catch (const std::exception& e) {
240+ XLOGF (WARN,
241+ " Failed to attach AccessTimeMap data segment: {}. "
242+ " Starting empty." ,
243+ e.what ());
244+ return ;
245+ }
246+
247+ const size_t expectedSize = entryCount * sizeof (ShmEntry);
248+ if (dataAddr.size < expectedSize) {
249+ XLOGF (WARN,
250+ " AccessTimeMap data segment size mismatch: got={}, expected={}. "
251+ " Starting empty." ,
252+ dataAddr.size , expectedSize);
253+ return ;
254+ }
255+
256+ const auto * entries = reinterpret_cast <const ShmEntry*>(dataAddr.addr );
257+
258+ // Group entries by shard so we only grab each lock once.
259+ std::vector<folly::F14FastMap<uint64_t , uint32_t >> perShard (shards_.size ());
260+ for (size_t i = 0 ; i < entryCount; ++i) {
261+ auto shardIdx = entries[i].keyHash % shards_.size ();
262+ perShard[shardIdx][entries[i].keyHash ] = entries[i].accessTimeSecs ;
263+ }
264+ size_t totalInserted = 0 ;
265+ for (size_t s = 0 ; s < shards_.size (); ++s) {
266+ if (perShard[s].empty ()) {
267+ continue ;
268+ }
269+ auto & shard = shards_[s];
270+ std::lock_guard l (shard.mutex_ );
271+ size_t inserted = 0 ;
272+ for (const auto & [keyHash, ts] : perShard[s]) {
273+ if (maxEntriesPerShard_ > 0 &&
274+ shard.map_ .size () >= maxEntriesPerShard_) {
275+ break ;
276+ }
277+ shard.map_ .insert_or_assign (keyHash, ts);
278+ ++inserted;
279+ }
280+ size_.fetch_add (inserted, std::memory_order_relaxed);
281+ totalInserted += inserted;
282+ }
283+
284+ if (totalInserted < entryCount) {
285+ XLOGF (WARN,
286+ " AccessTimeMap recovery dropped {} entries due to per-shard "
287+ " size cap (maxEntriesPerShard={}, persisted={})" ,
288+ entryCount - totalInserted, maxEntriesPerShard_, entryCount);
289+ }
290+
291+ XLOGF (INFO,
292+ " Recovered AccessTimeMap: {}/{} entries, {} bytes from shared memory "
293+ " in {} secs" ,
294+ totalInserted, entryCount, expectedSize,
295+ util::getCurrentTimeSec () - startTimeSecs);
296+ }
297+
125298 void getCounters (const util::CounterVisitor& visitor) const {
126299 visitor (" navy_atm_size" , size_.load (std::memory_order_relaxed),
127300 util::CounterVisitor::CounterType::COUNT);
@@ -164,6 +337,8 @@ class AccessTimeMap {
164337 return shards_[keyHash % shards_.size ()];
165338 }
166339
340+ const size_t numShards_;
341+ const size_t maxSize_;
167342 std::vector<Shard> shards_;
168343 const size_t maxEntriesPerShard_{0 };
169344 std::atomic<size_t > size_{0 };
0 commit comments