Skip to content

Commit a46c12e

Browse files
committed
wip: b4924ee9219475b74b0da53339239c85d6829f30
1 parent e5d21fe commit a46c12e

12 files changed

Lines changed: 580 additions & 7 deletions

packages/cbjs/src/binding.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,7 @@ export interface CppGetAllReplicasResponseEntry {
529529
export interface CppGetAllReplicasRequest {
530530
id: CppDocumentId;
531531
timeout?: number;
532+
read_preference: CppReadPreference;
532533
}
533534
export interface CppUpsertResponse {
534535
// ctx
@@ -572,6 +573,7 @@ export interface CppGetAnyReplicaResponse {
572573
export interface CppGetAnyReplicaRequest {
573574
id: CppDocumentId;
574575
timeout?: number;
576+
read_preference: CppReadPreference;
575577
}
576578
export interface CppAppendResponse {
577579
// ctx
@@ -773,7 +775,7 @@ export interface CppLookupInAllReplicasRequest {
773775
id: CppDocumentId;
774776
specs: CppImplSubdocCommand[];
775777
timeout?: number;
776-
// parent_span
778+
read_preference: CppReadPreference;
777779
}
778780
export interface CppAnalyticsResponse {
779781
// ctx
@@ -1113,7 +1115,7 @@ export interface CppLookupInAnyReplicaRequest {
11131115
id: CppDocumentId;
11141116
specs: CppImplSubdocCommand[];
11151117
timeout?: number;
1116-
// parent_span
1118+
read_preference: CppReadPreference;
11171119
}
11181120
export interface CppMutateInResponse {
11191121
// ctx
@@ -3557,6 +3559,13 @@ export interface CppTransaction {
35573559
callback: (err: CppError | null, result: CppTransactionGetResult | null) => void
35583560
): void;
35593561

3562+
getReplicaFromPreferredServerGroup(
3563+
options: {
3564+
id: CppDocumentId;
3565+
},
3566+
callback: (err: CppError | null, result: CppTransactionGetResult | null) => void
3567+
): void;
3568+
35603569
insert(
35613570
options: {
35623571
id: CppDocumentId;

packages/cbjs/src/bindingutilities.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import binding, {
5050
CppQueryProfile,
5151
CppQueryScanConsistency,
5252
CppRangeScan,
53+
CppReadPreference,
5354
CppReplicateTo,
5455
CppSamplingScan,
5556
CppSearchHighlightStyle,
@@ -171,7 +172,12 @@ import {
171172
EventingFunctionProcessingStatus,
172173
EventingFunctionStatus,
173174
} from './eventingfunctionmanager.js';
174-
import { DurabilityLevel, ServiceType, StoreSemantics } from './generaltypes.js';
175+
import {
176+
DurabilityLevel,
177+
ReadPreference,
178+
ServiceType,
179+
StoreSemantics,
180+
} from './generaltypes.js';
175181
import { MutationState } from './mutationstate.js';
176182
import { QueryProfileMode, QueryScanConsistency } from './querytypes.js';
177183
import { PrefixScan, RangeScan, SamplingScan } from './rangeScan.js';
@@ -1649,3 +1655,23 @@ export function authDomainFromCpp(domain: CppManagementRbacAuthDomain): string {
16491655
}
16501656
throw new InvalidArgumentError('Unrecognized CppManagementRbacAuthDomain.');
16511657
}
1658+
1659+
/**
1660+
* @internal
1661+
*/
1662+
export function readPreferenceToCpp(
1663+
preference: ReadPreference | undefined
1664+
): CppReadPreference {
1665+
// Unspecified is allowed, and means no preference.
1666+
if (preference === null || preference === undefined) {
1667+
return binding.read_preference.no_preference;
1668+
}
1669+
1670+
if (preference === ReadPreference.NoPreference) {
1671+
return binding.read_preference.no_preference;
1672+
} else if (preference === ReadPreference.SelectedServerGroup) {
1673+
return binding.read_preference.selected_server_group;
1674+
}
1675+
1676+
throw new InvalidArgumentError('Unrecognized ReadPreference.');
1677+
}

packages/cbjs/src/cluster.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,12 @@ export type ConnectOptions<T extends CouchbaseClusterTypes = any> = {
259259
*
260260
*/
261261
configProfile?: string;
262+
263+
/**
264+
* Specifies the preferred server group to use for replica operations that specify a non-default
265+
* read preference.
266+
*/
267+
preferredServerGroup?: string;
262268
};
263269

264270
/**
@@ -290,6 +296,7 @@ export class Cluster<in out T extends CouchbaseClusterTypes = DefaultClusterType
290296
private _transactions?: Transactions<T>;
291297
private readonly _openBuckets: Map<BucketName<T>, Promise<void>>;
292298
private _dnsConfig: DnsConfig | null;
299+
private _preferredServerGroup: string | undefined;
293300

294301
/**
295302
* @internal
@@ -438,11 +445,14 @@ export class Cluster<in out T extends CouchbaseClusterTypes = DefaultClusterType
438445
this._bootstrapTimeout = options.timeouts?.bootstrapTimeout;
439446
this._connectTimeout = options.timeouts?.connectTimeout;
440447
this._resolveTimeout = options.timeouts?.resolveTimeout;
441-
442448
this._transcoder = options.transcoder ?? new DefaultTranscoder();
443449
this._queryResultParser = options.queryResultParser ?? JSON.parse;
444450
this._hooks = options.hooks;
445451

452+
if (options.preferredServerGroup) {
453+
this._preferredServerGroup = options.preferredServerGroup;
454+
}
455+
446456
if (options.transactions) {
447457
this._txnConfig = options.transactions;
448458
} else {
@@ -913,6 +923,10 @@ export class Cluster<in out T extends CouchbaseClusterTypes = DefaultClusterType
913923
dsnObj.options.resolve_timeout = this.resolveTimeout.toString();
914924
}
915925

926+
if (this._preferredServerGroup) {
927+
dsnObj.options['server_group'] = this._preferredServerGroup;
928+
}
929+
916930
const connStr = dsnObj.toString();
917931

918932
const authOpts: CppClusterCredentials = {};

packages/cbjs/src/collection.ts

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ import {
6969
errorFromCpp,
7070
mutationStateToCpp,
7171
persistToToCpp,
72+
readPreferenceToCpp,
7273
replicateToToCpp,
7374
scanTypeToCpp,
7475
storeSemanticToCpp,
@@ -112,7 +113,7 @@ import {
112113
DocumentUnretrievableError,
113114
InvalidArgumentError,
114115
} from './errors.js';
115-
import { DurabilityLevel, StoreSemantics } from './generaltypes.js';
116+
import { DurabilityLevel, ReadPreference, StoreSemantics } from './generaltypes.js';
116117
import { MutationState } from './mutationstate.js';
117118
import { CollectionQueryIndexManager } from './queryindexmanager.js';
118119
import { PrefixScan, RangeScan, SamplingScan } from './rangeScan.js';
@@ -289,6 +290,11 @@ export interface GetAnyReplicaOptions<ThrowIfMissing extends boolean = boolean>
289290
* @default true
290291
*/
291292
throwIfMissing?: ThrowIfMissing;
293+
294+
/**
295+
* Specifies how replica nodes will be filtered.
296+
*/
297+
readPreference?: ReadPreference;
292298
}
293299

294300
/**
@@ -304,6 +310,11 @@ export interface GetAllReplicasOptions {
304310
* The timeout for this operation, represented in milliseconds.
305311
*/
306312
timeout?: number;
313+
314+
/**
315+
* Specifies how replica nodes will be filtered.
316+
*/
317+
readPreference?: ReadPreference;
307318
}
308319

309320
/**
@@ -402,6 +413,11 @@ export interface LookupInAnyReplicaOptions<ThrowOnSpecError extends boolean = fa
402413
* @default false
403414
*/
404415
throwOnSpecError?: ThrowOnSpecError;
416+
417+
/**
418+
* Specifies how replica nodes will be filtered.
419+
*/
420+
readPreference?: ReadPreference;
405421
}
406422

407423
/**
@@ -420,6 +436,11 @@ export interface LookupInAllReplicasOptions<ThrowOnSpecError extends boolean = f
420436
* @default false
421437
*/
422438
throwOnSpecError?: ThrowOnSpecError;
439+
440+
/**
441+
* Specifies how replica nodes will be filtered.
442+
*/
443+
readPreference?: ReadPreference;
423444
}
424445

425446
/**
@@ -857,7 +878,11 @@ export class Collection<
857878
private _getReplica(
858879
key: string,
859880
getAllReplicas: boolean,
860-
options: { transcoder?: Transcoder; timeout?: number }
881+
options: {
882+
transcoder?: Transcoder;
883+
timeout?: number;
884+
readPreference?: ReadPreference;
885+
}
861886
): StreamableReplicasPromise<
862887
[GetReplicaResult<unknown>, ...GetReplicaResult<unknown>[]],
863888
GetReplicaResult<unknown>
@@ -872,7 +897,11 @@ export class Collection<
872897
private _getReplica(
873898
key: string,
874899
getAllReplicas: boolean,
875-
options?: { transcoder?: Transcoder; timeout?: number }
900+
options?: {
901+
transcoder?: Transcoder;
902+
timeout?: number;
903+
readPreference?: ReadPreference;
904+
}
876905
): StreamableReplicasPromise<
877906
[GetReplicaResult<unknown>, ...GetReplicaResult<unknown>[]],
878907
GetReplicaResult<unknown>
@@ -893,6 +922,7 @@ export class Collection<
893922
const request = {
894923
id: this.getDocId(key),
895924
timeout: timeout,
925+
read_preference: readPreferenceToCpp(options.readPreference),
896926
};
897927

898928
const getReplicas = getAllReplicas
@@ -2184,6 +2214,7 @@ export class Collection<
21842214
id: this.getDocId(key),
21852215
specs: cppSpecs,
21862216
timeout: timeout,
2217+
read_preference: readPreferenceToCpp(options.readPreference),
21872218
},
21882219
(cppErr, res) => {
21892220
if (cppErr) {

packages/cbjs/src/generaltypes.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,21 @@ export enum StoreSemantics {
109109
*/
110110
Insert = 2,
111111
}
112+
113+
/**
114+
* Represents the various scan consistency options that are available when
115+
* querying against the query service.
116+
*/
117+
export enum ReadPreference {
118+
/**
119+
* Indicates that filtering for replica set should not be enforced.
120+
*/
121+
NoPreference = 'no_preference',
122+
123+
/**
124+
* Indicates that any nodes that do not belong to local group selected during
125+
* cluster instantiation using the `ConnectOptions.preferredServerGroup` option
126+
* should be excluded.
127+
*/
128+
SelectedServerGroup = 'selected_server_group',
129+
}

packages/cbjs/src/transactions.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,16 @@ export interface TransactionGetOptions {
515515
transcoder?: Transcoder;
516516
}
517517

518+
/**
519+
* @category Transactions
520+
*/
521+
export interface TransactionGetReplicaFromPreferredServerGroupOptions {
522+
/**
523+
* Specifies an explicit transcoder to use for this specific operation.
524+
*/
525+
transcoder?: Transcoder;
526+
}
527+
518528
/**
519529
* @category Transactions
520530
*/
@@ -694,6 +704,53 @@ export class TransactionAttemptContext<
694704
}
695705
}
696706

707+
/**
708+
* Retrieves the value of a document from the collection.
709+
*
710+
* @param collection The collection the document lives in.
711+
* @param key The document key to retrieve.
712+
* @param options Optional parameters for this operation.
713+
*/
714+
async getReplicaFromPreferredServerGroup<
715+
LInstance extends AnyCollection,
716+
CKS extends CollectionKeyspace<LInstance>,
717+
const LKey extends KeyspaceDocDef<
718+
T,
719+
CKS['bucket'],
720+
CKS['scope'],
721+
CKS['collection']
722+
>['Key'],
723+
const Key extends KeyspaceDocDef<T, B, S, C>['Key'],
724+
>(
725+
...args: If<
726+
IsNever<Instance>,
727+
[
728+
collection: LInstance,
729+
key: LKey,
730+
options?: TransactionGetReplicaFromPreferredServerGroupOptions,
731+
],
732+
[key: Key, options?: TransactionGetOptions]
733+
>
734+
): Promise<TransactionGetResult> {
735+
const [collection, key, options] = args as [LInstance, LKey, TransactionGetOptions?];
736+
const transcoder = options?.transcoder ?? this.transcoder;
737+
738+
try {
739+
const get = promisify(this._impl.getReplicaFromPreferredServerGroup).bind(
740+
this._impl
741+
);
742+
const id = collection.getDocId(key);
743+
const result = await get({ id });
744+
745+
invariant(result);
746+
747+
return translateGetResult(result, transcoder) as never;
748+
} catch (cppError: unknown) {
749+
const err = errorFromCpp(cppError as CppError);
750+
throw err;
751+
}
752+
}
753+
697754
/**
698755
* Check if a specific document exists in the collection or not.
699756
*

packages/http-client/src/services/cluster/serverFeature.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ export const ServerFeatures = {
4848
ScopeEventingFunctionManagement: 'scope_eventing_function_management',
4949
NotLockedKVStatus: 'kv_not_locked',
5050
BinaryTransactions: 'binary_transactions',
51+
ServerGroups: 'server_groups',
5152
} as const;
5253

5354
export type ServerFeature = (typeof ServerFeatures)[keyof typeof ServerFeatures];
@@ -101,6 +102,8 @@ export function versionSupports(version: string, feature: ServerFeature) {
101102
return gte(version, '7.6.0');
102103
case ServerFeatures.BinaryTransactions:
103104
return gte(version, '7.6.2');
105+
case ServerFeatures.ServerGroups:
106+
return gte(version, '7.6.2');
104107
}
105108

106109
throw new Error(`Unknown feature '${feature}'`);

tests/cbjs/tests/kv.get.replica.spec.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
DocumentNotFoundError,
2222
DocumentUnretrievableError,
2323
DurabilityLevel,
24+
ReadPreference,
2425
} from '@cbjsdev/cbjs';
2526
import { getPool } from '@cbjsdev/http-client';
2627
import { ServerFeatures } from '@cbjsdev/http-client';
@@ -207,4 +208,30 @@ describe
207208

208209
expect(result).toBeUndefined();
209210
});
211+
212+
test.runIf(serverSupportsFeatures(ServerFeatures.ServerGroups))(
213+
'should throw DocumentUnretrievableError for getAnyReplica with read preference but no cluster preference',
214+
async ({ expect, serverTestContext, testDocKey }) => {
215+
// the cluster setup does not set a preferred server group, so executing getAnyReplica
216+
// with a ReadPreference should fail.
217+
await expect(() =>
218+
serverTestContext.collection.getAnyReplica(testDocKey, {
219+
readPreference: ReadPreference.SelectedServerGroup,
220+
})
221+
).rejects.toThrow(DocumentUnretrievableError);
222+
}
223+
);
224+
225+
test.runIf(serverSupportsFeatures(ServerFeatures.ServerGroups))(
226+
'should throw DocumentUnretrievableError for getAnyReplica with read preference but no cluster preference',
227+
async ({ expect, serverTestContext, testDocKey }) => {
228+
// the cluster setup does not set a preferred server group, so executing getAllReplicas
229+
// with a ReadPreference should fail.
230+
await expect(() =>
231+
serverTestContext.collection.getAllReplicas(testDocKey, {
232+
readPreference: ReadPreference.SelectedServerGroup,
233+
})
234+
).rejects.toThrow(DocumentUnretrievableError);
235+
}
236+
);
210237
});

0 commit comments

Comments
 (0)