feat(mobile): assets + exif stream sync placeholder ()

* feat(mobile): assets + exif stream sync placeholder

* feat(mobile): assets + exif stream sync placeholder

* refactor

* fix: test

* fix:test

* refactor(mobile): sync stream service ()

* refactor: sync stream to use callbacks

* pr feedback

* pr feedback

* pr feedback

* fix: test

---------

Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com>
Co-authored-by: Alex Tran <alex.tran1502@gmail.com>

---------

Co-authored-by: shenlong <139912620+shenlong-tanwen@users.noreply.github.com>
Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com>
This commit is contained in:
Alex 2025-04-18 14:01:16 -05:00 committed by GitHub
parent bd2deda50c
commit 0e6ac87645
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 666 additions and 608 deletions

View file

@ -1,8 +1,12 @@
import 'package:http/http.dart' as http;
import 'package:immich_mobile/domain/models/sync_event.model.dart'; import 'package:immich_mobile/domain/models/sync_event.model.dart';
import 'package:openapi/api.dart';
abstract interface class ISyncApiRepository { abstract interface class ISyncApiRepository {
Future<void> ack(List<String> data); Future<void> ack(List<String> data);
Stream<List<SyncEvent>> getSyncEvents(List<SyncRequestType> type); Future<void> streamChanges(
Function(List<SyncEvent>, Function() abort) onData, {
int batchSize,
http.Client? httpClient,
});
} }

View file

@ -2,9 +2,17 @@ import 'package:immich_mobile/domain/interfaces/db.interface.dart';
import 'package:openapi/api.dart'; import 'package:openapi/api.dart';
abstract interface class ISyncStreamRepository implements IDatabaseRepository { abstract interface class ISyncStreamRepository implements IDatabaseRepository {
Future<bool> updateUsersV1(Iterable<SyncUserV1> data); Future<void> updateUsersV1(Iterable<SyncUserV1> data);
Future<bool> deleteUsersV1(Iterable<SyncUserDeleteV1> data); Future<void> deleteUsersV1(Iterable<SyncUserDeleteV1> data);
Future<bool> updatePartnerV1(Iterable<SyncPartnerV1> data); Future<void> updatePartnerV1(Iterable<SyncPartnerV1> data);
Future<bool> deletePartnerV1(Iterable<SyncPartnerDeleteV1> data); Future<void> deletePartnerV1(Iterable<SyncPartnerDeleteV1> data);
Future<void> updateAssetsV1(Iterable<SyncAssetV1> data);
Future<void> deleteAssetsV1(Iterable<SyncAssetDeleteV1> data);
Future<void> updateAssetsExifV1(Iterable<SyncAssetExifV1> data);
Future<void> updatePartnerAssetsV1(Iterable<SyncAssetV1> data);
Future<void> deletePartnerAssetsV1(Iterable<SyncAssetDeleteV1> data);
Future<void> updatePartnerAssetsExifV1(Iterable<SyncAssetExifV1> data);
} }

View file

