Commit b9e6eda

Optimize and clean up internal replicator purge checkpoints

Previously, the internal replicator created twice the number of checkpoints needed to replicate purges between nodes. An internal replication job first pulls the purges from the target to the source, then pushes the purges from the source to the target, and finally pushes the document updates to the target. During the pull operation, for example from node `B` (the target) to node `A` (the source), it creates an `A->B` checkpoint on node `B` (the target). Then, during the push from `A` to `B`, it creates an `A->B` checkpoint on node `A` (the source). As a result, after the job finishes there are two checkpoints: an `A->B` one on `A`, and an `A->B` one on `B`. It may look something like this:

```
[node A]                      [node B]
          <-------pull------   (A->B)
 (A->B)   --------push------>
```

When the internal replication job runs on node `B` and _pushes_ purges to node `A`, it will create a `B->A` checkpoint on `B`. From that point on, there are two checkpoints on `B` for replicating purges from `B` to `A`: one is `A->B`, from the first job, and the other is `B->A`, from the second job. Both checkpoints essentially checkpoint the same thing. It may look like this after both replication jobs finish:

```
[node A]                      [node B]
          <-------pull------   (A->B)    JOB1
 (A->B)   --------push------>
 (B->A)   --------pull------>
          <-------push------   (B->A)    JOB2
```

On `B`, the checkpoints `A->B` and `B->A` could hold different purge sequences: one higher than the other, and the lower one could delay the compactor from cleaning up purge infos. This also makes it harder to reason about the replication process, since the `A->B` checkpoint on `B` tracks changes sent _from_ `B` _to_ `A`, not from `A` to `B` as its name suggests.

To fix this, use a single checkpoint per direction of replication. So, when changes are pulled from `B` to `A`, the checkpoint is now `B->A`, and when changes are pushed from `B` to `A`, the checkpoint is also `B->A`. It should look something like this:

```
[node A]                      [node B]
          <-------pull------             JOB1
 (A->B)   --------push------>
          --------pull------>
          <-------push------   (B->A)    JOB2
```

Since after this change there will be some deprecated purge checkpoints to clean up, it's also a good time to introduce purge checkpoint cleanup. We have this for indexes, but we didn't have it for the internal replicator. That meant that on shard map reconfigurations or node membership changes, users had to manually hunt down stale local (un-clustered) purge checkpoints and remove them. Now this happens automatically when we compact, and before we replicate between nodes.
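To make the new naming concrete, here is a minimal remsh sketch. The UUIDs are made-up stand-ins for the 32-character shard UUIDs of the copies on nodes `A` and `B`; with this change, purges flowing from `B` to `A`, whether pulled by `A` or pushed by `B`, resolve to the same checkpoint document ID via `mem3_rep:make_purge_id/2`.

```erlang
%% Illustrative only: made-up UUIDs standing in for the shard UUIDs on A and B.
AUuid = <<"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa">>.
BUuid = <<"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb">>.

%% Purges flowing B -> A (pulled by A or pushed by B) now share one checkpoint,
%% named after the direction the purges travel in.
mem3_rep:make_purge_id(BUuid, AUuid).
%% => <<"_local/purge-mem3-bbbbbbbb...-aaaaaaaa...">>
```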
1 parent d512917 commit b9e6eda

4 files changed

Lines changed: 263 additions & 54 deletions

src/couch/src/couch_bt_engine_compactor.erl

Lines changed: 4 additions & 0 deletions
```diff
@@ -158,6 +158,10 @@ copy_purge_info(#comp_st{} = CompSt) ->
         retry = Retry
     } = CompSt,
     ?COMP_EVENT(purge_init),
+    % The minimum purge sequence calculation involves finding the lowest
+    % reported purge sequence across all checkpoints. Make sure to clean up any
+    % stale or deprecated internal replicator checkpoints beforehand.
+    ok = mem3_rep:cleanup_purge_checkpoints(DbName),
     MinPurgeSeq = couch_util:with_db(DbName, fun(Db) ->
         couch_db:get_minimum_purge_seq(Db)
     end),
```
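Why the cleanup matters for compaction: `couch_db:get_minimum_purge_seq/1` is gated by the lowest `purge_seq` any purge client checkpoint still reports, so a stale replicator checkpoint keeps old purge infos from being dropped. A conceptual sketch of that gating (not the real couch_db implementation):

```erlang
%% Conceptual sketch only, not couch_db:get_minimum_purge_seq/1 itself:
%% the compactor may only drop purge infos below the smallest purge_seq
%% reported by any checkpoint, so one stale checkpoint pins them forever.
MinPurgeSeq = fun(OwnPurgeSeq, CheckpointSeqs) ->
    lists:min([OwnPurgeSeq | CheckpointSeqs])
end.
%% MinPurgeSeq(1300, [1200, 1195]) =:= 1195  % healthy checkpoints
%% MinPurgeSeq(1300, [1200, 3]) =:= 3        % stale checkpoint blocks cleanup
```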

src/mem3/src/mem3_rep.erl

Lines changed: 102 additions & 43 deletions
```diff
@@ -19,6 +19,7 @@
     make_local_id/3,
     make_purge_id/2,
     verify_purge_checkpoint/2,
+    cleanup_purge_checkpoints/1,
     find_source_seq/4,
     find_split_target_seq/4,
     local_id_hash/1
@@ -56,6 +57,10 @@
 }).

 -define(DEFAULT_REXI_TIMEOUT, 600000).
+-define(CHECKPOINT_PREFIX, "_local/shard-sync-").
+-define(PURGE_PREFIX, "_local/purge-mem3-").
+-define(UUID_SIZE, 32).
+-define(PURGE_TYPE, <<"internal_replication">>).

 go(Source, Target) ->
     go(Source, Target, []).
@@ -148,12 +153,12 @@ make_local_id(#shard{node = SourceNode}, #shard{node = TargetNode}, Filter) ->
 make_local_id(SourceThing, TargetThing, F) when is_binary(F) ->
     S = local_id_hash(SourceThing),
     T = local_id_hash(TargetThing),
-    <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>;
+    <<?CHECKPOINT_PREFIX, S/binary, "-", T/binary, F/binary>>;
 make_local_id(SourceThing, TargetThing, Filter) ->
     S = local_id_hash(SourceThing),
     T = local_id_hash(TargetThing),
     F = filter_hash(Filter),
-    <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
+    <<?CHECKPOINT_PREFIX, S/binary, "-", T/binary, F/binary>>.

 filter_hash(Filter) when is_function(Filter) ->
     {new_uniq, Hash} = erlang:fun_info(Filter, new_uniq),
@@ -166,44 +171,98 @@ local_id_hash(Thing) ->
     couch_util:encodeBase64Url(couch_hash:md5_hash(?term_to_bin(Thing))).

 make_purge_id(SourceUUID, TargetUUID) ->
-    <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
+    <<?PURGE_PREFIX, SourceUUID/binary, "-", TargetUUID/binary>>.

-verify_purge_checkpoint(DbName, Props) ->
-    try
-        Type = couch_util:get_value(<<"type">>, Props),
-        if
-            Type =/= <<"internal_replication">> ->
-                false;
-            true ->
-                SourceBin = couch_util:get_value(<<"source">>, Props),
-                TargetBin = couch_util:get_value(<<"target">>, Props),
-                Range = couch_util:get_value(<<"range">>, Props),
+remote_id_to_local(<<?PURGE_PREFIX, Remote:?UUID_SIZE/binary, "-", Local:?UUID_SIZE/binary>>) ->
+    <<?PURGE_PREFIX, Local/binary, "-", Remote/binary>>.

-                Source = binary_to_existing_atom(SourceBin, latin1),
-                Target = binary_to_existing_atom(TargetBin, latin1),
+% If the shard map changed, nodes are decommissioned, or the user upgraded from
+% a version before 3.6, we may have some checkpoints to clean up. Call this
+% function before compacting, right before we calculate the minimum purge
+% sequence, and also before we replicate purges to/from other copies.
+%
+cleanup_purge_checkpoints(ShardName) when is_binary(ShardName) ->
+    couch_util:with_db(ShardName, fun(Db) -> cleanup_purge_checkpoints(Db) end);
+cleanup_purge_checkpoints(Db) ->
+    Shards = shards(couch_db:name(Db)),
+    UUID = couch_db:get_uuid(Db),
+    FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) ->
+        case Id of
+            <<?PURGE_PREFIX, UUID:?UUID_SIZE/binary, "-", _:?UUID_SIZE/binary>> ->
+                case verify_checkpoint_shard(Shards, Props) of
+                    true -> {ok, Acc};
+                    false -> {ok, [Id | Acc]}
+                end;
+            <<?PURGE_PREFIX, _:?UUID_SIZE/binary, "-", _:?UUID_SIZE/binary>> ->
+                % Clean up checkpoints not originating at the current shard.
+                % Previously, before version 3.6, during a pull from shard B to
+                % shard A we checkpointed on target B with doc ID
+                % purge-mem3-$AUuid-$BUuid. That created a redundant checkpoint
+                % which was the same as shard B pushing changes to shard A,
+                % which already had a checkpoint: purge-mem3-$BUuid-$AUuid,
+                % with the same direction and same purge sequence ID. So here
+                % we remove those redundant checkpoints.
+                {ok, [Id | Acc]};
+            _ ->
+                {stop, Acc}
+        end
+    end,
+    Opts = [{start_key, list_to_binary(?PURGE_PREFIX)}],
+    {ok, ToDelete} = couch_db:fold_local_docs(Db, FoldFun, [], Opts),
+    DeleteFun = fun(DocId) -> delete_checkpoint(Db, DocId) end,
+    lists:foreach(DeleteFun, ToDelete).
+
+delete_checkpoint(Db, DocId) ->
+    DbName = couch_db:name(Db),
+    LogMsg = "~p : deleting inactive purge checkpoint ~s : ~s",
+    couch_log:warning(LogMsg, [?MODULE, DbName, DocId]),
+    try couch_db:open_doc(Db, DocId, []) of
+        {ok, Doc = #doc{}} ->
+            Deleted = Doc#doc{deleted = true, body = {[]}},
+            couch_db:update_doc(Db, Deleted, [?ADMIN_CTX]);
+        {not_found, _} ->
+            ok
+    catch
+        Tag:Error ->
+            ErrLog = "~p : error deleting checkpoint ~s : ~s error: ~p:~p",
+            couch_log:error(ErrLog, [?MODULE, DbName, DocId, Tag, Error]),
+            ok
+    end.

-                try
-                    Nodes = lists:foldl(
-                        fun(Shard, Acc) ->
-                            case Shard#shard.range == Range of
-                                true -> [Shard#shard.node | Acc];
-                                false -> Acc
-                            end
-                        end,
-                        [],
-                        mem3:shards(mem3:dbname(DbName))
-                    ),
-                    lists:member(Source, Nodes) andalso lists:member(Target, Nodes)
-                catch
-                    error:database_does_not_exist ->
-                        false
-                end
+verify_purge_checkpoint(DbName, Props) ->
+    try
+        case couch_util:get_value(<<"type">>, Props) of
+            ?PURGE_TYPE -> verify_checkpoint_shard(shards(DbName), Props);
+            _ -> false
         end
     catch
-        _:_ ->
+        Tag:Error ->
+            ErrLog = "~p : invalid checkpoint shard:~s props:~p error: ~p:~p",
+            couch_log:error(ErrLog, [?MODULE, DbName, Props, Tag, Error]),
             false
     end.

+shards(DbName) ->
+    try
+        mem3:shards(mem3:dbname(DbName))
+    catch
+        error:database_does_not_exist ->
+            []
+    end.
+
+verify_checkpoint_shard(Shards, Props) when is_list(Shards), is_list(Props) ->
+    Range = couch_util:get_value(<<"range">>, Props),
+    Fun = fun(S, Acc) ->
+        case mem3:range(S) == Range of
+            true -> [mem3:node(S) | Acc];
+            false -> Acc
+        end
+    end,
+    Nodes = lists:foldl(Fun, [], Shards),
+    TBin = couch_util:get_value(<<"target">>, Props),
+    TNode = binary_to_existing_atom(TBin, latin1),
+    lists:member(TNode, Nodes) andalso lists:member(TNode, mem3:nodes()).
+
 %% @doc Find and return the largest update_seq in SourceDb
 %% that the client has seen from TargetNode.
 %%
```
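To illustrate the ID translation performed by `remote_id_to_local/1` (it is private to `mem3_rep`, so this sketch just repeats its binary pattern with made-up UUIDs): the checkpoint ID the source receives from the target during a pull leads with the target's UUID, and the same direction of flow is tracked locally on the source under the ID with the source's UUID first.

```erlang
%% Made-up UUIDs; this mirrors the swap remote_id_to_local/1 performs.
RemoteId = <<"_local/purge-mem3-",
    "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "-",
    "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb">>.
<<"_local/purge-mem3-", Remote:32/binary, "-", Local:32/binary>> = RemoteId.
LocalId = <<"_local/purge-mem3-", Local/binary, "-", Remote/binary>>.
%% LocalId leads with the local copy's UUID: <<"_local/purge-mem3-bbbb...-aaaa...">>

%% The new cleanup is exported, so it can also be run directly against a shard
%% (hypothetical shard name):
ok = mem3_rep:cleanup_purge_checkpoints(<<"shards/00000000-7fffffff/db1.1690000000">>).
```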
```diff
@@ -335,6 +394,7 @@ pull_purges_multi(#acc{} = Acc0) ->
         hashfun = HashFun
     } = Acc0,
     with_src_db(Acc0, fun(Db) ->
+        cleanup_purge_checkpoints(Db),
         Targets = maps:map(
             fun(_, #tgt{} = T) ->
                 pull_purges(Db, Count, Source, T, HashFun)
@@ -365,9 +425,9 @@ pull_purges(Db, Count, #shard{} = SrcShard, #tgt{} = Tgt0, HashFun) ->
     #tgt{shard = TgtShard} = Tgt0,
     SrcUUID = couch_db:get_uuid(Db),
     #shard{node = TgtNode, name = TgtDbName} = TgtShard,
-    {LocalPurgeId, Infos, ThroughSeq, Remaining} =
+    {RemoteId, Infos, ThroughSeq, Remaining} =
         mem3_rpc:load_purge_infos(TgtNode, TgtDbName, SrcUUID, Count),
-    Tgt = Tgt0#tgt{purgeid = LocalPurgeId},
+    Tgt = Tgt0#tgt{purgeid = RemoteId},
     if
         Infos == [] ->
             ok;
@@ -391,7 +451,7 @@ pull_purges(Db, Count, #shard{} = SrcShard, #tgt{} = Tgt0, HashFun) ->
             Infos1 = lists:filter(BelongsFun, Infos),
             {ok, _} = couch_db:purge_docs(Db, Infos1, [?REPLICATED_CHANGES]),
             Body = purge_cp_body(SrcShard, TgtShard, ThroughSeq),
-            mem3_rpc:save_purge_checkpoint(TgtNode, TgtDbName, LocalPurgeId, Body)
+            mem3_rpc:save_purge_checkpoint(TgtNode, TgtDbName, RemoteId, Body)
     end,
     Tgt#tgt{remaining = max(0, Remaining)}.

@@ -427,7 +487,8 @@ push_purges_multi(#acc{} = Acc) ->
     end).

 push_purges(Db, BatchSize, SrcShard, Tgt, HashFun) ->
-    #tgt{shard = TgtShard, purgeid = LocalPurgeId} = Tgt,
+    #tgt{shard = TgtShard, purgeid = RemotePurgeId} = Tgt,
+    LocalPurgeId = remote_id_to_local(RemotePurgeId),
     #shard{node = TgtNode, range = TgtRange, name = TgtDbName} = TgtShard,
     StartSeq =
         case couch_db:open_doc(Db, LocalPurgeId, []) of
@@ -741,21 +802,19 @@ update_locals(Target, Db, Seq) ->
     {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).

 purge_cp_body(#shard{} = Source, #shard{} = Target, PurgeSeq) ->
-    {Mega, Secs, _} = os:timestamp(),
-    NowSecs = Mega * 1000000 + Secs,
     {[
-        {<<"type">>, <<"internal_replication">>},
-        {<<"updated_on">>, NowSecs},
+        {<<"type">>, ?PURGE_TYPE},
+        {<<"updated_on">>, os:system_time(second)},
         {<<"purge_seq">>, PurgeSeq},
         {<<"source">>, atom_to_binary(Source#shard.node, latin1)},
         {<<"target">>, atom_to_binary(Target#shard.node, latin1)},
-        {<<"range">>, Source#shard.range}
+        {<<"range">>, Target#shard.range}
     ]}.

 find_repl_doc(SrcDb, TgtUUIDPrefix) ->
     SrcUUID = couch_db:get_uuid(SrcDb),
     S = local_id_hash(SrcUUID),
-    DocIdPrefix = <<"_local/shard-sync-", S/binary, "-">>,
+    DocIdPrefix = <<?CHECKPOINT_PREFIX, S/binary, "-">>,
     FoldFun = fun(#doc{id = DocId, body = {BodyProps}} = Doc, _) ->
         TgtUUID = couch_util:get_value(<<"target_uuid">>, BodyProps, <<>>),
         case is_prefix(DocIdPrefix, DocId) of
@@ -802,7 +861,7 @@ find_split_target_seq_int(TgtDb, Node, SrcUUIDPrefix) ->
             {ok, not_found}
         end
     end,
-    Options = [{start_key, <<"_local/shard-sync-">>}],
+    Options = [{start_key, <<?CHECKPOINT_PREFIX>>}],
     case couch_db:fold_local_docs(TgtDb, FoldFun, not_found, Options) of
         {ok, Seqs} when is_list(Seqs) ->
             {ok, Seqs};
```
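For reference, a checkpoint body produced by the updated `purge_cp_body/3` would look roughly like the term below. All values are invented for illustration; note that `range` now comes from the target shard and `updated_on` is taken from `os:system_time(second)`.

```erlang
%% Rough shape of a purge checkpoint body after this change
%% (node names, seq, timestamp and range are made up):
{[
    {<<"type">>, <<"internal_replication">>},
    {<<"updated_on">>, 1712345678},
    {<<"purge_seq">>, 42},
    {<<"source">>, <<"node1@127.0.0.1">>},
    {<<"target">>, <<"node2@127.0.0.1">>},
    {<<"range">>, [0, 2147483647]}
]}.
```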

src/mem3/src/mem3_rpc.erl

Lines changed: 32 additions & 10 deletions
```diff
@@ -189,7 +189,12 @@ load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
     case get_or_create_db(DbName, [?ADMIN_CTX]) of
         {ok, Db} ->
             TgtUUID = couch_db:get_uuid(Db),
-            PurgeDocId = mem3_rep:make_purge_id(SrcUUID, TgtUUID),
+            % This is the remote checkpoint, stored on the target, used when
+            % pulling purges to the source. The changes flow from the target
+            % to the source, which is why the checkpoint is from tgt to src
+            % here. It is also the same checkpoint used when the target pushes
+            % changes to the source.
+            PurgeDocId = mem3_rep:make_purge_id(TgtUUID, SrcUUID),
             StartSeq =
                 case couch_db:open_doc(Db, PurgeDocId, []) of
                     {ok, #doc{body = {Props}}} ->
@@ -222,19 +227,36 @@ save_purge_checkpoint_rpc(DbName, PurgeDocId, Body) ->
     erlang:put(io_priority, {internal_repl, DbName}),
     case get_or_create_db(DbName, [?ADMIN_CTX]) of
         {ok, Db} ->
-            Doc = #doc{id = PurgeDocId, body = Body},
-            Resp =
-                try couch_db:update_doc(Db, Doc, []) of
-                    Resp0 -> Resp0
-                catch
-                    T:R ->
-                        {T, R}
-                end,
-            rexi:reply(Resp);
+            case purge_checkpoint_updated(Db, PurgeDocId, Body) of
+                true ->
+                    % The checkpoint on the target advanced while the source
+                    % was pulling the changes. Do not update the doc in that
+                    % case, to avoid rewinding it.
+                    rexi:reply({ok, stale});
+                false ->
+                    Doc = #doc{id = PurgeDocId, body = Body},
+                    rexi:reply(
+                        try
+                            couch_db:update_doc(Db, Doc, [])
+                        catch
+                            T:R -> {T, R}
+                        end
+                    )
+            end;
         Error ->
             rexi:reply(Error)
     end.

+purge_checkpoint_updated(Db, DocId, {Props}) when is_binary(DocId), is_list(Props) ->
+    Seq = couch_util:get_value(<<"purge_seq">>, Props),
+    case couch_db:open_doc(Db, DocId, []) of
+        {ok, #doc{body = {DocProps}}} ->
+            DocSeq = couch_util:get_value(<<"purge_seq">>, DocProps),
+            is_integer(Seq) andalso is_integer(DocSeq) andalso DocSeq > Seq;
+        {not_found, _} ->
+            false
+    end.
+
 replicate_rpc(DbName, Target) ->
     rexi:reply(
         try
```
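The new guard only skips the write when the checkpoint already stored on the target is ahead of the incoming body. A minimal sketch of that comparison with made-up property lists (`purge_checkpoint_updated/3` itself is private to `mem3_rpc`):

```erlang
%% Made-up bodies: what is already stored on the target vs. what the source
%% is trying to write back after a pull.
ExistingProps = [{<<"purge_seq">>, 50}].
IncomingProps = [{<<"purge_seq">>, 42}].
DocSeq = couch_util:get_value(<<"purge_seq">>, ExistingProps).
Seq = couch_util:get_value(<<"purge_seq">>, IncomingProps).
is_integer(Seq) andalso is_integer(DocSeq) andalso DocSeq > Seq.
%% => true, so save_purge_checkpoint_rpc replies {ok, stale} instead of
%% rewinding the target's checkpoint.
```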
