Skip to content

Commit 6ce6f6b

Browse files
feat(fdc): Streaming implementation for data connect (#18174)
* feat(fdc): Happy Path Implementation (#18151) * Initial Commit * Fix formatting and analyzer warnings * Fix tests and licenses * Denver feedback: var initialization best practice * sorted keys for id generation * Fix analyze info messages * feat(fdc): Handle disconnects and reconnects (#18157) * Handle disconnects and reconnects * Address gemini review comments * Reconnection logic * Fixes to reconnect on startup * Fix to decode binary data from server * Handle gemini feedback * Update rest_transport mocks with new windows provider sig * feat(fdc): Hardening (#18173) * debug logging and hardering * Hardening: ensure only one multicast stream controller is created in a ref * Handle cancellations * Formatting fix * Cynthia feedback (final, ...) * Gemini feedback: checkTransport single instancing. * Gemini feedback: cancel and nullify authSubscription in disconnect * Reverting authSubscription nullifying This affects reconnects. Taking tech debt instead. * Handle low priority gemini feedback * Gemini feedback: optimized operation id creation initialized as a lazy var that is used everywhere instead of computing always * Remove _queryId and use the computed operationId * Cynthia feedback on sending empty message to server * Remove grpc transport since its no longer used. * Update tests to incorporate operationId * Remove authToken appCheckToken fields. * Remove metadata file
1 parent 55a7f6f commit 6ce6f6b

24 files changed

Lines changed: 1209 additions & 378 deletions
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
include: package:flutter_lints/flutter.yaml

packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class Cache {
5151

5252
String _constructCacheIdentifier() {
5353
final rawPrefix =
54-
'${_settings.storage}-${dataConnect.app.options.projectId}-${dataConnect.app.name}-${dataConnect.connectorConfig.serviceId}-${dataConnect.connectorConfig.connector}-${dataConnect.connectorConfig.location}-${dataConnect.transport.transportOptions.host}';
54+
'${_settings.storage}-${dataConnect.app.options.projectId}-${dataConnect.app.name}-${dataConnect.connectorConfig.serviceId}-${dataConnect.connectorConfig.connector}-${dataConnect.connectorConfig.location}-${dataConnect.transport?.transportOptions.host}';
5555
final prefixSha = convertToSha256(rawPrefix);
5656
final rawSuffix = dataConnect.auth?.currentUser?.uid ?? 'anon';
5757
final suffixSha = convertToSha256(rawSuffix);

packages/firebase_data_connect/firebase_data_connect/lib/src/cache/cache_data_types.dart

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -332,11 +332,11 @@ class EntityNode {
332332
srcListMap.forEach((key, value) {
333333
List<EntityNode> enodeList = [];
334334
List<dynamic> jsonList = value as List<dynamic>;
335-
jsonList.forEach((jsonObj) {
335+
for (var jsonObj in jsonList) {
336336
Map<String, dynamic> jmap = jsonObj as Map<String, dynamic>;
337337
EntityNode en = EntityNode.fromJson(jmap, cacheProvider);
338338
enodeList.add(en);
339-
});
339+
}
340340
objLists?[key] = enodeList;
341341
});
342342
}
@@ -367,9 +367,9 @@ class EntityNode {
367367
if (nestedObjectLists != null) {
368368
nestedObjectLists!.forEach((key, edoList) {
369369
List<Map<String, dynamic>> jsonList = [];
370-
edoList.forEach((edo) {
370+
for (var edo in edoList) {
371371
jsonList.add(edo.toJson(mode: mode));
372-
});
372+
}
373373
jsonData[key] = jsonList;
374374
});
375375
}
@@ -396,9 +396,9 @@ class EntityNode {
396396
Map<String, dynamic> nestedObjectListsJson = {};
397397
nestedObjectLists!.forEach((key, edoList) {
398398
List<Map<String, dynamic>> jsonList = [];
399-
edoList.forEach((edo) {
399+
for (var edo in edoList) {
400400
jsonList.add(edo.toJson(mode: mode));
401-
});
401+
}
402402
nestedObjectListsJson[key] = jsonList;
403403
});
404404
jsonData[listsKey] = nestedObjectListsJson;

packages/firebase_data_connect/firebase_data_connect/lib/src/common/common_library.dart

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ abstract class DataConnectTransport {
102102

103103
/// Invokes corresponding query endpoint.
104104
Future<ServerResponse> invokeQuery<Data, Variables>(
105+
String operationId,
105106
String queryName,
106107
Deserializer<Data> deserializer,
107108
Serializer<Variables> serializer,
@@ -111,6 +112,17 @@ abstract class DataConnectTransport {
111112

112113
/// Invokes corresponding mutation endpoint.
113114
Future<ServerResponse> invokeMutation<Data, Variables>(
115+
String operationId,
116+
String queryName,
117+
Deserializer<Data> deserializer,
118+
Serializer<Variables> serializer,
119+
Variables? vars,
120+
String? token,
121+
);
122+
123+
/// Invokes corresponding stream query endpoint.
124+
Stream<ServerResponse> invokeStreamQuery<Data, Variables>(
125+
String operationId,
114126
String queryName,
115127
Deserializer<Data> deserializer,
116128
Serializer<Variables> serializer,

packages/firebase_data_connect/firebase_data_connect/lib/src/common/dataconnect_error.dart

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,7 @@ class DataConnectError extends FirebaseException {
3636

3737
/// Error thrown when an operation is partially successful.
3838
class DataConnectOperationError<T> extends DataConnectError {
39-
DataConnectOperationError(
40-
DataConnectErrorCode code, String message, this.response)
41-
: super(code, message);
39+
DataConnectOperationError(super.code, super.message, this.response);
4240
final DataConnectOperationFailureResponse<T> response;
4341
}
4442

packages/firebase_data_connect/firebase_data_connect/lib/src/core/ref.dart

Lines changed: 153 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,51 @@ abstract class OperationRef<Data, Variables> {
5151
this.serializer,
5252
this.variables,
5353
);
54-
Variables? variables;
55-
String operationName;
56-
DataConnectTransport _transport;
57-
Deserializer<Data> deserializer;
58-
Serializer<Variables> serializer;
54+
final Variables? variables;
55+
final String operationName;
56+
final DataConnectTransport _transport;
57+
final Deserializer<Data> deserializer;
58+
final Serializer<Variables> serializer;
5959
String? _lastToken;
6060

61-
FirebaseDataConnect dataConnect;
61+
final FirebaseDataConnect dataConnect;
62+
63+
late final String operationId =
64+
createOperationId(operationName, variables, serializer);
6265

63-
Future<OperationResult<Data, Variables>> execute(
64-
{QueryFetchPolicy fetchPolicy = QueryFetchPolicy.preferCache});
66+
static dynamic _sortKeys(dynamic value) {
67+
if (value is Map) {
68+
final sortedMap = <String, dynamic>{};
69+
final sortedKeys = value.keys.toList()..sort();
70+
for (final key in sortedKeys) {
71+
sortedMap[key.toString()] = _sortKeys(value[key]);
72+
}
73+
return sortedMap;
74+
} else if (value is List) {
75+
return value.map(_sortKeys).toList();
76+
}
77+
return value;
78+
}
79+
80+
static String createOperationId<Variables>(String operationName,
81+
Variables? vars, Serializer<Variables>? serializer) {
82+
if (vars != null && serializer != null) {
83+
try {
84+
final decoded = jsonDecode(serializer(vars));
85+
final sortedStr = jsonEncode(_sortKeys(decoded));
86+
final hashVars = convertToSha256(sortedStr);
87+
return '$operationName::$hashVars';
88+
} catch (_) {
89+
final rawVars = serializer(vars);
90+
final hashVars = convertToSha256(rawVars);
91+
return '$operationName::$hashVars';
92+
}
93+
} else {
94+
return operationName;
95+
}
96+
}
97+
98+
Future<OperationResult<Data, Variables>> execute();
6599

66100
Future<bool> _shouldRetry() async {
67101
String? newToken;
@@ -152,7 +186,7 @@ class QueryManager {
152186
try {
153187
await queryRef.execute(fetchPolicy: QueryFetchPolicy.cacheOnly);
154188
} catch (e) {
155-
log('Error executing impacted query $e');
189+
log('Error executing impacted query $queryId $e');
156190
}
157191
}
158192
}
@@ -175,24 +209,20 @@ class QueryManager {
175209
StreamController<QueryResult<Data, Variables>> addQuery<Data, Variables>(
176210
QueryRef<Data, Variables> ref,
177211
) {
178-
final queryId = ref._queryId;
212+
final queryId = ref.operationId;
179213
trackedQueries[queryId] = ref;
180214

181215
final streamController =
182-
StreamController<QueryResult<Data, Variables>>.broadcast();
216+
StreamController<QueryResult<Data, Variables>>.broadcast(
217+
onCancel: () {
218+
trackedQueries.remove(queryId);
219+
ref._onAllSubscribersCancelled();
220+
},
221+
);
183222

184223
return streamController;
185224
}
186225

187-
static String createQueryId<QueryVariables>(String queryName,
188-
QueryVariables? vars, Serializer<QueryVariables> varSerializer) {
189-
if (vars != null) {
190-
return '$queryName::${varSerializer(vars)}';
191-
} else {
192-
return queryName;
193-
}
194-
}
195-
196226
void dispose() {
197227
_impactedQueriesSubscription?.cancel();
198228
}
@@ -216,7 +246,7 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
216246
variables,
217247
);
218248

219-
QueryManager _queryManager;
249+
final QueryManager _queryManager;
220250

221251
@override
222252
Future<QueryResult<Data, Variables>> execute(
@@ -239,9 +269,6 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
239269
}
240270
}
241271

242-
String get _queryId =>
243-
QueryManager.createQueryId(operationName, variables, serializer);
244-
245272
Future<QueryResult<Data, Variables>> _executeFromCache(
246273
QueryFetchPolicy fetchPolicy) async {
247274
if (dataConnect.cacheManager == null) {
@@ -251,7 +278,7 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
251278
final cacheManager = dataConnect.cacheManager!;
252279
bool allowStale = fetchPolicy ==
253280
QueryFetchPolicy.cacheOnly; //if its cache only, we always allow stale
254-
final cachedData = await cacheManager.resultTree(_queryId, allowStale);
281+
final cachedData = await cacheManager.resultTree(operationId, allowStale);
255282

256283
if (cachedData != null) {
257284
try {
@@ -280,6 +307,7 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
280307
try {
281308
ServerResponse serverResponse =
282309
await _transport.invokeQuery<Data, Variables>(
310+
operationId,
283311
operationName,
284312
deserializer,
285313
serializer,
@@ -288,7 +316,7 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
288316
);
289317

290318
if (dataConnect.cacheManager != null) {
291-
await dataConnect.cacheManager!.update(_queryId, serverResponse);
319+
await dataConnect.cacheManager!.update(operationId, serverResponse);
292320
}
293321
Data typedData = _convertBodyJsonToData(serverResponse.data);
294322

@@ -307,22 +335,109 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
307335
}
308336

309337
StreamController<QueryResult<Data, Variables>>? _streamController;
338+
Stream<ServerResponse>? _serverStream;
339+
StreamSubscription<ServerResponse>? _serverStreamSubscription;
340+
341+
void _onAllSubscribersCancelled() {
342+
_serverStreamSubscription?.cancel();
343+
_serverStreamSubscription = null;
344+
_serverStream = null;
345+
log("QueryRef $operationId: All subscribers cancelled. Unsubscribed from server stream.");
346+
}
310347

311348
Stream<QueryResult<Data, Variables>> subscribe() {
312349
_streamController ??= _queryManager.addQuery(this);
313350

314-
execute();
351+
final stream =
352+
_streamController!.stream.cast<QueryResult<Data, Variables>>();
353+
354+
// Return the stream to the caller, then execute fetches
355+
Future.microtask(() async {
356+
if (dataConnect.cacheManager != null) {
357+
try {
358+
await _executeFromCache(QueryFetchPolicy.cacheOnly);
359+
} catch (err) {
360+
log("Error fetching from cache during subscribe $err");
361+
// Ignore cache misses here, server stream will provide latest data
362+
}
363+
}
364+
365+
// Initiate Web Socket stream only if not already streaming
366+
if (_serverStream == null) {
367+
_streamFromServer();
368+
}
369+
});
370+
371+
return stream;
372+
}
373+
374+
void _streamFromServer() async {
375+
bool shouldRetry = await _shouldRetry();
376+
log("QueryRef $operationId _streamFromServer loop started.");
377+
try {
378+
_serverStream = _transport.invokeStreamQuery<Data, Variables>(
379+
operationId,
380+
operationName,
381+
deserializer,
382+
serializer,
383+
variables,
384+
_lastToken,
385+
);
315386

316-
return _streamController!.stream.cast<QueryResult<Data, Variables>>();
387+
_serverStreamSubscription = _serverStream!.listen(
388+
(serverResponse) async {
389+
log("QueryRef $operationId _streamFromServer loop received snapshot.");
390+
if (dataConnect.cacheManager != null) {
391+
try {
392+
await dataConnect.cacheManager!
393+
.update(operationId, serverResponse);
394+
} catch (e) {
395+
log("QueryRef $operationId _streamFromServer loop cache update failed: $e");
396+
}
397+
}
398+
Data typedData = _convertBodyJsonToData(serverResponse.data);
399+
400+
QueryResult<Data, Variables> res =
401+
QueryResult(dataConnect, typedData, DataSource.server, this);
402+
publishResultToStream(res);
403+
},
404+
onError: (e) {
405+
_serverStreamSubscription?.cancel();
406+
_serverStreamSubscription = null;
407+
_serverStream = null;
408+
409+
if (shouldRetry &&
410+
e is DataConnectError &&
411+
e.code == DataConnectErrorCode.unauthorized.toString()) {
412+
_streamFromServer();
413+
} else {
414+
publishErrorToStream(e);
415+
}
416+
},
417+
onDone: () {
418+
_serverStreamSubscription?.cancel();
419+
_serverStreamSubscription = null;
420+
_serverStream = null;
421+
},
422+
);
423+
} catch (e) {
424+
_serverStreamSubscription?.cancel();
425+
_serverStreamSubscription = null;
426+
_serverStream = null;
427+
log("QueryRef $operationId _streamFromServer loop Unknown loop failure: $e");
428+
publishErrorToStream(e);
429+
}
317430
}
318431

319432
void publishResultToStream(QueryResult<Data, Variables> result) {
320433
if (_streamController != null) {
321434
_streamController?.add(result);
435+
} else {
436+
log("QueryRef $operationId _streamFromServer loop _streamController is null");
322437
}
323438
}
324439

325-
void publishErrorToStream(Error err) {
440+
void publishErrorToStream(Object err) {
326441
if (_streamController != null) {
327442
_streamController?.addError(err);
328443
}
@@ -331,24 +446,16 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
331446

332447
class MutationRef<Data, Variables> extends OperationRef<Data, Variables> {
333448
MutationRef(
334-
FirebaseDataConnect dataConnect,
335-
String operationName,
336-
DataConnectTransport transport,
337-
Deserializer<Data> deserializer,
338-
Serializer<Variables> serializer,
339-
Variables? variables,
340-
) : super(
341-
dataConnect,
342-
operationName,
343-
transport,
344-
deserializer,
345-
serializer,
346-
variables,
347-
);
449+
super.dataConnect,
450+
super.operationName,
451+
super.transport,
452+
super.deserializer,
453+
super.serializer,
454+
super.variables,
455+
);
348456

349457
@override
350-
Future<OperationResult<Data, Variables>> execute(
351-
{QueryFetchPolicy fetchPolicy = QueryFetchPolicy.serverOnly}) async {
458+
Future<OperationResult<Data, Variables>> execute() async {
352459
bool shouldRetry = await _shouldRetry();
353460
try {
354461
// Logic below is duplicated due to the fact that `executeOperation` returns
@@ -370,6 +477,7 @@ class MutationRef<Data, Variables> extends OperationRef<Data, Variables> {
370477
) async {
371478
ServerResponse serverResponse =
372479
await _transport.invokeMutation<Data, Variables>(
480+
operationId,
373481
operationName,
374482
deserializer,
375483
serializer,

0 commit comments

Comments
 (0)