@ -2,25 +2,11 @@
import 'dart:async'; import 'dart:async';
import 'package:collection/collection.dart';
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
import 'package:immich_mobile/domain/models/sync_event.model.dart';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
import 'package:openapi/api.dart'; import 'package:openapi/api.dart';
import 'package:worker_manager/worker_manager.dart';
const _kSyncTypeOrder = [
SyncEntityType.userDeleteV1,
SyncEntityType.userV1,
SyncEntityType.partnerDeleteV1,
SyncEntityType.partnerV1,
SyncEntityType.assetDeleteV1,
SyncEntityType.assetV1,
SyncEntityType.assetExifV1,
SyncEntityType.partnerAssetDeleteV1,
SyncEntityType.partnerAssetV1,
SyncEntityType.partnerAssetExifV1,
];
class SyncStreamService { class SyncStreamService {
final Logger _logger = Logger('SyncStreamService'); final Logger _logger = Logger('SyncStreamService');
@ -37,164 +23,70 @@ class SyncStreamService {
_syncStreamRepository = syncStreamRepository, _syncStreamRepository = syncStreamRepository,
_cancelChecker = cancelChecker; _cancelChecker = cancelChecker;
Future<bool> _handleSyncData( bool get isCancelled => _cancelChecker?.call() ?? false;
Future<void> sync() => _syncApiRepository.streamChanges(_handleEvents);
Future<void> _handleEvents(List<SyncEvent> events, Function() abort) async {
List<SyncEvent> items = [];
for (final event in events) {
if (isCancelled) {
_logger.warning("Sync stream cancelled");
abort();
return;
}
if (event.type != items.firstOrNull?.type) {
await _processBatch(items);
}
items.add(event);
}
await _processBatch(items);
}
Future<void> _processBatch(List<SyncEvent> batch) async {
if (batch.isEmpty) {
return;
}
final type = batch.first.type;
await _handleSyncData(type, batch.map((e) => e.data));
await _syncApiRepository.ack([batch.last.ack]);
batch.clear();
}
Future<void> _handleSyncData(
SyncEntityType type, SyncEntityType type,
// ignore: avoid-dynamic // ignore: avoid-dynamic
Iterable<dynamic> data, Iterable<dynamic> data,
) async { ) async {
if (data.isEmpty) {
_logger.warning("Received empty sync data for $type");
return false;
}
_logger.fine("Processing sync data for $type of length ${data.length}"); _logger.fine("Processing sync data for $type of length ${data.length}");
// ignore: prefer-switch-expression
try { switch (type) {
if (type == SyncEntityType.partnerV1) { case SyncEntityType.userV1:
return await _syncStreamRepository.updatePartnerV1(data.cast()); return _syncStreamRepository.updateUsersV1(data.cast());
} case SyncEntityType.userDeleteV1:
return _syncStreamRepository.deleteUsersV1(data.cast());
if (type == SyncEntityType.partnerDeleteV1) { case SyncEntityType.partnerV1:
return await _syncStreamRepository.deletePartnerV1(data.cast()); return _syncStreamRepository.updatePartnerV1(data.cast());
} case SyncEntityType.partnerDeleteV1:
return _syncStreamRepository.deletePartnerV1(data.cast());
if (type == SyncEntityType.userV1) { case SyncEntityType.assetV1:
return await _syncStreamRepository.updateUsersV1(data.cast()); return _syncStreamRepository.updateAssetsV1(data.cast());
} case SyncEntityType.assetDeleteV1:
return _syncStreamRepository.deleteAssetsV1(data.cast());
if (type == SyncEntityType.userDeleteV1) { case SyncEntityType.assetExifV1:
return await _syncStreamRepository.deleteUsersV1(data.cast()); return _syncStreamRepository.updateAssetsExifV1(data.cast());
} case SyncEntityType.partnerAssetV1:
} catch (error, stack) { return _syncStreamRepository.updatePartnerAssetsV1(data.cast());
_logger.severe("Error processing sync data for $type", error, stack); case SyncEntityType.partnerAssetDeleteV1:
return false; return _syncStreamRepository.deletePartnerAssetsV1(data.cast());
case SyncEntityType.partnerAssetExifV1:
return _syncStreamRepository.updatePartnerAssetsExifV1(data.cast());
default:
_logger.warning("Unknown sync data type: $type");
} }
_logger.warning("Unknown sync data type: $type");
return false;
} }
Future<void> _syncEvent(List<SyncRequestType> types) {
_logger.info("Syncing Events: $types");
final streamCompleter = Completer();
bool shouldComplete = false;
// the onDone callback might fire before the events are processed
// the following flag ensures that the onDone callback is not called
// before the events are processed and also that events are processed sequentially
Completer? mutex;
StreamSubscription? subscription;
try {
subscription = _syncApiRepository.getSyncEvents(types).listen(
(events) async {
if (events.isEmpty) {
_logger.warning("Received empty sync events");
return;
}
// If previous events are still being processed, wait for them to finish
if (mutex != null) {
await mutex!.future;
}
if (_cancelChecker?.call() ?? false) {
_logger.info("Sync cancelled, stopping stream");
subscription?.cancel();
if (!streamCompleter.isCompleted) {
streamCompleter.completeError(
CanceledError(),
StackTrace.current,
);
}
return;
}
// Take control of the mutex and process the events
mutex = Completer();
try {
final eventsMap = events.groupListsBy((event) => event.type);
final Map<SyncEntityType, String> acks = {};
for (final type in _kSyncTypeOrder) {
final data = eventsMap[type];
if (data == null) {
continue;
}
if (_cancelChecker?.call() ?? false) {
_logger.info("Sync cancelled, stopping stream");
mutex?.complete();
mutex = null;
if (!streamCompleter.isCompleted) {
streamCompleter.completeError(
CanceledError(),
StackTrace.current,
);
}
return;
}
if (data.isEmpty) {
_logger.warning("Received empty sync events for $type");
continue;
}
if (await _handleSyncData(type, data.map((e) => e.data))) {
// ignore: avoid-unsafe-collection-methods
acks[type] = data.last.ack;
} else {
_logger.warning("Failed to handle sync events for $type");
}
}
if (acks.isNotEmpty) {
await _syncApiRepository.ack(acks.values.toList());
}
_logger.info("$types events processed");
} catch (error, stack) {
_logger.warning("Error handling sync events", error, stack);
} finally {
mutex?.complete();
mutex = null;
}
if (shouldComplete) {
_logger.info("Sync done, completing stream");
if (!streamCompleter.isCompleted) streamCompleter.complete();
}
},
onError: (error, stack) {
_logger.warning("Error in sync stream for $types", error, stack);
// Do not proceed if the stream errors
if (!streamCompleter.isCompleted) {
// ignore: avoid-missing-completer-stack-trace
streamCompleter.completeError(error, stack);
}
},
onDone: () {
_logger.info("$types stream done");
if (mutex == null && !streamCompleter.isCompleted) {
streamCompleter.complete();
} else {
// Marks the stream as done but does not complete the completer
// until the events are processed
shouldComplete = true;
}
},
);
} catch (error, stack) {
_logger.severe("Error starting sync stream", error, stack);
if (!streamCompleter.isCompleted) {
streamCompleter.completeError(error, stack);
}
}
return streamCompleter.future.whenComplete(() {
_logger.info("Sync stream completed");
return subscription?.cancel();
});
}
Future<void> syncUsers() =>
_syncEvent([SyncRequestType.usersV1, SyncRequestType.partnersV1]);
} }

View file

@ -7,31 +7,33 @@ import 'package:immich_mobile/utils/isolate.dart';
import 'package:worker_manager/worker_manager.dart'; import 'package:worker_manager/worker_manager.dart';
class BackgroundSyncManager { class BackgroundSyncManager {
Cancelable<void>? _userSyncTask; Cancelable<void>? _syncTask;
BackgroundSyncManager(); BackgroundSyncManager();
Future<void> cancel() { Future<void> cancel() {
final futures = <Future>[]; final futures = <Future>[];
if (_userSyncTask != null) {
futures.add(_userSyncTask!.future); if (_syncTask != null) {
futures.add(_syncTask!.future);
} }
_userSyncTask?.cancel(); _syncTask?.cancel();
_userSyncTask = null; _syncTask = null;
return Future.wait(futures); return Future.wait(futures);
} }
Future<void> syncUsers() { Future<void> sync() {
if (_userSyncTask != null) { if (_syncTask != null) {
return _userSyncTask!.future; return _syncTask!.future;
} }
_userSyncTask = runInIsolateGentle( _syncTask = runInIsolateGentle(
computation: (ref) => ref.read(syncStreamServiceProvider).syncUsers(), computation: (ref) => ref.read(syncStreamServiceProvider).sync(),
); );
_userSyncTask!.whenComplete(() { _syncTask!.whenComplete(() {
_userSyncTask = null; _syncTask = null;
}); });
return _userSyncTask!.future; return _syncTask!.future;
} }
} }

View file

@ -12,22 +12,22 @@ import 'package:openapi/api.dart';
class SyncApiRepository implements ISyncApiRepository { class SyncApiRepository implements ISyncApiRepository {
final Logger _logger = Logger('SyncApiRepository'); final Logger _logger = Logger('SyncApiRepository');
final ApiService _api; final ApiService _api;
final int _batchSize; SyncApiRepository(this._api);
SyncApiRepository(this._api, {int batchSize = kSyncEventBatchSize})
: _batchSize = batchSize;
@override
Stream<List<SyncEvent>> getSyncEvents(List<SyncRequestType> type) {
return _getSyncStream(SyncStreamDto(types: type));
}
@override @override
Future<void> ack(List<String> data) { Future<void> ack(List<String> data) {
return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: data)); return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: data));
} }
Stream<List<SyncEvent>> _getSyncStream(SyncStreamDto dto) async* { @override
final client = http.Client(); Future<void> streamChanges(
Function(List<SyncEvent>, Function() abort) onData, {
int batchSize = kSyncEventBatchSize,
http.Client? httpClient,
}) async {
// ignore: avoid-unused-assignment
final stopwatch = Stopwatch()..start();
final client = httpClient ?? http.Client();
final endpoint = "${_api.apiClient.basePath}/sync/stream"; final endpoint = "${_api.apiClient.basePath}/sync/stream";
final headers = { final headers = {
@ -35,20 +35,38 @@ class SyncApiRepository implements ISyncApiRepository {
'Accept': 'application/jsonlines+json', 'Accept': 'application/jsonlines+json',
}; };
final queryParams = <QueryParam>[];
final headerParams = <String, String>{}; final headerParams = <String, String>{};
await _api.applyToParams(queryParams, headerParams); await _api.applyToParams([], headerParams);
headers.addAll(headerParams); headers.addAll(headerParams);
final request = http.Request('POST', Uri.parse(endpoint)); final request = http.Request('POST', Uri.parse(endpoint));
request.headers.addAll(headers); request.headers.addAll(headers);
request.body = jsonEncode(dto.toJson()); request.body = jsonEncode(
SyncStreamDto(
types: [
SyncRequestType.usersV1,
SyncRequestType.partnersV1,
SyncRequestType.assetsV1,
SyncRequestType.partnerAssetsV1,
SyncRequestType.assetExifsV1,
SyncRequestType.partnerAssetExifsV1,
],
).toJson(),
);
String previousChunk = ''; String previousChunk = '';
List<String> lines = []; List<String> lines = [];
bool shouldAbort = false;
void abort() {
_logger.warning("Abort requested, stopping sync stream");
shouldAbort = true;
}
try { try {
final response = await client.send(request); final response =
await client.send(request).timeout(const Duration(seconds: 20));
if (response.statusCode != 200) { if (response.statusCode != 200) {
final errorBody = await response.stream.bytesToString(); final errorBody = await response.stream.bytesToString();
@ -59,27 +77,38 @@ class SyncApiRepository implements ISyncApiRepository {
} }
await for (final chunk in response.stream.transform(utf8.decoder)) { await for (final chunk in response.stream.transform(utf8.decoder)) {
if (shouldAbort) {
break;
}
previousChunk += chunk; previousChunk += chunk;
final parts = previousChunk.toString().split('\n'); final parts = previousChunk.toString().split('\n');
previousChunk = parts.removeLast(); previousChunk = parts.removeLast();
lines.addAll(parts); lines.addAll(parts);
if (lines.length < _batchSize) { if (lines.length < batchSize) {
continue; continue;
} }
yield _parseSyncResponse(lines); await onData(_parseLines(lines), abort);
lines.clear(); lines.clear();
} }
} finally {
if (lines.isNotEmpty) { if (lines.isNotEmpty && !shouldAbort) {
yield _parseSyncResponse(lines); await onData(_parseLines(lines), abort);
} }
} catch (error, stack) {
_logger.severe("error processing stream", error, stack);
return Future.error(error, stack);
} finally {
client.close(); client.close();
} }
stopwatch.stop();
_logger
.info("Remote Sync completed in ${stopwatch.elapsed.inMilliseconds}ms");
} }
List<SyncEvent> _parseSyncResponse(List<String> lines) { List<SyncEvent> _parseLines(List<String> lines) {
final List<SyncEvent> data = []; final List<SyncEvent> data = [];
for (final line in lines) { for (final line in lines) {
@ -110,4 +139,10 @@ const _kResponseMap = <SyncEntityType, Function(dynamic)>{
SyncEntityType.userDeleteV1: SyncUserDeleteV1.fromJson, SyncEntityType.userDeleteV1: SyncUserDeleteV1.fromJson,
SyncEntityType.partnerV1: SyncPartnerV1.fromJson, SyncEntityType.partnerV1: SyncPartnerV1.fromJson,
SyncEntityType.partnerDeleteV1: SyncPartnerDeleteV1.fromJson, SyncEntityType.partnerDeleteV1: SyncPartnerDeleteV1.fromJson,
SyncEntityType.assetV1: SyncAssetV1.fromJson,
SyncEntityType.assetDeleteV1: SyncAssetDeleteV1.fromJson,
SyncEntityType.assetExifV1: SyncAssetExifV1.fromJson,
SyncEntityType.partnerAssetV1: SyncAssetV1.fromJson,
SyncEntityType.partnerAssetDeleteV1: SyncAssetDeleteV1.fromJson,
SyncEntityType.partnerAssetExifV1: SyncAssetExifV1.fromJson,
}; };

View file

@ -1,4 +1,5 @@
import 'package:drift/drift.dart'; import 'package:drift/drift.dart';
import 'package:flutter/foundation.dart';
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
import 'package:immich_mobile/extensions/string_extensions.dart'; import 'package:immich_mobile/extensions/string_extensions.dart';
import 'package:immich_mobile/infrastructure/entities/partner.entity.drift.dart'; import 'package:immich_mobile/infrastructure/entities/partner.entity.drift.dart';
@ -15,7 +16,7 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository
DriftSyncStreamRepository(super.db) : _db = db; DriftSyncStreamRepository(super.db) : _db = db;
@override @override
Future<bool> deleteUsersV1(Iterable<SyncUserDeleteV1> data) async { Future<void> deleteUsersV1(Iterable<SyncUserDeleteV1> data) async {
try { try {
await _db.batch((batch) { await _db.batch((batch) {
for (final user in data) { for (final user in data) {
@ -25,15 +26,14 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository
); );
} }
}); });
return true; } catch (error, stack) {
} catch (e, s) { _logger.severe('Error while processing SyncUserDeleteV1', error, stack);
_logger.severe('Error while processing SyncUserDeleteV1', e, s); rethrow;
return false;
} }
} }
@override @override
Future<bool> updateUsersV1(Iterable<SyncUserV1> data) async { Future<void> updateUsersV1(Iterable<SyncUserV1> data) async {
try { try {
await _db.batch((batch) { await _db.batch((batch) {
for (final user in data) { for (final user in data) {
@ -49,15 +49,14 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository
); );
} }
}); });
return true; } catch (error, stack) {
} catch (e, s) { _logger.severe('Error while processing SyncUserV1', error, stack);
_logger.severe('Error while processing SyncUserV1', e, s); rethrow;
return false;
} }
} }
@override @override
Future<bool> deletePartnerV1(Iterable<SyncPartnerDeleteV1> data) async { Future<void> deletePartnerV1(Iterable<SyncPartnerDeleteV1> data) async {
try { try {
await _db.batch((batch) { await _db.batch((batch) {
for (final partner in data) { for (final partner in data) {
@ -70,15 +69,14 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository
); );
} }
}); });
return true;
} catch (e, s) { } catch (e, s) {
_logger.severe('Error while processing SyncPartnerDeleteV1', e, s); _logger.severe('Error while processing SyncPartnerDeleteV1', e, s);
return false; rethrow;
} }
} }
@override @override
Future<bool> updatePartnerV1(Iterable<SyncPartnerV1> data) async { Future<void> updatePartnerV1(Iterable<SyncPartnerV1> data) async {
try { try {
await _db.batch((batch) { await _db.batch((batch) {
for (final partner in data) { for (final partner in data) {
@ -95,10 +93,42 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository
); );
} }
}); });
return true;
} catch (e, s) { } catch (e, s) {
_logger.severe('Error while processing SyncPartnerV1', e, s); _logger.severe('Error while processing SyncPartnerV1', e, s);
return false; rethrow;
} }
} }
// Assets
@override
Future<void> updateAssetsV1(Iterable<SyncAssetV1> data) async {
debugPrint("updateAssetsV1 - ${data.length}");
}
@override
Future<void> deleteAssetsV1(Iterable<SyncAssetDeleteV1> data) async {
debugPrint("deleteAssetsV1 - ${data.length}");
}
// Partner Assets
@override
Future<void> updatePartnerAssetsV1(Iterable<SyncAssetV1> data) async {
debugPrint("updatePartnerAssetsV1 - ${data.length}");
}
@override
Future<void> deletePartnerAssetsV1(Iterable<SyncAssetDeleteV1> data) async {
debugPrint("deletePartnerAssetsV1 - ${data.length}");
}
// EXIF
@override
Future<void> updateAssetsExifV1(Iterable<SyncAssetExifV1> data) async {
debugPrint("updateAssetsExifV1 - ${data.length}");
}
@override
Future<void> updatePartnerAssetsExifV1(Iterable<SyncAssetExifV1> data) async {
debugPrint("updatePartnerAssetsExifV1 - ${data.length}");
}
} }

View file

@ -1,11 +1,13 @@
import 'package:auto_route/auto_route.dart'; import 'package:auto_route/auto_route.dart';
import 'package:easy_localization/easy_localization.dart'; import 'package:easy_localization/easy_localization.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart'; import 'package:flutter/material.dart';
import 'package:flutter_svg/svg.dart'; import 'package:flutter_svg/svg.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/extensions/build_context_extensions.dart'; import 'package:immich_mobile/extensions/build_context_extensions.dart';
import 'package:immich_mobile/models/backup/backup_state.model.dart'; import 'package:immich_mobile/models/backup/backup_state.model.dart';
import 'package:immich_mobile/models/server_info/server_info.model.dart'; import 'package:immich_mobile/models/server_info/server_info.model.dart';
import 'package:immich_mobile/providers/background_sync.provider.dart';
import 'package:immich_mobile/providers/backup/backup.provider.dart'; import 'package:immich_mobile/providers/backup/backup.provider.dart';
import 'package:immich_mobile/providers/server_info.provider.dart'; import 'package:immich_mobile/providers/server_info.provider.dart';
import 'package:immich_mobile/providers/user.provider.dart'; import 'package:immich_mobile/providers/user.provider.dart';
@ -178,6 +180,11 @@ class ImmichAppBar extends ConsumerWidget implements PreferredSizeWidget {
child: action, child: action,
), ),
), ),
if (kDebugMode)
IconButton(
onPressed: () => ref.read(backgroundSyncProvider).sync(),
icon: const Icon(Icons.sync),
),
if (showUploadButton) if (showUploadButton)
Padding( Padding(
padding: const EdgeInsets.only(right: 20), padding: const EdgeInsets.only(right: 20),

View file

@ -2,3 +2,5 @@ import 'package:mocktail/mocktail.dart';
import 'package:openapi/api.dart'; import 'package:openapi/api.dart';
class MockAssetsApi extends Mock implements AssetsApi {} class MockAssetsApi extends Mock implements AssetsApi {}
class MockSyncApi extends Mock implements SyncApi {}

View file

@ -1,4 +1,4 @@
// ignore_for_file: avoid-unnecessary-futures, avoid-async-call-in-sync-function // ignore_for_file: avoid-declaring-call-method, avoid-unnecessary-futures
import 'dart:async'; import 'dart:async';
@ -8,16 +8,22 @@ import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
import 'package:immich_mobile/domain/models/sync_event.model.dart'; import 'package:immich_mobile/domain/models/sync_event.model.dart';
import 'package:immich_mobile/domain/services/sync_stream.service.dart'; import 'package:immich_mobile/domain/services/sync_stream.service.dart';
import 'package:mocktail/mocktail.dart'; import 'package:mocktail/mocktail.dart';
import 'package:openapi/api.dart';
import 'package:worker_manager/worker_manager.dart';
import '../../fixtures/sync_stream.stub.dart'; import '../../fixtures/sync_stream.stub.dart';
import '../../infrastructure/repository.mock.dart'; import '../../infrastructure/repository.mock.dart';
class _AbortCallbackWrapper {
const _AbortCallbackWrapper();
bool call() => false;
}
class _MockAbortCallbackWrapper extends Mock implements _AbortCallbackWrapper {}
class _CancellationWrapper { class _CancellationWrapper {
const _CancellationWrapper(); const _CancellationWrapper();
bool isCancelled() => false; bool call() => false;
} }
class _MockCancellationWrapper extends Mock implements _CancellationWrapper {} class _MockCancellationWrapper extends Mock implements _CancellationWrapper {}
@ -26,35 +32,26 @@ void main() {
late SyncStreamService sut; late SyncStreamService sut;
late ISyncStreamRepository mockSyncStreamRepo; late ISyncStreamRepository mockSyncStreamRepo;
late ISyncApiRepository mockSyncApiRepo; late ISyncApiRepository mockSyncApiRepo;
late StreamController<List<SyncEvent>> streamController; late Function(List<SyncEvent>, Function()) handleEventsCallback;
late _MockAbortCallbackWrapper mockAbortCallbackWrapper;
successHandler(Invocation _) async => true; successHandler(Invocation _) async => true;
failureHandler(Invocation _) async => false;
setUp(() { setUp(() {
mockSyncStreamRepo = MockSyncStreamRepository(); mockSyncStreamRepo = MockSyncStreamRepository();
mockSyncApiRepo = MockSyncApiRepository(); mockSyncApiRepo = MockSyncApiRepository();
streamController = StreamController<List<SyncEvent>>.broadcast(); mockAbortCallbackWrapper = _MockAbortCallbackWrapper();
sut = SyncStreamService( when(() => mockAbortCallbackWrapper()).thenReturn(false);
syncApiRepository: mockSyncApiRepo,
syncStreamRepository: mockSyncStreamRepo,
);
// Default stream setup - emits one batch and closes when(() => mockSyncApiRepo.streamChanges(any()))
when(() => mockSyncApiRepo.getSyncEvents(any())) .thenAnswer((invocation) async {
.thenAnswer((_) => streamController.stream); // ignore: avoid-unsafe-collection-methods
handleEventsCallback = invocation.positionalArguments.first;
});
// Default ack setup
when(() => mockSyncApiRepo.ack(any())).thenAnswer((_) async => {}); when(() => mockSyncApiRepo.ack(any())).thenAnswer((_) async => {});
// Register fallbacks for mocktail verification
registerFallbackValue(<SyncUserV1>[]);
registerFallbackValue(<SyncPartnerV1>[]);
registerFallbackValue(<SyncUserDeleteV1>[]);
registerFallbackValue(<SyncPartnerDeleteV1>[]);
// Default successful repository calls
when(() => mockSyncStreamRepo.updateUsersV1(any())) when(() => mockSyncStreamRepo.updateUsersV1(any()))
.thenAnswer(successHandler); .thenAnswer(successHandler);
when(() => mockSyncStreamRepo.deleteUsersV1(any())) when(() => mockSyncStreamRepo.deleteUsersV1(any()))
@ -63,381 +60,163 @@ void main() {
.thenAnswer(successHandler); .thenAnswer(successHandler);
when(() => mockSyncStreamRepo.deletePartnerV1(any())) when(() => mockSyncStreamRepo.deletePartnerV1(any()))
.thenAnswer(successHandler); .thenAnswer(successHandler);
when(() => mockSyncStreamRepo.updateAssetsV1(any()))
.thenAnswer(successHandler);
when(() => mockSyncStreamRepo.deleteAssetsV1(any()))
.thenAnswer(successHandler);
when(() => mockSyncStreamRepo.updateAssetsExifV1(any()))
.thenAnswer(successHandler);
when(() => mockSyncStreamRepo.updatePartnerAssetsV1(any()))
.thenAnswer(successHandler);
when(() => mockSyncStreamRepo.deletePartnerAssetsV1(any()))
.thenAnswer(successHandler);
when(() => mockSyncStreamRepo.updatePartnerAssetsExifV1(any()))
.thenAnswer(successHandler);
sut = SyncStreamService(
syncApiRepository: mockSyncApiRepo,
syncStreamRepository: mockSyncStreamRepo,
);
}); });
tearDown(() async { Future<void> simulateEvents(List<SyncEvent> events) async {
if (!streamController.isClosed) { await sut.sync();
await streamController.close(); await handleEventsCallback(events, mockAbortCallbackWrapper.call);
}
});
// Helper to trigger sync and add events to the stream
Future<void> triggerSyncAndEmit(List<SyncEvent> events) async {
final future = sut.syncUsers(); // Start listening
await Future.delayed(Duration.zero); // Allow listener to attach
if (!streamController.isClosed) {
streamController.add(events);
await streamController.close(); // Close after emitting
}
await future; // Wait for processing to complete
} }
group("SyncStreamService", () { group("SyncStreamService - _handleEvents", () {
test( test(
"completes successfully when stream emits data and handlers succeed", "processes events and acks successfully when handlers succeed",
() async { () async {
final events = [ final events = [
...SyncStreamStub.userEvents, SyncStreamStub.userDeleteV1,
...SyncStreamStub.partnerEvents, SyncStreamStub.userV1Admin,
SyncStreamStub.userV1User,
SyncStreamStub.partnerDeleteV1,
SyncStreamStub.partnerV1,
]; ];
final future = triggerSyncAndEmit(events);
await expectLater(future, completes); await simulateEvents(events);
// Verify ack includes last ack from each successfully handled type
verify( verifyInOrder([
() => () => mockSyncStreamRepo.deleteUsersV1(any()),
mockSyncApiRepo.ack(any(that: containsAll(["5", "2", "4", "3"]))), () => mockSyncApiRepo.ack(["2"]),
).called(1); () => mockSyncStreamRepo.updateUsersV1(any()),
() => mockSyncApiRepo.ack(["5"]),
() => mockSyncStreamRepo.deletePartnerV1(any()),
() => mockSyncApiRepo.ack(["4"]),
() => mockSyncStreamRepo.updatePartnerV1(any()),
() => mockSyncApiRepo.ack(["3"]),
]);
verifyNever(() => mockAbortCallbackWrapper());
}, },
); );
test("completes successfully when stream emits an error", () async { test("processes final batch correctly", () async {
when(() => mockSyncApiRepo.getSyncEvents(any()))
.thenAnswer((_) => Stream.error(Exception("Stream Error")));
// Should complete gracefully without throwing
await expectLater(sut.syncUsers(), throwsException);
verifyNever(() => mockSyncApiRepo.ack(any())); // No ack on stream error
});
test("throws when initial getSyncEvents call fails", () async {
final apiException = Exception("API Error");
when(() => mockSyncApiRepo.getSyncEvents(any())).thenThrow(apiException);
// Should rethrow the exception from the initial call
await expectLater(sut.syncUsers(), throwsA(apiException));
verifyNever(() => mockSyncApiRepo.ack(any()));
});
test(
"completes successfully when a repository handler throws an exception",
() async {
when(() => mockSyncStreamRepo.updateUsersV1(any()))
.thenThrow(Exception("Repo Error"));
final events = [
...SyncStreamStub.userEvents,
...SyncStreamStub.partnerEvents,
];
// Should complete, but ack only for the successful types
await triggerSyncAndEmit(events);
// Only partner delete was successful by default setup
verify(() => mockSyncApiRepo.ack(["2", "4", "3"])).called(1);
},
);
test(
"completes successfully but sends no ack when all handlers fail",
() async {
when(() => mockSyncStreamRepo.updateUsersV1(any()))
.thenAnswer(failureHandler);
when(() => mockSyncStreamRepo.deleteUsersV1(any()))
.thenAnswer(failureHandler);
when(() => mockSyncStreamRepo.updatePartnerV1(any()))
.thenAnswer(failureHandler);
when(() => mockSyncStreamRepo.deletePartnerV1(any()))
.thenAnswer(failureHandler);
final events = [
...SyncStreamStub.userEvents,
...SyncStreamStub.partnerEvents,
];
await triggerSyncAndEmit(events);
verifyNever(() => mockSyncApiRepo.ack(any()));
},
);
test("sends ack only for types where handler returns true", () async {
// Mock specific handlers: user update fails, user delete succeeds
when(() => mockSyncStreamRepo.updateUsersV1(any()))
.thenAnswer(failureHandler);
when(() => mockSyncStreamRepo.deleteUsersV1(any()))
.thenAnswer(successHandler);
// partner update fails, partner delete succeeds
when(() => mockSyncStreamRepo.updatePartnerV1(any()))
.thenAnswer(failureHandler);
final events = [ final events = [
...SyncStreamStub.userEvents, SyncStreamStub.userDeleteV1,
...SyncStreamStub.partnerEvents, SyncStreamStub.userV1Admin,
]; ];
await triggerSyncAndEmit(events);
// Expect ack only for userDeleteV1 (ack: "2") and partnerDeleteV1 (ack: "4") await simulateEvents(events);
verify(() => mockSyncApiRepo.ack(any(that: containsAll(["2", "4"]))))
.called(1); verifyInOrder([
() => mockSyncStreamRepo.deleteUsersV1(any()),
() => mockSyncApiRepo.ack(["2"]),
() => mockSyncStreamRepo.updateUsersV1(any()),
() => mockSyncApiRepo.ack(["1"]),
]);
verifyNever(() => mockAbortCallbackWrapper());
}); });
test("does not process or ack when stream emits an empty list", () async { test("does not process or ack when event list is empty", () async {
final future = sut.syncUsers(); await simulateEvents([]);
streamController.add([]); // Emit empty list
await streamController.close();
await future; // Wait for completion
verifyNever(() => mockSyncStreamRepo.updateUsersV1(any())); verifyNever(() => mockSyncStreamRepo.updateUsersV1(any()));
verifyNever(() => mockSyncStreamRepo.deleteUsersV1(any())); verifyNever(() => mockSyncStreamRepo.deleteUsersV1(any()));
verifyNever(() => mockSyncStreamRepo.updatePartnerV1(any())); verifyNever(() => mockSyncStreamRepo.updatePartnerV1(any()));
verifyNever(() => mockSyncStreamRepo.deletePartnerV1(any())); verifyNever(() => mockSyncStreamRepo.deletePartnerV1(any()));
verifyNever(() => mockAbortCallbackWrapper());
verifyNever(() => mockSyncApiRepo.ack(any())); verifyNever(() => mockSyncApiRepo.ack(any()));
}); });
test("processes multiple batches sequentially using mutex", () async { test("aborts and stops processing if cancelled during iteration", () async {
final completer1 = Completer<void>(); final cancellationChecker = _MockCancellationWrapper();
final completer2 = Completer<void>(); when(() => cancellationChecker()).thenReturn(false);
int callOrder = 0;
int handler1StartOrder = -1;
int handler2StartOrder = -1;
int handler1Calls = 0;
int handler2Calls = 0;
when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer((_) async { sut = SyncStreamService(
handler1Calls++; syncApiRepository: mockSyncApiRepo,
handler1StartOrder = ++callOrder; syncStreamRepository: mockSyncStreamRepo,
await completer1.future; cancelChecker: cancellationChecker.call,
return true; );
}); await sut.sync();
when(() => mockSyncStreamRepo.updatePartnerV1(any()))
.thenAnswer((_) async { final events = [
handler2Calls++; SyncStreamStub.userDeleteV1,
handler2StartOrder = ++callOrder; SyncStreamStub.userV1Admin,
await completer2.future; SyncStreamStub.partnerDeleteV1,
return true; ];
when(() => mockSyncStreamRepo.deleteUsersV1(any())).thenAnswer((_) async {
when(() => cancellationChecker()).thenReturn(true);
}); });
final batch1 = SyncStreamStub.userEvents; await handleEventsCallback(events, mockAbortCallbackWrapper.call);
final batch2 = SyncStreamStub.partnerEvents;
final syncFuture = sut.syncUsers(); verify(() => mockSyncStreamRepo.deleteUsersV1(any())).called(1);
await pumpEventQueue(); verifyNever(() => mockSyncStreamRepo.updateUsersV1(any()));
verifyNever(() => mockSyncStreamRepo.deletePartnerV1(any()));
streamController.add(batch1); verify(() => mockAbortCallbackWrapper()).called(1);
await pumpEventQueue();
// Small delay to ensure the first handler starts
await Future.delayed(const Duration(milliseconds: 20));
expect(handler1StartOrder, 1, reason: "Handler 1 should start first"); verify(() => mockSyncApiRepo.ack(["2"])).called(1);
expect(handler1Calls, 1);
streamController.add(batch2);
await pumpEventQueue();
// Small delay
await Future.delayed(const Duration(milliseconds: 20));
expect(handler2StartOrder, -1, reason: "Handler 2 should wait");
expect(handler2Calls, 0);
completer1.complete();
await pumpEventQueue(times: 40);
// Small delay to ensure the second handler starts
await Future.delayed(const Duration(milliseconds: 20));
expect(handler2StartOrder, 2, reason: "Handler 2 should start after H1");
expect(handler2Calls, 1);
completer2.complete();
await pumpEventQueue(times: 40);
// Small delay before closing the stream
await Future.delayed(const Duration(milliseconds: 20));
if (!streamController.isClosed) {
await streamController.close();
}
await pumpEventQueue(times: 40);
// Small delay to ensure the sync completes
await Future.delayed(const Duration(milliseconds: 20));
await syncFuture;
verify(() => mockSyncStreamRepo.updateUsersV1(any())).called(1);
verify(() => mockSyncStreamRepo.updatePartnerV1(any())).called(1);
verify(() => mockSyncApiRepo.ack(any())).called(2);
}); });
test( test(
"stops processing and ack when cancel checker is completed", "aborts and stops processing if cancelled before processing batch",
() async { () async {
final cancellationChecker = _MockCancellationWrapper(); final cancellationChecker = _MockCancellationWrapper();
when(() => cancellationChecker.isCancelled()).thenAnswer((_) => false); when(() => cancellationChecker()).thenReturn(false);
final processingCompleter = Completer<void>();
bool handler1Started = false;
when(() => mockSyncStreamRepo.deleteUsersV1(any()))
.thenAnswer((_) async {
handler1Started = true;
return processingCompleter.future;
});
sut = SyncStreamService( sut = SyncStreamService(
syncApiRepository: mockSyncApiRepo, syncApiRepository: mockSyncApiRepo,
syncStreamRepository: mockSyncStreamRepo, syncStreamRepository: mockSyncStreamRepo,
cancelChecker: cancellationChecker.isCancelled, cancelChecker: cancellationChecker.call,
); );
final processingCompleter = Completer<void>(); await sut.sync();
bool handlerStarted = false;
// Make handler wait so we can cancel it mid-flight
when(() => mockSyncStreamRepo.deleteUsersV1(any()))
.thenAnswer((_) async {
handlerStarted = true;
await processingCompleter
.future; // Wait indefinitely until test completes it
return true;
});
final syncFuture = sut.syncUsers();
await pumpEventQueue(times: 30);
streamController.add(SyncStreamStub.userEvents);
// Ensure processing starts
await Future.delayed(const Duration(milliseconds: 10));
expect(handlerStarted, isTrue, reason: "Handler should have started");
when(() => cancellationChecker.isCancelled()).thenAnswer((_) => true);
// Allow cancellation logic to propagate
await Future.delayed(const Duration(milliseconds: 10));
// Complete the handler's completer after cancellation signal
// to ensure the cancellation logic itself isn't blocked by the handler.
processingCompleter.complete();
await expectLater(syncFuture, throwsA(isA<CanceledError>()));
// Verify that ack was NOT called because processing was cancelled
verifyNever(() => mockSyncApiRepo.ack(any()));
},
);
test("completes successfully when ack call throws an exception", () async {
when(() => mockSyncApiRepo.ack(any())).thenThrow(Exception("Ack Error"));
final events = [
...SyncStreamStub.userEvents,
...SyncStreamStub.partnerEvents,
];
// Should still complete even if ack fails
await triggerSyncAndEmit(events);
verify(() => mockSyncApiRepo.ack(any()))
.called(1); // Verify ack was attempted
});
test("waits for processing to finish if onDone called early", () async {
final processingCompleter = Completer<void>();
bool handlerFinished = false;
when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer((_) async {
await processingCompleter.future; // Wait inside handler
handlerFinished = true;
return true;
});
final syncFuture = sut.syncUsers();
// Allow listener to attach
// This is necessary to ensure the stream is ready to receive events
await Future.delayed(Duration.zero);
streamController.add(SyncStreamStub.userEvents); // Emit batch
await Future.delayed(
const Duration(milliseconds: 10),
); // Ensure processing starts
await streamController
.close(); // Close stream (triggers onDone internally)
await Future.delayed(
const Duration(milliseconds: 10),
); // Give onDone a chance to fire
// At this point, onDone was called, but processing is blocked
expect(handlerFinished, isFalse);
processingCompleter.complete(); // Allow processing to finish
await syncFuture; // Now the main future should complete
expect(handlerFinished, isTrue);
verify(() => mockSyncApiRepo.ack(any())).called(1);
});
test("processes events in the defined _kSyncTypeOrder", () async {
final future = sut.syncUsers();
await pumpEventQueue();
if (!streamController.isClosed) {
final events = [ final events = [
SyncEvent( SyncStreamStub.userDeleteV1,
type: SyncEntityType.partnerV1, SyncStreamStub.userV1Admin,
data: SyncStreamStub.partnerV1, SyncStreamStub.partnerDeleteV1,
ack: "1",
), // Should be processed last
SyncEvent(
type: SyncEntityType.userV1,
data: SyncStreamStub.userV1Admin,
ack: "2",
), // Should be processed second
SyncEvent(
type: SyncEntityType.partnerDeleteV1,
data: SyncStreamStub.partnerDeleteV1,
ack: "3",
), // Should be processed third
SyncEvent(
type: SyncEntityType.userDeleteV1,
data: SyncStreamStub.userDeleteV1,
ack: "4",
), // Should be processed first
]; ];
streamController.add(events); final processingFuture =
await streamController.close(); handleEventsCallback(events, mockAbortCallbackWrapper.call);
} await pumpEventQueue();
await future;
verifyInOrder([ expect(handler1Started, isTrue);
() => mockSyncStreamRepo.deleteUsersV1(any()),
() => mockSyncStreamRepo.updateUsersV1(any()),
() => mockSyncStreamRepo.deletePartnerV1(any()),
() => mockSyncStreamRepo.updatePartnerV1(any()),
// Verify ack happens after all processing
() => mockSyncApiRepo.ack(any()),
]);
});
});
group("syncUsers", () { // Signal cancellation while handler 1 is waiting
test("calls getSyncEvents with correct types", () async { when(() => cancellationChecker()).thenReturn(true);
// Need to close the stream for the future to complete await pumpEventQueue();
final future = sut.syncUsers();
await streamController.close();
await future;
verify( processingCompleter.complete();
() => mockSyncApiRepo.getSyncEvents([ await processingFuture;
SyncRequestType.usersV1,
SyncRequestType.partnersV1,
]),
).called(1);
});
test("calls repository methods with correctly grouped data", () async { verifyNever(() => mockSyncStreamRepo.updateUsersV1(any()));
final events = [
...SyncStreamStub.userEvents,
...SyncStreamStub.partnerEvents,
];
await triggerSyncAndEmit(events);
// Verify each handler was called with the correct list of data payloads verify(() => mockSyncApiRepo.ack(["2"])).called(1);
verify( },
() => mockSyncStreamRepo.updateUsersV1( );
[SyncStreamStub.userV1Admin, SyncStreamStub.userV1User],
),
).called(1);
verify(
() => mockSyncStreamRepo.deleteUsersV1([SyncStreamStub.userDeleteV1]),
).called(1);
verify(
() => mockSyncStreamRepo.updatePartnerV1([SyncStreamStub.partnerV1]),
).called(1);
verify(
() => mockSyncStreamRepo
.deletePartnerV1([SyncStreamStub.partnerDeleteV1]),
).called(1);
});
}); });
} }

View file

@ -2,44 +2,44 @@ import 'package:immich_mobile/domain/models/sync_event.model.dart';
import 'package:openapi/api.dart'; import 'package:openapi/api.dart';
abstract final class SyncStreamStub { abstract final class SyncStreamStub {
static final userV1Admin = SyncUserV1( static final userV1Admin = SyncEvent(
deletedAt: DateTime(2020), type: SyncEntityType.userV1,
email: "admin@admin", data: SyncUserV1(
id: "1", deletedAt: DateTime(2020),
name: "Admin", email: "admin@admin",
); id: "1",
static final userV1User = SyncUserV1( name: "Admin",
deletedAt: DateTime(2021),
email: "user@user",
id: "2",
name: "User",
);
static final userDeleteV1 = SyncUserDeleteV1(userId: "2");
static final userEvents = [
SyncEvent(type: SyncEntityType.userV1, data: userV1Admin, ack: "1"),
SyncEvent(
type: SyncEntityType.userDeleteV1,
data: userDeleteV1,
ack: "2",
), ),
SyncEvent(type: SyncEntityType.userV1, data: userV1User, ack: "5"), ack: "1",
]; );
static final userV1User = SyncEvent(
type: SyncEntityType.userV1,
data: SyncUserV1(
deletedAt: DateTime(2021),
email: "user@user",
id: "5",
name: "User",
),
ack: "5",
);
static final userDeleteV1 = SyncEvent(
type: SyncEntityType.userDeleteV1,
data: SyncUserDeleteV1(userId: "2"),
ack: "2",
);
static final partnerV1 = SyncPartnerV1( static final partnerV1 = SyncEvent(
inTimeline: true, type: SyncEntityType.partnerV1,
sharedById: "1", data: SyncPartnerV1(
sharedWithId: "2", inTimeline: true,
); sharedById: "1",
static final partnerDeleteV1 = SyncPartnerDeleteV1( sharedWithId: "2",
sharedById: "3",
sharedWithId: "4",
);
static final partnerEvents = [
SyncEvent(
type: SyncEntityType.partnerDeleteV1,
data: partnerDeleteV1,
ack: "4",
), ),
SyncEvent(type: SyncEntityType.partnerV1, data: partnerV1, ack: "3"), ack: "3",
]; );
static final partnerDeleteV1 = SyncEvent(
type: SyncEntityType.partnerDeleteV1,
data: SyncPartnerDeleteV1(sharedById: "3", sharedWithId: "4"),
ack: "4",
);
} }

View file

@ -0,0 +1,299 @@
import 'dart:async';
import 'dart:convert';
import 'package:flutter_test/flutter_test.dart';
import 'package:http/http.dart' as http;
import 'package:immich_mobile/domain/models/sync_event.model.dart';
import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart';
import 'package:mocktail/mocktail.dart';
import 'package:openapi/api.dart';
import '../../api.mocks.dart';
import '../../service.mocks.dart';
class MockHttpClient extends Mock implements http.Client {}
class MockApiClient extends Mock implements ApiClient {}
class MockStreamedResponse extends Mock implements http.StreamedResponse {}
class FakeBaseRequest extends Fake implements http.BaseRequest {}
String _createJsonLine(String type, Map<String, dynamic> data, String ack) {
return '${jsonEncode({'type': type, 'data': data, 'ack': ack})}\n';
}
void main() {
late SyncApiRepository sut;
late MockApiService mockApiService;
late MockApiClient mockApiClient;
late MockSyncApi mockSyncApi;
late MockHttpClient mockHttpClient;
late MockStreamedResponse mockStreamedResponse;
late StreamController<List<int>> responseStreamController;
late int testBatchSize = 3;
setUp(() {
mockApiService = MockApiService();
mockApiClient = MockApiClient();
mockSyncApi = MockSyncApi();
mockHttpClient = MockHttpClient();
mockStreamedResponse = MockStreamedResponse();
responseStreamController =
StreamController<List<int>>.broadcast(sync: true);
registerFallbackValue(FakeBaseRequest());
when(() => mockApiService.apiClient).thenReturn(mockApiClient);
when(() => mockApiService.syncApi).thenReturn(mockSyncApi);
when(() => mockApiClient.basePath).thenReturn('http://demo.immich.app/api');
when(() => mockApiService.applyToParams(any(), any()))
.thenAnswer((_) async => {});
// Mock HTTP client behavior
when(() => mockHttpClient.send(any()))
.thenAnswer((_) async => mockStreamedResponse);
when(() => mockStreamedResponse.statusCode).thenReturn(200);
when(() => mockStreamedResponse.stream)
.thenAnswer((_) => http.ByteStream(responseStreamController.stream));
when(() => mockHttpClient.close()).thenAnswer((_) => {});
sut = SyncApiRepository(mockApiService);
});
tearDown(() async {
if (!responseStreamController.isClosed) {
await responseStreamController.close();
}
});
Future<void> streamChanges(
Function(List<SyncEvent>, Function() abort) onDataCallback,
) {
return sut.streamChanges(
onDataCallback,
batchSize: testBatchSize,
httpClient: mockHttpClient,
);
}
test('streamChanges stops processing stream when abort is called', () async {
int onDataCallCount = 0;
bool abortWasCalledInCallback = false;
List<SyncEvent> receivedEventsBatch1 = [];
onDataCallback(List<SyncEvent> events, Function() abort) {
onDataCallCount++;
if (onDataCallCount == 1) {
receivedEventsBatch1 = events;
abort();
abortWasCalledInCallback = true;
} else {
fail("onData called more than once after abort was invoked");
}
}
final streamChangesFuture = streamChanges(onDataCallback);
await pumpEventQueue();
for (int i = 0; i < testBatchSize; i++) {
responseStreamController.add(
utf8.encode(
_createJsonLine(
SyncEntityType.userDeleteV1.toString(),
SyncUserDeleteV1(userId: "user$i").toJson(),
'ack$i',
),
),
);
}
for (int i = testBatchSize; i < testBatchSize * 2; i++) {
responseStreamController.add(
utf8.encode(
_createJsonLine(
SyncEntityType.userDeleteV1.toString(),
SyncUserDeleteV1(userId: "user$i").toJson(),
'ack$i',
),
),
);
}
await responseStreamController.close();
await expectLater(streamChangesFuture, completes);
expect(onDataCallCount, 1);
expect(abortWasCalledInCallback, isTrue);
expect(receivedEventsBatch1.length, testBatchSize);
verify(() => mockHttpClient.close()).called(1);
});
test(
'streamChanges does not process remaining lines in finally block if aborted',
() async {
int onDataCallCount = 0;
bool abortWasCalledInCallback = false;
onDataCallback(List<SyncEvent> events, Function() abort) {
onDataCallCount++;
if (onDataCallCount == 1) {
abort();
abortWasCalledInCallback = true;
} else {
fail("onData called more than once after abort was invoked");
}
}
final streamChangesFuture = streamChanges(onDataCallback);
await pumpEventQueue();
for (int i = 0; i < testBatchSize; i++) {
responseStreamController.add(
utf8.encode(
_createJsonLine(
SyncEntityType.userDeleteV1.toString(),
SyncUserDeleteV1(userId: "user$i").toJson(),
'ack$i',
),
),
);
}
// emit a single event to skip batching and trigger finally
responseStreamController.add(
utf8.encode(
_createJsonLine(
SyncEntityType.userDeleteV1.toString(),
SyncUserDeleteV1(userId: "user100").toJson(),
'ack100',
),
),
);
await responseStreamController.close();
await expectLater(streamChangesFuture, completes);
expect(onDataCallCount, 1);
expect(abortWasCalledInCallback, isTrue);
verify(() => mockHttpClient.close()).called(1);
},
);
test(
'streamChanges processes remaining lines in finally block if not aborted',
() async {
int onDataCallCount = 0;
List<SyncEvent> receivedEventsBatch1 = [];
List<SyncEvent> receivedEventsBatch2 = [];
onDataCallback(List<SyncEvent> events, Function() _) {
onDataCallCount++;
if (onDataCallCount == 1) {
receivedEventsBatch1 = events;
} else if (onDataCallCount == 2) {
receivedEventsBatch2 = events;
} else {
fail("onData called more than expected");
}
}
final streamChangesFuture = streamChanges(onDataCallback);
await pumpEventQueue();
// Batch 1
for (int i = 0; i < testBatchSize; i++) {
responseStreamController.add(
utf8.encode(
_createJsonLine(
SyncEntityType.userDeleteV1.toString(),
SyncUserDeleteV1(userId: "user$i").toJson(),
'ack$i',
),
),
);
}
// Partial Batch 2
responseStreamController.add(
utf8.encode(
_createJsonLine(
SyncEntityType.userDeleteV1.toString(),
SyncUserDeleteV1(userId: "user100").toJson(),
'ack100',
),
),
);
await responseStreamController.close();
await expectLater(streamChangesFuture, completes);
expect(onDataCallCount, 2);
expect(receivedEventsBatch1.length, testBatchSize);
expect(receivedEventsBatch2.length, 1);
verify(() => mockHttpClient.close()).called(1);
},
);
test('streamChanges handles stream error gracefully', () async {
final streamError = Exception("Network Error");
int onDataCallCount = 0;
onDataCallback(List<SyncEvent> events, Function() _) {
onDataCallCount++;
}
final streamChangesFuture = streamChanges(onDataCallback);
await pumpEventQueue();
responseStreamController.add(
utf8.encode(
_createJsonLine(
SyncEntityType.userDeleteV1.toString(),
SyncUserDeleteV1(userId: "user1").toJson(),
'ack1',
),
),
);
responseStreamController.addError(streamError);
await expectLater(streamChangesFuture, throwsA(streamError));
expect(onDataCallCount, 0);
verify(() => mockHttpClient.close()).called(1);
});
test('streamChanges throws ApiException on non-200 status code', () async {
when(() => mockStreamedResponse.statusCode).thenReturn(401);
final errorBodyController = StreamController<List<int>>(sync: true);
when(() => mockStreamedResponse.stream)
.thenAnswer((_) => http.ByteStream(errorBodyController.stream));
int onDataCallCount = 0;
onDataCallback(List<SyncEvent> events, Function() _) {
onDataCallCount++;
}
final future = streamChanges(onDataCallback);
errorBodyController.add(utf8.encode('{"error":"Unauthorized"}'));
await errorBodyController.close();
await expectLater(
future,
throwsA(
isA<ApiException>()
.having((e) => e.code, 'code', 401)
.having((e) => e.message, 'message', contains('Unauthorized')),
),
);
expect(onDataCallCount, 0);
verify(() => mockHttpClient.close()).called(1);
});
}